diff --git a/crates/core/src/egress/hls.rs b/crates/core/src/egress/hls.rs index 0d0083a..1fc08ae 100644 --- a/crates/core/src/egress/hls.rs +++ b/crates/core/src/egress/hls.rs @@ -1,10 +1,9 @@ use anyhow::Result; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket; -use ffmpeg_rs_raw::Encoder; use std::path::PathBuf; use uuid::Uuid; -use crate::egress::{Egress, EgressResult}; +use crate::egress::{Egress, EgressResult, EncoderOrSourceStream}; use crate::mux::{HlsMuxer, SegmentType}; use crate::variant::VariantStream; @@ -18,7 +17,7 @@ impl HlsEgress { pub fn new<'a>( out_dir: PathBuf, - encoders: impl Iterator, + encoders: impl Iterator)>, segment_type: SegmentType, ) -> Result { Ok(Self { diff --git a/crates/core/src/egress/mod.rs b/crates/core/src/egress/mod.rs index f95b56d..e3925e1 100644 --- a/crates/core/src/egress/mod.rs +++ b/crates/core/src/egress/mod.rs @@ -1,5 +1,6 @@ use anyhow::Result; -use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::{AVPacket, AVStream}; +use ffmpeg_rs_raw::Encoder; use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::path::PathBuf; @@ -44,3 +45,8 @@ pub struct EgressSegment { /// Path on disk to the segment file pub path: PathBuf, } + +pub enum EncoderOrSourceStream<'a> { + Encoder(&'a Encoder), + SourceStream(*mut AVStream), +} diff --git a/crates/core/src/egress/recorder.rs b/crates/core/src/egress/recorder.rs index d84b3af..cb886ea 100644 --- a/crates/core/src/egress/recorder.rs +++ b/crates/core/src/egress/recorder.rs @@ -1,13 +1,11 @@ use anyhow::Result; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket; -use ffmpeg_rs_raw::{Encoder, Muxer}; -use log::info; +use ffmpeg_rs_raw::Muxer; use std::collections::HashMap; -use std::fs; use std::path::PathBuf; use uuid::Uuid; -use crate::egress::{Egress, EgressResult}; +use crate::egress::{Egress, EgressResult, EncoderOrSourceStream}; use crate::variant::{StreamMapping, VariantStream}; pub struct RecorderEgress { @@ -22,7 +20,7 @@ impl RecorderEgress { pub fn new<'a>( out_dir: PathBuf, - variants: impl Iterator, + variants: impl Iterator)>, ) -> Result { let out_file = out_dir.join(Self::FILENAME); let mut var_map = HashMap::new(); @@ -31,8 +29,16 @@ impl RecorderEgress { .with_output_path(out_file.to_str().unwrap(), None)? .build()?; for (var, enc) in variants { - let stream = m.add_stream_encoder(enc)?; - var_map.insert(var.id(), (*stream).index); + match enc { + EncoderOrSourceStream::Encoder(enc) => { + let stream = m.add_stream_encoder(enc)?; + var_map.insert(var.id(), (*stream).index); + } + EncoderOrSourceStream::SourceStream(stream) => { + let stream = m.add_copy_stream(stream)?; + var_map.insert(var.id(), (*stream).index); + } + } } let mut options = HashMap::new(); options.insert("movflags".to_string(), "faststart".to_string()); diff --git a/crates/core/src/mux/hls/mod.rs b/crates/core/src/mux/hls/mod.rs index 96c5b0d..da1390b 100644 --- a/crates/core/src/mux/hls/mod.rs +++ b/crates/core/src/mux/hls/mod.rs @@ -1,4 +1,4 @@ -use crate::egress::EgressResult; +use crate::egress::{EgressResult, EncoderOrSourceStream}; use crate::mux::hls::variant::HlsVariant; use crate::variant::{StreamMapping, VariantStream}; use anyhow::Result; @@ -8,7 +8,9 @@ use itertools::Itertools; use log::{trace, warn}; use std::fmt::Display; use std::fs::{remove_dir_all, File}; +use std::ops::Sub; use std::path::PathBuf; +use tokio::time::Instant; use uuid::Uuid; mod segment; @@ -69,14 +71,18 @@ pub enum SegmentType { pub struct HlsMuxer { pub out_dir: PathBuf, pub variants: Vec, + + last_master_write: Instant, } impl HlsMuxer { - const MASTER_PLAYLIST: &'static str = "live.m3u8"; + pub const MASTER_PLAYLIST: &'static str = "live.m3u8"; + + const MASTER_WRITE_INTERVAL: f32 = 60.0; pub fn new<'a>( out_dir: PathBuf, - encoders: impl Iterator, + encoders: impl Iterator)>, segment_type: SegmentType, ) -> Result { if !out_dir.exists() { @@ -91,15 +97,16 @@ impl HlsMuxer { vars.push(var); } - let ret = Self { + let mut ret = Self { out_dir, variants: vars, + last_master_write: Instant::now(), }; ret.write_master_playlist()?; Ok(ret) } - fn write_master_playlist(&self) -> Result<()> { + fn write_master_playlist(&mut self) -> Result<()> { let mut pl = m3u8_rs::MasterPlaylist::default(); pl.version = Some(3); pl.variants = self @@ -110,6 +117,7 @@ impl HlsMuxer { let mut f_out = File::create(self.out_dir.join(Self::MASTER_PLAYLIST))?; pl.write_to(&mut f_out)?; + self.last_master_write = Instant::now(); Ok(()) } @@ -119,6 +127,9 @@ impl HlsMuxer { pkt: *mut AVPacket, variant: &Uuid, ) -> Result { + if Instant::now().sub(self.last_master_write).as_secs_f32() > Self::MASTER_WRITE_INTERVAL { + self.write_master_playlist()?; + } for var in self.variants.iter_mut() { if let Some(vs) = var.streams.iter().find(|s| s.id() == variant) { // very important for muxer to know which stream this pkt belongs to @@ -140,7 +151,11 @@ impl HlsMuxer { impl Drop for HlsMuxer { fn drop(&mut self) { if let Err(e) = remove_dir_all(&self.out_dir) { - warn!("Failed to clean up hls dir: {} {}", self.out_dir.display(), e); + warn!( + "Failed to clean up hls dir: {} {}", + self.out_dir.display(), + e + ); } } } diff --git a/crates/core/src/mux/hls/variant.rs b/crates/core/src/mux/hls/variant.rs index 05965b6..1b7fa26 100644 --- a/crates/core/src/mux/hls/variant.rs +++ b/crates/core/src/mux/hls/variant.rs @@ -1,4 +1,4 @@ -use crate::egress::{EgressResult, EgressSegment}; +use crate::egress::{EgressResult, EgressSegment, EncoderOrSourceStream}; use crate::mux::hls::segment::{HlsSegment, PartialSegmentInfo, SegmentInfo}; use crate::mux::{HlsVariantStream, SegmentType}; use crate::variant::{StreamMapping, VariantStream}; @@ -6,14 +6,15 @@ use anyhow::{bail, ensure, Result}; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_free, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, avio_size, AVPacket, - AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY, + av_free, av_get_bits_per_pixel, av_pix_fmt_desc_get, av_q2d, av_write_frame, avio_close, + avio_flush, avio_open, avio_size, AVPacket, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY, }; use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; use log::{debug, info, trace, warn}; use m3u8_rs::{ExtTag, MediaSegmentType, PartInf, PreloadHint}; use std::collections::HashMap; use std::fs::{create_dir_all, File}; +use std::mem::transmute; use std::path::PathBuf; use std::ptr; @@ -60,7 +61,7 @@ impl HlsVariant { pub fn new<'a>( out_dir: PathBuf, group: usize, - encoded_vars: impl Iterator, + encoded_vars: impl Iterator)>, segment_type: SegmentType, ) -> Result { let name = format!("stream_{}", group); @@ -87,44 +88,71 @@ impl HlsVariant { let mut segment_length = 1.0; for (var, enc) in encoded_vars { - match var { - VariantStream::Video(v) => unsafe { - let stream = mux.add_stream_encoder(enc)?; - let stream_idx = (*stream).index as usize; - streams.push(HlsVariantStream::Video { - group, - index: stream_idx, - id: v.id(), - }); - has_video = true; - // Always use video stream as reference for segmentation - ref_stream_index = stream_idx as _; - let sg = v.keyframe_interval as f32 / v.fps; - if sg > segment_length { - segment_length = sg; - } - }, - VariantStream::Audio(a) => unsafe { - let stream = mux.add_stream_encoder(enc)?; - let stream_idx = (*stream).index as usize; - streams.push(HlsVariantStream::Audio { - group, - index: stream_idx, - id: a.id(), - }); - if !has_video && ref_stream_index == -1 { + match enc { + EncoderOrSourceStream::Encoder(enc) => match var { + VariantStream::Video(v) => unsafe { + let stream = mux.add_stream_encoder(enc)?; + let stream_idx = (*stream).index as usize; + streams.push(HlsVariantStream::Video { + group, + index: stream_idx, + id: v.id(), + }); + has_video = true; ref_stream_index = stream_idx as _; - } + let sg = v.keyframe_interval as f32 / v.fps; + if sg > segment_length { + segment_length = sg; + } + }, + VariantStream::Audio(a) => unsafe { + let stream = mux.add_stream_encoder(enc)?; + let stream_idx = (*stream).index as usize; + streams.push(HlsVariantStream::Audio { + group, + index: stream_idx, + id: a.id(), + }); + if !has_video && ref_stream_index == -1 { + ref_stream_index = stream_idx as _; + } + }, + VariantStream::Subtitle(s) => unsafe { + let stream = mux.add_stream_encoder(enc)?; + streams.push(HlsVariantStream::Subtitle { + group, + index: (*stream).index as usize, + id: s.id(), + }) + }, + _ => bail!("unsupported variant stream"), }, - VariantStream::Subtitle(s) => unsafe { - let stream = mux.add_stream_encoder(enc)?; - streams.push(HlsVariantStream::Subtitle { - group, - index: (*stream).index as usize, - id: s.id(), - }) + EncoderOrSourceStream::SourceStream(stream) => match var { + VariantStream::CopyVideo(v) => unsafe { + let stream = mux.add_copy_stream(stream)?; + let stream_idx = (*stream).index as usize; + streams.push(HlsVariantStream::Video { + group, + index: stream_idx, + id: v.id(), + }); + has_video = true; + ref_stream_index = stream_idx as _; + }, + VariantStream::CopyAudio(a) => unsafe { + let stream = mux.add_copy_stream(stream)?; + let stream_idx = (*stream).index as usize; + streams.push(HlsVariantStream::Audio { + group, + index: stream_idx, + id: a.id(), + }); + if !has_video && ref_stream_index == -1 { + ref_stream_index = stream_idx as _; + } + }, + _ => bail!("unsupported variant stream"), }, - _ => bail!("unsupported variant stream"), } } ensure!( @@ -597,17 +625,29 @@ impl HlsVariant { let pes = self.video_stream().unwrap_or(self.streams.first().unwrap()); let av_stream = *(*self.mux.context()).streams.add(*pes.index()); let codec_par = (*av_stream).codecpar; + let bitrate = (*codec_par).bit_rate as u64; + let fps = av_q2d((*codec_par).framerate); m3u8_rs::VariantStream { is_i_frame: false, uri: format!("{}/live.m3u8", self.name), - bandwidth: (*codec_par).bit_rate as u64, + bandwidth: if bitrate == 0 { + // make up bitrate when unknown (copy streams) + // this is the bitrate as a raw decoded stream, it's not accurate at all + // It only serves the purpose of ordering the copy streams as having the highest bitrate + let pix_desc = av_pix_fmt_desc_get(transmute((*codec_par).format)); + (*codec_par).width as u64 + * (*codec_par).height as u64 + * av_get_bits_per_pixel(pix_desc) as u64 + } else { + bitrate + }, average_bandwidth: None, codecs: self.to_codec_attr(), resolution: Some(m3u8_rs::Resolution { width: (*codec_par).width as _, height: (*codec_par).height as _, }), - frame_rate: Some(av_q2d((*codec_par).framerate)), + frame_rate: if fps > 0.0 { Some(fps) } else { None }, hdcp_level: None, audio: None, video: None, diff --git a/crates/core/src/pipeline/runner.rs b/crates/core/src/pipeline/runner.rs index 9c3dad5..d159307 100644 --- a/crates/core/src/pipeline/runner.rs +++ b/crates/core/src/pipeline/runner.rs @@ -10,7 +10,7 @@ use std::time::{Duration, Instant}; use crate::egress::hls::HlsEgress; use crate::egress::recorder::RecorderEgress; -use crate::egress::{Egress, EgressResult}; +use crate::egress::{Egress, EgressResult, EncoderOrSourceStream}; use crate::generator::FrameGenerator; use crate::ingress::ConnectionInfo; use crate::mux::SegmentType; @@ -101,9 +101,6 @@ pub struct PipelineRunner { /// Encoder for a variant (variant_id, Encoder) encoders: HashMap, - /// Simple mapping to copy streams - copy_stream: HashMap, - /// All configured egress' egress: Vec>, @@ -164,7 +161,6 @@ impl PipelineRunner { scalers: Default::default(), resampler: Default::default(), encoders: Default::default(), - copy_stream: Default::default(), fps_counter_start: Instant::now(), egress: Vec::new(), frame_ctr: 0, @@ -366,51 +362,65 @@ impl PipelineRunner { // Process all packets (original or converted) let mut egress_results = vec![]; - // TODO: For copy streams, skip decoder - let frames = match self.decoder.decode_pkt(packet) { - Ok(f) => { - // Reset failure counter on successful decode - self.consecutive_decode_failures = 0; - f - } - Err(e) => { - self.consecutive_decode_failures += 1; - - // Enhanced error logging with context - let packet_info = if !packet.is_null() { - format!( - "stream_idx={}, size={}, pts={}, dts={}", - (*packet).stream_index, - (*packet).size, - (*packet).pts, - (*packet).dts - ) - } else { - "null packet".to_string() - }; - - warn!( - "Error decoding packet ({}): {}. Consecutive failures: {}/{}. Skipping packet.", - packet_info, e, self.consecutive_decode_failures, self.max_consecutive_failures - ); - - return self.handle_decode_failure(&config); - } - }; - - for (frame, stream_idx) in frames { - let stream = self.demuxer.get_stream(stream_idx as usize)?; - // Adjust frame pts time without start_offset - // Egress streams don't have a start time offset - if !stream.is_null() { - if (*stream).start_time != AV_NOPTS_VALUE { - (*frame).pts -= (*stream).start_time; + // only process via decoder if there is more than 1 encoder + if !self.encoders.is_empty() { + let frames = match self.decoder.decode_pkt(packet) { + Ok(f) => { + // Reset failure counter on successful decode + self.consecutive_decode_failures = 0; + f } - (*frame).time_base = (*stream).time_base; - } + Err(e) => { + self.consecutive_decode_failures += 1; - let results = self.process_frame(&config, stream_idx as usize, frame)?; - egress_results.extend(results); + // Enhanced error logging with context + let packet_info = if !packet.is_null() { + format!( + "stream_idx={}, size={}, pts={}, dts={}", + (*packet).stream_index, + (*packet).size, + (*packet).pts, + (*packet).dts + ) + } else { + "null packet".to_string() + }; + + warn!( + "Error decoding packet ({}): {}. Consecutive failures: {}/{}. Skipping packet.", + packet_info, e, self.consecutive_decode_failures, self.max_consecutive_failures + ); + + return self.handle_decode_failure(&config); + } + }; + + for (frame, stream_idx) in frames { + let stream = self.demuxer.get_stream(stream_idx as usize)?; + // Adjust frame pts time without start_offset + // Egress streams don't have a start time offset + if !stream.is_null() { + if (*stream).start_time != AV_NOPTS_VALUE { + (*frame).pts -= (*stream).start_time; + } + (*frame).time_base = (*stream).time_base; + } + + let results = self.process_frame(&config, stream_idx as usize, frame)?; + egress_results.extend(results); + } + } + + // egress (mux) copy variants + for var in config.variants { + match var { + VariantStream::CopyVideo(v) | VariantStream::CopyAudio(v) + if v.src_index == (*packet).stream_index as _ => + { + egress_results.extend(Self::egress_packet(&mut self.egress, packet, &v.id())?); + } + _ => {} + } } Ok(egress_results) @@ -436,7 +446,6 @@ impl PipelineRunner { let enc = if let Some(enc) = self.encoders.get_mut(&var.id()) { enc } else { - warn!("Frame had nowhere to go in {} :/", var.id()); continue; }; @@ -512,7 +521,6 @@ impl PipelineRunner { encoder: &mut Encoder, frame: *mut AVFrame, ) -> Result> { - let mut ret = vec![]; // before encoding frame, rescale timestamps if !frame.is_null() { let enc_ctx = encoder.codec_context(); @@ -526,16 +534,25 @@ impl PipelineRunner { } let packets = encoder.encode_frame(frame)?; - // pass new packets to egress - for mut pkt in packets { - for eg in egress.iter_mut() { - let pkt_clone = av_packet_clone(pkt); - let er = eg.process_pkt(pkt_clone, &var.id())?; - ret.push(er); - } - av_packet_free(&mut pkt); + let mut ret = vec![]; + for pkt in packets { + ret.extend(Self::egress_packet(egress, pkt, &var.id())?); } + Ok(ret) + } + unsafe fn egress_packet( + egress: &mut Vec>, + mut pkt: *mut AVPacket, + variant: &Uuid, + ) -> Result> { + let mut ret = vec![]; + for eg in egress.iter_mut() { + let mut pkt_clone = av_packet_clone(pkt); + let er = eg.process_pkt(pkt_clone, variant)?; + av_packet_free(&mut pkt_clone); + ret.push(er); + } Ok(ret) } @@ -714,26 +731,33 @@ impl PipelineRunner { } } - // TODO: Setup copy streams - // Setup egress for e in &cfg.egress { let c = e.config(); - let encoders = self.encoders.iter().filter_map(|(k, v)| { - if c.variants.contains(k) { - let var = cfg.variants.iter().find(|x| x.id() == *k)?; - Some((var, v)) + let vars = c + .variants + .iter() + .map_while(|x| cfg.variants.iter().find(|z| z.id() == *x)); + let variant_mapping = vars.map_while(|v| { + if let Some(e) = self.encoders.get(&v.id()) { + Some((v, EncoderOrSourceStream::Encoder(e))) } else { - None + Some(( + v, + EncoderOrSourceStream::SourceStream(unsafe { + self.demuxer.get_stream(v.src_index()).ok()? + }), + )) } }); match e { EgressType::HLS(_) => { - let hls = HlsEgress::new(self.out_dir.clone(), encoders, SegmentType::MPEGTS)?; + let hls = + HlsEgress::new(self.out_dir.clone(), variant_mapping, SegmentType::MPEGTS)?; self.egress.push(Box::new(hls)); } EgressType::Recorder(_) => { - let rec = RecorderEgress::new(self.out_dir.clone(), encoders)?; + let rec = RecorderEgress::new(self.out_dir.clone(), variant_mapping)?; self.egress.push(Box::new(rec)); } _ => warn!("{} is not implemented", e), @@ -756,7 +780,6 @@ impl Drop for PipelineRunner { self.encoders.clear(); self.scalers.clear(); self.resampler.clear(); - self.copy_stream.clear(); self.egress.clear(); info!(