diff --git a/Cargo.lock b/Cargo.lock index 43411b1..533305d 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,6 +276,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +dependencies = [ + "num-traits", +] + [[package]] name = "cipher" version = "0.4.4" @@ -563,7 +572,7 @@ dependencies = [ [[package]] name = "ffmpeg-rs-raw" version = "0.1.0" -source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=bde945fe887dfdb38fff096bbf1928b9e8e8469f#bde945fe887dfdb38fff096bbf1928b9e8e8469f" +source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=0abe0c5229adeb64b013d1895c7eba3d917f05a4#0abe0c5229adeb64b013d1895c7eba3d917f05a4" dependencies = [ "anyhow", "ffmpeg-sys-the-third", @@ -1092,6 +1101,16 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "m3u8-rs" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f03cd3335fb5f2447755d45cda9c70f76013626a9db44374973791b0926a86c3" +dependencies = [ + "chrono", + "nom", +] + [[package]] name = "memchr" version = "2.7.1" @@ -2493,6 +2512,7 @@ dependencies = [ "itertools 0.13.0", "libc", "log", + "m3u8-rs", "pretty_env_logger", "rand", "resvg", diff --git a/Cargo.toml b/Cargo.toml index 367d1bc..327eba7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ srt = ["dep:srt-tokio"] test-source = ["dep:resvg", "dep:usvg", "dep:tiny-skia", "dep:fontdue", "dep:ringbuf"] [dependencies] -ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "bde945fe887dfdb38fff096bbf1928b9e8e8469f" } +ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "0abe0c5229adeb64b013d1895c7eba3d917f05a4" } tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] } anyhow = { version = "^1.0.91", features = ["backtrace"] } @@ -39,4 +39,5 @@ usvg = { version = "0.44.0", optional = true } tiny-skia = { version = "0.11.4", optional = true } fontdue = { version = "0.9.2", optional = true } ringbuf = { version = "0.4.7", optional = true } +m3u8-rs = "6.0.0" diff --git a/src/bin/nostr_sidecar.rs b/src/bin/nostr_sidecar.rs deleted file mode 100644 index 2c13210..0000000 --- a/src/bin/nostr_sidecar.rs +++ /dev/null @@ -1,6 +0,0 @@ - - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - -} \ No newline at end of file diff --git a/src/bin/zap_stream_core.rs b/src/bin/zap_stream_core.rs index 6b3d46f..63b5a0c 100644 --- a/src/bin/zap_stream_core.rs +++ b/src/bin/zap_stream_core.rs @@ -11,7 +11,6 @@ use zap_stream_core::ingress::srt; use zap_stream_core::ingress::{file, tcp, test}; use zap_stream_core::settings::Settings; - #[derive(Parser, Debug)] struct Args { /// Add file input at startup @@ -61,10 +60,7 @@ async fn main() -> anyhow::Result<()> { ))); if let Some(p) = args.file { - listeners.push(tokio::spawn(file::listen( - p.parse()?, - settings.clone(), - ))); + listeners.push(tokio::spawn(file::listen(p.parse()?, settings.clone()))); } #[cfg(feature = "test-source")] if args.test_pattern { diff --git a/src/egress/hls.rs b/src/egress/hls.rs index 8fc970b..30045ee 100644 --- a/src/egress/hls.rs +++ b/src/egress/hls.rs @@ -1,125 +1,308 @@ -use std::collections::HashMap; -use std::fmt::Display; -use std::path::PathBuf; - -use anyhow::Result; -use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket; -use ffmpeg_rs_raw::{Encoder, Muxer}; +use anyhow::{bail, Result}; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ + av_free, av_opt_set, av_q2d, av_strdup, av_write_frame, avio_flush, avio_open, AVPacket, + AVIO_FLAG_WRITE, AV_PKT_FLAG_KEY, +}; +use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; use itertools::Itertools; -use log::info; +use log::{info, warn}; +use m3u8_rs::MediaSegment; +use std::fmt::Display; +use std::fs::File; +use std::path::PathBuf; +use std::ptr; use uuid::Uuid; -use crate::egress::{Egress, EgressConfig}; +use crate::egress::Egress; use crate::variant::{StreamMapping, VariantStream}; pub struct HlsEgress { id: Uuid, - config: EgressConfig, - muxer: Muxer, + + /// All variant streams + variants: Vec, } -enum HlsMapEntry { - Video(usize), - Audio(usize), - Subtitle(usize), +enum HlsVariantStream { + Video { + group: usize, + index: usize, + id: Uuid, + }, + Audio { + group: usize, + index: usize, + id: Uuid, + }, + Subtitle { + group: usize, + index: usize, + id: Uuid, + }, } -impl Display for HlsMapEntry { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl HlsVariantStream { + pub fn group(&self) -> usize { match self { - HlsMapEntry::Video(i) => write!(f, "v:{}", i), - HlsMapEntry::Audio(i) => write!(f, "a:{}", i), - HlsMapEntry::Subtitle(i) => write!(f, "s:{}", i), + HlsVariantStream::Video { group, .. } => *group, + HlsVariantStream::Audio { group, .. } => *group, + HlsVariantStream::Subtitle { group, .. } => *group, + } + } + + pub fn index(&self) -> usize { + match self { + HlsVariantStream::Video { index, .. } => *index, + HlsVariantStream::Audio { index, .. } => *index, + HlsVariantStream::Subtitle { index, .. } => *index, + } + } + + pub fn id(&self) -> &Uuid { + match self { + HlsVariantStream::Video { id, .. } => id, + HlsVariantStream::Audio { id, .. } => id, + HlsVariantStream::Subtitle { id, .. } => id, } } } -struct HlsStream { - name: String, - entries: Vec, +impl Display for HlsVariantStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + HlsVariantStream::Video { index, .. } => write!(f, "v:{}", index), + HlsVariantStream::Audio { index, .. } => write!(f, "a:{}", index), + HlsVariantStream::Subtitle { index, .. } => write!(f, "s:{}", index), + } + } } -impl Display for HlsStream { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{},name:{}", self.entries.iter().join(","), self.name) +struct HlsVariant { + /// Name of this variant (720p) + name: String, + /// MPEG-TS muxer for this variant + mux: Muxer, + /// List of streams ids in this variant + streams: Vec, + /// Segment length in seconds + segment_length: f32, + /// Current segment index + idx: u64, + /// Output directory (base) + out_dir: String, + /// List of segments to be included in the playlist + segments: Vec, +} + +struct SegmentInfo(u64, f32); + +impl SegmentInfo { + fn to_media_segment(&self) -> MediaSegment { + MediaSegment { + uri: HlsVariant::segment_name(self.0), + duration: self.1, + title: Some("no desc".to_string()), + ..MediaSegment::default() + } + } + + fn filename(&self) -> String { + HlsVariant::segment_name(self.0) + } +} + +impl HlsVariant { + pub fn new<'a>( + out_dir: &'a str, + segment_length: f32, + group: usize, + encoded_vars: impl Iterator, + ) -> Result { + let name = format!("stream_{}", group); + let first_seg = Self::map_segment_path(out_dir, &name, 1); + std::fs::create_dir_all(PathBuf::from(&first_seg).parent().unwrap())?; + + let mut mux = unsafe { + Muxer::builder() + .with_output_path(first_seg.as_str(), Some("mpegts"))? + .build()? + }; + let mut streams = Vec::new(); + for (var, enc) in encoded_vars { + match var { + VariantStream::Video(v) => unsafe { + let stream = mux.add_stream_encoder(enc)?; + streams.push(HlsVariantStream::Video { + group, + index: (*stream).index as usize, + id: v.id(), + }) + }, + VariantStream::Audio(a) => unsafe { + let stream = mux.add_stream_encoder(enc)?; + streams.push(HlsVariantStream::Audio { + group, + index: (*stream).index as usize, + id: a.id(), + }) + }, + VariantStream::Subtitle(s) => unsafe { + let stream = mux.add_stream_encoder(enc)?; + streams.push(HlsVariantStream::Subtitle { + group, + index: (*stream).index as usize, + id: s.id(), + }) + }, + _ => panic!("unsupported variant stream"), + } + } + unsafe { + mux.open(None)?; + } + Ok(Self { + name: name.clone(), + segment_length, + mux, + streams, + idx: 1, + segments: Vec::from([SegmentInfo(1, segment_length)]), + out_dir: out_dir.to_string(), + }) + } + + pub fn segment_name(idx: u64) -> String { + format!("{}.ts", idx) + } + + pub fn out_dir(&self) -> PathBuf { + PathBuf::from(&self.out_dir).join(&self.name) + } + + pub fn map_segment_path(out_dir: &str, name: &str, idx: u64) -> String { + PathBuf::from(out_dir) + .join(name) + .join(Self::segment_name(idx)) + .to_string_lossy() + .to_string() + } + + /// Mux a packet created by the encoder for this variant + pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result<()> { + // time of this packet in seconds + let pkt_time = (*pkt).pts as f32 * av_q2d((*pkt).time_base) as f32; + // what segment this pkt should be in (index) + let pkt_seg = 1 + (pkt_time / self.segment_length).floor() as u64; + + let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY; + if pkt_seg != self.idx && can_split { + self.split_next_seg()?; + } + self.mux.write_packet(pkt) + } + + unsafe fn split_next_seg(&mut self) -> Result<()> { + self.idx += 1; + + // Manually reset muxer avio + let ctx = self.mux.context(); + av_write_frame(ctx, ptr::null_mut()); + avio_flush((*ctx).pb); + av_free((*ctx).url as *mut _); + + let next_seg_url = Self::map_segment_path(&*self.out_dir, &self.name, self.idx); + (*ctx).url = av_strdup(cstr!(next_seg_url.as_str())); + + let ret = avio_open(&mut (*ctx).pb, (*ctx).url, AVIO_FLAG_WRITE); + if ret < 0 { + bail!("Failed to re-alloc avio"); + } + + // tell muxer it needs to write headers again + av_opt_set( + (*ctx).priv_data, + cstr!("mpegts_flags"), + cstr!("resend_headers"), + 0, + ); + + info!("Writing segment {}", next_seg_url); + if let Err(e) = self.add_segment(self.idx, 2.0) { + warn!("Failed to update playlist: {}", e); + } + Ok(()) + } + + fn add_segment(&mut self, idx: u64, duration: f32) -> Result<()> { + self.segments.push(SegmentInfo(idx, duration)); + + const MAX_SEGMENTS: usize = 10; + + if self.segments.len() > MAX_SEGMENTS { + let n_drain = self.segments.len() - MAX_SEGMENTS; + let seg_dir = PathBuf::from(self.out_dir()); + for seg in self.segments.drain(..n_drain) { + // delete file + let seg_path = seg_dir.join(seg.filename()); + std::fs::remove_file(seg_path)?; + } + } + self.write_playlist() + } + + fn write_playlist(&mut self) -> Result<()> { + let mut pl = m3u8_rs::MediaPlaylist::default(); + pl.target_duration = self.segment_length as u64; + pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect(); + pl.version = Some(3); + pl.media_sequence = self.segments.first().map(|s| s.0).unwrap_or(0); + + let mut f_out = File::create(self.out_dir().join("live.m3u8"))?; + pl.write_to(&mut f_out)?; + Ok(()) + } + + pub fn to_mapping(&self) -> String { + format!( + "{},name:{}", + self.streams.iter().map(|j| j.to_string()).join(","), + self.name + ) } } impl HlsEgress { pub fn new<'a>( - config: EgressConfig, - encoded: impl Iterator, + out_dir: &str, + segment_length: f32, + encoders: impl Iterator, ) -> Result { let id = Uuid::new_v4(); - let base = PathBuf::from(&config.out_dir).join(id.to_string()); + let base = PathBuf::from(out_dir) + .join(id.to_string()) + .to_string_lossy() + .to_string(); - let mut opts = HashMap::new(); - opts.insert( - "hls_segment_filename".to_string(), - format!("{}/%v/%05d.ts", base.display()), - ); - opts.insert("master_pl_name".to_string(), "live.m3u8".to_string()); - opts.insert("master_pl_publish_rate".to_string(), "10".to_string()); - opts.insert("hls_time".to_string(), "2".to_string()); - opts.insert("hls_flags".to_string(), "delete_segments".to_string()); - - let muxer = unsafe { - let mut m = Muxer::builder() - .with_output_path( - base.join("%v/live.m3u8").to_str().unwrap(), - Some("hls"), - Some(opts), - )? - .build()?; - for e in encoded { - m.add_stream_encoder(e)?; - } - m.open()?; - m - }; - - Ok(Self { id, config, muxer }) - } - - unsafe fn setup_hls_mapping<'a>( - variants: impl Iterator, - ) -> Result { - // configure mapping - let mut stream_map = Vec::new(); - for (g, vars) in &variants - .sorted_by(|a, b| a.group_id().cmp(&b.group_id())) - .group_by(|x| x.group_id()) + let mut vars = Vec::new(); + for (k, group) in &encoders + .sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id())) + .chunk_by(|a| a.0.group_id()) { - let group = HlsStream { - name: format!("stream_{}", g), - entries: Vec::new(), - }; - for var in vars { - todo!("get nth stream"); - let n = 0; - match var { - VariantStream::Video(_) => group.entries.push(HlsMapEntry::Video(n)), - VariantStream::Audio(_) => group.entries.push(HlsMapEntry::Audio(n)), - VariantStream::CopyVideo(_) => group.entries.push(HlsMapEntry::Video(n)), - VariantStream::CopyAudio(_) => group.entries.push(HlsMapEntry::Audio(n)), - }; - } - stream_map.push(group); + let var = HlsVariant::new(&base, segment_length, k, group)?; + vars.push(var); } - let stream_map = stream_map.iter().join(" "); - info!("map_str={}", stream_map); - - Ok(stream_map) + Ok(Self { id, variants: vars }) } } impl Egress for HlsEgress { unsafe fn process_pkt(&mut self, packet: *mut AVPacket, variant: &Uuid) -> Result<()> { - if self.config.variants.contains(variant) { - self.muxer.write_packet(packet) - } else { - Ok(()) + for var in self.variants.iter_mut() { + if var.streams.iter().any(|s| s.id() == variant) { + var.mux_packet(packet)?; + } } + Ok(()) } } diff --git a/src/egress/recorder.rs b/src/egress/recorder.rs index 206b2e8..8f7f7cd 100644 --- a/src/egress/recorder.rs +++ b/src/egress/recorder.rs @@ -26,12 +26,12 @@ impl RecorderEgress { let muxer = unsafe { let mut m = Muxer::builder() - .with_output_path(out_file.to_str().unwrap(), None, None)? + .with_output_path(out_file.to_str().unwrap(), None)? .build()?; for var in variants { m.add_stream_encoder(var)?; } - m.open()?; + m.open(None)?; m }; Ok(Self { id, config, muxer }) diff --git a/src/ingress/test.rs b/src/ingress/test.rs index 3533d4b..97affa7 100644 --- a/src/ingress/test.rs +++ b/src/ingress/test.rs @@ -75,10 +75,10 @@ impl TestPatternSrc { let muxer = unsafe { let mut m = Muxer::builder() - .with_output_write(writer, Some("mpegts"), None)? + .with_output_write(writer, Some("mpegts"))? .with_stream_encoder(&encoder)? .build()?; - m.open()?; + m.open(None)?; m }; diff --git a/src/pipeline/runner.rs b/src/pipeline/runner.rs index 5ac7e96..114bc2b 100644 --- a/src/pipeline/runner.rs +++ b/src/pipeline/runner.rs @@ -12,7 +12,9 @@ use crate::pipeline::{EgressType, PipelineConfig}; use crate::variant::{StreamMapping, VariantStream}; use crate::webhook::Webhook; use anyhow::Result; -use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_get_sample_fmt, av_packet_free}; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ + av_frame_free, av_get_sample_fmt, av_packet_free, av_rescale_q, +}; use ffmpeg_rs_raw::{ cstr, get_frame_from_hw, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler, }; @@ -100,7 +102,8 @@ impl PipelineRunner { self.frame_ctr += 1; // Copy frame from GPU if using hwaccel decoding - let frame = get_frame_from_hw(frame)?; + let mut frame = get_frame_from_hw(frame)?; + (*frame).time_base = (*stream).time_base; // Get the variants which want this pkt let pkt_vars = self @@ -116,9 +119,11 @@ impl PipelineRunner { continue; }; - let frame = match var { + let mut new_frame = false; + let mut frame = match var { VariantStream::Video(v) => { if let Some(s) = self.scalers.get_mut(&v.id()) { + new_frame = true; s.process_frame(frame, v.width, v.height, transmute(v.pixel_format))? } else { frame @@ -128,6 +133,7 @@ impl PipelineRunner { if let Some(r) = self.resampler.get_mut(&a.id()) { let frame_size = (*enc.codec_context()).frame_size; // TODO: resample audio fifo + new_frame = true; r.process_frame(frame, frame_size)? } else { frame @@ -136,6 +142,18 @@ impl PipelineRunner { _ => frame, }; + // before encoding frame, rescale timestamps + if !frame.is_null() { + let enc_ctx = enc.codec_context(); + (*frame).pts = + av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base); + (*frame).pkt_dts = + av_rescale_q((*frame).pkt_dts, (*frame).time_base, (*enc_ctx).time_base); + (*frame).duration = + av_rescale_q((*frame).duration, (*frame).time_base, (*enc_ctx).time_base); + (*frame).time_base = (*enc_ctx).time_base; + } + let packets = enc.encode_frame(frame)?; // pass new packets to egress for mut pkt in packets { @@ -144,7 +162,13 @@ impl PipelineRunner { } av_packet_free(&mut pkt); } + + if new_frame { + av_frame_free(&mut frame); + } } + + av_frame_free(&mut frame); } av_packet_free(&mut pkt); @@ -212,13 +236,16 @@ impl PipelineRunner { for e in cfg.egress { match e { EgressType::HLS(ref c) => { - let encoders = self - .encoders - .iter() - .filter(|(k, v)| c.variants.contains(k)) - .map(|(_, v)| v); + let encoders = self.encoders.iter().filter_map(|(k, v)| { + if c.variants.contains(k) { + let var = cfg.variants.iter().find(|x| x.id() == *k)?; + Some((var, v)) + } else { + None + } + }); - let hls = HlsEgress::new(c.clone(), encoders)?; + let hls = HlsEgress::new(&c.out_dir, 2.0, encoders)?; self.egress.push(Box::new(hls)); } EgressType::Recorder(ref c) => { diff --git a/src/variant/mod.rs b/src/variant/mod.rs index 0c456f3..5b42d25 100644 --- a/src/variant/mod.rs +++ b/src/variant/mod.rs @@ -16,6 +16,7 @@ pub enum VariantStream { Video(VideoVariant), /// Audio stream mapping Audio(AudioVariant), + Subtitle(VariantMapping), /// Copy stream src<>dst stream CopyVideo(VariantMapping), /// Copy stream src<>dst stream @@ -27,6 +28,7 @@ impl StreamMapping for VariantStream { match self { VariantStream::Video(v) => v.id(), VariantStream::Audio(v) => v.id(), + VariantStream::Subtitle(v) => v.id(), VariantStream::CopyAudio(v) => v.id(), VariantStream::CopyVideo(v) => v.id(), } @@ -36,6 +38,7 @@ impl StreamMapping for VariantStream { match self { VariantStream::Video(v) => v.src_index(), VariantStream::Audio(v) => v.src_index(), + VariantStream::Subtitle(v) => v.src_index(), VariantStream::CopyAudio(v) => v.src_index(), VariantStream::CopyVideo(v) => v.src_index(), } @@ -45,6 +48,7 @@ impl StreamMapping for VariantStream { match self { VariantStream::Video(v) => v.dst_index(), VariantStream::Audio(v) => v.dst_index(), + VariantStream::Subtitle(v) => v.dst_index(), VariantStream::CopyAudio(v) => v.dst_index(), VariantStream::CopyVideo(v) => v.dst_index(), } @@ -54,6 +58,7 @@ impl StreamMapping for VariantStream { match self { VariantStream::Video(v) => v.set_dst_index(dst), VariantStream::Audio(v) => v.set_dst_index(dst), + VariantStream::Subtitle(v) => v.set_dst_index(dst), VariantStream::CopyAudio(v) => v.set_dst_index(dst), VariantStream::CopyVideo(v) => v.set_dst_index(dst), } @@ -63,6 +68,7 @@ impl StreamMapping for VariantStream { match self { VariantStream::Video(v) => v.group_id(), VariantStream::Audio(v) => v.group_id(), + VariantStream::Subtitle(v) => v.group_id(), VariantStream::CopyAudio(v) => v.group_id(), VariantStream::CopyVideo(v) => v.group_id(), } @@ -74,6 +80,7 @@ impl Display for VariantStream { match self { VariantStream::Video(v) => write!(f, "{}", v), VariantStream::Audio(a) => write!(f, "{}", a), + VariantStream::Subtitle(s) => write!(f, "{}", s), VariantStream::CopyVideo(c) => write!(f, "{}", c), VariantStream::CopyAudio(c) => write!(f, "{}", c), } @@ -98,6 +105,7 @@ pub fn find_stream<'a>( .find(|x| match x { VariantStream::Video(v) => v.id() == *id, VariantStream::Audio(a) => a.id() == *id, + VariantStream::Subtitle(v) => v.id() == *id, VariantStream::CopyVideo(c) => c.id() == *id, VariantStream::CopyAudio(c) => c.id() == *id, })