mirror of
https://github.com/v0l/zap-stream-core.git
synced 2025-06-16 01:28:08 +00:00
core: add extra logging
This commit is contained in:
@ -26,6 +26,9 @@ struct RtmpClient {
|
||||
msg_queue: VecDeque<ServerSessionResult>,
|
||||
reader_buf: [u8; 4096],
|
||||
pub published_stream: Option<RtmpPublishedStream>,
|
||||
last_buffer_log: Instant,
|
||||
bytes_processed: u64,
|
||||
frames_received: u64,
|
||||
}
|
||||
|
||||
impl RtmpClient {
|
||||
@ -33,11 +36,33 @@ impl RtmpClient {
|
||||
fn add_to_media_buffer(&mut self, data: &[u8]) {
|
||||
if self.media_buf.len() + data.len() > MAX_MEDIA_BUFFER_SIZE {
|
||||
let bytes_to_drop = (self.media_buf.len() + data.len()) - MAX_MEDIA_BUFFER_SIZE;
|
||||
warn!("Media buffer full ({} bytes), dropping {} oldest bytes",
|
||||
warn!("RTMP buffer full ({} bytes), dropping {} oldest bytes",
|
||||
self.media_buf.len(), bytes_to_drop);
|
||||
self.media_buf.drain(..bytes_to_drop);
|
||||
}
|
||||
self.media_buf.extend(data);
|
||||
|
||||
// Update performance counters
|
||||
self.bytes_processed += data.len() as u64;
|
||||
self.frames_received += 1;
|
||||
|
||||
// Log buffer status every 5 seconds
|
||||
if self.last_buffer_log.elapsed().as_secs() >= 5 {
|
||||
let buffer_util = (self.media_buf.len() as f32 / MAX_MEDIA_BUFFER_SIZE as f32) * 100.0;
|
||||
let elapsed = self.last_buffer_log.elapsed();
|
||||
let mbps = (self.bytes_processed as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0);
|
||||
let fps = self.frames_received as f64 / elapsed.as_secs_f64();
|
||||
|
||||
info!(
|
||||
"RTMP ingress: {:.1} Mbps, {:.1} frames/sec, buffer: {}% ({}/{} bytes)",
|
||||
mbps, fps, buffer_util as u32, self.media_buf.len(), MAX_MEDIA_BUFFER_SIZE
|
||||
);
|
||||
|
||||
// Reset counters
|
||||
self.last_buffer_log = Instant::now();
|
||||
self.bytes_processed = 0;
|
||||
self.frames_received = 0;
|
||||
}
|
||||
}
|
||||
|
||||
async fn start(mut socket: TcpStream) -> Result<Self> {
|
||||
@ -75,6 +100,9 @@ impl RtmpClient {
|
||||
msg_queue: VecDeque::from(res),
|
||||
reader_buf: [0; 4096],
|
||||
published_stream: None,
|
||||
last_buffer_log: Instant::now(),
|
||||
bytes_processed: 0,
|
||||
frames_received: 0,
|
||||
};
|
||||
|
||||
return Ok(ret);
|
||||
|
@ -8,6 +8,7 @@ use srt_tokio::{SrtListener, SrtSocket};
|
||||
use std::io::Read;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
const MAX_SRT_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10MB limit
|
||||
@ -38,6 +39,9 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
|
||||
handle: Handle::current(),
|
||||
socket,
|
||||
buf: Vec::with_capacity(4096),
|
||||
last_buffer_log: Instant::now(),
|
||||
bytes_processed: 0,
|
||||
packets_received: 0,
|
||||
}),
|
||||
);
|
||||
}
|
||||
@ -48,6 +52,9 @@ struct SrtReader {
|
||||
pub handle: Handle,
|
||||
pub socket: SrtSocket,
|
||||
pub buf: Vec<u8>,
|
||||
last_buffer_log: Instant,
|
||||
bytes_processed: u64,
|
||||
packets_received: u64,
|
||||
}
|
||||
|
||||
impl SrtReader {
|
||||
@ -60,6 +67,28 @@ impl SrtReader {
|
||||
self.buf.drain(..bytes_to_drop);
|
||||
}
|
||||
self.buf.extend(data);
|
||||
|
||||
// Update performance counters
|
||||
self.bytes_processed += data.len() as u64;
|
||||
self.packets_received += 1;
|
||||
|
||||
// Log buffer status every 5 seconds
|
||||
if self.last_buffer_log.elapsed().as_secs() >= 5 {
|
||||
let buffer_util = (self.buf.len() as f32 / MAX_SRT_BUFFER_SIZE as f32) * 100.0;
|
||||
let elapsed = self.last_buffer_log.elapsed();
|
||||
let mbps = (self.bytes_processed as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0);
|
||||
let pps = self.packets_received as f64 / elapsed.as_secs_f64();
|
||||
|
||||
info!(
|
||||
"SRT ingress: {:.1} Mbps, {:.1} packets/sec, buffer: {}% ({}/{} bytes)",
|
||||
mbps, pps, buffer_util as u32, self.buf.len(), MAX_SRT_BUFFER_SIZE
|
||||
);
|
||||
|
||||
// Reset counters
|
||||
self.last_buffer_log = Instant::now();
|
||||
self.bytes_processed = 0;
|
||||
self.packets_received = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -265,7 +265,11 @@ impl HlsVariant {
|
||||
);
|
||||
|
||||
let duration = pkt_time - self.pkt_start;
|
||||
info!("Writing segment {} [{}s]", &next_seg_url, duration);
|
||||
let segment_path = PathBuf::from(&next_seg_url);
|
||||
let segment_size = segment_path.metadata().map(|m| m.len()).unwrap_or(0);
|
||||
info!("Writing segment {} [{:.3}s, {} bytes]",
|
||||
segment_path.file_name().unwrap_or_default().to_string_lossy(),
|
||||
duration, segment_size);
|
||||
if let Err(e) = self.push_segment(self.idx, duration) {
|
||||
warn!("Failed to update playlist: {}", e);
|
||||
}
|
||||
|
Reference in New Issue
Block a user