From f38f436b6cc160472a20071d58898845da1c87ae Mon Sep 17 00:00:00 2001 From: kieran Date: Mon, 9 Dec 2024 11:36:05 +0000 Subject: [PATCH] feat: stream costs --- Cargo.lock | 4 +-- Cargo.toml | 2 +- TODO.md | 3 +-- config.yaml | 1 + src/background/mod.rs | 2 ++ src/background/monitor.rs | 18 +++++++++++++ src/bin/zap_stream_core.rs | 22 ++++++++++++--- src/egress/recorder.rs | 2 +- src/lib.rs | 1 + src/mux/hls.rs | 2 +- src/overseer/mod.rs | 5 ++++ src/overseer/zap_stream.rs | 55 ++++++++++++++++++++++++++++++++++++-- src/pipeline/runner.rs | 18 ++++++++----- src/settings.rs | 2 ++ zap-stream-db/src/db.rs | 43 ++++++++++++++++++++++++++++- 15 files changed, 159 insertions(+), 21 deletions(-) create mode 100644 src/background/mod.rs create mode 100644 src/background/monitor.rs diff --git a/Cargo.lock b/Cargo.lock index 81d2534..5b75c09 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -1050,7 +1050,7 @@ dependencies = [ [[package]] name = "ffmpeg-rs-raw" version = "0.1.0" -source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0#8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0" +source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=df69b2f05da4279e36ad55086d77b45b2caf5174#df69b2f05da4279e36ad55086d77b45b2caf5174" dependencies = [ "anyhow", "ffmpeg-sys-the-third", diff --git a/Cargo.toml b/Cargo.toml index 7a1e292..07f8cf6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ test-pattern = [ ] [dependencies] -ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0" } +ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "df69b2f05da4279e36ad55086d77b45b2caf5174" } tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] } anyhow = { version = "^1.0.91", features = ["backtrace"] } pretty_env_logger = "0.5.0" diff --git a/TODO.md b/TODO.md index cb0f270..81964d2 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,4 @@ - RTMP? - Setup multi-variant output -- API parity -- fMP4 instead of MPEG-TS segments +- API parity https://git.v0l.io/Kieran/zap.stream/issues/7 - HLS-LL \ No newline at end of file diff --git a/config.yaml b/config.yaml index 370ff3c..6cd414c 100755 --- a/config.yaml +++ b/config.yaml @@ -38,6 +38,7 @@ listen_http: "127.0.0.1:8080" # overseer: zap-stream: + cost: 16 nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4" #blossom: # - "http://localhost:8881" diff --git a/src/background/mod.rs b/src/background/mod.rs new file mode 100644 index 0000000..e70d00b --- /dev/null +++ b/src/background/mod.rs @@ -0,0 +1,2 @@ +mod monitor; +pub use monitor::*; diff --git a/src/background/monitor.rs b/src/background/monitor.rs new file mode 100644 index 0000000..06d819e --- /dev/null +++ b/src/background/monitor.rs @@ -0,0 +1,18 @@ +use crate::overseer::Overseer; +use anyhow::Result; +use std::sync::Arc; + +/// Monitor stream status, perform any necessary cleanup +pub struct BackgroundMonitor { + overseer: Arc, +} + +impl BackgroundMonitor { + pub fn new(overseer: Arc) -> Self { + Self { overseer } + } + + pub async fn check(&mut self) -> Result<()> { + self.overseer.check_streams().await + } +} diff --git a/src/bin/zap_stream_core.rs b/src/bin/zap_stream_core.rs index 58f871a..6f7f6f1 100644 --- a/src/bin/zap_stream_core.rs +++ b/src/bin/zap_stream_core.rs @@ -7,9 +7,12 @@ use log::{error, info}; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; +use std::time::Duration; use tokio::task::JoinHandle; +use tokio::time::sleep; use url::Url; use warp::{cors, Filter}; +use zap_stream_core::background::BackgroundMonitor; #[cfg(feature = "rtmp")] use zap_stream_core::ingress::rtmp; #[cfg(feature = "srt")] @@ -43,10 +46,10 @@ async fn main() -> Result<()> { let settings: Settings = builder.try_deserialize()?; let overseer = settings.get_overseer().await?; - let mut listeners = vec![]; + let mut tasks = vec![]; for e in &settings.endpoints { match try_create_listener(e, &settings.output_dir, &overseer) { - Ok(l) => listeners.push(l), + Ok(l) => tasks.push(l), Err(e) => error!("{}", e), } } @@ -55,7 +58,7 @@ async fn main() -> Result<()> { let http_dir = settings.output_dir.clone(); let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url); - listeners.push(tokio::spawn(async move { + tasks.push(tokio::spawn(async move { let cors = cors().allow_any_origin().allow_methods(vec!["GET"]); let index_handle = warp::get() @@ -71,7 +74,18 @@ async fn main() -> Result<()> { Ok(()) })); - for handle in listeners { + // spawn background job + let mut bg = BackgroundMonitor::new(overseer.clone()); + tasks.push(tokio::spawn(async move { + loop { + if let Err(e) = bg.check().await { + error!("{}", e); + } + sleep(Duration::from_secs(10)).await; + } + })); + + for handle in tasks { if let Err(e) = handle.await? { error!("{e}"); } diff --git a/src/egress/recorder.rs b/src/egress/recorder.rs index 24c32df..3e7cca0 100644 --- a/src/egress/recorder.rs +++ b/src/egress/recorder.rs @@ -65,6 +65,6 @@ impl Egress for RecorderEgress { } unsafe fn reset(&mut self) -> Result<()> { - self.muxer.reset() + self.muxer.close() } } diff --git a/src/lib.rs b/src/lib.rs index f476d2a..c3c8e5d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +pub mod background; #[cfg(feature = "zap-stream")] pub mod blossom; pub mod egress; diff --git a/src/mux/hls.rs b/src/mux/hls.rs index c48433c..d41b570 100644 --- a/src/mux/hls.rs +++ b/src/mux/hls.rs @@ -226,7 +226,7 @@ impl HlsVariant { } pub unsafe fn reset(&mut self) -> Result<()> { - self.mux.reset() + self.mux.close() } unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result { diff --git a/src/overseer/mod.rs b/src/overseer/mod.rs index a354866..f1ba8a8 100644 --- a/src/overseer/mod.rs +++ b/src/overseer/mod.rs @@ -66,6 +66,9 @@ pub enum IngressStreamType { #[async_trait] /// The control process that oversees streaming operations pub trait Overseer: Send + Sync { + /// Check all streams + async fn check_streams(&self) -> Result<()>; + /// Set up a new streaming pipeline async fn start_stream( &self, @@ -113,6 +116,7 @@ impl Settings { lnd, relays, blossom, + cost, } => Ok(Arc::new( ZapStreamOverseer::new( &self.output_dir, @@ -122,6 +126,7 @@ impl Settings { lnd, relays, blossom, + *cost, ) .await?, )), diff --git a/src/overseer/zap_stream.rs b/src/overseer/zap_stream.rs index cd33ec2..3966cd8 100644 --- a/src/overseer/zap_stream.rs +++ b/src/overseer/zap_stream.rs @@ -14,14 +14,17 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_MJPEG; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVFrame; use ffmpeg_rs_raw::Encoder; use futures_util::FutureExt; -use log::{info, warn}; +use log::{error, info, warn}; use nostr_sdk::bitcoin::PrivateKey; use nostr_sdk::prelude::Coordinate; use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32}; +use std::collections::HashSet; use std::env::temp_dir; use std::fs::create_dir_all; use std::path::PathBuf; use std::str::FromStr; +use std::sync::Arc; +use tokio::sync::RwLock; use url::Url; use uuid::Uuid; use warp::Filter; @@ -46,6 +49,11 @@ pub struct ZapStreamOverseer { blossom_servers: Vec, /// Public facing URL pointing to [out_dir] public_url: String, + /// Cost / second / variant + cost: i64, + /// Currently active streams + /// Any streams which are not contained in this set are dead + active_streams: Arc>>, } impl ZapStreamOverseer { @@ -57,6 +65,7 @@ impl ZapStreamOverseer { lnd: &LndSettings, relays: &Vec, blossom_servers: &Option>, + cost: i64, ) -> Result { let db = ZapStreamDb::new(db).await?; db.migrate().await?; @@ -94,6 +103,8 @@ impl ZapStreamOverseer { .map(|b| Blossom::new(b)) .collect(), public_url: public_url.clone(), + cost, + active_streams: Arc::new(RwLock::new(HashSet::new())), }) } @@ -206,6 +217,25 @@ impl ZapStreamOverseer { #[async_trait] impl Overseer for ZapStreamOverseer { + async fn check_streams(&self) -> Result<()> { + let active_streams = self.db.list_live_streams().await?; + for stream in active_streams { + // check + let id = Uuid::parse_str(&stream.id)?; + info!("Checking stream is alive: {}", stream.id); + let is_active = { + let streams = self.active_streams.read().await; + streams.contains(&id) + }; + if !is_active { + if let Err(e) = self.on_end(&id).await { + error!("Failed to end dead stream {}: {}", &id, e); + } + } + } + Ok(()) + } + async fn start_stream( &self, connection: &ConnectionInfo, @@ -217,6 +247,11 @@ impl Overseer for ZapStreamOverseer { .await? .ok_or_else(|| anyhow::anyhow!("User not found"))?; + let user = self.db.get_user(uid).await?; + if user.balance <= 0 { + bail!("Not enough balance"); + } + let variants = get_default_variants(&stream_info)?; let mut egress = vec![]; @@ -225,7 +260,6 @@ impl Overseer for ZapStreamOverseer { variants: variants.iter().map(|v| v.id()).collect(), })); - let user = self.db.get_user(uid).await?; let stream_id = Uuid::new_v4(); // insert new stream record let mut new_stream = UserStream { @@ -238,8 +272,12 @@ impl Overseer for ZapStreamOverseer { let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?; new_stream.event = Some(stream_event.as_json()); + let mut streams = self.active_streams.write().await; + streams.insert(stream_id.clone()); + self.db.insert_stream(&new_stream).await?; self.db.update_stream(&new_stream).await?; + Ok(PipelineConfig { id: stream_id, variants, @@ -255,6 +293,16 @@ impl Overseer for ZapStreamOverseer { duration: f32, path: &PathBuf, ) -> Result<()> { + let cost = self.cost * duration.round() as i64; + let stream = self.db.get_stream(pipeline_id).await?; + let bal = self + .db + .tick_stream(pipeline_id, stream.user_id, duration, cost) + .await?; + if bal <= 0 { + bail!("Not enough balance"); + } + // Upload to blossom servers if configured let mut blobs = vec![]; for b in &self.blossom_servers { @@ -303,6 +351,9 @@ impl Overseer for ZapStreamOverseer { let mut stream = self.db.get_stream(pipeline_id).await?; let user = self.db.get_user(stream.user_id).await?; + let mut streams = self.active_streams.write().await; + streams.remove(pipeline_id); + stream.state = UserStreamState::Ended; let event = self.publish_stream_event(&stream, &user.pubkey).await?; stream.event = Some(event.as_json()); diff --git a/src/pipeline/runner.rs b/src/pipeline/runner.rs index 42a6da2..4454d9c 100644 --- a/src/pipeline/runner.rs +++ b/src/pipeline/runner.rs @@ -73,6 +73,9 @@ pub struct PipelineRunner { overseer: Arc, fps_counter_start: Instant, + fps_last_frame_ctr: u64, + + /// Total number of frames produced frame_ctr: u64, out_dir: String, } @@ -100,6 +103,7 @@ impl PipelineRunner { fps_counter_start: Instant::now(), egress: Vec::new(), frame_ctr: 0, + fps_last_frame_ctr: 0, info: None, }) } @@ -162,9 +166,7 @@ impl PipelineRunner { let p = (*stream).codecpar; if (*p).codec_type == AVMediaType::AVMEDIA_TYPE_VIDEO { - let pts_sec = ((*frame).pts as f64 * av_q2d((*stream).time_base)).floor() as u64; - // write thumbnail every 1min - if pts_sec % 60 == 0 && pts_sec != 0 { + if (self.frame_ctr % 1800) == 0 { let dst_pic = PathBuf::from(&self.out_dir) .join(config.id.to_string()) .join("thumb.webp"); @@ -274,16 +276,18 @@ impl PipelineRunner { .on_segment(&config.id, &seg.variant, seg.idx, seg.duration, &seg.path) .await { - error!("Failed to process segment: {}", e); + bail!("Failed to process segment {}", e.to_string()); } } } - }); + Ok(()) + })?; let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32(); if elapsed >= 2f32 { - info!("Average fps: {:.2}", self.frame_ctr as f32 / elapsed); + let n_frames = self.frame_ctr - self.fps_last_frame_ctr; + info!("Average fps: {:.2}", n_frames as f32 / elapsed); self.fps_counter_start = Instant::now(); - self.frame_ctr = 0; + self.fps_last_frame_ctr = self.frame_ctr; } Ok(true) } diff --git a/src/settings.rs b/src/settings.rs index 044bb36..2d3cc0d 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -44,6 +44,8 @@ pub enum OverseerConfig { nsec: String, /// Blossom servers blossom: Option>, + /// Cost (milli-sats) / second / variant + cost: i64, }, } diff --git a/zap-stream-db/src/db.rs b/zap-stream-db/src/db.rs index 3639b1f..8c6deb9 100644 --- a/zap-stream-db/src/db.rs +++ b/zap-stream-db/src/db.rs @@ -1,6 +1,6 @@ use crate::{User, UserStream}; use anyhow::Result; -use sqlx::{MySqlPool, Row}; +use sqlx::{Executor, MySqlPool, Row}; use uuid::Uuid; pub struct ZapStreamDb { @@ -101,4 +101,45 @@ impl ZapStreamDb { .await .map_err(anyhow::Error::new)?) } + + /// Get the list of active streams + pub async fn list_live_streams(&self) -> Result> { + Ok(sqlx::query_as("select * from user_stream where state = 2") + .fetch_all(&self.db) + .await?) + } + + /// Add [duration] & [cost] to a stream and return the new user balance + pub async fn tick_stream( + &self, + stream_id: &Uuid, + user_id: u64, + duration: f32, + cost: i64, + ) -> Result { + let mut tx = self.db.begin().await?; + + sqlx::query("update user_stream set duration = duration + ?, cost = cost + ? where id = ?") + .bind(&duration) + .bind(&cost) + .bind(stream_id.to_string()) + .execute(&mut *tx) + .await?; + + sqlx::query("update user set balance = balance - ? where id = ?") + .bind(&cost) + .bind(&user_id) + .execute(&mut *tx) + .await?; + + let balance: i64 = sqlx::query("select balance from user where id = ?") + .bind(&user_id) + .fetch_one(&mut *tx) + .await? + .try_get(0)?; + + tx.commit().await?; + + Ok(balance) + } }