From 0b8742bd25ce5a44f000308ca7aad50d3d8f1b39 Mon Sep 17 00:00:00 2001 From: kieran Date: Wed, 13 Nov 2024 16:06:52 +0000 Subject: [PATCH] feat: test-source fixes --- Cargo.lock | 25 ++++++++++++++++++++++++- Cargo.toml | 8 +++++--- src/egress/hls.rs | 4 +++- src/egress/recorder.rs | 13 ++++--------- src/ingress/test.rs | 40 +++++++++++++++++++++++++--------------- src/pipeline/runner.rs | 11 +++++++++-- src/variant/audio.rs | 2 +- 7 files changed, 71 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 73c4585..acc1ca5 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -441,6 +441,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crunchy" version = "0.2.2" @@ -557,7 +563,7 @@ dependencies = [ [[package]] name = "ffmpeg-rs-raw" version = "0.1.0" -source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=07a2c728883dff15961edbe1f8c6aa5e65cef293#07a2c728883dff15961edbe1f8c6aa5e65cef293" +source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=6d443a24bcbe53c0a02a9dce3e41d22815229585#6d443a24bcbe53c0a02a9dce3e41d22815229585" dependencies = [ "anyhow", "ffmpeg-sys-the-third", @@ -1348,6 +1354,12 @@ dependencies = [ "miniz_oxide 0.8.0", ] +[[package]] +name = "portable-atomic" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1473,6 +1485,16 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "ringbuf" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "726bb493fe9cac765e8f96a144c3a8396bdf766dedad22e504b70b908dcbceb4" +dependencies = [ + "crossbeam-utils", + "portable-atomic", +] + [[package]] name = "ron" version = "0.8.1" @@ -2474,6 +2496,7 @@ dependencies = [ "pretty_env_logger", "rand", "resvg", + "ringbuf", "serde", "srt-tokio", "tiny-skia", diff --git a/Cargo.toml b/Cargo.toml index 50f575d..dbe7e5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,10 +6,10 @@ edition = "2021" [features] default = ["test-source"] srt = ["dep:srt-tokio"] -test-source = ["dep:resvg", "dep:usvg", "dep:tiny-skia", "dep:fontdue"] +test-source = ["dep:resvg", "dep:usvg", "dep:tiny-skia", "dep:fontdue", "dep:ringbuf"] [dependencies] -ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "07a2c728883dff15961edbe1f8c6aa5e65cef293" } +ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "6d443a24bcbe53c0a02a9dce3e41d22815229585" } tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] } anyhow = { version = "^1.0.91", features = ["backtrace"] } @@ -27,10 +27,12 @@ itertools = "0.13.0" rand = "0.8.5" clap = { version = "4.5.16", features = ["derive"] } warp = "0.3.7" +libc = "0.2.162" srt-tokio = { version = "0.4.3", optional = true } resvg = { version = "0.44.0", optional = true } usvg = { version = "0.44.0", optional = true } tiny-skia = { version = "0.11.4", optional = true } fontdue = { version = "0.9.2", optional = true } -libc = "0.2.162" +ringbuf = { version = "0.4.7", optional = true } + diff --git a/src/egress/hls.rs b/src/egress/hls.rs index 320f365..13460d0 100644 --- a/src/egress/hls.rs +++ b/src/egress/hls.rs @@ -64,7 +64,9 @@ impl HlsEgress { opts.insert("hls_flags".to_string(), "delete_segments".to_string()); let muxer = unsafe { - let mut m = Muxer::new().with_output(&base, Some("hls"), Some(opts))?; + let mut m = Muxer::builder() + .with_output_path(base.to_str().unwrap(), Some("hls"), Some(opts))? + .build()?; for e in encoded { m.add_stream_encoder(e)?; } diff --git a/src/egress/recorder.rs b/src/egress/recorder.rs index af43557..206b2e8 100644 --- a/src/egress/recorder.rs +++ b/src/egress/recorder.rs @@ -1,7 +1,6 @@ use anyhow::Result; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket; use ffmpeg_rs_raw::{Encoder, Muxer}; -use std::collections::HashMap; use std::fs; use std::path::PathBuf; use uuid::Uuid; @@ -22,17 +21,13 @@ impl RecorderEgress { let id = Uuid::new_v4(); let base = PathBuf::from(&config.out_dir).join(id.to_string()); - let out_file = base.join("recording.mp4"); + let out_file = base.join("recording.ts"); fs::create_dir_all(&base)?; - let mut opt = HashMap::new(); - opt.insert( - "movflags".to_string(), - "+dash+delay_moov+skip_sidx+skip_trailer".to_string(), - ); - let muxer = unsafe { - let mut m = Muxer::new().with_output(&out_file, None, Some(opt))?; + let mut m = Muxer::builder() + .with_output_path(out_file.to_str().unwrap(), None, None)? + .build()?; for var in variants { m.add_stream_encoder(var)?; } diff --git a/src/ingress/test.rs b/src/ingress/test.rs index ce37291..3533d4b 100644 --- a/src/ingress/test.rs +++ b/src/ingress/test.rs @@ -8,14 +8,14 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::{AV_PIX_FMT_RGBA, AV_PIX use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ av_frame_alloc, av_frame_get_buffer, AV_PROFILE_H264_MAIN, }; -use ffmpeg_rs_raw::{Encoder, Scaler}; +use ffmpeg_rs_raw::{Encoder, Muxer, Scaler}; use fontdue::layout::{CoordinateSystem, Layout, TextStyle}; use fontdue::Font; use log::info; -use std::collections::VecDeque; +use ringbuf::traits::{Observer, Split}; +use ringbuf::{HeapCons, HeapRb}; use std::io::Read; use std::ops::Add; -use std::slice; use std::time::{Duration, Instant}; use tiny_skia::Pixmap; use warp::Buf; @@ -35,11 +35,12 @@ pub async fn listen(settings: Settings) -> Result<()> { struct TestPatternSrc { encoder: Encoder, scaler: Scaler, + muxer: Muxer, background: Pixmap, font: [Font; 1], frame_no: u64, start: Instant, - buf: VecDeque, + reader: HeapCons, } unsafe impl Send for TestPatternSrc {} @@ -69,18 +70,31 @@ impl TestPatternSrc { let font = include_bytes!("../../SourceCodePro-Regular.ttf") as &[u8]; let font = Font::from_bytes(font, Default::default()).unwrap(); + let buf = HeapRb::new(1024 * 1024); + let (writer, reader) = buf.split(); + + let muxer = unsafe { + let mut m = Muxer::builder() + .with_output_write(writer, Some("mpegts"), None)? + .with_stream_encoder(&encoder)? + .build()?; + m.open()?; + m + }; + Ok(Self { encoder, scaler, + muxer, background: pixmap, font: [font], frame_no: 0, start: Instant::now(), - buf: VecDeque::new(), + reader, }) } - pub unsafe fn next_pkt(&mut self) -> Result> { + pub unsafe fn next_pkt(&mut self) -> Result<()> { let stream_time = Duration::from_secs_f64(self.frame_no as f64 / 30.0); let real_time = Instant::now().duration_since(self.start); let wait_time = if stream_time > real_time { @@ -137,28 +151,24 @@ impl TestPatternSrc { } } - let mut ret = Vec::new(); // scale/encode let frame = self .scaler .process_frame(src_frame, 1280, 720, AV_PIX_FMT_YUV420P)?; for pkt in self.encoder.encode_frame(frame)? { - let buf = slice::from_raw_parts((*pkt).data, (*pkt).size as usize); - ret.extend(buf); + self.muxer.write_packet(pkt)?; } - Ok(ret) + Ok(()) } } impl Read for TestPatternSrc { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { unsafe { - while self.buf.len() < buf.len() { - let data = self.next_pkt().map_err(|e| std::io::Error::other(e))?; - self.buf.extend(data); + while self.reader.occupied_len() < buf.len() { + self.next_pkt().map_err(|e| std::io::Error::other(e))?; } } - self.buf.copy_to_slice(buf); - Ok(buf.len()) + self.reader.read(buf) } } diff --git a/src/pipeline/runner.rs b/src/pipeline/runner.rs index bace5e8..2a43de9 100644 --- a/src/pipeline/runner.rs +++ b/src/pipeline/runner.rs @@ -89,7 +89,14 @@ impl PipelineRunner { let src_index = (*stream).index; // TODO: For copy streams, skip decoder - for frame in self.decoder.decode_pkt(pkt)? { + let frames = if let Ok(frames) = self.decoder.decode_pkt(pkt) { + frames + } else { + warn!("Error decoding frames"); + return Ok(()); + }; + + for frame in frames { self.frame_ctr += 1; // Copy frame from GPU if using hwaccel decoding @@ -184,7 +191,7 @@ impl PipelineRunner { VariantStream::Audio(a) => { let enc = a.try_into()?; let rs = Resample::new( - av_get_sample_fmt(cstr!(&a.sample_fmt)), + av_get_sample_fmt(cstr!(a.sample_fmt.as_bytes())), a.sample_rate as _, a.channels as _, ); diff --git a/src/variant/audio.rs b/src/variant/audio.rs index 78469e4..a8c0845 100644 --- a/src/variant/audio.rs +++ b/src/variant/audio.rs @@ -71,7 +71,7 @@ impl TryInto for &AudioVariant { .with_sample_rate(self.sample_rate as _) .with_bitrate(self.bitrate as _) .with_default_channel_layout(self.channels as _) - .with_sample_format(av_get_sample_fmt(cstr!(&self.sample_fmt))) + .with_sample_format(av_get_sample_fmt(cstr!(self.sample_fmt.as_bytes()))) .open(None)?; Ok(enc)