From e41d395bff9975cdb3a2289ae4c4737b4aa5a3b3 Mon Sep 17 00:00:00 2001 From: Kieran Date: Fri, 6 Jun 2025 15:43:33 +0100 Subject: [PATCH] core: add extra logging --- crates/core/src/ingress/rtmp.rs | 30 +++++++++++++++++++++++++++++- crates/core/src/ingress/srt.rs | 29 +++++++++++++++++++++++++++++ crates/core/src/mux/hls.rs | 6 +++++- 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/crates/core/src/ingress/rtmp.rs b/crates/core/src/ingress/rtmp.rs index 15e0491..9e71d72 100644 --- a/crates/core/src/ingress/rtmp.rs +++ b/crates/core/src/ingress/rtmp.rs @@ -26,6 +26,9 @@ struct RtmpClient { msg_queue: VecDeque, reader_buf: [u8; 4096], pub published_stream: Option, + last_buffer_log: Instant, + bytes_processed: u64, + frames_received: u64, } impl RtmpClient { @@ -33,11 +36,33 @@ impl RtmpClient { fn add_to_media_buffer(&mut self, data: &[u8]) { if self.media_buf.len() + data.len() > MAX_MEDIA_BUFFER_SIZE { let bytes_to_drop = (self.media_buf.len() + data.len()) - MAX_MEDIA_BUFFER_SIZE; - warn!("Media buffer full ({} bytes), dropping {} oldest bytes", + warn!("RTMP buffer full ({} bytes), dropping {} oldest bytes", self.media_buf.len(), bytes_to_drop); self.media_buf.drain(..bytes_to_drop); } self.media_buf.extend(data); + + // Update performance counters + self.bytes_processed += data.len() as u64; + self.frames_received += 1; + + // Log buffer status every 5 seconds + if self.last_buffer_log.elapsed().as_secs() >= 5 { + let buffer_util = (self.media_buf.len() as f32 / MAX_MEDIA_BUFFER_SIZE as f32) * 100.0; + let elapsed = self.last_buffer_log.elapsed(); + let mbps = (self.bytes_processed as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0); + let fps = self.frames_received as f64 / elapsed.as_secs_f64(); + + info!( + "RTMP ingress: {:.1} Mbps, {:.1} frames/sec, buffer: {}% ({}/{} bytes)", + mbps, fps, buffer_util as u32, self.media_buf.len(), MAX_MEDIA_BUFFER_SIZE + ); + + // Reset counters + self.last_buffer_log = Instant::now(); + self.bytes_processed = 0; + self.frames_received = 0; + } } async fn start(mut socket: TcpStream) -> Result { @@ -75,6 +100,9 @@ impl RtmpClient { msg_queue: VecDeque::from(res), reader_buf: [0; 4096], published_stream: None, + last_buffer_log: Instant::now(), + bytes_processed: 0, + frames_received: 0, }; return Ok(ret); diff --git a/crates/core/src/ingress/srt.rs b/crates/core/src/ingress/srt.rs index 08fbeb4..2d6126a 100644 --- a/crates/core/src/ingress/srt.rs +++ b/crates/core/src/ingress/srt.rs @@ -8,6 +8,7 @@ use srt_tokio::{SrtListener, SrtSocket}; use std::io::Read; use std::net::SocketAddr; use std::sync::Arc; +use std::time::Instant; use tokio::runtime::Handle; const MAX_SRT_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10MB limit @@ -38,6 +39,9 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) handle: Handle::current(), socket, buf: Vec::with_capacity(4096), + last_buffer_log: Instant::now(), + bytes_processed: 0, + packets_received: 0, }), ); } @@ -48,6 +52,9 @@ struct SrtReader { pub handle: Handle, pub socket: SrtSocket, pub buf: Vec, + last_buffer_log: Instant, + bytes_processed: u64, + packets_received: u64, } impl SrtReader { @@ -60,6 +67,28 @@ impl SrtReader { self.buf.drain(..bytes_to_drop); } self.buf.extend(data); + + // Update performance counters + self.bytes_processed += data.len() as u64; + self.packets_received += 1; + + // Log buffer status every 5 seconds + if self.last_buffer_log.elapsed().as_secs() >= 5 { + let buffer_util = (self.buf.len() as f32 / MAX_SRT_BUFFER_SIZE as f32) * 100.0; + let elapsed = self.last_buffer_log.elapsed(); + let mbps = (self.bytes_processed as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0); + let pps = self.packets_received as f64 / elapsed.as_secs_f64(); + + info!( + "SRT ingress: {:.1} Mbps, {:.1} packets/sec, buffer: {}% ({}/{} bytes)", + mbps, pps, buffer_util as u32, self.buf.len(), MAX_SRT_BUFFER_SIZE + ); + + // Reset counters + self.last_buffer_log = Instant::now(); + self.bytes_processed = 0; + self.packets_received = 0; + } } } diff --git a/crates/core/src/mux/hls.rs b/crates/core/src/mux/hls.rs index 4cddca4..096fefe 100644 --- a/crates/core/src/mux/hls.rs +++ b/crates/core/src/mux/hls.rs @@ -265,7 +265,11 @@ impl HlsVariant { ); let duration = pkt_time - self.pkt_start; - info!("Writing segment {} [{}s]", &next_seg_url, duration); + let segment_path = PathBuf::from(&next_seg_url); + let segment_size = segment_path.metadata().map(|m| m.len()).unwrap_or(0); + info!("Writing segment {} [{:.3}s, {} bytes]", + segment_path.file_name().unwrap_or_default().to_string_lossy(), + duration, segment_size); if let Err(e) = self.push_segment(self.idx, duration) { warn!("Failed to update playlist: {}", e); }