diff --git a/crates/core/src/egress/hls.rs b/crates/core/src/egress/hls.rs index 3862458..0653e2f 100644 --- a/crates/core/src/egress/hls.rs +++ b/crates/core/src/egress/hls.rs @@ -14,11 +14,7 @@ impl Egress for HlsMuxer { packet: *mut AVPacket, variant: &Uuid, ) -> Result { - if let Some(ns) = self.mux_packet(packet, variant)? { - Ok(EgressResult::NewSegment(ns)) - } else { - Ok(EgressResult::None) - } + self.mux_packet(packet, variant) } unsafe fn reset(&mut self) -> Result<()> { diff --git a/crates/core/src/egress/mod.rs b/crates/core/src/egress/mod.rs index 4a5f8eb..f95b56d 100644 --- a/crates/core/src/egress/mod.rs +++ b/crates/core/src/egress/mod.rs @@ -25,13 +25,16 @@ pub trait Egress { pub enum EgressResult { /// Nothing to report None, - /// A new segment was created - NewSegment(NewSegment), + /// Egress created/deleted some segments + Segments { + created: Vec, + deleted: Vec, + }, } /// Basic details of new segment created by a muxer #[derive(Debug, Clone)] -pub struct NewSegment { +pub struct EgressSegment { /// The id of the variant (video or audio) pub variant: Uuid, /// Segment index diff --git a/crates/core/src/mux/hls.rs b/crates/core/src/mux/hls.rs index d41b570..002d115 100644 --- a/crates/core/src/mux/hls.rs +++ b/crates/core/src/mux/hls.rs @@ -1,4 +1,4 @@ -use crate::egress::NewSegment; +use crate::egress::{EgressResult, EgressSegment}; use crate::variant::{StreamMapping, VariantStream}; use anyhow::{bail, Result}; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; @@ -79,6 +79,8 @@ pub struct HlsVariant { pub streams: Vec, /// Segment length in seconds pub segment_length: f32, + /// Total number of segments to store for this variant + pub segment_window: Option, /// Current segment index pub idx: u64, /// Current segment start time in seconds (duration) @@ -91,20 +93,24 @@ pub struct HlsVariant { pub segment_type: SegmentType, } -struct SegmentInfo(u64, f32, SegmentType); +struct SegmentInfo { + pub index: u64, + pub duration: f32, + pub kind: SegmentType, +} impl SegmentInfo { fn to_media_segment(&self) -> MediaSegment { MediaSegment { uri: self.filename(), - duration: self.1, + duration: self.duration, title: None, ..MediaSegment::default() } } fn filename(&self) -> String { - HlsVariant::segment_name(self.2, self.0) + HlsVariant::segment_name(self.kind, self.index) } } @@ -175,11 +181,16 @@ impl HlsVariant { Ok(Self { name: name.clone(), segment_length, + segment_window: Some(10), //TODO: configure window mux, streams, idx: 1, pkt_start: 0.0, - segments: Vec::from([SegmentInfo(1, segment_length, segment_type)]), + segments: Vec::from([SegmentInfo { + index: 1, + duration: segment_length, + kind: segment_type, + }]), out_dir: out_dir.to_string(), segment_type, }) @@ -205,21 +216,21 @@ impl HlsVariant { } /// Mux a packet created by the encoder for this variant - pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result> { + pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result { let pkt_q = av_q2d((*pkt).time_base); // time of this packet in seconds let pkt_time = (*pkt).pts as f32 * pkt_q as f32; // what segment this pkt should be in (index) let pkt_seg = 1 + (pkt_time / self.segment_length).floor() as u64; - let mut result = None; + let mut result = EgressResult::None; let pkt_stream = *(*self.mux.context()) .streams .add((*pkt).stream_index as usize); let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY && (*(*pkt_stream).codecpar).codec_type == AVMEDIA_TYPE_VIDEO; if pkt_seg != self.idx && can_split { - result = Some(self.split_next_seg(pkt_time)?); + result = self.split_next_seg(pkt_time)?; } self.mux.write_packet(pkt)?; Ok(result) @@ -229,7 +240,8 @@ impl HlsVariant { self.mux.close() } - unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result { + /// Reset the muxer state and start the next segment + unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result { self.idx += 1; // Manually reset muxer avio @@ -257,19 +269,40 @@ impl HlsVariant { let duration = pkt_time - self.pkt_start; info!("Writing segment {} [{}s]", &next_seg_url, duration); - if let Err(e) = self.add_segment(self.idx, duration) { + if let Err(e) = self.push_segment(self.idx, duration) { warn!("Failed to update playlist: {}", e); } /// Get the video variant for this group /// since this could actually be audio which would not be useful for /// [Overseer] impl - let video_var = self.video_stream().unwrap_or(self.streams.first().unwrap()); + let video_var_id = self + .video_stream() + .unwrap_or(self.streams.first().unwrap()) + .id() + .clone(); + + // cleanup old segments + let deleted = self + .clean_segments()? + .into_iter() + .map(|seg| EgressSegment { + variant: video_var_id, + idx: seg.index, + duration: seg.duration, + path: PathBuf::from(Self::map_segment_path( + &self.out_dir, + &self.name, + seg.index, + self.segment_type, + )), + }) + .collect(); // emit result of the previously completed segment, let prev_seg = self.idx - 1; - let ret = NewSegment { - variant: *video_var.id(), + let created = EgressSegment { + variant: video_var_id, idx: prev_seg, duration, path: PathBuf::from(Self::map_segment_path( @@ -280,7 +313,10 @@ impl HlsVariant { )), }; self.pkt_start = pkt_time; - Ok(ret) + Ok(EgressResult::Segments { + created: vec![created], + deleted, + }) } fn video_stream(&self) -> Option<&HlsVariantStream> { @@ -289,22 +325,39 @@ impl HlsVariant { .find(|a| matches!(*a, HlsVariantStream::Video { .. })) } - fn add_segment(&mut self, idx: u64, duration: f32) -> Result<()> { - self.segments - .push(SegmentInfo(idx, duration, self.segment_type)); + /// Add a new segment to the variant and return a list of deleted segments + fn push_segment(&mut self, idx: u64, duration: f32) -> Result<()> { + self.segments.push(SegmentInfo { + index: idx, + duration, + kind: self.segment_type, + }); + self.write_playlist() + } + + /// Delete segments which are too old + fn clean_segments(&mut self) -> Result> { const MAX_SEGMENTS: usize = 10; + let mut ret = vec![]; if self.segments.len() > MAX_SEGMENTS { let n_drain = self.segments.len() - MAX_SEGMENTS; let seg_dir = self.out_dir(); for seg in self.segments.drain(..n_drain) { // delete file let seg_path = seg_dir.join(seg.filename()); - std::fs::remove_file(seg_path)?; + if let Err(e) = std::fs::remove_file(&seg_path) { + warn!( + "Failed to remove segment file: {} {}", + seg_path.display(), + e + ); + } + ret.push(seg); } } - self.write_playlist() + Ok(ret) } fn write_playlist(&mut self) -> Result<()> { @@ -312,7 +365,7 @@ impl HlsVariant { pl.target_duration = self.segment_length as u64; pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect(); pl.version = Some(3); - pl.media_sequence = self.segments.first().map(|s| s.0).unwrap_or(0); + pl.media_sequence = self.segments.first().map(|s| s.index).unwrap_or(0); let mut f_out = File::create(self.out_dir().join("live.m3u8"))?; pl.write_to(&mut f_out)?; @@ -430,7 +483,7 @@ impl HlsMuxer { &mut self, pkt: *mut AVPacket, variant: &Uuid, - ) -> Result> { + ) -> Result { for var in self.variants.iter_mut() { if let Some(vs) = var.streams.iter().find(|s| s.id() == variant) { // very important for muxer to know which stream this pkt belongs to diff --git a/crates/core/src/overseer/mod.rs b/crates/core/src/overseer/mod.rs index 538ec56..2ac226e 100644 --- a/crates/core/src/overseer/mod.rs +++ b/crates/core/src/overseer/mod.rs @@ -1,5 +1,6 @@ use crate::ingress::ConnectionInfo; +use crate::egress::EgressSegment; use crate::pipeline::PipelineConfig; use anyhow::Result; use async_trait::async_trait; @@ -13,9 +14,6 @@ mod local; #[cfg(feature = "webhook-overseer")] mod webhook; -#[cfg(feature = "zap-stream")] -mod zap_stream; - /// A copy of [ffmpeg_rs_raw::DemuxerInfo] without internal ptr #[derive(PartialEq, Clone)] pub struct IngressInfo { @@ -57,16 +55,14 @@ pub trait Overseer: Send + Sync { stream_info: &IngressInfo, ) -> Result; - /// A new segment (HLS etc.) was generated for a stream variant + /// A new segment(s) (HLS etc.) was generated for a stream variant /// /// This handler is usually used for distribution / billing - async fn on_segment( + async fn on_segments( &self, pipeline_id: &Uuid, - variant_id: &Uuid, - index: u64, - duration: f32, - path: &PathBuf, + added: &Vec, + deleted: &Vec, ) -> Result<()>; /// At a regular interval, pipeline will emit one of the frames for processing as a @@ -81,4 +77,4 @@ pub trait Overseer: Send + Sync { /// Stream is finished async fn on_end(&self, pipeline_id: &Uuid) -> Result<()>; -} \ No newline at end of file +} diff --git a/crates/core/src/pipeline/runner.rs b/crates/core/src/pipeline/runner.rs index 21978d7..03639fd 100644 --- a/crates/core/src/pipeline/runner.rs +++ b/crates/core/src/pipeline/runner.rs @@ -270,10 +270,10 @@ impl PipelineRunner { // egress results self.handle.block_on(async { for er in egress_results { - if let EgressResult::NewSegment(seg) = er { + if let EgressResult::Segments { created, deleted } = er { if let Err(e) = self .overseer - .on_segment(&config.id, &seg.variant, seg.idx, seg.duration, &seg.path) + .on_segments(&config.id, &created, &deleted) .await { bail!("Failed to process segment {}", e.to_string()); diff --git a/crates/zap-stream/src/api.rs b/crates/zap-stream/src/api.rs index 9229960..4d3987e 100644 --- a/crates/zap-stream/src/api.rs +++ b/crates/zap-stream/src/api.rs @@ -108,7 +108,11 @@ impl Api { let addr: SocketAddr = endpoint.parse().ok()?; Some(Endpoint { name: "SRT".to_string(), - url: format!("srt://{}:{}", self.settings.endpoints_public_hostname, addr.port()), + url: format!( + "srt://{}:{}", + self.settings.endpoints_public_hostname, + addr.port() + ), key: user.stream_key.clone(), capabilities: vec![], }) @@ -117,7 +121,11 @@ impl Api { let addr: SocketAddr = endpoint.parse().ok()?; Some(Endpoint { name: "RTMP".to_string(), - url: format!("rtmp://{}:{}", self.settings.endpoints_public_hostname, addr.port()), + url: format!( + "rtmp://{}:{}", + self.settings.endpoints_public_hostname, + addr.port() + ), key: user.stream_key.clone(), capabilities: vec![], }) @@ -126,7 +134,11 @@ impl Api { let addr: SocketAddr = endpoint.parse().ok()?; Some(Endpoint { name: "TCP".to_string(), - url: format!("tcp://{}:{}", self.settings.endpoints_public_hostname, addr.port()), + url: format!( + "tcp://{}:{}", + self.settings.endpoints_public_hostname, + addr.port() + ), key: user.stream_key.clone(), capabilities: vec![], }) diff --git a/crates/zap-stream/src/main.rs b/crates/zap-stream/src/main.rs index 10f339b..20c8841 100644 --- a/crates/zap-stream/src/main.rs +++ b/crates/zap-stream/src/main.rs @@ -73,11 +73,7 @@ async fn main() -> Result<()> { let api = Api::new(overseer.database(), settings.clone()); // HTTP server - let server = HttpServer::new( - index_html, - PathBuf::from(settings.output_dir), - api, - ); + let server = HttpServer::new(index_html, PathBuf::from(settings.output_dir), api); tasks.push(tokio::spawn(async move { let listener = TcpListener::bind(&http_addr).await?; diff --git a/crates/zap-stream/src/overseer.rs b/crates/zap-stream/src/overseer.rs index edfa040..00b55a4 100644 --- a/crates/zap-stream/src/overseer.rs +++ b/crates/zap-stream/src/overseer.rs @@ -31,7 +31,7 @@ use tokio::sync::RwLock; use url::Url; use uuid::Uuid; use zap_stream_core::egress::hls::HlsEgress; -use zap_stream_core::egress::EgressConfig; +use zap_stream_core::egress::{EgressConfig, EgressSegment}; use zap_stream_core::ingress::ConnectionInfo; use zap_stream_core::overseer::{IngressInfo, IngressStreamType, Overseer}; use zap_stream_core::pipeline::{EgressType, PipelineConfig}; @@ -319,16 +319,16 @@ impl Overseer for ZapStreamOverseer { }) } - async fn on_segment( + async fn on_segments( &self, pipeline_id: &Uuid, - variant_id: &Uuid, - index: u64, - duration: f32, - path: &PathBuf, + added: &Vec, + deleted: &Vec, ) -> Result<()> { - let cost = self.cost * duration.round() as i64; let stream = self.db.get_stream(pipeline_id).await?; + + let duration = added.iter().fold(0.0, |acc, v| acc + v.duration); + let cost = self.cost * duration.round() as i64; let bal = self .db .tick_stream(pipeline_id, stream.user_id, duration, cost) @@ -337,34 +337,47 @@ impl Overseer for ZapStreamOverseer { bail!("Not enough balance"); } - // Upload to blossom servers if configured + // Upload to blossom servers if configured (N94) let mut blobs = vec![]; - for b in &self.blossom_servers { - blobs.push(b.upload(path, &self.keys, Some("video/mp2t")).await?); - } - if let Some(blob) = blobs.first() { - let a_tag = format!( - "{}:{}:{}", - STREAM_EVENT_KIND, - self.keys.public_key.to_hex(), - pipeline_id - ); - let mut n94 = self.blob_to_event_builder(blob)?.add_tags([ - Tag::parse(["a", &a_tag])?, - Tag::parse(["d", variant_id.to_string().as_str()])?, - Tag::parse(["duration", duration.to_string().as_str()])?, - ]); - for b in blobs.iter().skip(1) { - n94 = n94.tag(Tag::parse(["url", &b.url])?); + for seg in added { + for b in &self.blossom_servers { + blobs.push(b.upload(&seg.path, &self.keys, Some("video/mp2t")).await?); } - let n94 = n94.sign_with_keys(&self.keys)?; - let cc = self.client.clone(); - tokio::spawn(async move { - if let Err(e) = cc.send_event(n94).await { - warn!("Error sending event: {}", e); + if let Some(blob) = blobs.first() { + let a_tag = format!( + "{}:{}:{}", + STREAM_EVENT_KIND, + self.keys.public_key.to_hex(), + pipeline_id + ); + let mut n94 = self.blob_to_event_builder(blob)?.tags([ + Tag::parse(["a", &a_tag])?, + Tag::parse(["d", seg.variant.to_string().as_str()])?, + Tag::parse(["index", seg.idx.to_string().as_str()])?, + ]); + + // some servers add duration tag + if blob + .nip94 + .as_ref() + .map(|a| a.contains_key("duration")) + .is_none() + { + n94 = n94.tag(Tag::parse(["duration", seg.duration.to_string().as_str()])?); } - }); - info!("Published N94 segment to {}", blob.url); + + for b in blobs.iter().skip(1) { + n94 = n94.tag(Tag::parse(["url", &b.url])?); + } + let n94 = n94.sign_with_keys(&self.keys)?; + let cc = self.client.clone(); + tokio::spawn(async move { + if let Err(e) = cc.send_event(n94).await { + warn!("Error sending event: {}", e); + } + }); + info!("Published N94 segment to {}", blob.url); + } } Ok(())