From b295d7e7bece0b99e654057130b9d9a476f4b790 Mon Sep 17 00:00:00 2001 From: Kieran Date: Fri, 6 Jun 2025 12:07:17 +0100 Subject: [PATCH] feat: pure vibes feat: implement API --- Cargo.lock | 10 +- Cargo.toml | 2 +- .../migrations/20241115120541_init.sql | 85 +- crates/zap-stream-db/src/db.rs | 231 +++++- crates/zap-stream-db/src/model.rs | 64 ++ crates/zap-stream/Cargo.toml | 3 +- crates/zap-stream/config.yaml | 1 - crates/zap-stream/src/api.rs | 733 +++++++++++++++--- crates/zap-stream/src/http.rs | 84 +- crates/zap-stream/src/main.rs | 2 +- crates/zap-stream/src/overseer.rs | 189 ++++- crates/zap-stream/src/settings.rs | 4 - 12 files changed, 1258 insertions(+), 150 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ebd2b5..59de8da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -269,7 +269,7 @@ dependencies = [ "http-body 0.4.6", "hyper 0.14.32", "itoa", - "matchit", + "matchit 0.7.3", "memchr", "mime", "percent-encoding", @@ -588,6 +588,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-targets 0.52.6", ] @@ -2090,6 +2091,12 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "matchit" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f926ade0c4e170215ae43342bf13b9310a437609c81f29f86c5df6657582ef9" + [[package]] name = "md-5" version = "0.10.6" @@ -4786,6 +4793,7 @@ dependencies = [ "hyper 1.6.0", "hyper-util", "log", + "matchit 0.8.6", "nostr-sdk", "pretty_env_logger", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 9acab4e..a698ccc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,5 +16,5 @@ uuid = { version = "1.8.0", features = ["v4", "serde"] } serde = { version = "1.0.197", features = ["derive"] } url = "2.5.0" itertools = "0.14.0" -chrono = "^0.4.38" +chrono = { version = "^0.4.38", features = ["serde"] } hex = "0.4.3" \ No newline at end of file diff --git a/crates/zap-stream-db/migrations/20241115120541_init.sql b/crates/zap-stream-db/migrations/20241115120541_init.sql index 0ab994a..c9526ce 100644 --- a/crates/zap-stream-db/migrations/20241115120541_init.sql +++ b/crates/zap-stream-db/migrations/20241115120541_init.sql @@ -1,17 +1,33 @@ -- Add migration script here create table user ( - id integer unsigned not null auto_increment primary key, - pubkey binary(32) not null, - created timestamp default current_timestamp, - balance bigint not null default 0, - tos_accepted timestamp, - stream_key text not null default uuid(), - is_admin bool not null default false, - is_blocked bool not null default false, - recording bool not null default false + id integer unsigned not null auto_increment primary key, + pubkey binary(32) not null, + created timestamp not null default current_timestamp, + balance bigint not null default 0, + tos_accepted timestamp, + stream_key text not null default uuid(), + is_admin bool not null default false, + is_blocked bool not null default false, + recording bool not null default false, + title text, + summary text, + image text, + tags text, + content_warning text, + goal text ); create unique index ix_user_pubkey on user (pubkey); + +-- Add ingest endpoints table for pipeline configuration (must come before user_stream) +create table ingest_endpoint +( + id integer unsigned not null auto_increment primary key, + name varchar(255) not null, + cost bigint unsigned not null default 10000, + capabilities text +); + create 
table user_stream ( id varchar(50) not null primary key, @@ -35,7 +51,56 @@ create table user_stream fee integer unsigned, -- current nostr event json event text, + -- endpoint id if using specific endpoint + endpoint_id integer unsigned, + -- timestamp of last segment + last_segment timestamp, constraint fk_user_stream_user + foreign key (user_id) references user (id), + constraint fk_user_stream_endpoint + foreign key (endpoint_id) references ingest_endpoint (id) +); + +-- Add forwards table for payment forwarding +create table user_stream_forward +( + id integer unsigned not null auto_increment primary key, + user_id integer unsigned not null, + name text not null, + target text not null, + constraint fk_user_stream_forward_user foreign key (user_id) references user (id) -); \ No newline at end of file +); + +-- Add keys table for stream keys +create table user_stream_key +( + id integer unsigned not null auto_increment primary key, + user_id integer unsigned not null, + `key` text not null, + created timestamp not null default current_timestamp, + expires timestamp, + stream_id varchar(50) not null, + constraint fk_user_stream_key_user + foreign key (user_id) references user (id), + constraint fk_user_stream_key_stream + foreign key (stream_id) references user_stream (id) +); + +-- Add payments table for payment logging +create table payment +( + payment_hash binary(32) not null primary key, + user_id integer unsigned not null, + invoice text, + is_paid bool not null default false, + amount bigint unsigned not null, + created timestamp not null default current_timestamp, + nostr text, + payment_type tinyint unsigned not null, + fee bigint unsigned not null default 0, + constraint fk_payment_user + foreign key (user_id) references user (id) +); + diff --git a/crates/zap-stream-db/src/db.rs b/crates/zap-stream-db/src/db.rs index 9293bc3..003e032 100644 --- a/crates/zap-stream-db/src/db.rs +++ b/crates/zap-stream-db/src/db.rs @@ -1,6 +1,8 @@ -use crate::{User, UserStream}; +use crate::{ + IngestEndpoint, Payment, PaymentType, User, UserStream, UserStreamForward, UserStreamKey, +}; use anyhow::Result; -use sqlx::{Executor, MySqlPool, Row}; +use sqlx::{MySqlPool, Row}; use uuid::Uuid; #[derive(Clone)] @@ -53,6 +55,15 @@ impl ZapStreamDb { Ok(()) } + /// Mark TOS as accepted for a user + pub async fn accept_tos(&self, uid: u64) -> Result<()> { + sqlx::query("update user set tos_accepted = NOW() where id = ?") + .bind(uid) + .execute(&self.db) + .await?; + Ok(()) + } + pub async fn upsert_user(&self, pubkey: &[u8; 32]) -> Result { let res = sqlx::query("insert ignore into user(pubkey) values(?) returning id") .bind(pubkey.as_slice()) @@ -153,4 +164,220 @@ impl ZapStreamDb { Ok(balance) } + + /// Create a new forward + pub async fn create_forward(&self, user_id: u64, name: &str, target: &str) -> Result { + let result = + sqlx::query("insert into user_stream_forward (user_id, name, target) values (?, ?, ?)") + .bind(user_id) + .bind(name) + .bind(target) + .execute(&self.db) + .await?; + Ok(result.last_insert_id()) + } + + /// Get all forwards for a user + pub async fn get_user_forwards(&self, user_id: u64) -> Result> { + Ok( + sqlx::query_as("select * from user_stream_forward where user_id = ?") + .bind(user_id) + .fetch_all(&self.db) + .await?, + ) + } + + /// Delete a forward + pub async fn delete_forward(&self, user_id: u64, forward_id: u64) -> Result<()> { + sqlx::query("delete from user_stream_forward where id = ? 
and user_id = ?")
+            .bind(forward_id)
+            .bind(user_id)
+            .execute(&self.db)
+            .await?;
+        Ok(())
+    }
+
+    /// Create a new stream key
+    pub async fn create_stream_key(
+        &self,
+        user_id: u64,
+        key: &str,
+        expires: Option<DateTime<Utc>>,
+        stream_id: &str,
+    ) -> Result<u64> {
+        let result = sqlx::query(
+            "insert into user_stream_key (user_id, `key`, expires, stream_id) values (?, ?, ?, ?)",
+        )
+        .bind(user_id)
+        .bind(key)
+        .bind(expires)
+        .bind(stream_id)
+        .execute(&self.db)
+        .await?;
+        Ok(result.last_insert_id())
+    }
+
+    /// Get all stream keys for a user
+    pub async fn get_user_stream_keys(&self, user_id: u64) -> Result<Vec<UserStreamKey>> {
+        Ok(
+            sqlx::query_as("select * from user_stream_key where user_id = ?")
+                .bind(user_id)
+                .fetch_all(&self.db)
+                .await?,
+        )
+    }
+
+    /// Delete a stream key
+    pub async fn delete_stream_key(&self, user_id: u64, key_id: u64) -> Result<()> {
+        sqlx::query("delete from user_stream_key where id = ? and user_id = ?")
+            .bind(key_id)
+            .bind(user_id)
+            .execute(&self.db)
+            .await?;
+        Ok(())
+    }
+
+    /// Find user by stream key (including temporary keys)
+    pub async fn find_user_by_any_stream_key(&self, key: &str) -> Result<Option<u64>> {
+        #[cfg(feature = "test-pattern")]
+        if key == "test" {
+            return Ok(Some(self.upsert_user(&[0; 32]).await?));
+        }
+
+        // First check primary stream key
+        if let Some(uid) = self.find_user_stream_key(key).await? {
+            return Ok(Some(uid));
+        }
+
+        // Then check temporary stream keys
+        Ok(sqlx::query("select user_id from user_stream_key where `key` = ? and (expires is null or expires > now())")
+            .bind(key)
+            .fetch_optional(&self.db)
+            .await?
+            .map(|r| r.try_get(0).unwrap()))
+    }
+
+    /// Create a payment record
+    pub async fn create_payment(
+        &self,
+        payment_hash: &[u8],
+        user_id: u64,
+        invoice: Option<&str>,
+        amount: u64,
+        payment_type: PaymentType,
+        fee: u64,
+    ) -> Result<()> {
+        sqlx::query("insert into payment (payment_hash, user_id, invoice, amount, payment_type, fee) values (?, ?, ?, ?, ?, ?)")
+            .bind(payment_hash)
+            .bind(user_id)
+            .bind(invoice)
+            .bind(amount)
+            .bind(payment_type)
+            .bind(fee)
+            .execute(&self.db)
+            .await?;
+        Ok(())
+    }
+
+    /// Mark payment as paid
+    pub async fn mark_payment_paid(&self, payment_hash: &[u8]) -> Result<()> {
+        sqlx::query("update payment set is_paid = true where payment_hash = ?")
+            .bind(payment_hash)
+            .execute(&self.db)
+            .await?;
+        Ok(())
+    }
+
+    /// Update payment fee and mark as paid
+    pub async fn complete_payment(&self, payment_hash: &[u8], fee: u64) -> Result<()> {
+        sqlx::query("update payment set fee = ?, is_paid = true where payment_hash = ?")
+            .bind(fee)
+            .bind(payment_hash)
+            .execute(&self.db)
+            .await?;
+        Ok(())
+    }
+
+    /// Get payment by hash
+    pub async fn get_payment(&self, payment_hash: &[u8]) -> Result<Option<Payment>> {
+        Ok(
+            sqlx::query_as("select * from payment where payment_hash = ?")
+                .bind(payment_hash)
+                .fetch_optional(&self.db)
+                .await?,
+        )
+    }
+
+    /// Get payment history for user
+    pub async fn get_payment_history(
+        &self,
+        user_id: u64,
+        offset: u64,
+        limit: u64,
+    ) -> Result<Vec<Payment>> {
+        Ok(sqlx::query_as(
+            "select * from payment where user_id = ? order by created desc limit ? offset ?",
+        )
+        .bind(user_id)
+        .bind(limit)
+        .bind(offset)
+        .fetch_all(&self.db)
+        .await?)
+    }
+
+    /// Update user default stream info
+    pub async fn update_user_defaults(
+        &self,
+        user_id: u64,
+        title: Option<&str>,
+        summary: Option<&str>,
+        image: Option<&str>,
+        tags: Option<&str>,
+        content_warning: Option<&str>,
+        goal: Option<&str>,
+    ) -> Result<()> {
+        sqlx::query("update user set title = ?, summary = ?, image = ?, tags = ?, content_warning = ?, goal = ? where id = ?")
+            .bind(title)
+            .bind(summary)
+            .bind(image)
+            .bind(tags)
+            .bind(content_warning)
+            .bind(goal)
+            .bind(user_id)
+            .execute(&self.db)
+            .await?;
+        Ok(())
+    }
+
+    /// Get all ingest endpoints
+    pub async fn get_ingest_endpoints(&self) -> Result<Vec<IngestEndpoint>> {
+        Ok(sqlx::query_as("select * from ingest_endpoint")
+            .fetch_all(&self.db)
+            .await?)
+    }
+
+    /// Get ingest endpoint by id
+    pub async fn get_ingest_endpoint(&self, endpoint_id: u64) -> Result<Option<IngestEndpoint>> {
+        Ok(sqlx::query_as("select * from ingest_endpoint where id = ?")
+            .bind(endpoint_id)
+            .fetch_optional(&self.db)
+            .await?)
+    }
+
+    /// Create ingest endpoint
+    pub async fn create_ingest_endpoint(
+        &self,
+        name: &str,
+        cost: u64,
+        capabilities: Option<&str>,
+    ) -> Result<u64> {
+        let result =
+            sqlx::query("insert into ingest_endpoint (name, cost, capabilities) values (?, ?, ?)")
+                .bind(name)
+                .bind(cost)
+                .bind(capabilities)
+                .execute(&self.db)
+                .await?;
+        Ok(result.last_insert_id())
+    }
 }
