Recorder fixes

This commit is contained in:
kieran 2024-08-30 13:10:49 +01:00
parent dc7e623663
commit c3e3e17c50
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
18 changed files with 303 additions and 195 deletions

63
Cargo.lock generated
View File

@ -34,6 +34,18 @@ dependencies = [
"cpufeatures",
]
[[package]]
name = "ahash"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
name = "aho-corasick"
version = "1.1.2"
@ -43,6 +55,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "allocator-api2"
version = "0.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f"
[[package]]
name = "anyhow"
version = "1.0.80"
@ -487,7 +505,17 @@ dependencies = [
"memmap2",
"slotmap",
"tinyvec",
"ttf-parser",
"ttf-parser 0.24.1",
]
[[package]]
name = "fontdue"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efe23d02309319171d00d794c9ff48d4f903c0e481375b1b04b017470838af04"
dependencies = [
"hashbrown 0.14.3",
"ttf-parser 0.21.1",
]
[[package]]
@ -649,6 +677,10 @@ name = "hashbrown"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604"
dependencies = [
"ahash",
"allocator-api2",
]
[[package]]
name = "headers"
@ -1374,7 +1406,7 @@ dependencies = [
"core_maths",
"log",
"smallvec",
"ttf-parser",
"ttf-parser 0.24.1",
"unicode-bidi-mirroring",
"unicode-ccc",
"unicode-properties",
@ -1604,6 +1636,7 @@ dependencies = [
"bytes",
"config",
"ffmpeg-sys-next",
"fontdue",
"futures-util",
"itertools",
"libc",
@ -1896,6 +1929,12 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "ttf-parser"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c591d83f69777866b9126b24c6dd9a18351f177e49d625920d19f989fd31cf8"
[[package]]
name = "ttf-parser"
version = "0.24.1"
@ -2307,6 +2346,26 @@ dependencies = [
"linked-hash-map",
]
[[package]]
name = "zerocopy"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.52",
]
[[package]]
name = "zune-core"
version = "0.4.12"

View File

@ -26,3 +26,4 @@ rand = "0.8.5"
resvg = "0.43.0"
usvg = "0.43.0"
tiny-skia = "0.11.4"
fontdue = "0.9.2"

BIN
SourceCodePro-Regular.ttf Normal file

Binary file not shown.

View File

@ -107,8 +107,9 @@ impl Decoder {
}
return Err(Error::msg(format!("Failed to decode {}", ret)));
}
// reset picture type, not to confuse the encoder
(*frame).pict_type = AV_PICTURE_TYPE_NONE;
(*frame).pts = (*frame).best_effort_timestamp;
(*frame).pict_type = AV_PICTURE_TYPE_NONE; // encoder prints warnings
self.chan_out.send(PipelinePayload::AvFrame(
frame,
AVFrameSource::Decoder(stream),

View File

@ -68,7 +68,6 @@ unsafe extern "C" fn read_data(
Err(e) => match e {
TryRecvError::Empty => {}
TryRecvError::Disconnected => {
warn!("EOF");
return AVERROR_EOF;
}
},
@ -168,17 +167,15 @@ impl Demuxer {
let pkt: *mut AVPacket = av_packet_alloc();
let ret = av_read_frame(self.ctx, pkt);
if ret == AVERROR_EOF {
return Err(Error::msg("Stream EOF"));
self.chan_out.send(PipelinePayload::Flush)?;
return Ok(());
}
if ret < 0 {
let msg = get_ffmpeg_error_msg(ret);
return Err(Error::msg(msg));
}
let stream = *(*self.ctx).streams.add((*pkt).stream_index as usize);
let pkg = PipelinePayload::AvPacket(
pkt,
AVPacketSource::Demuxer(stream),
);
let pkg = PipelinePayload::AvPacket(pkt, AVPacketSource::Demuxer(stream));
self.chan_out.send(pkg)?;
Ok(())
}

View File

@ -3,10 +3,9 @@ use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{
av_dump_format,
av_interleaved_write_frame, av_opt_set, av_packet_clone, av_packet_copy_props,
avcodec_parameters_from_context, avformat_alloc_output_context2,
avformat_free_context, avformat_write_header, AVFormatContext, AVPacket,
av_dump_format, av_interleaved_write_frame, av_opt_set, av_packet_clone, av_packet_copy_props,
avcodec_parameters_from_context, avformat_alloc_output_context2, avformat_free_context,
avformat_write_header, AVFormatContext, AVPacket,
};
use itertools::Itertools;
use log::info;
@ -23,7 +22,7 @@ pub struct HlsEgress {
config: EgressConfig,
ctx: *mut AVFormatContext,
chan_in: UnboundedReceiver<PipelinePayload>,
stream_init: HashSet<usize>,
stream_init: HashSet<Uuid>,
init: bool,
packet_buffer: VecDeque<PipelinePayload>,
}
@ -152,7 +151,12 @@ impl HlsEgress {
src: &AVPacketSource,
) -> Result<(), Error> {
let variant = match src {
AVPacketSource::Encoder(v) => v,
AVPacketSource::Encoder(v) => self
.config
.variants
.iter()
.find(|x| x.id() == *v)
.ok_or(Error::msg("Variant does not exist"))?,
_ => return Err(Error::msg(format!("Cannot mux packet from {:?}", src))),
};
(*pkt).stream_index = variant.dst_index() as libc::c_int;
@ -220,10 +224,16 @@ impl PipelineProcessor for HlsEgress {
if self.ctx.is_null() {
self.setup_muxer()?;
}
if !self.stream_init.contains(&var.dst_index()) {
let out_stream = *(*self.ctx).streams.add(var.dst_index());
if !self.stream_init.contains(var) {
let variant = self
.config
.variants
.iter()
.find(|x| x.id() == *var)
.ok_or(Error::msg("Variant does not exist"))?;
let out_stream = *(*self.ctx).streams.add(variant.dst_index());
avcodec_parameters_from_context((*out_stream).codecpar, ctx);
self.stream_init.insert(var.dst_index());
self.stream_init.insert(*var);
}
},
_ => return Err(Error::msg(format!("Payload not supported: {:?}", pkg))),

View File

@ -1,15 +1,23 @@
use std::collections::HashSet;
use std::collections::{HashSet, VecDeque};
use std::{fs, ptr};
use anyhow::Error;
use ffmpeg_sys_next::{
av_dump_format, av_guess_format, av_interleaved_write_frame, av_strdup, avformat_alloc_context,
avformat_free_context, avio_open2, AVFormatContext, AVPacket, AVIO_FLAG_READ_WRITE,
av_dict_set, av_dump_format, av_guess_format, av_interleaved_write_frame, av_malloc,
av_mallocz, av_opt_set, av_packet_rescale_ts, av_strdup, av_write_trailer,
avformat_alloc_context, avformat_alloc_output_context2, avformat_free_context, avio_open,
avio_open2, AVDictionary, AVFormatContext, AVPacket, AVIO_FLAG_WRITE, AV_DICT_APPEND,
};
use ffmpeg_sys_next::{
avcodec_parameters_from_context, avformat_write_header, AVFMT_GLOBALHEADER,
AV_CODEC_FLAG_GLOBAL_HEADER,
};
use log::info;
use tokio::sync::mpsc::UnboundedReceiver;
use uuid::Uuid;
use crate::egress::{map_variants_to_streams, EgressConfig};
use crate::encode::{dump_pkt_info, set_encoded_pkt_timing};
use crate::pipeline::{AVPacketSource, PipelinePayload, PipelineProcessor};
use crate::utils::get_ffmpeg_error_msg;
use crate::variant::VariantStreamType;
@ -19,7 +27,9 @@ pub struct RecorderEgress {
config: EgressConfig,
ctx: *mut AVFormatContext,
chan_in: UnboundedReceiver<PipelinePayload>,
stream_init: HashSet<i32>,
stream_init: HashSet<Uuid>,
init: bool,
packet_buffer: VecDeque<PipelinePayload>,
}
unsafe impl Send for RecorderEgress {}
@ -47,64 +57,73 @@ impl RecorderEgress {
ctx: ptr::null_mut(),
chan_in,
stream_init: HashSet::new(),
init: false,
packet_buffer: VecDeque::new(),
}
}
unsafe fn setup_muxer(&mut self) -> Result<(), Error> {
let ctx = avformat_alloc_context();
if ctx.is_null() {
return Err(Error::msg("Failed to create muxer context"));
}
let base = format!("{}/{}", self.config.out_dir, self.id);
let out_file = format!("{}/recording.mkv\0", base);
let out_file = format!("{}/recording.mp4\0", base);
fs::create_dir_all(base.clone())?;
let ret = avio_open2(
&mut (*ctx).pb,
out_file.as_ptr() as *const libc::c_char,
AVIO_FLAG_READ_WRITE,
ptr::null(),
let mut ctx = ptr::null_mut();
let ret = avformat_alloc_output_context2(
&mut ctx,
ptr::null_mut(),
ptr::null_mut(),
out_file.as_ptr() as *const libc::c_char,
);
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
(*ctx).oformat = av_guess_format(
"matroska\0".as_ptr() as *const libc::c_char,
out_file.as_ptr() as *const libc::c_char,
ptr::null(),
);
if (*ctx).oformat.is_null() {
return Err(Error::msg("Output format not found"));
}
(*ctx).url = av_strdup(out_file.as_ptr() as *const libc::c_char);
map_variants_to_streams(ctx, &mut self.config.variants)?;
//let ret = avformat_write_header(ctx, ptr::null_mut());
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
if (*(*ctx).oformat).flags & AVFMT_GLOBALHEADER != 0 {
(*ctx).flags |= AV_CODEC_FLAG_GLOBAL_HEADER as libc::c_int;
}
av_dump_format(ctx, 0, ptr::null(), 1);
av_opt_set(
(*ctx).priv_data,
"movflags\0".as_ptr() as *const libc::c_char,
"+dash+delay_moov+skip_sidx+skip_trailer\0".as_ptr() as *const libc::c_char,
0,
);
self.ctx = ctx;
Ok(())
}
unsafe fn process_pkt(
&mut self,
pkt: *mut AVPacket,
src: &AVPacketSource,
) -> Result<(), Error> {
let variant = match src {
AVPacketSource::Encoder(v) => v,
_ => return Err(Error::msg(format!("Cannot mux packet from {:?}", src))),
};
(*pkt).stream_index = variant.dst_index() as libc::c_int;
unsafe fn open_muxer(&mut self) -> Result<bool, Error> {
if !self.init && self.stream_init.len() == self.config.variants.len() {
let ret = avio_open2(
&mut (*self.ctx).pb,
(*self.ctx).url,
AVIO_FLAG_WRITE,
ptr::null_mut(),
ptr::null_mut(),
);
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
av_dump_format(self.ctx, 0, ptr::null(), 1);
let ret = avformat_write_header(self.ctx, ptr::null_mut());
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
self.init = true;
Ok(true)
} else {
Ok(self.init)
}
}
unsafe fn process_pkt(&mut self, pkt: *mut AVPacket) -> Result<(), Error> {
//dump_pkt_info(pkt);
let ret = av_interleaved_write_frame(self.ctx, pkt);
if ret < 0 {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
Ok(())
}
}
@ -114,10 +133,38 @@ impl PipelineProcessor for RecorderEgress {
while let Ok(pkg) = self.chan_in.try_recv() {
match pkg {
PipelinePayload::AvPacket(pkt, ref src) => unsafe {
if self.open_muxer()? {
while let Some(pkt) = self.packet_buffer.pop_front() {
match pkt {
PipelinePayload::AvPacket(pkt, ref src) => {
self.process_pkt(pkt)?;
}
_ => return Err(Error::msg("")),
}
}
self.process_pkt(pkt)?;
} else {
self.packet_buffer.push_back(pkg);
}
},
PipelinePayload::EncoderInfo(ref var, ctx) => unsafe {
if self.ctx.is_null() {
self.setup_muxer()?;
}
self.process_pkt(pkt, src)?;
if !self.stream_init.contains(var) {
let my_var = self
.config
.variants
.iter()
.find(|x| x.id() == *var)
.ok_or(Error::msg("Variant does not exist"))?;
let out_stream = *(*self.ctx).streams.add(my_var.dst_index());
avcodec_parameters_from_context((*out_stream).codecpar, ctx);
(*(*out_stream).codecpar).codec_tag = 0;
self.stream_init.insert(*var);
info!("Setup encoder info: {}", my_var);
}
},
_ => return Err(Error::msg("Payload not supported")),
}

View File

@ -4,13 +4,12 @@ use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{
av_audio_fifo_alloc, av_audio_fifo_free, av_audio_fifo_read,
av_audio_fifo_size, av_audio_fifo_write,
av_channel_layout_copy, av_frame_alloc, av_frame_free, av_get_sample_fmt_name, av_packet_alloc, av_packet_free, av_samples_alloc_array_and_samples,
avcodec_alloc_context3, avcodec_free_context, avcodec_open2,
avcodec_receive_packet, avcodec_send_frame, swr_alloc_set_opts2,
swr_convert_frame, swr_free, swr_init, AVAudioFifo, AVCodec,
AVCodecContext, AVFrame, SwrContext, AVERROR,
av_audio_fifo_alloc, av_audio_fifo_free, av_audio_fifo_read, av_audio_fifo_size,
av_audio_fifo_write, av_channel_layout_copy, av_frame_alloc, av_frame_free,
av_get_sample_fmt_name, av_packet_alloc, av_packet_free, av_samples_alloc_array_and_samples,
AVAudioFifo, AVCodec, avcodec_alloc_context3, avcodec_free_context,
avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, AVFrame,
AVRational, swr_alloc_set_opts2, swr_convert_frame, swr_free, swr_init, SwrContext,
};
use libc::EAGAIN;
use log::info;
@ -31,7 +30,6 @@ pub struct AudioEncoder<T> {
chan_in: T,
chan_out: UnboundedSender<PipelinePayload>,
pts: i64,
frame_pts: i64,
}
unsafe impl<T> Send for AudioEncoder<T> {}
@ -66,7 +64,6 @@ where
chan_in,
chan_out,
pts: 0,
frame_pts: 0,
}
}
@ -150,10 +147,8 @@ where
}
// let downstream steps know about the encoder
self.chan_out.send(PipelinePayload::EncoderInfo(
VariantStream::Audio(self.variant.clone()),
ctx,
))?;
self.chan_out
.send(PipelinePayload::EncoderInfo(self.variant.id(), ctx))?;
self.ctx = ctx;
self.codec = encoder;
}
@ -256,23 +251,14 @@ where
unsafe fn process_frame(
&mut self,
frame: *mut AVFrame,
src: &AVFrameSource,
in_tb: &AVRational,
) -> Result<(), Error> {
let in_stream = match src {
AVFrameSource::Decoder(s) => *s,
AVFrameSource::Scaler(s) => *s,
};
self.setup_encoder(frame)?;
let frame = self.process_audio_frame(frame)?;
if frame.is_none() {
return Ok(());
}
let mut frame = frame.unwrap();
(*frame).pts = self.frame_pts;
self.frame_pts += (*frame).nb_samples as i64;
let mut ret = avcodec_send_frame(self.ctx, frame);
if ret < 0 && ret != AVERROR(EAGAIN) {
av_frame_free(&mut frame);
@ -290,10 +276,11 @@ where
}
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
set_encoded_pkt_timing(self.ctx, pkt, in_stream, &mut self.pts, &self.variant);
set_encoded_pkt_timing(self.ctx, pkt, in_tb, &mut self.pts, &self.variant);
self.chan_out.send(PipelinePayload::AvPacket(
pkt,
AVPacketSource::Encoder(VariantStream::Audio(self.variant.clone())),
AVPacketSource::Encoder(self.variant.id()),
))?;
}
@ -310,16 +297,19 @@ where
while let Ok(pkg) = self.chan_in.try_recv_next() {
match pkg {
PipelinePayload::AvFrame(frm, ref src) => unsafe {
let idx = match src {
AVFrameSource::Decoder(s) => (**s).index,
let in_stream = match src {
AVFrameSource::Decoder(s) => *s,
_ => {
return Err(Error::msg(format!("Cannot process frame from: {:?}", src)))
}
};
if self.variant.src_index == idx as usize {
self.process_frame(frm, src)?;
if self.variant.src_index == (*in_stream).index as usize {
self.process_frame(frm, &(*in_stream).time_base)?;
}
},
PipelinePayload::Flush => unsafe {
self.process_frame(ptr::null_mut(), &AVRational { num: 0, den: 1 })?;
},
_ => return Err(Error::msg("Payload not supported")),
}
}

View File

@ -1,7 +1,6 @@
use ffmpeg_sys_next::{
AV_NOPTS_VALUE, AV_PKT_FLAG_KEY,
av_rescale_q, AVCodecContext, AVPacket, AVStream,
AV_NOPTS_VALUE, av_packet_rescale_ts, AV_PKT_FLAG_KEY, av_rescale_q, AVCodecContext, AVPacket,
AVRational, AVStream,
};
use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO};
use log::info;
@ -15,38 +14,29 @@ pub mod video;
pub unsafe fn set_encoded_pkt_timing<TVar>(
ctx: *mut AVCodecContext,
pkt: *mut AVPacket,
in_stream: *mut AVStream,
in_tb: &AVRational,
pts: &mut i64,
var: &TVar,
) where
TVar: VariantStreamType,
{
let tb = (*ctx).time_base;
let out_tb = (*ctx).time_base;
(*pkt).stream_index = var.dst_index() as libc::c_int;
(*pkt).time_base = var.time_base();
let duration = if (*pkt).duration == 0 {
let tb_sec = tb.den as i64 / tb.num as i64;
if (*pkt).duration == 0 {
let tb_sec = out_tb.den as i64 / out_tb.num as i64;
let fps = (*ctx).framerate.num as i64 * (*ctx).framerate.den as i64;
tb_sec / if fps == 0 { 1 } else { fps }
} else {
av_rescale_q((*pkt).duration, (*in_stream).time_base, tb)
};
if (*ctx).codec_type == AVMEDIA_TYPE_VIDEO || (*ctx).codec_type == AVMEDIA_TYPE_AUDIO {
(*pkt).duration = duration;
(*pkt).duration = tb_sec / if fps == 0 { 1 } else { fps }
}
av_packet_rescale_ts(pkt, *in_tb, out_tb);
(*pkt).time_base = var.time_base();
(*pkt).pos = -1;
if (*pkt).pts == AV_NOPTS_VALUE {
(*pkt).pts = *pts;
*pts += duration;
} else {
(*pkt).pts = av_rescale_q((*pkt).pts, (*in_stream).time_base, tb);
*pts = (*pkt).pts;
*pts += (*pkt).duration;
}
if (*pkt).dts != AV_NOPTS_VALUE {
(*pkt).dts = av_rescale_q((*pkt).dts, (*in_stream).time_base, tb);
} else {
if (*pkt).dts == AV_NOPTS_VALUE {
(*pkt).dts = (*pkt).pts;
}
}

View File

@ -4,17 +4,17 @@ use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{
av_packet_alloc, av_packet_free, AVCodec, avcodec_alloc_context3, avcodec_find_encoder,
avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR,
AVFrame,
avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, AVFrame,
AVRational,
};
use libc::EAGAIN;
use tokio::sync::mpsc::UnboundedSender;
use crate::encode::set_encoded_pkt_timing;
use crate::encode::{dump_pkt_info, set_encoded_pkt_timing};
use crate::ipc::Rx;
use crate::pipeline::{AVFrameSource, AVPacketSource, PipelinePayload, PipelineProcessor};
use crate::utils::get_ffmpeg_error_msg;
use crate::variant::{VariantStream, VariantStreamType, VideoVariant};
use crate::variant::{VariantStreamType, VideoVariant};
pub struct VideoEncoder<T> {
variant: VideoVariant,
@ -48,7 +48,7 @@ where
}
}
unsafe fn setup_encoder(&mut self, frame: *mut AVFrame) -> Result<(), Error> {
unsafe fn setup_encoder(&mut self) -> Result<(), Error> {
if self.ctx.is_null() {
let codec = self.variant.codec;
let encoder = avcodec_find_encoder(transmute(codec as i32));
@ -69,10 +69,8 @@ where
}
// let downstream steps know about the encoder
self.chan_out.send(PipelinePayload::EncoderInfo(
VariantStream::Video(self.variant.clone()),
ctx,
))?;
self.chan_out
.send(PipelinePayload::EncoderInfo(self.variant.id(), ctx))?;
self.ctx = ctx;
self.codec = encoder;
@ -83,15 +81,8 @@ where
unsafe fn process_frame(
&mut self,
frame: *mut AVFrame,
src: &AVFrameSource,
in_tb: &AVRational,
) -> Result<(), Error> {
let in_stream = match src {
AVFrameSource::Decoder(s) => *s,
AVFrameSource::Scaler(s) => *s,
};
self.setup_encoder(frame)?;
let mut ret = avcodec_send_frame(self.ctx, frame);
if ret < 0 && ret != AVERROR(EAGAIN) {
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
@ -108,11 +99,11 @@ where
return Err(Error::msg(get_ffmpeg_error_msg(ret)));
}
set_encoded_pkt_timing(self.ctx, pkt, in_stream, &mut self.pts, &self.variant);
assert_ne!((*pkt).data, ptr::null_mut());
set_encoded_pkt_timing(self.ctx, pkt, in_tb, &mut self.pts, &self.variant);
//dump_pkt_info(pkt);
self.chan_out.send(PipelinePayload::AvPacket(
pkt,
AVPacketSource::Encoder(VariantStream::Video(self.variant.clone())),
AVPacketSource::Encoder(self.variant.id()),
))?;
}
@ -125,19 +116,25 @@ where
TRecv: Rx<PipelinePayload>,
{
fn process(&mut self) -> Result<(), Error> {
unsafe {
self.setup_encoder()?;
}
while let Ok(pkg) = self.chan_in.try_recv_next() {
match pkg {
PipelinePayload::AvFrame(frm, ref src) => unsafe {
let idx = match src {
AVFrameSource::Decoder(s) => (**s).index,
let in_stream = match src {
AVFrameSource::Decoder(s) => *s,
_ => {
return Err(Error::msg(format!("Cannot process frame from: {:?}", src)))
}
};
if self.variant.src_index == idx as usize {
self.process_frame(frm, src)?;
if self.variant.src_index == (*in_stream).index as usize {
self.process_frame(frm, &(*in_stream).time_base)?;
}
},
PipelinePayload::Flush => unsafe {
self.process_frame(ptr::null_mut(), &AVRational { num: 0, den: 1 })?;
},
_ => return Err(Error::msg("Payload not supported")),
}
}

View File

@ -26,7 +26,7 @@ pub async fn listen(path: PathBuf, builder: PipelineBuilder) -> Result<(), anyho
});
if let Ok(mut stream) = tokio::fs::File::open(path).await {
let mut buf = [0u8; 1500];
let mut buf = [0u8; 4096];
loop {
if let Ok(r) = stream.read(&mut buf).await {
if r > 0 {

View File

@ -6,15 +6,18 @@ use std::time::{Duration, SystemTime};
use ffmpeg_sys_next::{
av_frame_alloc, av_frame_copy_props, av_frame_free, av_frame_get_buffer, av_packet_alloc,
av_packet_free, AV_PROFILE_H264_MAIN, av_q2d, avcodec_alloc_context3, avcodec_find_encoder,
avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVERROR, AVRational, EAGAIN,
SWS_BILINEAR, sws_getContext, sws_scale_frame,
avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVERROR, AVRational,
EAGAIN, SWS_BILINEAR, sws_getContext, sws_scale_frame,
};
use ffmpeg_sys_next::AVCodecID::AV_CODEC_ID_H264;
use ffmpeg_sys_next::AVColorSpace::{AVCOL_SPC_BT709, AVCOL_SPC_RGB};
use ffmpeg_sys_next::AVPictureType::AV_PICTURE_TYPE_NONE;
use ffmpeg_sys_next::AVPixelFormat::{AV_PIX_FMT_RGB24, AV_PIX_FMT_YUV420P};
use ffmpeg_sys_next::AVPixelFormat::{AV_PIX_FMT_RGB24, AV_PIX_FMT_RGBA, AV_PIX_FMT_YUV420P};
use fontdue::layout::{CoordinateSystem, Layout, TextStyle};
use libc::memcpy;
use log::{error, info};
use tokio::sync::mpsc::unbounded_channel;
use usvg::{Font, Node};
use crate::ingress::ConnectionInfo;
use crate::pipeline::builder::PipelineBuilder;
@ -22,9 +25,9 @@ use crate::pipeline::builder::PipelineBuilder;
pub async fn listen(builder: PipelineBuilder) -> Result<(), anyhow::Error> {
info!("Test pattern enabled");
const WIDTH: libc::c_int = 1280;
const HEIGHT: libc::c_int = 720;
const TBN: libc::c_int = 30;
const WIDTH: libc::c_int = 1920;
const HEIGHT: libc::c_int = 1080;
const FPS: libc::c_int = 25;
tokio::spawn(async move {
let (tx, rx) = unbounded_channel();
@ -48,11 +51,11 @@ pub async fn listen(builder: PipelineBuilder) -> Result<(), anyhow::Error> {
(*enc_ctx).pix_fmt = AV_PIX_FMT_YUV420P;
(*enc_ctx).colorspace = AVCOL_SPC_BT709;
(*enc_ctx).bit_rate = 1_000_000;
(*enc_ctx).framerate = AVRational { num: 30, den: 1 };
(*enc_ctx).framerate = AVRational { num: FPS, den: 1 };
(*enc_ctx).gop_size = 30;
(*enc_ctx).level = 40;
(*enc_ctx).profile = AV_PROFILE_H264_MAIN;
(*enc_ctx).time_base = AVRational { num: 1, den: TBN };
(*enc_ctx).time_base = AVRational { num: 1, den: FPS };
(*enc_ctx).pkt_timebase = (*enc_ctx).time_base;
avcodec_open2(enc_ctx, codec, ptr::null_mut());
@ -63,9 +66,9 @@ pub async fn listen(builder: PipelineBuilder) -> Result<(), anyhow::Error> {
(*src_frame).pict_type = AV_PICTURE_TYPE_NONE;
(*src_frame).key_frame = 1;
(*src_frame).colorspace = AVCOL_SPC_RGB;
(*src_frame).format = AV_PIX_FMT_RGB24 as libc::c_int;
(*src_frame).format = AV_PIX_FMT_RGBA as libc::c_int;
(*src_frame).time_base = (*enc_ctx).time_base;
av_frame_get_buffer(src_frame, 0);
av_frame_get_buffer(src_frame, 1);
let sws = sws_getContext(
WIDTH as libc::c_int,
@ -82,26 +85,47 @@ pub async fn listen(builder: PipelineBuilder) -> Result<(), anyhow::Error> {
let svg_data = std::fs::read("./test.svg").unwrap();
let tree = usvg::Tree::from_data(&svg_data, &Default::default()).unwrap();
let mut pixmap = tiny_skia::Pixmap::new(WIDTH as u32, HEIGHT as u32).unwrap();
let render_ts = tiny_skia::Transform::from_scale(0.5, 0.5);
let render_ts = tiny_skia::Transform::from_scale(1f32, 1f32);
resvg::render(&tree, render_ts, &mut pixmap.as_mut());
for x in 0..WIDTH as u32 {
for y in 0..HEIGHT as u32 {
if let Some(px) = pixmap.pixel(x, y) {
let offset = 3 * x + y * (*src_frame).linesize[0] as u32;
let pixel = (*src_frame).data[0].add(offset as usize);
*pixel.offset(0) = px.red();
*pixel.offset(1) = px.green();
*pixel.offset(2) = px.blue();
}
}
}
let font = include_bytes!("../../SourceCodePro-Regular.ttf") as &[u8];
let scp = fontdue::Font::from_bytes(font, Default::default()).unwrap();
let mut layout = Layout::new(CoordinateSystem::PositiveYDown);
let fonts = &[&scp];
let mut frame_number: u64 = 0;
let start = SystemTime::now();
loop {
frame_number += 1;
(*src_frame).pts = (TBN as u64 * frame_number) as i64;
(*src_frame).pts = frame_number as i64;
(*src_frame).duration = 1;
memcpy(
(*src_frame).data[0] as *mut libc::c_void,
pixmap.data().as_ptr() as *const libc::c_void,
(WIDTH * HEIGHT * 4) as libc::size_t,
);
layout.clear();
layout.append(
fonts,
&TextStyle::new(&format!("frame={}", frame_number), 40.0, 0),
);
for g in layout.glyphs() {
let (metrics, bitmap) = scp.rasterize_config_subpixel(g.key);
for y in 0..metrics.height {
for x in 0..metrics.width {
let dst_x = x + g.x as usize;
let dst_y = y + g.y as usize;
let offset_src = (x + y * metrics.width) * 3;
let offset_dst =
4 * dst_x + dst_y * (*src_frame).linesize[0] as usize;
let pixel_dst = (*src_frame).data[0].add(offset_dst);
*pixel_dst.offset(0) = bitmap[offset_src];
*pixel_dst.offset(1) = bitmap[offset_src + 1];
*pixel_dst.offset(2) = bitmap[offset_src + 2];
}
}
}
let mut dst_frame = av_frame_alloc();
av_frame_copy_props(dst_frame, src_frame);
@ -127,20 +151,11 @@ pub async fn listen(builder: PipelineBuilder) -> Result<(), anyhow::Error> {
(*av_pkt).data,
(*av_pkt).size as usize,
));
for z in 0..(buf.len() as f32 / 1024.0).ceil() as usize {
if let Err(e) = tx.send(buf.slice(z..(z + 1024).min(buf.len()))) {
error!("Failed to write data {}", e);
break;
}
if let Err(e) = tx.send(buf) {
error!("Failed to send test pkt: {}", e);
return;
}
}
let stream_time = Duration::from_secs_f64(
frame_number as libc::c_double * av_q2d((*enc_ctx).time_base),
);
let real_time = SystemTime::now().duration_since(start).unwrap();
let wait_time = stream_time - real_time;
std::thread::sleep(wait_time);
}
}
}

View File

@ -64,11 +64,11 @@ async fn main() -> anyhow::Result<()> {
"0.0.0.0:8080".to_owned(),
settings.clone(),
)));
listeners.push(tokio::spawn(ingress::file::listen(
/*listeners.push(tokio::spawn(ingress::file::listen(
"/home/kieran/high_flight.mp4".parse().unwrap(),
builder.clone(),
)));
//listeners.push(tokio::spawn(ingress::test::listen(builder.clone())));
)));*/
listeners.push(tokio::spawn(ingress::test::listen(builder.clone())));
for handle in listeners {
if let Err(e) = handle.await {

View File

@ -6,6 +6,7 @@ use ffmpeg_sys_next::{
av_packet_free, AVCodecContext, AVFrame, AVPacket, AVStream,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::demux::info::DemuxStreamInfo;
use crate::egress::EgressConfig;
@ -70,9 +71,9 @@ pub enum AVPacketSource {
/// AVPacket from demuxer
Demuxer(*mut AVStream),
/// AVPacket from an encoder
Encoder(VariantStream),
Encoder(Uuid),
/// AVPacket from muxer
Muxer(VariantStream),
Muxer(Uuid),
}
#[derive(Debug, PartialEq, Clone)]
@ -81,6 +82,8 @@ pub enum AVFrameSource {
Decoder(*mut AVStream),
/// AVPacket from frame scaler step
Scaler(*mut AVStream),
/// Flush frame (empty)
Flush,
}
#[derive(Debug, PartialEq)]
@ -96,7 +99,9 @@ pub enum PipelinePayload {
/// Information about the input stream
SourceInfo(DemuxStreamInfo),
/// Information about an encoder in this pipeline
EncoderInfo(VariantStream, *const AVCodecContext),
EncoderInfo(Uuid, *const AVCodecContext),
/// Flush pipeline
Flush,
}
unsafe impl Send for PipelinePayload {}
@ -122,6 +127,7 @@ impl Clone for PipelinePayload {
},
PipelinePayload::SourceInfo(i) => PipelinePayload::SourceInfo(i.clone()),
PipelinePayload::EncoderInfo(v, s) => PipelinePayload::EncoderInfo(v.clone(), *s),
PipelinePayload::Flush => PipelinePayload::Flush,
}
}
}

View File

@ -3,14 +3,14 @@ use std::ptr;
use anyhow::Error;
use ffmpeg_sys_next::{
av_frame_alloc, av_frame_copy_props, AVFrame, SWS_BILINEAR,
sws_freeContext, sws_getContext, sws_scale_frame, SwsContext,
av_frame_alloc, av_frame_copy_props, AVFrame, SWS_BILINEAR, sws_freeContext, sws_getContext,
sws_scale_frame, SwsContext,
};
use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
use crate::pipeline::{AVFrameSource, PipelinePayload, PipelineProcessor};
use crate::utils::{get_ffmpeg_error_msg};
use crate::utils::get_ffmpeg_error_msg;
use crate::variant::VideoVariant;
pub struct Scaler {
@ -105,6 +105,10 @@ impl PipelineProcessor for Scaler {
self.process_frame(frm, src)?;
}
},
PipelinePayload::Flush => {
// pass flush to next step
self.chan_out.send(PipelinePayload::Flush)?;
}
_ => return Err(Error::msg("Payload not supported payload")),
}
}

View File

@ -2,7 +2,7 @@ use anyhow::Error;
use tokio::sync::mpsc::UnboundedSender;
use crate::ipc::Rx;
use crate::pipeline::{AVFrameSource, PipelinePayload, PipelineProcessor};
use crate::pipeline::{AVFrameSource, AVPacketSource, PipelinePayload, PipelineProcessor};
use crate::variant::{VariantStream, VariantStreamType};
pub struct TagFrame<TRecv> {
@ -38,15 +38,7 @@ where
{
fn process(&mut self) -> Result<(), Error> {
while let Ok(pkg) = self.chan_in.try_recv_next() {
if let PipelinePayload::AvFrame(_, src) = &pkg {
let idx = match &src {
AVFrameSource::Decoder(s) => unsafe { (**s).index },
_ => return Err(Error::msg(format!("Cannot process frame from: {:?}", src))),
};
if self.variant.src_index() == idx as usize {
self.chan_out.send(pkg)?;
}
}
self.chan_out.send(pkg)?;
}
Ok(())
}

View File

@ -246,10 +246,9 @@ impl VariantStreamType for VideoVariant {
let key_frames = self.fps * self.keyframe_interval;
(*ctx).gop_size = key_frames as libc::c_int;
(*ctx).keyint_min = key_frames as libc::c_int;
(*ctx).max_b_frames = 1;
(*ctx).max_b_frames = 3;
(*ctx).pix_fmt = AV_PIX_FMT_YUV420P;
(*ctx).colorspace = AVCOL_SPC_BT709;
(*ctx).color_range = AVCOL_RANGE_MPEG;
if (*codec).id == AV_CODEC_ID_H264 {
av_opt_set(
(*ctx).priv_data,

View File

@ -64,16 +64,16 @@ impl Webhook {
id: Uuid::new_v4(),
recording: vec![],
egress: vec![
/*EgressType::Recorder(EgressConfig {
EgressType::Recorder(EgressConfig {
name: "REC".to_owned(),
out_dir: self.config.output_dir.clone(),
variants: vars.clone(),
}),*/
EgressType::HLS(EgressConfig {
}),
/*EgressType::HLS(EgressConfig {
name: "HLS".to_owned(),
out_dir: self.config.output_dir.clone(),
variants: vars.clone(),
}),
}),*/
],
}
}