refactor: cleanup rtmp setup

This commit is contained in:
2025-06-12 09:44:25 +01:00
parent 3a38b05630
commit ad20fbc052
16 changed files with 501 additions and 366 deletions

2
.gitignore vendored
View File

@ -1,3 +1,3 @@
**/target
.idea/
out/
**/out/

2
Cargo.lock generated
View File

@ -1045,7 +1045,7 @@ dependencies = [
[[package]]
name = "ffmpeg-rs-raw"
version = "0.1.0"
source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=d79693ddb0bee2e94c1db07f789523e87bf1b0fc#d79693ddb0bee2e94c1db07f789523e87bf1b0fc"
source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=aa1ce3edcad0fcd286d39b3e0c2fdc610c3988e7#aa1ce3edcad0fcd286d39b3e0c2fdc610c3988e7"
dependencies = [
"anyhow",
"ffmpeg-sys-the-third",

View File

@ -13,7 +13,7 @@ codegen-units = 1
panic = "unwind"
[workspace.dependencies]
ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "d79693ddb0bee2e94c1db07f789523e87bf1b0fc" }
ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "aa1ce3edcad0fcd286d39b3e0c2fdc610c3988e7" }
tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] }
anyhow = { version = "^1.0.91", features = ["backtrace"] }
async-trait = "0.1.77"

View File

@ -30,7 +30,7 @@ fontdue = "0.9.2"
ringbuf = "0.4.7"
# srt
srt-tokio = { version = "0.4.3", optional = true }
srt-tokio = { version = "0.4.4", optional = true }
# rtmp
rml_rtmp = { version = "0.8.0", optional = true }

View File

