diff --git a/Cargo.lock b/Cargo.lock index 59de8da..c32cfdc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4793,6 +4793,7 @@ dependencies = [ "hyper 1.6.0", "hyper-util", "log", + "m3u8-rs", "matchit 0.8.6", "nostr-sdk", "pretty_env_logger", diff --git a/Cargo.toml b/Cargo.toml index b0661ae..056c3e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,4 +23,5 @@ serde = { version = "1.0.197", features = ["derive"] } url = "2.5.0" itertools = "0.14.0" chrono = { version = "^0.4.38", features = ["serde"] } -hex = "0.4.3" \ No newline at end of file +hex = "0.4.3" +m3u8-rs = "6.0.0" \ No newline at end of file diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 632033f..dce4db0 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -28,7 +28,7 @@ serde.workspace = true hex.workspace = true itertools.workspace = true futures-util = "0.3.30" -m3u8-rs = "6.0.0" +m3u8-rs.workspace = true # srt srt-tokio = { version = "0.4.3", optional = true } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 6ba68bf..38d95c7 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -4,3 +4,4 @@ pub mod mux; pub mod overseer; pub mod pipeline; pub mod variant; +pub mod viewer; diff --git a/crates/core/src/viewer.rs b/crates/core/src/viewer.rs new file mode 100644 index 0000000..52d28aa --- /dev/null +++ b/crates/core/src/viewer.rs @@ -0,0 +1,114 @@ +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; +use uuid::Uuid; +use tokio::task; +use log::debug; + +#[derive(Debug, Clone)] +pub struct ViewerInfo { + pub stream_id: String, + pub ip_address: String, + pub user_agent: Option, + pub last_seen: Instant, +} + +#[derive(Debug, Clone)] +pub struct ViewerTracker { + viewers: Arc>>, + timeout_duration: Duration, +} + +impl ViewerTracker { + pub fn new() -> Self { + let tracker = Self { + viewers: Arc::new(RwLock::new(HashMap::new())), + timeout_duration: Duration::from_secs(600), // 10 minutes + }; + + // Start cleanup task + let cleanup_tracker = tracker.clone(); + task::spawn(async move { + cleanup_tracker.cleanup_task().await; + }); + + tracker + } + + pub fn generate_viewer_token() -> String { + Uuid::new_v4().to_string() + } + + pub fn track_viewer(&self, token: &str, stream_id: &str, ip_address: &str, user_agent: Option) { + let mut viewers = self.viewers.write().unwrap(); + + let viewer_info = ViewerInfo { + stream_id: stream_id.to_string(), + ip_address: ip_address.to_string(), + user_agent, + last_seen: Instant::now(), + }; + + if let Some(existing) = viewers.get(token) { + debug!("Updating viewer {} for stream {}", token, stream_id); + } else { + debug!("New viewer {} for stream {}", token, stream_id); + } + + viewers.insert(token.to_string(), viewer_info); + } + + pub fn get_viewer_count(&self, stream_id: &str) -> usize { + let viewers = self.viewers.read().unwrap(); + viewers.values() + .filter(|v| v.stream_id == stream_id) + .count() + } + + pub fn get_active_viewers(&self, stream_id: &str) -> Vec { + let viewers = self.viewers.read().unwrap(); + viewers.iter() + .filter(|(_, v)| v.stream_id == stream_id) + .map(|(token, _)| token.clone()) + .collect() + } + + pub fn remove_viewer(&self, token: &str) { + let mut viewers = self.viewers.write().unwrap(); + if let Some(viewer) = viewers.remove(token) { + debug!("Removed viewer {} from stream {}", token, viewer.stream_id); + } + } + + async fn cleanup_task(&self) { + let mut interval = tokio::time::interval(Duration::from_secs(60)); // Check every minute + + loop { + interval.tick().await; + self.cleanup_expired_viewers(); + } + } + + fn cleanup_expired_viewers(&self) { + let mut viewers = self.viewers.write().unwrap(); + let now = Instant::now(); + + let expired_tokens: Vec = viewers.iter() + .filter(|(_, viewer)| now.duration_since(viewer.last_seen) > self.timeout_duration) + .map(|(token, _)| token.clone()) + .collect(); + + for token in expired_tokens { + if let Some(viewer) = viewers.remove(&token) { + debug!("Expired viewer {} from stream {} (last seen {:?} ago)", + token, viewer.stream_id, now.duration_since(viewer.last_seen)); + } + } + } +} + +impl Default for ViewerTracker { + fn default() -> Self { + Self::new() + } +} \ No newline at end of file diff --git a/crates/zap-stream-db/src/model.rs b/crates/zap-stream-db/src/model.rs index e6cfaee..e0ab147 100644 --- a/crates/zap-stream-db/src/model.rs +++ b/crates/zap-stream-db/src/model.rs @@ -36,7 +36,7 @@ pub struct User { pub goal: Option, } -#[derive(Default, Debug, Clone, Type)] +#[derive(Default, Debug, Clone, PartialEq, Type)] #[repr(u8)] pub enum UserStreamState { #[default] diff --git a/crates/zap-stream/Cargo.toml b/crates/zap-stream/Cargo.toml index 0dde37f..32b241b 100644 --- a/crates/zap-stream/Cargo.toml +++ b/crates/zap-stream/Cargo.toml @@ -23,8 +23,9 @@ serde.workspace = true chrono.workspace = true hex.workspace = true url.workspace = true +m3u8-rs.workspace = true -# http setuff +# http stuff hyper = { version = "1.5.1", features = ["server"] } bytes = "1.8.0" http-body-util = "0.1.2" diff --git a/crates/zap-stream/src/api.rs b/crates/zap-stream/src/api.rs index 8bb9a08..2039295 100644 --- a/crates/zap-stream/src/api.rs +++ b/crates/zap-stream/src/api.rs @@ -648,6 +648,16 @@ impl Api { Ok(()) } + + /// Track a viewer for viewer count analytics + pub fn track_viewer(&self, token: &str, stream_id: &str, ip_address: &str, user_agent: Option) { + self.overseer.viewer_tracker().track_viewer(token, stream_id, ip_address, user_agent); + } + + /// Get current viewer count for a stream + pub fn get_viewer_count(&self, stream_id: &str) -> usize { + self.overseer.viewer_tracker().get_viewer_count(stream_id) + } } #[derive(Deserialize, Serialize)] diff --git a/crates/zap-stream/src/blossom.rs b/crates/zap-stream/src/blossom.rs index 26c629b..f962f50 100644 --- a/crates/zap-stream/src/blossom.rs +++ b/crates/zap-stream/src/blossom.rs @@ -11,6 +11,7 @@ use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncSeekExt}; use url::Url; +#[derive(Clone)] pub struct Blossom { url: Url, client: reqwest::Client, diff --git a/crates/zap-stream/src/http.rs b/crates/zap-stream/src/http.rs index 851665e..9de9383 100644 --- a/crates/zap-stream/src/http.rs +++ b/crates/zap-stream/src/http.rs @@ -16,6 +16,7 @@ use std::path::PathBuf; use std::pin::Pin; use tokio::fs::File; use tokio_util::io::ReaderStream; +use zap_stream_core::viewer::ViewerTracker; #[derive(Clone)] pub struct HttpServer { @@ -32,6 +33,123 @@ impl HttpServer { api, } } + + async fn handle_hls_playlist( + api: &Api, + req: &Request, + playlist_path: &PathBuf, + ) -> Result>, anyhow::Error> { + // Extract stream ID from path (e.g., /uuid/live.m3u8 -> uuid) + let path_parts: Vec<&str> = req.uri().path().trim_start_matches('/').split('/').collect(); + if path_parts.len() < 2 { + return Ok(Response::builder().status(404).body(BoxBody::default())?); + } + + let stream_id = path_parts[0]; + + // Get client IP and User-Agent for tracking + let client_ip = Self::get_client_ip(req); + let user_agent = req + .headers() + .get("user-agent") + .and_then(|h| h.to_str().ok()) + .map(|s| s.to_string()); + + // Check for existing viewer token in query params + let query_params: std::collections::HashMap = req + .uri() + .query() + .map(|q| { + url::form_urlencoded::parse(q.as_bytes()) + .into_owned() + .collect() + }) + .unwrap_or_default(); + + let viewer_token = if let Some(token) = query_params.get("vt") { + // Track existing viewer + api.track_viewer(token, stream_id, &client_ip, user_agent.clone()); + token.clone() + } else { + // Generate new viewer token + let token = ViewerTracker::generate_viewer_token(); + api.track_viewer(&token, stream_id, &client_ip, user_agent); + token + }; + + // Read the playlist file + let playlist_content = tokio::fs::read(playlist_path).await?; + + // Parse and modify playlist to add viewer token to URLs + let modified_content = Self::add_viewer_token_to_playlist(&playlist_content, &viewer_token)?; + + Ok(Response::builder() + .header("content-type", "application/vnd.apple.mpegurl") + .header("server", "zap-stream-core") + .header("access-control-allow-origin", "*") + .header("access-control-allow-headers", "*") + .header("access-control-allow-methods", "HEAD, GET") + .body( + Full::new(Bytes::from(modified_content)) + .map_err(|e| match e {}) + .boxed(), + )?) + } + + fn get_client_ip(req: &Request) -> String { + // Check common headers for real client IP + if let Some(forwarded) = req.headers().get("x-forwarded-for") { + if let Ok(forwarded_str) = forwarded.to_str() { + if let Some(first_ip) = forwarded_str.split(',').next() { + return first_ip.trim().to_string(); + } + } + } + + if let Some(real_ip) = req.headers().get("x-real-ip") { + if let Ok(ip_str) = real_ip.to_str() { + return ip_str.to_string(); + } + } + + // Fallback to connection IP (note: in real deployment this might be a proxy) + "unknown".to_string() + } + + fn add_viewer_token_to_playlist(content: &[u8], viewer_token: &str) -> Result { + // Parse the M3U8 playlist using the m3u8-rs crate + let (_, playlist) = m3u8_rs::parse_playlist(content) + .map_err(|e| anyhow::anyhow!("Failed to parse M3U8 playlist: {}", e))?; + + match playlist { + m3u8_rs::Playlist::MasterPlaylist(mut master) => { + // For master playlists, add viewer token to variant streams + for variant in &mut master.variants { + variant.uri = Self::add_token_to_url(&variant.uri, viewer_token); + } + + // Write the modified playlist back to string + let mut output = Vec::new(); + master.write_to(&mut output) + .map_err(|e| anyhow::anyhow!("Failed to write master playlist: {}", e))?; + String::from_utf8(output) + .map_err(|e| anyhow::anyhow!("Failed to convert playlist to string: {}", e)) + } + m3u8_rs::Playlist::MediaPlaylist(_) => { + // For media playlists, return original content unchanged + String::from_utf8(content.to_vec()) + .map_err(|e| anyhow::anyhow!("Failed to convert playlist to string: {}", e)) + } + } + } + + fn add_token_to_url(url: &str, viewer_token: &str) -> String { + if url.contains('?') { + format!("{}&vt={}", url, viewer_token) + } else { + format!("{}?vt={}", url, viewer_token) + } + } } impl Service> for HttpServer { @@ -60,6 +178,7 @@ impl Service> for HttpServer { // check if mapped to file let dst_path = self.files_dir.join(req.uri().path()[1..].to_string()); if dst_path.exists() { + let api_clone = self.api.clone(); return Box::pin(async move { let rsp = Response::builder() .header("server", "zap-stream-core") @@ -70,6 +189,13 @@ impl Service> for HttpServer { if req.method() == Method::HEAD { return Ok(rsp.body(BoxBody::default())?); } + + // Handle HLS playlists with viewer tracking + if req.uri().path().ends_with("/live.m3u8") { + return Self::handle_hls_playlist(&api_clone, &req, &dst_path).await; + } + + // Handle regular files let f = File::open(&dst_path).await?; let f_stream = ReaderStream::new(f); let body = StreamBody::new( diff --git a/crates/zap-stream/src/overseer.rs b/crates/zap-stream/src/overseer.rs index 51e472a..5c7424a 100644 --- a/crates/zap-stream/src/overseer.rs +++ b/crates/zap-stream/src/overseer.rs @@ -2,13 +2,13 @@ use crate::blossom::{BlobDescriptor, Blossom}; use crate::settings::LndSettings; use anyhow::{bail, Result}; use async_trait::async_trait; -use chrono::Utc; +use chrono::{DateTime, Utc}; use fedimint_tonic_lnd::verrpc::VersionRequest; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use log::{error, info, warn}; use nostr_sdk::prelude::Coordinate; use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; @@ -23,11 +23,19 @@ use zap_stream_core::variant::audio::AudioVariant; use zap_stream_core::variant::mapping::VariantMapping; use zap_stream_core::variant::video::VideoVariant; use zap_stream_core::variant::{StreamMapping, VariantStream}; +use zap_stream_core::viewer::ViewerTracker; use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb}; const STREAM_EVENT_KIND: u16 = 30_311; +#[derive(Clone)] +struct StreamViewerState { + last_published_count: usize, + last_update_time: DateTime, +} + /// zap.stream NIP-53 overseer +#[derive(Clone)] pub struct ZapStreamOverseer { /// Dir where HTTP server serves files from out_dir: String, @@ -46,6 +54,10 @@ pub struct ZapStreamOverseer { /// Currently active streams /// Any streams which are not contained in this set are dead active_streams: Arc>>, + /// Viewer tracking for active streams + viewer_tracker: ViewerTracker, + /// Track last published viewer count and update time for each stream + stream_viewer_states: Arc>>, } impl ZapStreamOverseer { @@ -95,7 +107,7 @@ impl ZapStreamOverseer { } client.connect().await; - Ok(Self { + let overseer = Self { out_dir: out_dir.clone(), db, lnd, @@ -109,7 +121,12 @@ impl ZapStreamOverseer { .collect(), public_url: public_url.clone(), active_streams: Arc::new(RwLock::new(HashSet::new())), - }) + viewer_tracker: ViewerTracker::new(), + stream_viewer_states: Arc::new(RwLock::new(HashMap::new())), + }; + + + Ok(overseer) } pub fn database(&self) -> ZapStreamDb { @@ -120,6 +137,11 @@ impl ZapStreamOverseer { self.lnd.clone() } + pub fn viewer_tracker(&self) -> &ViewerTracker { + &self.viewer_tracker + } + + fn stream_to_event_builder(&self, stream: &UserStream) -> Result { let mut tags = vec![ Tag::parse(&["d".to_string(), stream.id.to_string()])?, @@ -162,6 +184,15 @@ impl ZapStreamOverseer { } } + // Add current viewer count for live streams + if stream.state == UserStreamState::Live { + let viewer_count = self.viewer_tracker.get_viewer_count(&stream.id); + tags.push(Tag::parse(&[ + "current_participants".to_string(), + viewer_count.to_string(), + ])?); + } + let kind = Kind::from(STREAM_EVENT_KIND); let coord = Coordinate::new(kind, self.keys.public_key).identifier(&stream.id); tags.push(Tag::parse([ @@ -228,7 +259,7 @@ impl Overseer for ZapStreamOverseer { async fn check_streams(&self) -> Result<()> { let active_streams = self.db.list_live_streams().await?; for stream in active_streams { - // check + // check if stream is alive let id = Uuid::parse_str(&stream.id)?; info!("Checking stream is alive: {}", stream.id); let is_active = { @@ -239,6 +270,37 @@ impl Overseer for ZapStreamOverseer { if let Err(e) = self.on_end(&id).await { error!("Failed to end dead stream {}: {}", &id, e); } + } else { + // Stream is active, check if we should update viewer count in nostr event + let viewer_count = self.viewer_tracker.get_viewer_count(&stream.id); + let now = Utc::now(); + + let should_update = { + let viewer_states = self.stream_viewer_states.read().await; + if let Some(state) = viewer_states.get(&stream.id) { + // Update if count changed OR if 10 minutes have passed since last update + viewer_count != state.last_published_count || + (now - state.last_update_time).num_minutes() >= 10 + } else { + // First time tracking this stream, always update + viewer_count > 0 + } + }; + + if should_update && viewer_count > 0 { + if let Ok(user) = self.db.get_user(stream.user_id).await { + if let Ok(_) = self.publish_stream_event(&stream, &user.pubkey).await { + // Update the tracking state + let mut viewer_states = self.stream_viewer_states.write().await; + viewer_states.insert(stream.id.clone(), StreamViewerState { + last_published_count: viewer_count, + last_update_time: now, + }); + } else { + warn!("Failed to update viewer count for stream {}", stream.id); + } + } + } } } Ok(()) @@ -402,6 +464,10 @@ impl Overseer for ZapStreamOverseer { let mut streams = self.active_streams.write().await; streams.remove(pipeline_id); + // Clean up viewer tracking state for this stream + let mut viewer_states = self.stream_viewer_states.write().await; + viewer_states.remove(&stream.id); + stream.state = UserStreamState::Ended; let event = self.publish_stream_event(&stream, &user.pubkey).await?; stream.event = Some(event.as_json());