This commit is contained in:
kieran 2024-03-25 10:32:03 +00:00
parent 529e3b6234
commit 9a086d80c1
22 changed files with 615 additions and 448 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
/target
.idea/

604
Cargo.lock generated Normal file → Executable file

File diff suppressed because it is too large Load Diff

4
Cargo.toml Normal file → Executable file
View File

@ -5,7 +5,7 @@ edition = "2021"
[dependencies]
srt-tokio = "0.4.3"
tokio = { version = "1.36.0" , features = ["rt-multi-thread", "sync"]}
tokio = { version = "1.36.0", features = ["rt-multi-thread", "sync"] }
anyhow = { version = "1.0.80", features = ["backtrace"] }
pretty_env_logger = "0.5.0"
bytes = "1.5.0"
@ -13,7 +13,7 @@ tokio-stream = "0.1.14"
futures-util = "0.3.30"
async-trait = "0.1.77"
log = "0.4.21"
ffmpeg-sys-next = { version = "6.1.0", features = ["avformat", "avcodec", "swscale", "avfilter"]}
ffmpeg-sys-next = { version = "6.1.0", features = ["avformat", "avcodec", "swscale", "avfilter"] }
libc = "0.2.153"
pretty-hex = "0.4.1"
uuid = { version = "1.8.0", features = ["v4", "serde"] }

0
Dockerfile Normal file → Executable file
View File

0
README.md Normal file → Executable file
View File

0
config.toml Normal file → Executable file
View File

View File

