refactor: split hls module

This commit is contained in:
2025-06-19 12:18:44 +01:00
parent 2c3ef01d45
commit 5c2a58ed46
4 changed files with 249 additions and 227 deletions

View File

@ -0,0 +1,139 @@
use crate::egress::EgressResult;
use crate::mux::hls::variant::HlsVariant;
use crate::variant::{StreamMapping, VariantStream};
use anyhow::Result;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket;
use ffmpeg_rs_raw::Encoder;
use itertools::Itertools;
use log::trace;
use std::fmt::Display;
use std::fs::File;
use std::path::PathBuf;
use uuid::Uuid;
mod segment;
mod variant;
pub enum HlsVariantStream {
Video {
group: usize,
index: usize,
id: Uuid,
},
Audio {
group: usize,
index: usize,
id: Uuid,
},
Subtitle {
group: usize,
index: usize,
id: Uuid,
},
}
impl HlsVariantStream {
pub fn id(&self) -> &Uuid {
match self {
HlsVariantStream::Video { id, .. } => id,
HlsVariantStream::Audio { id, .. } => id,
HlsVariantStream::Subtitle { id, .. } => id,
}
}
pub fn index(&self) -> &usize {
match self {
HlsVariantStream::Video { index, .. } => index,
HlsVariantStream::Audio { index, .. } => index,
HlsVariantStream::Subtitle { index, .. } => index,
}
}
}
impl Display for HlsVariantStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
HlsVariantStream::Video { index, .. } => write!(f, "v:{}", index),
HlsVariantStream::Audio { index, .. } => write!(f, "a:{}", index),
HlsVariantStream::Subtitle { index, .. } => write!(f, "s:{}", index),
}
}
}
#[derive(Clone, Copy, PartialEq)]
pub enum SegmentType {
MPEGTS,
FMP4,
}
pub struct HlsMuxer {
pub out_dir: PathBuf,
pub variants: Vec<HlsVariant>,
}
impl HlsMuxer {
pub fn new<'a>(
id: &Uuid,
out_dir: &str,
encoders: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
segment_type: SegmentType,
) -> Result<Self> {
let base = PathBuf::from(out_dir).join(id.to_string());
if !base.exists() {
std::fs::create_dir_all(&base)?;
}
let mut vars = Vec::new();
for (k, group) in &encoders
.sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id()))
.chunk_by(|a| a.0.group_id())
{
let var = HlsVariant::new(base.to_str().unwrap(), k, group, segment_type)?;
vars.push(var);
}
let ret = Self {
out_dir: base,
variants: vars,
};
ret.write_master_playlist()?;
Ok(ret)
}
fn write_master_playlist(&self) -> Result<()> {
let mut pl = m3u8_rs::MasterPlaylist::default();
pl.version = Some(3);
pl.variants = self
.variants
.iter()
.map(|v| v.to_playlist_variant())
.collect();
let mut f_out = File::create(self.out_dir.join("live.m3u8"))?;
pl.write_to(&mut f_out)?;
Ok(())
}
/// Mux an encoded packet from [Encoder]
pub unsafe fn mux_packet(
&mut self,
pkt: *mut AVPacket,
variant: &Uuid,
) -> Result<EgressResult> {
for var in self.variants.iter_mut() {
if let Some(vs) = var.streams.iter().find(|s| s.id() == variant) {
// very important for muxer to know which stream this pkt belongs to
(*pkt).stream_index = *vs.index() as _;
return var.process_packet(pkt);
}
}
// This HLS muxer doesn't handle this variant, return None instead of failing
// This can happen when multiple egress handlers are configured with different variant sets
trace!(
"HLS muxer received packet for variant {} which it doesn't handle",
variant
);
Ok(EgressResult::None)
}
}

View File