@ -5,11 +5,13 @@ use log::info;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::runtime::Handle;
use uuid::Uuid;
pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc<dyn Overseer>) -> Result<()> {
info!("Sending file: {}", path.display());
let info = ConnectionInfo {
id: Uuid::new_v4(),
ip_addr: "127.0.0.1:6969".to_string(),
endpoint: "file-input".to_owned(),
app_name: "".to_string(),

View File

@ -1,10 +1,12 @@
use crate::overseer::Overseer;
use crate::pipeline::runner::PipelineRunner;
use log::{error, info};
use log::{error, info, warn};
use serde::{Deserialize, Serialize};
use std::io::Read;
use std::sync::Arc;
use std::time::Instant;
use tokio::runtime::Handle;
use uuid::Uuid;
pub mod file;
#[cfg(feature = "rtmp")]
@ -16,6 +18,9 @@ pub mod test;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ConnectionInfo {
/// Unique ID of this connection / pipeline
pub id: Uuid,
/// Endpoint of the ingress
pub endpoint: String,
@ -36,33 +41,103 @@ pub fn spawn_pipeline(
seer: Arc<dyn Overseer>,
reader: Box<dyn Read + Send>,
) {
info!("New client connected: {}", &info.ip_addr);
let seer = seer.clone();
let out_dir = out_dir.to_string();
std::thread::spawn(move || unsafe {
match PipelineRunner::new(handle, out_dir, seer, info, reader) {
Ok(mut pl) => loop {
match pl.run() {
Ok(c) => {
if !c {
if let Err(e) = pl.flush() {
error!("Pipeline flush failed: {}", e);
}
break;
}
}
match PipelineRunner::new(handle, out_dir, seer, info, reader, None) {
Ok(pl) => match run_pipeline(pl) {
Ok(_) => {}
Err(e) => {
if let Err(e) = pl.flush() {
error!("Pipeline flush failed: {}", e);
}
error!("Pipeline run failed: {}", e);
break;
}
error!("Failed to run PipelineRunner: {}", e);
}
},
Err(e) => {
error!("Failed to create PipelineRunner: {}", e);
}
}
});
}
pub fn run_pipeline(mut pl: PipelineRunner) -> anyhow::Result<()> {
info!("New client connected: {}", &pl.connection.ip_addr);
std::thread::Builder::new()
.name(format!("pipeline-{}", pl.connection.id))
.spawn(move || {
pl.run();
})?;
Ok(())
}
/// Common buffered reader functionality for ingress sources
pub struct BufferedReader {
pub buf: Vec<u8>,
pub max_buffer_size: usize,
pub last_buffer_log: Instant,
pub bytes_processed: u64,
pub packets_received: u64,
pub source_name: &'static str,
}
impl BufferedReader {
pub fn new(capacity: usize, max_size: usize, source_name: &'static str) -> Self {
Self {
buf: Vec::with_capacity(capacity),
max_buffer_size: max_size,
last_buffer_log: Instant::now(),
bytes_processed: 0,
packets_received: 0,
source_name,
}
}
/// Add data to buffer with size limit and performance tracking
pub fn add_data(&mut self, data: &[u8]) {
// Inline buffer management to avoid borrow issues
if self.buf.len() + data.len() > self.max_buffer_size {
let bytes_to_drop = (self.buf.len() + data.len()) - self.max_buffer_size;
warn!(
"{} buffer full ({} bytes), dropping {} oldest bytes",
self.source_name,
self.buf.len(),
bytes_to_drop
);
self.buf.drain(..bytes_to_drop);
}
self.buf.extend(data);
// Update performance counters
self.bytes_processed += data.len() as u64;
self.packets_received += 1;
// Log buffer status every 5 seconds
if self.last_buffer_log.elapsed().as_secs() >= 5 {
let buffer_util = (self.buf.len() as f32 / self.max_buffer_size as f32) * 100.0;
let elapsed = self.last_buffer_log.elapsed();
let mbps = (self.bytes_processed as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0);
let pps = self.packets_received as f64 / elapsed.as_secs_f64();
info!(
"{} ingress: {:.1} Mbps, {:.1} packets/sec, buffer: {}% ({}/{} bytes)",
self.source_name,
mbps,
pps,
buffer_util as u32,
self.buf.len(),
self.max_buffer_size
);
// Reset counters
self.last_buffer_log = Instant::now();
self.bytes_processed = 0;
self.packets_received = 0;
}
}
/// Read data from buffer, filling the entire output buffer before returning
pub fn read_buffered(&mut self, buf: &mut [u8]) -> usize {
if self.buf.len() >= buf.len() {
let drain = self.buf.drain(..buf.len());
buf.copy_from_slice(drain.as_slice());
buf.len()
} else {
0
}
}
}

View File

@ -1,111 +1,77 @@
use crate::ingress::{spawn_pipeline, ConnectionInfo};
use crate::ingress::{BufferedReader, ConnectionInfo};
use crate::overseer::Overseer;
use crate::pipeline::runner::PipelineRunner;
use anyhow::{bail, Result};
use log::{error, info, warn};
use log::{error, info};
use rml_rtmp::handshake::{Handshake, HandshakeProcessResult, PeerType};
use rml_rtmp::sessions::{
ServerSession, ServerSessionConfig, ServerSessionEvent, ServerSessionResult,
};
use std::collections::VecDeque;
use std::io::{ErrorKind, Read, Write};
use std::net::TcpStream;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::net::TcpListener;
use tokio::runtime::Handle;
use tokio::time::Instant;
use uuid::Uuid;
const MAX_MEDIA_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10MB limit
#[derive(PartialEq, Eq, Clone, Hash)]
struct RtmpPublishedStream(String, String);
struct RtmpClient {
socket: std::net::TcpStream,
media_buf: Vec<u8>,
socket: TcpStream,
buffer: BufferedReader,
session: ServerSession,
msg_queue: VecDeque<ServerSessionResult>,
reader_buf: [u8; 4096],
pub published_stream: Option<RtmpPublishedStream>,
last_buffer_log: Instant,
bytes_processed: u64,
frames_received: u64,
}
impl RtmpClient {
/// Add data to media buffer with size limit to prevent unbounded growth
fn add_to_media_buffer(&mut self, data: &[u8]) {
if self.media_buf.len() + data.len() > MAX_MEDIA_BUFFER_SIZE {
let bytes_to_drop = (self.media_buf.len() + data.len()) - MAX_MEDIA_BUFFER_SIZE;
warn!("RTMP buffer full ({} bytes), dropping {} oldest bytes",
self.media_buf.len(), bytes_to_drop);
self.media_buf.drain(..bytes_to_drop);
}
self.media_buf.extend(data);
// Update performance counters
self.bytes_processed += data.len() as u64;
self.frames_received += 1;
// Log buffer status every 5 seconds
if self.last_buffer_log.elapsed().as_secs() >= 5 {
let buffer_util = (self.media_buf.len() as f32 / MAX_MEDIA_BUFFER_SIZE as f32) * 100.0;
let elapsed = self.last_buffer_log.elapsed();
let mbps = (self.bytes_processed as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0);
let fps = self.frames_received as f64 / elapsed.as_secs_f64();
info!(
"RTMP ingress: {:.1} Mbps, {:.1} frames/sec, buffer: {}% ({}/{} bytes)",
mbps, fps, buffer_util as u32, self.media_buf.len(), MAX_MEDIA_BUFFER_SIZE
);
// Reset counters
self.last_buffer_log = Instant::now();
self.bytes_processed = 0;
self.frames_received = 0;
}
pub fn new(socket: TcpStream) -> Result<Self> {
socket.set_nonblocking(false)?;
let cfg = ServerSessionConfig::new();
let (ses, res) = ServerSession::new(cfg)?;
Ok(Self {
socket,
session: ses,
buffer: BufferedReader::new(1024 * 1024, MAX_MEDIA_BUFFER_SIZE, "RTMP"),
msg_queue: VecDeque::from(res),
reader_buf: [0; 4096],
published_stream: None,
})
}
async fn start(mut socket: TcpStream) -> Result<Self> {
pub fn handshake(&mut self) -> Result<()> {
let mut hs = Handshake::new(PeerType::Server);
let exchange = hs.generate_outbound_p0_and_p1()?;
socket.write_all(&exchange).await?;
self.socket.write_all(&exchange)?;
let mut buf = [0; 4096];
loop {
let r = socket.read(&mut buf).await?;
let r = self.socket.read(&mut buf)?;
if r == 0 {
bail!("EOF reached while reading");
}
match hs.process_bytes(&buf[..r])? {
HandshakeProcessResult::InProgress { response_bytes } => {
socket.write_all(&response_bytes).await?;
self.socket.write_all(&response_bytes)?;
}
HandshakeProcessResult::Completed {
response_bytes,
remaining_bytes,
} => {
socket.write_all(&response_bytes).await?;
self.socket.write_all(&response_bytes)?;
let cfg = ServerSessionConfig::new();
let (mut ses, mut res) = ServerSession::new(cfg)?;
let q = ses.handle_input(&remaining_bytes)?;
res.extend(q);
let ret = Self {
socket: socket.into_std()?,
media_buf: vec![],
session: ses,
msg_queue: VecDeque::from(res),
reader_buf: [0; 4096],
published_stream: None,
last_buffer_log: Instant::now(),
bytes_processed: 0,
frames_received: 0,
};
return Ok(ret);
let q = self.session.handle_input(&remaining_bytes)?;
self.msg_queue.extend(q);
return Ok(());
}
}
}
@ -156,10 +122,6 @@ impl RtmpClient {
ServerSessionResult::UnhandleableMessageReceived(m) => {
// Log unhandleable messages for debugging
error!("Received unhandleable message with {} bytes", m.data.len());
// Only append data if it looks like valid media data
if !m.data.is_empty() && m.data.len() > 4 {
self.add_to_media_buffer(&m.data);
}
}
}
}
@ -209,20 +171,10 @@ impl RtmpClient {
);
}
ServerSessionEvent::AudioDataReceived { data, .. } => {
// Validate audio data before adding to buffer
if !data.is_empty() {
self.add_to_media_buffer(&data);
} else {
error!("Received empty audio data");
}
self.buffer.add_data(&data);
}
ServerSessionEvent::VideoDataReceived { data, .. } => {
// Validate video data before adding to buffer
if !data.is_empty() {
self.add_to_media_buffer(&data);
} else {
error!("Received empty video data");
}
self.buffer.add_data(&data);
}
ServerSessionEvent::UnhandleableAmf0Command { .. } => {}
ServerSessionEvent::PlayStreamRequested { request_id, .. } => {
@ -241,18 +193,15 @@ impl RtmpClient {
impl Read for RtmpClient {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
// block this thread until something comes into [media_buf]
while self.media_buf.is_empty() {
// Block until we have enough data to fill the buffer
while self.buffer.buf.len() < buf.len() {
if let Err(e) = self.read_data() {
error!("Error reading data: {}", e);
return Ok(0);
};
}
let to_read = buf.len().min(self.media_buf.len());
let drain = self.media_buf.drain(..to_read);
buf[..to_read].copy_from_slice(drain.as_slice());
Ok(to_read)
Ok(self.buffer.read_buffered(buf))
}
}
@ -261,7 +210,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
info!("RTMP listening on: {}", &addr);
while let Ok((socket, ip)) = listener.accept().await {
let mut cc = RtmpClient::start(socket).await?;
let mut cc = RtmpClient::new(socket.into_std()?)?;
let addr = addr.clone();
let overseer = overseer.clone();
let out_dir = out_dir.clone();
@ -269,24 +218,36 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
std::thread::Builder::new()
.name("rtmp-client".to_string())
.spawn(move || {
if let Err(e) = cc.handshake() {
bail!("Error during handshake: {}", e)
}
if let Err(e) = cc.read_until_publish_request(Duration::from_secs(10)) {
error!("{}", e);
} else {
bail!("Error waiting for publish request: {}", e)
}
let pr = cc.published_stream.as_ref().unwrap();
let info = ConnectionInfo {
id: Uuid::new_v4(),
ip_addr: ip.to_string(),
endpoint: addr.clone(),
app_name: pr.0.clone(),
key: pr.1.clone(),
};
spawn_pipeline(
let mut pl = match PipelineRunner::new(
handle,
out_dir,
overseer,
info,
out_dir.clone(),
overseer.clone(),
Box::new(cc),
);
Some("flv".to_string()),
) {
Ok(pl) => pl,
Err(e) => {
bail!("Failed to create PipelineRunner {}", e)
}
};
pl.run();
Ok(())
})?;
}
Ok(())

View File

@ -1,15 +1,15 @@
use crate::ingress::{spawn_pipeline, ConnectionInfo};
use crate::ingress::{spawn_pipeline, BufferedReader, ConnectionInfo};
use crate::overseer::Overseer;
use anyhow::Result;
use futures_util::stream::FusedStream;
use futures_util::StreamExt;
use log::{info, warn};
use log::info;
use srt_tokio::{SrtListener, SrtSocket};
use std::io::Read;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use tokio::runtime::Handle;
use uuid::Uuid;
const MAX_SRT_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10MB limit
@ -21,6 +21,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
while let Some(request) = packets.incoming().next().await {
let socket = request.accept(None).await?;
let info = ConnectionInfo {
id: Uuid::new_v4(),
endpoint: addr.clone(),
ip_addr: socket.settings().remote.to_string(),
app_name: "".to_string(),
@ -38,10 +39,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
Box::new(SrtReader {
handle: Handle::current(),
socket,
buf: Vec::with_capacity(4096),
last_buffer_log: Instant::now(),
bytes_processed: 0,
packets_received: 0,
buffer: BufferedReader::new(4096, MAX_SRT_BUFFER_SIZE, "SRT"),
}),
);
}
@ -51,56 +49,21 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
struct SrtReader {
pub handle: Handle,
pub socket: SrtSocket,
pub buf: Vec<u8>,
last_buffer_log: Instant,
bytes_processed: u64,
packets_received: u64,
pub buffer: BufferedReader,
}
impl Read for SrtReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let (mut rx, _) = self.socket.split_mut();
while self.buf.len() < buf.len() {
while self.buffer.buf.len() < buf.len() {
if rx.is_terminated() {
return Ok(0);
}
if let Some((_, data)) = self.handle.block_on(rx.next()) {
let data_slice = data.iter().as_slice();
// Inline buffer management to avoid borrow issues
if self.buf.len() + data_slice.len() > MAX_SRT_BUFFER_SIZE {
let bytes_to_drop = (self.buf.len() + data_slice.len()) - MAX_SRT_BUFFER_SIZE;
warn!("SRT buffer full ({} bytes), dropping {} oldest bytes",
self.buf.len(), bytes_to_drop);
self.buf.drain(..bytes_to_drop);
}
self.buf.extend(data_slice);
// Update performance counters
self.bytes_processed += data_slice.len() as u64;
self.packets_received += 1;
// Log buffer status every 5 seconds
if self.last_buffer_log.elapsed().as_secs() >= 5 {
let buffer_util = (self.buf.len() as f32 / MAX_SRT_BUFFER_SIZE as f32) * 100.0;
let elapsed = self.last_buffer_log.elapsed();
let mbps = (self.bytes_processed as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0);
let pps = self.packets_received as f64 / elapsed.as_secs_f64();
info!(
"SRT ingress: {:.1} Mbps, {:.1} packets/sec, buffer: {}% ({}/{} bytes)",
mbps, pps, buffer_util as u32, self.buf.len(), MAX_SRT_BUFFER_SIZE
);
// Reset counters
self.last_buffer_log = Instant::now();
self.bytes_processed = 0;
self.packets_received = 0;
self.buffer.add_data(data_slice);
}
}
}
let drain = self.buf.drain(..buf.len());
buf.copy_from_slice(drain.as_slice());
Ok(buf.len())
Ok(self.buffer.read_buffered(buf))
}
}

View File

@ -5,6 +5,7 @@ use log::info;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::runtime::Handle;
use uuid::Uuid;
pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>) -> Result<()> {
let listener = TcpListener::bind(&addr).await?;
@ -12,6 +13,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
info!("TCP listening on: {}", &addr);
while let Ok((socket, ip)) = listener.accept().await {
let info = ConnectionInfo {
id: Uuid::new_v4(),
ip_addr: ip.to_string(),
endpoint: addr.clone(),
app_name: "".to_string(),

View File

@ -11,13 +11,20 @@ use ringbuf::traits::{Observer, Split};
use ringbuf::{HeapCons, HeapRb};
use std::io::Read;
use std::sync::Arc;
use std::time::Duration;
use tiny_skia::Pixmap;
use tokio::runtime::Handle;
use uuid::Uuid;
pub async fn listen(out_dir: String, overseer: Arc<dyn Overseer>) -> Result<()> {
info!("Test pattern enabled");
// add a delay, there is a race condition somewhere, the test pattern doesnt always
// get added to active_streams
tokio::time::sleep(Duration::from_secs(1)).await;
let info = ConnectionInfo {
id: Uuid::new_v4(),
endpoint: "test-pattern".to_string(),
ip_addr: "test-pattern".to_string(),
app_name: "".to_string(),

View File

@ -166,9 +166,8 @@ impl HlsVariant {
id: v.id(),
});
has_video = true;
if ref_stream_index == -1 {
// Always use video stream as reference for segmentation
ref_stream_index = stream_idx as _;
}
},
VariantStream::Audio(a) => unsafe {
let stream = mux.add_stream_encoder(enc)?;
@ -197,6 +196,11 @@ impl HlsVariant {
ref_stream_index != -1,
"No reference stream found, cant create variant"
);
trace!(
"{} will use stream index {} as reference for segmentation",
name,
ref_stream_index
);
unsafe {
mux.open(Some(opts))?;
}
@ -236,14 +240,7 @@ impl HlsVariant {
.to_string()
}
/// Mux a packet created by the encoder for this variant
pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result<EgressResult> {
// Simply process the packet directly - no reordering needed
// FFmpeg's interleaving system should handle packet ordering upstream
self.process_packet(pkt)
}
/// Process a single packet through the muxer - FFmpeg-style implementation
/// Process a single packet through the muxer
unsafe fn process_packet(&mut self, pkt: *mut AVPacket) -> Result<EgressResult> {
let pkt_stream = *(*self.mux.context())
.streams
@ -254,7 +251,7 @@ impl HlsVariant {
let mut can_split = stream_type == AVMEDIA_TYPE_VIDEO
&& ((*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY);
let mut is_ref_pkt =
stream_type == AVMEDIA_TYPE_VIDEO && (*pkt_stream).index == self.ref_stream_index;
stream_type == AVMEDIA_TYPE_VIDEO && (*pkt).stream_index == self.ref_stream_index;
if (*pkt).pts == AV_NOPTS_VALUE {
can_split = false;
@ -264,7 +261,8 @@ impl HlsVariant {
// check if current packet is keyframe, flush current segment
if self.packets_written > 0 && can_split {
trace!(
"Segmentation check: pts={}, duration={:.3}, timebase={}/{}, target={:.3}",
"{} segmentation check: pts={}, duration={:.3}, timebase={}/{}, target={:.3}",
self.name,
(*pkt).pts,
self.duration,
(*pkt).time_base.num,
@ -429,7 +427,7 @@ impl HlsVariant {
e
);
}
info!("Removed segment file: {}", seg_path.display());
trace!("Removed segment file: {}", seg_path.display());
ret.push(seg);
}
}
@ -571,9 +569,16 @@ impl HlsMuxer {
if let Some(vs) = var.streams.iter().find(|s| s.id() == variant) {
// very important for muxer to know which stream this pkt belongs to
(*pkt).stream_index = *vs.index() as _;
return var.mux_packet(pkt);
return var.process_packet(pkt);
}
}
bail!("Packet doesnt match any variants");
// This HLS muxer doesn't handle this variant, return None instead of failing
// This can happen when multiple egress handlers are configured with different variant sets
trace!(
"HLS muxer received packet for variant {} which it doesn't handle",
variant
);
Ok(EgressResult::None)
}
}

View File

@ -4,7 +4,6 @@ use crate::egress::EgressConfig;
use crate::overseer::IngressInfo;
use crate::variant::VariantStream;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
pub mod runner;
@ -42,7 +41,6 @@ impl Display for EgressType {
#[derive(Clone)]
pub struct PipelineConfig {
pub id: Uuid,
/// Transcoded/Copied stream config
pub variants: Vec<VariantStream>,
/// Output muxers
@ -57,7 +55,7 @@ pub struct PipelineConfig {
impl Display for PipelineConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "\nPipeline Config ID={}", self.id)?;
write!(f, "\nPipeline Config:")?;
write!(f, "\nVariants:")?;
for v in &self.variants {
write!(f, "\n\t{}", v)?;

View File

@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};
use std::io::Read;
use std::mem::transmute;
use std::ops::Sub;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::ptr;
use std::sync::Arc;
use std::time::{Duration, Instant};
@ -16,17 +16,15 @@ use crate::mux::SegmentType;
use crate::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer};
use crate::pipeline::{EgressType, PipelineConfig};
use crate::variant::{StreamMapping, VariantStream};
use anyhow::{bail, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_WEBP;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_frame_free, av_get_sample_fmt, av_packet_free, av_rescale_q, AVFrame, AVMediaType, AVStream,
AV_NOPTS_VALUE,
av_frame_clone, av_frame_free, av_get_sample_fmt, av_packet_free, av_rescale_q, AVFrame, AVPacket, AV_NOPTS_VALUE,
};
use ffmpeg_rs_raw::{
cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler,
StreamType,
cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, Encoder, Resample, Scaler, StreamType,
};
use log::{error, info, warn};
use tokio::runtime::Handle;
@ -53,12 +51,12 @@ pub struct PipelineRunner {
handle: Handle,
/// Input stream connection info
connection: ConnectionInfo,
pub connection: ConnectionInfo,
/// Configuration for this pipeline (variants, egress config etc.)
config: Option<PipelineConfig>,
/// Singleton demuxer for this input
/// Where the pipeline gets packets from
demuxer: Demuxer,
/// Singleton decoder for all stream
@ -79,9 +77,6 @@ pub struct PipelineRunner {
/// All configured egress'
egress: Vec<Box<dyn Egress>>,
/// Info about the input stream
info: Option<IngressInfo>,
/// Overseer managing this pipeline
overseer: Arc<dyn Overseer>,
@ -90,6 +85,8 @@ pub struct PipelineRunner {
/// Total number of frames produced
frame_ctr: u64,
/// Output directory where all stream data is saved
out_dir: String,
/// Thumbnail generation interval (0 = disabled)
@ -97,8 +94,16 @@ pub struct PipelineRunner {
/// Current runner state (normal or idle)
state: RunnerState,
/// Counter for consecutive decode failures
consecutive_decode_failures: u32,
/// Maximum consecutive failures before triggering circuit breaker
max_consecutive_failures: u32,
}
unsafe impl Send for PipelineRunner {}
impl PipelineRunner {
pub fn new(
handle: Handle,
@ -106,6 +111,7 @@ impl PipelineRunner {
overseer: Arc<dyn Overseer>,
connection: ConnectionInfo,
recv: Box<dyn Read + Send>,
url: Option<String>,
) -> Result<Self> {
Ok(Self {
handle,
@ -113,7 +119,7 @@ impl PipelineRunner {
overseer,
connection,
config: Default::default(),
demuxer: Demuxer::new_custom_io(recv, None)?,
demuxer: Demuxer::new_custom_io(recv, url)?,
decoder: Decoder::new(),
scalers: Default::default(),
resampler: Default::default(),
@ -123,72 +129,207 @@ impl PipelineRunner {
egress: Vec::new(),
frame_ctr: 0,
fps_last_frame_ctr: 0,
info: None,
thumb_interval: 1800, // Disable thumbnails by default for performance
thumb_interval: 1800,
state: RunnerState::Normal,
consecutive_decode_failures: 0,
max_consecutive_failures: 50,
})
}
pub fn set_demuxer_buffer_size(&mut self, buffer_size: usize) {
self.demuxer.set_buffer_size(buffer_size);
}
/// Save image to disk
unsafe fn save_thumb(frame: *mut AVFrame, dst_pic: &Path) -> Result<()> {
let mut free_frame = false;
// use scaler to convert pixel format if not YUV420P
let mut frame = if (*frame).format != transmute(AV_PIX_FMT_YUV420P) {
let mut sw = Scaler::new();
let new_frame = sw.process_frame(
frame,
(*frame).width as _,
(*frame).height as _,
AV_PIX_FMT_YUV420P,
)?;
free_frame = true;
new_frame
} else {
frame
};
let encoder = Encoder::new(AV_CODEC_ID_WEBP)?
.with_height((*frame).height)
.with_width((*frame).width)
.with_pix_fmt(transmute((*frame).format))
.open(None)?;
encoder.save_picture(frame, dst_pic.to_str().unwrap())?;
if free_frame {
av_frame_free(&mut frame);
}
Ok(())
}
/// Save a decoded frame as a thumbnail
unsafe fn generate_thumb_from_frame(&mut self, frame: *mut AVFrame) -> Result<()> {
if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 {
let frame = av_frame_clone(frame).addr();
let dst_pic = PathBuf::from(&self.out_dir)
.join(self.connection.id.to_string())
.join("thumb.webp");
std::thread::spawn(move || unsafe {
let mut frame = frame as *mut AVFrame; //TODO: danger??
let thumb_start = Instant::now();
if let Err(e) = Self::save_thumb(frame, &dst_pic) {
warn!("Failed to save thumb: {}", e);
}
let thumb_duration = thumb_start.elapsed();
av_frame_free(&mut frame);
info!(
"Saved thumb ({}ms) to: {}",
thumb_duration.as_millis(),
dst_pic.display(),
);
});
}
Ok(())
}
/// Switch to idle mode with placeholder content generation
unsafe fn switch_to_idle_mode(&mut self, config: &PipelineConfig) -> Result<()> {
let src_video_stream = config
.ingress_info
.streams
.iter()
.find(|s| s.index == config.video_src)
.unwrap();
let src_audio_stream = config
.ingress_info
.streams
.iter()
.find(|s| Some(s.index) == config.audio_src);
let gen = FrameGenerator::from_stream(src_video_stream, src_audio_stream)?;
self.state = RunnerState::Idle {
start_time: Instant::now(),
last_frame_time: None,
gen,
};
Ok(())
}
/// Handle decode failure with circuit breaker logic
unsafe fn handle_decode_failure(
&mut self,
config: &PipelineConfig,
) -> Result<Vec<EgressResult>> {
// Check if we've hit the circuit breaker threshold
if self.consecutive_decode_failures >= self.max_consecutive_failures {
error!(
"Circuit breaker triggered: {} consecutive decode failures exceeded threshold of {}. Switching to idle mode.",
self.consecutive_decode_failures, self.max_consecutive_failures
);
// Switch to idle mode to continue stream with placeholder content
match self.switch_to_idle_mode(config) {
Ok(()) => {
self.consecutive_decode_failures = 0; // Reset counter
info!("Switched to idle mode due to excessive decode failures");
}
Err(e) => {
error!("Failed to switch to idle mode: {}", e);
bail!("Circuit breaker triggered and unable to switch to idle mode");
}
}
}
// Return empty result to skip this packet
Ok(vec![])
}
unsafe fn process_packet(&mut self, packet: *mut AVPacket) -> Result<Vec<EgressResult>> {
let config = if let Some(config) = &self.config {
config.clone()
} else {
bail!("Pipeline not configured, cant process packet")
};
// Process all packets (original or converted)
let mut egress_results = vec![];
// TODO: For copy streams, skip decoder
let frames = match self.decoder.decode_pkt(packet) {
Ok(f) => {
// Reset failure counter on successful decode
self.consecutive_decode_failures = 0;
f
}
Err(e) => {
self.consecutive_decode_failures += 1;
// Enhanced error logging with context
let packet_info = if !packet.is_null() {
format!(
"stream_idx={}, size={}, pts={}, dts={}",
(*packet).stream_index,
(*packet).size,
(*packet).pts,
(*packet).dts
)
} else {
"null packet".to_string()
};
warn!(
"Error decoding packet ({}): {}. Consecutive failures: {}/{}. Skipping packet.",
packet_info, e, self.consecutive_decode_failures, self.max_consecutive_failures
);
return self.handle_decode_failure(&config);
}
};
for (frame, stream_idx) in frames {
let stream = self.demuxer.get_stream(stream_idx as usize)?;
// Adjust frame pts time without start_offset
// Egress streams don't have a start time offset
if !stream.is_null() {
if (*stream).start_time != AV_NOPTS_VALUE {
(*frame).pts -= (*stream).start_time;
}
(*frame).time_base = (*stream).time_base;
}
let results = self.process_frame(&config, stream_idx as usize, frame)?;
egress_results.extend(results);
}
Ok(egress_results)
}
/// process the frame in the pipeline
unsafe fn process_frame(
&mut self,
config: &PipelineConfig,
stream: *mut AVStream,
stream_index: usize,
frame: *mut AVFrame,
) -> Result<Vec<EgressResult>> {
// Copy frame from GPU if using hwaccel decoding
let mut frame = get_frame_from_hw(frame)?;
(*frame).time_base = (*stream).time_base;
let p = (*stream).codecpar;
if (*p).codec_type == AVMediaType::AVMEDIA_TYPE_VIDEO {
// Conditionally generate thumbnails based on interval (0 = disabled)
if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 {
let thumb_start = Instant::now();
let dst_pic = PathBuf::from(&self.out_dir)
.join(config.id.to_string())
.join("thumb.webp");
{
let mut sw = Scaler::new();
let mut scaled_frame = sw.process_frame(
frame,
(*frame).width as _,
(*frame).height as _,
AV_PIX_FMT_YUV420P,
)?;
let encoder = Encoder::new(AV_CODEC_ID_WEBP)?
.with_height((*scaled_frame).height)
.with_width((*scaled_frame).width)
.with_pix_fmt(transmute((*scaled_frame).format))
.open(None)?;
encoder.save_picture(scaled_frame, dst_pic.to_str().unwrap())?;
av_frame_free(&mut scaled_frame);
}
let thumb_duration = thumb_start.elapsed();
info!(
"Saved thumb ({:.2}ms) to: {}",
thumb_duration.as_millis() as f32 / 1000.0,
dst_pic.display(),
);
}
self.frame_ctr += 1;
}
let mut egress_results = Vec::new();
// Get the variants which want this pkt
let pkt_vars = config
.variants
.iter()
.filter(|v| v.src_index() == (*stream).index as usize);
.filter(|v| v.src_index() == stream_index);
for var in pkt_vars {
let enc = if let Some(enc) = self.encoders.get_mut(&var.id()) {
enc
} else {
//warn!("Frame had nowhere to go in {} :/", var.id());
warn!("Frame had nowhere to go in {} :/", var.id());
continue;
};
@ -245,6 +386,11 @@ impl PipelineRunner {
}
}
// count frame as processed
if stream_index == config.video_src {
self.generate_thumb_from_frame(frame)?;
self.frame_ctr += 1;
}
av_frame_free(&mut frame);
Ok(egress_results)
}
@ -282,7 +428,7 @@ impl PipelineRunner {
}
/// EOF, cleanup
pub unsafe fn flush(&mut self) -> Result<()> {
unsafe fn flush(&mut self) -> Result<()> {
for (var, enc) in &mut self.encoders {
for mut pkt in enc.encode_frame(ptr::null_mut())? {
for eg in self.egress.iter_mut() {
@ -295,9 +441,9 @@ impl PipelineRunner {
eg.reset()?;
}
if let Some(config) = &self.config {
if self.config.is_some() {
self.handle.block_on(async {
if let Err(e) = self.overseer.on_end(&config.id).await {
if let Err(e) = self.overseer.on_end(&self.connection.id).await {
error!("Failed to end stream: {e}");
}
});
@ -305,9 +451,31 @@ impl PipelineRunner {
Ok(())
}
/// Main processor, should be called in a loop
/// Returns false when stream data ended (EOF)
pub unsafe fn run(&mut self) -> Result<bool> {
pub fn run(&mut self) {
loop {
unsafe {
match self.once() {
Ok(c) => {
if !c {
if let Err(e) = self.flush() {
error!("Pipeline flush failed: {}", e);
}
break;
}
}
Err(e) => {
if let Err(e) = self.flush() {
error!("Pipeline flush failed: {}", e);
}
error!("Pipeline run failed: {}", e);
break;
}
}
}
}
}
unsafe fn once(&mut self) -> Result<bool> {
self.setup()?;
let config = if let Some(config) = &self.config {
@ -317,31 +485,34 @@ impl PipelineRunner {
};
// run transcoder pipeline
let (mut pkt, _) = self.demuxer.get_packet()?;
let src_video_stream = config
.ingress_info
.streams
.iter()
.find(|s| s.index == config.video_src)
.unwrap();
let src_audio_stream = config
.ingress_info
.streams
.iter()
.find(|s| Some(s.index) == config.audio_src);
let (mut pkt, _) = match &self.state {
RunnerState::Normal => {
match self.demuxer.get_packet() {
Ok(pkt) => pkt,
Err(e) => {
warn!("Demuxer get_packet failed: {}, entering idle mode", e);
// Switch to idle mode when demuxer fails
match self.switch_to_idle_mode(&config) {
Ok(()) => (ptr::null_mut(), ptr::null_mut()),
Err(switch_err) => {
error!("Failed to switch to idle mode: {}", switch_err);
return Err(e.into());
}
}
}
}
}
RunnerState::Idle { .. } => {
// return empty when idle - skip demuxer completely
(ptr::null_mut(), ptr::null_mut())
}
};
// Handle state transitions based on packet availability
match (&self.state, pkt.is_null()) {
(RunnerState::Normal, true) => {
// First time entering idle mode
info!("Stream input disconnected, entering idle mode");
self.state = RunnerState::Idle {
start_time: Instant::now(),
last_frame_time: None,
gen: FrameGenerator::from_stream(src_video_stream, src_audio_stream)?,
};
self.switch_to_idle_mode(&config)?;
}
(RunnerState::Idle { start_time, .. }, true) => {
// Check if we've been idle for more than 1 minute
@ -350,14 +521,7 @@ impl PipelineRunner {
return Ok(false);
}
}
(RunnerState::Idle { .. }, false) => {
// Stream reconnected
info!("Stream reconnected, leaving idle mode");
self.state = RunnerState::Normal;
}
(RunnerState::Normal, false) => {
// Normal operation continues
}
_ => {}
}
// Process based on current state
@ -365,41 +529,17 @@ impl PipelineRunner {
RunnerState::Idle { gen, .. } => {
let frame = gen.next()?;
let stream = if (*frame).sample_rate > 0 {
self.demuxer.get_stream(
src_audio_stream
.context("frame generator created an audio frame with no src stream")?
.index,
)?
config
.audio_src
.context("got audio frame with no audio src?")?
} else {
self.demuxer.get_stream(src_video_stream.index)?
config.video_src
};
self.process_frame(&config, stream, frame)?
}
RunnerState::Normal => {
// TODO: For copy streams, skip decoder
let frames = match self.decoder.decode_pkt(pkt) {
Ok(f) => f,
Err(e) => {
warn!("Error decoding frames, {e}");
return Ok(true);
}
RunnerState::Normal => self.process_packet(pkt)?,
};
let mut egress_results = vec![];
for (frame, stream) in frames {
// Adjust frame pts time without start_offset
// Egress streams don't have a start time offset
if (*stream).start_time != AV_NOPTS_VALUE {
(*frame).pts -= (*stream).start_time;
}
let results = self.process_frame(&config, stream, frame)?;
egress_results.extend(results);
}
av_packet_free(&mut pkt);
egress_results
}
};
// egress results - process async operations without blocking if possible
if !result.is_empty() {
@ -408,7 +548,7 @@ impl PipelineRunner {
if let EgressResult::Segments { created, deleted } = er {
if let Err(e) = self
.overseer
.on_segments(&config.id, &created, &deleted)
.on_segments(&self.connection.id, &created, &deleted)
.await
{
bail!("Failed to process segment {}", e.to_string());
@ -428,15 +568,17 @@ impl PipelineRunner {
Ok(true)
}
unsafe fn setup(&mut self) -> Result<()> {
if self.info.is_some() {
fn setup(&mut self) -> Result<()> {
if self.config.is_some() {
return Ok(());
}
let info = self.demuxer.probe_input()?;
let info = unsafe {
self.demuxer
.probe_input()
.map_err(|e| anyhow!("Demuxer probe failed: {}", e))?
};
info!("{}", info);
// convert to internal type
let i_info = IngressInfo {
bitrate: info.bitrate,
@ -461,41 +603,23 @@ impl PipelineRunner {
})
.collect(),
};
let cfg = self
.handle
.block_on(async { self.overseer.start_stream(&self.connection, &i_info).await })?;
let inputs: HashSet<usize> = cfg.variants.iter().map(|e| e.src_index()).collect();
self.decoder.enable_hw_decoder_any();
for input_idx in inputs {
let stream = info.streams.iter().find(|f| f.index == input_idx).unwrap();
self.decoder.setup_decoder(stream, None)?;
}
self.setup_encoders(&cfg)?;
info!("{}", cfg);
self.config = Some(cfg);
self.info = Some(i_info);
self.setup_pipeline(&info)?;
Ok(())
}
unsafe fn setup_pipeline(&mut self, demux_info: &DemuxerInfo) -> Result<()> {
let cfg = if let Some(ref cfg) = self.config {
cfg
} else {
bail!("Cannot setup pipeline without config");
};
// src stream indexes
let inputs: HashSet<usize> = cfg.variants.iter().map(|e| e.src_index()).collect();
// enable hardware decoding
self.decoder.enable_hw_decoder_any();
// setup decoders
for input_idx in inputs {
let stream = demux_info
.streams
.iter()
.find(|f| f.index == input_idx)
.unwrap();
self.decoder.setup_decoder(stream, None)?;
}
fn setup_encoders(&mut self, cfg: &PipelineConfig) -> Result<()> {
// setup scaler/encoders
for out_stream in &cfg.variants {
match out_stream {
@ -505,7 +629,7 @@ impl PipelineRunner {
}
VariantStream::Audio(a) => {
let enc = a.try_into()?;
let fmt = av_get_sample_fmt(cstr!(a.sample_fmt.as_str()));
let fmt = unsafe { av_get_sample_fmt(cstr!(a.sample_fmt.as_str())) };
let rs = Resample::new(fmt, a.sample_rate as _, a.channels as _);
let f = AudioFifo::new(fmt, a.channels as _)?;
self.resampler.insert(out_stream.id(), (rs, f));
@ -530,12 +654,17 @@ impl PipelineRunner {
});
match e {
EgressType::HLS(_) => {
let hls =
HlsEgress::new(&cfg.id, &self.out_dir, 2.0, encoders, SegmentType::MPEGTS)?;
let hls = HlsEgress::new(
&self.connection.id,
&self.out_dir,
2.0, // TODO: configure segment length
encoders,
SegmentType::MPEGTS,
)?;
self.egress.push(Box::new(hls));
}
EgressType::Recorder(_) => {
let rec = RecorderEgress::new(&cfg.id, &self.out_dir, encoders)?;
let rec = RecorderEgress::new(&self.connection.id, &self.out_dir, encoders)?;
self.egress.push(Box::new(rec));
}
_ => warn!("{} is not implemented", e),

View File

@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use uuid::Uuid;
use tokio::task;
use log::debug;
use sha2::{Digest, Sha256};

View File

@ -43,8 +43,6 @@ struct ActiveStreamInfo {
/// zap.stream NIP-53 overseer
#[derive(Clone)]
pub struct ZapStreamOverseer {
/// Dir where HTTP server serves files from
out_dir: String,
/// Database instance for accounts/streams
db: ZapStreamDb,
/// LND node connection
@ -68,7 +66,6 @@ pub struct ZapStreamOverseer {
impl ZapStreamOverseer {
pub async fn new(
out_dir: &String,
public_url: &String,
private_key: &str,
db: &str,
@ -114,7 +111,6 @@ impl ZapStreamOverseer {
client.connect().await;
let overseer = Self {
out_dir: out_dir.clone(),
db,
lnd,
client,
@ -367,7 +363,7 @@ impl Overseer for ZapStreamOverseer {
variants: cfg.variants.iter().map(|v| v.id()).collect(),
}));
let stream_id = Uuid::new_v4();
let stream_id = connection.id.clone();
// insert new stream record
let mut new_stream = UserStream {
id: stream_id.to_string(),
@ -394,7 +390,6 @@ impl Overseer for ZapStreamOverseer {
self.db.update_stream(&new_stream).await?;
Ok(PipelineConfig {
id: stream_id,
variants: cfg.variants,
egress,
ingress_info: stream_info.clone(),
@ -545,7 +540,7 @@ struct EndpointConfig<'a> {
fn get_variants_from_endpoint<'a>(
info: &'a IngressInfo,
endpoint: &zap_stream_db::IngestEndpoint,
endpoint: &IngestEndpoint,
) -> Result<EndpointConfig<'a>> {
let capabilities_str = endpoint.capabilities.as_deref().unwrap_or("");
let capabilities: Vec<&str> = capabilities_str.split(',').collect();

View File

@ -70,7 +70,6 @@ impl Settings {
blossom,
} => Ok(Arc::new(
ZapStreamOverseer::new(
&self.output_dir,
&self.public_url,
private_key,
database,