mirror of
https://github.com/v0l/zap-stream-core.git
synced 2025-06-17 01:18:50 +00:00
feat: track viewers
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -4793,6 +4793,7 @@ dependencies = [
|
|||||||
"hyper 1.6.0",
|
"hyper 1.6.0",
|
||||||
"hyper-util",
|
"hyper-util",
|
||||||
"log",
|
"log",
|
||||||
|
"m3u8-rs",
|
||||||
"matchit 0.8.6",
|
"matchit 0.8.6",
|
||||||
"nostr-sdk",
|
"nostr-sdk",
|
||||||
"pretty_env_logger",
|
"pretty_env_logger",
|
||||||
|
@ -24,3 +24,4 @@ url = "2.5.0"
|
|||||||
itertools = "0.14.0"
|
itertools = "0.14.0"
|
||||||
chrono = { version = "^0.4.38", features = ["serde"] }
|
chrono = { version = "^0.4.38", features = ["serde"] }
|
||||||
hex = "0.4.3"
|
hex = "0.4.3"
|
||||||
|
m3u8-rs = "6.0.0"
|
@ -28,7 +28,7 @@ serde.workspace = true
|
|||||||
hex.workspace = true
|
hex.workspace = true
|
||||||
itertools.workspace = true
|
itertools.workspace = true
|
||||||
futures-util = "0.3.30"
|
futures-util = "0.3.30"
|
||||||
m3u8-rs = "6.0.0"
|
m3u8-rs.workspace = true
|
||||||
|
|
||||||
# srt
|
# srt
|
||||||
srt-tokio = { version = "0.4.3", optional = true }
|
srt-tokio = { version = "0.4.3", optional = true }
|
||||||
|
@ -4,3 +4,4 @@ pub mod mux;
|
|||||||
pub mod overseer;
|
pub mod overseer;
|
||||||
pub mod pipeline;
|
pub mod pipeline;
|
||||||
pub mod variant;
|
pub mod variant;
|
||||||
|
pub mod viewer;
|
||||||
|
114
crates/core/src/viewer.rs
Normal file
114
crates/core/src/viewer.rs
Normal file
@ -0,0 +1,114 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
use uuid::Uuid;
|
||||||
|
use tokio::task;
|
||||||
|
use log::debug;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ViewerInfo {
|
||||||
|
pub stream_id: String,
|
||||||
|
pub ip_address: String,
|
||||||
|
pub user_agent: Option<String>,
|
||||||
|
pub last_seen: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ViewerTracker {
|
||||||
|
viewers: Arc<RwLock<HashMap<String, ViewerInfo>>>,
|
||||||
|
timeout_duration: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ViewerTracker {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
let tracker = Self {
|
||||||
|
viewers: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
timeout_duration: Duration::from_secs(600), // 10 minutes
|
||||||
|
};
|
||||||
|
|
||||||
|
// Start cleanup task
|
||||||
|
let cleanup_tracker = tracker.clone();
|
||||||
|
task::spawn(async move {
|
||||||
|
cleanup_tracker.cleanup_task().await;
|
||||||
|
});
|
||||||
|
|
||||||
|
tracker
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn generate_viewer_token() -> String {
|
||||||
|
Uuid::new_v4().to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn track_viewer(&self, token: &str, stream_id: &str, ip_address: &str, user_agent: Option<String>) {
|
||||||
|
let mut viewers = self.viewers.write().unwrap();
|
||||||
|
|
||||||
|
let viewer_info = ViewerInfo {
|
||||||
|
stream_id: stream_id.to_string(),
|
||||||
|
ip_address: ip_address.to_string(),
|
||||||
|
user_agent,
|
||||||
|
last_seen: Instant::now(),
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(existing) = viewers.get(token) {
|
||||||
|
debug!("Updating viewer {} for stream {}", token, stream_id);
|
||||||
|
} else {
|
||||||
|
debug!("New viewer {} for stream {}", token, stream_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
viewers.insert(token.to_string(), viewer_info);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_viewer_count(&self, stream_id: &str) -> usize {
|
||||||
|
let viewers = self.viewers.read().unwrap();
|
||||||
|
viewers.values()
|
||||||
|
.filter(|v| v.stream_id == stream_id)
|
||||||
|
.count()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_active_viewers(&self, stream_id: &str) -> Vec<String> {
|
||||||
|
let viewers = self.viewers.read().unwrap();
|
||||||
|
viewers.iter()
|
||||||
|
.filter(|(_, v)| v.stream_id == stream_id)
|
||||||
|
.map(|(token, _)| token.clone())
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_viewer(&self, token: &str) {
|
||||||
|
let mut viewers = self.viewers.write().unwrap();
|
||||||
|
if let Some(viewer) = viewers.remove(token) {
|
||||||
|
debug!("Removed viewer {} from stream {}", token, viewer.stream_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn cleanup_task(&self) {
|
||||||
|
let mut interval = tokio::time::interval(Duration::from_secs(60)); // Check every minute
|
||||||
|
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
self.cleanup_expired_viewers();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cleanup_expired_viewers(&self) {
|
||||||
|
let mut viewers = self.viewers.write().unwrap();
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
|
let expired_tokens: Vec<String> = viewers.iter()
|
||||||
|
.filter(|(_, viewer)| now.duration_since(viewer.last_seen) > self.timeout_duration)
|
||||||
|
.map(|(token, _)| token.clone())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for token in expired_tokens {
|
||||||
|
if let Some(viewer) = viewers.remove(&token) {
|
||||||
|
debug!("Expired viewer {} from stream {} (last seen {:?} ago)",
|
||||||
|
token, viewer.stream_id, now.duration_since(viewer.last_seen));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for ViewerTracker {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
@ -36,7 +36,7 @@ pub struct User {
|
|||||||
pub goal: Option<String>,
|
pub goal: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug, Clone, Type)]
|
#[derive(Default, Debug, Clone, PartialEq, Type)]
|
||||||
#[repr(u8)]
|
#[repr(u8)]
|
||||||
pub enum UserStreamState {
|
pub enum UserStreamState {
|
||||||
#[default]
|
#[default]
|
||||||
|
@ -23,8 +23,9 @@ serde.workspace = true
|
|||||||
chrono.workspace = true
|
chrono.workspace = true
|
||||||
hex.workspace = true
|
hex.workspace = true
|
||||||
url.workspace = true
|
url.workspace = true
|
||||||
|
m3u8-rs.workspace = true
|
||||||
|
|
||||||
# http setuff
|
# http stuff
|
||||||
hyper = { version = "1.5.1", features = ["server"] }
|
hyper = { version = "1.5.1", features = ["server"] }
|
||||||
bytes = "1.8.0"
|
bytes = "1.8.0"
|
||||||
http-body-util = "0.1.2"
|
http-body-util = "0.1.2"
|
||||||
|
@ -648,6 +648,16 @@ impl Api {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Track a viewer for viewer count analytics
|
||||||
|
pub fn track_viewer(&self, token: &str, stream_id: &str, ip_address: &str, user_agent: Option<String>) {
|
||||||
|
self.overseer.viewer_tracker().track_viewer(token, stream_id, ip_address, user_agent);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get current viewer count for a stream
|
||||||
|
pub fn get_viewer_count(&self, stream_id: &str) -> usize {
|
||||||
|
self.overseer.viewer_tracker().get_viewer_count(stream_id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize)]
|
#[derive(Deserialize, Serialize)]
|
||||||
|
@ -11,6 +11,7 @@ use tokio::fs::File;
|
|||||||
use tokio::io::{AsyncReadExt, AsyncSeekExt};
|
use tokio::io::{AsyncReadExt, AsyncSeekExt};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct Blossom {
|
pub struct Blossom {
|
||||||
url: Url,
|
url: Url,
|
||||||
client: reqwest::Client,
|
client: reqwest::Client,
|
||||||
|
@ -16,6 +16,7 @@ use std::path::PathBuf;
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use tokio::fs::File;
|
use tokio::fs::File;
|
||||||
use tokio_util::io::ReaderStream;
|
use tokio_util::io::ReaderStream;
|
||||||
|
use zap_stream_core::viewer::ViewerTracker;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct HttpServer {
|
pub struct HttpServer {
|
||||||
@ -32,6 +33,123 @@ impl HttpServer {
|
|||||||
api,
|
api,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_hls_playlist(
|
||||||
|
api: &Api,
|
||||||
|
req: &Request<Incoming>,
|
||||||
|
playlist_path: &PathBuf,
|
||||||
|
) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
|
||||||
|
// Extract stream ID from path (e.g., /uuid/live.m3u8 -> uuid)
|
||||||
|
let path_parts: Vec<&str> = req.uri().path().trim_start_matches('/').split('/').collect();
|
||||||
|
if path_parts.len() < 2 {
|
||||||
|
return Ok(Response::builder().status(404).body(BoxBody::default())?);
|
||||||
|
}
|
||||||
|
|
||||||
|
let stream_id = path_parts[0];
|
||||||
|
|
||||||
|
// Get client IP and User-Agent for tracking
|
||||||
|
let client_ip = Self::get_client_ip(req);
|
||||||
|
let user_agent = req
|
||||||
|
.headers()
|
||||||
|
.get("user-agent")
|
||||||
|
.and_then(|h| h.to_str().ok())
|
||||||
|
.map(|s| s.to_string());
|
||||||
|
|
||||||
|
// Check for existing viewer token in query params
|
||||||
|
let query_params: std::collections::HashMap<String, String> = req
|
||||||
|
.uri()
|
||||||
|
.query()
|
||||||
|
.map(|q| {
|
||||||
|
url::form_urlencoded::parse(q.as_bytes())
|
||||||
|
.into_owned()
|
||||||
|
.collect()
|
||||||
|
})
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
let viewer_token = if let Some(token) = query_params.get("vt") {
|
||||||
|
// Track existing viewer
|
||||||
|
api.track_viewer(token, stream_id, &client_ip, user_agent.clone());
|
||||||
|
token.clone()
|
||||||
|
} else {
|
||||||
|
// Generate new viewer token
|
||||||
|
let token = ViewerTracker::generate_viewer_token();
|
||||||
|
api.track_viewer(&token, stream_id, &client_ip, user_agent);
|
||||||
|
token
|
||||||
|
};
|
||||||
|
|
||||||
|
// Read the playlist file
|
||||||
|
let playlist_content = tokio::fs::read(playlist_path).await?;
|
||||||
|
|
||||||
|
// Parse and modify playlist to add viewer token to URLs
|
||||||
|
let modified_content = Self::add_viewer_token_to_playlist(&playlist_content, &viewer_token)?;
|
||||||
|
|
||||||
|
Ok(Response::builder()
|
||||||
|
.header("content-type", "application/vnd.apple.mpegurl")
|
||||||
|
.header("server", "zap-stream-core")
|
||||||
|
.header("access-control-allow-origin", "*")
|
||||||
|
.header("access-control-allow-headers", "*")
|
||||||
|
.header("access-control-allow-methods", "HEAD, GET")
|
||||||
|
.body(
|
||||||
|
Full::new(Bytes::from(modified_content))
|
||||||
|
.map_err(|e| match e {})
|
||||||
|
.boxed(),
|
||||||
|
)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_client_ip(req: &Request<Incoming>) -> String {
|
||||||
|
// Check common headers for real client IP
|
||||||
|
if let Some(forwarded) = req.headers().get("x-forwarded-for") {
|
||||||
|
if let Ok(forwarded_str) = forwarded.to_str() {
|
||||||
|
if let Some(first_ip) = forwarded_str.split(',').next() {
|
||||||
|
return first_ip.trim().to_string();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(real_ip) = req.headers().get("x-real-ip") {
|
||||||
|
if let Ok(ip_str) = real_ip.to_str() {
|
||||||
|
return ip_str.to_string();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fallback to connection IP (note: in real deployment this might be a proxy)
|
||||||
|
"unknown".to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_viewer_token_to_playlist(content: &[u8], viewer_token: &str) -> Result<String> {
|
||||||
|
// Parse the M3U8 playlist using the m3u8-rs crate
|
||||||
|
let (_, playlist) = m3u8_rs::parse_playlist(content)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to parse M3U8 playlist: {}", e))?;
|
||||||
|
|
||||||
|
match playlist {
|
||||||
|
m3u8_rs::Playlist::MasterPlaylist(mut master) => {
|
||||||
|
// For master playlists, add viewer token to variant streams
|
||||||
|
for variant in &mut master.variants {
|
||||||
|
variant.uri = Self::add_token_to_url(&variant.uri, viewer_token);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write the modified playlist back to string
|
||||||
|
let mut output = Vec::new();
|
||||||
|
master.write_to(&mut output)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to write master playlist: {}", e))?;
|
||||||
|
String::from_utf8(output)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to convert playlist to string: {}", e))
|
||||||
|
}
|
||||||
|
m3u8_rs::Playlist::MediaPlaylist(_) => {
|
||||||
|
// For media playlists, return original content unchanged
|
||||||
|
String::from_utf8(content.to_vec())
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to convert playlist to string: {}", e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_token_to_url(url: &str, viewer_token: &str) -> String {
|
||||||
|
if url.contains('?') {
|
||||||
|
format!("{}&vt={}", url, viewer_token)
|
||||||
|
} else {
|
||||||
|
format!("{}?vt={}", url, viewer_token)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service<Request<Incoming>> for HttpServer {
|
impl Service<Request<Incoming>> for HttpServer {
|
||||||
@ -60,6 +178,7 @@ impl Service<Request<Incoming>> for HttpServer {
|
|||||||
// check if mapped to file
|
// check if mapped to file
|
||||||
let dst_path = self.files_dir.join(req.uri().path()[1..].to_string());
|
let dst_path = self.files_dir.join(req.uri().path()[1..].to_string());
|
||||||
if dst_path.exists() {
|
if dst_path.exists() {
|
||||||
|
let api_clone = self.api.clone();
|
||||||
return Box::pin(async move {
|
return Box::pin(async move {
|
||||||
let rsp = Response::builder()
|
let rsp = Response::builder()
|
||||||
.header("server", "zap-stream-core")
|
.header("server", "zap-stream-core")
|
||||||
@ -70,6 +189,13 @@ impl Service<Request<Incoming>> for HttpServer {
|
|||||||
if req.method() == Method::HEAD {
|
if req.method() == Method::HEAD {
|
||||||
return Ok(rsp.body(BoxBody::default())?);
|
return Ok(rsp.body(BoxBody::default())?);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle HLS playlists with viewer tracking
|
||||||
|
if req.uri().path().ends_with("/live.m3u8") {
|
||||||
|
return Self::handle_hls_playlist(&api_clone, &req, &dst_path).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle regular files
|
||||||
let f = File::open(&dst_path).await?;
|
let f = File::open(&dst_path).await?;
|
||||||
let f_stream = ReaderStream::new(f);
|
let f_stream = ReaderStream::new(f);
|
||||||
let body = StreamBody::new(
|
let body = StreamBody::new(
|
||||||
|
@ -2,13 +2,13 @@ use crate::blossom::{BlobDescriptor, Blossom};
|
|||||||
use crate::settings::LndSettings;
|
use crate::settings::LndSettings;
|
||||||
use anyhow::{bail, Result};
|
use anyhow::{bail, Result};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use chrono::Utc;
|
use chrono::{DateTime, Utc};
|
||||||
use fedimint_tonic_lnd::verrpc::VersionRequest;
|
use fedimint_tonic_lnd::verrpc::VersionRequest;
|
||||||
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
|
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use nostr_sdk::prelude::Coordinate;
|
use nostr_sdk::prelude::Coordinate;
|
||||||
use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32};
|
use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32};
|
||||||
use std::collections::HashSet;
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@ -23,11 +23,19 @@ use zap_stream_core::variant::audio::AudioVariant;
|
|||||||
use zap_stream_core::variant::mapping::VariantMapping;
|
use zap_stream_core::variant::mapping::VariantMapping;
|
||||||
use zap_stream_core::variant::video::VideoVariant;
|
use zap_stream_core::variant::video::VideoVariant;
|
||||||
use zap_stream_core::variant::{StreamMapping, VariantStream};
|
use zap_stream_core::variant::{StreamMapping, VariantStream};
|
||||||
|
use zap_stream_core::viewer::ViewerTracker;
|
||||||
use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb};
|
use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb};
|
||||||
|
|
||||||
const STREAM_EVENT_KIND: u16 = 30_311;
|
const STREAM_EVENT_KIND: u16 = 30_311;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct StreamViewerState {
|
||||||
|
last_published_count: usize,
|
||||||
|
last_update_time: DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
/// zap.stream NIP-53 overseer
|
/// zap.stream NIP-53 overseer
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct ZapStreamOverseer {
|
pub struct ZapStreamOverseer {
|
||||||
/// Dir where HTTP server serves files from
|
/// Dir where HTTP server serves files from
|
||||||
out_dir: String,
|
out_dir: String,
|
||||||
@ -46,6 +54,10 @@ pub struct ZapStreamOverseer {
|
|||||||
/// Currently active streams
|
/// Currently active streams
|
||||||
/// Any streams which are not contained in this set are dead
|
/// Any streams which are not contained in this set are dead
|
||||||
active_streams: Arc<RwLock<HashSet<Uuid>>>,
|
active_streams: Arc<RwLock<HashSet<Uuid>>>,
|
||||||
|
/// Viewer tracking for active streams
|
||||||
|
viewer_tracker: ViewerTracker,
|
||||||
|
/// Track last published viewer count and update time for each stream
|
||||||
|
stream_viewer_states: Arc<RwLock<HashMap<String, StreamViewerState>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ZapStreamOverseer {
|
impl ZapStreamOverseer {
|
||||||
@ -95,7 +107,7 @@ impl ZapStreamOverseer {
|
|||||||
}
|
}
|
||||||
client.connect().await;
|
client.connect().await;
|
||||||
|
|
||||||
Ok(Self {
|
let overseer = Self {
|
||||||
out_dir: out_dir.clone(),
|
out_dir: out_dir.clone(),
|
||||||
db,
|
db,
|
||||||
lnd,
|
lnd,
|
||||||
@ -109,7 +121,12 @@ impl ZapStreamOverseer {
|
|||||||
.collect(),
|
.collect(),
|
||||||
public_url: public_url.clone(),
|
public_url: public_url.clone(),
|
||||||
active_streams: Arc::new(RwLock::new(HashSet::new())),
|
active_streams: Arc::new(RwLock::new(HashSet::new())),
|
||||||
})
|
viewer_tracker: ViewerTracker::new(),
|
||||||
|
stream_viewer_states: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
Ok(overseer)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn database(&self) -> ZapStreamDb {
|
pub fn database(&self) -> ZapStreamDb {
|
||||||
@ -120,6 +137,11 @@ impl ZapStreamOverseer {
|
|||||||
self.lnd.clone()
|
self.lnd.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn viewer_tracker(&self) -> &ViewerTracker {
|
||||||
|
&self.viewer_tracker
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
fn stream_to_event_builder(&self, stream: &UserStream) -> Result<EventBuilder> {
|
fn stream_to_event_builder(&self, stream: &UserStream) -> Result<EventBuilder> {
|
||||||
let mut tags = vec![
|
let mut tags = vec![
|
||||||
Tag::parse(&["d".to_string(), stream.id.to_string()])?,
|
Tag::parse(&["d".to_string(), stream.id.to_string()])?,
|
||||||
@ -162,6 +184,15 @@ impl ZapStreamOverseer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add current viewer count for live streams
|
||||||
|
if stream.state == UserStreamState::Live {
|
||||||
|
let viewer_count = self.viewer_tracker.get_viewer_count(&stream.id);
|
||||||
|
tags.push(Tag::parse(&[
|
||||||
|
"current_participants".to_string(),
|
||||||
|
viewer_count.to_string(),
|
||||||
|
])?);
|
||||||
|
}
|
||||||
|
|
||||||
let kind = Kind::from(STREAM_EVENT_KIND);
|
let kind = Kind::from(STREAM_EVENT_KIND);
|
||||||
let coord = Coordinate::new(kind, self.keys.public_key).identifier(&stream.id);
|
let coord = Coordinate::new(kind, self.keys.public_key).identifier(&stream.id);
|
||||||
tags.push(Tag::parse([
|
tags.push(Tag::parse([
|
||||||
@ -228,7 +259,7 @@ impl Overseer for ZapStreamOverseer {
|
|||||||
async fn check_streams(&self) -> Result<()> {
|
async fn check_streams(&self) -> Result<()> {
|
||||||
let active_streams = self.db.list_live_streams().await?;
|
let active_streams = self.db.list_live_streams().await?;
|
||||||
for stream in active_streams {
|
for stream in active_streams {
|
||||||
// check
|
// check if stream is alive
|
||||||
let id = Uuid::parse_str(&stream.id)?;
|
let id = Uuid::parse_str(&stream.id)?;
|
||||||
info!("Checking stream is alive: {}", stream.id);
|
info!("Checking stream is alive: {}", stream.id);
|
||||||
let is_active = {
|
let is_active = {
|
||||||
@ -239,6 +270,37 @@ impl Overseer for ZapStreamOverseer {
|
|||||||
if let Err(e) = self.on_end(&id).await {
|
if let Err(e) = self.on_end(&id).await {
|
||||||
error!("Failed to end dead stream {}: {}", &id, e);
|
error!("Failed to end dead stream {}: {}", &id, e);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// Stream is active, check if we should update viewer count in nostr event
|
||||||
|
let viewer_count = self.viewer_tracker.get_viewer_count(&stream.id);
|
||||||
|
let now = Utc::now();
|
||||||
|
|
||||||
|
let should_update = {
|
||||||
|
let viewer_states = self.stream_viewer_states.read().await;
|
||||||
|
if let Some(state) = viewer_states.get(&stream.id) {
|
||||||
|
// Update if count changed OR if 10 minutes have passed since last update
|
||||||
|
viewer_count != state.last_published_count ||
|
||||||
|
(now - state.last_update_time).num_minutes() >= 10
|
||||||
|
} else {
|
||||||
|
// First time tracking this stream, always update
|
||||||
|
viewer_count > 0
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if should_update && viewer_count > 0 {
|
||||||
|
if let Ok(user) = self.db.get_user(stream.user_id).await {
|
||||||
|
if let Ok(_) = self.publish_stream_event(&stream, &user.pubkey).await {
|
||||||
|
// Update the tracking state
|
||||||
|
let mut viewer_states = self.stream_viewer_states.write().await;
|
||||||
|
viewer_states.insert(stream.id.clone(), StreamViewerState {
|
||||||
|
last_published_count: viewer_count,
|
||||||
|
last_update_time: now,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
warn!("Failed to update viewer count for stream {}", stream.id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -402,6 +464,10 @@ impl Overseer for ZapStreamOverseer {
|
|||||||
let mut streams = self.active_streams.write().await;
|
let mut streams = self.active_streams.write().await;
|
||||||
streams.remove(pipeline_id);
|
streams.remove(pipeline_id);
|
||||||
|
|
||||||
|
// Clean up viewer tracking state for this stream
|
||||||
|
let mut viewer_states = self.stream_viewer_states.write().await;
|
||||||
|
viewer_states.remove(&stream.id);
|
||||||
|
|
||||||
stream.state = UserStreamState::Ended;
|
stream.state = UserStreamState::Ended;
|
||||||
let event = self.publish_stream_event(&stream, &user.pubkey).await?;
|
let event = self.publish_stream_event(&stream, &user.pubkey).await?;
|
||||||
stream.event = Some(event.as_json());
|
stream.event = Some(event.as_json());
|
||||||
|
Reference in New Issue
Block a user