@ -0,0 +1,75 @@
use crate::mux::hls::variant::HlsVariant;
use crate::mux::SegmentType;
use m3u8_rs::{ByteRange, MediaSegment, MediaSegmentType, Part};
#[derive(PartialEq)]
pub enum HlsSegment {
Full(SegmentInfo),
Partial(PartialSegmentInfo),
}
impl HlsSegment {
pub fn to_media_segment(&self) -> MediaSegmentType {
match self {
HlsSegment::Full(f) => f.to_media_segment(),
HlsSegment::Partial(p) => p.to_media_segment(),
}
}
}
#[derive(PartialEq)]
pub struct SegmentInfo {
pub index: u64,
pub duration: f32,
pub kind: SegmentType,
}
impl SegmentInfo {
pub fn to_media_segment(&self) -> MediaSegmentType {
MediaSegmentType::Full(MediaSegment {
uri: self.filename(),
duration: self.duration,
..MediaSegment::default()
})
}
pub fn filename(&self) -> String {
HlsVariant::segment_name(self.kind, self.index)
}
}
#[derive(PartialEq)]
pub struct PartialSegmentInfo {
pub index: u64,
pub parent_index: u64,
pub parent_kind: SegmentType,
pub duration: f64,
pub independent: bool,
pub byte_range: Option<(u64, Option<u64>)>,
}
impl PartialSegmentInfo {
pub fn to_media_segment(&self) -> MediaSegmentType {
MediaSegmentType::Partial(Part {
uri: self.filename(),
duration: self.duration,
independent: self.independent,
gap: false,
byte_range: self.byte_range.map(|r| ByteRange {
length: r.0,
offset: r.1,
}),
})
}
pub fn filename(&self) -> String {
HlsVariant::segment_name(self.parent_kind, self.parent_index)
}
/// Byte offset where this partial segment ends
pub fn end_pos(&self) -> Option<u64> {
self.byte_range
.as_ref()
.map(|(len, start)| start.unwrap_or(0) + len)
}
}

View File

