From 5c2a58ed468998e791a3c4a07f7d06159a670209 Mon Sep 17 00:00:00 2001 From: Kieran Date: Thu, 19 Jun 2025 12:18:44 +0100 Subject: [PATCH] refactor: split hls module --- crates/core/src/mux/hls/mod.rs | 139 ++++++++++ crates/core/src/mux/hls/segment.rs | 75 +++++ .../core/src/mux/{hls.rs => hls/variant.rs} | 261 +++--------------- crates/core/src/mux/mod.rs | 1 + 4 files changed, 249 insertions(+), 227 deletions(-) create mode 100644 crates/core/src/mux/hls/mod.rs create mode 100644 crates/core/src/mux/hls/segment.rs rename crates/core/src/mux/{hls.rs => hls/variant.rs} (77%) diff --git a/crates/core/src/mux/hls/mod.rs b/crates/core/src/mux/hls/mod.rs new file mode 100644 index 0000000..5f88cbc --- /dev/null +++ b/crates/core/src/mux/hls/mod.rs @@ -0,0 +1,139 @@ +use crate::egress::EgressResult; +use crate::mux::hls::variant::HlsVariant; +use crate::variant::{StreamMapping, VariantStream}; +use anyhow::Result; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket; +use ffmpeg_rs_raw::Encoder; +use itertools::Itertools; +use log::trace; +use std::fmt::Display; +use std::fs::File; +use std::path::PathBuf; +use uuid::Uuid; + +mod segment; +mod variant; + +pub enum HlsVariantStream { + Video { + group: usize, + index: usize, + id: Uuid, + }, + Audio { + group: usize, + index: usize, + id: Uuid, + }, + Subtitle { + group: usize, + index: usize, + id: Uuid, + }, +} + +impl HlsVariantStream { + pub fn id(&self) -> &Uuid { + match self { + HlsVariantStream::Video { id, .. } => id, + HlsVariantStream::Audio { id, .. } => id, + HlsVariantStream::Subtitle { id, .. } => id, + } + } + + pub fn index(&self) -> &usize { + match self { + HlsVariantStream::Video { index, .. } => index, + HlsVariantStream::Audio { index, .. } => index, + HlsVariantStream::Subtitle { index, .. } => index, + } + } +} + +impl Display for HlsVariantStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + HlsVariantStream::Video { index, .. } => write!(f, "v:{}", index), + HlsVariantStream::Audio { index, .. } => write!(f, "a:{}", index), + HlsVariantStream::Subtitle { index, .. } => write!(f, "s:{}", index), + } + } +} + +#[derive(Clone, Copy, PartialEq)] +pub enum SegmentType { + MPEGTS, + FMP4, +} + +pub struct HlsMuxer { + pub out_dir: PathBuf, + pub variants: Vec, +} + +impl HlsMuxer { + pub fn new<'a>( + id: &Uuid, + out_dir: &str, + encoders: impl Iterator, + segment_type: SegmentType, + ) -> Result { + let base = PathBuf::from(out_dir).join(id.to_string()); + + if !base.exists() { + std::fs::create_dir_all(&base)?; + } + let mut vars = Vec::new(); + for (k, group) in &encoders + .sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id())) + .chunk_by(|a| a.0.group_id()) + { + let var = HlsVariant::new(base.to_str().unwrap(), k, group, segment_type)?; + vars.push(var); + } + + let ret = Self { + out_dir: base, + variants: vars, + }; + ret.write_master_playlist()?; + Ok(ret) + } + + fn write_master_playlist(&self) -> Result<()> { + let mut pl = m3u8_rs::MasterPlaylist::default(); + pl.version = Some(3); + pl.variants = self + .variants + .iter() + .map(|v| v.to_playlist_variant()) + .collect(); + + let mut f_out = File::create(self.out_dir.join("live.m3u8"))?; + pl.write_to(&mut f_out)?; + Ok(()) + } + + /// Mux an encoded packet from [Encoder] + pub unsafe fn mux_packet( + &mut self, + pkt: *mut AVPacket, + variant: &Uuid, + ) -> Result { + for var in self.variants.iter_mut() { + if let Some(vs) = var.streams.iter().find(|s| s.id() == variant) { + // very important for muxer to know which stream this pkt belongs to + (*pkt).stream_index = *vs.index() as _; + return var.process_packet(pkt); + } + } + + // This HLS muxer doesn't handle this variant, return None instead of failing + // This can happen when multiple egress handlers are configured with different variant sets + trace!( + "HLS muxer received packet for variant {} which it doesn't handle", + variant + ); + Ok(EgressResult::None) + } +} diff --git a/crates/core/src/mux/hls/segment.rs b/crates/core/src/mux/hls/segment.rs new file mode 100644 index 0000000..1b738c4 --- /dev/null +++ b/crates/core/src/mux/hls/segment.rs @@ -0,0 +1,75 @@ +use crate::mux::hls::variant::HlsVariant; +use crate::mux::SegmentType; +use m3u8_rs::{ByteRange, MediaSegment, MediaSegmentType, Part}; + +#[derive(PartialEq)] +pub enum HlsSegment { + Full(SegmentInfo), + Partial(PartialSegmentInfo), +} + +impl HlsSegment { + pub fn to_media_segment(&self) -> MediaSegmentType { + match self { + HlsSegment::Full(f) => f.to_media_segment(), + HlsSegment::Partial(p) => p.to_media_segment(), + } + } +} + +#[derive(PartialEq)] +pub struct SegmentInfo { + pub index: u64, + pub duration: f32, + pub kind: SegmentType, +} + +impl SegmentInfo { + pub fn to_media_segment(&self) -> MediaSegmentType { + MediaSegmentType::Full(MediaSegment { + uri: self.filename(), + duration: self.duration, + ..MediaSegment::default() + }) + } + + pub fn filename(&self) -> String { + HlsVariant::segment_name(self.kind, self.index) + } +} + +#[derive(PartialEq)] +pub struct PartialSegmentInfo { + pub index: u64, + pub parent_index: u64, + pub parent_kind: SegmentType, + pub duration: f64, + pub independent: bool, + pub byte_range: Option<(u64, Option)>, +} + +impl PartialSegmentInfo { + pub fn to_media_segment(&self) -> MediaSegmentType { + MediaSegmentType::Partial(Part { + uri: self.filename(), + duration: self.duration, + independent: self.independent, + gap: false, + byte_range: self.byte_range.map(|r| ByteRange { + length: r.0, + offset: r.1, + }), + }) + } + + pub fn filename(&self) -> String { + HlsVariant::segment_name(self.parent_kind, self.parent_index) + } + + /// Byte offset where this partial segment ends + pub fn end_pos(&self) -> Option { + self.byte_range + .as_ref() + .map(|(len, start)| start.unwrap_or(0) + len) + } +} diff --git a/crates/core/src/mux/hls.rs b/crates/core/src/mux/hls/variant.rs similarity index 77% rename from crates/core/src/mux/hls.rs rename to crates/core/src/mux/hls/variant.rs index 7269218..736923e 100644 --- a/crates/core/src/mux/hls.rs +++ b/crates/core/src/mux/hls/variant.rs @@ -1,4 +1,6 @@ use crate::egress::{EgressResult, EgressSegment}; +use crate::mux::hls::segment::{HlsSegment, PartialSegmentInfo, SegmentInfo}; +use crate::mux::{HlsVariantStream, SegmentType}; use crate::variant::{StreamMapping, VariantStream}; use anyhow::{bail, ensure, Result}; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; @@ -8,67 +10,12 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY, }; use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; -use itertools::Itertools; use log::{debug, info, trace, warn}; -use m3u8_rs::{ByteRange, ExtTag, MediaSegment, MediaSegmentType, Part, PartInf, PreloadHint}; +use m3u8_rs::{ExtTag, MediaSegmentType, PartInf, PreloadHint}; use std::collections::HashMap; -use std::fmt::Display; use std::fs::File; use std::path::PathBuf; use std::ptr; -use uuid::Uuid; - -#[derive(Clone, Copy, PartialEq)] -pub enum SegmentType { - MPEGTS, - FMP4, -} - -pub enum HlsVariantStream { - Video { - group: usize, - index: usize, - id: Uuid, - }, - Audio { - group: usize, - index: usize, - id: Uuid, - }, - Subtitle { - group: usize, - index: usize, - id: Uuid, - }, -} - -impl HlsVariantStream { - pub fn id(&self) -> &Uuid { - match self { - HlsVariantStream::Video { id, .. } => id, - HlsVariantStream::Audio { id, .. } => id, - HlsVariantStream::Subtitle { id, .. } => id, - } - } - - pub fn index(&self) -> &usize { - match self { - HlsVariantStream::Video { index, .. } => index, - HlsVariantStream::Audio { index, .. } => index, - HlsVariantStream::Subtitle { index, .. } => index, - } - } -} - -impl Display for HlsVariantStream { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - HlsVariantStream::Video { index, .. } => write!(f, "v:{}", index), - HlsVariantStream::Audio { index, .. } => write!(f, "a:{}", index), - HlsVariantStream::Subtitle { index, .. } => write!(f, "s:{}", index), - } - } -} pub struct HlsVariant { /// Name of this variant (720p) @@ -76,9 +23,9 @@ pub struct HlsVariant { /// MPEG-TS muxer for this variant mux: Muxer, /// List of streams ids in this variant - streams: Vec, + pub(crate) streams: Vec, /// Segment length in seconds - segment_length: f32, + segment_length_target: f32, /// Total number of seconds of video to store segment_window: f32, /// Current segment index @@ -109,82 +56,7 @@ pub struct HlsVariant { init_segment_path: Option, } -#[derive(PartialEq)] -enum HlsSegment { - Full(SegmentInfo), - Partial(PartialSegmentInfo), -} - -impl HlsSegment { - fn to_media_segment(&self) -> MediaSegmentType { - match self { - HlsSegment::Full(f) => f.to_media_segment(), - HlsSegment::Partial(p) => p.to_media_segment(), - } - } -} - -#[derive(PartialEq)] -struct SegmentInfo { - index: u64, - duration: f32, - kind: SegmentType, -} - -impl SegmentInfo { - fn to_media_segment(&self) -> MediaSegmentType { - MediaSegmentType::Full(MediaSegment { - uri: self.filename(), - duration: self.duration, - ..MediaSegment::default() - }) - } - - fn filename(&self) -> String { - HlsVariant::segment_name(self.kind, self.index) - } -} - -#[derive(PartialEq)] -struct PartialSegmentInfo { - index: u64, - parent_index: u64, - parent_kind: SegmentType, - duration: f64, - independent: bool, - byte_range: Option<(u64, Option)>, -} - -impl PartialSegmentInfo { - fn to_media_segment(&self) -> MediaSegmentType { - MediaSegmentType::Partial(Part { - uri: self.filename(), - duration: self.duration, - independent: self.independent, - gap: false, - byte_range: self.byte_range.map(|r| ByteRange { - length: r.0, - offset: r.1, - }), - }) - } - - fn filename(&self) -> String { - HlsVariant::segment_name(self.parent_kind, self.parent_index) - } - - /// Byte offset where this partial segment ends - fn end_pos(&self) -> Option { - self.byte_range - .as_ref() - .map(|(len, start)| start.unwrap_or(0) + len) - } -} - impl HlsVariant { - const LOW_LATENCY: bool = false; - const LL_PARTS: usize = 3; - pub fn new<'a>( out_dir: &'a str, group: usize, @@ -209,7 +81,7 @@ impl HlsVariant { let mut streams = Vec::new(); let mut ref_stream_index = -1; let mut has_video = false; - let mut seg_size = 2.0; + let mut segment_length = 1.0; for (var, enc) in encoded_vars { match var { @@ -224,9 +96,9 @@ impl HlsVariant { has_video = true; // Always use video stream as reference for segmentation ref_stream_index = stream_idx as _; - let v_seg = v.keyframe_interval as f32 / v.fps; - if v_seg > seg_size { - seg_size = v_seg; + let sg = v.keyframe_interval as f32 / v.fps; + if sg > segment_length { + segment_length = sg; } }, VariantStream::Audio(a) => unsafe { @@ -262,12 +134,6 @@ impl HlsVariant { ref_stream_index ); - let min_segment_length = if Self::LOW_LATENCY { - (seg_size * 3.0).max(6.0) // make segments 3x longer in LL mode or minimum 6s - } else { - 2.0 - }; - let segment_length = seg_size.max(min_segment_length); let mut opts = HashMap::new(); if let SegmentType::FMP4 = segment_type { //opts.insert("fflags".to_string(), "-autobsf".to_string()); @@ -276,8 +142,6 @@ impl HlsVariant { "+frag_custom+dash+delay_moov".to_string(), ); }; - let mut partial_seg_size = segment_length / Self::LL_PARTS as f32; - partial_seg_size -= partial_seg_size % seg_size; // align to keyframe unsafe { mux.open(Some(opts))?; @@ -286,7 +150,6 @@ impl HlsVariant { let mut variant = Self { name: name.clone(), - segment_length, segment_window: 30.0, mux, streams, @@ -294,14 +157,15 @@ impl HlsVariant { segments: Vec::new(), out_dir: out_dir.to_string(), segment_type, - packets_written: 0, - ref_stream_index, - partial_target_duration: partial_seg_size, - current_partial_index: 0, current_segment_start: 0.0, current_partial_start: 0.0, + packets_written: 0, + ref_stream_index, + low_latency: false, + partial_target_duration: 0.0, + current_partial_index: 0, next_partial_independent: false, - low_latency: Self::LOW_LATENCY, + segment_length_target: segment_length, init_segment_path: None, }; @@ -315,6 +179,21 @@ impl HlsVariant { Ok(variant) } + pub fn segment_length(&self) -> f32 { + let min_segment_length = if self.low_latency { + (self.segment_length_target * 3.0).max(6.0) // make segments 3x longer in LL mode or minimum 6s + } else { + 2.0 + }; + self.segment_length_target.max(min_segment_length) + } + + pub fn partial_segment_length(&self) -> f32 { + let seg_size = self.segment_length(); + let partial_seg_size = seg_size / 3.0; // 3 segments min + partial_seg_size - partial_seg_size % seg_size + } + pub fn segment_name(t: SegmentType, idx: u64) -> String { match t { SegmentType::MPEGTS => format!("{}.ts", idx), @@ -335,7 +214,7 @@ impl HlsVariant { } /// Process a single packet through the muxer - unsafe fn process_packet(&mut self, pkt: *mut AVPacket) -> Result { + pub(crate) unsafe fn process_packet(&mut self, pkt: *mut AVPacket) -> Result { let pkt_stream = *(*self.mux.context()) .streams .add((*pkt).stream_index as usize); @@ -358,7 +237,7 @@ impl HlsVariant { let cur_part_duration = pkt_pts - self.current_partial_start; // check if current packet is keyframe, flush current segment - if can_split && cur_duration >= self.segment_length as f64 { + if can_split && cur_duration >= self.segment_length() as f64 { result = self.split_next_seg(pkt_pts)?; } else if self.low_latency && cur_part_duration >= self.partial_target_duration as f64 { result = self.create_partial_segment(pkt_pts)?; @@ -666,9 +545,9 @@ impl HlsVariant { pl.version = Some(self.playlist_version()); pl.target_duration = if self.playlist_version() >= 6 { - self.segment_length.round() as _ + self.segment_length().round() as _ } else { - self.segment_length + self.segment_length() }; if self.low_latency { pl.part_inf = Some(PartInf { @@ -760,75 +639,3 @@ impl HlsVariant { } } } - -pub struct HlsMuxer { - pub out_dir: PathBuf, - pub variants: Vec, -} - -impl HlsMuxer { - pub fn new<'a>( - id: &Uuid, - out_dir: &str, - encoders: impl Iterator, - segment_type: SegmentType, - ) -> Result { - let base = PathBuf::from(out_dir).join(id.to_string()); - - if !base.exists() { - std::fs::create_dir_all(&base)?; - } - let mut vars = Vec::new(); - for (k, group) in &encoders - .sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id())) - .chunk_by(|a| a.0.group_id()) - { - let var = HlsVariant::new(base.to_str().unwrap(), k, group, segment_type)?; - vars.push(var); - } - - let ret = Self { - out_dir: base, - variants: vars, - }; - ret.write_master_playlist()?; - Ok(ret) - } - - fn write_master_playlist(&self) -> Result<()> { - let mut pl = m3u8_rs::MasterPlaylist::default(); - pl.version = Some(3); - pl.variants = self - .variants - .iter() - .map(|v| v.to_playlist_variant()) - .collect(); - - let mut f_out = File::create(self.out_dir.join("live.m3u8"))?; - pl.write_to(&mut f_out)?; - Ok(()) - } - - /// Mux an encoded packet from [Encoder] - pub unsafe fn mux_packet( - &mut self, - pkt: *mut AVPacket, - variant: &Uuid, - ) -> Result { - for var in self.variants.iter_mut() { - if let Some(vs) = var.streams.iter().find(|s| s.id() == variant) { - // very important for muxer to know which stream this pkt belongs to - (*pkt).stream_index = *vs.index() as _; - return var.process_packet(pkt); - } - } - - // This HLS muxer doesn't handle this variant, return None instead of failing - // This can happen when multiple egress handlers are configured with different variant sets - trace!( - "HLS muxer received packet for variant {} which it doesn't handle", - variant - ); - Ok(EgressResult::None) - } -} diff --git a/crates/core/src/mux/mod.rs b/crates/core/src/mux/mod.rs index d6aeb80..bcd4c6f 100644 --- a/crates/core/src/mux/mod.rs +++ b/crates/core/src/mux/mod.rs @@ -1,2 +1,3 @@ mod hls; + pub use hls::*;