fix: various buffering / av sync issues
Some checks reported errors
continuous-integration/drone Build was killed

This commit is contained in:
2025-06-09 16:33:46 +01:00
parent 5d7da09801
commit 1c651108ea
7 changed files with 178 additions and 93 deletions

2
Cargo.lock generated
View File

@ -1045,7 +1045,7 @@ dependencies = [
[[package]]
name = "ffmpeg-rs-raw"
version = "0.1.0"
source = "git+https://github.com/v0l/ffmpeg-rs-raw.git?rev=8307b0a225267cefac9c174d5f6a0314a2f0a66b#8307b0a225267cefac9c174d5f6a0314a2f0a66b"
source = "git+https://github.com/v0l/ffmpeg-rs-raw.git?rev=d79693ddb0bee2e94c1db07f789523e87bf1b0fc#d79693ddb0bee2e94c1db07f789523e87bf1b0fc"
dependencies = [
"anyhow",
"ffmpeg-sys-the-third",

View File

@ -13,7 +13,7 @@ codegen-units = 1
panic = "unwind"
[workspace.dependencies]
ffmpeg-rs-raw = { git = "https://github.com/v0l/ffmpeg-rs-raw.git", rev = "8307b0a225267cefac9c174d5f6a0314a2f0a66b" }
ffmpeg-rs-raw = { git = "https://github.com/v0l/ffmpeg-rs-raw.git", rev = "d79693ddb0bee2e94c1db07f789523e87bf1b0fc" }
tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] }
anyhow = { version = "^1.0.91", features = ["backtrace"] }
async-trait = "0.1.77"

View File

