From 9937f9a6f981f693b543cddb44d3a6ed18ae7af5 Mon Sep 17 00:00:00 2001 From: kieran Date: Thu, 21 Nov 2024 22:08:47 +0000 Subject: [PATCH] feat: cleanup stream on end fix: audio codecs fix: hls segmenter --- Cargo.lock | 4 - src/egress/hls.rs | 7 ++ src/egress/mod.rs | 1 + src/egress/recorder.rs | 4 + src/ingress/mod.rs | 13 +++- src/ingress/srt.rs | 4 + src/mux/hls.rs | 15 +++- src/overseer/mod.rs | 13 +++- src/overseer/webhook.rs | 4 + src/overseer/zap_stream.rs | 23 +++++- src/pipeline/runner.rs | 77 ++++++++++++------- zap-stream-db/Cargo.lock | 4 - zap-stream-db/Cargo.toml | 2 +- .../migrations/20241115120541_init.sql | 10 +-- zap-stream-db/src/db.rs | 2 +- zap-stream-db/src/model.rs | 2 +- 16 files changed, 131 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c37fc8f..7acb312 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -3430,7 +3430,6 @@ dependencies = [ "tokio-stream", "tracing", "url", - "uuid", ] [[package]] @@ -3512,7 +3511,6 @@ dependencies = [ "stringprep", "thiserror 1.0.57", "tracing", - "uuid", "whoami", ] @@ -3552,7 +3550,6 @@ dependencies = [ "stringprep", "thiserror 1.0.57", "tracing", - "uuid", "whoami", ] @@ -3578,7 +3575,6 @@ dependencies = [ "sqlx-core", "tracing", "url", - "uuid", ] [[package]] diff --git a/src/egress/hls.rs b/src/egress/hls.rs index 682efcd..3862458 100644 --- a/src/egress/hls.rs +++ b/src/egress/hls.rs @@ -20,4 +20,11 @@ impl Egress for HlsMuxer { Ok(EgressResult::None) } } + + unsafe fn reset(&mut self) -> Result<()> { + for var in &mut self.variants { + var.reset()? + } + Ok(()) + } } diff --git a/src/egress/mod.rs b/src/egress/mod.rs index 9d27a0d..8fcf0ac 100644 --- a/src/egress/mod.rs +++ b/src/egress/mod.rs @@ -19,6 +19,7 @@ pub struct EgressConfig { pub trait Egress { unsafe fn process_pkt(&mut self, packet: *mut AVPacket, variant: &Uuid) -> Result; + unsafe fn reset(&mut self) -> Result<()>; } #[derive(Debug, Clone)] diff --git a/src/egress/recorder.rs b/src/egress/recorder.rs index 861e62e..2482e37 100644 --- a/src/egress/recorder.rs +++ b/src/egress/recorder.rs @@ -63,4 +63,8 @@ impl Egress for RecorderEgress { } Ok(EgressResult::None) } + + unsafe fn reset(&mut self) -> Result<()> { + self.muxer.reset() + } } diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs index f8500ad..0ad806b 100644 --- a/src/ingress/mod.rs +++ b/src/ingress/mod.rs @@ -38,9 +38,16 @@ pub async fn spawn_pipeline( std::thread::spawn(move || unsafe { match PipelineRunner::new(handle, out_dir, seer, info, reader) { Ok(mut pl) => loop { - if let Err(e) = pl.run() { - error!("Pipeline run failed: {}", e); - break; + match pl.run() { + Ok(c) => { + if !c { + break; + } + } + Err(e) => { + error!("Pipeline run failed: {}", e); + break; + } } }, Err(e) => { diff --git a/src/ingress/srt.rs b/src/ingress/srt.rs index 53905a1..75e9074 100644 --- a/src/ingress/srt.rs +++ b/src/ingress/srt.rs @@ -3,6 +3,7 @@ use crate::overseer::Overseer; use crate::pipeline::runner::PipelineRunner; use crate::settings::Settings; use anyhow::Result; +use futures_util::stream::FusedStream; use futures_util::{SinkExt, StreamExt, TryStreamExt}; use log::{error, info, warn}; use srt_tokio::{SrtListener, SrtSocket}; @@ -54,6 +55,9 @@ impl Read for SrtReader { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { let (mut rx, _) = self.socket.split_mut(); while self.buf.len() < buf.len() { + if rx.is_terminated() { + return Ok(0); + } if let Some((_, mut data)) = self.handle.block_on(rx.next()) { self.buf.extend(data.iter().as_slice()); } diff --git a/src/mux/hls.rs b/src/mux/hls.rs index b56564a..361de79 100644 --- a/src/mux/hls.rs +++ b/src/mux/hls.rs @@ -2,6 +2,7 @@ use crate::egress::NewSegment; use crate::variant::{StreamMapping, VariantStream}; use anyhow::{bail, Result}; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ av_free, av_opt_set, av_q2d, av_write_frame, avio_flush, avio_open, AVPacket, AVStream, AVIO_FLAG_WRITE, AV_PKT_FLAG_KEY, @@ -215,7 +216,11 @@ impl HlsVariant { let pkt_seg = 1 + (pkt_time / self.segment_length).floor() as u64; let mut result = None; - let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY; + let pkt_stream = *(*self.mux.context()) + .streams + .add((*pkt).stream_index as usize); + let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY + && (*(*pkt_stream).codecpar).codec_type == AVMEDIA_TYPE_VIDEO; if pkt_seg != self.idx && can_split { result = Some(self.split_next_seg(pkt_time)?); } @@ -223,6 +228,10 @@ impl HlsVariant { Ok(result) } + pub unsafe fn reset(&mut self) -> Result<()> { + self.mux.reset() + } + unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result { self.idx += 1; @@ -368,8 +377,8 @@ impl HlsVariant { } pub struct HlsMuxer { - out_dir: PathBuf, - variants: Vec, + pub out_dir: PathBuf, + pub variants: Vec, } impl HlsMuxer { diff --git a/src/overseer/mod.rs b/src/overseer/mod.rs index 4b1347a..8a0aa6a 100644 --- a/src/overseer/mod.rs +++ b/src/overseer/mod.rs @@ -82,6 +82,9 @@ pub trait Overseer: Send + Sync { height: usize, path: &PathBuf, ) -> Result<()>; + + /// Stream is finished + async fn on_end(&self, pipeline_id: &Uuid) -> Result<()>; } impl Settings { @@ -171,10 +174,10 @@ pub(crate) fn get_default_variants(info: &IngressInfo) -> Result Result<()> { + + // nothing to do here + Ok(()) + } } diff --git a/src/overseer/webhook.rs b/src/overseer/webhook.rs index 0956ce1..7d7c026 100644 --- a/src/overseer/webhook.rs +++ b/src/overseer/webhook.rs @@ -49,4 +49,8 @@ impl Overseer for WebhookOverseer { ) -> Result<()> { todo!() } + + async fn on_end(&self, pipeline_id: &Uuid) -> Result<()> { + todo!() + } } diff --git a/src/overseer/zap_stream.rs b/src/overseer/zap_stream.rs index 903208c..8c97dc2 100644 --- a/src/overseer/zap_stream.rs +++ b/src/overseer/zap_stream.rs @@ -6,7 +6,7 @@ use crate::overseer::{get_default_variants, IngressInfo, Overseer}; use crate::pipeline::{EgressType, PipelineConfig}; use crate::settings::LndSettings; use crate::variant::StreamMapping; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; use chrono::Utc; use fedimint_tonic_lnd::verrpc::VersionRequest; @@ -140,7 +140,7 @@ impl ZapStreamOverseer { } let kind = Kind::from(STREAM_EVENT_KIND); - let coord = Coordinate::new(kind, self.keys.public_key).identifier(stream.id); + let coord = Coordinate::new(kind, self.keys.public_key).identifier(&stream.id); tags.push(Tag::parse(&[ "alt", &format!("Watch live on https://zap.stream/{}", coord.to_bech32()?), @@ -226,9 +226,10 @@ impl Overseer for ZapStreamOverseer { })); let user = self.db.get_user(uid).await?; + let stream_id = Uuid::new_v4(); // insert new stream record let mut new_stream = UserStream { - id: Uuid::new_v4(), + id: stream_id.to_string(), user_id: uid, starts: Utc::now(), state: UserStreamState::Live, @@ -238,8 +239,9 @@ impl Overseer for ZapStreamOverseer { new_stream.event = Some(stream_event.as_json()); self.db.insert_stream(&new_stream).await?; + self.db.update_stream(&new_stream).await?; Ok(PipelineConfig { - id: new_stream.id, + id: stream_id, variants, egress, }) @@ -291,4 +293,17 @@ impl Overseer for ZapStreamOverseer { // nothing to do Ok(()) } + + async fn on_end(&self, pipeline_id: &Uuid) -> Result<()> { + let mut stream = self.db.get_stream(pipeline_id).await?; + let user = self.db.get_user(stream.user_id).await?; + + stream.state = UserStreamState::Ended; + let event = self.publish_stream_event(&stream, &user.pubkey).await?; + stream.event = Some(event.as_json()); + self.db.update_stream(&stream).await?; + + info!("Stream ended {}", stream.id); + Ok(()) + } } diff --git a/src/pipeline/runner.rs b/src/pipeline/runner.rs index e22b3d3..b87beac 100644 --- a/src/pipeline/runner.rs +++ b/src/pipeline/runner.rs @@ -3,6 +3,7 @@ use std::io::Read; use std::mem::transmute; use std::ops::Sub; use std::path::PathBuf; +use std::ptr; use std::sync::Arc; use std::time::Instant; @@ -19,7 +20,8 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_WEBP; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_frame_free, av_get_sample_fmt, av_packet_free, av_q2d, av_rescale_q, AVMediaType, + av_frame_free, av_get_sample_fmt, av_packet_free, av_pkt_dump_log2, av_q2d, av_rescale_q, + AVMediaType, }; use ffmpeg_rs_raw::{ cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler, @@ -103,11 +105,28 @@ impl PipelineRunner { }) } + /// EOF, cleanup + unsafe fn flush(&mut self) -> Result<()> { + for (var, enc) in &mut self.encoders { + for mut pkt in enc.encode_frame(ptr::null_mut())? { + for eg in self.egress.iter_mut() { + eg.process_pkt(pkt, &var)?; + } + av_packet_free(&mut pkt); + } + } + for eg in self.egress.iter_mut() { + eg.reset()?; + } + Ok(()) + } + /// Main processor, should be called in a loop - pub unsafe fn run(&mut self) -> Result<()> { + /// Returns false when stream data ended (EOF) + pub unsafe fn run(&mut self) -> Result { self.setup()?; - let config = if let Some(ref config) = self.config { + let config = if let Some(config) = &self.config { config } else { bail!("Pipeline not configured, cannot run") @@ -115,14 +134,23 @@ impl PipelineRunner { // run transcoder pipeline let (mut pkt, stream) = self.demuxer.get_packet()?; - let src_index = (*stream).index; + if pkt.is_null() { + self.handle.block_on(async { + if let Err(e) = self.overseer.on_end(&config.id).await { + error!("Failed to end stream: {e}"); + } + }); + self.flush()?; + return Ok(false); + } // TODO: For copy streams, skip decoder - let frames = if let Ok(frames) = self.decoder.decode_pkt(pkt) { - frames - } else { - warn!("Error decoding frames"); - return Ok(()); + let frames = match self.decoder.decode_pkt(pkt) { + Ok(f) => f, + Err(e) => { + warn!("Error decoding frames, {e}"); + return Ok(true); + } }; let mut egress_results = vec![]; @@ -164,7 +192,7 @@ impl PipelineRunner { let pkt_vars = config .variants .iter() - .filter(|v| v.src_index() == src_index as usize); + .filter(|v| v.src_index() == (*stream).index as usize); for var in pkt_vars { let enc = if let Some(enc) = self.encoders.get_mut(&var.id()) { enc @@ -172,6 +200,18 @@ impl PipelineRunner { //warn!("Frame had nowhere to go in {} :/", var.id()); continue; }; + // before encoding frame, rescale timestamps + if !frame.is_null() { + let enc_ctx = enc.codec_context(); + (*frame).pict_type = AV_PICTURE_TYPE_NONE; + (*frame).pts = + av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base); + (*frame).pkt_dts = + av_rescale_q((*frame).pkt_dts, (*frame).time_base, (*enc_ctx).time_base); + (*frame).duration = + av_rescale_q((*frame).duration, (*frame).time_base, (*enc_ctx).time_base); + (*frame).time_base = (*enc_ctx).time_base; + } let mut new_frame = false; let mut frame = match var { @@ -192,8 +232,6 @@ impl PipelineRunner { f.buffer_frame(resampled_frame, frame_size as usize)? { av_frame_free(&mut resampled_frame); - // assume timebase of the encoder - //(*ret).time_base = (*enc.codec_context()).time_base; ret } else { av_frame_free(&mut resampled_frame); @@ -206,19 +244,6 @@ impl PipelineRunner { _ => frame, }; - // before encoding frame, rescale timestamps - if !frame.is_null() { - let enc_ctx = enc.codec_context(); - (*frame).pict_type = AV_PICTURE_TYPE_NONE; - (*frame).pts = - av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base); - (*frame).pkt_dts = - av_rescale_q((*frame).pkt_dts, (*frame).time_base, (*enc_ctx).time_base); - (*frame).duration = - av_rescale_q((*frame).duration, (*frame).time_base, (*enc_ctx).time_base); - (*frame).time_base = (*enc_ctx).time_base; - } - let packets = enc.encode_frame(frame)?; // pass new packets to egress for mut pkt in packets { @@ -259,7 +284,7 @@ impl PipelineRunner { self.fps_counter_start = Instant::now(); self.frame_ctr = 0; } - Ok(()) + Ok(true) } unsafe fn setup(&mut self) -> Result<()> { diff --git a/zap-stream-db/Cargo.lock b/zap-stream-db/Cargo.lock index eaa455f..668ef6a 100644 --- a/zap-stream-db/Cargo.lock +++ b/zap-stream-db/Cargo.lock @@ -1274,7 +1274,6 @@ dependencies = [ "tokio-stream", "tracing", "url", - "uuid", ] [[package]] @@ -1356,7 +1355,6 @@ dependencies = [ "stringprep", "thiserror", "tracing", - "uuid", "whoami", ] @@ -1396,7 +1394,6 @@ dependencies = [ "stringprep", "thiserror", "tracing", - "uuid", "whoami", ] @@ -1422,7 +1419,6 @@ dependencies = [ "sqlx-core", "tracing", "url", - "uuid", ] [[package]] diff --git a/zap-stream-db/Cargo.toml b/zap-stream-db/Cargo.toml index a68f990..c8478dc 100644 --- a/zap-stream-db/Cargo.toml +++ b/zap-stream-db/Cargo.toml @@ -10,6 +10,6 @@ test-pattern = [] [dependencies] anyhow = "^1.0.70" chrono = { version = "0.4.38", features = ["serde"] } -sqlx = { version = "0.8.1", features = ["runtime-tokio", "migrate", "mysql", "chrono", "uuid"] } +sqlx = { version = "0.8.1", features = ["runtime-tokio", "migrate", "mysql", "chrono"] } log = "0.4.22" uuid = { version = "1.11.0", features = ["v4"] } \ No newline at end of file diff --git a/zap-stream-db/migrations/20241115120541_init.sql b/zap-stream-db/migrations/20241115120541_init.sql index 08c69a4..0ab994a 100644 --- a/zap-stream-db/migrations/20241115120541_init.sql +++ b/zap-stream-db/migrations/20241115120541_init.sql @@ -14,11 +14,11 @@ create table user create unique index ix_user_pubkey on user (pubkey); create table user_stream ( - id UUID not null primary key, + id varchar(50) not null primary key, user_id integer unsigned not null, - starts timestamp not null, + starts timestamp not null, ends timestamp, - state smallint not null, + state tinyint unsigned not null, title text, summary text, image text, @@ -28,9 +28,9 @@ create table user_stream goal text, pinned text, -- milli-sats paid for this stream - cost bigint not null default 0, + cost bigint unsigned not null default 0, -- duration in seconds - duration float not null default 0, + duration float not null default 0, -- admission fee fee integer unsigned, -- current nostr event json diff --git a/zap-stream-db/src/db.rs b/zap-stream-db/src/db.rs index 678490e..3639b1f 100644 --- a/zap-stream-db/src/db.rs +++ b/zap-stream-db/src/db.rs @@ -96,7 +96,7 @@ impl ZapStreamDb { pub async fn get_stream(&self, id: &Uuid) -> Result { Ok(sqlx::query_as("select * from user_stream where id = ?") - .bind(id) + .bind(id.to_string()) .fetch_one(&self.db) .await .map_err(anyhow::Error::new)?) diff --git a/zap-stream-db/src/model.rs b/zap-stream-db/src/model.rs index 6250149..8ba1c49 100644 --- a/zap-stream-db/src/model.rs +++ b/zap-stream-db/src/model.rs @@ -48,7 +48,7 @@ impl Display for UserStreamState { #[derive(Debug, Clone, Default, FromRow)] pub struct UserStream { - pub id: Uuid, + pub id: String, pub user_id: u64, pub starts: DateTime, pub ends: Option>,