From 8cca7a174c9a76a7fe7b70d21bdb176eea8fa8d6 Mon Sep 17 00:00:00 2001 From: kieran Date: Wed, 3 Apr 2024 16:19:31 +0100 Subject: [PATCH] more something --- src/demux/mod.rs | 6 --- src/egress/hls.rs | 71 ++++++++++++++++++------- src/egress/mod.rs | 11 ++-- src/egress/recorder.rs | 117 +++++++++++++++++++++++++++++++++++++++++ src/encode/audio.rs | 69 +++++++++++++++--------- src/encode/mod.rs | 41 +++++++++++---- src/encode/video.rs | 18 ++++--- src/pipeline/mod.rs | 4 +- src/pipeline/runner.rs | 12 +++++ src/tag_frame.rs | 29 +++++----- src/variant.rs | 8 +-- src/webhook.rs | 8 +-- test.sh | 2 +- 13 files changed, 298 insertions(+), 98 deletions(-) create mode 100644 src/egress/recorder.rs diff --git a/src/demux/mod.rs b/src/demux/mod.rs index 20b6862..051746c 100644 --- a/src/demux/mod.rs +++ b/src/demux/mod.rs @@ -151,12 +151,6 @@ impl Demuxer { return Err(Error::msg(msg)); } let stream = *(*self.ctx).streams.add((*pkt).stream_index as usize); - if (*pkt).time_base.num == 0 { - (*pkt).time_base = (*stream).time_base; - } - if (*stream).start_time > 0 && (*pkt).pts != AV_NOPTS_VALUE { - (*pkt).pts -= (*stream).start_time; - } (*pkt).opaque = stream as *mut libc::c_void; let pkg = PipelinePayload::AvPacket("Demuxer packet".to_owned(), pkt); diff --git a/src/egress/hls.rs b/src/egress/hls.rs index 5f27141..80c0ad2 100644 --- a/src/egress/hls.rs +++ b/src/egress/hls.rs @@ -1,27 +1,22 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::fmt::{Display, Formatter}; use std::mem::transmute; use std::ptr; use anyhow::Error; +use ffmpeg_sys_next::{AV_CH_LAYOUT_STEREO, av_channel_layout_copy, av_dump_format, av_get_sample_fmt, av_interleaved_write_frame, av_opt_set, av_packet_clone, av_packet_copy_props, AVChannelLayout, AVChannelLayout__bindgen_ty_1, avcodec_find_encoder, avcodec_parameters_from_context, avcodec_parameters_to_context, AVCodecContext, avformat_alloc_output_context2, avformat_free_context, avformat_new_stream, avformat_write_header, AVFormatContext, AVPacket, AVRational}; use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE; use ffmpeg_sys_next::AVColorSpace::AVCOL_SPC_BT709; use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO}; use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P; -use ffmpeg_sys_next::{ - av_dump_format, av_get_sample_fmt, av_interleaved_write_frame, av_opt_set, - avcodec_find_encoder, avcodec_parameters_from_context, avformat_alloc_output_context2, - avformat_free_context, avformat_new_stream, avformat_write_header, AVChannelLayout, - AVChannelLayout__bindgen_ty_1, AVCodecContext, AVFormatContext, AVPacket, AVRational, - AV_CH_LAYOUT_STEREO, -}; +use futures_util::SinkExt; use itertools::Itertools; use log::info; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::UnboundedReceiver; use uuid::Uuid; -use crate::egress::{map_variants_to_streams, EgressConfig, update_pkt_for_muxer, get_pkt_variant}; +use crate::egress::{EgressConfig, get_pkt_variant, map_variants_to_streams, update_pkt_for_muxer}; use crate::encode::dump_pkt_info; use crate::pipeline::{PipelinePayload, PipelineProcessor}; use crate::utils::{get_ffmpeg_error_msg, id_ref_to_uuid}; @@ -32,6 +27,9 @@ pub struct HlsEgress { config: EgressConfig, ctx: *mut AVFormatContext, chan_in: UnboundedReceiver, + stream_init: HashSet, + init: bool, + packet_buffer: VecDeque, } unsafe impl Send for HlsEgress {} @@ -58,6 +56,9 @@ impl HlsEgress { config, ctx: ptr::null_mut(), chan_in, + init: false, + stream_init: HashSet::new(), + packet_buffer: VecDeque::new(), } } @@ -75,7 +76,6 @@ impl HlsEgress { if ret < 0 { return Err(Error::msg(get_ffmpeg_error_msg(ret))); } - av_opt_set( (*ctx).priv_data, "hls_segment_filename\0".as_ptr() as *const libc::c_char, @@ -146,27 +146,62 @@ impl HlsEgress { map_variants_to_streams(ctx, &mut self.config.variants)?; - let ret = avformat_write_header(ctx, ptr::null_mut()); - if ret < 0 { - return Err(Error::msg(get_ffmpeg_error_msg(ret))); - } - self.ctx = ctx; Ok(()) } - unsafe fn process_pkt(&mut self, pkt: *mut AVPacket) -> Result<(), Error> { + unsafe fn process_pkt_internal(&mut self, pkt: *mut AVPacket) -> Result<(), Error> { let variant = get_pkt_variant(&self.config.variants, pkt)?; update_pkt_for_muxer(self.ctx, pkt, &variant); - //dump_pkt_info(pkt); let ret = av_interleaved_write_frame(self.ctx, pkt); if ret < 0 { return Err(Error::msg(get_ffmpeg_error_msg(ret))); } - Ok(()) } + + unsafe fn process_pkt(&mut self, pkt: *mut AVPacket) -> Result<(), Error> { + let variant = get_pkt_variant(&self.config.variants, pkt)?; + if !self.stream_init.contains(&variant.dst_index()) { + let encoder_ctx = (*pkt).opaque as *mut AVCodecContext; + let out_stream = *(*self.ctx).streams.add(variant.dst_index()); + avcodec_parameters_from_context((*out_stream).codecpar, encoder_ctx); + self.stream_init.insert(variant.dst_index()); + } + if !self.init { + let pkt_clone = av_packet_clone(pkt); + av_packet_copy_props(pkt_clone, pkt); + self.packet_buffer.push_back(PipelinePayload::AvPacket( + "Buffered Muxer Packet".to_string(), + pkt_clone, + )); + } + + if !self.init && self.stream_init.len() == self.config.variants.len() { + let ret = avformat_write_header(self.ctx, ptr::null_mut()); + if ret < 0 { + return Err(Error::msg(get_ffmpeg_error_msg(ret))); + } + + av_dump_format(self.ctx, 0, ptr::null(), 1); + self.init = true; + // push in pkts from buffer + while let Some(pkt) = self.packet_buffer.pop_front() { + match pkt { + PipelinePayload::AvPacket(_, pkt) => { + self.process_pkt_internal(pkt)?; + } + _ => return Err(Error::msg("")), + } + } + return Ok(()); + } else if !self.init { + return Ok(()); + } + + self.process_pkt_internal(pkt) + } } impl PipelineProcessor for HlsEgress { diff --git a/src/egress/mod.rs b/src/egress/mod.rs index 68b046d..626b79f 100644 --- a/src/egress/mod.rs +++ b/src/egress/mod.rs @@ -2,8 +2,7 @@ use std::fmt::{Display, Formatter}; use std::ptr; use anyhow::Error; -use ffmpeg_sys_next::{av_dump_format, avformat_new_stream, AVFormatContext, AVPacket}; -use log::info; +use ffmpeg_sys_next::{av_packet_rescale_ts, avformat_new_stream, AVFormatContext, AVPacket}; use serde::{Deserialize, Serialize}; use crate::utils::id_ref_to_uuid; @@ -12,6 +11,7 @@ use crate::variant::{VariantStream, VariantStreamType}; pub mod hls; pub mod http; pub mod mpegts; +pub mod recorder; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct EgressConfig { @@ -61,11 +61,10 @@ pub unsafe fn map_variants_to_streams( } } } - - av_dump_format(ctx, 0, ptr::null(), 1); Ok(()) } +/// Get variant of this packet pub unsafe fn get_pkt_variant( vars: &Vec, pkt: *mut AVPacket, @@ -81,6 +80,7 @@ pub unsafe fn get_pkt_variant( Ok(variant.unwrap()) } +/// Update packet stream index to match muxer stream pub unsafe fn update_pkt_for_muxer( ctx: *mut AVFormatContext, pkt: *mut AVPacket, @@ -91,4 +91,7 @@ pub unsafe fn update_pkt_for_muxer( if idx != (*pkt).stream_index { (*pkt).stream_index = idx; } + // match stream timebase in muxer + av_packet_rescale_ts(pkt, var.time_base(), (*stream).time_base); + (*pkt).time_base = (*stream).time_base; } diff --git a/src/egress/recorder.rs b/src/egress/recorder.rs new file mode 100644 index 0000000..6b79edf --- /dev/null +++ b/src/egress/recorder.rs @@ -0,0 +1,117 @@ +use std::{fs, ptr}; +use std::collections::HashSet; +use std::fmt::Display; + +use anyhow::Error; +use ffmpeg_sys_next::{av_dump_format, av_guess_format, av_interleaved_write_frame, av_strdup, avformat_alloc_context, avformat_alloc_output_context2, avformat_free_context, avformat_write_header, AVFormatContext, AVIO_FLAG_READ_WRITE, avio_flush, avio_open2, AVPacket}; +use tokio::sync::mpsc::UnboundedReceiver; +use uuid::Uuid; + +use crate::egress::{EgressConfig, get_pkt_variant, map_variants_to_streams, update_pkt_for_muxer}; +use crate::pipeline::{PipelinePayload, PipelineProcessor}; +use crate::utils::get_ffmpeg_error_msg; + +pub struct RecorderEgress { + id: Uuid, + config: EgressConfig, + ctx: *mut AVFormatContext, + chan_in: UnboundedReceiver, + stream_init: HashSet, +} + +unsafe impl Send for RecorderEgress {} + +unsafe impl Sync for RecorderEgress {} + +impl Drop for RecorderEgress { + fn drop(&mut self) { + unsafe { + avformat_free_context(self.ctx); + self.ctx = ptr::null_mut(); + } + } +} + +impl RecorderEgress { + pub fn new( + chan_in: UnboundedReceiver, + id: Uuid, + config: EgressConfig, + ) -> Self { + Self { + id, + config, + ctx: ptr::null_mut(), + chan_in, + stream_init: HashSet::new(), + } + } + + unsafe fn setup_muxer(&mut self) -> Result<(), Error> { + let mut ctx = avformat_alloc_context(); + if ctx.is_null() { + return Err(Error::msg("Failed to create muxer context")); + } + let base = format!("{}/{}", self.config.out_dir, self.id); + + let out_file = format!("{}/recording.mkv\0", base).as_ptr() as *const libc::c_char; + fs::create_dir_all(base.clone())?; + let ret = avio_open2( + &mut (*ctx).pb, + out_file, + AVIO_FLAG_READ_WRITE, + ptr::null(), + ptr::null_mut(), + ); + if ret < 0 { + return Err(Error::msg(get_ffmpeg_error_msg(ret))); + } + (*ctx).oformat = av_guess_format( + "matroska\0".as_ptr() as *const libc::c_char, + out_file, + ptr::null(), + ); + if (*ctx).oformat.is_null() { + return Err(Error::msg("Output format not found")); + } + (*ctx).url = av_strdup(out_file); + map_variants_to_streams(ctx, &mut self.config.variants)?; + + let ret = avformat_write_header(ctx, ptr::null_mut()); + if ret < 0 { + return Err(Error::msg(get_ffmpeg_error_msg(ret))); + } + av_dump_format(ctx, 0, ptr::null(), 1); + self.ctx = ctx; + Ok(()) + } + + unsafe fn process_pkt(&mut self, pkt: *mut AVPacket) -> Result<(), Error> { + let variant = get_pkt_variant(&self.config.variants, pkt)?; + update_pkt_for_muxer(self.ctx, pkt, &variant); + + let ret = av_interleaved_write_frame(self.ctx, pkt); + if ret < 0 { + return Err(Error::msg(get_ffmpeg_error_msg(ret))); + } + + Ok(()) + } +} + +impl PipelineProcessor for RecorderEgress { + fn process(&mut self) -> Result<(), Error> { + while let Ok(pkg) = self.chan_in.try_recv() { + match pkg { + PipelinePayload::AvPacket(_, pkt) => unsafe { + if self.ctx.is_null() { + self.setup_muxer()?; + } + self.process_pkt(pkt)?; + }, + _ => return Err(Error::msg("Payload not supported")), + } + } + Ok(()) + } +} diff --git a/src/encode/audio.rs b/src/encode/audio.rs index 1c6892c..18f81e7 100644 --- a/src/encode/audio.rs +++ b/src/encode/audio.rs @@ -3,17 +3,9 @@ use std::mem::transmute; use std::ptr; use anyhow::Error; -use ffmpeg_sys_next::{ - av_audio_fifo_alloc, av_audio_fifo_free, av_audio_fifo_read, av_audio_fifo_realloc, - av_audio_fifo_size, av_audio_fifo_write, av_buffer_ref, av_buffer_unref, - av_channel_layout_copy, av_frame_alloc, av_frame_clone, av_frame_free, av_frame_get_buffer, - av_frame_unref, av_freep, av_get_sample_fmt_name, av_packet_alloc, av_packet_free, - av_rescale_rnd, av_samples_alloc, av_samples_alloc_array_and_samples, AVAudioFifo, - AVBufferRef, AVCodec, avcodec_alloc_context3, avcodec_free_context, - avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, AVFrame, - swr_alloc_set_opts2, swr_convert, swr_convert_frame, swr_free, swr_get_delay, swr_init, SwrContext, -}; +use ffmpeg_sys_next::{av_audio_fifo_alloc, av_audio_fifo_free, av_audio_fifo_read, av_audio_fifo_realloc, av_audio_fifo_size, av_audio_fifo_write, av_buffer_ref, av_buffer_unref, av_channel_layout_copy, av_frame_alloc, av_frame_clone, av_frame_free, av_frame_get_buffer, av_frame_unref, av_freep, av_get_sample_fmt_name, av_packet_alloc, av_packet_free, av_rescale_q, av_rescale_rnd, av_samples_alloc, av_samples_alloc_array_and_samples, AVAudioFifo, AVBufferRef, AVCodec, avcodec_alloc_context3, avcodec_free_context, avcodec_open2, avcodec_parameters_from_context, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, AVFrame, AVStream, swr_alloc_set_opts2, swr_config_frame, swr_convert, swr_convert_frame, swr_free, swr_get_delay, swr_init, SwrContext}; use ffmpeg_sys_next::AVRounding::AV_ROUND_UP; +use ffmpeg_sys_next::AVSampleFormat::AV_SAMPLE_FMT_S16; use libc::EAGAIN; use log::info; use tokio::sync::mpsc::UnboundedSender; @@ -141,6 +133,26 @@ where return Err(Error::msg(get_ffmpeg_error_msg(ret))); } + // copy start time + let in_stream = (*frame).opaque as *mut AVStream; + if (*in_stream).start_time > 0 { + self.pts = av_rescale_q( + (*in_stream).start_time, + (*in_stream).time_base, + (*ctx).time_base, + ); + info!("Set start pts to {}", self.pts); + } + + // copy channel layout from codec + let mut px = (*encoder).ch_layouts; + while !px.is_null() { + if (*px).nb_channels as u16 == self.variant.channels { + av_channel_layout_copy(&mut (*ctx).ch_layout, px); + break; + } + px = px.add(1); + } self.ctx = ctx; self.codec = encoder; } @@ -158,23 +170,15 @@ where return Ok(Some(frame)); } - let in_samples = (*frame).nb_samples; - let out_samples = av_rescale_rnd( - swr_get_delay(self.swr_ctx, (*frame).sample_rate as i64) + in_samples as i64, - (*self.ctx).sample_rate as i64, - (*frame).sample_rate as i64, - AV_ROUND_UP, - ) as libc::c_int; - let mut out_frame = self.new_frame(); - (*out_frame).nb_samples = out_samples; - let ret = swr_convert_frame(self.swr_ctx, out_frame, frame); if ret < 0 { av_frame_free(&mut out_frame); return Err(Error::msg(get_ffmpeg_error_msg(ret))); } + // skip fifo + return Ok(Some(out_frame)); let ret = av_audio_fifo_write( self.fifo, (*out_frame).extended_data as *const *mut libc::c_void, @@ -184,7 +188,16 @@ where av_frame_free(&mut out_frame); return Err(Error::msg(get_ffmpeg_error_msg(ret))); } + if ret != (*out_frame).nb_samples { + av_frame_free(&mut out_frame); + return Err(Error::msg(format!( + "FIFO write {} != {}", + ret, + (*out_frame).nb_samples + ))); + } + //info!("Resampled {}->{} (wrote={})", in_samples, (*out_frame).nb_samples, ret); av_frame_free(&mut out_frame); let buff = av_audio_fifo_size(self.fifo); @@ -223,6 +236,11 @@ where return Err(Error::msg(get_ffmpeg_error_msg(ret))); } + assert_eq!( + ret, + (*out_frame).nb_samples, + "Read wrong number of samples from FIFO" + ); Ok(out_frame) } @@ -240,6 +258,8 @@ where let var_id = id_ref_to_uuid((*frame).opaque_ref)?; assert_eq!(var_id, self.variant.id); + let in_stream = (*frame).opaque as *mut AVStream; + self.setup_encoder(frame)?; let mut frame = self.process_audio_frame(frame)?; if frame.is_none() { @@ -264,12 +284,13 @@ where } return Err(Error::msg(get_ffmpeg_error_msg(ret))); } - - set_encoded_pkt_timing(self.ctx, pkt, &mut self.pts, &self.variant); + set_encoded_pkt_timing(self.ctx, pkt, in_stream, &mut self.pts, &self.variant); (*pkt).opaque = self.ctx as *mut libc::c_void; (*pkt).opaque_ref = av_buffer_ref(self.var_id_ref); - self.chan_out - .send(PipelinePayload::AvPacket("Audio Encoder packet".to_owned(), pkt))?; + self.chan_out.send(PipelinePayload::AvPacket( + "Audio Encoder packet".to_owned(), + pkt, + ))?; } av_frame_free(&mut frame); diff --git a/src/encode/mod.rs b/src/encode/mod.rs index 80775eb..81b591b 100644 --- a/src/encode/mod.rs +++ b/src/encode/mod.rs @@ -1,12 +1,13 @@ use std::ptr; use ffmpeg_sys_next::{ - AV_LOG_INFO, AV_NOPTS_VALUE, av_packet_rescale_ts, av_pkt_dump_log2, AV_PKT_FLAG_KEY, av_q2d, AVCodecContext, - AVPacket, AVRational, AVStream, + AV_LOG_INFO, AV_NOPTS_VALUE, av_packet_rescale_ts, av_pkt_dump_log2, AV_PKT_FLAG_KEY, av_q2d, + av_rescale_q, AVCodecContext, AVFrame, AVPacket, AVRational, AVStream, }; use ffmpeg_sys_next::AVMediaType::AVMEDIA_TYPE_VIDEO; use log::info; +use crate::utils::id_ref_to_uuid; use crate::variant::VariantStreamType; pub mod audio; @@ -16,37 +17,57 @@ pub mod video; pub unsafe fn set_encoded_pkt_timing( ctx: *mut AVCodecContext, pkt: *mut AVPacket, + in_stream: *mut AVStream, pts: &mut i64, var: &TVar, ) where TVar: VariantStreamType, { let tb = (*ctx).time_base; + (*pkt).stream_index = var.dst_index() as libc::c_int; (*pkt).time_base = var.time_base(); - if (*ctx).codec_type == AVMEDIA_TYPE_VIDEO && (*pkt).duration == 0 { + let duration = if (*pkt).duration == 0 { let tb_sec = tb.den as i64 / tb.num as i64; let fps = (*ctx).framerate.num as i64 * (*ctx).framerate.den as i64; - (*pkt).duration = tb_sec / fps; + tb_sec / if fps == 0 { 1 } else { fps } + } else if (*ctx).codec_type == AVMEDIA_TYPE_VIDEO { + av_rescale_q((*pkt).duration, (*in_stream).time_base, (*ctx).time_base) + } else { + (*pkt).duration + }; + + if (*ctx).codec_type == AVMEDIA_TYPE_VIDEO { + (*pkt).duration = duration; } + if (*pkt).pts == AV_NOPTS_VALUE { (*pkt).pts = *pts; - *pts += (*pkt).duration; + *pts += duration; } else { + (*pkt).pts = av_rescale_q((*pkt).pts, (*in_stream).time_base, (*ctx).time_base); *pts = (*pkt).pts; } - if (*pkt).dts == AV_NOPTS_VALUE { + if (*pkt).dts != AV_NOPTS_VALUE { + (*pkt).dts = av_rescale_q((*pkt).dts, (*in_stream).time_base, (*ctx).time_base); + } else { (*pkt).dts = (*pkt).pts; } } pub unsafe fn dump_pkt_info(pkt: *const AVPacket) { let tb = (*pkt).time_base; + let id = id_ref_to_uuid((*pkt).opaque_ref); info!( - "stream #{}: keyframe={}, duration={:.3}, dts={}, pts={}, size={}", + "stream {}@{}: keyframe={}, duration={}, dts={}, pts={}, size={}, tb={}/{}", + if let Ok(id) = id { + format!("{}", id) + } else { + "Unknown".to_owned() + }, (*pkt).stream_index, ((*pkt).flags & AV_PKT_FLAG_KEY) != 0, - (*pkt).duration as f64 * av_q2d(tb), + (*pkt).duration, if (*pkt).dts == AV_NOPTS_VALUE { "N/A".to_owned() } else { @@ -57,6 +78,8 @@ pub unsafe fn dump_pkt_info(pkt: *const AVPacket) { } else { format!("{}", (*pkt).pts) }, - (*pkt).size + (*pkt).size, + tb.num, + tb.den ); } diff --git a/src/encode/video.rs b/src/encode/video.rs index d29ad36..2fa0bce 100644 --- a/src/encode/video.rs +++ b/src/encode/video.rs @@ -3,14 +3,14 @@ use std::ptr; use anyhow::Error; use ffmpeg_sys_next::{ - av_buffer_ref, av_packet_alloc, av_packet_free, av_packet_rescale_ts, avcodec_alloc_context3, - avcodec_find_encoder, avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVBufferRef, - AVCodec, AVCodecContext, AVFrame, AVStream, AVERROR, + av_buffer_ref, av_packet_alloc, av_packet_free, AVBufferRef, + AVCodec, avcodec_alloc_context3, avcodec_find_encoder, avcodec_open2, avcodec_receive_packet, + avcodec_send_frame, AVCodecContext, AVERROR, AVFrame, AVStream, }; use libc::EAGAIN; use tokio::sync::mpsc::UnboundedSender; -use crate::encode::set_encoded_pkt_timing; +use crate::encode::set_encoded_pkt_timing; use crate::ipc::Rx; use crate::pipeline::{PipelinePayload, PipelineProcessor}; use crate::utils::{get_ffmpeg_error_msg, id_ref_to_uuid, video_variant_id_ref}; @@ -80,8 +80,8 @@ where unsafe fn process_frame(&mut self, frame: *mut AVFrame) -> Result<(), Error> { let var_id = id_ref_to_uuid((*frame).opaque_ref)?; assert_eq!(var_id, self.variant.id); - self.setup_encoder(frame)?; + let in_stream = (*frame).opaque as *mut AVStream; let mut ret = avcodec_send_frame(self.ctx, frame); if ret < 0 && ret != AVERROR(EAGAIN) { @@ -99,12 +99,14 @@ where return Err(Error::msg(get_ffmpeg_error_msg(ret))); } - set_encoded_pkt_timing(self.ctx, pkt, &mut self.pts, &self.variant); + set_encoded_pkt_timing(self.ctx, pkt, in_stream, &mut self.pts, &self.variant); (*pkt).opaque = self.ctx as *mut libc::c_void; (*pkt).opaque_ref = av_buffer_ref(self.var_id_ref); assert_ne!((*pkt).data, ptr::null_mut()); - self.chan_out - .send(PipelinePayload::AvPacket("Video Encoder packet".to_owned(), pkt))?; + self.chan_out.send(PipelinePayload::AvPacket( + "Video Encoder packet".to_owned(), + pkt, + ))?; } Ok(()) diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 2a9b4e0..f578e72 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -17,6 +17,7 @@ pub enum EgressType { DASH, WHEP, MPEGTS(EgressConfig), + Recorder(EgressConfig), } impl Display for EgressType { @@ -28,7 +29,8 @@ impl Display for EgressType { EgressType::HLS(c) => format!("{}", c), EgressType::DASH => "DASH".to_owned(), EgressType::WHEP => "WHEP".to_owned(), - EgressType::MPEGTS(c) => format!("{}", c), + EgressType::MPEGTS(c) => format!("{}", c), + EgressType::Recorder(c) => format!("{}", c), } ) } diff --git a/src/pipeline/runner.rs b/src/pipeline/runner.rs index 4481230..1c8015f 100644 --- a/src/pipeline/runner.rs +++ b/src/pipeline/runner.rs @@ -12,6 +12,7 @@ use crate::demux::info::{DemuxStreamInfo, StreamChannelType}; use crate::egress::EgressConfig; use crate::egress::hls::HlsEgress; use crate::egress::mpegts::MPEGTSEgress; +use crate::egress::recorder::RecorderEgress; use crate::encode::audio::AudioEncoder; use crate::encode::video::VideoEncoder; use crate::pipeline::{EgressType, PipelineConfig, PipelinePayload, PipelineProcessor}; @@ -137,6 +138,17 @@ impl PipelineRunner { self.encoders.push(x); } } + EgressType::Recorder(cfg) => { + let (egress_tx, egress_rx) = unbounded_channel(); + self.egress.push(Box::new(RecorderEgress::new( + egress_rx, + self.config.id, + cfg.clone(), + ))); + for x in self.add_egress_variants(cfg, egress_tx) { + self.encoders.push(x); + } + } _ => return Err(Error::msg("Egress config not supported")), } } diff --git a/src/tag_frame.rs b/src/tag_frame.rs index f7030dd..2cea481 100644 --- a/src/tag_frame.rs +++ b/src/tag_frame.rs @@ -1,5 +1,5 @@ use anyhow::Error; -use ffmpeg_sys_next::{av_buffer_ref, av_frame_clone, av_frame_copy_props, AVBufferRef}; +use ffmpeg_sys_next::{av_buffer_ref, AVBufferRef}; use tokio::sync::mpsc::UnboundedSender; use crate::ipc::Rx; @@ -19,8 +19,8 @@ unsafe impl Send for TagFrame {} unsafe impl Sync for TagFrame {} impl TagFrame - where - TRecv: Rx, +where + TRecv: Rx, { pub fn new( var: VariantStream, @@ -38,24 +38,19 @@ impl TagFrame } impl PipelineProcessor for TagFrame - where - TRecv: Rx, +where + TRecv: Rx, { fn process(&mut self) -> Result<(), Error> { while let Ok(pkg) = self.chan_in.try_recv_next() { - match pkg { - PipelinePayload::AvFrame(ref tag, frm, idx) => unsafe { - if idx == self.variant.src_index() { - let new_frame = av_frame_clone(frm); - av_frame_copy_props(new_frame, frm); - (*new_frame).opaque = (*frm).opaque; - (*new_frame).opaque_ref = av_buffer_ref(self.var_id_ref); - self.chan_out - .send(PipelinePayload::AvFrame(tag.clone(), new_frame, idx))?; + if let PipelinePayload::AvFrame(_, pkt, idx) = &pkg { + if *idx == self.variant.src_index() { + unsafe { + (**pkt).opaque_ref = av_buffer_ref(self.var_id_ref); } - }, - _ => return Err(Error::msg("Payload not supported")), - }; + self.chan_out.send(pkg)?; + } + } } Ok(()) } diff --git a/src/variant.rs b/src/variant.rs index 58b8ef5..108e37f 100644 --- a/src/variant.rs +++ b/src/variant.rs @@ -313,7 +313,7 @@ impl VariantStreamType for AudioVariant { fn time_base(&self) -> AVRational { AVRational { num: 1, - den: self.sample_rate as libc::c_int, + den: 90_000, } } @@ -347,15 +347,11 @@ impl VariantStreamType for AudioVariant { (*params).bit_rate = self.bitrate as i64; (*params).sample_rate = self.sample_rate as libc::c_int; (*params).ch_layout = self.channel_layout(); + (*params).frame_size = 1024; } unsafe fn to_stream(&self, stream: *mut AVStream) { (*stream).time_base = self.time_base(); - (*stream).r_frame_rate = AVRational { - num: (*stream).time_base.den, - den: (*stream).time_base.num, - }; - self.to_codec_params((*stream).codecpar); } } diff --git a/src/webhook.rs b/src/webhook.rs index e4a3285..63eb26b 100644 --- a/src/webhook.rs +++ b/src/webhook.rs @@ -68,7 +68,7 @@ impl Webhook { bitrate: 320_000, codec: 86018, channels: 2, - sample_rate: 44_100, + sample_rate: 48_000, sample_fmt: "s16".to_owned(), })); vars.push(VariantStream::Audio(AudioVariant { @@ -78,7 +78,7 @@ impl Webhook { bitrate: 220_000, codec: 86018, channels: 2, - sample_rate: 44_100, + sample_rate: 48_000, sample_fmt: "s16".to_owned(), })); } @@ -87,8 +87,8 @@ impl Webhook { id: Uuid::new_v4(), recording: vec![], egress: vec![ - EgressType::HLS(EgressConfig { - name: "HLS".to_owned(), + EgressType::Recorder(EgressConfig { + name: "Recorder".to_owned(), out_dir: self.config.output_dir.clone(), variants: vars.clone(), }), diff --git a/test.sh b/test.sh index eb65085..0aa6a7f 100755 --- a/test.sh +++ b/test.sh @@ -3,4 +3,4 @@ ffmpeg \ -f lavfi -i "sine=frequency=1000:sample_rate=48000" \ -re -f lavfi -i testsrc -g 300 -r 60 -pix_fmt yuv420p -s 1280x720 \ - -c:v h264 -b:v 2000k -c:a aac -ac 2 -b:a 192k -fflags +genpts -f mpegts srt://localhost:3333 + -c:v h264 -b:v 2000k -c:a aac -ac 2 -b:a 192k -fflags nobuffer -f mpegts srt://localhost:3333