@ -3,12 +3,12 @@ use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{
av_frame_alloc, av_packet_unref, avcodec_alloc_context3, avcodec_find_decoder,
avcodec_free_context, avcodec_open2, avcodec_parameters_to_context, avcodec_receive_frame,
avcodec_send_packet, AVCodec, AVCodecContext, AVPacket, AVStream, AVERROR, AVERROR_EOF,
av_frame_alloc, av_packet_unref, AVCodec, avcodec_alloc_context3,
avcodec_find_decoder, avcodec_free_context, avcodec_open2, avcodec_parameters_to_context,
avcodec_receive_frame, avcodec_send_packet, AVCodecContext, AVERROR, AVERROR_EOF, AVPacket, AVStream,
};
use tokio::sync::broadcast;
use tokio::sync::mpsc::{Receiver, UnboundedReceiver};
use tokio::sync::mpsc::UnboundedReceiver;
use crate::pipeline::PipelinePayload;
@ -32,6 +32,7 @@ pub struct Decoder {
}
unsafe impl Send for Decoder {}
unsafe impl Sync for Decoder {}
impl Decoder {
@ -46,16 +47,23 @@ impl Decoder {
}
}
pub unsafe fn decode_pkt(&mut self, pkt: *mut AVPacket) -> Result<(), Error> {
pub unsafe fn decode_pkt(&mut self, pkt: *mut AVPacket) -> Result<usize, Error> {
let stream_index = (*pkt).stream_index as i32;
let stream = (*pkt).opaque as *mut AVStream;
let codec_par = (*stream).codecpar;
let has_codec_params = codec_par != ptr::null_mut();
if !has_codec_params {
panic!("Cant handle pkt, dropped!");
}
assert_eq!(
stream_index,
(*stream).index,
"Passed stream reference does not match stream_index of packet"
);
if has_codec_params && !self.codecs.contains_key(&stream_index) {
let codec_par = (*stream).codecpar;
assert_ne!(
codec_par,
ptr::null_mut(),
"Codec parameters are missing from stream"
);
if !self.codecs.contains_key(&stream_index) {
let codec = avcodec_find_decoder((*codec_par).codec_id);
if codec == ptr::null_mut() {
return Err(Error::msg("Failed to find codec"));
@ -81,6 +89,7 @@ impl Decoder {
return Err(Error::msg(format!("Failed to decode packet {}", ret)));
}
let mut frames = 0;
while ret >= 0 {
let frame = av_frame_alloc();
ret = avcodec_receive_frame(ctx.context, frame);
@ -91,22 +100,26 @@ impl Decoder {
return Err(Error::msg(format!("Failed to decode {}", ret)));
}
(*frame).time_base = (*pkt).time_base;
(*frame).opaque = stream as *mut libc::c_void;
self.chan_out.send(PipelinePayload::AvFrame(frame))?;
frames += 1;
}
return Ok(frames);
}
Ok(())
Ok(0)
}
pub fn process(&mut self) -> Result<(), Error> {
pub fn process(&mut self) -> Result<usize, Error> {
while let Ok(pkg) = self.chan_in.try_recv() {
if let PipelinePayload::AvPacket(pkt) = pkg {
return if let PipelinePayload::AvPacket(pkt) = pkg {
unsafe {
self.decode_pkt(pkt)?;
let frames = self.decode_pkt(pkt)?;
Ok(frames)
}
} else {
return Err(Error::msg("Payload not supported"));
}
Err(Error::msg("Payload not supported"))
};
}
Ok(())
Ok(0)
}
}

View File

@ -1,4 +1,3 @@
use ffmpeg_sys_next::AVCodecParameters;
use crate::fraction::Fraction;
#[derive(Clone, Debug, PartialEq)]
@ -18,12 +17,9 @@ pub struct StreamInfoChannel {
pub channel_type: StreamChannelType,
pub width: usize,
pub height: usize,
pub codec_params: *const AVCodecParameters,
pub fps: f32,
}
unsafe impl Sync for StreamInfoChannel {}
unsafe impl Send for StreamInfoChannel {}
impl TryInto<Fraction> for StreamInfoChannel {
type Error = ();

View File

@ -3,9 +3,9 @@ use std::time::{Duration, SystemTime};
use anyhow::Error;
use bytes::{Bytes, BytesMut};
use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO};
use ffmpeg_sys_next::*;
use log::info;
use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO};
use log::{info, warn};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::time::Instant;
@ -32,6 +32,7 @@ pub(crate) struct Demuxer {
}
unsafe impl Send for Demuxer {}
unsafe impl Sync for Demuxer {}
unsafe extern "C" fn read_data(
@ -105,17 +106,12 @@ impl Demuxer {
av_find_best_stream(self.ctx, AVMEDIA_TYPE_VIDEO, -1, -1, ptr::null_mut(), 0) as usize;
if video_stream_index != AVERROR_STREAM_NOT_FOUND as usize {
let video_stream = *(*self.ctx).streams.add(video_stream_index);
let codec_copy = unsafe {
let ptr = avcodec_parameters_alloc();
avcodec_parameters_copy(ptr, (*video_stream).codecpar);
ptr
};
channel_infos.push(StreamInfoChannel {
index: video_stream_index,
channel_type: StreamChannelType::Video,
width: (*(*video_stream).codecpar).width as usize,
height: (*(*video_stream).codecpar).height as usize,
codec_params: codec_copy,
fps: av_q2d((*video_stream).avg_frame_rate) as f32,
});
}
@ -133,7 +129,7 @@ impl Demuxer {
channel_type: StreamChannelType::Audio,
width: (*(*audio_stream).codecpar).width as usize,
height: (*(*audio_stream).codecpar).height as usize,
codec_params: codec_copy,
fps: 0.0,
});
}
@ -149,6 +145,7 @@ impl Demuxer {
if ret == AVERROR_EOF {
// reset EOF flag, stream never ends
(*(*self.ctx).pb).eof_reached = 0;
warn!("EOF was reached, stream might skip frames");
return Ok(());
}
if ret < 0 {
@ -156,10 +153,13 @@ impl Demuxer {
return Err(Error::msg(msg));
}
let stream = *(*self.ctx).streams.add((*pkt).stream_index as usize);
(*pkt).time_base = (*stream).time_base;
if (*pkt).time_base.num == 0 {
(*pkt).time_base = (*stream).time_base;
}
(*pkt).opaque = stream as *mut libc::c_void;
self.chan_out.send(PipelinePayload::AvPacket(pkt))?;
let pkg = PipelinePayload::AvPacket(pkt);
self.chan_out.send(pkg)?;
Ok(())
}

View File

@ -3,28 +3,28 @@ use std::mem::transmute;
use std::ptr;
use std::ptr::slice_from_raw_parts;
use crate::demux::info::{DemuxStreamInfo, StreamChannelType};
use crate::fraction::Fraction;
use anyhow::Error;
use ffmpeg_sys_next::{
AV_CH_LAYOUT_STEREO, av_channel_layout_default, av_dump_format, av_get_sample_fmt,
av_interleaved_write_frame, av_opt_set, av_packet_rescale_ts, av_write_frame, AVChannelLayout,
AVChannelLayout__bindgen_ty_1, avcodec_send_frame, avcodec_send_packet, AVCodecContext,
avformat_alloc_output_context2, avformat_new_stream, avformat_write_header, AVFormatContext, AVPacket,
AVRational,
};
use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE;
use ffmpeg_sys_next::AVColorSpace::AVCOL_SPC_BT709;
use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO};
use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P;
use ffmpeg_sys_next::AVSampleFormat::AV_SAMPLE_FMT_FLT;
use ffmpeg_sys_next::{
av_channel_layout_default, av_dump_format, av_interleaved_write_frame, av_opt_set,
av_packet_rescale_ts, av_write_frame, avcodec_send_frame, avcodec_send_packet,
avformat_alloc_output_context2, avformat_new_stream, avformat_write_header, AVChannelLayout,
AVChannelLayout__bindgen_ty_1, AVCodecContext, AVFormatContext, AVPacket, AVRational,
AV_CH_LAYOUT_STEREO,
};
use futures_util::StreamExt;
use tokio::sync::mpsc::{Receiver, UnboundedReceiver};
use log::info;
use tokio::sync::mpsc::{Receiver, UnboundedReceiver};
use uuid::{Bytes, Uuid, Variant};
use crate::demux::info::{DemuxStreamInfo, StreamChannelType};
use crate::fraction::Fraction;
use crate::pipeline::{HLSEgressConfig, PipelinePayload};
use crate::utils::get_ffmpeg_error_msg;
use crate::utils::{get_ffmpeg_error_msg, id_ref_to_uuid};
use crate::variant::{VariantStream, VideoVariant};
pub struct HlsEgress {
@ -36,10 +36,15 @@ pub struct HlsEgress {
}
unsafe impl Send for HlsEgress {}
unsafe impl Sync for HlsEgress {}
impl HlsEgress {
pub fn new(chan_in: UnboundedReceiver<PipelinePayload>, id: Uuid, config: HLSEgressConfig) -> Self {
pub fn new(
chan_in: UnboundedReceiver<PipelinePayload>,
id: Uuid,
config: HLSEgressConfig,
) -> Self {
Self {
id,
config,
@ -137,7 +142,9 @@ impl HlsEgress {
(*params).codec_id = transmute(va.codec as i32);
(*params).codec_type = AVMEDIA_TYPE_AUDIO;
(*params).format = AV_SAMPLE_FMT_FLT as libc::c_int;
(*params).format = av_get_sample_fmt(
format!("{}\0", va.sample_fmt).as_ptr() as *const libc::c_char
) as libc::c_int;
(*params).bit_rate = va.bitrate as i64;
(*params).sample_rate = va.sample_rate as libc::c_int;
(*params).ch_layout = AVChannelLayout {
@ -165,19 +172,17 @@ impl HlsEgress {
}
unsafe fn process_pkt(&mut self, pkt: *mut AVPacket) -> Result<(), Error> {
let slice_raw = slice_from_raw_parts((*(*pkt).opaque_ref).data, 16);
let binding = Bytes::from(*(slice_raw as *const [u8; 16]));
let variant_id = Uuid::from_bytes_ref(&binding);
let variant_id = id_ref_to_uuid((*pkt).opaque_ref);
let dst_stream_index = self.config.variants.iter().find_map(|v| match &v {
VariantStream::Video(vv) => {
if vv.id.eq(variant_id) {
if vv.id.eq(&variant_id) {
Some(vv.dst_index)
} else {
None
}
}
VariantStream::Audio(va) => {
if va.id.eq(variant_id) {
if va.id.eq(&variant_id) {
Some(va.dst_index)
} else {
None

View File

@ -1,23 +1,22 @@
use std::mem::transmute;
use std::ptr;
use crate::ipc::Rx;
use anyhow::Error;
use async_trait::async_trait;
use ffmpeg_sys_next::{
av_buffer_ref, AV_CH_LAYOUT_STEREO, AV_CODEC_FLAG_GLOBAL_HEADER, av_get_sample_fmt, av_opt_set,
av_packet_alloc, av_packet_free, AVBufferRef, AVChannelLayout,
AVChannelLayout__bindgen_ty_1, AVCodec, avcodec_alloc_context3, avcodec_find_encoder, avcodec_open2,
avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, AVFrame, AVRational,
AVStream,
};
use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE;
use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P;
use ffmpeg_sys_next::AVSampleFormat::AV_SAMPLE_FMT_FLT;
use ffmpeg_sys_next::{
av_buffer_allocz, av_opt_set, av_packet_alloc, av_packet_free, avcodec_alloc_context3,
avcodec_find_encoder, avcodec_open2, avcodec_receive_packet, avcodec_send_frame, memcpy,
AVChannelLayout, AVChannelLayout__bindgen_ty_1, AVCodec, AVCodecContext, AVFrame, AVRational,
AVERROR, AV_CH_LAYOUT_STEREO,
};
use libc::EAGAIN;
use tokio::sync::mpsc::{UnboundedSender};
use tokio::sync::mpsc::UnboundedSender;
use crate::ipc::Rx;
use crate::pipeline::PipelinePayload;
use crate::utils::get_ffmpeg_error_msg;
use crate::utils::{get_ffmpeg_error_msg, variant_id_ref};
use crate::variant::VariantStream;
pub struct Encoder<T> {
@ -26,26 +25,30 @@ pub struct Encoder<T> {
codec: *const AVCodec,
chan_in: T,
chan_out: UnboundedSender<PipelinePayload>,
var_id_ref: *mut AVBufferRef,
}
unsafe impl<T> Send for Encoder<T> {}
unsafe impl<T> Sync for Encoder<T> {}
impl<TRecv> Encoder<TRecv>
where
TRecv: Rx<PipelinePayload>,
where
TRecv: Rx<PipelinePayload>,
{
pub fn new(
chan_in: TRecv,
chan_out: UnboundedSender<PipelinePayload>,
variant: VariantStream,
) -> Self {
let id_ref = variant_id_ref(&variant).unwrap();
Self {
ctx: ptr::null_mut(),
codec: ptr::null(),
variant,
chan_in,
chan_out,
var_id_ref: id_ref,
}
}
@ -87,11 +90,9 @@ where
);
}
VariantStream::Audio(va) => {
(*ctx).sample_fmt = if (*encoder).sample_fmts != ptr::null() {
*(*encoder).sample_fmts.add(0)
} else {
AV_SAMPLE_FMT_FLT
};
(*ctx).sample_fmt = av_get_sample_fmt(
format!("{}\0", va.sample_fmt).as_ptr() as *const libc::c_char
);
(*ctx).bit_rate = va.bitrate as i64;
(*ctx).sample_rate = va.sample_rate as libc::c_int;
(*ctx).ch_layout = AVChannelLayout {
@ -105,7 +106,7 @@ where
(*ctx).time_base = AVRational {
num: 1,
den: va.sample_rate as libc::c_int,
}
};
}
_ => {
// nothing
@ -124,19 +125,23 @@ where
}
unsafe fn process_frame(&mut self, frame: *mut AVFrame) -> Result<(), Error> {
let stream = (*frame).opaque as *mut AVStream;
if (*stream).index as usize != self.variant.src_index() {
return Ok(());
}
self.setup_encoder(frame)?;
let mut ret = avcodec_send_frame(self.ctx, frame);
if ret < 0 {
if ret < 0 && ret != AVERROR(EAGAIN) {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
while ret > 0 {
while ret > 0 || ret == AVERROR(EAGAIN) {
let mut pkt = av_packet_alloc();
ret = avcodec_receive_packet(self.ctx, pkt);
if ret < 0 {
av_packet_free(&mut pkt);
if ret == AVERROR(EAGAIN) {
av_packet_free(&mut pkt);
return Ok(());
}
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
@ -144,27 +149,8 @@ where
(*pkt).duration = (*frame).duration;
(*pkt).time_base = (*frame).time_base;
(*pkt).opaque_ref = match &self.variant {
VariantStream::Audio(va) => {
let buf = av_buffer_allocz(16);
memcpy(
(*buf).data as *mut libc::c_void,
va.id.as_bytes().as_ptr() as *const libc::c_void,
16,
);
buf
}
VariantStream::Video(vv) => {
let buf = av_buffer_allocz(16);
memcpy(
(*buf).data as *mut libc::c_void,
vv.id.as_bytes().as_ptr() as *const libc::c_void,
16,
);
buf
}
_ => return Err(Error::msg("Cannot assign pkt stream index")),
};
(*pkt).opaque = stream as *mut libc::c_void;
(*pkt).opaque_ref = av_buffer_ref(self.var_id_ref);
self.chan_out.send(PipelinePayload::AvPacket(pkt))?;
}
@ -172,7 +158,7 @@ where
}
pub fn process(&mut self) -> Result<(), Error> {
while let Ok(pkg) = self.chan_in.try_recv() {
while let Ok(pkg) = self.chan_in.try_recv_next() {
match pkg {
PipelinePayload::AvFrame(frm) => unsafe {
self.process_frame(frm)?;
@ -182,4 +168,4 @@ where
}
Ok(())
}
}
}

View File

@ -1,11 +1,14 @@
use crate::ingress::ConnectionInfo;
use crate::pipeline::builder::PipelineBuilder;
use crate::pipeline::runner::PipelineRunner;
use std::time::Instant;
use futures_util::{StreamExt, TryStreamExt};
use log::{info, warn};
use srt_tokio::{SrtListener, SrtSocket};
use tokio::sync::mpsc::unbounded_channel;
use crate::ingress::ConnectionInfo;
use crate::pipeline::builder::PipelineBuilder;
use crate::pipeline::runner::PipelineRunner;
pub async fn listen(addr: String, builder: PipelineBuilder) -> Result<(), anyhow::Error> {
let (_binding, mut packets) = SrtListener::builder().bind(addr.clone()).await?;

View File

@ -1,15 +1,17 @@
use crate::ingress::ConnectionInfo;
use crate::pipeline::builder::PipelineBuilder;
use crate::pipeline::runner::PipelineRunner;
use std::io;
use bytes::BytesMut;
use futures_util::{StreamExt, TryStreamExt};
use log::{error, info, warn};
use srt_tokio::{SrtListener, SrtSocket};
use std::io;
use tokio::io::AsyncReadExt;
use tokio::net::{TcpListener, TcpSocket};
use tokio::sync::mpsc::unbounded_channel;
use crate::ingress::ConnectionInfo;
use crate::pipeline::builder::PipelineBuilder;
use crate::pipeline::runner::PipelineRunner;
pub async fn listen(addr: String, builder: PipelineBuilder) -> Result<(), anyhow::Error> {
let listener = TcpListener::bind(addr.clone()).await.unwrap();

View File

@ -4,32 +4,32 @@ use async_trait::async_trait;
#[async_trait]
pub trait Rx<T> {
async fn recv(&mut self) -> Result<T, Error>;
fn try_recv(&mut self) -> Result<T, Error>;
fn try_recv_next(&mut self) -> Result<T, Error>;
}
#[async_trait]
impl<T> Rx<T> for tokio::sync::mpsc::UnboundedReceiver<T>
where
T: Send + Sync,
where
T: Send + Sync,
{
async fn recv(&mut self) -> Result<T, Error> {
self.recv().await.ok_or(Error::msg("recv error"))
}
fn try_recv(&mut self) -> Result<T, Error> {
fn try_recv_next(&mut self) -> Result<T, Error> {
Ok(self.try_recv()?)
}
}
#[async_trait]
impl<T> Rx<T> for tokio::sync::broadcast::Receiver<T>
where
T: Send + Sync,
where
T: Send + Sync + Clone,
{
async fn recv(&mut self) -> Result<T, Error> {
Ok(self.recv().await?)
}
fn try_recv(&mut self) -> Result<T, Error> {
fn try_recv_next(&mut self) -> Result<T, Error> {
Ok(self.try_recv()?)
}
}

View File

@ -1,3 +1,16 @@
use std::ffi::CStr;
use config::Config;
use futures_util::future::join_all;
use futures_util::StreamExt;
use log::{error, info};
use tokio::sync::futures;
use url::Url;
use crate::pipeline::builder::PipelineBuilder;
use crate::settings::Settings;
use crate::webhook::Webhook;
mod decode;
mod demux;
mod egress;
@ -12,17 +25,6 @@ mod variant;
mod webhook;
mod ipc;
use crate::pipeline::builder::PipelineBuilder;
use crate::settings::Settings;
use crate::webhook::Webhook;
use config::Config;
use futures_util::StreamExt;
use log::{error, info};
use std::ffi::CStr;
use futures_util::future::join_all;
use tokio::sync::futures;
use url::Url;
/// Test: ffmpeg -re -f lavfi -i testsrc -g 2 -r 30 -pix_fmt yuv420p -s 1280x720 -c:v h264 -b:v 2000k -f mpegts srt://localhost:3333
#[tokio::main]
async fn main() -> anyhow::Result<()> {

View File

@ -1,4 +1,5 @@
use tokio::sync::mpsc::UnboundedReceiver;
use crate::ingress::ConnectionInfo;
use crate::pipeline::runner::PipelineRunner;
use crate::webhook::Webhook;

View File

@ -1,7 +1,10 @@
use std::ops::{Deref, DerefMut};
use async_trait::async_trait;
use ffmpeg_sys_next::{AVFrame, AVPacket};
use ffmpeg_sys_next::{
av_frame_alloc, av_frame_free, av_frame_ref, av_packet_alloc, av_packet_free, av_packet_ref,
AVFrame, AVPacket,
};
use serde::{Deserialize, Serialize};
use crate::demux::info::DemuxStreamInfo;
@ -34,7 +37,7 @@ pub struct PipelineConfig {
pub egress: Vec<EgressType>,
}
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, PartialEq)]
pub enum PipelinePayload {
/// No output
Empty,
@ -49,8 +52,45 @@ pub enum PipelinePayload {
}
unsafe impl Send for PipelinePayload {}
unsafe impl Sync for PipelinePayload {}
impl Clone for PipelinePayload {
fn clone(&self) -> Self {
match self {
PipelinePayload::Empty => PipelinePayload::Empty,
PipelinePayload::Bytes(b) => PipelinePayload::Bytes(b.clone()),
PipelinePayload::AvPacket(p) => unsafe {
let new_pkt = av_packet_alloc();
av_packet_ref(new_pkt, *p);
PipelinePayload::AvPacket(new_pkt)
},
PipelinePayload::AvFrame(p) => unsafe {
let new_frame = av_frame_alloc();
av_frame_ref(new_frame, *p);
PipelinePayload::AvFrame(new_frame)
},
PipelinePayload::SourceInfo(i) => PipelinePayload::SourceInfo(i.clone()),
}
}
}
impl Drop for PipelinePayload {
fn drop(&mut self) {
match self {
PipelinePayload::Empty => {}
PipelinePayload::Bytes(_) => {}
PipelinePayload::AvPacket(p) => unsafe {
av_packet_free(p);
},
PipelinePayload::AvFrame(p) => unsafe {
av_frame_free(p);
},
PipelinePayload::SourceInfo(_) => {}
}
}
}
#[async_trait]
pub trait PipelineStep {
fn name(&self) -> String;

View File

@ -1,15 +1,19 @@
use std::ops::{Add, AddAssign};
use std::time::{Duration, Instant};
use anyhow::Error;
use log::info;
use tokio::sync::broadcast;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use crate::decode::Decoder;
use crate::demux::info::{DemuxStreamInfo, StreamChannelType};
use crate::demux::Demuxer;
use crate::demux::info::{DemuxStreamInfo, StreamChannelType};
use crate::egress::hls::HlsEgress;
use crate::encode::Encoder;
use crate::pipeline::{EgressType, PipelineConfig, PipelinePayload, PipelineStep};
use crate::scale::Scaler;
use crate::variant::VariantStream;
use anyhow::Error;
use log::info;
use tokio::sync::broadcast;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
struct ScalerEncoder {
pub scaler: Scaler,
@ -24,6 +28,9 @@ pub struct PipelineRunner {
scalers: Vec<ScalerEncoder>,
encoders: Vec<Encoder<broadcast::Receiver<PipelinePayload>>>,
egress: Vec<HlsEgress>,
started: Instant,
frame_no: u64,
stream_info: Option<DemuxStreamInfo>,
}
impl PipelineRunner {
@ -38,14 +45,39 @@ impl PipelineRunner {
scalers: vec![],
encoders: vec![],
egress: vec![],
started: Instant::now(),
frame_no: 0,
stream_info: None,
}
}
pub fn run(&mut self) -> Result<(), Error> {
if let Some(info) = &self.stream_info {
if let Some(v_stream) = info
.channels
.iter()
.find(|s| s.channel_type == StreamChannelType::Video)
{
let duration = self.frame_no as f64 / v_stream.fps as f64;
let target_time = self.started.add(Duration::from_secs_f64(duration));
let now = Instant::now();
if now < target_time {
let poll_sleep = target_time - now;
std::thread::sleep(poll_sleep);
}
}
}
if let Some(cfg) = self.demuxer.process()? {
self.configure_pipeline(cfg)?;
}
self.decoder.process()?;
let frames = self.decoder.process()?;
if let Some(v) = self.frame_no.checked_add(frames as u64) {
self.frame_no = v;
} else {
panic!("Frame number overflowed, maybe you need a bigger number!");
}
// video scalar-encoder chains
for sw in &mut self.scalers {
sw.scaler.process()?;
sw.encoder.process()?;
@ -53,15 +85,22 @@ impl PipelineRunner {
eg.process()?;
}
}
// audio encoder chains
for enc in &mut self.encoders {
enc.process()?;
for eg in &mut self.egress {
eg.process()?;
}
}
Ok(())
}
fn configure_pipeline(&mut self, info: DemuxStreamInfo) -> Result<(), Error> {
// configure scalers
if self.scalers.len() != 0 {
if self.stream_info.is_some() {
return Err(Error::msg("Pipeline already configured!"));
}
info!("Configuring pipeline {:?}", info);
self.stream_info = Some(info.clone());
let video_stream = info
.channels
@ -100,7 +139,7 @@ impl PipelineRunner {
return Err(Error::msg(format!(
"Variant config not supported {:?}",
c
)))
)));
}
}
}
@ -109,6 +148,11 @@ impl PipelineRunner {
}
}
}
Ok(())
if self.egress.len() == 0 {
Err(Error::msg("No egress config, pipeline misconfigured!"))
} else {
Ok(())
}
}
}

View File

@ -3,14 +3,14 @@ use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{
av_frame_alloc, av_frame_copy_props, av_frame_unref, AVFrame, SWS_BILINEAR, sws_getContext,
sws_scale_frame, SwsContext,
av_buffer_ref, av_frame_alloc, av_frame_copy_props, av_frame_unref, AVBufferRef,
AVFrame, SWS_BILINEAR, sws_getContext, sws_scale_frame, SwsContext,
};
use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
use crate::pipeline::PipelinePayload;
use crate::utils::get_ffmpeg_error_msg;
use crate::utils::{get_ffmpeg_error_msg, video_variant_id_ref};
use crate::variant::VideoVariant;
pub struct Scaler {
@ -18,9 +18,11 @@ pub struct Scaler {
ctx: *mut SwsContext,
chan_in: broadcast::Receiver<PipelinePayload>,
chan_out: UnboundedSender<PipelinePayload>,
var_id_ref: *mut AVBufferRef,
}
unsafe impl Send for Scaler {}
unsafe impl Sync for Scaler {}
impl Scaler {
@ -29,11 +31,13 @@ impl Scaler {
chan_out: UnboundedSender<PipelinePayload>,
variant: VideoVariant,
) -> Self {
let id_ref = video_variant_id_ref(&variant);
Self {
chan_in,
chan_out,
variant,
ctx: ptr::null_mut(),
var_id_ref: id_ref,
}
}
@ -78,6 +82,8 @@ impl Scaler {
(*dst_frame).time_base = (*frame).time_base;
(*dst_frame).pts = (*frame).pts;
(*dst_frame).pkt_dts = (*frame).pkt_dts;
(*dst_frame).opaque_ref = av_buffer_ref(self.var_id_ref);
self.chan_out.send(PipelinePayload::AvFrame(dst_frame))?;
Ok(())
}

View File

@ -1,6 +1,11 @@
use ffmpeg_sys_next::av_make_error_string;
use std::ffi::CStr;
use anyhow::Error;
use ffmpeg_sys_next::{av_buffer_allocz, av_make_error_string, AVBufferRef, memcpy};
use uuid::{Bytes, Uuid};
use crate::variant::{VariantStream, VideoVariant};
pub fn get_ffmpeg_error_msg(ret: libc::c_int) -> String {
unsafe {
const BUF_SIZE: usize = 512;
@ -9,3 +14,48 @@ pub fn get_ffmpeg_error_msg(ret: libc::c_int) -> String {
String::from(CStr::from_ptr(buf.as_ptr()).to_str().unwrap())
}
}
pub fn variant_id_ref(var: &VariantStream) -> Result<*mut AVBufferRef, Error> {
unsafe {
match var {
VariantStream::Audio(va) => {
let buf = av_buffer_allocz(16);
memcpy(
(*buf).data as *mut libc::c_void,
va.id.as_bytes().as_ptr() as *const libc::c_void,
16,
);
Ok(buf)
}
VariantStream::Video(vv) => {
let buf = av_buffer_allocz(16);
memcpy(
(*buf).data as *mut libc::c_void,
vv.id.as_bytes().as_ptr() as *const libc::c_void,
16,
);
Ok(buf)
}
_ => return Err(Error::msg("Cannot assign pkt stream index")),
}
}
}
pub fn video_variant_id_ref(var: &VideoVariant) -> *mut AVBufferRef {
unsafe {
let buf = av_buffer_allocz(16);
memcpy(
(*buf).data as *mut libc::c_void,
var.id.as_bytes().as_ptr() as *const libc::c_void,
16,
);
buf
}
}
pub fn id_ref_to_uuid(buf: *mut AVBufferRef) -> Uuid {
unsafe {
let binding = Bytes::from(*((*buf).data as *const [u8; 16]));
Uuid::from_bytes_ref(&binding).clone()
}
}

View File

@ -1,5 +1,5 @@
use crate::fraction::Fraction;
use std::fmt::{Display, Formatter};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
@ -71,7 +71,7 @@ impl Display for VideoVariant {
pub struct AudioVariant {
/// Unique ID of this variant
pub id: Uuid,
/// Source video stream to use for this variant
pub src_index: usize,
@ -89,6 +89,9 @@ pub struct AudioVariant {
/// Sample rate
pub sample_rate: usize,
/// Sample format as ffmpeg sample format string
pub sample_fmt: String,
}
impl Display for AudioVariant {
@ -102,3 +105,14 @@ impl Display for AudioVariant {
)
}
}
impl VariantStream {
pub fn src_index(&self) -> usize {
match self {
VariantStream::Video(v) => v.src_index,
VariantStream::Audio(v) => v.src_index,
VariantStream::CopyVideo(v) => v.clone(),
VariantStream::CopyAudio(v) => v.clone(),
}
}
}

View File

@ -1,10 +1,12 @@
use std::fmt::Display;
use ffmpeg_sys_next::{AV_LEVEL_UNKNOWN, AV_PROFILE_H264_HIGH};
use ffmpeg_sys_next::AVCodecID::{AV_CODEC_ID_AAC, AV_CODEC_ID_H264};
use uuid::Uuid;
use crate::ingress::ConnectionInfo;
use crate::pipeline::{EgressType, HLSEgressConfig, PipelineConfig};
use crate::variant::{AudioVariant, VariantStream, VideoVariant};
use ffmpeg_sys_next::AVCodecID::{AV_CODEC_ID_AAC, AV_CODEC_ID_H264};
use ffmpeg_sys_next::{AV_LEVEL_UNKNOWN, AV_PROFILE_H264_HIGH};
use std::fmt::Display;
use uuid::Uuid;
#[derive(Clone)]
pub struct Webhook {
@ -54,6 +56,7 @@ impl Webhook {
codec: 86018,
channels: 2,
sample_rate: 44_100,
sample_fmt: "fltp".to_owned(),
};
let audio_var_2 = AudioVariant {
@ -64,6 +67,7 @@ impl Webhook {
codec: 86018,
channels: 2,
sample_rate: 44_100,
sample_fmt: "fltp".to_owned(),
};
Ok(PipelineConfig {