diff --git a/Cargo.lock b/Cargo.lock index c32cfdc..99897da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -857,9 +857,9 @@ dependencies = [ [[package]] name = "data-encoding" -version = "2.7.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e60eed09d8c01d3cee5b7d30acb059b76614c918fa0f992e0dd6eeb10daad6f" +checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" [[package]] name = "data-url" @@ -964,7 +964,7 @@ checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580" dependencies = [ "humantime", "is-terminal", - "log", + "log 0.4.25", "regex", "termcolor", ] @@ -1045,12 +1045,12 @@ dependencies = [ [[package]] name = "ffmpeg-rs-raw" version = "0.1.0" -source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=29ab0547478256c574766b4acc6fcda8ebf4cae6#29ab0547478256c574766b4acc6fcda8ebf4cae6" +source = "git+https://github.com/v0l/ffmpeg-rs-raw.git?rev=8307b0a225267cefac9c174d5f6a0314a2f0a66b#8307b0a225267cefac9c174d5f6a0314a2f0a66b" dependencies = [ "anyhow", "ffmpeg-sys-the-third", "libc", - "log", + "log 0.4.25", "slimbox", ] @@ -1123,16 +1123,16 @@ dependencies = [ [[package]] name = "fontdb" -version = "0.22.0" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3a6f9af55fb97ad673fb7a69533eb2f967648a06fa21f8c9bb2cd6d33975716" +checksum = "457e789b3d1202543297a350643cf459f836cade38934e7a4cf6a39e7cde2905" dependencies = [ "fontconfig-parser", - "log", + "log 0.4.25", "memmap2", "slotmap", "tinyvec", - "ttf-parser 0.24.1", + "ttf-parser 0.25.1", ] [[package]] @@ -1857,9 +1857,9 @@ dependencies = [ [[package]] name = "image-webp" -version = "0.1.3" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f79afb8cbee2ef20f59ccd477a218c12a93943d075b492015ecb1bb81f8ee904" +checksum = "b77d01e822461baa8409e156015a1d91735549f0f2c17691bd2d996bef238f7f" dependencies = [ "byteorder-lite", "quick-error", @@ -2069,6 +2069,15 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "log" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" +dependencies = [ + "log 0.4.25", +] + [[package]] name = "log" version = "0.4.25" @@ -2161,6 +2170,16 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "mustache" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51956ef1c5d20a1384524d91e616fb44dfc7d8f249bf696d49c97dd3289ecab5" +dependencies = [ + "log 0.3.9", + "serde", +] + [[package]] name = "native-tls" version = "0.2.13" @@ -2168,7 +2187,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0dab59f8e050d5df8e4dd87d9206fb6f65a483e20ac9fda365ade4fab353196c" dependencies = [ "libc", - "log", + "log 0.4.25", "openssl", "openssl-probe", "openssl-sys", @@ -2627,7 +2646,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c" dependencies = [ "env_logger", - "log", + "log 0.4.25", ] [[package]] @@ -2668,7 +2687,7 @@ dependencies = [ "bytes", "heck", "itertools 0.12.1", - "log", + "log 0.4.25", "multimap", "once_cell", "petgraph", @@ -2806,7 +2825,7 @@ dependencies = [ "hyper-util", "ipnet", "js-sys", - "log", + "log 0.4.25", "mime", "native-tls", "once_cell", @@ -2833,13 +2852,13 @@ dependencies = [ [[package]] name = "resvg" -version = "0.44.0" +version = "0.45.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a325d5e8d1cebddd070b13f44cec8071594ab67d1012797c121f27a669b7958" +checksum = "a8928798c0a55e03c9ca6c4c6846f76377427d2c1e1f7e6de3c06ae57942df43" dependencies = [ "gif", "image-webp", - "log", + "log 0.4.25", "pico-args", "rgb", "svgtypes", @@ -2996,7 +3015,7 @@ version = "0.21.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ - "log", + "log 0.4.25", "ring", "rustls-webpki 0.101.7", "sct", @@ -3069,16 +3088,16 @@ checksum = "f7c45b9784283f1b2e7fb61b42047c2fd678ef0960d4f6f1eba131594cc369d4" [[package]] name = "rustybuzz" -version = "0.18.0" +version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c85d1ccd519e61834798eb52c4e886e8c2d7d698dd3d6ce0b1b47eb8557f1181" +checksum = "fd3c7c96f8a08ee34eff8857b11b49b07d71d1c3f4e88f8a88d4c9e9f90b1702" dependencies = [ "bitflags 2.8.0", "bytemuck", "core_maths", - "log", + "log 0.4.25", "smallvec", - "ttf-parser 0.24.1", + "ttf-parser 0.25.1", "unicode-bidi-mirroring", "unicode-ccc", "unicode-properties", @@ -3315,7 +3334,7 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a9c6883ca9c3c7c90e888de77b7a5c849c779d25d74a1269b0218b14e8b136c" dependencies = [ - "log", + "log 0.4.25", ] [[package]] @@ -3418,7 +3437,7 @@ dependencies = [ "hashbrown 0.15.2", "hashlink 0.10.0", "indexmap 2.7.1", - "log", + "log 0.4.25", "memchr", "once_cell", "percent-encoding", @@ -3497,7 +3516,7 @@ dependencies = [ "hkdf", "hmac 0.12.1", "itoa", - "log", + "log 0.4.25", "md-5", "memchr", "once_cell", @@ -3537,7 +3556,7 @@ dependencies = [ "hmac 0.12.1", "home", "itoa", - "log", + "log 0.4.25", "md-5", "memchr", "once_cell", @@ -3568,7 +3587,7 @@ dependencies = [ "futures-intrusive", "futures-util", "libsqlite3-sys", - "log", + "log 0.4.25", "percent-encoding", "serde", "serde_urlencoded", @@ -3594,7 +3613,7 @@ dependencies = [ "hex", "hmac 0.12.1", "keyed_priority_queue", - "log", + "log 0.4.25", "pbkdf2", "rand", "regex", @@ -3613,7 +3632,7 @@ checksum = "0a55cb90afac5672b00954e3291846dd262cfef3b52d1b507f580180433373d3" dependencies = [ "bytes", "futures", - "log", + "log 0.4.25", "rand", "socket2", "srt-protocol", @@ -3824,7 +3843,7 @@ dependencies = [ "arrayvec", "bytemuck", "cfg-if", - "log", + "log 0.4.25", "png", "tiny-skia-path", ] @@ -3963,7 +3982,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" dependencies = [ "futures-util", - "log", + "log 0.4.25", "rustls 0.23.21", "rustls-pki-types", "tokio", @@ -4115,7 +4134,7 @@ version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ - "log", + "log 0.4.25", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -4161,9 +4180,9 @@ checksum = "2c591d83f69777866b9126b24c6dd9a18351f177e49d625920d19f989fd31cf8" [[package]] name = "ttf-parser" -version = "0.24.1" +version = "0.25.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5be21190ff5d38e8b4a2d3b6a3ae57f612cc39c96e83cedeaf7abc338a8bac4a" +checksum = "d2df906b07856748fa3f6e0ad0cbaa047052d4a7dd609e231c4f72cee8c36f31" dependencies = [ "core_maths", ] @@ -4179,7 +4198,7 @@ dependencies = [ "data-encoding", "http 1.2.0", "httparse", - "log", + "log 0.4.25", "rand", "rustls 0.23.21", "rustls-pki-types", @@ -4208,15 +4227,15 @@ checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" [[package]] name = "unicode-bidi-mirroring" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64af057ad7466495ca113126be61838d8af947f41d93a949980b2389a118082f" +checksum = "5dfa6e8c60bb66d49db113e0125ee8711b7647b5579dc7f5f19c42357ed039fe" [[package]] name = "unicode-ccc" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "260bc6647b3893a9a90668360803a15f96b85a5257b1c3a0c3daf6ae2496de42" +checksum = "ce61d488bcdc9bc8b5d1772c404828b17fc481c0a582b5581e95fb233aef503e" [[package]] name = "unicode-ident" @@ -4287,9 +4306,9 @@ dependencies = [ [[package]] name = "usvg" -version = "0.44.0" +version = "0.45.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7447e703d7223b067607655e625e0dbca80822880248937da65966194c4864e6" +checksum = "80be9b06fbae3b8b303400ab20778c80bbaf338f563afe567cf3c9eea17b47ef" dependencies = [ "base64 0.22.1", "data-url", @@ -4297,7 +4316,7 @@ dependencies = [ "fontdb", "imagesize", "kurbo", - "log", + "log 0.4.25", "pico-args", "roxmltree", "rustybuzz", @@ -4407,7 +4426,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" dependencies = [ "bumpalo", - "log", + "log 0.4.25", "proc-macro2", "quote", "syn", @@ -4792,9 +4811,10 @@ dependencies = [ "http-body-util", "hyper 1.6.0", "hyper-util", - "log", + "log 0.4.25", "m3u8-rs", "matchit 0.8.6", + "mustache", "nostr-sdk", "pretty_env_logger", "reqwest", @@ -4814,17 +4834,20 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "data-encoding", "ffmpeg-rs-raw", "fontdue", "futures-util", "hex", "itertools 0.14.0", - "log", + "libc", + "log 0.4.25", "m3u8-rs", "resvg", "ringbuf", "rml_rtmp", "serde", + "sha2 0.10.8", "srt-tokio", "tiny-skia", "tokio", diff --git a/Cargo.toml b/Cargo.toml index e80c695..ff49da2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,10 +10,10 @@ members = [ opt-level = 3 lto = true codegen-units = 1 -panic = "abort" +panic = "unwind" [workspace.dependencies] -ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "29ab0547478256c574766b4acc6fcda8ebf4cae6" } +ffmpeg-rs-raw = { git = "https://github.com/v0l/ffmpeg-rs-raw.git", rev = "8307b0a225267cefac9c174d5f6a0314a2f0a66b" } tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] } anyhow = { version = "^1.0.91", features = ["backtrace"] } async-trait = "0.1.77" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 14af795..2572634 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -4,18 +4,9 @@ version = "0.1.0" edition = "2021" [features] -default = ["test-pattern", "srt", "rtmp"] +default = ["srt", "rtmp"] srt = ["dep:srt-tokio"] rtmp = ["dep:rml_rtmp"] -local-overseer = [] # WIP -webhook-overseer = [] # WIP -test-pattern = [ - "dep:resvg", - "dep:usvg", - "dep:tiny-skia", - "dep:fontdue", - "dep:ringbuf", -] [dependencies] ffmpeg-rs-raw.workspace = true @@ -27,20 +18,20 @@ uuid.workspace = true serde.workspace = true hex.workspace = true itertools.workspace = true -futures-util = "0.3.30" m3u8-rs.workspace = true sha2.workspace = true data-encoding.workspace = true +futures-util = "0.3.30" +resvg = "0.45.1" +usvg = "0.45.1" +tiny-skia = "0.11.4" +fontdue = "0.9.2" +ringbuf = "0.4.7" + # srt srt-tokio = { version = "0.4.3", optional = true } # rtmp rml_rtmp = { version = "0.8.0", optional = true } - -# test-pattern -resvg = { version = "0.44.0", optional = true } -usvg = { version = "0.44.0", optional = true } -tiny-skia = { version = "0.11.4", optional = true } -fontdue = { version = "0.9.2", optional = true } -ringbuf = { version = "0.4.7", optional = true } \ No newline at end of file +libc = "0.2.169" \ No newline at end of file diff --git a/crates/core/src/generator.rs b/crates/core/src/generator.rs new file mode 100644 index 0000000..da03923 --- /dev/null +++ b/crates/core/src/generator.rs @@ -0,0 +1,431 @@ +use crate::overseer::IngressStream; +use anyhow::{bail, Result}; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVColorSpace::AVCOL_SPC_RGB; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_RGBA; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVSampleFormat::AV_SAMPLE_FMT_FLTP; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ + av_channel_layout_default, av_frame_alloc, av_frame_free, av_frame_get_buffer, AVFrame, + AVPixelFormat, AVRational, +}; +use ffmpeg_rs_raw::Scaler; +use fontdue::layout::{CoordinateSystem, Layout, TextStyle}; +use fontdue::Font; +use std::mem::transmute; +use std::time::{Duration, Instant}; +use std::{ptr, slice}; + +/// Frame generator +pub struct FrameGenerator { + fps: f32, + width: u16, + height: u16, + video_sample_fmt: AVPixelFormat, + + audio_sample_rate: u32, + audio_frame_size: i32, + audio_channels: u8, + + frame_idx: u64, + audio_samples: u64, + + // internal + next_frame: *mut AVFrame, + scaler: Scaler, + font: Font, + start: Instant, +} + +impl Drop for FrameGenerator { + fn drop(&mut self) { + unsafe { + if !self.next_frame.is_null() { + av_frame_free(&mut self.next_frame); + self.next_frame = std::ptr::null_mut(); + } + } + } +} + +impl FrameGenerator { + pub fn new( + fps: f32, + width: u16, + height: u16, + pix_fmt: AVPixelFormat, + sample_rate: u32, + frame_size: i32, + channels: u8, + ) -> Result { + let font = include_bytes!("../SourceCodePro-Regular.ttf") as &[u8]; + let font = Font::from_bytes(font, Default::default()).unwrap(); + + Ok(Self { + fps, + width, + height, + video_sample_fmt: pix_fmt, + audio_sample_rate: sample_rate, + audio_frame_size: frame_size, + audio_channels: channels, + frame_idx: 0, + audio_samples: 0, + font, + start: Instant::now(), + scaler: Scaler::default(), + next_frame: ptr::null_mut(), + }) + } + + pub fn from_stream( + video_stream: &IngressStream, + audio_stream: Option<&IngressStream>, + ) -> Result { + Ok(Self::new( + video_stream.fps, + video_stream.width as _, + video_stream.height as _, + unsafe { transmute(video_stream.format as i32) }, + audio_stream.map(|i| i.sample_rate as _).unwrap_or(0), + if audio_stream.is_none() { 0 } else { 1024 }, + audio_stream.map(|i| i.channels as _).unwrap_or(0), + )?) + } + + pub fn frame_no(&self) -> u64 { + self.frame_idx + } + + /// Create a new frame for composing text / images + pub fn begin(&mut self) -> Result<()> { + if self.next_frame.is_null() { + unsafe { + let mut src_frame = av_frame_alloc(); + if src_frame.is_null() { + bail!("Failed to allocate placeholder video frame"); + } + + (*src_frame).width = self.width as _; + (*src_frame).height = self.height as _; + (*src_frame).pict_type = AV_PICTURE_TYPE_NONE; + (*src_frame).key_frame = 1; + (*src_frame).colorspace = AVCOL_SPC_RGB; + //internally always use RGBA, we convert frame to target pixel format at the end + (*src_frame).format = AV_PIX_FMT_RGBA as _; + (*src_frame).pts = self.frame_idx as _; + (*src_frame).duration = 1; + (*src_frame).time_base = AVRational { + num: 1, + den: self.fps as i32, + }; + if av_frame_get_buffer(src_frame, 0) < 0 { + av_frame_free(&mut src_frame); + bail!("Failed to get frame buffer"); + } + self.next_frame = src_frame; + } + } + Ok(()) + } + + /// Write some text into the next frame + pub fn write_text(&mut self, msg: &str, size: f32, x: f32, y: f32) -> Result<()> { + if self.next_frame.is_null() { + bail!("Must call begin() before writing text") + } + let mut layout = Layout::new(CoordinateSystem::PositiveYDown); + layout.append(&[&self.font], &TextStyle::new(msg, size, 0)); + + self.write_layout(layout, x, y)?; + Ok(()) + } + + /// Write text layout into frame + fn write_layout(&mut self, layout: Layout, x: f32, y: f32) -> Result<()> { + for g in layout.glyphs() { + let (metrics, bitmap) = self.font.rasterize_config_subpixel(g.key); + for y1 in 0..metrics.height { + for x1 in 0..metrics.width { + let dst_x = x as usize + x1 + g.x as usize; + let dst_y = y as usize + y1 + g.y as usize; + let offset_src = (x1 + y1 * metrics.width) * 3; + unsafe { + let offset_dst = + 4 * dst_x + dst_y * (*self.next_frame).linesize[0] as usize; + let pixel_dst = (*self.next_frame).data[0].add(offset_dst); + *pixel_dst.offset(0) = bitmap[offset_src]; + *pixel_dst.offset(1) = bitmap[offset_src + 1]; + *pixel_dst.offset(2) = bitmap[offset_src + 2]; + } + } + } + } + Ok(()) + } + + /// Copy data directly into the frame buffer (must be RGBA data) + pub unsafe fn copy_frame_data(&mut self, data: &[u8]) -> Result<()> { + if self.next_frame.is_null() { + bail!("Must call begin() before writing frame data") + } + let buf = slice::from_raw_parts_mut( + (*self.next_frame).data[0], + (self.width as usize * self.height as usize * 4) as usize, + ); + if buf.len() < data.len() { + bail!("Frame buffer is too small"); + } + buf.copy_from_slice(data); + Ok(()) + } + + /// Generate audio to stay synchronized with video frames + unsafe fn generate_audio_frame(&mut self) -> Result<*mut AVFrame> { + const FREQUENCY: f32 = 440.0; // A4 note + + // audio is disabled if sample rate is 0 + if self.audio_sample_rate == 0 { + return Ok(ptr::null_mut()); + } + + // Calculate how many audio samples we need to cover the next video frame + let samples_per_frame = (self.audio_sample_rate as f32 / self.fps) as u64; + let next_frame_needs_samples = (self.frame_idx + 1) * samples_per_frame; + + // Generate audio if we don't have enough to cover the next video frame + if self.audio_samples < next_frame_needs_samples { + let audio_frame = av_frame_alloc(); + (*audio_frame).format = AV_SAMPLE_FMT_FLTP as _; + (*audio_frame).nb_samples = self.audio_frame_size as _; + (*audio_frame).duration = self.audio_frame_size as _; + (*audio_frame).sample_rate = self.audio_sample_rate as _; + (*audio_frame).pts = self.audio_samples as _; + (*audio_frame).time_base = AVRational { + num: 1, + den: self.audio_sample_rate as _, + }; + av_channel_layout_default(&mut (*audio_frame).ch_layout, self.audio_channels as _); + av_frame_get_buffer(audio_frame, 0); + + // Generate sine wave samples + let data = (*audio_frame).data[0] as *mut f32; + for i in 0..self.audio_frame_size { + let sample_time = + (self.audio_samples + i as u64) as f32 / self.audio_sample_rate as f32; + let sample_value = + (2.0 * std::f32::consts::PI * FREQUENCY * sample_time).sin() * 0.5; + *data.add(i as _) = sample_value; + } + + self.audio_samples += self.audio_frame_size as u64; + return Ok(audio_frame); + } + + Ok(ptr::null_mut()) + } + + /// Return the next frame for encoding (blocking) + pub unsafe fn next(&mut self) -> Result<*mut AVFrame> { + // set start time to now if this is the first call to next() + if self.frame_idx == 0 { + self.start = Instant::now(); + } + + // try to get audio frames before video frames (non-blocking) + let audio_frame = self.generate_audio_frame()?; + if !audio_frame.is_null() { + return Ok(audio_frame); + } + + // auto-init frame + if self.next_frame.is_null() { + self.begin()?; + } + + let stream_time = Duration::from_secs_f64(self.frame_idx as f64 / self.fps as f64); + let real_time = Instant::now().duration_since(self.start); + let wait_time = if stream_time > real_time { + stream_time - real_time + } else { + Duration::new(0, 0) + }; + if !wait_time.is_zero() && wait_time.as_secs_f32() > 1f32 / self.fps { + std::thread::sleep(wait_time); + } + + // convert to output pixel format, or just return internal frame if it matches output + if self.video_sample_fmt != transmute((*self.next_frame).format) { + let out_frame = self.scaler.process_frame( + self.next_frame, + self.width, + self.height, + self.video_sample_fmt, + )?; + av_frame_free(&mut self.next_frame); + self.next_frame = ptr::null_mut(); + self.frame_idx += 1; + Ok(out_frame) + } else { + let ret = self.next_frame; + self.next_frame = ptr::null_mut(); + self.frame_idx += 1; + Ok(ret) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; + + #[test] + fn test_frame_timing_synchronization() { + unsafe { + let fps = 30.0; + let sample_rate = 44100; + let frame_size = 1024; + let channels = 2; + + let mut gen = FrameGenerator::new( + fps, + 1280, + 720, + AV_PIX_FMT_YUV420P, + sample_rate, + frame_size, + channels, + ) + .unwrap(); + + let samples_per_frame = sample_rate as f64 / fps as f64; // Expected: 1470 samples per frame + println!("Expected samples per video frame: {:.2}", samples_per_frame); + + let mut audio_frames = 0; + let mut video_frames = 0; + let mut total_audio_samples = 0; + + // Generate frames for 2 seconds (60 video frames at 30fps) + for i in 0..120 { + let mut frame = gen.next().unwrap(); + + if (*frame).sample_rate > 0 { + // Audio frame + audio_frames += 1; + total_audio_samples += (*frame).nb_samples as u64; + println!( + "Frame {}: AUDIO - PTS: {}, samples: {}, total_samples: {}", + i, + (*frame).pts, + (*frame).nb_samples, + total_audio_samples + ); + } else { + // Video frame + video_frames += 1; + let expected_audio_samples = (video_frames as f64 * samples_per_frame) as u64; + let audio_deficit = if total_audio_samples >= expected_audio_samples { + 0 + } else { + expected_audio_samples - total_audio_samples + }; + + println!("Frame {}: VIDEO - PTS: {}, frame_idx: {}, expected_audio: {}, actual_audio: {}, deficit: {}", + i, (*frame).pts, video_frames, expected_audio_samples, total_audio_samples, audio_deficit); + + // Verify we have enough audio for this video frame + assert!( + total_audio_samples >= expected_audio_samples, + "Video frame {} needs {} audio samples but only have {}", + video_frames, + expected_audio_samples, + total_audio_samples + ); + } + + av_frame_free(&mut frame); + } + + println!("\nSummary:"); + println!("Video frames: {}", video_frames); + println!("Audio frames: {}", audio_frames); + println!("Total audio samples: {}", total_audio_samples); + println!( + "Expected audio samples for {} video frames: {:.2}", + video_frames, + video_frames as f64 * samples_per_frame + ); + + // Verify the ratio is correct + let expected_total_audio = video_frames as f64 * samples_per_frame; + let sample_accuracy = (total_audio_samples as f64 - expected_total_audio).abs(); + println!("Sample accuracy (difference): {:.2}", sample_accuracy); + + // Allow for some tolerance due to frame size constraints + assert!( + sample_accuracy < frame_size as f64, + "Audio sample count too far from expected: got {}, expected {:.2}, diff {:.2}", + total_audio_samples, + expected_total_audio, + sample_accuracy + ); + } + } + + #[test] + fn test_pts_progression() { + unsafe { + let fps = 30.0; + let sample_rate = 44100; + + let mut gen = + FrameGenerator::new(fps, 1280, 720, AV_PIX_FMT_YUV420P, sample_rate, 1024, 2) + .unwrap(); + + let mut last_audio_pts = -1i64; + let mut last_video_pts = -1i64; + let mut audio_pts_gaps = Vec::new(); + let mut video_pts_gaps = Vec::new(); + + // Generate 60 frames to test PTS progression + for _ in 0..60 { + let mut frame = gen.next().unwrap(); + + if (*frame).sample_rate > 0 { + // Audio frame - check PTS progression + if last_audio_pts >= 0 { + let gap = (*frame).pts - last_audio_pts; + audio_pts_gaps.push(gap); + println!("Audio PTS gap: {}", gap); + } + last_audio_pts = (*frame).pts; + } else { + // Video frame - check PTS progression + if last_video_pts >= 0 { + let gap = (*frame).pts - last_video_pts; + video_pts_gaps.push(gap); + println!("Video PTS gap: {}", gap); + } + last_video_pts = (*frame).pts; + } + + av_frame_free(&mut frame); + } + + // Verify audio PTS gaps are consistent (should be 1024 samples) + for gap in &audio_pts_gaps { + assert_eq!( + *gap, 1024, + "Audio PTS should increment by frame_size (1024)" + ); + } + + // Verify video PTS gaps are consistent (should be 1 frame) + for gap in &video_pts_gaps { + assert_eq!(*gap, 1, "Video PTS should increment by 1 frame"); + } + + println!("PTS progression test passed - all gaps are consistent"); + } + } +} diff --git a/crates/core/src/ingress/mod.rs b/crates/core/src/ingress/mod.rs index 4e665f0..a9ecf76 100644 --- a/crates/core/src/ingress/mod.rs +++ b/crates/core/src/ingress/mod.rs @@ -12,7 +12,6 @@ pub mod rtmp; #[cfg(feature = "srt")] pub mod srt; pub mod tcp; -#[cfg(feature = "test-pattern")] pub mod test; #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/crates/core/src/ingress/test.rs b/crates/core/src/ingress/test.rs index 683387f..f333bb0 100644 --- a/crates/core/src/ingress/test.rs +++ b/crates/core/src/ingress/test.rs @@ -1,23 +1,16 @@ +use crate::generator::FrameGenerator; use crate::ingress::{spawn_pipeline, ConnectionInfo}; use crate::overseer::Overseer; use anyhow::Result; -use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVColorSpace::AVCOL_SPC_RGB; -use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE; -use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::{AV_PIX_FMT_RGBA, AV_PIX_FMT_YUV420P}; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVSampleFormat::AV_SAMPLE_FMT_FLTP; -use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_frame_alloc, av_frame_free, av_frame_get_buffer, av_packet_free, AVRational, - AV_PROFILE_H264_MAIN, -}; -use ffmpeg_rs_raw::{Encoder, Muxer, Scaler}; -use fontdue::layout::{CoordinateSystem, Layout, TextStyle}; -use fontdue::Font; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_frame_free, av_packet_free, AV_PROFILE_H264_MAIN}; +use ffmpeg_rs_raw::{Encoder, Muxer}; use log::info; use ringbuf::traits::{Observer, Split}; use ringbuf::{HeapCons, HeapRb}; use std::io::Read; use std::sync::Arc; -use std::time::{Duration, Instant}; use tiny_skia::Pixmap; use tokio::runtime::Handle; @@ -42,33 +35,31 @@ pub async fn listen(out_dir: String, overseer: Arc) -> Result<()> } struct TestPatternSrc { + gen: FrameGenerator, video_encoder: Encoder, audio_encoder: Encoder, - scaler: Scaler, - muxer: Muxer, background: Pixmap, - font: [Font; 1], - frame_no: u64, - audio_sample_no: u64, - start: Instant, + muxer: Muxer, reader: HeapCons, } unsafe impl Send for TestPatternSrc {} const VIDEO_FPS: f32 = 30.0; +const VIDEO_WIDTH: u16 = 1280; +const VIDEO_HEIGHT: u16 = 720; +const SAMPLE_RATE: u32 = 44100; impl TestPatternSrc { pub fn new() -> Result { - let scaler = Scaler::new(); let video_encoder = unsafe { Encoder::new_with_name("libx264")? .with_stream_index(0) .with_framerate(VIDEO_FPS)? .with_bitrate(1_000_000) .with_pix_fmt(AV_PIX_FMT_YUV420P) - .with_width(1280) - .with_height(720) + .with_width(VIDEO_WIDTH as _) + .with_height(VIDEO_HEIGHT as _) .with_level(51) .with_profile(AV_PROFILE_H264_MAIN) .open(None)? @@ -80,22 +71,20 @@ impl TestPatternSrc { .with_default_channel_layout(1) .with_bitrate(128_000) .with_sample_format(AV_SAMPLE_FMT_FLTP) - .with_sample_rate(44100)? + .with_sample_rate(SAMPLE_RATE as _)? .open(None)? }; let svg_data = include_bytes!("../../test.svg"); let tree = usvg::Tree::from_data(svg_data, &Default::default())?; - let mut pixmap = Pixmap::new(1280, 720).unwrap(); + + let mut pixmap = Pixmap::new(VIDEO_WIDTH as _, VIDEO_HEIGHT as _).unwrap(); let render_ts = tiny_skia::Transform::from_scale( pixmap.width() as f32 / tree.size().width(), pixmap.height() as f32 / tree.size().height(), ); resvg::render(&tree, render_ts, &mut pixmap.as_mut()); - let font = include_bytes!("../../SourceCodePro-Regular.ttf") as &[u8]; - let font = Font::from_bytes(font, Default::default()).unwrap(); - let buf = HeapRb::new(1024 * 1024); let (writer, reader) = buf.split(); @@ -109,140 +98,51 @@ impl TestPatternSrc { m }; + let frame_size = unsafe { (*audio_encoder.codec_context()).frame_size as _ }; Ok(Self { + gen: FrameGenerator::new( + VIDEO_FPS, + VIDEO_WIDTH, + VIDEO_HEIGHT, + AV_PIX_FMT_YUV420P, + SAMPLE_RATE, + frame_size, + 1, + )?, video_encoder, audio_encoder, - scaler, muxer, background: pixmap, - font: [font], - frame_no: 0, - audio_sample_no: 0, - start: Instant::now(), reader, }) } pub unsafe fn next_pkt(&mut self) -> Result<()> { - let stream_time = Duration::from_secs_f64(self.frame_no as f64 / VIDEO_FPS as f64); - let real_time = Instant::now().duration_since(self.start); - let wait_time = if stream_time > real_time { - stream_time - real_time - } else { - Duration::new(0, 0) - }; - if !wait_time.is_zero() && wait_time.as_secs_f32() > 1f32 / VIDEO_FPS { - std::thread::sleep(wait_time); + self.gen.begin()?; + self.gen.copy_frame_data(self.background.data())?; + self.gen + .write_text(&format!("frame={}", self.gen.frame_no()), 40.0, 5.0, 5.0)?; + + let mut frame = self.gen.next()?; + if frame.is_null() { + return Ok(()); } - let mut src_frame = unsafe { - let src_frame = av_frame_alloc(); - - (*src_frame).width = 1280; - (*src_frame).height = 720; - (*src_frame).pict_type = AV_PICTURE_TYPE_NONE; - (*src_frame).key_frame = 1; - (*src_frame).colorspace = AVCOL_SPC_RGB; - (*src_frame).format = AV_PIX_FMT_RGBA as _; - (*src_frame).pts = self.frame_no as i64; - (*src_frame).duration = 1; - av_frame_get_buffer(src_frame, 0); - - self.background - .data() - .as_ptr() - .copy_to((*src_frame).data[0] as *mut _, 1280 * 720 * 4); - src_frame - }; - let mut layout = Layout::new(CoordinateSystem::PositiveYDown); - layout.clear(); - layout.append( - &self.font, - &TextStyle::new(&format!("frame={}", self.frame_no), 40.0, 0), - ); - for g in layout.glyphs() { - let (metrics, bitmap) = self.font[0].rasterize_config_subpixel(g.key); - for y in 0..metrics.height { - for x in 0..metrics.width { - let dst_x = x + g.x as usize; - let dst_y = y + g.y as usize; - let offset_src = (x + y * metrics.width) * 3; - unsafe { - let offset_dst = 4 * dst_x + dst_y * (*src_frame).linesize[0] as usize; - let pixel_dst = (*src_frame).data[0].add(offset_dst); - *pixel_dst.offset(0) = bitmap[offset_src]; - *pixel_dst.offset(1) = bitmap[offset_src + 1]; - *pixel_dst.offset(2) = bitmap[offset_src + 2]; - } - } - } - } - - // scale/encode video - let mut frame = self - .scaler - .process_frame(src_frame, 1280, 720, AV_PIX_FMT_YUV420P)?; - for mut pkt in self.video_encoder.encode_frame(frame)? { - self.muxer.write_packet(pkt)?; - av_packet_free(&mut pkt); - } - av_frame_free(&mut frame); - av_frame_free(&mut src_frame); - - // Generate and encode audio (sine wave) - self.generate_audio_frame()?; - - self.frame_no += 1; - - Ok(()) - } - - /// Generate audio to stay synchronized with video frames - unsafe fn generate_audio_frame(&mut self) -> Result<()> { - const SAMPLE_RATE: f32 = 44100.0; - const FREQUENCY: f32 = 440.0; // A4 note - const SAMPLES_PER_FRAME: usize = 1024; // Fixed AAC frame size - - // Calculate how many audio samples we should have by now - // At 30fps, each video frame = 1/30 sec = 1470 audio samples at 44.1kHz - let audio_samples_per_video_frame = (SAMPLE_RATE / VIDEO_FPS) as u64; // ~1470 samples - let target_audio_samples = self.frame_no * audio_samples_per_video_frame; - - // Generate audio frames to catch up to the target - while self.audio_sample_no < target_audio_samples { - let mut audio_frame = av_frame_alloc(); - (*audio_frame).format = AV_SAMPLE_FMT_FLTP as _; - (*audio_frame).nb_samples = SAMPLES_PER_FRAME as _; - (*audio_frame).ch_layout.nb_channels = 1; - (*audio_frame).sample_rate = SAMPLE_RATE as _; - (*audio_frame).pts = self.audio_sample_no as i64; - (*audio_frame).duration = 1; - (*audio_frame).time_base = AVRational { - num: 1, - den: SAMPLE_RATE as _, - }; - - av_frame_get_buffer(audio_frame, 0); - - // Generate sine wave samples - let data = (*audio_frame).data[0] as *mut f32; - for i in 0..SAMPLES_PER_FRAME { - let sample_time = (self.audio_sample_no + i as u64) as f32 / SAMPLE_RATE; - let sample_value = - (2.0 * std::f32::consts::PI * FREQUENCY * sample_time).sin() * 0.5; - *data.add(i) = sample_value; - } - - // Encode audio frame - for mut pkt in self.audio_encoder.encode_frame(audio_frame)? { + // if sample_rate is set this frame is audio + if (*frame).sample_rate > 0 { + for mut pkt in self.audio_encoder.encode_frame(frame)? { + self.muxer.write_packet(pkt)?; + av_packet_free(&mut pkt); + } + } else { + for mut pkt in self.video_encoder.encode_frame(frame)? { self.muxer.write_packet(pkt)?; av_packet_free(&mut pkt); } - - self.audio_sample_no += SAMPLES_PER_FRAME as u64; - av_frame_free(&mut audio_frame); } + av_frame_free(&mut frame); + Ok(()) } } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 38d95c7..fc373d9 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -5,3 +5,4 @@ pub mod overseer; pub mod pipeline; pub mod variant; pub mod viewer; +mod generator; diff --git a/crates/core/src/overseer/mod.rs b/crates/core/src/overseer/mod.rs index 2ac226e..1062a5e 100644 --- a/crates/core/src/overseer/mod.rs +++ b/crates/core/src/overseer/mod.rs @@ -8,12 +8,6 @@ use std::cmp::PartialEq; use std::path::PathBuf; use uuid::Uuid; -#[cfg(feature = "local-overseer")] -mod local; - -#[cfg(feature = "webhook-overseer")] -mod webhook; - /// A copy of [ffmpeg_rs_raw::DemuxerInfo] without internal ptr #[derive(PartialEq, Clone)] pub struct IngressInfo { @@ -32,6 +26,7 @@ pub struct IngressStream { pub height: usize, pub fps: f32, pub sample_rate: usize, + pub channels: u8, pub language: String, } diff --git a/crates/core/src/pipeline/mod.rs b/crates/core/src/pipeline/mod.rs index 57079ea..c2c978f 100644 --- a/crates/core/src/pipeline/mod.rs +++ b/crates/core/src/pipeline/mod.rs @@ -7,7 +7,6 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; pub mod runner; -pub mod placeholder; #[derive(Clone, Debug, Serialize, Deserialize)] pub enum EgressType { @@ -41,7 +40,7 @@ impl Display for EgressType { } } -#[derive(Clone, Debug, Serialize, Deserialize, Default)] +#[derive(Clone)] pub struct PipelineConfig { pub id: Uuid, /// Transcoded/Copied stream config @@ -49,7 +48,11 @@ pub struct PipelineConfig { /// Output muxers pub egress: Vec, /// Source stream information for placeholder generation - pub ingress_info: Option, + pub ingress_info: IngressInfo, + /// Primary source video stream + pub video_src: usize, + /// Primary audio source stream + pub audio_src: Option, } impl Display for PipelineConfig { diff --git a/crates/core/src/pipeline/placeholder.rs b/crates/core/src/pipeline/placeholder.rs deleted file mode 100644 index cfc26d2..0000000 --- a/crates/core/src/pipeline/placeholder.rs +++ /dev/null @@ -1,188 +0,0 @@ -use anyhow::{bail, Result}; -use crate::variant::video::VideoVariant; -use crate::variant::audio::AudioVariant; -use crate::overseer::{IngressStream, IngressStreamType}; -use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_frame_alloc, av_frame_get_buffer, av_frame_free, av_get_sample_fmt, AVFrame, - AVPixelFormat, AVSampleFormat -}; -use std::ffi::CString; - -/// Placeholder frame generator for idle mode when stream disconnects -pub struct PlaceholderGenerator; - -impl PlaceholderGenerator { - /// Generate a placeholder video frame based on ingress stream info - pub unsafe fn generate_video_frame_from_stream( - stream: &IngressStream, - stream_time_base: (i32, i32), - frame_index: u64 - ) -> Result<*mut AVFrame> { - let frame = av_frame_alloc(); - if frame.is_null() { - bail!("Failed to allocate placeholder video frame"); - } - - (*frame).format = AVPixelFormat::AV_PIX_FMT_YUV420P as i32; - (*frame).width = stream.width as i32; - (*frame).height = stream.height as i32; - (*frame).time_base.num = stream_time_base.0; - (*frame).time_base.den = stream_time_base.1; - - // Set PTS based on frame rate and total frame index - let fps = if stream.fps > 0.0 { stream.fps } else { 30.0 }; - let time_base_f64 = stream_time_base.0 as f64 / stream_time_base.1 as f64; - (*frame).pts = (frame_index as f64 / fps / time_base_f64) as i64; - - if av_frame_get_buffer(frame, 0) < 0 { - av_frame_free(&mut frame); - bail!("Failed to allocate buffer for placeholder video frame"); - } - - // Fill with black (Y=16, U=V=128 for limited range YUV420P) - let y_size = ((*frame).width * (*frame).height) as usize; - let uv_size = y_size / 4; - - if !(*frame).data[0].is_null() { - std::ptr::write_bytes((*frame).data[0], 16, y_size); - } - if !(*frame).data[1].is_null() { - std::ptr::write_bytes((*frame).data[1], 128, uv_size); - } - if !(*frame).data[2].is_null() { - std::ptr::write_bytes((*frame).data[2], 128, uv_size); - } - - Ok(frame) - } - - /// Generate a placeholder audio frame based on ingress stream info - pub unsafe fn generate_audio_frame_from_stream( - stream: &IngressStream, - stream_time_base: (i32, i32), - frame_index: u64, - sample_fmt: &str, - channels: u32 - ) -> Result<*mut AVFrame> { - let frame = av_frame_alloc(); - if frame.is_null() { - bail!("Failed to allocate placeholder audio frame"); - } - - // Use the provided sample format - let sample_fmt_cstr = CString::new(sample_fmt) - .map_err(|_| anyhow::anyhow!("Invalid sample format string"))?; - let sample_fmt_int = av_get_sample_fmt(sample_fmt_cstr.as_ptr()); - (*frame).format = sample_fmt_int; - (*frame).channels = channels as i32; - (*frame).sample_rate = stream.sample_rate as i32; - (*frame).nb_samples = 1024; // Standard audio frame size - (*frame).time_base.num = stream_time_base.0; - (*frame).time_base.den = stream_time_base.1; - - // Set PTS based on sample rate and frame index - let samples_per_second = stream.sample_rate as f64; - let time_base_f64 = stream_time_base.0 as f64 / stream_time_base.1 as f64; - (*frame).pts = ((frame_index * 1024) as f64 / samples_per_second / time_base_f64) as i64; - - if av_frame_get_buffer(frame, 0) < 0 { - av_frame_free(&mut frame); - bail!("Failed to allocate buffer for placeholder audio frame"); - } - - // Fill with silence (zeros) - for i in 0..8 { - if !(*frame).data[i].is_null() && (*frame).linesize[i] > 0 { - std::ptr::write_bytes((*frame).data[i], 0, (*frame).linesize[i] as usize); - } - } - - Ok(frame) - } - - /// Generate a placeholder black video frame - pub unsafe fn generate_video_frame( - variant: &VideoVariant, - stream_time_base: (i32, i32), - frame_index: u64 - ) -> Result<*mut AVFrame> { - let frame = av_frame_alloc(); - if frame.is_null() { - bail!("Failed to allocate placeholder video frame"); - } - - (*frame).format = AVPixelFormat::AV_PIX_FMT_YUV420P as i32; - (*frame).width = variant.width as i32; - (*frame).height = variant.height as i32; - (*frame).time_base.num = stream_time_base.0; - (*frame).time_base.den = stream_time_base.1; - - // Set PTS based on frame rate and total frame index - let fps = if variant.fps > 0.0 { variant.fps } else { 30.0 }; - let time_base_f64 = stream_time_base.0 as f64 / stream_time_base.1 as f64; - (*frame).pts = (frame_index as f64 / fps / time_base_f64) as i64; - - if av_frame_get_buffer(frame, 0) < 0 { - av_frame_free(&mut frame); - bail!("Failed to allocate buffer for placeholder video frame"); - } - - // Fill with black (Y=16, U=V=128 for limited range YUV420P) - let y_size = ((*frame).width * (*frame).height) as usize; - let uv_size = y_size / 4; - - if !(*frame).data[0].is_null() { - std::ptr::write_bytes((*frame).data[0], 16, y_size); - } - if !(*frame).data[1].is_null() { - std::ptr::write_bytes((*frame).data[1], 128, uv_size); - } - if !(*frame).data[2].is_null() { - std::ptr::write_bytes((*frame).data[2], 128, uv_size); - } - - Ok(frame) - } - - /// Generate a placeholder silent audio frame - pub unsafe fn generate_audio_frame( - variant: &AudioVariant, - stream_time_base: (i32, i32), - frame_index: u64 - ) -> Result<*mut AVFrame> { - let frame = av_frame_alloc(); - if frame.is_null() { - bail!("Failed to allocate placeholder audio frame"); - } - - // Use the sample format from the variant configuration - let sample_fmt_cstr = CString::new(variant.sample_fmt.as_str()) - .map_err(|_| anyhow::anyhow!("Invalid sample format string"))?; - let sample_fmt_int = av_get_sample_fmt(sample_fmt_cstr.as_ptr()); - (*frame).format = sample_fmt_int; - (*frame).channels = variant.channels as i32; - (*frame).sample_rate = variant.sample_rate as i32; - (*frame).nb_samples = 1024; // Standard audio frame size - (*frame).time_base.num = stream_time_base.0; - (*frame).time_base.den = stream_time_base.1; - - // Set PTS based on sample rate and frame index - let samples_per_second = variant.sample_rate as f64; - let time_base_f64 = stream_time_base.0 as f64 / stream_time_base.1 as f64; - (*frame).pts = ((frame_index * 1024) as f64 / samples_per_second / time_base_f64) as i64; - - if av_frame_get_buffer(frame, 0) < 0 { - av_frame_free(&mut frame); - bail!("Failed to allocate buffer for placeholder audio frame"); - } - - // Fill with silence (zeros) - for i in 0..8 { - if !(*frame).data[i].is_null() && (*frame).linesize[i] > 0 { - std::ptr::write_bytes((*frame).data[i], 0, (*frame).linesize[i] as usize); - } - } - - Ok(frame) - } -} \ No newline at end of file diff --git a/crates/core/src/pipeline/runner.rs b/crates/core/src/pipeline/runner.rs index 3d9288b..644e03b 100644 --- a/crates/core/src/pipeline/runner.rs +++ b/crates/core/src/pipeline/runner.rs @@ -10,18 +10,19 @@ use std::time::{Duration, Instant}; use crate::egress::hls::HlsEgress; use crate::egress::recorder::RecorderEgress; use crate::egress::{Egress, EgressResult}; +use crate::generator::FrameGenerator; use crate::ingress::ConnectionInfo; use crate::mux::SegmentType; use crate::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer}; use crate::pipeline::{EgressType, PipelineConfig}; use crate::variant::{StreamMapping, VariantStream}; -use crate::pipeline::placeholder::PlaceholderGenerator; -use anyhow::{bail, Result}; +use anyhow::{bail, Context, Result}; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_WEBP; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_frame_free, av_get_sample_fmt, av_packet_free, av_q2d, av_rescale_q, AVMediaType, + av_frame_free, av_get_sample_fmt, av_packet_free, av_q2d, av_rescale_q, AVFrame, AVMediaType, + AVStream, }; use ffmpeg_rs_raw::{ cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler, @@ -32,15 +33,14 @@ use tokio::runtime::Handle; use uuid::Uuid; /// Runner state for handling normal vs idle modes -#[derive(Debug, Clone)] pub enum RunnerState { /// Normal operation - processing live stream Normal, /// Idle mode - generating placeholder content after disconnection Idle { start_time: Instant, - variant_index: usize, last_frame_time: Option, + gen: FrameGenerator, }, } @@ -129,142 +129,131 @@ impl PipelineRunner { }) } - /// Process a single idle frame - generates one source frame and processes it through all variants - unsafe fn process_single_idle_frame(&mut self, config: &PipelineConfig) -> Result<()> { - use std::time::{Duration, Instant}; - - if config.variants.is_empty() { - return Ok(()); - } + /// process the frame in the pipeline + unsafe fn process_frame( + &mut self, + config: &PipelineConfig, + stream: *mut AVStream, + frame: *mut AVFrame, + ) -> Result> { + // Copy frame from GPU if using hwaccel decoding + let mut frame = get_frame_from_hw(frame)?; + (*frame).time_base = (*stream).time_base; - // Extract timing info from current state - let (mut last_frame_time, variant_index) = match &mut self.state { - RunnerState::Idle { last_frame_time, variant_index, .. } => (last_frame_time, variant_index), - _ => return Ok(()), // Only process in idle state - }; + let p = (*stream).codecpar; + if (*p).codec_type == AVMediaType::AVMEDIA_TYPE_VIDEO { + // Conditionally generate thumbnails based on interval (0 = disabled) + if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 { + let thumb_start = Instant::now(); + let dst_pic = PathBuf::from(&self.out_dir) + .join(config.id.to_string()) + .join("thumb.webp"); + { + let mut sw = Scaler::new(); + let mut scaled_frame = sw.process_frame( + frame, + (*frame).width as _, + (*frame).height as _, + AV_PIX_FMT_YUV420P, + )?; - // Time-based frame rate calculation - let now = Instant::now(); - if let Some(last_time) = *last_frame_time { - // Calculate target frame interval (assume 30fps for now) - let target_interval = Duration::from_millis(33); // ~30fps - let elapsed = now.duration_since(last_time); - - if elapsed < target_interval { - // Not time for next frame yet - std::thread::sleep(target_interval - elapsed); + let encoder = Encoder::new(AV_CODEC_ID_WEBP)? + .with_height((*scaled_frame).height) + .with_width((*scaled_frame).width) + .with_pix_fmt(transmute((*scaled_frame).format)) + .open(None)?; + + encoder.save_picture(scaled_frame, dst_pic.to_str().unwrap())?; + av_frame_free(&mut scaled_frame); + } + + let thumb_duration = thumb_start.elapsed(); + info!( + "Saved thumb ({:.2}ms) to: {}", + thumb_duration.as_millis() as f32 / 1000.0, + dst_pic.display(), + ); } - } - *last_frame_time = Some(Instant::now()); - // Get source video stream info from stored ingress info - let video_stream = config.ingress_info.as_ref() - .and_then(|info| info.streams.iter().find(|s| matches!(s.stream_type, crate::overseer::IngressStreamType::Video))); - - let mut egress_results = vec![]; - - // Generate one source frame and process it through all relevant variants - if let Some(stream) = video_stream { - // Generate a single source placeholder video frame based on original stream properties - let fps = if stream.fps > 0.0 { stream.fps } else { 30.0 }; - let time_base = (1, fps as i32); - let mut source_frame = PlaceholderGenerator::generate_video_frame_from_stream(stream, time_base, self.frame_ctr)?; - - // Set the frame time_base - (*source_frame).time_base.num = time_base.0; - (*source_frame).time_base.den = time_base.1; - - // Increment frame counter for all video processing self.frame_ctr += 1; - - // Process this single frame through all video variants (like normal pipeline) - for variant in &config.variants { - if let VariantStream::Video(v) = variant { - // Scale/encode the source frame for this variant - if let Some(enc) = self.encoders.get_mut(&v.id()) { - // Use scaler if needed for different resolutions - let frame_to_encode = if v.width as i32 == (*source_frame).width && - v.height as i32 == (*source_frame).height { - // Same resolution, use source frame directly - source_frame - } else { - // Different resolution, need to scale - if let Some(scaler) = self.scalers.get_mut(&v.id()) { - scaler.process_frame(source_frame, v.width, v.height, AV_PIX_FMT_YUV420P)? - } else { - source_frame // Fallback to source frame - } - }; - - let packets = enc.encode_frame(frame_to_encode)?; - for mut pkt in packets { - for eg in self.egress.iter_mut() { - let er = eg.process_pkt(pkt, &v.id())?; - egress_results.push(er); - } - av_packet_free(&mut pkt); - } - } - } - } - - av_frame_free(&mut source_frame); } - // Generate and process audio frames separately (audio doesn't share like video) - let audio_stream = config.ingress_info.as_ref() - .and_then(|info| info.streams.iter().find(|s| matches!(s.stream_type, crate::overseer::IngressStreamType::Audio))); - - for variant in &config.variants { - if let VariantStream::Audio(a) = variant { - let time_base = (1, a.sample_rate as i32); - let mut frame = if let Some(stream) = audio_stream { - // Use original stream properties for placeholder generation - PlaceholderGenerator::generate_audio_frame_from_stream(stream, time_base, self.frame_ctr, &a.sample_fmt, a.channels)? - } else { - // Fallback to variant properties if no stream info available - PlaceholderGenerator::generate_audio_frame(a, time_base, self.frame_ctr)? - }; - - // Set the frame time_base - (*frame).time_base.num = time_base.0; - (*frame).time_base.den = time_base.1; - - // Process through the encoding pipeline - if let Some(enc) = self.encoders.get_mut(&a.id()) { - let packets = enc.encode_frame(frame)?; - for mut pkt in packets { - for eg in self.egress.iter_mut() { - let er = eg.process_pkt(pkt, &a.id())?; - egress_results.push(er); - } - av_packet_free(&mut pkt); + let mut egress_results = Vec::new(); + // Get the variants which want this pkt + let pkt_vars = config + .variants + .iter() + .filter(|v| v.src_index() == (*stream).index as usize); + for var in pkt_vars { + let enc = if let Some(enc) = self.encoders.get_mut(&var.id()) { + enc + } else { + //warn!("Frame had nowhere to go in {} :/", var.id()); + continue; + }; + + // scaling / resampling + let mut new_frame = false; + let mut frame = match var { + VariantStream::Video(v) => { + if let Some(s) = self.scalers.get_mut(&v.id()) { + new_frame = true; + s.process_frame(frame, v.width, v.height, transmute(v.pixel_format))? + } else { + frame } } - + VariantStream::Audio(a) => { + if let Some((r, f)) = self.resampler.get_mut(&a.id()) { + let frame_size = (*enc.codec_context()).frame_size; + new_frame = true; + let mut resampled_frame = r.process_frame(frame)?; + if let Some(ret) = f.buffer_frame(resampled_frame, frame_size as usize)? { + // Set correct timebase for audio (1/sample_rate) + (*ret).time_base.num = 1; + (*ret).time_base.den = a.sample_rate as i32; + av_frame_free(&mut resampled_frame); + ret + } else { + av_frame_free(&mut resampled_frame); + continue; + } + } else { + frame + } + } + _ => frame, + }; + + // before encoding frame, rescale timestamps + if !frame.is_null() { + let enc_ctx = enc.codec_context(); + (*frame).pict_type = AV_PICTURE_TYPE_NONE; + (*frame).pts = av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base); + (*frame).pkt_dts = + av_rescale_q((*frame).pkt_dts, (*frame).time_base, (*enc_ctx).time_base); + (*frame).duration = + av_rescale_q((*frame).duration, (*frame).time_base, (*enc_ctx).time_base); + (*frame).time_base = (*enc_ctx).time_base; + } + + let packets = enc.encode_frame(frame)?; + // pass new packets to egress + for mut pkt in packets { + for eg in self.egress.iter_mut() { + let er = eg.process_pkt(pkt, &var.id())?; + egress_results.push(er); + } + av_packet_free(&mut pkt); + } + + if new_frame { av_frame_free(&mut frame); } } - - // Handle egress results (same as normal processing) - if !egress_results.is_empty() { - self.handle.block_on(async { - for er in egress_results { - if let EgressResult::Segments { created, deleted } = er { - if let Err(e) = self - .overseer - .on_segments(&config.id, &created, &deleted) - .await - { - bail!("Failed to process segment {}", e.to_string()); - } - } - } - Ok(()) - })?; - } - - Ok(()) + + av_frame_free(&mut frame); + Ok(egress_results) } /// EOF, cleanup @@ -297,23 +286,36 @@ impl PipelineRunner { self.setup()?; let config = if let Some(config) = &self.config { - config + config.clone() } else { bail!("Pipeline not configured, cannot run") }; // run transcoder pipeline - let (mut pkt, stream_info) = self.demuxer.get_packet()?; - + let (mut pkt, _) = self.demuxer.get_packet()?; + + let src_video_stream = config + .ingress_info + .streams + .iter() + .find(|s| s.index == config.video_src) + .unwrap(); + let src_audio_stream = config + .ingress_info + .streams + .iter() + .find(|s| Some(s.index) == config.audio_src); + // Handle state transitions based on packet availability match (&self.state, pkt.is_null()) { (RunnerState::Normal, true) => { // First time entering idle mode - info!("Stream input disconnected, entering idle mode with placeholder content"); + info!("Stream input disconnected, entering idle mode"); + self.state = RunnerState::Idle { start_time: Instant::now(), - variant_index: 0, last_frame_time: None, + gen: FrameGenerator::from_stream(src_video_stream, src_audio_stream)?, }; } (RunnerState::Idle { start_time, .. }, true) => { @@ -332,27 +334,23 @@ impl PipelineRunner { // Normal operation continues } } - + // Process based on current state - match &self.state { - RunnerState::Idle { .. } => { - // Process a single idle frame (rotating through variants) - self.process_single_idle_frame(config)?; - - // Free the null packet if needed - if !pkt.is_null() { - av_packet_free(&mut pkt); - } - - return Ok(true); // Continue processing + let result = match &mut self.state { + RunnerState::Idle { gen, .. } => { + let frame = gen.next()?; + let stream = if (*frame).sample_rate > 0 { + self.demuxer.get_stream( + src_audio_stream + .context("frame generator created an audio frame with no src stream")? + .index, + )? + } else { + self.demuxer.get_stream(src_video_stream.index)? + }; + self.process_frame(&config, stream, frame)? } RunnerState::Normal => { - // Normal packet processing - if pkt.is_null() { - // This shouldn't happen in Normal state but handle gracefully - return Ok(true); - } - // TODO: For copy streams, skip decoder let frames = match self.decoder.decode_pkt(pkt) { Ok(f) => f, @@ -364,133 +362,19 @@ impl PipelineRunner { let mut egress_results = vec![]; for (frame, stream) in frames { - // Copy frame from GPU if using hwaccel decoding - let mut frame = get_frame_from_hw(frame)?; - (*frame).time_base = (*stream).time_base; - - let p = (*stream).codecpar; - if (*p).codec_type == AVMediaType::AVMEDIA_TYPE_VIDEO { - // Conditionally generate thumbnails based on interval (0 = disabled) - if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 { - let thumb_start = Instant::now(); - let dst_pic = PathBuf::from(&self.out_dir) - .join(config.id.to_string()) - .join("thumb.webp"); - { - let mut sw = Scaler::new(); - let mut scaled_frame = sw.process_frame( - frame, - (*frame).width as _, - (*frame).height as _, - AV_PIX_FMT_YUV420P, - )?; - - let mut encoder = Encoder::new(AV_CODEC_ID_WEBP)? - .with_height((*scaled_frame).height) - .with_width((*scaled_frame).width) - .with_pix_fmt(transmute((*scaled_frame).format)) - .open(None)?; - - encoder.save_picture(scaled_frame, dst_pic.to_str().unwrap())?; - av_frame_free(&mut scaled_frame); - } - - let thumb_duration = thumb_start.elapsed(); - info!( - "Saved thumb ({:.2}ms) to: {}", - thumb_duration.as_millis() as f32 / 1000.0, - dst_pic.display(), - ); + let results = self.process_frame(&config, stream, frame)?; + egress_results.extend(results); } - self.frame_ctr += 1; + av_packet_free(&mut pkt); + egress_results } - - // Get the variants which want this pkt - let pkt_vars = config - .variants - .iter() - .filter(|v| v.src_index() == (*stream).index as usize); - for var in pkt_vars { - let enc = if let Some(enc) = self.encoders.get_mut(&var.id()) { - enc - } else { - //warn!("Frame had nowhere to go in {} :/", var.id()); - continue; - }; - - // scaling / resampling - let mut new_frame = false; - let mut frame = match var { - VariantStream::Video(v) => { - if let Some(s) = self.scalers.get_mut(&v.id()) { - new_frame = true; - s.process_frame(frame, v.width, v.height, transmute(v.pixel_format))? - } else { - frame - } - } - VariantStream::Audio(a) => { - if let Some((r, f)) = self.resampler.get_mut(&a.id()) { - let frame_size = (*enc.codec_context()).frame_size; - new_frame = true; - let mut resampled_frame = r.process_frame(frame)?; - if let Some(ret) = - f.buffer_frame(resampled_frame, frame_size as usize)? - { - // Set correct timebase for audio (1/sample_rate) - (*ret).time_base.num = 1; - (*ret).time_base.den = a.sample_rate as i32; - av_frame_free(&mut resampled_frame); - ret - } else { - av_frame_free(&mut resampled_frame); - continue; - } - } else { - frame - } - } - _ => frame, - }; - - // before encoding frame, rescale timestamps - if !frame.is_null() { - let enc_ctx = enc.codec_context(); - (*frame).pict_type = AV_PICTURE_TYPE_NONE; - (*frame).pts = - av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base); - (*frame).pkt_dts = - av_rescale_q((*frame).pkt_dts, (*frame).time_base, (*enc_ctx).time_base); - (*frame).duration = - av_rescale_q((*frame).duration, (*frame).time_base, (*enc_ctx).time_base); - (*frame).time_base = (*enc_ctx).time_base; - } - - let packets = enc.encode_frame(frame)?; - // pass new packets to egress - for mut pkt in packets { - for eg in self.egress.iter_mut() { - let er = eg.process_pkt(pkt, &var.id())?; - egress_results.push(er); - } - av_packet_free(&mut pkt); - } - - if new_frame { - av_frame_free(&mut frame); - } - } - - av_frame_free(&mut frame); - } - - av_packet_free(&mut pkt); + }; // egress results - process async operations without blocking if possible - if !egress_results.is_empty() { + if !result.is_empty() { self.handle.block_on(async { - for er in egress_results { + for er in result { if let EgressResult::Segments { created, deleted } = er { if let Err(e) = self .overseer @@ -510,8 +394,6 @@ impl PipelineRunner { info!("Average fps: {:.2}", n_frames as f32 / elapsed); self.fps_counter_start = Instant::now(); self.fps_last_frame_ctr = self.frame_ctr; - } - } // Close the RunnerState::Normal match arm } Ok(true) } @@ -542,18 +424,16 @@ impl PipelineRunner { height: s.height, fps: s.fps, sample_rate: s.sample_rate, + channels: s.channels, language: s.language.clone(), }) .collect(), }; - let mut cfg = self + let cfg = self .handle .block_on(async { self.overseer.start_stream(&self.connection, &i_info).await })?; - - // Store ingress info in config for placeholder generation - cfg.ingress_info = Some(i_info.clone()); - + self.config = Some(cfg); self.info = Some(i_info); @@ -649,7 +529,10 @@ impl Drop for PipelineRunner { self.copy_stream.clear(); self.egress.clear(); - info!("PipelineRunner cleaned up resources for stream: {}", self.connection.key); + info!( + "PipelineRunner cleaned up resources for stream: {}", + self.connection.key + ); } } } diff --git a/crates/zap-stream/Cargo.toml b/crates/zap-stream/Cargo.toml index e36ba39..fe0c9e3 100644 --- a/crates/zap-stream/Cargo.toml +++ b/crates/zap-stream/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" default = ["srt", "rtmp", "test-pattern"] srt = ["zap-stream-core/srt"] rtmp = ["zap-stream-core/rtmp"] -test-pattern = ["zap-stream-core/test-pattern", "zap-stream-db/test-pattern"] +test-pattern = ["zap-stream-db/test-pattern"] [dependencies] zap-stream-db = { path = "../zap-stream-db" } diff --git a/crates/zap-stream/index.html b/crates/zap-stream/index.html index 8881675..b3a9587 100644 --- a/crates/zap-stream/index.html +++ b/crates/zap-stream/index.html @@ -9,45 +9,55 @@ color: white; font-family: monospace; } + .container { padding: 20px; max-width: 1200px; margin: 0 auto; } + .stream-list { margin: 20px 0; } + .stream-item { background: #333; margin: 10px 0; padding: 15px; border-radius: 5px; } + .stream-title { font-size: 18px; font-weight: bold; margin-bottom: 5px; } + .stream-link { color: #00ff00; text-decoration: none; } + .stream-link:hover { text-decoration: underline; } + .video-player { margin: 20px 0; max-width: 800px; } + video { width: 100%; max-width: 800px; background: #000; } + .no-streams { color: #999; font-style: italic; } + .player-section { margin-top: 30px; border-top: 1px solid #555; @@ -59,19 +69,24 @@

Welcome to {{public_url}}

- +

Active Streams

{{#has_streams}}
{{#streams}}
{{title}}
- {{#summary}}
{{summary}}
{{/summary}} + {{#summary}} +
{{summary}}
+ {{/summary}}
- 📺 {{live_url}} - {{#viewer_count}}👥 {{viewer_count}} viewers{{/viewer_count}} + {{live_url}} + {{#viewer_count}}{{viewer_count}} viewers{{/viewer_count}}
- +
{{/streams}}
@@ -79,15 +94,19 @@ {{^has_streams}}
No active streams
{{/has_streams}} - +

Stream Player

- - + +
@@ -104,12 +123,12 @@ hls = new Hls(); hls.loadSource(url); hls.attachMedia(video); - hls.on(Hls.Events.MANIFEST_PARSED, function() { + hls.on(Hls.Events.MANIFEST_PARSED, function () { video.play(); }); } else if (video.canPlayType('application/vnd.apple.mpegurl')) { video.src = url; - video.addEventListener('loadedmetadata', function() { + video.addEventListener('loadedmetadata', function () { video.play(); }); } else { diff --git a/crates/zap-stream/src/http.rs b/crates/zap-stream/src/http.rs index f471fb4..c369bda 100644 --- a/crates/zap-stream/src/http.rs +++ b/crates/zap-stream/src/http.rs @@ -11,18 +11,18 @@ use hyper::service::Service; use hyper::{Method, Request, Response}; use log::error; use nostr_sdk::{serde_json, Alphabet, Event, Kind, PublicKey, SingleLetterTag, TagKind}; -use serde::{Serialize, Deserialize}; +use serde::Serialize; use std::future::Future; use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::fs::File; +use tokio::fs::File; use tokio::sync::RwLock; use tokio_util::io::ReaderStream; use zap_stream_core::viewer::ViewerTracker; -#[derive(Serialize)] +#[derive(Serialize, Clone)] struct StreamData { id: String, title: String, @@ -33,7 +33,7 @@ struct StreamData { viewer_count: Option, } -#[derive(Serialize)] +#[derive(Serialize, Clone)] struct IndexTemplateData { public_url: String, has_streams: bool, @@ -41,7 +41,7 @@ struct IndexTemplateData { streams: Vec, } -struct CachedStreams { +pub struct CachedStreams { data: IndexTemplateData, cached_at: Instant, } @@ -57,7 +57,12 @@ pub struct HttpServer { } impl HttpServer { - pub fn new(index_template: String, files_dir: PathBuf, api: Api, stream_cache: StreamCache) -> Self { + pub fn new( + index_template: String, + files_dir: PathBuf, + api: Api, + stream_cache: StreamCache, + ) -> Self { Self { index_template, files_dir, @@ -70,8 +75,11 @@ impl HttpServer { Self::get_cached_or_fetch_streams_static(&self.stream_cache, &self.api).await } - async fn get_cached_or_fetch_streams_static(stream_cache: &StreamCache, api: &Api) -> Result { - const CACHE_DURATION: Duration = Duration::from_secs(60); // 1 minute + async fn get_cached_or_fetch_streams_static( + stream_cache: &StreamCache, + api: &Api, + ) -> Result { + const CACHE_DURATION: Duration = Duration::from_secs(10); // Check if we have valid cached data { @@ -86,7 +94,7 @@ impl HttpServer { // Cache is expired or missing, fetch new data let active_streams = api.get_active_streams().await?; let public_url = api.get_public_url(); - + let template_data = if !active_streams.is_empty() { let streams: Vec = active_streams .into_iter() @@ -94,10 +102,16 @@ impl HttpServer { let viewer_count = api.get_viewer_count(&stream.id); StreamData { id: stream.id.clone(), - title: stream.title.unwrap_or_else(|| format!("Stream {}", &stream.id[..8])), + title: stream + .title + .unwrap_or_else(|| format!("Stream {}", &stream.id[..8])), summary: stream.summary, live_url: format!("/{}/live.m3u8", stream.id), - viewer_count: if viewer_count > 0 { Some(viewer_count) } else { None }, + viewer_count: if viewer_count > 0 { + Some(viewer_count as _) + } else { + None + }, } }) .collect(); @@ -140,13 +154,18 @@ impl HttpServer { playlist_path: &PathBuf, ) -> Result>, anyhow::Error> { // Extract stream ID from path (e.g., /uuid/live.m3u8 -> uuid) - let path_parts: Vec<&str> = req.uri().path().trim_start_matches('/').split('/').collect(); + let path_parts: Vec<&str> = req + .uri() + .path() + .trim_start_matches('/') + .split('/') + .collect(); if path_parts.len() < 2 { return Ok(Response::builder().status(404).body(BoxBody::default())?); } - + let stream_id = path_parts[0]; - + // Get client IP and User-Agent for tracking let client_ip = Self::get_client_ip(req); let user_agent = req @@ -179,9 +198,10 @@ impl HttpServer { // Read the playlist file let playlist_content = tokio::fs::read(playlist_path).await?; - + // Parse and modify playlist to add viewer token to URLs - let modified_content = Self::add_viewer_token_to_playlist(&playlist_content, &viewer_token)?; + let modified_content = + Self::add_viewer_token_to_playlist(&playlist_content, &viewer_token)?; Ok(Response::builder() .header("content-type", "application/vnd.apple.mpegurl") @@ -205,7 +225,7 @@ impl HttpServer { } } } - + if let Some(real_ip) = req.headers().get("x-real-ip") { if let Ok(ip_str) = real_ip.to_str() { return ip_str.to_string(); @@ -220,17 +240,18 @@ impl HttpServer { // Parse the M3U8 playlist using the m3u8-rs crate let (_, playlist) = m3u8_rs::parse_playlist(content) .map_err(|e| anyhow::anyhow!("Failed to parse M3U8 playlist: {}", e))?; - + match playlist { m3u8_rs::Playlist::MasterPlaylist(mut master) => { // For master playlists, add viewer token to variant streams for variant in &mut master.variants { variant.uri = Self::add_token_to_url(&variant.uri, viewer_token); } - + // Write the modified playlist back to string let mut output = Vec::new(); - master.write_to(&mut output) + master + .write_to(&mut output) .map_err(|e| anyhow::anyhow!("Failed to write master playlist: {}", e))?; String::from_utf8(output) .map_err(|e| anyhow::anyhow!("Failed to convert playlist to string: {}", e)) @@ -242,7 +263,7 @@ impl HttpServer { } } } - + fn add_token_to_url(url: &str, viewer_token: &str) -> String { if url.contains('?') { format!("{}&vt={}", url, viewer_token) @@ -264,7 +285,7 @@ impl Service> for HttpServer { { let stream_cache = self.stream_cache.clone(); let api = self.api.clone(); - + // Compile template outside async move for better performance let template = match mustache::compile_str(&self.index_template) { Ok(t) => t, @@ -272,40 +293,36 @@ impl Service> for HttpServer { error!("Failed to compile template: {}", e); return Box::pin(async move { Ok(Response::builder() - .status(500) - .body(BoxBody::default()).unwrap()) + .status(500) + .body(BoxBody::default()) + .unwrap()) }); } }; - + return Box::pin(async move { // Use the existing method to get cached template data - let template_data = Self::get_cached_or_fetch_streams_static(&stream_cache, &api).await; + let template_data = + Self::get_cached_or_fetch_streams_static(&stream_cache, &api).await; match template_data { - Ok(data) => { - match template.render_to_string(&data) { - Ok(index_html) => Ok(Response::builder() - .header("content-type", "text/html") - .header("server", "zap-stream-core") - .body( - Full::new(Bytes::from(index_html)) - .map_err(|e| match e {}) - .boxed(), - )?), - Err(e) => { - error!("Failed to render template: {}", e); - Ok(Response::builder() - .status(500) - .body(BoxBody::default())?) - } + Ok(data) => match template.render_to_string(&data) { + Ok(index_html) => Ok(Response::builder() + .header("content-type", "text/html") + .header("server", "zap-stream-core") + .body( + Full::new(Bytes::from(index_html)) + .map_err(|e| match e {}) + .boxed(), + )?), + Err(e) => { + error!("Failed to render template: {}", e); + Ok(Response::builder().status(500).body(BoxBody::default())?) } - } + }, Err(e) => { error!("Failed to fetch template data: {}", e); - Ok(Response::builder() - .status(500) - .body(BoxBody::default())?) + Ok(Response::builder().status(500).body(BoxBody::default())?) } } }); @@ -415,12 +432,21 @@ pub fn check_nip98_auth(req: &Request, public_url: &str) -> Result format!("{}{}?{}", public_url.trim_end_matches('/'), req.uri().path(), query), + Some(query) => format!( + "{}{}?{}", + public_url.trim_end_matches('/'), + req.uri().path(), + query + ), None => format!("{}{}", public_url.trim_end_matches('/'), req.uri().path()), }; if !url_tag.eq_ignore_ascii_case(&request_uri) { - bail!("Invalid nostr event, URL tag invalid. Expected: {}, Got: {}", request_uri, url_tag); + bail!( + "Invalid nostr event, URL tag invalid. Expected: {}, Got: {}", + request_uri, + url_tag + ); } // Check method tag diff --git a/crates/zap-stream/src/overseer.rs b/crates/zap-stream/src/overseer.rs index c3f944c..207910b 100644 --- a/crates/zap-stream/src/overseer.rs +++ b/crates/zap-stream/src/overseer.rs @@ -17,14 +17,14 @@ use url::Url; use uuid::Uuid; use zap_stream_core::egress::{EgressConfig, EgressSegment}; use zap_stream_core::ingress::ConnectionInfo; -use zap_stream_core::overseer::{IngressInfo, IngressStreamType, Overseer}; +use zap_stream_core::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer}; use zap_stream_core::pipeline::{EgressType, PipelineConfig}; use zap_stream_core::variant::audio::AudioVariant; use zap_stream_core::variant::mapping::VariantMapping; use zap_stream_core::variant::video::VideoVariant; use zap_stream_core::variant::{StreamMapping, VariantStream}; use zap_stream_core::viewer::ViewerTracker; -use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb}; +use zap_stream_db::{IngestEndpoint, UserStream, UserStreamState, ZapStreamDb}; const STREAM_EVENT_KIND: u16 = 30_311; @@ -353,22 +353,18 @@ impl Overseer for ZapStreamOverseer { } // Get ingest endpoint configuration based on connection type - let endpoint_id = self - .detect_endpoint(&connection) - .await? - .ok_or_else(|| anyhow::anyhow!("No ingest endpoints configured"))?; - let endpoint = self - .db - .get_ingest_endpoint(endpoint_id) - .await? - .ok_or_else(|| anyhow::anyhow!("Ingest endpoint not found"))?; + let endpoint = self.detect_endpoint(&connection).await?; - let variants = get_variants_from_endpoint(&stream_info, &endpoint)?; + let cfg = get_variants_from_endpoint(&stream_info, &endpoint)?; + + if cfg.video_src.is_none() || cfg.variants.is_empty() { + bail!("No video src found"); + } let mut egress = vec![]; egress.push(EgressType::HLS(EgressConfig { name: "hls".to_string(), - variants: variants.iter().map(|v| v.id()).collect(), + variants: cfg.variants.iter().map(|v| v.id()).collect(), })); let stream_id = Uuid::new_v4(); @@ -378,7 +374,7 @@ impl Overseer for ZapStreamOverseer { user_id: uid, starts: Utc::now(), state: UserStreamState::Live, - endpoint_id: Some(endpoint_id), + endpoint_id: Some(endpoint.id), ..Default::default() }; let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?; @@ -399,8 +395,11 @@ impl Overseer for ZapStreamOverseer { Ok(PipelineConfig { id: stream_id, - variants, + variants: cfg.variants, egress, + ingress_info: stream_info.clone(), + video_src: cfg.video_src.unwrap().index, + audio_src: cfg.audio_src.map(|s| s.index), }) } @@ -525,25 +524,29 @@ impl Overseer for ZapStreamOverseer { impl ZapStreamOverseer { /// Detect which ingest endpoint should be used based on connection info - async fn detect_endpoint(&self, connection: &ConnectionInfo) -> Result> { - // Get all ingest endpoints and match by name against connection endpoint + async fn detect_endpoint(&self, connection: &ConnectionInfo) -> Result { let endpoints = self.db.get_ingest_endpoints().await?; - for endpoint in &endpoints { - if endpoint.name == connection.endpoint { - return Ok(Some(endpoint.id)); - } - } - - // No matching endpoint found, use the most expensive one - Ok(endpoints.into_iter().max_by_key(|e| e.cost).map(|e| e.id)) + let default = endpoints.iter().max_by_key(|e| e.cost); + Ok(endpoints + .iter() + .find(|e| e.name == connection.endpoint) + .or(default) + .unwrap() + .clone()) } } -fn get_variants_from_endpoint( - info: &IngressInfo, +struct EndpointConfig<'a> { + video_src: Option<&'a IngressStream>, + audio_src: Option<&'a IngressStream>, + variants: Vec, +} + +fn get_variants_from_endpoint<'a>( + info: &'a IngressInfo, endpoint: &zap_stream_db::IngestEndpoint, -) -> Result> { +) -> Result> { let capabilities_str = endpoint.capabilities.as_deref().unwrap_or(""); let capabilities: Vec<&str> = capabilities_str.split(',').collect(); @@ -658,5 +661,9 @@ fn get_variants_from_endpoint( // Handle other capabilities like dvr:720h here if needed } - Ok(vars) + Ok(EndpointConfig { + audio_src, + video_src, + variants: vars, + }) }