From 047b3fec594fa8ab71b29b0081ca5597f8b64a02 Mon Sep 17 00:00:00 2001 From: Kieran Date: Fri, 13 Jun 2025 12:36:20 +0100 Subject: [PATCH] fix: hls partial sequencing --- Cargo.lock | 2 +- Cargo.toml | 2 +- crates/core/src/mux/hls.rs | 66 ++++++++++++++++++++++-------- crates/core/src/pipeline/runner.rs | 8 ++-- 4 files changed, 55 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c38f299..d9b745d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2153,7 +2153,7 @@ checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" [[package]] name = "m3u8-rs" version = "6.0.0" -source = "git+https://github.com/v0l/m3u8-rs.git?rev=d76ff96326814237a6d5e92288cdfe7060a43168#d76ff96326814237a6d5e92288cdfe7060a43168" +source = "git+https://git.v0l.io/Kieran/m3u8-rs.git?rev=5b7aa0c65994b5ab2780b7ed27d84c03bc32d19f#5b7aa0c65994b5ab2780b7ed27d84c03bc32d19f" dependencies = [ "chrono", "nom", diff --git a/Cargo.toml b/Cargo.toml index 47e3baa..4f76997 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,6 @@ url = "2.5.0" itertools = "0.14.0" chrono = { version = "^0.4.38", features = ["serde"] } hex = "0.4.3" -m3u8-rs = { git = "https://github.com/v0l/m3u8-rs.git", rev = "d76ff96326814237a6d5e92288cdfe7060a43168" } +m3u8-rs = { git = "https://git.v0l.io/Kieran/m3u8-rs.git", rev = "5b7aa0c65994b5ab2780b7ed27d84c03bc32d19f" } sha2 = "0.10.8" data-encoding = "2.9.0" \ No newline at end of file diff --git a/crates/core/src/mux/hls.rs b/crates/core/src/mux/hls.rs index f3e974c..55dfa0f 100644 --- a/crates/core/src/mux/hls.rs +++ b/crates/core/src/mux/hls.rs @@ -10,7 +10,7 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; use itertools::Itertools; use log::{info, trace, warn}; -use m3u8_rs::{ByteRange, MediaSegment, MediaSegmentType, Part, PartInf}; +use m3u8_rs::{ByteRange, MediaSegment, MediaSegmentType, Part, PartInf, PreloadHint}; use std::collections::HashMap; use std::fmt::Display; use std::fs::File; @@ -103,6 +103,8 @@ pub struct HlsVariant { current_partial_index: u64, /// HLS-LL: Current duration in this partial current_partial_duration: f64, + /// HLS-LL: Whether the next partial segment should be marked as independent + next_partial_independent: bool, } #[derive(PartialEq)] @@ -114,8 +116,8 @@ enum HlsSegment { impl HlsSegment { fn to_media_segment(&self) -> MediaSegmentType { match self { - HlsSegment::Full(s) => s.to_media_segment(), - HlsSegment::Partial(s) => s.to_media_segment(), + HlsSegment::Full(f) => f.to_media_segment(), + HlsSegment::Partial(p) => p.to_media_segment(), } } } @@ -168,6 +170,13 @@ impl PartialSegmentInfo { fn filename(&self) -> String { HlsVariant::segment_name(self.parent_kind, self.parent_index) } + + /// Byte offset where this partial segment ends + fn end_pos(&self) -> Option { + self.byte_range + .as_ref() + .map(|(len, start)| start.unwrap_or(0) + len) + } } impl HlsVariant { @@ -271,6 +280,7 @@ impl HlsVariant { partial_target_duration: 0.33, current_partial_index: 0, current_partial_duration: 0.0, + next_partial_independent: false, }) } @@ -311,6 +321,16 @@ impl HlsVariant { is_ref_pkt = false; } + // HLS-LL: write prev partial segment + if self.current_partial_duration >= self.partial_target_duration as f64 { + self.create_partial_segment()?; + + // HLS-LL: Mark next partial as independent if this packet is a keyframe + if can_split { + self.next_partial_independent = true; + } + } + // check if current packet is keyframe, flush current segment if self.packets_written > 1 && can_split && self.duration >= self.segment_length as f64 { result = self.split_next_seg()?; @@ -334,11 +354,6 @@ impl HlsVariant { self.mux.write_packet(pkt)?; self.packets_written += 1; - // HLS-LL: write next partial segment - if is_ref_pkt && self.current_partial_duration >= self.partial_target_duration as f64 { - self.create_partial_segment(can_split)?; - } - Ok(result) } @@ -347,27 +362,30 @@ impl HlsVariant { } /// Create a partial segment for LL-HLS - fn create_partial_segment(&mut self, independent: bool) -> Result<()> { + fn create_partial_segment(&mut self) -> Result<()> { let ctx = self.mux.context(); - let pos = unsafe { + let end_pos = unsafe { avio_flush((*ctx).pb); avio_size((*ctx).pb) as u64 }; - let previous_partial_end = self.segments.last().and_then(|s| match &s { - HlsSegment::Partial(p) => p.byte_range.as_ref().map(|(len, start)| start.unwrap_or(0) + len), - _ => None, - }); + let previous_end_pos = self + .segments + .last() + .and_then(|s| match &s { + HlsSegment::Partial(p) => p.end_pos(), + _ => None, + }) + .unwrap_or(0); + let independent = self.next_partial_independent; + let partial_size = end_pos - previous_end_pos; let partial_info = PartialSegmentInfo { index: self.current_partial_index, parent_index: self.idx, parent_kind: self.segment_type, duration: self.current_partial_duration, independent, - byte_range: match previous_partial_end { - Some(prev_end) => Some((pos - prev_end, Some(prev_end))), - _ => Some((pos, Some(0))), - }, + byte_range: Some((partial_size, Some(previous_end_pos))), }; trace!( @@ -380,6 +398,7 @@ impl HlsVariant { self.segments.push(HlsSegment::Partial(partial_info)); self.current_partial_index += 1; self.current_partial_duration = 0.0; + self.next_partial_independent = false; self.write_playlist()?; @@ -558,6 +577,17 @@ impl HlsVariant { let mut pl = m3u8_rs::MediaPlaylist::default(); pl.target_duration = (self.segment_length.ceil() as u64).max(1); pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect(); + + // append segment preload for next part segment + if let Some(HlsSegment::Partial(partial)) = self.segments.last() { + // TODO: try to estimate if there will be another partial segment + pl.segments.push(MediaSegmentType::PreloadHint(PreloadHint { + hint_type: "PART".to_string(), + uri: partial.filename(), + byte_range_start: partial.end_pos(), + byte_range_length: None, + })); + } pl.version = Some(6); pl.part_inf = Some(PartInf { part_target: self.partial_target_duration as f64, diff --git a/crates/core/src/pipeline/runner.rs b/crates/core/src/pipeline/runner.rs index f1d0973..59da94d 100644 --- a/crates/core/src/pipeline/runner.rs +++ b/crates/core/src/pipeline/runner.rs @@ -208,13 +208,15 @@ impl PipelineRunner { unsafe fn generate_thumb_from_frame(&mut self, frame: *mut AVFrame) -> Result<()> { if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 { let frame = av_frame_clone(frame).addr(); - let dst_pic = PathBuf::from(&self.out_dir) - .join(self.connection.id.to_string()) - .join("thumb.webp"); + let dir = PathBuf::from(&self.out_dir).join(self.connection.id.to_string()); + if !dir.exists() { + std::fs::create_dir_all(&dir)?; + } std::thread::spawn(move || unsafe { let mut frame = frame as *mut AVFrame; //TODO: danger?? let thumb_start = Instant::now(); + let dst_pic = dir.join("thumb.webp"); if let Err(e) = Self::save_thumb(frame, &dst_pic) { warn!("Failed to save thumb: {}", e); }