feat: custom muxer io (no seek)

This commit is contained in:
kieran 2024-11-13 15:39:13 +00:00
parent 9846cac89d
commit 6d443a24bc
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
3 changed files with 126 additions and 46 deletions

View File

@ -241,6 +241,7 @@ impl Drop for Demuxer {
if !self.ctx.is_null() { if !self.ctx.is_null() {
unsafe { unsafe {
if let DemuxerInput::Reader(_, _) = self.input { if let DemuxerInput::Reader(_, _) = self.input {
av_free((*(*self.ctx).pb).buffer as *mut _);
drop(SlimBox::<dyn Read>::from_raw((*(*self.ctx).pb).opaque)); drop(SlimBox::<dyn Read>::from_raw((*(*self.ctx).pb).opaque));
} }
avformat_free_context(self.ctx); avformat_free_context(self.ctx);

View File

@ -49,6 +49,8 @@ macro_rules! rstr {
}; };
} }
pub(crate) const AVIO_BUFFER_SIZE: usize = 4096;
fn get_ffmpeg_error_msg(ret: libc::c_int) -> String { fn get_ffmpeg_error_msg(ret: libc::c_int) -> String {
unsafe { unsafe {
const BUF_SIZE: usize = 512; const BUF_SIZE: usize = 512;

View File

@ -1,24 +1,27 @@
use crate::{bail_ffmpeg, cstr, set_opts, Encoder}; use crate::{bail_ffmpeg, cstr, set_opts, Encoder, AVIO_BUFFER_SIZE};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use ffmpeg_sys_the_third::{ use ffmpeg_sys_the_third::{
av_dump_format, av_interleaved_write_frame, av_mallocz, av_packet_rescale_ts, av_write_trailer, av_dump_format, av_free, av_interleaved_write_frame, av_mallocz, av_packet_rescale_ts,
avcodec_parameters_copy, avcodec_parameters_from_context, avformat_alloc_output_context2, av_write_trailer, avcodec_parameters_copy, avcodec_parameters_from_context,
avformat_free_context, avformat_new_stream, avformat_write_header, avio_alloc_context, avformat_alloc_output_context2, avformat_free_context, avformat_new_stream,
avio_open, AVFormatContext, AVPacket, AVStream, AVERROR_EOF, AVFMT_GLOBALHEADER, AVFMT_NOFILE, avformat_write_header, avio_alloc_context, avio_open, AVFormatContext, AVIOContext, AVPacket,
AVIO_FLAG_WRITE, AV_CODEC_FLAG_GLOBAL_HEADER, AVStream, AVERROR_EOF, AVFMT_GLOBALHEADER, AVFMT_NOFILE, AVIO_FLAG_DIRECT, AVIO_FLAG_WRITE,
AV_CODEC_FLAG_GLOBAL_HEADER,
}; };
use slimbox::{slimbox_unsize, SlimBox, SlimMut}; use slimbox::{slimbox_unsize, SlimBox, SlimMut};
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{Read, Seek, SeekFrom, Write}; use std::io::{Read, Seek, SeekFrom, Write};
use std::{ptr, slice}; use std::{ptr, slice};
#[no_mangle] unsafe extern "C" fn write_data<T>(
unsafe extern "C" fn write_data(
opaque: *mut libc::c_void, opaque: *mut libc::c_void,
buffer: *const u8, buffer: *const u8,
size: libc::c_int, size: libc::c_int,
) -> libc::c_int { ) -> libc::c_int
let mut writer: SlimMut<'_, dyn WriteSeek + 'static> = SlimMut::from_raw(opaque); where
T: Write + 'static + ?Sized,
{
let mut writer: SlimMut<'_, T> = SlimMut::from_raw(opaque);
let data = slice::from_raw_parts(buffer, size as usize); let data = slice::from_raw_parts(buffer, size as usize);
match writer.write_all(data) { match writer.write_all(data) {
Ok(_) => size, Ok(_) => size,
@ -29,7 +32,6 @@ unsafe extern "C" fn write_data(
} }
} }
#[no_mangle]
unsafe extern "C" fn seek_data(opaque: *mut libc::c_void, offset: i64, whence: libc::c_int) -> i64 { unsafe extern "C" fn seek_data(opaque: *mut libc::c_void, offset: i64, whence: libc::c_int) -> i64 {
let mut writer: SlimMut<'_, dyn WriteSeek + 'static> = SlimMut::from_raw(opaque); let mut writer: SlimMut<'_, dyn WriteSeek + 'static> = SlimMut::from_raw(opaque);
match whence { match whence {
@ -50,18 +52,52 @@ impl<T: Seek + Write> WriteSeek for T {}
pub enum MuxerOutput { pub enum MuxerOutput {
Url(String), Url(String),
Writer(Option<SlimBox<dyn WriteSeek + 'static>>), WriterSeeker(Option<SlimBox<dyn WriteSeek + 'static>>),
Writer(Option<SlimBox<dyn Write + 'static>>),
} }
impl Drop for Muxer { impl TryInto<*mut AVIOContext> for &mut MuxerOutput {
fn drop(&mut self) { type Error = anyhow::Error;
fn try_into(self) -> Result<*mut AVIOContext, Self::Error> {
unsafe { unsafe {
if !self.ctx.is_null() { Ok(match self {
if let MuxerOutput::Writer(_) = self.output { MuxerOutput::Writer(ref mut w) => {
drop(SlimBox::<dyn Read>::from_raw((*(*self.ctx).pb).opaque)); let writer = w.take().expect("writer already consumed");
let pb = avio_alloc_context(
av_mallocz(AVIO_BUFFER_SIZE) as *mut _,
AVIO_BUFFER_SIZE as _,
1,
writer.into_raw(),
None,
Some(write_data::<dyn Write + 'static>),
None,
);
if pb.is_null() {
bail!("failed to allocate AVIO from MuxerOutput");
}
pb
} }
avformat_free_context(self.ctx); MuxerOutput::WriterSeeker(ref mut w) => {
} let writer = w.take().expect("writer already consumed");
let pb = avio_alloc_context(
av_mallocz(AVIO_BUFFER_SIZE) as *mut _,
AVIO_BUFFER_SIZE as _,
1,
writer.into_raw(),
None,
Some(write_data::<dyn WriteSeek + 'static>),
Some(seek_data),
);
if pb.is_null() {
bail!("failed to allocate AVIO from MuxerOutput");
}
pb
}
MuxerOutput::Url(_) => ptr::null_mut(),
})
} }
} }
} }
@ -82,7 +118,7 @@ impl MuxerBuilder {
unsafe fn init_ctx( unsafe fn init_ctx(
&mut self, &mut self,
dst: &str, dst: Option<&str>,
format: Option<&str>, format: Option<&str>,
options: Option<HashMap<String, String>>, options: Option<HashMap<String, String>>,
) -> Result<()> { ) -> Result<()> {
@ -98,7 +134,11 @@ impl MuxerBuilder {
} else { } else {
ptr::null() ptr::null()
}, },
cstr!(dst), if let Some(dst) = dst {
cstr!(dst)
} else {
ptr::null()
},
); );
bail_ffmpeg!(ret); bail_ffmpeg!(ret);
@ -125,23 +165,38 @@ impl MuxerBuilder {
T: Into<&'a str>, T: Into<&'a str>,
{ {
let path_str = dst.into(); let path_str = dst.into();
self.init_ctx(path_str, format, options)?; self.init_ctx(Some(path_str), format, options)?;
self.value.output = MuxerOutput::Url(path_str.to_string()); self.value.output = MuxerOutput::Url(path_str.to_string());
Ok(self) Ok(self)
} }
pub unsafe fn with_output_writer<'a, T, W>( /// Create a muxer using a custom IO context
/// This impl requires [Seek] trait as some muxers need seek support
pub unsafe fn with_output_write_seek<W>(
mut self, mut self,
writer: W, writer: W,
dst: T, format: Option<&str>,
format: Option<&'a str>,
options: Option<HashMap<String, String>>, options: Option<HashMap<String, String>>,
) -> Result<Self> ) -> Result<Self>
where where
T: Into<&'a str>,
W: WriteSeek + 'static, W: WriteSeek + 'static,
{ {
self.init_ctx(dst.into(), format, options)?; self.init_ctx(None, format, options)?;
self.value.output = MuxerOutput::WriterSeeker(Some(slimbox_unsize!(writer)));
Ok(self)
}
/// Create a muxer using a custom IO context
pub unsafe fn with_output_write<W>(
mut self,
writer: W,
format: Option<&str>,
options: Option<HashMap<String, String>>,
) -> Result<Self>
where
W: Write + 'static,
{
self.init_ctx(None, format, options)?;
self.value.output = MuxerOutput::Writer(Some(slimbox_unsize!(writer))); self.value.output = MuxerOutput::Writer(Some(slimbox_unsize!(writer)));
Ok(self) Ok(self)
} }
@ -207,25 +262,14 @@ impl Muxer {
/// Open the output to start sending packets /// Open the output to start sending packets
pub unsafe fn open(&mut self) -> Result<()> { pub unsafe fn open(&mut self) -> Result<()> {
if (*(*self.ctx).oformat).flags & AVFMT_NOFILE == 0 { if (*(*self.ctx).oformat).flags & AVFMT_NOFILE == 0 {
if let MuxerOutput::Writer(w) = &mut self.output { (*self.ctx).pb = (&mut self.output).try_into()?;
let writer = w.take().expect("writer already consumed"); // if pb is still null, open with ctx.url
const BUFFER_SIZE: usize = 1024; if (*self.ctx).pb.is_null() {
let pb = avio_alloc_context(
av_mallocz(BUFFER_SIZE) as *mut _,
BUFFER_SIZE as _,
1,
writer.into_raw(),
None,
Some(write_data),
Some(seek_data),
);
if pb.is_null() {
bail!("failed to allocate avio context");
}
(*self.ctx).pb = pb;
} else {
let ret = avio_open(&mut (*self.ctx).pb, (*self.ctx).url, AVIO_FLAG_WRITE); let ret = avio_open(&mut (*self.ctx).pb, (*self.ctx).url, AVIO_FLAG_WRITE);
bail_ffmpeg!(ret); bail_ffmpeg!(ret);
} else {
// Don't write buffer, just let the handler functions write directly
(*self.ctx).flags |= AVIO_FLAG_DIRECT;
} }
} }
@ -256,6 +300,20 @@ impl Muxer {
} }
} }
impl Drop for Muxer {
fn drop(&mut self) {
unsafe {
if !self.ctx.is_null() {
if let MuxerOutput::Writer(_) = self.output {
av_free((*(*self.ctx).pb).buffer as *mut _);
drop(SlimBox::<dyn Read>::from_raw((*(*self.ctx).pb).opaque));
}
avformat_free_context(self.ctx);
}
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -263,6 +321,7 @@ mod tests {
use ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
use ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use ffmpeg_sys_the_third::{AVFrame, AV_PROFILE_H264_MAIN}; use ffmpeg_sys_the_third::{AVFrame, AV_PROFILE_H264_MAIN};
use std::path::PathBuf;
unsafe fn setup_encoder() -> Result<(*mut AVFrame, Encoder)> { unsafe fn setup_encoder() -> Result<(*mut AVFrame, Encoder)> {
std::fs::create_dir_all("test_output")?; std::fs::create_dir_all("test_output")?;
@ -278,7 +337,7 @@ mod tests {
AV_PIX_FMT_YUV420P, AV_PIX_FMT_YUV420P,
)?; )?;
let mut encoder = Encoder::new(AV_CODEC_ID_H264)? let encoder = Encoder::new(AV_CODEC_ID_H264)?
.with_width((*frame).width) .with_width((*frame).width)
.with_height((*frame).height) .with_height((*frame).height)
.with_pix_fmt(AV_PIX_FMT_YUV420P) .with_pix_fmt(AV_PIX_FMT_YUV420P)
@ -337,7 +396,25 @@ mod tests {
let fout = std::fs::File::create(path)?; let fout = std::fs::File::create(path)?;
let mut muxer = Muxer::builder() let mut muxer = Muxer::builder()
.with_output_writer(fout, "custom.mp4", None, None)? .with_output_write_seek(fout, Some("mp4"), None)?
.with_stream_encoder(&encoder)?
.build()?;
muxer.open()?;
write_frames(muxer, encoder, frame)?;
}
Ok(())
}
#[test]
fn encode_custom_io_non_seek() -> Result<()> {
std::fs::create_dir_all("test_output")?;
unsafe {
let path = PathBuf::from("test_output/test_custom_muxer_no_seek.ts");
let (frame, encoder) = setup_encoder()?;
let fout = std::fs::File::create(path)?;
let mut muxer = Muxer::builder()
.with_output_write(fout, Some("mpegts"), None)?
.with_stream_encoder(&encoder)? .with_stream_encoder(&encoder)?
.build()?; .build()?;
muxer.open()?; muxer.open()?;