feat: upgrade to ffmpeg-rs-raw (WIP)

This commit is contained in:
2024-11-13 11:43:28 +00:00
parent 96e4a38e09
commit 6f618ef58f
27 changed files with 712 additions and 2372 deletions

View File

@ -1,222 +1,227 @@
use std::collections::{HashMap, HashSet};
use std::io::Read;
use std::mem::transmute;
use std::ops::Sub;
use std::time::Instant;
use anyhow::Error;
use log::info;
use tokio::sync::mpsc::UnboundedReceiver;
use uuid::Uuid;
use crate::decode::Decoder;
use crate::demux::info::DemuxerInfo;
use crate::demux::Demuxer;
use crate::egress::hls::HlsEgress;
use crate::egress::recorder::RecorderEgress;
use crate::encode::audio::AudioEncoder;
use crate::encode::video::VideoEncoder;
use crate::egress::Egress;
use crate::ingress::ConnectionInfo;
use crate::pipeline::{
AVPacketSource, EgressType, PipelineConfig, PipelinePayload, PipelineProcessor,
};
use crate::scale::Scaler;
use crate::pipeline::{EgressType, PipelineConfig};
use crate::variant::{StreamMapping, VariantStream};
use crate::webhook::Webhook;
use anyhow::Result;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_get_sample_fmt, av_packet_free};
use ffmpeg_rs_raw::{
cstr, get_frame_from_hw, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler,
};
use itertools::Itertools;
use log::{info, warn};
use uuid::Uuid;
type BoxedProcessor = Box<dyn PipelineProcessor + Sync + Send>;
/// Resample/Encode
struct Transcoder {
pub variant: Uuid,
/// A resampler which can take decoded sames (Audio or Video)
pub sampler: Option<BoxedProcessor>,
/// The encoder which will encode the resampled frames
pub encoder: BoxedProcessor,
}
///
/// |----------------------------------------------------|
/// | Demuxer
/// Pipeline runner is the main entry process for stream transcoding
/// Each client connection spawns a new [PipelineRunner] and it should be run in its own thread
/// using [ingress::spawn_pipeline]
pub struct PipelineRunner {
connection: ConnectionInfo,
/// Configuration for this pipeline (variants, egress config etc.)
config: PipelineConfig,
info: ConnectionInfo,
/// Singleton demuxer for this input
demuxer: Demuxer,
decoder: Decoder,
transcoders: Vec<Transcoder>,
muxers: Vec<BoxedProcessor>,
started: Instant,
frame_no: u64,
stream_info: Option<DemuxerInfo>,
/// Singleton decoder for all stream
decoder: Decoder,
/// Scaler for a variant (variant_id, Scaler)
scalers: HashMap<Uuid, Scaler>,
/// Resampler for a variant (variant_id, Resample)
resampler: HashMap<Uuid, Resample>,
/// Encoder for a variant (variant_id, Encoder)
encoders: HashMap<Uuid, Encoder>,
/// Simple mapping to copy streams
copy_stream: HashMap<Uuid, Uuid>,
/// All configured egress'
egress: Vec<Box<dyn Egress>>,
fps_counter_start: Instant,
frame_ctr: u64,
webhook: Webhook,
info: Option<DemuxerInfo>,
}
impl PipelineRunner {
pub fn new(
config: PipelineConfig,
info: ConnectionInfo,
connection: ConnectionInfo,
webhook: Webhook,
recv: UnboundedReceiver<bytes::Bytes>,
) -> Self {
Self {
config,
info,
demuxer: Demuxer::new(recv),
recv: Box<dyn Read + Send>,
) -> Result<Self> {
Ok(Self {
connection,
config: Default::default(),
demuxer: Demuxer::new_custom_io(recv, None)?,
decoder: Decoder::new(),
transcoders: vec![],
muxers: vec![],
started: Instant::now(),
frame_no: 0,
stream_info: None,
scalers: Default::default(),
resampler: Default::default(),
encoders: Default::default(),
copy_stream: Default::default(),
fps_counter_start: Instant::now(),
egress: Vec::new(),
frame_ctr: 0,
webhook,
}
info: None,
})
}
pub fn run(&mut self) -> Result<(), Error> {
if self.stream_info.is_none() {
if let Some(cfg) = self.demuxer.try_probe()? {
self.configure_pipeline(&cfg)?;
for mux in &mut self.muxers {
mux.process(PipelinePayload::SourceInfo(cfg.clone()))?;
}
self.stream_info = Some(cfg);
} else {
return Ok(());
}
}
/// Main processor, should be called in a loop
pub unsafe fn run(&mut self) -> Result<()> {
self.setup()?;
let demux_pkg = unsafe { self.demuxer.get_packet() }?;
// run transcoder pipeline
let (mut pkt, stream) = self.demuxer.get_packet()?;
let src_index = (*stream).index;
let src_index = if let PipelinePayload::AvPacket(_, s) = &demux_pkg {
if let AVPacketSource::Demuxer(s) = s {
unsafe { (*(*s)).index }
} else {
-1
}
} else {
-1
};
let pkg_variant = self.config.variants.iter().find(|v| match v {
VariantStream::Video(vx) => vx.src_index() as i32 == src_index,
VariantStream::Audio(ax) => ax.src_index() as i32 == src_index,
_ => false,
});
let transcoded_pkgs = if let Some(var) = pkg_variant {
let frames = self.decoder.process(demux_pkg.clone())?;
if let VariantStream::Video(_) = var {
self.frame_no += frames.len() as u64;
//TODO: Account for multiple video streams in
}
// TODO: For copy streams, skip decoder
for frame in self.decoder.decode_pkt(pkt)? {
self.frame_ctr += 1;
let mut pkgs = Vec::new();
for frame in &frames {
for tran in &mut self.transcoders {
let frames = if let Some(ref mut smp) = tran.sampler {
smp.process(frame.clone())?
} else {
vec![frame.clone()]
};
// Copy frame from GPU if using hwaccel decoding
let frame = get_frame_from_hw(frame)?;
for frame in frames {
for pkg in tran.encoder.process(frame)? {
pkgs.push(pkg);
/// Get the variants which want this pkt
let pkt_vars = self
.config
.variants
.iter()
.filter(|v| v.src_index() == src_index as usize);
for var in pkt_vars {
let frame = match var {
VariantStream::Video(v) => {
if let Some(s) = self.scalers.get_mut(&v.id()) {
s.process_frame(frame, v.width, v.height, transmute(v.pixel_format))?
} else {
frame
}
}
VariantStream::Audio(a) => {
if let Some(r) = self.resampler.get_mut(&a.id()) {
r.process_frame(frame)?
} else {
frame
}
}
_ => frame,
};
let packets = if let Some(enc) = self.encoders.get_mut(&var.id()) {
enc.encode_frame(frame)?
} else {
//warn!("Frame had nowhere to go in {} :/", var.id());
continue;
};
// pass new packets to egress
for eg in self.egress.iter_mut() {
for pkt in packets.iter() {
eg.process_pkt(*pkt, &var.id())?;
}
}
}
pkgs
} else {
vec![]
};
// mux
for pkg in transcoded_pkgs {
for ref mut mux in &mut self.muxers {
mux.process(pkg.clone())?;
}
}
for ref mut mux in &mut self.muxers {
mux.process(demux_pkg.clone())?;
}
let elapsed = Instant::now().sub(self.started).as_secs_f32();
av_packet_free(&mut pkt);
let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32();
if elapsed >= 2f32 {
info!("Average fps: {:.2}", self.frame_no as f32 / elapsed);
self.started = Instant::now();
self.frame_no = 0;
info!("Average fps: {:.2}", self.frame_ctr as f32 / elapsed);
self.fps_counter_start = Instant::now();
self.frame_ctr = 0;
}
Ok(())
}
/// Setup pipeline based on the demuxer info
fn configure_pipeline(&mut self, info: &DemuxerInfo) -> Result<(), Error> {
// re-configure with demuxer info
self.config = self.webhook.start(info);
info!("Configuring pipeline {}", self.config);
if self.config.egress.iter().any(|x| match x {
EgressType::HLS(_) => true,
_ => false,
}) {
info!(
"Livestream url: http://localhost:8080/{}/live.m3u8",
self.config.id
);
unsafe fn setup(&mut self) -> Result<()> {
if self.info.is_some() {
return Ok(());
}
// configure transcoders
for var in &self.config.variants {
match var {
let info = self.demuxer.probe_input()?;
self.setup_pipeline(&info)?;
self.info = Some(info);
Ok(())
}
unsafe fn setup_pipeline(&mut self, info: &DemuxerInfo) -> Result<()> {
let cfg = self.webhook.start(info);
self.config = cfg.clone();
// src stream indexes
let inputs: HashSet<usize> = cfg.variants.iter().map(|e| e.src_index()).collect();
// enable hardware decoding
self.decoder.enable_hw_decoder_any();
// setup decoders
for input_idx in inputs {
let stream = info.streams.iter().find(|f| f.index == input_idx).unwrap();
self.decoder.setup_decoder(stream, None)?;
}
// setup scaler/encoders
for out_stream in &cfg.variants {
match out_stream {
VariantStream::Video(v) => {
let scaler = Scaler::new(v.clone());
let encoder = VideoEncoder::new(v.clone());
self.transcoders.push(Transcoder {
variant: v.id(),
sampler: Some(Box::new(scaler)),
encoder: Box::new(encoder),
});
self.encoders.insert(out_stream.id(), v.try_into()?);
self.scalers.insert(out_stream.id(), Scaler::new());
}
VariantStream::Audio(a) => {
let encoder = AudioEncoder::new(a.clone());
self.transcoders.push(Transcoder {
variant: a.id(),
sampler: None,
encoder: Box::new(encoder),
});
}
_ => {
//ignored
}
}
}
// configure muxers
for mux in &self.config.egress {
match mux {
EgressType::HLS(c) => {
let mut hls =
HlsEgress::new(Uuid::new_v4(), c.clone(), self.config.variants.clone());
hls.setup_muxer()?;
self.muxers.push(Box::new(hls));
}
EgressType::Recorder(c) => {
let recorder = RecorderEgress::new(
Uuid::new_v4(),
c.clone(),
self.config.variants.clone(),
let enc = a.try_into()?;
let rs = Resample::new(
av_get_sample_fmt(cstr!(&a.sample_fmt)),
a.sample_rate as _,
a.channels as _,
);
self.muxers.push(Box::new(recorder));
}
EgressType::RTMPForwarder(c) => {
todo!("Implement this")
self.resampler.insert(out_stream.id(), rs);
self.encoders.insert(out_stream.id(), enc);
}
_ => continue,
}
}
if self.muxers.is_empty() {
Err(Error::msg("No egress config, pipeline misconfigured!"))
} else {
Ok(())
// Setup copy streams
// Setup egress
for e in cfg.egress {
match e {
EgressType::HLS(ref c) => {
let encoders = self
.encoders
.iter()
.filter(|(k, v)| c.variants.contains(k))
.map(|(_, v)| v);
let hls = HlsEgress::new(c.clone(), encoders)?;
self.egress.push(Box::new(hls));
}
EgressType::Recorder(ref c) => {
let encoders = self
.encoders
.iter()
.filter(|(k, v)| c.variants.contains(k))
.map(|(_, v)| v);
let rec = RecorderEgress::new(c.clone(), encoders)?;
self.egress.push(Box::new(rec));
}
_ => warn!("{} is not implemented", e),
}
}
Ok(())
}
}