diff --git a/crates/zap-stream/src/overseer.rs b/crates/zap-stream/src/overseer.rs index 5e05a66..b2c64e1 100644 --- a/crates/zap-stream/src/overseer.rs +++ b/crates/zap-stream/src/overseer.rs @@ -357,7 +357,8 @@ impl Overseer for ZapStreamOverseer { // Get ingest endpoint configuration based on connection type let endpoint = self.detect_endpoint(&connection).await?; - let cfg = get_variants_from_endpoint(&stream_info, &endpoint)?; + let caps = parse_capabilities(&endpoint.capabilities.unwrap_or("".to_string())); + let cfg = get_variants_from_endpoint(&stream_info, &caps)?; if cfg.video_src.is_none() || cfg.variants.is_empty() { bail!("No video src found"); @@ -368,6 +369,27 @@ impl Overseer for ZapStreamOverseer { name: "hls".to_string(), variants: cfg.variants.iter().map(|v| v.id()).collect(), })); + if let Some(EndpointCapability::DVR { height }) = caps + .iter() + .find(|c| matches!(c, EndpointCapability::DVR { .. })) + { + let var = cfg.variants.iter().find(|v| match v { + VariantStream::Video(v) => v.height == *height, + _ => false, + }); + match var { + Some(var) => egress.push(EgressType::Recorder(EgressConfig { + name: "dvr".to_string(), + variants: [var.id()].into(), + })), + None => { + warn!( + "Invalid DVR config, no variant found with height {}", + height + ); + } + } + } let stream_id = connection.id.clone(); // insert new stream record @@ -551,13 +573,48 @@ struct EndpointConfig<'a> { variants: Vec, } +enum EndpointCapability { + SourceVariant, + Variant { height: u16, bitrate: u64 }, + DVR { height: u16 }, +} + +fn parse_capabilities(cap: &str) -> Vec { + cap.to_ascii_lowercase() + .split(',') + .map_while(|c| { + let cs = c.split(':').collect::>(); + match cs[0] { + "variant" if cs[1] == "source" => Some(EndpointCapability::SourceVariant), + "variant" if cs.len() == 3 => { + if let (Ok(h), Ok(br)) = (cs[1].parse(), cs[2].parse()) { + Some(EndpointCapability::Variant { + height: h, + bitrate: br, + }) + } else { + warn!("Invalid variant: {}", c); + None + } + } + "dvr" if cs.len() == 2 => { + if let Ok(h) = cs[1].parse() { + Some(EndpointCapability::DVR { height: h }) + } else { + warn!("Invalid dvr: {}", c); + None + } + } + _ => None, + } + }) + .collect() +} + fn get_variants_from_endpoint<'a>( info: &'a IngressInfo, - endpoint: &IngestEndpoint, + capabilities: &Vec, ) -> Result> { - let capabilities_str = endpoint.capabilities.as_deref().unwrap_or(""); - let capabilities: Vec<&str> = capabilities_str.split(',').collect(); - let mut vars: Vec = vec![]; let video_src = info @@ -574,43 +631,48 @@ fn get_variants_from_endpoint<'a>( let mut dst_index = 0; for capability in capabilities { - let parts: Vec<&str> = capability.split(':').collect(); + match capability { + EndpointCapability::SourceVariant => { + // Add copy variant (group for source) + if let Some(video_src) = video_src { + vars.push(VariantStream::CopyVideo(VariantMapping { + id: Uuid::new_v4(), + src_index: video_src.index, + dst_index, + group_id, + })); + dst_index += 1; + } - if parts.len() >= 2 && parts[0] == "variant" && parts[1] == "source" { - // Add copy variant (group for source) - if let Some(video_src) = video_src { - vars.push(VariantStream::CopyVideo(VariantMapping { - id: Uuid::new_v4(), - src_index: video_src.index, - dst_index, - group_id, - })); - dst_index += 1; + if let Some(audio_src) = audio_src { + vars.push(VariantStream::CopyAudio(VariantMapping { + id: Uuid::new_v4(), + src_index: audio_src.index, + dst_index, + group_id, + })); + dst_index += 1; + } + + group_id += 1; } - - if let Some(audio_src) = audio_src { - vars.push(VariantStream::CopyAudio(VariantMapping { - id: Uuid::new_v4(), - src_index: audio_src.index, - dst_index, - group_id, - })); - dst_index += 1; - } - - group_id += 1; - } else if parts.len() >= 3 && parts[0] == "variant" { - if let (Ok(target_height), Ok(bitrate)) = - (parts[1].parse::(), parts[2].parse::()) - { + EndpointCapability::Variant { height, bitrate } => { // Add video variant for this group if let Some(video_src) = video_src { + let output_height = *height; + if video_src.height < output_height as _ { + info!( + "Skipping variant {}p, source would be upscaled from {}p", + height, video_src.height + ); + continue; + } + // Calculate dimensions maintaining aspect ratio let input_width = video_src.width as f32; let input_height = video_src.height as f32; let aspect_ratio = input_width / input_height; - let output_height = target_height; let output_width = (output_height as f32 * aspect_ratio).round() as u16; // Ensure even dimensions for H.264 compatibility @@ -623,7 +685,7 @@ fn get_variants_from_endpoint<'a>( output_height + 1 } else { output_height - } as u16; + }; vars.push(VariantStream::Video(VideoVariant { mapping: VariantMapping { @@ -633,9 +695,9 @@ fn get_variants_from_endpoint<'a>( group_id, }, width: output_width, - height: output_height, + height: output_height as _, fps: video_src.fps, - bitrate: bitrate as u64, + bitrate: *bitrate as _, codec: "libx264".to_string(), profile: 77, // AV_PROFILE_H264_MAIN level: 51, // High 5.1 (4K) @@ -643,30 +705,30 @@ fn get_variants_from_endpoint<'a>( pixel_format: AV_PIX_FMT_YUV420P as u32, })); dst_index += 1; - } - // Add audio variant for the same group - if let Some(audio_src) = audio_src { - vars.push(VariantStream::Audio(AudioVariant { - mapping: VariantMapping { - id: Uuid::new_v4(), - src_index: audio_src.index, - dst_index, - group_id, - }, - bitrate: 192_000, - codec: "aac".to_string(), - channels: 2, - sample_rate: 48_000, - sample_fmt: "fltp".to_owned(), - })); - dst_index += 1; - } + // Add audio variant for the same group + if let Some(audio_src) = audio_src { + vars.push(VariantStream::Audio(AudioVariant { + mapping: VariantMapping { + id: Uuid::new_v4(), + src_index: audio_src.index, + dst_index, + group_id, + }, + bitrate: 192_000, + codec: "aac".to_string(), + channels: 2, + sample_rate: 48_000, + sample_fmt: "fltp".to_owned(), + })); + dst_index += 1; + } - group_id += 1; + group_id += 1; + } } + _ => {} } - // Handle other capabilities like dvr:720h here if needed } Ok(EndpointConfig {