feat: stream costs

kieran 2024-12-09 11:36:05 +00:00
parent afbc2fc8b2
commit f38f436b6c
GPG Key ID: DE71CEB3925BE941 (no known key found for this signature in database)
15 changed files with 159 additions and 21 deletions

Cargo.lock (generated)
View File

@@ -1,6 +1,6 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
-version = 3
+version = 4

 [[package]]
 name = "addr2line"
@@ -1050,7 +1050,7 @@ dependencies = [
 [[package]]
 name = "ffmpeg-rs-raw"
 version = "0.1.0"
-source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0#8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0"
+source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=df69b2f05da4279e36ad55086d77b45b2caf5174#df69b2f05da4279e36ad55086d77b45b2caf5174"
 dependencies = [
  "anyhow",
  "ffmpeg-sys-the-third",

View File

@@ -32,7 +32,7 @@ test-pattern = [
 ]

 [dependencies]
-ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0" }
+ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "df69b2f05da4279e36ad55086d77b45b2caf5174" }
 tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] }
 anyhow = { version = "^1.0.91", features = ["backtrace"] }
 pretty_env_logger = "0.5.0"

View File

@@ -1,5 +1,4 @@
 - RTMP?
 - Setup multi-variant output
-- API parity
-- fMP4 instead of MPEG-TS segments
+- API parity https://git.v0l.io/Kieran/zap.stream/issues/7
 - HLS-LL

View File

@@ -38,6 +38,7 @@ listen_http: "127.0.0.1:8080"
 #
 overseer:
   zap-stream:
+    cost: 16
     nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4"
     #blossom:
     #  - "http://localhost:8881"

src/background/mod.rs (new file)
View File

@@ -0,0 +1,2 @@
+mod monitor;
+pub use monitor::*;

src/background/monitor.rs (new file)
View File

@@ -0,0 +1,18 @@
+use crate::overseer::Overseer;
+use anyhow::Result;
+use std::sync::Arc;
+
+/// Monitor stream status, perform any necessary cleanup
+pub struct BackgroundMonitor {
+    overseer: Arc<dyn Overseer>,
+}
+
+impl BackgroundMonitor {
+    pub fn new(overseer: Arc<dyn Overseer>) -> Self {
+        Self { overseer }
+    }
+
+    pub async fn check(&mut self) -> Result<()> {
+        self.overseer.check_streams().await
+    }
+}

View File

@@ -7,9 +7,12 @@ use log::{error, info};
 use std::net::SocketAddr;
 use std::path::PathBuf;
 use std::sync::Arc;
+use std::time::Duration;
 use tokio::task::JoinHandle;
+use tokio::time::sleep;
 use url::Url;
 use warp::{cors, Filter};
+use zap_stream_core::background::BackgroundMonitor;
 #[cfg(feature = "rtmp")]
 use zap_stream_core::ingress::rtmp;
 #[cfg(feature = "srt")]
@@ -43,10 +46,10 @@ async fn main() -> Result<()> {
     let settings: Settings = builder.try_deserialize()?;
     let overseer = settings.get_overseer().await?;

-    let mut listeners = vec![];
+    let mut tasks = vec![];
     for e in &settings.endpoints {
         match try_create_listener(e, &settings.output_dir, &overseer) {
-            Ok(l) => listeners.push(l),
+            Ok(l) => tasks.push(l),
             Err(e) => error!("{}", e),
         }
     }
@@ -55,7 +58,7 @@ async fn main() -> Result<()> {
     let http_dir = settings.output_dir.clone();
     let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url);

-    listeners.push(tokio::spawn(async move {
+    tasks.push(tokio::spawn(async move {
         let cors = cors().allow_any_origin().allow_methods(vec!["GET"]);

         let index_handle = warp::get()
@@ -71,7 +74,18 @@ async fn main() -> Result<()> {
         Ok(())
     }));

-    for handle in listeners {
+    // spawn background job
+    let mut bg = BackgroundMonitor::new(overseer.clone());
+    tasks.push(tokio::spawn(async move {
+        loop {
+            if let Err(e) = bg.check().await {
+                error!("{}", e);
+            }
+            sleep(Duration::from_secs(10)).await;
+        }
+    }));
+
+    for handle in tasks {
         if let Err(e) = handle.await? {
             error!("{e}");
         }

View File

@@ -65,6 +65,6 @@ impl Egress for RecorderEgress {
     }

     unsafe fn reset(&mut self) -> Result<()> {
-        self.muxer.reset()
+        self.muxer.close()
     }
 }

View File

@@ -1,3 +1,4 @@
+pub mod background;
 #[cfg(feature = "zap-stream")]
 pub mod blossom;
 pub mod egress;

View File

@@ -226,7 +226,7 @@ impl HlsVariant {
     }

     pub unsafe fn reset(&mut self) -> Result<()> {
-        self.mux.reset()
+        self.mux.close()
     }

     unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result<NewSegment> {

View File

@@ -66,6 +66,9 @@ pub enum IngressStreamType {
 #[async_trait]
 /// The control process that oversees streaming operations
 pub trait Overseer: Send + Sync {
+    /// Check all streams
+    async fn check_streams(&self) -> Result<()>;
+
     /// Set up a new streaming pipeline
     async fn start_stream(
         &self,
@@ -113,6 +116,7 @@ impl Settings {
             lnd,
             relays,
             blossom,
+            cost,
         } => Ok(Arc::new(
             ZapStreamOverseer::new(
                 &self.output_dir,
@@ -122,6 +126,7 @@
                 lnd,
                 relays,
                 blossom,
+                *cost,
             )
             .await?,
         )),

View File

@@ -14,14 +14,17 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_MJPEG;
 use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVFrame;
 use ffmpeg_rs_raw::Encoder;
 use futures_util::FutureExt;
-use log::{info, warn};
+use log::{error, info, warn};
 use nostr_sdk::bitcoin::PrivateKey;
 use nostr_sdk::prelude::Coordinate;
 use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32};
+use std::collections::HashSet;
 use std::env::temp_dir;
 use std::fs::create_dir_all;
 use std::path::PathBuf;
 use std::str::FromStr;
+use std::sync::Arc;
+use tokio::sync::RwLock;
 use url::Url;
 use uuid::Uuid;
 use warp::Filter;
@@ -46,6 +49,11 @@ pub struct ZapStreamOverseer {
     blossom_servers: Vec<Blossom>,
     /// Public facing URL pointing to [out_dir]
     public_url: String,
+    /// Cost / second / variant
+    cost: i64,
+    /// Currently active streams
+    /// Any streams which are not contained in this set are dead
+    active_streams: Arc<RwLock<HashSet<Uuid>>>,
 }

 impl ZapStreamOverseer {
@@ -57,6 +65,7 @@ impl ZapStreamOverseer {
         lnd: &LndSettings,
         relays: &Vec<String>,
         blossom_servers: &Option<Vec<String>>,
+        cost: i64,
     ) -> Result<Self> {
         let db = ZapStreamDb::new(db).await?;
         db.migrate().await?;
@@ -94,6 +103,8 @@ impl ZapStreamOverseer {
                 .map(|b| Blossom::new(b))
                 .collect(),
             public_url: public_url.clone(),
+            cost,
+            active_streams: Arc::new(RwLock::new(HashSet::new())),
         })
     }
@@ -206,6 +217,25 @@

 #[async_trait]
 impl Overseer for ZapStreamOverseer {
+    async fn check_streams(&self) -> Result<()> {
+        let active_streams = self.db.list_live_streams().await?;
+        for stream in active_streams {
+            // check
+            let id = Uuid::parse_str(&stream.id)?;
+            info!("Checking stream is alive: {}", stream.id);
+            let is_active = {
+                let streams = self.active_streams.read().await;
+                streams.contains(&id)
+            };
+            if !is_active {
+                if let Err(e) = self.on_end(&id).await {
+                    error!("Failed to end dead stream {}: {}", &id, e);
+                }
+            }
+        }
+        Ok(())
+    }
+
     async fn start_stream(
         &self,
         connection: &ConnectionInfo,
@@ -217,6 +247,11 @@ impl Overseer for ZapStreamOverseer {
             .await?
             .ok_or_else(|| anyhow::anyhow!("User not found"))?;

+        let user = self.db.get_user(uid).await?;
+        if user.balance <= 0 {
+            bail!("Not enough balance");
+        }
+
         let variants = get_default_variants(&stream_info)?;

         let mut egress = vec![];
@@ -225,7 +260,6 @@ impl Overseer for ZapStreamOverseer {
             variants: variants.iter().map(|v| v.id()).collect(),
         }));

-        let user = self.db.get_user(uid).await?;
         let stream_id = Uuid::new_v4();
         // insert new stream record
         let mut new_stream = UserStream {
@@ -238,8 +272,12 @@ impl Overseer for ZapStreamOverseer {
         let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?;
         new_stream.event = Some(stream_event.as_json());

+        let mut streams = self.active_streams.write().await;
+        streams.insert(stream_id.clone());
+
         self.db.insert_stream(&new_stream).await?;
         self.db.update_stream(&new_stream).await?;
+
         Ok(PipelineConfig {
             id: stream_id,
             variants,
@@ -255,6 +293,16 @@ impl Overseer for ZapStreamOverseer {
         duration: f32,
         path: &PathBuf,
     ) -> Result<()> {
+        let cost = self.cost * duration.round() as i64;
+        let stream = self.db.get_stream(pipeline_id).await?;
+        let bal = self
+            .db
+            .tick_stream(pipeline_id, stream.user_id, duration, cost)
+            .await?;
+        if bal <= 0 {
+            bail!("Not enough balance");
+        }
+
         // Upload to blossom servers if configured
         let mut blobs = vec![];
         for b in &self.blossom_servers {
@@ -303,6 +351,9 @@ impl Overseer for ZapStreamOverseer {
         let mut stream = self.db.get_stream(pipeline_id).await?;
         let user = self.db.get_user(stream.user_id).await?;

+        let mut streams = self.active_streams.write().await;
+        streams.remove(pipeline_id);
+
         stream.state = UserStreamState::Ended;
         let event = self.publish_stream_event(&stream, &user.pubkey).await?;
         stream.event = Some(event.as_json());
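A minimal sketch of the per-segment charge as `on_segment` now computes it; the segment durations below are only examples, and note that the duration is rounded to whole seconds before multiplying.

// Mirrors `self.cost * duration.round() as i64` from on_segment above.
fn segment_charge(cost_msat_per_sec: i64, duration_sec: f32) -> i64 {
    cost_msat_per_sec * duration_sec.round() as i64
}

fn main() {
    // With the default rate of 16 msat/s per variant:
    assert_eq!(segment_charge(16, 2.0), 32); // a 2 s HLS segment costs 32 msat
    assert_eq!(segment_charge(16, 2.4), 32); // durations are rounded, so 2.4 s bills as 2 s
    assert_eq!(segment_charge(16, 6.0), 96); // a 6 s segment costs 96 msat
}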

View File

@@ -73,6 +73,9 @@ pub struct PipelineRunner {
     overseer: Arc<dyn Overseer>,

     fps_counter_start: Instant,
+    fps_last_frame_ctr: u64,
+
+    /// Total number of frames produced
     frame_ctr: u64,
     out_dir: String,
 }
@@ -100,6 +103,7 @@ impl PipelineRunner {
             fps_counter_start: Instant::now(),
             egress: Vec::new(),
             frame_ctr: 0,
+            fps_last_frame_ctr: 0,
             info: None,
         })
     }
@@ -162,9 +166,7 @@ impl PipelineRunner {
             let p = (*stream).codecpar;
             if (*p).codec_type == AVMediaType::AVMEDIA_TYPE_VIDEO {
-                let pts_sec = ((*frame).pts as f64 * av_q2d((*stream).time_base)).floor() as u64;
-                // write thumbnail every 1min
-                if pts_sec % 60 == 0 && pts_sec != 0 {
+                if (self.frame_ctr % 1800) == 0 {
                     let dst_pic = PathBuf::from(&self.out_dir)
                         .join(config.id.to_string())
                         .join("thumb.webp");
@@ -274,16 +276,18 @@ impl PipelineRunner {
                     .on_segment(&config.id, &seg.variant, seg.idx, seg.duration, &seg.path)
                     .await
                 {
-                    error!("Failed to process segment: {}", e);
+                    bail!("Failed to process segment {}", e.to_string());
                 }
             }
         }
-        });
+            Ok(())
+        })?;
         let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32();
         if elapsed >= 2f32 {
-            info!("Average fps: {:.2}", self.frame_ctr as f32 / elapsed);
+            let n_frames = self.frame_ctr - self.fps_last_frame_ctr;
+            info!("Average fps: {:.2}", n_frames as f32 / elapsed);
             self.fps_counter_start = Instant::now();
-            self.frame_ctr = 0;
+            self.fps_last_frame_ctr = self.frame_ctr;
         }
         Ok(true)
     }
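The thumbnail cadence switch from PTS to the frame counter keeps roughly the old one-minute interval at common frame rates; a quick check of the arithmetic, assuming a 30 fps input (the frame rate here is an assumption, not something fixed by this commit):

// 1800 frames at 30 fps is one minute; at other frame rates the interval scales.
fn thumb_interval_secs(fps: f64) -> f64 {
    1800.0 / fps
}

fn main() {
    assert_eq!(thumb_interval_secs(30.0), 60.0); // roughly the old "every 1min" behaviour
    assert_eq!(thumb_interval_secs(60.0), 30.0); // twice as often at 60 fps
}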

View File

@@ -44,6 +44,8 @@ pub enum OverseerConfig {
         nsec: String,
         /// Blossom servers
         blossom: Option<Vec<String>>,
+        /// Cost (milli-sats) / second / variant
+        cost: i64,
     },
 }

View File

@@ -1,6 +1,6 @@
 use crate::{User, UserStream};
 use anyhow::Result;
-use sqlx::{MySqlPool, Row};
+use sqlx::{Executor, MySqlPool, Row};
 use uuid::Uuid;

 pub struct ZapStreamDb {
@@ -101,4 +101,45 @@ impl ZapStreamDb {
             .await
             .map_err(anyhow::Error::new)?)
     }

+    /// Get the list of active streams
+    pub async fn list_live_streams(&self) -> Result<Vec<UserStream>> {
+        Ok(sqlx::query_as("select * from user_stream where state = 2")
+            .fetch_all(&self.db)
+            .await?)
+    }
+
+    /// Add [duration] & [cost] to a stream and return the new user balance
+    pub async fn tick_stream(
+        &self,
+        stream_id: &Uuid,
+        user_id: u64,
+        duration: f32,
+        cost: i64,
+    ) -> Result<i64> {
+        let mut tx = self.db.begin().await?;
+
+        sqlx::query("update user_stream set duration = duration + ?, cost = cost + ? where id = ?")
+            .bind(&duration)
+            .bind(&cost)
+            .bind(stream_id.to_string())
+            .execute(&mut *tx)
+            .await?;
+
+        sqlx::query("update user set balance = balance - ? where id = ?")
+            .bind(&cost)
+            .bind(&user_id)
+            .execute(&mut *tx)
+            .await?;
+
+        let balance: i64 = sqlx::query("select balance from user where id = ?")
+            .bind(&user_id)
+            .fetch_one(&mut *tx)
+            .await?
+            .try_get(0)?;
+
+        tx.commit().await?;
+
+        Ok(balance)
+    }
 }
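Taken together, a hypothetical caller-side view of how the new pieces compose: on_segment charges each finished segment through tick_stream, bails when the returned balance is non-positive, and the BackgroundMonitor loop later calls check_streams, which ends any stream still marked live in the database but no longer present in active_streams. The constants and the crate path for ZapStreamDb below are assumptions; the method calls mirror this commit.

use anyhow::{bail, Result};
use uuid::Uuid;
use zap_stream_db::ZapStreamDb; // crate path is an assumption

// Hypothetical driver for one finished segment, mirroring on_segment above.
async fn charge_segment(db: &ZapStreamDb, stream_id: &Uuid, user_id: u64) -> Result<()> {
    let duration = 2.0f32;                   // example segment length in seconds
    let cost = 16 * duration.round() as i64; // same formula as on_segment, 16 msat/s assumed
    let balance = db.tick_stream(stream_id, user_id, duration, cost).await?;
    if balance <= 0 {
        // on_segment bails here, which stops the pipeline; the background
        // monitor then marks the stream as ended on its next 10 s pass.
        bail!("Not enough balance");
    }
    Ok(())
}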