@ -1,4 +1,6 @@
use crate::egress::{EgressResult, EgressSegment};
use crate::mux::hls::segment::{HlsSegment, PartialSegmentInfo, SegmentInfo};
use crate::mux::{HlsVariantStream, SegmentType};
use crate::variant::{StreamMapping, VariantStream};
use anyhow::{bail, ensure, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
@ -8,67 +10,12 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY,
};
use ffmpeg_rs_raw::{cstr, Encoder, Muxer};
use itertools::Itertools;
use log::{debug, info, trace, warn};
use m3u8_rs::{ByteRange, ExtTag, MediaSegment, MediaSegmentType, Part, PartInf, PreloadHint};
use m3u8_rs::{ExtTag, MediaSegmentType, PartInf, PreloadHint};
use std::collections::HashMap;
use std::fmt::Display;
use std::fs::File;
use std::path::PathBuf;
use std::ptr;
use uuid::Uuid;
#[derive(Clone, Copy, PartialEq)]
pub enum SegmentType {
MPEGTS,
FMP4,
}
pub enum HlsVariantStream {
Video {
group: usize,
index: usize,
id: Uuid,
},
Audio {
group: usize,
index: usize,
id: Uuid,
},
Subtitle {
group: usize,
index: usize,
id: Uuid,
},
}
impl HlsVariantStream {
pub fn id(&self) -> &Uuid {
match self {
HlsVariantStream::Video { id, .. } => id,
HlsVariantStream::Audio { id, .. } => id,
HlsVariantStream::Subtitle { id, .. } => id,
}
}
pub fn index(&self) -> &usize {
match self {
HlsVariantStream::Video { index, .. } => index,
HlsVariantStream::Audio { index, .. } => index,
HlsVariantStream::Subtitle { index, .. } => index,
}
}
}
impl Display for HlsVariantStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
HlsVariantStream::Video { index, .. } => write!(f, "v:{}", index),
HlsVariantStream::Audio { index, .. } => write!(f, "a:{}", index),
HlsVariantStream::Subtitle { index, .. } => write!(f, "s:{}", index),
}
}
}
pub struct HlsVariant {
/// Name of this variant (720p)
@ -76,9 +23,9 @@ pub struct HlsVariant {
/// MPEG-TS muxer for this variant
mux: Muxer,
/// List of streams ids in this variant
streams: Vec<HlsVariantStream>,
pub(crate) streams: Vec<HlsVariantStream>,
/// Segment length in seconds
segment_length: f32,
segment_length_target: f32,
/// Total number of seconds of video to store
segment_window: f32,
/// Current segment index
@ -109,82 +56,7 @@ pub struct HlsVariant {
init_segment_path: Option<String>,
}
#[derive(PartialEq)]
enum HlsSegment {
Full(SegmentInfo),
Partial(PartialSegmentInfo),
}
impl HlsSegment {
fn to_media_segment(&self) -> MediaSegmentType {
match self {
HlsSegment::Full(f) => f.to_media_segment(),
HlsSegment::Partial(p) => p.to_media_segment(),
}
}
}
#[derive(PartialEq)]
struct SegmentInfo {
index: u64,
duration: f32,
kind: SegmentType,
}
impl SegmentInfo {
fn to_media_segment(&self) -> MediaSegmentType {
MediaSegmentType::Full(MediaSegment {
uri: self.filename(),
duration: self.duration,
..MediaSegment::default()
})
}
fn filename(&self) -> String {
HlsVariant::segment_name(self.kind, self.index)
}
}
#[derive(PartialEq)]
struct PartialSegmentInfo {
index: u64,
parent_index: u64,
parent_kind: SegmentType,
duration: f64,
independent: bool,
byte_range: Option<(u64, Option<u64>)>,
}
impl PartialSegmentInfo {
fn to_media_segment(&self) -> MediaSegmentType {
MediaSegmentType::Partial(Part {
uri: self.filename(),
duration: self.duration,
independent: self.independent,
gap: false,
byte_range: self.byte_range.map(|r| ByteRange {
length: r.0,
offset: r.1,
}),
})
}
fn filename(&self) -> String {
HlsVariant::segment_name(self.parent_kind, self.parent_index)
}
/// Byte offset where this partial segment ends
fn end_pos(&self) -> Option<u64> {
self.byte_range
.as_ref()
.map(|(len, start)| start.unwrap_or(0) + len)
}
}
impl HlsVariant {
const LOW_LATENCY: bool = false;
const LL_PARTS: usize = 3;
pub fn new<'a>(
out_dir: &'a str,
group: usize,
@ -209,7 +81,7 @@ impl HlsVariant {
let mut streams = Vec::new();
let mut ref_stream_index = -1;
let mut has_video = false;
let mut seg_size = 2.0;
let mut segment_length = 1.0;
for (var, enc) in encoded_vars {
match var {
@ -224,9 +96,9 @@ impl HlsVariant {
has_video = true;
// Always use video stream as reference for segmentation
ref_stream_index = stream_idx as _;
let v_seg = v.keyframe_interval as f32 / v.fps;
if v_seg > seg_size {
seg_size = v_seg;
let sg = v.keyframe_interval as f32 / v.fps;
if sg > segment_length {
segment_length = sg;
}
},
VariantStream::Audio(a) => unsafe {
@ -262,12 +134,6 @@ impl HlsVariant {
ref_stream_index
);
let min_segment_length = if Self::LOW_LATENCY {
(seg_size * 3.0).max(6.0) // make segments 3x longer in LL mode or minimum 6s
} else {
2.0
};
let segment_length = seg_size.max(min_segment_length);
let mut opts = HashMap::new();
if let SegmentType::FMP4 = segment_type {
//opts.insert("fflags".to_string(), "-autobsf".to_string());
@ -276,8 +142,6 @@ impl HlsVariant {
"+frag_custom+dash+delay_moov".to_string(),
);
};
let mut partial_seg_size = segment_length / Self::LL_PARTS as f32;
partial_seg_size -= partial_seg_size % seg_size; // align to keyframe
unsafe {
mux.open(Some(opts))?;
@ -286,7 +150,6 @@ impl HlsVariant {
let mut variant = Self {
name: name.clone(),
segment_length,
segment_window: 30.0,
mux,
streams,
@ -294,14 +157,15 @@ impl HlsVariant {
segments: Vec::new(),
out_dir: out_dir.to_string(),
segment_type,
packets_written: 0,
ref_stream_index,
partial_target_duration: partial_seg_size,
current_partial_index: 0,
current_segment_start: 0.0,
current_partial_start: 0.0,
packets_written: 0,
ref_stream_index,
low_latency: false,
partial_target_duration: 0.0,
current_partial_index: 0,
next_partial_independent: false,
low_latency: Self::LOW_LATENCY,
segment_length_target: segment_length,
init_segment_path: None,
};
@ -315,6 +179,21 @@ impl HlsVariant {
Ok(variant)
}
pub fn segment_length(&self) -> f32 {
let min_segment_length = if self.low_latency {
(self.segment_length_target * 3.0).max(6.0) // make segments 3x longer in LL mode or minimum 6s
} else {
2.0
};
self.segment_length_target.max(min_segment_length)
}
pub fn partial_segment_length(&self) -> f32 {
let seg_size = self.segment_length();
let partial_seg_size = seg_size / 3.0; // 3 segments min
partial_seg_size - partial_seg_size % seg_size
}
pub fn segment_name(t: SegmentType, idx: u64) -> String {
match t {
SegmentType::MPEGTS => format!("{}.ts", idx),
@ -335,7 +214,7 @@ impl HlsVariant {
}
/// Process a single packet through the muxer
unsafe fn process_packet(&mut self, pkt: *mut AVPacket) -> Result<EgressResult> {
pub(crate) unsafe fn process_packet(&mut self, pkt: *mut AVPacket) -> Result<EgressResult> {
let pkt_stream = *(*self.mux.context())
.streams
.add((*pkt).stream_index as usize);
@ -358,7 +237,7 @@ impl HlsVariant {
let cur_part_duration = pkt_pts - self.current_partial_start;
// check if current packet is keyframe, flush current segment
if can_split && cur_duration >= self.segment_length as f64 {
if can_split && cur_duration >= self.segment_length() as f64 {
result = self.split_next_seg(pkt_pts)?;
} else if self.low_latency && cur_part_duration >= self.partial_target_duration as f64 {
result = self.create_partial_segment(pkt_pts)?;
@ -666,9 +545,9 @@ impl HlsVariant {
pl.version = Some(self.playlist_version());
pl.target_duration = if self.playlist_version() >= 6 {
self.segment_length.round() as _
self.segment_length().round() as _
} else {
self.segment_length
self.segment_length()
};
if self.low_latency {
pl.part_inf = Some(PartInf {
@ -760,75 +639,3 @@ impl HlsVariant {
}
}
}
pub struct HlsMuxer {
pub out_dir: PathBuf,
pub variants: Vec<HlsVariant>,
}
impl HlsMuxer {
pub fn new<'a>(
id: &Uuid,
out_dir: &str,
encoders: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
segment_type: SegmentType,
) -> Result<Self> {
let base = PathBuf::from(out_dir).join(id.to_string());
if !base.exists() {
std::fs::create_dir_all(&base)?;
}
let mut vars = Vec::new();
for (k, group) in &encoders
.sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id()))
.chunk_by(|a| a.0.group_id())
{
let var = HlsVariant::new(base.to_str().unwrap(), k, group, segment_type)?;
vars.push(var);
}
let ret = Self {
out_dir: base,
variants: vars,
};
ret.write_master_playlist()?;
Ok(ret)
}
fn write_master_playlist(&self) -> Result<()> {
let mut pl = m3u8_rs::MasterPlaylist::default();
pl.version = Some(3);
pl.variants = self
.variants
.iter()
.map(|v| v.to_playlist_variant())
.collect();
let mut f_out = File::create(self.out_dir.join("live.m3u8"))?;
pl.write_to(&mut f_out)?;
Ok(())
}
/// Mux an encoded packet from [Encoder]
pub unsafe fn mux_packet(
&mut self,
pkt: *mut AVPacket,
variant: &Uuid,
) -> Result<EgressResult> {
for var in self.variants.iter_mut() {
if let Some(vs) = var.streams.iter().find(|s| s.id() == variant) {
// very important for muxer to know which stream this pkt belongs to
(*pkt).stream_index = *vs.index() as _;
return var.process_packet(pkt);
}
}
// This HLS muxer doesn't handle this variant, return None instead of failing
// This can happen when multiple egress handlers are configured with different variant sets
trace!(
"HLS muxer received packet for variant {} which it doesn't handle",
variant
);
Ok(EgressResult::None)
}
}

View File

@ -1,2 +1,3 @@
mod hls;
pub use hls::*;