diff --git a/crates/zap-stream-db/src/model.rs b/crates/zap-stream-db/src/model.rs
index e17fcb1..e6cfaee 100644
--- a/crates/zap-stream-db/src/model.rs
+++ b/crates/zap-stream-db/src/model.rs
@@ -22,6 +22,18 @@ pub struct User {
     pub is_blocked: bool,
     /// Streams are recorded
     pub recording: bool,
+    /// Default stream title
+    pub title: Option<String>,
+    /// Default stream summary
+    pub summary: Option<String>,
+    /// Default stream image
+    pub image: Option<String>,
+    /// Default tags (comma separated)
+    pub tags: Option<String>,
+    /// Default content warning
+    pub content_warning: Option<String>,
+    /// Default stream goal
+    pub goal: Option<String>,
 }

 #[derive(Default, Debug, Clone, Type)]
@@ -64,4 +76,56 @@ pub struct UserStream {
     pub duration: f32,
     pub fee: Option<u32>,
     pub event: Option<String>,
+    pub endpoint_id: Option<u64>,
+    pub last_segment: Option<DateTime<Utc>>,
+}
+
+#[derive(Debug, Clone, FromRow)]
+pub struct UserStreamForward {
+    pub id: u64,
+    pub user_id: u64,
+    pub name: String,
+    pub target: String,
+}
+
+#[derive(Debug, Clone, FromRow)]
+pub struct UserStreamKey {
+    pub id: u64,
+    pub user_id: u64,
+    pub key: String,
+    pub created: DateTime<Utc>,
+    pub expires: Option<DateTime<Utc>>,
+    pub stream_id: String,
+}
+
+#[derive(Default, Debug, Clone, Type)]
+#[repr(u8)]
+pub enum PaymentType {
+    #[default]
+    TopUp = 0,
+    Zap = 1,
+    Credit = 2,
+    Withdrawal = 3,
+    AdmissionFee = 4,
+}
+
+#[derive(Debug, Clone, FromRow)]
+pub struct Payment {
+    pub payment_hash: Vec<u8>,
+    pub user_id: u64,
+    pub invoice: Option<String>,
+    pub is_paid: bool,
+    pub amount: u64,
+    pub created: DateTime<Utc>,
+    pub nostr: Option<String>,
+    pub payment_type: PaymentType,
+    pub fee: u64,
+}
+
+#[derive(Debug, Clone, FromRow)]
+pub struct IngestEndpoint {
+    pub id: u64,
+    pub name: String,
+    pub cost: u64,
+    pub capabilities: Option<String>, // JSON array stored as string
 }
