fix: buffer packets to handle out of order pts

fix: always split on keyframes
This commit is contained in:
2025-06-06 17:24:53 +01:00
parent 338533e261
commit e50e789f2e

View File

@ -4,14 +4,14 @@ use anyhow::{bail, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_free, av_opt_set, av_q2d, av_write_frame, avio_flush, avio_open, AVPacket, AVStream, av_free, av_opt_set, av_packet_clone, av_packet_free, av_q2d, av_write_frame, avio_flush,
AVIO_FLAG_WRITE, AV_PKT_FLAG_KEY, avio_open, AVPacket, AVStream, AVIO_FLAG_WRITE, AV_PKT_FLAG_KEY,
}; };
use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; use ffmpeg_rs_raw::{cstr, Encoder, Muxer};
use itertools::Itertools; use itertools::Itertools;
use log::{info, warn}; use log::{info, warn};
use m3u8_rs::MediaSegment; use m3u8_rs::MediaSegment;
use std::collections::HashMap; use std::collections::{HashMap, VecDeque};
use std::fmt::Display; use std::fmt::Display;
use std::fs::File; use std::fs::File;
use std::path::PathBuf; use std::path::PathBuf;
@ -24,6 +24,47 @@ pub enum SegmentType {
FMP4, FMP4,
} }
/// A buffered packet that owns its data for reordering
struct BufferedPacket {
/// Owned copy of the AVPacket
packet: *mut AVPacket,
/// PTS value for sorting
pts: i64,
/// Time base for PTS conversion
time_base: ffmpeg_rs_raw::ffmpeg_sys_the_third::AVRational,
}
impl BufferedPacket {
unsafe fn new(pkt: *mut AVPacket) -> Result<Self> {
let cloned_pkt = av_packet_clone(pkt);
if cloned_pkt.is_null() {
bail!("Failed to clone packet");
}
Ok(Self {
packet: cloned_pkt,
pts: (*pkt).pts,
time_base: (*pkt).time_base,
})
}
fn get_time(&self) -> f32 {
unsafe {
let pkt_q = av_q2d(self.time_base);
self.pts as f32 * pkt_q as f32
}
}
}
impl Drop for BufferedPacket {
fn drop(&mut self) {
unsafe {
if !self.packet.is_null() {
av_packet_free(&mut self.packet);
}
}
}
}
pub enum HlsVariantStream { pub enum HlsVariantStream {
Video { Video {
group: usize, group: usize,
@ -91,6 +132,10 @@ pub struct HlsVariant {
pub segments: Vec<SegmentInfo>, pub segments: Vec<SegmentInfo>,
/// Type of segments to create /// Type of segments to create
pub segment_type: SegmentType, pub segment_type: SegmentType,
/// Packet queue for reordering out-of-order packets
pub packet_queue: VecDeque<BufferedPacket>,
/// Expected next PTS value for ordering
pub expected_next_pts: Option<i64>,
} }
struct SegmentInfo { struct SegmentInfo {
@ -189,6 +234,8 @@ impl HlsVariant {
segments: Vec::new(), // Start with empty segments list segments: Vec::new(), // Start with empty segments list
out_dir: out_dir.to_string(), out_dir: out_dir.to_string(),
segment_type, segment_type,
packet_queue: VecDeque::new(),
expected_next_pts: None,
}) })
} }
@ -213,12 +260,65 @@ impl HlsVariant {
/// Mux a packet created by the encoder for this variant /// Mux a packet created by the encoder for this variant
pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result<EgressResult> { pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result<EgressResult> {
const MAX_QUEUE_SIZE: usize = 5;
let current_pts = (*pkt).pts;
let mut result = EgressResult::None;
// Check if this is the expected packet (in order)
if let Some(expected_pts) = self.expected_next_pts {
if current_pts == expected_pts {
// Packet is in order, process it directly
let next_pts = current_pts + (*pkt).duration;
self.expected_next_pts = Some(next_pts);
result = self.process_packet(pkt)?;
// After processing in-order packet, check if any queued packets can now be processed
while !self.packet_queue.is_empty() {
if let Some(next_expected_pts) = self.expected_next_pts {
if let Some(pos) = self
.packet_queue
.iter()
.position(|p| p.pts == next_expected_pts)
{
let buffered = self.packet_queue.remove(pos).unwrap();
let next_pts = buffered.pts + (*buffered.packet).duration;
self.expected_next_pts = Some(next_pts);
result = self.process_packet(buffered.packet)?;
} else {
break;
}
} else {
break;
}
}
} else {
// Packet is out of order, add to queue
self.packet_queue.push_back(BufferedPacket::new(pkt)?);
// Check if queue is too long - abort muxing if so
if self.packet_queue.len() > MAX_QUEUE_SIZE {
bail!(
"Packet queue overflow - too many out-of-order packets. Queue size: {}",
self.packet_queue.len()
);
}
}
} else {
// Initialize expected_next_pts with first packet
let next_pts = current_pts + (*pkt).duration;
self.expected_next_pts = Some(next_pts);
result = self.process_packet(pkt)?;
}
Ok(result)
}
/// Process a single packet through the muxer
unsafe fn process_packet(&mut self, pkt: *mut AVPacket) -> Result<EgressResult> {
let pkt_q = av_q2d((*pkt).time_base); let pkt_q = av_q2d((*pkt).time_base);
// time of this packet in seconds
let pkt_time = (*pkt).pts as f32 * pkt_q as f32; let pkt_time = (*pkt).pts as f32 * pkt_q as f32;
// what segment this pkt should be in (index) - use relative time from start
let relative_time = pkt_time - self.pkt_start; let relative_time = pkt_time - self.pkt_start;
let pkt_seg = self.idx + (relative_time / self.segment_length).floor() as u64;
let mut result = EgressResult::None; let mut result = EgressResult::None;
let pkt_stream = *(*self.mux.context()) let pkt_stream = *(*self.mux.context())
@ -226,9 +326,14 @@ impl HlsVariant {
.add((*pkt).stream_index as usize); .add((*pkt).stream_index as usize);
let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY
&& (*(*pkt_stream).codecpar).codec_type == AVMEDIA_TYPE_VIDEO; && (*(*pkt_stream).codecpar).codec_type == AVMEDIA_TYPE_VIDEO;
if pkt_seg != self.idx && can_split {
let min_duration = self.segment_length * 0.5;
let should_split = can_split && relative_time >= min_duration;
if should_split {
result = self.split_next_seg(pkt_time)?; result = self.split_next_seg(pkt_time)?;
} }
self.mux.write_packet(pkt)?; self.mux.write_packet(pkt)?;
Ok(result) Ok(result)
} }
@ -265,11 +370,20 @@ impl HlsVariant {
); );
let duration = pkt_time - self.pkt_start; let duration = pkt_time - self.pkt_start;
let segment_path = PathBuf::from(&next_seg_url); // Log the completed segment (previous index), not the next one
let completed_seg_path =
Self::map_segment_path(&self.out_dir, &self.name, self.idx - 1, self.segment_type);
let segment_path = PathBuf::from(&completed_seg_path);
let segment_size = segment_path.metadata().map(|m| m.len()).unwrap_or(0); let segment_size = segment_path.metadata().map(|m| m.len()).unwrap_or(0);
info!("Writing segment {} [{:.3}s, {} bytes]", info!(
segment_path.file_name().unwrap_or_default().to_string_lossy(), "Writing segment {} [{:.3}s, {} bytes]",
duration, segment_size); segment_path
.file_name()
.unwrap_or_default()
.to_string_lossy(),
duration,
segment_size
);
if let Err(e) = self.push_segment(self.idx, duration) { if let Err(e) = self.push_segment(self.idx, duration) {
warn!("Failed to update playlist: {}", e); warn!("Failed to update playlist: {}", e);
} }
@ -365,7 +479,7 @@ impl HlsVariant {
if self.segments.is_empty() { if self.segments.is_empty() {
return Ok(()); // Don't write empty playlists return Ok(()); // Don't write empty playlists
} }
let mut pl = m3u8_rs::MediaPlaylist::default(); let mut pl = m3u8_rs::MediaPlaylist::default();
// Round up target duration to ensure compliance // Round up target duration to ensure compliance
pl.target_duration = (self.segment_length.ceil() as u64).max(1); pl.target_duration = (self.segment_length.ceil() as u64).max(1);