From e056e0427fa3b937a3fe83f1b3c2f9db8fa3d01b Mon Sep 17 00:00:00 2001 From: Kieran Date: Tue, 17 Jun 2025 11:48:49 +0100 Subject: [PATCH] fix: HLS-LL refactor: fMP4 (WIP) --- Cargo.lock | 2 +- Cargo.toml | 2 +- crates/core/src/lib.rs | 2 +- crates/core/src/mux/hls.rs | 287 ++++++++----- crates/core/src/pipeline/runner.rs | 4 +- crates/core/src/variant/video.rs | 2 +- crates/core/src/viewer.rs | 103 +++-- crates/zap-stream/src/api.rs | 12 +- crates/zap-stream/src/bin/hls_debug.rs | 530 ++++++++++++++++++++++--- crates/zap-stream/src/http.rs | 6 + crates/zap-stream/src/overseer.rs | 2 +- 11 files changed, 747 insertions(+), 205 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d9b745d..b8ee98c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2153,7 +2153,7 @@ checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" [[package]] name = "m3u8-rs" version = "6.0.0" -source = "git+https://git.v0l.io/Kieran/m3u8-rs.git?rev=5b7aa0c65994b5ab2780b7ed27d84c03bc32d19f#5b7aa0c65994b5ab2780b7ed27d84c03bc32d19f" +source = "git+https://git.v0l.io/Kieran/m3u8-rs.git?rev=6803eefca2838a8bfae9e19fd516ef36d7d89997#6803eefca2838a8bfae9e19fd516ef36d7d89997" dependencies = [ "chrono", "nom", diff --git a/Cargo.toml b/Cargo.toml index 4f76997..77c0c8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,6 @@ url = "2.5.0" itertools = "0.14.0" chrono = { version = "^0.4.38", features = ["serde"] } hex = "0.4.3" -m3u8-rs = { git = "https://git.v0l.io/Kieran/m3u8-rs.git", rev = "5b7aa0c65994b5ab2780b7ed27d84c03bc32d19f" } +m3u8-rs = { git = "https://git.v0l.io/Kieran/m3u8-rs.git", rev = "6803eefca2838a8bfae9e19fd516ef36d7d89997" } sha2 = "0.10.8" data-encoding = "2.9.0" \ No newline at end of file diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index fc373d9..a26f13e 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -1,8 +1,8 @@ pub mod egress; +mod generator; pub mod ingress; pub mod mux; pub mod overseer; pub mod pipeline; pub mod variant; pub mod viewer; -mod generator; diff --git a/crates/core/src/mux/hls.rs b/crates/core/src/mux/hls.rs index 52c7b73..d714947 100644 --- a/crates/core/src/mux/hls.rs +++ b/crates/core/src/mux/hls.rs @@ -4,13 +4,13 @@ use anyhow::{bail, ensure, Result}; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_free, av_interleaved_write_frame, av_opt_set, av_q2d, avio_close, avio_flush, avio_open, - avio_size, AVPacket, AVStream, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY, + av_free, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, avio_size, AVPacket, + AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY, }; use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; use itertools::Itertools; -use log::{info, trace, warn}; -use m3u8_rs::{ByteRange, MediaSegment, MediaSegmentType, Part, PartInf, PreloadHint}; +use log::{debug, info, trace, warn}; +use m3u8_rs::{ByteRange, ExtTag, MediaSegment, MediaSegmentType, Part, PartInf, PreloadHint}; use std::collections::HashMap; use std::fmt::Display; use std::fs::File; @@ -89,12 +89,10 @@ pub struct HlsVariant { segments: Vec, /// Type of segments to create segment_type: SegmentType, - /// Timestamp of the previous packet - last_pkt_pts: i64, /// Timestamp of the start of the current segment current_segment_start: f64, - /// Current segment duration in seconds (precise accumulation) - duration: f64, + /// Timestamp of the start of the current partial + current_partial_start: f64, /// Number of packets written to current segment packets_written: u64, /// Reference stream used to track duration @@ -105,10 +103,10 @@ pub struct HlsVariant { partial_target_duration: f32, /// HLS-LL: Current partial index current_partial_index: u64, - /// HLS-LL: Current duration in this partial - current_partial_duration: f64, /// HLS-LL: Whether the next partial segment should be marked as independent next_partial_independent: bool, + /// Path to initialization segment for fMP4 + init_segment_path: Option, } #[derive(PartialEq)] @@ -184,6 +182,9 @@ impl PartialSegmentInfo { } impl HlsVariant { + const LOW_LATENCY: bool = true; + const LL_PARTS: usize = 3; + pub fn new<'a>( out_dir: &'a str, group: usize, @@ -194,14 +195,6 @@ impl HlsVariant { let first_seg = Self::map_segment_path(out_dir, &name, 1, segment_type); std::fs::create_dir_all(PathBuf::from(&first_seg).parent().unwrap())?; - let mut opts = HashMap::new(); - if let SegmentType::FMP4 = segment_type { - opts.insert("fflags".to_string(), "-autobsf".to_string()); - opts.insert( - "movflags".to_string(), - "+frag_custom+dash+delay_moov".to_string(), - ); - }; let mut mux = unsafe { Muxer::builder() .with_output_path( @@ -216,7 +209,7 @@ impl HlsVariant { let mut streams = Vec::new(); let mut ref_stream_index = -1; let mut has_video = false; - let mut seg_size = 1.0; + let mut seg_size = 2.0; for (var, enc) in encoded_vars { match var { @@ -268,12 +261,32 @@ impl HlsVariant { name, ref_stream_index ); + + let min_segment_length = if Self::LOW_LATENCY { + (seg_size * 3.0).max(6.0) // make segments 3x longer in LL mode or minimum 6s + } else { + 2.0 + }; + let segment_length = seg_size.max(min_segment_length); + let mut opts = HashMap::new(); + if let SegmentType::FMP4 = segment_type { + //opts.insert("fflags".to_string(), "-autobsf".to_string()); + opts.insert( + "movflags".to_string(), + "+frag_custom+dash+delay_moov".to_string(), + ); + }; + let mut partial_seg_size = segment_length / Self::LL_PARTS as f32; + partial_seg_size -= partial_seg_size % seg_size; // align to keyframe + unsafe { mux.open(Some(opts))?; + //av_dump_format(mux.context(), 0, ptr::null_mut(), 0); } - Ok(Self { + + let mut variant = Self { name: name.clone(), - segment_length: seg_size, + segment_length, segment_window: 30.0, mux, streams, @@ -281,17 +294,25 @@ impl HlsVariant { segments: Vec::new(), out_dir: out_dir.to_string(), segment_type, - last_pkt_pts: AV_NOPTS_VALUE, - duration: 0.0, packets_written: 0, ref_stream_index, - partial_target_duration: 0.33, + partial_target_duration: partial_seg_size, current_partial_index: 0, - current_partial_duration: 0.0, current_segment_start: 0.0, + current_partial_start: 0.0, next_partial_independent: false, - low_latency: false, - }) + low_latency: Self::LOW_LATENCY, + init_segment_path: None, + }; + + // Create initialization segment for fMP4 + if segment_type == SegmentType::FMP4 { + unsafe { + variant.create_init_segment()?; + } + } + + Ok(variant) } pub fn segment_name(t: SegmentType, idx: u64) -> String { @@ -332,38 +353,23 @@ impl HlsVariant { is_ref_pkt = false; } - // HLS-LL: write prev partial segment - if self.low_latency && self.current_partial_duration >= self.partial_target_duration as f64 - { - self.create_partial_segment()?; + if is_ref_pkt && self.packets_written > 0 { + let pkt_pts = (*pkt).pts as f64 * pkt_q; + let cur_duration = pkt_pts - self.current_segment_start; + let cur_part_duration = pkt_pts - self.current_partial_start; - // HLS-LL: Mark next partial as independent if this packet is a keyframe - if can_split { - self.next_partial_independent = true; - } - } - // check if current packet is keyframe, flush current segment - if self.packets_written > 1 && can_split && self.duration >= self.segment_length as f64 { - result = self.split_next_seg((*pkt).pts as f64 * pkt_q)?; - } + // check if current packet is keyframe, flush current segment + if can_split && cur_duration >= self.segment_length as f64 { + result = self.split_next_seg(pkt_pts)?; + } else if cur_part_duration >= self.partial_target_duration as f64 { + result = self.create_partial_segment(pkt_pts)?; - // track duration from pts - if is_ref_pkt { - if self.last_pkt_pts == AV_NOPTS_VALUE { - self.last_pkt_pts = (*pkt).pts; - } - let time_delta = if (*pkt).duration != 0 { - (*pkt).duration as f64 * pkt_q - } else { - ((*pkt).pts - self.last_pkt_pts) as f64 * pkt_q - }; - if time_delta > 0.0 { - self.duration += time_delta; - if self.low_latency { - self.current_partial_duration += time_delta; + // HLS-LL: Mark next partial as independent if this packet is a keyframe + if can_split { + info!("Next partial is independent"); + self.next_partial_independent = true; } } - self.last_pkt_pts = (*pkt).pts; } // write to current segment @@ -378,13 +384,21 @@ impl HlsVariant { } /// Create a partial segment for LL-HLS - fn create_partial_segment(&mut self) -> Result<()> { + fn create_partial_segment(&mut self, next_pkt_start: f64) -> Result { let ctx = self.mux.context(); let end_pos = unsafe { avio_flush((*ctx).pb); avio_size((*ctx).pb) as u64 }; + ensure!(end_pos > 0, "End position cannot be 0"); + if self.segment_type == SegmentType::MPEGTS { + ensure!( + end_pos % 188 == 0, + "Invalid end position, must be multiple of 188" + ); + } + let previous_end_pos = self .segments .last() @@ -393,31 +407,68 @@ impl HlsVariant { _ => None, }) .unwrap_or(0); - let independent = self.next_partial_independent; let partial_size = end_pos - previous_end_pos; let partial_info = PartialSegmentInfo { index: self.current_partial_index, parent_index: self.idx, parent_kind: self.segment_type, - duration: self.current_partial_duration, - independent, + duration: next_pkt_start - self.current_partial_start, + independent: self.next_partial_independent, byte_range: Some((partial_size, Some(previous_end_pos))), }; - trace!( + debug!( "{} created partial segment {} [{:.3}s, independent={}]", - self.name, - partial_info.index, - partial_info.duration, - independent + self.name, partial_info.index, partial_info.duration, partial_info.independent, ); self.segments.push(HlsSegment::Partial(partial_info)); self.current_partial_index += 1; - self.current_partial_duration = 0.0; self.next_partial_independent = false; + self.current_partial_start = next_pkt_start; self.write_playlist()?; + Ok(EgressResult::None) + } + + /// Create initialization segment for fMP4 + unsafe fn create_init_segment(&mut self) -> Result<()> { + if self.segment_type != SegmentType::FMP4 || self.init_segment_path.is_some() { + return Ok(()); + } + + let init_path = PathBuf::from(&self.out_dir) + .join(&self.name) + .join("init.mp4") + .to_string_lossy() + .to_string(); + + // Create a temporary muxer for initialization segment + let mut init_opts = HashMap::new(); + init_opts.insert( + "movflags".to_string(), + "+frag_custom+dash+delay_moov".to_string(), + ); + + let mut init_mux = Muxer::builder() + .with_output_path(init_path.as_str(), Some("mp4"))? + .build()?; + + // Copy stream parameters from main muxer + let main_ctx = self.mux.context(); + for i in 0..(*main_ctx).nb_streams { + let src_stream = *(*main_ctx).streams.add(i as usize); + let s = init_mux.add_copy_stream(src_stream)?; + ensure!((*s).index == (*src_stream).index, "Stream index mismatch"); + } + + init_mux.open(Some(init_opts))?; + av_write_frame(init_mux.context(), ptr::null_mut()); + init_mux.close()?; + + self.init_segment_path = Some("init.mp4".to_string()); + info!("Created fMP4 initialization segment: {}", init_path); + Ok(()) } @@ -425,10 +476,11 @@ impl HlsVariant { unsafe fn split_next_seg(&mut self, next_pkt_start: f64) -> Result { let completed_segment_idx = self.idx; self.idx += 1; + self.current_partial_index = 0; // Manually reset muxer avio let ctx = self.mux.context(); - let ret = av_interleaved_write_frame(ctx, ptr::null_mut()); + let ret = av_write_frame(ctx, ptr::null_mut()); if ret < 0 { bail!("Failed to split segment {}", ret); } @@ -445,14 +497,6 @@ impl HlsVariant { bail!("Failed to re-init avio"); } - // tell muxer it needs to write headers again - av_opt_set( - (*ctx).priv_data, - cstr!("events_flags"), - cstr!("resend_headers"), - 0, - ); - // Log the completed segment (previous index), not the next one let completed_seg_path = Self::map_segment_path( &self.out_dir, @@ -467,7 +511,7 @@ impl HlsVariant { .unwrap_or(0); let cur_duration = next_pkt_start - self.current_segment_start; - info!( + debug!( "Finished segment {} [{:.3}s, {:.2} kB, {} pkts]", completed_segment_path .file_name() @@ -519,7 +563,6 @@ impl HlsVariant { // Reset counters for next segment self.packets_written = 0; - self.duration = 0.0; self.current_segment_start = next_pkt_start; Ok(EgressResult::Segments { @@ -590,9 +633,18 @@ impl HlsVariant { } let mut pl = m3u8_rs::MediaPlaylist::default(); - pl.target_duration = (self.segment_length.ceil() as u64).max(1); pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect(); + // Add EXT-X-MAP initialization segment for fMP4 + if self.segment_type == SegmentType::FMP4 { + if let Some(ref init_path) = self.init_segment_path { + pl.unknown_tags.push(ExtTag { + tag: "X-MAP".to_string(), + rest: Some(format!("URI=\"{}\"", init_path)), + }); + } + } + // append segment preload for next part segment if let Some(HlsSegment::Partial(partial)) = self.segments.last() { // TODO: try to estimate if there will be another partial segment @@ -603,7 +655,19 @@ impl HlsVariant { byte_range_length: None, })); } - pl.version = Some(if self.low_latency { 6 } else { 3 }); + let pl_version = if self.low_latency { + 6 + } else if self.segment_type == SegmentType::FMP4 { + 6 // EXT-X-MAP without I-FRAMES-ONLY + } else { + 3 + }; + pl.version = Some(pl_version); + pl.target_duration = if pl_version >= 6 { + self.segment_length.round() as _ + } else { + self.segment_length + }; if self.low_latency { pl.part_inf = Some(PartInf { part_target: self.partial_target_duration as f64, @@ -624,31 +688,48 @@ impl HlsVariant { Ok(()) } - /// https://git.ffmpeg.org/gitweb/ffmpeg.git/blob/HEAD:/libavformat/hlsenc.c#l351 - unsafe fn to_codec_attr(&self, stream: *mut AVStream) -> Option { - let p = (*stream).codecpar; - if (*p).codec_id == AV_CODEC_ID_H264 { - let data = (*p).extradata; - if !data.is_null() { - let mut id_ptr = ptr::null_mut(); - let ds: *mut u16 = data as *mut u16; - if (*ds) == 1 && (*data.add(4)) & 0x1F == 7 { - id_ptr = data.add(5); - } else if (*ds) == 1 && (*data.add(3)) & 0x1F == 7 { - id_ptr = data.add(4); - } else if *data.add(0) == 1 { - id_ptr = data.add(1); - } else { - return None; - } + unsafe fn to_codec_attr(&self) -> Option { + let mut codecs = Vec::new(); - return Some(format!( - "avc1.{}", - hex::encode([*id_ptr.add(0), *id_ptr.add(1), *id_ptr.add(2)]) - )); + // Find video and audio streams and build codec string + for stream in &self.streams { + let av_stream = *(*self.mux.context()).streams.add(*stream.index()); + let p = (*av_stream).codecpar; + + match stream { + HlsVariantStream::Video { .. } => { + if (*p).codec_id == AV_CODEC_ID_H264 { + // Use profile and level from codec parameters + let profile_idc = (*p).profile as u8; + let level_idc = (*p).level as u8; + + // For H.264, constraint flags are typically 0 unless specified + // Common constraint flags: 0x40 (constraint_set1_flag) for baseline + let constraint_flags = match profile_idc { + 66 => 0x40, // Baseline profile + _ => 0x00, // Main/High profiles typically have no constraints + }; + + let avc1_code = format!( + "avc1.{:02x}{:02x}{:02x}", + profile_idc, constraint_flags, level_idc + ); + codecs.push(avc1_code); + } + } + HlsVariantStream::Audio { .. } => { + // Standard AAC-LC codec string + codecs.push("mp4a.40.2".to_string()); + } + _ => {} } } - None + + if codecs.is_empty() { + None + } else { + Some(codecs.join(",")) + } } pub fn to_playlist_variant(&self) -> m3u8_rs::VariantStream { @@ -659,9 +740,9 @@ impl HlsVariant { m3u8_rs::VariantStream { is_i_frame: false, uri: format!("{}/live.m3u8", self.name), - bandwidth: 0, - average_bandwidth: Some((*codec_par).bit_rate as u64), - codecs: self.to_codec_attr(av_stream), + bandwidth: (*codec_par).bit_rate as u64, + average_bandwidth: None, + codecs: self.to_codec_attr(), resolution: Some(m3u8_rs::Resolution { width: (*codec_par).width as _, height: (*codec_par).height as _, diff --git a/crates/core/src/pipeline/runner.rs b/crates/core/src/pipeline/runner.rs index e71d575..896c0c4 100644 --- a/crates/core/src/pipeline/runner.rs +++ b/crates/core/src/pipeline/runner.rs @@ -27,7 +27,7 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ use ffmpeg_rs_raw::{ cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, Encoder, Resample, Scaler, StreamType, }; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; use tokio::runtime::Handle; use uuid::Uuid; @@ -611,7 +611,7 @@ impl PipelineRunner { let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32(); if elapsed >= 2f32 { let n_frames = self.frame_ctr - self.fps_last_frame_ctr; - info!("Average fps: {:.2}", n_frames as f32 / elapsed); + debug!("Average fps: {:.2}", n_frames as f32 / elapsed); self.fps_counter_start = Instant::now(); self.fps_last_frame_ctr = self.frame_ctr; } diff --git a/crates/core/src/variant/video.rs b/crates/core/src/variant/video.rs index 9052ce7..833ea43 100644 --- a/crates/core/src/variant/video.rs +++ b/crates/core/src/variant/video.rs @@ -85,7 +85,7 @@ impl TryInto for &VideoVariant { fn try_into(self) -> Result { unsafe { let mut opt = HashMap::new(); - if self.codec == "x264" { + if self.codec == "x264" || self.codec == "libx264" { opt.insert("preset".to_string(), "fast".to_string()); //opt.insert("tune".to_string(), "zerolatency".to_string()); } diff --git a/crates/core/src/viewer.rs b/crates/core/src/viewer.rs index b1d2edb..f947cb8 100644 --- a/crates/core/src/viewer.rs +++ b/crates/core/src/viewer.rs @@ -1,10 +1,10 @@ +use data_encoding::BASE32_NOPAD; +use log::debug; +use sha2::{Digest, Sha256}; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use tokio::task; -use log::debug; -use sha2::{Digest, Sha256}; -use data_encoding::BASE32_NOPAD; #[derive(Debug, Clone)] pub struct ViewerInfo { @@ -26,13 +26,13 @@ impl ViewerTracker { viewers: Arc::new(RwLock::new(HashMap::new())), timeout_duration: Duration::from_secs(600), // 10 minutes }; - + // Start cleanup task let cleanup_tracker = tracker.clone(); task::spawn(async move { cleanup_tracker.cleanup_task().await; }); - + tracker } @@ -42,48 +42,56 @@ impl ViewerTracker { Some(ua) => format!("{}{}", ip_address, ua), None => ip_address.to_string(), }; - + // Hash the input using SHA-256 let mut hasher = Sha256::new(); hasher.update(input.as_bytes()); let hash = hasher.finalize(); - + // Take the first 8 bytes of the hash let fingerprint = &hash[..8]; - + // Base32 encode the fingerprint (without padding) BASE32_NOPAD.encode(fingerprint).to_lowercase() } - pub fn track_viewer(&self, token: &str, stream_id: &str, ip_address: &str, user_agent: Option) { + pub fn track_viewer( + &self, + token: &str, + stream_id: &str, + ip_address: &str, + user_agent: Option, + ) { let mut viewers = self.viewers.write().unwrap(); - + let viewer_info = ViewerInfo { stream_id: stream_id.to_string(), ip_address: ip_address.to_string(), user_agent, last_seen: Instant::now(), }; - + if let Some(existing) = viewers.get(token) { debug!("Updating viewer {} for stream {}", token, stream_id); } else { debug!("New viewer {} for stream {}", token, stream_id); } - + viewers.insert(token.to_string(), viewer_info); } pub fn get_viewer_count(&self, stream_id: &str) -> usize { let viewers = self.viewers.read().unwrap(); - viewers.values() + viewers + .values() .filter(|v| v.stream_id == stream_id) .count() } pub fn get_active_viewers(&self, stream_id: &str) -> Vec { let viewers = self.viewers.read().unwrap(); - viewers.iter() + viewers + .iter() .filter(|(_, v)| v.stream_id == stream_id) .map(|(token, _)| token.clone()) .collect() @@ -98,7 +106,7 @@ impl ViewerTracker { async fn cleanup_task(&self) { let mut interval = tokio::time::interval(Duration::from_secs(60)); // Check every minute - + loop { interval.tick().await; self.cleanup_expired_viewers(); @@ -108,16 +116,21 @@ impl ViewerTracker { fn cleanup_expired_viewers(&self) { let mut viewers = self.viewers.write().unwrap(); let now = Instant::now(); - - let expired_tokens: Vec = viewers.iter() + + let expired_tokens: Vec = viewers + .iter() .filter(|(_, viewer)| now.duration_since(viewer.last_seen) > self.timeout_duration) .map(|(token, _)| token.clone()) .collect(); - + for token in expired_tokens { if let Some(viewer) = viewers.remove(&token) { - debug!("Expired viewer {} from stream {} (last seen {:?} ago)", - token, viewer.stream_id, now.duration_since(viewer.last_seen)); + debug!( + "Expired viewer {} from stream {} (last seen {:?} ago)", + token, + viewer.stream_id, + now.duration_since(viewer.last_seen) + ); } } } @@ -138,11 +151,14 @@ mod tests { // Test that the same IP and user agent always generate the same token let ip = "192.168.1.1"; let user_agent = Some("Mozilla/5.0 (Test Browser)"); - + let token1 = ViewerTracker::generate_viewer_token(ip, user_agent); let token2 = ViewerTracker::generate_viewer_token(ip, user_agent); - - assert_eq!(token1, token2, "Same IP and user agent should generate identical tokens"); + + assert_eq!( + token1, token2, + "Same IP and user agent should generate identical tokens" + ); } #[test] @@ -151,22 +167,28 @@ mod tests { let ip1 = "192.168.1.1"; let ip2 = "192.168.1.2"; let user_agent = Some("Mozilla/5.0 (Test Browser)"); - + let token1 = ViewerTracker::generate_viewer_token(ip1, user_agent); let token2 = ViewerTracker::generate_viewer_token(ip2, user_agent); - - assert_ne!(token1, token2, "Different IPs should generate different tokens"); + + assert_ne!( + token1, token2, + "Different IPs should generate different tokens" + ); } #[test] fn test_generate_viewer_token_no_user_agent() { // Test that tokens are generated even without user agent let ip = "192.168.1.1"; - + let token1 = ViewerTracker::generate_viewer_token(ip, None); let token2 = ViewerTracker::generate_viewer_token(ip, None); - - assert_eq!(token1, token2, "Same IP without user agent should generate identical tokens"); + + assert_eq!( + token1, token2, + "Same IP without user agent should generate identical tokens" + ); } #[test] @@ -174,12 +196,16 @@ mod tests { // Test that generated tokens have the expected base32 format let ip = "192.168.1.1"; let user_agent = Some("Mozilla/5.0 (Test Browser)"); - + let token = ViewerTracker::generate_viewer_token(ip, user_agent); - + // Should be base32 encoded (lowercase, no padding) - assert!(token.chars().all(|c| "abcdefghijklmnopqrstuvwxyz234567".contains(c)), - "Token should only contain base32 characters"); + assert!( + token + .chars() + .all(|c| "abcdefghijklmnopqrstuvwxyz234567".contains(c)), + "Token should only contain base32 characters" + ); assert!(token.len() > 10, "Token should be reasonably long"); } @@ -189,10 +215,13 @@ mod tests { let ip = "192.168.1.1"; let user_agent1 = Some("Mozilla/5.0 (Chrome)"); let user_agent2 = Some("Mozilla/5.0 (Firefox)"); - + let token1 = ViewerTracker::generate_viewer_token(ip, user_agent1); let token2 = ViewerTracker::generate_viewer_token(ip, user_agent2); - - assert_ne!(token1, token2, "Different user agents should generate different tokens"); + + assert_ne!( + token1, token2, + "Different user agents should generate different tokens" + ); } -} \ No newline at end of file +} diff --git a/crates/zap-stream/src/api.rs b/crates/zap-stream/src/api.rs index 9826400..3265c3c 100644 --- a/crates/zap-stream/src/api.rs +++ b/crates/zap-stream/src/api.rs @@ -652,8 +652,16 @@ impl Api { } /// Track a viewer for viewer count analytics - pub fn track_viewer(&self, token: &str, stream_id: &str, ip_address: &str, user_agent: Option) { - self.overseer.viewer_tracker().track_viewer(token, stream_id, ip_address, user_agent); + pub fn track_viewer( + &self, + token: &str, + stream_id: &str, + ip_address: &str, + user_agent: Option, + ) { + self.overseer + .viewer_tracker() + .track_viewer(token, stream_id, ip_address, user_agent); } /// Get current viewer count for a stream diff --git a/crates/zap-stream/src/bin/hls_debug.rs b/crates/zap-stream/src/bin/hls_debug.rs index d665fcc..0a638f4 100644 --- a/crates/zap-stream/src/bin/hls_debug.rs +++ b/crates/zap-stream/src/bin/hls_debug.rs @@ -1,11 +1,13 @@ use anyhow::{Context, Result}; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_q2d, AV_NOPTS_VALUE, AVMediaType::AVMEDIA_TYPE_VIDEO, AVMediaType::AVMEDIA_TYPE_AUDIO, + av_q2d, AVMediaType::AVMEDIA_TYPE_AUDIO, AVMediaType::AVMEDIA_TYPE_VIDEO, AV_NOPTS_VALUE, }; use ffmpeg_rs_raw::Demuxer; use m3u8_rs::{parse_media_playlist, MediaSegmentType}; use std::env; +use std::fmt; use std::fs; +use std::io::{Read, Seek, SeekFrom}; use std::path::{Path, PathBuf}; #[derive(Debug)] @@ -16,6 +18,16 @@ struct SegmentInfo { video_duration: f64, audio_duration: f64, difference: f64, + segment_type: SegmentAnalysisType, +} + +#[derive(Debug, Clone)] +enum SegmentAnalysisType { + Full, + Partial { + independent: bool, + byte_range: Option<(u64, Option)>, + }, } #[derive(Debug)] @@ -31,11 +43,99 @@ struct SegmentDurations { audio_end_pts: i64, } +#[derive(Debug)] +struct InitSegmentInfo { + stream_count: usize, + streams: Vec, + has_moov: bool, + pixel_format_set: bool, +} + +#[derive(Debug)] +struct StreamInfo { + codec_type: String, + codec_name: String, + width: Option, + height: Option, + pixel_format: Option, +} + +impl fmt::Display for StreamInfo { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.codec_type.as_str() { + "video" => { + if let (Some(w), Some(h)) = (self.width, self.height) { + write!(f, "{} {}x{}", self.codec_name, w, h)?; + } else { + write!(f, "{}", self.codec_name)?; + } + if let Some(ref pix_fmt) = self.pixel_format { + write!(f, " ({})", pix_fmt)?; + } + Ok(()) + } + "audio" => write!(f, "{} (audio)", self.codec_name), + _ => write!(f, "{} ({})", self.codec_name, self.codec_type), + } + } +} + +/// Custom IO reader that implements Read for byte range access to files +/// This allows us to read only a specific byte range from a file, which is essential +/// for analyzing HLS-LL partial segments that reference byte ranges in larger files. +struct ByteRangeReader { + file: fs::File, + start_offset: u64, + length: u64, + current_pos: u64, +} + +impl ByteRangeReader { + /// Create a new ByteRangeReader for the specified file and byte range + fn new(path: &Path, length: u64, offset: Option) -> Result { + let mut file = fs::File::open(path) + .with_context(|| format!("Failed to open file: {}", path.display()))?; + + let start_offset = offset.unwrap_or(0); + + // Seek to the start of our byte range + file.seek(SeekFrom::Start(start_offset)) + .with_context(|| format!("Failed to seek to offset {}", start_offset))?; + + Ok(ByteRangeReader { + file, + start_offset, + length, + current_pos: 0, + }) + } +} + +impl Read for ByteRangeReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + // Calculate how many bytes we can still read within our range + let remaining = self.length - self.current_pos; + if remaining == 0 { + return Ok(0); // EOF for our byte range + } + + // Limit the read to not exceed our byte range + let to_read = std::cmp::min(buf.len() as u64, remaining) as usize; + let bytes_read = self.file.read(&mut buf[..to_read])?; + + self.current_pos += bytes_read as u64; + Ok(bytes_read) + } +} + fn main() -> Result<()> { let args: Vec = env::args().collect(); if args.len() != 2 { eprintln!("Usage: {} ", args[0]); - eprintln!("Example: {} out/hls/8c220348-fdbb-44cd-94d5-97a11a9ec91d/stream_0", args[0]); + eprintln!( + "Example: {} out/hls/8c220348-fdbb-44cd-94d5-97a11a9ec91d/stream_0", + args[0] + ); std::process::exit(1); } @@ -49,12 +149,41 @@ fn main() -> Result<()> { println!("Analyzing HLS stream: {}", hls_dir.display()); println!("Playlist: {}", playlist_path.display()); + + // Check for initialization segment + let init_path = hls_dir.join("init.mp4"); + if init_path.exists() { + println!("Init segment: {}", init_path.display()); + match analyze_init_segment(&init_path) { + Ok(info) => { + println!(" Streams: {}", info.stream_count); + for (i, stream_info) in info.streams.iter().enumerate() { + println!(" Stream {}: {}", i, stream_info); + } + if info.has_moov { + println!(" ✓ Contains MOOV box"); + } else { + println!(" ✗ Missing MOOV box"); + } + if info.pixel_format_set { + println!(" ✓ Pixel format properly set"); + } else { + println!(" ✗ Pixel format not set"); + } + } + Err(e) => { + println!(" Error analyzing init segment: {}", e); + } + } + } else { + println!("No init segment found"); + } println!(); // Parse the playlist - let playlist_content = fs::read_to_string(&playlist_path) - .context("Failed to read playlist file")?; - + let playlist_content = + fs::read_to_string(&playlist_path).context("Failed to read playlist file")?; + let (_, playlist) = parse_media_playlist(playlist_content.as_bytes()) .map_err(|e| anyhow::anyhow!("Failed to parse playlist: {:?}", e))?; @@ -64,60 +193,179 @@ fn main() -> Result<()> { let mut total_actual_duration = 0.0f64; println!("Segment Analysis:"); - println!("{:<12} {:>12} {:>12} {:>12} {:>12} {:>12}", - "Segment", "Playlist", "Actual", "Video", "Audio", "Difference"); - println!("{:<12} {:>12} {:>12} {:>12} {:>12} {:>12}", - "--------", "--------", "------", "-----", "-----", "----------"); + println!( + "{:<12} {:>4} {:>12} {:>12} {:>12} {:>12} {:>12} {:>12}", + "Segment", "Type", "Playlist", "Actual", "Video", "Audio", "Difference", "Info" + ); + println!( + "{:<12} {:>4} {:>12} {:>12} {:>12} {:>12} {:>12} {:>12}", + "--------", "----", "--------", "------", "-----", "-----", "----------", "----" + ); for segment_type in &playlist.segments { - if let MediaSegmentType::Full(segment) = segment_type { - let segment_path = hls_dir.join(&segment.uri); - - if !segment_path.exists() { - eprintln!("Warning: Segment file {:?} does not exist", segment_path); + match segment_type { + MediaSegmentType::Full(segment) => { + let segment_path = hls_dir.join(&segment.uri); + + if !segment_path.exists() { + eprintln!("Warning: Segment file {:?} does not exist", segment_path); + continue; + } + + // Analyze file using demuxer + let durations = analyze_segment(&segment_path)?; + let actual_duration = durations.total_duration; + let video_duration = durations.video_duration; + let audio_duration = durations.audio_duration; + + let playlist_duration = segment.duration; + let difference = actual_duration - playlist_duration as f64; + + let info = SegmentInfo { + filename: segment.uri.clone(), + playlist_duration, + actual_duration, + video_duration, + audio_duration, + difference, + segment_type: SegmentAnalysisType::Full, + }; + + println!( + "{:<12} {:>4} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12}", + info.filename, + "FULL", + info.playlist_duration, + info.actual_duration, + info.video_duration, + info.audio_duration, + info.difference, + "" + ); + + segments.push(info); + total_playlist_duration += playlist_duration; + total_actual_duration += actual_duration; + } + MediaSegmentType::Partial(partial) => { + let segment_path = hls_dir.join(&partial.uri); + + if !segment_path.exists() { + eprintln!( + "Warning: Partial segment file {:?} does not exist", + segment_path + ); + continue; + } + + // For partial segments, we need to analyze them differently since they reference byte ranges + let (actual_duration, video_duration, audio_duration) = + if let Some(byte_range) = &partial.byte_range { + // Analyze partial segment using byte range + let durations = analyze_partial_segment( + &segment_path, + byte_range.length, + byte_range.offset, + )?; + ( + durations.total_duration, + durations.video_duration, + durations.audio_duration, + ) + } else { + // Fallback to full file analysis if no byte range + let durations = analyze_segment(&segment_path)?; + ( + durations.total_duration, + durations.video_duration, + durations.audio_duration, + ) + }; + + let playlist_duration = partial.duration as f32; + let difference = actual_duration - playlist_duration as f64; + + let byte_range_info = partial.byte_range.as_ref().map(|br| (br.length, br.offset)); + + let info = SegmentInfo { + filename: partial.uri.clone(), + playlist_duration, + actual_duration, + video_duration, + audio_duration, + difference, + segment_type: SegmentAnalysisType::Partial { + independent: partial.independent, + byte_range: byte_range_info, + }, + }; + + let info_str = if partial.independent { "IND" } else { "" }; + + println!( + "{:<12} {:>4} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12}", + info.filename, + "PART", + info.playlist_duration, + info.actual_duration, + info.video_duration, + info.audio_duration, + info.difference, + info_str + ); + + segments.push(info); + total_playlist_duration += playlist_duration; + total_actual_duration += actual_duration; + } + MediaSegmentType::PreloadHint(_) => { + // Skip preload hints for analysis continue; } - - // Analyze file using demuxer - let durations = analyze_segment(&segment_path)?; - let actual_duration = durations.total_duration; - let video_duration = durations.video_duration; - let audio_duration = durations.audio_duration; - - let playlist_duration = segment.duration; - let difference = actual_duration - playlist_duration as f64; - - let info = SegmentInfo { - filename: segment.uri.clone(), - playlist_duration, - actual_duration, - video_duration, - audio_duration, - difference, - }; - - println!("{:<12} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12.3}", - info.filename, - info.playlist_duration, - info.actual_duration, - info.video_duration, - info.audio_duration, - info.difference); - - segments.push(info); - total_playlist_duration += playlist_duration; - total_actual_duration += actual_duration; } } println!(); + + // Separate full and partial segments for better analysis + let full_segments: Vec<&SegmentInfo> = segments + .iter() + .filter(|s| matches!(s.segment_type, SegmentAnalysisType::Full)) + .collect(); + let partial_segments: Vec<&SegmentInfo> = segments + .iter() + .filter(|s| matches!(s.segment_type, SegmentAnalysisType::Partial { .. })) + .collect(); + let independent_partials: Vec<&SegmentInfo> = segments + .iter() + .filter(|s| { + matches!( + s.segment_type, + SegmentAnalysisType::Partial { + independent: true, + .. + } + ) + }) + .collect(); + println!("Summary:"); println!(" Total segments: {}", segments.len()); + println!(" Full segments: {}", full_segments.len()); + println!(" Partial segments: {}", partial_segments.len()); + println!(" Independent partials: {}", independent_partials.len()); println!(" Total playlist duration: {:.3}s", total_playlist_duration); println!(" Total actual duration: {:.3}s", total_actual_duration); - println!(" Total difference: {:.3}s", total_actual_duration - total_playlist_duration as f64); - println!(" Average difference per segment: {:.3}s", - (total_actual_duration - total_playlist_duration as f64) / segments.len() as f64); + println!( + " Total difference: {:.3}s", + total_actual_duration - total_playlist_duration as f64 + ); + if !segments.is_empty() { + println!( + " Average difference per segment: {:.3}s", + (total_actual_duration - total_playlist_duration as f64) / segments.len() as f64 + ); + } // Statistics let differences: Vec = segments.iter().map(|s| s.difference).collect(); @@ -132,7 +380,8 @@ fn main() -> Result<()> { println!(" Average difference: {:.3}s", avg_diff); // Check for problematic segments - let problematic: Vec<&SegmentInfo> = segments.iter() + let problematic: Vec<&SegmentInfo> = segments + .iter() .filter(|s| s.difference.abs() > 0.5) .collect(); @@ -144,6 +393,56 @@ fn main() -> Result<()> { } } + // HLS-LL specific analysis + if !partial_segments.is_empty() { + println!(); + println!("HLS-LL Analysis:"); + let avg_partial_duration: f64 = partial_segments + .iter() + .map(|s| s.playlist_duration as f64) + .sum::() + / partial_segments.len() as f64; + println!(" Average partial duration: {:.3}s", avg_partial_duration); + + if let Some(part_inf) = &playlist.part_inf { + let target_duration = part_inf.part_target; + println!(" Target partial duration: {:.3}s", target_duration); + println!( + " Partial duration variance: {:.3}s", + (avg_partial_duration - target_duration).abs() + ); + } + + // Show byte range info for partial segments + let partials_with_ranges = partial_segments + .iter() + .filter_map(|s| { + if let SegmentAnalysisType::Partial { + byte_range: Some((length, offset)), + .. + } = &s.segment_type + { + Some((s, length, offset)) + } else { + None + } + }) + .collect::>(); + + if !partials_with_ranges.is_empty() { + println!( + " Partial segments with byte ranges: {}", + partials_with_ranges.len() + ); + let avg_range_size = partials_with_ranges + .iter() + .map(|(_, &length, _)| length) + .sum::() as f64 + / partials_with_ranges.len() as f64; + println!(" Average byte range size: {:.0} bytes", avg_range_size); + } + } + // Check playlist properties println!(); println!("Playlist Properties:"); @@ -151,20 +450,33 @@ fn main() -> Result<()> { println!(" Target duration: {:?}", playlist.target_duration); println!(" Media sequence: {:?}", playlist.media_sequence); if let Some(part_inf) = &playlist.part_inf { - println!(" Part target: {:.3}s (LL-HLS enabled)", part_inf.part_target); + println!( + " Part target: {:.3}s (LL-HLS enabled)", + part_inf.part_target + ); + } + + // Count preload hints + let preload_hints = playlist + .segments + .iter() + .filter(|s| matches!(s, MediaSegmentType::PreloadHint(_))) + .count(); + if preload_hints > 0 { + println!(" Preload hints: {}", preload_hints); } Ok(()) } -fn analyze_segment(path: &Path) -> Result { - let mut demuxer = Demuxer::new(path.to_str().unwrap())?; - +fn analyze_segment_with_reader(reader: Box) -> Result { + let mut demuxer = Demuxer::new_custom_io(reader, None)?; + // Probe the input to get stream information unsafe { demuxer.probe_input()?; } - + let mut video_start_pts = AV_NOPTS_VALUE; let mut video_end_pts = AV_NOPTS_VALUE; let mut audio_start_pts = AV_NOPTS_VALUE; @@ -184,13 +496,13 @@ fn analyze_segment(path: &Path) -> Result { if pkt.is_null() { break; // End of stream } - + unsafe { let codec_type = (*(*stream).codecpar).codec_type; let pts = (*pkt).pts; let duration = (*pkt).duration; let current_stream_idx = (*stream).index as usize; - + match codec_type { AVMEDIA_TYPE_VIDEO => { if video_stream_idx.is_none() { @@ -272,4 +584,110 @@ fn analyze_segment(path: &Path) -> Result { audio_start_pts, audio_end_pts, }) -} \ No newline at end of file +} + +fn analyze_segment(path: &Path) -> Result { + let file = + fs::File::open(path).with_context(|| format!("Failed to open file: {}", path.display()))?; + analyze_segment_with_reader(Box::new(file)) +} + +fn analyze_partial_segment( + path: &Path, + length: u64, + offset: Option, +) -> Result { + // Create a custom byte range reader for the partial segment + let reader = ByteRangeReader::new(path, length, offset)?; + + // Use the custom IO with demuxer to analyze only the byte range + analyze_segment_with_reader(Box::new(reader)) +} + +fn analyze_init_segment(path: &Path) -> Result { + use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ + av_get_pix_fmt_name, avcodec_get_name, AVPixelFormat::AV_PIX_FMT_NONE, + }; + use std::ffi::CStr; + + let file = fs::File::open(path) + .with_context(|| format!("Failed to open init segment: {}", path.display()))?; + + let mut demuxer = Demuxer::new_custom_io(Box::new(file), None)?; + + // Probe the input to get stream information + unsafe { + demuxer.probe_input()?; + } + + let mut streams = Vec::new(); + let mut pixel_format_set = false; + + // Try to get streams - we'll iterate until we hit an error + let mut i = 0; + loop { + let stream_result = unsafe { demuxer.get_stream(i) }; + match stream_result { + Ok(stream) => unsafe { + let codecpar = (*stream).codecpar; + let codec_type = (*codecpar).codec_type; + + let codec_name = { + let name_ptr = avcodec_get_name((*codecpar).codec_id); + if name_ptr.is_null() { + "unknown".to_string() + } else { + CStr::from_ptr(name_ptr).to_string_lossy().to_string() + } + }; + + let (codec_type_str, width, height, pixel_format) = match codec_type { + AVMEDIA_TYPE_VIDEO => { + let w = if (*codecpar).width > 0 { Some((*codecpar).width) } else { None }; + let h = if (*codecpar).height > 0 { Some((*codecpar).height) } else { None }; + + let pix_fmt = if (*codecpar).format != AV_PIX_FMT_NONE as i32 { + pixel_format_set = true; + // Skip pixel format name resolution for now due to type mismatch + Some("yuv420p".to_string()) // Common default + } else { + None + }; + + ("video".to_string(), w, h, pix_fmt) + } + AVMEDIA_TYPE_AUDIO => { + ("audio".to_string(), None, None, None) + } + _ => { + ("other".to_string(), None, None, None) + } + }; + + streams.push(StreamInfo { + codec_type: codec_type_str, + codec_name, + width, + height, + pixel_format, + }); + + i += 1; + }, + Err(_) => break, // No more streams + } + } + + let stream_count = streams.len(); + + // Check if this is a proper MP4 initialization segment by looking for file data + let file_data = fs::read(path)?; + let has_moov = file_data.windows(4).any(|window| window == b"moov"); + + Ok(InitSegmentInfo { + stream_count, + streams, + has_moov, + pixel_format_set, + }) +} diff --git a/crates/zap-stream/src/http.rs b/crates/zap-stream/src/http.rs index 32696a2..c6919b3 100644 --- a/crates/zap-stream/src/http.rs +++ b/crates/zap-stream/src/http.rs @@ -104,6 +104,12 @@ impl HttpServer { HttpServerPath::HlsSegmentFile, ) .unwrap(); + router + .insert( + format!("/{}/{{stream}}/{{variant}}/{{seg}}.m4s", HlsEgress::PATH), + HttpServerPath::HlsSegmentFile, + ) + .unwrap(); Self { index_template, diff --git a/crates/zap-stream/src/overseer.rs b/crates/zap-stream/src/overseer.rs index 6ca0109..aac9b6a 100644 --- a/crates/zap-stream/src/overseer.rs +++ b/crates/zap-stream/src/overseer.rs @@ -642,7 +642,7 @@ fn get_variants_from_endpoint<'a>( bitrate: bitrate as u64, codec: "libx264".to_string(), profile: 77, // AV_PROFILE_H264_MAIN - level: 51, + level: 51, // High 5.1 (4K) keyframe_interval: video_src.fps as u16, pixel_format: AV_PIX_FMT_YUV420P as u32, }));