mirror of
https://github.com/v0l/zap-stream-core.git
synced 2025-06-15 17:23:00 +00:00
core: add claude suggestions for performance
This commit is contained in:
@ -1,7 +1,7 @@
|
||||
use crate::ingress::{spawn_pipeline, ConnectionInfo};
|
||||
use crate::overseer::Overseer;
|
||||
use anyhow::{bail, Result};
|
||||
use log::{error, info};
|
||||
use log::{error, info, warn};
|
||||
use rml_rtmp::handshake::{Handshake, HandshakeProcessResult, PeerType};
|
||||
use rml_rtmp::sessions::{
|
||||
ServerSession, ServerSessionConfig, ServerSessionEvent, ServerSessionResult,
|
||||
@ -14,6 +14,8 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::time::Instant;
|
||||
|
||||
const MAX_MEDIA_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10MB limit
|
||||
#[derive(PartialEq, Eq, Clone, Hash)]
|
||||
struct RtmpPublishedStream(String, String);
|
||||
|
||||
@ -27,6 +29,17 @@ struct RtmpClient {
|
||||
}
|
||||
|
||||
impl RtmpClient {
|
||||
/// Add data to media buffer with size limit to prevent unbounded growth
|
||||
fn add_to_media_buffer(&mut self, data: &[u8]) {
|
||||
if self.media_buf.len() + data.len() > MAX_MEDIA_BUFFER_SIZE {
|
||||
let bytes_to_drop = (self.media_buf.len() + data.len()) - MAX_MEDIA_BUFFER_SIZE;
|
||||
warn!("Media buffer full ({} bytes), dropping {} oldest bytes",
|
||||
self.media_buf.len(), bytes_to_drop);
|
||||
self.media_buf.drain(..bytes_to_drop);
|
||||
}
|
||||
self.media_buf.extend(data);
|
||||
}
|
||||
|
||||
async fn start(mut socket: TcpStream) -> Result<Self> {
|
||||
let mut hs = Handshake::new(PeerType::Server);
|
||||
|
||||
@ -117,7 +130,7 @@ impl RtmpClient {
|
||||
error!("Received unhandleable message with {} bytes", m.data.len());
|
||||
// Only append data if it looks like valid media data
|
||||
if !m.data.is_empty() && m.data.len() > 4 {
|
||||
self.media_buf.extend(&m.data);
|
||||
self.add_to_media_buffer(&m.data);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -170,7 +183,7 @@ impl RtmpClient {
|
||||
ServerSessionEvent::AudioDataReceived { data, .. } => {
|
||||
// Validate audio data before adding to buffer
|
||||
if !data.is_empty() {
|
||||
self.media_buf.extend(data);
|
||||
self.add_to_media_buffer(&data);
|
||||
} else {
|
||||
error!("Received empty audio data");
|
||||
}
|
||||
@ -178,7 +191,7 @@ impl RtmpClient {
|
||||
ServerSessionEvent::VideoDataReceived { data, .. } => {
|
||||
// Validate video data before adding to buffer
|
||||
if !data.is_empty() {
|
||||
self.media_buf.extend(data);
|
||||
self.add_to_media_buffer(&data);
|
||||
} else {
|
||||
error!("Received empty video data");
|
||||
}
|
||||
|
@ -3,13 +3,15 @@ use crate::overseer::Overseer;
|
||||
use anyhow::Result;
|
||||
use futures_util::stream::FusedStream;
|
||||
use futures_util::StreamExt;
|
||||
use log::info;
|
||||
use log::{info, warn};
|
||||
use srt_tokio::{SrtListener, SrtSocket};
|
||||
use std::io::Read;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
const MAX_SRT_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10MB limit
|
||||
|
||||
pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>) -> Result<()> {
|
||||
let binder: SocketAddr = addr.parse()?;
|
||||
let (_binding, mut packets) = SrtListener::builder().bind(binder).await?;
|
||||
@ -48,6 +50,19 @@ struct SrtReader {
|
||||
pub buf: Vec<u8>,
|
||||
}
|
||||
|
||||
impl SrtReader {
|
||||
/// Add data to buffer with size limit to prevent unbounded growth
|
||||
fn add_to_buffer(&mut self, data: &[u8]) {
|
||||
if self.buf.len() + data.len() > MAX_SRT_BUFFER_SIZE {
|
||||
let bytes_to_drop = (self.buf.len() + data.len()) - MAX_SRT_BUFFER_SIZE;
|
||||
warn!("SRT buffer full ({} bytes), dropping {} oldest bytes",
|
||||
self.buf.len(), bytes_to_drop);
|
||||
self.buf.drain(..bytes_to_drop);
|
||||
}
|
||||
self.buf.extend(data);
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for SrtReader {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
let (mut rx, _) = self.socket.split_mut();
|
||||
@ -56,7 +71,7 @@ impl Read for SrtReader {
|
||||
return Ok(0);
|
||||
}
|
||||
if let Some((_, data)) = self.handle.block_on(rx.next()) {
|
||||
self.buf.extend(data.iter().as_slice());
|
||||
self.add_to_buffer(data.iter().as_slice());
|
||||
}
|
||||
}
|
||||
let drain = self.buf.drain(..buf.len());
|
||||
|
@ -175,26 +175,31 @@ impl PipelineRunner {
|
||||
let dst_pic = PathBuf::from(&self.out_dir)
|
||||
.join(config.id.to_string())
|
||||
.join("thumb.webp");
|
||||
let mut sw = Scaler::new();
|
||||
let mut frame = sw.process_frame(
|
||||
frame,
|
||||
(*frame).width as _,
|
||||
(*frame).height as _,
|
||||
AV_PIX_FMT_YUV420P,
|
||||
)?;
|
||||
Encoder::new(AV_CODEC_ID_WEBP)?
|
||||
.with_height((*frame).height)
|
||||
.with_width((*frame).width)
|
||||
.with_pix_fmt(transmute((*frame).format))
|
||||
.open(None)?
|
||||
.save_picture(frame, dst_pic.to_str().unwrap())?;
|
||||
{
|
||||
let mut sw = Scaler::new();
|
||||
let mut scaled_frame = sw.process_frame(
|
||||
frame,
|
||||
(*frame).width as _,
|
||||
(*frame).height as _,
|
||||
AV_PIX_FMT_YUV420P,
|
||||
)?;
|
||||
|
||||
let mut encoder = Encoder::new(AV_CODEC_ID_WEBP)?
|
||||
.with_height((*scaled_frame).height)
|
||||
.with_width((*scaled_frame).width)
|
||||
.with_pix_fmt(transmute((*scaled_frame).format))
|
||||
.open(None)?;
|
||||
|
||||
encoder.save_picture(scaled_frame, dst_pic.to_str().unwrap())?;
|
||||
av_frame_free(&mut scaled_frame);
|
||||
}
|
||||
|
||||
let thumb_duration = thumb_start.elapsed();
|
||||
info!(
|
||||
"Saved thumb ({:.2}ms) to: {}",
|
||||
thumb_duration.as_millis() as f32 / 1000.0,
|
||||
dst_pic.display(),
|
||||
);
|
||||
av_frame_free(&mut frame);
|
||||
}
|
||||
|
||||
self.frame_ctr += 1;
|
||||
@ -420,3 +425,24 @@ impl PipelineRunner {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PipelineRunner {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
// First try to flush properly
|
||||
if let Err(e) = self.flush() {
|
||||
error!("Failed to flush pipeline during drop: {}", e);
|
||||
}
|
||||
|
||||
// Clear all collections to ensure proper Drop cleanup
|
||||
// The FFmpeg objects should implement Drop properly in ffmpeg-rs-raw
|
||||
self.encoders.clear();
|
||||
self.scalers.clear();
|
||||
self.resampler.clear();
|
||||
self.copy_stream.clear();
|
||||
self.egress.clear();
|
||||
|
||||
info!("PipelineRunner cleaned up resources for stream: {}", self.connection.key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user