Refactor pipeline

This commit is contained in:
2024-09-03 14:09:30 +01:00
parent 65d8964632
commit 2c7d2dc9d1
27 changed files with 1465 additions and 1349 deletions

View File

@ -3,56 +3,69 @@ use std::time::Instant;
use anyhow::Error;
use log::info;
use tokio::sync::broadcast;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::UnboundedReceiver;
use uuid::Uuid;
use crate::decode::Decoder;
use crate::demux::info::DemuxerInfo;
use crate::demux::Demuxer;
use crate::demux::info::DemuxStreamInfo;
use crate::egress::EgressConfig;
use crate::egress::hls::HlsEgress;
use crate::egress::recorder::RecorderEgress;
use crate::encode::audio::AudioEncoder;
use crate::encode::video::VideoEncoder;
use crate::pipeline::{EgressType, PipelineConfig, PipelinePayload, PipelineProcessor};
use crate::ingress::ConnectionInfo;
use crate::pipeline::{
AVPacketSource, EgressType, PipelineConfig, PipelinePayload, PipelineProcessor,
};
use crate::scale::Scaler;
use crate::tag_frame::TagFrame;
use crate::variant::VariantStream;
use crate::variant::{StreamMapping, VariantStream};
use crate::webhook::Webhook;
struct PipelineChain {
pub first: Box<dyn PipelineProcessor + Sync + Send>,
pub second: Box<dyn PipelineProcessor + Sync + Send>,
type BoxedProcessor = Box<dyn PipelineProcessor + Sync + Send>;
/// Resample/Encode
struct Transcoder {
pub variant: Uuid,
/// A resampler which can take decoded sames (Audio or Video)
pub sampler: Option<BoxedProcessor>,
/// The encoder which will encode the resampled frames
pub encoder: BoxedProcessor,
}
///
/// |----------------------------------------------------|
/// | Demuxer
pub struct PipelineRunner {
config: PipelineConfig,
info: ConnectionInfo,
demuxer: Demuxer,
decoder: Decoder,
decoder_output: broadcast::Receiver<PipelinePayload>,
encoders: Vec<PipelineChain>,
egress: Vec<Box<dyn PipelineProcessor + Sync + Send>>,
transcoders: Vec<Transcoder>,
muxers: Vec<BoxedProcessor>,
started: Instant,
frame_no: u64,
stream_info: Option<DemuxStreamInfo>,
stream_info: Option<DemuxerInfo>,
webhook: Webhook,
}
impl PipelineRunner {
pub fn new(
config: PipelineConfig,
info: ConnectionInfo,
webhook: Webhook,
recv: UnboundedReceiver<bytes::Bytes>,
) -> Self {
let (demux_out, demux_in) = unbounded_channel();
let (dec_tx, dec_rx) = broadcast::channel::<PipelinePayload>(32);
Self {
config,
demuxer: Demuxer::new(recv, demux_out),
decoder: Decoder::new(demux_in, dec_tx),
decoder_output: dec_rx,
encoders: vec![],
egress: vec![],
info,
demuxer: Demuxer::new(recv),
decoder: Decoder::new(),
transcoders: vec![],
muxers: vec![],
started: Instant::now(),
frame_no: 0,
stream_info: None,
@ -61,21 +74,70 @@ impl PipelineRunner {
}
pub fn run(&mut self) -> Result<(), Error> {
if let Some(cfg) = self.demuxer.process()? {
self.configure_pipeline(cfg)?;
}
let frames = self.decoder.process()?;
self.frame_no += frames as u64;
// (scalar)-encoder chains
for sw in &mut self.encoders {
sw.first.process()?;
sw.second.process()?;
if self.stream_info.is_none() {
if let Some(cfg) = self.demuxer.try_probe()? {
self.configure_pipeline(&cfg)?;
for mux in &mut self.muxers {
mux.process(PipelinePayload::SourceInfo(cfg.clone()))?;
}
self.stream_info = Some(cfg);
} else {
return Ok(());
}
}
// egress outputs
for eg in &mut self.egress {
eg.process()?;
let demux_pkg = unsafe { self.demuxer.get_packet() }?;
let src_index = if let PipelinePayload::AvPacket(_, s) = &demux_pkg {
if let AVPacketSource::Demuxer(s) = s {
unsafe { (*(*s)).index }
} else {
-1
}
} else {
-1
};
let pkg_variant = self.config.variants.iter().find(|v| match v {
VariantStream::Video(vx) => vx.src_index() as i32 == src_index,
VariantStream::Audio(ax) => ax.src_index() as i32 == src_index,
_ => false,
});
let transcoded_pkgs = if let Some(var) = pkg_variant {
let frames = self.decoder.process(demux_pkg.clone())?;
if let VariantStream::Video(_) = var {
self.frame_no += frames.len() as u64;
//TODO: Account for multiple video streams in
}
let mut pkgs = Vec::new();
for frame in &frames {
for tran in &mut self.transcoders {
let frames = if let Some(ref mut smp) = tran.sampler {
smp.process(frame.clone())?
} else {
vec![frame.clone()]
};
for frame in frames {
for pkg in tran.encoder.process(frame)? {
pkgs.push(pkg);
}
}
}
}
pkgs
} else {
vec![]
};
// mux
for pkg in transcoded_pkgs {
for ref mut mux in &mut self.muxers {
mux.process(pkg.clone())?;
}
}
for ref mut mux in &mut self.muxers {
mux.process(demux_pkg.clone())?;
}
let elapsed = Instant::now().sub(self.started).as_secs_f32();
@ -87,87 +149,74 @@ impl PipelineRunner {
Ok(())
}
fn configure_pipeline(&mut self, info: DemuxStreamInfo) -> Result<(), Error> {
if self.stream_info.is_some() {
return Err(Error::msg("Pipeline already configured!"));
}
self.stream_info = Some(info.clone());
/// Setup pipeline based on the demuxer info
fn configure_pipeline(&mut self, info: &DemuxerInfo) -> Result<(), Error> {
// re-configure with demuxer info
self.config = self.webhook.configure(&info);
self.config = self.webhook.start(info);
info!("Configuring pipeline {}", self.config);
info!(
"Livestream url: http://localhost:8080/{}/live.m3u8",
self.config.id
);
if self.config.egress.iter().any(|x| match x {
EgressType::HLS(_) => true,
_ => false,
}) {
info!(
"Livestream url: http://localhost:8080/{}/live.m3u8",
self.config.id
);
}
for eg in &self.config.egress {
match eg {
EgressType::HLS(cfg) => {
let (egress_tx, egress_rx) = unbounded_channel();
self.egress.push(Box::new(HlsEgress::new(
egress_rx,
self.config.id,
cfg.clone(),
)));
for x in self.add_egress_variants(cfg, egress_tx) {
self.encoders.push(x);
}
// configure transcoders
for var in &self.config.variants {
match var {
VariantStream::Video(v) => {
let scaler = Scaler::new(v.clone());
let encoder = VideoEncoder::new(v.clone());
self.transcoders.push(Transcoder {
variant: v.id(),
sampler: Some(Box::new(scaler)),
encoder: Box::new(encoder),
});
}
EgressType::Recorder(cfg) => {
let (egress_tx, egress_rx) = unbounded_channel();
self.egress.push(Box::new(RecorderEgress::new(
egress_rx,
self.config.id,
cfg.clone(),
)));
for x in self.add_egress_variants(cfg, egress_tx) {
self.encoders.push(x);
}
VariantStream::Audio(a) => {
let encoder = AudioEncoder::new(a.clone());
self.transcoders.push(Transcoder {
variant: a.id(),
sampler: None,
encoder: Box::new(encoder),
});
}
_ => {
//ignored
}
_ => return Err(Error::msg("Egress config not supported")),
}
}
if self.egress.is_empty() {
// configure muxers
for mux in &self.config.egress {
match mux {
EgressType::HLS(c) => {
let mut hls =
HlsEgress::new(Uuid::new_v4(), c.clone(), self.config.variants.clone());
hls.setup_muxer()?;
self.muxers.push(Box::new(hls));
}
EgressType::Recorder(c) => {
let recorder = RecorderEgress::new(
Uuid::new_v4(),
c.clone(),
self.config.variants.clone(),
);
self.muxers.push(Box::new(recorder));
}
EgressType::RTMPForwarder(c) => {
todo!("Implement this")
}
}
}
if self.muxers.is_empty() {
Err(Error::msg("No egress config, pipeline misconfigured!"))
} else {
Ok(())
}
}
fn add_egress_variants(
&self,
cfg: &EgressConfig,
egress_tx: UnboundedSender<PipelinePayload>,
) -> Vec<PipelineChain> {
let mut ret = vec![];
for v in &cfg.variants {
match v {
VariantStream::Video(vs) => {
let (sw_tx, sw_rx) = unbounded_channel();
ret.push(PipelineChain {
first: Box::new(Scaler::new(
self.decoder_output.resubscribe(),
sw_tx.clone(),
vs.clone(),
)),
second: Box::new(VideoEncoder::new(sw_rx, egress_tx.clone(), vs.clone())),
});
}
VariantStream::Audio(va) => {
let (tag_tx, tag_rx) = unbounded_channel();
ret.push(PipelineChain {
first: Box::new(TagFrame::new(
v.clone(),
self.decoder_output.resubscribe(),
tag_tx,
)),
second: Box::new(AudioEncoder::new(tag_rx, egress_tx.clone(), va.clone())),
});
}
}
}
ret
}
}