diff --git a/crates/core/src/ingress/srt.rs b/crates/core/src/ingress/srt.rs index 2d6126a..c0e515b 100644 --- a/crates/core/src/ingress/srt.rs +++ b/crates/core/src/ingress/srt.rs @@ -57,41 +57,6 @@ struct SrtReader { packets_received: u64, } -impl SrtReader { - /// Add data to buffer with size limit to prevent unbounded growth - fn add_to_buffer(&mut self, data: &[u8]) { - if self.buf.len() + data.len() > MAX_SRT_BUFFER_SIZE { - let bytes_to_drop = (self.buf.len() + data.len()) - MAX_SRT_BUFFER_SIZE; - warn!("SRT buffer full ({} bytes), dropping {} oldest bytes", - self.buf.len(), bytes_to_drop); - self.buf.drain(..bytes_to_drop); - } - self.buf.extend(data); - - // Update performance counters - self.bytes_processed += data.len() as u64; - self.packets_received += 1; - - // Log buffer status every 5 seconds - if self.last_buffer_log.elapsed().as_secs() >= 5 { - let buffer_util = (self.buf.len() as f32 / MAX_SRT_BUFFER_SIZE as f32) * 100.0; - let elapsed = self.last_buffer_log.elapsed(); - let mbps = (self.bytes_processed as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0); - let pps = self.packets_received as f64 / elapsed.as_secs_f64(); - - info!( - "SRT ingress: {:.1} Mbps, {:.1} packets/sec, buffer: {}% ({}/{} bytes)", - mbps, pps, buffer_util as u32, self.buf.len(), MAX_SRT_BUFFER_SIZE - ); - - // Reset counters - self.last_buffer_log = Instant::now(); - self.bytes_processed = 0; - self.packets_received = 0; - } - } -} - impl Read for SrtReader { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { let (mut rx, _) = self.socket.split_mut(); @@ -100,7 +65,38 @@ impl Read for SrtReader { return Ok(0); } if let Some((_, data)) = self.handle.block_on(rx.next()) { - self.add_to_buffer(data.iter().as_slice()); + let data_slice = data.iter().as_slice(); + + // Inline buffer management to avoid borrow issues + if self.buf.len() + data_slice.len() > MAX_SRT_BUFFER_SIZE { + let bytes_to_drop = (self.buf.len() + data_slice.len()) - MAX_SRT_BUFFER_SIZE; + warn!("SRT buffer full ({} bytes), dropping {} oldest bytes", + self.buf.len(), bytes_to_drop); + self.buf.drain(..bytes_to_drop); + } + self.buf.extend(data_slice); + + // Update performance counters + self.bytes_processed += data_slice.len() as u64; + self.packets_received += 1; + + // Log buffer status every 5 seconds + if self.last_buffer_log.elapsed().as_secs() >= 5 { + let buffer_util = (self.buf.len() as f32 / MAX_SRT_BUFFER_SIZE as f32) * 100.0; + let elapsed = self.last_buffer_log.elapsed(); + let mbps = (self.bytes_processed as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0); + let pps = self.packets_received as f64 / elapsed.as_secs_f64(); + + info!( + "SRT ingress: {:.1} Mbps, {:.1} packets/sec, buffer: {}% ({}/{} bytes)", + mbps, pps, buffer_util as u32, self.buf.len(), MAX_SRT_BUFFER_SIZE + ); + + // Reset counters + self.last_buffer_log = Instant::now(); + self.bytes_processed = 0; + self.packets_received = 0; + } } } let drain = self.buf.drain(..buf.len());