From ad20fbc052aa0550ff5fbbe41e97f82d87649767 Mon Sep 17 00:00:00 2001 From: Kieran Date: Thu, 12 Jun 2025 09:44:25 +0100 Subject: [PATCH] refactor: cleanup rtmp setup --- .gitignore | 2 +- Cargo.lock | 2 +- Cargo.toml | 2 +- crates/core/Cargo.toml | 2 +- crates/core/src/ingress/file.rs | 2 + crates/core/src/ingress/mod.rs | 129 +++++++-- crates/core/src/ingress/rtmp.rs | 169 +++++------ crates/core/src/ingress/srt.rs | 55 +--- crates/core/src/ingress/tcp.rs | 2 + crates/core/src/ingress/test.rs | 7 + crates/core/src/mux/hls.rs | 37 +-- crates/core/src/pipeline/mod.rs | 4 +- crates/core/src/pipeline/runner.rs | 443 +++++++++++++++++++---------- crates/core/src/viewer.rs | 1 - crates/zap-stream/src/overseer.rs | 9 +- crates/zap-stream/src/settings.rs | 1 - 16 files changed, 501 insertions(+), 366 deletions(-) diff --git a/.gitignore b/.gitignore index f518383..b213f60 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ **/target .idea/ -out/ \ No newline at end of file +**/out/ \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index a872423..2e496f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1045,7 +1045,7 @@ dependencies = [ [[package]] name = "ffmpeg-rs-raw" version = "0.1.0" -source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=d79693ddb0bee2e94c1db07f789523e87bf1b0fc#d79693ddb0bee2e94c1db07f789523e87bf1b0fc" +source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=aa1ce3edcad0fcd286d39b3e0c2fdc610c3988e7#aa1ce3edcad0fcd286d39b3e0c2fdc610c3988e7" dependencies = [ "anyhow", "ffmpeg-sys-the-third", diff --git a/Cargo.toml b/Cargo.toml index 83b0d5e..e38b361 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ codegen-units = 1 panic = "unwind" [workspace.dependencies] -ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "d79693ddb0bee2e94c1db07f789523e87bf1b0fc" } +ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "aa1ce3edcad0fcd286d39b3e0c2fdc610c3988e7" } tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] } anyhow = { version = "^1.0.91", features = ["backtrace"] } async-trait = "0.1.77" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 2572634..12e6da8 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -30,7 +30,7 @@ fontdue = "0.9.2" ringbuf = "0.4.7" # srt -srt-tokio = { version = "0.4.3", optional = true } +srt-tokio = { version = "0.4.4", optional = true } # rtmp rml_rtmp = { version = "0.8.0", optional = true } diff --git a/crates/core/src/ingress/file.rs b/crates/core/src/ingress/file.rs index 206b7b0..27b07f8 100644 --- a/crates/core/src/ingress/file.rs +++ b/crates/core/src/ingress/file.rs @@ -5,11 +5,13 @@ use log::info; use std::path::PathBuf; use std::sync::Arc; use tokio::runtime::Handle; +use uuid::Uuid; pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc) -> Result<()> { info!("Sending file: {}", path.display()); let info = ConnectionInfo { + id: Uuid::new_v4(), ip_addr: "127.0.0.1:6969".to_string(), endpoint: "file-input".to_owned(), app_name: "".to_string(), diff --git a/crates/core/src/ingress/mod.rs b/crates/core/src/ingress/mod.rs index a9ecf76..11710d6 100644 --- a/crates/core/src/ingress/mod.rs +++ b/crates/core/src/ingress/mod.rs @@ -1,10 +1,12 @@ use crate::overseer::Overseer; use crate::pipeline::runner::PipelineRunner; -use log::{error, info}; +use log::{error, info, warn}; use serde::{Deserialize, Serialize}; use std::io::Read; use std::sync::Arc; +use std::time::Instant; use tokio::runtime::Handle; +use uuid::Uuid; pub mod file; #[cfg(feature = "rtmp")] @@ -16,6 +18,9 @@ pub mod test; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ConnectionInfo { + /// Unique ID of this connection / pipeline + pub id: Uuid, + /// Endpoint of the ingress pub endpoint: String, @@ -36,33 +41,103 @@ pub fn spawn_pipeline( seer: Arc, reader: Box, ) { - info!("New client connected: {}", &info.ip_addr); - let seer = seer.clone(); - let out_dir = out_dir.to_string(); - std::thread::spawn(move || unsafe { - match PipelineRunner::new(handle, out_dir, seer, info, reader) { - Ok(mut pl) => loop { - match pl.run() { - Ok(c) => { - if !c { - if let Err(e) = pl.flush() { - error!("Pipeline flush failed: {}", e); - } - break; - } - } - Err(e) => { - if let Err(e) = pl.flush() { - error!("Pipeline flush failed: {}", e); - } - error!("Pipeline run failed: {}", e); - break; - } - } - }, + match PipelineRunner::new(handle, out_dir, seer, info, reader, None) { + Ok(pl) => match run_pipeline(pl) { + Ok(_) => {} Err(e) => { - error!("Failed to create PipelineRunner: {}", e); + error!("Failed to run PipelineRunner: {}", e); } + }, + Err(e) => { + error!("Failed to create PipelineRunner: {}", e); } - }); + } +} + +pub fn run_pipeline(mut pl: PipelineRunner) -> anyhow::Result<()> { + info!("New client connected: {}", &pl.connection.ip_addr); + + std::thread::Builder::new() + .name(format!("pipeline-{}", pl.connection.id)) + .spawn(move || { + pl.run(); + })?; + Ok(()) +} + +/// Common buffered reader functionality for ingress sources +pub struct BufferedReader { + pub buf: Vec, + pub max_buffer_size: usize, + pub last_buffer_log: Instant, + pub bytes_processed: u64, + pub packets_received: u64, + pub source_name: &'static str, +} + +impl BufferedReader { + pub fn new(capacity: usize, max_size: usize, source_name: &'static str) -> Self { + Self { + buf: Vec::with_capacity(capacity), + max_buffer_size: max_size, + last_buffer_log: Instant::now(), + bytes_processed: 0, + packets_received: 0, + source_name, + } + } + + /// Add data to buffer with size limit and performance tracking + pub fn add_data(&mut self, data: &[u8]) { + // Inline buffer management to avoid borrow issues + if self.buf.len() + data.len() > self.max_buffer_size { + let bytes_to_drop = (self.buf.len() + data.len()) - self.max_buffer_size; + warn!( + "{} buffer full ({} bytes), dropping {} oldest bytes", + self.source_name, + self.buf.len(), + bytes_to_drop + ); + self.buf.drain(..bytes_to_drop); + } + self.buf.extend(data); + + // Update performance counters + self.bytes_processed += data.len() as u64; + self.packets_received += 1; + + // Log buffer status every 5 seconds + if self.last_buffer_log.elapsed().as_secs() >= 5 { + let buffer_util = (self.buf.len() as f32 / self.max_buffer_size as f32) * 100.0; + let elapsed = self.last_buffer_log.elapsed(); + let mbps = (self.bytes_processed as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0); + let pps = self.packets_received as f64 / elapsed.as_secs_f64(); + + info!( + "{} ingress: {:.1} Mbps, {:.1} packets/sec, buffer: {}% ({}/{} bytes)", + self.source_name, + mbps, + pps, + buffer_util as u32, + self.buf.len(), + self.max_buffer_size + ); + + // Reset counters + self.last_buffer_log = Instant::now(); + self.bytes_processed = 0; + self.packets_received = 0; + } + } + + /// Read data from buffer, filling the entire output buffer before returning + pub fn read_buffered(&mut self, buf: &mut [u8]) -> usize { + if self.buf.len() >= buf.len() { + let drain = self.buf.drain(..buf.len()); + buf.copy_from_slice(drain.as_slice()); + buf.len() + } else { + 0 + } + } } diff --git a/crates/core/src/ingress/rtmp.rs b/crates/core/src/ingress/rtmp.rs index 9e71d72..e8219dd 100644 --- a/crates/core/src/ingress/rtmp.rs +++ b/crates/core/src/ingress/rtmp.rs @@ -1,111 +1,77 @@ -use crate::ingress::{spawn_pipeline, ConnectionInfo}; +use crate::ingress::{BufferedReader, ConnectionInfo}; use crate::overseer::Overseer; +use crate::pipeline::runner::PipelineRunner; use anyhow::{bail, Result}; -use log::{error, info, warn}; +use log::{error, info}; use rml_rtmp::handshake::{Handshake, HandshakeProcessResult, PeerType}; use rml_rtmp::sessions::{ ServerSession, ServerSessionConfig, ServerSessionEvent, ServerSessionResult, }; use std::collections::VecDeque; use std::io::{ErrorKind, Read, Write}; +use std::net::TcpStream; use std::sync::Arc; use std::time::Duration; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::{TcpListener, TcpStream}; +use tokio::net::TcpListener; use tokio::runtime::Handle; use tokio::time::Instant; +use uuid::Uuid; const MAX_MEDIA_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10MB limit + #[derive(PartialEq, Eq, Clone, Hash)] struct RtmpPublishedStream(String, String); struct RtmpClient { - socket: std::net::TcpStream, - media_buf: Vec, + socket: TcpStream, + buffer: BufferedReader, session: ServerSession, msg_queue: VecDeque, reader_buf: [u8; 4096], pub published_stream: Option, - last_buffer_log: Instant, - bytes_processed: u64, - frames_received: u64, } impl RtmpClient { - /// Add data to media buffer with size limit to prevent unbounded growth - fn add_to_media_buffer(&mut self, data: &[u8]) { - if self.media_buf.len() + data.len() > MAX_MEDIA_BUFFER_SIZE { - let bytes_to_drop = (self.media_buf.len() + data.len()) - MAX_MEDIA_BUFFER_SIZE; - warn!("RTMP buffer full ({} bytes), dropping {} oldest bytes", - self.media_buf.len(), bytes_to_drop); - self.media_buf.drain(..bytes_to_drop); - } - self.media_buf.extend(data); - - // Update performance counters - self.bytes_processed += data.len() as u64; - self.frames_received += 1; - - // Log buffer status every 5 seconds - if self.last_buffer_log.elapsed().as_secs() >= 5 { - let buffer_util = (self.media_buf.len() as f32 / MAX_MEDIA_BUFFER_SIZE as f32) * 100.0; - let elapsed = self.last_buffer_log.elapsed(); - let mbps = (self.bytes_processed as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0); - let fps = self.frames_received as f64 / elapsed.as_secs_f64(); - - info!( - "RTMP ingress: {:.1} Mbps, {:.1} frames/sec, buffer: {}% ({}/{} bytes)", - mbps, fps, buffer_util as u32, self.media_buf.len(), MAX_MEDIA_BUFFER_SIZE - ); - - // Reset counters - self.last_buffer_log = Instant::now(); - self.bytes_processed = 0; - self.frames_received = 0; - } + pub fn new(socket: TcpStream) -> Result { + socket.set_nonblocking(false)?; + let cfg = ServerSessionConfig::new(); + let (ses, res) = ServerSession::new(cfg)?; + Ok(Self { + socket, + session: ses, + buffer: BufferedReader::new(1024 * 1024, MAX_MEDIA_BUFFER_SIZE, "RTMP"), + msg_queue: VecDeque::from(res), + reader_buf: [0; 4096], + published_stream: None, + }) } - async fn start(mut socket: TcpStream) -> Result { + pub fn handshake(&mut self) -> Result<()> { let mut hs = Handshake::new(PeerType::Server); let exchange = hs.generate_outbound_p0_and_p1()?; - socket.write_all(&exchange).await?; + self.socket.write_all(&exchange)?; let mut buf = [0; 4096]; loop { - let r = socket.read(&mut buf).await?; + let r = self.socket.read(&mut buf)?; if r == 0 { bail!("EOF reached while reading"); } match hs.process_bytes(&buf[..r])? { HandshakeProcessResult::InProgress { response_bytes } => { - socket.write_all(&response_bytes).await?; + self.socket.write_all(&response_bytes)?; } HandshakeProcessResult::Completed { response_bytes, remaining_bytes, } => { - socket.write_all(&response_bytes).await?; + self.socket.write_all(&response_bytes)?; - let cfg = ServerSessionConfig::new(); - let (mut ses, mut res) = ServerSession::new(cfg)?; - let q = ses.handle_input(&remaining_bytes)?; - res.extend(q); - - let ret = Self { - socket: socket.into_std()?, - media_buf: vec![], - session: ses, - msg_queue: VecDeque::from(res), - reader_buf: [0; 4096], - published_stream: None, - last_buffer_log: Instant::now(), - bytes_processed: 0, - frames_received: 0, - }; - - return Ok(ret); + let q = self.session.handle_input(&remaining_bytes)?; + self.msg_queue.extend(q); + return Ok(()); } } } @@ -154,12 +120,8 @@ impl RtmpClient { } ServerSessionResult::RaisedEvent(ev) => self.handle_event(ev)?, ServerSessionResult::UnhandleableMessageReceived(m) => { - // Log unhandleable messages for debugging + // Log unhandleable messages for debugging error!("Received unhandleable message with {} bytes", m.data.len()); - // Only append data if it looks like valid media data - if !m.data.is_empty() && m.data.len() > 4 { - self.add_to_media_buffer(&m.data); - } } } } @@ -209,20 +171,10 @@ impl RtmpClient { ); } ServerSessionEvent::AudioDataReceived { data, .. } => { - // Validate audio data before adding to buffer - if !data.is_empty() { - self.add_to_media_buffer(&data); - } else { - error!("Received empty audio data"); - } + self.buffer.add_data(&data); } ServerSessionEvent::VideoDataReceived { data, .. } => { - // Validate video data before adding to buffer - if !data.is_empty() { - self.add_to_media_buffer(&data); - } else { - error!("Received empty video data"); - } + self.buffer.add_data(&data); } ServerSessionEvent::UnhandleableAmf0Command { .. } => {} ServerSessionEvent::PlayStreamRequested { request_id, .. } => { @@ -241,18 +193,15 @@ impl RtmpClient { impl Read for RtmpClient { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - // block this thread until something comes into [media_buf] - while self.media_buf.is_empty() { + // Block until we have enough data to fill the buffer + while self.buffer.buf.len() < buf.len() { if let Err(e) = self.read_data() { error!("Error reading data: {}", e); return Ok(0); }; } - let to_read = buf.len().min(self.media_buf.len()); - let drain = self.media_buf.drain(..to_read); - buf[..to_read].copy_from_slice(drain.as_slice()); - Ok(to_read) + Ok(self.buffer.read_buffered(buf)) } } @@ -261,7 +210,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) info!("RTMP listening on: {}", &addr); while let Ok((socket, ip)) = listener.accept().await { - let mut cc = RtmpClient::start(socket).await?; + let mut cc = RtmpClient::new(socket.into_std()?)?; let addr = addr.clone(); let overseer = overseer.clone(); let out_dir = out_dir.clone(); @@ -269,24 +218,36 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) std::thread::Builder::new() .name("rtmp-client".to_string()) .spawn(move || { - if let Err(e) = cc.read_until_publish_request(Duration::from_secs(10)) { - error!("{}", e); - } else { - let pr = cc.published_stream.as_ref().unwrap(); - let info = ConnectionInfo { - ip_addr: ip.to_string(), - endpoint: addr.clone(), - app_name: pr.0.clone(), - key: pr.1.clone(), - }; - spawn_pipeline( - handle, - info, - out_dir.clone(), - overseer.clone(), - Box::new(cc), - ); + if let Err(e) = cc.handshake() { + bail!("Error during handshake: {}", e) } + if let Err(e) = cc.read_until_publish_request(Duration::from_secs(10)) { + bail!("Error waiting for publish request: {}", e) + } + + let pr = cc.published_stream.as_ref().unwrap(); + let info = ConnectionInfo { + id: Uuid::new_v4(), + ip_addr: ip.to_string(), + endpoint: addr.clone(), + app_name: pr.0.clone(), + key: pr.1.clone(), + }; + let mut pl = match PipelineRunner::new( + handle, + out_dir, + overseer, + info, + Box::new(cc), + Some("flv".to_string()), + ) { + Ok(pl) => pl, + Err(e) => { + bail!("Failed to create PipelineRunner {}", e) + } + }; + pl.run(); + Ok(()) })?; } Ok(()) diff --git a/crates/core/src/ingress/srt.rs b/crates/core/src/ingress/srt.rs index c0e515b..86d50ea 100644 --- a/crates/core/src/ingress/srt.rs +++ b/crates/core/src/ingress/srt.rs @@ -1,15 +1,15 @@ -use crate::ingress::{spawn_pipeline, ConnectionInfo}; +use crate::ingress::{spawn_pipeline, BufferedReader, ConnectionInfo}; use crate::overseer::Overseer; use anyhow::Result; use futures_util::stream::FusedStream; use futures_util::StreamExt; -use log::{info, warn}; +use log::info; use srt_tokio::{SrtListener, SrtSocket}; use std::io::Read; use std::net::SocketAddr; use std::sync::Arc; -use std::time::Instant; use tokio::runtime::Handle; +use uuid::Uuid; const MAX_SRT_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10MB limit @@ -21,6 +21,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) while let Some(request) = packets.incoming().next().await { let socket = request.accept(None).await?; let info = ConnectionInfo { + id: Uuid::new_v4(), endpoint: addr.clone(), ip_addr: socket.settings().remote.to_string(), app_name: "".to_string(), @@ -38,10 +39,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) Box::new(SrtReader { handle: Handle::current(), socket, - buf: Vec::with_capacity(4096), - last_buffer_log: Instant::now(), - bytes_processed: 0, - packets_received: 0, + buffer: BufferedReader::new(4096, MAX_SRT_BUFFER_SIZE, "SRT"), }), ); } @@ -51,56 +49,21 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) struct SrtReader { pub handle: Handle, pub socket: SrtSocket, - pub buf: Vec, - last_buffer_log: Instant, - bytes_processed: u64, - packets_received: u64, + pub buffer: BufferedReader, } impl Read for SrtReader { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { let (mut rx, _) = self.socket.split_mut(); - while self.buf.len() < buf.len() { + while self.buffer.buf.len() < buf.len() { if rx.is_terminated() { return Ok(0); } if let Some((_, data)) = self.handle.block_on(rx.next()) { let data_slice = data.iter().as_slice(); - - // Inline buffer management to avoid borrow issues - if self.buf.len() + data_slice.len() > MAX_SRT_BUFFER_SIZE { - let bytes_to_drop = (self.buf.len() + data_slice.len()) - MAX_SRT_BUFFER_SIZE; - warn!("SRT buffer full ({} bytes), dropping {} oldest bytes", - self.buf.len(), bytes_to_drop); - self.buf.drain(..bytes_to_drop); - } - self.buf.extend(data_slice); - - // Update performance counters - self.bytes_processed += data_slice.len() as u64; - self.packets_received += 1; - - // Log buffer status every 5 seconds - if self.last_buffer_log.elapsed().as_secs() >= 5 { - let buffer_util = (self.buf.len() as f32 / MAX_SRT_BUFFER_SIZE as f32) * 100.0; - let elapsed = self.last_buffer_log.elapsed(); - let mbps = (self.bytes_processed as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0); - let pps = self.packets_received as f64 / elapsed.as_secs_f64(); - - info!( - "SRT ingress: {:.1} Mbps, {:.1} packets/sec, buffer: {}% ({}/{} bytes)", - mbps, pps, buffer_util as u32, self.buf.len(), MAX_SRT_BUFFER_SIZE - ); - - // Reset counters - self.last_buffer_log = Instant::now(); - self.bytes_processed = 0; - self.packets_received = 0; - } + self.buffer.add_data(data_slice); } } - let drain = self.buf.drain(..buf.len()); - buf.copy_from_slice(drain.as_slice()); - Ok(buf.len()) + Ok(self.buffer.read_buffered(buf)) } } diff --git a/crates/core/src/ingress/tcp.rs b/crates/core/src/ingress/tcp.rs index f24163e..0fd96c7 100644 --- a/crates/core/src/ingress/tcp.rs +++ b/crates/core/src/ingress/tcp.rs @@ -5,6 +5,7 @@ use log::info; use std::sync::Arc; use tokio::net::TcpListener; use tokio::runtime::Handle; +use uuid::Uuid; pub async fn listen(out_dir: String, addr: String, overseer: Arc) -> Result<()> { let listener = TcpListener::bind(&addr).await?; @@ -12,6 +13,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) info!("TCP listening on: {}", &addr); while let Ok((socket, ip)) = listener.accept().await { let info = ConnectionInfo { + id: Uuid::new_v4(), ip_addr: ip.to_string(), endpoint: addr.clone(), app_name: "".to_string(), diff --git a/crates/core/src/ingress/test.rs b/crates/core/src/ingress/test.rs index f333bb0..4d588eb 100644 --- a/crates/core/src/ingress/test.rs +++ b/crates/core/src/ingress/test.rs @@ -11,13 +11,20 @@ use ringbuf::traits::{Observer, Split}; use ringbuf::{HeapCons, HeapRb}; use std::io::Read; use std::sync::Arc; +use std::time::Duration; use tiny_skia::Pixmap; use tokio::runtime::Handle; +use uuid::Uuid; pub async fn listen(out_dir: String, overseer: Arc) -> Result<()> { info!("Test pattern enabled"); + // add a delay, there is a race condition somewhere, the test pattern doesnt always + // get added to active_streams + tokio::time::sleep(Duration::from_secs(1)).await; + let info = ConnectionInfo { + id: Uuid::new_v4(), endpoint: "test-pattern".to_string(), ip_addr: "test-pattern".to_string(), app_name: "".to_string(), diff --git a/crates/core/src/mux/hls.rs b/crates/core/src/mux/hls.rs index e54c854..00bc272 100644 --- a/crates/core/src/mux/hls.rs +++ b/crates/core/src/mux/hls.rs @@ -166,9 +166,8 @@ impl HlsVariant { id: v.id(), }); has_video = true; - if ref_stream_index == -1 { - ref_stream_index = stream_idx as _; - } + // Always use video stream as reference for segmentation + ref_stream_index = stream_idx as _; }, VariantStream::Audio(a) => unsafe { let stream = mux.add_stream_encoder(enc)?; @@ -197,6 +196,11 @@ impl HlsVariant { ref_stream_index != -1, "No reference stream found, cant create variant" ); + trace!( + "{} will use stream index {} as reference for segmentation", + name, + ref_stream_index + ); unsafe { mux.open(Some(opts))?; } @@ -236,14 +240,7 @@ impl HlsVariant { .to_string() } - /// Mux a packet created by the encoder for this variant - pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result { - // Simply process the packet directly - no reordering needed - // FFmpeg's interleaving system should handle packet ordering upstream - self.process_packet(pkt) - } - - /// Process a single packet through the muxer - FFmpeg-style implementation + /// Process a single packet through the muxer unsafe fn process_packet(&mut self, pkt: *mut AVPacket) -> Result { let pkt_stream = *(*self.mux.context()) .streams @@ -254,7 +251,7 @@ impl HlsVariant { let mut can_split = stream_type == AVMEDIA_TYPE_VIDEO && ((*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY); let mut is_ref_pkt = - stream_type == AVMEDIA_TYPE_VIDEO && (*pkt_stream).index == self.ref_stream_index; + stream_type == AVMEDIA_TYPE_VIDEO && (*pkt).stream_index == self.ref_stream_index; if (*pkt).pts == AV_NOPTS_VALUE { can_split = false; @@ -264,7 +261,8 @@ impl HlsVariant { // check if current packet is keyframe, flush current segment if self.packets_written > 0 && can_split { trace!( - "Segmentation check: pts={}, duration={:.3}, timebase={}/{}, target={:.3}", + "{} segmentation check: pts={}, duration={:.3}, timebase={}/{}, target={:.3}", + self.name, (*pkt).pts, self.duration, (*pkt).time_base.num, @@ -429,7 +427,7 @@ impl HlsVariant { e ); } - info!("Removed segment file: {}", seg_path.display()); + trace!("Removed segment file: {}", seg_path.display()); ret.push(seg); } } @@ -571,9 +569,16 @@ impl HlsMuxer { if let Some(vs) = var.streams.iter().find(|s| s.id() == variant) { // very important for muxer to know which stream this pkt belongs to (*pkt).stream_index = *vs.index() as _; - return var.mux_packet(pkt); + return var.process_packet(pkt); } } - bail!("Packet doesnt match any variants"); + + // This HLS muxer doesn't handle this variant, return None instead of failing + // This can happen when multiple egress handlers are configured with different variant sets + trace!( + "HLS muxer received packet for variant {} which it doesn't handle", + variant + ); + Ok(EgressResult::None) } } diff --git a/crates/core/src/pipeline/mod.rs b/crates/core/src/pipeline/mod.rs index c2c978f..3a9690d 100644 --- a/crates/core/src/pipeline/mod.rs +++ b/crates/core/src/pipeline/mod.rs @@ -4,7 +4,6 @@ use crate::egress::EgressConfig; use crate::overseer::IngressInfo; use crate::variant::VariantStream; use serde::{Deserialize, Serialize}; -use uuid::Uuid; pub mod runner; @@ -42,7 +41,6 @@ impl Display for EgressType { #[derive(Clone)] pub struct PipelineConfig { - pub id: Uuid, /// Transcoded/Copied stream config pub variants: Vec, /// Output muxers @@ -57,7 +55,7 @@ pub struct PipelineConfig { impl Display for PipelineConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "\nPipeline Config ID={}", self.id)?; + write!(f, "\nPipeline Config:")?; write!(f, "\nVariants:")?; for v in &self.variants { write!(f, "\n\t{}", v)?; diff --git a/crates/core/src/pipeline/runner.rs b/crates/core/src/pipeline/runner.rs index 10cc094..f689315 100644 --- a/crates/core/src/pipeline/runner.rs +++ b/crates/core/src/pipeline/runner.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet}; use std::io::Read; use std::mem::transmute; use std::ops::Sub; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::ptr; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -16,17 +16,15 @@ use crate::mux::SegmentType; use crate::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer}; use crate::pipeline::{EgressType, PipelineConfig}; use crate::variant::{StreamMapping, VariantStream}; -use anyhow::{bail, Context, Result}; +use anyhow::{anyhow, bail, Context, Result}; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_WEBP; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_frame_free, av_get_sample_fmt, av_packet_free, av_rescale_q, AVFrame, AVMediaType, AVStream, - AV_NOPTS_VALUE, + av_frame_clone, av_frame_free, av_get_sample_fmt, av_packet_free, av_rescale_q, AVFrame, AVPacket, AV_NOPTS_VALUE, }; use ffmpeg_rs_raw::{ - cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler, - StreamType, + cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, Encoder, Resample, Scaler, StreamType, }; use log::{error, info, warn}; use tokio::runtime::Handle; @@ -53,12 +51,12 @@ pub struct PipelineRunner { handle: Handle, /// Input stream connection info - connection: ConnectionInfo, + pub connection: ConnectionInfo, /// Configuration for this pipeline (variants, egress config etc.) config: Option, - /// Singleton demuxer for this input + /// Where the pipeline gets packets from demuxer: Demuxer, /// Singleton decoder for all stream @@ -79,9 +77,6 @@ pub struct PipelineRunner { /// All configured egress' egress: Vec>, - /// Info about the input stream - info: Option, - /// Overseer managing this pipeline overseer: Arc, @@ -90,6 +85,8 @@ pub struct PipelineRunner { /// Total number of frames produced frame_ctr: u64, + + /// Output directory where all stream data is saved out_dir: String, /// Thumbnail generation interval (0 = disabled) @@ -97,8 +94,16 @@ pub struct PipelineRunner { /// Current runner state (normal or idle) state: RunnerState, + + /// Counter for consecutive decode failures + consecutive_decode_failures: u32, + + /// Maximum consecutive failures before triggering circuit breaker + max_consecutive_failures: u32, } +unsafe impl Send for PipelineRunner {} + impl PipelineRunner { pub fn new( handle: Handle, @@ -106,6 +111,7 @@ impl PipelineRunner { overseer: Arc, connection: ConnectionInfo, recv: Box, + url: Option, ) -> Result { Ok(Self { handle, @@ -113,7 +119,7 @@ impl PipelineRunner { overseer, connection, config: Default::default(), - demuxer: Demuxer::new_custom_io(recv, None)?, + demuxer: Demuxer::new_custom_io(recv, url)?, decoder: Decoder::new(), scalers: Default::default(), resampler: Default::default(), @@ -123,72 +129,207 @@ impl PipelineRunner { egress: Vec::new(), frame_ctr: 0, fps_last_frame_ctr: 0, - info: None, - thumb_interval: 1800, // Disable thumbnails by default for performance + thumb_interval: 1800, state: RunnerState::Normal, + consecutive_decode_failures: 0, + max_consecutive_failures: 50, }) } + pub fn set_demuxer_buffer_size(&mut self, buffer_size: usize) { + self.demuxer.set_buffer_size(buffer_size); + } + + /// Save image to disk + unsafe fn save_thumb(frame: *mut AVFrame, dst_pic: &Path) -> Result<()> { + let mut free_frame = false; + // use scaler to convert pixel format if not YUV420P + let mut frame = if (*frame).format != transmute(AV_PIX_FMT_YUV420P) { + let mut sw = Scaler::new(); + let new_frame = sw.process_frame( + frame, + (*frame).width as _, + (*frame).height as _, + AV_PIX_FMT_YUV420P, + )?; + free_frame = true; + new_frame + } else { + frame + }; + + let encoder = Encoder::new(AV_CODEC_ID_WEBP)? + .with_height((*frame).height) + .with_width((*frame).width) + .with_pix_fmt(transmute((*frame).format)) + .open(None)?; + + encoder.save_picture(frame, dst_pic.to_str().unwrap())?; + if free_frame { + av_frame_free(&mut frame); + } + Ok(()) + } + + /// Save a decoded frame as a thumbnail + unsafe fn generate_thumb_from_frame(&mut self, frame: *mut AVFrame) -> Result<()> { + if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 { + let frame = av_frame_clone(frame).addr(); + let dst_pic = PathBuf::from(&self.out_dir) + .join(self.connection.id.to_string()) + .join("thumb.webp"); + std::thread::spawn(move || unsafe { + let mut frame = frame as *mut AVFrame; //TODO: danger?? + let thumb_start = Instant::now(); + + if let Err(e) = Self::save_thumb(frame, &dst_pic) { + warn!("Failed to save thumb: {}", e); + } + + let thumb_duration = thumb_start.elapsed(); + av_frame_free(&mut frame); + info!( + "Saved thumb ({}ms) to: {}", + thumb_duration.as_millis(), + dst_pic.display(), + ); + }); + } + Ok(()) + } + + /// Switch to idle mode with placeholder content generation + unsafe fn switch_to_idle_mode(&mut self, config: &PipelineConfig) -> Result<()> { + let src_video_stream = config + .ingress_info + .streams + .iter() + .find(|s| s.index == config.video_src) + .unwrap(); + let src_audio_stream = config + .ingress_info + .streams + .iter() + .find(|s| Some(s.index) == config.audio_src); + + let gen = FrameGenerator::from_stream(src_video_stream, src_audio_stream)?; + self.state = RunnerState::Idle { + start_time: Instant::now(), + last_frame_time: None, + gen, + }; + Ok(()) + } + + /// Handle decode failure with circuit breaker logic + unsafe fn handle_decode_failure( + &mut self, + config: &PipelineConfig, + ) -> Result> { + // Check if we've hit the circuit breaker threshold + if self.consecutive_decode_failures >= self.max_consecutive_failures { + error!( + "Circuit breaker triggered: {} consecutive decode failures exceeded threshold of {}. Switching to idle mode.", + self.consecutive_decode_failures, self.max_consecutive_failures + ); + + // Switch to idle mode to continue stream with placeholder content + match self.switch_to_idle_mode(config) { + Ok(()) => { + self.consecutive_decode_failures = 0; // Reset counter + info!("Switched to idle mode due to excessive decode failures"); + } + Err(e) => { + error!("Failed to switch to idle mode: {}", e); + bail!("Circuit breaker triggered and unable to switch to idle mode"); + } + } + } + + // Return empty result to skip this packet + Ok(vec![]) + } + + unsafe fn process_packet(&mut self, packet: *mut AVPacket) -> Result> { + let config = if let Some(config) = &self.config { + config.clone() + } else { + bail!("Pipeline not configured, cant process packet") + }; + + // Process all packets (original or converted) + let mut egress_results = vec![]; + // TODO: For copy streams, skip decoder + let frames = match self.decoder.decode_pkt(packet) { + Ok(f) => { + // Reset failure counter on successful decode + self.consecutive_decode_failures = 0; + f + } + Err(e) => { + self.consecutive_decode_failures += 1; + + // Enhanced error logging with context + let packet_info = if !packet.is_null() { + format!( + "stream_idx={}, size={}, pts={}, dts={}", + (*packet).stream_index, + (*packet).size, + (*packet).pts, + (*packet).dts + ) + } else { + "null packet".to_string() + }; + + warn!( + "Error decoding packet ({}): {}. Consecutive failures: {}/{}. Skipping packet.", + packet_info, e, self.consecutive_decode_failures, self.max_consecutive_failures + ); + + return self.handle_decode_failure(&config); + } + }; + + for (frame, stream_idx) in frames { + let stream = self.demuxer.get_stream(stream_idx as usize)?; + // Adjust frame pts time without start_offset + // Egress streams don't have a start time offset + if !stream.is_null() { + if (*stream).start_time != AV_NOPTS_VALUE { + (*frame).pts -= (*stream).start_time; + } + (*frame).time_base = (*stream).time_base; + } + + let results = self.process_frame(&config, stream_idx as usize, frame)?; + egress_results.extend(results); + } + + Ok(egress_results) + } + /// process the frame in the pipeline unsafe fn process_frame( &mut self, config: &PipelineConfig, - stream: *mut AVStream, + stream_index: usize, frame: *mut AVFrame, ) -> Result> { // Copy frame from GPU if using hwaccel decoding let mut frame = get_frame_from_hw(frame)?; - (*frame).time_base = (*stream).time_base; - - let p = (*stream).codecpar; - if (*p).codec_type == AVMediaType::AVMEDIA_TYPE_VIDEO { - // Conditionally generate thumbnails based on interval (0 = disabled) - if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 { - let thumb_start = Instant::now(); - let dst_pic = PathBuf::from(&self.out_dir) - .join(config.id.to_string()) - .join("thumb.webp"); - { - let mut sw = Scaler::new(); - let mut scaled_frame = sw.process_frame( - frame, - (*frame).width as _, - (*frame).height as _, - AV_PIX_FMT_YUV420P, - )?; - - let encoder = Encoder::new(AV_CODEC_ID_WEBP)? - .with_height((*scaled_frame).height) - .with_width((*scaled_frame).width) - .with_pix_fmt(transmute((*scaled_frame).format)) - .open(None)?; - - encoder.save_picture(scaled_frame, dst_pic.to_str().unwrap())?; - av_frame_free(&mut scaled_frame); - } - - let thumb_duration = thumb_start.elapsed(); - info!( - "Saved thumb ({:.2}ms) to: {}", - thumb_duration.as_millis() as f32 / 1000.0, - dst_pic.display(), - ); - } - - self.frame_ctr += 1; - } let mut egress_results = Vec::new(); // Get the variants which want this pkt let pkt_vars = config .variants .iter() - .filter(|v| v.src_index() == (*stream).index as usize); + .filter(|v| v.src_index() == stream_index); for var in pkt_vars { let enc = if let Some(enc) = self.encoders.get_mut(&var.id()) { enc } else { - //warn!("Frame had nowhere to go in {} :/", var.id()); + warn!("Frame had nowhere to go in {} :/", var.id()); continue; }; @@ -245,6 +386,11 @@ impl PipelineRunner { } } + // count frame as processed + if stream_index == config.video_src { + self.generate_thumb_from_frame(frame)?; + self.frame_ctr += 1; + } av_frame_free(&mut frame); Ok(egress_results) } @@ -282,7 +428,7 @@ impl PipelineRunner { } /// EOF, cleanup - pub unsafe fn flush(&mut self) -> Result<()> { + unsafe fn flush(&mut self) -> Result<()> { for (var, enc) in &mut self.encoders { for mut pkt in enc.encode_frame(ptr::null_mut())? { for eg in self.egress.iter_mut() { @@ -295,9 +441,9 @@ impl PipelineRunner { eg.reset()?; } - if let Some(config) = &self.config { + if self.config.is_some() { self.handle.block_on(async { - if let Err(e) = self.overseer.on_end(&config.id).await { + if let Err(e) = self.overseer.on_end(&self.connection.id).await { error!("Failed to end stream: {e}"); } }); @@ -305,9 +451,31 @@ impl PipelineRunner { Ok(()) } - /// Main processor, should be called in a loop - /// Returns false when stream data ended (EOF) - pub unsafe fn run(&mut self) -> Result { + pub fn run(&mut self) { + loop { + unsafe { + match self.once() { + Ok(c) => { + if !c { + if let Err(e) = self.flush() { + error!("Pipeline flush failed: {}", e); + } + break; + } + } + Err(e) => { + if let Err(e) = self.flush() { + error!("Pipeline flush failed: {}", e); + } + error!("Pipeline run failed: {}", e); + break; + } + } + } + } + } + + unsafe fn once(&mut self) -> Result { self.setup()?; let config = if let Some(config) = &self.config { @@ -317,31 +485,34 @@ impl PipelineRunner { }; // run transcoder pipeline - let (mut pkt, _) = self.demuxer.get_packet()?; - - let src_video_stream = config - .ingress_info - .streams - .iter() - .find(|s| s.index == config.video_src) - .unwrap(); - let src_audio_stream = config - .ingress_info - .streams - .iter() - .find(|s| Some(s.index) == config.audio_src); + let (mut pkt, _) = match &self.state { + RunnerState::Normal => { + match self.demuxer.get_packet() { + Ok(pkt) => pkt, + Err(e) => { + warn!("Demuxer get_packet failed: {}, entering idle mode", e); + // Switch to idle mode when demuxer fails + match self.switch_to_idle_mode(&config) { + Ok(()) => (ptr::null_mut(), ptr::null_mut()), + Err(switch_err) => { + error!("Failed to switch to idle mode: {}", switch_err); + return Err(e.into()); + } + } + } + } + } + RunnerState::Idle { .. } => { + // return empty when idle - skip demuxer completely + (ptr::null_mut(), ptr::null_mut()) + } + }; // Handle state transitions based on packet availability match (&self.state, pkt.is_null()) { (RunnerState::Normal, true) => { - // First time entering idle mode info!("Stream input disconnected, entering idle mode"); - - self.state = RunnerState::Idle { - start_time: Instant::now(), - last_frame_time: None, - gen: FrameGenerator::from_stream(src_video_stream, src_audio_stream)?, - }; + self.switch_to_idle_mode(&config)?; } (RunnerState::Idle { start_time, .. }, true) => { // Check if we've been idle for more than 1 minute @@ -350,14 +521,7 @@ impl PipelineRunner { return Ok(false); } } - (RunnerState::Idle { .. }, false) => { - // Stream reconnected - info!("Stream reconnected, leaving idle mode"); - self.state = RunnerState::Normal; - } - (RunnerState::Normal, false) => { - // Normal operation continues - } + _ => {} } // Process based on current state @@ -365,41 +529,17 @@ impl PipelineRunner { RunnerState::Idle { gen, .. } => { let frame = gen.next()?; let stream = if (*frame).sample_rate > 0 { - self.demuxer.get_stream( - src_audio_stream - .context("frame generator created an audio frame with no src stream")? - .index, - )? + config + .audio_src + .context("got audio frame with no audio src?")? } else { - self.demuxer.get_stream(src_video_stream.index)? + config.video_src }; self.process_frame(&config, stream, frame)? } - RunnerState::Normal => { - // TODO: For copy streams, skip decoder - let frames = match self.decoder.decode_pkt(pkt) { - Ok(f) => f, - Err(e) => { - warn!("Error decoding frames, {e}"); - return Ok(true); - } - }; - - let mut egress_results = vec![]; - for (frame, stream) in frames { - // Adjust frame pts time without start_offset - // Egress streams don't have a start time offset - if (*stream).start_time != AV_NOPTS_VALUE { - (*frame).pts -= (*stream).start_time; - } - let results = self.process_frame(&config, stream, frame)?; - egress_results.extend(results); - } - - av_packet_free(&mut pkt); - egress_results - } + RunnerState::Normal => self.process_packet(pkt)?, }; + av_packet_free(&mut pkt); // egress results - process async operations without blocking if possible if !result.is_empty() { @@ -408,7 +548,7 @@ impl PipelineRunner { if let EgressResult::Segments { created, deleted } = er { if let Err(e) = self .overseer - .on_segments(&config.id, &created, &deleted) + .on_segments(&self.connection.id, &created, &deleted) .await { bail!("Failed to process segment {}", e.to_string()); @@ -428,15 +568,17 @@ impl PipelineRunner { Ok(true) } - unsafe fn setup(&mut self) -> Result<()> { - if self.info.is_some() { + fn setup(&mut self) -> Result<()> { + if self.config.is_some() { return Ok(()); } - let info = self.demuxer.probe_input()?; - + let info = unsafe { + self.demuxer + .probe_input() + .map_err(|e| anyhow!("Demuxer probe failed: {}", e))? + }; info!("{}", info); - // convert to internal type let i_info = IngressInfo { bitrate: info.bitrate, @@ -461,41 +603,23 @@ impl PipelineRunner { }) .collect(), }; - let cfg = self .handle .block_on(async { self.overseer.start_stream(&self.connection, &i_info).await })?; + let inputs: HashSet = cfg.variants.iter().map(|e| e.src_index()).collect(); + self.decoder.enable_hw_decoder_any(); + for input_idx in inputs { + let stream = info.streams.iter().find(|f| f.index == input_idx).unwrap(); + self.decoder.setup_decoder(stream, None)?; + } + self.setup_encoders(&cfg)?; + info!("{}", cfg); self.config = Some(cfg); - self.info = Some(i_info); - - self.setup_pipeline(&info)?; Ok(()) } - unsafe fn setup_pipeline(&mut self, demux_info: &DemuxerInfo) -> Result<()> { - let cfg = if let Some(ref cfg) = self.config { - cfg - } else { - bail!("Cannot setup pipeline without config"); - }; - - // src stream indexes - let inputs: HashSet = cfg.variants.iter().map(|e| e.src_index()).collect(); - - // enable hardware decoding - self.decoder.enable_hw_decoder_any(); - - // setup decoders - for input_idx in inputs { - let stream = demux_info - .streams - .iter() - .find(|f| f.index == input_idx) - .unwrap(); - self.decoder.setup_decoder(stream, None)?; - } - + fn setup_encoders(&mut self, cfg: &PipelineConfig) -> Result<()> { // setup scaler/encoders for out_stream in &cfg.variants { match out_stream { @@ -505,7 +629,7 @@ impl PipelineRunner { } VariantStream::Audio(a) => { let enc = a.try_into()?; - let fmt = av_get_sample_fmt(cstr!(a.sample_fmt.as_str())); + let fmt = unsafe { av_get_sample_fmt(cstr!(a.sample_fmt.as_str())) }; let rs = Resample::new(fmt, a.sample_rate as _, a.channels as _); let f = AudioFifo::new(fmt, a.channels as _)?; self.resampler.insert(out_stream.id(), (rs, f)); @@ -530,12 +654,17 @@ impl PipelineRunner { }); match e { EgressType::HLS(_) => { - let hls = - HlsEgress::new(&cfg.id, &self.out_dir, 2.0, encoders, SegmentType::MPEGTS)?; + let hls = HlsEgress::new( + &self.connection.id, + &self.out_dir, + 2.0, // TODO: configure segment length + encoders, + SegmentType::MPEGTS, + )?; self.egress.push(Box::new(hls)); } EgressType::Recorder(_) => { - let rec = RecorderEgress::new(&cfg.id, &self.out_dir, encoders)?; + let rec = RecorderEgress::new(&self.connection.id, &self.out_dir, encoders)?; self.egress.push(Box::new(rec)); } _ => warn!("{} is not implemented", e), diff --git a/crates/core/src/viewer.rs b/crates/core/src/viewer.rs index f1d107f..b1d2edb 100644 --- a/crates/core/src/viewer.rs +++ b/crates/core/src/viewer.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; -use uuid::Uuid; use tokio::task; use log::debug; use sha2::{Digest, Sha256}; diff --git a/crates/zap-stream/src/overseer.rs b/crates/zap-stream/src/overseer.rs index 207910b..bc9fcdb 100644 --- a/crates/zap-stream/src/overseer.rs +++ b/crates/zap-stream/src/overseer.rs @@ -43,8 +43,6 @@ struct ActiveStreamInfo { /// zap.stream NIP-53 overseer #[derive(Clone)] pub struct ZapStreamOverseer { - /// Dir where HTTP server serves files from - out_dir: String, /// Database instance for accounts/streams db: ZapStreamDb, /// LND node connection @@ -68,7 +66,6 @@ pub struct ZapStreamOverseer { impl ZapStreamOverseer { pub async fn new( - out_dir: &String, public_url: &String, private_key: &str, db: &str, @@ -114,7 +111,6 @@ impl ZapStreamOverseer { client.connect().await; let overseer = Self { - out_dir: out_dir.clone(), db, lnd, client, @@ -367,7 +363,7 @@ impl Overseer for ZapStreamOverseer { variants: cfg.variants.iter().map(|v| v.id()).collect(), })); - let stream_id = Uuid::new_v4(); + let stream_id = connection.id.clone(); // insert new stream record let mut new_stream = UserStream { id: stream_id.to_string(), @@ -394,7 +390,6 @@ impl Overseer for ZapStreamOverseer { self.db.update_stream(&new_stream).await?; Ok(PipelineConfig { - id: stream_id, variants: cfg.variants, egress, ingress_info: stream_info.clone(), @@ -545,7 +540,7 @@ struct EndpointConfig<'a> { fn get_variants_from_endpoint<'a>( info: &'a IngressInfo, - endpoint: &zap_stream_db::IngestEndpoint, + endpoint: &IngestEndpoint, ) -> Result> { let capabilities_str = endpoint.capabilities.as_deref().unwrap_or(""); let capabilities: Vec<&str> = capabilities_str.split(',').collect(); diff --git a/crates/zap-stream/src/settings.rs b/crates/zap-stream/src/settings.rs index 602b4f8..eb60baf 100644 --- a/crates/zap-stream/src/settings.rs +++ b/crates/zap-stream/src/settings.rs @@ -70,7 +70,6 @@ impl Settings { blossom, } => Ok(Arc::new( ZapStreamOverseer::new( - &self.output_dir, &self.public_url, private_key, database,