diff --git a/crates/zap-stream/Cargo.toml b/crates/zap-stream/Cargo.toml
index b64f042..0dde37f 100644
--- a/crates/zap-stream/Cargo.toml
+++ b/crates/zap-stream/Cargo.toml
@@ -40,4 +40,5 @@ base64 = { version = "0.22.1" }
 sha2 = { version = "0.10.8" }
 pretty_env_logger = "0.5.0"
 clap = { version = "4.5.16", features = ["derive"] }
-futures-util = "0.3.31"
\ No newline at end of file
+futures-util = "0.3.31"
+matchit = "0.8.4"
\ No
newline at end of file diff --git a/crates/zap-stream/config.yaml b/crates/zap-stream/config.yaml index 9c5e042..7f6f8af 100755 --- a/crates/zap-stream/config.yaml +++ b/crates/zap-stream/config.yaml @@ -42,7 +42,6 @@ listen_http: "127.0.0.1:8080" # overseer: zap-stream: - cost: 16 nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4" #blossom: # - "http://localhost:8881" diff --git a/crates/zap-stream/src/api.rs b/crates/zap-stream/src/api.rs index 4d3987e..c82ddab 100644 --- a/crates/zap-stream/src/api.rs +++ b/crates/zap-stream/src/api.rs @@ -3,27 +3,62 @@ use crate::settings::Settings; use crate::ListenerEndpoint; use anyhow::{anyhow, bail, Result}; use bytes::Bytes; -use fedimint_tonic_lnd::tonic::codegen::Body; +use chrono::{DateTime, Utc}; use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Full}; use hyper::body::Incoming; use hyper::{Method, Request, Response}; -use nostr_sdk::{serde_json, Event, PublicKey}; +use matchit::Router; +use nostr_sdk::{serde_json, PublicKey}; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; use std::str::FromStr; use url::Url; +use uuid::Uuid; use zap_stream_db::ZapStreamDb; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Route { + Account, + Topup, + Event, + Withdraw, + Forward, + ForwardId, + History, + Keys, +} + #[derive(Clone)] pub struct Api { db: ZapStreamDb, settings: Settings, + lnd: fedimint_tonic_lnd::Client, + router: Router, } impl Api { - pub fn new(db: ZapStreamDb, settings: Settings) -> Self { - Self { db, settings } + pub fn new(db: ZapStreamDb, settings: Settings, lnd: fedimint_tonic_lnd::Client) -> Self { + let mut router = Router::new(); + + // Define routes (path only, method will be matched separately) + router.insert("/api/v1/account", Route::Account).unwrap(); + router.insert("/api/v1/topup", Route::Topup).unwrap(); + router.insert("/api/v1/event", Route::Event).unwrap(); + router.insert("/api/v1/withdraw", Route::Withdraw).unwrap(); + router.insert("/api/v1/forward", Route::Forward).unwrap(); + router + .insert("/api/v1/forward/{id}", Route::ForwardId) + .unwrap(); + router.insert("/api/v1/history", Route::History).unwrap(); + router.insert("/api/v1/keys", Route::Keys).unwrap(); + + Self { + db, + settings, + lnd, + router, + } } pub async fn handler( @@ -32,60 +67,111 @@ impl Api { ) -> Result>, anyhow::Error> { let base = Response::builder() .header("server", "zap-stream") + .header("content-type", "application/json") .header("access-control-allow-origin", "*") .header("access-control-allow-headers", "*") - .header("access-control-allow-methods", "HEAD, GET"); + .header( + "access-control-allow-methods", + "HEAD, GET, PATCH, DELETE, POST, OPTIONS", + ); - Ok(match (req.method(), req.uri().path()) { - (&Method::GET, "/api/v1/account") => { - let auth = check_nip98_auth(&req)?; - let rsp = self.get_account(&auth.pubkey).await?; - return Ok(base.body(Self::body_json(&rsp)?)?); - } - (&Method::PATCH, "/api/v1/account") => { - let auth = check_nip98_auth(&req)?; - let body = req.collect().await?.to_bytes(); - let r_body: PatchAccount = serde_json::from_slice(&body)?; - let rsp = self.update_account(&auth.pubkey, r_body).await?; - return Ok(base.body(Self::body_json(&rsp)?)?); - } - (&Method::GET, "/api/v1/topup") => { - let auth = check_nip98_auth(&req)?; - let url: Url = req.uri().to_string().parse()?; - let amount: usize = url - .query_pairs() - .find_map(|(k, v)| if k == "amount" { Some(v) } else { None }) - .and_then(|v| v.parse().ok()) - .ok_or(anyhow!("Missing 
amount"))?; - let rsp = self.topup(&auth.pubkey, amount).await?; - return Ok(base.body(Self::body_json(&rsp)?)?); - } - (&Method::PATCH, "/api/v1/event") => { - bail!("Not implemented") - } - (&Method::POST, "/api/v1/withdraw") => { - bail!("Not implemented") - } - (&Method::POST, "/api/v1/account/forward") => { - bail!("Not implemented") - } - (&Method::DELETE, "/api/v1/account/forward/") => { - bail!("Not implemented") - } - (&Method::GET, "/api/v1/account/history") => { - bail!("Not implemented") - } - (&Method::GET, "/api/v1/account/keys") => { - bail!("Not implemented") - } - _ => { - if req.method() == Method::OPTIONS { - base.body(Default::default())? - } else { - base.status(404).body(Default::default())? + // Handle OPTIONS requests + if req.method() == Method::OPTIONS { + return Ok(base.body(Default::default())?); + } + + // Route matching + let path = req.uri().path(); + let matched = self.router.at(path); + + if let Ok(matched) = matched { + let route = *matched.value; + let params = matched.params; + + match (req.method(), route) { + (&Method::GET, Route::Account) => { + let auth = check_nip98_auth(&req, &self.settings.public_url)?; + let rsp = self.get_account(&auth.pubkey).await?; + Ok(base.body(Self::body_json(&rsp)?)?) } + (&Method::PATCH, Route::Account) => { + let auth = check_nip98_auth(&req, &self.settings.public_url)?; + let body = req.collect().await?.to_bytes(); + let r_body: PatchAccount = serde_json::from_slice(&body)?; + let rsp = self.update_account(&auth.pubkey, r_body).await?; + Ok(base.body(Self::body_json(&rsp)?)?) + } + (&Method::GET, Route::Topup) => { + let auth = check_nip98_auth(&req, &self.settings.public_url)?; + let url: Url = req.uri().to_string().parse()?; + let amount: usize = url + .query_pairs() + .find_map(|(k, v)| if k == "amount" { Some(v) } else { None }) + .and_then(|v| v.parse().ok()) + .ok_or(anyhow!("Missing amount"))?; + let rsp = self.topup(&auth.pubkey, amount).await?; + Ok(base.body(Self::body_json(&rsp)?)?) + } + (&Method::PATCH, Route::Event) => { + let auth = check_nip98_auth(&req, &self.settings.public_url)?; + let body = req.collect().await?.to_bytes(); + let patch_event: PatchEvent = serde_json::from_slice(&body)?; + let rsp = self.update_event(&auth.pubkey, patch_event).await?; + Ok(base.body(Self::body_json(&rsp)?)?) + } + (&Method::POST, Route::Withdraw) => { + let auth = check_nip98_auth(&req, &self.settings.public_url)?; + let url: Url = req.uri().to_string().parse()?; + let invoice = url + .query_pairs() + .find_map(|(k, v)| { + if k == "invoice" { + Some(v.to_string()) + } else { + None + } + }) + .ok_or(anyhow!("Missing invoice parameter"))?; + let rsp = self.withdraw(&auth.pubkey, invoice).await?; + Ok(base.body(Self::body_json(&rsp)?)?) + } + (&Method::POST, Route::Forward) => { + let auth = check_nip98_auth(&req, &self.settings.public_url)?; + let body = req.collect().await?.to_bytes(); + let forward_req: ForwardRequest = serde_json::from_slice(&body)?; + let rsp = self.create_forward(&auth.pubkey, forward_req).await?; + Ok(base.body(Self::body_json(&rsp)?)?) + } + (&Method::DELETE, Route::ForwardId) => { + let auth = check_nip98_auth(&req, &self.settings.public_url)?; + let forward_id = params + .get("id") + .ok_or_else(|| anyhow!("Missing forward ID"))?; + let rsp = self.delete_forward(&auth.pubkey, forward_id).await?; + Ok(base.body(Self::body_json(&rsp)?)?) 
+ } + (&Method::GET, Route::History) => { + let auth = check_nip98_auth(&req, &self.settings.public_url)?; + let rsp = self.get_account_history(&auth.pubkey).await?; + Ok(base.body(Self::body_json(&rsp)?)?) + } + (&Method::GET, Route::Keys) => { + let auth = check_nip98_auth(&req, &self.settings.public_url)?; + let rsp = self.get_account_keys(&auth.pubkey).await?; + Ok(base.body(Self::body_json(&rsp)?)?) + } + (&Method::POST, Route::Keys) => { + let auth = check_nip98_auth(&req, &self.settings.public_url)?; + let body = req.collect().await?.to_bytes(); + let create_req: CreateStreamKeyRequest = serde_json::from_slice(&body)?; + let rsp = self.create_stream_key(&auth.pubkey, create_req).await?; + Ok(base.body(Self::body_json(&rsp)?)?) + } + _ => Ok(base.status(405).body(Default::default())?), // Method not allowed } - }) + } else { + Ok(base.status(404).body(Default::default())?) // Not found + } } fn body_json(obj: &T) -> Result> { @@ -98,79 +184,435 @@ impl Api { let uid = self.db.upsert_user(&pubkey.to_bytes()).await?; let user = self.db.get_user(uid).await?; - Ok(AccountInfo { - endpoints: self - .settings - .endpoints - .iter() - .filter_map(|e| match ListenerEndpoint::from_str(&e).ok()? { + // Get user forwards + let forwards = self.db.get_user_forwards(uid).await?; + + // Get ingest endpoints from database + let db_ingest_endpoints = self.db.get_ingest_endpoints().await?; + + // Create 2D array: settings endpoints × database ingest endpoints + let mut endpoints = Vec::new(); + + for setting_endpoint in &self.settings.endpoints { + if let Ok(listener_endpoint) = ListenerEndpoint::from_str(&setting_endpoint) { + match listener_endpoint { ListenerEndpoint::SRT { endpoint } => { - let addr: SocketAddr = endpoint.parse().ok()?; - Some(Endpoint { - name: "SRT".to_string(), - url: format!( - "srt://{}:{}", - self.settings.endpoints_public_hostname, - addr.port() - ), - key: user.stream_key.clone(), - capabilities: vec![], - }) + if let Ok(addr) = endpoint.parse::() { + for ingest in &db_ingest_endpoints { + endpoints.push(Endpoint { + name: format!("SRT-{}", ingest.name), + url: format!( + "srt://{}:{}", + self.settings.endpoints_public_hostname, + addr.port() + ), + key: user.stream_key.clone(), + capabilities: ingest + .capabilities + .as_ref() + .map(|c| { + c.split(',').map(|s| s.trim().to_string()).collect() + }) + .unwrap_or_else(Vec::new), + cost: EndpointCost { + unit: "min".to_string(), + rate: ingest.cost as f32 / 1000.0, + }, + }); + } + } } ListenerEndpoint::RTMP { endpoint } => { - let addr: SocketAddr = endpoint.parse().ok()?; - Some(Endpoint { - name: "RTMP".to_string(), - url: format!( - "rtmp://{}:{}", - self.settings.endpoints_public_hostname, - addr.port() - ), - key: user.stream_key.clone(), - capabilities: vec![], - }) + if let Ok(addr) = endpoint.parse::() { + for ingest in &db_ingest_endpoints { + endpoints.push(Endpoint { + name: format!("RTMP-{}", ingest.name), + url: format!( + "rtmp://{}:{}", + self.settings.endpoints_public_hostname, + addr.port() + ), + key: user.stream_key.clone(), + capabilities: ingest + .capabilities + .as_ref() + .map(|c| { + c.split(',').map(|s| s.trim().to_string()).collect() + }) + .unwrap_or_else(Vec::new), + cost: EndpointCost { + unit: "min".to_string(), + rate: ingest.cost as f32 / 1000.0, + }, + }); + } + } } ListenerEndpoint::TCP { endpoint } => { - let addr: SocketAddr = endpoint.parse().ok()?; - Some(Endpoint { - name: "TCP".to_string(), - url: format!( - "tcp://{}:{}", - self.settings.endpoints_public_hostname, - addr.port() - 
), - key: user.stream_key.clone(), - capabilities: vec![], - }) + if let Ok(addr) = endpoint.parse::() { + for ingest in &db_ingest_endpoints { + endpoints.push(Endpoint { + name: format!("TCP-{}", ingest.name), + url: format!( + "tcp://{}:{}", + self.settings.endpoints_public_hostname, + addr.port() + ), + key: user.stream_key.clone(), + capabilities: ingest + .capabilities + .as_ref() + .map(|c| { + c.split(',').map(|s| s.trim().to_string()).collect() + }) + .unwrap_or_else(Vec::new), + cost: EndpointCost { + unit: "min".to_string(), + rate: ingest.cost as f32 / 1000.0, + }, + }); + } + } } - ListenerEndpoint::File { .. } => None, - ListenerEndpoint::TestPattern => None, - }) - .collect(), - event: None, + ListenerEndpoint::File { .. } => {} + ListenerEndpoint::TestPattern => {} + } + } + } + + Ok(AccountInfo { + endpoints, balance: user.balance as u64, tos: AccountTos { accepted: user.tos_accepted.is_some(), link: "https://zap.stream/tos".to_string(), }, + forwards: forwards + .into_iter() + .map(|f| ForwardDest { + id: f.id, + name: f.name, + }) + .collect(), + details: Some(PatchEventDetails { + title: user.title, + summary: user.summary, + image: user.image, + tags: user + .tags + .map(|t| t.split(',').map(|s| s.to_string()).collect()), + content_warning: user.content_warning, + goal: user.goal, + }), }) } async fn update_account(&self, pubkey: &PublicKey, account: PatchAccount) -> Result<()> { - bail!("Not implemented") + let uid = self.db.upsert_user(&pubkey.to_bytes()).await?; + + if let Some(accept_tos) = account.accept_tos { + if accept_tos { + let user = self.db.get_user(uid).await?; + if user.tos_accepted.is_none() { + self.db.accept_tos(uid).await?; + } + } + } + + Ok(()) } async fn topup(&self, pubkey: &PublicKey, amount: usize) -> Result { - bail!("Not implemented") + let uid = self.db.upsert_user(&pubkey.to_bytes()).await?; + + // Create Lightning invoice + let invoice_req = fedimint_tonic_lnd::lnrpc::Invoice { + value: amount as i64, + memo: format!( + "zap.stream topup for user {}", + hex::encode(pubkey.to_bytes()) + ), + ..Default::default() + }; + + let response = self + .lnd + .clone() + .lightning() + .add_invoice(invoice_req) + .await?; + let invoice_response = response.into_inner(); + + // Create payment entry for this topup invoice + let payment_hash = hex::decode(&invoice_response.r_hash)?; + self.db + .create_payment( + &payment_hash, + uid, + Some(&invoice_response.payment_request), + amount as u64 * 1000, // Convert to milli-sats + zap_stream_db::PaymentType::TopUp, + 0, + ) + .await?; + + Ok(TopupResponse { + pr: invoice_response.payment_request, + }) + } + + async fn update_event(&self, pubkey: &PublicKey, patch_event: PatchEvent) -> Result<()> { + let uid = self.db.upsert_user(&pubkey.to_bytes()).await?; + + if let Some(stream_id) = patch_event.id { + // Update specific stream + let stream_uuid = Uuid::parse_str(&stream_id)?; + let mut stream = self.db.get_stream(&stream_uuid).await?; + + // Verify user owns this stream + if stream.user_id != uid { + bail!("Unauthorized: Stream belongs to different user"); + } + + // Update stream with patch data + if let Some(title) = patch_event.title { + stream.title = Some(title); + } + if let Some(summary) = patch_event.summary { + stream.summary = Some(summary); + } + if let Some(image) = patch_event.image { + stream.image = Some(image); + } + if let Some(tags) = patch_event.tags { + stream.tags = Some(tags.join(",")); + } + if let Some(content_warning) = patch_event.content_warning { + stream.content_warning = 
Some(content_warning); + } + if let Some(goal) = patch_event.goal { + stream.goal = Some(goal); + } + + self.db.update_stream(&stream).await?; + + // TODO: Update the nostr event and republish like C# version + } else { + // Update user default stream info + self.db + .update_user_defaults( + uid, + patch_event.title.as_deref(), + patch_event.summary.as_deref(), + patch_event.image.as_deref(), + patch_event.tags.as_ref().map(|t| t.join(",")).as_deref(), + patch_event.content_warning.as_deref(), + patch_event.goal.as_deref(), + ) + .await?; + } + + Ok(()) + } + + async fn withdraw(&self, pubkey: &PublicKey, invoice: String) -> Result { + let uid = self.db.upsert_user(&pubkey.to_bytes()).await?; + let user = self.db.get_user(uid).await?; + + let mut lnd = self.lnd.clone(); + + // Decode invoice to get amount and payment hash + let decode_req = fedimint_tonic_lnd::lnrpc::PayReqString { + pay_req: invoice.clone(), + }; + let decode_response = lnd.lightning().decode_pay_req(decode_req).await?; + let decoded = decode_response.into_inner(); + let invoice_amount = decoded.num_msat as u64; + let payment_hash = hex::decode(decoded.payment_hash)?; + + // Check if user has sufficient balance + if user.balance < invoice_amount as i64 { + bail!("Insufficient balance"); + } + + // 1. Deduct balance first (safer approach) + self.db + .update_user_balance(uid, -(invoice_amount as i64)) + .await?; + + // 2. Create payment record + self.db + .create_payment( + &payment_hash, + uid, + Some(&invoice), + invoice_amount, + zap_stream_db::PaymentType::Withdrawal, + 0, + ) + .await?; + + // 3. Attempt Lightning payment + let send_req = fedimint_tonic_lnd::lnrpc::SendRequest { + payment_request: invoice.clone(), + ..Default::default() + }; + + let response = lnd.lightning().send_payment_sync(send_req).await; + + match response { + Ok(resp) => { + let payment_response = resp.into_inner(); + if payment_response.payment_error.is_empty() { + // Payment successful + let fee = payment_response + .payment_route + .map(|r| r.total_fees_msat) + .unwrap_or(0); + + // Update payment record with fee and mark as paid + self.db.complete_payment(&payment_hash, fee as u64).await?; + + // Deduct additional fee if any + if fee > 0 { + self.db.update_user_balance(uid, -fee).await?; + } + + Ok(WithdrawResponse { + fee, + preimage: hex::encode(payment_response.payment_preimage), + }) + } else { + // Payment failed, reverse balance deduction + self.db + .update_user_balance(uid, invoice_amount as i64) + .await?; + bail!("Payment failed: {}", payment_response.payment_error); + } + } + Err(e) => { + // Payment failed, reverse balance deduction + self.db + .update_user_balance(uid, invoice_amount as i64) + .await?; + bail!("Payment failed: {}", e); + } + } + } + + async fn create_forward( + &self, + pubkey: &PublicKey, + req: ForwardRequest, + ) -> Result { + let uid = self.db.upsert_user(&pubkey.to_bytes()).await?; + let forward_id = self.db.create_forward(uid, &req.name, &req.target).await?; + + Ok(ForwardResponse { id: forward_id }) + } + + async fn delete_forward(&self, pubkey: &PublicKey, forward_id: &str) -> Result<()> { + let uid = self.db.upsert_user(&pubkey.to_bytes()).await?; + let forward_id: u64 = forward_id.parse()?; + self.db.delete_forward(uid, forward_id).await?; + Ok(()) + } + + async fn get_account_history(&self, pubkey: &PublicKey) -> Result { + let uid = self.db.upsert_user(&pubkey.to_bytes()).await?; + + // For now, just get first page with default page size + let payments = self.db.get_payment_history(uid, 0, 
100).await?; + + let items = payments + .into_iter() + .map(|p| HistoryEntry { + payment_hash: hex::encode(p.payment_hash), + amount: p.amount as i64, + timestamp: p.created.timestamp(), + payment_type: match p.payment_type { + zap_stream_db::PaymentType::TopUp => "topup".to_string(), + zap_stream_db::PaymentType::Zap => "zap".to_string(), + zap_stream_db::PaymentType::Credit => "credit".to_string(), + zap_stream_db::PaymentType::Withdrawal => "withdrawal".to_string(), + zap_stream_db::PaymentType::AdmissionFee => "admission_fee".to_string(), + }, + is_paid: p.is_paid, + fee: p.fee, + }) + .collect(); + + Ok(HistoryResponse { + items, + page: 0, + page_size: 100, + }) + } + + async fn get_account_keys(&self, pubkey: &PublicKey) -> Result> { + let uid = self.db.upsert_user(&pubkey.to_bytes()).await?; + let keys = self.db.get_user_stream_keys(uid).await?; + + Ok(keys + .into_iter() + .map(|k| StreamKey { + id: k.id, + key: k.key, + created: k.created.timestamp(), + expires: k.expires.map(|e| e.timestamp()), + stream_id: k.stream_id, + }) + .collect()) + } + + async fn create_stream_key( + &self, + pubkey: &PublicKey, + req: CreateStreamKeyRequest, + ) -> Result { + let uid = self.db.upsert_user(&pubkey.to_bytes()).await?; + + // Create a new stream record for this key + let stream_id = Uuid::new_v4(); + let new_stream = zap_stream_db::UserStream { + id: stream_id.to_string(), + user_id: uid, + starts: Utc::now(), + state: zap_stream_db::UserStreamState::Planned, + title: req.event.title, + summary: req.event.summary, + image: req.event.image, + tags: req.event.tags.map(|t| t.join(",")), + content_warning: req.event.content_warning, + goal: req.event.goal, + ..Default::default() + }; + + // Create the stream record + self.db.insert_stream(&new_stream).await?; + + // Generate a new stream key + let key = Uuid::new_v4().to_string(); + let _key_id = self + .db + .create_stream_key(uid, &key, req.expires, &stream_id.to_string()) + .await?; + + // For now, return minimal response - event building would require nostr integration + Ok(CreateStreamKeyResponse { + key, + event: None, // TODO: Build proper nostr event like C# version + }) } } #[derive(Deserialize, Serialize)] struct AccountInfo { pub endpoints: Vec, - pub event: Option, pub balance: u64, pub tos: AccountTos, + pub forwards: Vec, + pub details: Option, } #[derive(Deserialize, Serialize)] @@ -179,12 +621,13 @@ struct Endpoint { pub url: String, pub key: String, pub capabilities: Vec, + pub cost: EndpointCost, } #[derive(Deserialize, Serialize)] struct EndpointCost { pub unit: String, - pub rate: u16, + pub rate: f32, } #[derive(Deserialize, Serialize)] @@ -202,3 +645,91 @@ struct PatchAccount { struct TopupResponse { pub pr: String, } + +#[derive(Deserialize, Serialize)] +struct WithdrawRequest { + pub payment_request: String, + pub amount: u64, +} + +#[derive(Deserialize, Serialize)] +struct WithdrawResponse { + pub fee: i64, + pub preimage: String, +} + +#[derive(Deserialize, Serialize)] +struct ForwardRequest { + pub name: String, + pub target: String, +} + +#[derive(Deserialize, Serialize)] +struct ForwardResponse { + pub id: u64, +} + +#[derive(Deserialize, Serialize)] +struct HistoryEntry { + pub payment_hash: String, + pub amount: i64, + pub timestamp: i64, + pub payment_type: String, + pub is_paid: bool, + pub fee: u64, +} + +#[derive(Deserialize, Serialize)] +struct HistoryResponse { + pub items: Vec, + pub page: i32, + pub page_size: i32, +} + +#[derive(Deserialize, Serialize)] +struct StreamKey { + pub id: u64, + pub key: 
String, + pub created: i64, + pub expires: Option, + pub stream_id: String, +} + +#[derive(Deserialize, Serialize)] +struct CreateStreamKeyRequest { + pub event: PatchEventDetails, + pub expires: Option>, +} + +#[derive(Deserialize, Serialize)] +struct CreateStreamKeyResponse { + pub key: String, + pub event: Option, +} + +#[derive(Deserialize, Serialize)] +struct PatchEvent { + pub id: Option, + pub title: Option, + pub summary: Option, + pub image: Option, + pub tags: Option>, + pub content_warning: Option, + pub goal: Option, +} + +#[derive(Deserialize, Serialize)] +struct PatchEventDetails { + pub title: Option, + pub summary: Option, + pub image: Option, + pub tags: Option>, + pub content_warning: Option, + pub goal: Option, +} + +#[derive(Deserialize, Serialize)] +struct ForwardDest { + pub id: u64, + pub name: String, +} diff --git a/crates/zap-stream/src/http.rs b/crates/zap-stream/src/http.rs index 7c67bff..851665e 100644 --- a/crates/zap-stream/src/http.rs +++ b/crates/zap-stream/src/http.rs @@ -1,23 +1,21 @@ use crate::api::Api; -use crate::overseer::ZapStreamOverseer; use anyhow::{bail, Result}; use base64::Engine; use bytes::Bytes; +use chrono::{DateTime, Utc}; use futures_util::TryStreamExt; use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Full, StreamBody}; use hyper::body::{Frame, Incoming}; use hyper::service::Service; use hyper::{Method, Request, Response}; -use log::{error, info}; -use nostr_sdk::{serde_json, Event}; +use log::error; +use nostr_sdk::{serde_json, Alphabet, Event, Kind, PublicKey, SingleLetterTag, TagKind}; use std::future::Future; use std::path::PathBuf; use std::pin::Pin; -use std::sync::Arc; use tokio::fs::File; use tokio_util::io::ReaderStream; -use zap_stream_core::overseer::Overseer; #[derive(Clone)] pub struct HttpServer { @@ -98,7 +96,13 @@ impl Service> for HttpServer { } } -pub fn check_nip98_auth(req: &Request) -> Result { +#[derive(Debug, Clone)] +pub struct AuthResult { + pub pubkey: PublicKey, + pub event: Event, +} + +pub fn check_nip98_auth(req: &Request, public_url: &str) -> Result { let auth = if let Some(a) = req.headers().get("authorization") { a.to_str()? } else { @@ -109,10 +113,68 @@ pub fn check_nip98_auth(req: &Request) -> Result { bail!("Invalid authorization scheme"); } - let json = - String::from_utf8(base64::engine::general_purpose::STANDARD.decode(auth[6..].as_bytes())?)?; - info!("{}", json); + let token = &auth[6..]; + let decoded = base64::engine::general_purpose::STANDARD.decode(token.as_bytes())?; - // TODO: check tags - Ok(serde_json::from_str::(&json)?) 
+ // Check if decoded data starts with '{' + if decoded.is_empty() || decoded[0] != b'{' { + bail!("Invalid token"); + } + + let json = String::from_utf8(decoded)?; + let event: Event = serde_json::from_str(&json)?; + + // Verify signature + if !event.verify().is_ok() { + bail!("Invalid nostr event, invalid signature"); + } + + // Check event kind (NIP-98: HTTP Auth, kind 27235) + if event.kind != Kind::Custom(27235) { + bail!("Invalid nostr event, wrong kind"); + } + + // Check timestamp (within 120 seconds) + let now = Utc::now(); + let event_time = DateTime::from_timestamp(event.created_at.as_u64() as i64, 0) + .ok_or_else(|| anyhow::anyhow!("Invalid timestamp"))?; + let diff_seconds = (now - event_time).num_seconds().abs(); + if diff_seconds > 120 { + bail!("Invalid nostr event, timestamp out of range"); + } + + // Check URL tag (full URI) + let url_tag = event + .tags + .iter() + .find(|tag| tag.kind() == TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::U))) + .and_then(|tag| tag.content()) + .ok_or_else(|| anyhow::anyhow!("Missing URL tag"))?; + + // Construct full URI using public_url + path + query + let request_uri = match req.uri().query() { + Some(query) => format!("{}{}?{}", public_url.trim_end_matches('/'), req.uri().path(), query), + None => format!("{}{}", public_url.trim_end_matches('/'), req.uri().path()), + }; + + if !url_tag.eq_ignore_ascii_case(&request_uri) { + bail!("Invalid nostr event, URL tag invalid. Expected: {}, Got: {}", request_uri, url_tag); + } + + // Check method tag + let method_tag = event + .tags + .iter() + .find(|tag| tag.kind() == TagKind::Method) + .and_then(|tag| tag.content()) + .ok_or_else(|| anyhow::anyhow!("Missing method tag"))?; + + if !method_tag.eq_ignore_ascii_case(req.method().as_str()) { + bail!("Invalid nostr event, method tag invalid"); + } + + Ok(AuthResult { + pubkey: event.pubkey.clone(), + event, + }) } diff --git a/crates/zap-stream/src/main.rs b/crates/zap-stream/src/main.rs index 20c8841..1670669 100644 --- a/crates/zap-stream/src/main.rs +++ b/crates/zap-stream/src/main.rs @@ -71,7 +71,7 @@ async fn main() -> Result<()> { let http_addr: SocketAddr = settings.listen_http.parse()?; let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url); - let api = Api::new(overseer.database(), settings.clone()); + let api = Api::new(overseer.database(), settings.clone(), overseer.lnd_client()); // HTTP server let server = HttpServer::new(index_html, PathBuf::from(settings.output_dir), api); tasks.push(tokio::spawn(async move { diff --git a/crates/zap-stream/src/overseer.rs b/crates/zap-stream/src/overseer.rs index b880aad..1e07360 100644 --- a/crates/zap-stream/src/overseer.rs +++ b/crates/zap-stream/src/overseer.rs @@ -8,7 +8,6 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use log::{error, info, warn}; use nostr_sdk::prelude::Coordinate; use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32}; -use serde::Serialize; use std::collections::HashSet; use std::path::PathBuf; use std::str::FromStr; @@ -44,8 +43,6 @@ pub struct ZapStreamOverseer { blossom_servers: Vec, /// Public facing URL pointing to [out_dir] public_url: String, - /// Cost / second / variant - cost: i64, /// Currently active streams /// Any streams which are not contained in this set are dead active_streams: Arc>>, @@ -60,7 +57,6 @@ impl ZapStreamOverseer { lnd: &LndSettings, relays: &Vec, blossom_servers: &Option>, - cost: i64, ) -> Result { let db = 
ZapStreamDb::new(db).await?; db.migrate().await?; @@ -112,15 +108,18 @@ impl ZapStreamOverseer { .map(|b| Blossom::new(b)) .collect(), public_url: public_url.clone(), - cost, active_streams: Arc::new(RwLock::new(HashSet::new())), }) } - pub(crate) fn database(&self) -> ZapStreamDb { + pub fn database(&self) -> ZapStreamDb { self.db.clone() } + pub fn lnd_client(&self) -> fedimint_tonic_lnd::Client { + self.lnd.clone() + } + fn stream_to_event_builder(&self, stream: &UserStream) -> Result { let mut tags = vec![ Tag::parse(&["d".to_string(), stream.id.to_string()])?, @@ -224,15 +223,6 @@ impl ZapStreamOverseer { } } -#[derive(Serialize)] -struct Endpoint {} - -#[derive(Serialize)] -struct AccountInfo { - pub endpoints: Vec, - pub event: Event, - pub balance: u64, -} #[async_trait] impl Overseer for ZapStreamOverseer { async fn check_streams(&self) -> Result<()> { @@ -270,7 +260,19 @@ impl Overseer for ZapStreamOverseer { bail!("Not enough balance"); } - let variants = get_default_variants(&stream_info)?; + // Get ingest endpoint configuration based on connection type + let endpoint_id = self.detect_endpoint(&connection).await?; + let endpoint = if let Some(id) = endpoint_id { + self.db.get_ingest_endpoint(id).await? + } else { + None + }; + + let variants = if let Some(endpoint) = &endpoint { + get_variants_from_endpoint(&stream_info, endpoint)? + } else { + get_default_variants(&stream_info)? + }; let mut egress = vec![]; egress.push(EgressType::HLS(EgressConfig { @@ -285,6 +287,7 @@ impl Overseer for ZapStreamOverseer { user_id: uid, starts: Utc::now(), state: UserStreamState::Live, + endpoint_id, ..Default::default() }; let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?; @@ -312,7 +315,21 @@ impl Overseer for ZapStreamOverseer { let stream = self.db.get_stream(pipeline_id).await?; let duration = added.iter().fold(0.0, |acc, v| acc + v.duration); - let cost = self.cost * duration.round() as i64; + + // Get the cost per minute from the ingest endpoint, or use default + let cost_per_minute = if let Some(endpoint_id) = stream.endpoint_id { + if let Some(endpoint) = self.db.get_ingest_endpoint(endpoint_id).await? 
{ + endpoint.cost + } else { + 0 + } + } else { + 0 + }; + + // Convert duration from seconds to minutes and calculate cost + let duration_minutes = duration / 60.0; + let cost = (cost_per_minute as f32 * duration_minutes).round() as i64; let bal = self .db .tick_stream(pipeline_id, stream.user_id, duration, cost) @@ -455,3 +472,141 @@ fn get_default_variants(info: &IngressInfo) -> Result> { Ok(vars) } + +impl ZapStreamOverseer { + /// Detect which ingest endpoint should be used based on connection info + async fn detect_endpoint(&self, connection: &ConnectionInfo) -> Result> { + // Get all ingest endpoints and match by name against connection endpoint + let endpoints = self.db.get_ingest_endpoints().await?; + + for endpoint in endpoints { + if endpoint.name == connection.endpoint { + return Ok(Some(endpoint.id)); + } + } + + // No matching endpoint found + Ok(None) + } +} + +fn get_variants_from_endpoint( + info: &IngressInfo, + endpoint: &zap_stream_db::IngestEndpoint, +) -> Result> { + let capabilities_str = endpoint.capabilities.as_deref().unwrap_or(""); + let capabilities: Vec<&str> = capabilities_str.split(',').collect(); + + let mut vars: Vec = vec![]; + + let video_src = info + .streams + .iter() + .find(|c| c.stream_type == IngressStreamType::Video); + let audio_src = info + .streams + .iter() + .find(|c| c.stream_type == IngressStreamType::Audio); + + // Parse all variant capabilities and create grouped variants + let mut group_id = 0usize; + let mut dst_index = 0; + + for capability in capabilities { + let parts: Vec<&str> = capability.split(':').collect(); + + if parts.len() >= 2 && parts[0] == "variant" && parts[1] == "source" { + // Add copy variant (group for source) + if let Some(video_src) = video_src { + vars.push(VariantStream::CopyVideo(VariantMapping { + id: Uuid::new_v4(), + src_index: video_src.index, + dst_index, + group_id, + })); + dst_index += 1; + } + + if let Some(audio_src) = audio_src { + vars.push(VariantStream::CopyAudio(VariantMapping { + id: Uuid::new_v4(), + src_index: audio_src.index, + dst_index, + group_id, + })); + dst_index += 1; + } + + group_id += 1; + } else if parts.len() >= 3 && parts[0] == "variant" { + if let (Ok(target_height), Ok(bitrate)) = + (parts[1].parse::(), parts[2].parse::()) + { + // Add video variant for this group + if let Some(video_src) = video_src { + // Calculate dimensions maintaining aspect ratio + let input_width = video_src.width as f32; + let input_height = video_src.height as f32; + let aspect_ratio = input_width / input_height; + + let output_height = target_height; + let output_width = (output_height as f32 * aspect_ratio).round() as u16; + + // Ensure even dimensions for H.264 compatibility + let output_width = if output_width % 2 == 1 { + output_width + 1 + } else { + output_width + }; + let output_height = if output_height % 2 == 1 { + output_height + 1 + } else { + output_height + } as u16; + + vars.push(VariantStream::Video(VideoVariant { + mapping: VariantMapping { + id: Uuid::new_v4(), + src_index: video_src.index, + dst_index, + group_id, + }, + width: output_width, + height: output_height, + fps: video_src.fps, + bitrate: bitrate as u64, + codec: "libx264".to_string(), + profile: 77, // AV_PROFILE_H264_MAIN + level: 51, + keyframe_interval: video_src.fps as u16 * 2, + pixel_format: AV_PIX_FMT_YUV420P as u32, + })); + dst_index += 1; + } + + // Add audio variant for the same group + if let Some(audio_src) = audio_src { + vars.push(VariantStream::Audio(AudioVariant { + mapping: VariantMapping { + id: 
Uuid::new_v4(), + src_index: audio_src.index, + dst_index, + group_id, + }, + bitrate: 192_000, + codec: "aac".to_string(), + channels: 2, + sample_rate: 48_000, + sample_fmt: "fltp".to_owned(), + })); + dst_index += 1; + } + + group_id += 1; + } + } + // Handle other capabilities like dvr:720h here if needed + } + + Ok(vars) +} diff --git a/crates/zap-stream/src/settings.rs b/crates/zap-stream/src/settings.rs index 6e2ac79..602b4f8 100644 --- a/crates/zap-stream/src/settings.rs +++ b/crates/zap-stream/src/settings.rs @@ -49,8 +49,6 @@ pub enum OverseerConfig { nsec: String, /// Blossom servers blossom: Option>, - /// Cost (milli-sats) / second / variant - cost: i64, }, } @@ -70,7 +68,6 @@ impl Settings { lnd, relays, blossom, - cost, } => Ok(Arc::new( ZapStreamOverseer::new( &self.output_dir, @@ -80,7 +77,6 @@ impl Settings { lnd, relays, blossom, - *cost, ) .await?, )),