This commit is contained in:
kieran 2024-03-26 21:48:01 +00:00
parent d6c03004ff
commit 4ac111214a
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
6 changed files with 41 additions and 35 deletions

View File

@ -21,6 +21,8 @@ impl Drop for CodecContext {
fn drop(&mut self) { fn drop(&mut self) {
unsafe { unsafe {
avcodec_free_context(&mut self.context); avcodec_free_context(&mut self.context);
self.codec = ptr::null_mut();
self.context = ptr::null_mut();
} }
} }
} }

View File

@ -1,11 +1,10 @@
use std::ptr; use std::ptr;
use std::time::{Duration, SystemTime}; use std::time::Duration;
use anyhow::Error; use anyhow::Error;
use bytes::{Bytes, BytesMut}; use bytes::Bytes;
use ffmpeg_sys_next::*; use ffmpeg_sys_next::*;
use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO}; use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO};
use log::{info, warn};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::time::Instant; use tokio::time::Instant;
@ -41,18 +40,21 @@ unsafe extern "C" fn read_data(
size: libc::c_int, size: libc::c_int,
) -> libc::c_int { ) -> libc::c_int {
let chan = opaque as *mut UnboundedReceiver<Bytes>; let chan = opaque as *mut UnboundedReceiver<Bytes>;
let mut data = (*chan).blocking_recv().expect("shit"); if let Some(mut data) = (*chan).blocking_recv() {
let buff_len = data.len(); let buff_len = data.len();
let mut len = size.min(buff_len as libc::c_int); let mut len = size.min(buff_len as libc::c_int);
if len > 0 { if len > 0 {
memcpy( memcpy(
buffer as *mut libc::c_void, buffer as *mut libc::c_void,
data.as_ptr() as *const libc::c_void, data.as_ptr() as *const libc::c_void,
len as libc::c_ulonglong, len as libc::c_ulonglong,
); );
}
len
} else {
AVERROR_EOF
} }
len
} }
impl Demuxer { impl Demuxer {
@ -143,10 +145,7 @@ impl Demuxer {
let pkt: *mut AVPacket = av_packet_alloc(); let pkt: *mut AVPacket = av_packet_alloc();
let ret = av_read_frame(self.ctx, pkt); let ret = av_read_frame(self.ctx, pkt);
if ret == AVERROR_EOF { if ret == AVERROR_EOF {
// reset EOF flag, stream never ends return Err(Error::msg("Stream EOF"));
(*(*self.ctx).pb).eof_reached = 0;
warn!("EOF was reached, stream might skip frames");
return Ok(());
} }
if ret < 0 { if ret < 0 {
let msg = get_ffmpeg_error_msg(ret); let msg = get_ffmpeg_error_msg(ret);

View File

@ -1,19 +1,10 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::ffi::{CStr, CString};
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use std::mem::transmute; use std::mem::transmute;
use std::ptr; use std::ptr;
use std::ptr::slice_from_raw_parts;
use anyhow::Error; use anyhow::Error;
use ffmpeg_sys_next::{ use ffmpeg_sys_next::{AV_CH_LAYOUT_STEREO, av_channel_layout_default, av_dump_format, av_get_sample_fmt, av_interleaved_write_frame, av_opt_set, AVChannelLayout, AVChannelLayout__bindgen_ty_1, avcodec_parameters_from_context, AVCodecContext, avformat_alloc_output_context2, avformat_free_context, avformat_new_stream, avformat_write_header, AVFormatContext, AVPacket, AVRational};
AV_CH_LAYOUT_STEREO, av_channel_layout_default, av_dump_format, av_get_sample_fmt,
av_interleaved_write_frame, av_opt_set, av_packet_rescale_ts, av_write_frame,
AVChannelLayout, AVChannelLayout__bindgen_ty_1, avcodec_parameters_copy,
avcodec_parameters_from_context, avcodec_send_frame, avcodec_send_packet, AVCodecContext,
avformat_alloc_output_context2, avformat_new_stream, avformat_write_header, AVFormatContext, AVPacket,
AVRational,
};
use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE; use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE;
use ffmpeg_sys_next::AVColorSpace::AVCOL_SPC_BT709; use ffmpeg_sys_next::AVColorSpace::AVCOL_SPC_BT709;
use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO}; use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO};
@ -22,7 +13,7 @@ use futures_util::StreamExt;
use itertools::Itertools; use itertools::Itertools;
use log::info; use log::info;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{Receiver, UnboundedReceiver}; use tokio::sync::mpsc::{UnboundedReceiver};
use uuid::{Bytes, Uuid, Variant}; use uuid::{Bytes, Uuid, Variant};
use crate::demux::info::{DemuxStreamInfo, StreamChannelType}; use crate::demux::info::{DemuxStreamInfo, StreamChannelType};
@ -51,7 +42,6 @@ impl Display for HLSEgressConfig {
} }
pub struct HlsEgress { pub struct HlsEgress {
/// Pipeline id
id: Uuid, id: Uuid,
config: HLSEgressConfig, config: HLSEgressConfig,
ctx: *mut AVFormatContext, ctx: *mut AVFormatContext,
@ -63,6 +53,15 @@ unsafe impl Send for HlsEgress {}
unsafe impl Sync for HlsEgress {} unsafe impl Sync for HlsEgress {}
impl Drop for HlsEgress {
fn drop(&mut self) {
unsafe {
avformat_free_context(self.ctx);
self.ctx = ptr::null_mut();
}
}
}
impl HlsEgress { impl HlsEgress {
pub fn new( pub fn new(
chan_in: UnboundedReceiver<PipelinePayload>, chan_in: UnboundedReceiver<PipelinePayload>,

View File

@ -1,10 +1,7 @@
use std::ffi::CStr; use std::ffi::CStr;
use config::Config; use config::Config;
use futures_util::future::join_all;
use futures_util::StreamExt;
use log::{error, info}; use log::{error, info};
use tokio::sync::futures;
use url::Url; use url::Url;
use crate::pipeline::builder::PipelineBuilder; use crate::pipeline::builder::PipelineBuilder;

View File

@ -4,7 +4,7 @@ use std::ptr;
use anyhow::Error; use anyhow::Error;
use ffmpeg_sys_next::{ use ffmpeg_sys_next::{
av_buffer_ref, av_frame_alloc, av_frame_copy_props, AVBufferRef, AVFrame, av_buffer_ref, av_frame_alloc, av_frame_copy_props, AVBufferRef, AVFrame,
SWS_BILINEAR, sws_getContext, sws_scale_frame, SwsContext, SWS_BILINEAR, sws_freeContext, sws_getContext, sws_scale_frame, SwsContext,
}; };
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
@ -25,6 +25,15 @@ unsafe impl Send for Scaler {}
unsafe impl Sync for Scaler {} unsafe impl Sync for Scaler {}
impl Drop for Scaler {
fn drop(&mut self) {
unsafe {
sws_freeContext(self.ctx);
self.ctx = ptr::null_mut();
}
}
}
impl Scaler { impl Scaler {
pub fn new( pub fn new(
chan_in: broadcast::Receiver<PipelinePayload>, chan_in: broadcast::Receiver<PipelinePayload>,
@ -79,7 +88,7 @@ impl Scaler {
self.chan_out.send(PipelinePayload::AvFrame( self.chan_out.send(PipelinePayload::AvFrame(
"Scaler frame".to_owned(), "Scaler frame".to_owned(),
dst_frame, dst_frame,
src_index src_index,
))?; ))?;
Ok(()) Ok(())
} }

View File

@ -1,5 +1,5 @@
#!/bin/bash #!/bin/bash
ffmpeg \ ffmpeg \
-re -f lavfi -i testsrc -g 60 -r 30 -pix_fmt yuv420p -s 1280x720 -c:v h264 -b:v 2000k -c:a aac -b:a 192k \ -re -f lavfi -i testsrc -g 300 -r 60 -pix_fmt yuv420p -s 1280x720 -c:v h264 -b:v 2000k -c:a aac -b:a 192k \
-f mpegts srt://localhost:3333 -f mpegts srt://localhost:3333