fix: cleanup dead streams

commit d60dfd790e (parent 29919c63f5)
Date: 2025-06-06 18:41:03 +01:00
2 changed files with 67 additions and 21 deletions

File 1 of 2

@ -173,7 +173,7 @@ impl HlsVariant {
id: s.id(),
})
},
- _ => panic!("unsupported variant stream"),
+ _ => bail!("unsupported variant stream"),
}
}
unsafe {

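Review note: bail! here presumably comes from anyhow, matching the Result-returning signatures elsewhere in the diff; unlike panic!, it returns an Err to the caller instead of aborting the pipeline thread, so one unsupported variant stream no longer takes the whole ingest down. A minimal sketch of the difference, assuming anyhow (the function and match arms are illustrative, not the real HlsVariant code):

    use anyhow::{bail, Result};

    fn variant_kind(kind: &str) -> Result<&'static str> {
        match kind {
            "video" => Ok("video"),
            "audio" => Ok("audio"),
            // panic!() would unwind and kill the thread here;
            // bail!() returns an Err the caller can log and recover from.
            _ => bail!("unsupported variant stream"),
        }
    }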
File 2 of 2

@ -8,7 +8,7 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use log::{error, info, warn};
use nostr_sdk::prelude::Coordinate;
use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32};
- use std::collections::{HashMap, HashSet};
+ use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
@ -34,6 +34,12 @@ struct StreamViewerState {
last_update_time: DateTime<Utc>,
}
+ #[derive(Clone)]
+ struct ActiveStreamInfo {
+     started_at: DateTime<Utc>,
+     last_segment_time: Option<DateTime<Utc>>,
+ }
/// zap.stream NIP-53 overseer
#[derive(Clone)]
pub struct ZapStreamOverseer {
@ -51,9 +57,9 @@ pub struct ZapStreamOverseer {
blossom_servers: Vec<Blossom>,
/// Public facing URL pointing to [out_dir]
public_url: String,
- /// Currently active streams
- /// Any streams which are not contained in this set are dead
- active_streams: Arc<RwLock<HashSet<Uuid>>>,
+ /// Currently active streams with timing info
+ /// Any streams which are not contained in this map are dead
+ active_streams: Arc<RwLock<HashMap<Uuid, ActiveStreamInfo>>>,
/// Viewer tracking for active streams
viewer_tracker: ViewerTracker,
/// Track last published viewer count and update time for each stream
@ -120,12 +126,11 @@ impl ZapStreamOverseer {
.map(|b| Blossom::new(b))
.collect(),
public_url: public_url.clone(),
- active_streams: Arc::new(RwLock::new(HashSet::new())),
+ active_streams: Arc::new(RwLock::new(HashMap::new())),
viewer_tracker: ViewerTracker::new(),
stream_viewer_states: Arc::new(RwLock::new(HashMap::new())),
};
Ok(overseer)
}
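A note on the HashSet -> HashMap swap above: a set can only answer whether a stream is registered, while the map also records when each stream last proved it was alive, which is exactly what the timeout check further down needs. The type change in miniature (alias name hypothetical; RwLock assumed to be tokio's, matching the .await calls in the diff):

    use std::{collections::HashMap, sync::Arc};
    use chrono::{DateTime, Utc};
    use tokio::sync::RwLock;
    use uuid::Uuid;

    struct ActiveStreamInfo {
        started_at: DateTime<Utc>,
        last_segment_time: Option<DateTime<Utc>>,
    }

    // Before: Arc<RwLock<HashSet<Uuid>>> (membership only).
    // After: membership plus per-stream liveness timestamps.
    type ActiveStreams = Arc<RwLock<HashMap<Uuid, ActiveStreamInfo>>>;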
@ -141,7 +146,6 @@ impl ZapStreamOverseer {
&self.viewer_tracker
}
fn stream_to_event_builder(&self, stream: &UserStream) -> Result<EventBuilder> {
let mut tags = vec![
Tag::parse(&["d".to_string(), stream.id.to_string()])?,
@ -222,7 +226,11 @@ impl ZapStreamOverseer {
Ok(EventBuilder::new(Kind::FileMetadata, "").tags(tags))
}
- pub async fn publish_stream_event(&self, stream: &UserStream, pubkey: &Vec<u8>) -> Result<Event> {
+ pub async fn publish_stream_event(
+     &self,
+     stream: &UserStream,
+     pubkey: &Vec<u8>,
+ ) -> Result<Event> {
let extra_tags = vec![
Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?,
Tag::parse([
@ -258,15 +266,34 @@ impl ZapStreamOverseer {
impl Overseer for ZapStreamOverseer {
async fn check_streams(&self) -> Result<()> {
let active_streams = self.db.list_live_streams().await?;
+ let now = Utc::now();
for stream in active_streams {
// check if stream is alive
let id = Uuid::parse_str(&stream.id)?;
info!("Checking stream is alive: {}", stream.id);
- let is_active = {
+ let (is_active, should_timeout) = {
let streams = self.active_streams.read().await;
- streams.contains(&id)
+ if let Some(stream_info) = streams.get(&id) {
+     // Stream is in active map, but check if it's been inactive too long
+     let timeout = if let Some(last_segment) = stream_info.last_segment_time {
+         // No segments for 60 seconds = timeout
+         (now - last_segment).num_seconds() > 60
+     } else {
+         // No segments yet, but allow 30 seconds for stream to start producing
+         (now - stream_info.started_at).num_seconds() > 30
+     };
+     (true, timeout)
+ } else {
+     (false, false)
+ }
};
- if !is_active {
+ if !is_active || should_timeout {
+     if should_timeout {
+         warn!("Stream {} timed out - no recent segments", stream.id);
+     }
if let Err(e) = self.on_end(&id).await {
error!("Failed to end dead stream {}: {}", &id, e);
}
@ -279,8 +306,8 @@ impl Overseer for ZapStreamOverseer {
let viewer_states = self.stream_viewer_states.read().await;
if let Some(state) = viewer_states.get(&stream.id) {
// Update if count changed OR if 10 minutes have passed since last update
- viewer_count != state.last_published_count ||
-     (now - state.last_update_time).num_minutes() >= 10
+ viewer_count != state.last_published_count
+     || (now - state.last_update_time).num_minutes() >= 10
} else {
// First time tracking this stream, always update
viewer_count > 0
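The publish-throttling branch follows the same shape; sketched as a predicate (should_publish is a hypothetical name; the viewer-count type is assumed to be u64):

    use chrono::{DateTime, Utc};

    struct StreamViewerState {
        last_published_count: u64,
        last_update_time: DateTime<Utc>,
    }

    // Publish when the count changed, or every 10 minutes as a refresh;
    // an untracked stream only publishes once it actually has viewers.
    fn should_publish(
        state: Option<&StreamViewerState>,
        viewer_count: u64,
        now: DateTime<Utc>,
    ) -> bool {
        match state {
            Some(s) => viewer_count != s.last_published_count
                || (now - s.last_update_time).num_minutes() >= 10,
            None => viewer_count > 0,
        }
    }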
@ -292,10 +319,13 @@ impl Overseer for ZapStreamOverseer {
if let Ok(_) = self.publish_stream_event(&stream, &user.pubkey).await {
// Update the tracking state
let mut viewer_states = self.stream_viewer_states.write().await;
- viewer_states.insert(stream.id.clone(), StreamViewerState {
-     last_published_count: viewer_count,
-     last_update_time: now,
- });
+ viewer_states.insert(
+     stream.id.clone(),
+     StreamViewerState {
+         last_published_count: viewer_count,
+         last_update_time: now,
+     },
+ );
} else {
warn!("Failed to update viewer count for stream {}", stream.id);
}
@ -355,8 +385,15 @@ impl Overseer for ZapStreamOverseer {
let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?;
new_stream.event = Some(stream_event.as_json());
+ let now = Utc::now();
let mut streams = self.active_streams.write().await;
- streams.insert(stream_id.clone());
+ streams.insert(
+     stream_id.clone(),
+     ActiveStreamInfo {
+         started_at: now,
+         last_segment_time: None,
+     },
+ );
self.db.insert_stream(&new_stream).await?;
self.db.update_stream(&new_stream).await?;
@ -400,6 +437,15 @@ impl Overseer for ZapStreamOverseer {
bail!("Not enough balance");
}
+ // Update last segment time for this stream
+ let now = Utc::now();
+ {
+     let mut streams = self.active_streams.write().await;
+     if let Some(info) = streams.get_mut(pipeline_id) {
+         info.last_segment_time = Some(now);
+     }
+ }
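With this hunk the lifecycle closes: on_connect inserts the entry with started_at, every new segment refreshes last_segment_time, and check_streams reaps anything stale. The segment-side update could be factored out roughly as follows (helper name hypothetical; RwLock assumed tokio, reusing ActiveStreamInfo from the sketches above):

    use std::collections::HashMap;
    use tokio::sync::RwLock;
    use uuid::Uuid;

    // Heartbeat: mark the stream alive whenever a segment lands, so
    // check_streams() keeps treating it as live.
    async fn touch_stream(streams: &RwLock<HashMap<Uuid, ActiveStreamInfo>>, id: &Uuid) {
        if let Some(info) = streams.write().await.get_mut(id) {
            info.last_segment_time = Some(chrono::Utc::now());
        }
    }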
// Upload to blossom servers if configured (N94)
let mut blobs = vec![];
for seg in added {