fix: revert packet buffer, match ffmpeg hlsenc.c

This commit is contained in:
2025-06-06 18:17:57 +01:00
parent 86041ebed1
commit 29919c63f5

View File

@ -4,14 +4,14 @@ use anyhow::{bail, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_free, av_opt_set, av_packet_clone, av_packet_free, av_q2d, av_write_frame, avio_close,
av_free, av_opt_set, av_q2d, av_write_frame, avio_close,
avio_flush, avio_open, AVPacket, AVStream, AVIO_FLAG_WRITE, AV_PKT_FLAG_KEY,
};
use ffmpeg_rs_raw::{cstr, Encoder, Muxer};
use itertools::Itertools;
use log::{info, warn};
use m3u8_rs::MediaSegment;
use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use std::fmt::Display;
use std::fs::File;
use std::path::PathBuf;
@ -24,46 +24,6 @@ pub enum SegmentType {
FMP4,
}
/// A buffered packet that owns its data for reordering
struct BufferedPacket {
/// Owned copy of the AVPacket
packet: *mut AVPacket,
/// PTS value for sorting
pts: i64,
/// Time base for PTS conversion
time_base: ffmpeg_rs_raw::ffmpeg_sys_the_third::AVRational,
}
impl BufferedPacket {
unsafe fn new(pkt: *mut AVPacket) -> Result<Self> {
let cloned_pkt = av_packet_clone(pkt);
if cloned_pkt.is_null() {
bail!("Failed to clone packet");
}
Ok(Self {
packet: cloned_pkt,
pts: (*pkt).pts,
time_base: (*pkt).time_base,
})
}
fn get_time(&self) -> f32 {
unsafe {
let pkt_q = av_q2d(self.time_base);
self.pts as f32 * pkt_q as f32
}
}
}
impl Drop for BufferedPacket {
fn drop(&mut self) {
unsafe {
if !self.packet.is_null() {
av_packet_free(&mut self.packet);
}
}
}
}
pub enum HlsVariantStream {
Video {
@ -132,10 +92,6 @@ pub struct HlsVariant {
pub segments: Vec<SegmentInfo>,
/// Type of segments to create
pub segment_type: SegmentType,
/// Packet queue for reordering out-of-order packets
pub packet_queue: VecDeque<BufferedPacket>,
/// Expected next PTS value for ordering
pub expected_next_pts: Option<i64>,
}
struct SegmentInfo {
@ -234,8 +190,6 @@ impl HlsVariant {
segments: Vec::new(), // Start with empty segments list
out_dir: out_dir.to_string(),
segment_type,
packet_queue: VecDeque::new(),
expected_next_pts: None,
})
}
@ -260,80 +214,38 @@ impl HlsVariant {
/// Mux a packet created by the encoder for this variant
pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result<EgressResult> {
const MAX_QUEUE_SIZE: usize = 5;
let current_pts = (*pkt).pts;
let mut result = EgressResult::None;
// Check if this is the expected packet (in order)
if let Some(expected_pts) = self.expected_next_pts {
if current_pts == expected_pts {
// Packet is in order, process it directly
let next_pts = current_pts + (*pkt).duration;
self.expected_next_pts = Some(next_pts);
result = self.process_packet(pkt)?;
// After processing in-order packet, check if any queued packets can now be processed
while !self.packet_queue.is_empty() {
if let Some(next_expected_pts) = self.expected_next_pts {
if let Some(pos) = self
.packet_queue
.iter()
.position(|p| p.pts == next_expected_pts)
{
let buffered = self.packet_queue.remove(pos).unwrap();
let next_pts = buffered.pts + (*buffered.packet).duration;
self.expected_next_pts = Some(next_pts);
result = self.process_packet(buffered.packet)?;
} else {
break;
}
} else {
break;
}
}
} else {
// Packet is out of order, add to queue
self.packet_queue.push_back(BufferedPacket::new(pkt)?);
// Check if queue is too long - abort muxing if so
if self.packet_queue.len() > MAX_QUEUE_SIZE {
bail!(
"Packet queue overflow - too many out-of-order packets. Queue size: {}",
self.packet_queue.len()
);
}
}
} else {
// Initialize expected_next_pts with first packet
let next_pts = current_pts + (*pkt).duration;
self.expected_next_pts = Some(next_pts);
result = self.process_packet(pkt)?;
}
Ok(result)
// Simply process the packet directly - no reordering needed
// FFmpeg's interleaving system should handle packet ordering upstream
self.process_packet(pkt)
}
/// Process a single packet through the muxer
unsafe fn process_packet(&mut self, pkt: *mut AVPacket) -> Result<EgressResult> {
let pkt_q = av_q2d((*pkt).time_base);
let pkt_time = (*pkt).pts as f32 * pkt_q as f32;
let relative_time = pkt_time - self.pkt_start;
let mut result = EgressResult::None;
let pkt_stream = *(*self.mux.context())
.streams
.add((*pkt).stream_index as usize);
// Match FFmpeg's segmentation logic exactly
let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY
&& (*(*pkt_stream).codecpar).codec_type == AVMEDIA_TYPE_VIDEO;
let min_duration = self.segment_length * 0.5;
let should_split = can_split && relative_time >= min_duration;
if should_split {
result = self.split_next_seg(pkt_time)?;
if can_split {
let pkt_q = av_q2d((*pkt).time_base);
let pkt_time = (*pkt).pts as f32 * pkt_q as f32;
let relative_time = pkt_time - self.pkt_start;
// FFmpeg checks: pkt->pts - vs->end_pts > 0 to prevent zero duration
// and av_compare_ts for target duration
let has_positive_duration = relative_time > 0.0;
let target_duration_reached = relative_time >= self.segment_length;
if has_positive_duration && target_duration_reached {
result = self.split_next_seg(pkt_time)?;
}
}
// Write packet directly like FFmpeg's ff_write_chained
self.mux.write_packet(pkt)?;
Ok(result)
}