@ -15,9 +15,10 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
ip_addr: ip.to_string(),
endpoint: addr.clone(),
app_name: "".to_string(),
key: "no-key-tcp".to_string(),
key: "test".to_string(),
};
let socket = socket.into_std()?;
socket.set_nonblocking(false)?;
spawn_pipeline(
Handle::current(),
info,

View File

@ -1,15 +1,15 @@
use crate::egress::{EgressResult, EgressSegment};
use crate::variant::{StreamMapping, VariantStream};
use anyhow::{bail, Result};
use anyhow::{bail, ensure, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_free, av_opt_set, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, AVPacket,
AVStream, AVIO_FLAG_WRITE, AV_PKT_FLAG_KEY,
AVStream, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY,
};
use ffmpeg_rs_raw::{cstr, Encoder, Muxer};
use itertools::Itertools;
use log::{info, warn};
use log::{info, trace, warn};
use m3u8_rs::MediaSegment;
use std::collections::HashMap;
use std::fmt::Display;
@ -72,31 +72,37 @@ impl Display for HlsVariantStream {
pub struct HlsVariant {
/// Name of this variant (720p)
pub name: String,
name: String,
/// MPEG-TS muxer for this variant
pub mux: Muxer,
mux: Muxer,
/// List of streams ids in this variant
pub streams: Vec<HlsVariantStream>,
streams: Vec<HlsVariantStream>,
/// Segment length in seconds
pub segment_length: f32,
segment_length: f32,
/// Total number of segments to store for this variant
pub segment_window: Option<u16>,
segment_window: Option<u16>,
/// Current segment index
pub idx: u64,
/// Current segment start time in seconds (duration)
pub pkt_start: f32,
idx: u64,
/// Output directory (base)
pub out_dir: String,
out_dir: String,
/// List of segments to be included in the playlist
pub segments: Vec<SegmentInfo>,
segments: Vec<SegmentInfo>,
/// Type of segments to create
pub segment_type: SegmentType,
segment_type: SegmentType,
/// Ending presentation timestamp
end_pts: i64,
/// Current segment duration in seconds (precise accumulation)
duration: f64,
/// Number of packets written to current segment
packets_written: u64,
/// Reference stream used to track duration
ref_stream_index: i32,
}
struct SegmentInfo {
pub index: u64,
pub duration: f32,
pub kind: SegmentType,
index: u64,
duration: f32,
kind: SegmentType,
}
impl SegmentInfo {
@ -146,23 +152,35 @@ impl HlsVariant {
.build()?
};
let mut streams = Vec::new();
let mut ref_stream_index = -1;
let mut has_video = false;
for (var, enc) in encoded_vars {
match var {
VariantStream::Video(v) => unsafe {
let stream = mux.add_stream_encoder(enc)?;
let stream_idx = (*stream).index as usize;
streams.push(HlsVariantStream::Video {
group,
index: (*stream).index as usize,
index: stream_idx,
id: v.id(),
})
});
has_video = true;
if ref_stream_index == -1 {
ref_stream_index = stream_idx as _;
}
},
VariantStream::Audio(a) => unsafe {
let stream = mux.add_stream_encoder(enc)?;
let stream_idx = (*stream).index as usize;
streams.push(HlsVariantStream::Audio {
group,
index: (*stream).index as usize,
index: stream_idx,
id: a.id(),
})
});
if !has_video && ref_stream_index == -1 {
ref_stream_index = stream_idx as _;
}
},
VariantStream::Subtitle(s) => unsafe {
let stream = mux.add_stream_encoder(enc)?;
@ -175,6 +193,10 @@ impl HlsVariant {
_ => bail!("unsupported variant stream"),
}
}
ensure!(
ref_stream_index != -1,
"No reference stream found, cant create variant"
);
unsafe {
mux.open(Some(opts))?;
}
@ -185,10 +207,13 @@ impl HlsVariant {
mux,
streams,
idx: 1,
pkt_start: 0.0,
segments: Vec::new(), // Start with empty segments list
out_dir: out_dir.to_string(),
segment_type,
end_pts: AV_NOPTS_VALUE,
duration: 0.0,
packets_written: 0,
ref_stream_index,
})
}
@ -218,34 +243,54 @@ impl HlsVariant {
self.process_packet(pkt)
}
/// Process a single packet through the muxer
/// Process a single packet through the muxer - FFmpeg-style implementation
unsafe fn process_packet(&mut self, pkt: *mut AVPacket) -> Result<EgressResult> {
let mut result = EgressResult::None;
let pkt_stream = *(*self.mux.context())
.streams
.add((*pkt).stream_index as usize);
// Match FFmpeg's segmentation logic exactly
let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY
&& (*(*pkt_stream).codecpar).codec_type == AVMEDIA_TYPE_VIDEO;
let mut result = EgressResult::None;
let stream_type = (*(*pkt_stream).codecpar).codec_type;
let mut can_split = stream_type == AVMEDIA_TYPE_VIDEO
&& ((*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY);
let mut is_ref_pkt =
stream_type == AVMEDIA_TYPE_VIDEO && (*pkt_stream).index == self.ref_stream_index;
if can_split {
let pkt_q = av_q2d((*pkt).time_base);
let pkt_time = (*pkt).pts as f32 * pkt_q as f32;
let relative_time = pkt_time - self.pkt_start;
if (*pkt).pts == AV_NOPTS_VALUE {
can_split = false;
is_ref_pkt = false;
}
// FFmpeg checks: pkt->pts - vs->end_pts > 0 to prevent zero duration
// and av_compare_ts for target duration
let has_positive_duration = relative_time > 0.0;
let target_duration_reached = relative_time >= self.segment_length;
// check if current packet is keyframe, flush current segment
if self.packets_written > 0 && can_split {
trace!(
"Segmentation check: pts={}, duration={:.3}, timebase={}/{}, target={:.3}",
(*pkt).pts,
self.duration,
(*pkt).time_base.num,
(*pkt).time_base.den,
self.segment_length
);
if has_positive_duration && target_duration_reached {
result = self.split_next_seg(pkt_time)?;
if self.duration >= self.segment_length as f64 {
result = self.split_next_seg()?;
}
}
// Write packet directly like FFmpeg's ff_write_chained
// track duration from pts
if is_ref_pkt {
if self.end_pts == AV_NOPTS_VALUE {
self.end_pts = (*pkt).pts;
}
let pts_diff = (*pkt).pts - self.end_pts;
if pts_diff > 0 {
self.duration += pts_diff as f64 * av_q2d((*pkt).time_base);
}
self.end_pts = (*pkt).pts;
}
self.mux.write_packet(pkt)?;
self.packets_written += 1;
Ok(result)
}
@ -254,7 +299,7 @@ impl HlsVariant {
}
/// Reset the muxer state and start the next segment
unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result<EgressResult> {
unsafe fn split_next_seg(&mut self) -> Result<EgressResult> {
let completed_segment_idx = self.idx;
self.idx += 1;
@ -282,7 +327,6 @@ impl HlsVariant {
0,
);
let duration = pkt_time - self.pkt_start;
// Log the completed segment (previous index), not the next one
let completed_seg_path = Self::map_segment_path(
&self.out_dir,
@ -296,13 +340,14 @@ impl HlsVariant {
.map(|m| m.len())
.unwrap_or(0);
info!(
"Finished segment {} [{:.3}s, {} bytes]",
"Finished segment {} [{:.3}s, {:.2} kB, {} pkts]",
completed_segment_path
.file_name()
.unwrap_or_default()
.to_string_lossy(),
duration,
segment_size
self.duration,
segment_size as f32 / 1024f32,
self.packets_written
);
let video_var_id = self
@ -332,14 +377,17 @@ impl HlsVariant {
let created = EgressSegment {
variant: video_var_id,
idx: completed_segment_idx,
duration,
duration: self.duration as f32,
path: completed_segment_path,
};
if let Err(e) = self.push_segment(completed_segment_idx, duration) {
if let Err(e) = self.push_segment(completed_segment_idx, self.duration as f32) {
warn!("Failed to update playlist: {}", e);
}
self.pkt_start = pkt_time;
self.packets_written = 0;
self.duration = 0.0;
Ok(EgressResult::Segments {
created: vec![created],
deleted,
@ -381,6 +429,7 @@ impl HlsVariant {
e
);
}
info!("Removed segment file: {}", seg_path.display());
ret.push(seg);
}
}

View File

@ -21,8 +21,8 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_WEBP;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_frame_free, av_get_sample_fmt, av_packet_free, av_q2d, av_rescale_q, AVFrame, AVMediaType,
AVStream,
av_frame_free, av_get_sample_fmt, av_packet_free, av_rescale_q, AVFrame, AVMediaType, AVStream,
AV_NOPTS_VALUE,
};
use ffmpeg_rs_raw::{
cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler,
@ -194,61 +194,54 @@ impl PipelineRunner {
// scaling / resampling
let mut new_frame = false;
let mut frame = match var {
match var {
VariantStream::Video(v) => {
if let Some(s) = self.scalers.get_mut(&v.id()) {
let mut frame = if let Some(s) = self.scalers.get_mut(&v.id()) {
new_frame = true;
s.process_frame(frame, v.width, v.height, transmute(v.pixel_format))?
} else {
frame
};
egress_results.extend(Self::encode_mux_frame(
&mut self.egress,
var,
enc,
frame,
)?);
if new_frame {
av_frame_free(&mut frame);
}
}
VariantStream::Audio(a) => {
if let Some((r, f)) = self.resampler.get_mut(&a.id()) {
let frame_size = (*enc.codec_context()).frame_size;
new_frame = true;
let mut resampled_frame = r.process_frame(frame)?;
if let Some(ret) = f.buffer_frame(resampled_frame, frame_size as usize)? {
f.buffer_frame(resampled_frame)?;
av_frame_free(&mut resampled_frame);
// drain FIFO
while let Some(mut frame) = f.get_frame(frame_size as usize)? {
// Set correct timebase for audio (1/sample_rate)
(*ret).time_base.num = 1;
(*ret).time_base.den = a.sample_rate as i32;
av_frame_free(&mut resampled_frame);
ret
} else {
av_frame_free(&mut resampled_frame);
continue;
(*frame).time_base.num = 1;
(*frame).time_base.den = a.sample_rate as i32;
egress_results.extend(Self::encode_mux_frame(
&mut self.egress,
var,
enc,
frame,
)?);
av_frame_free(&mut frame);
}
} else {
frame
egress_results.extend(Self::encode_mux_frame(
&mut self.egress,
var,
enc,
frame,
)?);
}
}
_ => frame,
};
// before encoding frame, rescale timestamps
if !frame.is_null() {
let enc_ctx = enc.codec_context();
(*frame).pict_type = AV_PICTURE_TYPE_NONE;
(*frame).pts = av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base);
(*frame).pkt_dts =
av_rescale_q((*frame).pkt_dts, (*frame).time_base, (*enc_ctx).time_base);
(*frame).duration =
av_rescale_q((*frame).duration, (*frame).time_base, (*enc_ctx).time_base);
(*frame).time_base = (*enc_ctx).time_base;
}
let packets = enc.encode_frame(frame)?;
// pass new packets to egress
for mut pkt in packets {
for eg in self.egress.iter_mut() {
let er = eg.process_pkt(pkt, &var.id())?;
egress_results.push(er);
}
av_packet_free(&mut pkt);
}
if new_frame {
av_frame_free(&mut frame);
_ => {}
}
}
@ -256,6 +249,38 @@ impl PipelineRunner {
Ok(egress_results)
}
unsafe fn encode_mux_frame(
egress: &mut Vec<Box<dyn Egress>>,
var: &VariantStream,
encoder: &mut Encoder,
frame: *mut AVFrame,
) -> Result<Vec<EgressResult>> {
let mut ret = vec![];
// before encoding frame, rescale timestamps
if !frame.is_null() {
let enc_ctx = encoder.codec_context();
(*frame).pict_type = AV_PICTURE_TYPE_NONE;
(*frame).pts = av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base);
(*frame).pkt_dts =
av_rescale_q((*frame).pkt_dts, (*frame).time_base, (*enc_ctx).time_base);
(*frame).duration =
av_rescale_q((*frame).duration, (*frame).time_base, (*enc_ctx).time_base);
(*frame).time_base = (*enc_ctx).time_base;
}
let packets = encoder.encode_frame(frame)?;
// pass new packets to egress
for mut pkt in packets {
for eg in egress.iter_mut() {
let er = eg.process_pkt(pkt, &var.id())?;
ret.push(er);
}
av_packet_free(&mut pkt);
}
Ok(ret)
}
/// EOF, cleanup
pub unsafe fn flush(&mut self) -> Result<()> {
for (var, enc) in &mut self.encoders {
@ -362,6 +387,11 @@ impl PipelineRunner {
let mut egress_results = vec![];
for (frame, stream) in frames {
// Adjust frame pts time without start_offset
// Egress streams don't have a start time offset
if (*stream).start_time != AV_NOPTS_VALUE {
(*frame).pts -= (*stream).start_time;
}
let results = self.process_frame(&config, stream, frame)?;
egress_results.extend(results);
}
@ -405,6 +435,8 @@ impl PipelineRunner {
let info = self.demuxer.probe_input()?;
info!("{}", info);
// convert to internal type
let i_info = IngressInfo {
bitrate: info.bitrate,

View File

@ -5,7 +5,6 @@ endpoints:
- "rtmp://127.0.0.1:3336"
- "srt://127.0.0.1:3335"
- "tcp://127.0.0.1:3334"
- "test-pattern://"
# Public hostname which points to the IP address used to listen for all [endpoints]
endpoints_public_hostname: "localhost"

View File

@ -29,7 +29,6 @@ use crate::monitor::BackgroundMonitor;
use crate::overseer::ZapStreamOverseer;
use crate::settings::Settings;
use zap_stream_core::ingress::{file, tcp};
use zap_stream_core::overseer::Overseer;
mod api;
mod blossom;
@ -76,7 +75,12 @@ async fn main() -> Result<()> {
// Create shared stream cache
let stream_cache: StreamCache = Arc::new(RwLock::new(None));
// HTTP server
let server = HttpServer::new(index_template.to_string(), PathBuf::from(settings.output_dir), api, stream_cache);
let server = HttpServer::new(
index_template.to_string(),
PathBuf::from(settings.output_dir),
api,
stream_cache,
);
tasks.push(tokio::spawn(async move {
let listener = TcpListener::bind(&http_addr).await?;