diff --git a/crates/core/src/mux/hls.rs b/crates/core/src/mux/hls.rs index c8feb10..6e74667 100644 --- a/crates/core/src/mux/hls.rs +++ b/crates/core/src/mux/hls.rs @@ -173,7 +173,7 @@ impl HlsVariant { id: s.id(), }) }, - _ => panic!("unsupported variant stream"), + _ => bail!("unsupported variant stream"), } } unsafe { diff --git a/crates/zap-stream/src/overseer.rs b/crates/zap-stream/src/overseer.rs index 5c7424a..e1fa7b0 100644 --- a/crates/zap-stream/src/overseer.rs +++ b/crates/zap-stream/src/overseer.rs @@ -8,7 +8,7 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use log::{error, info, warn}; use nostr_sdk::prelude::Coordinate; use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32}; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; @@ -34,6 +34,12 @@ struct StreamViewerState { last_update_time: DateTime, } +#[derive(Clone)] +struct ActiveStreamInfo { + started_at: DateTime, + last_segment_time: Option>, +} + /// zap.stream NIP-53 overseer #[derive(Clone)] pub struct ZapStreamOverseer { @@ -51,9 +57,9 @@ pub struct ZapStreamOverseer { blossom_servers: Vec, /// Public facing URL pointing to [out_dir] public_url: String, - /// Currently active streams - /// Any streams which are not contained in this set are dead - active_streams: Arc>>, + /// Currently active streams with timing info + /// Any streams which are not contained in this map are dead + active_streams: Arc>>, /// Viewer tracking for active streams viewer_tracker: ViewerTracker, /// Track last published viewer count and update time for each stream @@ -120,12 +126,11 @@ impl ZapStreamOverseer { .map(|b| Blossom::new(b)) .collect(), public_url: public_url.clone(), - active_streams: Arc::new(RwLock::new(HashSet::new())), + active_streams: Arc::new(RwLock::new(HashMap::new())), viewer_tracker: ViewerTracker::new(), stream_viewer_states: Arc::new(RwLock::new(HashMap::new())), }; - Ok(overseer) } @@ -141,7 +146,6 @@ impl ZapStreamOverseer { &self.viewer_tracker } - fn stream_to_event_builder(&self, stream: &UserStream) -> Result { let mut tags = vec![ Tag::parse(&["d".to_string(), stream.id.to_string()])?, @@ -222,7 +226,11 @@ impl ZapStreamOverseer { Ok(EventBuilder::new(Kind::FileMetadata, "").tags(tags)) } - pub async fn publish_stream_event(&self, stream: &UserStream, pubkey: &Vec) -> Result { + pub async fn publish_stream_event( + &self, + stream: &UserStream, + pubkey: &Vec, + ) -> Result { let extra_tags = vec![ Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?, Tag::parse([ @@ -258,15 +266,34 @@ impl ZapStreamOverseer { impl Overseer for ZapStreamOverseer { async fn check_streams(&self) -> Result<()> { let active_streams = self.db.list_live_streams().await?; + let now = Utc::now(); + for stream in active_streams { // check if stream is alive let id = Uuid::parse_str(&stream.id)?; info!("Checking stream is alive: {}", stream.id); - let is_active = { + + let (is_active, should_timeout) = { let streams = self.active_streams.read().await; - streams.contains(&id) + if let Some(stream_info) = streams.get(&id) { + // Stream is in active map, but check if it's been inactive too long + let timeout = if let Some(last_segment) = stream_info.last_segment_time { + // No segments for 60 seconds = timeout + (now - last_segment).num_seconds() > 60 + } else { + // No segments yet, but allow 30 seconds for stream to start producing + (now - stream_info.started_at).num_seconds() > 30 + }; + (true, timeout) + } else { + (false, false) + } }; - if !is_active { + + if !is_active || should_timeout { + if should_timeout { + warn!("Stream {} timed out - no recent segments", stream.id); + } if let Err(e) = self.on_end(&id).await { error!("Failed to end dead stream {}: {}", &id, e); } @@ -274,28 +301,31 @@ impl Overseer for ZapStreamOverseer { // Stream is active, check if we should update viewer count in nostr event let viewer_count = self.viewer_tracker.get_viewer_count(&stream.id); let now = Utc::now(); - + let should_update = { let viewer_states = self.stream_viewer_states.read().await; if let Some(state) = viewer_states.get(&stream.id) { // Update if count changed OR if 10 minutes have passed since last update - viewer_count != state.last_published_count || - (now - state.last_update_time).num_minutes() >= 10 + viewer_count != state.last_published_count + || (now - state.last_update_time).num_minutes() >= 10 } else { // First time tracking this stream, always update viewer_count > 0 } }; - + if should_update && viewer_count > 0 { if let Ok(user) = self.db.get_user(stream.user_id).await { if let Ok(_) = self.publish_stream_event(&stream, &user.pubkey).await { // Update the tracking state let mut viewer_states = self.stream_viewer_states.write().await; - viewer_states.insert(stream.id.clone(), StreamViewerState { - last_published_count: viewer_count, - last_update_time: now, - }); + viewer_states.insert( + stream.id.clone(), + StreamViewerState { + last_published_count: viewer_count, + last_update_time: now, + }, + ); } else { warn!("Failed to update viewer count for stream {}", stream.id); } @@ -355,8 +385,15 @@ impl Overseer for ZapStreamOverseer { let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?; new_stream.event = Some(stream_event.as_json()); + let now = Utc::now(); let mut streams = self.active_streams.write().await; - streams.insert(stream_id.clone()); + streams.insert( + stream_id.clone(), + ActiveStreamInfo { + started_at: now, + last_segment_time: None, + }, + ); self.db.insert_stream(&new_stream).await?; self.db.update_stream(&new_stream).await?; @@ -400,6 +437,15 @@ impl Overseer for ZapStreamOverseer { bail!("Not enough balance"); } + // Update last segment time for this stream + let now = Utc::now(); + { + let mut streams = self.active_streams.write().await; + if let Some(info) = streams.get_mut(pipeline_id) { + info.last_segment_time = Some(now); + } + } + // Upload to blossom servers if configured (N94) let mut blobs = vec![]; for seg in added {