fix: HLS-LL
All checks were successful
continuous-integration/drone Build is passing

refactor: fMP4 (WIP)
This commit is contained in:
2025-06-17 11:48:49 +01:00
parent a046dc5801
commit e056e0427f
11 changed files with 747 additions and 205 deletions

2
Cargo.lock generated
View File

@ -2153,7 +2153,7 @@ checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
[[package]] [[package]]
name = "m3u8-rs" name = "m3u8-rs"
version = "6.0.0" version = "6.0.0"
source = "git+https://git.v0l.io/Kieran/m3u8-rs.git?rev=5b7aa0c65994b5ab2780b7ed27d84c03bc32d19f#5b7aa0c65994b5ab2780b7ed27d84c03bc32d19f" source = "git+https://git.v0l.io/Kieran/m3u8-rs.git?rev=6803eefca2838a8bfae9e19fd516ef36d7d89997#6803eefca2838a8bfae9e19fd516ef36d7d89997"
dependencies = [ dependencies = [
"chrono", "chrono",
"nom", "nom",

View File

@ -24,6 +24,6 @@ url = "2.5.0"
itertools = "0.14.0" itertools = "0.14.0"
chrono = { version = "^0.4.38", features = ["serde"] } chrono = { version = "^0.4.38", features = ["serde"] }
hex = "0.4.3" hex = "0.4.3"
m3u8-rs = { git = "https://git.v0l.io/Kieran/m3u8-rs.git", rev = "5b7aa0c65994b5ab2780b7ed27d84c03bc32d19f" } m3u8-rs = { git = "https://git.v0l.io/Kieran/m3u8-rs.git", rev = "6803eefca2838a8bfae9e19fd516ef36d7d89997" }
sha2 = "0.10.8" sha2 = "0.10.8"
data-encoding = "2.9.0" data-encoding = "2.9.0"

View File

@ -1,8 +1,8 @@
pub mod egress; pub mod egress;
mod generator;
pub mod ingress; pub mod ingress;
pub mod mux; pub mod mux;
pub mod overseer; pub mod overseer;
pub mod pipeline; pub mod pipeline;
pub mod variant; pub mod variant;
pub mod viewer; pub mod viewer;
mod generator;

View File

@ -4,13 +4,13 @@ use anyhow::{bail, ensure, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_free, av_interleaved_write_frame, av_opt_set, av_q2d, avio_close, avio_flush, avio_open, av_free, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, avio_size, AVPacket,
avio_size, AVPacket, AVStream, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY,
}; };
use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; use ffmpeg_rs_raw::{cstr, Encoder, Muxer};
use itertools::Itertools; use itertools::Itertools;
use log::{info, trace, warn}; use log::{debug, info, trace, warn};
use m3u8_rs::{ByteRange, MediaSegment, MediaSegmentType, Part, PartInf, PreloadHint}; use m3u8_rs::{ByteRange, ExtTag, MediaSegment, MediaSegmentType, Part, PartInf, PreloadHint};
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Display; use std::fmt::Display;
use std::fs::File; use std::fs::File;
@ -89,12 +89,10 @@ pub struct HlsVariant {
segments: Vec<HlsSegment>, segments: Vec<HlsSegment>,
/// Type of segments to create /// Type of segments to create
segment_type: SegmentType, segment_type: SegmentType,
/// Timestamp of the previous packet
last_pkt_pts: i64,
/// Timestamp of the start of the current segment /// Timestamp of the start of the current segment
current_segment_start: f64, current_segment_start: f64,
/// Current segment duration in seconds (precise accumulation) /// Timestamp of the start of the current partial
duration: f64, current_partial_start: f64,
/// Number of packets written to current segment /// Number of packets written to current segment
packets_written: u64, packets_written: u64,
/// Reference stream used to track duration /// Reference stream used to track duration
@ -105,10 +103,10 @@ pub struct HlsVariant {
partial_target_duration: f32, partial_target_duration: f32,
/// HLS-LL: Current partial index /// HLS-LL: Current partial index
current_partial_index: u64, current_partial_index: u64,
/// HLS-LL: Current duration in this partial
current_partial_duration: f64,
/// HLS-LL: Whether the next partial segment should be marked as independent /// HLS-LL: Whether the next partial segment should be marked as independent
next_partial_independent: bool, next_partial_independent: bool,
/// Path to initialization segment for fMP4
init_segment_path: Option<String>,
} }
#[derive(PartialEq)] #[derive(PartialEq)]
@ -184,6 +182,9 @@ impl PartialSegmentInfo {
} }
impl HlsVariant { impl HlsVariant {
const LOW_LATENCY: bool = true;
const LL_PARTS: usize = 3;
pub fn new<'a>( pub fn new<'a>(
out_dir: &'a str, out_dir: &'a str,
group: usize, group: usize,
@ -194,14 +195,6 @@ impl HlsVariant {
let first_seg = Self::map_segment_path(out_dir, &name, 1, segment_type); let first_seg = Self::map_segment_path(out_dir, &name, 1, segment_type);
std::fs::create_dir_all(PathBuf::from(&first_seg).parent().unwrap())?; std::fs::create_dir_all(PathBuf::from(&first_seg).parent().unwrap())?;
let mut opts = HashMap::new();
if let SegmentType::FMP4 = segment_type {
opts.insert("fflags".to_string(), "-autobsf".to_string());
opts.insert(
"movflags".to_string(),
"+frag_custom+dash+delay_moov".to_string(),
);
};
let mut mux = unsafe { let mut mux = unsafe {
Muxer::builder() Muxer::builder()
.with_output_path( .with_output_path(
@ -216,7 +209,7 @@ impl HlsVariant {
let mut streams = Vec::new(); let mut streams = Vec::new();
let mut ref_stream_index = -1; let mut ref_stream_index = -1;
let mut has_video = false; let mut has_video = false;
let mut seg_size = 1.0; let mut seg_size = 2.0;
for (var, enc) in encoded_vars { for (var, enc) in encoded_vars {
match var { match var {
@ -268,12 +261,32 @@ impl HlsVariant {
name, name,
ref_stream_index ref_stream_index
); );
let min_segment_length = if Self::LOW_LATENCY {
(seg_size * 3.0).max(6.0) // make segments 3x longer in LL mode or minimum 6s
} else {
2.0
};
let segment_length = seg_size.max(min_segment_length);
let mut opts = HashMap::new();
if let SegmentType::FMP4 = segment_type {
//opts.insert("fflags".to_string(), "-autobsf".to_string());
opts.insert(
"movflags".to_string(),
"+frag_custom+dash+delay_moov".to_string(),
);
};
let mut partial_seg_size = segment_length / Self::LL_PARTS as f32;
partial_seg_size -= partial_seg_size % seg_size; // align to keyframe
unsafe { unsafe {
mux.open(Some(opts))?; mux.open(Some(opts))?;
//av_dump_format(mux.context(), 0, ptr::null_mut(), 0);
} }
Ok(Self {
let mut variant = Self {
name: name.clone(), name: name.clone(),
segment_length: seg_size, segment_length,
segment_window: 30.0, segment_window: 30.0,
mux, mux,
streams, streams,
@ -281,17 +294,25 @@ impl HlsVariant {
segments: Vec::new(), segments: Vec::new(),
out_dir: out_dir.to_string(), out_dir: out_dir.to_string(),
segment_type, segment_type,
last_pkt_pts: AV_NOPTS_VALUE,
duration: 0.0,
packets_written: 0, packets_written: 0,
ref_stream_index, ref_stream_index,
partial_target_duration: 0.33, partial_target_duration: partial_seg_size,
current_partial_index: 0, current_partial_index: 0,
current_partial_duration: 0.0,
current_segment_start: 0.0, current_segment_start: 0.0,
current_partial_start: 0.0,
next_partial_independent: false, next_partial_independent: false,
low_latency: false, low_latency: Self::LOW_LATENCY,
}) init_segment_path: None,
};
// Create initialization segment for fMP4
if segment_type == SegmentType::FMP4 {
unsafe {
variant.create_init_segment()?;
}
}
Ok(variant)
} }
pub fn segment_name(t: SegmentType, idx: u64) -> String { pub fn segment_name(t: SegmentType, idx: u64) -> String {
@ -332,38 +353,23 @@ impl HlsVariant {
is_ref_pkt = false; is_ref_pkt = false;
} }
// HLS-LL: write prev partial segment if is_ref_pkt && self.packets_written > 0 {
if self.low_latency && self.current_partial_duration >= self.partial_target_duration as f64 let pkt_pts = (*pkt).pts as f64 * pkt_q;
{ let cur_duration = pkt_pts - self.current_segment_start;
self.create_partial_segment()?; let cur_part_duration = pkt_pts - self.current_partial_start;
// HLS-LL: Mark next partial as independent if this packet is a keyframe // check if current packet is keyframe, flush current segment
if can_split { if can_split && cur_duration >= self.segment_length as f64 {
self.next_partial_independent = true; result = self.split_next_seg(pkt_pts)?;
} } else if cur_part_duration >= self.partial_target_duration as f64 {
} result = self.create_partial_segment(pkt_pts)?;
// check if current packet is keyframe, flush current segment
if self.packets_written > 1 && can_split && self.duration >= self.segment_length as f64 {
result = self.split_next_seg((*pkt).pts as f64 * pkt_q)?;
}
// track duration from pts // HLS-LL: Mark next partial as independent if this packet is a keyframe
if is_ref_pkt { if can_split {
if self.last_pkt_pts == AV_NOPTS_VALUE { info!("Next partial is independent");
self.last_pkt_pts = (*pkt).pts; self.next_partial_independent = true;
}
let time_delta = if (*pkt).duration != 0 {
(*pkt).duration as f64 * pkt_q
} else {
((*pkt).pts - self.last_pkt_pts) as f64 * pkt_q
};
if time_delta > 0.0 {
self.duration += time_delta;
if self.low_latency {
self.current_partial_duration += time_delta;
} }
} }
self.last_pkt_pts = (*pkt).pts;
} }
// write to current segment // write to current segment
@ -378,13 +384,21 @@ impl HlsVariant {
} }
/// Create a partial segment for LL-HLS /// Create a partial segment for LL-HLS
fn create_partial_segment(&mut self) -> Result<()> { fn create_partial_segment(&mut self, next_pkt_start: f64) -> Result<EgressResult> {
let ctx = self.mux.context(); let ctx = self.mux.context();
let end_pos = unsafe { let end_pos = unsafe {
avio_flush((*ctx).pb); avio_flush((*ctx).pb);
avio_size((*ctx).pb) as u64 avio_size((*ctx).pb) as u64
}; };
ensure!(end_pos > 0, "End position cannot be 0");
if self.segment_type == SegmentType::MPEGTS {
ensure!(
end_pos % 188 == 0,
"Invalid end position, must be multiple of 188"
);
}
let previous_end_pos = self let previous_end_pos = self
.segments .segments
.last() .last()
@ -393,31 +407,68 @@ impl HlsVariant {
_ => None, _ => None,
}) })
.unwrap_or(0); .unwrap_or(0);
let independent = self.next_partial_independent;
let partial_size = end_pos - previous_end_pos; let partial_size = end_pos - previous_end_pos;
let partial_info = PartialSegmentInfo { let partial_info = PartialSegmentInfo {
index: self.current_partial_index, index: self.current_partial_index,
parent_index: self.idx, parent_index: self.idx,
parent_kind: self.segment_type, parent_kind: self.segment_type,
duration: self.current_partial_duration, duration: next_pkt_start - self.current_partial_start,
independent, independent: self.next_partial_independent,
byte_range: Some((partial_size, Some(previous_end_pos))), byte_range: Some((partial_size, Some(previous_end_pos))),
}; };
trace!( debug!(
"{} created partial segment {} [{:.3}s, independent={}]", "{} created partial segment {} [{:.3}s, independent={}]",
self.name, self.name, partial_info.index, partial_info.duration, partial_info.independent,
partial_info.index,
partial_info.duration,
independent
); );
self.segments.push(HlsSegment::Partial(partial_info)); self.segments.push(HlsSegment::Partial(partial_info));
self.current_partial_index += 1; self.current_partial_index += 1;
self.current_partial_duration = 0.0;
self.next_partial_independent = false; self.next_partial_independent = false;
self.current_partial_start = next_pkt_start;
self.write_playlist()?; self.write_playlist()?;
Ok(EgressResult::None)
}
/// Create initialization segment for fMP4
unsafe fn create_init_segment(&mut self) -> Result<()> {
if self.segment_type != SegmentType::FMP4 || self.init_segment_path.is_some() {
return Ok(());
}
let init_path = PathBuf::from(&self.out_dir)
.join(&self.name)
.join("init.mp4")
.to_string_lossy()
.to_string();
// Create a temporary muxer for initialization segment
let mut init_opts = HashMap::new();
init_opts.insert(
"movflags".to_string(),
"+frag_custom+dash+delay_moov".to_string(),
);
let mut init_mux = Muxer::builder()
.with_output_path(init_path.as_str(), Some("mp4"))?
.build()?;
// Copy stream parameters from main muxer
let main_ctx = self.mux.context();
for i in 0..(*main_ctx).nb_streams {
let src_stream = *(*main_ctx).streams.add(i as usize);
let s = init_mux.add_copy_stream(src_stream)?;
ensure!((*s).index == (*src_stream).index, "Stream index mismatch");
}
init_mux.open(Some(init_opts))?;
av_write_frame(init_mux.context(), ptr::null_mut());
init_mux.close()?;
self.init_segment_path = Some("init.mp4".to_string());
info!("Created fMP4 initialization segment: {}", init_path);
Ok(()) Ok(())
} }
@ -425,10 +476,11 @@ impl HlsVariant {
unsafe fn split_next_seg(&mut self, next_pkt_start: f64) -> Result<EgressResult> { unsafe fn split_next_seg(&mut self, next_pkt_start: f64) -> Result<EgressResult> {
let completed_segment_idx = self.idx; let completed_segment_idx = self.idx;
self.idx += 1; self.idx += 1;
self.current_partial_index = 0;
// Manually reset muxer avio // Manually reset muxer avio
let ctx = self.mux.context(); let ctx = self.mux.context();
let ret = av_interleaved_write_frame(ctx, ptr::null_mut()); let ret = av_write_frame(ctx, ptr::null_mut());
if ret < 0 { if ret < 0 {
bail!("Failed to split segment {}", ret); bail!("Failed to split segment {}", ret);
} }
@ -445,14 +497,6 @@ impl HlsVariant {
bail!("Failed to re-init avio"); bail!("Failed to re-init avio");
} }
// tell muxer it needs to write headers again
av_opt_set(
(*ctx).priv_data,
cstr!("events_flags"),
cstr!("resend_headers"),
0,
);
// Log the completed segment (previous index), not the next one // Log the completed segment (previous index), not the next one
let completed_seg_path = Self::map_segment_path( let completed_seg_path = Self::map_segment_path(
&self.out_dir, &self.out_dir,
@ -467,7 +511,7 @@ impl HlsVariant {
.unwrap_or(0); .unwrap_or(0);
let cur_duration = next_pkt_start - self.current_segment_start; let cur_duration = next_pkt_start - self.current_segment_start;
info!( debug!(
"Finished segment {} [{:.3}s, {:.2} kB, {} pkts]", "Finished segment {} [{:.3}s, {:.2} kB, {} pkts]",
completed_segment_path completed_segment_path
.file_name() .file_name()
@ -519,7 +563,6 @@ impl HlsVariant {
// Reset counters for next segment // Reset counters for next segment
self.packets_written = 0; self.packets_written = 0;
self.duration = 0.0;
self.current_segment_start = next_pkt_start; self.current_segment_start = next_pkt_start;
Ok(EgressResult::Segments { Ok(EgressResult::Segments {
@ -590,9 +633,18 @@ impl HlsVariant {
} }
let mut pl = m3u8_rs::MediaPlaylist::default(); let mut pl = m3u8_rs::MediaPlaylist::default();
pl.target_duration = (self.segment_length.ceil() as u64).max(1);
pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect(); pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect();
// Add EXT-X-MAP initialization segment for fMP4
if self.segment_type == SegmentType::FMP4 {
if let Some(ref init_path) = self.init_segment_path {
pl.unknown_tags.push(ExtTag {
tag: "X-MAP".to_string(),
rest: Some(format!("URI=\"{}\"", init_path)),
});
}
}
// append segment preload for next part segment // append segment preload for next part segment
if let Some(HlsSegment::Partial(partial)) = self.segments.last() { if let Some(HlsSegment::Partial(partial)) = self.segments.last() {
// TODO: try to estimate if there will be another partial segment // TODO: try to estimate if there will be another partial segment
@ -603,7 +655,19 @@ impl HlsVariant {
byte_range_length: None, byte_range_length: None,
})); }));
} }
pl.version = Some(if self.low_latency { 6 } else { 3 }); let pl_version = if self.low_latency {
6
} else if self.segment_type == SegmentType::FMP4 {
6 // EXT-X-MAP without I-FRAMES-ONLY
} else {
3
};
pl.version = Some(pl_version);
pl.target_duration = if pl_version >= 6 {
self.segment_length.round() as _
} else {
self.segment_length
};
if self.low_latency { if self.low_latency {
pl.part_inf = Some(PartInf { pl.part_inf = Some(PartInf {
part_target: self.partial_target_duration as f64, part_target: self.partial_target_duration as f64,
@ -624,31 +688,48 @@ impl HlsVariant {
Ok(()) Ok(())
} }
/// https://git.ffmpeg.org/gitweb/ffmpeg.git/blob/HEAD:/libavformat/hlsenc.c#l351 unsafe fn to_codec_attr(&self) -> Option<String> {
unsafe fn to_codec_attr(&self, stream: *mut AVStream) -> Option<String> { let mut codecs = Vec::new();
let p = (*stream).codecpar;
if (*p).codec_id == AV_CODEC_ID_H264 {
let data = (*p).extradata;
if !data.is_null() {
let mut id_ptr = ptr::null_mut();
let ds: *mut u16 = data as *mut u16;
if (*ds) == 1 && (*data.add(4)) & 0x1F == 7 {
id_ptr = data.add(5);
} else if (*ds) == 1 && (*data.add(3)) & 0x1F == 7 {
id_ptr = data.add(4);
} else if *data.add(0) == 1 {
id_ptr = data.add(1);
} else {
return None;
}
return Some(format!( // Find video and audio streams and build codec string
"avc1.{}", for stream in &self.streams {
hex::encode([*id_ptr.add(0), *id_ptr.add(1), *id_ptr.add(2)]) let av_stream = *(*self.mux.context()).streams.add(*stream.index());
)); let p = (*av_stream).codecpar;
match stream {
HlsVariantStream::Video { .. } => {
if (*p).codec_id == AV_CODEC_ID_H264 {
// Use profile and level from codec parameters
let profile_idc = (*p).profile as u8;
let level_idc = (*p).level as u8;
// For H.264, constraint flags are typically 0 unless specified
// Common constraint flags: 0x40 (constraint_set1_flag) for baseline
let constraint_flags = match profile_idc {
66 => 0x40, // Baseline profile
_ => 0x00, // Main/High profiles typically have no constraints
};
let avc1_code = format!(
"avc1.{:02x}{:02x}{:02x}",
profile_idc, constraint_flags, level_idc
);
codecs.push(avc1_code);
}
}
HlsVariantStream::Audio { .. } => {
// Standard AAC-LC codec string
codecs.push("mp4a.40.2".to_string());
}
_ => {}
} }
} }
None
if codecs.is_empty() {
None
} else {
Some(codecs.join(","))
}
} }
pub fn to_playlist_variant(&self) -> m3u8_rs::VariantStream { pub fn to_playlist_variant(&self) -> m3u8_rs::VariantStream {
@ -659,9 +740,9 @@ impl HlsVariant {
m3u8_rs::VariantStream { m3u8_rs::VariantStream {
is_i_frame: false, is_i_frame: false,
uri: format!("{}/live.m3u8", self.name), uri: format!("{}/live.m3u8", self.name),
bandwidth: 0, bandwidth: (*codec_par).bit_rate as u64,
average_bandwidth: Some((*codec_par).bit_rate as u64), average_bandwidth: None,
codecs: self.to_codec_attr(av_stream), codecs: self.to_codec_attr(),
resolution: Some(m3u8_rs::Resolution { resolution: Some(m3u8_rs::Resolution {
width: (*codec_par).width as _, width: (*codec_par).width as _,
height: (*codec_par).height as _, height: (*codec_par).height as _,

View File

@ -27,7 +27,7 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
use ffmpeg_rs_raw::{ use ffmpeg_rs_raw::{
cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, Encoder, Resample, Scaler, StreamType, cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, Encoder, Resample, Scaler, StreamType,
}; };
use log::{error, info, warn}; use log::{debug, error, info, warn};
use tokio::runtime::Handle; use tokio::runtime::Handle;
use uuid::Uuid; use uuid::Uuid;
@ -611,7 +611,7 @@ impl PipelineRunner {
let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32(); let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32();
if elapsed >= 2f32 { if elapsed >= 2f32 {
let n_frames = self.frame_ctr - self.fps_last_frame_ctr; let n_frames = self.frame_ctr - self.fps_last_frame_ctr;
info!("Average fps: {:.2}", n_frames as f32 / elapsed); debug!("Average fps: {:.2}", n_frames as f32 / elapsed);
self.fps_counter_start = Instant::now(); self.fps_counter_start = Instant::now();
self.fps_last_frame_ctr = self.frame_ctr; self.fps_last_frame_ctr = self.frame_ctr;
} }

View File

@ -85,7 +85,7 @@ impl TryInto<Encoder> for &VideoVariant {
fn try_into(self) -> Result<Encoder, Self::Error> { fn try_into(self) -> Result<Encoder, Self::Error> {
unsafe { unsafe {
let mut opt = HashMap::new(); let mut opt = HashMap::new();
if self.codec == "x264" { if self.codec == "x264" || self.codec == "libx264" {
opt.insert("preset".to_string(), "fast".to_string()); opt.insert("preset".to_string(), "fast".to_string());
//opt.insert("tune".to_string(), "zerolatency".to_string()); //opt.insert("tune".to_string(), "zerolatency".to_string());
} }

View File

@ -1,10 +1,10 @@
use data_encoding::BASE32_NOPAD;
use log::debug;
use sha2::{Digest, Sha256};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::task; use tokio::task;
use log::debug;
use sha2::{Digest, Sha256};
use data_encoding::BASE32_NOPAD;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ViewerInfo { pub struct ViewerInfo {
@ -26,13 +26,13 @@ impl ViewerTracker {
viewers: Arc::new(RwLock::new(HashMap::new())), viewers: Arc::new(RwLock::new(HashMap::new())),
timeout_duration: Duration::from_secs(600), // 10 minutes timeout_duration: Duration::from_secs(600), // 10 minutes
}; };
// Start cleanup task // Start cleanup task
let cleanup_tracker = tracker.clone(); let cleanup_tracker = tracker.clone();
task::spawn(async move { task::spawn(async move {
cleanup_tracker.cleanup_task().await; cleanup_tracker.cleanup_task().await;
}); });
tracker tracker
} }
@ -42,48 +42,56 @@ impl ViewerTracker {
Some(ua) => format!("{}{}", ip_address, ua), Some(ua) => format!("{}{}", ip_address, ua),
None => ip_address.to_string(), None => ip_address.to_string(),
}; };
// Hash the input using SHA-256 // Hash the input using SHA-256
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();
hasher.update(input.as_bytes()); hasher.update(input.as_bytes());
let hash = hasher.finalize(); let hash = hasher.finalize();
// Take the first 8 bytes of the hash // Take the first 8 bytes of the hash
let fingerprint = &hash[..8]; let fingerprint = &hash[..8];
// Base32 encode the fingerprint (without padding) // Base32 encode the fingerprint (without padding)
BASE32_NOPAD.encode(fingerprint).to_lowercase() BASE32_NOPAD.encode(fingerprint).to_lowercase()
} }
pub fn track_viewer(&self, token: &str, stream_id: &str, ip_address: &str, user_agent: Option<String>) { pub fn track_viewer(
&self,
token: &str,
stream_id: &str,
ip_address: &str,
user_agent: Option<String>,
) {
let mut viewers = self.viewers.write().unwrap(); let mut viewers = self.viewers.write().unwrap();
let viewer_info = ViewerInfo { let viewer_info = ViewerInfo {
stream_id: stream_id.to_string(), stream_id: stream_id.to_string(),
ip_address: ip_address.to_string(), ip_address: ip_address.to_string(),
user_agent, user_agent,
last_seen: Instant::now(), last_seen: Instant::now(),
}; };
if let Some(existing) = viewers.get(token) { if let Some(existing) = viewers.get(token) {
debug!("Updating viewer {} for stream {}", token, stream_id); debug!("Updating viewer {} for stream {}", token, stream_id);
} else { } else {
debug!("New viewer {} for stream {}", token, stream_id); debug!("New viewer {} for stream {}", token, stream_id);
} }
viewers.insert(token.to_string(), viewer_info); viewers.insert(token.to_string(), viewer_info);
} }
pub fn get_viewer_count(&self, stream_id: &str) -> usize { pub fn get_viewer_count(&self, stream_id: &str) -> usize {
let viewers = self.viewers.read().unwrap(); let viewers = self.viewers.read().unwrap();
viewers.values() viewers
.values()
.filter(|v| v.stream_id == stream_id) .filter(|v| v.stream_id == stream_id)
.count() .count()
} }
pub fn get_active_viewers(&self, stream_id: &str) -> Vec<String> { pub fn get_active_viewers(&self, stream_id: &str) -> Vec<String> {
let viewers = self.viewers.read().unwrap(); let viewers = self.viewers.read().unwrap();
viewers.iter() viewers
.iter()
.filter(|(_, v)| v.stream_id == stream_id) .filter(|(_, v)| v.stream_id == stream_id)
.map(|(token, _)| token.clone()) .map(|(token, _)| token.clone())
.collect() .collect()
@ -98,7 +106,7 @@ impl ViewerTracker {
async fn cleanup_task(&self) { async fn cleanup_task(&self) {
let mut interval = tokio::time::interval(Duration::from_secs(60)); // Check every minute let mut interval = tokio::time::interval(Duration::from_secs(60)); // Check every minute
loop { loop {
interval.tick().await; interval.tick().await;
self.cleanup_expired_viewers(); self.cleanup_expired_viewers();
@ -108,16 +116,21 @@ impl ViewerTracker {
fn cleanup_expired_viewers(&self) { fn cleanup_expired_viewers(&self) {
let mut viewers = self.viewers.write().unwrap(); let mut viewers = self.viewers.write().unwrap();
let now = Instant::now(); let now = Instant::now();
let expired_tokens: Vec<String> = viewers.iter() let expired_tokens: Vec<String> = viewers
.iter()
.filter(|(_, viewer)| now.duration_since(viewer.last_seen) > self.timeout_duration) .filter(|(_, viewer)| now.duration_since(viewer.last_seen) > self.timeout_duration)
.map(|(token, _)| token.clone()) .map(|(token, _)| token.clone())
.collect(); .collect();
for token in expired_tokens { for token in expired_tokens {
if let Some(viewer) = viewers.remove(&token) { if let Some(viewer) = viewers.remove(&token) {
debug!("Expired viewer {} from stream {} (last seen {:?} ago)", debug!(
token, viewer.stream_id, now.duration_since(viewer.last_seen)); "Expired viewer {} from stream {} (last seen {:?} ago)",
token,
viewer.stream_id,
now.duration_since(viewer.last_seen)
);
} }
} }
} }
@ -138,11 +151,14 @@ mod tests {
// Test that the same IP and user agent always generate the same token // Test that the same IP and user agent always generate the same token
let ip = "192.168.1.1"; let ip = "192.168.1.1";
let user_agent = Some("Mozilla/5.0 (Test Browser)"); let user_agent = Some("Mozilla/5.0 (Test Browser)");
let token1 = ViewerTracker::generate_viewer_token(ip, user_agent); let token1 = ViewerTracker::generate_viewer_token(ip, user_agent);
let token2 = ViewerTracker::generate_viewer_token(ip, user_agent); let token2 = ViewerTracker::generate_viewer_token(ip, user_agent);
assert_eq!(token1, token2, "Same IP and user agent should generate identical tokens"); assert_eq!(
token1, token2,
"Same IP and user agent should generate identical tokens"
);
} }
#[test] #[test]
@ -151,22 +167,28 @@ mod tests {
let ip1 = "192.168.1.1"; let ip1 = "192.168.1.1";
let ip2 = "192.168.1.2"; let ip2 = "192.168.1.2";
let user_agent = Some("Mozilla/5.0 (Test Browser)"); let user_agent = Some("Mozilla/5.0 (Test Browser)");
let token1 = ViewerTracker::generate_viewer_token(ip1, user_agent); let token1 = ViewerTracker::generate_viewer_token(ip1, user_agent);
let token2 = ViewerTracker::generate_viewer_token(ip2, user_agent); let token2 = ViewerTracker::generate_viewer_token(ip2, user_agent);
assert_ne!(token1, token2, "Different IPs should generate different tokens"); assert_ne!(
token1, token2,
"Different IPs should generate different tokens"
);
} }
#[test] #[test]
fn test_generate_viewer_token_no_user_agent() { fn test_generate_viewer_token_no_user_agent() {
// Test that tokens are generated even without user agent // Test that tokens are generated even without user agent
let ip = "192.168.1.1"; let ip = "192.168.1.1";
let token1 = ViewerTracker::generate_viewer_token(ip, None); let token1 = ViewerTracker::generate_viewer_token(ip, None);
let token2 = ViewerTracker::generate_viewer_token(ip, None); let token2 = ViewerTracker::generate_viewer_token(ip, None);
assert_eq!(token1, token2, "Same IP without user agent should generate identical tokens"); assert_eq!(
token1, token2,
"Same IP without user agent should generate identical tokens"
);
} }
#[test] #[test]
@ -174,12 +196,16 @@ mod tests {
// Test that generated tokens have the expected base32 format // Test that generated tokens have the expected base32 format
let ip = "192.168.1.1"; let ip = "192.168.1.1";
let user_agent = Some("Mozilla/5.0 (Test Browser)"); let user_agent = Some("Mozilla/5.0 (Test Browser)");
let token = ViewerTracker::generate_viewer_token(ip, user_agent); let token = ViewerTracker::generate_viewer_token(ip, user_agent);
// Should be base32 encoded (lowercase, no padding) // Should be base32 encoded (lowercase, no padding)
assert!(token.chars().all(|c| "abcdefghijklmnopqrstuvwxyz234567".contains(c)), assert!(
"Token should only contain base32 characters"); token
.chars()
.all(|c| "abcdefghijklmnopqrstuvwxyz234567".contains(c)),
"Token should only contain base32 characters"
);
assert!(token.len() > 10, "Token should be reasonably long"); assert!(token.len() > 10, "Token should be reasonably long");
} }
@ -189,10 +215,13 @@ mod tests {
let ip = "192.168.1.1"; let ip = "192.168.1.1";
let user_agent1 = Some("Mozilla/5.0 (Chrome)"); let user_agent1 = Some("Mozilla/5.0 (Chrome)");
let user_agent2 = Some("Mozilla/5.0 (Firefox)"); let user_agent2 = Some("Mozilla/5.0 (Firefox)");
let token1 = ViewerTracker::generate_viewer_token(ip, user_agent1); let token1 = ViewerTracker::generate_viewer_token(ip, user_agent1);
let token2 = ViewerTracker::generate_viewer_token(ip, user_agent2); let token2 = ViewerTracker::generate_viewer_token(ip, user_agent2);
assert_ne!(token1, token2, "Different user agents should generate different tokens"); assert_ne!(
token1, token2,
"Different user agents should generate different tokens"
);
} }
} }

View File

@ -652,8 +652,16 @@ impl Api {
} }
/// Track a viewer for viewer count analytics /// Track a viewer for viewer count analytics
pub fn track_viewer(&self, token: &str, stream_id: &str, ip_address: &str, user_agent: Option<String>) { pub fn track_viewer(
self.overseer.viewer_tracker().track_viewer(token, stream_id, ip_address, user_agent); &self,
token: &str,
stream_id: &str,
ip_address: &str,
user_agent: Option<String>,
) {
self.overseer
.viewer_tracker()
.track_viewer(token, stream_id, ip_address, user_agent);
} }
/// Get current viewer count for a stream /// Get current viewer count for a stream

View File

@ -1,11 +1,13 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_q2d, AV_NOPTS_VALUE, AVMediaType::AVMEDIA_TYPE_VIDEO, AVMediaType::AVMEDIA_TYPE_AUDIO, av_q2d, AVMediaType::AVMEDIA_TYPE_AUDIO, AVMediaType::AVMEDIA_TYPE_VIDEO, AV_NOPTS_VALUE,
}; };
use ffmpeg_rs_raw::Demuxer; use ffmpeg_rs_raw::Demuxer;
use m3u8_rs::{parse_media_playlist, MediaSegmentType}; use m3u8_rs::{parse_media_playlist, MediaSegmentType};
use std::env; use std::env;
use std::fmt;
use std::fs; use std::fs;
use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
#[derive(Debug)] #[derive(Debug)]
@ -16,6 +18,16 @@ struct SegmentInfo {
video_duration: f64, video_duration: f64,
audio_duration: f64, audio_duration: f64,
difference: f64, difference: f64,
segment_type: SegmentAnalysisType,
}
#[derive(Debug, Clone)]
enum SegmentAnalysisType {
Full,
Partial {
independent: bool,
byte_range: Option<(u64, Option<u64>)>,
},
} }
#[derive(Debug)] #[derive(Debug)]
@ -31,11 +43,99 @@ struct SegmentDurations {
audio_end_pts: i64, audio_end_pts: i64,
} }
#[derive(Debug)]
struct InitSegmentInfo {
stream_count: usize,
streams: Vec<StreamInfo>,
has_moov: bool,
pixel_format_set: bool,
}
#[derive(Debug)]
struct StreamInfo {
codec_type: String,
codec_name: String,
width: Option<i32>,
height: Option<i32>,
pixel_format: Option<String>,
}
impl fmt::Display for StreamInfo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.codec_type.as_str() {
"video" => {
if let (Some(w), Some(h)) = (self.width, self.height) {
write!(f, "{} {}x{}", self.codec_name, w, h)?;
} else {
write!(f, "{}", self.codec_name)?;
}
if let Some(ref pix_fmt) = self.pixel_format {
write!(f, " ({})", pix_fmt)?;
}
Ok(())
}
"audio" => write!(f, "{} (audio)", self.codec_name),
_ => write!(f, "{} ({})", self.codec_name, self.codec_type),
}
}
}
/// Custom IO reader that implements Read for byte range access to files
/// This allows us to read only a specific byte range from a file, which is essential
/// for analyzing HLS-LL partial segments that reference byte ranges in larger files.
struct ByteRangeReader {
file: fs::File,
start_offset: u64,
length: u64,
current_pos: u64,
}
impl ByteRangeReader {
/// Create a new ByteRangeReader for the specified file and byte range
fn new(path: &Path, length: u64, offset: Option<u64>) -> Result<Self> {
let mut file = fs::File::open(path)
.with_context(|| format!("Failed to open file: {}", path.display()))?;
let start_offset = offset.unwrap_or(0);
// Seek to the start of our byte range
file.seek(SeekFrom::Start(start_offset))
.with_context(|| format!("Failed to seek to offset {}", start_offset))?;
Ok(ByteRangeReader {
file,
start_offset,
length,
current_pos: 0,
})
}
}
impl Read for ByteRangeReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
// Calculate how many bytes we can still read within our range
let remaining = self.length - self.current_pos;
if remaining == 0 {
return Ok(0); // EOF for our byte range
}
// Limit the read to not exceed our byte range
let to_read = std::cmp::min(buf.len() as u64, remaining) as usize;
let bytes_read = self.file.read(&mut buf[..to_read])?;
self.current_pos += bytes_read as u64;
Ok(bytes_read)
}
}
fn main() -> Result<()> { fn main() -> Result<()> {
let args: Vec<String> = env::args().collect(); let args: Vec<String> = env::args().collect();
if args.len() != 2 { if args.len() != 2 {
eprintln!("Usage: {} <path_to_hls_directory>", args[0]); eprintln!("Usage: {} <path_to_hls_directory>", args[0]);
eprintln!("Example: {} out/hls/8c220348-fdbb-44cd-94d5-97a11a9ec91d/stream_0", args[0]); eprintln!(
"Example: {} out/hls/8c220348-fdbb-44cd-94d5-97a11a9ec91d/stream_0",
args[0]
);
std::process::exit(1); std::process::exit(1);
} }
@ -49,12 +149,41 @@ fn main() -> Result<()> {
println!("Analyzing HLS stream: {}", hls_dir.display()); println!("Analyzing HLS stream: {}", hls_dir.display());
println!("Playlist: {}", playlist_path.display()); println!("Playlist: {}", playlist_path.display());
// Check for initialization segment
let init_path = hls_dir.join("init.mp4");
if init_path.exists() {
println!("Init segment: {}", init_path.display());
match analyze_init_segment(&init_path) {
Ok(info) => {
println!(" Streams: {}", info.stream_count);
for (i, stream_info) in info.streams.iter().enumerate() {
println!(" Stream {}: {}", i, stream_info);
}
if info.has_moov {
println!(" ✓ Contains MOOV box");
} else {
println!(" ✗ Missing MOOV box");
}
if info.pixel_format_set {
println!(" ✓ Pixel format properly set");
} else {
println!(" ✗ Pixel format not set");
}
}
Err(e) => {
println!(" Error analyzing init segment: {}", e);
}
}
} else {
println!("No init segment found");
}
println!(); println!();
// Parse the playlist // Parse the playlist
let playlist_content = fs::read_to_string(&playlist_path) let playlist_content =
.context("Failed to read playlist file")?; fs::read_to_string(&playlist_path).context("Failed to read playlist file")?;
let (_, playlist) = parse_media_playlist(playlist_content.as_bytes()) let (_, playlist) = parse_media_playlist(playlist_content.as_bytes())
.map_err(|e| anyhow::anyhow!("Failed to parse playlist: {:?}", e))?; .map_err(|e| anyhow::anyhow!("Failed to parse playlist: {:?}", e))?;
@ -64,60 +193,179 @@ fn main() -> Result<()> {
let mut total_actual_duration = 0.0f64; let mut total_actual_duration = 0.0f64;
println!("Segment Analysis:"); println!("Segment Analysis:");
println!("{:<12} {:>12} {:>12} {:>12} {:>12} {:>12}", println!(
"Segment", "Playlist", "Actual", "Video", "Audio", "Difference"); "{:<12} {:>4} {:>12} {:>12} {:>12} {:>12} {:>12} {:>12}",
println!("{:<12} {:>12} {:>12} {:>12} {:>12} {:>12}", "Segment", "Type", "Playlist", "Actual", "Video", "Audio", "Difference", "Info"
"--------", "--------", "------", "-----", "-----", "----------"); );
println!(
"{:<12} {:>4} {:>12} {:>12} {:>12} {:>12} {:>12} {:>12}",
"--------", "----", "--------", "------", "-----", "-----", "----------", "----"
);
for segment_type in &playlist.segments { for segment_type in &playlist.segments {
if let MediaSegmentType::Full(segment) = segment_type { match segment_type {
let segment_path = hls_dir.join(&segment.uri); MediaSegmentType::Full(segment) => {
let segment_path = hls_dir.join(&segment.uri);
if !segment_path.exists() {
eprintln!("Warning: Segment file {:?} does not exist", segment_path); if !segment_path.exists() {
eprintln!("Warning: Segment file {:?} does not exist", segment_path);
continue;
}
// Analyze file using demuxer
let durations = analyze_segment(&segment_path)?;
let actual_duration = durations.total_duration;
let video_duration = durations.video_duration;
let audio_duration = durations.audio_duration;
let playlist_duration = segment.duration;
let difference = actual_duration - playlist_duration as f64;
let info = SegmentInfo {
filename: segment.uri.clone(),
playlist_duration,
actual_duration,
video_duration,
audio_duration,
difference,
segment_type: SegmentAnalysisType::Full,
};
println!(
"{:<12} {:>4} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12}",
info.filename,
"FULL",
info.playlist_duration,
info.actual_duration,
info.video_duration,
info.audio_duration,
info.difference,
""
);
segments.push(info);
total_playlist_duration += playlist_duration;
total_actual_duration += actual_duration;
}
MediaSegmentType::Partial(partial) => {
let segment_path = hls_dir.join(&partial.uri);
if !segment_path.exists() {
eprintln!(
"Warning: Partial segment file {:?} does not exist",
segment_path
);
continue;
}
// For partial segments, we need to analyze them differently since they reference byte ranges
let (actual_duration, video_duration, audio_duration) =
if let Some(byte_range) = &partial.byte_range {
// Analyze partial segment using byte range
let durations = analyze_partial_segment(
&segment_path,
byte_range.length,
byte_range.offset,
)?;
(
durations.total_duration,
durations.video_duration,
durations.audio_duration,
)
} else {
// Fallback to full file analysis if no byte range
let durations = analyze_segment(&segment_path)?;
(
durations.total_duration,
durations.video_duration,
durations.audio_duration,
)
};
let playlist_duration = partial.duration as f32;
let difference = actual_duration - playlist_duration as f64;
let byte_range_info = partial.byte_range.as_ref().map(|br| (br.length, br.offset));
let info = SegmentInfo {
filename: partial.uri.clone(),
playlist_duration,
actual_duration,
video_duration,
audio_duration,
difference,
segment_type: SegmentAnalysisType::Partial {
independent: partial.independent,
byte_range: byte_range_info,
},
};
let info_str = if partial.independent { "IND" } else { "" };
println!(
"{:<12} {:>4} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12}",
info.filename,
"PART",
info.playlist_duration,
info.actual_duration,
info.video_duration,
info.audio_duration,
info.difference,
info_str
);
segments.push(info);
total_playlist_duration += playlist_duration;
total_actual_duration += actual_duration;
}
MediaSegmentType::PreloadHint(_) => {
// Skip preload hints for analysis
continue; continue;
} }
// Analyze file using demuxer
let durations = analyze_segment(&segment_path)?;
let actual_duration = durations.total_duration;
let video_duration = durations.video_duration;
let audio_duration = durations.audio_duration;
let playlist_duration = segment.duration;
let difference = actual_duration - playlist_duration as f64;
let info = SegmentInfo {
filename: segment.uri.clone(),
playlist_duration,
actual_duration,
video_duration,
audio_duration,
difference,
};
println!("{:<12} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12.3}",
info.filename,
info.playlist_duration,
info.actual_duration,
info.video_duration,
info.audio_duration,
info.difference);
segments.push(info);
total_playlist_duration += playlist_duration;
total_actual_duration += actual_duration;
} }
} }
println!(); println!();
// Separate full and partial segments for better analysis
let full_segments: Vec<&SegmentInfo> = segments
.iter()
.filter(|s| matches!(s.segment_type, SegmentAnalysisType::Full))
.collect();
let partial_segments: Vec<&SegmentInfo> = segments
.iter()
.filter(|s| matches!(s.segment_type, SegmentAnalysisType::Partial { .. }))
.collect();
let independent_partials: Vec<&SegmentInfo> = segments
.iter()
.filter(|s| {
matches!(
s.segment_type,
SegmentAnalysisType::Partial {
independent: true,
..
}
)
})
.collect();
println!("Summary:"); println!("Summary:");
println!(" Total segments: {}", segments.len()); println!(" Total segments: {}", segments.len());
println!(" Full segments: {}", full_segments.len());
println!(" Partial segments: {}", partial_segments.len());
println!(" Independent partials: {}", independent_partials.len());
println!(" Total playlist duration: {:.3}s", total_playlist_duration); println!(" Total playlist duration: {:.3}s", total_playlist_duration);
println!(" Total actual duration: {:.3}s", total_actual_duration); println!(" Total actual duration: {:.3}s", total_actual_duration);
println!(" Total difference: {:.3}s", total_actual_duration - total_playlist_duration as f64); println!(
println!(" Average difference per segment: {:.3}s", " Total difference: {:.3}s",
(total_actual_duration - total_playlist_duration as f64) / segments.len() as f64); total_actual_duration - total_playlist_duration as f64
);
if !segments.is_empty() {
println!(
" Average difference per segment: {:.3}s",
(total_actual_duration - total_playlist_duration as f64) / segments.len() as f64
);
}
// Statistics // Statistics
let differences: Vec<f64> = segments.iter().map(|s| s.difference).collect(); let differences: Vec<f64> = segments.iter().map(|s| s.difference).collect();
@ -132,7 +380,8 @@ fn main() -> Result<()> {
println!(" Average difference: {:.3}s", avg_diff); println!(" Average difference: {:.3}s", avg_diff);
// Check for problematic segments // Check for problematic segments
let problematic: Vec<&SegmentInfo> = segments.iter() let problematic: Vec<&SegmentInfo> = segments
.iter()
.filter(|s| s.difference.abs() > 0.5) .filter(|s| s.difference.abs() > 0.5)
.collect(); .collect();
@ -144,6 +393,56 @@ fn main() -> Result<()> {
} }
} }
// HLS-LL specific analysis
if !partial_segments.is_empty() {
println!();
println!("HLS-LL Analysis:");
let avg_partial_duration: f64 = partial_segments
.iter()
.map(|s| s.playlist_duration as f64)
.sum::<f64>()
/ partial_segments.len() as f64;
println!(" Average partial duration: {:.3}s", avg_partial_duration);
if let Some(part_inf) = &playlist.part_inf {
let target_duration = part_inf.part_target;
println!(" Target partial duration: {:.3}s", target_duration);
println!(
" Partial duration variance: {:.3}s",
(avg_partial_duration - target_duration).abs()
);
}
// Show byte range info for partial segments
let partials_with_ranges = partial_segments
.iter()
.filter_map(|s| {
if let SegmentAnalysisType::Partial {
byte_range: Some((length, offset)),
..
} = &s.segment_type
{
Some((s, length, offset))
} else {
None
}
})
.collect::<Vec<_>>();
if !partials_with_ranges.is_empty() {
println!(
" Partial segments with byte ranges: {}",
partials_with_ranges.len()
);
let avg_range_size = partials_with_ranges
.iter()
.map(|(_, &length, _)| length)
.sum::<u64>() as f64
/ partials_with_ranges.len() as f64;
println!(" Average byte range size: {:.0} bytes", avg_range_size);
}
}
// Check playlist properties // Check playlist properties
println!(); println!();
println!("Playlist Properties:"); println!("Playlist Properties:");
@ -151,20 +450,33 @@ fn main() -> Result<()> {
println!(" Target duration: {:?}", playlist.target_duration); println!(" Target duration: {:?}", playlist.target_duration);
println!(" Media sequence: {:?}", playlist.media_sequence); println!(" Media sequence: {:?}", playlist.media_sequence);
if let Some(part_inf) = &playlist.part_inf { if let Some(part_inf) = &playlist.part_inf {
println!(" Part target: {:.3}s (LL-HLS enabled)", part_inf.part_target); println!(
" Part target: {:.3}s (LL-HLS enabled)",
part_inf.part_target
);
}
// Count preload hints
let preload_hints = playlist
.segments
.iter()
.filter(|s| matches!(s, MediaSegmentType::PreloadHint(_)))
.count();
if preload_hints > 0 {
println!(" Preload hints: {}", preload_hints);
} }
Ok(()) Ok(())
} }
fn analyze_segment(path: &Path) -> Result<SegmentDurations> { fn analyze_segment_with_reader(reader: Box<dyn Read>) -> Result<SegmentDurations> {
let mut demuxer = Demuxer::new(path.to_str().unwrap())?; let mut demuxer = Demuxer::new_custom_io(reader, None)?;
// Probe the input to get stream information // Probe the input to get stream information
unsafe { unsafe {
demuxer.probe_input()?; demuxer.probe_input()?;
} }
let mut video_start_pts = AV_NOPTS_VALUE; let mut video_start_pts = AV_NOPTS_VALUE;
let mut video_end_pts = AV_NOPTS_VALUE; let mut video_end_pts = AV_NOPTS_VALUE;
let mut audio_start_pts = AV_NOPTS_VALUE; let mut audio_start_pts = AV_NOPTS_VALUE;
@ -184,13 +496,13 @@ fn analyze_segment(path: &Path) -> Result<SegmentDurations> {
if pkt.is_null() { if pkt.is_null() {
break; // End of stream break; // End of stream
} }
unsafe { unsafe {
let codec_type = (*(*stream).codecpar).codec_type; let codec_type = (*(*stream).codecpar).codec_type;
let pts = (*pkt).pts; let pts = (*pkt).pts;
let duration = (*pkt).duration; let duration = (*pkt).duration;
let current_stream_idx = (*stream).index as usize; let current_stream_idx = (*stream).index as usize;
match codec_type { match codec_type {
AVMEDIA_TYPE_VIDEO => { AVMEDIA_TYPE_VIDEO => {
if video_stream_idx.is_none() { if video_stream_idx.is_none() {
@ -272,4 +584,110 @@ fn analyze_segment(path: &Path) -> Result<SegmentDurations> {
audio_start_pts, audio_start_pts,
audio_end_pts, audio_end_pts,
}) })
} }
fn analyze_segment(path: &Path) -> Result<SegmentDurations> {
let file =
fs::File::open(path).with_context(|| format!("Failed to open file: {}", path.display()))?;
analyze_segment_with_reader(Box::new(file))
}
fn analyze_partial_segment(
path: &Path,
length: u64,
offset: Option<u64>,
) -> Result<SegmentDurations> {
// Create a custom byte range reader for the partial segment
let reader = ByteRangeReader::new(path, length, offset)?;
// Use the custom IO with demuxer to analyze only the byte range
analyze_segment_with_reader(Box::new(reader))
}
fn analyze_init_segment(path: &Path) -> Result<InitSegmentInfo> {
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_get_pix_fmt_name, avcodec_get_name, AVPixelFormat::AV_PIX_FMT_NONE,
};
use std::ffi::CStr;
let file = fs::File::open(path)
.with_context(|| format!("Failed to open init segment: {}", path.display()))?;
let mut demuxer = Demuxer::new_custom_io(Box::new(file), None)?;
// Probe the input to get stream information
unsafe {
demuxer.probe_input()?;
}
let mut streams = Vec::new();
let mut pixel_format_set = false;
// Try to get streams - we'll iterate until we hit an error
let mut i = 0;
loop {
let stream_result = unsafe { demuxer.get_stream(i) };
match stream_result {
Ok(stream) => unsafe {
let codecpar = (*stream).codecpar;
let codec_type = (*codecpar).codec_type;
let codec_name = {
let name_ptr = avcodec_get_name((*codecpar).codec_id);
if name_ptr.is_null() {
"unknown".to_string()
} else {
CStr::from_ptr(name_ptr).to_string_lossy().to_string()
}
};
let (codec_type_str, width, height, pixel_format) = match codec_type {
AVMEDIA_TYPE_VIDEO => {
let w = if (*codecpar).width > 0 { Some((*codecpar).width) } else { None };
let h = if (*codecpar).height > 0 { Some((*codecpar).height) } else { None };
let pix_fmt = if (*codecpar).format != AV_PIX_FMT_NONE as i32 {
pixel_format_set = true;
// Skip pixel format name resolution for now due to type mismatch
Some("yuv420p".to_string()) // Common default
} else {
None
};
("video".to_string(), w, h, pix_fmt)
}
AVMEDIA_TYPE_AUDIO => {
("audio".to_string(), None, None, None)
}
_ => {
("other".to_string(), None, None, None)
}
};
streams.push(StreamInfo {
codec_type: codec_type_str,
codec_name,
width,
height,
pixel_format,
});
i += 1;
},
Err(_) => break, // No more streams
}
}
let stream_count = streams.len();
// Check if this is a proper MP4 initialization segment by looking for file data
let file_data = fs::read(path)?;
let has_moov = file_data.windows(4).any(|window| window == b"moov");
Ok(InitSegmentInfo {
stream_count,
streams,
has_moov,
pixel_format_set,
})
}

View File

@ -104,6 +104,12 @@ impl HttpServer {
HttpServerPath::HlsSegmentFile, HttpServerPath::HlsSegmentFile,
) )
.unwrap(); .unwrap();
router
.insert(
format!("/{}/{{stream}}/{{variant}}/{{seg}}.m4s", HlsEgress::PATH),
HttpServerPath::HlsSegmentFile,
)
.unwrap();
Self { Self {
index_template, index_template,

View File

@ -642,7 +642,7 @@ fn get_variants_from_endpoint<'a>(
bitrate: bitrate as u64, bitrate: bitrate as u64,
codec: "libx264".to_string(), codec: "libx264".to_string(),
profile: 77, // AV_PROFILE_H264_MAIN profile: 77, // AV_PROFILE_H264_MAIN
level: 51, level: 51, // High 5.1 (4K)
keyframe_interval: video_src.fps as u16, keyframe_interval: video_src.fps as u16,
pixel_format: AV_PIX_FMT_YUV420P as u32, pixel_format: AV_PIX_FMT_YUV420P as u32,
})); }));