From 56f72b129d532cfb672048827766677a238dde1e Mon Sep 17 00:00:00 2001 From: Kieran Date: Fri, 6 Jun 2025 15:19:36 +0100 Subject: [PATCH] fix: try improve hls playback --- Cargo.toml | 6 ++++ crates/core/src/ingress/rtmp.rs | 22 +++++++++++--- crates/core/src/mux/hls.rs | 20 +++++++------ crates/core/src/pipeline/runner.rs | 46 +++++++++++++++++++----------- crates/zap-stream/src/api.rs | 41 ++++++++++++++++++++++---- crates/zap-stream/src/main.rs | 2 +- crates/zap-stream/src/overseer.rs | 2 +- 7 files changed, 102 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a698ccc..b0661ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,12 @@ members = [ "crates/zap-stream-db" ] +[profile.release] +opt-level = 3 +lto = true +codegen-units = 1 +panic = "abort" + [workspace.dependencies] ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "29ab0547478256c574766b4acc6fcda8ebf4cae6" } tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] } diff --git a/crates/core/src/ingress/rtmp.rs b/crates/core/src/ingress/rtmp.rs index 5449639..a94cfc5 100644 --- a/crates/core/src/ingress/rtmp.rs +++ b/crates/core/src/ingress/rtmp.rs @@ -113,8 +113,12 @@ impl RtmpClient { } ServerSessionResult::RaisedEvent(ev) => self.handle_event(ev)?, ServerSessionResult::UnhandleableMessageReceived(m) => { - // treat any non-flv streams as raw media stream in rtmp - self.media_buf.extend(&m.data); + // Log unhandleable messages for debugging + error!("Received unhandleable message with {} bytes", m.data.len()); + // Only append data if it looks like valid media data + if !m.data.is_empty() && m.data.len() > 4 { + self.media_buf.extend(&m.data); + } } } } @@ -164,10 +168,20 @@ impl RtmpClient { ); } ServerSessionEvent::AudioDataReceived { data, .. } => { - self.media_buf.extend(data); + // Validate audio data before adding to buffer + if !data.is_empty() { + self.media_buf.extend(data); + } else { + error!("Received empty audio data"); + } } ServerSessionEvent::VideoDataReceived { data, .. } => { - self.media_buf.extend(data); + // Validate video data before adding to buffer + if !data.is_empty() { + self.media_buf.extend(data); + } else { + error!("Received empty video data"); + } } ServerSessionEvent::UnhandleableAmf0Command { .. } => {} ServerSessionEvent::PlayStreamRequested { request_id, .. } => { diff --git a/crates/core/src/mux/hls.rs b/crates/core/src/mux/hls.rs index 002d115..4cddca4 100644 --- a/crates/core/src/mux/hls.rs +++ b/crates/core/src/mux/hls.rs @@ -186,11 +186,7 @@ impl HlsVariant { streams, idx: 1, pkt_start: 0.0, - segments: Vec::from([SegmentInfo { - index: 1, - duration: segment_length, - kind: segment_type, - }]), + segments: Vec::new(), // Start with empty segments list out_dir: out_dir.to_string(), segment_type, }) @@ -220,8 +216,9 @@ impl HlsVariant { let pkt_q = av_q2d((*pkt).time_base); // time of this packet in seconds let pkt_time = (*pkt).pts as f32 * pkt_q as f32; - // what segment this pkt should be in (index) - let pkt_seg = 1 + (pkt_time / self.segment_length).floor() as u64; + // what segment this pkt should be in (index) - use relative time from start + let relative_time = pkt_time - self.pkt_start; + let pkt_seg = self.idx + (relative_time / self.segment_length).floor() as u64; let mut result = EgressResult::None; let pkt_stream = *(*self.mux.context()) @@ -361,11 +358,18 @@ impl HlsVariant { } fn write_playlist(&mut self) -> Result<()> { + if self.segments.is_empty() { + return Ok(()); // Don't write empty playlists + } + let mut pl = m3u8_rs::MediaPlaylist::default(); - pl.target_duration = self.segment_length as u64; + // Round up target duration to ensure compliance + pl.target_duration = (self.segment_length.ceil() as u64).max(1); pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect(); pl.version = Some(3); pl.media_sequence = self.segments.first().map(|s| s.index).unwrap_or(0); + // For live streams, don't set end list + pl.end_list = false; let mut f_out = File::create(self.out_dir().join("live.m3u8"))?; pl.write_to(&mut f_out)?; diff --git a/crates/core/src/pipeline/runner.rs b/crates/core/src/pipeline/runner.rs index afb2059..a5ce9fc 100644 --- a/crates/core/src/pipeline/runner.rs +++ b/crates/core/src/pipeline/runner.rs @@ -77,6 +77,9 @@ pub struct PipelineRunner { /// Total number of frames produced frame_ctr: u64, out_dir: String, + + /// Thumbnail generation interval (0 = disabled) + thumb_interval: u64, } impl PipelineRunner { @@ -104,6 +107,7 @@ impl PipelineRunner { frame_ctr: 0, fps_last_frame_ctr: 0, info: None, + thumb_interval: 1800, // Disable thumbnails by default for performance }) } @@ -165,7 +169,9 @@ impl PipelineRunner { let p = (*stream).codecpar; if (*p).codec_type == AVMediaType::AVMEDIA_TYPE_VIDEO { - if (self.frame_ctr % 1800) == 0 { + // Conditionally generate thumbnails based on interval (0 = disabled) + if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 { + let thumb_start = Instant::now(); let dst_pic = PathBuf::from(&self.out_dir) .join(config.id.to_string()) .join("thumb.webp"); @@ -182,11 +188,15 @@ impl PipelineRunner { .with_pix_fmt(transmute((*frame).format)) .open(None)? .save_picture(frame, dst_pic.to_str().unwrap())?; - info!("Saved thumb to: {}", dst_pic.display()); + let thumb_duration = thumb_start.elapsed(); + info!( + "Saved thumb ({:.2}ms) to: {}", + thumb_duration.as_millis() as f32 / 1000.0, + dst_pic.display(), + ); av_frame_free(&mut frame); } - // TODO: fix this, multiple video streams in self.frame_ctr += 1; } @@ -224,7 +234,7 @@ impl PipelineRunner { { // Set correct timebase for audio (1/sample_rate) (*ret).time_base.num = 1; - (*ret).time_base.den = a.sample_rate as i32; + (*ret).time_base.den = a.sample_rate as i32; av_frame_free(&mut resampled_frame); ret } else { @@ -271,21 +281,23 @@ impl PipelineRunner { av_packet_free(&mut pkt); - // egress results - self.handle.block_on(async { - for er in egress_results { - if let EgressResult::Segments { created, deleted } = er { - if let Err(e) = self - .overseer - .on_segments(&config.id, &created, &deleted) - .await - { - bail!("Failed to process segment {}", e.to_string()); + // egress results - process async operations without blocking if possible + if !egress_results.is_empty() { + self.handle.block_on(async { + for er in egress_results { + if let EgressResult::Segments { created, deleted } = er { + if let Err(e) = self + .overseer + .on_segments(&config.id, &created, &deleted) + .await + { + bail!("Failed to process segment {}", e.to_string()); + } } } - } - Ok(()) - })?; + Ok(()) + })?; + } let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32(); if elapsed >= 2f32 { let n_frames = self.frame_ctr - self.fps_last_frame_ctr; diff --git a/crates/zap-stream/src/api.rs b/crates/zap-stream/src/api.rs index 54ffe05..8bb9a08 100644 --- a/crates/zap-stream/src/api.rs +++ b/crates/zap-stream/src/api.rs @@ -1,4 +1,5 @@ use crate::http::check_nip98_auth; +use crate::overseer::ZapStreamOverseer; use crate::settings::Settings; use crate::ListenerEndpoint; use anyhow::{anyhow, bail, Result}; @@ -8,14 +9,16 @@ use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Full}; use hyper::body::Incoming; use hyper::{Method, Request, Response}; +use log::warn; use matchit::Router; -use nostr_sdk::{serde_json, PublicKey}; +use nostr_sdk::{serde_json, JsonUtil, PublicKey}; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; use std::str::FromStr; +use std::sync::Arc; use url::Url; use uuid::Uuid; -use zap_stream_db::ZapStreamDb; +use zap_stream_db::{UserStream, ZapStreamDb}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum Route { @@ -35,10 +38,11 @@ pub struct Api { settings: Settings, lnd: fedimint_tonic_lnd::Client, router: Router, + overseer: Arc, } impl Api { - pub fn new(db: ZapStreamDb, settings: Settings, lnd: fedimint_tonic_lnd::Client) -> Self { + pub fn new(overseer: Arc, settings: Settings) -> Self { let mut router = Router::new(); // Define routes (path only, method will be matched separately) @@ -54,10 +58,11 @@ impl Api { router.insert("/api/v1/keys", Route::Keys).unwrap(); Self { - db, + db: overseer.database(), settings, - lnd, + lnd: overseer.lnd_client(), router, + overseer, } } @@ -410,7 +415,16 @@ impl Api { self.db.update_stream(&stream).await?; - // TODO: Update the nostr event and republish like C# version + // Update the nostr event and republish like C# version + if let Err(e) = self + .republish_stream_event(&stream, pubkey.to_bytes()) + .await + { + warn!( + "Failed to republish nostr event for stream {}: {}", + stream.id, e + ); + } } else { // Update user default stream info self.db @@ -619,6 +633,21 @@ impl Api { event: None, // TODO: Build proper nostr event like C# version }) } + + /// Republish stream event to nostr relays using the same code as overseer + async fn republish_stream_event(&self, stream: &UserStream, pubkey: [u8; 32]) -> Result<()> { + let event = self + .overseer + .publish_stream_event(stream, &pubkey.to_vec()) + .await?; + + // Update the stream with the new event JSON + let mut updated_stream = stream.clone(); + updated_stream.event = Some(event.as_json()); + self.db.update_stream(&updated_stream).await?; + + Ok(()) + } } #[derive(Deserialize, Serialize)] diff --git a/crates/zap-stream/src/main.rs b/crates/zap-stream/src/main.rs index 1670669..2c3921b 100644 --- a/crates/zap-stream/src/main.rs +++ b/crates/zap-stream/src/main.rs @@ -71,7 +71,7 @@ async fn main() -> Result<()> { let http_addr: SocketAddr = settings.listen_http.parse()?; let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url); - let api = Api::new(overseer.database(), settings.clone(), overseer.lnd_client()); + let api = Api::new(overseer.clone(), settings.clone()); // HTTP server let server = HttpServer::new(index_html, PathBuf::from(settings.output_dir), api); tasks.push(tokio::spawn(async move { diff --git a/crates/zap-stream/src/overseer.rs b/crates/zap-stream/src/overseer.rs index 1e07360..51e472a 100644 --- a/crates/zap-stream/src/overseer.rs +++ b/crates/zap-stream/src/overseer.rs @@ -191,7 +191,7 @@ impl ZapStreamOverseer { Ok(EventBuilder::new(Kind::FileMetadata, "").tags(tags)) } - async fn publish_stream_event(&self, stream: &UserStream, pubkey: &Vec) -> Result { + pub async fn publish_stream_event(&self, stream: &UserStream, pubkey: &Vec) -> Result { let extra_tags = vec![ Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?, Tag::parse([