refactor: cleanup directory handling
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
2025-06-19 13:57:27 +01:00
parent 6eb7ff9807
commit 686cd7f794
9 changed files with 64 additions and 118 deletions

View File

@ -17,18 +17,12 @@ impl HlsEgress {
pub const PATH: &'static str = "hls"; pub const PATH: &'static str = "hls";
pub fn new<'a>( pub fn new<'a>(
id: &Uuid, out_dir: PathBuf,
out_dir: &str,
encoders: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>, encoders: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
segment_type: SegmentType, segment_type: SegmentType,
) -> Result<Self> { ) -> Result<Self> {
Ok(Self { Ok(Self {
mux: HlsMuxer::new( mux: HlsMuxer::new(out_dir.join(Self::PATH), encoders, segment_type)?,
id,
PathBuf::from(out_dir).join(Self::PATH).to_str().unwrap(),
encoders,
segment_type,
)?,
}) })
} }
} }

View File

@ -10,8 +10,6 @@ use crate::egress::{Egress, EgressResult};
use crate::variant::{StreamMapping, VariantStream}; use crate::variant::{StreamMapping, VariantStream};
pub struct RecorderEgress { pub struct RecorderEgress {
/// Pipeline ID
id: Uuid,
/// Internal muxer writing the output packets /// Internal muxer writing the output packets
muxer: Muxer, muxer: Muxer,
/// Mapping from Variant ID to stream index /// Mapping from Variant ID to stream index
@ -20,15 +18,10 @@ pub struct RecorderEgress {
impl RecorderEgress { impl RecorderEgress {
pub fn new<'a>( pub fn new<'a>(
id: &Uuid, out_dir: PathBuf,
out_dir: &str,
variants: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>, variants: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
) -> Result<Self> { ) -> Result<Self> {
let base = PathBuf::from(out_dir).join(id.to_string()); let out_file = out_dir.join("recording.ts");
let out_file = base.join("recording.ts");
fs::create_dir_all(&base)?;
let mut var_map = HashMap::new(); let mut var_map = HashMap::new();
let muxer = unsafe { let muxer = unsafe {
let mut m = Muxer::builder() let mut m = Muxer::builder()
@ -41,11 +34,7 @@ impl RecorderEgress {
m.open(None)?; m.open(None)?;
m m
}; };
Ok(Self { Ok(Self { muxer, var_map })
id: *id,
muxer,
var_map,
})
} }
} }

View File

@ -73,27 +73,24 @@ pub struct HlsMuxer {
impl HlsMuxer { impl HlsMuxer {
pub fn new<'a>( pub fn new<'a>(
id: &Uuid, out_dir: PathBuf,
out_dir: &str,
encoders: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>, encoders: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
segment_type: SegmentType, segment_type: SegmentType,
) -> Result<Self> { ) -> Result<Self> {
let base = PathBuf::from(out_dir).join(id.to_string()); if !out_dir.exists() {
std::fs::create_dir_all(&out_dir)?;
if !base.exists() {
std::fs::create_dir_all(&base)?;
} }
let mut vars = Vec::new(); let mut vars = Vec::new();
for (k, group) in &encoders for (k, group) in &encoders
.sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id())) .sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id()))
.chunk_by(|a| a.0.group_id()) .chunk_by(|a| a.0.group_id())
{ {
let var = HlsVariant::new(base.to_str().unwrap(), k, group, segment_type)?; let var = HlsVariant::new(out_dir.clone(), k, group, segment_type)?;
vars.push(var); vars.push(var);
} }
let ret = Self { let ret = Self {
out_dir: base, out_dir,
variants: vars, variants: vars,
}; };
ret.write_master_playlist()?; ret.write_master_playlist()?;

View File

@ -13,7 +13,7 @@ use ffmpeg_rs_raw::{cstr, Encoder, Muxer};
use log::{debug, info, trace, warn}; use log::{debug, info, trace, warn};
use m3u8_rs::{ExtTag, MediaSegmentType, PartInf, PreloadHint}; use m3u8_rs::{ExtTag, MediaSegmentType, PartInf, PreloadHint};
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File; use std::fs::{create_dir_all, File};
use std::path::PathBuf; use std::path::PathBuf;
use std::ptr; use std::ptr;
@ -31,7 +31,7 @@ pub struct HlsVariant {
/// Current segment index /// Current segment index
idx: u64, idx: u64,
/// Output directory (base) /// Output directory (base)
out_dir: String, out_dir: PathBuf,
/// List of segments to be included in the playlist /// List of segments to be included in the playlist
segments: Vec<HlsSegment>, segments: Vec<HlsSegment>,
/// Type of segments to create /// Type of segments to create
@ -58,19 +58,22 @@ pub struct HlsVariant {
impl HlsVariant { impl HlsVariant {
pub fn new<'a>( pub fn new<'a>(
out_dir: &'a str, out_dir: PathBuf,
group: usize, group: usize,
encoded_vars: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>, encoded_vars: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
segment_type: SegmentType, segment_type: SegmentType,
) -> Result<Self> { ) -> Result<Self> {
let name = format!("stream_{}", group); let name = format!("stream_{}", group);
let first_seg = Self::map_segment_path(out_dir, &name, 1, segment_type);
std::fs::create_dir_all(PathBuf::from(&first_seg).parent().unwrap())?; let var_dir = out_dir.join(&name);
if !var_dir.exists() {
create_dir_all(&var_dir)?;
}
let mut mux = unsafe { let mut mux = unsafe {
Muxer::builder() Muxer::builder()
.with_output_path( .with_output_path(
first_seg.as_str(), var_dir.join("1.ts").to_str().unwrap(),
match segment_type { match segment_type {
SegmentType::MPEGTS => Some("mpegts"), SegmentType::MPEGTS => Some("mpegts"),
SegmentType::FMP4 => Some("mp4"), SegmentType::FMP4 => Some("mp4"),
@ -155,7 +158,7 @@ impl HlsVariant {
streams, streams,
idx: 1, idx: 1,
segments: Vec::new(), segments: Vec::new(),
out_dir: out_dir.to_string(), out_dir: var_dir,
segment_type, segment_type,
current_segment_start: 0.0, current_segment_start: 0.0,
current_partial_start: 0.0, current_partial_start: 0.0,
@ -201,16 +204,8 @@ impl HlsVariant {
} }
} }
pub fn out_dir(&self) -> PathBuf { pub fn map_segment_path(&self, idx: u64, typ: SegmentType) -> PathBuf {
PathBuf::from(&self.out_dir).join(&self.name) self.out_dir.join(Self::segment_name(typ, idx))
}
pub fn map_segment_path(out_dir: &str, name: &str, idx: u64, typ: SegmentType) -> String {
PathBuf::from(out_dir)
.join(name)
.join(Self::segment_name(typ, idx))
.to_string_lossy()
.to_string()
} }
/// Process a single packet through the muxer /// Process a single packet through the muxer
@ -361,9 +356,8 @@ impl HlsVariant {
avio_close((*ctx).pb); avio_close((*ctx).pb);
av_free((*ctx).url as *mut _); av_free((*ctx).url as *mut _);
let next_seg_url = let next_seg_url = self.map_segment_path(self.idx, self.segment_type);
Self::map_segment_path(&self.out_dir, &self.name, self.idx, self.segment_type); (*ctx).url = cstr!(next_seg_url.to_str().unwrap());
(*ctx).url = cstr!(next_seg_url.as_str());
let ret = avio_open(&mut (*ctx).pb, (*ctx).url, AVIO_FLAG_WRITE); let ret = avio_open(&mut (*ctx).pb, (*ctx).url, AVIO_FLAG_WRITE);
if ret < 0 { if ret < 0 {
@ -371,22 +365,13 @@ impl HlsVariant {
} }
// Log the completed segment (previous index), not the next one // Log the completed segment (previous index), not the next one
let completed_seg_path = Self::map_segment_path( let completed_seg_path = self.map_segment_path(completed_segment_idx, self.segment_type);
&self.out_dir, let segment_size = completed_seg_path.metadata().map(|m| m.len()).unwrap_or(0);
&self.name,
completed_segment_idx,
self.segment_type,
);
let completed_segment_path = PathBuf::from(&completed_seg_path);
let segment_size = completed_segment_path
.metadata()
.map(|m| m.len())
.unwrap_or(0);
let cur_duration = next_pkt_start - self.current_segment_start; let cur_duration = next_pkt_start - self.current_segment_start;
debug!( debug!(
"Finished segment {} [{:.3}s, {:.2} kB, {} pkts]", "Finished segment {} [{:.3}s, {:.2} kB, {} pkts]",
completed_segment_path completed_seg_path
.file_name() .file_name()
.unwrap_or_default() .unwrap_or_default()
.to_string_lossy(), .to_string_lossy(),
@ -409,12 +394,7 @@ impl HlsVariant {
variant: video_var_id, variant: video_var_id,
idx: seg.index, idx: seg.index,
duration: seg.duration, duration: seg.duration,
path: PathBuf::from(Self::map_segment_path( path: self.map_segment_path(seg.index, self.segment_type),
&self.out_dir,
&self.name,
seg.index,
self.segment_type,
)),
}) })
.collect(); .collect();
@ -423,7 +403,7 @@ impl HlsVariant {
variant: video_var_id, variant: video_var_id,
idx: completed_segment_idx, idx: completed_segment_idx,
duration: cur_duration as f32, duration: cur_duration as f32,
path: completed_segment_path, path: completed_seg_path,
}; };
self.segments.push(HlsSegment::Full(SegmentInfo { self.segments.push(HlsSegment::Full(SegmentInfo {
@ -479,11 +459,10 @@ impl HlsVariant {
let mut ret = vec![]; let mut ret = vec![];
if let Some(seg_match) = drain_from_hls_segment { if let Some(seg_match) = drain_from_hls_segment {
if let Some(drain_pos) = self.segments.iter().position(|e| e == seg_match) { if let Some(drain_pos) = self.segments.iter().position(|e| e == seg_match) {
let seg_dir = self.out_dir();
for seg in self.segments.drain(..drain_pos) { for seg in self.segments.drain(..drain_pos) {
match seg { match seg {
HlsSegment::Full(seg) => { HlsSegment::Full(seg) => {
let seg_path = seg_dir.join(seg.filename()); let seg_path = self.out_dir.join(seg.filename());
if let Err(e) = std::fs::remove_file(&seg_path) { if let Err(e) = std::fs::remove_file(&seg_path) {
warn!( warn!(
"Failed to remove segment file: {} {}", "Failed to remove segment file: {} {}",
@ -564,7 +543,7 @@ impl HlsVariant {
.unwrap_or(self.idx); .unwrap_or(self.idx);
pl.end_list = false; pl.end_list = false;
let mut f_out = File::create(self.out_dir().join("live.m3u8"))?; let mut f_out = File::create(self.out_dir.join("live.m3u8"))?;
pl.write_to(&mut f_out)?; pl.write_to(&mut f_out)?;
Ok(()) Ok(())
} }

View File

@ -117,7 +117,7 @@ pub struct PipelineRunner {
frame_ctr: u64, frame_ctr: u64,
/// Output directory where all stream data is saved /// Output directory where all stream data is saved
out_dir: String, out_dir: PathBuf,
/// Thumbnail generation interval (0 = disabled) /// Thumbnail generation interval (0 = disabled)
thumb_interval: u64, thumb_interval: u64,
@ -155,7 +155,7 @@ impl PipelineRunner {
) -> Result<Self> { ) -> Result<Self> {
Ok(Self { Ok(Self {
handle, handle,
out_dir, out_dir: PathBuf::from(out_dir).join(connection.id.to_string()),
overseer, overseer,
connection, connection,
config: Default::default(), config: Default::default(),
@ -222,15 +222,11 @@ impl PipelineRunner {
unsafe fn generate_thumb_from_frame(&mut self, frame: *mut AVFrame) -> Result<()> { unsafe fn generate_thumb_from_frame(&mut self, frame: *mut AVFrame) -> Result<()> {
if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 { if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 {
let frame = av_frame_clone(frame).addr(); let frame = av_frame_clone(frame).addr();
let dir = PathBuf::from(&self.out_dir).join(self.connection.id.to_string()); let dst_pic = self.out_dir.join("thumb.webp");
if !dir.exists() {
std::fs::create_dir_all(&dir)?;
}
std::thread::spawn(move || unsafe { std::thread::spawn(move || unsafe {
let mut frame = frame as *mut AVFrame; //TODO: danger?? let mut frame = frame as *mut AVFrame; //TODO: danger??
let thumb_start = Instant::now(); let thumb_start = Instant::now();
let dst_pic = dir.join("thumb.webp");
if let Err(e) = Self::save_thumb(frame, &dst_pic) { if let Err(e) = Self::save_thumb(frame, &dst_pic) {
warn!("Failed to save thumb: {}", e); warn!("Failed to save thumb: {}", e);
} }
@ -732,16 +728,11 @@ impl PipelineRunner {
}); });
match e { match e {
EgressType::HLS(_) => { EgressType::HLS(_) => {
let hls = HlsEgress::new( let hls = HlsEgress::new(self.out_dir.clone(), encoders, SegmentType::MPEGTS)?;
&self.connection.id,
&self.out_dir,
encoders,
SegmentType::MPEGTS,
)?;
self.egress.push(Box::new(hls)); self.egress.push(Box::new(hls));
} }
EgressType::Recorder(_) => { EgressType::Recorder(_) => {
let rec = RecorderEgress::new(&self.connection.id, &self.out_dir, encoders)?; let rec = RecorderEgress::new(self.out_dir.clone(), encoders)?;
self.egress.push(Box::new(rec)); self.egress.push(Box::new(rec));
} }
_ => warn!("{} is not implemented", e), _ => warn!("{} is not implemented", e),

View File

@ -225,12 +225,8 @@ impl HlsTimingTester {
]; ];
// Create HLS muxer // Create HLS muxer
let mut hls_muxer = HlsMuxer::new( let mut hls_muxer =
&stream_id, HlsMuxer::new(output_dir.to_path_buf(), variants.into_iter(), segment_type)?;
output_dir.to_str().unwrap(),
variants.into_iter(),
segment_type,
)?;
// Create frame generator // Create frame generator
let frame_size = unsafe { (*audio_encoder.codec_context()).frame_size as _ }; let frame_size = unsafe { (*audio_encoder.codec_context()).frame_size as _ };

View File

@ -643,8 +643,16 @@ fn analyze_init_segment(path: &Path) -> Result<InitSegmentInfo> {
let (codec_type_str, width, height, pixel_format) = match codec_type { let (codec_type_str, width, height, pixel_format) = match codec_type {
AVMEDIA_TYPE_VIDEO => { AVMEDIA_TYPE_VIDEO => {
let w = if (*codecpar).width > 0 { Some((*codecpar).width) } else { None }; let w = if (*codecpar).width > 0 {
let h = if (*codecpar).height > 0 { Some((*codecpar).height) } else { None }; Some((*codecpar).width)
} else {
None
};
let h = if (*codecpar).height > 0 {
Some((*codecpar).height)
} else {
None
};
let pix_fmt = if (*codecpar).format != AV_PIX_FMT_NONE as i32 { let pix_fmt = if (*codecpar).format != AV_PIX_FMT_NONE as i32 {
pixel_format_set = true; pixel_format_set = true;
@ -656,12 +664,8 @@ fn analyze_init_segment(path: &Path) -> Result<InitSegmentInfo> {
("video".to_string(), w, h, pix_fmt) ("video".to_string(), w, h, pix_fmt)
} }
AVMEDIA_TYPE_AUDIO => { AVMEDIA_TYPE_AUDIO => ("audio".to_string(), None, None, None),
("audio".to_string(), None, None, None) _ => ("other".to_string(), None, None, None),
}
_ => {
("other".to_string(), None, None, None)
}
}; };
streams.push(StreamInfo { streams.push(StreamInfo {

View File

@ -152,7 +152,7 @@ impl HttpServer {
.title .title
.unwrap_or_else(|| format!("Stream {}", &stream.id[..8])), .unwrap_or_else(|| format!("Stream {}", &stream.id[..8])),
summary: stream.summary, summary: stream.summary,
live_url: format!("/{}/{}/live.m3u8", HlsEgress::PATH, stream.id), live_url: format!("/{}/{}/live.m3u8", stream.id, HlsEgress::PATH),
viewer_count: if viewer_count > 0 { viewer_count: if viewer_count > 0 {
Some(viewer_count as _) Some(viewer_count as _)
} else { } else {

View File

@ -229,13 +229,14 @@ impl ZapStreamOverseer {
pubkey: &Vec<u8>, pubkey: &Vec<u8>,
) -> Result<Event> { ) -> Result<Event> {
// TODO: remove assumption that HLS is enabled // TODO: remove assumption that HLS is enabled
let pipeline_dir = PathBuf::from(stream.id.to_string());
let extra_tags = vec![ let extra_tags = vec![
Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?, Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?,
Tag::parse([ Tag::parse([
"streaming", "streaming",
self.map_to_public_url( self.map_to_public_url(
PathBuf::from(HlsEgress::PATH) pipeline_dir
.join(stream.id.to_string()) .join(HlsEgress::PATH)
.join("live.m3u8") .join("live.m3u8")
.to_str() .to_str()
.unwrap(), .unwrap(),
@ -244,12 +245,7 @@ impl ZapStreamOverseer {
])?, ])?,
Tag::parse([ Tag::parse([
"image", "image",
self.map_to_public_url( self.map_to_public_url(pipeline_dir.join("thumb.webp").to_str().unwrap())?
PathBuf::from(stream.id.to_string())
.join("thumb.webp")
.to_str()
.unwrap(),
)?
.as_str(), .as_str(),
])?, ])?,
Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?, Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?,