Compare commits

...

10 Commits

Author SHA1 Message Date
a63b88ef3c
fix: leaky demuxer file handles 2025-01-27 22:50:43 +00:00
128f204253
fix: manually free demuxer io 2025-01-27 22:26:03 +00:00
b8d45e8289
fix: return stream pointer in decoder 2025-01-21 11:10:46 +00:00
de2050cec0
feat: transcode custom io 2024-12-21 19:35:00 +00:00
76333375d8
feat: add format to demuxer info 2024-12-16 17:02:55 +00:00
b358b3e420
fix: mux build ff_api* 2024-12-11 10:44:17 +00:00
df69b2f05d
fix: demuxer cleanup 2024-11-29 23:40:10 +00:00
b1f054d7d1
fix: muxer cleanup 2024-11-29 23:31:42 +00:00
464ec32898
fix: free muxer context on reset 2024-11-29 23:12:07 +00:00
8e102423d4
fix: demuxer read 2024-11-22 13:19:48 +00:00
10 changed files with 134 additions and 48 deletions

12
Cargo.lock generated
View File

@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo. # This file is automatically @generated by Cargo.
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 4
[[package]] [[package]]
name = "aho-corasick" name = "aho-corasick"
@ -181,6 +181,7 @@ dependencies = [
"ffmpeg-sys-the-third", "ffmpeg-sys-the-third",
"libc", "libc",
"log", "log",
"rlimit",
"slimbox", "slimbox",
] ]
@ -333,6 +334,15 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "rlimit"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7043b63bd0cd1aaa628e476b80e6d4023a3b50eb32789f2728908107bd0c793a"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "rustc-hash" name = "rustc-hash"
version = "1.1.0" version = "1.1.0"

View File

@ -17,3 +17,4 @@ log = "0.4.22"
[dev-dependencies] [dev-dependencies]
env_logger = "0.11.5" env_logger = "0.11.5"
rlimit = "0.10.2"

View File

