fix packet timestamps
This commit is contained in:
parent
9a086d80c1
commit
2a82e2c00b
3
.gitignore
vendored
3
.gitignore
vendored
@ -1,2 +1,3 @@
|
||||
/target
|
||||
.idea/
|
||||
.idea/
|
||||
out/
|
620
Cargo.lock
generated
620
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
1
Cargo.toml
Executable file → Normal file
1
Cargo.toml
Executable file → Normal file
@ -20,3 +20,4 @@ uuid = { version = "1.8.0", features = ["v4", "serde"] }
|
||||
serde = { version = "1.0.197", features = ["derive"] }
|
||||
config = { version = "0.14.0", features = ["toml"] }
|
||||
url = "2.5.0"
|
||||
itertools = "0.12.1"
|
||||
|
@ -7,6 +7,7 @@ use ffmpeg_sys_next::{
|
||||
avcodec_find_decoder, avcodec_free_context, avcodec_open2, avcodec_parameters_to_context,
|
||||
avcodec_receive_frame, avcodec_send_packet, AVCodecContext, AVERROR, AVERROR_EOF, AVPacket, AVStream,
|
||||
};
|
||||
use ffmpeg_sys_next::AVPictureType::{AV_PICTURE_TYPE_I, AV_PICTURE_TYPE_NONE};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc::UnboundedReceiver;
|
||||
|
||||
@ -99,9 +100,8 @@ impl Decoder {
|
||||
}
|
||||
return Err(Error::msg(format!("Failed to decode {}", ret)));
|
||||
}
|
||||
(*frame).time_base = (*pkt).time_base;
|
||||
(*frame).opaque = stream as *mut libc::c_void;
|
||||
self.chan_out.send(PipelinePayload::AvFrame(frame))?;
|
||||
(*frame).time_base = (*stream).time_base;
|
||||
self.chan_out.send(PipelinePayload::AvFrame("Decoder frame".to_owned(), frame))?;
|
||||
frames += 1;
|
||||
}
|
||||
return Ok(frames);
|
||||
@ -111,7 +111,7 @@ impl Decoder {
|
||||
|
||||
pub fn process(&mut self) -> Result<usize, Error> {
|
||||
while let Ok(pkg) = self.chan_in.try_recv() {
|
||||
return if let PipelinePayload::AvPacket(pkt) = pkg {
|
||||
return if let PipelinePayload::AvPacket(_, pkt) = pkg {
|
||||
unsafe {
|
||||
let frames = self.decode_pkt(pkt)?;
|
||||
Ok(frames)
|
||||
|
@ -158,7 +158,7 @@ impl Demuxer {
|
||||
}
|
||||
(*pkt).opaque = stream as *mut libc::c_void;
|
||||
|
||||
let pkg = PipelinePayload::AvPacket(pkt);
|
||||
let pkg = PipelinePayload::AvPacket("Demuxer packet".to_owned(), pkt);
|
||||
self.chan_out.send(pkg)?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -1,3 +1,4 @@
|
||||
use std::collections::HashMap;
|
||||
use std::ffi::{CStr, CString};
|
||||
use std::mem::transmute;
|
||||
use std::ptr;
|
||||
@ -14,9 +15,8 @@ use ffmpeg_sys_next::{
|
||||
use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE;
|
||||
use ffmpeg_sys_next::AVColorSpace::AVCOL_SPC_BT709;
|
||||
use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO};
|
||||
use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P;
|
||||
use ffmpeg_sys_next::AVSampleFormat::AV_SAMPLE_FMT_FLT;
|
||||
use futures_util::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use log::info;
|
||||
use tokio::sync::mpsc::{Receiver, UnboundedReceiver};
|
||||
use uuid::{Bytes, Uuid, Variant};
|
||||
@ -27,6 +27,8 @@ use crate::pipeline::{HLSEgressConfig, PipelinePayload};
|
||||
use crate::utils::{get_ffmpeg_error_msg, id_ref_to_uuid};
|
||||
use crate::variant::{VariantStream, VideoVariant};
|
||||
|
||||
use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P;
|
||||
|
||||
pub struct HlsEgress {
|
||||
/// Pipeline id
|
||||
id: Uuid,
|
||||
@ -56,11 +58,13 @@ impl HlsEgress {
|
||||
unsafe fn setup_muxer(&mut self) -> Result<(), Error> {
|
||||
let mut ctx = ptr::null_mut();
|
||||
|
||||
let base = format!("{}/{}", self.config.out_dir, self.id);
|
||||
|
||||
let ret = avformat_alloc_output_context2(
|
||||
&mut ctx,
|
||||
ptr::null(),
|
||||
"hls\0".as_ptr() as *const libc::c_char,
|
||||
format!("{}/stream_%v/live.m3u8\0", self.id).as_ptr() as *const libc::c_char,
|
||||
format!("{}/stream_%v/live.m3u8\0", base).as_ptr() as *const libc::c_char,
|
||||
);
|
||||
if ret < 0 {
|
||||
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
|
||||
@ -69,7 +73,7 @@ impl HlsEgress {
|
||||
av_opt_set(
|
||||
(*ctx).priv_data,
|
||||
"hls_segment_filename\0".as_ptr() as *const libc::c_char,
|
||||
format!("{}/stream_%v/seg_%05d.ts\0", self.id).as_ptr() as *const libc::c_char,
|
||||
format!("{}/stream_%v/seg_%05d.ts\0", base).as_ptr() as *const libc::c_char,
|
||||
0,
|
||||
);
|
||||
|
||||
@ -94,16 +98,8 @@ impl HlsEgress {
|
||||
0,
|
||||
);
|
||||
|
||||
info!("map_str={}", self.config.stream_map);
|
||||
|
||||
av_opt_set(
|
||||
(*ctx).priv_data,
|
||||
"var_stream_map\0".as_ptr() as *const libc::c_char,
|
||||
format!("{}\0", self.config.stream_map).as_ptr() as *const libc::c_char,
|
||||
0,
|
||||
);
|
||||
|
||||
for var in &mut self.config.variants {
|
||||
let tb = var.time_base();
|
||||
match var {
|
||||
VariantStream::Video(vs) => {
|
||||
let stream = avformat_new_stream(ctx, ptr::null());
|
||||
@ -113,6 +109,7 @@ impl HlsEgress {
|
||||
|
||||
// overwrite dst_index to match output stream
|
||||
vs.dst_index = (*stream).index as usize;
|
||||
(*stream).time_base = tb;
|
||||
|
||||
let params = (*stream).codecpar;
|
||||
(*params).height = vs.height as libc::c_int;
|
||||
@ -137,6 +134,7 @@ impl HlsEgress {
|
||||
|
||||
// overwrite dst_index to match output stream
|
||||
va.dst_index = (*stream).index as usize;
|
||||
(*stream).time_base = tb;
|
||||
|
||||
let params = (*stream).codecpar;
|
||||
|
||||
@ -160,6 +158,34 @@ impl HlsEgress {
|
||||
}
|
||||
}
|
||||
|
||||
// configure mapping
|
||||
let mut stream_map: HashMap<usize, Vec<String>> = HashMap::new();
|
||||
for var in &self.config.variants {
|
||||
let cfg = match var {
|
||||
VariantStream::Video(vx) => format!("v:{}", vx.dst_index),
|
||||
VariantStream::Audio(ax) => format!("a:{}", ax.dst_index),
|
||||
};
|
||||
if let Some(out_stream) = stream_map.get_mut(&var.dst_index()) {
|
||||
out_stream.push(cfg);
|
||||
} else {
|
||||
stream_map.insert(var.dst_index(), vec![cfg]);
|
||||
}
|
||||
}
|
||||
let stream_map = stream_map
|
||||
.values()
|
||||
.into_iter()
|
||||
.map(|v| v.join(","))
|
||||
.join(" ");
|
||||
|
||||
info!("map_str={}", stream_map);
|
||||
|
||||
av_opt_set(
|
||||
(*ctx).priv_data,
|
||||
"var_stream_map\0".as_ptr() as *const libc::c_char,
|
||||
format!("{}\0", stream_map).as_ptr() as *const libc::c_char,
|
||||
0,
|
||||
);
|
||||
|
||||
av_dump_format(ctx, 0, ptr::null(), 1);
|
||||
|
||||
let ret = avformat_write_header(ctx, ptr::null_mut());
|
||||
@ -172,34 +198,16 @@ impl HlsEgress {
|
||||
}
|
||||
|
||||
unsafe fn process_pkt(&mut self, pkt: *mut AVPacket) -> Result<(), Error> {
|
||||
let variant_id = id_ref_to_uuid((*pkt).opaque_ref);
|
||||
let dst_stream_index = self.config.variants.iter().find_map(|v| match &v {
|
||||
VariantStream::Video(vv) => {
|
||||
if vv.id.eq(&variant_id) {
|
||||
Some(vv.dst_index)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
VariantStream::Audio(va) => {
|
||||
if va.id.eq(&variant_id) {
|
||||
Some(va.dst_index)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
_ => None,
|
||||
});
|
||||
if let None = dst_stream_index {
|
||||
let variant_id = id_ref_to_uuid((*pkt).opaque_ref)?;
|
||||
let variant = self.config.variants.iter().find(|v| v.id() == variant_id);
|
||||
if variant.is_none() {
|
||||
return Err(Error::msg(format!(
|
||||
"No stream found with id={:?}",
|
||||
dst_stream_index
|
||||
variant_id
|
||||
)));
|
||||
}
|
||||
|
||||
let stream = *(*self.ctx).streams.add(dst_stream_index.unwrap());
|
||||
av_packet_rescale_ts(pkt, (*pkt).time_base, (*stream).time_base);
|
||||
|
||||
let stream = *(*self.ctx).streams.add(variant.unwrap().dst_index());
|
||||
(*pkt).stream_index = (*stream).index;
|
||||
|
||||
let ret = av_interleaved_write_frame(self.ctx, pkt);
|
||||
@ -213,7 +221,7 @@ impl HlsEgress {
|
||||
pub fn process(&mut self) -> Result<(), Error> {
|
||||
while let Ok(pkg) = self.chan_in.try_recv() {
|
||||
match pkg {
|
||||
PipelinePayload::AvPacket(pkt) => unsafe {
|
||||
PipelinePayload::AvPacket(_, pkt) => unsafe {
|
||||
if self.ctx == ptr::null_mut() {
|
||||
self.setup_muxer()?;
|
||||
}
|
||||
|
@ -3,11 +3,11 @@ use std::ptr;
|
||||
|
||||
use anyhow::Error;
|
||||
use ffmpeg_sys_next::{
|
||||
av_buffer_ref, AV_CH_LAYOUT_STEREO, AV_CODEC_FLAG_GLOBAL_HEADER, av_get_sample_fmt, av_opt_set,
|
||||
av_packet_alloc, av_packet_free, AVBufferRef, AVChannelLayout,
|
||||
AVChannelLayout__bindgen_ty_1, AVCodec, avcodec_alloc_context3, avcodec_find_encoder, avcodec_open2,
|
||||
avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, AVFrame, AVRational,
|
||||
AVStream,
|
||||
av_buffer_ref, AV_CH_LAYOUT_STEREO, av_get_sample_fmt, av_opt_set, av_packet_alloc,
|
||||
av_packet_free, av_packet_rescale_ts, AVBufferRef, AVChannelLayout,
|
||||
AVChannelLayout__bindgen_ty_1, AVCodec, avcodec_alloc_context3, avcodec_find_encoder,
|
||||
avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR,
|
||||
AVFrame,
|
||||
};
|
||||
use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE;
|
||||
use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P;
|
||||
@ -16,7 +16,7 @@ use tokio::sync::mpsc::UnboundedSender;
|
||||
|
||||
use crate::ipc::Rx;
|
||||
use crate::pipeline::PipelinePayload;
|
||||
use crate::utils::{get_ffmpeg_error_msg, variant_id_ref};
|
||||
use crate::utils::{get_ffmpeg_error_msg, id_ref_to_uuid, variant_id_ref};
|
||||
use crate::variant::VariantStream;
|
||||
|
||||
pub struct Encoder<T> {
|
||||
@ -33,8 +33,8 @@ unsafe impl<T> Send for Encoder<T> {}
|
||||
unsafe impl<T> Sync for Encoder<T> {}
|
||||
|
||||
impl<TRecv> Encoder<TRecv>
|
||||
where
|
||||
TRecv: Rx<PipelinePayload>,
|
||||
where
|
||||
TRecv: Rx<PipelinePayload>,
|
||||
{
|
||||
pub fn new(
|
||||
chan_in: TRecv,
|
||||
@ -69,17 +69,15 @@ impl<TRecv> Encoder<TRecv>
|
||||
return Err(Error::msg("Failed to allocate encoder context"));
|
||||
}
|
||||
|
||||
(*ctx).time_base = self.variant.time_base();
|
||||
match &self.variant {
|
||||
VariantStream::Video(vv) => {
|
||||
(*ctx).bit_rate = vv.bitrate as i64;
|
||||
(*ctx).width = (*frame).width;
|
||||
(*ctx).height = (*frame).height;
|
||||
(*ctx).time_base = AVRational {
|
||||
num: 1,
|
||||
den: vv.fps as libc::c_int,
|
||||
};
|
||||
|
||||
(*ctx).gop_size = (vv.fps * vv.keyframe_interval) as libc::c_int;
|
||||
let key_frames = vv.fps * vv.keyframe_interval;
|
||||
(*ctx).gop_size = key_frames as libc::c_int;
|
||||
(*ctx).max_b_frames = 1;
|
||||
(*ctx).pix_fmt = AV_PIX_FMT_YUV420P;
|
||||
av_opt_set(
|
||||
@ -103,10 +101,6 @@ impl<TRecv> Encoder<TRecv>
|
||||
},
|
||||
opaque: ptr::null_mut(),
|
||||
};
|
||||
(*ctx).time_base = AVRational {
|
||||
num: 1,
|
||||
den: va.sample_rate as libc::c_int,
|
||||
};
|
||||
}
|
||||
_ => {
|
||||
// nothing
|
||||
@ -125,12 +119,11 @@ impl<TRecv> Encoder<TRecv>
|
||||
}
|
||||
|
||||
unsafe fn process_frame(&mut self, frame: *mut AVFrame) -> Result<(), Error> {
|
||||
let stream = (*frame).opaque as *mut AVStream;
|
||||
if (*stream).index as usize != self.variant.src_index() {
|
||||
return Ok(());
|
||||
}
|
||||
self.setup_encoder(frame)?;
|
||||
|
||||
let var_id = id_ref_to_uuid((*frame).opaque_ref)?;
|
||||
assert_eq!(var_id, self.variant.id());
|
||||
|
||||
let mut ret = avcodec_send_frame(self.ctx, frame);
|
||||
if ret < 0 && ret != AVERROR(EAGAIN) {
|
||||
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
|
||||
@ -147,11 +140,12 @@ impl<TRecv> Encoder<TRecv>
|
||||
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
|
||||
}
|
||||
|
||||
(*pkt).time_base = (*self.ctx).time_base;
|
||||
(*pkt).duration = (*frame).duration;
|
||||
(*pkt).time_base = (*frame).time_base;
|
||||
(*pkt).opaque = stream as *mut libc::c_void;
|
||||
av_packet_rescale_ts(pkt, (*frame).time_base, (*self.ctx).time_base);
|
||||
(*pkt).opaque_ref = av_buffer_ref(self.var_id_ref);
|
||||
self.chan_out.send(PipelinePayload::AvPacket(pkt))?;
|
||||
self.chan_out
|
||||
.send(PipelinePayload::AvPacket("Encoder packet".to_owned(), pkt))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -160,7 +154,7 @@ impl<TRecv> Encoder<TRecv>
|
||||
pub fn process(&mut self) -> Result<(), Error> {
|
||||
while let Ok(pkg) = self.chan_in.try_recv_next() {
|
||||
match pkg {
|
||||
PipelinePayload::AvFrame(frm) => unsafe {
|
||||
PipelinePayload::AvFrame(_, frm) => unsafe {
|
||||
self.process_frame(frm)?;
|
||||
},
|
||||
_ => return Err(Error::msg("Payload not supported")),
|
||||
|
@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
pretty_env_logger::init();
|
||||
|
||||
unsafe {
|
||||
//ffmpeg_sys_next::av_log_set_level(ffmpeg_sys_next::AV_LOG_MAX_OFFSET);
|
||||
//ffmpeg_sys_next::av_log_set_level(ffmpeg_sys_next::AV_LOG_DEBUG);
|
||||
info!(
|
||||
"FFMPEG version={}",
|
||||
CStr::from_ptr(ffmpeg_sys_next::av_version_info())
|
||||
@ -47,7 +47,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let settings: Settings = builder.try_deserialize()?;
|
||||
|
||||
let webhook = Webhook::new(settings.webhook_url);
|
||||
let webhook = Webhook::new(settings.clone());
|
||||
let builder = PipelineBuilder::new(webhook);
|
||||
let mut listeners = vec![];
|
||||
for e in settings.endpoints {
|
||||
|
@ -14,8 +14,16 @@ impl PipelineBuilder {
|
||||
Self { webhook }
|
||||
}
|
||||
|
||||
pub async fn build_for(&self, info: ConnectionInfo, recv: UnboundedReceiver<bytes::Bytes>) -> Result<PipelineRunner, anyhow::Error> {
|
||||
let config = self.webhook.start(info).await?;
|
||||
Ok(PipelineRunner::new(config, recv))
|
||||
pub async fn build_for(
|
||||
&self,
|
||||
info: ConnectionInfo,
|
||||
recv: UnboundedReceiver<bytes::Bytes>,
|
||||
) -> Result<PipelineRunner, anyhow::Error> {
|
||||
self.webhook.start(info).await?;
|
||||
Ok(PipelineRunner::new(
|
||||
Default::default(),
|
||||
self.webhook.clone(),
|
||||
recv,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
@ -1,10 +1,6 @@
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use ffmpeg_sys_next::{
|
||||
av_frame_alloc, av_frame_free, av_frame_ref, av_packet_alloc, av_packet_free, av_packet_ref,
|
||||
AVFrame, AVPacket,
|
||||
};
|
||||
use ffmpeg_sys_next::{av_frame_clone, av_frame_copy_props, av_frame_free, av_packet_clone, av_packet_copy_props, av_packet_free, AVFrame, AVPacket};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::demux::info::DemuxStreamInfo;
|
||||
@ -18,19 +14,16 @@ pub enum EgressType {
|
||||
HLS(HLSEgressConfig),
|
||||
DASH,
|
||||
WHEP,
|
||||
MPEGTS,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct HLSEgressConfig {
|
||||
pub out_dir: String,
|
||||
pub variants: Vec<VariantStream>,
|
||||
|
||||
/// FFMPEG stream mapping string
|
||||
///
|
||||
/// v:0,a:0 v:1,a:0, v:2,a:1 etc..
|
||||
pub stream_map: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
|
||||
pub struct PipelineConfig {
|
||||
pub id: uuid::Uuid,
|
||||
pub recording: Vec<VariantStream>,
|
||||
@ -44,9 +37,9 @@ pub enum PipelinePayload {
|
||||
/// Raw bytes from ingress
|
||||
Bytes(bytes::Bytes),
|
||||
/// FFMpeg AVPacket
|
||||
AvPacket(*mut AVPacket),
|
||||
AvPacket(String, *mut AVPacket),
|
||||
/// FFMpeg AVFrame
|
||||
AvFrame(*mut AVFrame),
|
||||
AvFrame(String, *mut AVFrame),
|
||||
/// Information about the input stream
|
||||
SourceInfo(DemuxStreamInfo),
|
||||
}
|
||||
@ -60,15 +53,15 @@ impl Clone for PipelinePayload {
|
||||
match self {
|
||||
PipelinePayload::Empty => PipelinePayload::Empty,
|
||||
PipelinePayload::Bytes(b) => PipelinePayload::Bytes(b.clone()),
|
||||
PipelinePayload::AvPacket(p) => unsafe {
|
||||
let new_pkt = av_packet_alloc();
|
||||
av_packet_ref(new_pkt, *p);
|
||||
PipelinePayload::AvPacket(new_pkt)
|
||||
PipelinePayload::AvPacket(t, p) => unsafe {
|
||||
let new_pkt = av_packet_clone(*p);
|
||||
av_packet_copy_props(new_pkt, *p);
|
||||
PipelinePayload::AvPacket(t.clone(), new_pkt)
|
||||
},
|
||||
PipelinePayload::AvFrame(p) => unsafe {
|
||||
let new_frame = av_frame_alloc();
|
||||
av_frame_ref(new_frame, *p);
|
||||
PipelinePayload::AvFrame(new_frame)
|
||||
PipelinePayload::AvFrame(t, p) => unsafe {
|
||||
let new_frame = av_frame_clone(*p);
|
||||
av_frame_copy_props(new_frame, *p);
|
||||
PipelinePayload::AvFrame(t.clone(), new_frame)
|
||||
},
|
||||
PipelinePayload::SourceInfo(i) => PipelinePayload::SourceInfo(i.clone()),
|
||||
}
|
||||
@ -80,19 +73,13 @@ impl Drop for PipelinePayload {
|
||||
match self {
|
||||
PipelinePayload::Empty => {}
|
||||
PipelinePayload::Bytes(_) => {}
|
||||
PipelinePayload::AvPacket(p) => unsafe {
|
||||
PipelinePayload::AvPacket(_, p) => unsafe {
|
||||
av_packet_free(p);
|
||||
},
|
||||
PipelinePayload::AvFrame(p) => unsafe {
|
||||
PipelinePayload::AvFrame(_, p) => unsafe {
|
||||
av_frame_free(p);
|
||||
},
|
||||
PipelinePayload::SourceInfo(_) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait PipelineStep {
|
||||
fn name(&self) -> String;
|
||||
async fn process(&mut self, pkg: &PipelinePayload) -> Result<PipelinePayload, anyhow::Error>;
|
||||
}
|
||||
|
@ -2,7 +2,9 @@ use std::ops::{Add, AddAssign};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::Error;
|
||||
use itertools::Itertools;
|
||||
use log::info;
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
|
||||
|
||||
@ -11,9 +13,10 @@ use crate::demux::Demuxer;
|
||||
use crate::demux::info::{DemuxStreamInfo, StreamChannelType};
|
||||
use crate::egress::hls::HlsEgress;
|
||||
use crate::encode::Encoder;
|
||||
use crate::pipeline::{EgressType, PipelineConfig, PipelinePayload, PipelineStep};
|
||||
use crate::pipeline::{EgressType, PipelineConfig, PipelinePayload};
|
||||
use crate::scale::Scaler;
|
||||
use crate::variant::VariantStream;
|
||||
use crate::webhook::Webhook;
|
||||
|
||||
struct ScalerEncoder {
|
||||
pub scaler: Scaler,
|
||||
@ -31,10 +34,15 @@ pub struct PipelineRunner {
|
||||
started: Instant,
|
||||
frame_no: u64,
|
||||
stream_info: Option<DemuxStreamInfo>,
|
||||
webhook: Webhook,
|
||||
}
|
||||
|
||||
impl PipelineRunner {
|
||||
pub fn new(config: PipelineConfig, recv: UnboundedReceiver<bytes::Bytes>) -> Self {
|
||||
pub fn new(
|
||||
config: PipelineConfig,
|
||||
webhook: Webhook,
|
||||
recv: UnboundedReceiver<bytes::Bytes>,
|
||||
) -> Self {
|
||||
let (demux_out, demux_in) = unbounded_channel();
|
||||
let (dec_tx, dec_rx) = broadcast::channel::<PipelinePayload>(32);
|
||||
Self {
|
||||
@ -48,6 +56,7 @@ impl PipelineRunner {
|
||||
started: Instant::now(),
|
||||
frame_no: 0,
|
||||
stream_info: None,
|
||||
webhook,
|
||||
}
|
||||
}
|
||||
|
||||
@ -102,6 +111,9 @@ impl PipelineRunner {
|
||||
info!("Configuring pipeline {:?}", info);
|
||||
self.stream_info = Some(info.clone());
|
||||
|
||||
// re-configure with demuxer info
|
||||
self.config = self.webhook.configure(&info);
|
||||
|
||||
let video_stream = info
|
||||
.channels
|
||||
.iter()
|
||||
@ -116,16 +128,16 @@ impl PipelineRunner {
|
||||
.push(HlsEgress::new(egress_rx, self.config.id, cfg.clone()));
|
||||
|
||||
for v in &cfg.variants {
|
||||
let (var_tx, var_rx) = unbounded_channel();
|
||||
match v {
|
||||
VariantStream::Video(vs) => {
|
||||
let (sw_tx, sw_rx) = unbounded_channel();
|
||||
self.scalers.push(ScalerEncoder {
|
||||
scaler: Scaler::new(
|
||||
self.decoder_output.resubscribe(),
|
||||
var_tx.clone(),
|
||||
sw_tx.clone(),
|
||||
vs.clone(),
|
||||
),
|
||||
encoder: Encoder::new(var_rx, egress_tx.clone(), v.clone()),
|
||||
encoder: Encoder::new(sw_rx, egress_tx.clone(), v.clone()),
|
||||
});
|
||||
}
|
||||
VariantStream::Audio(_) => {
|
||||
|
@ -3,7 +3,7 @@ use std::ptr;
|
||||
|
||||
use anyhow::Error;
|
||||
use ffmpeg_sys_next::{
|
||||
av_buffer_ref, av_frame_alloc, av_frame_copy_props, av_frame_unref, AVBufferRef,
|
||||
av_buffer_ref, av_frame_alloc, av_frame_copy_props, AVBufferRef,
|
||||
AVFrame, SWS_BILINEAR, sws_getContext, sws_scale_frame, SwsContext,
|
||||
};
|
||||
use tokio::sync::broadcast;
|
||||
@ -74,24 +74,20 @@ impl Scaler {
|
||||
}
|
||||
|
||||
let ret = sws_scale_frame(self.ctx, dst_frame, frame);
|
||||
av_frame_unref(frame);
|
||||
if ret < 0 {
|
||||
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
|
||||
}
|
||||
|
||||
(*dst_frame).time_base = (*frame).time_base;
|
||||
(*dst_frame).pts = (*frame).pts;
|
||||
(*dst_frame).pkt_dts = (*frame).pkt_dts;
|
||||
(*dst_frame).opaque_ref = av_buffer_ref(self.var_id_ref);
|
||||
|
||||
self.chan_out.send(PipelinePayload::AvFrame(dst_frame))?;
|
||||
self.chan_out.send(PipelinePayload::AvFrame("Scaler frame".to_owned(), dst_frame))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn process(&mut self) -> Result<(), Error> {
|
||||
while let Ok(pkg) = self.chan_in.try_recv() {
|
||||
match pkg {
|
||||
PipelinePayload::AvFrame(frm) => unsafe {
|
||||
PipelinePayload::AvFrame(_, frm) => unsafe {
|
||||
self.process_frame(frm)?;
|
||||
},
|
||||
_ => return Err(Error::msg("Payload not supported payload")),
|
||||
|
@ -1,4 +1,5 @@
|
||||
use std::ffi::CStr;
|
||||
use std::ptr;
|
||||
|
||||
use anyhow::Error;
|
||||
use ffmpeg_sys_next::{av_buffer_allocz, av_make_error_string, AVBufferRef, memcpy};
|
||||
@ -53,9 +54,12 @@ pub fn video_variant_id_ref(var: &VideoVariant) -> *mut AVBufferRef {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn id_ref_to_uuid(buf: *mut AVBufferRef) -> Uuid {
|
||||
pub fn id_ref_to_uuid(buf: *mut AVBufferRef) -> Result<Uuid, Error> {
|
||||
unsafe {
|
||||
if buf == ptr::null_mut() {
|
||||
return Err(Error::msg("Buffer was null"));
|
||||
}
|
||||
let binding = Bytes::from(*((*buf).data as *const [u8; 16]));
|
||||
Uuid::from_bytes_ref(&binding).clone()
|
||||
Ok(Uuid::from_bytes_ref(&binding).clone())
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
use std::fmt::{Display, Formatter};
|
||||
|
||||
use ffmpeg_sys_next::AVRational;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
@ -9,10 +10,6 @@ pub enum VariantStream {
|
||||
Video(VideoVariant),
|
||||
/// Audio stream mapping
|
||||
Audio(AudioVariant),
|
||||
/// Copy source stream (video)
|
||||
CopyVideo(usize),
|
||||
/// Copy source stream (audio)
|
||||
CopyAudio(usize),
|
||||
}
|
||||
|
||||
/// Information related to variant streams for a given egress
|
||||
@ -107,12 +104,37 @@ impl Display for AudioVariant {
|
||||
}
|
||||
|
||||
impl VariantStream {
|
||||
pub fn id(&self) -> Uuid {
|
||||
match self {
|
||||
VariantStream::Video(v) => v.id,
|
||||
VariantStream::Audio(v) => v.id,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn src_index(&self) -> usize {
|
||||
match self {
|
||||
VariantStream::Video(v) => v.src_index,
|
||||
VariantStream::Audio(v) => v.src_index,
|
||||
VariantStream::CopyVideo(v) => v.clone(),
|
||||
VariantStream::CopyAudio(v) => v.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn dst_index(&self) -> usize {
|
||||
match self {
|
||||
VariantStream::Video(v) => v.dst_index,
|
||||
VariantStream::Audio(v) => v.dst_index,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn time_base(&self) -> AVRational {
|
||||
match &self {
|
||||
VariantStream::Video(vv) => AVRational {
|
||||
num: 1,
|
||||
den: 90_000,
|
||||
},
|
||||
VariantStream::Audio(va) => AVRational {
|
||||
num: 1,
|
||||
den: va.sample_rate as libc::c_int,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,28 +1,30 @@
|
||||
use std::fmt::Display;
|
||||
|
||||
use ffmpeg_sys_next::{AV_LEVEL_UNKNOWN, AV_PROFILE_H264_HIGH};
|
||||
use ffmpeg_sys_next::AVCodecID::{AV_CODEC_ID_AAC, AV_CODEC_ID_H264};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::demux::info::{DemuxStreamInfo, StreamChannelType};
|
||||
use crate::ingress::ConnectionInfo;
|
||||
use crate::pipeline::{EgressType, HLSEgressConfig, PipelineConfig};
|
||||
use crate::settings::Settings;
|
||||
use crate::variant::{AudioVariant, VariantStream, VideoVariant};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Webhook {
|
||||
url: String,
|
||||
config: Settings,
|
||||
}
|
||||
|
||||
impl Webhook {
|
||||
pub fn new(url: String) -> Self {
|
||||
Self { url }
|
||||
pub fn new(config: Settings) -> Self {
|
||||
Self { config }
|
||||
}
|
||||
|
||||
pub async fn start(
|
||||
&self,
|
||||
connection_info: ConnectionInfo,
|
||||
) -> Result<PipelineConfig, anyhow::Error> {
|
||||
let video_var = VideoVariant {
|
||||
pub async fn start(&self, connection_info: ConnectionInfo) -> Result<(), anyhow::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn configure(&self, stream_info: &DemuxStreamInfo) -> PipelineConfig {
|
||||
let mut vars: Vec<VariantStream> = vec![];
|
||||
vars.push(VariantStream::Video(VideoVariant {
|
||||
id: Uuid::new_v4(),
|
||||
src_index: 0,
|
||||
dst_index: 0,
|
||||
@ -34,11 +36,11 @@ impl Webhook {
|
||||
profile: 100,
|
||||
level: 1,
|
||||
keyframe_interval: 2,
|
||||
};
|
||||
let video_var_2 = VideoVariant {
|
||||
}));
|
||||
vars.push(VariantStream::Video(VideoVariant {
|
||||
id: Uuid::new_v4(),
|
||||
src_index: 0,
|
||||
dst_index: 0,
|
||||
dst_index: 1,
|
||||
width: 640,
|
||||
height: 360,
|
||||
fps: 30,
|
||||
@ -47,41 +49,41 @@ impl Webhook {
|
||||
profile: 100,
|
||||
level: 1,
|
||||
keyframe_interval: 2,
|
||||
};
|
||||
let audio_var = AudioVariant {
|
||||
id: Uuid::new_v4(),
|
||||
src_index: 1,
|
||||
dst_index: 0,
|
||||
bitrate: 320_000,
|
||||
codec: 86018,
|
||||
channels: 2,
|
||||
sample_rate: 44_100,
|
||||
sample_fmt: "fltp".to_owned(),
|
||||
};
|
||||
}));
|
||||
let has_audio = stream_info
|
||||
.channels
|
||||
.iter()
|
||||
.any(|c| c.channel_type == StreamChannelType::Audio);
|
||||
if has_audio {
|
||||
vars.push(VariantStream::Audio(AudioVariant {
|
||||
id: Uuid::new_v4(),
|
||||
src_index: 1,
|
||||
dst_index: 0,
|
||||
bitrate: 320_000,
|
||||
codec: 86018,
|
||||
channels: 2,
|
||||
sample_rate: 44_100,
|
||||
sample_fmt: "fltp".to_owned(),
|
||||
}));
|
||||
vars.push(VariantStream::Audio(AudioVariant {
|
||||
id: Uuid::new_v4(),
|
||||
src_index: 1,
|
||||
dst_index: 1,
|
||||
bitrate: 220_000,
|
||||
codec: 86018,
|
||||
channels: 2,
|
||||
sample_rate: 44_100,
|
||||
sample_fmt: "fltp".to_owned(),
|
||||
}));
|
||||
}
|
||||
|
||||
let audio_var_2 = AudioVariant {
|
||||
id: Uuid::new_v4(),
|
||||
src_index: 1,
|
||||
dst_index: 0,
|
||||
bitrate: 220_000,
|
||||
codec: 86018,
|
||||
channels: 2,
|
||||
sample_rate: 44_100,
|
||||
sample_fmt: "fltp".to_owned(),
|
||||
};
|
||||
|
||||
Ok(PipelineConfig {
|
||||
PipelineConfig {
|
||||
id: Uuid::new_v4(),
|
||||
recording: vec![],
|
||||
egress: vec![EgressType::HLS(HLSEgressConfig {
|
||||
variants: vec![
|
||||
VariantStream::Video(video_var),
|
||||
VariantStream::Video(video_var_2),
|
||||
VariantStream::Audio(audio_var),
|
||||
VariantStream::Audio(audio_var_2),
|
||||
],
|
||||
stream_map: "v:0,a:0 v:1,a:1".to_owned(),
|
||||
out_dir: self.config.output_dir.clone(),
|
||||
variants: vars,
|
||||
})],
|
||||
recording: vec![VariantStream::CopyVideo(0), VariantStream::CopyAudio(1)],
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user