From e4ec570239568bbe7000c447275ad8c4cd9306bd Mon Sep 17 00:00:00 2001 From: kieran Date: Mon, 11 Nov 2024 11:31:23 +0000 Subject: [PATCH] feat: transcoder --- .gitignore | 4 +- cashu.png | Bin 862 -> 0 bytes examples/main.rs | 6 +-- src/decode.rs | 8 ++-- src/demux.rs | 29 +++++++----- src/encode.rs | 20 +++++++- src/muxer.rs | 47 +++++++++++++------ src/transcode.rs | 120 ++++++++++++++++++++++++++++++++++++++++++++++- 8 files changed, 194 insertions(+), 40 deletions(-) delete mode 100644 cashu.png diff --git a/.gitignore b/.gitignore index 86b2d2e..8348705 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,4 @@ /.idea # Test output -*.mp4 -*.png -*.mkv \ No newline at end of file +/test_output \ No newline at end of file diff --git a/cashu.png b/cashu.png deleted file mode 100644 index f545ca02fd6ae886b39700279273d3bb566dab28..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 862 zcmeAS@N?(olHy`uVBq!ia0vp^?La(}gAGWsT5CQ8Qk(@Ik;On71Q;1wD#d{uW&uwZ z$B>FSZ*Ljqi8zY5Uc9Nzoha(@rh#?B1i_7rK}Opg7-o73&U*e*uKfG|(9(5Zlv)mc z?HAZoyIc0kbpH2oeK*d(pWl8vFyA7#K!dG4357VmDA)4UcK`SMH~ZJix8J_h@Ss7N zja|mj;z7qjMt2@Qi==`Bg3Zi*5;EP73Mvr1f+IK;;*d(dcsQRE$vjkL$FuiFyo?I} zYq5HL>DB(tzo5+W|Cj!6|9<^^%&+s0v-dCm`26y&i*mw!+|q9iAMV%yVy^!6y*qZh zquh5%nO?j3g-y(TH#{ulYFAdDKi-t=40OEr{hZyVHFx&jH@W?JqyL|pV{5JZw%53q zo}bTrd@jTj8`s*uDX#vsqq_VF|0^5rr@gb)A^y*KI$tsN?{jB)>_Jj+BJ$6i8F0WEm+g@=KK8k_BRF&1z#j(3JwJBedT-@sHnXSsGOe{s8L=DXoQ^! z(6HJUngtA7p8a{v#12%+2qG>fTxD+psbm4E1gBO-SW_BO*0=wIv&P|uA^Ty>* yK8t%-rum;yv-vY^m3&!S==~2{$f0FRyZ3)(U#0x9tal$Mmw3ARxvX Demuxer { let mut data: Vec = Vec::new(); File::open(path).unwrap().read_to_end(&mut data).unwrap(); let reader = Cursor::new(data); - Demuxer::new_custom_io(reader, None) + Demuxer::new_custom_io(reader, None).unwrap() } fn read_as_file(path_buf: PathBuf) -> Demuxer { - Demuxer::new(path_buf.to_str().unwrap()) + Demuxer::new(path_buf.to_str().unwrap()).unwrap() } fn scan_input(mut demuxer: Demuxer) { @@ -67,7 +67,7 @@ unsafe fn loop_decoder(mut demuxer: Demuxer, mut decoder: Decoder) { continue; } if let Ok(frames) = decoder.decode_pkt(pkt, stream) { - for (mut frame, _stream) in frames { + for mut frame in frames { // do nothing but decode entire stream if media_type == AVMediaType::AVMEDIA_TYPE_VIDEO { frame = get_frame_from_hw(frame).expect("get frame failed"); diff --git a/src/decode.rs b/src/decode.rs index ec21b46..417f421 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -238,7 +238,7 @@ impl Decoder { } /// Flush all decoders - pub unsafe fn flush(&mut self) -> Result, Error> { + pub unsafe fn flush(&mut self) -> Result, Error> { let mut pkgs = Vec::new(); for ctx in self.codecs.values_mut() { pkgs.extend(Self::decode_pkt_internal( @@ -254,7 +254,7 @@ impl Decoder { ctx: *mut AVCodecContext, pkt: *mut AVPacket, stream: *mut AVStream, - ) -> Result, Error> { + ) -> Result, Error> { let mut ret = avcodec_send_packet(ctx, pkt); bail_ffmpeg!(ret, "Failed to decode packet"); @@ -270,7 +270,7 @@ impl Decoder { } (*frame).pict_type = AV_PICTURE_TYPE_NONE; // encoder prints warnings - pkgs.push((frame, stream)); + pkgs.push(frame); } Ok(pkgs) } @@ -279,7 +279,7 @@ impl Decoder { &mut self, pkt: *mut AVPacket, stream: *mut AVStream, - ) -> Result, Error> { + ) -> Result, Error> { if pkt.is_null() { return self.flush(); } diff --git a/src/demux.rs b/src/demux.rs index 85cc9c2..c209d12 100644 --- a/src/demux.rs +++ b/src/demux.rs @@ -1,6 +1,6 @@ use crate::{bail_ffmpeg, cstr}; -use crate::{get_ffmpeg_error_msg, DemuxerInfo, StreamChannelType, StreamInfoChannel}; -use anyhow::Error; +use crate::{DemuxerInfo, StreamChannelType, StreamInfoChannel}; +use anyhow::{bail, Error, Result}; use ffmpeg_sys_the_third::*; use slimbox::{slimbox_unsize, SlimBox, SlimMut}; use std::collections::HashMap; @@ -36,26 +36,32 @@ pub struct Demuxer { impl Demuxer { /// Create a new [Demuxer] from a file path or url - pub fn new(input: &str) -> Self { + pub fn new(input: &str) -> Result { unsafe { let ctx = avformat_alloc_context(); - Self { + if ctx.is_null() { + bail!("Failed to allocate AV context"); + } + Ok(Self { ctx, input: DemuxerInput::Url(input.to_string()), - } + }) } } /// Create a new [Demuxer] from an object that implements [Read] - pub fn new_custom_io(reader: R, url: Option) -> Self { + pub fn new_custom_io(reader: R, url: Option) -> Result { unsafe { let ctx = avformat_alloc_context(); + if ctx.is_null() { + bail!("Failed to allocate AV context"); + } (*ctx).flags |= AVFMT_FLAG_CUSTOM_IO; - Self { + Ok(Self { ctx, input: DemuxerInput::Reader(Some(slimbox_unsize!(reader)), url), - } + }) } } @@ -172,11 +178,10 @@ impl Demuxer { if ret == AVERROR_EOF { return Ok((ptr::null_mut(), ptr::null_mut())); } - if ret < 0 { - let msg = get_ffmpeg_error_msg(ret); - return Err(Error::msg(msg)); - } + bail_ffmpeg!(ret); + let stream = *(*self.ctx).streams.add((*pkt).stream_index as usize); + (*pkt).time_base = (*stream).time_base; let pkg = (pkt, stream); Ok(pkg) } diff --git a/src/encode.rs b/src/encode.rs index 967b123..dd30d87 100644 --- a/src/encode.rs +++ b/src/encode.rs @@ -15,6 +15,7 @@ use libc::EAGAIN; pub struct Encoder { ctx: *mut AVCodecContext, codec: *const AVCodec, + dst_stream_index: Option, } impl Drop for Encoder { @@ -41,7 +42,11 @@ impl Encoder { } // set some defaults (*ctx).time_base = AVRational { num: 1, den: 1 }; - Ok(Self { ctx, codec }) + Ok(Self { + ctx, + codec, + dst_stream_index: None, + }) } } @@ -71,6 +76,13 @@ impl Encoder { Ok(slice::from_raw_parts(dst, num_dst as usize)) } + /// Store the destination stream index along with the encoder + /// AVPacket's created by this encoder will have stream_index assigned to this value + pub unsafe fn with_stream_index(mut self, index: i32) -> Self { + self.dst_stream_index = Some(index); + self + } + /// Set the encoder bitrate pub unsafe fn with_bitrate(self, bitrate: i64) -> Self { (*self.ctx).bit_rate = bitrate; @@ -187,6 +199,9 @@ impl Encoder { bail!(get_ffmpeg_error_msg(ret)); } (*pkt).time_base = (*self.ctx).time_base; + if let Some(idx) = self.dst_stream_index { + (*pkt).stream_index = idx; + } pkgs.push(pkt); } @@ -212,7 +227,8 @@ mod tests { encoder.list_configs(AVCodecConfig::AV_CODEC_CONFIG_PIX_FORMAT)?; encoder = encoder.with_pix_fmt(pix_fmts[0]).open(None)?; - let mut test_file = std::fs::File::create("test.png")?; + std::fs::create_dir_all("test_output")?; + let mut test_file = std::fs::File::create("test_output/test.png")?; let mut pkts = encoder.encode_frame(frame)?; let flush_pkts = encoder.encode_frame(ptr::null_mut())?; pkts.extend(flush_pkts); diff --git a/src/muxer.rs b/src/muxer.rs index 3976607..f4d0aa4 100644 --- a/src/muxer.rs +++ b/src/muxer.rs @@ -2,9 +2,10 @@ use crate::{bail_ffmpeg, cstr, set_opts, Encoder}; use anyhow::{bail, Result}; use ffmpeg_sys_the_third::{ av_dump_format, av_interleaved_write_frame, av_packet_rescale_ts, av_write_trailer, - avcodec_parameters_from_context, avformat_alloc_output_context2, avformat_free_context, - avformat_new_stream, avformat_write_header, avio_flush, avio_open, AVFormatContext, AVPacket, - AVFMT_GLOBALHEADER, AVFMT_NOFILE, AVIO_FLAG_WRITE, AV_CODEC_FLAG_GLOBAL_HEADER, + avcodec_parameters_copy, avcodec_parameters_from_context, avformat_alloc_output_context2, + avformat_free_context, avformat_new_stream, avformat_write_header, avio_flush, avio_open, + AVFormatContext, AVPacket, AVStream, AVFMT_GLOBALHEADER, AVFMT_NOFILE, AVIO_FLAG_WRITE, + AV_CODEC_FLAG_GLOBAL_HEADER, }; use std::collections::HashMap; use std::path::PathBuf; @@ -75,7 +76,13 @@ impl Muxer { } /// Add a stream to the output using an existing encoder - pub unsafe fn with_stream_encoder(self, encoder: &Encoder) -> Result { + pub unsafe fn with_stream_encoder(mut self, encoder: &Encoder) -> Result { + self.add_stream_encoder(encoder)?; + Ok(self) + } + + /// Add a stream to the output using an existing encoder + pub unsafe fn add_stream_encoder(&mut self, encoder: &Encoder) -> Result<*mut AVStream> { let stream = avformat_new_stream(self.ctx, encoder.codec()); if stream.is_null() { bail!("unable to allocate stream"); @@ -89,11 +96,25 @@ impl Muxer { (*stream).avg_frame_rate = (*encoder_ctx).framerate; (*stream).r_frame_rate = (*encoder_ctx).framerate; - Ok(self) + Ok(stream) + } + + /// Add a stream to the output using an existing input stream (copy) + pub unsafe fn add_copy_stream(&mut self, in_stream: *mut AVStream) -> Result<*mut AVStream> { + let stream = avformat_new_stream(self.ctx, ptr::null_mut()); + if stream.is_null() { + bail!("unable to allocate stream"); + } + + // copy params from input + let ret = avcodec_parameters_copy((*stream).codecpar, (*in_stream).codecpar); + bail_ffmpeg!(ret); + + Ok(stream) } /// Open the output to start sending packets - pub unsafe fn open(self) -> Result { + pub unsafe fn open(&mut self) -> Result<()> { if (*(*self.ctx).oformat).flags & AVFMT_NOFILE == 0 { let ret = avio_open(&mut (*self.ctx).pb, (*self.ctx).url, AVIO_FLAG_WRITE); bail_ffmpeg!(ret); @@ -104,16 +125,14 @@ impl Muxer { av_dump_format(self.ctx, 0, (*self.ctx).url, 1); - Ok(self) + Ok(()) } /// Write a packet to the output pub unsafe fn write_packet(&mut self, pkt: *mut AVPacket) -> Result<()> { - assert!((*pkt).stream_index >= 0 && (*pkt).stream_index < (*self.ctx).nb_streams as i32); - assert!((*pkt).time_base.num != 0 && (*pkt).time_base.den != 0); - let stream = *(*self.ctx).streams.add((*pkt).stream_index as usize); av_packet_rescale_ts(pkt, (*pkt).time_base, (*stream).time_base); + (*pkt).time_base = (*stream).time_base; let ret = av_interleaved_write_frame(self.ctx, pkt); bail_ffmpeg!(ret); @@ -138,8 +157,9 @@ mod tests { #[test] fn encode_mkv() -> Result<()> { + std::fs::create_dir_all("test_output")?; unsafe { - let path = PathBuf::from("test.mp4"); + let path = PathBuf::from("test_output/test.mp4"); let frame = generate_test_frame(); // convert frame to YUV @@ -163,9 +183,8 @@ mod tests { let mut muxer = Muxer::new() .with_output(&path, None, None)? - .with_stream_encoder(&encoder)? - .open()?; - + .with_stream_encoder(&encoder)?; + muxer.open()?; let mut pts = 0; for z in 0..100 { (*frame).pts = pts; diff --git a/src/transcode.rs b/src/transcode.rs index 2805081..04b0140 100644 --- a/src/transcode.rs +++ b/src/transcode.rs @@ -1,9 +1,125 @@ -use crate::{Decoder, Demuxer, Encoder, Muxer, Scaler}; +use crate::{Decoder, Demuxer, DemuxerInfo, Encoder, Muxer, Scaler, StreamInfoChannel}; +use anyhow::Result; +use ffmpeg_sys_the_third::{av_frame_free, av_packet_free}; +use std::collections::HashMap; +use std::path::PathBuf; +use std::ptr; +/// A common transcoder task taking an input file +/// and transcoding it to another output path pub struct Transcoder { demuxer: Demuxer, decoder: Decoder, scaler: Scaler, - encoder: Encoder, + encoders: HashMap, + copy_stream: HashMap, muxer: Muxer, } + +impl Transcoder { + pub unsafe fn new(input: &str, output: &str) -> Result { + let muxer = Muxer::new().with_output(&PathBuf::from(output), None, None)?; + + Ok(Self { + demuxer: Demuxer::new(input)?, + decoder: Decoder::new(), + scaler: Scaler::new(), + encoders: HashMap::new(), + copy_stream: HashMap::new(), + muxer, + }) + } + + /// Prepare the transcoder by probing the input + pub unsafe fn prepare(&mut self) -> Result { + self.demuxer.probe_input() + } + + /// Create a transcoded stream in the output given an input stream and + /// a pre-configured output encoder + pub unsafe fn transcode_stream( + &mut self, + in_stream: StreamInfoChannel, + encoder_out: Encoder, + ) -> Result<()> { + let dst_stream = self.muxer.add_stream_encoder(&encoder_out)?; + self.encoders.insert( + in_stream.index as i32, + encoder_out.with_stream_index((*dst_stream).index), + ); + Ok(()) + } + + /// Copy a stream from the input to the output + pub unsafe fn copy_stream(&mut self, in_stream: StreamInfoChannel) -> Result<()> { + let dst_stream = self.muxer.add_copy_stream(in_stream.stream)?; + self.copy_stream + .insert(in_stream.index as i32, (*dst_stream).index); + Ok(()) + } + + /// Process the next packet, called by [run] + unsafe fn next(&mut self) -> Result { + let (mut pkt, stream) = self.demuxer.get_packet()?; + + // flush + if pkt.is_null() { + for (_, enc) in &mut self.encoders { + for mut new_pkt in enc.encode_frame(ptr::null_mut())? { + self.muxer.write_packet(pkt)?; + av_packet_free(&mut new_pkt); + } + } + Ok(true) + } else { + let src_index = (*stream).index; + // check if encoded stream + if let Some(enc) = self.encoders.get_mut(&src_index) { + for mut frame in self.decoder.decode_pkt(pkt, stream)? { + for mut new_pkt in enc.encode_frame(frame)? { + self.muxer.write_packet(new_pkt)?; + av_packet_free(&mut new_pkt); + } + av_frame_free(&mut frame); + } + } else if let Some(dst_stream) = self.copy_stream.get(&src_index) { + // write pkt directly to muxer (re-mux) + (*pkt).stream_index = *dst_stream; + self.muxer.write_packet(pkt)?; + } + + av_packet_free(&mut pkt); + Ok(false) + } + } + + /// Run the transcoder + pub unsafe fn run(mut self) -> Result<()> { + self.muxer.open()?; + while !self.next()? { + // nothing here + } + self.muxer.close()?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_remux() -> Result<()> { + unsafe { + std::fs::create_dir_all("test_output")?; + let mut transcoder = + Transcoder::new("test_output/test.mp4", "test_output/test_transcode.mkv")?; + let info = transcoder.prepare()?; + for c in info.channels { + transcoder.copy_stream(c)?; + } + transcoder.run()?; + + Ok(()) + } + } +}