From e50e789f2eceaadd4f61a4f9c28cb06549cf4bd8 Mon Sep 17 00:00:00 2001 From: Kieran Date: Fri, 6 Jun 2025 17:24:53 +0100 Subject: [PATCH] fix: buffer packets to handle out of order pts fix: always split on keyframes --- crates/core/src/mux/hls.rs | 138 +++++++++++++++++++++++++++++++++---- 1 file changed, 126 insertions(+), 12 deletions(-) diff --git a/crates/core/src/mux/hls.rs b/crates/core/src/mux/hls.rs index 096fefe..1f325cf 100644 --- a/crates/core/src/mux/hls.rs +++ b/crates/core/src/mux/hls.rs @@ -4,14 +4,14 @@ use anyhow::{bail, Result}; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_free, av_opt_set, av_q2d, av_write_frame, avio_flush, avio_open, AVPacket, AVStream, - AVIO_FLAG_WRITE, AV_PKT_FLAG_KEY, + av_free, av_opt_set, av_packet_clone, av_packet_free, av_q2d, av_write_frame, avio_flush, + avio_open, AVPacket, AVStream, AVIO_FLAG_WRITE, AV_PKT_FLAG_KEY, }; use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; use itertools::Itertools; use log::{info, warn}; use m3u8_rs::MediaSegment; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::fmt::Display; use std::fs::File; use std::path::PathBuf; @@ -24,6 +24,47 @@ pub enum SegmentType { FMP4, } +/// A buffered packet that owns its data for reordering +struct BufferedPacket { + /// Owned copy of the AVPacket + packet: *mut AVPacket, + /// PTS value for sorting + pts: i64, + /// Time base for PTS conversion + time_base: ffmpeg_rs_raw::ffmpeg_sys_the_third::AVRational, +} + +impl BufferedPacket { + unsafe fn new(pkt: *mut AVPacket) -> Result { + let cloned_pkt = av_packet_clone(pkt); + if cloned_pkt.is_null() { + bail!("Failed to clone packet"); + } + Ok(Self { + packet: cloned_pkt, + pts: (*pkt).pts, + time_base: (*pkt).time_base, + }) + } + + fn get_time(&self) -> f32 { + unsafe { + let pkt_q = av_q2d(self.time_base); + self.pts as f32 * pkt_q as f32 + } + } +} + +impl Drop for BufferedPacket { + fn drop(&mut self) { + unsafe { + if !self.packet.is_null() { + av_packet_free(&mut self.packet); + } + } + } +} + pub enum HlsVariantStream { Video { group: usize, @@ -91,6 +132,10 @@ pub struct HlsVariant { pub segments: Vec, /// Type of segments to create pub segment_type: SegmentType, + /// Packet queue for reordering out-of-order packets + pub packet_queue: VecDeque, + /// Expected next PTS value for ordering + pub expected_next_pts: Option, } struct SegmentInfo { @@ -189,6 +234,8 @@ impl HlsVariant { segments: Vec::new(), // Start with empty segments list out_dir: out_dir.to_string(), segment_type, + packet_queue: VecDeque::new(), + expected_next_pts: None, }) } @@ -213,12 +260,65 @@ impl HlsVariant { /// Mux a packet created by the encoder for this variant pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result { + const MAX_QUEUE_SIZE: usize = 5; + + let current_pts = (*pkt).pts; + let mut result = EgressResult::None; + + // Check if this is the expected packet (in order) + if let Some(expected_pts) = self.expected_next_pts { + if current_pts == expected_pts { + // Packet is in order, process it directly + let next_pts = current_pts + (*pkt).duration; + self.expected_next_pts = Some(next_pts); + result = self.process_packet(pkt)?; + + // After processing in-order packet, check if any queued packets can now be processed + while !self.packet_queue.is_empty() { + if let Some(next_expected_pts) = self.expected_next_pts { + if let Some(pos) = self + .packet_queue + .iter() + .position(|p| p.pts == next_expected_pts) + { + let buffered = self.packet_queue.remove(pos).unwrap(); + let next_pts = buffered.pts + (*buffered.packet).duration; + self.expected_next_pts = Some(next_pts); + result = self.process_packet(buffered.packet)?; + } else { + break; + } + } else { + break; + } + } + } else { + // Packet is out of order, add to queue + self.packet_queue.push_back(BufferedPacket::new(pkt)?); + + // Check if queue is too long - abort muxing if so + if self.packet_queue.len() > MAX_QUEUE_SIZE { + bail!( + "Packet queue overflow - too many out-of-order packets. Queue size: {}", + self.packet_queue.len() + ); + } + } + } else { + // Initialize expected_next_pts with first packet + let next_pts = current_pts + (*pkt).duration; + self.expected_next_pts = Some(next_pts); + result = self.process_packet(pkt)?; + } + + Ok(result) + } + + /// Process a single packet through the muxer + unsafe fn process_packet(&mut self, pkt: *mut AVPacket) -> Result { let pkt_q = av_q2d((*pkt).time_base); - // time of this packet in seconds let pkt_time = (*pkt).pts as f32 * pkt_q as f32; - // what segment this pkt should be in (index) - use relative time from start let relative_time = pkt_time - self.pkt_start; - let pkt_seg = self.idx + (relative_time / self.segment_length).floor() as u64; let mut result = EgressResult::None; let pkt_stream = *(*self.mux.context()) @@ -226,9 +326,14 @@ impl HlsVariant { .add((*pkt).stream_index as usize); let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY && (*(*pkt_stream).codecpar).codec_type == AVMEDIA_TYPE_VIDEO; - if pkt_seg != self.idx && can_split { + + let min_duration = self.segment_length * 0.5; + let should_split = can_split && relative_time >= min_duration; + + if should_split { result = self.split_next_seg(pkt_time)?; } + self.mux.write_packet(pkt)?; Ok(result) } @@ -265,11 +370,20 @@ impl HlsVariant { ); let duration = pkt_time - self.pkt_start; - let segment_path = PathBuf::from(&next_seg_url); + // Log the completed segment (previous index), not the next one + let completed_seg_path = + Self::map_segment_path(&self.out_dir, &self.name, self.idx - 1, self.segment_type); + let segment_path = PathBuf::from(&completed_seg_path); let segment_size = segment_path.metadata().map(|m| m.len()).unwrap_or(0); - info!("Writing segment {} [{:.3}s, {} bytes]", - segment_path.file_name().unwrap_or_default().to_string_lossy(), - duration, segment_size); + info!( + "Writing segment {} [{:.3}s, {} bytes]", + segment_path + .file_name() + .unwrap_or_default() + .to_string_lossy(), + duration, + segment_size + ); if let Err(e) = self.push_segment(self.idx, duration) { warn!("Failed to update playlist: {}", e); } @@ -365,7 +479,7 @@ impl HlsVariant { if self.segments.is_empty() { return Ok(()); // Don't write empty playlists } - + let mut pl = m3u8_rs::MediaPlaylist::default(); // Round up target duration to ensure compliance pl.target_duration = (self.segment_length.ceil() as u64).max(1);