@ -67,7 +67,7 @@ unsafe fn loop_decoder(mut demuxer: Demuxer, mut decoder: Decoder) {
continue; continue;
} }
if let Ok(frames) = decoder.decode_pkt(pkt) { if let Ok(frames) = decoder.decode_pkt(pkt) {
for mut frame in frames { for (mut frame, _stream) in frames {
// do nothing but decode entire stream // do nothing but decode entire stream
if media_type == AVMediaType::AVMEDIA_TYPE_VIDEO { if media_type == AVMediaType::AVMEDIA_TYPE_VIDEO {
frame = get_frame_from_hw(frame).expect("get frame failed"); frame = get_frame_from_hw(frame).expect("get frame failed");

View File

@ -237,25 +237,25 @@ impl Decoder {
} }
/// Flush all decoders /// Flush all decoders
pub unsafe fn flush(&mut self) -> Result<Vec<*mut AVFrame>, Error> { pub unsafe fn flush(&mut self) -> Result<Vec<(*mut AVFrame, *mut AVStream)>, Error> {
let mut pkgs = Vec::new(); let mut pkgs = Vec::new();
for ctx in self.codecs.values_mut() { for ctx in self.codecs.values_mut() {
pkgs.extend(Self::decode_pkt_internal(ctx.context, ptr::null_mut())?); pkgs.extend(Self::decode_pkt_internal(ctx, ptr::null_mut())?);
} }
Ok(pkgs) Ok(pkgs)
} }
pub unsafe fn decode_pkt_internal( pub unsafe fn decode_pkt_internal(
ctx: *mut AVCodecContext, ctx: &DecoderCodecContext,
pkt: *mut AVPacket, pkt: *mut AVPacket,
) -> Result<Vec<*mut AVFrame>, Error> { ) -> Result<Vec<(*mut AVFrame, *mut AVStream)>, Error> {
let mut ret = avcodec_send_packet(ctx, pkt); let mut ret = avcodec_send_packet(ctx.context, pkt);
bail_ffmpeg!(ret, "Failed to decode packet"); bail_ffmpeg!(ret, "Failed to decode packet");
let mut pkgs = Vec::new(); let mut pkgs = Vec::new();
while ret >= 0 { while ret >= 0 {
let mut frame = av_frame_alloc(); let mut frame = av_frame_alloc();
ret = avcodec_receive_frame(ctx, frame); ret = avcodec_receive_frame(ctx.context, frame);
if ret < 0 { if ret < 0 {
av_frame_free(&mut frame); av_frame_free(&mut frame);
if ret == AVERROR_EOF || ret == AVERROR(libc::EAGAIN) { if ret == AVERROR_EOF || ret == AVERROR(libc::EAGAIN) {
@ -263,17 +263,20 @@ impl Decoder {
} }
return Err(Error::msg(format!("Failed to decode {}", ret))); return Err(Error::msg(format!("Failed to decode {}", ret)));
} }
pkgs.push(frame); pkgs.push((frame, ctx.stream));
} }
Ok(pkgs) Ok(pkgs)
} }
pub unsafe fn decode_pkt(&mut self, pkt: *mut AVPacket) -> Result<Vec<*mut AVFrame>, Error> { pub unsafe fn decode_pkt(
&mut self,
pkt: *mut AVPacket,
) -> Result<Vec<(*mut AVFrame, *mut AVStream)>, Error> {
if pkt.is_null() { if pkt.is_null() {
return self.flush(); return self.flush();
} }
if let Some(ctx) = self.codecs.get_mut(&(*pkt).stream_index) { if let Some(ctx) = self.codecs.get_mut(&(*pkt).stream_index) {
Self::decode_pkt_internal(ctx.context, pkt) Self::decode_pkt_internal(ctx, pkt)
} else { } else {
Ok(vec![]) Ok(vec![])
} }

View File

@ -20,8 +20,8 @@ unsafe extern "C" fn read_data(
) -> libc::c_int { ) -> libc::c_int {
let mut buffer: SlimMut<'_, dyn Read + 'static> = SlimMut::from_raw(opaque); let mut buffer: SlimMut<'_, dyn Read + 'static> = SlimMut::from_raw(opaque);
let dst_slice: &mut [u8] = slice::from_raw_parts_mut(dst_buffer, size as usize); let dst_slice: &mut [u8] = slice::from_raw_parts_mut(dst_buffer, size as usize);
match buffer.read_exact(dst_slice) { match buffer.read(dst_slice) {
Ok(_) => size, Ok(r) => r as libc::c_int,
Err(e) => { Err(e) => {
eprintln!("read_data {}", e); eprintln!("read_data {}", e);
AVERROR_EOF AVERROR_EOF
@ -31,7 +31,7 @@ unsafe extern "C" fn read_data(
pub enum DemuxerInput { pub enum DemuxerInput {
Url(String), Url(String),
Reader(Option<SlimBox<dyn Read + 'static>>, Option<String>), Reader(Option<SlimBox<dyn Read>>, Option<String>),
} }
pub struct Demuxer { pub struct Demuxer {
@ -221,6 +221,8 @@ impl Demuxer {
let info = DemuxerInfo { let info = DemuxerInfo {
duration: (*self.ctx).duration as f32 / AV_TIME_BASE as f32, duration: (*self.ctx).duration as f32 / AV_TIME_BASE as f32,
bitrate: (*self.ctx).bit_rate as usize, bitrate: (*self.ctx).bit_rate as usize,
format: rstr!((*(*self.ctx).iformat).name).to_string(),
mime_types: rstr!((*(*self.ctx).iformat).mime_type).to_string(),
streams, streams,
#[cfg(feature = "avformat_version_greater_than_60_19")] #[cfg(feature = "avformat_version_greater_than_60_19")]
groups: stream_groups, groups: stream_groups,
@ -246,13 +248,17 @@ impl Demuxer {
impl Drop for Demuxer { impl Drop for Demuxer {
fn drop(&mut self) { fn drop(&mut self) {
if !self.ctx.is_null() { unsafe {
unsafe { if !self.ctx.is_null() {
if let DemuxerInput::Reader(_, _) = self.input { match self.input {
av_free((*(*self.ctx).pb).buffer as *mut _); DemuxerInput::Reader(_, _) => {
drop(SlimBox::<dyn Read>::from_raw((*(*self.ctx).pb).opaque)); av_free((*(*self.ctx).pb).buffer as *mut _);
drop(SlimBox::<dyn Read>::from_raw((*(*self.ctx).pb).opaque));
avio_context_free(&mut (*self.ctx).pb);
}
_ => {}
} }
avformat_free_context(self.ctx); avformat_close_input(&mut self.ctx);
} }
} }
} }
@ -268,10 +274,10 @@ mod tests {
fn test_stream_groups() -> Result<()> { fn test_stream_groups() -> Result<()> {
unsafe { unsafe {
let mut demux = let mut demux =
Demuxer::new("/core/[SubsPlease] Kinoko Inu - 06 (1080p) [FECF68AF].mkv")?; Demuxer::new("https://trac.ffmpeg.org/raw-attachment/ticket/11170/IMG_4765.HEIC")?;
let probe = demux.probe_input()?; let probe = demux.probe_input()?;
assert_eq!(1, probe.streams.len()); assert_eq!(3, probe.streams.len());
assert_eq!(1, probe.groups.len()); assert_eq!(0, probe.groups.len());
assert!(matches!( assert!(matches!(
probe.groups[0].group_type, probe.groups[0].group_type,
StreamGroupType::TileGrid { .. } StreamGroupType::TileGrid { .. }
@ -279,4 +285,21 @@ mod tests {
} }
Ok(()) Ok(())
} }
/// Test for leaking file handles
#[test]
fn probe_lots() -> Result<()> {
rlimit::setrlimit(rlimit::Resource::NOFILE, 64, 128)?;
let nof_limit = rlimit::Resource::NOFILE.get_hard()?;
for n in 0..nof_limit {
let mut demux = Demuxer::new("./test_output/test.png")?;
unsafe {
if let Err(e) = demux.probe_input() {
bail!("Failed on {}: {}", n, e);
}
}
}
Ok(())
}
} }

View File

@ -107,7 +107,7 @@ impl Filter {
Ok(()) Ok(())
} }
pub unsafe fn process_frame(&mut self, frame: *mut AVFrame) -> Result<*mut AVFrame, Error> { pub unsafe fn process_frame(&mut self, _frame: *mut AVFrame) -> Result<*mut AVFrame, Error> {
todo!(); todo!();
} }
} }

View File

@ -52,6 +52,7 @@ macro_rules! bail_ffmpeg {
#[macro_export] #[macro_export]
macro_rules! cstr { macro_rules! cstr {
($str:expr) => { ($str:expr) => {
// TODO: leaky
std::ffi::CString::new($str).unwrap().into_raw() std::ffi::CString::new($str).unwrap().into_raw()
}; };
} }

View File

@ -4,18 +4,23 @@ use ffmpeg_sys_the_third::{
av_free, av_interleaved_write_frame, av_mallocz, av_packet_rescale_ts, av_write_trailer, av_free, av_interleaved_write_frame, av_mallocz, av_packet_rescale_ts, av_write_trailer,
avcodec_parameters_copy, avcodec_parameters_from_context, avformat_alloc_output_context2, avcodec_parameters_copy, avcodec_parameters_from_context, avformat_alloc_output_context2,
avformat_free_context, avformat_new_stream, avformat_write_header, avio_alloc_context, avformat_free_context, avformat_new_stream, avformat_write_header, avio_alloc_context,
avio_open, AVFormatContext, AVIOContext, AVPacket, AVStream, AVERROR_EOF, AVFMT_GLOBALHEADER, avio_close, avio_context_free, avio_open, AVFormatContext, AVIOContext, AVPacket, AVStream,
AVFMT_NOFILE, AVIO_FLAG_DIRECT, AVIO_FLAG_WRITE, AV_CODEC_FLAG_GLOBAL_HEADER, AVERROR_EOF, AVFMT_GLOBALHEADER, AVFMT_NOFILE, AVIO_FLAG_DIRECT, AVIO_FLAG_WRITE,
AV_CODEC_FLAG_GLOBAL_HEADER,
}; };
use slimbox::{slimbox_unsize, SlimBox, SlimMut}; use slimbox::{slimbox_unsize, SlimBox, SlimMut};
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{Read, Seek, SeekFrom, Write}; use std::io::{Seek, SeekFrom, Write};
use std::{ptr, slice}; use std::{ptr, slice};
#[cfg(feature = "ff_api_avio_write_nonconst")]
type WriteDataPtr = *mut u8;
#[cfg(not(feature = "ff_api_avio_write_nonconst"))]
type WriteDataPtr = *const u8;
unsafe extern "C" fn write_data<T>( unsafe extern "C" fn write_data<T>(
opaque: *mut libc::c_void, opaque: *mut libc::c_void,
#[cfg(feature = "avformat_version_greater_than_60_12")] buffer: *const u8, buffer: WriteDataPtr,
#[cfg(not(feature = "avformat_version_greater_than_60_12"))] buffer: *mut u8,
size: libc::c_int, size: libc::c_int,
) -> libc::c_int ) -> libc::c_int
where where
@ -111,6 +116,12 @@ pub struct MuxerBuilder {
format: Option<String>, format: Option<String>,
} }
impl Default for MuxerBuilder {
fn default() -> Self {
Self::new()
}
}
impl MuxerBuilder { impl MuxerBuilder {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
@ -282,13 +293,9 @@ impl Muxer {
MuxerBuilder::add_copy_stream(self.ctx, in_stream) MuxerBuilder::add_copy_stream(self.ctx, in_stream)
} }
/// Initialize the context, usually after it was closed with [Muxer::reset] /// Initialize the context, usually after it was closed with [Muxer::close]
pub unsafe fn init(&mut self) -> Result<()> { pub unsafe fn init(&mut self) -> Result<()> {
MuxerBuilder::init_ctx( MuxerBuilder::init_ctx(&mut self.ctx, self.url.as_deref(), self.format.as_deref())
&mut self.ctx,
self.url.as_ref().map(|v| v.as_str()),
self.format.as_ref().map(|v| v.as_str()),
)
} }
/// Change the muxer URL /// Change the muxer URL
@ -352,10 +359,36 @@ impl Muxer {
/// Close the output and write the trailer /// Close the output and write the trailer
/// [Muxer::init] can be used to re-init the muxer /// [Muxer::init] can be used to re-init the muxer
pub unsafe fn reset(&mut self) -> Result<()> { pub unsafe fn close(&mut self) -> Result<()> {
let ret = av_write_trailer(self.ctx); let ret = av_write_trailer(self.ctx);
bail_ffmpeg!(ret); bail_ffmpeg!(ret);
self.ctx = ptr::null_mut(); self.free_ctx()?;
Ok(())
}
unsafe fn free_ctx(&mut self) -> Result<()> {
if !self.ctx.is_null() {
match self.output {
MuxerOutput::Url(_) => {
if !(*self.ctx).pb.is_null() {
let ret = avio_close((*self.ctx).pb);
bail_ffmpeg!(ret);
}
}
MuxerOutput::WriterSeeker(_) => {
av_free((*(*self.ctx).pb).buffer as *mut _);
drop(SlimBox::<dyn WriteSeek>::from_raw((*(*self.ctx).pb).opaque));
avio_context_free(&mut (*self.ctx).pb);
}
MuxerOutput::Writer(_) => {
av_free((*(*self.ctx).pb).buffer as *mut _);
drop(SlimBox::<dyn Write>::from_raw((*(*self.ctx).pb).opaque));
avio_context_free(&mut (*self.ctx).pb);
}
}
avformat_free_context(self.ctx);
self.ctx = ptr::null_mut();
}
Ok(()) Ok(())
} }
} }
@ -363,13 +396,7 @@ impl Muxer {
impl Drop for Muxer { impl Drop for Muxer {
fn drop(&mut self) { fn drop(&mut self) {
unsafe { unsafe {
if !self.ctx.is_null() { self.free_ctx().expect("drop muxer");
if let MuxerOutput::Writer(_) = self.output {
av_free((*(*self.ctx).pb).buffer as *mut _);
drop(SlimBox::<dyn Read>::from_raw((*(*self.ctx).pb).opaque));
}
avformat_free_context(self.ctx);
}
} }
} }
} }
@ -425,7 +452,7 @@ mod tests {
for f_pk in encoder.encode_frame(ptr::null_mut())? { for f_pk in encoder.encode_frame(ptr::null_mut())? {
muxer.write_packet(f_pk)?; muxer.write_packet(f_pk)?;
} }
muxer.reset()?; muxer.close()?;
Ok(()) Ok(())
} }

View File

@ -10,8 +10,15 @@ use std::intrinsics::transmute;
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub struct DemuxerInfo { pub struct DemuxerInfo {
/// Average bitrate of the media
pub bitrate: usize, pub bitrate: usize,
/// Duration of the media in seconds
pub duration: f32, pub duration: f32,
/// Comma separated list of formats supported by the demuxer
pub format: String,
/// Comma separated list of mime-types used during probing
pub mime_types: String,
/// List of streams contained in the media
pub streams: Vec<StreamInfo>, pub streams: Vec<StreamInfo>,
#[cfg(feature = "avformat_version_greater_than_60_19")] #[cfg(feature = "avformat_version_greater_than_60_19")]
pub groups: Vec<StreamGroupInfo>, pub groups: Vec<StreamGroupInfo>,

View File

@ -33,6 +33,19 @@ impl Transcoder {
}) })
} }
/// Create a new transcoder from both a muxer and a demuxer
pub unsafe fn new_custom_io(demuxer: Demuxer, muxer: Muxer) -> Self {
Self {
demuxer,
decoder: Decoder::new(),
scalers: HashMap::new(),
resampler: HashMap::new(),
encoders: HashMap::new(),
copy_stream: HashMap::new(),
muxer,
}
}
/// Prepare the transcoder by probing the input /// Prepare the transcoder by probing the input
pub unsafe fn prepare(&mut self) -> Result<DemuxerInfo> { pub unsafe fn prepare(&mut self) -> Result<DemuxerInfo> {
self.demuxer.probe_input() self.demuxer.probe_input()
@ -97,7 +110,7 @@ impl Transcoder {
// flush // flush
if pkt.is_null() { if pkt.is_null() {
for (_, enc) in &mut self.encoders { for enc in self.encoders.values_mut() {
for mut new_pkt in enc.encode_frame(ptr::null_mut())? { for mut new_pkt in enc.encode_frame(ptr::null_mut())? {
self.muxer.write_packet(new_pkt)?; self.muxer.write_packet(new_pkt)?;
av_packet_free(&mut new_pkt); av_packet_free(&mut new_pkt);
@ -108,7 +121,7 @@ impl Transcoder {
let src_index = (*stream).index; let src_index = (*stream).index;
// check if encoded stream // check if encoded stream
if let Some(enc) = self.encoders.get_mut(&src_index) { if let Some(enc) = self.encoders.get_mut(&src_index) {
for mut frame in self.decoder.decode_pkt(pkt)? { for (mut frame, _stream) in self.decoder.decode_pkt(pkt)? {
// scale video frame before sending to encoder // scale video frame before sending to encoder
let frame = if let Some(sws) = self.scalers.get_mut(&src_index) { let frame = if let Some(sws) = self.scalers.get_mut(&src_index) {
let enc_ctx = enc.codec_context(); let enc_ctx = enc.codec_context();
@ -155,7 +168,7 @@ impl Transcoder {
while !self.next()? { while !self.next()? {
// nothing here // nothing here
} }
self.muxer.reset()?; self.muxer.close()?;
Ok(()) Ok(())
} }
} }
@ -172,6 +185,7 @@ mod tests {
"test_output/test_transcode.mkv", "test_output/test_transcode.mkv",
)?; )?;
let info = transcoder.prepare()?; let info = transcoder.prepare()?;
assert!(!info.format.is_empty());
for c in info.streams { for c in info.streams {
transcoder.copy_stream(c)?; transcoder.copy_stream(c)?;
} }