diff --git a/crates/zap-stream/src/overseer.rs b/crates/zap-stream/src/overseer.rs index c86c3ed..c3f944c 100644 --- a/crates/zap-stream/src/overseer.rs +++ b/crates/zap-stream/src/overseer.rs @@ -353,18 +353,17 @@ impl Overseer for ZapStreamOverseer { } // Get ingest endpoint configuration based on connection type - let endpoint_id = self.detect_endpoint(&connection).await?; - let endpoint = if let Some(id) = endpoint_id { - self.db.get_ingest_endpoint(id).await? - } else { - None - }; + let endpoint_id = self + .detect_endpoint(&connection) + .await? + .ok_or_else(|| anyhow::anyhow!("No ingest endpoints configured"))?; + let endpoint = self + .db + .get_ingest_endpoint(endpoint_id) + .await? + .ok_or_else(|| anyhow::anyhow!("Ingest endpoint not found"))?; - let variants = if let Some(endpoint) = &endpoint { - get_variants_from_endpoint(&stream_info, endpoint)? - } else { - get_default_variants(&stream_info)? - }; + let variants = get_variants_from_endpoint(&stream_info, &endpoint)?; let mut egress = vec![]; egress.push(EgressType::HLS(EgressConfig { @@ -379,7 +378,7 @@ impl Overseer for ZapStreamOverseer { user_id: uid, starts: Utc::now(), state: UserStreamState::Live, - endpoint_id, + endpoint_id: Some(endpoint_id), ..Default::default() }; let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?; @@ -524,111 +523,20 @@ impl Overseer for ZapStreamOverseer { } } -fn get_default_variants(info: &IngressInfo) -> Result> { - let mut vars: Vec = vec![]; - if let Some(video_src) = info - .streams - .iter() - .find(|c| c.stream_type == IngressStreamType::Video) - { - vars.push(VariantStream::CopyVideo(VariantMapping { - id: Uuid::new_v4(), - src_index: video_src.index, - dst_index: 0, - group_id: 0, - })); - vars.push(VariantStream::Video(VideoVariant { - mapping: VariantMapping { - id: Uuid::new_v4(), - src_index: video_src.index, - dst_index: 1, - group_id: 1, - }, - width: 1280, - height: 720, - fps: video_src.fps, - bitrate: 3_000_000, - codec: "libx264".to_string(), - profile: 77, // AV_PROFILE_H264_MAIN - level: 51, - keyframe_interval: video_src.fps as u16 * 2, - pixel_format: AV_PIX_FMT_YUV420P as u32, - })); - vars.push(VariantStream::Video(VideoVariant { - mapping: VariantMapping { - id: Uuid::new_v4(), - src_index: video_src.index, - dst_index: 4, - group_id: 2, - }, - width: 640, - height: 480, - fps: video_src.fps, - bitrate: 1_000_000, - codec: "libx264".to_string(), - profile: 77, // AV_PROFILE_H264_MAIN - level: 51, - keyframe_interval: video_src.fps as u16 * 2, - pixel_format: AV_PIX_FMT_YUV420P as u32, - })); - } - - if let Some(audio_src) = info - .streams - .iter() - .find(|c| c.stream_type == IngressStreamType::Audio) - { - vars.push(VariantStream::CopyAudio(VariantMapping { - id: Uuid::new_v4(), - src_index: audio_src.index, - dst_index: 2, - group_id: 0, - })); - vars.push(VariantStream::Audio(AudioVariant { - mapping: VariantMapping { - id: Uuid::new_v4(), - src_index: audio_src.index, - dst_index: 3, - group_id: 1, - }, - bitrate: 192_000, - codec: "aac".to_string(), - channels: 2, - sample_rate: 48_000, - sample_fmt: "fltp".to_owned(), - })); - vars.push(VariantStream::Audio(AudioVariant { - mapping: VariantMapping { - id: Uuid::new_v4(), - src_index: audio_src.index, - dst_index: 5, - group_id: 2, - }, - bitrate: 64_000, - codec: "aac".to_string(), - channels: 2, - sample_rate: 48_000, - sample_fmt: "fltp".to_owned(), - })); - } - - Ok(vars) -} - impl ZapStreamOverseer { /// Detect which ingest endpoint should be used based on connection info async fn detect_endpoint(&self, connection: &ConnectionInfo) -> Result> { // Get all ingest endpoints and match by name against connection endpoint let endpoints = self.db.get_ingest_endpoints().await?; - for endpoint in endpoints { + for endpoint in &endpoints { if endpoint.name == connection.endpoint { return Ok(Some(endpoint.id)); } } - // No matching endpoint found - Ok(None) + // No matching endpoint found, use the most expensive one + Ok(endpoints.into_iter().max_by_key(|e| e.cost).map(|e| e.id)) } }