This commit is contained in:
Kieran 2024-03-12 09:54:29 +00:00
parent d3cb7c8bf6
commit 13cb456f89
2 changed files with 15 additions and 12 deletions

View File

@ -34,7 +34,6 @@ unsafe extern "C" fn read_data(
) -> libc::c_int { ) -> libc::c_int {
let muxer = opaque as *mut Demuxer; let muxer = opaque as *mut Demuxer;
let len = size.min((*muxer).buffer.len() as libc::c_int); let len = size.min((*muxer).buffer.len() as libc::c_int);
//info!("ask={}, have={}", size, len);
if len > 0 { if len > 0 {
memcpy( memcpy(
buffer as *mut libc::c_void, buffer as *mut libc::c_void,
@ -44,20 +43,19 @@ unsafe extern "C" fn read_data(
_ = (*muxer).buffer.split_to(len as usize); _ = (*muxer).buffer.split_to(len as usize);
len len
} else { } else {
-1 AVERROR_BUFFER_TOO_SMALL
} }
} }
impl Demuxer { impl Demuxer {
const BUFFER_SIZE: usize = 1024 * 1024; const BUFFER_SIZE: usize = 1024 * 1024;
const MIN_BUFFER_TO_READ: usize = 8192; const INIT_BUFFER_THRESHOLD: usize = 2048;
pub fn new() -> Self { pub fn new() -> Self {
unsafe { unsafe {
let ps = avformat_alloc_context(); let ps = avformat_alloc_context();
(*ps).probesize = Self::BUFFER_SIZE as i64; (*ps).probesize = Self::BUFFER_SIZE as i64;
(*ps).flags |= AVFMT_FLAG_CUSTOM_IO; (*ps).flags |= AVFMT_FLAG_CUSTOM_IO;
Self { Self {
ctx: ps, ctx: ps,
buffer: BytesMut::with_capacity(Self::BUFFER_SIZE), buffer: BytesMut::with_capacity(Self::BUFFER_SIZE),
@ -65,15 +63,14 @@ impl Demuxer {
} }
} }
unsafe fn setup_io(&mut self, bytes: &Bytes) -> Result<bool, Error> { unsafe fn append_buffer(&mut self, bytes: &Bytes) {
self.buffer.extend_from_slice(bytes); self.buffer.extend_from_slice(bytes);
self.probe_input()
} }
unsafe fn probe_input(&mut self) -> Result<bool, Error> { unsafe fn probe_input(&mut self) -> Result<bool, Error> {
let size = self.buffer.len(); let size = self.buffer.len();
let score = (*self.ctx).probe_score; let score = (*self.ctx).probe_score;
if score == 0 && size >= Self::MIN_BUFFER_TO_READ { if score == 0 && size >= Self::INIT_BUFFER_THRESHOLD {
let pb = avio_alloc_context( let pb = avio_alloc_context(
av_mallocz(4096) as *mut libc::c_uchar, av_mallocz(4096) as *mut libc::c_uchar,
4096, 4096,
@ -83,6 +80,7 @@ impl Demuxer {
None, None,
None, None,
); );
(*self.ctx).pb = pb; (*self.ctx).pb = pb;
let ret = avformat_open_input( let ret = avformat_open_input(
&mut self.ctx, &mut self.ctx,
@ -111,9 +109,12 @@ impl Demuxer {
av_init_packet(pkt); av_init_packet(pkt);
let ret = av_read_frame(self.ctx, pkt); let ret = av_read_frame(self.ctx, pkt);
if ret == AVERROR_BUFFER_TOO_SMALL {
return Ok(None);
}
if ret < 0 { if ret < 0 {
let msg = Self::get_ffmpeg_error_msg(ret); let msg = Self::get_ffmpeg_error_msg(ret);
return Ok(None); return Err(Error::msg(msg));
} }
Ok(Some(pkt)) Ok(Some(pkt))
} }
@ -152,7 +153,8 @@ impl PipelineStep for Demuxer {
async fn process(&mut self, pkg: PipelinePayload) -> Result<PipelinePayload, Error> { async fn process(&mut self, pkg: PipelinePayload) -> Result<PipelinePayload, Error> {
match pkg { match pkg {
PipelinePayload::Bytes(ref bb) => unsafe { PipelinePayload::Bytes(ref bb) => unsafe {
if !self.setup_io(bb)? { self.append_buffer(bb);
if !self.probe_input()? {
return Ok(PipelinePayload::Empty); return Ok(PipelinePayload::Empty);
} }
match self.decode_packet() { match self.decode_packet() {

View File

@ -12,9 +12,10 @@ impl PipelineRunner {
pub async fn push(&mut self, bytes: bytes::Bytes) -> Result<(), anyhow::Error> { pub async fn push(&mut self, bytes: bytes::Bytes) -> Result<(), anyhow::Error> {
let mut output = PipelinePayload::Bytes(bytes); let mut output = PipelinePayload::Bytes(bytes);
for step in &mut self.steps { for step in &mut self.steps {
let output2 = step.process(output).await?; match step.process(output).await? {
//info!("{} result: {:?}", step.name(), output2); Some(pkg) => output = pkg,
output = output2; None => break,
}
} }
Ok(()) Ok(())
} }