something something something

This commit is contained in:
2024-04-02 17:14:52 +01:00
parent 9c4969cf95
commit 8ed71bd48b
14 changed files with 700 additions and 352 deletions

View File

@ -4,12 +4,14 @@ use std::time::{Duration, Instant};
use anyhow::Error;
use log::{info, warn};
use tokio::sync::broadcast;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use crate::decode::Decoder;
use crate::demux::Demuxer;
use crate::demux::info::{DemuxStreamInfo, StreamChannelType};
use crate::egress::EgressConfig;
use crate::egress::hls::HlsEgress;
use crate::egress::mpegts::MPEGTSEgress;
use crate::encode::audio::AudioEncoder;
use crate::encode::video::VideoEncoder;
use crate::pipeline::{EgressType, PipelineConfig, PipelinePayload, PipelineProcessor};
@ -29,7 +31,7 @@ pub struct PipelineRunner {
decoder: Decoder,
decoder_output: broadcast::Receiver<PipelinePayload>,
encoders: Vec<PipelineChain>,
egress: Vec<HlsEgress>,
egress: Vec<Box<dyn PipelineProcessor + Sync + Send>>,
started: Instant,
frame_no: u64,
stream_info: Option<DemuxStreamInfo>,
@ -106,57 +108,36 @@ impl PipelineRunner {
// re-configure with demuxer info
self.config = self.webhook.configure(&info);
info!("Configuring pipeline {}", self.config);
info!(
"Livestream url: http://localhost:8080/{}/live.m3u8",
self.config.id
);
let video_stream = info
.channels
.iter()
.find(|s| s.channel_type == StreamChannelType::Video);
if let Some(_vs) = video_stream {
for eg in &self.config.egress {
match eg {
EgressType::HLS(cfg) => {
let (egress_tx, egress_rx) = unbounded_channel();
self.egress
.push(HlsEgress::new(egress_rx, self.config.id, cfg.clone()));
for v in &cfg.variants {
match v {
VariantStream::Video(vs) => {
let (sw_tx, sw_rx) = unbounded_channel();
self.encoders.push(PipelineChain {
first: Box::new(Scaler::new(
self.decoder_output.resubscribe(),
sw_tx.clone(),
vs.clone(),
)),
second: Box::new(VideoEncoder::new(
sw_rx,
egress_tx.clone(),
vs.clone(),
)),
});
}
VariantStream::Audio(va) => {
let (tag_tx, tag_rx) = unbounded_channel();
self.encoders.push(PipelineChain {
first: Box::new(TagFrame::new(
v.clone(),
self.decoder_output.resubscribe(),
tag_tx,
)),
second: Box::new(AudioEncoder::new(
tag_rx,
egress_tx.clone(),
va.clone(),
)),
});
}
}
}
for eg in &self.config.egress {
match eg {
EgressType::HLS(cfg) => {
let (egress_tx, egress_rx) = unbounded_channel();
self.egress.push(Box::new(HlsEgress::new(
egress_rx,
self.config.id,
cfg.clone(),
)));
for x in self.add_egress_variants(cfg, egress_tx) {
self.encoders.push(x);
}
_ => return Err(Error::msg("Egress config not supported")),
}
EgressType::MPEGTS(cfg) => {
let (egress_tx, egress_rx) = unbounded_channel();
self.egress.push(Box::new(MPEGTSEgress::new(
egress_rx,
self.config.id,
cfg.clone(),
)));
for x in self.add_egress_variants(cfg, egress_tx) {
self.encoders.push(x);
}
}
_ => return Err(Error::msg("Egress config not supported")),
}
}
@ -166,4 +147,39 @@ impl PipelineRunner {
Ok(())
}
}
fn add_egress_variants(
&self,
cfg: &EgressConfig,
egress_tx: UnboundedSender<PipelinePayload>,
) -> Vec<PipelineChain> {
let mut ret = vec![];
for v in &cfg.variants {
match v {
VariantStream::Video(vs) => {
let (sw_tx, sw_rx) = unbounded_channel();
ret.push(PipelineChain {
first: Box::new(Scaler::new(
self.decoder_output.resubscribe(),
sw_tx.clone(),
vs.clone(),
)),
second: Box::new(VideoEncoder::new(sw_rx, egress_tx.clone(), vs.clone())),
});
}
VariantStream::Audio(va) => {
let (tag_tx, tag_rx) = unbounded_channel();
ret.push(PipelineChain {
first: Box::new(TagFrame::new(
v.clone(),
self.decoder_output.resubscribe(),
tag_tx,
)),
second: Box::new(AudioEncoder::new(tag_rx, egress_tx.clone(), va.clone())),
});
}
}
}
ret
}
}