From 9846cac89d2536cbc756628d01c600b95fe0dba3 Mon Sep 17 00:00:00 2001 From: kieran Date: Wed, 13 Nov 2024 13:32:46 +0000 Subject: [PATCH] feat: custom io mux --- src/demux.rs | 34 +++-- src/lib.rs | 6 +- src/mux.rs | 348 +++++++++++++++++++++++++++++++++++++++++++++++ src/muxer.rs | 203 --------------------------- src/transcode.rs | 11 +- 5 files changed, 378 insertions(+), 224 deletions(-) create mode 100644 src/mux.rs delete mode 100644 src/muxer.rs diff --git a/src/demux.rs b/src/demux.rs index e51325f..49f348a 100644 --- a/src/demux.rs +++ b/src/demux.rs @@ -72,14 +72,18 @@ impl Demuxer { crate::set_opts(self.ctx as *mut libc::c_void, options) } - unsafe fn open_input(&mut self) -> libc::c_int { + unsafe fn open(&mut self) -> Result<()> { match &mut self.input { - DemuxerInput::Url(input) => avformat_open_input( - &mut self.ctx, - cstr!(input), - ptr::null_mut(), - ptr::null_mut(), - ), + DemuxerInput::Url(input) => { + let ret = avformat_open_input( + &mut self.ctx, + cstr!(input.as_str()), + ptr::null_mut(), + ptr::null_mut(), + ); + bail_ffmpeg!(ret); + Ok(()) + } DemuxerInput::Reader(input, url) => { let input = input.take().expect("input stream already taken"); const BUFFER_SIZE: usize = 4096; @@ -92,26 +96,29 @@ impl Demuxer { None, None, ); + if pb.is_null() { + bail!("failed to allocate avio context"); + } (*self.ctx).pb = pb; - avformat_open_input( + let ret = avformat_open_input( &mut self.ctx, if let Some(url) = url { - cstr!(url) + cstr!(url.as_str()) } else { ptr::null_mut() }, ptr::null_mut(), ptr::null_mut(), - ) + ); + bail_ffmpeg!(ret); + Ok(()) } } } pub unsafe fn probe_input(&mut self) -> Result { - let ret = self.open_input(); - bail_ffmpeg!(ret); - + self.open()?; if avformat_find_stream_info(self.ctx, ptr::null_mut()) < 0 { return Err(Error::msg("Could not find stream info")); } @@ -237,7 +244,6 @@ impl Drop for Demuxer { drop(SlimBox::::from_raw((*(*self.ctx).pb).opaque)); } avformat_free_context(self.ctx); - self.ctx = ptr::null_mut(); } } } diff --git a/src/lib.rs b/src/lib.rs index 98cc6e7..932a514 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,7 @@ mod decode; mod demux; mod encode; mod filter; -mod muxer; +mod mux; mod resample; mod scale; mod stream_info; @@ -34,7 +34,7 @@ macro_rules! bail_ffmpeg { #[macro_export] macro_rules! cstr { ($str:expr) => { - format!("{}\0", $str).as_ptr() as *const libc::c_char + std::ffi::CString::new($str).unwrap().into_raw() }; } @@ -166,7 +166,7 @@ pub use demux::*; pub use encode::*; pub use ffmpeg_sys_the_third; pub use filter::*; -pub use muxer::*; +pub use mux::*; pub use resample::*; pub use scale::*; pub use stream_info::*; diff --git a/src/mux.rs b/src/mux.rs new file mode 100644 index 0000000..45e4885 --- /dev/null +++ b/src/mux.rs @@ -0,0 +1,348 @@ +use crate::{bail_ffmpeg, cstr, set_opts, Encoder}; +use anyhow::{bail, Result}; +use ffmpeg_sys_the_third::{ + av_dump_format, av_interleaved_write_frame, av_mallocz, av_packet_rescale_ts, av_write_trailer, + avcodec_parameters_copy, avcodec_parameters_from_context, avformat_alloc_output_context2, + avformat_free_context, avformat_new_stream, avformat_write_header, avio_alloc_context, + avio_open, AVFormatContext, AVPacket, AVStream, AVERROR_EOF, AVFMT_GLOBALHEADER, AVFMT_NOFILE, + AVIO_FLAG_WRITE, AV_CODEC_FLAG_GLOBAL_HEADER, +}; +use slimbox::{slimbox_unsize, SlimBox, SlimMut}; +use std::collections::HashMap; +use std::io::{Read, Seek, SeekFrom, Write}; +use std::{ptr, slice}; + +#[no_mangle] +unsafe extern "C" fn write_data( + opaque: *mut libc::c_void, + buffer: *const u8, + size: libc::c_int, +) -> libc::c_int { + let mut writer: SlimMut<'_, dyn WriteSeek + 'static> = SlimMut::from_raw(opaque); + let data = slice::from_raw_parts(buffer, size as usize); + match writer.write_all(data) { + Ok(_) => size, + Err(e) => { + eprintln!("write_data {}", e); + AVERROR_EOF + } + } +} + +#[no_mangle] +unsafe extern "C" fn seek_data(opaque: *mut libc::c_void, offset: i64, whence: libc::c_int) -> i64 { + let mut writer: SlimMut<'_, dyn WriteSeek + 'static> = SlimMut::from_raw(opaque); + match whence { + libc::SEEK_SET => writer.seek(SeekFrom::Start(offset as u64)).unwrap_or(0) as i64, + libc::SEEK_CUR => writer.seek(SeekFrom::Current(offset)).unwrap_or(0) as i64, + libc::SEEK_END => writer.seek(SeekFrom::End(offset)).unwrap_or(0) as i64, + _ => panic!("seek_data not supported from whence {}", whence), + } +} + +pub struct Muxer { + ctx: *mut AVFormatContext, + output: MuxerOutput, +} + +pub trait WriteSeek: Seek + Write {} +impl WriteSeek for T {} + +pub enum MuxerOutput { + Url(String), + Writer(Option>), +} + +impl Drop for Muxer { + fn drop(&mut self) { + unsafe { + if !self.ctx.is_null() { + if let MuxerOutput::Writer(_) = self.output { + drop(SlimBox::::from_raw((*(*self.ctx).pb).opaque)); + } + avformat_free_context(self.ctx); + } + } + } +} + +pub struct MuxerBuilder { + value: Muxer, +} + +impl MuxerBuilder { + pub fn new() -> Self { + Self { + value: Muxer { + ctx: ptr::null_mut(), + output: MuxerOutput::Url(String::new()), + }, + } + } + + unsafe fn init_ctx( + &mut self, + dst: &str, + format: Option<&str>, + options: Option>, + ) -> Result<()> { + if !self.value.ctx.is_null() { + bail!("context already open"); + } + + let ret = avformat_alloc_output_context2( + &mut self.value.ctx, + ptr::null_mut(), + if let Some(format) = format { + cstr!(format) + } else { + ptr::null() + }, + cstr!(dst), + ); + bail_ffmpeg!(ret); + + // Setup global header flag + if (*(*self.value.ctx).oformat).flags & AVFMT_GLOBALHEADER != 0 { + (*self.value.ctx).flags |= AV_CODEC_FLAG_GLOBAL_HEADER as libc::c_int; + } + + // Set options on ctx + if let Some(opts) = options { + set_opts((*self.value.ctx).priv_data, opts)?; + } + Ok(()) + } + + /// Open the muxer with a destination path + pub unsafe fn with_output_path<'a, T>( + mut self, + dst: T, + format: Option<&'a str>, + options: Option>, + ) -> Result + where + T: Into<&'a str>, + { + let path_str = dst.into(); + self.init_ctx(path_str, format, options)?; + self.value.output = MuxerOutput::Url(path_str.to_string()); + Ok(self) + } + + pub unsafe fn with_output_writer<'a, T, W>( + mut self, + writer: W, + dst: T, + format: Option<&'a str>, + options: Option>, + ) -> Result + where + T: Into<&'a str>, + W: WriteSeek + 'static, + { + self.init_ctx(dst.into(), format, options)?; + self.value.output = MuxerOutput::Writer(Some(slimbox_unsize!(writer))); + Ok(self) + } + + /// Add a stream to the output using an existing encoder + pub unsafe fn with_stream_encoder(mut self, encoder: &Encoder) -> Result { + self.value.add_stream_encoder(encoder)?; + Ok(self) + } + + /// Add a stream to the output using an existing input stream (copy) + pub unsafe fn with_copy_stream(mut self, in_stream: *mut AVStream) -> Result { + self.value.add_copy_stream(in_stream)?; + Ok(self) + } + + /// Build the muxer + pub fn build(self) -> Result { + if self.value.ctx.is_null() { + bail!("context is null"); + } + Ok(self.value) + } +} + +impl Muxer { + pub fn builder() -> MuxerBuilder { + MuxerBuilder::new() + } + + /// Add a stream to the output using an existing encoder + pub unsafe fn add_stream_encoder(&mut self, encoder: &Encoder) -> Result<*mut AVStream> { + let stream = avformat_new_stream(self.ctx, encoder.codec()); + if stream.is_null() { + bail!("unable to allocate stream"); + } + let ret = avcodec_parameters_from_context((*stream).codecpar, encoder.codec_context()); + bail_ffmpeg!(ret); + + // setup other stream params + let encoder_ctx = encoder.codec_context(); + (*stream).time_base = (*encoder_ctx).time_base; + (*stream).avg_frame_rate = (*encoder_ctx).framerate; + (*stream).r_frame_rate = (*encoder_ctx).framerate; + + Ok(stream) + } + + /// Add a stream to the output using an existing input stream (copy) + pub unsafe fn add_copy_stream(&mut self, in_stream: *mut AVStream) -> Result<*mut AVStream> { + let stream = avformat_new_stream(self.ctx, ptr::null_mut()); + if stream.is_null() { + bail!("unable to allocate stream"); + } + + // copy params from input + let ret = avcodec_parameters_copy((*stream).codecpar, (*in_stream).codecpar); + bail_ffmpeg!(ret); + + Ok(stream) + } + + /// Open the output to start sending packets + pub unsafe fn open(&mut self) -> Result<()> { + if (*(*self.ctx).oformat).flags & AVFMT_NOFILE == 0 { + if let MuxerOutput::Writer(w) = &mut self.output { + let writer = w.take().expect("writer already consumed"); + const BUFFER_SIZE: usize = 1024; + let pb = avio_alloc_context( + av_mallocz(BUFFER_SIZE) as *mut _, + BUFFER_SIZE as _, + 1, + writer.into_raw(), + None, + Some(write_data), + Some(seek_data), + ); + if pb.is_null() { + bail!("failed to allocate avio context"); + } + (*self.ctx).pb = pb; + } else { + let ret = avio_open(&mut (*self.ctx).pb, (*self.ctx).url, AVIO_FLAG_WRITE); + bail_ffmpeg!(ret); + } + } + + let ret = avformat_write_header(self.ctx, ptr::null_mut()); + bail_ffmpeg!(ret); + + av_dump_format(self.ctx, 0, (*self.ctx).url, 1); + + Ok(()) + } + + /// Write a packet to the output + pub unsafe fn write_packet(&mut self, pkt: *mut AVPacket) -> Result<()> { + let stream = *(*self.ctx).streams.add((*pkt).stream_index as usize); + av_packet_rescale_ts(pkt, (*pkt).time_base, (*stream).time_base); + (*pkt).time_base = (*stream).time_base; + + let ret = av_interleaved_write_frame(self.ctx, pkt); + bail_ffmpeg!(ret); + Ok(()) + } + + /// Close the output and write the trailer + pub unsafe fn close(self) -> Result<()> { + let ret = av_write_trailer(self.ctx); + bail_ffmpeg!(ret); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{generate_test_frame, Scaler}; + use ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; + use ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; + use ffmpeg_sys_the_third::{AVFrame, AV_PROFILE_H264_MAIN}; + + unsafe fn setup_encoder() -> Result<(*mut AVFrame, Encoder)> { + std::fs::create_dir_all("test_output")?; + let path = PathBuf::from("test_output/test.mp4"); + let frame = generate_test_frame(); + + // convert frame to YUV + let mut scaler = Scaler::new(); + let frame = scaler.process_frame( + frame, + (*frame).width as u16, + (*frame).height as u16, + AV_PIX_FMT_YUV420P, + )?; + + let mut encoder = Encoder::new(AV_CODEC_ID_H264)? + .with_width((*frame).width) + .with_height((*frame).height) + .with_pix_fmt(AV_PIX_FMT_YUV420P) + .with_bitrate(1_000_000) + .with_framerate(30.0) + .with_profile(AV_PROFILE_H264_MAIN) + .with_level(50) + .open(None)?; + Ok((frame, encoder)) + } + + unsafe fn write_frames( + mut muxer: Muxer, + mut encoder: Encoder, + frame: *mut AVFrame, + ) -> Result<()> { + let mut pts = 0; + for _z in 0..100 { + (*frame).pts = pts; + for pkt in encoder.encode_frame(frame)? { + muxer.write_packet(pkt)?; + } + pts += 1; + } + // flush + for f_pk in encoder.encode_frame(ptr::null_mut())? { + muxer.write_packet(f_pk)?; + } + muxer.close()?; + Ok(()) + } + + #[test] + fn encode_mkv() -> Result<()> { + std::fs::create_dir_all("test_output")?; + unsafe { + let path = PathBuf::from("test_output/test_muxer.mp4"); + let (frame, encoder) = setup_encoder()?; + + let mut muxer = Muxer::builder() + .with_output_path(path.to_str().unwrap(), None, None)? + .with_stream_encoder(&encoder)? + .build()?; + muxer.open()?; + write_frames(muxer, encoder, frame)?; + } + Ok(()) + } + + #[test] + fn encode_custom_io() -> Result<()> { + std::fs::create_dir_all("test_output")?; + unsafe { + let path = PathBuf::from("test_output/test_custom_muxer.mp4"); + let (frame, encoder) = setup_encoder()?; + + let fout = std::fs::File::create(path)?; + let mut muxer = Muxer::builder() + .with_output_writer(fout, "custom.mp4", None, None)? + .with_stream_encoder(&encoder)? + .build()?; + muxer.open()?; + write_frames(muxer, encoder, frame)?; + } + Ok(()) + } +} diff --git a/src/muxer.rs b/src/muxer.rs deleted file mode 100644 index 76ebc04..0000000 --- a/src/muxer.rs +++ /dev/null @@ -1,203 +0,0 @@ -use crate::{bail_ffmpeg, cstr, set_opts, Encoder}; -use anyhow::{bail, Result}; -use ffmpeg_sys_the_third::{ - av_dump_format, av_interleaved_write_frame, av_packet_rescale_ts, av_write_trailer, - avcodec_parameters_copy, avcodec_parameters_from_context, avformat_alloc_output_context2, - avformat_free_context, avformat_new_stream, avformat_write_header, avio_open, AVFormatContext, - AVPacket, AVStream, AVFMT_GLOBALHEADER, AVFMT_NOFILE, AVIO_FLAG_WRITE, - AV_CODEC_FLAG_GLOBAL_HEADER, -}; -use std::collections::HashMap; -use std::path::PathBuf; -use std::ptr; - -pub struct Muxer { - ctx: *mut AVFormatContext, -} - -impl Drop for Muxer { - fn drop(&mut self) { - unsafe { - if !self.ctx.is_null() { - avformat_free_context(self.ctx); - } - } - } -} - -impl Default for Muxer { - fn default() -> Self { - Self::new() - } -} - -impl Muxer { - pub fn new() -> Self { - Self { - ctx: ptr::null_mut(), - } - } - - /// Open the muxer with a destination path - pub unsafe fn with_output( - mut self, - dst: &PathBuf, - format: Option<&str>, - options: Option>, - ) -> Result { - if !self.ctx.is_null() { - bail!("context already open"); - } - - let ret = avformat_alloc_output_context2( - &mut self.ctx, - ptr::null_mut(), - if let Some(ref format) = format { - cstr!(format) - } else { - ptr::null() - }, - cstr!(dst.to_str().unwrap()), - ); - bail_ffmpeg!(ret); - - // Setup global header flag - if (*(*self.ctx).oformat).flags & AVFMT_GLOBALHEADER != 0 { - (*self.ctx).flags |= AV_CODEC_FLAG_GLOBAL_HEADER as libc::c_int; - } - - // Set options on ctx - if let Some(opts) = options { - set_opts((*self.ctx).priv_data, opts)?; - } - - Ok(self) - } - - /// Add a stream to the output using an existing encoder - pub unsafe fn with_stream_encoder(mut self, encoder: &Encoder) -> Result { - self.add_stream_encoder(encoder)?; - Ok(self) - } - - /// Add a stream to the output using an existing encoder - pub unsafe fn add_stream_encoder(&mut self, encoder: &Encoder) -> Result<*mut AVStream> { - let stream = avformat_new_stream(self.ctx, encoder.codec()); - if stream.is_null() { - bail!("unable to allocate stream"); - } - let ret = avcodec_parameters_from_context((*stream).codecpar, encoder.codec_context()); - bail_ffmpeg!(ret); - - // setup other stream params - let encoder_ctx = encoder.codec_context(); - (*stream).time_base = (*encoder_ctx).time_base; - (*stream).avg_frame_rate = (*encoder_ctx).framerate; - (*stream).r_frame_rate = (*encoder_ctx).framerate; - - Ok(stream) - } - - /// Add a stream to the output using an existing input stream (copy) - pub unsafe fn add_copy_stream(&mut self, in_stream: *mut AVStream) -> Result<*mut AVStream> { - let stream = avformat_new_stream(self.ctx, ptr::null_mut()); - if stream.is_null() { - bail!("unable to allocate stream"); - } - - // copy params from input - let ret = avcodec_parameters_copy((*stream).codecpar, (*in_stream).codecpar); - bail_ffmpeg!(ret); - - Ok(stream) - } - - /// Open the output to start sending packets - pub unsafe fn open(&mut self) -> Result<()> { - if (*(*self.ctx).oformat).flags & AVFMT_NOFILE == 0 { - let ret = avio_open(&mut (*self.ctx).pb, (*self.ctx).url, AVIO_FLAG_WRITE); - bail_ffmpeg!(ret); - } - - let ret = avformat_write_header(self.ctx, ptr::null_mut()); - bail_ffmpeg!(ret); - - av_dump_format(self.ctx, 0, (*self.ctx).url, 1); - - Ok(()) - } - - /// Write a packet to the output - pub unsafe fn write_packet(&mut self, pkt: *mut AVPacket) -> Result<()> { - let stream = *(*self.ctx).streams.add((*pkt).stream_index as usize); - av_packet_rescale_ts(pkt, (*pkt).time_base, (*stream).time_base); - (*pkt).time_base = (*stream).time_base; - - let ret = av_interleaved_write_frame(self.ctx, pkt); - bail_ffmpeg!(ret); - Ok(()) - } - - /// Close the output and write the trailer - pub unsafe fn close(self) -> Result<()> { - let ret = av_write_trailer(self.ctx); - bail_ffmpeg!(ret); - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{generate_test_frame, Scaler}; - use ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; - use ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; - use ffmpeg_sys_the_third::AV_PROFILE_H264_MAIN; - - #[test] - fn encode_mkv() -> Result<()> { - std::fs::create_dir_all("test_output")?; - unsafe { - let path = PathBuf::from("test_output/test.mp4"); - let frame = generate_test_frame(); - - // convert frame to YUV - let mut scaler = Scaler::new(); - let frame = scaler.process_frame( - frame, - (*frame).width as u16, - (*frame).height as u16, - AV_PIX_FMT_YUV420P, - )?; - - let mut encoder = Encoder::new(AV_CODEC_ID_H264)? - .with_width((*frame).width) - .with_height((*frame).height) - .with_pix_fmt(AV_PIX_FMT_YUV420P) - .with_bitrate(1_000_000) - .with_framerate(30.0) - .with_profile(AV_PROFILE_H264_MAIN) - .with_level(50) - .open(None)?; - - let mut muxer = Muxer::new() - .with_output(&path, None, None)? - .with_stream_encoder(&encoder)?; - muxer.open()?; - let mut pts = 0; - for _z in 0..100 { - (*frame).pts = pts; - for pkt in encoder.encode_frame(frame)? { - muxer.write_packet(pkt)?; - } - pts += 1; - } - // flush - for f_pk in encoder.encode_frame(ptr::null_mut())? { - muxer.write_packet(f_pk)?; - } - muxer.close()?; - } - Ok(()) - } -} diff --git a/src/transcode.rs b/src/transcode.rs index 8c8942d..579418e 100644 --- a/src/transcode.rs +++ b/src/transcode.rs @@ -4,7 +4,6 @@ use crate::{ use anyhow::Result; use ffmpeg_sys_the_third::{av_frame_free, av_packet_free}; use std::collections::HashMap; -use std::path::PathBuf; use std::ptr; /// A common transcoder task taking an input file @@ -21,7 +20,9 @@ pub struct Transcoder { impl Transcoder { pub unsafe fn new(input: &str, output: &str) -> Result { - let muxer = Muxer::new().with_output(&PathBuf::from(output), None, None)?; + let muxer = Muxer::builder() + .with_output_path(output, None, None)? + .build()?; Ok(Self { demuxer: Demuxer::new(input)?, @@ -168,8 +169,10 @@ mod tests { fn test_remux() -> Result<()> { unsafe { std::fs::create_dir_all("test_output")?; - let mut transcoder = - Transcoder::new("test_output/test.mp4", "test_output/test_transcode.mkv")?; + let mut transcoder = Transcoder::new( + "test_output/test_muxer.mp4", + "test_output/test_transcode.mkv", + )?; let info = transcoder.prepare()?; for c in info.streams { transcoder.copy_stream(c)?;