fix: tag recording in ended stream event

This commit is contained in:
2025-06-20 10:46:26 +01:00
parent b1ebf75244
commit e6bddcf641
3 changed files with 78 additions and 48 deletions

View File

@ -18,11 +18,13 @@ pub struct RecorderEgress {
} }
impl RecorderEgress { impl RecorderEgress {
pub const FILENAME: &'static str = "recording.mp4";
pub fn new<'a>( pub fn new<'a>(
out_dir: PathBuf, out_dir: PathBuf,
variants: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>, variants: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
) -> Result<Self> { ) -> Result<Self> {
let out_file = out_dir.join("recording.mp4"); let out_file = out_dir.join(Self::FILENAME);
let mut var_map = HashMap::new(); let mut var_map = HashMap::new();
let muxer = unsafe { let muxer = unsafe {
let mut m = Muxer::builder() let mut m = Muxer::builder()

View File

@ -358,10 +358,10 @@ impl ZapStreamDb {
} }
/// Get ingest endpoint by id /// Get ingest endpoint by id
pub async fn get_ingest_endpoint(&self, endpoint_id: u64) -> Result<Option<IngestEndpoint>> { pub async fn get_ingest_endpoint(&self, endpoint_id: u64) -> Result<IngestEndpoint> {
Ok(sqlx::query_as("select * from ingest_endpoint where id = ?") Ok(sqlx::query_as("select * from ingest_endpoint where id = ?")
.bind(endpoint_id) .bind(endpoint_id)
.fetch_optional(&self.db) .fetch_one(&self.db)
.await?) .await?)
} }

View File

@ -16,6 +16,7 @@ use tokio::sync::RwLock;
use url::Url; use url::Url;
use uuid::Uuid; use uuid::Uuid;
use zap_stream_core::egress::hls::HlsEgress; use zap_stream_core::egress::hls::HlsEgress;
use zap_stream_core::egress::recorder::RecorderEgress;
use zap_stream_core::egress::{EgressConfig, EgressSegment}; use zap_stream_core::egress::{EgressConfig, EgressSegment};
use zap_stream_core::ingress::ConnectionInfo; use zap_stream_core::ingress::ConnectionInfo;
use zap_stream_core::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer}; use zap_stream_core::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer};
@ -230,19 +231,8 @@ impl ZapStreamOverseer {
) -> Result<Event> { ) -> Result<Event> {
// TODO: remove assumption that HLS is enabled // TODO: remove assumption that HLS is enabled
let pipeline_dir = PathBuf::from(stream.id.to_string()); let pipeline_dir = PathBuf::from(stream.id.to_string());
let extra_tags = vec![ let mut extra_tags = vec![
Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?, Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?,
Tag::parse([
"streaming",
self.map_to_public_url(
pipeline_dir
.join(HlsEgress::PATH)
.join("live.m3u8")
.to_str()
.unwrap(),
)?
.as_str(),
])?,
Tag::parse([ Tag::parse([
"image", "image",
self.map_to_public_url(pipeline_dir.join("thumb.webp").to_str().unwrap())? self.map_to_public_url(pipeline_dir.join("thumb.webp").to_str().unwrap())?
@ -250,6 +240,43 @@ impl ZapStreamOverseer {
])?, ])?,
Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?, Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?,
]; ];
match stream.state {
UserStreamState::Live => {
extra_tags.push(Tag::parse([
"streaming",
self.map_to_public_url(
pipeline_dir
.join(HlsEgress::PATH)
.join("live.m3u8")
.to_str()
.unwrap(),
)?
.as_str(),
])?);
}
UserStreamState::Ended => {
if let Some(ep) = stream.endpoint_id {
let endpoint = self.db.get_ingest_endpoint(ep).await?;
let caps = parse_capabilities(&endpoint.capabilities);
let has_recording = caps
.iter()
.any(|c| matches!(c, EndpointCapability::DVR { .. }));
if has_recording {
extra_tags.push(Tag::parse([
"recording",
self.map_to_public_url(
pipeline_dir
.join(RecorderEgress::FILENAME)
.to_str()
.unwrap(),
)?
.as_str(),
])?);
}
}
}
_ => {}
}
let ev = self let ev = self
.stream_to_event_builder(stream)? .stream_to_event_builder(stream)?
.tags(extra_tags) .tags(extra_tags)
@ -357,7 +384,7 @@ impl Overseer for ZapStreamOverseer {
// Get ingest endpoint configuration based on connection type // Get ingest endpoint configuration based on connection type
let endpoint = self.detect_endpoint(&connection).await?; let endpoint = self.detect_endpoint(&connection).await?;
let caps = parse_capabilities(&endpoint.capabilities.unwrap_or("".to_string())); let caps = parse_capabilities(&endpoint.capabilities);
let cfg = get_variants_from_endpoint(&stream_info, &caps)?; let cfg = get_variants_from_endpoint(&stream_info, &caps)?;
if cfg.video_src.is_none() || cfg.variants.is_empty() { if cfg.video_src.is_none() || cfg.variants.is_empty() {
@ -451,11 +478,8 @@ impl Overseer for ZapStreamOverseer {
// Get the cost per minute from the ingest endpoint, or use default // Get the cost per minute from the ingest endpoint, or use default
let cost_per_minute = if let Some(endpoint_id) = stream.endpoint_id { let cost_per_minute = if let Some(endpoint_id) = stream.endpoint_id {
if let Some(endpoint) = self.db.get_ingest_endpoint(endpoint_id).await? { let ep = self.db.get_ingest_endpoint(endpoint_id).await?;
endpoint.cost ep.cost
} else {
bail!("Endpoint doesnt exist");
}
} else { } else {
bail!("Endpoint id not set on stream"); bail!("Endpoint id not set on stream");
}; };
@ -586,36 +610,40 @@ enum EndpointCapability {
DVR { height: u16 }, DVR { height: u16 },
} }
fn parse_capabilities(cap: &str) -> Vec<EndpointCapability> { fn parse_capabilities(cap: &Option<String>) -> Vec<EndpointCapability> {
cap.to_ascii_lowercase() if let Some(cap) = cap {
.split(',') cap.to_ascii_lowercase()
.map_while(|c| { .split(',')
let cs = c.split(':').collect::<Vec<&str>>(); .map_while(|c| {
match cs[0] { let cs = c.split(':').collect::<Vec<&str>>();
"variant" if cs[1] == "source" => Some(EndpointCapability::SourceVariant), match cs[0] {
"variant" if cs.len() == 3 => { "variant" if cs[1] == "source" => Some(EndpointCapability::SourceVariant),
if let (Ok(h), Ok(br)) = (cs[1].parse(), cs[2].parse()) { "variant" if cs.len() == 3 => {
Some(EndpointCapability::Variant { if let (Ok(h), Ok(br)) = (cs[1].parse(), cs[2].parse()) {
height: h, Some(EndpointCapability::Variant {
bitrate: br, height: h,
}) bitrate: br,
} else { })
warn!("Invalid variant: {}", c); } else {
None warn!("Invalid variant: {}", c);
None
}
} }
} "dvr" if cs.len() == 2 => {
"dvr" if cs.len() == 2 => { if let Ok(h) = cs[1].parse() {
if let Ok(h) = cs[1].parse() { Some(EndpointCapability::DVR { height: h })
Some(EndpointCapability::DVR { height: h }) } else {
} else { warn!("Invalid dvr: {}", c);
warn!("Invalid dvr: {}", c); None
None }
} }
_ => None,
} }
_ => None, })
} .collect()
}) } else {
.collect() vec![]
}
} }
fn get_variants_from_endpoint<'a>( fn get_variants_from_endpoint<'a>(