feat: remux copy streams
Some checks failed
continuous-integration/drone/push Build is failing

closes #8
This commit is contained in:
2025-06-20 11:45:19 +01:00
parent e6bddcf641
commit add82b6933
6 changed files with 215 additions and 126 deletions

View File

@ -1,10 +1,9 @@
use anyhow::Result;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket;
use ffmpeg_rs_raw::Encoder;
use std::path::PathBuf;
use uuid::Uuid;
use crate::egress::{Egress, EgressResult};
use crate::egress::{Egress, EgressResult, EncoderOrSourceStream};
use crate::mux::{HlsMuxer, SegmentType};
use crate::variant::VariantStream;
@ -18,7 +17,7 @@ impl HlsEgress {
pub fn new<'a>(
out_dir: PathBuf,
encoders: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
encoders: impl Iterator<Item = (&'a VariantStream, EncoderOrSourceStream<'a>)>,
segment_type: SegmentType,
) -> Result<Self> {
Ok(Self {

View File

@ -1,5 +1,6 @@
use anyhow::Result;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{AVPacket, AVStream};
use ffmpeg_rs_raw::Encoder;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::path::PathBuf;
@ -44,3 +45,8 @@ pub struct EgressSegment {
/// Path on disk to the segment file
pub path: PathBuf,
}
pub enum EncoderOrSourceStream<'a> {
Encoder(&'a Encoder),
SourceStream(*mut AVStream),
}

View File

@ -1,13 +1,11 @@
use anyhow::Result;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket;
use ffmpeg_rs_raw::{Encoder, Muxer};
use log::info;
use ffmpeg_rs_raw::Muxer;
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use uuid::Uuid;
use crate::egress::{Egress, EgressResult};
use crate::egress::{Egress, EgressResult, EncoderOrSourceStream};
use crate::variant::{StreamMapping, VariantStream};
pub struct RecorderEgress {
@ -22,7 +20,7 @@ impl RecorderEgress {
pub fn new<'a>(
out_dir: PathBuf,
variants: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
variants: impl Iterator<Item = (&'a VariantStream, EncoderOrSourceStream<'a>)>,
) -> Result<Self> {
let out_file = out_dir.join(Self::FILENAME);
let mut var_map = HashMap::new();
@ -31,8 +29,16 @@ impl RecorderEgress {
.with_output_path(out_file.to_str().unwrap(), None)?
.build()?;
for (var, enc) in variants {
let stream = m.add_stream_encoder(enc)?;
var_map.insert(var.id(), (*stream).index);
match enc {
EncoderOrSourceStream::Encoder(enc) => {
let stream = m.add_stream_encoder(enc)?;
var_map.insert(var.id(), (*stream).index);
}
EncoderOrSourceStream::SourceStream(stream) => {
let stream = m.add_copy_stream(stream)?;
var_map.insert(var.id(), (*stream).index);
}
}
}
let mut options = HashMap::new();
options.insert("movflags".to_string(), "faststart".to_string());

View File

@ -1,4 +1,4 @@
use crate::egress::EgressResult;
use crate::egress::{EgressResult, EncoderOrSourceStream};
use crate::mux::hls::variant::HlsVariant;
use crate::variant::{StreamMapping, VariantStream};
use anyhow::Result;
@ -8,7 +8,9 @@ use itertools::Itertools;
use log::{trace, warn};
use std::fmt::Display;
use std::fs::{remove_dir_all, File};
use std::ops::Sub;
use std::path::PathBuf;
use tokio::time::Instant;
use uuid::Uuid;
mod segment;
@ -69,14 +71,18 @@ pub enum SegmentType {
pub struct HlsMuxer {
pub out_dir: PathBuf,
pub variants: Vec<HlsVariant>,
last_master_write: Instant,
}
impl HlsMuxer {
const MASTER_PLAYLIST: &'static str = "live.m3u8";
pub const MASTER_PLAYLIST: &'static str = "live.m3u8";
const MASTER_WRITE_INTERVAL: f32 = 60.0;
pub fn new<'a>(
out_dir: PathBuf,
encoders: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
encoders: impl Iterator<Item = (&'a VariantStream, EncoderOrSourceStream<'a>)>,
segment_type: SegmentType,
) -> Result<Self> {
if !out_dir.exists() {
@ -91,15 +97,16 @@ impl HlsMuxer {
vars.push(var);
}
let ret = Self {
let mut ret = Self {
out_dir,
variants: vars,
last_master_write: Instant::now(),
};
ret.write_master_playlist()?;
Ok(ret)
}
fn write_master_playlist(&self) -> Result<()> {
fn write_master_playlist(&mut self) -> Result<()> {
let mut pl = m3u8_rs::MasterPlaylist::default();
pl.version = Some(3);
pl.variants = self
@ -110,6 +117,7 @@ impl HlsMuxer {
let mut f_out = File::create(self.out_dir.join(Self::MASTER_PLAYLIST))?;
pl.write_to(&mut f_out)?;
self.last_master_write = Instant::now();
Ok(())
}
@ -119,6 +127,9 @@ impl HlsMuxer {
pkt: *mut AVPacket,
variant: &Uuid,
) -> Result<EgressResult> {
if Instant::now().sub(self.last_master_write).as_secs_f32() > Self::MASTER_WRITE_INTERVAL {
self.write_master_playlist()?;
}
for var in self.variants.iter_mut() {
if let Some(vs) = var.streams.iter().find(|s| s.id() == variant) {
// very important for muxer to know which stream this pkt belongs to
@ -140,7 +151,11 @@ impl HlsMuxer {
impl Drop for HlsMuxer {
fn drop(&mut self) {
if let Err(e) = remove_dir_all(&self.out_dir) {
warn!("Failed to clean up hls dir: {} {}", self.out_dir.display(), e);
warn!(
"Failed to clean up hls dir: {} {}",
self.out_dir.display(),
e
);
}
}
}

View File

@ -1,4 +1,4 @@
use crate::egress::{EgressResult, EgressSegment};
use crate::egress::{EgressResult, EgressSegment, EncoderOrSourceStream};
use crate::mux::hls::segment::{HlsSegment, PartialSegmentInfo, SegmentInfo};
use crate::mux::{HlsVariantStream, SegmentType};
use crate::variant::{StreamMapping, VariantStream};
@ -6,14 +6,15 @@ use anyhow::{bail, ensure, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_free, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, avio_size, AVPacket,
AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY,
av_free, av_get_bits_per_pixel, av_pix_fmt_desc_get, av_q2d, av_write_frame, avio_close,
avio_flush, avio_open, avio_size, AVPacket, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY,
};
use ffmpeg_rs_raw::{cstr, Encoder, Muxer};
use log::{debug, info, trace, warn};
use m3u8_rs::{ExtTag, MediaSegmentType, PartInf, PreloadHint};
use std::collections::HashMap;
use std::fs::{create_dir_all, File};
use std::mem::transmute;
use std::path::PathBuf;
use std::ptr;
@ -60,7 +61,7 @@ impl HlsVariant {
pub fn new<'a>(
out_dir: PathBuf,
group: usize,
encoded_vars: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
encoded_vars: impl Iterator<Item = (&'a VariantStream, EncoderOrSourceStream<'a>)>,
segment_type: SegmentType,
) -> Result<Self> {
let name = format!("stream_{}", group);
@ -87,44 +88,71 @@ impl HlsVariant {
let mut segment_length = 1.0;
for (var, enc) in encoded_vars {
match var {
VariantStream::Video(v) => unsafe {
let stream = mux.add_stream_encoder(enc)?;
let stream_idx = (*stream).index as usize;
streams.push(HlsVariantStream::Video {
group,
index: stream_idx,
id: v.id(),
});
has_video = true;
// Always use video stream as reference for segmentation
ref_stream_index = stream_idx as _;
let sg = v.keyframe_interval as f32 / v.fps;
if sg > segment_length {
segment_length = sg;
}
},
VariantStream::Audio(a) => unsafe {
let stream = mux.add_stream_encoder(enc)?;
let stream_idx = (*stream).index as usize;
streams.push(HlsVariantStream::Audio {
group,
index: stream_idx,
id: a.id(),
});
if !has_video && ref_stream_index == -1 {
match enc {
EncoderOrSourceStream::Encoder(enc) => match var {
VariantStream::Video(v) => unsafe {
let stream = mux.add_stream_encoder(enc)?;
let stream_idx = (*stream).index as usize;
streams.push(HlsVariantStream::Video {
group,
index: stream_idx,
id: v.id(),
});
has_video = true;
ref_stream_index = stream_idx as _;
}
let sg = v.keyframe_interval as f32 / v.fps;
if sg > segment_length {
segment_length = sg;
}
},
VariantStream::Audio(a) => unsafe {
let stream = mux.add_stream_encoder(enc)?;
let stream_idx = (*stream).index as usize;
streams.push(HlsVariantStream::Audio {
group,
index: stream_idx,
id: a.id(),
});
if !has_video && ref_stream_index == -1 {
ref_stream_index = stream_idx as _;
}
},
VariantStream::Subtitle(s) => unsafe {
let stream = mux.add_stream_encoder(enc)?;
streams.push(HlsVariantStream::Subtitle {
group,
index: (*stream).index as usize,
id: s.id(),
})
},
_ => bail!("unsupported variant stream"),
},
VariantStream::Subtitle(s) => unsafe {
let stream = mux.add_stream_encoder(enc)?;
streams.push(HlsVariantStream::Subtitle {
group,
index: (*stream).index as usize,
id: s.id(),
})
EncoderOrSourceStream::SourceStream(stream) => match var {
VariantStream::CopyVideo(v) => unsafe {
let stream = mux.add_copy_stream(stream)?;
let stream_idx = (*stream).index as usize;
streams.push(HlsVariantStream::Video {
group,
index: stream_idx,
id: v.id(),
});
has_video = true;
ref_stream_index = stream_idx as _;
},
VariantStream::CopyAudio(a) => unsafe {
let stream = mux.add_copy_stream(stream)?;
let stream_idx = (*stream).index as usize;
streams.push(HlsVariantStream::Audio {
group,
index: stream_idx,
id: a.id(),
});
if !has_video && ref_stream_index == -1 {
ref_stream_index = stream_idx as _;
}
},
_ => bail!("unsupported variant stream"),
},
_ => bail!("unsupported variant stream"),
}
}
ensure!(
@ -597,17 +625,29 @@ impl HlsVariant {
let pes = self.video_stream().unwrap_or(self.streams.first().unwrap());
let av_stream = *(*self.mux.context()).streams.add(*pes.index());
let codec_par = (*av_stream).codecpar;
let bitrate = (*codec_par).bit_rate as u64;
let fps = av_q2d((*codec_par).framerate);
m3u8_rs::VariantStream {
is_i_frame: false,
uri: format!("{}/live.m3u8", self.name),
bandwidth: (*codec_par).bit_rate as u64,
bandwidth: if bitrate == 0 {
// make up bitrate when unknown (copy streams)
// this is the bitrate as a raw decoded stream, it's not accurate at all
// It only serves the purpose of ordering the copy streams as having the highest bitrate
let pix_desc = av_pix_fmt_desc_get(transmute((*codec_par).format));
(*codec_par).width as u64
* (*codec_par).height as u64
* av_get_bits_per_pixel(pix_desc) as u64
} else {
bitrate
},
average_bandwidth: None,
codecs: self.to_codec_attr(),
resolution: Some(m3u8_rs::Resolution {
width: (*codec_par).width as _,
height: (*codec_par).height as _,
}),
frame_rate: Some(av_q2d((*codec_par).framerate)),
frame_rate: if fps > 0.0 { Some(fps) } else { None },
hdcp_level: None,
audio: None,
video: None,

View File

@ -10,7 +10,7 @@ use std::time::{Duration, Instant};
use crate::egress::hls::HlsEgress;
use crate::egress::recorder::RecorderEgress;
use crate::egress::{Egress, EgressResult};
use crate::egress::{Egress, EgressResult, EncoderOrSourceStream};
use crate::generator::FrameGenerator;
use crate::ingress::ConnectionInfo;
use crate::mux::SegmentType;
@ -101,9 +101,6 @@ pub struct PipelineRunner {
/// Encoder for a variant (variant_id, Encoder)
encoders: HashMap<Uuid, Encoder>,
/// Simple mapping to copy streams
copy_stream: HashMap<Uuid, Uuid>,
/// All configured egress'
egress: Vec<Box<dyn Egress>>,
@ -164,7 +161,6 @@ impl PipelineRunner {
scalers: Default::default(),
resampler: Default::default(),
encoders: Default::default(),
copy_stream: Default::default(),
fps_counter_start: Instant::now(),
egress: Vec::new(),
frame_ctr: 0,
@ -366,51 +362,65 @@ impl PipelineRunner {
// Process all packets (original or converted)
let mut egress_results = vec![];
// TODO: For copy streams, skip decoder
let frames = match self.decoder.decode_pkt(packet) {
Ok(f) => {
// Reset failure counter on successful decode
self.consecutive_decode_failures = 0;
f
}
Err(e) => {
self.consecutive_decode_failures += 1;
// Enhanced error logging with context
let packet_info = if !packet.is_null() {
format!(
"stream_idx={}, size={}, pts={}, dts={}",
(*packet).stream_index,
(*packet).size,
(*packet).pts,
(*packet).dts
)
} else {
"null packet".to_string()
};
warn!(
"Error decoding packet ({}): {}. Consecutive failures: {}/{}. Skipping packet.",
packet_info, e, self.consecutive_decode_failures, self.max_consecutive_failures
);
return self.handle_decode_failure(&config);
}
};
for (frame, stream_idx) in frames {
let stream = self.demuxer.get_stream(stream_idx as usize)?;
// Adjust frame pts time without start_offset
// Egress streams don't have a start time offset
if !stream.is_null() {
if (*stream).start_time != AV_NOPTS_VALUE {
(*frame).pts -= (*stream).start_time;
// only process via decoder if there is more than 1 encoder
if !self.encoders.is_empty() {
let frames = match self.decoder.decode_pkt(packet) {
Ok(f) => {
// Reset failure counter on successful decode
self.consecutive_decode_failures = 0;
f
}
(*frame).time_base = (*stream).time_base;
}
Err(e) => {
self.consecutive_decode_failures += 1;
let results = self.process_frame(&config, stream_idx as usize, frame)?;
egress_results.extend(results);
// Enhanced error logging with context
let packet_info = if !packet.is_null() {
format!(
"stream_idx={}, size={}, pts={}, dts={}",
(*packet).stream_index,
(*packet).size,
(*packet).pts,
(*packet).dts
)
} else {
"null packet".to_string()
};
warn!(
"Error decoding packet ({}): {}. Consecutive failures: {}/{}. Skipping packet.",
packet_info, e, self.consecutive_decode_failures, self.max_consecutive_failures
);
return self.handle_decode_failure(&config);
}
};
for (frame, stream_idx) in frames {
let stream = self.demuxer.get_stream(stream_idx as usize)?;
// Adjust frame pts time without start_offset
// Egress streams don't have a start time offset
if !stream.is_null() {
if (*stream).start_time != AV_NOPTS_VALUE {
(*frame).pts -= (*stream).start_time;
}
(*frame).time_base = (*stream).time_base;
}
let results = self.process_frame(&config, stream_idx as usize, frame)?;
egress_results.extend(results);
}
}
// egress (mux) copy variants
for var in config.variants {
match var {
VariantStream::CopyVideo(v) | VariantStream::CopyAudio(v)
if v.src_index == (*packet).stream_index as _ =>
{
egress_results.extend(Self::egress_packet(&mut self.egress, packet, &v.id())?);
}
_ => {}
}
}
Ok(egress_results)
@ -436,7 +446,6 @@ impl PipelineRunner {
let enc = if let Some(enc) = self.encoders.get_mut(&var.id()) {
enc
} else {
warn!("Frame had nowhere to go in {} :/", var.id());
continue;
};
@ -512,7 +521,6 @@ impl PipelineRunner {
encoder: &mut Encoder,
frame: *mut AVFrame,
) -> Result<Vec<EgressResult>> {
let mut ret = vec![];
// before encoding frame, rescale timestamps
if !frame.is_null() {
let enc_ctx = encoder.codec_context();
@ -526,16 +534,25 @@ impl PipelineRunner {
}
let packets = encoder.encode_frame(frame)?;
// pass new packets to egress
for mut pkt in packets {
for eg in egress.iter_mut() {
let pkt_clone = av_packet_clone(pkt);
let er = eg.process_pkt(pkt_clone, &var.id())?;
ret.push(er);
}
av_packet_free(&mut pkt);
let mut ret = vec![];
for pkt in packets {
ret.extend(Self::egress_packet(egress, pkt, &var.id())?);
}
Ok(ret)
}
unsafe fn egress_packet(
egress: &mut Vec<Box<dyn Egress>>,
mut pkt: *mut AVPacket,
variant: &Uuid,
) -> Result<Vec<EgressResult>> {
let mut ret = vec![];
for eg in egress.iter_mut() {
let mut pkt_clone = av_packet_clone(pkt);
let er = eg.process_pkt(pkt_clone, variant)?;
av_packet_free(&mut pkt_clone);
ret.push(er);
}
Ok(ret)
}
@ -714,26 +731,33 @@ impl PipelineRunner {
}
}
// TODO: Setup copy streams
// Setup egress
for e in &cfg.egress {
let c = e.config();
let encoders = self.encoders.iter().filter_map(|(k, v)| {
if c.variants.contains(k) {
let var = cfg.variants.iter().find(|x| x.id() == *k)?;
Some((var, v))
let vars = c
.variants
.iter()
.map_while(|x| cfg.variants.iter().find(|z| z.id() == *x));
let variant_mapping = vars.map_while(|v| {
if let Some(e) = self.encoders.get(&v.id()) {
Some((v, EncoderOrSourceStream::Encoder(e)))
} else {
None
Some((
v,
EncoderOrSourceStream::SourceStream(unsafe {
self.demuxer.get_stream(v.src_index()).ok()?
}),
))
}
});
match e {
EgressType::HLS(_) => {
let hls = HlsEgress::new(self.out_dir.clone(), encoders, SegmentType::MPEGTS)?;
let hls =
HlsEgress::new(self.out_dir.clone(), variant_mapping, SegmentType::MPEGTS)?;
self.egress.push(Box::new(hls));
}
EgressType::Recorder(_) => {
let rec = RecorderEgress::new(self.out_dir.clone(), encoders)?;
let rec = RecorderEgress::new(self.out_dir.clone(), variant_mapping)?;
self.egress.push(Box::new(rec));
}
_ => warn!("{} is not implemented", e),
@ -756,7 +780,6 @@ impl Drop for PipelineRunner {
self.encoders.clear();
self.scalers.clear();
self.resampler.clear();
self.copy_stream.clear();
self.egress.clear();
info!(