Remove opaque pointers in pkts

This commit is contained in:
kieran 2024-08-29 11:46:57 +01:00
parent 759492d974
commit a64b54ba12
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
18 changed files with 250 additions and 396 deletions

View File

@ -3,15 +3,15 @@ use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{
av_frame_alloc, AVCodec, avcodec_alloc_context3,
avcodec_find_decoder, avcodec_free_context, avcodec_open2, avcodec_parameters_to_context,
avcodec_receive_frame, avcodec_send_packet, AVCodecContext, AVERROR, AVERROR_EOF, AVPacket, AVStream,
av_frame_alloc, AVCodec, avcodec_alloc_context3, avcodec_find_decoder,
avcodec_free_context, avcodec_open2, avcodec_parameters_to_context, avcodec_receive_frame,
avcodec_send_packet, AVCodecContext, AVERROR, AVERROR_EOF, AVPacket,
};
use ffmpeg_sys_next::AVPictureType::AV_PICTURE_TYPE_NONE;
use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedReceiver;
use crate::pipeline::PipelinePayload;
use crate::pipeline::{AVFrameSource, AVPacketSource, PipelinePayload};
struct CodecContext {
pub context: *mut AVCodecContext,
@ -50,9 +50,17 @@ impl Decoder {
}
}
pub unsafe fn decode_pkt(&mut self, pkt: *mut AVPacket) -> Result<usize, Error> {
pub unsafe fn decode_pkt(
&mut self,
pkt: *mut AVPacket,
src: &AVPacketSource,
) -> Result<usize, Error> {
let stream_index = (*pkt).stream_index;
let stream = (*pkt).opaque as *mut AVStream;
let stream = match src {
AVPacketSource::Demuxer(s) => *s,
_ => return Err(Error::msg(format!("Cannot decode packet from: {:?}", src))),
};
assert_eq!(
stream_index,
(*stream).index,
@ -101,11 +109,9 @@ impl Decoder {
}
// reset picture type, not to confuse the encoder
(*frame).pict_type = AV_PICTURE_TYPE_NONE;
(*frame).opaque = stream as *mut libc::c_void;
self.chan_out.send(PipelinePayload::AvFrame(
"Decoder frame".to_owned(),
frame,
stream_index as usize,
AVFrameSource::Decoder(stream),
))?;
frames += 1;
}
@ -116,9 +122,9 @@ impl Decoder {
pub fn process(&mut self) -> Result<usize, Error> {
if let Ok(pkg) = self.chan_in.try_recv() {
return if let PipelinePayload::AvPacket(_, pkt) = pkg {
return if let PipelinePayload::AvPacket(pkt, ref src) = pkg {
unsafe {
let frames = self.decode_pkt(pkt)?;
let frames = self.decode_pkt(pkt, src)?;
Ok(frames)
}
} else {

View File

@ -5,16 +5,16 @@ use std::time::Duration;
use anyhow::Error;
use bytes::{BufMut, Bytes};
use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO};
use ffmpeg_sys_next::*;
use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO};
use log::{info, warn};
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::Mutex;
use tokio::time::Instant;
use crate::demux::info::{DemuxStreamInfo, StreamChannelType, StreamInfoChannel};
use crate::pipeline::PipelinePayload;
use crate::pipeline::{AVPacketSource, PipelinePayload};
use crate::utils::get_ffmpeg_error_msg;
pub mod info;
@ -69,8 +69,7 @@ unsafe extern "C" fn read_data(
}
}
Err(e) => match e {
TryRecvError::Empty => {
}
TryRecvError::Empty => {}
TryRecvError::Disconnected => {
warn!("EOF");
return AVERROR_EOF;
@ -179,9 +178,10 @@ impl Demuxer {
return Err(Error::msg(msg));
}
let stream = *(*self.ctx).streams.add((*pkt).stream_index as usize);
(*pkt).opaque = stream as *mut libc::c_void;
let pkg = PipelinePayload::AvPacket("Demuxer packet".to_owned(), pkt);
let pkg = PipelinePayload::AvPacket(
pkt,
AVPacketSource::Demuxer(stream),
);
self.chan_out.send(pkg)?;
Ok(())
}

View File

@ -4,7 +4,14 @@ use std::mem::transmute;
use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{AV_CH_LAYOUT_STEREO, av_channel_layout_copy, av_dump_format, av_get_sample_fmt, av_interleaved_write_frame, av_opt_set, av_packet_clone, av_packet_copy_props, AVChannelLayout, AVChannelLayout__bindgen_ty_1, avcodec_find_encoder, avcodec_parameters_from_context, avcodec_parameters_to_context, AVCodecContext, avformat_alloc_output_context2, avformat_free_context, avformat_new_stream, avformat_write_header, AVFormatContext, AVPacket, AVRational};
use ffmpeg_sys_next::{
AV_CH_LAYOUT_STEREO, av_channel_layout_copy, av_dump_format, av_get_sample_fmt,
av_interleaved_write_frame, av_opt_set, av_packet_clone, av_packet_copy_props,
AVChannelLayout, AVChannelLayout__bindgen_ty_1, avcodec_find_encoder,
avcodec_parameters_from_context, avcodec_parameters_to_context, AVCodecContext, avformat_alloc_output_context2,
avformat_free_context, avformat_new_stream, avformat_write_header, AVFormatContext, AVPacket,
AVRational,
};
use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE;
use ffmpeg_sys_next::AVColorSpace::AVCOL_SPC_BT709;
use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO};
@ -16,10 +23,10 @@ use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::UnboundedReceiver;
use uuid::Uuid;
use crate::egress::{EgressConfig, get_pkt_variant, map_variants_to_streams, update_pkt_for_muxer};
use crate::egress::{EgressConfig, map_variants_to_streams};
use crate::encode::dump_pkt_info;
use crate::pipeline::{PipelinePayload, PipelineProcessor};
use crate::utils::{get_ffmpeg_error_msg, id_ref_to_uuid};
use crate::pipeline::{AVPacketSource, PipelinePayload, PipelineProcessor};
use crate::utils::get_ffmpeg_error_msg;
use crate::variant::{VariantStream, VariantStreamType};
pub struct HlsEgress {
@ -150,9 +157,17 @@ impl HlsEgress {
Ok(())
}
unsafe fn process_pkt_internal(&mut self, pkt: *mut AVPacket) -> Result<(), Error> {
let variant = get_pkt_variant(&self.config.variants, pkt)?;
update_pkt_for_muxer(self.ctx, pkt, &variant);
unsafe fn process_pkt_internal(
&mut self,
pkt: *mut AVPacket,
src: &AVPacketSource,
) -> Result<(), Error> {
let variant = match src {
AVPacketSource::Encoder(v) => v,
_ => return Err(Error::msg(format!("Cannot mux packet from {:?}", src))),
};
(*pkt).stream_index = variant.dst_index() as libc::c_int;
//dump_pkt_info(pkt);
let ret = av_interleaved_write_frame(self.ctx, pkt);
if ret < 0 {
@ -161,20 +176,21 @@ impl HlsEgress {
Ok(())
}
unsafe fn process_pkt(&mut self, pkt: *mut AVPacket) -> Result<(), Error> {
let variant = get_pkt_variant(&self.config.variants, pkt)?;
if !self.stream_init.contains(&variant.dst_index()) {
let encoder_ctx = (*pkt).opaque as *mut AVCodecContext;
let out_stream = *(*self.ctx).streams.add(variant.dst_index());
avcodec_parameters_from_context((*out_stream).codecpar, encoder_ctx);
self.stream_init.insert(variant.dst_index());
}
unsafe fn process_pkt(
&mut self,
pkt: *mut AVPacket,
src: &AVPacketSource,
) -> Result<(), Error> {
let variant = match &src {
AVPacketSource::Encoder(v) => v,
_ => return Err(Error::msg(format!("Cannot mux packet from {:?}", src))),
};
if !self.init {
let pkt_clone = av_packet_clone(pkt);
av_packet_copy_props(pkt_clone, pkt);
self.packet_buffer.push_back(PipelinePayload::AvPacket(
"Buffered Muxer Packet".to_string(),
pkt_clone,
AVPacketSource::Muxer(variant.clone()),
));
}
@ -189,8 +205,8 @@ impl HlsEgress {
// push in pkts from buffer
while let Some(pkt) = self.packet_buffer.pop_front() {
match pkt {
PipelinePayload::AvPacket(_, pkt) => {
self.process_pkt_internal(pkt)?;
PipelinePayload::AvPacket(pkt, ref src) => {
self.process_pkt_internal(pkt, src)?;
}
_ => return Err(Error::msg("")),
}
@ -200,7 +216,7 @@ impl HlsEgress {
return Ok(());
}
self.process_pkt_internal(pkt)
self.process_pkt_internal(pkt, src)
}
}
@ -208,13 +224,20 @@ impl PipelineProcessor for HlsEgress {
fn process(&mut self) -> Result<(), Error> {
while let Ok(pkg) = self.chan_in.try_recv() {
match pkg {
PipelinePayload::AvPacket(_, pkt) => unsafe {
PipelinePayload::AvPacket(pkt, ref src) => unsafe {
self.process_pkt(pkt, src)?;
},
PipelinePayload::EncoderInfo(ref var, ctx) => unsafe {
if self.ctx.is_null() {
self.setup_muxer()?;
}
self.process_pkt(pkt)?;
if !self.stream_init.contains(&var.dst_index()) {
let out_stream = *(*self.ctx).streams.add(var.dst_index());
avcodec_parameters_from_context((*out_stream).codecpar, ctx);
self.stream_init.insert(var.dst_index());
}
},
_ => return Err(Error::msg("Payload not supported")),
_ => return Err(Error::msg(format!("Payload not supported: {:?}", pkg))),
}
}
Ok(())

View File

@ -2,15 +2,13 @@ use std::fmt::{Display, Formatter};
use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{av_packet_rescale_ts, avformat_new_stream, AVFormatContext, AVPacket};
use ffmpeg_sys_next::{avformat_new_stream, AVFormatContext};
use serde::{Deserialize, Serialize};
use crate::utils::id_ref_to_uuid;
use crate::variant::{VariantStream, VariantStreamType};
pub mod hls;
pub mod http;
pub mod mpegts;
pub mod recorder;
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -62,36 +60,4 @@ pub unsafe fn map_variants_to_streams(
}
}
Ok(())
}
/// Get variant of this packet
pub unsafe fn get_pkt_variant(
vars: &Vec<VariantStream>,
pkt: *mut AVPacket,
) -> Result<&VariantStream, Error> {
let variant_id = id_ref_to_uuid((*pkt).opaque_ref)?;
let variant = vars.iter().find(|v| v.id() == variant_id);
if variant.is_none() {
return Err(Error::msg(format!(
"No stream found with id={:?}",
variant_id
)));
}
Ok(variant.unwrap())
}
/// Update packet stream index to match muxer stream
pub unsafe fn update_pkt_for_muxer(
ctx: *mut AVFormatContext,
pkt: *mut AVPacket,
var: &VariantStream,
) {
let stream = *(*ctx).streams.add(var.dst_index());
let idx = (*stream).index;
if idx != (*pkt).stream_index {
(*pkt).stream_index = idx;
}
// match stream timebase in muxer
av_packet_rescale_ts(pkt, var.time_base(), (*stream).time_base);
(*pkt).time_base = (*stream).time_base;
}
}

View File

@ -1,118 +0,0 @@
use std::{fs, ptr};
use std::collections::HashSet;
use std::fmt::Display;
use anyhow::Error;
use ffmpeg_sys_next::{av_guess_format, av_interleaved_write_frame, av_strdup, avcodec_parameters_from_context, AVCodecContext, avformat_alloc_context, avformat_free_context, avformat_write_header, AVFormatContext, AVIO_FLAG_READ_WRITE, avio_open2, AVPacket};
use itertools::Itertools;
use tokio::sync::mpsc::UnboundedReceiver;
use uuid::Uuid;
use crate::egress::{EgressConfig, get_pkt_variant, map_variants_to_streams, update_pkt_for_muxer};
use crate::pipeline::{PipelinePayload, PipelineProcessor};
use crate::utils::get_ffmpeg_error_msg;
use crate::variant::VariantStreamType;
pub struct MPEGTSEgress {
id: Uuid,
config: EgressConfig,
ctx: *mut AVFormatContext,
chan_in: UnboundedReceiver<PipelinePayload>,
stream_init: HashSet<i32>,
}
unsafe impl Send for MPEGTSEgress {}
unsafe impl Sync for MPEGTSEgress {}
impl Drop for MPEGTSEgress {
fn drop(&mut self) {
unsafe {
avformat_free_context(self.ctx);
self.ctx = ptr::null_mut();
}
}
}
impl MPEGTSEgress {
pub fn new(
chan_in: UnboundedReceiver<PipelinePayload>,
id: Uuid,
config: EgressConfig,
) -> Self {
Self {
id,
config,
ctx: ptr::null_mut(),
chan_in,
stream_init: HashSet::new(),
}
}
unsafe fn setup_muxer(&mut self) -> Result<(), Error> {
let mut ctx = avformat_alloc_context();
if ctx.is_null() {
return Err(Error::msg("Failed to create muxer context"));
}
let base = format!("{}/{}", self.config.out_dir, self.id);
fs::create_dir_all(base.clone())?;
let ret = avio_open2(
&mut (*ctx).pb,
format!("{}/live.ts\0", base).as_ptr() as *const libc::c_char,
AVIO_FLAG_READ_WRITE,
ptr::null(),
ptr::null_mut(),
);
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
(*ctx).oformat = av_guess_format(
"mpegts\0".as_ptr() as *const libc::c_char,
ptr::null(),
ptr::null(),
);
if (*ctx).oformat.is_null() {
return Err(Error::msg("Output format not found"));
}
(*ctx).url = av_strdup(format!("{}/live.ts\0", base).as_ptr() as *const libc::c_char);
map_variants_to_streams(ctx, &mut self.config.variants)?;
let ret = avformat_write_header(ctx, ptr::null_mut());
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
self.ctx = ctx;
Ok(())
}
unsafe fn process_pkt(&mut self, pkt: *mut AVPacket) -> Result<(), Error> {
let variant = get_pkt_variant(&self.config.variants, pkt)?;
update_pkt_for_muxer(self.ctx, pkt, &variant);
let ret = av_interleaved_write_frame(self.ctx, pkt);
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
Ok(())
}
}
impl PipelineProcessor for MPEGTSEgress {
fn process(&mut self) -> Result<(), Error> {
while let Ok(pkg) = self.chan_in.try_recv() {
match pkg {
PipelinePayload::AvPacket(_, pkt) => unsafe {
if self.ctx.is_null() {
self.setup_muxer()?;
}
self.process_pkt(pkt)?;
},
_ => return Err(Error::msg("Payload not supported")),
}
}
Ok(())
}
}

View File

@ -1,19 +1,19 @@
use std::{fs, ptr};
use std::collections::HashSet;
use std::fmt::Display;
use std::{fs, ptr};
use anyhow::Error;
use ffmpeg_sys_next::{
av_dump_format, av_guess_format, av_interleaved_write_frame, av_strdup, avformat_alloc_context
, avformat_free_context,
AVFormatContext, AVIO_FLAG_READ_WRITE, avio_open2, AVPacket,
av_dump_format, av_guess_format, av_interleaved_write_frame, av_strdup, avformat_alloc_context,
avformat_free_context, avio_open2, AVFormatContext, AVPacket, AVIO_FLAG_READ_WRITE,
};
use tokio::sync::mpsc::UnboundedReceiver;
use uuid::Uuid;
use crate::egress::{EgressConfig, get_pkt_variant, map_variants_to_streams, update_pkt_for_muxer};
use crate::pipeline::{PipelinePayload, PipelineProcessor};
use crate::egress::{map_variants_to_streams, EgressConfig};
use crate::pipeline::{AVPacketSource, PipelinePayload, PipelineProcessor};
use crate::utils::get_ffmpeg_error_msg;
use crate::variant::VariantStreamType;
pub struct RecorderEgress {
id: Uuid,
@ -90,9 +90,16 @@ impl RecorderEgress {
Ok(())
}
unsafe fn process_pkt(&mut self, pkt: *mut AVPacket) -> Result<(), Error> {
let variant = get_pkt_variant(&self.config.variants, pkt)?;
update_pkt_for_muxer(self.ctx, pkt, &variant);
unsafe fn process_pkt(
&mut self,
pkt: *mut AVPacket,
src: &AVPacketSource,
) -> Result<(), Error> {
let variant = match src {
AVPacketSource::Encoder(v) => v,
_ => return Err(Error::msg(format!("Cannot mux packet from {:?}", src))),
};
(*pkt).stream_index = variant.dst_index() as libc::c_int;
let ret = av_interleaved_write_frame(self.ctx, pkt);
if ret < 0 {
@ -107,11 +114,11 @@ impl PipelineProcessor for RecorderEgress {
fn process(&mut self) -> Result<(), Error> {
while let Ok(pkg) = self.chan_in.try_recv() {
match pkg {
PipelinePayload::AvPacket(_, pkt) => unsafe {
PipelinePayload::AvPacket(pkt, ref src) => unsafe {
if self.ctx.is_null() {
self.setup_muxer()?;
}
self.process_pkt(pkt)?;
self.process_pkt(pkt, src)?;
},
_ => return Err(Error::msg("Payload not supported")),
}

View File

@ -3,18 +3,28 @@ use std::mem::transmute;
use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{av_audio_fifo_alloc, av_audio_fifo_free, av_audio_fifo_read, av_audio_fifo_realloc, av_audio_fifo_size, av_audio_fifo_write, av_buffer_ref, av_buffer_unref, av_channel_layout_copy, av_frame_alloc, av_frame_clone, av_frame_free, av_frame_get_buffer, av_frame_unref, av_freep, av_get_sample_fmt_name, av_packet_alloc, av_packet_free, av_rescale_q, av_rescale_rnd, av_samples_alloc, av_samples_alloc_array_and_samples, AVAudioFifo, AVBufferRef, AVCodec, avcodec_alloc_context3, avcodec_free_context, avcodec_open2, avcodec_parameters_from_context, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, AVFrame, AVStream, swr_alloc_set_opts2, swr_config_frame, swr_convert, swr_convert_frame, swr_free, swr_get_delay, swr_init, SwrContext};
use ffmpeg_sys_next::AVRounding::AV_ROUND_UP;
use ffmpeg_sys_next::AVSampleFormat::AV_SAMPLE_FMT_S16;
use ffmpeg_sys_next::{
av_audio_fifo_alloc, av_audio_fifo_free, av_audio_fifo_read, av_audio_fifo_realloc,
av_audio_fifo_size, av_audio_fifo_write, av_buffer_ref, av_buffer_unref,
av_channel_layout_copy, av_frame_alloc, av_frame_clone, av_frame_free, av_frame_get_buffer,
av_frame_unref, av_freep, av_get_sample_fmt_name, av_packet_alloc, av_packet_free,
av_rescale_q, av_rescale_rnd, av_samples_alloc, av_samples_alloc_array_and_samples,
avcodec_alloc_context3, avcodec_free_context, avcodec_open2, avcodec_parameters_from_context,
avcodec_receive_packet, avcodec_send_frame, swr_alloc_set_opts2, swr_config_frame, swr_convert,
swr_convert_frame, swr_free, swr_get_delay, swr_init, AVAudioFifo, AVBufferRef, AVCodec,
AVCodecContext, AVFrame, AVStream, SwrContext, AVERROR,
};
use libc::EAGAIN;
use log::info;
use tokio::sync::mpsc::UnboundedSender;
use crate::encode::{dump_pkt_info, set_encoded_pkt_timing};
use crate::ipc::Rx;
use crate::pipeline::{PipelinePayload, PipelineProcessor};
use crate::utils::{audio_variant_id_ref, get_ffmpeg_error_msg, id_ref_to_uuid};
use crate::variant::{AudioVariant, VariantStreamType};
use crate::pipeline::{AVFrameSource, AVPacketSource, PipelinePayload, PipelineProcessor};
use crate::utils::get_ffmpeg_error_msg;
use crate::variant::{AudioVariant, VariantStream, VariantStreamType};
pub struct AudioEncoder<T> {
variant: AudioVariant,
@ -24,8 +34,8 @@ pub struct AudioEncoder<T> {
fifo: *mut AVAudioFifo,
chan_in: T,
chan_out: UnboundedSender<PipelinePayload>,
var_id_ref: *mut AVBufferRef,
pts: i64,
frame_pts: i64,
}
unsafe impl<T> Send for AudioEncoder<T> {}
@ -38,7 +48,6 @@ impl<T> Drop for AudioEncoder<T> {
swr_free(&mut self.swr_ctx);
av_audio_fifo_free(self.fifo);
avcodec_free_context(&mut self.ctx);
av_buffer_unref(&mut self.var_id_ref);
}
}
}
@ -52,7 +61,6 @@ where
chan_out: UnboundedSender<PipelinePayload>,
variant: AudioVariant,
) -> Self {
let id_ref = audio_variant_id_ref(&variant);
Self {
ctx: ptr::null_mut(),
codec: ptr::null(),
@ -61,8 +69,8 @@ where
variant,
chan_in,
chan_out,
var_id_ref: id_ref,
pts: 0,
frame_pts: 0,
}
}
@ -86,11 +94,13 @@ where
|| (*ctx).ch_layout.nb_channels != (*frame).ch_layout.nb_channels
{
info!(
"Setup audio resampler: {}@{}->{}@{}",
"Setup audio resampler: {}.{}@{} -> {}.{}@{}",
(*frame).ch_layout.nb_channels,
CStr::from_ptr(av_get_sample_fmt_name(transmute((*frame).format)))
.to_str()
.unwrap(),
(*frame).sample_rate,
(*ctx).ch_layout.nb_channels,
CStr::from_ptr(av_get_sample_fmt_name((*ctx).sample_fmt))
.to_str()
.unwrap(),
@ -133,17 +143,6 @@ where
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
// copy start time
let in_stream = (*frame).opaque as *mut AVStream;
if (*in_stream).start_time > 0 {
self.pts = av_rescale_q(
(*in_stream).start_time,
(*in_stream).time_base,
(*ctx).time_base,
);
info!("Set start pts to {}", self.pts);
}
// copy channel layout from codec
let mut px = (*encoder).ch_layouts;
while !px.is_null() {
@ -153,6 +152,12 @@ where
}
px = px.add(1);
}
// let downstream steps know about the encoder
self.chan_out.send(PipelinePayload::EncoderInfo(
VariantStream::Audio(self.variant.clone()),
ctx,
))?;
self.ctx = ctx;
self.codec = encoder;
}
@ -205,7 +210,6 @@ where
Ok(None)
} else {
let out_frame = self.read_fifo_frame()?;
(*out_frame).opaque = (*frame).opaque;
Ok(Some(out_frame))
};
}
@ -250,15 +254,18 @@ where
av_channel_layout_copy(&mut (*out_frame).ch_layout, &(*self.ctx).ch_layout);
(*out_frame).format = (*self.ctx).sample_fmt as libc::c_int;
(*out_frame).sample_rate = (*self.ctx).sample_rate;
(*out_frame).time_base = (*self.ctx).time_base;
out_frame
}
unsafe fn process_frame(&mut self, frame: *mut AVFrame) -> Result<(), Error> {
let var_id = id_ref_to_uuid((*frame).opaque_ref)?;
assert_eq!(var_id, self.variant.id);
let in_stream = (*frame).opaque as *mut AVStream;
unsafe fn process_frame(
&mut self,
frame: *mut AVFrame,
src: &AVFrameSource,
) -> Result<(), Error> {
let in_stream = match src {
AVFrameSource::Decoder(s) => *s,
AVFrameSource::Scaler(s) => *s,
};
self.setup_encoder(frame)?;
let mut frame = self.process_audio_frame(frame)?;
@ -267,6 +274,9 @@ where
}
let mut frame = frame.unwrap();
(*frame).pts = self.frame_pts;
self.frame_pts += (*frame).nb_samples as i64;
let mut ret = avcodec_send_frame(self.ctx, frame);
if ret < 0 && ret != AVERROR(EAGAIN) {
av_frame_free(&mut frame);
@ -285,11 +295,9 @@ where
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
set_encoded_pkt_timing(self.ctx, pkt, in_stream, &mut self.pts, &self.variant);
(*pkt).opaque = self.ctx as *mut libc::c_void;
(*pkt).opaque_ref = av_buffer_ref(self.var_id_ref);
self.chan_out.send(PipelinePayload::AvPacket(
"Audio Encoder packet".to_owned(),
pkt,
AVPacketSource::Encoder(VariantStream::Audio(self.variant.clone())),
))?;
}
@ -305,9 +313,15 @@ where
fn process(&mut self) -> Result<(), Error> {
while let Ok(pkg) = self.chan_in.try_recv_next() {
match pkg {
PipelinePayload::AvFrame(_, frm, idx) => unsafe {
if self.variant.src_index == idx {
self.process_frame(frm)?;
PipelinePayload::AvFrame(frm, ref src) => unsafe {
let idx = match src {
AVFrameSource::Decoder(s) => (**s).index,
_ => {
return Err(Error::msg(format!("Cannot process frame from: {:?}", src)))
}
};
if self.variant.src_index == idx as usize {
self.process_frame(frm, src)?;
}
},
_ => return Err(Error::msg("Payload not supported")),

View File

@ -4,10 +4,9 @@ use ffmpeg_sys_next::{
AV_LOG_INFO, AV_NOPTS_VALUE, av_packet_rescale_ts, av_pkt_dump_log2, AV_PKT_FLAG_KEY, av_q2d,
av_rescale_q, AVCodecContext, AVFrame, AVPacket, AVRational, AVStream,
};
use ffmpeg_sys_next::AVMediaType::AVMEDIA_TYPE_VIDEO;
use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO};
use log::info;
use crate::utils::id_ref_to_uuid;
use crate::variant::VariantStreamType;
pub mod audio;
@ -31,13 +30,11 @@ pub unsafe fn set_encoded_pkt_timing<TVar>(
let tb_sec = tb.den as i64 / tb.num as i64;
let fps = (*ctx).framerate.num as i64 * (*ctx).framerate.den as i64;
tb_sec / if fps == 0 { 1 } else { fps }
} else if (*ctx).codec_type == AVMEDIA_TYPE_VIDEO {
av_rescale_q((*pkt).duration, (*in_stream).time_base, (*ctx).time_base)
} else {
(*pkt).duration
av_rescale_q((*pkt).duration, (*in_stream).time_base, tb)
};
if (*ctx).codec_type == AVMEDIA_TYPE_VIDEO {
if (*ctx).codec_type == AVMEDIA_TYPE_VIDEO || (*ctx).codec_type == AVMEDIA_TYPE_AUDIO {
(*pkt).duration = duration;
}
@ -45,11 +42,11 @@ pub unsafe fn set_encoded_pkt_timing<TVar>(
(*pkt).pts = *pts;
*pts += duration;
} else {
(*pkt).pts = av_rescale_q((*pkt).pts, (*in_stream).time_base, (*ctx).time_base);
(*pkt).pts = av_rescale_q((*pkt).pts, (*in_stream).time_base, tb);
*pts = (*pkt).pts;
}
if (*pkt).dts != AV_NOPTS_VALUE {
(*pkt).dts = av_rescale_q((*pkt).dts, (*in_stream).time_base, (*ctx).time_base);
(*pkt).dts = av_rescale_q((*pkt).dts, (*in_stream).time_base, tb);
} else {
(*pkt).dts = (*pkt).pts;
}
@ -57,14 +54,8 @@ pub unsafe fn set_encoded_pkt_timing<TVar>(
pub unsafe fn dump_pkt_info(pkt: *const AVPacket) {
let tb = (*pkt).time_base;
let id = id_ref_to_uuid((*pkt).opaque_ref);
info!(
"stream {}@{}: keyframe={}, duration={}, dts={}, pts={}, size={}, tb={}/{}",
if let Ok(id) = id {
format!("{}", id)
} else {
"Unknown".to_owned()
},
"stream {}: keyframe={}, duration={}, dts={}, pts={}, size={}, tb={}/{}",
(*pkt).stream_index,
((*pkt).flags & AV_PKT_FLAG_KEY) != 0,
(*pkt).duration,

View File

@ -3,18 +3,18 @@ use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{
av_buffer_ref, av_packet_alloc, av_packet_free, AVBufferRef,
AVCodec, avcodec_alloc_context3, avcodec_find_encoder, avcodec_open2, avcodec_receive_packet,
avcodec_send_frame, AVCodecContext, AVERROR, AVFrame, AVStream,
av_packet_alloc, av_packet_free, AVCodec, avcodec_alloc_context3, avcodec_find_encoder,
avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR,
AVFrame,
};
use libc::EAGAIN;
use tokio::sync::mpsc::UnboundedSender;
use crate::encode::set_encoded_pkt_timing;
use crate::ipc::Rx;
use crate::pipeline::{PipelinePayload, PipelineProcessor};
use crate::utils::{get_ffmpeg_error_msg, id_ref_to_uuid, video_variant_id_ref};
use crate::variant::{VariantStreamType, VideoVariant};
use crate::pipeline::{AVFrameSource, AVPacketSource, PipelinePayload, PipelineProcessor};
use crate::utils::get_ffmpeg_error_msg;
use crate::variant::{VariantStream, VariantStreamType, VideoVariant};
pub struct VideoEncoder<T> {
variant: VideoVariant,
@ -22,7 +22,6 @@ pub struct VideoEncoder<T> {
codec: *const AVCodec,
chan_in: T,
chan_out: UnboundedSender<PipelinePayload>,
var_id_ref: *mut AVBufferRef,
pts: i64,
}
@ -39,14 +38,12 @@ where
chan_out: UnboundedSender<PipelinePayload>,
variant: VideoVariant,
) -> Self {
let id_ref = video_variant_id_ref(&variant);
Self {
ctx: ptr::null_mut(),
codec: ptr::null(),
variant,
chan_in,
chan_out,
var_id_ref: id_ref,
pts: 0,
}
}
@ -71,17 +68,29 @@ where
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
// let downstream steps know about the encoder
self.chan_out.send(PipelinePayload::EncoderInfo(
VariantStream::Video(self.variant.clone()),
ctx,
))?;
self.ctx = ctx;
self.codec = encoder;
}
Ok(())
}
unsafe fn process_frame(&mut self, frame: *mut AVFrame) -> Result<(), Error> {
let var_id = id_ref_to_uuid((*frame).opaque_ref)?;
assert_eq!(var_id, self.variant.id);
unsafe fn process_frame(
&mut self,
frame: *mut AVFrame,
src: &AVFrameSource,
) -> Result<(), Error> {
let in_stream = match src {
AVFrameSource::Decoder(s) => *s,
AVFrameSource::Scaler(s) => *s,
};
self.setup_encoder(frame)?;
let in_stream = (*frame).opaque as *mut AVStream;
let mut ret = avcodec_send_frame(self.ctx, frame);
if ret < 0 && ret != AVERROR(EAGAIN) {
@ -100,12 +109,10 @@ where
}
set_encoded_pkt_timing(self.ctx, pkt, in_stream, &mut self.pts, &self.variant);
(*pkt).opaque = self.ctx as *mut libc::c_void;
(*pkt).opaque_ref = av_buffer_ref(self.var_id_ref);
assert_ne!((*pkt).data, ptr::null_mut());
self.chan_out.send(PipelinePayload::AvPacket(
"Video Encoder packet".to_owned(),
pkt,
AVPacketSource::Encoder(VariantStream::Video(self.variant.clone())),
))?;
}
@ -120,9 +127,15 @@ where
fn process(&mut self) -> Result<(), Error> {
while let Ok(pkg) = self.chan_in.try_recv_next() {
match pkg {
PipelinePayload::AvFrame(_, frm, idx) => unsafe {
if self.variant.src_index == idx {
self.process_frame(frm)?;
PipelinePayload::AvFrame(frm, ref src) => unsafe {
let idx = match src {
AVFrameSource::Decoder(s) => (**s).index,
_ => {
return Err(Error::msg(format!("Cannot process frame from: {:?}", src)))
}
};
if self.variant.src_index == idx as usize {
self.process_frame(frm, src)?;
}
},
_ => return Err(Error::msg("Payload not supported")),

View File

@ -43,7 +43,7 @@ pub async fn listen(path: PathBuf, builder: PipelineBuilder) -> Result<(), anyho
if let Ok(mut pl) = builder.build_for(info, rx).await {
std::thread::spawn(move || loop {
if let Err(e) = pl.run() {
warn!("Pipeline error: {}", e.backtrace());
error!("Pipeline error: {}\n{}", e, e.backtrace());
break;
}
});
@ -53,7 +53,10 @@ pub async fn listen(path: PathBuf, builder: PipelineBuilder) -> Result<(), anyho
loop {
if let Ok(r) = stream.read(&mut buf).await {
if r > 0 {
tx.send(bytes::Bytes::copy_from_slice(&buf[..r])).unwrap();
if let Err(e) = tx.send(bytes::Bytes::copy_from_slice(&buf[..r])) {
error!("Failed to send file: {}", e);
break;
}
} else {
break;
}

View File

@ -1,5 +1,5 @@
use futures_util::{StreamExt, TryStreamExt};
use log::{info, warn};
use log::{error, info, warn};
use srt_tokio::SrtListener;
use tokio::sync::mpsc::unbounded_channel;
@ -21,7 +21,7 @@ pub async fn listen(addr: String, builder: PipelineBuilder) -> Result<(), anyhow
if let Ok(mut pipeline) = builder.build_for(info, rx).await {
std::thread::spawn(move || loop {
if let Err(e) = pipeline.run() {
warn!("Pipeline error: {}\n{}", e, e.backtrace());
error!("Pipeline error: {}\n{}", e, e.backtrace());
break;
}
});

View File

@ -26,7 +26,7 @@ pub async fn listen(addr: String, builder: PipelineBuilder) -> Result<(), anyhow
if let Ok(mut pl) = builder.build_for(info, recv).await {
std::thread::spawn(move || loop {
if let Err(e) = pl.run() {
warn!("Pipeline error: {}", e.backtrace());
error!("Pipeline error: {}\n{}", e, e.backtrace());
break;
}
});

View File

@ -45,7 +45,7 @@ pub async fn listen(builder: PipelineBuilder) -> Result<(), anyhow::Error> {
if let Ok(mut pl) = builder.build_for(info, rx).await {
std::thread::spawn(move || loop {
if let Err(e) = pl.run() {
warn!("Pipeline error: {}", e.backtrace());
error!("Pipeline error: {}\n{}", e, e.backtrace());
break;
}
});

View File

@ -1,7 +1,10 @@
use std::fmt::{Display, Formatter};
use anyhow::Error;
use ffmpeg_sys_next::{av_frame_clone, av_frame_copy_props, av_frame_free, av_packet_clone, av_packet_copy_props, av_packet_free, AVFrame, AVPacket};
use ffmpeg_sys_next::{
av_frame_clone, av_frame_copy_props, av_frame_free, av_packet_clone, av_packet_copy_props,
av_packet_free, AVCodecContext, AVFrame, AVPacket, AVStream,
};
use serde::{Deserialize, Serialize};
use crate::demux::info::DemuxStreamInfo;
@ -29,7 +32,7 @@ impl Display for EgressType {
EgressType::HLS(c) => format!("{}", c),
EgressType::DASH => "DASH".to_owned(),
EgressType::WHEP => "WHEP".to_owned(),
EgressType::MPEGTS(c) => format!("{}", c),
EgressType::MPEGTS(c) => format!("{}", c),
EgressType::Recorder(c) => format!("{}", c),
}
)
@ -62,6 +65,24 @@ impl Display for PipelineConfig {
}
}
#[derive(Debug, PartialEq, Clone)]
pub enum AVPacketSource {
/// AVPacket from demuxer
Demuxer(*mut AVStream),
/// AVPacket from an encoder
Encoder(VariantStream),
/// AVPacket from muxer
Muxer(VariantStream),
}
#[derive(Debug, PartialEq, Clone)]
pub enum AVFrameSource {
/// ACPacket from decoder source stream
Decoder(*mut AVStream),
/// AVPacket from frame scaler step
Scaler(*mut AVStream),
}
#[derive(Debug, PartialEq)]
pub enum PipelinePayload {
/// No output
@ -69,11 +90,13 @@ pub enum PipelinePayload {
/// Raw bytes from ingress
Bytes(bytes::Bytes),
/// FFMpeg AVPacket
AvPacket(String, *mut AVPacket),
AvPacket(*mut AVPacket, AVPacketSource),
/// FFMpeg AVFrame
AvFrame(String, *mut AVFrame, usize),
AvFrame(*mut AVFrame, AVFrameSource),
/// Information about the input stream
SourceInfo(DemuxStreamInfo),
/// Information about an encoder in this pipeline
EncoderInfo(VariantStream, *const AVCodecContext),
}
unsafe impl Send for PipelinePayload {}
@ -85,19 +108,20 @@ impl Clone for PipelinePayload {
match self {
PipelinePayload::Empty => PipelinePayload::Empty,
PipelinePayload::Bytes(b) => PipelinePayload::Bytes(b.clone()),
PipelinePayload::AvPacket(t, p) => unsafe {
PipelinePayload::AvPacket(p, v) => unsafe {
assert!(!(**p).data.is_null(), "Cannot clone empty packet");
let new_pkt = av_packet_clone(*p);
av_packet_copy_props(new_pkt, *p);
PipelinePayload::AvPacket(t.clone(), new_pkt)
PipelinePayload::AvPacket(new_pkt, v.clone())
},
PipelinePayload::AvFrame(t, p, idx) => unsafe {
PipelinePayload::AvFrame(p, v) => unsafe {
assert!(!(**p).extended_data.is_null(), "Cannot clone empty frame");
let new_frame = av_frame_clone(*p);
av_frame_copy_props(new_frame, *p);
PipelinePayload::AvFrame(t.clone(), new_frame, *idx)
PipelinePayload::AvFrame(new_frame, v.clone())
},
PipelinePayload::SourceInfo(i) => PipelinePayload::SourceInfo(i.clone()),
PipelinePayload::EncoderInfo(v, s) => PipelinePayload::EncoderInfo(v.clone(), *s),
}
}
}
@ -105,19 +129,17 @@ impl Clone for PipelinePayload {
impl Drop for PipelinePayload {
fn drop(&mut self) {
match self {
PipelinePayload::Empty => {}
PipelinePayload::Bytes(_) => {}
PipelinePayload::AvPacket(_, p) => unsafe {
PipelinePayload::AvPacket(p, _) => unsafe {
av_packet_free(p);
},
PipelinePayload::AvFrame(_, p, _) => unsafe {
PipelinePayload::AvFrame(p, _) => unsafe {
av_frame_free(p);
},
PipelinePayload::SourceInfo(_) => {}
_ => {}
}
}
}
pub trait PipelineProcessor {
fn process(&mut self) -> Result<(), Error>;
}
}

View File

@ -1,3 +1,4 @@
use crate::tag_frame::TagFrame;
use std::ops::Add;
use std::time::{Duration, Instant};
@ -11,13 +12,11 @@ use crate::demux::Demuxer;
use crate::demux::info::{DemuxStreamInfo, StreamChannelType};
use crate::egress::EgressConfig;
use crate::egress::hls::HlsEgress;
use crate::egress::mpegts::MPEGTSEgress;
use crate::egress::recorder::RecorderEgress;
use crate::encode::audio::AudioEncoder;
use crate::encode::video::VideoEncoder;
use crate::pipeline::{EgressType, PipelineConfig, PipelinePayload, PipelineProcessor};
use crate::scale::Scaler;
use crate::tag_frame::TagFrame;
use crate::variant::VariantStream;
use crate::webhook::Webhook;
@ -127,17 +126,6 @@ impl PipelineRunner {
self.encoders.push(x);
}
}
EgressType::MPEGTS(cfg) => {
let (egress_tx, egress_rx) = unbounded_channel();
self.egress.push(Box::new(MPEGTSEgress::new(
egress_rx,
self.config.id,
cfg.clone(),
)));
for x in self.add_egress_variants(cfg, egress_tx) {
self.encoders.push(x);
}
}
EgressType::Recorder(cfg) => {
let (egress_tx, egress_rx) = unbounded_channel();
self.egress.push(Box::new(RecorderEgress::new(

View File

@ -3,14 +3,14 @@ use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{
av_buffer_ref, av_frame_alloc, av_frame_copy_props, AVBufferRef, AVFrame,
SWS_BILINEAR, sws_freeContext, sws_getContext, sws_scale_frame, SwsContext,
av_frame_alloc, av_frame_copy_props, AVBufferRef, AVFrame, SWS_BILINEAR,
sws_freeContext, sws_getContext, sws_scale_frame, SwsContext,
};
use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
use crate::pipeline::{PipelinePayload, PipelineProcessor};
use crate::utils::{get_ffmpeg_error_msg, video_variant_id_ref};
use crate::pipeline::{AVFrameSource, PipelinePayload, PipelineProcessor};
use crate::utils::{get_ffmpeg_error_msg};
use crate::variant::VideoVariant;
pub struct Scaler {
@ -18,7 +18,6 @@ pub struct Scaler {
ctx: *mut SwsContext,
chan_in: broadcast::Receiver<PipelinePayload>,
chan_out: UnboundedSender<PipelinePayload>,
var_id_ref: *mut AVBufferRef,
}
unsafe impl Send for Scaler {}
@ -40,17 +39,19 @@ impl Scaler {
chan_out: UnboundedSender<PipelinePayload>,
variant: VideoVariant,
) -> Self {
let id_ref = video_variant_id_ref(&variant);
Self {
chan_in,
chan_out,
variant,
ctx: ptr::null_mut(),
var_id_ref: id_ref,
}
}
unsafe fn process_frame(&mut self, frame: *mut AVFrame, src_index: usize) -> Result<(), Error> {
unsafe fn process_frame(
&mut self,
frame: *mut AVFrame,
src: &AVFrameSource,
) -> Result<(), Error> {
let dst_fmt = transmute((*frame).format);
if self.ctx.is_null() {
@ -83,14 +84,8 @@ impl Scaler {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
(*dst_frame).opaque = (*frame).opaque;
(*dst_frame).opaque_ref = av_buffer_ref(self.var_id_ref);
self.chan_out.send(PipelinePayload::AvFrame(
"Scaler frame".to_owned(),
dst_frame,
src_index,
))?;
self.chan_out
.send(PipelinePayload::AvFrame(dst_frame, src.clone()))?;
Ok(())
}
}
@ -99,9 +94,15 @@ impl PipelineProcessor for Scaler {
fn process(&mut self) -> Result<(), Error> {
while let Ok(pkg) = self.chan_in.try_recv() {
match pkg {
PipelinePayload::AvFrame(_, frm, idx) => unsafe {
if self.variant.src_index == idx {
self.process_frame(frm, idx)?;
PipelinePayload::AvFrame(frm, ref src) => unsafe {
let idx = match src {
AVFrameSource::Decoder(s) => (**s).index,
_ => {
return Err(Error::msg(format!("Cannot process frame from: {:?}", src)))
}
};
if self.variant.src_index == idx as usize {
self.process_frame(frm, src)?;
}
},
_ => return Err(Error::msg("Payload not supported payload")),

View File

@ -1,17 +1,15 @@
use anyhow::Error;
use ffmpeg_sys_next::{av_buffer_ref, AVBufferRef};
use ffmpeg_sys_next::AVBufferRef;
use tokio::sync::mpsc::UnboundedSender;
use crate::ipc::Rx;
use crate::pipeline::{PipelinePayload, PipelineProcessor};
use crate::utils::variant_id_ref;
use crate::pipeline::{AVFrameSource, PipelinePayload, PipelineProcessor};
use crate::variant::{VariantStream, VariantStreamType};
pub struct TagFrame<TRecv> {
variant: VariantStream,
chan_in: TRecv,
chan_out: UnboundedSender<PipelinePayload>,
var_id_ref: *mut AVBufferRef,
}
unsafe impl<T> Send for TagFrame<T> {}
@ -27,10 +25,8 @@ where
chan_in: TRecv,
chan_out: UnboundedSender<PipelinePayload>,
) -> Self {
let id_ref = variant_id_ref(&var).unwrap();
Self {
variant: var,
var_id_ref: id_ref,
chan_in,
chan_out,
}
@ -43,11 +39,12 @@ where
{
fn process(&mut self) -> Result<(), Error> {
while let Ok(pkg) = self.chan_in.try_recv_next() {
if let PipelinePayload::AvFrame(_, pkt, idx) = &pkg {
if *idx == self.variant.src_index() {
unsafe {
(**pkt).opaque_ref = av_buffer_ref(self.var_id_ref);
}
if let PipelinePayload::AvFrame(_, src) = &pkg {
let idx = match &src {
AVFrameSource::Decoder(s) => unsafe { (**s).index },
_ => return Err(Error::msg(format!("Cannot process frame from: {:?}", src))),
};
if self.variant.src_index() == idx as usize {
self.chan_out.send(pkg)?;
}
}

View File

@ -13,63 +13,4 @@ pub fn get_ffmpeg_error_msg(ret: libc::c_int) -> String {
av_make_error_string(buf.as_mut_ptr(), BUF_SIZE, ret);
String::from(CStr::from_ptr(buf.as_ptr()).to_str().unwrap())
}
}
pub fn variant_id_ref(var: &VariantStream) -> Result<*mut AVBufferRef, Error> {
unsafe {
match var {
VariantStream::Audio(va) => {
let buf = av_buffer_allocz(16);
memcpy(
(*buf).data as *mut libc::c_void,
va.id.as_bytes().as_ptr() as *const libc::c_void,
16,
);
Ok(buf)
}
VariantStream::Video(vv) => {
let buf = av_buffer_allocz(16);
memcpy(
(*buf).data as *mut libc::c_void,
vv.id.as_bytes().as_ptr() as *const libc::c_void,
16,
);
Ok(buf)
}
}
}
}
pub fn video_variant_id_ref(var: &VideoVariant) -> *mut AVBufferRef {
unsafe {
let buf = av_buffer_allocz(16);
memcpy(
(*buf).data as *mut libc::c_void,
var.id.as_bytes().as_ptr() as *const libc::c_void,
16,
);
buf
}
}
pub fn audio_variant_id_ref(var: &AudioVariant) -> *mut AVBufferRef {
unsafe {
let buf = av_buffer_allocz(16);
memcpy(
(*buf).data as *mut libc::c_void,
var.id.as_bytes().as_ptr() as *const libc::c_void,
16,
);
buf
}
}
pub fn id_ref_to_uuid(buf: *mut AVBufferRef) -> Result<Uuid, Error> {
unsafe {
if buf.is_null() {
return Err(Error::msg("Buffer was null"));
}
let binding = Bytes::from(*((*buf).data as *const [u8; 16]));
Ok(*Uuid::from_bytes_ref(&binding))
}
}
}