feat: clean shutdown RTMP stream
All checks were successful
continuous-integration/drone Build is passing

This commit is contained in:
2025-06-19 13:08:15 +01:00
parent 5c2a58ed46
commit 68fad98000
9 changed files with 84 additions and 35 deletions

View File

@ -264,8 +264,11 @@ impl FrameGenerator {
(*self.next_frame).data[0], (*self.next_frame).data[0],
(self.width as usize * self.height as usize * 4) as usize, (self.width as usize * self.height as usize * 4) as usize,
); );
for z in 0..(self.width as usize * self.height as usize) { for chunk in buf.chunks_exact_mut(4) {
buf[z * 4..z * 4 + 4].copy_from_slice(&color32); chunk[0] = color32[0];
chunk[1] = color32[1];
chunk[2] = color32[2];
chunk[3] = color32[3];
} }
Ok(()) Ok(())
} }

View File

@ -17,6 +17,7 @@ pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc<dyn Overseer>)
app_name: "".to_string(), app_name: "".to_string(),
key: "test".to_string(), key: "test".to_string(),
}; };
let url = path.to_str().unwrap().to_string();
let file = std::fs::File::open(path)?; let file = std::fs::File::open(path)?;
spawn_pipeline( spawn_pipeline(
Handle::current(), Handle::current(),
@ -24,6 +25,8 @@ pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc<dyn Overseer>)
out_dir.clone(), out_dir.clone(),
overseer.clone(), overseer.clone(),
Box::new(file), Box::new(file),
Some(url),
None,
); );
Ok(()) Ok(())

View File

