Mirror of https://github.com/v0l/zap-stream-core.git (synced 2025-06-16 08:59:35 +00:00)
Implement idle mode with placeholder content when stream disconnects (#3)
Some checks failed: continuous-integration/drone Build is failing
* Initial plan for issue
* Implement placeholder frame generation and idle mode logic
  Co-authored-by: v0l <1172179+v0l@users.noreply.github.com>
* Add frame rate throttling for idle mode processing
  Co-authored-by: v0l <1172179+v0l@users.noreply.github.com>
* Refactor placeholder generation into separate module and simplify idle mode approach
  Co-authored-by: v0l <1172179+v0l@users.noreply.github.com>
* Address PR review feedback: improve parameter naming, fix memory leaks, refactor state management, and fix frame generation approach
  Co-authored-by: v0l <1172179+v0l@users.noreply.github.com>
* Store demuxer info in PipelineConfig for placeholder generation
  Co-authored-by: v0l <1172179+v0l@users.noreply.github.com>
---------
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: v0l <1172179+v0l@users.noreply.github.com>
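The change centers on a small state machine in the pipeline runner: while the demuxer keeps returning packets the runner stays in a Normal state; when packets stop it switches to an Idle state, emits black video and silent audio placeholder frames, and either returns to Normal on reconnect or ends the stream after a 60-second idle timeout. The std-only Rust sketch below illustrates just those transition rules; the `step` helper and its signature are invented for illustration and are not part of the actual PipelineRunner.

use std::time::{Duration, Instant};

/// Simplified mirror of the RunnerState introduced in runner.rs below.
#[derive(Debug)]
enum RunnerState {
    Normal,
    Idle { start_time: Instant },
}

/// Hypothetical helper: decide the next state from whether the demuxer
/// returned a packet this iteration. Returns false when the stream should end.
fn step(state: &mut RunnerState, got_packet: bool, idle_timeout: Duration) -> bool {
    match state {
        RunnerState::Normal if !got_packet => {
            // Input disconnected: enter idle mode and start the timer.
            *state = RunnerState::Idle { start_time: Instant::now() };
            true
        }
        RunnerState::Idle { start_time } if !got_packet => {
            // Still idle: keep producing placeholder frames until the timeout.
            start_time.elapsed() <= idle_timeout
        }
        RunnerState::Idle { .. } => {
            // Packets arrived again: leave idle mode.
            *state = RunnerState::Normal;
            true
        }
        RunnerState::Normal => true, // normal operation continues
    }
}

fn main() {
    let mut state = RunnerState::Normal;
    assert!(step(&mut state, false, Duration::from_secs(60))); // disconnect -> Idle
    assert!(step(&mut state, true, Duration::from_secs(60)));  // reconnect -> Normal
    println!("final state: {state:?}");
}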
@@ -1,11 +1,13 @@
use std::fmt::{Display, Formatter};

use crate::egress::EgressConfig;
use crate::overseer::IngressInfo;
use crate::variant::VariantStream;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

pub mod runner;
pub mod placeholder;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EgressType {
@@ -46,6 +48,8 @@ pub struct PipelineConfig {
    pub variants: Vec<VariantStream>,
    /// Output muxers
    pub egress: Vec<EgressType>,
    /// Source stream information for placeholder generation
    pub ingress_info: Option<IngressInfo>,
}

impl Display for PipelineConfig {
crates/core/src/pipeline/placeholder.rs (new file, 188 lines)
@@ -0,0 +1,188 @@
use anyhow::{bail, Result};
use crate::variant::video::VideoVariant;
use crate::variant::audio::AudioVariant;
use crate::overseer::{IngressStream, IngressStreamType};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
    av_frame_alloc, av_frame_get_buffer, av_frame_free, av_get_sample_fmt, AVFrame,
    AVPixelFormat, AVSampleFormat
};
use std::ffi::CString;

/// Placeholder frame generator for idle mode when stream disconnects
pub struct PlaceholderGenerator;

impl PlaceholderGenerator {
    /// Generate a placeholder video frame based on ingress stream info
    pub unsafe fn generate_video_frame_from_stream(
        stream: &IngressStream,
        stream_time_base: (i32, i32),
        frame_index: u64
    ) -> Result<*mut AVFrame> {
        let mut frame = av_frame_alloc();
        if frame.is_null() {
            bail!("Failed to allocate placeholder video frame");
        }

        (*frame).format = AVPixelFormat::AV_PIX_FMT_YUV420P as i32;
        (*frame).width = stream.width as i32;
        (*frame).height = stream.height as i32;
        (*frame).time_base.num = stream_time_base.0;
        (*frame).time_base.den = stream_time_base.1;

        // Set PTS based on frame rate and total frame index
        let fps = if stream.fps > 0.0 { stream.fps } else { 30.0 };
        let time_base_f64 = stream_time_base.0 as f64 / stream_time_base.1 as f64;
        (*frame).pts = (frame_index as f64 / fps / time_base_f64) as i64;

        if av_frame_get_buffer(frame, 0) < 0 {
            av_frame_free(&mut frame);
            bail!("Failed to allocate buffer for placeholder video frame");
        }

        // Fill with black (Y=16, U=V=128 for limited range YUV420P)
        let y_size = ((*frame).width * (*frame).height) as usize;
        let uv_size = y_size / 4;

        if !(*frame).data[0].is_null() {
            std::ptr::write_bytes((*frame).data[0], 16, y_size);
        }
        if !(*frame).data[1].is_null() {
            std::ptr::write_bytes((*frame).data[1], 128, uv_size);
        }
        if !(*frame).data[2].is_null() {
            std::ptr::write_bytes((*frame).data[2], 128, uv_size);
        }

        Ok(frame)
    }

    /// Generate a placeholder audio frame based on ingress stream info
    pub unsafe fn generate_audio_frame_from_stream(
        stream: &IngressStream,
        stream_time_base: (i32, i32),
        frame_index: u64,
        sample_fmt: &str,
        channels: u32
    ) -> Result<*mut AVFrame> {
        let mut frame = av_frame_alloc();
        if frame.is_null() {
            bail!("Failed to allocate placeholder audio frame");
        }

        // Use the provided sample format
        let sample_fmt_cstr = CString::new(sample_fmt)
            .map_err(|_| anyhow::anyhow!("Invalid sample format string"))?;
        let sample_fmt_int = av_get_sample_fmt(sample_fmt_cstr.as_ptr());
        (*frame).format = sample_fmt_int as i32;
        (*frame).channels = channels as i32;
        (*frame).sample_rate = stream.sample_rate as i32;
        (*frame).nb_samples = 1024; // Standard audio frame size
        (*frame).time_base.num = stream_time_base.0;
        (*frame).time_base.den = stream_time_base.1;

        // Set PTS based on sample rate and frame index
        let samples_per_second = stream.sample_rate as f64;
        let time_base_f64 = stream_time_base.0 as f64 / stream_time_base.1 as f64;
        (*frame).pts = ((frame_index * 1024) as f64 / samples_per_second / time_base_f64) as i64;

        if av_frame_get_buffer(frame, 0) < 0 {
            av_frame_free(&mut frame);
            bail!("Failed to allocate buffer for placeholder audio frame");
        }

        // Fill with silence (zeros)
        for i in 0..8 {
            if !(*frame).data[i].is_null() && (*frame).linesize[i] > 0 {
                std::ptr::write_bytes((*frame).data[i], 0, (*frame).linesize[i] as usize);
            }
        }

        Ok(frame)
    }

    /// Generate a placeholder black video frame
    pub unsafe fn generate_video_frame(
        variant: &VideoVariant,
        stream_time_base: (i32, i32),
        frame_index: u64
    ) -> Result<*mut AVFrame> {
        let mut frame = av_frame_alloc();
        if frame.is_null() {
            bail!("Failed to allocate placeholder video frame");
        }

        (*frame).format = AVPixelFormat::AV_PIX_FMT_YUV420P as i32;
        (*frame).width = variant.width as i32;
        (*frame).height = variant.height as i32;
        (*frame).time_base.num = stream_time_base.0;
        (*frame).time_base.den = stream_time_base.1;

        // Set PTS based on frame rate and total frame index
        let fps = if variant.fps > 0.0 { variant.fps } else { 30.0 };
        let time_base_f64 = stream_time_base.0 as f64 / stream_time_base.1 as f64;
        (*frame).pts = (frame_index as f64 / fps / time_base_f64) as i64;

        if av_frame_get_buffer(frame, 0) < 0 {
            av_frame_free(&mut frame);
            bail!("Failed to allocate buffer for placeholder video frame");
        }

        // Fill with black (Y=16, U=V=128 for limited range YUV420P)
        let y_size = ((*frame).width * (*frame).height) as usize;
        let uv_size = y_size / 4;

        if !(*frame).data[0].is_null() {
            std::ptr::write_bytes((*frame).data[0], 16, y_size);
        }
        if !(*frame).data[1].is_null() {
            std::ptr::write_bytes((*frame).data[1], 128, uv_size);
        }
        if !(*frame).data[2].is_null() {
            std::ptr::write_bytes((*frame).data[2], 128, uv_size);
        }

        Ok(frame)
    }

    /// Generate a placeholder silent audio frame
    pub unsafe fn generate_audio_frame(
        variant: &AudioVariant,
        stream_time_base: (i32, i32),
        frame_index: u64
    ) -> Result<*mut AVFrame> {
        let mut frame = av_frame_alloc();
        if frame.is_null() {
            bail!("Failed to allocate placeholder audio frame");
        }

        // Use the sample format from the variant configuration
        let sample_fmt_cstr = CString::new(variant.sample_fmt.as_str())
            .map_err(|_| anyhow::anyhow!("Invalid sample format string"))?;
        let sample_fmt_int = av_get_sample_fmt(sample_fmt_cstr.as_ptr());
        (*frame).format = sample_fmt_int as i32;
        (*frame).channels = variant.channels as i32;
        (*frame).sample_rate = variant.sample_rate as i32;
        (*frame).nb_samples = 1024; // Standard audio frame size
        (*frame).time_base.num = stream_time_base.0;
        (*frame).time_base.den = stream_time_base.1;

        // Set PTS based on sample rate and frame index
        let samples_per_second = variant.sample_rate as f64;
        let time_base_f64 = stream_time_base.0 as f64 / stream_time_base.1 as f64;
        (*frame).pts = ((frame_index * 1024) as f64 / samples_per_second / time_base_f64) as i64;

        if av_frame_get_buffer(frame, 0) < 0 {
            av_frame_free(&mut frame);
            bail!("Failed to allocate buffer for placeholder audio frame");
        }

        // Fill with silence (zeros)
        for i in 0..8 {
            if !(*frame).data[i].is_null() && (*frame).linesize[i] > 0 {
                std::ptr::write_bytes((*frame).data[i], 0, (*frame).linesize[i] as usize);
            }
        }

        Ok(frame)
    }
}
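The PTS formulas above convert a frame index into media time and then divide by the stream time base. A standalone arithmetic sketch with assumed example values (30 fps video with a 1/30 time base; 48 kHz audio with a 1/48000 time base and 1024 samples per frame), mirroring the expressions in placeholder.rs:

fn main() {
    // Video: pts = frame_index / fps / time_base, as in generate_video_frame_from_stream.
    let (fps, video_tb) = (30.0_f64, (1_i32, 30_i32));
    let frame_index = 90_u64; // 3 seconds of video at 30 fps
    let tb = video_tb.0 as f64 / video_tb.1 as f64;
    let video_pts = (frame_index as f64 / fps / tb) as i64;
    println!("video pts = {video_pts}"); // 90 -> one tick per frame at time_base 1/30

    // Audio: pts = (frame_index * 1024) / sample_rate / time_base, as in generate_audio_frame_from_stream.
    let (sample_rate, audio_tb) = (48_000.0_f64, (1_i32, 48_000_i32));
    let audio_frame_index = 10_u64;
    let tb = audio_tb.0 as f64 / audio_tb.1 as f64;
    let audio_pts = ((audio_frame_index * 1024) as f64 / sample_rate / tb) as i64;
    println!("audio pts = {audio_pts}"); // 10240 -> one tick per sample at time_base 1/48000
}

With a time base equal to the frame (or sample) duration, the PTS advances by one tick per video frame and by 1024 ticks per audio frame.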
@@ -5,7 +5,7 @@ use std::ops::Sub;
use std::path::PathBuf;
use std::ptr;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};

use crate::egress::hls::HlsEgress;
use crate::egress::recorder::RecorderEgress;
@@ -15,6 +15,7 @@ use crate::mux::SegmentType;
use crate::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer};
use crate::pipeline::{EgressType, PipelineConfig};
use crate::variant::{StreamMapping, VariantStream};
use crate::pipeline::placeholder::PlaceholderGenerator;
use anyhow::{bail, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_WEBP;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE;
@@ -30,6 +31,19 @@ use log::{error, info, warn};
use tokio::runtime::Handle;
use uuid::Uuid;

/// Runner state for handling normal vs idle modes
#[derive(Debug, Clone)]
pub enum RunnerState {
    /// Normal operation - processing live stream
    Normal,
    /// Idle mode - generating placeholder content after disconnection
    Idle {
        start_time: Instant,
        variant_index: usize,
        last_frame_time: Option<Instant>,
    },
}

/// Pipeline runner is the main entry process for stream transcoding
///
/// Each client connection spawns a new [PipelineRunner] and it should be run in its own thread
@@ -80,6 +94,9 @@ pub struct PipelineRunner {

    /// Thumbnail generation interval (0 = disabled)
    thumb_interval: u64,

    /// Current runner state (normal or idle)
    state: RunnerState,
}

impl PipelineRunner {
@@ -108,9 +125,148 @@ impl PipelineRunner {
            fps_last_frame_ctr: 0,
            info: None,
            thumb_interval: 1800, // Disable thumbnails by default for performance
            state: RunnerState::Normal,
        })
    }

    /// Process a single idle frame - generates one source frame and processes it through all variants
    unsafe fn process_single_idle_frame(&mut self, config: &PipelineConfig) -> Result<()> {
        use std::time::{Duration, Instant};

        if config.variants.is_empty() {
            return Ok(());
        }

        // Extract timing info from current state
        let (mut last_frame_time, variant_index) = match &mut self.state {
            RunnerState::Idle { last_frame_time, variant_index, .. } => (last_frame_time, variant_index),
            _ => return Ok(()), // Only process in idle state
        };

        // Time-based frame rate calculation
        let now = Instant::now();
        if let Some(last_time) = *last_frame_time {
            // Calculate target frame interval (assume 30fps for now)
            let target_interval = Duration::from_millis(33); // ~30fps
            let elapsed = now.duration_since(last_time);

            if elapsed < target_interval {
                // Not time for next frame yet
                std::thread::sleep(target_interval - elapsed);
            }
        }
        *last_frame_time = Some(Instant::now());

        // Get source video stream info from stored ingress info
        let video_stream = config.ingress_info.as_ref()
            .and_then(|info| info.streams.iter().find(|s| matches!(s.stream_type, crate::overseer::IngressStreamType::Video)));

        let mut egress_results = vec![];

        // Generate one source frame and process it through all relevant variants
        if let Some(stream) = video_stream {
            // Generate a single source placeholder video frame based on original stream properties
            let fps = if stream.fps > 0.0 { stream.fps } else { 30.0 };
            let time_base = (1, fps as i32);
            let mut source_frame = PlaceholderGenerator::generate_video_frame_from_stream(stream, time_base, self.frame_ctr)?;

            // Set the frame time_base
            (*source_frame).time_base.num = time_base.0;
            (*source_frame).time_base.den = time_base.1;

            // Increment frame counter for all video processing
            self.frame_ctr += 1;

            // Process this single frame through all video variants (like normal pipeline)
            for variant in &config.variants {
                if let VariantStream::Video(v) = variant {
                    // Scale/encode the source frame for this variant
                    if let Some(enc) = self.encoders.get_mut(&v.id()) {
                        // Use scaler if needed for different resolutions
                        let frame_to_encode = if v.width as i32 == (*source_frame).width &&
                            v.height as i32 == (*source_frame).height {
                            // Same resolution, use source frame directly
                            source_frame
                        } else {
                            // Different resolution, need to scale
                            if let Some(scaler) = self.scalers.get_mut(&v.id()) {
                                scaler.process_frame(source_frame, v.width, v.height, AV_PIX_FMT_YUV420P)?
                            } else {
                                source_frame // Fallback to source frame
                            }
                        };

                        let packets = enc.encode_frame(frame_to_encode)?;
                        for mut pkt in packets {
                            for eg in self.egress.iter_mut() {
                                let er = eg.process_pkt(pkt, &v.id())?;
                                egress_results.push(er);
                            }
                            av_packet_free(&mut pkt);
                        }
                    }
                }
            }

            av_frame_free(&mut source_frame);
        }

        // Generate and process audio frames separately (audio doesn't share like video)
        let audio_stream = config.ingress_info.as_ref()
            .and_then(|info| info.streams.iter().find(|s| matches!(s.stream_type, crate::overseer::IngressStreamType::Audio)));

        for variant in &config.variants {
            if let VariantStream::Audio(a) = variant {
                let time_base = (1, a.sample_rate as i32);
                let mut frame = if let Some(stream) = audio_stream {
                    // Use original stream properties for placeholder generation
                    PlaceholderGenerator::generate_audio_frame_from_stream(stream, time_base, self.frame_ctr, &a.sample_fmt, a.channels)?
                } else {
                    // Fallback to variant properties if no stream info available
                    PlaceholderGenerator::generate_audio_frame(a, time_base, self.frame_ctr)?
                };

                // Set the frame time_base
                (*frame).time_base.num = time_base.0;
                (*frame).time_base.den = time_base.1;

                // Process through the encoding pipeline
                if let Some(enc) = self.encoders.get_mut(&a.id()) {
                    let packets = enc.encode_frame(frame)?;
                    for mut pkt in packets {
                        for eg in self.egress.iter_mut() {
                            let er = eg.process_pkt(pkt, &a.id())?;
                            egress_results.push(er);
                        }
                        av_packet_free(&mut pkt);
                    }
                }

                av_frame_free(&mut frame);
            }
        }

        // Handle egress results (same as normal processing)
        if !egress_results.is_empty() {
            self.handle.block_on(async {
                for er in egress_results {
                    if let EgressResult::Segments { created, deleted } = er {
                        if let Err(e) = self
                            .overseer
                            .on_segments(&config.id, &created, &deleted)
                            .await
                        {
                            bail!("Failed to process segment {}", e.to_string());
                        }
                    }
                }
                Ok(())
            })?;
        }

        Ok(())
    }

    /// EOF, cleanup
    pub unsafe fn flush(&mut self) -> Result<()> {
        for (var, enc) in &mut self.encoders {
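The process_single_idle_frame hunk above paces placeholder output at roughly 30 fps by sleeping off the remainder of a 33 ms interval between frames. A minimal standalone sketch of that pacing loop (std-only and illustrative; the real runner keeps last_frame_time inside RunnerState::Idle rather than in a local variable):

use std::time::{Duration, Instant};

fn main() {
    let target_interval = Duration::from_millis(33); // ~30 fps
    let mut last_frame_time: Option<Instant> = None;

    for frame_index in 0..5u64 {
        let now = Instant::now();
        if let Some(last) = last_frame_time {
            let elapsed = now.duration_since(last);
            if elapsed < target_interval {
                // Not time for the next frame yet: sleep off the remainder.
                std::thread::sleep(target_interval - elapsed);
            }
        }
        last_frame_time = Some(Instant::now());
        println!("emit placeholder frame {frame_index}");
    }
}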
@@ -147,22 +303,67 @@ impl PipelineRunner {
        };

        // run transcoder pipeline
        let (mut pkt, _stream) = self.demuxer.get_packet()?;
        if pkt.is_null() {
            return Ok(false);
        let (mut pkt, stream_info) = self.demuxer.get_packet()?;

        // Handle state transitions based on packet availability
        match (&self.state, pkt.is_null()) {
            (RunnerState::Normal, true) => {
                // First time entering idle mode
                info!("Stream input disconnected, entering idle mode with placeholder content");
                self.state = RunnerState::Idle {
                    start_time: Instant::now(),
                    variant_index: 0,
                    last_frame_time: None,
                };
            }
            (RunnerState::Idle { start_time, .. }, true) => {
                // Check if we've been idle for more than 1 minute
                if start_time.elapsed() > Duration::from_secs(60) {
                    info!("Idle timeout reached (60 seconds), ending stream");
                    return Ok(false);
                }
            }
            (RunnerState::Idle { .. }, false) => {
                // Stream reconnected
                info!("Stream reconnected, leaving idle mode");
                self.state = RunnerState::Normal;
            }
            (RunnerState::Normal, false) => {
                // Normal operation continues
            }
        }

        // TODO: For copy streams, skip decoder
        let frames = match self.decoder.decode_pkt(pkt) {
            Ok(f) => f,
            Err(e) => {
                warn!("Error decoding frames, {e}");
                return Ok(true);
            }
        };
        // Process based on current state
        match &self.state {
            RunnerState::Idle { .. } => {
                // Process a single idle frame (rotating through variants)
                self.process_single_idle_frame(config)?;

        let mut egress_results = vec![];
        for (frame, stream) in frames {
                // Free the null packet if needed
                if !pkt.is_null() {
                    av_packet_free(&mut pkt);
                }

                return Ok(true); // Continue processing
            }
            RunnerState::Normal => {
                // Normal packet processing
                if pkt.is_null() {
                    // This shouldn't happen in Normal state but handle gracefully
                    return Ok(true);
                }

                // TODO: For copy streams, skip decoder
                let frames = match self.decoder.decode_pkt(pkt) {
                    Ok(f) => f,
                    Err(e) => {
                        warn!("Error decoding frames, {e}");
                        return Ok(true);
                    }
                };

                let mut egress_results = vec![];
                for (frame, stream) in frames {
                    // Copy frame from GPU if using hwaccel decoding
                    let mut frame = get_frame_from_hw(frame)?;
                    (*frame).time_base = (*stream).time_base;
@@ -309,6 +510,8 @@ impl PipelineRunner {
                info!("Average fps: {:.2}", n_frames as f32 / elapsed);
                self.fps_counter_start = Instant::now();
                self.fps_last_frame_ctr = self.frame_ctr;
            }
        } // Close the RunnerState::Normal match arm
        }
        Ok(true)
    }
@@ -344,9 +547,13 @@ impl PipelineRunner {
                .collect(),
        };

        let cfg = self
        let mut cfg = self
            .handle
            .block_on(async { self.overseer.start_stream(&self.connection, &i_info).await })?;

        // Store ingress info in config for placeholder generation
        cfg.ingress_info = Some(i_info.clone());

        self.config = Some(cfg);
        self.info = Some(i_info);