From cc973f0d9b98ae9e064fb8afab4aa2c9d212e333 Mon Sep 17 00:00:00 2001 From: Kieran Date: Thu, 12 Jun 2025 17:29:22 +0100 Subject: [PATCH] chore: format thread names --- crates/core/src/ingress/file.rs | 2 +- crates/core/src/ingress/mod.rs | 9 ++++++--- crates/core/src/ingress/rtmp.rs | 10 ++++------ crates/core/src/ingress/srt.rs | 2 +- crates/core/src/ingress/tcp.rs | 2 +- crates/core/src/ingress/test.rs | 16 ++++++++++++---- 6 files changed, 25 insertions(+), 16 deletions(-) diff --git a/crates/core/src/ingress/file.rs b/crates/core/src/ingress/file.rs index 27b07f8..c68a837 100644 --- a/crates/core/src/ingress/file.rs +++ b/crates/core/src/ingress/file.rs @@ -13,7 +13,7 @@ pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc) let info = ConnectionInfo { id: Uuid::new_v4(), ip_addr: "127.0.0.1:6969".to_string(), - endpoint: "file-input".to_owned(), + endpoint: "file-input", app_name: "".to_string(), key: "test".to_string(), }; diff --git a/crates/core/src/ingress/mod.rs b/crates/core/src/ingress/mod.rs index c88addd..8ddf0ab 100644 --- a/crates/core/src/ingress/mod.rs +++ b/crates/core/src/ingress/mod.rs @@ -21,8 +21,8 @@ pub struct ConnectionInfo { /// Unique ID of this connection / pipeline pub id: Uuid, - /// Endpoint of the ingress - pub endpoint: String, + /// Name of the ingest point + pub endpoint: &'static str, /// IP address of the connection pub ip_addr: String, @@ -58,7 +58,10 @@ pub fn run_pipeline(mut pl: PipelineRunner) -> anyhow::Result<()> { info!("New client connected: {}", &pl.connection.ip_addr); std::thread::Builder::new() - .name(format!("pipeline-{}", pl.connection.id)) + .name(format!( + "client:{}:{}", + pl.connection.endpoint, pl.connection.id + )) .spawn(move || { pl.run(); })?; diff --git a/crates/core/src/ingress/rtmp.rs b/crates/core/src/ingress/rtmp.rs index 2a73ec3..ac6be5f 100644 --- a/crates/core/src/ingress/rtmp.rs +++ b/crates/core/src/ingress/rtmp.rs @@ -272,12 +272,12 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) info!("RTMP listening on: {}", &addr); while let Ok((socket, ip)) = listener.accept().await { let mut cc = RtmpClient::new(socket.into_std()?)?; - let addr = addr.clone(); let overseer = overseer.clone(); let out_dir = out_dir.clone(); let handle = Handle::current(); + let new_id = Uuid::new_v4(); std::thread::Builder::new() - .name("rtmp-client".to_string()) + .name(format!("client:rtmp:{}", new_id)) .spawn(move || { if let Err(e) = cc.handshake() { bail!("Error during handshake: {}", e) @@ -288,9 +288,9 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) let pr = cc.published_stream.as_ref().unwrap(); let info = ConnectionInfo { - id: Uuid::new_v4(), + id: new_id, ip_addr: ip.to_string(), - endpoint: addr.clone(), + endpoint: "rtmp", app_name: pr.0.clone(), key: pr.1.clone(), }; @@ -307,8 +307,6 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) bail!("Failed to create PipelineRunner {}", e) } }; - //pl.set_demuxer_format("flv"); - //pl.set_demuxer_buffer_size(1024 * 64); pl.run(); Ok(()) })?; diff --git a/crates/core/src/ingress/srt.rs b/crates/core/src/ingress/srt.rs index 86d50ea..1b590eb 100644 --- a/crates/core/src/ingress/srt.rs +++ b/crates/core/src/ingress/srt.rs @@ -22,7 +22,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) let socket = request.accept(None).await?; let info = ConnectionInfo { id: Uuid::new_v4(), - endpoint: addr.clone(), + endpoint: "srt", ip_addr: socket.settings().remote.to_string(), app_name: "".to_string(), key: socket diff --git a/crates/core/src/ingress/tcp.rs b/crates/core/src/ingress/tcp.rs index 0fd96c7..982a49b 100644 --- a/crates/core/src/ingress/tcp.rs +++ b/crates/core/src/ingress/tcp.rs @@ -15,7 +15,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) let info = ConnectionInfo { id: Uuid::new_v4(), ip_addr: ip.to_string(), - endpoint: addr.clone(), + endpoint: "tcp", app_name: "".to_string(), key: "test".to_string(), }; diff --git a/crates/core/src/ingress/test.rs b/crates/core/src/ingress/test.rs index 71a7699..249c045 100644 --- a/crates/core/src/ingress/test.rs +++ b/crates/core/src/ingress/test.rs @@ -4,7 +4,9 @@ use crate::overseer::Overseer; use anyhow::Result; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVSampleFormat::AV_SAMPLE_FMT_FLTP; -use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_frame_free, av_packet_free, AV_PROFILE_H264_MAIN, AVRational}; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ + av_frame_free, av_packet_free, AVRational, AV_PROFILE_H264_MAIN, +}; use ffmpeg_rs_raw::{Encoder, Muxer}; use log::info; use ringbuf::traits::{Observer, Split}; @@ -25,7 +27,7 @@ pub async fn listen(out_dir: String, overseer: Arc) -> Result<()> let info = ConnectionInfo { id: Uuid::new_v4(), - endpoint: "test-pattern".to_string(), + endpoint: "test-pattern", ip_addr: "test-pattern".to_string(), app_name: "".to_string(), key: "test".to_string(), @@ -115,8 +117,14 @@ impl TestPatternSrc { SAMPLE_RATE, frame_size, 1, - AVRational { num: 1, den: VIDEO_FPS as i32 }, - AVRational { num: 1, den: SAMPLE_RATE as i32 }, + AVRational { + num: 1, + den: VIDEO_FPS as i32, + }, + AVRational { + num: 1, + den: SAMPLE_RATE as i32, + }, )?, video_encoder, audio_encoder,