From e91c40806f4ae4fbac97633ffe31899156d32551 Mon Sep 17 00:00:00 2001 From: Kieran Date: Tue, 17 Jun 2025 12:57:02 +0100 Subject: [PATCH] fix: low latency generator always running chore: add HLS generation tests --- Cargo.lock | 123 +++- crates/core/Cargo.toml | 5 + crates/core/src/generator.rs | 31 +- crates/core/src/lib.rs | 2 + crates/core/src/mux/hls.rs | 9 +- crates/core/src/test_hls_timing.rs | 933 +++++++++++++++++++++++++++++ 6 files changed, 1053 insertions(+), 50 deletions(-) create mode 100644 crates/core/src/test_hls_timing.rs diff --git a/Cargo.lock b/Cargo.lock index b8ee98c..26a77ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -182,7 +182,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -193,7 +193,7 @@ checksum = "3f934833b4b7233644e5848f235df3f57ed8c80f1528a26c3dfa13d2147fa056" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -365,7 +365,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -674,7 +674,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -906,7 +906,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -938,7 +938,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -974,6 +974,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "env_filter" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0" +dependencies = [ + "log 0.4.25", + "regex", +] + [[package]] name = "env_logger" version = "0.10.2" @@ -987,6 +997,19 @@ dependencies = [ "termcolor", ] +[[package]] +name = "env_logger" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "jiff", + "log 0.4.25", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -1282,7 +1305,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -1897,7 +1920,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -2026,6 +2049,30 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" +[[package]] +name = "jiff" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49" +dependencies = [ + "jiff-static", + "log 0.4.25", + "portable-atomic", + "portable-atomic-util", + "serde", +] + +[[package]] +name = "jiff-static" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.103", +] + [[package]] name = "js-sys" version = "0.3.77" @@ -2443,7 +2490,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -2576,7 +2623,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -2623,7 +2670,7 @@ checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -2695,6 +2742,15 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6" +[[package]] +name = "portable-atomic-util" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +dependencies = [ + "portable-atomic", +] + [[package]] name = "ppv-lite86" version = "0.2.20" @@ -2710,7 +2766,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c" dependencies = [ - "env_logger", + "env_logger 0.10.2", "log 0.4.25", ] @@ -2721,7 +2777,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6924ced06e1f7dfe3fa48d57b9f74f55d8915f5036121bef647ef4b204895fac" dependencies = [ "proc-macro2", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -2760,7 +2816,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.96", + "syn 2.0.103", "tempfile", ] @@ -2774,7 +2830,7 @@ dependencies = [ "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -3335,7 +3391,7 @@ checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -3583,7 +3639,7 @@ dependencies = [ "quote", "sqlx-core", "sqlx-macros-core", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -3606,7 +3662,7 @@ dependencies = [ "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", - "syn 2.0.96", + "syn 2.0.103", "tempfile", "tokio", "url", @@ -3831,9 +3887,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.96" +version = "2.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5d0adab1ae378d7f53bdebc67a39f1f151407ef230f0ce2883572f5d8985c80" +checksum = "e4307e30089d6fd6aff212f2da3a1f9e32f3223b1f010fb09b7c95f90f3ca1e8" dependencies = [ "proc-macro2", "quote", @@ -3875,7 +3931,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -3954,7 +4010,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -3965,7 +4021,7 @@ checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -4064,7 +4120,7 @@ checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -4238,7 +4294,7 @@ dependencies = [ "proc-macro2", "prost-build", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -4308,7 +4364,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -4595,7 +4651,7 @@ dependencies = [ "log 0.4.25", "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", "wasm-bindgen-shared", ] @@ -4630,7 +4686,7 @@ checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4993,7 +5049,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", "synstructure 0.13.1", ] @@ -5041,8 +5097,10 @@ dependencies = [ "async-trait", "bytes", "data-encoding", + "env_logger 0.11.8", "ffmpeg-rs-raw", "fontdue", + "futures", "futures-util", "hex", "itertools 0.14.0", @@ -5055,6 +5113,7 @@ dependencies = [ "serde", "sha2 0.10.8", "srt-tokio", + "tempfile", "tiny-skia", "tokio", "usvg", @@ -5090,7 +5149,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] @@ -5110,7 +5169,7 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", "synstructure 0.13.1", ] @@ -5139,7 +5198,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.96", + "syn 2.0.103", ] [[package]] diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 5196885..781901f 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -37,4 +37,9 @@ srt-tokio = { version = "0.4.4", optional = true } rml_rtmp = { version = "0.8.0", optional = true } bytes = "1.9.0" xflv = "0.4.4" +futures = "0.3.30" + +[dev-dependencies] +tempfile = "3.8.1" +env_logger = "0.11.3" diff --git a/crates/core/src/generator.rs b/crates/core/src/generator.rs index 39fb469..3b27f2e 100644 --- a/crates/core/src/generator.rs +++ b/crates/core/src/generator.rs @@ -22,6 +22,7 @@ pub struct FrameGenerator { width: u16, height: u16, video_sample_fmt: AVPixelFormat, + realtime: bool, audio_sample_rate: u32, audio_frame_size: i32, @@ -71,6 +72,7 @@ impl FrameGenerator { fps, width, height, + realtime: true, video_sample_fmt: pix_fmt, audio_sample_rate: sample_rate, audio_frame_size: frame_size, @@ -86,6 +88,10 @@ impl FrameGenerator { }) } + pub fn set_realtime(&mut self, realtime: bool) { + self.realtime = realtime; + } + pub fn from_stream( video_stream: &IngressStream, audio_stream: Option<&IngressStream>, @@ -263,6 +269,7 @@ impl FrameGenerator { } Ok(()) } + /// Copy data directly into the frame buffer (must be RGBA data) pub unsafe fn copy_frame_data(&mut self, data: &[u8]) -> Result<()> { if self.next_frame.is_null() { @@ -354,17 +361,19 @@ impl FrameGenerator { self.begin()?; } - let stream_time = Duration::from_secs_f64( - self.video_pts as f64 / self.pts_per_frame() as f64 / self.fps as f64, - ); - let real_time = self.start.elapsed(); - let wait_time = if stream_time > real_time { - stream_time - real_time - } else { - Duration::new(0, 0) - }; - if !wait_time.is_zero() && wait_time.as_secs_f32() > 1f32 / self.fps { - std::thread::sleep(wait_time); + if self.realtime { + let stream_time = Duration::from_secs_f64( + self.video_pts as f64 / self.pts_per_frame() as f64 / self.fps as f64, + ); + let real_time = self.start.elapsed(); + let wait_time = if stream_time > real_time { + stream_time - real_time + } else { + Duration::new(0, 0) + }; + if !wait_time.is_zero() && wait_time.as_secs_f32() > 1f32 / self.fps { + std::thread::sleep(wait_time); + } } // convert to output pixel format, or just return internal frame if it matches output diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index a26f13e..1d40868 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -4,5 +4,7 @@ pub mod ingress; pub mod mux; pub mod overseer; pub mod pipeline; +#[cfg(test)] +pub mod test_hls_timing; pub mod variant; pub mod viewer; diff --git a/crates/core/src/mux/hls.rs b/crates/core/src/mux/hls.rs index 0403012..7269218 100644 --- a/crates/core/src/mux/hls.rs +++ b/crates/core/src/mux/hls.rs @@ -360,14 +360,9 @@ impl HlsVariant { // check if current packet is keyframe, flush current segment if can_split && cur_duration >= self.segment_length as f64 { result = self.split_next_seg(pkt_pts)?; - } else if cur_part_duration >= self.partial_target_duration as f64 { + } else if self.low_latency && cur_part_duration >= self.partial_target_duration as f64 { result = self.create_partial_segment(pkt_pts)?; - - // HLS-LL: Mark next partial as independent if this packet is a keyframe - if can_split { - info!("Next partial is independent"); - self.next_partial_independent = true; - } + self.next_partial_independent = can_split; } } diff --git a/crates/core/src/test_hls_timing.rs b/crates/core/src/test_hls_timing.rs new file mode 100644 index 0000000..aae4141 --- /dev/null +++ b/crates/core/src/test_hls_timing.rs @@ -0,0 +1,933 @@ +use crate::generator::FrameGenerator; +use crate::mux::{HlsMuxer, SegmentType}; +use crate::variant::audio::AudioVariant; +use crate::variant::mapping::VariantMapping; +use crate::variant::video::VideoVariant; +use crate::variant::{StreamMapping, VariantStream}; +use anyhow::{Context, Result}; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ + av_q2d, AVMediaType::AVMEDIA_TYPE_AUDIO, AVMediaType::AVMEDIA_TYPE_VIDEO, + AVPixelFormat::AV_PIX_FMT_YUV420P, AVRational, AVSampleFormat::AV_SAMPLE_FMT_FLTP, + AV_NOPTS_VALUE, AV_PROFILE_H264_MAIN, +}; +use ffmpeg_rs_raw::{Demuxer, Encoder}; +use m3u8_rs::{parse_media_playlist, MediaSegmentType}; +use std::collections::HashMap; +use std::fs; +use std::io::Read; +use std::path::{Path, PathBuf}; +use std::time::{Duration, Instant}; +use uuid::Uuid; + +#[derive(Debug, Clone)] +pub struct HlsTimingResult { + pub playlist_duration: f32, + pub actual_duration: f64, + pub video_duration: f64, + pub audio_duration: f64, + pub difference: f64, + pub segment_name: String, + pub is_partial: bool, + pub independent: bool, +} + +#[derive(Debug)] +pub struct HlsTimingTestResult { + pub total_segments: usize, + pub full_segments: usize, + pub partial_segments: usize, + pub independent_partials: usize, + pub total_playlist_duration: f32, + pub total_actual_duration: f64, + pub total_difference: f64, + pub average_difference: f64, + pub min_difference: f64, + pub max_difference: f64, + pub problematic_segments: Vec, + pub segments: Vec, + pub test_duration: Duration, + pub success: bool, + pub error_message: Option, +} + +impl HlsTimingTestResult { + /// Check if the HLS timing test passed based on thresholds + pub fn passes(&self, max_avg_diff: f64, max_individual_diff: f64) -> bool { + self.success + && self.average_difference.abs() <= max_avg_diff + && self + .problematic_segments + .iter() + .all(|s| s.difference.abs() <= max_individual_diff) + } + + /// Get a summary of the test results + pub fn summary(&self) -> String { + if !self.success { + return format!( + "FAILED: {}", + self.error_message.as_deref().unwrap_or("Unknown error") + ); + } + + format!( + "PASSED: {} segments, avg diff: {:.3}s, {} problematic", + self.total_segments, + self.average_difference, + self.problematic_segments.len() + ) + } +} + +pub struct HlsTimingTester { + max_avg_difference: f64, + max_individual_difference: f64, + problematic_threshold: f64, +} + +impl Default for HlsTimingTester { + fn default() -> Self { + Self { + max_avg_difference: 0.1, // 100ms average difference + max_individual_difference: 0.5, // 500ms individual difference + problematic_threshold: 0.2, // 200ms considered problematic + } + } +} + +impl HlsTimingTester { + pub fn new(max_avg_diff: f64, max_individual_diff: f64, problematic_threshold: f64) -> Self { + Self { + max_avg_difference: max_avg_diff, + max_individual_difference: max_individual_diff, + problematic_threshold, + } + } + + /// Generate and test HLS stream with test pattern + pub fn test_generated_stream( + &self, + output_dir: &Path, + duration_seconds: f32, + segment_type: SegmentType, + ) -> Result { + let start_time = Instant::now(); + + // Generate test stream + let stream_id = Uuid::new_v4(); + let hls_dir = + self.generate_test_stream(output_dir, &stream_id, duration_seconds, segment_type)?; + + // Test the generated stream + match self.test_stream_timing_internal(&hls_dir) { + Ok(mut result) => { + result.test_duration = start_time.elapsed(); + result.success = + result.passes(self.max_avg_difference, self.max_individual_difference); + Ok(result) + } + Err(e) => Ok(HlsTimingTestResult { + total_segments: 0, + full_segments: 0, + partial_segments: 0, + independent_partials: 0, + total_playlist_duration: 0.0, + total_actual_duration: 0.0, + total_difference: 0.0, + average_difference: 0.0, + min_difference: 0.0, + max_difference: 0.0, + problematic_segments: Vec::new(), + segments: Vec::new(), + test_duration: start_time.elapsed(), + success: false, + error_message: Some(e.to_string()), + }), + } + } + + /// Generate test HLS stream with test pattern + fn generate_test_stream( + &self, + output_dir: &Path, + stream_id: &Uuid, + duration_seconds: f32, + segment_type: SegmentType, + ) -> Result { + const VIDEO_FPS: f32 = 30.0; + const VIDEO_WIDTH: u16 = 1280; + const VIDEO_HEIGHT: u16 = 720; + const SAMPLE_RATE: u32 = 44100; + + // Create video encoder + let mut video_encoder = unsafe { + Encoder::new_with_name("libx264")? + .with_stream_index(0) + .with_framerate(VIDEO_FPS)? + .with_bitrate(1_000_000) + .with_pix_fmt(AV_PIX_FMT_YUV420P) + .with_width(VIDEO_WIDTH as _) + .with_height(VIDEO_HEIGHT as _) + .with_level(51) + .with_profile(AV_PROFILE_H264_MAIN) + .open(None)? + }; + + // Create audio encoder + let mut audio_encoder = unsafe { + Encoder::new_with_name("aac")? + .with_stream_index(1) + .with_default_channel_layout(1) + .with_bitrate(128_000) + .with_sample_format(AV_SAMPLE_FMT_FLTP) + .with_sample_rate(SAMPLE_RATE as _)? + .open(None)? + }; + + // Create variant streams + let video_stream = VideoVariant { + mapping: VariantMapping { + id: Uuid::new_v4(), + src_index: 0, + dst_index: 0, + group_id: 0, + }, + width: VIDEO_WIDTH, + height: VIDEO_HEIGHT, + fps: VIDEO_FPS, + bitrate: 1_000_000, + codec: "libx264".to_string(), + profile: AV_PROFILE_H264_MAIN as usize, + level: 51, + keyframe_interval: 60, + pixel_format: AV_PIX_FMT_YUV420P as u32, + }; + + let audio_stream = AudioVariant { + mapping: VariantMapping { + id: Uuid::new_v4(), + src_index: 1, + dst_index: 1, + group_id: 0, + }, + bitrate: 128_000, + codec: "aac".to_string(), + channels: 1, + sample_rate: SAMPLE_RATE as usize, + sample_fmt: "fltp".to_string(), + }; + + let video_variant = VariantStream::Video(video_stream.clone()); + let audio_variant = VariantStream::Audio(audio_stream.clone()); + let variants = vec![ + (&video_variant, &video_encoder), + (&audio_variant, &audio_encoder), + ]; + + // Create HLS muxer + let mut hls_muxer = HlsMuxer::new( + &stream_id, + output_dir.to_str().unwrap(), + variants.into_iter(), + segment_type, + )?; + + // Create frame generator + let frame_size = unsafe { (*audio_encoder.codec_context()).frame_size as _ }; + let mut frame_gen = FrameGenerator::new( + VIDEO_FPS, + VIDEO_WIDTH, + VIDEO_HEIGHT, + AV_PIX_FMT_YUV420P, + SAMPLE_RATE, + frame_size, + 1, + AVRational { + num: 1, + den: VIDEO_FPS as i32, + }, + AVRational { + num: 1, + den: SAMPLE_RATE as i32, + }, + )?; + frame_gen.set_realtime(false); + + // Generate frames for the specified duration + let total_video_frames = (duration_seconds * VIDEO_FPS) as u64; + let mut video_frames_generated = 0; + + while video_frames_generated < total_video_frames { + unsafe { + frame_gen.begin()?; + frame_gen.write_text( + &format!("Video Frame: {}", video_frames_generated), + 40.0, + 50.0, + 50.0, + )?; + frame_gen.write_text( + &format!("Time: {:.1}s", video_frames_generated as f32 / VIDEO_FPS), + 40.0, + 50.0, + 100.0, + )?; + + let mut frame = frame_gen.next()?; + if frame.is_null() { + log::warn!("FrameGenerator returned null frame unexpectedly"); + break; + } + + // Determine if this is audio or video frame and encode accordingly + if (*frame).sample_rate > 0 { + // Audio frame - don't increment video counter + log::debug!("Generated audio frame, PTS: {}", (*frame).pts); + for mut pkt in audio_encoder.encode_frame(frame)? { + let result = hls_muxer.mux_packet(pkt, &audio_stream.id())?; + if let crate::egress::EgressResult::Segments { + created, + deleted: _, + } = result + { + for segment in created { + log::debug!("Created audio segment: {:?}", segment.path); + } + } + ffmpeg_rs_raw::ffmpeg_sys_the_third::av_packet_free(&mut pkt); + } + } else { + // Video frame - increment video counter + log::debug!( + "Generated video frame {}, PTS: {}", + video_frames_generated, + (*frame).pts + ); + for mut pkt in video_encoder.encode_frame(frame)? { + let result = hls_muxer.mux_packet(pkt, &video_stream.id())?; + if let crate::egress::EgressResult::Segments { + created, + deleted: _, + } = result + { + for segment in created { + log::debug!("Created video segment: {:?}", segment.path); + } + } + ffmpeg_rs_raw::ffmpeg_sys_the_third::av_packet_free(&mut pkt); + } + video_frames_generated += 1; + } + + ffmpeg_rs_raw::ffmpeg_sys_the_third::av_frame_free(&mut frame); + } + } + + // Flush encoders to ensure all packets are written + unsafe { + // Flush video encoder + for mut pkt in video_encoder.encode_frame(std::ptr::null_mut())? { + hls_muxer.mux_packet(pkt, &video_stream.id())?; + ffmpeg_rs_raw::ffmpeg_sys_the_third::av_packet_free(&mut pkt); + } + + // Flush audio encoder + for mut pkt in audio_encoder.encode_frame(std::ptr::null_mut())? { + hls_muxer.mux_packet(pkt, &audio_stream.id())?; + ffmpeg_rs_raw::ffmpeg_sys_the_third::av_packet_free(&mut pkt); + } + } + + log::info!( + "Generated {} video frames ({:.1}s) of test HLS stream at", + video_frames_generated, + video_frames_generated as f32 / VIDEO_FPS + ); + + Ok(output_dir.join(stream_id.to_string()).join("stream_0")) + } + + /// Test HLS timing for a specific stream directory + pub fn test_stream_timing(&self, hls_dir: &Path) -> HlsTimingTestResult { + let start_time = Instant::now(); + + match self.test_stream_timing_internal(hls_dir) { + Ok(mut result) => { + result.test_duration = start_time.elapsed(); + result.success = + result.passes(self.max_avg_difference, self.max_individual_difference); + result + } + Err(e) => HlsTimingTestResult { + total_segments: 0, + full_segments: 0, + partial_segments: 0, + independent_partials: 0, + total_playlist_duration: 0.0, + total_actual_duration: 0.0, + total_difference: 0.0, + average_difference: 0.0, + min_difference: 0.0, + max_difference: 0.0, + problematic_segments: Vec::new(), + segments: Vec::new(), + test_duration: start_time.elapsed(), + success: false, + error_message: Some(e.to_string()), + }, + } + } + + fn test_stream_timing_internal(&self, hls_dir: &Path) -> Result { + let playlist_path = hls_dir.join("live.m3u8"); + + if !playlist_path.exists() { + return Err(anyhow::anyhow!( + "Playlist file does not exist: {:?}", + playlist_path + )); + } + + // Parse the playlist + let playlist_content = + fs::read_to_string(&playlist_path).context("Failed to read playlist file")?; + + let (_, playlist) = parse_media_playlist(playlist_content.as_bytes()) + .map_err(|e| anyhow::anyhow!("Failed to parse playlist: {:?}", e))?; + + let mut segments = Vec::new(); + let mut total_playlist_duration = 0.0f32; + let mut total_actual_duration = 0.0f64; + + // Analyze each segment + for segment_type in &playlist.segments { + match segment_type { + MediaSegmentType::Full(segment) => { + let segment_path = hls_dir.join(&segment.uri); + + if !segment_path.exists() { + continue; // Skip missing segments + } + + let durations = self.analyze_segment(&segment_path)?; + let actual_duration = durations.total_duration; + let video_duration = durations.video_duration; + let audio_duration = durations.audio_duration; + let playlist_duration = segment.duration; + let difference = actual_duration - playlist_duration as f64; + + let result = HlsTimingResult { + playlist_duration, + actual_duration, + video_duration, + audio_duration, + difference, + segment_name: segment.uri.clone(), + is_partial: false, + independent: false, + }; + + segments.push(result); + total_playlist_duration += playlist_duration; + total_actual_duration += actual_duration; + } + MediaSegmentType::Partial(partial) => { + let segment_path = hls_dir.join(&partial.uri); + + if !segment_path.exists() { + continue; // Skip missing segments + } + + let durations = if let Some(byte_range) = &partial.byte_range { + self.analyze_partial_segment( + &segment_path, + byte_range.length, + byte_range.offset, + )? + } else { + self.analyze_segment(&segment_path)? + }; + + let actual_duration = durations.total_duration; + let video_duration = durations.video_duration; + let audio_duration = durations.audio_duration; + let playlist_duration = partial.duration as f32; + let difference = actual_duration - playlist_duration as f64; + + let result = HlsTimingResult { + playlist_duration, + actual_duration, + video_duration, + audio_duration, + difference, + segment_name: partial.uri.clone(), + is_partial: true, + independent: partial.independent, + }; + + segments.push(result); + total_playlist_duration += playlist_duration; + total_actual_duration += actual_duration; + } + MediaSegmentType::PreloadHint(_) => { + // Skip preload hints + continue; + } + } + } + + // Calculate statistics + let full_segments = segments.iter().filter(|s| !s.is_partial).count(); + let partial_segments = segments.iter().filter(|s| s.is_partial).count(); + let independent_partials = segments + .iter() + .filter(|s| s.is_partial && s.independent) + .count(); + let total_difference = total_actual_duration - total_playlist_duration as f64; + let average_difference = if !segments.is_empty() { + total_difference / segments.len() as f64 + } else { + 0.0 + }; + + let differences: Vec = segments.iter().map(|s| s.difference).collect(); + let min_difference = differences.iter().fold(f64::INFINITY, |a, &b| a.min(b)); + let max_difference = differences.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b)); + + // Find problematic segments + let problematic_segments: Vec = segments + .iter() + .filter(|s| s.difference.abs() > self.problematic_threshold) + .cloned() + .collect(); + + Ok(HlsTimingTestResult { + total_segments: segments.len(), + full_segments, + partial_segments, + independent_partials, + total_playlist_duration, + total_actual_duration, + total_difference, + average_difference, + min_difference, + max_difference, + problematic_segments, + segments, + test_duration: Duration::from_secs(0), // Will be set by caller + success: true, // Will be determined by caller + error_message: None, + }) + } + + /// Test multiple HLS streams concurrently + pub async fn test_multiple_streams( + &self, + hls_dirs: Vec, + ) -> HashMap { + let mut results = HashMap::new(); + + // Run tests concurrently + let futures: Vec<_> = hls_dirs + .into_iter() + .map(|dir| { + let tester = HlsTimingTester::new( + self.max_avg_difference, + self.max_individual_difference, + self.problematic_threshold, + ); + let dir_clone = dir.clone(); + async move { + let result = + tokio::task::spawn_blocking(move || tester.test_stream_timing(&dir_clone)) + .await + .unwrap_or_else(|_| HlsTimingTestResult { + total_segments: 0, + full_segments: 0, + partial_segments: 0, + independent_partials: 0, + total_playlist_duration: 0.0, + total_actual_duration: 0.0, + total_difference: 0.0, + average_difference: 0.0, + min_difference: 0.0, + max_difference: 0.0, + problematic_segments: Vec::new(), + segments: Vec::new(), + test_duration: Duration::from_secs(0), + success: false, + error_message: Some("Task panicked".to_string()), + }); + (dir, result) + } + }) + .collect(); + + let resolved_futures = futures::future::join_all(futures).await; + + for (dir, result) in resolved_futures { + results.insert(dir, result); + } + + results + } + + fn analyze_segment(&self, path: &Path) -> Result { + let file = fs::File::open(path) + .with_context(|| format!("Failed to open file: {}", path.display()))?; + self.analyze_segment_with_reader(Box::new(file)) + } + + fn analyze_partial_segment( + &self, + path: &Path, + length: u64, + offset: Option, + ) -> Result { + let reader = ByteRangeReader::new(path, length, offset)?; + self.analyze_segment_with_reader(Box::new(reader)) + } + + fn analyze_segment_with_reader(&self, reader: Box) -> Result { + let mut demuxer = Demuxer::new_custom_io(reader, None)?; + + unsafe { + demuxer.probe_input()?; + } + + let mut video_start_pts = AV_NOPTS_VALUE; + let mut video_end_pts = AV_NOPTS_VALUE; + let mut audio_start_pts = AV_NOPTS_VALUE; + let mut audio_end_pts = AV_NOPTS_VALUE; + let mut video_last_duration = 0i64; + let mut audio_last_duration = 0i64; + let mut video_stream_idx: Option = None; + let mut audio_stream_idx: Option = None; + + // Read all packets and track timing + loop { + let packet_result = unsafe { demuxer.get_packet() }; + match packet_result { + Ok((pkt, stream)) => { + if pkt.is_null() { + break; + } + + unsafe { + let codec_type = (*(*stream).codecpar).codec_type; + let pts = (*pkt).pts; + let duration = (*pkt).duration; + let current_stream_idx = (*stream).index as usize; + + match codec_type { + AVMEDIA_TYPE_VIDEO => { + if video_stream_idx.is_none() { + video_stream_idx = Some(current_stream_idx); + } + if pts != AV_NOPTS_VALUE { + if video_start_pts == AV_NOPTS_VALUE { + video_start_pts = pts; + } + video_end_pts = pts; + video_last_duration = duration; + } + } + AVMEDIA_TYPE_AUDIO => { + if audio_stream_idx.is_none() { + audio_stream_idx = Some(current_stream_idx); + } + if pts != AV_NOPTS_VALUE { + if audio_start_pts == AV_NOPTS_VALUE { + audio_start_pts = pts; + } + audio_end_pts = pts; + audio_last_duration = duration; + } + } + _ => {} + } + } + } + Err(_) => break, + } + } + + // Calculate durations + let video_duration = if let Some(stream_idx) = video_stream_idx { + if video_start_pts != AV_NOPTS_VALUE && video_end_pts != AV_NOPTS_VALUE { + unsafe { + let stream = demuxer.get_stream(stream_idx)?; + let time_base = (*stream).time_base; + let pts_duration = (video_end_pts - video_start_pts) as f64 * av_q2d(time_base); + let last_pkt_duration = video_last_duration as f64 * av_q2d(time_base); + pts_duration + last_pkt_duration + } + } else { + 0.0 + } + } else { + 0.0 + }; + + let audio_duration = if let Some(stream_idx) = audio_stream_idx { + if audio_start_pts != AV_NOPTS_VALUE && audio_end_pts != AV_NOPTS_VALUE { + unsafe { + let stream = demuxer.get_stream(stream_idx)?; + let time_base = (*stream).time_base; + let pts_duration = (audio_end_pts - audio_start_pts) as f64 * av_q2d(time_base); + let last_pkt_duration = audio_last_duration as f64 * av_q2d(time_base); + pts_duration + last_pkt_duration + } + } else { + 0.0 + } + } else { + 0.0 + }; + + let total_duration = video_duration.max(audio_duration); + + Ok(SegmentDurations { + total_duration, + video_duration, + audio_duration, + }) + } +} + +#[derive(Debug)] +struct SegmentDurations { + total_duration: f64, + video_duration: f64, + audio_duration: f64, +} + +/// Custom IO reader for byte range access +struct ByteRangeReader { + file: fs::File, + start_offset: u64, + length: u64, + current_pos: u64, +} + +impl ByteRangeReader { + fn new(path: &Path, length: u64, offset: Option) -> Result { + use std::io::{Seek, SeekFrom}; + + let mut file = fs::File::open(path) + .with_context(|| format!("Failed to open file: {}", path.display()))?; + + let start_offset = offset.unwrap_or(0); + file.seek(SeekFrom::Start(start_offset)) + .with_context(|| format!("Failed to seek to offset {}", start_offset))?; + + Ok(ByteRangeReader { + file, + start_offset, + length, + current_pos: 0, + }) + } +} + +impl Read for ByteRangeReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let remaining = self.length - self.current_pos; + if remaining == 0 { + return Ok(0); + } + + let to_read = std::cmp::min(buf.len() as u64, remaining) as usize; + let bytes_read = self.file.read(&mut buf[..to_read])?; + self.current_pos += bytes_read as u64; + Ok(bytes_read) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[test] + fn test_timing_tester_creation() { + let tester = HlsTimingTester::default(); + assert_eq!(tester.max_avg_difference, 0.1); + assert_eq!(tester.max_individual_difference, 0.5); + assert_eq!(tester.problematic_threshold, 0.2); + } + + #[test] + fn test_timing_result_passes() { + let result = HlsTimingTestResult { + total_segments: 10, + full_segments: 8, + partial_segments: 2, + independent_partials: 1, + total_playlist_duration: 20.0, + total_actual_duration: 20.05, + total_difference: 0.05, + average_difference: 0.005, + min_difference: -0.01, + max_difference: 0.02, + problematic_segments: Vec::new(), + segments: Vec::new(), + test_duration: Duration::from_millis(100), + success: true, + error_message: None, + }; + + assert!(result.passes(0.1, 0.5)); + assert!(!result.passes(0.001, 0.5)); + } + + #[test] + fn test_missing_playlist() { + let temp_dir = tempdir().unwrap(); + let tester = HlsTimingTester::default(); + let result = tester.test_stream_timing(temp_dir.path()); + + assert!(!result.success); + assert!(result.error_message.is_some()); + assert!(result.error_message.unwrap().contains("does not exist")); + } + + #[test] + fn test_generated_hls_stream_mpegts() { + env_logger::try_init().ok(); + + let temp_dir = tempdir().unwrap(); + let tester = HlsTimingTester::new(0.2, 1.0, 0.5); // More lenient thresholds for test + + let result = tester.test_generated_stream( + temp_dir.path(), + 10.0, // 10 seconds + SegmentType::MPEGTS, + ); + + match result { + Ok(test_result) => { + assert!( + test_result.success, + "Test should pass: {}", + test_result.summary() + ); + assert!( + test_result.total_segments > 0, + "Should have generated segments" + ); + assert!( + test_result.total_playlist_duration > 8.0, + "Should have ~10s of content" + ); + assert!(test_result.full_segments > 0, "Should have full segments"); + println!("✓ MPEG-TS test passed: {}", test_result.summary()); + } + Err(e) => { + panic!("Test generation failed: {}", e); + } + } + } + + #[test] + fn test_generated_hls_stream_fmp4() { + env_logger::try_init().ok(); + + let temp_dir = tempdir().unwrap(); + let tester = HlsTimingTester::new(0.2, 1.0, 0.5); // More lenient thresholds for test + + let result = tester.test_generated_stream( + temp_dir.path(), + 8.0, // 8 seconds + SegmentType::FMP4, + ); + + match result { + Ok(test_result) => { + assert!( + test_result.success, + "Test should pass: {}", + test_result.summary() + ); + assert!( + test_result.total_segments > 0, + "Should have generated segments" + ); + assert!( + test_result.total_playlist_duration > 6.0, + "Should have ~8s of content" + ); + assert!(test_result.full_segments > 0, "Should have full segments"); + println!("✓ fMP4 test passed: {}", test_result.summary()); + } + Err(e) => { + panic!("Test generation failed: {}", e); + } + } + } + + #[test] + fn test_30_second_stream() { + env_logger::try_init().ok(); + + let temp_dir = tempdir().unwrap(); + let tester = HlsTimingTester::default(); + + let result = tester.test_generated_stream( + temp_dir.path(), + 30.0, // 30 seconds as requested + SegmentType::MPEGTS, + ); + + match result { + Ok(test_result) => { + println!("{:?}", test_result); + println!("30-second stream test results:"); + println!(" Total segments: {}", test_result.total_segments); + println!(" Full segments: {}", test_result.full_segments); + println!(" Partial segments: {}", test_result.partial_segments); + println!( + " Total playlist duration: {:.1}s", + test_result.total_playlist_duration + ); + println!( + " Total actual duration: {:.1}s", + test_result.total_actual_duration + ); + println!( + " Average difference: {:.3}s", + test_result.average_difference + ); + println!(" Test duration: {:?}", test_result.test_duration); + println!(" Result: {}", test_result.summary()); + + assert!( + test_result.success, + "30s test should pass: {}", + test_result.summary() + ); + assert!( + test_result.total_segments >= 2, + "Should have multiple segments for 30s" + ); + assert!( + test_result.total_playlist_duration >= 25.0, + "Should have ~30s of content" + ); + + if !test_result.problematic_segments.is_empty() { + println!(" Problematic segments:"); + for seg in &test_result.problematic_segments { + println!( + " {}: {:.3}s difference", + seg.segment_name, seg.difference + ); + } + } + } + Err(e) => { + panic!("30-second test generation failed: {}", e); + } + } + } +}