Compare commits

..

10 Commits

Author SHA1 Message Date
ccb2add607
fix: stream kind 2025-01-30 13:19:26 +00:00
32f342efde
fix: stream kind 2025-01-30 13:19:02 +00:00
3b151563a7
feat: pass deleted segments list to overseer 2025-01-30 13:10:11 +00:00
6d1edb1b21
fix: docker build 2025-01-30 12:29:10 +00:00
70b8cd3c55
fix: docker build 2025-01-30 12:15:07 +00:00
e11d7dc787
chore: cleanup 2025-01-30 11:45:05 +00:00
9045bb93e4
refactor: convert to workspace 2025-01-29 11:48:57 +00:00
20c9d107b7
feat: setup api 2025-01-13 14:51:39 +00:00
0202a7da5f
feat: overseer API setup 2024-12-09 14:31:26 +00:00
f38f436b6c
feat: stream costs 2024-12-09 11:36:05 +00:00
59 changed files with 6961 additions and 1378 deletions

View File

@ -1,3 +1,3 @@
target/
.git/
out/
**/target/
**/.git/
**/out/

View File

@ -19,6 +19,6 @@ steps:
- docker login -u kieran -p $TOKEN git.v0l.io
- docker login -u voidic -p $DOCKER_TOKEN
- docker buildx create --name mybuilder --bootstrap --use
- docker buildx build --push --platform linux/amd64 -t git.v0l.io/kieran/zap-stream-core:latest -t voidic/zap-stream-core:latest .
- docker buildx build --push --platform linux/amd64 -t git.v0l.io/kieran/zap-stream-core:latest -t voidic/zap-stream-core:latest -f ./crates/zap-stream/Dockerfile .
- kill $(cat /var/run/docker.pid)

1690
Cargo.lock generated Executable file → Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,77 +1,20 @@
[package]
name = "zap-stream-core"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "zap-stream-core"
path = "src/bin/zap_stream_core.rs"
[features]
default = ["test-pattern", "srt", "rtmp"]
srt = ["dep:srt-tokio"]
rtmp = ["dep:rml_rtmp"]
local-overseer = [] # WIP
webhook-overseer = [] # WIP
zap-stream = [
"dep:nostr-sdk",
"dep:zap-stream-db",
"dep:fedimint-tonic-lnd",
"dep:reqwest",
"dep:base64",
"dep:sha2",
"tokio/fs",
]
test-pattern = [
"dep:resvg",
"dep:usvg",
"dep:tiny-skia",
"dep:fontdue",
"dep:ringbuf",
"zap-stream-db/test-pattern"
[workspace]
resolver = "2"
members = [
"crates/core",
"crates/zap-stream",
"crates/zap-stream-db"
]
[dependencies]
ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0" }
[workspace.dependencies]
ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385" }
tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] }
anyhow = { version = "^1.0.91", features = ["backtrace"] }
pretty_env_logger = "0.5.0"
tokio-stream = "0.1.14"
futures-util = "0.3.30"
async-trait = "0.1.77"
log = "0.4.21"
uuid = { version = "1.8.0", features = ["v4", "serde"] }
serde = { version = "1.0.197", features = ["derive"] }
config = { version = "0.14.0", features = ["yaml"] }
url = "2.5.0"
itertools = "0.13.0"
rand = "0.8.5"
clap = { version = "4.5.16", features = ["derive"] }
warp = "0.3.7"
libc = "0.2.162"
m3u8-rs = "6.0.0"
itertools = "0.14.0"
chrono = "^0.4.38"
hex = "0.4.3"
# srt
srt-tokio = { version = "0.4.3", optional = true }
# rtmp
rml_rtmp = { version = "0.8.0", optional = true }
# test-pattern
resvg = { version = "0.44.0", optional = true }
usvg = { version = "0.44.0", optional = true }
tiny-skia = { version = "0.11.4", optional = true }
fontdue = { version = "0.9.2", optional = true }
ringbuf = { version = "0.4.7", optional = true }
# zap-stream
zap-stream-db = { path = "zap-stream-db", optional = true }
nostr-sdk = { version = "0.36.0", optional = true }
fedimint-tonic-lnd = { version = "0.2.0", optional = true, default-features = false, features = ["invoicesrpc", "versionrpc"] }
reqwest = { version = "0.12.9", optional = true, features = ["stream"] }
base64 = { version = "0.22.1", optional = true }
sha2 = { version = "0.10.8", optional = true }
bytes = "1.8.0"

View File

@ -1,5 +1,4 @@
- RTMP?
- Setup multi-variant output
- API parity
- fMP4 instead of MPEG-TS segments
- API parity https://git.v0l.io/Kieran/zap.stream/issues/7
- HLS-LL
- Delete old segments (N94)

4818
crates/core/Cargo.lock generated Executable file

File diff suppressed because it is too large Load Diff

44
crates/core/Cargo.toml Normal file
View File

@ -0,0 +1,44 @@
[package]
name = "zap-stream-core"
version = "0.1.0"
edition = "2021"
[features]
default = ["test-pattern", "srt", "rtmp"]
srt = ["dep:srt-tokio"]
rtmp = ["dep:rml_rtmp"]
local-overseer = [] # WIP
webhook-overseer = [] # WIP
test-pattern = [
"dep:resvg",
"dep:usvg",
"dep:tiny-skia",
"dep:fontdue",
"dep:ringbuf",
]
[dependencies]
ffmpeg-rs-raw.workspace = true
tokio.workspace = true
anyhow.workspace = true
async-trait.workspace = true
log.workspace = true
uuid.workspace = true
serde.workspace = true
hex.workspace = true
itertools.workspace = true
futures-util = "0.3.30"
m3u8-rs = "6.0.0"
# srt
srt-tokio = { version = "0.4.3", optional = true }
# rtmp
rml_rtmp = { version = "0.8.0", optional = true }
# test-pattern
resvg = { version = "0.44.0", optional = true }
usvg = { version = "0.44.0", optional = true }
tiny-skia = { version = "0.11.4", optional = true }
fontdue = { version = "0.9.2", optional = true }
ringbuf = { version = "0.4.7", optional = true }

View File

