From f5fe85e5c5762d69ce1cbc00c35c9c030ed67c88 Mon Sep 17 00:00:00 2001 From: Kieran Date: Fri, 6 Jun 2025 15:39:45 +0100 Subject: [PATCH] core: add claude suggestions for performance --- crates/core/src/ingress/rtmp.rs | 21 +++++++++--- crates/core/src/ingress/srt.rs | 19 +++++++++-- crates/core/src/pipeline/runner.rs | 54 ++++++++++++++++++++++-------- 3 files changed, 74 insertions(+), 20 deletions(-) diff --git a/crates/core/src/ingress/rtmp.rs b/crates/core/src/ingress/rtmp.rs index a94cfc5..15e0491 100644 --- a/crates/core/src/ingress/rtmp.rs +++ b/crates/core/src/ingress/rtmp.rs @@ -1,7 +1,7 @@ use crate::ingress::{spawn_pipeline, ConnectionInfo}; use crate::overseer::Overseer; use anyhow::{bail, Result}; -use log::{error, info}; +use log::{error, info, warn}; use rml_rtmp::handshake::{Handshake, HandshakeProcessResult, PeerType}; use rml_rtmp::sessions::{ ServerSession, ServerSessionConfig, ServerSessionEvent, ServerSessionResult, @@ -14,6 +14,8 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::runtime::Handle; use tokio::time::Instant; + +const MAX_MEDIA_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10MB limit #[derive(PartialEq, Eq, Clone, Hash)] struct RtmpPublishedStream(String, String); @@ -27,6 +29,17 @@ struct RtmpClient { } impl RtmpClient { + /// Add data to media buffer with size limit to prevent unbounded growth + fn add_to_media_buffer(&mut self, data: &[u8]) { + if self.media_buf.len() + data.len() > MAX_MEDIA_BUFFER_SIZE { + let bytes_to_drop = (self.media_buf.len() + data.len()) - MAX_MEDIA_BUFFER_SIZE; + warn!("Media buffer full ({} bytes), dropping {} oldest bytes", + self.media_buf.len(), bytes_to_drop); + self.media_buf.drain(..bytes_to_drop); + } + self.media_buf.extend(data); + } + async fn start(mut socket: TcpStream) -> Result { let mut hs = Handshake::new(PeerType::Server); @@ -117,7 +130,7 @@ impl RtmpClient { error!("Received unhandleable message with {} bytes", m.data.len()); // Only append data if it looks like valid media data if !m.data.is_empty() && m.data.len() > 4 { - self.media_buf.extend(&m.data); + self.add_to_media_buffer(&m.data); } } } @@ -170,7 +183,7 @@ impl RtmpClient { ServerSessionEvent::AudioDataReceived { data, .. } => { // Validate audio data before adding to buffer if !data.is_empty() { - self.media_buf.extend(data); + self.add_to_media_buffer(&data); } else { error!("Received empty audio data"); } @@ -178,7 +191,7 @@ impl RtmpClient { ServerSessionEvent::VideoDataReceived { data, .. } => { // Validate video data before adding to buffer if !data.is_empty() { - self.media_buf.extend(data); + self.add_to_media_buffer(&data); } else { error!("Received empty video data"); } diff --git a/crates/core/src/ingress/srt.rs b/crates/core/src/ingress/srt.rs index f24d7f6..08fbeb4 100644 --- a/crates/core/src/ingress/srt.rs +++ b/crates/core/src/ingress/srt.rs @@ -3,13 +3,15 @@ use crate::overseer::Overseer; use anyhow::Result; use futures_util::stream::FusedStream; use futures_util::StreamExt; -use log::info; +use log::{info, warn}; use srt_tokio::{SrtListener, SrtSocket}; use std::io::Read; use std::net::SocketAddr; use std::sync::Arc; use tokio::runtime::Handle; +const MAX_SRT_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10MB limit + pub async fn listen(out_dir: String, addr: String, overseer: Arc) -> Result<()> { let binder: SocketAddr = addr.parse()?; let (_binding, mut packets) = SrtListener::builder().bind(binder).await?; @@ -48,6 +50,19 @@ struct SrtReader { pub buf: Vec, } +impl SrtReader { + /// Add data to buffer with size limit to prevent unbounded growth + fn add_to_buffer(&mut self, data: &[u8]) { + if self.buf.len() + data.len() > MAX_SRT_BUFFER_SIZE { + let bytes_to_drop = (self.buf.len() + data.len()) - MAX_SRT_BUFFER_SIZE; + warn!("SRT buffer full ({} bytes), dropping {} oldest bytes", + self.buf.len(), bytes_to_drop); + self.buf.drain(..bytes_to_drop); + } + self.buf.extend(data); + } +} + impl Read for SrtReader { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { let (mut rx, _) = self.socket.split_mut(); @@ -56,7 +71,7 @@ impl Read for SrtReader { return Ok(0); } if let Some((_, data)) = self.handle.block_on(rx.next()) { - self.buf.extend(data.iter().as_slice()); + self.add_to_buffer(data.iter().as_slice()); } } let drain = self.buf.drain(..buf.len()); diff --git a/crates/core/src/pipeline/runner.rs b/crates/core/src/pipeline/runner.rs index a5ce9fc..bf94fbf 100644 --- a/crates/core/src/pipeline/runner.rs +++ b/crates/core/src/pipeline/runner.rs @@ -175,26 +175,31 @@ impl PipelineRunner { let dst_pic = PathBuf::from(&self.out_dir) .join(config.id.to_string()) .join("thumb.webp"); - let mut sw = Scaler::new(); - let mut frame = sw.process_frame( - frame, - (*frame).width as _, - (*frame).height as _, - AV_PIX_FMT_YUV420P, - )?; - Encoder::new(AV_CODEC_ID_WEBP)? - .with_height((*frame).height) - .with_width((*frame).width) - .with_pix_fmt(transmute((*frame).format)) - .open(None)? - .save_picture(frame, dst_pic.to_str().unwrap())?; + { + let mut sw = Scaler::new(); + let mut scaled_frame = sw.process_frame( + frame, + (*frame).width as _, + (*frame).height as _, + AV_PIX_FMT_YUV420P, + )?; + + let mut encoder = Encoder::new(AV_CODEC_ID_WEBP)? + .with_height((*scaled_frame).height) + .with_width((*scaled_frame).width) + .with_pix_fmt(transmute((*scaled_frame).format)) + .open(None)?; + + encoder.save_picture(scaled_frame, dst_pic.to_str().unwrap())?; + av_frame_free(&mut scaled_frame); + } + let thumb_duration = thumb_start.elapsed(); info!( "Saved thumb ({:.2}ms) to: {}", thumb_duration.as_millis() as f32 / 1000.0, dst_pic.display(), ); - av_frame_free(&mut frame); } self.frame_ctr += 1; @@ -420,3 +425,24 @@ impl PipelineRunner { Ok(()) } } + +impl Drop for PipelineRunner { + fn drop(&mut self) { + unsafe { + // First try to flush properly + if let Err(e) = self.flush() { + error!("Failed to flush pipeline during drop: {}", e); + } + + // Clear all collections to ensure proper Drop cleanup + // The FFmpeg objects should implement Drop properly in ffmpeg-rs-raw + self.encoders.clear(); + self.scalers.clear(); + self.resampler.clear(); + self.copy_stream.clear(); + self.egress.clear(); + + info!("PipelineRunner cleaned up resources for stream: {}", self.connection.key); + } + } +}