Configurable encoder pipeline

This commit is contained in:
2024-03-20 22:46:19 +00:00
parent 13cb456f89
commit 529e3b6234
24 changed files with 1707 additions and 209 deletions

View File

@ -1,7 +1,6 @@
use crate::demux::Demuxer;
use tokio::sync::mpsc::UnboundedReceiver;
use crate::ingress::ConnectionInfo;
use crate::pipeline::runner::PipelineRunner;
use crate::pipeline::PipelineStep;
use crate::webhook::Webhook;
#[derive(Clone)]
@ -14,12 +13,8 @@ impl PipelineBuilder {
Self { webhook }
}
pub async fn build_for(&self, info: ConnectionInfo) -> Result<PipelineRunner, anyhow::Error> {
pub async fn build_for(&self, info: ConnectionInfo, recv: UnboundedReceiver<bytes::Bytes>) -> Result<PipelineRunner, anyhow::Error> {
let config = self.webhook.start(info).await?;
let mut steps: Vec<Box<dyn PipelineStep + Sync + Send>> = Vec::new();
steps.push(Box::new(Demuxer::new()));
Ok(PipelineRunner::new(steps))
Ok(PipelineRunner::new(config, recv))
}
}

View File

@ -1,42 +1,58 @@
use std::ops::{Deref, DerefMut};
use async_trait::async_trait;
use ffmpeg_sys_next::{av_packet_unref, AVPacket};
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use ffmpeg_sys_next::{AVFrame, AVPacket};
use serde::{Deserialize, Serialize};
use crate::demux::info::DemuxStreamInfo;
use crate::variant::VariantStream;
pub mod builder;
pub mod runner;
#[derive(Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EgressType {
HLS(HLSEgressConfig),
DASH,
WHEP,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HLSEgressConfig {
pub variants: Vec<VariantStream>,
/// FFMPEG stream mapping string
///
/// v:0,a:0 v:1,a:0, v:2,a:1 etc..
pub stream_map: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PipelineConfig {
pub id: uuid::Uuid,
pub recording: Vec<VariantStream>,
pub egress: Vec<EgressType>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum PipelinePayload {
/// No output
Empty,
/// Skip this step
Skip,
/// Raw bytes from ingress
Bytes(bytes::Bytes),
/// FFMpeg AVPacket
AvPacket(*mut AVPacket),
/// FFMpeg AVFrame
AvFrame(),
AvFrame(*mut AVFrame),
/// Information about the input stream
SourceInfo(DemuxStreamInfo),
}
unsafe impl Send for PipelinePayload {}
unsafe impl Sync for PipelinePayload {}
impl Drop for PipelinePayload {
fn drop(&mut self) {
match self {
PipelinePayload::AvPacket(pkt) => unsafe {
av_packet_unref(*pkt);
},
_ => {}
}
}
}
#[async_trait]
pub trait PipelineStep {
fn name(&self) -> String;
async fn process(&mut self, pkg: PipelinePayload) -> Result<PipelinePayload, anyhow::Error>;
async fn process(&mut self, pkg: &PipelinePayload) -> Result<PipelinePayload, anyhow::Error>;
}

View File

@ -1,20 +1,112 @@
use crate::pipeline::{PipelinePayload, PipelineStep};
use crate::decode::Decoder;
use crate::demux::info::{DemuxStreamInfo, StreamChannelType};
use crate::demux::Demuxer;
use crate::egress::hls::HlsEgress;
use crate::encode::Encoder;
use crate::pipeline::{EgressType, PipelineConfig, PipelinePayload, PipelineStep};
use crate::scale::Scaler;
use crate::variant::VariantStream;
use anyhow::Error;
use log::info;
use tokio::sync::broadcast;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
struct ScalerEncoder {
pub scaler: Scaler,
pub encoder: Encoder<UnboundedReceiver<PipelinePayload>>,
}
pub struct PipelineRunner {
steps: Vec<Box<dyn PipelineStep + Sync + Send>>,
config: PipelineConfig,
demuxer: Demuxer,
decoder: Decoder,
decoder_output: broadcast::Receiver<PipelinePayload>,
scalers: Vec<ScalerEncoder>,
encoders: Vec<Encoder<broadcast::Receiver<PipelinePayload>>>,
egress: Vec<HlsEgress>,
}
impl PipelineRunner {
pub fn new(steps: Vec<Box<dyn PipelineStep + Sync + Send>>) -> Self {
Self { steps }
pub fn new(config: PipelineConfig, recv: UnboundedReceiver<bytes::Bytes>) -> Self {
let (demux_out, demux_in) = unbounded_channel();
let (dec_tx, dec_rx) = broadcast::channel::<PipelinePayload>(32);
Self {
config,
demuxer: Demuxer::new(recv, demux_out),
decoder: Decoder::new(demux_in, dec_tx),
decoder_output: dec_rx,
scalers: vec![],
encoders: vec![],
egress: vec![],
}
}
pub async fn push(&mut self, bytes: bytes::Bytes) -> Result<(), anyhow::Error> {
let mut output = PipelinePayload::Bytes(bytes);
for step in &mut self.steps {
match step.process(output).await? {
Some(pkg) => output = pkg,
None => break,
pub fn run(&mut self) -> Result<(), Error> {
if let Some(cfg) = self.demuxer.process()? {
self.configure_pipeline(cfg)?;
}
self.decoder.process()?;
for sw in &mut self.scalers {
sw.scaler.process()?;
sw.encoder.process()?;
for eg in &mut self.egress {
eg.process()?;
}
}
Ok(())
}
fn configure_pipeline(&mut self, info: DemuxStreamInfo) -> Result<(), Error> {
// configure scalers
if self.scalers.len() != 0 {
return Err(Error::msg("Pipeline already configured!"));
}
info!("Configuring pipeline {:?}", info);
let video_stream = info
.channels
.iter()
.find(|s| s.channel_type == StreamChannelType::Video);
if let Some(ref vs) = video_stream {
for eg in &self.config.egress {
match eg {
EgressType::HLS(cfg) => {
let (egress_tx, egress_rx) = unbounded_channel();
self.egress
.push(HlsEgress::new(egress_rx, self.config.id, cfg.clone()));
for v in &cfg.variants {
let (var_tx, var_rx) = unbounded_channel();
match v {
VariantStream::Video(vs) => {
self.scalers.push(ScalerEncoder {
scaler: Scaler::new(
self.decoder_output.resubscribe(),
var_tx.clone(),
vs.clone(),
),
encoder: Encoder::new(var_rx, egress_tx.clone(), v.clone()),
});
}
VariantStream::Audio(_) => {
self.encoders.push(Encoder::new(
self.decoder_output.resubscribe(),
egress_tx.clone(),
v.clone(),
));
}
c => {
return Err(Error::msg(format!(
"Variant config not supported {:?}",
c
)))
}
}
}
}
_ => return Err(Error::msg("Egress config not supported")),
}
}
}
Ok(())