From 68fad9800069375489b64450da601abaaf8592d0 Mon Sep 17 00:00:00 2001 From: Kieran Date: Thu, 19 Jun 2025 13:08:15 +0100 Subject: [PATCH] feat: clean shutdown RTMP stream --- crates/core/src/generator.rs | 7 +++- crates/core/src/ingress/file.rs | 3 ++ crates/core/src/ingress/mod.rs | 7 +++- crates/core/src/ingress/rtmp.rs | 19 +++++++--- crates/core/src/ingress/srt.rs | 7 ++++ crates/core/src/ingress/tcp.rs | 2 + crates/core/src/ingress/test.rs | 11 ++---- crates/core/src/pipeline/runner.rs | 60 ++++++++++++++++++++++-------- crates/zap-stream/config.yaml | 3 -- 9 files changed, 84 insertions(+), 35 deletions(-) diff --git a/crates/core/src/generator.rs b/crates/core/src/generator.rs index 3b27f2e..d67851c 100644 --- a/crates/core/src/generator.rs +++ b/crates/core/src/generator.rs @@ -264,8 +264,11 @@ impl FrameGenerator { (*self.next_frame).data[0], (self.width as usize * self.height as usize * 4) as usize, ); - for z in 0..(self.width as usize * self.height as usize) { - buf[z * 4..z * 4 + 4].copy_from_slice(&color32); + for chunk in buf.chunks_exact_mut(4) { + chunk[0] = color32[0]; + chunk[1] = color32[1]; + chunk[2] = color32[2]; + chunk[3] = color32[3]; } Ok(()) } diff --git a/crates/core/src/ingress/file.rs b/crates/core/src/ingress/file.rs index c68a837..1a69811 100644 --- a/crates/core/src/ingress/file.rs +++ b/crates/core/src/ingress/file.rs @@ -17,6 +17,7 @@ pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc) app_name: "".to_string(), key: "test".to_string(), }; + let url = path.to_str().unwrap().to_string(); let file = std::fs::File::open(path)?; spawn_pipeline( Handle::current(), @@ -24,6 +25,8 @@ pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc) out_dir.clone(), overseer.clone(), Box::new(file), + Some(url), + None, ); Ok(()) diff --git a/crates/core/src/ingress/mod.rs b/crates/core/src/ingress/mod.rs index 8ddf0ab..6d4216d 100644 --- a/crates/core/src/ingress/mod.rs +++ b/crates/core/src/ingress/mod.rs @@ -1,8 +1,9 @@ use crate::overseer::Overseer; -use crate::pipeline::runner::PipelineRunner; +use crate::pipeline::runner::{PipelineCommand, PipelineRunner}; use log::{error, info, warn}; use serde::{Deserialize, Serialize}; use std::io::Read; +use std::sync::mpsc::Receiver; use std::sync::Arc; use std::time::Instant; use tokio::runtime::Handle; @@ -40,8 +41,10 @@ pub fn spawn_pipeline( out_dir: String, seer: Arc, reader: Box, + url: Option, + rx: Option>, ) { - match PipelineRunner::new(handle, out_dir, seer, info, reader, None) { + match PipelineRunner::new(handle, out_dir, seer, info, reader, url, rx) { Ok(pl) => match run_pipeline(pl) { Ok(_) => {} Err(e) => { diff --git a/crates/core/src/ingress/rtmp.rs b/crates/core/src/ingress/rtmp.rs index ac6be5f..4aa26bd 100644 --- a/crates/core/src/ingress/rtmp.rs +++ b/crates/core/src/ingress/rtmp.rs @@ -1,6 +1,6 @@ use crate::ingress::{BufferedReader, ConnectionInfo}; use crate::overseer::Overseer; -use crate::pipeline::runner::PipelineRunner; +use crate::pipeline::runner::{PipelineCommand, PipelineRunner}; use anyhow::{anyhow, bail, Result}; use bytes::{Bytes, BytesMut}; use log::{error, info}; @@ -11,6 +11,7 @@ use rml_rtmp::sessions::{ use std::collections::VecDeque; use std::io::{ErrorKind, Read, Write}; use std::net::TcpStream; +use std::sync::mpsc::Sender; use std::sync::Arc; use std::time::Duration; use tokio::net::TcpListener; @@ -32,10 +33,11 @@ struct RtmpClient { msg_queue: VecDeque, pub published_stream: Option, muxer: FlvMuxer, + tx: Sender, } impl RtmpClient { - pub fn new(socket: TcpStream) -> Result { + pub fn new(socket: TcpStream, tx: Sender) -> Result { socket.set_nonblocking(false)?; let cfg = ServerSessionConfig::new(); let (ses, res) = ServerSession::new(cfg)?; @@ -46,6 +48,7 @@ impl RtmpClient { msg_queue: VecDeque::from(res), published_stream: None, muxer: FlvMuxer::new(), + tx, }) } @@ -201,8 +204,12 @@ impl RtmpClient { self.published_stream = Some(RtmpPublishedStream(app_name, stream_key)); } } - ServerSessionEvent::PublishStreamFinished { .. } => { - // TODO: shutdown pipeline + ServerSessionEvent::PublishStreamFinished { + app_name, + stream_key, + } => { + self.tx.send(PipelineCommand::Shutdown)?; + info!("Stream ending: {app_name}/{stream_key}"); } ServerSessionEvent::StreamMetadataChanged { app_name, @@ -271,7 +278,6 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) info!("RTMP listening on: {}", &addr); while let Ok((socket, ip)) = listener.accept().await { - let mut cc = RtmpClient::new(socket.into_std()?)?; let overseer = overseer.clone(); let out_dir = out_dir.clone(); let handle = Handle::current(); @@ -279,6 +285,8 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) std::thread::Builder::new() .name(format!("client:rtmp:{}", new_id)) .spawn(move || { + let (tx, rx) = std::sync::mpsc::channel(); + let mut cc = RtmpClient::new(socket.into_std()?, tx)?; if let Err(e) = cc.handshake() { bail!("Error during handshake: {}", e) } @@ -301,6 +309,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) info, Box::new(cc), None, + Some(rx), ) { Ok(pl) => pl, Err(e) => { diff --git a/crates/core/src/ingress/srt.rs b/crates/core/src/ingress/srt.rs index 1b590eb..10cc4ee 100644 --- a/crates/core/src/ingress/srt.rs +++ b/crates/core/src/ingress/srt.rs @@ -1,5 +1,6 @@ use crate::ingress::{spawn_pipeline, BufferedReader, ConnectionInfo}; use crate::overseer::Overseer; +use crate::pipeline::runner::PipelineCommand; use anyhow::Result; use futures_util::stream::FusedStream; use futures_util::StreamExt; @@ -7,6 +8,7 @@ use log::info; use srt_tokio::{SrtListener, SrtSocket}; use std::io::Read; use std::net::SocketAddr; +use std::sync::mpsc::{channel, Sender}; use std::sync::Arc; use tokio::runtime::Handle; use uuid::Uuid; @@ -31,6 +33,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) .as_ref() .map_or(String::new(), |s| s.to_string()), }; + let (tx, rx) = channel(); spawn_pipeline( Handle::current(), info, @@ -40,7 +43,10 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) handle: Handle::current(), socket, buffer: BufferedReader::new(4096, MAX_SRT_BUFFER_SIZE, "SRT"), + tx, }), + None, + Some(rx), ); } Ok(()) @@ -50,6 +56,7 @@ struct SrtReader { pub handle: Handle, pub socket: SrtSocket, pub buffer: BufferedReader, + pub tx: Sender, // TODO: implement clean shutdown } impl Read for SrtReader { diff --git a/crates/core/src/ingress/tcp.rs b/crates/core/src/ingress/tcp.rs index 982a49b..643969a 100644 --- a/crates/core/src/ingress/tcp.rs +++ b/crates/core/src/ingress/tcp.rs @@ -27,6 +27,8 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) out_dir.clone(), overseer.clone(), Box::new(socket), + None, + None, ); } Ok(()) diff --git a/crates/core/src/ingress/test.rs b/crates/core/src/ingress/test.rs index 249c045..bd727ba 100644 --- a/crates/core/src/ingress/test.rs +++ b/crates/core/src/ingress/test.rs @@ -13,7 +13,6 @@ use ringbuf::traits::{Observer, Split}; use ringbuf::{HeapCons, HeapRb}; use std::io::Read; use std::sync::Arc; -use std::time::Duration; use tiny_skia::Pixmap; use tokio::runtime::Handle; use uuid::Uuid; @@ -21,10 +20,6 @@ use uuid::Uuid; pub async fn listen(out_dir: String, overseer: Arc) -> Result<()> { info!("Test pattern enabled"); - // add a delay, there is a race condition somewhere, the test pattern doesnt always - // get added to active_streams - tokio::time::sleep(Duration::from_secs(1)).await; - let info = ConnectionInfo { id: Uuid::new_v4(), endpoint: "test-pattern", @@ -36,9 +31,11 @@ pub async fn listen(out_dir: String, overseer: Arc) -> Result<()> spawn_pipeline( Handle::current(), info, - out_dir.clone(), - overseer.clone(), + out_dir, + overseer, Box::new(src), + None, + None, ); Ok(()) } diff --git a/crates/core/src/pipeline/runner.rs b/crates/core/src/pipeline/runner.rs index 896c0c4..6587140 100644 --- a/crates/core/src/pipeline/runner.rs +++ b/crates/core/src/pipeline/runner.rs @@ -4,6 +4,7 @@ use std::mem::transmute; use std::ops::Sub; use std::path::{Path, PathBuf}; use std::ptr; +use std::sync::mpsc::Receiver; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -46,6 +47,8 @@ pub enum RunnerState { start_time: Instant, gen: FrameGenerator, }, + /// Pipeline should shut down and do any cleanup + Shutdown, } impl RunnerState { @@ -58,11 +61,17 @@ impl RunnerState { pub fn idle_duration(&self) -> Option { match self { RunnerState::Idle { start_time, .. } => Some(start_time.elapsed()), - RunnerState::Normal => None, + _ => None, } } } +#[derive(Debug, Clone)] +pub enum PipelineCommand { + /// External process requested clean shutdown + Shutdown, +} + /// Pipeline runner is the main entry process for stream transcoding /// /// Each client connection spawns a new [PipelineRunner] and it should be run in its own thread @@ -127,6 +136,9 @@ pub struct PipelineRunner { /// Last audio PTS for continuity in idle mode last_audio_pts: i64, + + /// Command receiver for external process control + cmd_channel: Option>, } unsafe impl Send for PipelineRunner {} @@ -139,6 +151,7 @@ impl PipelineRunner { connection: ConnectionInfo, recv: Box, url: Option, + command: Option>, ) -> Result { Ok(Self { handle, @@ -162,6 +175,7 @@ impl PipelineRunner { max_consecutive_failures: DEFAULT_MAX_CONSECUTIVE_FAILURES, last_video_pts: 0, last_audio_pts: 0, + cmd_channel: command, }) } @@ -530,6 +544,13 @@ impl PipelineRunner { /// EOF, cleanup unsafe fn flush(&mut self) -> Result<()> { + if self.config.is_some() { + self.handle.block_on(async { + if let Err(e) = self.overseer.on_end(&self.connection.id).await { + error!("Failed to end stream: {e}"); + } + }); + } for (var, enc) in &mut self.encoders { for mut pkt in enc.encode_frame(ptr::null_mut())? { for eg in self.egress.iter_mut() { @@ -541,14 +562,6 @@ impl PipelineRunner { for eg in self.egress.iter_mut() { eg.reset()?; } - - if self.config.is_some() { - self.handle.block_on(async { - if let Err(e) = self.overseer.on_end(&self.connection.id).await { - error!("Failed to end stream: {e}"); - } - }); - } Ok(()) } @@ -558,16 +571,12 @@ impl PipelineRunner { match self.once() { Ok(c) => { if !c { - if let Err(e) = self.flush() { - error!("Pipeline flush failed: {}", e); - } + // let drop handle flush break; } } Err(e) => { - if let Err(e) = self.flush() { - error!("Pipeline flush failed: {}", e); - } + // let drop handle flush error!("Pipeline run failed: {}", e); break; } @@ -576,7 +585,25 @@ impl PipelineRunner { } } + fn handle_command(&mut self) -> Result> { + if let Some(cmd) = &self.cmd_channel { + while let Ok(c) = cmd.try_recv() { + match c { + PipelineCommand::Shutdown => { + self.state = RunnerState::Shutdown; + return Ok(Some(true)); + } + _ => warn!("Unexpected command: {:?}", c), + } + } + } + Ok(None) + } + unsafe fn once(&mut self) -> Result { + if let Some(r) = self.handle_command()? { + return Ok(r); + } self.setup()?; let config = if let Some(config) = &self.config { @@ -589,6 +616,7 @@ impl PipelineRunner { let results = match &mut self.state { RunnerState::Normal => self.process_normal_mode(&config)?, RunnerState::Idle { .. } => self.process_idle_mode(&config)?, + _ => return Ok(false), // skip once, nothing to do }; // egress results - process async operations without blocking if possible @@ -741,7 +769,7 @@ impl Drop for PipelineRunner { info!( "PipelineRunner cleaned up resources for stream: {}", - self.connection.key + self.connection.id ); } } diff --git a/crates/zap-stream/config.yaml b/crates/zap-stream/config.yaml index 7f6f8af..1f33130 100755 --- a/crates/zap-stream/config.yaml +++ b/crates/zap-stream/config.yaml @@ -3,9 +3,6 @@ # All the endpoints must be valid URI's endpoints: - "rtmp://127.0.0.1:3336" - - "srt://127.0.0.1:3335" - - "tcp://127.0.0.1:3334" - - "test-pattern://" # Public hostname which points to the IP address used to listen for all [endpoints] endpoints_public_hostname: "localhost"