@ -1,8 +1,9 @@
use crate::overseer::Overseer; use crate::overseer::Overseer;
use crate::pipeline::runner::PipelineRunner; use crate::pipeline::runner::{PipelineCommand, PipelineRunner};
use log::{error, info, warn}; use log::{error, info, warn};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::io::Read; use std::io::Read;
use std::sync::mpsc::Receiver;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use tokio::runtime::Handle; use tokio::runtime::Handle;
@ -40,8 +41,10 @@ pub fn spawn_pipeline(
out_dir: String, out_dir: String,
seer: Arc<dyn Overseer>, seer: Arc<dyn Overseer>,
reader: Box<dyn Read + Send>, reader: Box<dyn Read + Send>,
url: Option<String>,
rx: Option<Receiver<PipelineCommand>>,
) { ) {
match PipelineRunner::new(handle, out_dir, seer, info, reader, None) { match PipelineRunner::new(handle, out_dir, seer, info, reader, url, rx) {
Ok(pl) => match run_pipeline(pl) { Ok(pl) => match run_pipeline(pl) {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {

View File

@ -1,6 +1,6 @@
use crate::ingress::{BufferedReader, ConnectionInfo}; use crate::ingress::{BufferedReader, ConnectionInfo};
use crate::overseer::Overseer; use crate::overseer::Overseer;
use crate::pipeline::runner::PipelineRunner; use crate::pipeline::runner::{PipelineCommand, PipelineRunner};
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use log::{error, info}; use log::{error, info};
@ -11,6 +11,7 @@ use rml_rtmp::sessions::{
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::{ErrorKind, Read, Write}; use std::io::{ErrorKind, Read, Write};
use std::net::TcpStream; use std::net::TcpStream;
use std::sync::mpsc::Sender;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::net::TcpListener; use tokio::net::TcpListener;
@ -32,10 +33,11 @@ struct RtmpClient {
msg_queue: VecDeque<ServerSessionResult>, msg_queue: VecDeque<ServerSessionResult>,
pub published_stream: Option<RtmpPublishedStream>, pub published_stream: Option<RtmpPublishedStream>,
muxer: FlvMuxer, muxer: FlvMuxer,
tx: Sender<PipelineCommand>,
} }
impl RtmpClient { impl RtmpClient {
pub fn new(socket: TcpStream) -> Result<Self> { pub fn new(socket: TcpStream, tx: Sender<PipelineCommand>) -> Result<Self> {
socket.set_nonblocking(false)?; socket.set_nonblocking(false)?;
let cfg = ServerSessionConfig::new(); let cfg = ServerSessionConfig::new();
let (ses, res) = ServerSession::new(cfg)?; let (ses, res) = ServerSession::new(cfg)?;
@ -46,6 +48,7 @@ impl RtmpClient {
msg_queue: VecDeque::from(res), msg_queue: VecDeque::from(res),
published_stream: None, published_stream: None,
muxer: FlvMuxer::new(), muxer: FlvMuxer::new(),
tx,
}) })
} }
@ -201,8 +204,12 @@ impl RtmpClient {
self.published_stream = Some(RtmpPublishedStream(app_name, stream_key)); self.published_stream = Some(RtmpPublishedStream(app_name, stream_key));
} }
} }
ServerSessionEvent::PublishStreamFinished { .. } => { ServerSessionEvent::PublishStreamFinished {
// TODO: shutdown pipeline app_name,
stream_key,
} => {
self.tx.send(PipelineCommand::Shutdown)?;
info!("Stream ending: {app_name}/{stream_key}");
} }
ServerSessionEvent::StreamMetadataChanged { ServerSessionEvent::StreamMetadataChanged {
app_name, app_name,
@ -271,7 +278,6 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
info!("RTMP listening on: {}", &addr); info!("RTMP listening on: {}", &addr);
while let Ok((socket, ip)) = listener.accept().await { while let Ok((socket, ip)) = listener.accept().await {
let mut cc = RtmpClient::new(socket.into_std()?)?;
let overseer = overseer.clone(); let overseer = overseer.clone();
let out_dir = out_dir.clone(); let out_dir = out_dir.clone();
let handle = Handle::current(); let handle = Handle::current();
@ -279,6 +285,8 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
std::thread::Builder::new() std::thread::Builder::new()
.name(format!("client:rtmp:{}", new_id)) .name(format!("client:rtmp:{}", new_id))
.spawn(move || { .spawn(move || {
let (tx, rx) = std::sync::mpsc::channel();
let mut cc = RtmpClient::new(socket.into_std()?, tx)?;
if let Err(e) = cc.handshake() { if let Err(e) = cc.handshake() {
bail!("Error during handshake: {}", e) bail!("Error during handshake: {}", e)
} }
@ -301,6 +309,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
info, info,
Box::new(cc), Box::new(cc),
None, None,
Some(rx),
) { ) {
Ok(pl) => pl, Ok(pl) => pl,
Err(e) => { Err(e) => {

View File

@ -1,5 +1,6 @@
use crate::ingress::{spawn_pipeline, BufferedReader, ConnectionInfo}; use crate::ingress::{spawn_pipeline, BufferedReader, ConnectionInfo};
use crate::overseer::Overseer; use crate::overseer::Overseer;
use crate::pipeline::runner::PipelineCommand;
use anyhow::Result; use anyhow::Result;
use futures_util::stream::FusedStream; use futures_util::stream::FusedStream;
use futures_util::StreamExt; use futures_util::StreamExt;
@ -7,6 +8,7 @@ use log::info;
use srt_tokio::{SrtListener, SrtSocket}; use srt_tokio::{SrtListener, SrtSocket};
use std::io::Read; use std::io::Read;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::Handle; use tokio::runtime::Handle;
use uuid::Uuid; use uuid::Uuid;
@ -31,6 +33,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
.as_ref() .as_ref()
.map_or(String::new(), |s| s.to_string()), .map_or(String::new(), |s| s.to_string()),
}; };
let (tx, rx) = channel();
spawn_pipeline( spawn_pipeline(
Handle::current(), Handle::current(),
info, info,
@ -40,7 +43,10 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
handle: Handle::current(), handle: Handle::current(),
socket, socket,
buffer: BufferedReader::new(4096, MAX_SRT_BUFFER_SIZE, "SRT"), buffer: BufferedReader::new(4096, MAX_SRT_BUFFER_SIZE, "SRT"),
tx,
}), }),
None,
Some(rx),
); );
} }
Ok(()) Ok(())
@ -50,6 +56,7 @@ struct SrtReader {
pub handle: Handle, pub handle: Handle,
pub socket: SrtSocket, pub socket: SrtSocket,
pub buffer: BufferedReader, pub buffer: BufferedReader,
pub tx: Sender<PipelineCommand>, // TODO: implement clean shutdown
} }
impl Read for SrtReader { impl Read for SrtReader {

View File

@ -27,6 +27,8 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
out_dir.clone(), out_dir.clone(),
overseer.clone(), overseer.clone(),
Box::new(socket), Box::new(socket),
None,
None,
); );
} }
Ok(()) Ok(())

View File

@ -13,7 +13,6 @@ use ringbuf::traits::{Observer, Split};
use ringbuf::{HeapCons, HeapRb}; use ringbuf::{HeapCons, HeapRb};
use std::io::Read; use std::io::Read;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use tiny_skia::Pixmap; use tiny_skia::Pixmap;
use tokio::runtime::Handle; use tokio::runtime::Handle;
use uuid::Uuid; use uuid::Uuid;
@ -21,10 +20,6 @@ use uuid::Uuid;
pub async fn listen(out_dir: String, overseer: Arc<dyn Overseer>) -> Result<()> { pub async fn listen(out_dir: String, overseer: Arc<dyn Overseer>) -> Result<()> {
info!("Test pattern enabled"); info!("Test pattern enabled");
// add a delay, there is a race condition somewhere, the test pattern doesnt always
// get added to active_streams
tokio::time::sleep(Duration::from_secs(1)).await;
let info = ConnectionInfo { let info = ConnectionInfo {
id: Uuid::new_v4(), id: Uuid::new_v4(),
endpoint: "test-pattern", endpoint: "test-pattern",
@ -36,9 +31,11 @@ pub async fn listen(out_dir: String, overseer: Arc<dyn Overseer>) -> Result<()>
spawn_pipeline( spawn_pipeline(
Handle::current(), Handle::current(),
info, info,
out_dir.clone(), out_dir,
overseer.clone(), overseer,
Box::new(src), Box::new(src),
None,
None,
); );
Ok(()) Ok(())
} }

View File

@ -4,6 +4,7 @@ use std::mem::transmute;
use std::ops::Sub; use std::ops::Sub;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::ptr; use std::ptr;
use std::sync::mpsc::Receiver;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -46,6 +47,8 @@ pub enum RunnerState {
start_time: Instant, start_time: Instant,
gen: FrameGenerator, gen: FrameGenerator,
}, },
/// Pipeline should shut down and do any cleanup
Shutdown,
} }
impl RunnerState { impl RunnerState {
@ -58,11 +61,17 @@ impl RunnerState {
pub fn idle_duration(&self) -> Option<Duration> { pub fn idle_duration(&self) -> Option<Duration> {
match self { match self {
RunnerState::Idle { start_time, .. } => Some(start_time.elapsed()), RunnerState::Idle { start_time, .. } => Some(start_time.elapsed()),
RunnerState::Normal => None, _ => None,
} }
} }
} }
#[derive(Debug, Clone)]
pub enum PipelineCommand {
/// External process requested clean shutdown
Shutdown,
}
/// Pipeline runner is the main entry process for stream transcoding /// Pipeline runner is the main entry process for stream transcoding
/// ///
/// Each client connection spawns a new [PipelineRunner] and it should be run in its own thread /// Each client connection spawns a new [PipelineRunner] and it should be run in its own thread
@ -127,6 +136,9 @@ pub struct PipelineRunner {
/// Last audio PTS for continuity in idle mode /// Last audio PTS for continuity in idle mode
last_audio_pts: i64, last_audio_pts: i64,
/// Command receiver for external process control
cmd_channel: Option<Receiver<PipelineCommand>>,
} }
unsafe impl Send for PipelineRunner {} unsafe impl Send for PipelineRunner {}
@ -139,6 +151,7 @@ impl PipelineRunner {
connection: ConnectionInfo, connection: ConnectionInfo,
recv: Box<dyn Read + Send>, recv: Box<dyn Read + Send>,
url: Option<String>, url: Option<String>,
command: Option<Receiver<PipelineCommand>>,
) -> Result<Self> { ) -> Result<Self> {
Ok(Self { Ok(Self {
handle, handle,
@ -162,6 +175,7 @@ impl PipelineRunner {
max_consecutive_failures: DEFAULT_MAX_CONSECUTIVE_FAILURES, max_consecutive_failures: DEFAULT_MAX_CONSECUTIVE_FAILURES,
last_video_pts: 0, last_video_pts: 0,
last_audio_pts: 0, last_audio_pts: 0,
cmd_channel: command,
}) })
} }
@ -530,6 +544,13 @@ impl PipelineRunner {
/// EOF, cleanup /// EOF, cleanup
unsafe fn flush(&mut self) -> Result<()> { unsafe fn flush(&mut self) -> Result<()> {
if self.config.is_some() {
self.handle.block_on(async {
if let Err(e) = self.overseer.on_end(&self.connection.id).await {
error!("Failed to end stream: {e}");
}
});
}
for (var, enc) in &mut self.encoders { for (var, enc) in &mut self.encoders {
for mut pkt in enc.encode_frame(ptr::null_mut())? { for mut pkt in enc.encode_frame(ptr::null_mut())? {
for eg in self.egress.iter_mut() { for eg in self.egress.iter_mut() {
@ -541,14 +562,6 @@ impl PipelineRunner {
for eg in self.egress.iter_mut() { for eg in self.egress.iter_mut() {
eg.reset()?; eg.reset()?;
} }
if self.config.is_some() {
self.handle.block_on(async {
if let Err(e) = self.overseer.on_end(&self.connection.id).await {
error!("Failed to end stream: {e}");
}
});
}
Ok(()) Ok(())
} }
@ -558,16 +571,12 @@ impl PipelineRunner {
match self.once() { match self.once() {
Ok(c) => { Ok(c) => {
if !c { if !c {
if let Err(e) = self.flush() { // let drop handle flush
error!("Pipeline flush failed: {}", e);
}
break; break;
} }
} }
Err(e) => { Err(e) => {
if let Err(e) = self.flush() { // let drop handle flush
error!("Pipeline flush failed: {}", e);
}
error!("Pipeline run failed: {}", e); error!("Pipeline run failed: {}", e);
break; break;
} }
@ -576,7 +585,25 @@ impl PipelineRunner {
} }
} }
fn handle_command(&mut self) -> Result<Option<bool>> {
if let Some(cmd) = &self.cmd_channel {
while let Ok(c) = cmd.try_recv() {
match c {
PipelineCommand::Shutdown => {
self.state = RunnerState::Shutdown;
return Ok(Some(true));
}
_ => warn!("Unexpected command: {:?}", c),
}
}
}
Ok(None)
}
unsafe fn once(&mut self) -> Result<bool> { unsafe fn once(&mut self) -> Result<bool> {
if let Some(r) = self.handle_command()? {
return Ok(r);
}
self.setup()?; self.setup()?;
let config = if let Some(config) = &self.config { let config = if let Some(config) = &self.config {
@ -589,6 +616,7 @@ impl PipelineRunner {
let results = match &mut self.state { let results = match &mut self.state {
RunnerState::Normal => self.process_normal_mode(&config)?, RunnerState::Normal => self.process_normal_mode(&config)?,
RunnerState::Idle { .. } => self.process_idle_mode(&config)?, RunnerState::Idle { .. } => self.process_idle_mode(&config)?,
_ => return Ok(false), // skip once, nothing to do
}; };
// egress results - process async operations without blocking if possible // egress results - process async operations without blocking if possible
@ -741,7 +769,7 @@ impl Drop for PipelineRunner {
info!( info!(
"PipelineRunner cleaned up resources for stream: {}", "PipelineRunner cleaned up resources for stream: {}",
self.connection.key self.connection.id
); );
} }
} }

View File

@ -3,9 +3,6 @@
# All the endpoints must be valid URI's # All the endpoints must be valid URI's
endpoints: endpoints:
- "rtmp://127.0.0.1:3336" - "rtmp://127.0.0.1:3336"
- "srt://127.0.0.1:3335"
- "tcp://127.0.0.1:3334"
- "test-pattern://"
# Public hostname which points to the IP address used to listen for all [endpoints] # Public hostname which points to the IP address used to listen for all [endpoints]
endpoints_public_hostname: "localhost" endpoints_public_hostname: "localhost"