Only set PTS

This commit is contained in:
kieran 2024-08-30 14:38:50 +01:00
parent 2afbde6c53
commit 57e3eed69e
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
10 changed files with 65 additions and 40 deletions

View File

@ -2,16 +2,14 @@ use std::collections::HashMap;
use std::ptr; use std::ptr;
use anyhow::Error; use anyhow::Error;
use ffmpeg_sys_next::{ use ffmpeg_sys_next::{av_frame_alloc, AVCodec, avcodec_alloc_context3, avcodec_find_decoder, avcodec_free_context, avcodec_open2, avcodec_parameters_copy, avcodec_parameters_to_context, avcodec_receive_frame, avcodec_send_packet, AVCodecContext, AVERROR, AVERROR_EOF, AVPacket};
av_frame_alloc, AVCodec, avcodec_alloc_context3, avcodec_find_decoder,
avcodec_free_context, avcodec_open2, avcodec_parameters_to_context, avcodec_receive_frame,
avcodec_send_packet, AVCodecContext, AVERROR, AVERROR_EOF, AVPacket,
};
use ffmpeg_sys_next::AVPictureType::AV_PICTURE_TYPE_NONE; use ffmpeg_sys_next::AVPictureType::AV_PICTURE_TYPE_NONE;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::UnboundedReceiver;
use crate::encode::set_encoded_pkt_timing;
use crate::pipeline::{AVFrameSource, AVPacketSource, PipelinePayload}; use crate::pipeline::{AVFrameSource, AVPacketSource, PipelinePayload};
use crate::variant::{VariantStream, VideoVariant};
struct CodecContext { struct CodecContext {
pub context: *mut AVCodecContext, pub context: *mut AVCodecContext,
@ -32,6 +30,7 @@ pub struct Decoder {
chan_in: UnboundedReceiver<PipelinePayload>, chan_in: UnboundedReceiver<PipelinePayload>,
chan_out: broadcast::Sender<PipelinePayload>, chan_out: broadcast::Sender<PipelinePayload>,
codecs: HashMap<i32, CodecContext>, codecs: HashMap<i32, CodecContext>,
pts: i64,
} }
unsafe impl Send for Decoder {} unsafe impl Send for Decoder {}
@ -47,6 +46,7 @@ impl Decoder {
chan_in, chan_in,
chan_out, chan_out,
codecs: HashMap::new(), codecs: HashMap::new(),
pts: 0,
} }
} }
@ -83,9 +83,10 @@ impl Decoder {
if context.is_null() { if context.is_null() {
return Err(Error::msg("Failed to alloc context")); return Err(Error::msg("Failed to alloc context"));
} }
if avcodec_parameters_to_context(context, codec_par) != 0 { if avcodec_parameters_to_context(context, (*stream).codecpar) != 0 {
return Err(Error::msg("Failed to copy codec parameters to context")); return Err(Error::msg("Failed to copy codec parameters to context"));
} }
(*context).pkt_timebase = (*stream).time_base;
if avcodec_open2(context, codec, ptr::null_mut()) < 0 { if avcodec_open2(context, codec, ptr::null_mut()) < 0 {
return Err(Error::msg("Failed to open codec")); return Err(Error::msg("Failed to open codec"));
} }
@ -108,7 +109,6 @@ impl Decoder {
return Err(Error::msg(format!("Failed to decode {}", ret))); return Err(Error::msg(format!("Failed to decode {}", ret)));
} }
(*frame).pts = (*frame).best_effort_timestamp;
(*frame).pict_type = AV_PICTURE_TYPE_NONE; // encoder prints warnings (*frame).pict_type = AV_PICTURE_TYPE_NONE; // encoder prints warnings
self.chan_out.send(PipelinePayload::AvFrame( self.chan_out.send(PipelinePayload::AvFrame(
frame, frame,

View File

@ -10,6 +10,7 @@ use tokio::sync::mpsc::error::TryRecvError;
use tokio::time::Instant; use tokio::time::Instant;
use crate::demux::info::{DemuxStreamInfo, StreamChannelType, StreamInfoChannel}; use crate::demux::info::{DemuxStreamInfo, StreamChannelType, StreamInfoChannel};
use crate::encode::set_encoded_pkt_timing;
use crate::pipeline::{AVPacketSource, PipelinePayload}; use crate::pipeline::{AVPacketSource, PipelinePayload};
use crate::utils::get_ffmpeg_error_msg; use crate::utils::get_ffmpeg_error_msg;

View File

@ -157,6 +157,12 @@ impl HlsEgress {
.iter() .iter()
.find(|x| x.id() == *v) .find(|x| x.id() == *v)
.ok_or(Error::msg("Variant does not exist"))?, .ok_or(Error::msg("Variant does not exist"))?,
AVPacketSource::Muxer(v) => self
.config
.variants
.iter()
.find(|x| x.id() == *v)
.ok_or(Error::msg("Variant does not exist"))?,
_ => return Err(Error::msg(format!("Cannot mux packet from {:?}", src))), _ => return Err(Error::msg(format!("Cannot mux packet from {:?}", src))),
}; };
(*pkt).stream_index = variant.dst_index() as libc::c_int; (*pkt).stream_index = variant.dst_index() as libc::c_int;
@ -176,6 +182,7 @@ impl HlsEgress {
) -> Result<(), Error> { ) -> Result<(), Error> {
let variant = match &src { let variant = match &src {
AVPacketSource::Encoder(v) => v, AVPacketSource::Encoder(v) => v,
AVPacketSource::Muxer(v) => v,
_ => return Err(Error::msg(format!("Cannot mux packet from {:?}", src))), _ => return Err(Error::msg(format!("Cannot mux packet from {:?}", src))),
}; };
if !self.init { if !self.init {

View File

@ -3,14 +3,7 @@ use std::mem::transmute;
use std::ptr; use std::ptr;
use anyhow::Error; use anyhow::Error;
use ffmpeg_sys_next::{ use ffmpeg_sys_next::{av_audio_fifo_alloc, av_audio_fifo_free, av_audio_fifo_read, av_audio_fifo_size, av_audio_fifo_write, av_channel_layout_copy, av_frame_alloc, av_frame_free, av_get_sample_fmt_name, av_packet_alloc, av_packet_free, av_packet_rescale_ts, av_samples_alloc_array_and_samples, AVAudioFifo, AVCodec, avcodec_alloc_context3, avcodec_free_context, avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, AVFrame, AVRational, swr_alloc_set_opts2, swr_convert_frame, swr_free, swr_init, SwrContext};
av_audio_fifo_alloc, av_audio_fifo_free, av_audio_fifo_read, av_audio_fifo_size,
av_audio_fifo_write, av_channel_layout_copy, av_frame_alloc, av_frame_free,
av_get_sample_fmt_name, av_packet_alloc, av_packet_free, av_samples_alloc_array_and_samples,
AVAudioFifo, AVCodec, avcodec_alloc_context3, avcodec_free_context,
avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, AVFrame,
AVRational, swr_alloc_set_opts2, swr_convert_frame, swr_free, swr_init, SwrContext,
};
use libc::EAGAIN; use libc::EAGAIN;
use log::info; use log::info;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
@ -259,6 +252,11 @@ where
return Ok(()); return Ok(());
} }
let mut frame = frame.unwrap(); let mut frame = frame.unwrap();
// examples do it like this
(*frame).pts = self.pts;
self.pts += (*frame).nb_samples as i64;
let mut ret = avcodec_send_frame(self.ctx, frame); let mut ret = avcodec_send_frame(self.ctx, frame);
if ret < 0 && ret != AVERROR(EAGAIN) { if ret < 0 && ret != AVERROR(EAGAIN) {
av_frame_free(&mut frame); av_frame_free(&mut frame);
@ -277,7 +275,8 @@ where
return Err(Error::msg(get_ffmpeg_error_msg(ret))); return Err(Error::msg(get_ffmpeg_error_msg(ret)));
} }
set_encoded_pkt_timing(self.ctx, pkt, in_tb, &mut self.pts, &self.variant); //set_encoded_pkt_timing(self.ctx, pkt, in_tb, &mut self.pts, &self.variant);
av_packet_rescale_ts(pkt, *in_tb, self.variant.time_base());
self.chan_out.send(PipelinePayload::AvPacket( self.chan_out.send(PipelinePayload::AvPacket(
pkt, pkt,
AVPacketSource::Encoder(self.variant.id()), AVPacketSource::Encoder(self.variant.id()),

View File

@ -1,7 +1,7 @@
use ffmpeg_sys_next::{ use ffmpeg_sys_next::{
AV_NOPTS_VALUE, av_packet_rescale_ts, AV_PKT_FLAG_KEY, AVCodecContext, AVPacket, AV_NOPTS_VALUE, av_packet_rescale_ts, AV_PKT_FLAG_KEY, AVCodecContext, AVPacket, AVRational,
AVRational,
}; };
use ffmpeg_sys_next::AVMediaType::AVMEDIA_TYPE_VIDEO;
use log::info; use log::info;
use crate::variant::VariantStreamType; use crate::variant::VariantStreamType;
@ -22,8 +22,8 @@ pub unsafe fn set_encoded_pkt_timing<TVar>(
let out_tb = (*ctx).time_base; let out_tb = (*ctx).time_base;
(*pkt).stream_index = var.dst_index() as libc::c_int; (*pkt).stream_index = var.dst_index() as libc::c_int;
if (*pkt).duration == 0 { if (*pkt).duration <= 0 && (*ctx).codec_type == AVMEDIA_TYPE_VIDEO {
let tb_sec = out_tb.den as i64 / out_tb.num as i64; let tb_sec = in_tb.den as i64 / in_tb.num as i64;
let fps = (*ctx).framerate.num as i64 * (*ctx).framerate.den as i64; let fps = (*ctx).framerate.num as i64 * (*ctx).framerate.den as i64;
(*pkt).duration = tb_sec / if fps == 0 { 1 } else { fps } (*pkt).duration = tb_sec / if fps == 0 { 1 } else { fps }
} }

View File

@ -2,15 +2,11 @@ use std::mem::transmute;
use std::ptr; use std::ptr;
use anyhow::Error; use anyhow::Error;
use ffmpeg_sys_next::{ use ffmpeg_sys_next::{av_packet_alloc, av_packet_free, av_packet_rescale_ts, AVCodec, avcodec_alloc_context3, avcodec_find_encoder, avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, AVFrame, AVRational};
av_packet_alloc, av_packet_free, AVCodec, avcodec_alloc_context3, avcodec_find_encoder,
avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, AVFrame,
AVRational,
};
use libc::EAGAIN; use libc::EAGAIN;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use crate::encode::dump_pkt_info;
use crate::encode::set_encoded_pkt_timing;
use crate::ipc::Rx; use crate::ipc::Rx;
use crate::pipeline::{AVFrameSource, AVPacketSource, PipelinePayload, PipelineProcessor}; use crate::pipeline::{AVFrameSource, AVPacketSource, PipelinePayload, PipelineProcessor};
use crate::utils::get_ffmpeg_error_msg; use crate::utils::get_ffmpeg_error_msg;
@ -83,6 +79,9 @@ where
frame: *mut AVFrame, frame: *mut AVFrame,
in_tb: &AVRational, in_tb: &AVRational,
) -> Result<(), Error> { ) -> Result<(), Error> {
(*frame).pts = self.pts;
self.pts += (*frame).duration;
let mut ret = avcodec_send_frame(self.ctx, frame); let mut ret = avcodec_send_frame(self.ctx, frame);
if ret < 0 && ret != AVERROR(EAGAIN) { if ret < 0 && ret != AVERROR(EAGAIN) {
return Err(Error::msg(get_ffmpeg_error_msg(ret))); return Err(Error::msg(get_ffmpeg_error_msg(ret)));
@ -99,7 +98,8 @@ where
return Err(Error::msg(get_ffmpeg_error_msg(ret))); return Err(Error::msg(get_ffmpeg_error_msg(ret)));
} }
set_encoded_pkt_timing(self.ctx, pkt, in_tb, &mut self.pts, &self.variant); //set_encoded_pkt_timing(self.ctx, pkt, in_tb, &mut self.pts, &self.variant);
av_packet_rescale_ts(pkt, *in_tb, self.variant.time_base());
//dump_pkt_info(pkt); //dump_pkt_info(pkt);
self.chan_out.send(PipelinePayload::AvPacket( self.chan_out.send(PipelinePayload::AvPacket(
pkt, pkt,

View File

@ -1,10 +1,11 @@
use std::{ptr, slice}; use std::{ptr, slice};
use std::mem::transmute; use std::mem::transmute;
use std::ops::Add; use std::ops::Add;
use std::time::{Duration, SystemTime};
use ffmpeg_sys_next::{ use ffmpeg_sys_next::{
av_frame_alloc, av_frame_copy_props, av_frame_free, av_frame_get_buffer, av_packet_alloc, av_frame_alloc, av_frame_copy_props, av_frame_free, av_frame_get_buffer, av_packet_alloc,
av_packet_free, AV_PROFILE_H264_MAIN, avcodec_alloc_context3, avcodec_find_encoder, av_packet_free, AV_PROFILE_H264_MAIN, av_q2d, avcodec_alloc_context3, avcodec_find_encoder,
avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVERROR, AVRational, avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVERROR, AVRational,
EAGAIN, SWS_BILINEAR, sws_getContext, sws_scale_frame, EAGAIN, SWS_BILINEAR, sws_getContext, sws_scale_frame,
}; };
@ -15,6 +16,7 @@ use ffmpeg_sys_next::AVPixelFormat::{AV_PIX_FMT_RGBA, AV_PIX_FMT_YUV420P};
use fontdue::layout::{CoordinateSystem, Layout, TextStyle}; use fontdue::layout::{CoordinateSystem, Layout, TextStyle};
use libc::memcpy; use libc::memcpy;
use log::{error, info}; use log::{error, info};
use tokio::runtime::Runtime;
use tokio::sync::mpsc::unbounded_channel; use tokio::sync::mpsc::unbounded_channel;
use crate::ingress::ConnectionInfo; use crate::ingress::ConnectionInfo;
@ -27,15 +29,16 @@ pub async fn listen(builder: PipelineBuilder) -> Result<(), anyhow::Error> {
const HEIGHT: libc::c_int = 1080; const HEIGHT: libc::c_int = 1080;
const FPS: libc::c_int = 25; const FPS: libc::c_int = 25;
tokio::spawn(async move { std::thread::spawn(move || {
let (tx, rx) = unbounded_channel(); let (tx, rx) = unbounded_channel();
let info = ConnectionInfo { let info = ConnectionInfo {
ip_addr: "".to_owned(), ip_addr: "".to_owned(),
endpoint: "test-pattern".to_owned(), endpoint: "test-pattern".to_owned(),
}; };
if let Ok(mut pl) = builder.build_for(info, rx).await { let rt = Runtime::new().unwrap();
std::thread::spawn(move || loop { if let Ok(mut pl) = rt.block_on(builder.build_for(info, rx)) {
let pipeline = std::thread::spawn(move || loop {
if let Err(e) = pl.run() { if let Err(e) = pl.run() {
error!("Pipeline error: {}\n{}", e, e.backtrace()); error!("Pipeline error: {}\n{}", e, e.backtrace());
break; break;
@ -91,6 +94,7 @@ pub async fn listen(builder: PipelineBuilder) -> Result<(), anyhow::Error> {
let mut layout = Layout::new(CoordinateSystem::PositiveYDown); let mut layout = Layout::new(CoordinateSystem::PositiveYDown);
let fonts = &[&scp]; let fonts = &[&scp];
let start = SystemTime::now();
let mut frame_number: u64 = 0; let mut frame_number: u64 = 0;
loop { loop {
frame_number += 1; frame_number += 1;
@ -151,9 +155,23 @@ pub async fn listen(builder: PipelineBuilder) -> Result<(), anyhow::Error> {
)); ));
if let Err(e) = tx.send(buf) { if let Err(e) = tx.send(buf) {
error!("Failed to send test pkt: {}", e); error!("Failed to send test pkt: {}", e);
return;
pipeline.join().unwrap();
return ;
} }
} }
let stream_time = Duration::from_secs_f64(
frame_number as libc::c_double * av_q2d((*enc_ctx).time_base),
);
let real_time = SystemTime::now().duration_since(start).unwrap();
let wait_time = if stream_time > real_time {
stream_time - real_time
} else {
Duration::new(0, 0)
};
if !wait_time.is_zero() {
std::thread::sleep(wait_time);
}
} }
} }
} }

View File

@ -65,7 +65,7 @@ async fn main() -> anyhow::Result<()> {
settings.clone(), settings.clone(),
))); )));
/*listeners.push(tokio::spawn(ingress::file::listen( /*listeners.push(tokio::spawn(ingress::file::listen(
"/home/kieran/high_flight.mp4".parse().unwrap(), "/home/kieran/waypoint_flight.mp4".parse().unwrap(),
builder.clone(), builder.clone(),
)));*/ )));*/
listeners.push(tokio::spawn(ingress::test::listen(builder.clone()))); listeners.push(tokio::spawn(ingress::test::listen(builder.clone())));

View File

@ -91,7 +91,7 @@ impl VariantStreamType for VariantStream {
} }
/// Information related to variant streams for a given egress /// Information related to variant streams for a given egress
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
pub struct VideoVariant { pub struct VideoVariant {
/// Unique ID of this variant /// Unique ID of this variant
pub id: Uuid, pub id: Uuid,
@ -311,7 +311,7 @@ impl VariantStreamType for AudioVariant {
fn time_base(&self) -> AVRational { fn time_base(&self) -> AVRational {
AVRational { AVRational {
num: 1, num: 1,
den: 90_000, den: self.sample_rate as libc::c_int,
} }
} }
@ -345,7 +345,7 @@ impl VariantStreamType for AudioVariant {
(*params).bit_rate = self.bitrate as i64; (*params).bit_rate = self.bitrate as i64;
(*params).sample_rate = self.sample_rate as libc::c_int; (*params).sample_rate = self.sample_rate as libc::c_int;
(*params).ch_layout = self.channel_layout(); (*params).ch_layout = self.channel_layout();
(*params).frame_size = 1024; (*params).frame_size = 1024; //TODO: fix this
} }
unsafe fn to_stream(&self, stream: *mut AVStream) { unsafe fn to_stream(&self, stream: *mut AVStream) {

View File

@ -64,16 +64,16 @@ impl Webhook {
id: Uuid::new_v4(), id: Uuid::new_v4(),
recording: vec![], recording: vec![],
egress: vec![ egress: vec![
EgressType::Recorder(EgressConfig { /*EgressType::Recorder(EgressConfig {
name: "REC".to_owned(), name: "REC".to_owned(),
out_dir: self.config.output_dir.clone(), out_dir: self.config.output_dir.clone(),
variants: vars.clone(), variants: vars.clone(),
}), }),*/
/*EgressType::HLS(EgressConfig { EgressType::HLS(EgressConfig {
name: "HLS".to_owned(), name: "HLS".to_owned(),
out_dir: self.config.output_dir.clone(), out_dir: self.config.output_dir.clone(),
variants: vars.clone(), variants: vars.clone(),
}),*/ }),
], ],
} }
} }