diff --git a/crates/core/src/pipeline/mod.rs b/crates/core/src/pipeline/mod.rs index fde15c4..57079ea 100644 --- a/crates/core/src/pipeline/mod.rs +++ b/crates/core/src/pipeline/mod.rs @@ -1,11 +1,13 @@ use std::fmt::{Display, Formatter}; use crate::egress::EgressConfig; +use crate::overseer::IngressInfo; use crate::variant::VariantStream; use serde::{Deserialize, Serialize}; use uuid::Uuid; pub mod runner; +pub mod placeholder; #[derive(Clone, Debug, Serialize, Deserialize)] pub enum EgressType { @@ -46,6 +48,8 @@ pub struct PipelineConfig { pub variants: Vec, /// Output muxers pub egress: Vec, + /// Source stream information for placeholder generation + pub ingress_info: Option, } impl Display for PipelineConfig { diff --git a/crates/core/src/pipeline/placeholder.rs b/crates/core/src/pipeline/placeholder.rs new file mode 100644 index 0000000..cfc26d2 --- /dev/null +++ b/crates/core/src/pipeline/placeholder.rs @@ -0,0 +1,188 @@ +use anyhow::{bail, Result}; +use crate::variant::video::VideoVariant; +use crate::variant::audio::AudioVariant; +use crate::overseer::{IngressStream, IngressStreamType}; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ + av_frame_alloc, av_frame_get_buffer, av_frame_free, av_get_sample_fmt, AVFrame, + AVPixelFormat, AVSampleFormat +}; +use std::ffi::CString; + +/// Placeholder frame generator for idle mode when stream disconnects +pub struct PlaceholderGenerator; + +impl PlaceholderGenerator { + /// Generate a placeholder video frame based on ingress stream info + pub unsafe fn generate_video_frame_from_stream( + stream: &IngressStream, + stream_time_base: (i32, i32), + frame_index: u64 + ) -> Result<*mut AVFrame> { + let frame = av_frame_alloc(); + if frame.is_null() { + bail!("Failed to allocate placeholder video frame"); + } + + (*frame).format = AVPixelFormat::AV_PIX_FMT_YUV420P as i32; + (*frame).width = stream.width as i32; + (*frame).height = stream.height as i32; + (*frame).time_base.num = stream_time_base.0; + (*frame).time_base.den = stream_time_base.1; + + // Set PTS based on frame rate and total frame index + let fps = if stream.fps > 0.0 { stream.fps } else { 30.0 }; + let time_base_f64 = stream_time_base.0 as f64 / stream_time_base.1 as f64; + (*frame).pts = (frame_index as f64 / fps / time_base_f64) as i64; + + if av_frame_get_buffer(frame, 0) < 0 { + av_frame_free(&mut frame); + bail!("Failed to allocate buffer for placeholder video frame"); + } + + // Fill with black (Y=16, U=V=128 for limited range YUV420P) + let y_size = ((*frame).width * (*frame).height) as usize; + let uv_size = y_size / 4; + + if !(*frame).data[0].is_null() { + std::ptr::write_bytes((*frame).data[0], 16, y_size); + } + if !(*frame).data[1].is_null() { + std::ptr::write_bytes((*frame).data[1], 128, uv_size); + } + if !(*frame).data[2].is_null() { + std::ptr::write_bytes((*frame).data[2], 128, uv_size); + } + + Ok(frame) + } + + /// Generate a placeholder audio frame based on ingress stream info + pub unsafe fn generate_audio_frame_from_stream( + stream: &IngressStream, + stream_time_base: (i32, i32), + frame_index: u64, + sample_fmt: &str, + channels: u32 + ) -> Result<*mut AVFrame> { + let frame = av_frame_alloc(); + if frame.is_null() { + bail!("Failed to allocate placeholder audio frame"); + } + + // Use the provided sample format + let sample_fmt_cstr = CString::new(sample_fmt) + .map_err(|_| anyhow::anyhow!("Invalid sample format string"))?; + let sample_fmt_int = av_get_sample_fmt(sample_fmt_cstr.as_ptr()); + (*frame).format = sample_fmt_int; + (*frame).channels = channels as i32; + (*frame).sample_rate = stream.sample_rate as i32; + (*frame).nb_samples = 1024; // Standard audio frame size + (*frame).time_base.num = stream_time_base.0; + (*frame).time_base.den = stream_time_base.1; + + // Set PTS based on sample rate and frame index + let samples_per_second = stream.sample_rate as f64; + let time_base_f64 = stream_time_base.0 as f64 / stream_time_base.1 as f64; + (*frame).pts = ((frame_index * 1024) as f64 / samples_per_second / time_base_f64) as i64; + + if av_frame_get_buffer(frame, 0) < 0 { + av_frame_free(&mut frame); + bail!("Failed to allocate buffer for placeholder audio frame"); + } + + // Fill with silence (zeros) + for i in 0..8 { + if !(*frame).data[i].is_null() && (*frame).linesize[i] > 0 { + std::ptr::write_bytes((*frame).data[i], 0, (*frame).linesize[i] as usize); + } + } + + Ok(frame) + } + + /// Generate a placeholder black video frame + pub unsafe fn generate_video_frame( + variant: &VideoVariant, + stream_time_base: (i32, i32), + frame_index: u64 + ) -> Result<*mut AVFrame> { + let frame = av_frame_alloc(); + if frame.is_null() { + bail!("Failed to allocate placeholder video frame"); + } + + (*frame).format = AVPixelFormat::AV_PIX_FMT_YUV420P as i32; + (*frame).width = variant.width as i32; + (*frame).height = variant.height as i32; + (*frame).time_base.num = stream_time_base.0; + (*frame).time_base.den = stream_time_base.1; + + // Set PTS based on frame rate and total frame index + let fps = if variant.fps > 0.0 { variant.fps } else { 30.0 }; + let time_base_f64 = stream_time_base.0 as f64 / stream_time_base.1 as f64; + (*frame).pts = (frame_index as f64 / fps / time_base_f64) as i64; + + if av_frame_get_buffer(frame, 0) < 0 { + av_frame_free(&mut frame); + bail!("Failed to allocate buffer for placeholder video frame"); + } + + // Fill with black (Y=16, U=V=128 for limited range YUV420P) + let y_size = ((*frame).width * (*frame).height) as usize; + let uv_size = y_size / 4; + + if !(*frame).data[0].is_null() { + std::ptr::write_bytes((*frame).data[0], 16, y_size); + } + if !(*frame).data[1].is_null() { + std::ptr::write_bytes((*frame).data[1], 128, uv_size); + } + if !(*frame).data[2].is_null() { + std::ptr::write_bytes((*frame).data[2], 128, uv_size); + } + + Ok(frame) + } + + /// Generate a placeholder silent audio frame + pub unsafe fn generate_audio_frame( + variant: &AudioVariant, + stream_time_base: (i32, i32), + frame_index: u64 + ) -> Result<*mut AVFrame> { + let frame = av_frame_alloc(); + if frame.is_null() { + bail!("Failed to allocate placeholder audio frame"); + } + + // Use the sample format from the variant configuration + let sample_fmt_cstr = CString::new(variant.sample_fmt.as_str()) + .map_err(|_| anyhow::anyhow!("Invalid sample format string"))?; + let sample_fmt_int = av_get_sample_fmt(sample_fmt_cstr.as_ptr()); + (*frame).format = sample_fmt_int; + (*frame).channels = variant.channels as i32; + (*frame).sample_rate = variant.sample_rate as i32; + (*frame).nb_samples = 1024; // Standard audio frame size + (*frame).time_base.num = stream_time_base.0; + (*frame).time_base.den = stream_time_base.1; + + // Set PTS based on sample rate and frame index + let samples_per_second = variant.sample_rate as f64; + let time_base_f64 = stream_time_base.0 as f64 / stream_time_base.1 as f64; + (*frame).pts = ((frame_index * 1024) as f64 / samples_per_second / time_base_f64) as i64; + + if av_frame_get_buffer(frame, 0) < 0 { + av_frame_free(&mut frame); + bail!("Failed to allocate buffer for placeholder audio frame"); + } + + // Fill with silence (zeros) + for i in 0..8 { + if !(*frame).data[i].is_null() && (*frame).linesize[i] > 0 { + std::ptr::write_bytes((*frame).data[i], 0, (*frame).linesize[i] as usize); + } + } + + Ok(frame) + } +} \ No newline at end of file diff --git a/crates/core/src/pipeline/runner.rs b/crates/core/src/pipeline/runner.rs index bf94fbf..3d9288b 100644 --- a/crates/core/src/pipeline/runner.rs +++ b/crates/core/src/pipeline/runner.rs @@ -5,7 +5,7 @@ use std::ops::Sub; use std::path::PathBuf; use std::ptr; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use crate::egress::hls::HlsEgress; use crate::egress::recorder::RecorderEgress; @@ -15,6 +15,7 @@ use crate::mux::SegmentType; use crate::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer}; use crate::pipeline::{EgressType, PipelineConfig}; use crate::variant::{StreamMapping, VariantStream}; +use crate::pipeline::placeholder::PlaceholderGenerator; use anyhow::{bail, Result}; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_WEBP; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE; @@ -30,6 +31,19 @@ use log::{error, info, warn}; use tokio::runtime::Handle; use uuid::Uuid; +/// Runner state for handling normal vs idle modes +#[derive(Debug, Clone)] +pub enum RunnerState { + /// Normal operation - processing live stream + Normal, + /// Idle mode - generating placeholder content after disconnection + Idle { + start_time: Instant, + variant_index: usize, + last_frame_time: Option, + }, +} + /// Pipeline runner is the main entry process for stream transcoding /// /// Each client connection spawns a new [PipelineRunner] and it should be run in its own thread @@ -80,6 +94,9 @@ pub struct PipelineRunner { /// Thumbnail generation interval (0 = disabled) thumb_interval: u64, + + /// Current runner state (normal or idle) + state: RunnerState, } impl PipelineRunner { @@ -108,9 +125,148 @@ impl PipelineRunner { fps_last_frame_ctr: 0, info: None, thumb_interval: 1800, // Disable thumbnails by default for performance + state: RunnerState::Normal, }) } + /// Process a single idle frame - generates one source frame and processes it through all variants + unsafe fn process_single_idle_frame(&mut self, config: &PipelineConfig) -> Result<()> { + use std::time::{Duration, Instant}; + + if config.variants.is_empty() { + return Ok(()); + } + + // Extract timing info from current state + let (mut last_frame_time, variant_index) = match &mut self.state { + RunnerState::Idle { last_frame_time, variant_index, .. } => (last_frame_time, variant_index), + _ => return Ok(()), // Only process in idle state + }; + + // Time-based frame rate calculation + let now = Instant::now(); + if let Some(last_time) = *last_frame_time { + // Calculate target frame interval (assume 30fps for now) + let target_interval = Duration::from_millis(33); // ~30fps + let elapsed = now.duration_since(last_time); + + if elapsed < target_interval { + // Not time for next frame yet + std::thread::sleep(target_interval - elapsed); + } + } + *last_frame_time = Some(Instant::now()); + + // Get source video stream info from stored ingress info + let video_stream = config.ingress_info.as_ref() + .and_then(|info| info.streams.iter().find(|s| matches!(s.stream_type, crate::overseer::IngressStreamType::Video))); + + let mut egress_results = vec![]; + + // Generate one source frame and process it through all relevant variants + if let Some(stream) = video_stream { + // Generate a single source placeholder video frame based on original stream properties + let fps = if stream.fps > 0.0 { stream.fps } else { 30.0 }; + let time_base = (1, fps as i32); + let mut source_frame = PlaceholderGenerator::generate_video_frame_from_stream(stream, time_base, self.frame_ctr)?; + + // Set the frame time_base + (*source_frame).time_base.num = time_base.0; + (*source_frame).time_base.den = time_base.1; + + // Increment frame counter for all video processing + self.frame_ctr += 1; + + // Process this single frame through all video variants (like normal pipeline) + for variant in &config.variants { + if let VariantStream::Video(v) = variant { + // Scale/encode the source frame for this variant + if let Some(enc) = self.encoders.get_mut(&v.id()) { + // Use scaler if needed for different resolutions + let frame_to_encode = if v.width as i32 == (*source_frame).width && + v.height as i32 == (*source_frame).height { + // Same resolution, use source frame directly + source_frame + } else { + // Different resolution, need to scale + if let Some(scaler) = self.scalers.get_mut(&v.id()) { + scaler.process_frame(source_frame, v.width, v.height, AV_PIX_FMT_YUV420P)? + } else { + source_frame // Fallback to source frame + } + }; + + let packets = enc.encode_frame(frame_to_encode)?; + for mut pkt in packets { + for eg in self.egress.iter_mut() { + let er = eg.process_pkt(pkt, &v.id())?; + egress_results.push(er); + } + av_packet_free(&mut pkt); + } + } + } + } + + av_frame_free(&mut source_frame); + } + + // Generate and process audio frames separately (audio doesn't share like video) + let audio_stream = config.ingress_info.as_ref() + .and_then(|info| info.streams.iter().find(|s| matches!(s.stream_type, crate::overseer::IngressStreamType::Audio))); + + for variant in &config.variants { + if let VariantStream::Audio(a) = variant { + let time_base = (1, a.sample_rate as i32); + let mut frame = if let Some(stream) = audio_stream { + // Use original stream properties for placeholder generation + PlaceholderGenerator::generate_audio_frame_from_stream(stream, time_base, self.frame_ctr, &a.sample_fmt, a.channels)? + } else { + // Fallback to variant properties if no stream info available + PlaceholderGenerator::generate_audio_frame(a, time_base, self.frame_ctr)? + }; + + // Set the frame time_base + (*frame).time_base.num = time_base.0; + (*frame).time_base.den = time_base.1; + + // Process through the encoding pipeline + if let Some(enc) = self.encoders.get_mut(&a.id()) { + let packets = enc.encode_frame(frame)?; + for mut pkt in packets { + for eg in self.egress.iter_mut() { + let er = eg.process_pkt(pkt, &a.id())?; + egress_results.push(er); + } + av_packet_free(&mut pkt); + } + } + + av_frame_free(&mut frame); + } + } + + // Handle egress results (same as normal processing) + if !egress_results.is_empty() { + self.handle.block_on(async { + for er in egress_results { + if let EgressResult::Segments { created, deleted } = er { + if let Err(e) = self + .overseer + .on_segments(&config.id, &created, &deleted) + .await + { + bail!("Failed to process segment {}", e.to_string()); + } + } + } + Ok(()) + })?; + } + + Ok(()) + } + /// EOF, cleanup pub unsafe fn flush(&mut self) -> Result<()> { for (var, enc) in &mut self.encoders { @@ -147,22 +303,67 @@ impl PipelineRunner { }; // run transcoder pipeline - let (mut pkt, _stream) = self.demuxer.get_packet()?; - if pkt.is_null() { - return Ok(false); - } - - // TODO: For copy streams, skip decoder - let frames = match self.decoder.decode_pkt(pkt) { - Ok(f) => f, - Err(e) => { - warn!("Error decoding frames, {e}"); - return Ok(true); + let (mut pkt, stream_info) = self.demuxer.get_packet()?; + + // Handle state transitions based on packet availability + match (&self.state, pkt.is_null()) { + (RunnerState::Normal, true) => { + // First time entering idle mode + info!("Stream input disconnected, entering idle mode with placeholder content"); + self.state = RunnerState::Idle { + start_time: Instant::now(), + variant_index: 0, + last_frame_time: None, + }; } - }; + (RunnerState::Idle { start_time, .. }, true) => { + // Check if we've been idle for more than 1 minute + if start_time.elapsed() > Duration::from_secs(60) { + info!("Idle timeout reached (60 seconds), ending stream"); + return Ok(false); + } + } + (RunnerState::Idle { .. }, false) => { + // Stream reconnected + info!("Stream reconnected, leaving idle mode"); + self.state = RunnerState::Normal; + } + (RunnerState::Normal, false) => { + // Normal operation continues + } + } + + // Process based on current state + match &self.state { + RunnerState::Idle { .. } => { + // Process a single idle frame (rotating through variants) + self.process_single_idle_frame(config)?; + + // Free the null packet if needed + if !pkt.is_null() { + av_packet_free(&mut pkt); + } + + return Ok(true); // Continue processing + } + RunnerState::Normal => { + // Normal packet processing + if pkt.is_null() { + // This shouldn't happen in Normal state but handle gracefully + return Ok(true); + } + + // TODO: For copy streams, skip decoder + let frames = match self.decoder.decode_pkt(pkt) { + Ok(f) => f, + Err(e) => { + warn!("Error decoding frames, {e}"); + return Ok(true); + } + }; - let mut egress_results = vec![]; - for (frame, stream) in frames { + let mut egress_results = vec![]; + for (frame, stream) in frames { // Copy frame from GPU if using hwaccel decoding let mut frame = get_frame_from_hw(frame)?; (*frame).time_base = (*stream).time_base; @@ -309,6 +510,8 @@ impl PipelineRunner { info!("Average fps: {:.2}", n_frames as f32 / elapsed); self.fps_counter_start = Instant::now(); self.fps_last_frame_ctr = self.frame_ctr; + } + } // Close the RunnerState::Normal match arm } Ok(true) } @@ -344,9 +547,13 @@ impl PipelineRunner { .collect(), }; - let cfg = self + let mut cfg = self .handle .block_on(async { self.overseer.start_stream(&self.connection, &i_info).await })?; + + // Store ingress info in config for placeholder generation + cfg.ingress_info = Some(i_info.clone()); + self.config = Some(cfg); self.info = Some(i_info);