mirror of
https://github.com/v0l/zap-stream-core.git
synced 2025-06-15 17:23:00 +00:00
fix: borrow issue
This commit is contained in:
@ -57,41 +57,6 @@ struct SrtReader {
|
|||||||
packets_received: u64,
|
packets_received: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SrtReader {
|
|
||||||
/// Add data to buffer with size limit to prevent unbounded growth
|
|
||||||
fn add_to_buffer(&mut self, data: &[u8]) {
|
|
||||||
if self.buf.len() + data.len() > MAX_SRT_BUFFER_SIZE {
|
|
||||||
let bytes_to_drop = (self.buf.len() + data.len()) - MAX_SRT_BUFFER_SIZE;
|
|
||||||
warn!("SRT buffer full ({} bytes), dropping {} oldest bytes",
|
|
||||||
self.buf.len(), bytes_to_drop);
|
|
||||||
self.buf.drain(..bytes_to_drop);
|
|
||||||
}
|
|
||||||
self.buf.extend(data);
|
|
||||||
|
|
||||||
// Update performance counters
|
|
||||||
self.bytes_processed += data.len() as u64;
|
|
||||||
self.packets_received += 1;
|
|
||||||
|
|
||||||
// Log buffer status every 5 seconds
|
|
||||||
if self.last_buffer_log.elapsed().as_secs() >= 5 {
|
|
||||||
let buffer_util = (self.buf.len() as f32 / MAX_SRT_BUFFER_SIZE as f32) * 100.0;
|
|
||||||
let elapsed = self.last_buffer_log.elapsed();
|
|
||||||
let mbps = (self.bytes_processed as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0);
|
|
||||||
let pps = self.packets_received as f64 / elapsed.as_secs_f64();
|
|
||||||
|
|
||||||
info!(
|
|
||||||
"SRT ingress: {:.1} Mbps, {:.1} packets/sec, buffer: {}% ({}/{} bytes)",
|
|
||||||
mbps, pps, buffer_util as u32, self.buf.len(), MAX_SRT_BUFFER_SIZE
|
|
||||||
);
|
|
||||||
|
|
||||||
// Reset counters
|
|
||||||
self.last_buffer_log = Instant::now();
|
|
||||||
self.bytes_processed = 0;
|
|
||||||
self.packets_received = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Read for SrtReader {
|
impl Read for SrtReader {
|
||||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
let (mut rx, _) = self.socket.split_mut();
|
let (mut rx, _) = self.socket.split_mut();
|
||||||
@ -100,7 +65,38 @@ impl Read for SrtReader {
|
|||||||
return Ok(0);
|
return Ok(0);
|
||||||
}
|
}
|
||||||
if let Some((_, data)) = self.handle.block_on(rx.next()) {
|
if let Some((_, data)) = self.handle.block_on(rx.next()) {
|
||||||
self.add_to_buffer(data.iter().as_slice());
|
let data_slice = data.iter().as_slice();
|
||||||
|
|
||||||
|
// Inline buffer management to avoid borrow issues
|
||||||
|
if self.buf.len() + data_slice.len() > MAX_SRT_BUFFER_SIZE {
|
||||||
|
let bytes_to_drop = (self.buf.len() + data_slice.len()) - MAX_SRT_BUFFER_SIZE;
|
||||||
|
warn!("SRT buffer full ({} bytes), dropping {} oldest bytes",
|
||||||
|
self.buf.len(), bytes_to_drop);
|
||||||
|
self.buf.drain(..bytes_to_drop);
|
||||||
|
}
|
||||||
|
self.buf.extend(data_slice);
|
||||||
|
|
||||||
|
// Update performance counters
|
||||||
|
self.bytes_processed += data_slice.len() as u64;
|
||||||
|
self.packets_received += 1;
|
||||||
|
|
||||||
|
// Log buffer status every 5 seconds
|
||||||
|
if self.last_buffer_log.elapsed().as_secs() >= 5 {
|
||||||
|
let buffer_util = (self.buf.len() as f32 / MAX_SRT_BUFFER_SIZE as f32) * 100.0;
|
||||||
|
let elapsed = self.last_buffer_log.elapsed();
|
||||||
|
let mbps = (self.bytes_processed as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0);
|
||||||
|
let pps = self.packets_received as f64 / elapsed.as_secs_f64();
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"SRT ingress: {:.1} Mbps, {:.1} packets/sec, buffer: {}% ({}/{} bytes)",
|
||||||
|
mbps, pps, buffer_util as u32, self.buf.len(), MAX_SRT_BUFFER_SIZE
|
||||||
|
);
|
||||||
|
|
||||||
|
// Reset counters
|
||||||
|
self.last_buffer_log = Instant::now();
|
||||||
|
self.bytes_processed = 0;
|
||||||
|
self.packets_received = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let drain = self.buf.drain(..buf.len());
|
let drain = self.buf.drain(..buf.len());
|
||||||
|
Reference in New Issue
Block a user