@ -47,7 +47,7 @@ relay {
port = 7777
# Set OS-limit on maximum number of open files/sockets (if 0, don't attempt to set) (restart required)
nofiles = 1000000
nofiles = 0
# HTTP header that contains the client's real IP, before reverse proxying (ie x-real-ip) (MUST be all lower-case)
realIpHeader = ""

View File

@ -14,11 +14,7 @@ impl Egress for HlsMuxer {
packet: *mut AVPacket,
variant: &Uuid,
) -> Result<EgressResult> {
if let Some(ns) = self.mux_packet(packet, variant)? {
Ok(EgressResult::NewSegment(ns))
} else {
Ok(EgressResult::None)
}
self.mux_packet(packet, variant)
}
unsafe fn reset(&mut self) -> Result<()> {

View File

@ -25,13 +25,16 @@ pub trait Egress {
pub enum EgressResult {
/// Nothing to report
None,
/// A new segment was created
NewSegment(NewSegment),
/// Egress created/deleted some segments
Segments {
created: Vec<EgressSegment>,
deleted: Vec<EgressSegment>,
},
}
/// Basic details of new segment created by a muxer
#[derive(Debug, Clone)]
pub struct NewSegment {
pub struct EgressSegment {
/// The id of the variant (video or audio)
pub variant: Uuid,
/// Segment index

View File

@ -65,6 +65,6 @@ impl Egress for RecorderEgress {
}
unsafe fn reset(&mut self) -> Result<()> {
self.muxer.reset()
self.muxer.close()
}
}

View File

@ -1,9 +1,6 @@
#[cfg(feature = "zap-stream")]
pub mod blossom;
pub mod egress;
pub mod ingress;
pub mod mux;
pub mod overseer;
pub mod pipeline;
pub mod settings;
pub mod variant;

View File

@ -1,4 +1,4 @@
use crate::egress::NewSegment;
use crate::egress::{EgressResult, EgressSegment};
use crate::variant::{StreamMapping, VariantStream};
use anyhow::{bail, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
@ -79,6 +79,8 @@ pub struct HlsVariant {
pub streams: Vec<HlsVariantStream>,
/// Segment length in seconds
pub segment_length: f32,
/// Total number of segments to store for this variant
pub segment_window: Option<u16>,
/// Current segment index
pub idx: u64,
/// Current segment start time in seconds (duration)
@ -91,20 +93,24 @@ pub struct HlsVariant {
pub segment_type: SegmentType,
}
struct SegmentInfo(u64, f32, SegmentType);
struct SegmentInfo {
pub index: u64,
pub duration: f32,
pub kind: SegmentType,
}
impl SegmentInfo {
fn to_media_segment(&self) -> MediaSegment {
MediaSegment {
uri: self.filename(),
duration: self.1,
duration: self.duration,
title: None,
..MediaSegment::default()
}
}
fn filename(&self) -> String {
HlsVariant::segment_name(self.2, self.0)
HlsVariant::segment_name(self.kind, self.index)
}
}
@ -175,11 +181,16 @@ impl HlsVariant {
Ok(Self {
name: name.clone(),
segment_length,
segment_window: Some(10), //TODO: configure window
mux,
streams,
idx: 1,
pkt_start: 0.0,
segments: Vec::from([SegmentInfo(1, segment_length, segment_type)]),
segments: Vec::from([SegmentInfo {
index: 1,
duration: segment_length,
kind: segment_type,
}]),
out_dir: out_dir.to_string(),
segment_type,
})
@ -205,31 +216,32 @@ impl HlsVariant {
}
/// Mux a packet created by the encoder for this variant
pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result<Option<NewSegment>> {
pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result<EgressResult> {
let pkt_q = av_q2d((*pkt).time_base);
// time of this packet in seconds
let pkt_time = (*pkt).pts as f32 * pkt_q as f32;
// what segment this pkt should be in (index)
let pkt_seg = 1 + (pkt_time / self.segment_length).floor() as u64;
let mut result = None;
let mut result = EgressResult::None;
let pkt_stream = *(*self.mux.context())
.streams
.add((*pkt).stream_index as usize);
let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY
&& (*(*pkt_stream).codecpar).codec_type == AVMEDIA_TYPE_VIDEO;
if pkt_seg != self.idx && can_split {
result = Some(self.split_next_seg(pkt_time)?);
result = self.split_next_seg(pkt_time)?;
}
self.mux.write_packet(pkt)?;
Ok(result)
}
pub unsafe fn reset(&mut self) -> Result<()> {
self.mux.reset()
self.mux.close()
}
unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result<NewSegment> {
/// Reset the muxer state and start the next segment
unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result<EgressResult> {
self.idx += 1;
// Manually reset muxer avio
@ -257,19 +269,40 @@ impl HlsVariant {
let duration = pkt_time - self.pkt_start;
info!("Writing segment {} [{}s]", &next_seg_url, duration);
if let Err(e) = self.add_segment(self.idx, duration) {
if let Err(e) = self.push_segment(self.idx, duration) {
warn!("Failed to update playlist: {}", e);
}
/// Get the video variant for this group
/// since this could actually be audio which would not be useful for
/// [Overseer] impl
let video_var = self.video_stream().unwrap_or(self.streams.first().unwrap());
let video_var_id = self
.video_stream()
.unwrap_or(self.streams.first().unwrap())
.id()
.clone();
// cleanup old segments
let deleted = self
.clean_segments()?
.into_iter()
.map(|seg| EgressSegment {
variant: video_var_id,
idx: seg.index,
duration: seg.duration,
path: PathBuf::from(Self::map_segment_path(
&self.out_dir,
&self.name,
seg.index,
self.segment_type,
)),
})
.collect();
// emit result of the previously completed segment,
let prev_seg = self.idx - 1;
let ret = NewSegment {
variant: *video_var.id(),
let created = EgressSegment {
variant: video_var_id,
idx: prev_seg,
duration,
path: PathBuf::from(Self::map_segment_path(
@ -280,7 +313,10 @@ impl HlsVariant {
)),
};
self.pkt_start = pkt_time;
Ok(ret)
Ok(EgressResult::Segments {
created: vec![created],
deleted,
})
}
fn video_stream(&self) -> Option<&HlsVariantStream> {
@ -289,22 +325,39 @@ impl HlsVariant {
.find(|a| matches!(*a, HlsVariantStream::Video { .. }))
}
fn add_segment(&mut self, idx: u64, duration: f32) -> Result<()> {
self.segments
.push(SegmentInfo(idx, duration, self.segment_type));
/// Add a new segment to the variant and return a list of deleted segments
fn push_segment(&mut self, idx: u64, duration: f32) -> Result<()> {
self.segments.push(SegmentInfo {
index: idx,
duration,
kind: self.segment_type,
});
self.write_playlist()
}
/// Delete segments which are too old
fn clean_segments(&mut self) -> Result<Vec<SegmentInfo>> {
const MAX_SEGMENTS: usize = 10;
let mut ret = vec![];
if self.segments.len() > MAX_SEGMENTS {
let n_drain = self.segments.len() - MAX_SEGMENTS;
let seg_dir = self.out_dir();
for seg in self.segments.drain(..n_drain) {
// delete file
let seg_path = seg_dir.join(seg.filename());
std::fs::remove_file(seg_path)?;
if let Err(e) = std::fs::remove_file(&seg_path) {
warn!(
"Failed to remove segment file: {} {}",
seg_path.display(),
e
);
}
ret.push(seg);
}
}
self.write_playlist()
Ok(ret)
}
fn write_playlist(&mut self) -> Result<()> {
@ -312,7 +365,7 @@ impl HlsVariant {
pl.target_duration = self.segment_length as u64;
pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect();
pl.version = Some(3);
pl.media_sequence = self.segments.first().map(|s| s.0).unwrap_or(0);
pl.media_sequence = self.segments.first().map(|s| s.index).unwrap_or(0);
let mut f_out = File::create(self.out_dir().join("live.m3u8"))?;
pl.write_to(&mut f_out)?;
@ -430,7 +483,7 @@ impl HlsMuxer {
&mut self,
pkt: *mut AVPacket,
variant: &Uuid,
) -> Result<Option<NewSegment>> {
) -> Result<EgressResult> {
for var in self.variants.iter_mut() {
if let Some(vs) = var.streams.iter().find(|s| s.id() == variant) {
// very important for muxer to know which stream this pkt belongs to

View File

@ -20,6 +20,10 @@ impl LocalOverseer {
#[async_trait]
impl Overseer for LocalOverseer {
async fn check_streams(&self) -> Result<()> {
todo!()
}
async fn start_stream(
&self,
_connection: &ConnectionInfo,

View File

@ -0,0 +1,80 @@
use crate::ingress::ConnectionInfo;
use crate::egress::EgressSegment;
use crate::pipeline::PipelineConfig;
use anyhow::Result;
use async_trait::async_trait;
use std::cmp::PartialEq;
use std::path::PathBuf;
use uuid::Uuid;
#[cfg(feature = "local-overseer")]
mod local;
#[cfg(feature = "webhook-overseer")]
mod webhook;
/// A copy of [ffmpeg_rs_raw::DemuxerInfo] without internal ptr
#[derive(PartialEq, Clone)]
pub struct IngressInfo {
pub bitrate: usize,
pub streams: Vec<IngressStream>,
}
/// A copy of [ffmpeg_rs_raw::StreamInfo] without ptr
#[derive(PartialEq, Clone)]
pub struct IngressStream {
pub index: usize,
pub stream_type: IngressStreamType,
pub codec: isize,
pub format: isize,
pub width: usize,
pub height: usize,
pub fps: f32,
pub sample_rate: usize,
pub language: String,
}
#[derive(PartialEq, Eq, Clone)]
pub enum IngressStreamType {
Video,
Audio,
Subtitle,
}
#[async_trait]
/// The control process that oversees streaming operations
pub trait Overseer: Send + Sync {
/// Check all streams
async fn check_streams(&self) -> Result<()>;
/// Set up a new streaming pipeline
async fn start_stream(
&self,
connection: &ConnectionInfo,
stream_info: &IngressInfo,
) -> Result<PipelineConfig>;
/// A new segment(s) (HLS etc.) was generated for a stream variant
///
/// This handler is usually used for distribution / billing
async fn on_segments(
&self,
pipeline_id: &Uuid,
added: &Vec<EgressSegment>,
deleted: &Vec<EgressSegment>,
) -> Result<()>;
/// At a regular interval, pipeline will emit one of the frames for processing as a
/// thumbnail
async fn on_thumbnail(
&self,
pipeline_id: &Uuid,
width: usize,
height: usize,
path: &PathBuf,
) -> Result<()>;
/// Stream is finished
async fn on_end(&self, pipeline_id: &Uuid) -> Result<()>;
}

View File

@ -21,6 +21,10 @@ impl WebhookOverseer {
#[async_trait]
impl Overseer for WebhookOverseer {
async fn check_streams(&self) -> Result<()> {
todo!()
}
async fn start_stream(
&self,
connection: &ConnectionInfo,

View File

@ -73,6 +73,9 @@ pub struct PipelineRunner {
overseer: Arc<dyn Overseer>,
fps_counter_start: Instant,
fps_last_frame_ctr: u64,
/// Total number of frames produced
frame_ctr: u64,
out_dir: String,
}
@ -100,6 +103,7 @@ impl PipelineRunner {
fps_counter_start: Instant::now(),
egress: Vec::new(),
frame_ctr: 0,
fps_last_frame_ctr: 0,
info: None,
})
}
@ -140,7 +144,7 @@ impl PipelineRunner {
};
// run transcoder pipeline
let (mut pkt, stream) = self.demuxer.get_packet()?;
let (mut pkt, _stream) = self.demuxer.get_packet()?;
if pkt.is_null() {
return Ok(false);
}
@ -155,16 +159,14 @@ impl PipelineRunner {
};
let mut egress_results = vec![];
for frame in frames {
for (frame, stream) in frames {
// Copy frame from GPU if using hwaccel decoding
let mut frame = get_frame_from_hw(frame)?;
(*frame).time_base = (*stream).time_base;
let p = (*stream).codecpar;
if (*p).codec_type == AVMediaType::AVMEDIA_TYPE_VIDEO {
let pts_sec = ((*frame).pts as f64 * av_q2d((*stream).time_base)).floor() as u64;
// write thumbnail every 1min
if pts_sec % 60 == 0 && pts_sec != 0 {
if (self.frame_ctr % 1800) == 0 {
let dst_pic = PathBuf::from(&self.out_dir)
.join(config.id.to_string())
.join("thumb.webp");
@ -268,22 +270,24 @@ impl PipelineRunner {
// egress results
self.handle.block_on(async {
for er in egress_results {
if let EgressResult::NewSegment(seg) = er {
if let EgressResult::Segments { created, deleted } = er {
if let Err(e) = self
.overseer
.on_segment(&config.id, &seg.variant, seg.idx, seg.duration, &seg.path)
.on_segments(&config.id, &created, &deleted)
.await
{
error!("Failed to process segment: {}", e);
bail!("Failed to process segment {}", e.to_string());
}
}
}
});
Ok(())
})?;
let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32();
if elapsed >= 2f32 {
info!("Average fps: {:.2}", self.frame_ctr as f32 / elapsed);
let n_frames = self.frame_ctr - self.fps_last_frame_ctr;
info!("Average fps: {:.2}", n_frames as f32 / elapsed);
self.fps_counter_start = Instant::now();
self.frame_ctr = 0;
self.fps_last_frame_ctr = self.frame_ctr;
}
Ok(true)
}

View File

Before

Width:  |  Height:  |  Size: 39 KiB

After

Width:  |  Height:  |  Size: 39 KiB

View File

@ -8,8 +8,8 @@ default = []
test-pattern = []
[dependencies]
anyhow = "^1.0.70"
chrono = { version = "0.4.38", features = ["serde"] }
anyhow.workspace = true
chrono.workspace = true
uuid.workspace = true
sqlx = { version = "0.8.1", features = ["runtime-tokio", "migrate", "mysql", "chrono"] }
log = "0.4.22"
uuid = { version = "1.11.0", features = ["v4"] }

View File

@ -1,8 +1,9 @@
use crate::{User, UserStream};
use anyhow::Result;
use sqlx::{MySqlPool, Row};
use sqlx::{Executor, MySqlPool, Row};
use uuid::Uuid;
#[derive(Clone)]
pub struct ZapStreamDb {
db: MySqlPool,
}
@ -42,6 +43,16 @@ impl ZapStreamDb {
.map_err(anyhow::Error::new)?)
}
/// Update a users balance
pub async fn update_user_balance(&self, uid: u64, diff: i64) -> Result<()> {
sqlx::query("update user set balance = balance + ? where id = ?")
.bind(diff)
.bind(uid)
.execute(&self.db)
.await?;
Ok(())
}
pub async fn upsert_user(&self, pubkey: &[u8; 32]) -> Result<u64> {
let res = sqlx::query("insert ignore into user(pubkey) values(?) returning id")
.bind(pubkey.as_slice())
@ -101,4 +112,45 @@ impl ZapStreamDb {
.await
.map_err(anyhow::Error::new)?)
}
/// Get the list of active streams
pub async fn list_live_streams(&self) -> Result<Vec<UserStream>> {
Ok(sqlx::query_as("select * from user_stream where state = 2")
.fetch_all(&self.db)
.await?)
}
/// Add [duration] & [cost] to a stream and return the new user balance
pub async fn tick_stream(
&self,
stream_id: &Uuid,
user_id: u64,
duration: f32,
cost: i64,
) -> Result<i64> {
let mut tx = self.db.begin().await?;
sqlx::query("update user_stream set duration = duration + ?, cost = cost + ? where id = ?")
.bind(&duration)
.bind(&cost)
.bind(stream_id.to_string())
.execute(&mut *tx)
.await?;
sqlx::query("update user set balance = balance - ? where id = ?")
.bind(&cost)
.bind(&user_id)
.execute(&mut *tx)
.await?;
let balance: i64 = sqlx::query("select balance from user where id = ?")
.bind(&user_id)
.fetch_one(&mut *tx)
.await?
.try_get(0)?;
tx.commit().await?;
Ok(balance)
}
}

View File

@ -1,7 +1,6 @@
use chrono::{DateTime, Utc};
use sqlx::{FromRow, Type};
use std::fmt::{Display, Formatter};
use uuid::Uuid;
#[derive(Debug, Clone, FromRow)]
pub struct User {

7
crates/zap-stream/Cargo.lock generated Normal file
View File

@ -0,0 +1,7 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4
[[package]]
name = "zap-stream"
version = "0.1.0"

View File

@ -0,0 +1,43 @@
[package]
name = "zap-stream"
version = "0.1.0"
edition = "2021"
[features]
default = ["srt", "rtmp", "test-pattern"]
srt = ["zap-stream-core/srt"]
rtmp = ["zap-stream-core/rtmp"]
test-pattern = ["zap-stream-core/test-pattern", "zap-stream-db/test-pattern"]
[dependencies]
zap-stream-db = { path = "../zap-stream-db" }
zap-stream-core = { path = "../core" }
uuid.workspace = true
ffmpeg-rs-raw.workspace = true
anyhow.workspace = true
log.workspace = true
tokio.workspace = true
async-trait.workspace = true
serde.workspace = true
chrono.workspace = true
hex.workspace = true
url.workspace = true
# http setuff
hyper = { version = "1.5.1", features = ["server"] }
bytes = "1.8.0"
http-body-util = "0.1.2"
tokio-util = "0.7.13"
hyper-util = "0.1.10"
# direct deps
config = { version = "0.15.6", features = ["yaml"] }
nostr-sdk = { version = "0.38.0" }
fedimint-tonic-lnd = { version = "0.2.0", default-features = false, features = ["invoicesrpc", "versionrpc"] }
reqwest = { version = "0.12.9", features = ["stream", "json"] }
base64 = { version = "0.22.1" }
sha2 = { version = "0.10.8" }
pretty_env_logger = "0.5.0"
clap = { version = "4.5.16", features = ["derive"] }
futures-util = "0.3.31"

View File

@ -31,7 +31,7 @@ RUN git clone --single-branch --branch release/7.1 https://git.ffmpeg.org/ffmpeg
--disable-static \
--enable-shared && \
make -j$(nproc) && make install
RUN cargo install --path . --bin zap-stream-core --root /app/build --features zap-stream
RUN cargo install --path ./crates/zap-stream --root /app/build
FROM $IMAGE AS runner
WORKDIR /app
@ -40,4 +40,4 @@ RUN apt update && \
rm -rf /var/lib/apt/lists/*
COPY --from=build /app/build .
COPY --from=build /app/ffmpeg/lib/ /lib
ENTRYPOINT ["/app/bin/zap-stream-core"]
ENTRYPOINT ["/app/bin/zap-stream"]

View File

@ -5,6 +5,10 @@ endpoints:
- "rtmp://127.0.0.1:3336"
- "srt://127.0.0.1:3335"
- "tcp://127.0.0.1:3334"
- "test-pattern://"
# Public hostname which points to the IP address used to listen for all [endpoints]
endpoints_public_hostname: "localhost"
# Output directory for recording / hls
output_dir: "./out"
@ -38,6 +42,7 @@ listen_http: "127.0.0.1:8080"
#
overseer:
zap-stream:
cost: 16
nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4"
#blossom:
# - "http://localhost:8881"

View File

@ -0,0 +1,2 @@
create database route96;
create database zap_stream;

View File

@ -0,0 +1,5 @@
listen: "0.0.0.0:8000"
database: "mysql://root:root@db:3306/route96"
storage_dir: "./data"
max_upload_bytes: 5e+9
public_url: "http://localhost:8881"

View File

@ -0,0 +1,144 @@
##
## Default strfry config
##
# Directory that contains the strfry LMDB database (restart required)
db = "./strfry-db/"
dbParams {
# Maximum number of threads/processes that can simultaneously have LMDB transactions open (restart required)
maxreaders = 256
# Size of mmap() to use when loading LMDB (default is 10TB, does *not* correspond to disk-space used) (restart required)
mapsize = 10995116277760
# Disables read-ahead when accessing the LMDB mapping. Reduces IO activity when DB size is larger than RAM. (restart required)
noReadAhead = false
}
events {
# Maximum size of normalised JSON, in bytes
maxEventSize = 65536
# Events newer than this will be rejected
rejectEventsNewerThanSeconds = 900
# Events older than this will be rejected
rejectEventsOlderThanSeconds = 94608000
# Ephemeral events older than this will be rejected
rejectEphemeralEventsOlderThanSeconds = 60
# Ephemeral events will be deleted from the DB when older than this
ephemeralEventsLifetimeSeconds = 300
# Maximum number of tags allowed
maxNumTags = 2000
# Maximum size for tag values, in bytes
maxTagValSize = 1024
}
relay {
# Interface to listen on. Use 0.0.0.0 to listen on all interfaces (restart required)
bind = "0.0.0.0"
# Port to open for the nostr websocket protocol (restart required)
port = 7777
# Set OS-limit on maximum number of open files/sockets (if 0, don't attempt to set) (restart required)
nofiles = 0
# HTTP header that contains the client's real IP, before reverse proxying (ie x-real-ip) (MUST be all lower-case)
realIpHeader = ""
info {
# NIP-11: Name of this server. Short/descriptive (< 30 characters)
name = "strfry default"
# NIP-11: Detailed information about relay, free-form
description = "This is a strfry instance."
# NIP-11: Administrative nostr pubkey, for contact purposes
pubkey = ""
# NIP-11: Alternative administrative contact (email, website, etc)
contact = ""
# NIP-11: URL pointing to an image to be used as an icon for the relay
icon = ""
# List of supported lists as JSON array, or empty string to use default. Example: "[1,2]"
nips = ""
}
# Maximum accepted incoming websocket frame size (should be larger than max event) (restart required)
maxWebsocketPayloadSize = 131072
# Websocket-level PING message frequency (should be less than any reverse proxy idle timeouts) (restart required)
autoPingSeconds = 55
# If TCP keep-alive should be enabled (detect dropped connections to upstream reverse proxy)
enableTcpKeepalive = false
# How much uninterrupted CPU time a REQ query should get during its DB scan
queryTimesliceBudgetMicroseconds = 10000
# Maximum records that can be returned per filter
maxFilterLimit = 500
# Maximum number of subscriptions (concurrent REQs) a connection can have open at any time
maxSubsPerConnection = 20
writePolicy {
# If non-empty, path to an executable script that implements the writePolicy plugin logic
plugin = ""
}
compression {
# Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU (restart required)
enabled = true
# Maintain a sliding window buffer for each connection. Improves compression, but uses more memory (restart required)
slidingWindow = true
}
logging {
# Dump all incoming messages
dumpInAll = false
# Dump all incoming EVENT messages
dumpInEvents = false
# Dump all incoming REQ/CLOSE messages
dumpInReqs = false
# Log performance metrics for initial REQ database scans
dbScanPerf = false
# Log reason for invalid event rejection? Can be disabled to silence excessive logging
invalidEvents = true
}
numThreads {
# Ingester threads: route incoming requests, validate events/sigs (restart required)
ingester = 3
# reqWorker threads: Handle initial DB scan for events (restart required)
reqWorker = 3
# reqMonitor threads: Handle filtering of new events (restart required)
reqMonitor = 3
# negentropy threads: Handle negentropy protocol messages (restart required)
negentropy = 2
}
negentropy {
# Support negentropy protocol messages
enabled = true
# Maximum records that sync will process before returning an error
maxSyncEvents = 1000000
}
}

View File

@ -18,14 +18,14 @@ services:
blossom:
depends_on:
- db
image: voidic/route96
image: voidic/route96:latest
environment:
- "RUST_LOG=info"
ports:
- "8881:8000"
volumes:
- "blossom:/app/data"
- "./dev-setup/route96.toml:/app/config.toml"
- "./dev-setup/route96.yaml:/app/config.yaml"
volumes:
db:
blossom:

View File

@ -0,0 +1,204 @@
use crate::http::check_nip98_auth;
use crate::settings::Settings;
use crate::ListenerEndpoint;
use anyhow::{anyhow, bail, Result};
use bytes::Bytes;
use fedimint_tonic_lnd::tonic::codegen::Body;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use hyper::body::Incoming;
use hyper::{Method, Request, Response};
use nostr_sdk::{serde_json, Event, PublicKey};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::str::FromStr;
use url::Url;
use zap_stream_db::ZapStreamDb;
#[derive(Clone)]
pub struct Api {
db: ZapStreamDb,
settings: Settings,
}
impl Api {
pub fn new(db: ZapStreamDb, settings: Settings) -> Self {
Self { db, settings }
}
pub async fn handler(
self,
req: Request<Incoming>,
) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
let base = Response::builder()
.header("server", "zap-stream")
.header("access-control-allow-origin", "*")
.header("access-control-allow-headers", "*")
.header("access-control-allow-methods", "HEAD, GET");
Ok(match (req.method(), req.uri().path()) {
(&Method::GET, "/api/v1/account") => {
let auth = check_nip98_auth(&req)?;
let rsp = self.get_account(&auth.pubkey).await?;
return Ok(base.body(Self::body_json(&rsp)?)?);
}
(&Method::PATCH, "/api/v1/account") => {
let auth = check_nip98_auth(&req)?;
let body = req.collect().await?.to_bytes();
let r_body: PatchAccount = serde_json::from_slice(&body)?;
let rsp = self.update_account(&auth.pubkey, r_body).await?;
return Ok(base.body(Self::body_json(&rsp)?)?);
}
(&Method::GET, "/api/v1/topup") => {
let auth = check_nip98_auth(&req)?;
let url: Url = req.uri().to_string().parse()?;
let amount: usize = url
.query_pairs()
.find_map(|(k, v)| if k == "amount" { Some(v) } else { None })
.and_then(|v| v.parse().ok())
.ok_or(anyhow!("Missing amount"))?;
let rsp = self.topup(&auth.pubkey, amount).await?;
return Ok(base.body(Self::body_json(&rsp)?)?);
}
(&Method::PATCH, "/api/v1/event") => {
bail!("Not implemented")
}
(&Method::POST, "/api/v1/withdraw") => {
bail!("Not implemented")
}
(&Method::POST, "/api/v1/account/forward") => {
bail!("Not implemented")
}
(&Method::DELETE, "/api/v1/account/forward/<id>") => {
bail!("Not implemented")
}
(&Method::GET, "/api/v1/account/history") => {
bail!("Not implemented")
}
(&Method::GET, "/api/v1/account/keys") => {
bail!("Not implemented")
}
_ => {
if req.method() == Method::OPTIONS {
base.body(Default::default())?
} else {
base.status(404).body(Default::default())?
}
}
})
}
fn body_json<T: Serialize>(obj: &T) -> Result<BoxBody<Bytes, anyhow::Error>> {
Ok(Full::from(serde_json::to_string(obj)?)
.map_err(|e| match e {})
.boxed())
}
async fn get_account(&self, pubkey: &PublicKey) -> Result<AccountInfo> {
let uid = self.db.upsert_user(&pubkey.to_bytes()).await?;
let user = self.db.get_user(uid).await?;
Ok(AccountInfo {
endpoints: self
.settings
.endpoints
.iter()
.filter_map(|e| match ListenerEndpoint::from_str(&e).ok()? {
ListenerEndpoint::SRT { endpoint } => {
let addr: SocketAddr = endpoint.parse().ok()?;
Some(Endpoint {
name: "SRT".to_string(),
url: format!(
"srt://{}:{}",
self.settings.endpoints_public_hostname,
addr.port()
),
key: user.stream_key.clone(),
capabilities: vec![],
})
}
ListenerEndpoint::RTMP { endpoint } => {
let addr: SocketAddr = endpoint.parse().ok()?;
Some(Endpoint {
name: "RTMP".to_string(),
url: format!(
"rtmp://{}:{}",
self.settings.endpoints_public_hostname,
addr.port()
),
key: user.stream_key.clone(),
capabilities: vec![],
})
}
ListenerEndpoint::TCP { endpoint } => {
let addr: SocketAddr = endpoint.parse().ok()?;
Some(Endpoint {
name: "TCP".to_string(),
url: format!(
"tcp://{}:{}",
self.settings.endpoints_public_hostname,
addr.port()
),
key: user.stream_key.clone(),
capabilities: vec![],
})
}
ListenerEndpoint::File { .. } => None,
ListenerEndpoint::TestPattern => None,
})
.collect(),
event: None,
balance: user.balance as u64,
tos: AccountTos {
accepted: user.tos_accepted.is_some(),
link: "https://zap.stream/tos".to_string(),
},
})
}
async fn update_account(&self, pubkey: &PublicKey, account: PatchAccount) -> Result<()> {
bail!("Not implemented")
}
async fn topup(&self, pubkey: &PublicKey, amount: usize) -> Result<TopupResponse> {
bail!("Not implemented")
}
}
#[derive(Deserialize, Serialize)]
struct AccountInfo {
pub endpoints: Vec<Endpoint>,
pub event: Option<Event>,
pub balance: u64,
pub tos: AccountTos,
}
#[derive(Deserialize, Serialize)]
struct Endpoint {
pub name: String,
pub url: String,
pub key: String,
pub capabilities: Vec<String>,
}
#[derive(Deserialize, Serialize)]
struct EndpointCost {
pub unit: String,
pub rate: u16,
}
#[derive(Deserialize, Serialize)]
struct AccountTos {
pub accepted: bool,
pub link: String,
}
#[derive(Deserialize, Serialize)]
struct PatchAccount {
pub accept_tos: Option<bool>,
}
#[derive(Deserialize, Serialize)]
struct TopupResponse {
pub pr: String,
}

View File

@ -59,15 +59,11 @@ impl Blossom {
) -> Result<BlobDescriptor> {
let mut f = File::open(from_file).await?;
let hash = Self::hash_file(&mut f).await?;
let auth_event = EventBuilder::new(
Kind::Custom(24242),
"Upload blob",
[
Tag::hashtag("upload"),
Tag::parse(&["x", &hash])?,
Tag::expiration(Timestamp::now().add(60)),
],
);
let auth_event = EventBuilder::new(Kind::Custom(24242), "Upload blob").tags([
Tag::hashtag("upload"),
Tag::parse(["x", &hash])?,
Tag::expiration(Timestamp::now().add(60)),
]);
let auth_event = auth_event.sign_with_keys(keys)?;

View File

@ -0,0 +1,118 @@
use crate::api::Api;
use crate::overseer::ZapStreamOverseer;
use anyhow::{bail, Result};
use base64::Engine;
use bytes::Bytes;
use futures_util::TryStreamExt;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full, StreamBody};
use hyper::body::{Frame, Incoming};
use hyper::service::Service;
use hyper::{Method, Request, Response};
use log::{error, info};
use nostr_sdk::{serde_json, Event};
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use tokio::fs::File;
use tokio_util::io::ReaderStream;
use zap_stream_core::overseer::Overseer;
#[derive(Clone)]
pub struct HttpServer {
index: String,
files_dir: PathBuf,
api: Api,
}
impl HttpServer {
pub fn new(index: String, files_dir: PathBuf, api: Api) -> Self {
Self {
index,
files_dir,
api,
}
}
}
impl Service<Request<Incoming>> for HttpServer {
type Response = Response<BoxBody<Bytes, Self::Error>>;
type Error = anyhow::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn call(&self, req: Request<Incoming>) -> Self::Future {
// check is index.html
if req.method() == Method::GET && req.uri().path() == "/"
|| req.uri().path() == "/index.html"
{
let index = self.index.clone();
return Box::pin(async move {
Ok(Response::builder()
.header("content-type", "text/html")
.header("server", "zap-stream-core")
.body(
Full::new(Bytes::from(index))
.map_err(|e| match e {})
.boxed(),
)?)
});
}
// check if mapped to file
let mut dst_path = self.files_dir.join(req.uri().path()[1..].to_string());
if dst_path.exists() {
return Box::pin(async move {
let mut rsp = Response::builder()
.header("server", "zap-stream-core")
.header("access-control-allow-origin", "*")
.header("access-control-allow-headers", "*")
.header("access-control-allow-methods", "HEAD, GET");
if req.method() == Method::HEAD {
return Ok(rsp.body(BoxBody::default())?);
}
let f = File::open(&dst_path).await?;
let f_stream = ReaderStream::new(f);
let body = StreamBody::new(
f_stream
.map_ok(Frame::data)
.map_err(|e| Self::Error::new(e)),
)
.boxed();
Ok(rsp.body(body)?)
});
}
// otherwise handle in overseer
let mut api = self.api.clone();
Box::pin(async move {
match api.handler(req).await {
Ok(res) => Ok(res),
Err(e) => {
error!("{}", e);
Ok(Response::builder().status(500).body(BoxBody::default())?)
}
}
})
}
}
pub fn check_nip98_auth(req: &Request<Incoming>) -> Result<Event> {
let auth = if let Some(a) = req.headers().get("authorization") {
a.to_str()?
} else {
bail!("Authorization header missing");
};
if !auth.starts_with("Nostr ") {
bail!("Invalid authorization scheme");
}
let json =
String::from_utf8(base64::engine::general_purpose::STANDARD.decode(auth[6..].as_bytes())?)?;
info!("{}", json);
// TODO: check tags
Ok(serde_json::from_str::<Event>(&json)?)
}

View File

@ -0,0 +1,183 @@
use anyhow::{bail, Result};
use clap::Parser;
use config::Config;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_log_set_callback, av_version_info};
use ffmpeg_rs_raw::{av_log_redirect, rstr};
use hyper::server::conn::http1;
use hyper_util::rt::TokioIo;
use log::{error, info};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use url::Url;
#[cfg(feature = "rtmp")]
use zap_stream_core::ingress::rtmp;
#[cfg(feature = "srt")]
use zap_stream_core::ingress::srt;
#[cfg(feature = "test-pattern")]
use zap_stream_core::ingress::test;
use crate::api::Api;
use crate::http::HttpServer;
use crate::monitor::BackgroundMonitor;
use crate::overseer::ZapStreamOverseer;
use crate::settings::Settings;
use zap_stream_core::ingress::{file, tcp};
use zap_stream_core::overseer::Overseer;
mod api;
mod blossom;
mod http;
mod monitor;
mod overseer;
mod settings;
#[derive(Parser, Debug)]
struct Args {}
#[tokio::main]
async fn main() -> Result<()> {
pretty_env_logger::init();
let _args = Args::parse();
unsafe {
av_log_set_callback(Some(av_log_redirect));
info!("FFMPEG version={}", rstr!(av_version_info()));
}
let builder = Config::builder()
.add_source(config::File::with_name("config.yaml"))
.add_source(config::Environment::with_prefix("APP"))
.build()?;
let settings: Settings = builder.try_deserialize()?;
let overseer = settings.get_overseer().await?;
// Create ingress listeners
let mut tasks = vec![];
for e in &settings.endpoints {
match try_create_listener(e, &settings.output_dir, &overseer) {
Ok(l) => tasks.push(l),
Err(e) => error!("{}", e),
}
}
let http_addr: SocketAddr = settings.listen_http.parse()?;
let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url);
let api = Api::new(overseer.database(), settings.clone());
// HTTP server
let server = HttpServer::new(index_html, PathBuf::from(settings.output_dir), api);
tasks.push(tokio::spawn(async move {
let listener = TcpListener::bind(&http_addr).await?;
loop {
let (socket, _) = listener.accept().await?;
let io = TokioIo::new(socket);
let server = server.clone();
tokio::spawn(async move {
if let Err(e) = http1::Builder::new().serve_connection(io, server).await {
error!("Failed to handle request: {}", e);
}
});
}
}));
// Background worker
let mut bg = BackgroundMonitor::new(overseer.clone());
tasks.push(tokio::spawn(async move {
loop {
if let Err(e) = bg.check().await {
error!("{}", e);
}
sleep(Duration::from_secs(10)).await;
}
}));
// Join tasks and get errors
for handle in tasks {
if let Err(e) = handle.await? {
error!("{e}");
}
}
info!("Server closed");
Ok(())
}
pub enum ListenerEndpoint {
SRT { endpoint: String },
RTMP { endpoint: String },
TCP { endpoint: String },
File { path: PathBuf },
TestPattern,
}
impl FromStr for ListenerEndpoint {
type Err = anyhow::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let url: Url = s.parse()?;
match url.scheme() {
"srt" => Ok(Self::SRT {
endpoint: format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
}),
"rtmp" => Ok(Self::RTMP {
endpoint: format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
}),
"tcp" => Ok(Self::TCP {
endpoint: format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
}),
"file" => Ok(Self::File {
path: PathBuf::from(url.path()),
}),
"test-pattern" => Ok(Self::TestPattern),
_ => bail!("Unsupported endpoint scheme: {}", url.scheme()),
}
}
}
fn try_create_listener(
u: &str,
out_dir: &str,
overseer: &Arc<ZapStreamOverseer>,
) -> Result<JoinHandle<Result<()>>> {
let ep = ListenerEndpoint::from_str(u)?;
match ep {
#[cfg(feature = "srt")]
ListenerEndpoint::SRT { endpoint } => Ok(tokio::spawn(srt::listen(
out_dir.to_string(),
endpoint,
overseer.clone(),
))),
#[cfg(feature = "rtmp")]
ListenerEndpoint::RTMP { endpoint } => Ok(tokio::spawn(rtmp::listen(
out_dir.to_string(),
endpoint,
overseer.clone(),
))),
ListenerEndpoint::TCP { endpoint } => Ok(tokio::spawn(tcp::listen(
out_dir.to_string(),
endpoint,
overseer.clone(),
))),
ListenerEndpoint::File { path } => Ok(tokio::spawn(file::listen(
out_dir.to_string(),
path,
overseer.clone(),
))),
#[cfg(feature = "test-pattern")]
ListenerEndpoint::TestPattern => Ok(tokio::spawn(test::listen(
out_dir.to_string(),
overseer.clone(),
))),
_ => {
bail!("Unknown endpoint config: {u}");
}
}
}

View File

@ -0,0 +1,19 @@
use crate::overseer::ZapStreamOverseer;
use anyhow::Result;
use std::sync::Arc;
use zap_stream_core::overseer::Overseer;
/// Monitor stream status, perform any necessary cleanup
pub struct BackgroundMonitor {
overseer: Arc<ZapStreamOverseer>,
}
impl BackgroundMonitor {
pub fn new(overseer: Arc<ZapStreamOverseer>) -> Self {
Self { overseer }
}
pub async fn check(&mut self) -> Result<()> {
self.overseer.check_streams().await
}
}

View File

@ -1,30 +1,44 @@
use crate::blossom::{BlobDescriptor, Blossom};
use crate::egress::hls::HlsEgress;
use crate::egress::EgressConfig;
use crate::ingress::ConnectionInfo;
use crate::overseer::{get_default_variants, IngressInfo, Overseer};
use crate::pipeline::{EgressType, PipelineConfig};
use crate::settings::LndSettings;
use crate::variant::StreamMapping;
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use base64::alphabet::STANDARD;
use base64::Engine;
use bytes::Bytes;
use chrono::Utc;
use fedimint_tonic_lnd::verrpc::VersionRequest;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_MJPEG;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVFrame;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use ffmpeg_rs_raw::Encoder;
use futures_util::FutureExt;
use log::{info, warn};
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use hyper::body::Incoming;
use hyper::{Method, Request, Response};
use log::{error, info, warn};
use nostr_sdk::bitcoin::PrivateKey;
use nostr_sdk::prelude::Coordinate;
use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32};
use serde::Serialize;
use std::collections::HashSet;
use std::env::temp_dir;
use std::fs::create_dir_all;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::RwLock;
use url::Url;
use uuid::Uuid;
use warp::Filter;
use zap_stream_core::egress::hls::HlsEgress;
use zap_stream_core::egress::{EgressConfig, EgressSegment};
use zap_stream_core::ingress::ConnectionInfo;
use zap_stream_core::overseer::{IngressInfo, IngressStreamType, Overseer};
use zap_stream_core::pipeline::{EgressType, PipelineConfig};
use zap_stream_core::variant::audio::AudioVariant;
use zap_stream_core::variant::mapping::VariantMapping;
use zap_stream_core::variant::video::VideoVariant;
use zap_stream_core::variant::{StreamMapping, VariantStream};
use zap_stream_db::sqlx::Encode;
use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb};
@ -46,6 +60,11 @@ pub struct ZapStreamOverseer {
blossom_servers: Vec<Blossom>,
/// Public facing URL pointing to [out_dir]
public_url: String,
/// Cost / second / variant
cost: i64,
/// Currently active streams
/// Any streams which are not contained in this set are dead
active_streams: Arc<RwLock<HashSet<Uuid>>>,
}
impl ZapStreamOverseer {
@ -57,10 +76,25 @@ impl ZapStreamOverseer {
lnd: &LndSettings,
relays: &Vec<String>,
blossom_servers: &Option<Vec<String>>,
cost: i64,
) -> Result<Self> {
let db = ZapStreamDb::new(db).await?;
db.migrate().await?;
#[cfg(debug_assertions)]
{
let uid = db.upsert_user(&[0; 32]).await?;
db.update_user_balance(uid, 100_000_000).await?;
let user = db.get_user(uid).await?;
info!(
"ZERO pubkey: uid={},key={},balance={}",
user.id,
user.stream_key,
user.balance / 1000
);
}
let mut lnd = fedimint_tonic_lnd::connect(
lnd.address.clone(),
PathBuf::from(&lnd.cert),
@ -94,9 +128,15 @@ impl ZapStreamOverseer {
.map(|b| Blossom::new(b))
.collect(),
public_url: public_url.clone(),
cost,
active_streams: Arc::new(RwLock::new(HashSet::new())),
})
}
pub(crate) fn database(&self) -> ZapStreamDb {
self.db.clone()
}
fn stream_to_event_builder(&self, stream: &UserStream) -> Result<EventBuilder> {
let mut tags = vec![
Tag::parse(&["d".to_string(), stream.id.to_string()])?,
@ -141,71 +181,95 @@ impl ZapStreamOverseer {
let kind = Kind::from(STREAM_EVENT_KIND);
let coord = Coordinate::new(kind, self.keys.public_key).identifier(&stream.id);
tags.push(Tag::parse(&[
tags.push(Tag::parse([
"alt",
&format!("Watch live on https://zap.stream/{}", coord.to_bech32()?),
])?);
Ok(EventBuilder::new(kind, "", tags))
Ok(EventBuilder::new(kind, "").tags(tags))
}
fn blob_to_event_builder(&self, stream: &BlobDescriptor) -> Result<EventBuilder> {
let tags = if let Some(tags) = stream.nip94.as_ref() {
tags.iter()
.map_while(|(k, v)| Tag::parse(&[k, v]).ok())
.map_while(|(k, v)| Tag::parse([k, v]).ok())
.collect()
} else {
let mut tags = vec![
Tag::parse(&["x", &stream.sha256])?,
Tag::parse(&["url", &stream.url])?,
Tag::parse(&["size", &stream.size.to_string()])?,
Tag::parse(["x", &stream.sha256])?,
Tag::parse(["url", &stream.url])?,
Tag::parse(["size", &stream.size.to_string()])?,
];
if let Some(m) = stream.mime_type.as_ref() {
tags.push(Tag::parse(&["m", m])?)
tags.push(Tag::parse(["m", m])?)
}
tags
};
Ok(EventBuilder::new(Kind::FileMetadata, "", tags))
Ok(EventBuilder::new(Kind::FileMetadata, "").tags(tags))
}
async fn publish_stream_event(&self, stream: &UserStream, pubkey: &Vec<u8>) -> Result<Event> {
let mut extra_tags = vec![
Tag::parse(&["p", hex::encode(pubkey).as_str(), "", "host"])?,
Tag::parse(&[
let extra_tags = vec![
Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?,
Tag::parse([
"streaming",
self.map_to_public_url(stream, "live.m3u8")?.as_str(),
self.map_to_stream_public_url(stream, "live.m3u8")?.as_str(),
])?,
Tag::parse(&[
Tag::parse([
"image",
self.map_to_public_url(stream, "thumb.webp")?.as_str(),
self.map_to_stream_public_url(stream, "thumb.webp")?
.as_str(),
])?,
Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?,
];
// flag NIP94 streaming when using blossom servers
if self.blossom_servers.len() > 0 {
extra_tags.push(Tag::parse(&["streaming", "nip94"])?);
}
let ev = self
.stream_to_event_builder(stream)?
.add_tags(extra_tags)
.tags(extra_tags)
.sign_with_keys(&self.keys)?;
self.client.send_event(ev.clone()).await?;
Ok(ev)
}
fn map_to_public_url<'a>(
&self,
stream: &UserStream,
path: impl Into<&'a str>,
) -> Result<String> {
fn map_to_stream_public_url(&self, stream: &UserStream, path: &str) -> Result<String> {
self.map_to_public_url(&format!("{}/{}", stream.id, path))
}
fn map_to_public_url(&self, path: &str) -> Result<String> {
let u: Url = self.public_url.parse()?;
Ok(u.join(&format!("/{}/", stream.id))?
.join(path.into())?
.to_string())
Ok(u.join(path)?.to_string())
}
}
#[derive(Serialize)]
struct Endpoint {}
#[derive(Serialize)]
struct AccountInfo {
pub endpoints: Vec<Endpoint>,
pub event: Event,
pub balance: u64,
}
#[async_trait]
impl Overseer for ZapStreamOverseer {
async fn check_streams(&self) -> Result<()> {
let active_streams = self.db.list_live_streams().await?;
for stream in active_streams {
// check
let id = Uuid::parse_str(&stream.id)?;
info!("Checking stream is alive: {}", stream.id);
let is_active = {
let streams = self.active_streams.read().await;
streams.contains(&id)
};
if !is_active {
if let Err(e) = self.on_end(&id).await {
error!("Failed to end dead stream {}: {}", &id, e);
}
}
}
Ok(())
}
async fn start_stream(
&self,
connection: &ConnectionInfo,
@ -217,6 +281,11 @@ impl Overseer for ZapStreamOverseer {
.await?
.ok_or_else(|| anyhow::anyhow!("User not found"))?;
let user = self.db.get_user(uid).await?;
if user.balance <= 0 {
bail!("Not enough balance");
}
let variants = get_default_variants(&stream_info)?;
let mut egress = vec![];
@ -225,7 +294,6 @@ impl Overseer for ZapStreamOverseer {
variants: variants.iter().map(|v| v.id()).collect(),
}));
let user = self.db.get_user(uid).await?;
let stream_id = Uuid::new_v4();
// insert new stream record
let mut new_stream = UserStream {
@ -238,8 +306,12 @@ impl Overseer for ZapStreamOverseer {
let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?;
new_stream.event = Some(stream_event.as_json());
let mut streams = self.active_streams.write().await;
streams.insert(stream_id.clone());
self.db.insert_stream(&new_stream).await?;
self.db.update_stream(&new_stream).await?;
Ok(PipelineConfig {
id: stream_id,
variants,
@ -247,42 +319,65 @@ impl Overseer for ZapStreamOverseer {
})
}
async fn on_segment(
async fn on_segments(
&self,
pipeline_id: &Uuid,
variant_id: &Uuid,
index: u64,
duration: f32,
path: &PathBuf,
added: &Vec<EgressSegment>,
deleted: &Vec<EgressSegment>,
) -> Result<()> {
// Upload to blossom servers if configured
let mut blobs = vec![];
for b in &self.blossom_servers {
blobs.push(b.upload(path, &self.keys, Some("video/mp2t")).await?);
let stream = self.db.get_stream(pipeline_id).await?;
let duration = added.iter().fold(0.0, |acc, v| acc + v.duration);
let cost = self.cost * duration.round() as i64;
let bal = self
.db
.tick_stream(pipeline_id, stream.user_id, duration, cost)
.await?;
if bal <= 0 {
bail!("Not enough balance");
}
if let Some(blob) = blobs.first() {
let a_tag = format!(
"{}:{}:{}",
STREAM_EVENT_KIND,
self.keys.public_key.to_hex(),
pipeline_id
);
let mut n94 = self.blob_to_event_builder(blob)?.add_tags([
Tag::parse(&["a", &a_tag])?,
Tag::parse(&["d", variant_id.to_string().as_str()])?,
Tag::parse(&["duration", duration.to_string().as_str()])?,
]);
for b in blobs.iter().skip(1) {
n94 = n94.add_tags(Tag::parse(&["url", &b.url]));
// Upload to blossom servers if configured (N94)
let mut blobs = vec![];
for seg in added {
for b in &self.blossom_servers {
blobs.push(b.upload(&seg.path, &self.keys, Some("video/mp2t")).await?);
}
let n94 = n94.sign_with_keys(&self.keys)?;
let cc = self.client.clone();
tokio::spawn(async move {
if let Err(e) = cc.send_event(n94).await {
warn!("Error sending event: {}", e);
if let Some(blob) = blobs.first() {
let a_tag = format!(
"{}:{}:{}",
STREAM_EVENT_KIND,
self.keys.public_key.to_hex(),
pipeline_id
);
let mut n94 = self.blob_to_event_builder(blob)?.tags([
Tag::parse(["a", &a_tag])?,
Tag::parse(["d", seg.variant.to_string().as_str()])?,
Tag::parse(["index", seg.idx.to_string().as_str()])?,
]);
// some servers add duration tag
if blob
.nip94
.as_ref()
.map(|a| a.contains_key("duration"))
.is_none()
{
n94 = n94.tag(Tag::parse(["duration", seg.duration.to_string().as_str()])?);
}
});
info!("Published N94 segment to {}", blob.url);
for b in blobs.iter().skip(1) {
n94 = n94.tag(Tag::parse(["url", &b.url])?);
}
let n94 = n94.sign_with_keys(&self.keys)?;
let cc = self.client.clone();
tokio::spawn(async move {
if let Err(e) = cc.send_event(n94).await {
warn!("Error sending event: {}", e);
}
});
info!("Published N94 segment to {}", blob.url);
}
}
Ok(())
@ -303,6 +398,9 @@ impl Overseer for ZapStreamOverseer {
let mut stream = self.db.get_stream(pipeline_id).await?;
let user = self.db.get_user(stream.user_id).await?;
let mut streams = self.active_streams.write().await;
streams.remove(pipeline_id);
stream.state = UserStreamState::Ended;
let event = self.publish_stream_event(&stream, &user.pubkey).await?;
stream.event = Some(event.as_json());
@ -312,3 +410,64 @@ impl Overseer for ZapStreamOverseer {
Ok(())
}
}
fn get_default_variants(info: &IngressInfo) -> Result<Vec<VariantStream>> {
let mut vars: Vec<VariantStream> = vec![];
if let Some(video_src) = info
.streams
.iter()
.find(|c| c.stream_type == IngressStreamType::Video)
{
vars.push(VariantStream::CopyVideo(VariantMapping {
id: Uuid::new_v4(),
src_index: video_src.index,
dst_index: 0,
group_id: 0,
}));
vars.push(VariantStream::Video(VideoVariant {
mapping: VariantMapping {
id: Uuid::new_v4(),
src_index: video_src.index,
dst_index: 1,
group_id: 1,
},
width: 1280,
height: 720,
fps: video_src.fps,
bitrate: 3_000_000,
codec: "libx264".to_string(),
profile: 100,
level: 51,
keyframe_interval: video_src.fps as u16 * 2,
pixel_format: AV_PIX_FMT_YUV420P as u32,
}));
}
if let Some(audio_src) = info
.streams
.iter()
.find(|c| c.stream_type == IngressStreamType::Audio)
{
vars.push(VariantStream::CopyAudio(VariantMapping {
id: Uuid::new_v4(),
src_index: audio_src.index,
dst_index: 2,
group_id: 0,
}));
vars.push(VariantStream::Audio(AudioVariant {
mapping: VariantMapping {
id: Uuid::new_v4(),
src_index: audio_src.index,
dst_index: 3,
group_id: 1,
},
bitrate: 192_000,
codec: "aac".to_string(),
channels: 2,
sample_rate: 48_000,
sample_fmt: "fltp".to_owned(),
}));
}
Ok(vars)
}

View File

@ -1,4 +1,6 @@
use crate::overseer::ZapStreamOverseer;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Settings {
@ -9,6 +11,9 @@ pub struct Settings {
/// - rtmp://localhost:1935
pub endpoints: Vec<String>,
/// Public facing hostname that maps to [endpoints]
pub endpoints_public_hostname: String,
/// Where to store output (static files)
pub output_dir: String,
@ -18,7 +23,7 @@ pub struct Settings {
/// Binding address for http server serving files from [output_dir]
pub listen_http: String,
/// Overseer service see [crate::overseer::Overseer] for more info
/// Overseer service see [Overseer] for more info
pub overseer: OverseerConfig,
}
@ -44,6 +49,8 @@ pub enum OverseerConfig {
nsec: String,
/// Blossom servers
blossom: Option<Vec<String>>,
/// Cost (milli-sats) / second / variant
cost: i64,
},
}
@ -53,3 +60,33 @@ pub struct LndSettings {
pub cert: String,
pub macaroon: String,
}
impl Settings {
pub async fn get_overseer(&self) -> anyhow::Result<Arc<ZapStreamOverseer>> {
match &self.overseer {
OverseerConfig::ZapStream {
nsec: private_key,
database,
lnd,
relays,
blossom,
cost,
} => Ok(Arc::new(
ZapStreamOverseer::new(
&self.output_dir,
&self.public_url,
private_key,
database,
lnd,
relays,
blossom,
*cost,
)
.await?,
)),
_ => {
panic!("Unsupported overseer");
}
}
}
}

View File

Before

Width:  |  Height:  |  Size: 118 KiB

After

Width:  |  Height:  |  Size: 118 KiB

View File

@ -1,121 +0,0 @@
use anyhow::{bail, Result};
use clap::Parser;
use config::Config;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_log_set_callback, av_version_info};
use ffmpeg_rs_raw::{av_log_redirect, rstr};
use log::{error, info};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::task::JoinHandle;
use url::Url;
use warp::{cors, Filter};
#[cfg(feature = "rtmp")]
use zap_stream_core::ingress::rtmp;
#[cfg(feature = "srt")]
use zap_stream_core::ingress::srt;
#[cfg(feature = "test-pattern")]
use zap_stream_core::ingress::test;
use zap_stream_core::ingress::{file, tcp};
use zap_stream_core::overseer::Overseer;
use zap_stream_core::settings::Settings;
#[derive(Parser, Debug)]
struct Args {}
#[tokio::main]
async fn main() -> Result<()> {
pretty_env_logger::init();
let _args = Args::parse();
unsafe {
av_log_set_callback(Some(av_log_redirect));
info!("FFMPEG version={}", rstr!(av_version_info()));
}
let builder = Config::builder()
.add_source(config::File::with_name("config.yaml"))
.add_source(config::Environment::with_prefix("APP"))
.build()?;
let settings: Settings = builder.try_deserialize()?;
let overseer = settings.get_overseer().await?;
let mut listeners = vec![];
for e in &settings.endpoints {
match try_create_listener(e, &settings.output_dir, &overseer) {
Ok(l) => listeners.push(l),
Err(e) => error!("{}", e),
}
}
let http_addr: SocketAddr = settings.listen_http.parse()?;
let http_dir = settings.output_dir.clone();
let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url);
listeners.push(tokio::spawn(async move {
let cors = cors().allow_any_origin().allow_methods(vec!["GET"]);
let index_handle = warp::get()
.or(warp::path("index.html"))
.and(warp::path::end())
.map(move |_| warp::reply::html(index_html.clone()));
let dir_handle = warp::get().and(warp::fs::dir(http_dir)).with(cors);
warp::serve(index_handle.or(dir_handle))
.run(http_addr)
.await;
Ok(())
}));
for handle in listeners {
if let Err(e) = handle.await? {
error!("{e}");
}
}
info!("Server closed");
Ok(())
}
fn try_create_listener(
u: &str,
out_dir: &str,
overseer: &Arc<dyn Overseer>,
) -> Result<JoinHandle<Result<()>>> {
let url: Url = u.parse()?;
match url.scheme() {
#[cfg(feature = "srt")]
"srt" => Ok(tokio::spawn(srt::listen(
out_dir.to_string(),
format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
overseer.clone(),
))),
#[cfg(feature = "srt")]
"rtmp" => Ok(tokio::spawn(rtmp::listen(
out_dir.to_string(),
format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
overseer.clone(),
))),
"tcp" => Ok(tokio::spawn(tcp::listen(
out_dir.to_string(),
format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
overseer.clone(),
))),
"file" => Ok(tokio::spawn(file::listen(
out_dir.to_string(),
PathBuf::from(url.path()),
overseer.clone(),
))),
#[cfg(feature = "test-pattern")]
"test-pattern" => Ok(tokio::spawn(test::listen(
out_dir.to_string(),
overseer.clone(),
))),
_ => {
bail!("Unknown endpoint config: {u}");
}
}
}

View File

@ -1,194 +0,0 @@
use crate::ingress::ConnectionInfo;
#[cfg(feature = "local-overseer")]
use crate::overseer::local::LocalOverseer;
#[cfg(feature = "webhook-overseer")]
use crate::overseer::webhook::WebhookOverseer;
#[cfg(feature = "zap-stream")]
use crate::overseer::zap_stream::ZapStreamOverseer;
use crate::pipeline::PipelineConfig;
#[cfg(any(
feature = "local-overseer",
feature = "webhook-overseer",
feature = "zap-stream"
))]
use crate::settings::OverseerConfig;
use crate::settings::Settings;
use crate::variant::audio::AudioVariant;
use crate::variant::mapping::VariantMapping;
use crate::variant::video::VideoVariant;
use crate::variant::VariantStream;
use anyhow::Result;
use async_trait::async_trait;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use std::cmp::PartialEq;
use std::path::PathBuf;
use std::sync::Arc;
use uuid::Uuid;
#[cfg(feature = "local-overseer")]
mod local;
#[cfg(feature = "webhook-overseer")]
mod webhook;
#[cfg(feature = "zap-stream")]
mod zap_stream;
/// A copy of [ffmpeg_rs_raw::DemuxerInfo] without internal ptr
#[derive(PartialEq, Clone)]
pub struct IngressInfo {
pub bitrate: usize,
pub streams: Vec<IngressStream>,
}
/// A copy of [ffmpeg_rs_raw::StreamInfo] without ptr
#[derive(PartialEq, Clone)]
pub struct IngressStream {
pub index: usize,
pub stream_type: IngressStreamType,
pub codec: isize,
pub format: isize,
pub width: usize,
pub height: usize,
pub fps: f32,
pub sample_rate: usize,
pub language: String,
}
#[derive(PartialEq, Eq, Clone)]
pub enum IngressStreamType {
Video,
Audio,
Subtitle,
}
#[async_trait]
/// The control process that oversees streaming operations
pub trait Overseer: Send + Sync {
/// Set up a new streaming pipeline
async fn start_stream(
&self,
connection: &ConnectionInfo,
stream_info: &IngressInfo,
) -> Result<PipelineConfig>;
/// A new segment (HLS etc.) was generated for a stream variant
///
/// This handler is usually used for distribution / billing
async fn on_segment(
&self,
pipeline_id: &Uuid,
variant_id: &Uuid,
index: u64,
duration: f32,
path: &PathBuf,
) -> Result<()>;
/// At a regular interval, pipeline will emit one of the frames for processing as a
/// thumbnail
async fn on_thumbnail(
&self,
pipeline_id: &Uuid,
width: usize,
height: usize,
path: &PathBuf,
) -> Result<()>;
/// Stream is finished
async fn on_end(&self, pipeline_id: &Uuid) -> Result<()>;
}
impl Settings {
pub async fn get_overseer(&self) -> Result<Arc<dyn Overseer>> {
match &self.overseer {
#[cfg(feature = "local-overseer")]
OverseerConfig::Local => Ok(Arc::new(LocalOverseer::new())),
#[cfg(feature = "webhook-overseer")]
OverseerConfig::Webhook { url } => Ok(Arc::new(WebhookOverseer::new(&url))),
#[cfg(feature = "zap-stream")]
OverseerConfig::ZapStream {
nsec: private_key,
database,
lnd,
relays,
blossom,
} => Ok(Arc::new(
ZapStreamOverseer::new(
&self.output_dir,
&self.public_url,
private_key,
database,
lnd,
relays,
blossom,
)
.await?,
)),
_ => {
panic!("Unsupported overseer");
}
}
}
}
pub(crate) fn get_default_variants(info: &IngressInfo) -> Result<Vec<VariantStream>> {
let mut vars: Vec<VariantStream> = vec![];
if let Some(video_src) = info
.streams
.iter()
.find(|c| c.stream_type == IngressStreamType::Video)
{
vars.push(VariantStream::CopyVideo(VariantMapping {
id: Uuid::new_v4(),
src_index: video_src.index,
dst_index: 0,
group_id: 0,
}));
vars.push(VariantStream::Video(VideoVariant {
mapping: VariantMapping {
id: Uuid::new_v4(),
src_index: video_src.index,
dst_index: 1,
group_id: 1,
},
width: 1280,
height: 720,
fps: video_src.fps,
bitrate: 3_000_000,
codec: "libx264".to_string(),
profile: 100,
level: 51,
keyframe_interval: video_src.fps as u16 * 2,
pixel_format: AV_PIX_FMT_YUV420P as u32,
}));
}
if let Some(audio_src) = info
.streams
.iter()
.find(|c| c.stream_type == IngressStreamType::Audio)
{
vars.push(VariantStream::CopyAudio(VariantMapping {
id: Uuid::new_v4(),
src_index: audio_src.index,
dst_index: 2,
group_id: 0,
}));
vars.push(VariantStream::Audio(AudioVariant {
mapping: VariantMapping {
id: Uuid::new_v4(),
src_index: audio_src.index,
dst_index: 3,
group_id: 1,
},
bitrate: 192_000,
codec: "aac".to_string(),
channels: 2,
sample_rate: 48_000,
sample_fmt: "fltp".to_owned(),
}));
}
Ok(vars)
}