mirror of
https://github.com/v0l/zap-stream-core.git
synced 2025-06-16 17:08:50 +00:00
fix: force match endpoint
This commit is contained in:
@ -353,18 +353,17 @@ impl Overseer for ZapStreamOverseer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get ingest endpoint configuration based on connection type
|
// Get ingest endpoint configuration based on connection type
|
||||||
let endpoint_id = self.detect_endpoint(&connection).await?;
|
let endpoint_id = self
|
||||||
let endpoint = if let Some(id) = endpoint_id {
|
.detect_endpoint(&connection)
|
||||||
self.db.get_ingest_endpoint(id).await?
|
.await?
|
||||||
} else {
|
.ok_or_else(|| anyhow::anyhow!("No ingest endpoints configured"))?;
|
||||||
None
|
let endpoint = self
|
||||||
};
|
.db
|
||||||
|
.get_ingest_endpoint(endpoint_id)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("Ingest endpoint not found"))?;
|
||||||
|
|
||||||
let variants = if let Some(endpoint) = &endpoint {
|
let variants = get_variants_from_endpoint(&stream_info, &endpoint)?;
|
||||||
get_variants_from_endpoint(&stream_info, endpoint)?
|
|
||||||
} else {
|
|
||||||
get_default_variants(&stream_info)?
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut egress = vec![];
|
let mut egress = vec![];
|
||||||
egress.push(EgressType::HLS(EgressConfig {
|
egress.push(EgressType::HLS(EgressConfig {
|
||||||
@ -379,7 +378,7 @@ impl Overseer for ZapStreamOverseer {
|
|||||||
user_id: uid,
|
user_id: uid,
|
||||||
starts: Utc::now(),
|
starts: Utc::now(),
|
||||||
state: UserStreamState::Live,
|
state: UserStreamState::Live,
|
||||||
endpoint_id,
|
endpoint_id: Some(endpoint_id),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?;
|
let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?;
|
||||||
@ -524,111 +523,20 @@ impl Overseer for ZapStreamOverseer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_default_variants(info: &IngressInfo) -> Result<Vec<VariantStream>> {
|
|
||||||
let mut vars: Vec<VariantStream> = vec![];
|
|
||||||
if let Some(video_src) = info
|
|
||||||
.streams
|
|
||||||
.iter()
|
|
||||||
.find(|c| c.stream_type == IngressStreamType::Video)
|
|
||||||
{
|
|
||||||
vars.push(VariantStream::CopyVideo(VariantMapping {
|
|
||||||
id: Uuid::new_v4(),
|
|
||||||
src_index: video_src.index,
|
|
||||||
dst_index: 0,
|
|
||||||
group_id: 0,
|
|
||||||
}));
|
|
||||||
vars.push(VariantStream::Video(VideoVariant {
|
|
||||||
mapping: VariantMapping {
|
|
||||||
id: Uuid::new_v4(),
|
|
||||||
src_index: video_src.index,
|
|
||||||
dst_index: 1,
|
|
||||||
group_id: 1,
|
|
||||||
},
|
|
||||||
width: 1280,
|
|
||||||
height: 720,
|
|
||||||
fps: video_src.fps,
|
|
||||||
bitrate: 3_000_000,
|
|
||||||
codec: "libx264".to_string(),
|
|
||||||
profile: 77, // AV_PROFILE_H264_MAIN
|
|
||||||
level: 51,
|
|
||||||
keyframe_interval: video_src.fps as u16 * 2,
|
|
||||||
pixel_format: AV_PIX_FMT_YUV420P as u32,
|
|
||||||
}));
|
|
||||||
vars.push(VariantStream::Video(VideoVariant {
|
|
||||||
mapping: VariantMapping {
|
|
||||||
id: Uuid::new_v4(),
|
|
||||||
src_index: video_src.index,
|
|
||||||
dst_index: 4,
|
|
||||||
group_id: 2,
|
|
||||||
},
|
|
||||||
width: 640,
|
|
||||||
height: 480,
|
|
||||||
fps: video_src.fps,
|
|
||||||
bitrate: 1_000_000,
|
|
||||||
codec: "libx264".to_string(),
|
|
||||||
profile: 77, // AV_PROFILE_H264_MAIN
|
|
||||||
level: 51,
|
|
||||||
keyframe_interval: video_src.fps as u16 * 2,
|
|
||||||
pixel_format: AV_PIX_FMT_YUV420P as u32,
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(audio_src) = info
|
|
||||||
.streams
|
|
||||||
.iter()
|
|
||||||
.find(|c| c.stream_type == IngressStreamType::Audio)
|
|
||||||
{
|
|
||||||
vars.push(VariantStream::CopyAudio(VariantMapping {
|
|
||||||
id: Uuid::new_v4(),
|
|
||||||
src_index: audio_src.index,
|
|
||||||
dst_index: 2,
|
|
||||||
group_id: 0,
|
|
||||||
}));
|
|
||||||
vars.push(VariantStream::Audio(AudioVariant {
|
|
||||||
mapping: VariantMapping {
|
|
||||||
id: Uuid::new_v4(),
|
|
||||||
src_index: audio_src.index,
|
|
||||||
dst_index: 3,
|
|
||||||
group_id: 1,
|
|
||||||
},
|
|
||||||
bitrate: 192_000,
|
|
||||||
codec: "aac".to_string(),
|
|
||||||
channels: 2,
|
|
||||||
sample_rate: 48_000,
|
|
||||||
sample_fmt: "fltp".to_owned(),
|
|
||||||
}));
|
|
||||||
vars.push(VariantStream::Audio(AudioVariant {
|
|
||||||
mapping: VariantMapping {
|
|
||||||
id: Uuid::new_v4(),
|
|
||||||
src_index: audio_src.index,
|
|
||||||
dst_index: 5,
|
|
||||||
group_id: 2,
|
|
||||||
},
|
|
||||||
bitrate: 64_000,
|
|
||||||
codec: "aac".to_string(),
|
|
||||||
channels: 2,
|
|
||||||
sample_rate: 48_000,
|
|
||||||
sample_fmt: "fltp".to_owned(),
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(vars)
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ZapStreamOverseer {
|
impl ZapStreamOverseer {
|
||||||
/// Detect which ingest endpoint should be used based on connection info
|
/// Detect which ingest endpoint should be used based on connection info
|
||||||
async fn detect_endpoint(&self, connection: &ConnectionInfo) -> Result<Option<u64>> {
|
async fn detect_endpoint(&self, connection: &ConnectionInfo) -> Result<Option<u64>> {
|
||||||
// Get all ingest endpoints and match by name against connection endpoint
|
// Get all ingest endpoints and match by name against connection endpoint
|
||||||
let endpoints = self.db.get_ingest_endpoints().await?;
|
let endpoints = self.db.get_ingest_endpoints().await?;
|
||||||
|
|
||||||
for endpoint in endpoints {
|
for endpoint in &endpoints {
|
||||||
if endpoint.name == connection.endpoint {
|
if endpoint.name == connection.endpoint {
|
||||||
return Ok(Some(endpoint.id));
|
return Ok(Some(endpoint.id));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// No matching endpoint found
|
// No matching endpoint found, use the most expensive one
|
||||||
Ok(None)
|
Ok(endpoints.into_iter().max_by_key(|e| e.cost).map(|e| e.id))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user