mirror of
https://github.com/v0l/zap-stream-core.git
synced 2025-06-18 20:37:11 +00:00
feat: overseer
This commit is contained in:
@ -2,34 +2,41 @@ use std::collections::{HashMap, HashSet};
|
||||
use std::io::Read;
|
||||
use std::mem::transmute;
|
||||
use std::ops::Sub;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::egress::hls::HlsEgress;
|
||||
use crate::egress::recorder::RecorderEgress;
|
||||
use crate::egress::Egress;
|
||||
use crate::ingress::ConnectionInfo;
|
||||
use crate::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer};
|
||||
use crate::pipeline::{EgressType, PipelineConfig};
|
||||
use crate::variant::{StreamMapping, VariantStream};
|
||||
use crate::webhook::Webhook;
|
||||
use anyhow::Result;
|
||||
use anyhow::{bail, Result};
|
||||
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
|
||||
av_frame_free, av_get_sample_fmt, av_packet_free, av_rescale_q,
|
||||
};
|
||||
use ffmpeg_rs_raw::{
|
||||
cstr, get_frame_from_hw, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler,
|
||||
cstr, get_frame_from_hw, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler, StreamType,
|
||||
};
|
||||
use itertools::Itertools;
|
||||
use log::{info, warn};
|
||||
use tokio::runtime::Handle;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Pipeline runner is the main entry process for stream transcoding
|
||||
///
|
||||
/// Each client connection spawns a new [PipelineRunner] and it should be run in its own thread
|
||||
/// using [ingress::spawn_pipeline]
|
||||
/// using [crate::ingress::spawn_pipeline]
|
||||
pub struct PipelineRunner {
|
||||
/// Async runtime handle
|
||||
handle: Handle,
|
||||
|
||||
/// Input stream connection info
|
||||
connection: ConnectionInfo,
|
||||
|
||||
/// Configuration for this pipeline (variants, egress config etc.)
|
||||
config: PipelineConfig,
|
||||
config: Option<PipelineConfig>,
|
||||
|
||||
/// Singleton demuxer for this input
|
||||
demuxer: Demuxer,
|
||||
@ -54,18 +61,24 @@ pub struct PipelineRunner {
|
||||
|
||||
fps_counter_start: Instant,
|
||||
frame_ctr: u64,
|
||||
webhook: Webhook,
|
||||
|
||||
info: Option<DemuxerInfo>,
|
||||
/// Info about the input stream
|
||||
info: Option<IngressInfo>,
|
||||
|
||||
/// Overseer managing this pipeline
|
||||
overseer: Arc<dyn Overseer>,
|
||||
}
|
||||
|
||||
impl PipelineRunner {
|
||||
pub fn new(
|
||||
handle: Handle,
|
||||
overseer: Arc<dyn Overseer>,
|
||||
connection: ConnectionInfo,
|
||||
webhook: Webhook,
|
||||
recv: Box<dyn Read + Send>,
|
||||
) -> Result<Self> {
|
||||
Ok(Self {
|
||||
handle,
|
||||
overseer,
|
||||
connection,
|
||||
config: Default::default(),
|
||||
demuxer: Demuxer::new_custom_io(recv, None)?,
|
||||
@ -77,7 +90,6 @@ impl PipelineRunner {
|
||||
fps_counter_start: Instant::now(),
|
||||
egress: Vec::new(),
|
||||
frame_ctr: 0,
|
||||
webhook,
|
||||
info: None,
|
||||
})
|
||||
}
|
||||
@ -86,6 +98,12 @@ impl PipelineRunner {
|
||||
pub unsafe fn run(&mut self) -> Result<()> {
|
||||
self.setup()?;
|
||||
|
||||
let config = if let Some(ref config) = self.config {
|
||||
config
|
||||
} else {
|
||||
bail!("Pipeline not configured, cannot run")
|
||||
};
|
||||
|
||||
// run transcoder pipeline
|
||||
let (mut pkt, stream) = self.demuxer.get_packet()?;
|
||||
let src_index = (*stream).index;
|
||||
@ -106,8 +124,7 @@ impl PipelineRunner {
|
||||
(*frame).time_base = (*stream).time_base;
|
||||
|
||||
// Get the variants which want this pkt
|
||||
let pkt_vars = self
|
||||
.config
|
||||
let pkt_vars = config
|
||||
.variants
|
||||
.iter()
|
||||
.filter(|v| v.src_index() == src_index as usize);
|
||||
@ -188,14 +205,49 @@ impl PipelineRunner {
|
||||
}
|
||||
|
||||
let info = self.demuxer.probe_input()?;
|
||||
|
||||
// convert to internal type
|
||||
let i_info = IngressInfo {
|
||||
bitrate: info.bitrate,
|
||||
streams: info
|
||||
.streams
|
||||
.iter()
|
||||
.map(|s| IngressStream {
|
||||
index: s.index,
|
||||
stream_type: match s.stream_type {
|
||||
StreamType::Video => IngressStreamType::Video,
|
||||
StreamType::Audio => IngressStreamType::Audio,
|
||||
StreamType::Subtitle => IngressStreamType::Subtitle,
|
||||
},
|
||||
codec: s.codec,
|
||||
format: s.format,
|
||||
width: s.width,
|
||||
height: s.height,
|
||||
fps: s.fps,
|
||||
sample_rate: s.sample_rate,
|
||||
language: s.language.clone(),
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
|
||||
let cfg = self.handle.block_on(async {
|
||||
self.overseer
|
||||
.configure_pipeline(&self.connection, &i_info)
|
||||
.await
|
||||
})?;
|
||||
self.config = Some(cfg);
|
||||
self.info = Some(i_info);
|
||||
|
||||
self.setup_pipeline(&info)?;
|
||||
self.info = Some(info);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
unsafe fn setup_pipeline(&mut self, info: &DemuxerInfo) -> Result<()> {
|
||||
let cfg = self.webhook.start(info);
|
||||
self.config = cfg.clone();
|
||||
unsafe fn setup_pipeline(&mut self, demux_info: &DemuxerInfo) -> Result<()> {
|
||||
let cfg = if let Some(ref cfg) = self.config {
|
||||
cfg
|
||||
} else {
|
||||
bail!("Cannot setup pipeline without config");
|
||||
};
|
||||
|
||||
// src stream indexes
|
||||
let inputs: HashSet<usize> = cfg.variants.iter().map(|e| e.src_index()).collect();
|
||||
@ -205,7 +257,11 @@ impl PipelineRunner {
|
||||
|
||||
// setup decoders
|
||||
for input_idx in inputs {
|
||||
let stream = info.streams.iter().find(|f| f.index == input_idx).unwrap();
|
||||
let stream = demux_info
|
||||
.streams
|
||||
.iter()
|
||||
.find(|f| f.index == input_idx)
|
||||
.unwrap();
|
||||
self.decoder.setup_decoder(stream, None)?;
|
||||
}
|
||||
|
||||
@ -219,7 +275,7 @@ impl PipelineRunner {
|
||||
VariantStream::Audio(a) => {
|
||||
let enc = a.try_into()?;
|
||||
let rs = Resample::new(
|
||||
av_get_sample_fmt(cstr!(a.sample_fmt.as_bytes())),
|
||||
av_get_sample_fmt(cstr!(a.sample_fmt.as_str())),
|
||||
a.sample_rate as _,
|
||||
a.channels as _,
|
||||
);
|
||||
@ -230,10 +286,10 @@ impl PipelineRunner {
|
||||
}
|
||||
}
|
||||
|
||||
// Setup copy streams
|
||||
// TODO: Setup copy streams
|
||||
|
||||
// Setup egress
|
||||
for e in cfg.egress {
|
||||
for e in &cfg.egress {
|
||||
match e {
|
||||
EgressType::HLS(ref c) => {
|
||||
let encoders = self.encoders.iter().filter_map(|(k, v)| {
|
||||
@ -252,7 +308,7 @@ impl PipelineRunner {
|
||||
let encoders = self
|
||||
.encoders
|
||||
.iter()
|
||||
.filter(|(k, v)| c.variants.contains(k))
|
||||
.filter(|(k, _v)| c.variants.contains(k))
|
||||
.map(|(_, v)| v);
|
||||
let rec = RecorderEgress::new(c.clone(), encoders)?;
|
||||
self.egress.push(Box::new(rec));
|
||||
|
Reference in New Issue
Block a user