From e7e1f0299d521861a4c0de3d3d09ed9e411c3d17 Mon Sep 17 00:00:00 2001 From: Kieran Date: Fri, 13 Jun 2025 17:42:39 +0100 Subject: [PATCH] fix: segment duration calc feat: add debugging tool for hls segments --- crates/core/src/mux/hls.rs | 67 +++--- crates/zap-stream/src/bin/hls_debug.rs | 275 +++++++++++++++++++++++++ crates/zap-stream/src/http.rs | 2 +- 3 files changed, 312 insertions(+), 32 deletions(-) create mode 100644 crates/zap-stream/src/bin/hls_debug.rs diff --git a/crates/core/src/mux/hls.rs b/crates/core/src/mux/hls.rs index 3ffd309..8a83b5e 100644 --- a/crates/core/src/mux/hls.rs +++ b/crates/core/src/mux/hls.rs @@ -4,8 +4,8 @@ use anyhow::{bail, ensure, Result}; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_free, av_opt_set, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, avio_size, - AVPacket, AVStream, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY, + av_free, av_interleaved_write_frame, av_opt_set, av_q2d, avio_close, avio_flush, avio_open, + avio_size, AVPacket, AVStream, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY, }; use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; use itertools::Itertools; @@ -89,8 +89,10 @@ pub struct HlsVariant { segments: Vec, /// Type of segments to create segment_type: SegmentType, - /// Ending presentation timestamp - end_pts: i64, + /// Timestamp of the previous packet + last_pkt_pts: i64, + /// Timestamp of the start of the current segment + current_segment_start: f64, /// Current segment duration in seconds (precise accumulation) duration: f64, /// Number of packets written to current segment @@ -275,13 +277,14 @@ impl HlsVariant { segments: Vec::new(), out_dir: out_dir.to_string(), segment_type, - end_pts: AV_NOPTS_VALUE, + last_pkt_pts: AV_NOPTS_VALUE, duration: 0.0, packets_written: 0, ref_stream_index, partial_target_duration: 0.33, current_partial_index: 0, current_partial_duration: 0.0, + current_segment_start: 0.0, next_partial_independent: false, low_latency: false, }) @@ -312,6 +315,7 @@ impl HlsVariant { .streams .add((*pkt).stream_index as usize); + let pkt_q = av_q2d((*pkt).time_base); let mut result = EgressResult::None; let stream_type = (*(*pkt_stream).codecpar).codec_type; let mut can_split = stream_type == AVMEDIA_TYPE_VIDEO @@ -334,26 +338,28 @@ impl HlsVariant { self.next_partial_independent = true; } } - // check if current packet is keyframe, flush current segment if self.packets_written > 1 && can_split && self.duration >= self.segment_length as f64 { - result = self.split_next_seg()?; + result = self.split_next_seg((*pkt).pts as f64 * pkt_q)?; } // track duration from pts if is_ref_pkt { - if self.end_pts == AV_NOPTS_VALUE { - self.end_pts = (*pkt).pts; + if self.last_pkt_pts == AV_NOPTS_VALUE { + self.last_pkt_pts = (*pkt).pts; } - let pts_diff = (*pkt).pts - self.end_pts; - if pts_diff > 0 { - let time_delta = pts_diff as f64 * av_q2d((*pkt).time_base); + let time_delta = if (*pkt).duration != 0 { + (*pkt).duration as f64 * pkt_q + } else { + ((*pkt).pts - self.last_pkt_pts) as f64 * pkt_q + }; + if time_delta > 0.0 { self.duration += time_delta; if self.low_latency { self.current_partial_duration += time_delta; } } - self.end_pts = (*pkt).pts; + self.last_pkt_pts = (*pkt).pts; } // write to current segment @@ -412,13 +418,16 @@ impl HlsVariant { } /// Reset the muxer state and start the next segment - unsafe fn split_next_seg(&mut self) -> Result { + unsafe fn split_next_seg(&mut self, next_pkt_start: f64) -> Result { let completed_segment_idx = self.idx; self.idx += 1; // Manually reset muxer avio let ctx = self.mux.context(); - av_write_frame(ctx, ptr::null_mut()); + let ret = av_interleaved_write_frame(ctx, ptr::null_mut()); + if ret < 0 { + bail!("Failed to split segment {}", ret); + } avio_flush((*ctx).pb); avio_close((*ctx).pb); av_free((*ctx).url as *mut _); @@ -452,13 +461,15 @@ impl HlsVariant { .metadata() .map(|m| m.len()) .unwrap_or(0); + + let cur_duration = next_pkt_start - self.current_segment_start; info!( "Finished segment {} [{:.3}s, {:.2} kB, {} pkts]", completed_segment_path .file_name() .unwrap_or_default() .to_string_lossy(), - self.duration, + cur_duration, segment_size as f32 / 1024f32, self.packets_written ); @@ -490,17 +501,22 @@ impl HlsVariant { let created = EgressSegment { variant: video_var_id, idx: completed_segment_idx, - duration: self.duration as f32, + duration: cur_duration as f32, path: completed_segment_path, }; - if let Err(e) = self.push_segment(completed_segment_idx, self.duration as f32) { - warn!("Failed to update playlist: {}", e); - } + self.segments.push(HlsSegment::Full(SegmentInfo { + index: completed_segment_idx, + duration: cur_duration as f32, + kind: self.segment_type, + })); + + self.write_playlist()?; // Reset counters for next segment self.packets_written = 0; self.duration = 0.0; + self.current_segment_start = next_pkt_start; Ok(EgressResult::Segments { created: vec![created], @@ -514,17 +530,6 @@ impl HlsVariant { .find(|a| matches!(*a, HlsVariantStream::Video { .. })) } - /// Add a new segment to the variant and return a list of deleted segments - fn push_segment(&mut self, idx: u64, duration: f32) -> Result<()> { - self.segments.push(HlsSegment::Full(SegmentInfo { - index: idx, - duration, - kind: self.segment_type, - })); - - self.write_playlist() - } - /// Delete segments which are too old fn clean_segments(&mut self) -> Result> { let drain_from_hls_segment = { diff --git a/crates/zap-stream/src/bin/hls_debug.rs b/crates/zap-stream/src/bin/hls_debug.rs new file mode 100644 index 0000000..d665fcc --- /dev/null +++ b/crates/zap-stream/src/bin/hls_debug.rs @@ -0,0 +1,275 @@ +use anyhow::{Context, Result}; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ + av_q2d, AV_NOPTS_VALUE, AVMediaType::AVMEDIA_TYPE_VIDEO, AVMediaType::AVMEDIA_TYPE_AUDIO, +}; +use ffmpeg_rs_raw::Demuxer; +use m3u8_rs::{parse_media_playlist, MediaSegmentType}; +use std::env; +use std::fs; +use std::path::{Path, PathBuf}; + +#[derive(Debug)] +struct SegmentInfo { + filename: String, + playlist_duration: f32, + actual_duration: f64, + video_duration: f64, + audio_duration: f64, + difference: f64, +} + +#[derive(Debug)] +struct SegmentDurations { + total_duration: f64, + video_duration: f64, + audio_duration: f64, + video_packets: u64, + audio_packets: u64, + video_start_pts: i64, + video_end_pts: i64, + audio_start_pts: i64, + audio_end_pts: i64, +} + +fn main() -> Result<()> { + let args: Vec = env::args().collect(); + if args.len() != 2 { + eprintln!("Usage: {} ", args[0]); + eprintln!("Example: {} out/hls/8c220348-fdbb-44cd-94d5-97a11a9ec91d/stream_0", args[0]); + std::process::exit(1); + } + + let hls_dir = PathBuf::from(&args[1]); + let playlist_path = hls_dir.join("live.m3u8"); + + if !playlist_path.exists() { + eprintln!("Error: Playlist file {:?} does not exist", playlist_path); + std::process::exit(1); + } + + println!("Analyzing HLS stream: {}", hls_dir.display()); + println!("Playlist: {}", playlist_path.display()); + println!(); + + // Parse the playlist + let playlist_content = fs::read_to_string(&playlist_path) + .context("Failed to read playlist file")?; + + let (_, playlist) = parse_media_playlist(playlist_content.as_bytes()) + .map_err(|e| anyhow::anyhow!("Failed to parse playlist: {:?}", e))?; + + // Analyze each segment + let mut segments = Vec::new(); + let mut total_playlist_duration = 0.0f32; + let mut total_actual_duration = 0.0f64; + + println!("Segment Analysis:"); + println!("{:<12} {:>12} {:>12} {:>12} {:>12} {:>12}", + "Segment", "Playlist", "Actual", "Video", "Audio", "Difference"); + println!("{:<12} {:>12} {:>12} {:>12} {:>12} {:>12}", + "--------", "--------", "------", "-----", "-----", "----------"); + + for segment_type in &playlist.segments { + if let MediaSegmentType::Full(segment) = segment_type { + let segment_path = hls_dir.join(&segment.uri); + + if !segment_path.exists() { + eprintln!("Warning: Segment file {:?} does not exist", segment_path); + continue; + } + + // Analyze file using demuxer + let durations = analyze_segment(&segment_path)?; + let actual_duration = durations.total_duration; + let video_duration = durations.video_duration; + let audio_duration = durations.audio_duration; + + let playlist_duration = segment.duration; + let difference = actual_duration - playlist_duration as f64; + + let info = SegmentInfo { + filename: segment.uri.clone(), + playlist_duration, + actual_duration, + video_duration, + audio_duration, + difference, + }; + + println!("{:<12} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12.3}", + info.filename, + info.playlist_duration, + info.actual_duration, + info.video_duration, + info.audio_duration, + info.difference); + + segments.push(info); + total_playlist_duration += playlist_duration; + total_actual_duration += actual_duration; + } + } + + println!(); + println!("Summary:"); + println!(" Total segments: {}", segments.len()); + println!(" Total playlist duration: {:.3}s", total_playlist_duration); + println!(" Total actual duration: {:.3}s", total_actual_duration); + println!(" Total difference: {:.3}s", total_actual_duration - total_playlist_duration as f64); + println!(" Average difference per segment: {:.3}s", + (total_actual_duration - total_playlist_duration as f64) / segments.len() as f64); + + // Statistics + let differences: Vec = segments.iter().map(|s| s.difference).collect(); + let min_diff = differences.iter().fold(f64::INFINITY, |a, &b| a.min(b)); + let max_diff = differences.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b)); + let avg_diff = differences.iter().sum::() / differences.len() as f64; + + println!(); + println!("Difference Statistics:"); + println!(" Min difference: {:.3}s", min_diff); + println!(" Max difference: {:.3}s", max_diff); + println!(" Average difference: {:.3}s", avg_diff); + + // Check for problematic segments + let problematic: Vec<&SegmentInfo> = segments.iter() + .filter(|s| s.difference.abs() > 0.5) + .collect(); + + if !problematic.is_empty() { + println!(); + println!("Problematic segments (>0.5s difference):"); + for seg in problematic { + println!(" {}: {:.3}s difference", seg.filename, seg.difference); + } + } + + // Check playlist properties + println!(); + println!("Playlist Properties:"); + println!(" Version: {:?}", playlist.version); + println!(" Target duration: {:?}", playlist.target_duration); + println!(" Media sequence: {:?}", playlist.media_sequence); + if let Some(part_inf) = &playlist.part_inf { + println!(" Part target: {:.3}s (LL-HLS enabled)", part_inf.part_target); + } + + Ok(()) +} + +fn analyze_segment(path: &Path) -> Result { + let mut demuxer = Demuxer::new(path.to_str().unwrap())?; + + // Probe the input to get stream information + unsafe { + demuxer.probe_input()?; + } + + let mut video_start_pts = AV_NOPTS_VALUE; + let mut video_end_pts = AV_NOPTS_VALUE; + let mut audio_start_pts = AV_NOPTS_VALUE; + let mut audio_end_pts = AV_NOPTS_VALUE; + let mut video_last_duration = 0i64; + let mut audio_last_duration = 0i64; + let mut video_packets = 0u64; + let mut audio_packets = 0u64; + let mut video_stream_idx: Option = None; + let mut audio_stream_idx: Option = None; + + // Read all packets and track timing + loop { + let packet_result = unsafe { demuxer.get_packet() }; + match packet_result { + Ok((pkt, stream)) => { + if pkt.is_null() { + break; // End of stream + } + + unsafe { + let codec_type = (*(*stream).codecpar).codec_type; + let pts = (*pkt).pts; + let duration = (*pkt).duration; + let current_stream_idx = (*stream).index as usize; + + match codec_type { + AVMEDIA_TYPE_VIDEO => { + if video_stream_idx.is_none() { + video_stream_idx = Some(current_stream_idx); + } + if pts != AV_NOPTS_VALUE { + if video_start_pts == AV_NOPTS_VALUE { + video_start_pts = pts; + } + video_end_pts = pts; + video_last_duration = duration; + video_packets += 1; + } + } + AVMEDIA_TYPE_AUDIO => { + if audio_stream_idx.is_none() { + audio_stream_idx = Some(current_stream_idx); + } + if pts != AV_NOPTS_VALUE { + if audio_start_pts == AV_NOPTS_VALUE { + audio_start_pts = pts; + } + audio_end_pts = pts; + audio_last_duration = duration; + audio_packets += 1; + } + } + _ => {} + } + } + } + Err(_) => break, // End of file or error + } + } + + // Calculate durations (including last packet duration) + let video_duration = if let Some(stream_idx) = video_stream_idx { + if video_start_pts != AV_NOPTS_VALUE && video_end_pts != AV_NOPTS_VALUE { + unsafe { + let stream = demuxer.get_stream(stream_idx)?; + let time_base = (*stream).time_base; + let pts_duration = (video_end_pts - video_start_pts) as f64 * av_q2d(time_base); + let last_pkt_duration = video_last_duration as f64 * av_q2d(time_base); + pts_duration + last_pkt_duration + } + } else { + 0.0 + } + } else { + 0.0 + }; + + let audio_duration = if let Some(stream_idx) = audio_stream_idx { + if audio_start_pts != AV_NOPTS_VALUE && audio_end_pts != AV_NOPTS_VALUE { + unsafe { + let stream = demuxer.get_stream(stream_idx)?; + let time_base = (*stream).time_base; + let pts_duration = (audio_end_pts - audio_start_pts) as f64 * av_q2d(time_base); + let last_pkt_duration = audio_last_duration as f64 * av_q2d(time_base); + pts_duration + last_pkt_duration + } + } else { + 0.0 + } + } else { + 0.0 + }; + + let total_duration = video_duration.max(audio_duration); + + Ok(SegmentDurations { + total_duration, + video_duration, + audio_duration, + video_packets, + audio_packets, + video_start_pts, + video_end_pts, + audio_start_pts, + audio_end_pts, + }) +} \ No newline at end of file diff --git a/crates/zap-stream/src/http.rs b/crates/zap-stream/src/http.rs index 64020fb..32696a2 100644 --- a/crates/zap-stream/src/http.rs +++ b/crates/zap-stream/src/http.rs @@ -381,7 +381,7 @@ impl HttpServer { .header("server", "zap-stream-core") .header("access-control-allow-origin", "*") .header("access-control-allow-headers", "*") - .header("access-control-allow-methods", "HEAD, GET") + .header("access-control-allow-methods", "HEAD, GET, OPTIONS") } /// Get a response object for a file body