Progress audio encoder (FIFO)

kieran 2024-03-26 15:51:57 +00:00
parent 2a82e2c00b
commit d6c03004ff
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
17 changed files with 803 additions and 271 deletions
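
At its core this commit splits the old generic Encoder into dedicated VideoEncoder and AudioEncoder processors and gives the audio path an AVAudioFifo: decoded frames arrive with an arbitrary nb_samples, while encoders such as AAC consume a fixed frame_size, so samples are resampled with swr_convert, buffered, and drained in fixed-size chunks. Below is a minimal, safe Rust model of that chunking; SampleFifo and the f32 samples are illustrative stand-ins for AVAudioFifo and its sample formats, not code from the commit.

// Hypothetical model of the FIFO pattern used by the new audio encoder.
struct SampleFifo {
    buf: Vec<f32>,
    frame_size: usize,
}

impl SampleFifo {
    fn new(frame_size: usize) -> Self {
        Self { buf: Vec::new(), frame_size }
    }

    // Analogue of av_audio_fifo_write: accept however many samples arrived.
    fn write(&mut self, samples: &[f32]) {
        self.buf.extend_from_slice(samples);
    }

    // Analogue of av_audio_fifo_read: yield one encoder-sized frame, if any,
    // mirroring the `buffered >= (*self.ctx).frame_size` check in the diff.
    fn read_frame(&mut self) -> Option<Vec<f32>> {
        if self.buf.len() < self.frame_size {
            return None;
        }
        Some(self.buf.drain(..self.frame_size).collect())
    }
}

fn main() {
    let mut fifo = SampleFifo::new(1024); // AAC encodes 1024 samples per frame
    fifo.write(&[0.0; 800]);              // a decoded frame smaller than that
    assert!(fifo.read_frame().is_none()); // not enough buffered yet
    fifo.write(&[0.0; 800]);              // the next frame tips it over
    assert_eq!(fifo.read_frame().unwrap().len(), 1024);
}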

Cargo.lock (generated)

@@ -349,6 +349,12 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "form_urlencoded"
version = "1.2.1"
@@ -501,12 +507,45 @@ dependencies = [
"digest",
]
[[package]]
name = "http"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http-body"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643"
dependencies = [
"bytes",
"http",
]
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "186548d73ac615b32a73aafe38fb4f56c0d340e110e5a200bcadbaf2e199263a"
dependencies = [
"bytes",
"http",
"http-body",
"tokio",
]
[[package]]
name = "idna"
version = "0.5.0"
@@ -1093,6 +1132,7 @@ dependencies = [
"config",
"ffmpeg-sys-next",
"futures-util",
"hyper",
"itertools",
"libc",
"log",

Cargo.toml

@@ -21,3 +21,4 @@ serde = { version = "1.0.197", features = ["derive"] }
config = { version = "0.14.0", features = ["toml"] }
url = "2.5.0"
itertools = "0.12.1"
hyper = "1.2.0"


@@ -7,7 +7,6 @@ use ffmpeg_sys_next::{
avcodec_find_decoder, avcodec_free_context, avcodec_open2, avcodec_parameters_to_context,
avcodec_receive_frame, avcodec_send_packet, AVCodecContext, AVERROR, AVERROR_EOF, AVPacket, AVStream,
};
use ffmpeg_sys_next::AVPictureType::{AV_PICTURE_TYPE_I, AV_PICTURE_TYPE_NONE};
use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedReceiver;
@@ -101,7 +100,11 @@ impl Decoder {
return Err(Error::msg(format!("Failed to decode {}", ret)));
}
(*frame).time_base = (*stream).time_base;
self.chan_out.send(PipelinePayload::AvFrame("Decoder frame".to_owned(), frame))?;
self.chan_out.send(PipelinePayload::AvFrame(
"Decoder frame".to_owned(),
frame,
stream_index as usize,
))?;
frames += 1;
}
return Ok(frames);

src/demux/info.rs

@@ -1,3 +1,5 @@
use std::fmt::{Display, Formatter};
use crate::fraction::Fraction;
#[derive(Clone, Debug, PartialEq)]
@@ -5,12 +7,35 @@ pub struct DemuxStreamInfo {
pub channels: Vec<StreamInfoChannel>,
}
impl Display for DemuxStreamInfo {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Demuxer Info:")?;
for c in &self.channels {
write!(f, "\n{}", c)?;
}
Ok(())
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum StreamChannelType {
Video,
Audio,
}
impl Display for StreamChannelType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
StreamChannelType::Video => "video",
StreamChannelType::Audio => "audio",
}
)
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct StreamInfoChannel {
pub index: usize,
@@ -31,3 +56,13 @@ impl TryInto<Fraction> for StreamInfoChannel {
}
}
}
impl Display for StreamInfoChannel {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{} #{}: size={}x{},fps={}",
self.channel_type, self.index, self.width, self.height, self.fps
)
}
}

src/egress/hls.rs

@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::ffi::{CStr, CString};
use std::fmt::{Display, Formatter};
use std::mem::transmute;
use std::ptr;
use std::ptr::slice_from_raw_parts;
@@ -7,27 +8,47 @@ use std::ptr::slice_from_raw_parts;
use anyhow::Error;
use ffmpeg_sys_next::{
AV_CH_LAYOUT_STEREO, av_channel_layout_default, av_dump_format, av_get_sample_fmt,
av_interleaved_write_frame, av_opt_set, av_packet_rescale_ts, av_write_frame, AVChannelLayout,
AVChannelLayout__bindgen_ty_1, avcodec_send_frame, avcodec_send_packet, AVCodecContext,
av_interleaved_write_frame, av_opt_set, av_packet_rescale_ts, av_write_frame,
AVChannelLayout, AVChannelLayout__bindgen_ty_1, avcodec_parameters_copy,
avcodec_parameters_from_context, avcodec_send_frame, avcodec_send_packet, AVCodecContext,
avformat_alloc_output_context2, avformat_new_stream, avformat_write_header, AVFormatContext, AVPacket,
AVRational,
};
use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE;
use ffmpeg_sys_next::AVColorSpace::AVCOL_SPC_BT709;
use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO};
use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P;
use futures_util::StreamExt;
use itertools::Itertools;
use log::info;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{Receiver, UnboundedReceiver};
use uuid::{Bytes, Uuid, Variant};
use crate::demux::info::{DemuxStreamInfo, StreamChannelType};
use crate::fraction::Fraction;
use crate::pipeline::{HLSEgressConfig, PipelinePayload};
use crate::pipeline::PipelinePayload;
use crate::utils::{get_ffmpeg_error_msg, id_ref_to_uuid};
use crate::variant::{VariantStream, VideoVariant};
use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HLSEgressConfig {
pub out_dir: String,
pub variants: Vec<VariantStream>,
}
impl Display for HLSEgressConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "HLS: out_dir={}", self.out_dir)?;
if self.variants.len() > 0 {
write!(f, "\n\tStreams: ")?;
for v in &self.variants {
write!(f, "\n\t\t{}", v)?;
}
}
Ok(())
}
}
pub struct HlsEgress {
/// Pipeline id
@@ -35,6 +56,7 @@ pub struct HlsEgress {
config: HLSEgressConfig,
ctx: *mut AVFormatContext,
chan_in: UnboundedReceiver<PipelinePayload>,
stream_init: HashSet<i32>,
}
unsafe impl Send for HlsEgress {}
@@ -52,6 +74,7 @@ impl HlsEgress {
config,
ctx: ptr::null_mut(),
chan_in,
stream_init: HashSet::new(),
}
}
@@ -98,6 +121,34 @@ impl HlsEgress {
0,
);
// configure mapping
let mut stream_map: HashMap<usize, Vec<String>> = HashMap::new();
for var in &self.config.variants {
let cfg = match var {
VariantStream::Video(vx) => format!("v:{}", vx.dst_index),
VariantStream::Audio(ax) => format!("a:{}", ax.dst_index),
};
if let Some(out_stream) = stream_map.get_mut(&var.dst_index()) {
out_stream.push(cfg);
} else {
stream_map.insert(var.dst_index(), vec![cfg]);
}
}
let stream_map = stream_map
.values()
.into_iter()
.map(|v| v.join(","))
.join(" ");
info!("map_str={}", stream_map);
av_opt_set(
(*ctx).priv_data,
"var_stream_map\0".as_ptr() as *const libc::c_char,
format!("{}\0", stream_map).as_ptr() as *const libc::c_char,
0,
);
for var in &mut self.config.variants {
let tb = var.time_base();
match var {
@@ -158,34 +209,6 @@ impl HlsEgress {
}
}
// configure mapping
let mut stream_map: HashMap<usize, Vec<String>> = HashMap::new();
for var in &self.config.variants {
let cfg = match var {
VariantStream::Video(vx) => format!("v:{}", vx.dst_index),
VariantStream::Audio(ax) => format!("a:{}", ax.dst_index),
};
if let Some(out_stream) = stream_map.get_mut(&var.dst_index()) {
out_stream.push(cfg);
} else {
stream_map.insert(var.dst_index(), vec![cfg]);
}
}
let stream_map = stream_map
.values()
.into_iter()
.map(|v| v.join(","))
.join(" ");
info!("map_str={}", stream_map);
av_opt_set(
(*ctx).priv_data,
"var_stream_map\0".as_ptr() as *const libc::c_char,
format!("{}\0", stream_map).as_ptr() as *const libc::c_char,
0,
);
av_dump_format(ctx, 0, ptr::null(), 1);
let ret = avformat_write_header(ctx, ptr::null_mut());
@@ -208,7 +231,13 @@ impl HlsEgress {
}
let stream = *(*self.ctx).streams.add(variant.unwrap().dst_index());
(*pkt).stream_index = (*stream).index;
let idx = (*stream).index as i32;
(*pkt).stream_index = idx;
if !self.stream_init.contains(&idx) {
let encoder = (*pkt).opaque as *mut AVCodecContext;
avcodec_parameters_from_context((*stream).codecpar, encoder);
self.stream_init.insert(idx);
}
let ret = av_interleaved_write_frame(self.ctx, pkt);
if ret < 0 {
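
Two details in this file are worth spelling out. The mapping block groups variants by dst_index so renditions that share an output stream land in the same HLS group, and the new stream_init set defers avcodec_parameters_from_context until a stream's first packet, since encoders are now opened lazily when their first frame arrives. Below is a hedged, self-contained reconstruction of just the var_stream_map string-building step; the indices are illustrative.

use std::collections::HashMap;

fn main() {
    // Each variant contributes "v:N" or "a:N"; variants sharing a dst_index
    // form one rendition group, and groups are joined with spaces.
    let variants = [(0usize, "v:0"), (0, "a:0"), (1, "v:1"), (1, "a:1")];
    let mut map: HashMap<usize, Vec<&str>> = HashMap::new();
    for (dst, cfg) in variants {
        map.entry(dst).or_default().push(cfg);
    }
    let mut groups: Vec<_> = map.into_iter().collect();
    groups.sort_by_key(|(k, _)| *k); // deterministic order for the example
    let stream_map = groups
        .iter()
        .map(|(_, v)| v.join(","))
        .collect::<Vec<_>>()
        .join(" ");
    assert_eq!(stream_map, "v:0,a:0 v:1,a:1");
}

ffmpeg's HLS muxer splits the var_stream_map value on spaces, producing one variant playlist per group.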

src/encode/audio.rs (new file)

@@ -0,0 +1,276 @@
use std::mem::transmute;
use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{
av_audio_fifo_alloc, av_audio_fifo_free, av_audio_fifo_read, av_audio_fifo_realloc,
av_audio_fifo_size, av_audio_fifo_write, av_buffer_ref, av_buffer_unref,
AV_CH_LAYOUT_STEREO, av_channel_layout_copy, av_frame_alloc, av_frame_free, av_frame_get_buffer,
av_freep, av_get_sample_fmt, av_packet_alloc, av_packet_free,
av_packet_rescale_ts, av_samples_alloc_array_and_samples, AVAudioFifo,
AVBufferRef, AVChannelLayout, AVChannelLayout__bindgen_ty_1, AVCodec,
avcodec_alloc_context3, avcodec_find_encoder, avcodec_free_context, avcodec_open2, avcodec_receive_packet, avcodec_send_frame,
AVCodecContext, AVERROR, AVFrame, swr_alloc_set_opts2, swr_convert, swr_free,
swr_init, SwrContext,
};
use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE;
use libc::EAGAIN;
use tokio::sync::mpsc::UnboundedSender;
use crate::ipc::Rx;
use crate::pipeline::{PipelinePayload, PipelineProcessor};
use crate::utils::{audio_variant_id_ref, get_ffmpeg_error_msg, id_ref_to_uuid};
use crate::variant::AudioVariant;
pub struct AudioEncoder<T> {
variant: AudioVariant,
ctx: *mut AVCodecContext,
codec: *const AVCodec,
fifo: *mut AVAudioFifo,
swr_ctx: *mut SwrContext,
chan_in: T,
chan_out: UnboundedSender<PipelinePayload>,
var_id_ref: *mut AVBufferRef,
}
unsafe impl<T> Send for AudioEncoder<T> {}
unsafe impl<T> Sync for AudioEncoder<T> {}
impl<T> Drop for AudioEncoder<T> {
fn drop(&mut self) {
unsafe {
swr_free(&mut self.swr_ctx);
av_audio_fifo_free(self.fifo);
avcodec_free_context(&mut self.ctx);
av_buffer_unref(&mut self.var_id_ref);
}
}
}
impl<TRecv> AudioEncoder<TRecv>
where
TRecv: Rx<PipelinePayload>,
{
pub fn new(
chan_in: TRecv,
chan_out: UnboundedSender<PipelinePayload>,
variant: AudioVariant,
) -> Self {
let id_ref = audio_variant_id_ref(&variant);
Self {
ctx: ptr::null_mut(),
codec: ptr::null(),
fifo: ptr::null_mut(),
swr_ctx: ptr::null_mut(),
variant,
chan_in,
chan_out,
var_id_ref: id_ref,
}
}
unsafe fn setup_encoder(&mut self, frame: *mut AVFrame) -> Result<(), Error> {
if self.ctx == ptr::null_mut() {
let codec = self.variant.codec;
let encoder = avcodec_find_encoder(transmute(codec as i32));
if encoder == ptr::null_mut() {
return Err(Error::msg("Encoder not found"));
}
let ctx = avcodec_alloc_context3(encoder);
if ctx == ptr::null_mut() {
return Err(Error::msg("Failed to allocate encoder context"));
}
(*ctx).time_base = self.variant.time_base();
(*ctx).sample_fmt = av_get_sample_fmt(
format!("{}\0", self.variant.sample_fmt).as_ptr() as *const libc::c_char,
);
(*ctx).bit_rate = self.variant.bitrate as i64;
(*ctx).sample_rate = self.variant.sample_rate as libc::c_int;
(*ctx).ch_layout = AVChannelLayout {
order: AV_CHANNEL_ORDER_NATIVE,
nb_channels: 2,
u: AVChannelLayout__bindgen_ty_1 {
mask: AV_CH_LAYOUT_STEREO,
},
opaque: ptr::null_mut(),
};
// setup audio FIFO
let fifo = av_audio_fifo_alloc((*ctx).sample_fmt, 2, 1);
if fifo == ptr::null_mut() {
return Err(Error::msg("Failed to allocate audio FIFO buffer"));
}
let mut swr_ctx = ptr::null_mut();
let ret = swr_alloc_set_opts2(
&mut swr_ctx,
&(*ctx).ch_layout,
(*ctx).sample_fmt,
(*ctx).sample_rate,
&(*frame).ch_layout,
transmute((*frame).format),
(*frame).sample_rate,
0,
ptr::null_mut(),
);
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
let ret = swr_init(swr_ctx);
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
let ret = avcodec_open2(ctx, encoder, ptr::null_mut());
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
self.ctx = ctx;
self.codec = encoder;
self.swr_ctx = swr_ctx;
self.fifo = fifo;
}
Ok(())
}
/// Returns true once the FIFO has buffered enough samples to encode a full frame,
/// false if there is nothing to process yet
unsafe fn process_audio_frame(&mut self, frame: *mut AVFrame) -> Result<bool, Error> {
let in_samples = (*frame).nb_samples;
let mut dst_samples: *mut *mut u8 = ptr::null_mut();
let ret = av_samples_alloc_array_and_samples(
&mut dst_samples,
ptr::null_mut(),
2,
in_samples,
(*self.ctx).sample_fmt,
0,
);
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
// resample audio
let ret = swr_convert(
self.swr_ctx,
dst_samples,
in_samples,
(*frame).extended_data as *const *const u8,
in_samples,
);
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
// push resampled audio into fifo
let ret = av_audio_fifo_realloc(self.fifo, av_audio_fifo_size(self.fifo) + in_samples);
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
if av_audio_fifo_write(
self.fifo,
dst_samples as *const *mut libc::c_void,
in_samples,
) < in_samples
{
return Err(Error::msg("Failed to write samples to FIFO"));
}
if dst_samples != ptr::null_mut() {
av_freep(dst_samples.add(0) as *mut libc::c_void);
}
let buffered = av_audio_fifo_size(self.fifo);
Ok(buffered >= (*self.ctx).frame_size)
}
unsafe fn get_fifo_frame(&mut self) -> Result<*mut AVFrame, Error> {
let mut frame = av_frame_alloc();
let frame_size = (*self.ctx).frame_size.min(av_audio_fifo_size(self.fifo));
(*frame).nb_samples = frame_size;
av_channel_layout_copy(&mut (*frame).ch_layout, &(*self.ctx).ch_layout);
(*frame).format = (*self.ctx).sample_fmt as libc::c_int;
(*frame).sample_rate = (*self.ctx).sample_rate;
let ret = av_frame_get_buffer(frame, 0);
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
let ret = av_audio_fifo_read(
self.fifo,
ptr::addr_of_mut!((*frame).data) as *const *mut libc::c_void,
frame_size,
);
if ret < frame_size {
av_frame_free(&mut frame);
return Err(Error::msg("Failed to read frame from FIFO"));
}
Ok(frame)
}
unsafe fn process_frame(&mut self, frame: *mut AVFrame) -> Result<(), Error> {
let var_id = id_ref_to_uuid((*frame).opaque_ref)?;
assert_eq!(var_id, self.variant.id);
self.setup_encoder(frame)?;
if !self.process_audio_frame(frame)? {
return Ok(());
}
// read audio from FIFO
let frame = self.get_fifo_frame()?;
let mut ret = avcodec_send_frame(self.ctx, frame);
if ret < 0 && ret != AVERROR(EAGAIN) {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
while ret > 0 || ret == AVERROR(EAGAIN) {
let mut pkt = av_packet_alloc();
ret = avcodec_receive_packet(self.ctx, pkt);
if ret < 0 {
av_packet_free(&mut pkt);
if ret == AVERROR(EAGAIN) {
return Ok(());
}
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
(*pkt).time_base = (*self.ctx).time_base;
(*pkt).duration = (*frame).duration;
av_packet_rescale_ts(pkt, (*frame).time_base, (*self.ctx).time_base);
(*pkt).opaque = self.ctx as *mut libc::c_void;
(*pkt).opaque_ref = av_buffer_ref(self.var_id_ref);
self.chan_out
.send(PipelinePayload::AvPacket("Encoder packet".to_owned(), pkt))?;
}
Ok(())
}
}
impl<TRecv> PipelineProcessor for AudioEncoder<TRecv>
where
TRecv: Rx<PipelinePayload>,
{
fn process(&mut self) -> Result<(), Error> {
while let Ok(pkg) = self.chan_in.try_recv_next() {
match pkg {
PipelinePayload::AvFrame(_, frm, idx) => unsafe {
if self.variant.src_index == idx {
self.process_frame(frm)?;
}
},
_ => return Err(Error::msg("Payload not supported")),
}
}
Ok(())
}
}

src/encode/mod.rs

@@ -1,165 +1,2 @@
use std::mem::transmute;
use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{
av_buffer_ref, AV_CH_LAYOUT_STEREO, av_get_sample_fmt, av_opt_set, av_packet_alloc,
av_packet_free, av_packet_rescale_ts, AVBufferRef, AVChannelLayout,
AVChannelLayout__bindgen_ty_1, AVCodec, avcodec_alloc_context3, avcodec_find_encoder,
avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR,
AVFrame,
};
use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE;
use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P;
use libc::EAGAIN;
use tokio::sync::mpsc::UnboundedSender;
use crate::ipc::Rx;
use crate::pipeline::PipelinePayload;
use crate::utils::{get_ffmpeg_error_msg, id_ref_to_uuid, variant_id_ref};
use crate::variant::VariantStream;
pub struct Encoder<T> {
variant: VariantStream,
ctx: *mut AVCodecContext,
codec: *const AVCodec,
chan_in: T,
chan_out: UnboundedSender<PipelinePayload>,
var_id_ref: *mut AVBufferRef,
}
unsafe impl<T> Send for Encoder<T> {}
unsafe impl<T> Sync for Encoder<T> {}
impl<TRecv> Encoder<TRecv>
where
TRecv: Rx<PipelinePayload>,
{
pub fn new(
chan_in: TRecv,
chan_out: UnboundedSender<PipelinePayload>,
variant: VariantStream,
) -> Self {
let id_ref = variant_id_ref(&variant).unwrap();
Self {
ctx: ptr::null_mut(),
codec: ptr::null(),
variant,
chan_in,
chan_out,
var_id_ref: id_ref,
}
}
unsafe fn setup_encoder(&mut self, frame: *mut AVFrame) -> Result<(), Error> {
if self.ctx == ptr::null_mut() {
let codec = match &self.variant {
VariantStream::Video(vv) => vv.codec,
VariantStream::Audio(va) => va.codec,
_ => return Err(Error::msg("Not supported")),
};
let encoder = avcodec_find_encoder(transmute(codec as i32));
if encoder == ptr::null_mut() {
return Err(Error::msg("Encoder not found"));
}
let ctx = avcodec_alloc_context3(encoder);
if ctx == ptr::null_mut() {
return Err(Error::msg("Failed to allocate encoder context"));
}
(*ctx).time_base = self.variant.time_base();
match &self.variant {
VariantStream::Video(vv) => {
(*ctx).bit_rate = vv.bitrate as i64;
(*ctx).width = (*frame).width;
(*ctx).height = (*frame).height;
let key_frames = vv.fps * vv.keyframe_interval;
(*ctx).gop_size = key_frames as libc::c_int;
(*ctx).max_b_frames = 1;
(*ctx).pix_fmt = AV_PIX_FMT_YUV420P;
av_opt_set(
(*ctx).priv_data,
"preset\0".as_ptr() as *const libc::c_char,
"fast\0".as_ptr() as *const libc::c_char,
0,
);
}
VariantStream::Audio(va) => {
(*ctx).sample_fmt = av_get_sample_fmt(
format!("{}\0", va.sample_fmt).as_ptr() as *const libc::c_char
);
(*ctx).bit_rate = va.bitrate as i64;
(*ctx).sample_rate = va.sample_rate as libc::c_int;
(*ctx).ch_layout = AVChannelLayout {
order: AV_CHANNEL_ORDER_NATIVE,
nb_channels: 2,
u: AVChannelLayout__bindgen_ty_1 {
mask: AV_CH_LAYOUT_STEREO,
},
opaque: ptr::null_mut(),
};
}
_ => {
// nothing
}
};
let ret = avcodec_open2(ctx, encoder, ptr::null_mut());
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
self.ctx = ctx;
self.codec = encoder;
}
Ok(())
}
unsafe fn process_frame(&mut self, frame: *mut AVFrame) -> Result<(), Error> {
self.setup_encoder(frame)?;
let var_id = id_ref_to_uuid((*frame).opaque_ref)?;
assert_eq!(var_id, self.variant.id());
let mut ret = avcodec_send_frame(self.ctx, frame);
if ret < 0 && ret != AVERROR(EAGAIN) {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
while ret > 0 || ret == AVERROR(EAGAIN) {
let mut pkt = av_packet_alloc();
ret = avcodec_receive_packet(self.ctx, pkt);
if ret < 0 {
av_packet_free(&mut pkt);
if ret == AVERROR(EAGAIN) {
return Ok(());
}
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
(*pkt).time_base = (*self.ctx).time_base;
(*pkt).duration = (*frame).duration;
av_packet_rescale_ts(pkt, (*frame).time_base, (*self.ctx).time_base);
(*pkt).opaque_ref = av_buffer_ref(self.var_id_ref);
self.chan_out
.send(PipelinePayload::AvPacket("Encoder packet".to_owned(), pkt))?;
}
Ok(())
}
pub fn process(&mut self) -> Result<(), Error> {
while let Ok(pkg) = self.chan_in.try_recv_next() {
match pkg {
PipelinePayload::AvFrame(_, frm) => unsafe {
self.process_frame(frm)?;
},
_ => return Err(Error::msg("Payload not supported")),
}
}
Ok(())
}
}
pub mod audio;
pub mod video;

src/encode/video.rs (new file)

@@ -0,0 +1,152 @@
use std::mem::transmute;
use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{
av_buffer_ref, av_opt_set, av_packet_alloc, av_packet_free, av_packet_rescale_ts,
AVBufferRef, AVCodec, avcodec_alloc_context3, avcodec_find_encoder,
avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, AVFrame, AVRational,
};
use ffmpeg_sys_next::AVColorSpace::AVCOL_SPC_BT709;
use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P;
use libc::EAGAIN;
use tokio::sync::mpsc::UnboundedSender;
use crate::ipc::Rx;
use crate::pipeline::{PipelinePayload, PipelineProcessor};
use crate::utils::{get_ffmpeg_error_msg, id_ref_to_uuid, video_variant_id_ref};
use crate::variant::VideoVariant;
pub struct VideoEncoder<T> {
variant: VideoVariant,
ctx: *mut AVCodecContext,
codec: *const AVCodec,
chan_in: T,
chan_out: UnboundedSender<PipelinePayload>,
var_id_ref: *mut AVBufferRef,
}
unsafe impl<T> Send for VideoEncoder<T> {}
unsafe impl<T> Sync for VideoEncoder<T> {}
impl<TRecv> VideoEncoder<TRecv>
where
TRecv: Rx<PipelinePayload>,
{
pub fn new(
chan_in: TRecv,
chan_out: UnboundedSender<PipelinePayload>,
variant: VideoVariant,
) -> Self {
let id_ref = video_variant_id_ref(&variant);
Self {
ctx: ptr::null_mut(),
codec: ptr::null(),
variant,
chan_in,
chan_out,
var_id_ref: id_ref,
}
}
unsafe fn setup_encoder(&mut self, frame: *mut AVFrame) -> Result<(), Error> {
if self.ctx == ptr::null_mut() {
let codec = self.variant.codec;
let encoder = avcodec_find_encoder(transmute(codec as i32));
if encoder == ptr::null_mut() {
return Err(Error::msg("Encoder not found"));
}
let ctx = avcodec_alloc_context3(encoder);
if ctx == ptr::null_mut() {
return Err(Error::msg("Failed to allocate encoder context"));
}
(*ctx).time_base = self.variant.time_base();
(*ctx).bit_rate = self.variant.bitrate as i64;
(*ctx).width = (*frame).width;
(*ctx).height = (*frame).height;
(*ctx).level = self.variant.level as libc::c_int;
(*ctx).profile = self.variant.profile as libc::c_int;
(*ctx).framerate = AVRational {
num: 1,
den: self.variant.fps as libc::c_int,
};
let key_frames = self.variant.fps * self.variant.keyframe_interval;
(*ctx).gop_size = key_frames as libc::c_int;
(*ctx).max_b_frames = 1;
(*ctx).pix_fmt = AV_PIX_FMT_YUV420P;
(*ctx).colorspace = AVCOL_SPC_BT709;
av_opt_set(
(*ctx).priv_data,
"preset\0".as_ptr() as *const libc::c_char,
"fast\0".as_ptr() as *const libc::c_char,
0,
);
let ret = avcodec_open2(ctx, encoder, ptr::null_mut());
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
self.ctx = ctx;
self.codec = encoder;
}
Ok(())
}
unsafe fn process_frame(&mut self, frame: *mut AVFrame) -> Result<(), Error> {
let var_id = id_ref_to_uuid((*frame).opaque_ref)?;
assert_eq!(var_id, self.variant.id);
self.setup_encoder(frame)?;
let mut ret = avcodec_send_frame(self.ctx, frame);
if ret < 0 && ret != AVERROR(EAGAIN) {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
while ret > 0 || ret == AVERROR(EAGAIN) {
let mut pkt = av_packet_alloc();
ret = avcodec_receive_packet(self.ctx, pkt);
if ret < 0 {
av_packet_free(&mut pkt);
if ret == AVERROR(EAGAIN) {
return Ok(());
}
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
(*pkt).time_base = (*self.ctx).time_base;
(*pkt).duration = (*frame).duration;
av_packet_rescale_ts(pkt, (*frame).time_base, (*self.ctx).time_base);
(*pkt).opaque = self.ctx as *mut libc::c_void;
(*pkt).opaque_ref = av_buffer_ref(self.var_id_ref);
self.chan_out
.send(PipelinePayload::AvPacket("Encoder packet".to_owned(), pkt))?;
}
Ok(())
}
}
impl<TRecv> PipelineProcessor for VideoEncoder<TRecv>
where
TRecv: Rx<PipelinePayload>,
{
fn process(&mut self) -> Result<(), Error> {
while let Ok(pkg) = self.chan_in.try_recv_next() {
match pkg {
PipelinePayload::AvFrame(_, frm, idx) => unsafe {
if self.variant.src_index == idx {
self.process_frame(frm)?;
}
},
_ => return Err(Error::msg("Payload not supported")),
}
}
Ok(())
}
}

src/main.rs

@@ -24,6 +24,7 @@ mod utils;
mod variant;
mod webhook;
mod ipc;
mod tag_frame;
/// Test: ffmpeg -re -f lavfi -i testsrc -g 2 -r 30 -pix_fmt yuv420p -s 1280x720 -c:v h264 -b:v 2000k -f mpegts srt://localhost:3333
#[tokio::main]

src/pipeline/mod.rs

@@ -1,9 +1,12 @@
use std::fmt::{Display, Formatter};
use std::ops::{Deref, DerefMut};
use anyhow::Error;
use ffmpeg_sys_next::{av_frame_clone, av_frame_copy_props, av_frame_free, av_packet_clone, av_packet_copy_props, av_packet_free, AVFrame, AVPacket};
use serde::{Deserialize, Serialize};
use crate::demux::info::DemuxStreamInfo;
use crate::egress::hls::HLSEgressConfig;
use crate::variant::VariantStream;
pub mod builder;
@@ -17,12 +20,20 @@ pub enum EgressType {
MPEGTS,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HLSEgressConfig {
pub out_dir: String,
pub variants: Vec<VariantStream>,
impl Display for EgressType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
EgressType::HLS(c) => format!("{}", c),
EgressType::DASH => "DASH".to_owned(),
EgressType::WHEP => "WHEP".to_owned(),
EgressType::MPEGTS => "MPEGTS".to_owned(),
}
)
}
}
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct PipelineConfig {
pub id: uuid::Uuid,
@@ -30,6 +41,25 @@ pub struct PipelineConfig {
pub egress: Vec<EgressType>,
}
impl Display for PipelineConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "\nPipeline Config ID={}", self.id)?;
if self.recording.len() > 0 {
write!(f, "\nRecording:")?;
for r in &self.recording {
write!(f, "\n\t{}", r)?;
}
}
if self.egress.len() > 0 {
write!(f, "\nEgress:")?;
for e in &self.egress {
write!(f, "\n\t{}", e)?;
}
}
Ok(())
}
}
#[derive(Debug, PartialEq)]
pub enum PipelinePayload {
/// No output
@@ -39,7 +69,7 @@ pub enum PipelinePayload {
/// FFMpeg AVPacket
AvPacket(String, *mut AVPacket),
/// FFMpeg AVFrame
AvFrame(String, *mut AVFrame),
AvFrame(String, *mut AVFrame, usize),
/// Information about the input stream
SourceInfo(DemuxStreamInfo),
}
@@ -58,10 +88,10 @@ impl Clone for PipelinePayload {
av_packet_copy_props(new_pkt, *p);
PipelinePayload::AvPacket(t.clone(), new_pkt)
},
PipelinePayload::AvFrame(t, p) => unsafe {
PipelinePayload::AvFrame(t, p, idx) => unsafe {
let new_frame = av_frame_clone(*p);
av_frame_copy_props(new_frame, *p);
PipelinePayload::AvFrame(t.clone(), new_frame)
PipelinePayload::AvFrame(t.clone(), new_frame, idx.clone())
},
PipelinePayload::SourceInfo(i) => PipelinePayload::SourceInfo(i.clone()),
}
@@ -76,10 +106,14 @@ impl Drop for PipelinePayload {
PipelinePayload::AvPacket(_, p) => unsafe {
av_packet_free(p);
},
PipelinePayload::AvFrame(_, p) => unsafe {
PipelinePayload::AvFrame(_, p, _) => unsafe {
av_frame_free(p);
},
PipelinePayload::SourceInfo(_) => {}
}
}
}
pub trait PipelineProcessor {
fn process(&mut self) -> Result<(), Error>;
}
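
PipelinePayload::AvFrame now carries the source stream index as a third field, which is what lets several encoder chains share one broadcast channel and keep only their own frames. A small runnable model of that fan-out follows; Frame is an illustrative stand-in for the real payload.

use tokio::sync::broadcast;

#[derive(Clone)]
struct Frame {
    src_index: usize,
}

fn main() {
    let (tx, mut video_rx) = broadcast::channel::<Frame>(16);
    let mut audio_rx = tx.subscribe();

    tx.send(Frame { src_index: 0 }).unwrap(); // decoded video frame
    tx.send(Frame { src_index: 1 }).unwrap(); // decoded audio frame

    // The video chain keeps only its source stream, like the
    // `if self.variant.src_index == idx` guards in the encoders...
    while let Ok(f) = video_rx.try_recv() {
        if f.src_index == 0 { /* scale + encode */ }
    }
    // ...and the audio chain keeps only its own index.
    while let Ok(f) = audio_rx.try_recv() {
        if f.src_index == 1 { /* tag + encode */ }
    }
}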


@@ -12,15 +12,17 @@ use crate::decode::Decoder;
use crate::demux::Demuxer;
use crate::demux::info::{DemuxStreamInfo, StreamChannelType};
use crate::egress::hls::HlsEgress;
use crate::encode::Encoder;
use crate::pipeline::{EgressType, PipelineConfig, PipelinePayload};
use crate::encode::audio::AudioEncoder;
use crate::encode::video::VideoEncoder;
use crate::pipeline::{EgressType, PipelineConfig, PipelinePayload, PipelineProcessor};
use crate::scale::Scaler;
use crate::tag_frame::TagFrame;
use crate::variant::VariantStream;
use crate::webhook::Webhook;
struct ScalerEncoder {
pub scaler: Scaler,
pub encoder: Encoder<UnboundedReceiver<PipelinePayload>>,
struct PipelineChain {
pub first: Box<dyn PipelineProcessor + Sync + Send>,
pub second: Box<dyn PipelineProcessor + Sync + Send>,
}
pub struct PipelineRunner {
@@ -28,8 +30,7 @@ pub struct PipelineRunner {
demuxer: Demuxer,
decoder: Decoder,
decoder_output: broadcast::Receiver<PipelinePayload>,
scalers: Vec<ScalerEncoder>,
encoders: Vec<Encoder<broadcast::Receiver<PipelinePayload>>>,
encoders: Vec<PipelineChain>,
egress: Vec<HlsEgress>,
started: Instant,
frame_no: u64,
@@ -50,7 +51,6 @@ impl PipelineRunner {
demuxer: Demuxer::new(recv, demux_out),
decoder: Decoder::new(demux_in, dec_tx),
decoder_output: dec_rx,
scalers: vec![],
encoders: vec![],
egress: vec![],
started: Instant::now(),
@@ -86,20 +86,15 @@ impl PipelineRunner {
panic!("Frame number overflowed, maybe you need a bigger number!");
}
// video scaler-encoder chains
for sw in &mut self.scalers {
sw.scaler.process()?;
sw.encoder.process()?;
for eg in &mut self.egress {
eg.process()?;
}
// (scaler)-encoder chains
for sw in &mut self.encoders {
sw.first.process()?;
sw.second.process()?;
}
// audio encoder chains
for enc in &mut self.encoders {
enc.process()?;
for eg in &mut self.egress {
eg.process()?;
}
// egress outputs
for eg in &mut self.egress {
eg.process()?;
}
Ok(())
}
@@ -108,11 +103,11 @@ impl PipelineRunner {
if self.stream_info.is_some() {
return Err(Error::msg("Pipeline already configured!"));
}
info!("Configuring pipeline {:?}", info);
self.stream_info = Some(info.clone());
// re-configure with demuxer info
self.config = self.webhook.configure(&info);
info!("Configuring pipeline {}", self.config);
let video_stream = info
.channels
@@ -131,21 +126,33 @@
match v {
VariantStream::Video(vs) => {
let (sw_tx, sw_rx) = unbounded_channel();
self.scalers.push(ScalerEncoder {
scaler: Scaler::new(
self.encoders.push(PipelineChain {
first: Box::new(Scaler::new(
self.decoder_output.resubscribe(),
sw_tx.clone(),
vs.clone(),
),
encoder: Encoder::new(sw_rx, egress_tx.clone(), v.clone()),
)),
second: Box::new(VideoEncoder::new(
sw_rx,
egress_tx.clone(),
vs.clone(),
)),
});
}
VariantStream::Audio(_) => {
self.encoders.push(Encoder::new(
self.decoder_output.resubscribe(),
egress_tx.clone(),
v.clone(),
));
VariantStream::Audio(va) => {
let (tag_tx, tag_rx) = unbounded_channel();
self.encoders.push(PipelineChain {
first: Box::new(TagFrame::new(
v.clone(),
self.decoder_output.resubscribe(),
tag_tx,
)),
second: Box::new(AudioEncoder::new(
tag_rx,
egress_tx.clone(),
va.clone(),
)),
});
}
c => {
return Err(Error::msg(format!(

src/scale.rs

@@ -3,13 +3,13 @@ use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{
av_buffer_ref, av_frame_alloc, av_frame_copy_props, AVBufferRef,
AVFrame, SWS_BILINEAR, sws_getContext, sws_scale_frame, SwsContext,
av_buffer_ref, av_frame_alloc, av_frame_copy_props, AVBufferRef, AVFrame,
SWS_BILINEAR, sws_getContext, sws_scale_frame, SwsContext,
};
use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
use crate::pipeline::PipelinePayload;
use crate::pipeline::{PipelinePayload, PipelineProcessor};
use crate::utils::{get_ffmpeg_error_msg, video_variant_id_ref};
use crate::variant::VideoVariant;
@@ -41,11 +41,7 @@ impl Scaler {
}
}
unsafe fn process_frame(&mut self, frame: *mut AVFrame) -> Result<(), Error> {
if (*frame).width == 0 {
// only picture frames supported
return Ok(());
}
unsafe fn process_frame(&mut self, frame: *mut AVFrame, src_index: usize) -> Result<(), Error> {
let dst_fmt = transmute((*frame).format);
if self.ctx == ptr::null_mut() {
@@ -80,15 +76,23 @@
(*dst_frame).opaque_ref = av_buffer_ref(self.var_id_ref);
self.chan_out.send(PipelinePayload::AvFrame("Scaler frame".to_owned(), dst_frame))?;
self.chan_out.send(PipelinePayload::AvFrame(
"Scaler frame".to_owned(),
dst_frame,
src_index
))?;
Ok(())
}
}
pub fn process(&mut self) -> Result<(), Error> {
impl PipelineProcessor for Scaler {
fn process(&mut self) -> Result<(), Error> {
while let Ok(pkg) = self.chan_in.try_recv() {
match pkg {
PipelinePayload::AvFrame(_, frm) => unsafe {
self.process_frame(frm)?;
PipelinePayload::AvFrame(_, frm, idx) => unsafe {
if self.variant.src_index == idx {
self.process_frame(frm, idx)?;
}
},
_ => return Err(Error::msg("Payload not supported")),
}

src/tag_frame.rs (new file)

@@ -0,0 +1,61 @@
use anyhow::Error;
use ffmpeg_sys_next::{av_buffer_ref, av_frame_clone, av_frame_copy_props, AVBufferRef};
use tokio::sync::mpsc::UnboundedSender;
use crate::ipc::Rx;
use crate::pipeline::{PipelinePayload, PipelineProcessor};
use crate::utils::variant_id_ref;
use crate::variant::VariantStream;
pub struct TagFrame<TRecv> {
variant: VariantStream,
chan_in: TRecv,
chan_out: UnboundedSender<PipelinePayload>,
var_id_ref: *mut AVBufferRef,
}
unsafe impl<T> Send for TagFrame<T> {}
unsafe impl<T> Sync for TagFrame<T> {}
impl<TRecv> TagFrame<TRecv>
where
TRecv: Rx<PipelinePayload>,
{
pub fn new(
var: VariantStream,
chan_in: TRecv,
chan_out: UnboundedSender<PipelinePayload>,
) -> Self {
let id_ref = variant_id_ref(&var).unwrap();
Self {
variant: var,
var_id_ref: id_ref,
chan_in,
chan_out,
}
}
}
impl<TRecv> PipelineProcessor for TagFrame<TRecv>
where
TRecv: Rx<PipelinePayload>,
{
fn process(&mut self) -> Result<(), Error> {
while let Ok(pkg) = self.chan_in.try_recv_next() {
match pkg {
PipelinePayload::AvFrame(ref tag, frm, idx) => unsafe {
if idx == self.variant.src_index() {
let new_frame = av_frame_clone(frm);
av_frame_copy_props(new_frame, frm);
(*new_frame).opaque_ref = av_buffer_ref(self.var_id_ref);
self.chan_out
.send(PipelinePayload::AvFrame(tag.clone(), new_frame, idx))?;
}
},
_ => return Err(Error::msg("Payload not supported")),
};
}
Ok(())
}
}

src/utils.rs

@@ -5,7 +5,7 @@ use anyhow::Error;
use ffmpeg_sys_next::{av_buffer_allocz, av_make_error_string, AVBufferRef, memcpy};
use uuid::{Bytes, Uuid};
use crate::variant::{VariantStream, VideoVariant};
use crate::variant::{AudioVariant, VariantStream, VideoVariant};
pub fn get_ffmpeg_error_msg(ret: libc::c_int) -> String {
unsafe {
@@ -54,6 +54,18 @@ pub fn video_variant_id_ref(var: &VideoVariant) -> *mut AVBufferRef {
}
}
pub fn audio_variant_id_ref(var: &AudioVariant) -> *mut AVBufferRef {
unsafe {
let buf = av_buffer_allocz(16);
memcpy(
(*buf).data as *mut libc::c_void,
var.id.as_bytes().as_ptr() as *const libc::c_void,
16,
);
buf
}
}
pub fn id_ref_to_uuid(buf: *mut AVBufferRef) -> Result<Uuid, Error> {
unsafe {
if buf == ptr::null_mut() {
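
These helpers copy a variant's UUID into a 16-byte AVBufferRef that rides on frames and packets via opaque_ref; id_ref_to_uuid reverses the copy downstream. A safe sketch of the round trip using the uuid crate (assuming its v4 feature for the example id):

use uuid::Uuid;

fn main() {
    let id = Uuid::new_v4();
    // audio_variant_id_ref / video_variant_id_ref equivalent: 16 raw bytes
    let buf: [u8; 16] = *id.as_bytes();
    // id_ref_to_uuid equivalent: rebuild the Uuid from the buffer
    assert_eq!(Uuid::from_bytes(buf), id);
}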

src/variant.rs

@@ -1,6 +1,8 @@
use std::ffi::CStr;
use std::fmt::{Display, Formatter};
use std::mem::transmute;
use ffmpeg_sys_next::AVRational;
use ffmpeg_sys_next::{avcodec_get_name, AVRational};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
@@ -12,6 +14,15 @@ pub enum VariantStream {
Audio(AudioVariant),
}
impl Display for VariantStream {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
VariantStream::Video(v) => write!(f, "{}", v),
VariantStream::Audio(a) => write!(f, "{}", a),
}
}
}
/// Information related to variant streams for a given egress
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct VideoVariant {
@@ -53,9 +64,15 @@ impl Display for VideoVariant {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Video #{}: {}, {}p, {}fps, {}kbps",
"Video #{}->{}: {}, {}x{}, {}fps, {}kbps",
self.src_index,
self.codec,
self.dst_index,
unsafe {
CStr::from_ptr(avcodec_get_name(transmute(self.codec as i32)))
.to_str()
.unwrap()
},
self.width,
self.height,
self.fps,
self.bitrate / 1000
@@ -95,9 +112,14 @@ impl Display for AudioVariant {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Audio #{}: {}, {}kbps",
"Audio #{}->{}: {}, {}kbps",
self.src_index,
self.codec,
self.dst_index,
unsafe {
CStr::from_ptr(avcodec_get_name(transmute(self.codec as i32)))
.to_str()
.unwrap()
},
self.bitrate / 1000
)
}
@@ -127,14 +149,26 @@ impl VariantStream {
pub fn time_base(&self) -> AVRational {
match &self {
VariantStream::Video(vv) => AVRational {
num: 1,
den: 90_000,
},
VariantStream::Audio(va) => AVRational {
num: 1,
den: va.sample_rate as libc::c_int,
},
VariantStream::Video(vv) => vv.time_base(),
VariantStream::Audio(va) => va.time_base(),
}
}
}
impl VideoVariant {
pub fn time_base(&self) -> AVRational {
AVRational {
num: 1,
den: 90_000,
}
}
}
impl AudioVariant {
pub fn time_base(&self) -> AVRational {
AVRational {
num: 1,
den: self.sample_rate as libc::c_int,
}
}
}
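
The refactor pins video to a 1/90000 time base (the MPEG-TS clock, cleanly divisible by common frame rates) and audio to 1/sample_rate. A worked example of what av_packet_rescale_ts does between such bases, with rounding behaviour omitted for brevity:

fn rescale(ts: i64, from: (i64, i64), to: (i64, i64)) -> i64 {
    // ts * (from.num / from.den) / (to.num / to.den)
    ts * from.0 * to.1 / (from.1 * to.0)
}

fn main() {
    // One frame at 30 fps is 3_000 ticks in the 90 kHz video base:
    assert_eq!(rescale(1, (1, 30), (1, 90_000)), 3_000);
    // One 1024-sample AAC frame at 48 kHz is 1_920 ticks in that base:
    assert_eq!(rescale(1024, (1, 48_000), (1, 90_000)), 1_920);
}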

src/webhook.rs

@@ -3,8 +3,9 @@ use std::fmt::Display;
use uuid::Uuid;
use crate::demux::info::{DemuxStreamInfo, StreamChannelType};
use crate::egress::hls::HLSEgressConfig;
use crate::ingress::ConnectionInfo;
use crate::pipeline::{EgressType, HLSEgressConfig, PipelineConfig};
use crate::pipeline::{EgressType, PipelineConfig};
use crate::settings::Settings;
use crate::variant::{AudioVariant, VariantStream, VideoVariant};
@@ -34,7 +35,7 @@ impl Webhook {
bitrate: 3_000_000,
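// 27 is AV_CODEC_ID_H264 in ffmpeg's AVCodecID enum; profile 100 is H.264
// High, and the level bump below (1 -> 51) encodes level 5.1 as level_idc.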
codec: 27,
profile: 100,
level: 1,
level: 51,
keyframe_interval: 2,
}));
vars.push(VariantStream::Video(VideoVariant {
@@ -47,7 +48,7 @@ impl Webhook {
bitrate: 1_000_000,
codec: 27,
profile: 100,
level: 1,
level: 51,
keyframe_interval: 2,
}));
let has_audio = stream_info

test.sh (new, executable)

@@ -0,0 +1,5 @@
#!/bin/bash
ffmpeg \
-re -f lavfi -i testsrc -g 60 -r 30 -pix_fmt yuv420p -s 1280x720 -c:v h264 -b:v 2000k -c:a aac -b:a 192k \
-f mpegts srt://localhost:3333
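# NB: lavfi's testsrc is video-only, so -c:a has no effect here; to exercise
# the new audio encoder an audio input is also needed, e.g. -f lavfi -i sine.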