From 1c651108eac66116936260bad23da6922c852160 Mon Sep 17 00:00:00 2001 From: Kieran Date: Mon, 9 Jun 2025 16:33:46 +0100 Subject: [PATCH] fix: various buffering / av sync issues --- Cargo.lock | 2 +- Cargo.toml | 2 +- crates/core/src/ingress/tcp.rs | 3 +- crates/core/src/mux/hls.rs | 141 +++++++++++++++++++---------- crates/core/src/pipeline/runner.rs | 114 ++++++++++++++--------- crates/zap-stream/config.yaml | 1 - crates/zap-stream/src/main.rs | 8 +- 7 files changed, 178 insertions(+), 93 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 99897da..0f9dd4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1045,7 +1045,7 @@ dependencies = [ [[package]] name = "ffmpeg-rs-raw" version = "0.1.0" -source = "git+https://github.com/v0l/ffmpeg-rs-raw.git?rev=8307b0a225267cefac9c174d5f6a0314a2f0a66b#8307b0a225267cefac9c174d5f6a0314a2f0a66b" +source = "git+https://github.com/v0l/ffmpeg-rs-raw.git?rev=d79693ddb0bee2e94c1db07f789523e87bf1b0fc#d79693ddb0bee2e94c1db07f789523e87bf1b0fc" dependencies = [ "anyhow", "ffmpeg-sys-the-third", diff --git a/Cargo.toml b/Cargo.toml index ff49da2..0696139 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ codegen-units = 1 panic = "unwind" [workspace.dependencies] -ffmpeg-rs-raw = { git = "https://github.com/v0l/ffmpeg-rs-raw.git", rev = "8307b0a225267cefac9c174d5f6a0314a2f0a66b" } +ffmpeg-rs-raw = { git = "https://github.com/v0l/ffmpeg-rs-raw.git", rev = "d79693ddb0bee2e94c1db07f789523e87bf1b0fc" } tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] } anyhow = { version = "^1.0.91", features = ["backtrace"] } async-trait = "0.1.77" diff --git a/crates/core/src/ingress/tcp.rs b/crates/core/src/ingress/tcp.rs index 47b014a..f24163e 100644 --- a/crates/core/src/ingress/tcp.rs +++ b/crates/core/src/ingress/tcp.rs @@ -15,9 +15,10 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) ip_addr: ip.to_string(), endpoint: addr.clone(), app_name: "".to_string(), - key: "no-key-tcp".to_string(), + key: "test".to_string(), }; let socket = socket.into_std()?; + socket.set_nonblocking(false)?; spawn_pipeline( Handle::current(), info, diff --git a/crates/core/src/mux/hls.rs b/crates/core/src/mux/hls.rs index f328fa0..e54c854 100644 --- a/crates/core/src/mux/hls.rs +++ b/crates/core/src/mux/hls.rs @@ -1,15 +1,15 @@ use crate::egress::{EgressResult, EgressSegment}; use crate::variant::{StreamMapping, VariantStream}; -use anyhow::{bail, Result}; +use anyhow::{bail, ensure, Result}; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ av_free, av_opt_set, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, AVPacket, - AVStream, AVIO_FLAG_WRITE, AV_PKT_FLAG_KEY, + AVStream, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY, }; use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; use itertools::Itertools; -use log::{info, warn}; +use log::{info, trace, warn}; use m3u8_rs::MediaSegment; use std::collections::HashMap; use std::fmt::Display; @@ -72,31 +72,37 @@ impl Display for HlsVariantStream { pub struct HlsVariant { /// Name of this variant (720p) - pub name: String, + name: String, /// MPEG-TS muxer for this variant - pub mux: Muxer, + mux: Muxer, /// List of streams ids in this variant - pub streams: Vec, + streams: Vec, /// Segment length in seconds - pub segment_length: f32, + segment_length: f32, /// Total number of segments to store for this variant - pub segment_window: Option, + segment_window: Option, /// Current segment index - pub idx: u64, - /// Current segment start time in seconds (duration) - pub pkt_start: f32, + idx: u64, /// Output directory (base) - pub out_dir: String, + out_dir: String, /// List of segments to be included in the playlist - pub segments: Vec, + segments: Vec, /// Type of segments to create - pub segment_type: SegmentType, + segment_type: SegmentType, + /// Ending presentation timestamp + end_pts: i64, + /// Current segment duration in seconds (precise accumulation) + duration: f64, + /// Number of packets written to current segment + packets_written: u64, + /// Reference stream used to track duration + ref_stream_index: i32, } struct SegmentInfo { - pub index: u64, - pub duration: f32, - pub kind: SegmentType, + index: u64, + duration: f32, + kind: SegmentType, } impl SegmentInfo { @@ -146,23 +152,35 @@ impl HlsVariant { .build()? }; let mut streams = Vec::new(); + let mut ref_stream_index = -1; + let mut has_video = false; + for (var, enc) in encoded_vars { match var { VariantStream::Video(v) => unsafe { let stream = mux.add_stream_encoder(enc)?; + let stream_idx = (*stream).index as usize; streams.push(HlsVariantStream::Video { group, - index: (*stream).index as usize, + index: stream_idx, id: v.id(), - }) + }); + has_video = true; + if ref_stream_index == -1 { + ref_stream_index = stream_idx as _; + } }, VariantStream::Audio(a) => unsafe { let stream = mux.add_stream_encoder(enc)?; + let stream_idx = (*stream).index as usize; streams.push(HlsVariantStream::Audio { group, - index: (*stream).index as usize, + index: stream_idx, id: a.id(), - }) + }); + if !has_video && ref_stream_index == -1 { + ref_stream_index = stream_idx as _; + } }, VariantStream::Subtitle(s) => unsafe { let stream = mux.add_stream_encoder(enc)?; @@ -175,6 +193,10 @@ impl HlsVariant { _ => bail!("unsupported variant stream"), } } + ensure!( + ref_stream_index != -1, + "No reference stream found, cant create variant" + ); unsafe { mux.open(Some(opts))?; } @@ -185,10 +207,13 @@ impl HlsVariant { mux, streams, idx: 1, - pkt_start: 0.0, segments: Vec::new(), // Start with empty segments list out_dir: out_dir.to_string(), segment_type, + end_pts: AV_NOPTS_VALUE, + duration: 0.0, + packets_written: 0, + ref_stream_index, }) } @@ -218,34 +243,54 @@ impl HlsVariant { self.process_packet(pkt) } - /// Process a single packet through the muxer + /// Process a single packet through the muxer - FFmpeg-style implementation unsafe fn process_packet(&mut self, pkt: *mut AVPacket) -> Result { - let mut result = EgressResult::None; let pkt_stream = *(*self.mux.context()) .streams .add((*pkt).stream_index as usize); - // Match FFmpeg's segmentation logic exactly - let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY - && (*(*pkt_stream).codecpar).codec_type == AVMEDIA_TYPE_VIDEO; + let mut result = EgressResult::None; + let stream_type = (*(*pkt_stream).codecpar).codec_type; + let mut can_split = stream_type == AVMEDIA_TYPE_VIDEO + && ((*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY); + let mut is_ref_pkt = + stream_type == AVMEDIA_TYPE_VIDEO && (*pkt_stream).index == self.ref_stream_index; - if can_split { - let pkt_q = av_q2d((*pkt).time_base); - let pkt_time = (*pkt).pts as f32 * pkt_q as f32; - let relative_time = pkt_time - self.pkt_start; + if (*pkt).pts == AV_NOPTS_VALUE { + can_split = false; + is_ref_pkt = false; + } - // FFmpeg checks: pkt->pts - vs->end_pts > 0 to prevent zero duration - // and av_compare_ts for target duration - let has_positive_duration = relative_time > 0.0; - let target_duration_reached = relative_time >= self.segment_length; + // check if current packet is keyframe, flush current segment + if self.packets_written > 0 && can_split { + trace!( + "Segmentation check: pts={}, duration={:.3}, timebase={}/{}, target={:.3}", + (*pkt).pts, + self.duration, + (*pkt).time_base.num, + (*pkt).time_base.den, + self.segment_length + ); - if has_positive_duration && target_duration_reached { - result = self.split_next_seg(pkt_time)?; + if self.duration >= self.segment_length as f64 { + result = self.split_next_seg()?; } } - // Write packet directly like FFmpeg's ff_write_chained + // track duration from pts + if is_ref_pkt { + if self.end_pts == AV_NOPTS_VALUE { + self.end_pts = (*pkt).pts; + } + let pts_diff = (*pkt).pts - self.end_pts; + if pts_diff > 0 { + self.duration += pts_diff as f64 * av_q2d((*pkt).time_base); + } + self.end_pts = (*pkt).pts; + } + self.mux.write_packet(pkt)?; + self.packets_written += 1; Ok(result) } @@ -254,7 +299,7 @@ impl HlsVariant { } /// Reset the muxer state and start the next segment - unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result { + unsafe fn split_next_seg(&mut self) -> Result { let completed_segment_idx = self.idx; self.idx += 1; @@ -282,7 +327,6 @@ impl HlsVariant { 0, ); - let duration = pkt_time - self.pkt_start; // Log the completed segment (previous index), not the next one let completed_seg_path = Self::map_segment_path( &self.out_dir, @@ -296,13 +340,14 @@ impl HlsVariant { .map(|m| m.len()) .unwrap_or(0); info!( - "Finished segment {} [{:.3}s, {} bytes]", + "Finished segment {} [{:.3}s, {:.2} kB, {} pkts]", completed_segment_path .file_name() .unwrap_or_default() .to_string_lossy(), - duration, - segment_size + self.duration, + segment_size as f32 / 1024f32, + self.packets_written ); let video_var_id = self @@ -332,14 +377,17 @@ impl HlsVariant { let created = EgressSegment { variant: video_var_id, idx: completed_segment_idx, - duration, + duration: self.duration as f32, path: completed_segment_path, }; - if let Err(e) = self.push_segment(completed_segment_idx, duration) { + if let Err(e) = self.push_segment(completed_segment_idx, self.duration as f32) { warn!("Failed to update playlist: {}", e); } - self.pkt_start = pkt_time; + + self.packets_written = 0; + self.duration = 0.0; + Ok(EgressResult::Segments { created: vec![created], deleted, @@ -381,6 +429,7 @@ impl HlsVariant { e ); } + info!("Removed segment file: {}", seg_path.display()); ret.push(seg); } } diff --git a/crates/core/src/pipeline/runner.rs b/crates/core/src/pipeline/runner.rs index 644e03b..10cc094 100644 --- a/crates/core/src/pipeline/runner.rs +++ b/crates/core/src/pipeline/runner.rs @@ -21,8 +21,8 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_WEBP; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_frame_free, av_get_sample_fmt, av_packet_free, av_q2d, av_rescale_q, AVFrame, AVMediaType, - AVStream, + av_frame_free, av_get_sample_fmt, av_packet_free, av_rescale_q, AVFrame, AVMediaType, AVStream, + AV_NOPTS_VALUE, }; use ffmpeg_rs_raw::{ cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler, @@ -194,61 +194,54 @@ impl PipelineRunner { // scaling / resampling let mut new_frame = false; - let mut frame = match var { + match var { VariantStream::Video(v) => { - if let Some(s) = self.scalers.get_mut(&v.id()) { + let mut frame = if let Some(s) = self.scalers.get_mut(&v.id()) { new_frame = true; s.process_frame(frame, v.width, v.height, transmute(v.pixel_format))? } else { frame + }; + egress_results.extend(Self::encode_mux_frame( + &mut self.egress, + var, + enc, + frame, + )?); + if new_frame { + av_frame_free(&mut frame); } } VariantStream::Audio(a) => { if let Some((r, f)) = self.resampler.get_mut(&a.id()) { let frame_size = (*enc.codec_context()).frame_size; - new_frame = true; let mut resampled_frame = r.process_frame(frame)?; - if let Some(ret) = f.buffer_frame(resampled_frame, frame_size as usize)? { + f.buffer_frame(resampled_frame)?; + av_frame_free(&mut resampled_frame); + // drain FIFO + while let Some(mut frame) = f.get_frame(frame_size as usize)? { // Set correct timebase for audio (1/sample_rate) - (*ret).time_base.num = 1; - (*ret).time_base.den = a.sample_rate as i32; - av_frame_free(&mut resampled_frame); - ret - } else { - av_frame_free(&mut resampled_frame); - continue; + (*frame).time_base.num = 1; + (*frame).time_base.den = a.sample_rate as i32; + + egress_results.extend(Self::encode_mux_frame( + &mut self.egress, + var, + enc, + frame, + )?); + av_frame_free(&mut frame); } } else { - frame + egress_results.extend(Self::encode_mux_frame( + &mut self.egress, + var, + enc, + frame, + )?); } } - _ => frame, - }; - - // before encoding frame, rescale timestamps - if !frame.is_null() { - let enc_ctx = enc.codec_context(); - (*frame).pict_type = AV_PICTURE_TYPE_NONE; - (*frame).pts = av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base); - (*frame).pkt_dts = - av_rescale_q((*frame).pkt_dts, (*frame).time_base, (*enc_ctx).time_base); - (*frame).duration = - av_rescale_q((*frame).duration, (*frame).time_base, (*enc_ctx).time_base); - (*frame).time_base = (*enc_ctx).time_base; - } - - let packets = enc.encode_frame(frame)?; - // pass new packets to egress - for mut pkt in packets { - for eg in self.egress.iter_mut() { - let er = eg.process_pkt(pkt, &var.id())?; - egress_results.push(er); - } - av_packet_free(&mut pkt); - } - - if new_frame { - av_frame_free(&mut frame); + _ => {} } } @@ -256,6 +249,38 @@ impl PipelineRunner { Ok(egress_results) } + unsafe fn encode_mux_frame( + egress: &mut Vec>, + var: &VariantStream, + encoder: &mut Encoder, + frame: *mut AVFrame, + ) -> Result> { + let mut ret = vec![]; + // before encoding frame, rescale timestamps + if !frame.is_null() { + let enc_ctx = encoder.codec_context(); + (*frame).pict_type = AV_PICTURE_TYPE_NONE; + (*frame).pts = av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base); + (*frame).pkt_dts = + av_rescale_q((*frame).pkt_dts, (*frame).time_base, (*enc_ctx).time_base); + (*frame).duration = + av_rescale_q((*frame).duration, (*frame).time_base, (*enc_ctx).time_base); + (*frame).time_base = (*enc_ctx).time_base; + } + + let packets = encoder.encode_frame(frame)?; + // pass new packets to egress + for mut pkt in packets { + for eg in egress.iter_mut() { + let er = eg.process_pkt(pkt, &var.id())?; + ret.push(er); + } + av_packet_free(&mut pkt); + } + + Ok(ret) + } + /// EOF, cleanup pub unsafe fn flush(&mut self) -> Result<()> { for (var, enc) in &mut self.encoders { @@ -362,6 +387,11 @@ impl PipelineRunner { let mut egress_results = vec![]; for (frame, stream) in frames { + // Adjust frame pts time without start_offset + // Egress streams don't have a start time offset + if (*stream).start_time != AV_NOPTS_VALUE { + (*frame).pts -= (*stream).start_time; + } let results = self.process_frame(&config, stream, frame)?; egress_results.extend(results); } @@ -405,6 +435,8 @@ impl PipelineRunner { let info = self.demuxer.probe_input()?; + info!("{}", info); + // convert to internal type let i_info = IngressInfo { bitrate: info.bitrate, diff --git a/crates/zap-stream/config.yaml b/crates/zap-stream/config.yaml index 7f6f8af..f24b6ef 100755 --- a/crates/zap-stream/config.yaml +++ b/crates/zap-stream/config.yaml @@ -5,7 +5,6 @@ endpoints: - "rtmp://127.0.0.1:3336" - "srt://127.0.0.1:3335" - "tcp://127.0.0.1:3334" - - "test-pattern://" # Public hostname which points to the IP address used to listen for all [endpoints] endpoints_public_hostname: "localhost" diff --git a/crates/zap-stream/src/main.rs b/crates/zap-stream/src/main.rs index ebbe5e6..d3415de 100644 --- a/crates/zap-stream/src/main.rs +++ b/crates/zap-stream/src/main.rs @@ -29,7 +29,6 @@ use crate::monitor::BackgroundMonitor; use crate::overseer::ZapStreamOverseer; use crate::settings::Settings; use zap_stream_core::ingress::{file, tcp}; -use zap_stream_core::overseer::Overseer; mod api; mod blossom; @@ -76,7 +75,12 @@ async fn main() -> Result<()> { // Create shared stream cache let stream_cache: StreamCache = Arc::new(RwLock::new(None)); // HTTP server - let server = HttpServer::new(index_template.to_string(), PathBuf::from(settings.output_dir), api, stream_cache); + let server = HttpServer::new( + index_template.to_string(), + PathBuf::from(settings.output_dir), + api, + stream_cache, + ); tasks.push(tokio::spawn(async move { let listener = TcpListener::bind(&http_addr).await?;