feat: pass deleted segments list to overseer

This commit is contained in:
kieran 2025-01-30 13:10:11 +00:00
parent 6d1edb1b21
commit 3b151563a7
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
8 changed files with 150 additions and 81 deletions

View File

@ -14,11 +14,7 @@ impl Egress for HlsMuxer {
packet: *mut AVPacket, packet: *mut AVPacket,
variant: &Uuid, variant: &Uuid,
) -> Result<EgressResult> { ) -> Result<EgressResult> {
if let Some(ns) = self.mux_packet(packet, variant)? { self.mux_packet(packet, variant)
Ok(EgressResult::NewSegment(ns))
} else {
Ok(EgressResult::None)
}
} }
unsafe fn reset(&mut self) -> Result<()> { unsafe fn reset(&mut self) -> Result<()> {

View File

@ -25,13 +25,16 @@ pub trait Egress {
pub enum EgressResult { pub enum EgressResult {
/// Nothing to report /// Nothing to report
None, None,
/// A new segment was created /// Egress created/deleted some segments
NewSegment(NewSegment), Segments {
created: Vec<EgressSegment>,
deleted: Vec<EgressSegment>,
},
} }
/// Basic details of new segment created by a muxer /// Basic details of new segment created by a muxer
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct NewSegment { pub struct EgressSegment {
/// The id of the variant (video or audio) /// The id of the variant (video or audio)
pub variant: Uuid, pub variant: Uuid,
/// Segment index /// Segment index

View File

@ -1,4 +1,4 @@
use crate::egress::NewSegment; use crate::egress::{EgressResult, EgressSegment};
use crate::variant::{StreamMapping, VariantStream}; use crate::variant::{StreamMapping, VariantStream};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
@ -79,6 +79,8 @@ pub struct HlsVariant {
pub streams: Vec<HlsVariantStream>, pub streams: Vec<HlsVariantStream>,
/// Segment length in seconds /// Segment length in seconds
pub segment_length: f32, pub segment_length: f32,
/// Total number of segments to store for this variant
pub segment_window: Option<u16>,
/// Current segment index /// Current segment index
pub idx: u64, pub idx: u64,
/// Current segment start time in seconds (duration) /// Current segment start time in seconds (duration)
@ -91,20 +93,24 @@ pub struct HlsVariant {
pub segment_type: SegmentType, pub segment_type: SegmentType,
} }
struct SegmentInfo(u64, f32, SegmentType); struct SegmentInfo {
pub index: u64,
pub duration: f32,
pub kind: SegmentType,
}
impl SegmentInfo { impl SegmentInfo {
fn to_media_segment(&self) -> MediaSegment { fn to_media_segment(&self) -> MediaSegment {
MediaSegment { MediaSegment {
uri: self.filename(), uri: self.filename(),
duration: self.1, duration: self.duration,
title: None, title: None,
..MediaSegment::default() ..MediaSegment::default()
} }
} }
fn filename(&self) -> String { fn filename(&self) -> String {
HlsVariant::segment_name(self.2, self.0) HlsVariant::segment_name(self.kind, self.index)
} }
} }
@ -175,11 +181,16 @@ impl HlsVariant {
Ok(Self { Ok(Self {
name: name.clone(), name: name.clone(),
segment_length, segment_length,
segment_window: Some(10), //TODO: configure window
mux, mux,
streams, streams,
idx: 1, idx: 1,
pkt_start: 0.0, pkt_start: 0.0,
segments: Vec::from([SegmentInfo(1, segment_length, segment_type)]), segments: Vec::from([SegmentInfo {
index: 1,
duration: segment_length,
kind: segment_type,
}]),
out_dir: out_dir.to_string(), out_dir: out_dir.to_string(),
segment_type, segment_type,
}) })
@ -205,21 +216,21 @@ impl HlsVariant {
} }
/// Mux a packet created by the encoder for this variant /// Mux a packet created by the encoder for this variant
pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result<Option<NewSegment>> { pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result<EgressResult> {
let pkt_q = av_q2d((*pkt).time_base); let pkt_q = av_q2d((*pkt).time_base);
// time of this packet in seconds // time of this packet in seconds
let pkt_time = (*pkt).pts as f32 * pkt_q as f32; let pkt_time = (*pkt).pts as f32 * pkt_q as f32;
// what segment this pkt should be in (index) // what segment this pkt should be in (index)
let pkt_seg = 1 + (pkt_time / self.segment_length).floor() as u64; let pkt_seg = 1 + (pkt_time / self.segment_length).floor() as u64;
let mut result = None; let mut result = EgressResult::None;
let pkt_stream = *(*self.mux.context()) let pkt_stream = *(*self.mux.context())
.streams .streams
.add((*pkt).stream_index as usize); .add((*pkt).stream_index as usize);
let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY
&& (*(*pkt_stream).codecpar).codec_type == AVMEDIA_TYPE_VIDEO; && (*(*pkt_stream).codecpar).codec_type == AVMEDIA_TYPE_VIDEO;
if pkt_seg != self.idx && can_split { if pkt_seg != self.idx && can_split {
result = Some(self.split_next_seg(pkt_time)?); result = self.split_next_seg(pkt_time)?;
} }
self.mux.write_packet(pkt)?; self.mux.write_packet(pkt)?;
Ok(result) Ok(result)
@ -229,7 +240,8 @@ impl HlsVariant {
self.mux.close() self.mux.close()
} }
unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result<NewSegment> { /// Reset the muxer state and start the next segment
unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result<EgressResult> {
self.idx += 1; self.idx += 1;
// Manually reset muxer avio // Manually reset muxer avio
@ -257,19 +269,40 @@ impl HlsVariant {
let duration = pkt_time - self.pkt_start; let duration = pkt_time - self.pkt_start;
info!("Writing segment {} [{}s]", &next_seg_url, duration); info!("Writing segment {} [{}s]", &next_seg_url, duration);
if let Err(e) = self.add_segment(self.idx, duration) { if let Err(e) = self.push_segment(self.idx, duration) {
warn!("Failed to update playlist: {}", e); warn!("Failed to update playlist: {}", e);
} }
/// Get the video variant for this group /// Get the video variant for this group
/// since this could actually be audio which would not be useful for /// since this could actually be audio which would not be useful for
/// [Overseer] impl /// [Overseer] impl
let video_var = self.video_stream().unwrap_or(self.streams.first().unwrap()); let video_var_id = self
.video_stream()
.unwrap_or(self.streams.first().unwrap())
.id()
.clone();
// cleanup old segments
let deleted = self
.clean_segments()?
.into_iter()
.map(|seg| EgressSegment {
variant: video_var_id,
idx: seg.index,
duration: seg.duration,
path: PathBuf::from(Self::map_segment_path(
&self.out_dir,
&self.name,
seg.index,
self.segment_type,
)),
})
.collect();
// emit result of the previously completed segment, // emit result of the previously completed segment,
let prev_seg = self.idx - 1; let prev_seg = self.idx - 1;
let ret = NewSegment { let created = EgressSegment {
variant: *video_var.id(), variant: video_var_id,
idx: prev_seg, idx: prev_seg,
duration, duration,
path: PathBuf::from(Self::map_segment_path( path: PathBuf::from(Self::map_segment_path(
@ -280,7 +313,10 @@ impl HlsVariant {
)), )),
}; };
self.pkt_start = pkt_time; self.pkt_start = pkt_time;
Ok(ret) Ok(EgressResult::Segments {
created: vec![created],
deleted,
})
} }
fn video_stream(&self) -> Option<&HlsVariantStream> { fn video_stream(&self) -> Option<&HlsVariantStream> {
@ -289,22 +325,39 @@ impl HlsVariant {
.find(|a| matches!(*a, HlsVariantStream::Video { .. })) .find(|a| matches!(*a, HlsVariantStream::Video { .. }))
} }
fn add_segment(&mut self, idx: u64, duration: f32) -> Result<()> { /// Add a new segment to the variant and return a list of deleted segments
self.segments fn push_segment(&mut self, idx: u64, duration: f32) -> Result<()> {
.push(SegmentInfo(idx, duration, self.segment_type)); self.segments.push(SegmentInfo {
index: idx,
duration,
kind: self.segment_type,
});
self.write_playlist()
}
/// Delete segments which are too old
fn clean_segments(&mut self) -> Result<Vec<SegmentInfo>> {
const MAX_SEGMENTS: usize = 10; const MAX_SEGMENTS: usize = 10;
let mut ret = vec![];
if self.segments.len() > MAX_SEGMENTS { if self.segments.len() > MAX_SEGMENTS {
let n_drain = self.segments.len() - MAX_SEGMENTS; let n_drain = self.segments.len() - MAX_SEGMENTS;
let seg_dir = self.out_dir(); let seg_dir = self.out_dir();
for seg in self.segments.drain(..n_drain) { for seg in self.segments.drain(..n_drain) {
// delete file // delete file
let seg_path = seg_dir.join(seg.filename()); let seg_path = seg_dir.join(seg.filename());
std::fs::remove_file(seg_path)?; if let Err(e) = std::fs::remove_file(&seg_path) {
warn!(
"Failed to remove segment file: {} {}",
seg_path.display(),
e
);
}
ret.push(seg);
} }
} }
self.write_playlist() Ok(ret)
} }
fn write_playlist(&mut self) -> Result<()> { fn write_playlist(&mut self) -> Result<()> {
@ -312,7 +365,7 @@ impl HlsVariant {
pl.target_duration = self.segment_length as u64; pl.target_duration = self.segment_length as u64;
pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect(); pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect();
pl.version = Some(3); pl.version = Some(3);
pl.media_sequence = self.segments.first().map(|s| s.0).unwrap_or(0); pl.media_sequence = self.segments.first().map(|s| s.index).unwrap_or(0);
let mut f_out = File::create(self.out_dir().join("live.m3u8"))?; let mut f_out = File::create(self.out_dir().join("live.m3u8"))?;
pl.write_to(&mut f_out)?; pl.write_to(&mut f_out)?;
@ -430,7 +483,7 @@ impl HlsMuxer {
&mut self, &mut self,
pkt: *mut AVPacket, pkt: *mut AVPacket,
variant: &Uuid, variant: &Uuid,
) -> Result<Option<NewSegment>> { ) -> Result<EgressResult> {
for var in self.variants.iter_mut() { for var in self.variants.iter_mut() {
if let Some(vs) = var.streams.iter().find(|s| s.id() == variant) { if let Some(vs) = var.streams.iter().find(|s| s.id() == variant) {
// very important for muxer to know which stream this pkt belongs to // very important for muxer to know which stream this pkt belongs to

View File

@ -1,5 +1,6 @@
use crate::ingress::ConnectionInfo; use crate::ingress::ConnectionInfo;
use crate::egress::EgressSegment;
use crate::pipeline::PipelineConfig; use crate::pipeline::PipelineConfig;
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
@ -13,9 +14,6 @@ mod local;
#[cfg(feature = "webhook-overseer")] #[cfg(feature = "webhook-overseer")]
mod webhook; mod webhook;
#[cfg(feature = "zap-stream")]
mod zap_stream;
/// A copy of [ffmpeg_rs_raw::DemuxerInfo] without internal ptr /// A copy of [ffmpeg_rs_raw::DemuxerInfo] without internal ptr
#[derive(PartialEq, Clone)] #[derive(PartialEq, Clone)]
pub struct IngressInfo { pub struct IngressInfo {
@ -57,16 +55,14 @@ pub trait Overseer: Send + Sync {
stream_info: &IngressInfo, stream_info: &IngressInfo,
) -> Result<PipelineConfig>; ) -> Result<PipelineConfig>;
/// A new segment (HLS etc.) was generated for a stream variant /// A new segment(s) (HLS etc.) was generated for a stream variant
/// ///
/// This handler is usually used for distribution / billing /// This handler is usually used for distribution / billing
async fn on_segment( async fn on_segments(
&self, &self,
pipeline_id: &Uuid, pipeline_id: &Uuid,
variant_id: &Uuid, added: &Vec<EgressSegment>,
index: u64, deleted: &Vec<EgressSegment>,
duration: f32,
path: &PathBuf,
) -> Result<()>; ) -> Result<()>;
/// At a regular interval, pipeline will emit one of the frames for processing as a /// At a regular interval, pipeline will emit one of the frames for processing as a

View File

@ -270,10 +270,10 @@ impl PipelineRunner {
// egress results // egress results
self.handle.block_on(async { self.handle.block_on(async {
for er in egress_results { for er in egress_results {
if let EgressResult::NewSegment(seg) = er { if let EgressResult::Segments { created, deleted } = er {
if let Err(e) = self if let Err(e) = self
.overseer .overseer
.on_segment(&config.id, &seg.variant, seg.idx, seg.duration, &seg.path) .on_segments(&config.id, &created, &deleted)
.await .await
{ {
bail!("Failed to process segment {}", e.to_string()); bail!("Failed to process segment {}", e.to_string());

View File

@ -108,7 +108,11 @@ impl Api {
let addr: SocketAddr = endpoint.parse().ok()?; let addr: SocketAddr = endpoint.parse().ok()?;
Some(Endpoint { Some(Endpoint {
name: "SRT".to_string(), name: "SRT".to_string(),
url: format!("srt://{}:{}", self.settings.endpoints_public_hostname, addr.port()), url: format!(
"srt://{}:{}",
self.settings.endpoints_public_hostname,
addr.port()
),
key: user.stream_key.clone(), key: user.stream_key.clone(),
capabilities: vec![], capabilities: vec![],
}) })
@ -117,7 +121,11 @@ impl Api {
let addr: SocketAddr = endpoint.parse().ok()?; let addr: SocketAddr = endpoint.parse().ok()?;
Some(Endpoint { Some(Endpoint {
name: "RTMP".to_string(), name: "RTMP".to_string(),
url: format!("rtmp://{}:{}", self.settings.endpoints_public_hostname, addr.port()), url: format!(
"rtmp://{}:{}",
self.settings.endpoints_public_hostname,
addr.port()
),
key: user.stream_key.clone(), key: user.stream_key.clone(),
capabilities: vec![], capabilities: vec![],
}) })
@ -126,7 +134,11 @@ impl Api {
let addr: SocketAddr = endpoint.parse().ok()?; let addr: SocketAddr = endpoint.parse().ok()?;
Some(Endpoint { Some(Endpoint {
name: "TCP".to_string(), name: "TCP".to_string(),
url: format!("tcp://{}:{}", self.settings.endpoints_public_hostname, addr.port()), url: format!(
"tcp://{}:{}",
self.settings.endpoints_public_hostname,
addr.port()
),
key: user.stream_key.clone(), key: user.stream_key.clone(),
capabilities: vec![], capabilities: vec![],
}) })

View File

@ -73,11 +73,7 @@ async fn main() -> Result<()> {
let api = Api::new(overseer.database(), settings.clone()); let api = Api::new(overseer.database(), settings.clone());
// HTTP server // HTTP server
let server = HttpServer::new( let server = HttpServer::new(index_html, PathBuf::from(settings.output_dir), api);
index_html,
PathBuf::from(settings.output_dir),
api,
);
tasks.push(tokio::spawn(async move { tasks.push(tokio::spawn(async move {
let listener = TcpListener::bind(&http_addr).await?; let listener = TcpListener::bind(&http_addr).await?;

View File

@ -31,7 +31,7 @@ use tokio::sync::RwLock;
use url::Url; use url::Url;
use uuid::Uuid; use uuid::Uuid;
use zap_stream_core::egress::hls::HlsEgress; use zap_stream_core::egress::hls::HlsEgress;
use zap_stream_core::egress::EgressConfig; use zap_stream_core::egress::{EgressConfig, EgressSegment};
use zap_stream_core::ingress::ConnectionInfo; use zap_stream_core::ingress::ConnectionInfo;
use zap_stream_core::overseer::{IngressInfo, IngressStreamType, Overseer}; use zap_stream_core::overseer::{IngressInfo, IngressStreamType, Overseer};
use zap_stream_core::pipeline::{EgressType, PipelineConfig}; use zap_stream_core::pipeline::{EgressType, PipelineConfig};
@ -319,16 +319,16 @@ impl Overseer for ZapStreamOverseer {
}) })
} }
async fn on_segment( async fn on_segments(
&self, &self,
pipeline_id: &Uuid, pipeline_id: &Uuid,
variant_id: &Uuid, added: &Vec<EgressSegment>,
index: u64, deleted: &Vec<EgressSegment>,
duration: f32,
path: &PathBuf,
) -> Result<()> { ) -> Result<()> {
let cost = self.cost * duration.round() as i64;
let stream = self.db.get_stream(pipeline_id).await?; let stream = self.db.get_stream(pipeline_id).await?;
let duration = added.iter().fold(0.0, |acc, v| acc + v.duration);
let cost = self.cost * duration.round() as i64;
let bal = self let bal = self
.db .db
.tick_stream(pipeline_id, stream.user_id, duration, cost) .tick_stream(pipeline_id, stream.user_id, duration, cost)
@ -337,10 +337,11 @@ impl Overseer for ZapStreamOverseer {
bail!("Not enough balance"); bail!("Not enough balance");
} }
// Upload to blossom servers if configured // Upload to blossom servers if configured (N94)
let mut blobs = vec![]; let mut blobs = vec![];
for seg in added {
for b in &self.blossom_servers { for b in &self.blossom_servers {
blobs.push(b.upload(path, &self.keys, Some("video/mp2t")).await?); blobs.push(b.upload(&seg.path, &self.keys, Some("video/mp2t")).await?);
} }
if let Some(blob) = blobs.first() { if let Some(blob) = blobs.first() {
let a_tag = format!( let a_tag = format!(
@ -349,11 +350,22 @@ impl Overseer for ZapStreamOverseer {
self.keys.public_key.to_hex(), self.keys.public_key.to_hex(),
pipeline_id pipeline_id
); );
let mut n94 = self.blob_to_event_builder(blob)?.add_tags([ let mut n94 = self.blob_to_event_builder(blob)?.tags([
Tag::parse(["a", &a_tag])?, Tag::parse(["a", &a_tag])?,
Tag::parse(["d", variant_id.to_string().as_str()])?, Tag::parse(["d", seg.variant.to_string().as_str()])?,
Tag::parse(["duration", duration.to_string().as_str()])?, Tag::parse(["index", seg.idx.to_string().as_str()])?,
]); ]);
// some servers add duration tag
if blob
.nip94
.as_ref()
.map(|a| a.contains_key("duration"))
.is_none()
{
n94 = n94.tag(Tag::parse(["duration", seg.duration.to_string().as_str()])?);
}
for b in blobs.iter().skip(1) { for b in blobs.iter().skip(1) {
n94 = n94.tag(Tag::parse(["url", &b.url])?); n94 = n94.tag(Tag::parse(["url", &b.url])?);
} }
@ -366,6 +378,7 @@ impl Overseer for ZapStreamOverseer {
}); });
info!("Published N94 segment to {}", blob.url); info!("Published N94 segment to {}", blob.url);
} }
}
Ok(()) Ok(())
} }