fix: rtmp ingest
All checks were successful
continuous-integration/drone Build is passing

fix: idle placeholder stream
This commit is contained in:
2025-06-12 14:56:59 +01:00
parent ad20fbc052
commit 09577cc2c8
8 changed files with 659 additions and 208 deletions

View File

@ -28,10 +28,13 @@ usvg = "0.45.1"
tiny-skia = "0.11.4"
fontdue = "0.9.2"
ringbuf = "0.4.7"
libc = "0.2.169"
# srt
srt-tokio = { version = "0.4.4", optional = true }
# rtmp
rml_rtmp = { version = "0.8.0", optional = true }
libc = "0.2.169"
bytes = "1.9.0"
xflv = "0.4.4"

View File

@ -5,13 +5,14 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_RGBA;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVSampleFormat::AV_SAMPLE_FMT_FLTP;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_channel_layout_default, av_frame_alloc, av_frame_free, av_frame_get_buffer, AVFrame,
AVPixelFormat, AVRational,
av_channel_layout_default, av_frame_alloc, av_frame_free, av_frame_get_buffer, av_q2d,
av_rescale_q, AVFrame, AVPixelFormat, AVRational, AVStream,
};
use ffmpeg_rs_raw::Scaler;
use fontdue::layout::{CoordinateSystem, Layout, TextStyle};
use fontdue::Font;
use std::mem::transmute;
use std::ops::Sub;
use std::time::{Duration, Instant};
use std::{ptr, slice};
@ -26,8 +27,12 @@ pub struct FrameGenerator {
audio_frame_size: i32,
audio_channels: u8,
frame_idx: u64,
audio_samples: u64,
video_pts: i64,
audio_pts: i64,
// Timebases for frame generation
video_timebase: AVRational,
audio_timebase: AVRational,
// internal
next_frame: *mut AVFrame,
@ -56,6 +61,8 @@ impl FrameGenerator {
sample_rate: u32,
frame_size: i32,
channels: u8,
video_timebase: AVRational,
audio_timebase: AVRational,
) -> Result<Self> {
let font = include_bytes!("../SourceCodePro-Regular.ttf") as &[u8];
let font = Font::from_bytes(font, Default::default()).unwrap();
@ -68,8 +75,10 @@ impl FrameGenerator {
audio_sample_rate: sample_rate,
audio_frame_size: frame_size,
audio_channels: channels,
frame_idx: 0,
audio_samples: 0,
video_pts: 0,
audio_pts: 0,
video_timebase,
audio_timebase,
font,
start: Instant::now(),
scaler: Scaler::default(),
@ -80,6 +89,26 @@ impl FrameGenerator {
pub fn from_stream(
video_stream: &IngressStream,
audio_stream: Option<&IngressStream>,
) -> Result<Self> {
Ok(Self::from_stream_with_timebase(
video_stream,
audio_stream,
AVRational {
num: 1,
den: video_stream.fps as i32,
},
audio_stream.map(|s| AVRational {
num: 1,
den: s.sample_rate as i32,
}),
)?)
}
pub fn from_stream_with_timebase(
video_stream: &IngressStream,
audio_stream: Option<&IngressStream>,
video_timebase: AVRational,
audio_timebase: Option<AVRational>,
) -> Result<Self> {
Ok(Self::new(
video_stream.fps,
@ -89,11 +118,72 @@ impl FrameGenerator {
audio_stream.map(|i| i.sample_rate as _).unwrap_or(0),
if audio_stream.is_none() { 0 } else { 1024 },
audio_stream.map(|i| i.channels as _).unwrap_or(0),
video_timebase,
audio_timebase.unwrap_or(AVRational { num: 1, den: 1 }),
)?)
}
pub unsafe fn from_av_streams(
video_stream: *const AVStream,
audio_stream: Option<*const AVStream>,
) -> Result<Self> {
if video_stream.is_null() {
bail!("Video stream cannot be null");
}
let video_codec_par = (*video_stream).codecpar;
let video_timebase = (*video_stream).time_base;
// Extract video stream properties
let width = (*video_codec_par).width as u16;
let height = (*video_codec_par).height as u16;
let pix_fmt = unsafe { transmute((*video_codec_par).format) };
// Calculate FPS from timebase
let fps = av_q2d((*video_stream).r_frame_rate) as f32;
// Extract audio stream properties if available
let (sample_rate, channels, audio_timebase) = if let Some(audio_stream) = audio_stream {
if !audio_stream.is_null() {
let audio_codec_par = (*audio_stream).codecpar;
let audio_tb = (*audio_stream).time_base;
(
(*audio_codec_par).sample_rate as u32,
(*audio_codec_par).ch_layout.nb_channels as u8,
audio_tb,
)
} else {
(0, 0, AVRational { num: 1, den: 44100 })
}
} else {
(0, 0, AVRational { num: 1, den: 44100 })
};
let frame_size = if sample_rate > 0 { 1024 } else { 0 };
Ok(Self::new(
fps,
width,
height,
pix_fmt,
sample_rate,
frame_size,
channels,
video_timebase,
audio_timebase,
)?)
}
pub fn frame_no(&self) -> u64 {
self.frame_idx
(self.video_pts / self.pts_per_frame()) as u64
}
/// Set the starting PTS values for video and audio
pub fn set_starting_pts(&mut self, video_pts: i64, audio_pts: i64) {
self.video_pts = video_pts;
self.audio_pts = audio_pts;
self.start = Instant::now().sub(Duration::from_secs_f64(
video_pts as f64 / self.pts_per_frame() as f64 / self.fps as f64,
));
}
/// Create a new frame for composing text / images
@ -112,12 +202,9 @@ impl FrameGenerator {
(*src_frame).colorspace = AVCOL_SPC_RGB;
//internally always use RGBA, we convert frame to target pixel format at the end
(*src_frame).format = AV_PIX_FMT_RGBA as _;
(*src_frame).pts = self.frame_idx as _;
(*src_frame).duration = 1;
(*src_frame).time_base = AVRational {
num: 1,
den: self.fps as i32,
};
(*src_frame).pts = self.video_pts;
(*src_frame).duration = self.pts_per_frame() as _;
(*src_frame).time_base = self.video_timebase;
if av_frame_get_buffer(src_frame, 0) < 0 {
av_frame_free(&mut src_frame);
bail!("Failed to get frame buffer");
@ -163,6 +250,19 @@ impl FrameGenerator {
Ok(())
}
pub unsafe fn fill_color(&mut self, color32: [u8; 4]) -> Result<()> {
if self.next_frame.is_null() {
bail!("Must call begin() before writing frame data")
}
let buf = slice::from_raw_parts_mut(
(*self.next_frame).data[0],
(self.width as usize * self.height as usize * 4) as usize,
);
for z in 0..(self.width as usize * self.height as usize) {
buf[z * 4..z * 4 + 4].copy_from_slice(&color32);
}
Ok(())
}
/// Copy data directly into the frame buffer (must be RGBA data)
pub unsafe fn copy_frame_data(&mut self, data: &[u8]) -> Result<()> {
if self.next_frame.is_null() {
@ -179,6 +279,15 @@ impl FrameGenerator {
Ok(())
}
fn pts_per_frame(&self) -> i64 {
self.video_timebase.den as i64 / (self.video_timebase.num as i64 * self.fps as i64)
}
fn pts_of_nb_samples(&self, n: i64) -> i64 {
let seconds = (n as f64 / self.audio_sample_rate as f64) as f64;
(seconds / unsafe { av_q2d(self.audio_timebase) }) as _
}
/// Generate audio to stay synchronized with video frames
unsafe fn generate_audio_frame(&mut self) -> Result<*mut AVFrame> {
const FREQUENCY: f32 = 440.0; // A4 note
@ -188,36 +297,38 @@ impl FrameGenerator {
return Ok(ptr::null_mut());
}
// Calculate how many audio samples we need to cover the next video frame
let samples_per_frame = (self.audio_sample_rate as f32 / self.fps) as u64;
let next_frame_needs_samples = (self.frame_idx + 1) * samples_per_frame;
// Calculate audio PTS needed to stay ahead of next video frame
let next_video_pts = self.video_pts + self.pts_per_frame();
// Convert video PTS to audio timebase to see how much audio we need
let audio_pts_needed =
av_rescale_q(next_video_pts, self.video_timebase, self.audio_timebase);
// Generate audio if we don't have enough to cover the next video frame
if self.audio_samples < next_frame_needs_samples {
if self.audio_pts < audio_pts_needed {
let audio_frame = av_frame_alloc();
(*audio_frame).format = AV_SAMPLE_FMT_FLTP as _;
(*audio_frame).nb_samples = self.audio_frame_size as _;
(*audio_frame).duration = self.audio_frame_size as _;
(*audio_frame).sample_rate = self.audio_sample_rate as _;
(*audio_frame).pts = self.audio_samples as _;
(*audio_frame).time_base = AVRational {
num: 1,
den: self.audio_sample_rate as _,
};
(*audio_frame).pts = self.audio_pts;
(*audio_frame).time_base = self.audio_timebase;
(*audio_frame).duration = self.pts_of_nb_samples(self.audio_frame_size as _);
av_channel_layout_default(&mut (*audio_frame).ch_layout, self.audio_channels as _);
av_frame_get_buffer(audio_frame, 0);
// Generate sine wave samples
let data = (*audio_frame).data[0] as *mut f32;
for i in 0..self.audio_frame_size {
let sample_time =
(self.audio_samples + i as u64) as f32 / self.audio_sample_rate as f32;
let sample_value =
(2.0 * std::f32::consts::PI * FREQUENCY * sample_time).sin() * 0.5;
*data.add(i as _) = sample_value;
// Generate sine wave samples for all channels
for ch in 0..self.audio_channels {
let data = (*audio_frame).data[ch as usize] as *mut f32;
for i in 0..self.audio_frame_size {
let sample_time =
(self.audio_pts + i as i64) as f32 / self.audio_sample_rate as f32;
let sample_value =
(2.0 * std::f32::consts::PI * FREQUENCY * sample_time).sin() * 0.5;
*data.add(i as _) = sample_value;
}
}
self.audio_samples += self.audio_frame_size as u64;
return Ok(audio_frame);
}
@ -227,13 +338,14 @@ impl FrameGenerator {
/// Return the next frame for encoding (blocking)
pub unsafe fn next(&mut self) -> Result<*mut AVFrame> {
// set start time to now if this is the first call to next()
if self.frame_idx == 0 {
if self.video_pts == 0 {
self.start = Instant::now();
}
// try to get audio frames before video frames (non-blocking)
let audio_frame = self.generate_audio_frame()?;
if !audio_frame.is_null() {
self.audio_pts += (*audio_frame).duration;
return Ok(audio_frame);
}
@ -242,8 +354,10 @@ impl FrameGenerator {
self.begin()?;
}
let stream_time = Duration::from_secs_f64(self.frame_idx as f64 / self.fps as f64);
let real_time = Instant::now().duration_since(self.start);
let stream_time = Duration::from_secs_f64(
self.video_pts as f64 / self.pts_per_frame() as f64 / self.fps as f64,
);
let real_time = self.start.elapsed();
let wait_time = if stream_time > real_time {
stream_time - real_time
} else {
@ -261,14 +375,14 @@ impl FrameGenerator {
self.height,
self.video_sample_fmt,
)?;
self.video_pts += (*self.next_frame).duration;
av_frame_free(&mut self.next_frame);
self.next_frame = ptr::null_mut();
self.frame_idx += 1;
Ok(out_frame)
} else {
let ret = self.next_frame;
self.video_pts += (*self.next_frame).duration;
self.next_frame = ptr::null_mut();
self.frame_idx += 1;
Ok(ret)
}
}
@ -295,6 +409,14 @@ mod tests {
sample_rate,
frame_size,
channels,
AVRational {
num: 1,
den: fps as i32,
},
AVRational {
num: 1,
den: sample_rate as i32,
},
)
.unwrap();
@ -330,7 +452,7 @@ mod tests {
expected_audio_samples - total_audio_samples
};
println!("Frame {}: VIDEO - PTS: {}, frame_idx: {}, expected_audio: {}, actual_audio: {}, deficit: {}",
println!("Frame {}: VIDEO - PTS: {}, frame_idx: {}, expected_audio: {}, actual_audio: {}, deficit: {}",
i, (*frame).pts, video_frames, expected_audio_samples, total_audio_samples, audio_deficit);
// Verify we have enough audio for this video frame
@ -378,9 +500,24 @@ mod tests {
let fps = 30.0;
let sample_rate = 44100;
let mut gen =
FrameGenerator::new(fps, 1280, 720, AV_PIX_FMT_YUV420P, sample_rate, 1024, 2)
.unwrap();
let mut gen = FrameGenerator::new(
fps,
1280,
720,
AV_PIX_FMT_YUV420P,
sample_rate,
1024,
2,
AVRational {
num: 1,
den: fps as i32,
},
AVRational {
num: 1,
den: sample_rate as i32,
},
)
.unwrap();
let mut last_audio_pts = -1i64;
let mut last_video_pts = -1i64;

View File

@ -130,14 +130,13 @@ impl BufferedReader {
}
}
/// Read data from buffer, filling the entire output buffer before returning
/// Read data from buffer
pub fn read_buffered(&mut self, buf: &mut [u8]) -> usize {
if self.buf.len() >= buf.len() {
let drain = self.buf.drain(..buf.len());
buf.copy_from_slice(drain.as_slice());
buf.len()
} else {
0
let to_drain = buf.len().min(self.buf.len());
if to_drain > 0 {
let drain = self.buf.drain(..to_drain);
buf[..to_drain].copy_from_slice(drain.as_slice());
}
to_drain
}
}

View File

@ -1,7 +1,8 @@
use crate::ingress::{BufferedReader, ConnectionInfo};
use crate::overseer::Overseer;
use crate::pipeline::runner::PipelineRunner;
use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use bytes::{Bytes, BytesMut};
use log::{error, info};
use rml_rtmp::handshake::{Handshake, HandshakeProcessResult, PeerType};
use rml_rtmp::sessions::{
@ -16,6 +17,8 @@ use tokio::net::TcpListener;
use tokio::runtime::Handle;
use tokio::time::Instant;
use uuid::Uuid;
use xflv::errors::FlvMuxerError;
use xflv::muxer::FlvMuxer;
const MAX_MEDIA_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10MB limit
@ -27,8 +30,8 @@ struct RtmpClient {
buffer: BufferedReader,
session: ServerSession,
msg_queue: VecDeque<ServerSessionResult>,
reader_buf: [u8; 4096],
pub published_stream: Option<RtmpPublishedStream>,
muxer: FlvMuxer,
}
impl RtmpClient {
@ -41,8 +44,8 @@ impl RtmpClient {
session: ses,
buffer: BufferedReader::new(1024 * 1024, MAX_MEDIA_BUFFER_SIZE, "RTMP"),
msg_queue: VecDeque::from(res),
reader_buf: [0; 4096],
published_stream: None,
muxer: FlvMuxer::new(),
})
}
@ -89,27 +92,28 @@ impl RtmpClient {
Ok(())
}
fn read_data(&mut self) -> Result<()> {
let r = match self.socket.read(&mut self.reader_buf) {
fn read_data(&mut self) -> Result<Option<usize>> {
let mut buf = [0; 4096];
let r = match self.socket.read(&mut buf) {
Ok(r) => r,
Err(e) => {
return match e.kind() {
ErrorKind::WouldBlock => Ok(()),
ErrorKind::Interrupted => Ok(()),
ErrorKind::WouldBlock => Ok(None),
ErrorKind::Interrupted => Ok(None),
_ => Err(anyhow::Error::new(e)),
};
}
};
if r == 0 {
bail!("EOF");
return Ok(Some(0));
}
let mx = self.session.handle_input(&self.reader_buf[..r])?;
let mx = self.session.handle_input(&buf[..r])?;
if !mx.is_empty() {
self.msg_queue.extend(mx);
self.process_msg_queue()?;
}
Ok(())
Ok(Some(r))
}
fn process_msg_queue(&mut self) -> Result<()> {
@ -128,6 +132,44 @@ impl RtmpClient {
Ok(())
}
fn write_flv_header(&mut self, metadata: &rml_rtmp::sessions::StreamMetadata) -> Result<()> {
let has_video = metadata.video_codec_id.is_some();
let has_audio = metadata.audio_codec_id.is_some();
self.muxer
.write_flv_header(has_audio, has_video)
.map_err(|e| anyhow!("failed to write flv header {}", e))?;
self.muxer
.write_previous_tag_size(0)
.map_err(|e| anyhow!("failed to write flv header {}", e))?;
// Extract data from the muxer
let data = self.muxer.writer.extract_current_bytes();
self.buffer.add_data(&data);
info!(
"FLV header written with audio: {}, video: {}",
has_audio, has_video
);
Ok(())
}
fn write_flv_tag(
&mut self,
tag_type: u8,
timestamp: u32,
data: Bytes,
) -> Result<(), FlvMuxerError> {
let body_len = data.len();
self.muxer
.write_flv_tag_header(tag_type, body_len as _, timestamp)?;
self.muxer.write_flv_tag_body(BytesMut::from(data))?;
self.muxer.write_previous_tag_size((11 + body_len) as _)?;
let flv_data = self.muxer.writer.extract_current_bytes();
self.buffer.add_data(&flv_data);
Ok(())
}
fn handle_event(&mut self, event: ServerSessionEvent) -> Result<()> {
match event {
ServerSessionEvent::ClientChunkSizeChanged { new_chunk_size } => {
@ -169,12 +211,19 @@ impl RtmpClient {
"Metadata configured: {}/{} {:?}",
app_name, stream_key, metadata
);
self.write_flv_header(&metadata)?;
}
ServerSessionEvent::AudioDataReceived { data, .. } => {
self.buffer.add_data(&data);
ServerSessionEvent::AudioDataReceived {
data, timestamp, ..
} => {
self.write_flv_tag(8, timestamp.value, data)
.map_err(|e| anyhow!("failed to write flv tag: {}", e))?;
}
ServerSessionEvent::VideoDataReceived { data, .. } => {
self.buffer.add_data(&data);
ServerSessionEvent::VideoDataReceived {
data, timestamp, ..
} => {
self.write_flv_tag(9, timestamp.value, data)
.map_err(|e| anyhow!("failed to write flv tag: {}", e))?;
}
ServerSessionEvent::UnhandleableAmf0Command { .. } => {}
ServerSessionEvent::PlayStreamRequested { request_id, .. } => {
@ -195,10 +244,20 @@ impl Read for RtmpClient {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
// Block until we have enough data to fill the buffer
while self.buffer.buf.len() < buf.len() {
if let Err(e) = self.read_data() {
error!("Error reading data: {}", e);
return Ok(0);
};
match self.read_data() {
Ok(Some(0)) => {
let r = self.buffer.read_buffered(buf);
if r == 0 {
return Err(std::io::Error::other(anyhow!("EOF")));
}
return Ok(r);
}
Err(e) => {
error!("Error reading data: {}", e);
return Ok(0);
}
_ => continue,
}
}
Ok(self.buffer.read_buffered(buf))
@ -239,13 +298,15 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
overseer,
info,
Box::new(cc),
Some("flv".to_string()),
None,
) {
Ok(pl) => pl,
Err(e) => {
bail!("Failed to create PipelineRunner {}", e)
}
};
//pl.set_demuxer_format("flv");
//pl.set_demuxer_buffer_size(1024 * 64);
pl.run();
Ok(())
})?;

View File

@ -4,7 +4,7 @@ use crate::overseer::Overseer;
use anyhow::Result;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVSampleFormat::AV_SAMPLE_FMT_FLTP;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_frame_free, av_packet_free, AV_PROFILE_H264_MAIN};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_frame_free, av_packet_free, AV_PROFILE_H264_MAIN, AVRational};
use ffmpeg_rs_raw::{Encoder, Muxer};
use log::info;
use ringbuf::traits::{Observer, Split};
@ -115,6 +115,8 @@ impl TestPatternSrc {
SAMPLE_RATE,
frame_size,
1,
AVRational { num: 1, den: VIDEO_FPS as i32 },
AVRational { num: 1, den: SAMPLE_RATE as i32 },
)?,
video_encoder,
audio_encoder,

View File

@ -21,7 +21,8 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_WEBP;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_frame_clone, av_frame_free, av_get_sample_fmt, av_packet_free, av_rescale_q, AVFrame, AVPacket, AV_NOPTS_VALUE,
av_frame_clone, av_frame_free, av_get_sample_fmt, av_packet_free, av_rescale_q, AVFrame,
AVPacket, AV_NOPTS_VALUE,
};
use ffmpeg_rs_raw::{
cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, Encoder, Resample, Scaler, StreamType,
@ -30,6 +31,12 @@ use log::{error, info, warn};
use tokio::runtime::Handle;
use uuid::Uuid;
/// Idle mode timeout in seconds
const IDLE_TIMEOUT_SECS: u64 = 600;
/// Circuit breaker threshold for consecutive decode failures
const DEFAULT_MAX_CONSECUTIVE_FAILURES: u32 = 50;
/// Runner state for handling normal vs idle modes
pub enum RunnerState {
/// Normal operation - processing live stream
@ -37,11 +44,25 @@ pub enum RunnerState {
/// Idle mode - generating placeholder content after disconnection
Idle {
start_time: Instant,
last_frame_time: Option<Instant>,
gen: FrameGenerator,
},
}
impl RunnerState {
/// Check if currently in idle mode
pub fn is_idle(&self) -> bool {
matches!(self, RunnerState::Idle { .. })
}
/// Get idle duration, returns None if not in idle mode
pub fn idle_duration(&self) -> Option<Duration> {
match self {
RunnerState::Idle { start_time, .. } => Some(start_time.elapsed()),
RunnerState::Normal => None,
}
}
}
/// Pipeline runner is the main entry process for stream transcoding
///
/// Each client connection spawns a new [PipelineRunner] and it should be run in its own thread
@ -100,6 +121,12 @@ pub struct PipelineRunner {
/// Maximum consecutive failures before triggering circuit breaker
max_consecutive_failures: u32,
/// Last video PTS for continuity in idle mode
last_video_pts: i64,
/// Last audio PTS for continuity in idle mode
last_audio_pts: i64,
}
unsafe impl Send for PipelineRunner {}
@ -132,7 +159,9 @@ impl PipelineRunner {
thumb_interval: 1800,
state: RunnerState::Normal,
consecutive_decode_failures: 0,
max_consecutive_failures: 50,
max_consecutive_failures: DEFAULT_MAX_CONSECUTIVE_FAILURES,
last_video_pts: 0,
last_audio_pts: 0,
})
}
@ -140,6 +169,10 @@ impl PipelineRunner {
self.demuxer.set_buffer_size(buffer_size);
}
pub fn set_demuxer_format(&mut self, format: &str) {
self.demuxer.set_format(format);
}
/// Save image to disk
unsafe fn save_thumb(frame: *mut AVFrame, dst_pic: &Path) -> Result<()> {
let mut free_frame = false;
@ -200,56 +233,118 @@ impl PipelineRunner {
/// Switch to idle mode with placeholder content generation
unsafe fn switch_to_idle_mode(&mut self, config: &PipelineConfig) -> Result<()> {
let src_video_stream = config
.ingress_info
.streams
.iter()
.find(|s| s.index == config.video_src)
.unwrap();
let src_audio_stream = config
.ingress_info
.streams
.iter()
.find(|s| Some(s.index) == config.audio_src);
if self.state.is_idle() {
return Ok(()); // Already in idle mode
}
let gen = FrameGenerator::from_stream(src_video_stream, src_audio_stream)?;
// Get streams directly from demuxer for correct timebase and properties
let video_stream = self.demuxer.get_stream(config.video_src)?;
let audio_stream = if let Some(audio_src) = config.audio_src {
Some(self.demuxer.get_stream(audio_src)?)
} else {
None
};
let mut gen = FrameGenerator::from_av_streams(
video_stream as *const _,
audio_stream.map(|s| s as *const _),
)?;
// Set starting PTS to continue from last frame
gen.set_starting_pts(self.last_video_pts, self.last_audio_pts);
self.state = RunnerState::Idle {
start_time: Instant::now(),
last_frame_time: None,
gen,
};
self.consecutive_decode_failures = 0; // Reset counter when entering idle mode
info!("Switched to idle mode - generating placeholder content");
Ok(())
}
/// Check if circuit breaker should trigger due to consecutive failures
fn should_trigger_circuit_breaker(&self) -> bool {
self.consecutive_decode_failures >= self.max_consecutive_failures
}
/// Handle decode failure with circuit breaker logic
unsafe fn handle_decode_failure(
&mut self,
config: &PipelineConfig,
) -> Result<Vec<EgressResult>> {
// Check if we've hit the circuit breaker threshold
if self.consecutive_decode_failures >= self.max_consecutive_failures {
if self.should_trigger_circuit_breaker() {
error!(
"Circuit breaker triggered: {} consecutive decode failures exceeded threshold of {}. Switching to idle mode.",
self.consecutive_decode_failures, self.max_consecutive_failures
);
// Switch to idle mode to continue stream with placeholder content
match self.switch_to_idle_mode(config) {
Ok(()) => {
self.consecutive_decode_failures = 0; // Reset counter
info!("Switched to idle mode due to excessive decode failures");
}
Err(e) => {
error!("Failed to switch to idle mode: {}", e);
bail!("Circuit breaker triggered and unable to switch to idle mode");
}
}
self.switch_to_idle_mode(config)
.context("Circuit breaker triggered but unable to switch to idle mode")?;
}
// Return empty result to skip this packet
Ok(vec![])
}
/// Process frame in normal mode (live stream)
unsafe fn process_normal_mode(&mut self, config: &PipelineConfig) -> Result<Vec<EgressResult>> {
let (mut pkt, _stream) = self.demuxer.get_packet()?;
if pkt.is_null() {
warn!("Demuxer get_packet failed, entering idle mode");
self.switch_to_idle_mode(config)
.context("Failed to switch to idle mode after demuxer failure")?;
Ok(vec![])
} else {
let res = self.process_packet(pkt)?;
av_packet_free(&mut pkt);
Ok(res)
}
}
/// Process frame in idle mode (placeholder content)
unsafe fn process_idle_mode(&mut self, config: &PipelineConfig) -> Result<Vec<EgressResult>> {
// Check if idle timeout has been reached
if let Some(duration) = self.state.idle_duration() {
if duration > Duration::from_secs(IDLE_TIMEOUT_SECS) {
info!(
"Idle timeout reached ({} seconds), ending stream",
IDLE_TIMEOUT_SECS
);
return Err(anyhow!("Idle timeout reached"));
}
}
// Generate next frame from idle mode generator
if let RunnerState::Idle {
gen, start_time, ..
} = &mut self.state
{
gen.begin()?;
gen.fill_color([0, 0, 0, 255])?;
let message = format!(
"Stream Offline - {} seconds",
start_time.elapsed().as_secs()
);
gen.write_text(&message, 48.0, 50.0, 50.0)?;
gen.write_text("Please reconnect to resume streaming", 24.0, 50.0, 120.0)?;
let frame = gen.next()?;
let stream = if (*frame).sample_rate > 0 {
// Audio frame
config
.audio_src
.context("got audio frame with no audio src?")?
} else {
// Video frame
config.video_src
};
self.process_frame(config, stream, frame)
} else {
bail!("process_idle_mode called but not in idle state")
}
}
unsafe fn process_packet(&mut self, packet: *mut AVPacket) -> Result<Vec<EgressResult>> {
let config = if let Some(config) = &self.config {
config.clone()
@ -386,11 +481,15 @@ impl PipelineRunner {
}
}
// count frame as processed
// Track last PTS values for continuity in idle mode
if stream_index == config.video_src {
self.last_video_pts = (*frame).pts + (*frame).duration;
self.generate_thumb_from_frame(frame)?;
self.frame_ctr += 1;
} else if Some(stream_index) == config.audio_src {
self.last_audio_pts = (*frame).pts + (*frame).duration;
}
av_frame_free(&mut frame);
Ok(egress_results)
}
@ -485,66 +584,15 @@ impl PipelineRunner {
};
// run transcoder pipeline
let (mut pkt, _) = match &self.state {
RunnerState::Normal => {
match self.demuxer.get_packet() {
Ok(pkt) => pkt,
Err(e) => {
warn!("Demuxer get_packet failed: {}, entering idle mode", e);
// Switch to idle mode when demuxer fails
match self.switch_to_idle_mode(&config) {
Ok(()) => (ptr::null_mut(), ptr::null_mut()),
Err(switch_err) => {
error!("Failed to switch to idle mode: {}", switch_err);
return Err(e.into());
}
}
}
}
}
RunnerState::Idle { .. } => {
// return empty when idle - skip demuxer completely
(ptr::null_mut(), ptr::null_mut())
}
let results = match &mut self.state {
RunnerState::Normal => self.process_normal_mode(&config)?,
RunnerState::Idle { .. } => self.process_idle_mode(&config)?,
};
// Handle state transitions based on packet availability
match (&self.state, pkt.is_null()) {
(RunnerState::Normal, true) => {
info!("Stream input disconnected, entering idle mode");
self.switch_to_idle_mode(&config)?;
}
(RunnerState::Idle { start_time, .. }, true) => {
// Check if we've been idle for more than 1 minute
if start_time.elapsed() > Duration::from_secs(60) {
info!("Idle timeout reached (60 seconds), ending stream");
return Ok(false);
}
}
_ => {}
}
// Process based on current state
let result = match &mut self.state {
RunnerState::Idle { gen, .. } => {
let frame = gen.next()?;
let stream = if (*frame).sample_rate > 0 {
config
.audio_src
.context("got audio frame with no audio src?")?
} else {
config.video_src
};
self.process_frame(&config, stream, frame)?
}
RunnerState::Normal => self.process_packet(pkt)?,
};
av_packet_free(&mut pkt);
// egress results - process async operations without blocking if possible
if !result.is_empty() {
if !results.is_empty() {
self.handle.block_on(async {
for er in result {
for er in results {
if let EgressResult::Segments { created, deleted } = er {
if let Err(e) = self
.overseer