diff --git a/crates/core/src/egress/hls.rs b/crates/core/src/egress/hls.rs index 23fcc8e..0d0083a 100644 --- a/crates/core/src/egress/hls.rs +++ b/crates/core/src/egress/hls.rs @@ -17,18 +17,12 @@ impl HlsEgress { pub const PATH: &'static str = "hls"; pub fn new<'a>( - id: &Uuid, - out_dir: &str, + out_dir: PathBuf, encoders: impl Iterator, segment_type: SegmentType, ) -> Result { Ok(Self { - mux: HlsMuxer::new( - id, - PathBuf::from(out_dir).join(Self::PATH).to_str().unwrap(), - encoders, - segment_type, - )?, + mux: HlsMuxer::new(out_dir.join(Self::PATH), encoders, segment_type)?, }) } } diff --git a/crates/core/src/egress/recorder.rs b/crates/core/src/egress/recorder.rs index 3e7cca0..36c6558 100644 --- a/crates/core/src/egress/recorder.rs +++ b/crates/core/src/egress/recorder.rs @@ -10,8 +10,6 @@ use crate::egress::{Egress, EgressResult}; use crate::variant::{StreamMapping, VariantStream}; pub struct RecorderEgress { - /// Pipeline ID - id: Uuid, /// Internal muxer writing the output packets muxer: Muxer, /// Mapping from Variant ID to stream index @@ -20,15 +18,10 @@ pub struct RecorderEgress { impl RecorderEgress { pub fn new<'a>( - id: &Uuid, - out_dir: &str, + out_dir: PathBuf, variants: impl Iterator, ) -> Result { - let base = PathBuf::from(out_dir).join(id.to_string()); - - let out_file = base.join("recording.ts"); - fs::create_dir_all(&base)?; - + let out_file = out_dir.join("recording.ts"); let mut var_map = HashMap::new(); let muxer = unsafe { let mut m = Muxer::builder() @@ -41,11 +34,7 @@ impl RecorderEgress { m.open(None)?; m }; - Ok(Self { - id: *id, - muxer, - var_map, - }) + Ok(Self { muxer, var_map }) } } diff --git a/crates/core/src/mux/hls/mod.rs b/crates/core/src/mux/hls/mod.rs index 5f88cbc..5941f58 100644 --- a/crates/core/src/mux/hls/mod.rs +++ b/crates/core/src/mux/hls/mod.rs @@ -73,27 +73,24 @@ pub struct HlsMuxer { impl HlsMuxer { pub fn new<'a>( - id: &Uuid, - out_dir: &str, + out_dir: PathBuf, encoders: impl Iterator, segment_type: SegmentType, ) -> Result { - let base = PathBuf::from(out_dir).join(id.to_string()); - - if !base.exists() { - std::fs::create_dir_all(&base)?; + if !out_dir.exists() { + std::fs::create_dir_all(&out_dir)?; } let mut vars = Vec::new(); for (k, group) in &encoders .sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id())) .chunk_by(|a| a.0.group_id()) { - let var = HlsVariant::new(base.to_str().unwrap(), k, group, segment_type)?; + let var = HlsVariant::new(out_dir.clone(), k, group, segment_type)?; vars.push(var); } let ret = Self { - out_dir: base, + out_dir, variants: vars, }; ret.write_master_playlist()?; diff --git a/crates/core/src/mux/hls/variant.rs b/crates/core/src/mux/hls/variant.rs index 736923e..05965b6 100644 --- a/crates/core/src/mux/hls/variant.rs +++ b/crates/core/src/mux/hls/variant.rs @@ -13,7 +13,7 @@ use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; use log::{debug, info, trace, warn}; use m3u8_rs::{ExtTag, MediaSegmentType, PartInf, PreloadHint}; use std::collections::HashMap; -use std::fs::File; +use std::fs::{create_dir_all, File}; use std::path::PathBuf; use std::ptr; @@ -31,7 +31,7 @@ pub struct HlsVariant { /// Current segment index idx: u64, /// Output directory (base) - out_dir: String, + out_dir: PathBuf, /// List of segments to be included in the playlist segments: Vec, /// Type of segments to create @@ -58,19 +58,22 @@ pub struct HlsVariant { impl HlsVariant { pub fn new<'a>( - out_dir: &'a str, + out_dir: PathBuf, group: usize, encoded_vars: impl Iterator, segment_type: SegmentType, ) -> Result { let name = format!("stream_{}", group); - let first_seg = Self::map_segment_path(out_dir, &name, 1, segment_type); - std::fs::create_dir_all(PathBuf::from(&first_seg).parent().unwrap())?; + + let var_dir = out_dir.join(&name); + if !var_dir.exists() { + create_dir_all(&var_dir)?; + } let mut mux = unsafe { Muxer::builder() .with_output_path( - first_seg.as_str(), + var_dir.join("1.ts").to_str().unwrap(), match segment_type { SegmentType::MPEGTS => Some("mpegts"), SegmentType::FMP4 => Some("mp4"), @@ -155,7 +158,7 @@ impl HlsVariant { streams, idx: 1, segments: Vec::new(), - out_dir: out_dir.to_string(), + out_dir: var_dir, segment_type, current_segment_start: 0.0, current_partial_start: 0.0, @@ -201,16 +204,8 @@ impl HlsVariant { } } - pub fn out_dir(&self) -> PathBuf { - PathBuf::from(&self.out_dir).join(&self.name) - } - - pub fn map_segment_path(out_dir: &str, name: &str, idx: u64, typ: SegmentType) -> String { - PathBuf::from(out_dir) - .join(name) - .join(Self::segment_name(typ, idx)) - .to_string_lossy() - .to_string() + pub fn map_segment_path(&self, idx: u64, typ: SegmentType) -> PathBuf { + self.out_dir.join(Self::segment_name(typ, idx)) } /// Process a single packet through the muxer @@ -361,9 +356,8 @@ impl HlsVariant { avio_close((*ctx).pb); av_free((*ctx).url as *mut _); - let next_seg_url = - Self::map_segment_path(&self.out_dir, &self.name, self.idx, self.segment_type); - (*ctx).url = cstr!(next_seg_url.as_str()); + let next_seg_url = self.map_segment_path(self.idx, self.segment_type); + (*ctx).url = cstr!(next_seg_url.to_str().unwrap()); let ret = avio_open(&mut (*ctx).pb, (*ctx).url, AVIO_FLAG_WRITE); if ret < 0 { @@ -371,22 +365,13 @@ impl HlsVariant { } // Log the completed segment (previous index), not the next one - let completed_seg_path = Self::map_segment_path( - &self.out_dir, - &self.name, - completed_segment_idx, - self.segment_type, - ); - let completed_segment_path = PathBuf::from(&completed_seg_path); - let segment_size = completed_segment_path - .metadata() - .map(|m| m.len()) - .unwrap_or(0); + let completed_seg_path = self.map_segment_path(completed_segment_idx, self.segment_type); + let segment_size = completed_seg_path.metadata().map(|m| m.len()).unwrap_or(0); let cur_duration = next_pkt_start - self.current_segment_start; debug!( "Finished segment {} [{:.3}s, {:.2} kB, {} pkts]", - completed_segment_path + completed_seg_path .file_name() .unwrap_or_default() .to_string_lossy(), @@ -409,12 +394,7 @@ impl HlsVariant { variant: video_var_id, idx: seg.index, duration: seg.duration, - path: PathBuf::from(Self::map_segment_path( - &self.out_dir, - &self.name, - seg.index, - self.segment_type, - )), + path: self.map_segment_path(seg.index, self.segment_type), }) .collect(); @@ -423,7 +403,7 @@ impl HlsVariant { variant: video_var_id, idx: completed_segment_idx, duration: cur_duration as f32, - path: completed_segment_path, + path: completed_seg_path, }; self.segments.push(HlsSegment::Full(SegmentInfo { @@ -479,11 +459,10 @@ impl HlsVariant { let mut ret = vec![]; if let Some(seg_match) = drain_from_hls_segment { if let Some(drain_pos) = self.segments.iter().position(|e| e == seg_match) { - let seg_dir = self.out_dir(); for seg in self.segments.drain(..drain_pos) { match seg { HlsSegment::Full(seg) => { - let seg_path = seg_dir.join(seg.filename()); + let seg_path = self.out_dir.join(seg.filename()); if let Err(e) = std::fs::remove_file(&seg_path) { warn!( "Failed to remove segment file: {} {}", @@ -564,7 +543,7 @@ impl HlsVariant { .unwrap_or(self.idx); pl.end_list = false; - let mut f_out = File::create(self.out_dir().join("live.m3u8"))?; + let mut f_out = File::create(self.out_dir.join("live.m3u8"))?; pl.write_to(&mut f_out)?; Ok(()) } diff --git a/crates/core/src/pipeline/runner.rs b/crates/core/src/pipeline/runner.rs index 6587140..60e3704 100644 --- a/crates/core/src/pipeline/runner.rs +++ b/crates/core/src/pipeline/runner.rs @@ -117,7 +117,7 @@ pub struct PipelineRunner { frame_ctr: u64, /// Output directory where all stream data is saved - out_dir: String, + out_dir: PathBuf, /// Thumbnail generation interval (0 = disabled) thumb_interval: u64, @@ -155,7 +155,7 @@ impl PipelineRunner { ) -> Result { Ok(Self { handle, - out_dir, + out_dir: PathBuf::from(out_dir).join(connection.id.to_string()), overseer, connection, config: Default::default(), @@ -222,15 +222,11 @@ impl PipelineRunner { unsafe fn generate_thumb_from_frame(&mut self, frame: *mut AVFrame) -> Result<()> { if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 { let frame = av_frame_clone(frame).addr(); - let dir = PathBuf::from(&self.out_dir).join(self.connection.id.to_string()); - if !dir.exists() { - std::fs::create_dir_all(&dir)?; - } + let dst_pic = self.out_dir.join("thumb.webp"); std::thread::spawn(move || unsafe { let mut frame = frame as *mut AVFrame; //TODO: danger?? let thumb_start = Instant::now(); - let dst_pic = dir.join("thumb.webp"); if let Err(e) = Self::save_thumb(frame, &dst_pic) { warn!("Failed to save thumb: {}", e); } @@ -732,16 +728,11 @@ impl PipelineRunner { }); match e { EgressType::HLS(_) => { - let hls = HlsEgress::new( - &self.connection.id, - &self.out_dir, - encoders, - SegmentType::MPEGTS, - )?; + let hls = HlsEgress::new(self.out_dir.clone(), encoders, SegmentType::MPEGTS)?; self.egress.push(Box::new(hls)); } EgressType::Recorder(_) => { - let rec = RecorderEgress::new(&self.connection.id, &self.out_dir, encoders)?; + let rec = RecorderEgress::new(self.out_dir.clone(), encoders)?; self.egress.push(Box::new(rec)); } _ => warn!("{} is not implemented", e), diff --git a/crates/core/src/test_hls_timing.rs b/crates/core/src/test_hls_timing.rs index 9a542de..5ac1994 100644 --- a/crates/core/src/test_hls_timing.rs +++ b/crates/core/src/test_hls_timing.rs @@ -225,12 +225,8 @@ impl HlsTimingTester { ]; // Create HLS muxer - let mut hls_muxer = HlsMuxer::new( - &stream_id, - output_dir.to_str().unwrap(), - variants.into_iter(), - segment_type, - )?; + let mut hls_muxer = + HlsMuxer::new(output_dir.to_path_buf(), variants.into_iter(), segment_type)?; // Create frame generator let frame_size = unsafe { (*audio_encoder.codec_context()).frame_size as _ }; diff --git a/crates/zap-stream/src/bin/hls_debug.rs b/crates/zap-stream/src/bin/hls_debug.rs index 0a638f4..96534b0 100644 --- a/crates/zap-stream/src/bin/hls_debug.rs +++ b/crates/zap-stream/src/bin/hls_debug.rs @@ -149,7 +149,7 @@ fn main() -> Result<()> { println!("Analyzing HLS stream: {}", hls_dir.display()); println!("Playlist: {}", playlist_path.display()); - + // Check for initialization segment let init_path = hls_dir.join("init.mp4"); if init_path.exists() { @@ -612,7 +612,7 @@ fn analyze_init_segment(path: &Path) -> Result { let file = fs::File::open(path) .with_context(|| format!("Failed to open init segment: {}", path.display()))?; - + let mut demuxer = Demuxer::new_custom_io(Box::new(file), None)?; // Probe the input to get stream information @@ -622,7 +622,7 @@ fn analyze_init_segment(path: &Path) -> Result { let mut streams = Vec::new(); let mut pixel_format_set = false; - + // Try to get streams - we'll iterate until we hit an error let mut i = 0; loop { @@ -631,7 +631,7 @@ fn analyze_init_segment(path: &Path) -> Result { Ok(stream) => unsafe { let codecpar = (*stream).codecpar; let codec_type = (*codecpar).codec_type; - + let codec_name = { let name_ptr = avcodec_get_name((*codecpar).codec_id); if name_ptr.is_null() { @@ -643,9 +643,17 @@ fn analyze_init_segment(path: &Path) -> Result { let (codec_type_str, width, height, pixel_format) = match codec_type { AVMEDIA_TYPE_VIDEO => { - let w = if (*codecpar).width > 0 { Some((*codecpar).width) } else { None }; - let h = if (*codecpar).height > 0 { Some((*codecpar).height) } else { None }; - + let w = if (*codecpar).width > 0 { + Some((*codecpar).width) + } else { + None + }; + let h = if (*codecpar).height > 0 { + Some((*codecpar).height) + } else { + None + }; + let pix_fmt = if (*codecpar).format != AV_PIX_FMT_NONE as i32 { pixel_format_set = true; // Skip pixel format name resolution for now due to type mismatch @@ -653,15 +661,11 @@ fn analyze_init_segment(path: &Path) -> Result { } else { None }; - + ("video".to_string(), w, h, pix_fmt) } - AVMEDIA_TYPE_AUDIO => { - ("audio".to_string(), None, None, None) - } - _ => { - ("other".to_string(), None, None, None) - } + AVMEDIA_TYPE_AUDIO => ("audio".to_string(), None, None, None), + _ => ("other".to_string(), None, None, None), }; streams.push(StreamInfo { @@ -671,7 +675,7 @@ fn analyze_init_segment(path: &Path) -> Result { height, pixel_format, }); - + i += 1; }, Err(_) => break, // No more streams diff --git a/crates/zap-stream/src/http.rs b/crates/zap-stream/src/http.rs index c6919b3..2e83e6c 100644 --- a/crates/zap-stream/src/http.rs +++ b/crates/zap-stream/src/http.rs @@ -152,7 +152,7 @@ impl HttpServer { .title .unwrap_or_else(|| format!("Stream {}", &stream.id[..8])), summary: stream.summary, - live_url: format!("/{}/{}/live.m3u8", HlsEgress::PATH, stream.id), + live_url: format!("/{}/{}/live.m3u8", stream.id, HlsEgress::PATH), viewer_count: if viewer_count > 0 { Some(viewer_count as _) } else { diff --git a/crates/zap-stream/src/overseer.rs b/crates/zap-stream/src/overseer.rs index aac9b6a..5e05a66 100644 --- a/crates/zap-stream/src/overseer.rs +++ b/crates/zap-stream/src/overseer.rs @@ -229,13 +229,14 @@ impl ZapStreamOverseer { pubkey: &Vec, ) -> Result { // TODO: remove assumption that HLS is enabled + let pipeline_dir = PathBuf::from(stream.id.to_string()); let extra_tags = vec![ Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?, Tag::parse([ "streaming", self.map_to_public_url( - PathBuf::from(HlsEgress::PATH) - .join(stream.id.to_string()) + pipeline_dir + .join(HlsEgress::PATH) .join("live.m3u8") .to_str() .unwrap(), @@ -244,13 +245,8 @@ impl ZapStreamOverseer { ])?, Tag::parse([ "image", - self.map_to_public_url( - PathBuf::from(stream.id.to_string()) - .join("thumb.webp") - .to_str() - .unwrap(), - )? - .as_str(), + self.map_to_public_url(pipeline_dir.join("thumb.webp").to_str().unwrap())? + .as_str(), ])?, Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?, ]; @@ -642,7 +638,7 @@ fn get_variants_from_endpoint<'a>( bitrate: bitrate as u64, codec: "libx264".to_string(), profile: 77, // AV_PROFILE_H264_MAIN - level: 51, // High 5.1 (4K) + level: 51, // High 5.1 (4K) keyframe_interval: video_src.fps as u16, pixel_format: AV_PIX_FMT_YUV420P as u32, }));