feat: pure vibes

feat: implement API
This commit is contained in:
2025-06-06 12:07:17 +01:00
parent 41773cc3a0
commit b295d7e7be
12 changed files with 1258 additions and 150 deletions

10
Cargo.lock generated
View File

@ -269,7 +269,7 @@ dependencies = [
"http-body 0.4.6", "http-body 0.4.6",
"hyper 0.14.32", "hyper 0.14.32",
"itoa", "itoa",
"matchit", "matchit 0.7.3",
"memchr", "memchr",
"mime", "mime",
"percent-encoding", "percent-encoding",
@ -588,6 +588,7 @@ dependencies = [
"iana-time-zone", "iana-time-zone",
"js-sys", "js-sys",
"num-traits", "num-traits",
"serde",
"wasm-bindgen", "wasm-bindgen",
"windows-targets 0.52.6", "windows-targets 0.52.6",
] ]
@ -2090,6 +2091,12 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "matchit"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f926ade0c4e170215ae43342bf13b9310a437609c81f29f86c5df6657582ef9"
[[package]] [[package]]
name = "md-5" name = "md-5"
version = "0.10.6" version = "0.10.6"
@ -4786,6 +4793,7 @@ dependencies = [
"hyper 1.6.0", "hyper 1.6.0",
"hyper-util", "hyper-util",
"log", "log",
"matchit 0.8.6",
"nostr-sdk", "nostr-sdk",
"pretty_env_logger", "pretty_env_logger",
"reqwest", "reqwest",

View File

@ -16,5 +16,5 @@ uuid = { version = "1.8.0", features = ["v4", "serde"] }
serde = { version = "1.0.197", features = ["derive"] } serde = { version = "1.0.197", features = ["derive"] }
url = "2.5.0" url = "2.5.0"
itertools = "0.14.0" itertools = "0.14.0"
chrono = "^0.4.38" chrono = { version = "^0.4.38", features = ["serde"] }
hex = "0.4.3" hex = "0.4.3"

View File

@ -1,17 +1,33 @@
-- Add migration script here -- Add migration script here
create table user create table user
( (
id integer unsigned not null auto_increment primary key, id integer unsigned not null auto_increment primary key,
pubkey binary(32) not null, pubkey binary(32) not null,
created timestamp default current_timestamp, created timestamp not null default current_timestamp,
balance bigint not null default 0, balance bigint not null default 0,
tos_accepted timestamp, tos_accepted timestamp,
stream_key text not null default uuid(), stream_key text not null default uuid(),
is_admin bool not null default false, is_admin bool not null default false,
is_blocked bool not null default false, is_blocked bool not null default false,
recording bool not null default false recording bool not null default false,
title text,
summary text,
image text,
tags text,
content_warning text,
goal text
); );
create unique index ix_user_pubkey on user (pubkey); create unique index ix_user_pubkey on user (pubkey);
-- Add ingest endpoints table for pipeline configuration (must come before user_stream)
create table ingest_endpoint
(
-- surrogate key referenced by user_stream.endpoint_id
id integer unsigned not null auto_increment primary key,
-- human-readable endpoint name shown in the account endpoint list
name varchar(255) not null,
-- streaming cost; presumably milli-sats per minute (API divides by 1000 with unit "min") -- TODO confirm unit
cost bigint unsigned not null default 10000,
-- comma-separated capability list (nullable; API splits on ',')
capabilities text
);
create table user_stream create table user_stream
( (
id varchar(50) not null primary key, id varchar(50) not null primary key,
@ -35,7 +51,56 @@ create table user_stream
fee integer unsigned, fee integer unsigned,
-- current nostr event json -- current nostr event json
event text, event text,
-- endpoint id if using specific endpoint
endpoint_id integer unsigned,
-- timestamp of last segment
last_segment timestamp,
constraint fk_user_stream_user constraint fk_user_stream_user
foreign key (user_id) references user (id),
constraint fk_user_stream_endpoint
foreign key (endpoint_id) references ingest_endpoint (id)
);
-- Add forwards table for payment forwarding
create table user_stream_forward
(
id integer unsigned not null auto_increment primary key,
user_id integer unsigned not null,
name text not null,
target text not null,
constraint fk_user_stream_forward_user
foreign key (user_id) references user (id) foreign key (user_id) references user (id)
); );
-- Add keys table for stream keys
create table user_stream_key
(
id integer unsigned not null auto_increment primary key,
-- owning user
user_id integer unsigned not null,
-- the secret key value; `key` is a MySQL reserved word, hence the backticks
`key` text not null,
created timestamp not null default current_timestamp,
-- null means the key never expires
expires timestamp,
-- stream this temporary key is bound to
stream_id varchar(50) not null,
constraint fk_user_stream_key_user
foreign key (user_id) references user (id),
constraint fk_user_stream_key_stream
foreign key (stream_id) references user_stream (id)
);
-- Add payments table for payment logging
create table payment
(
-- 32-byte Lightning payment hash, used as the natural primary key
payment_hash binary(32) not null primary key,
user_id integer unsigned not null,
-- BOLT11 invoice string when one exists (nullable for credits)
invoice text,
is_paid bool not null default false,
-- amount in milli-sats -- NOTE(review): topup code multiplies sats by 1000 before insert; confirm all writers use msat
amount bigint unsigned not null,
created timestamp not null default current_timestamp,
-- optional associated nostr event JSON (e.g. zap receipt)
nostr text,
-- discriminant matching the Rust PaymentType enum (repr(u8))
payment_type tinyint unsigned not null,
-- routing fee in milli-sats, filled in after settlement
fee bigint unsigned not null default 0,
constraint fk_payment_user
foreign key (user_id) references user (id)
);

View File

@ -1,6 +1,8 @@
use crate::{User, UserStream}; use crate::{
IngestEndpoint, Payment, PaymentType, User, UserStream, UserStreamForward, UserStreamKey,
};
use anyhow::Result; use anyhow::Result;
use sqlx::{Executor, MySqlPool, Row}; use sqlx::{MySqlPool, Row};
use uuid::Uuid; use uuid::Uuid;
#[derive(Clone)] #[derive(Clone)]
@ -53,6 +55,15 @@ impl ZapStreamDb {
Ok(()) Ok(())
} }
/// Record the current server time as the user's TOS acceptance timestamp.
pub async fn accept_tos(&self, uid: u64) -> Result<()> {
    let query = sqlx::query("update user set tos_accepted = NOW() where id = ?").bind(uid);
    query.execute(&self.db).await?;
    Ok(())
}
pub async fn upsert_user(&self, pubkey: &[u8; 32]) -> Result<u64> { pub async fn upsert_user(&self, pubkey: &[u8; 32]) -> Result<u64> {
let res = sqlx::query("insert ignore into user(pubkey) values(?) returning id") let res = sqlx::query("insert ignore into user(pubkey) values(?) returning id")
.bind(pubkey.as_slice()) .bind(pubkey.as_slice())
@ -153,4 +164,220 @@ impl ZapStreamDb {
Ok(balance) Ok(balance)
} }
/// Insert a payment forward target for `user_id` and return the new row id.
pub async fn create_forward(&self, user_id: u64, name: &str, target: &str) -> Result<u64> {
    let query =
        sqlx::query("insert into user_stream_forward (user_id, name, target) values (?, ?, ?)")
            .bind(user_id)
            .bind(name)
            .bind(target);
    let res = query.execute(&self.db).await?;
    Ok(res.last_insert_id())
}
/// Fetch every forward configured by the given user.
pub async fn get_user_forwards(&self, user_id: u64) -> Result<Vec<UserStreamForward>> {
    let rows = sqlx::query_as("select * from user_stream_forward where user_id = ?")
        .bind(user_id)
        .fetch_all(&self.db)
        .await?;
    Ok(rows)
}
/// Remove a forward; the user_id guard ensures users can only delete their own rows.
pub async fn delete_forward(&self, user_id: u64, forward_id: u64) -> Result<()> {
    let query = sqlx::query("delete from user_stream_forward where id = ? and user_id = ?")
        .bind(forward_id)
        .bind(user_id);
    query.execute(&self.db).await?;
    Ok(())
}
/// Insert a new (optionally expiring) stream key bound to a stream.
///
/// `expires = None` creates a key that never expires. Returns the new row id.
pub async fn create_stream_key(
    &self,
    user_id: u64,
    key: &str,
    expires: Option<chrono::DateTime<chrono::Utc>>,
    stream_id: &str,
) -> Result<u64> {
    // `key` is a reserved word in MySQL and must be backtick-quoted here,
    // matching the backticked column declaration in the migration DDL;
    // unquoted it is a SQL syntax error at runtime.
    let result = sqlx::query(
        "insert into user_stream_key (user_id, `key`, expires, stream_id) values (?, ?, ?, ?)",
    )
    .bind(user_id)
    .bind(key)
    .bind(expires)
    .bind(stream_id)
    .execute(&self.db)
    .await?;
    Ok(result.last_insert_id())
}
/// List every stream key (active or expired) owned by the user.
pub async fn get_user_stream_keys(&self, user_id: u64) -> Result<Vec<UserStreamKey>> {
    let rows = sqlx::query_as("select * from user_stream_key where user_id = ?")
        .bind(user_id)
        .fetch_all(&self.db)
        .await?;
    Ok(rows)
}
/// Delete one of the user's stream keys; no-op if the key belongs to someone else.
pub async fn delete_stream_key(&self, user_id: u64, key_id: u64) -> Result<()> {
    let query = sqlx::query("delete from user_stream_key where id = ? and user_id = ?")
        .bind(key_id)
        .bind(user_id);
    query.execute(&self.db).await?;
    Ok(())
}
/// Resolve a user id from any stream key: the user's primary key
/// (`user.stream_key`) or a non-expired temporary key in `user_stream_key`.
pub async fn find_user_by_any_stream_key(&self, key: &str) -> Result<Option<u64>> {
    #[cfg(feature = "test-pattern")]
    if key == "test" {
        return Ok(Some(self.upsert_user(&[0; 32]).await?));
    }

    // First check primary stream key
    if let Some(uid) = self.find_user_stream_key(key).await? {
        return Ok(Some(uid));
    }

    // Then check temporary stream keys. `key` is a reserved word in MySQL
    // and must be backtick-quoted (the DDL declares the column as `key`);
    // unquoted it fails to parse.
    let row = sqlx::query("select user_id from user_stream_key where `key` = ? and (expires is null or expires > now())")
        .bind(key)
        .fetch_optional(&self.db)
        .await?;
    // Propagate column-decode errors instead of unwrapping inside the closure.
    Ok(match row {
        Some(r) => Some(r.try_get(0)?),
        None => None,
    })
}
/// Log a new payment row keyed by its Lightning payment hash.
///
/// The row starts unpaid; callers flip it via `mark_payment_paid` or
/// `complete_payment` once the payment settles.
pub async fn create_payment(
    &self,
    payment_hash: &[u8],
    user_id: u64,
    invoice: Option<&str>,
    amount: u64,
    payment_type: PaymentType,
    fee: u64,
) -> Result<()> {
    let query = sqlx::query("insert into payment (payment_hash, user_id, invoice, amount, payment_type, fee) values (?, ?, ?, ?, ?, ?)")
        .bind(payment_hash)
        .bind(user_id)
        .bind(invoice)
        .bind(amount)
        .bind(payment_type)
        .bind(fee);
    query.execute(&self.db).await?;
    Ok(())
}
/// Flip a payment's `is_paid` flag to true without touching the fee.
pub async fn mark_payment_paid(&self, payment_hash: &[u8]) -> Result<()> {
    let query = sqlx::query("update payment set is_paid = true where payment_hash = ?")
        .bind(payment_hash);
    query.execute(&self.db).await?;
    Ok(())
}
/// Record the settled routing fee and mark the payment paid in one update.
pub async fn complete_payment(&self, payment_hash: &[u8], fee: u64) -> Result<()> {
    let query = sqlx::query("update payment set fee = ?, is_paid = true where payment_hash = ?")
        .bind(fee)
        .bind(payment_hash);
    query.execute(&self.db).await?;
    Ok(())
}
/// Look up one payment by its Lightning payment hash, if present.
pub async fn get_payment(&self, payment_hash: &[u8]) -> Result<Option<Payment>> {
    let row = sqlx::query_as("select * from payment where payment_hash = ?")
        .bind(payment_hash)
        .fetch_optional(&self.db)
        .await?;
    Ok(row)
}
/// Page through a user's payments, newest first.
pub async fn get_payment_history(
    &self,
    user_id: u64,
    offset: u64,
    limit: u64,
) -> Result<Vec<Payment>> {
    // Bind order follows the placeholders: user_id, limit, offset.
    let query = sqlx::query_as(
        "select * from payment where user_id = ? order by created desc limit ? offset ?",
    )
    .bind(user_id)
    .bind(limit)
    .bind(offset);
    let rows = query.fetch_all(&self.db).await?;
    Ok(rows)
}
/// Overwrite the user's default stream metadata.
///
/// All six columns are written unconditionally; passing `None` clears the
/// corresponding field (this is a full replace, not a partial patch).
pub async fn update_user_defaults(
    &self,
    user_id: u64,
    title: Option<&str>,
    summary: Option<&str>,
    image: Option<&str>,
    tags: Option<&str>,
    content_warning: Option<&str>,
    goal: Option<&str>,
) -> Result<()> {
    let query = sqlx::query("update user set title = ?, summary = ?, image = ?, tags = ?, content_warning = ?, goal = ? where id = ?")
        .bind(title)
        .bind(summary)
        .bind(image)
        .bind(tags)
        .bind(content_warning)
        .bind(goal)
        .bind(user_id);
    query.execute(&self.db).await?;
    Ok(())
}
/// Fetch every configured ingest endpoint.
pub async fn get_ingest_endpoints(&self) -> Result<Vec<IngestEndpoint>> {
    let rows = sqlx::query_as("select * from ingest_endpoint")
        .fetch_all(&self.db)
        .await?;
    Ok(rows)
}
/// Look up a single ingest endpoint by id, if it exists.
pub async fn get_ingest_endpoint(&self, endpoint_id: u64) -> Result<Option<IngestEndpoint>> {
    let row = sqlx::query_as("select * from ingest_endpoint where id = ?")
        .bind(endpoint_id)
        .fetch_optional(&self.db)
        .await?;
    Ok(row)
}
/// Insert an ingest endpoint row and return its generated id.
pub async fn create_ingest_endpoint(
    &self,
    name: &str,
    cost: u64,
    capabilities: Option<&str>,
) -> Result<u64> {
    let query =
        sqlx::query("insert into ingest_endpoint (name, cost, capabilities) values (?, ?, ?)")
            .bind(name)
            .bind(cost)
            .bind(capabilities);
    let res = query.execute(&self.db).await?;
    Ok(res.last_insert_id())
}
} }

View File

@ -22,6 +22,18 @@ pub struct User {
pub is_blocked: bool, pub is_blocked: bool,
/// Streams are recorded /// Streams are recorded
pub recording: bool, pub recording: bool,
/// Default stream title
pub title: Option<String>,
/// Default stream summary
pub summary: Option<String>,
/// Default stream image
pub image: Option<String>,
/// Default tags (comma separated)
pub tags: Option<String>,
/// Default content warning
pub content_warning: Option<String>,
/// Default stream goal
pub goal: Option<String>,
} }
#[derive(Default, Debug, Clone, Type)] #[derive(Default, Debug, Clone, Type)]
@ -64,4 +76,56 @@ pub struct UserStream {
pub duration: f32, pub duration: f32,
pub fee: Option<u32>, pub fee: Option<u32>,
pub event: Option<String>, pub event: Option<String>,
pub endpoint_id: Option<u64>,
pub last_segment: Option<DateTime<Utc>>,
}
/// A payment forwarding target owned by a user (row of `user_stream_forward`).
#[derive(Debug, Clone, FromRow)]
pub struct UserStreamForward {
pub id: u64,
pub user_id: u64,
/// Display name for this forward target
pub name: String,
/// Forward destination (free-text; format not constrained by the schema)
pub target: String,
}
/// A (possibly expiring) stream key bound to a specific stream
/// (row of `user_stream_key`).
#[derive(Debug, Clone, FromRow)]
pub struct UserStreamKey {
pub id: u64,
pub user_id: u64,
/// Secret key value; stored in the backtick-quoted `key` column
pub key: String,
pub created: DateTime<Utc>,
/// None = never expires
pub expires: Option<DateTime<Utc>>,
/// The stream this key unlocks
pub stream_id: String,
}
/// Category of a ledger entry; stored as the `tinyint` discriminant in
/// `payment.payment_type` (hence `repr(u8)`).
#[derive(Default, Debug, Clone, Type)]
#[repr(u8)]
pub enum PaymentType {
#[default]
TopUp = 0,
Zap = 1,
Credit = 2,
Withdrawal = 3,
AdmissionFee = 4,
}
/// One payment ledger row, keyed by Lightning payment hash (`payment` table).
#[derive(Debug, Clone, FromRow)]
pub struct Payment {
/// 32-byte Lightning payment hash (primary key)
pub payment_hash: Vec<u8>,
pub user_id: u64,
/// BOLT11 invoice string, when one exists
pub invoice: Option<String>,
pub is_paid: bool,
/// Amount — presumably milli-sats (topup writes sats * 1000); TODO confirm all writers agree
pub amount: u64,
pub created: DateTime<Utc>,
/// Optional associated nostr event JSON
pub nostr: Option<String>,
pub payment_type: PaymentType,
/// Routing fee, recorded after settlement
pub fee: u64,
}
/// A configured ingest endpoint with its pricing (row of `ingest_endpoint`).
#[derive(Debug, Clone, FromRow)]
pub struct IngestEndpoint {
pub id: u64,
pub name: String,
/// Cost — presumably milli-sats per minute (API computes `cost / 1000` with unit "min"); TODO confirm
pub cost: u64,
pub capabilities: Option<String>, // JSON array stored as string -- NOTE(review): the API splits this on ',' (CSV), not JSON; reconcile
}
} }

View File

@ -41,3 +41,4 @@ sha2 = { version = "0.10.8" }
pretty_env_logger = "0.5.0" pretty_env_logger = "0.5.0"
clap = { version = "4.5.16", features = ["derive"] } clap = { version = "4.5.16", features = ["derive"] }
futures-util = "0.3.31" futures-util = "0.3.31"
matchit = "0.8.4"

View File

@ -42,7 +42,6 @@ listen_http: "127.0.0.1:8080"
# #
overseer: overseer:
zap-stream: zap-stream:
cost: 16
nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4" nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4"
#blossom: #blossom:
# - "http://localhost:8881" # - "http://localhost:8881"

View File

@ -3,27 +3,62 @@ use crate::settings::Settings;
use crate::ListenerEndpoint; use crate::ListenerEndpoint;
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result};
use bytes::Bytes; use bytes::Bytes;
use fedimint_tonic_lnd::tonic::codegen::Body; use chrono::{DateTime, Utc};
use http_body_util::combinators::BoxBody; use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full}; use http_body_util::{BodyExt, Full};
use hyper::body::Incoming; use hyper::body::Incoming;
use hyper::{Method, Request, Response}; use hyper::{Method, Request, Response};
use nostr_sdk::{serde_json, Event, PublicKey}; use matchit::Router;
use nostr_sdk::{serde_json, PublicKey};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::str::FromStr; use std::str::FromStr;
use url::Url; use url::Url;
use uuid::Uuid;
use zap_stream_db::ZapStreamDb; use zap_stream_db::ZapStreamDb;
/// Logical API routes registered in the matchit router; the HTTP method is
/// dispatched separately in `handler`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Route {
Account,
Topup,
Event,
Withdraw,
Forward,
/// `/api/v1/forward/{id}` — carries an `id` path parameter
ForwardId,
History,
Keys,
}
#[derive(Clone)] #[derive(Clone)]
pub struct Api { pub struct Api {
db: ZapStreamDb, db: ZapStreamDb,
settings: Settings, settings: Settings,
lnd: fedimint_tonic_lnd::Client,
router: Router<Route>,
} }
impl Api { impl Api {
pub fn new(db: ZapStreamDb, settings: Settings) -> Self { pub fn new(db: ZapStreamDb, settings: Settings, lnd: fedimint_tonic_lnd::Client) -> Self {
Self { db, settings } let mut router = Router::new();
// Define routes (path only, method will be matched separately)
router.insert("/api/v1/account", Route::Account).unwrap();
router.insert("/api/v1/topup", Route::Topup).unwrap();
router.insert("/api/v1/event", Route::Event).unwrap();
router.insert("/api/v1/withdraw", Route::Withdraw).unwrap();
router.insert("/api/v1/forward", Route::Forward).unwrap();
router
.insert("/api/v1/forward/{id}", Route::ForwardId)
.unwrap();
router.insert("/api/v1/history", Route::History).unwrap();
router.insert("/api/v1/keys", Route::Keys).unwrap();
Self {
db,
settings,
lnd,
router,
}
} }
pub async fn handler( pub async fn handler(
@ -32,60 +67,111 @@ impl Api {
) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> { ) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
let base = Response::builder() let base = Response::builder()
.header("server", "zap-stream") .header("server", "zap-stream")
.header("content-type", "application/json")
.header("access-control-allow-origin", "*") .header("access-control-allow-origin", "*")
.header("access-control-allow-headers", "*") .header("access-control-allow-headers", "*")
.header("access-control-allow-methods", "HEAD, GET"); .header(
"access-control-allow-methods",
"HEAD, GET, PATCH, DELETE, POST, OPTIONS",
);
Ok(match (req.method(), req.uri().path()) { // Handle OPTIONS requests
(&Method::GET, "/api/v1/account") => { if req.method() == Method::OPTIONS {
let auth = check_nip98_auth(&req)?; return Ok(base.body(Default::default())?);
let rsp = self.get_account(&auth.pubkey).await?; }
return Ok(base.body(Self::body_json(&rsp)?)?);
} // Route matching
(&Method::PATCH, "/api/v1/account") => { let path = req.uri().path();
let auth = check_nip98_auth(&req)?; let matched = self.router.at(path);
let body = req.collect().await?.to_bytes();
let r_body: PatchAccount = serde_json::from_slice(&body)?; if let Ok(matched) = matched {
let rsp = self.update_account(&auth.pubkey, r_body).await?; let route = *matched.value;
return Ok(base.body(Self::body_json(&rsp)?)?); let params = matched.params;
}
(&Method::GET, "/api/v1/topup") => { match (req.method(), route) {
let auth = check_nip98_auth(&req)?; (&Method::GET, Route::Account) => {
let url: Url = req.uri().to_string().parse()?; let auth = check_nip98_auth(&req, &self.settings.public_url)?;
let amount: usize = url let rsp = self.get_account(&auth.pubkey).await?;
.query_pairs() Ok(base.body(Self::body_json(&rsp)?)?)
.find_map(|(k, v)| if k == "amount" { Some(v) } else { None })
.and_then(|v| v.parse().ok())
.ok_or(anyhow!("Missing amount"))?;
let rsp = self.topup(&auth.pubkey, amount).await?;
return Ok(base.body(Self::body_json(&rsp)?)?);
}
(&Method::PATCH, "/api/v1/event") => {
bail!("Not implemented")
}
(&Method::POST, "/api/v1/withdraw") => {
bail!("Not implemented")
}
(&Method::POST, "/api/v1/account/forward") => {
bail!("Not implemented")
}
(&Method::DELETE, "/api/v1/account/forward/<id>") => {
bail!("Not implemented")
}
(&Method::GET, "/api/v1/account/history") => {
bail!("Not implemented")
}
(&Method::GET, "/api/v1/account/keys") => {
bail!("Not implemented")
}
_ => {
if req.method() == Method::OPTIONS {
base.body(Default::default())?
} else {
base.status(404).body(Default::default())?
} }
(&Method::PATCH, Route::Account) => {
let auth = check_nip98_auth(&req, &self.settings.public_url)?;
let body = req.collect().await?.to_bytes();
let r_body: PatchAccount = serde_json::from_slice(&body)?;
let rsp = self.update_account(&auth.pubkey, r_body).await?;
Ok(base.body(Self::body_json(&rsp)?)?)
}
(&Method::GET, Route::Topup) => {
let auth = check_nip98_auth(&req, &self.settings.public_url)?;
let url: Url = req.uri().to_string().parse()?;
let amount: usize = url
.query_pairs()
.find_map(|(k, v)| if k == "amount" { Some(v) } else { None })
.and_then(|v| v.parse().ok())
.ok_or(anyhow!("Missing amount"))?;
let rsp = self.topup(&auth.pubkey, amount).await?;
Ok(base.body(Self::body_json(&rsp)?)?)
}
(&Method::PATCH, Route::Event) => {
let auth = check_nip98_auth(&req, &self.settings.public_url)?;
let body = req.collect().await?.to_bytes();
let patch_event: PatchEvent = serde_json::from_slice(&body)?;
let rsp = self.update_event(&auth.pubkey, patch_event).await?;
Ok(base.body(Self::body_json(&rsp)?)?)
}
(&Method::POST, Route::Withdraw) => {
let auth = check_nip98_auth(&req, &self.settings.public_url)?;
let url: Url = req.uri().to_string().parse()?;
let invoice = url
.query_pairs()
.find_map(|(k, v)| {
if k == "invoice" {
Some(v.to_string())
} else {
None
}
})
.ok_or(anyhow!("Missing invoice parameter"))?;
let rsp = self.withdraw(&auth.pubkey, invoice).await?;
Ok(base.body(Self::body_json(&rsp)?)?)
}
(&Method::POST, Route::Forward) => {
let auth = check_nip98_auth(&req, &self.settings.public_url)?;
let body = req.collect().await?.to_bytes();
let forward_req: ForwardRequest = serde_json::from_slice(&body)?;
let rsp = self.create_forward(&auth.pubkey, forward_req).await?;
Ok(base.body(Self::body_json(&rsp)?)?)
}
(&Method::DELETE, Route::ForwardId) => {
let auth = check_nip98_auth(&req, &self.settings.public_url)?;
let forward_id = params
.get("id")
.ok_or_else(|| anyhow!("Missing forward ID"))?;
let rsp = self.delete_forward(&auth.pubkey, forward_id).await?;
Ok(base.body(Self::body_json(&rsp)?)?)
}
(&Method::GET, Route::History) => {
let auth = check_nip98_auth(&req, &self.settings.public_url)?;
let rsp = self.get_account_history(&auth.pubkey).await?;
Ok(base.body(Self::body_json(&rsp)?)?)
}
(&Method::GET, Route::Keys) => {
let auth = check_nip98_auth(&req, &self.settings.public_url)?;
let rsp = self.get_account_keys(&auth.pubkey).await?;
Ok(base.body(Self::body_json(&rsp)?)?)
}
(&Method::POST, Route::Keys) => {
let auth = check_nip98_auth(&req, &self.settings.public_url)?;
let body = req.collect().await?.to_bytes();
let create_req: CreateStreamKeyRequest = serde_json::from_slice(&body)?;
let rsp = self.create_stream_key(&auth.pubkey, create_req).await?;
Ok(base.body(Self::body_json(&rsp)?)?)
}
_ => Ok(base.status(405).body(Default::default())?), // Method not allowed
} }
}) } else {
Ok(base.status(404).body(Default::default())?) // Not found
}
} }
fn body_json<T: Serialize>(obj: &T) -> Result<BoxBody<Bytes, anyhow::Error>> { fn body_json<T: Serialize>(obj: &T) -> Result<BoxBody<Bytes, anyhow::Error>> {
@ -98,79 +184,435 @@ impl Api {
let uid = self.db.upsert_user(&pubkey.to_bytes()).await?; let uid = self.db.upsert_user(&pubkey.to_bytes()).await?;
let user = self.db.get_user(uid).await?; let user = self.db.get_user(uid).await?;
Ok(AccountInfo { // Get user forwards
endpoints: self let forwards = self.db.get_user_forwards(uid).await?;
.settings
.endpoints // Get ingest endpoints from database
.iter() let db_ingest_endpoints = self.db.get_ingest_endpoints().await?;
.filter_map(|e| match ListenerEndpoint::from_str(&e).ok()? {
// Create 2D array: settings endpoints × database ingest endpoints
let mut endpoints = Vec::new();
for setting_endpoint in &self.settings.endpoints {
if let Ok(listener_endpoint) = ListenerEndpoint::from_str(&setting_endpoint) {
match listener_endpoint {
ListenerEndpoint::SRT { endpoint } => { ListenerEndpoint::SRT { endpoint } => {
let addr: SocketAddr = endpoint.parse().ok()?; if let Ok(addr) = endpoint.parse::<SocketAddr>() {
Some(Endpoint { for ingest in &db_ingest_endpoints {
name: "SRT".to_string(), endpoints.push(Endpoint {
url: format!( name: format!("SRT-{}", ingest.name),
"srt://{}:{}", url: format!(
self.settings.endpoints_public_hostname, "srt://{}:{}",
addr.port() self.settings.endpoints_public_hostname,
), addr.port()
key: user.stream_key.clone(), ),
capabilities: vec![], key: user.stream_key.clone(),
}) capabilities: ingest
.capabilities
.as_ref()
.map(|c| {
c.split(',').map(|s| s.trim().to_string()).collect()
})
.unwrap_or_else(Vec::new),
cost: EndpointCost {
unit: "min".to_string(),
rate: ingest.cost as f32 / 1000.0,
},
});
}
}
} }
ListenerEndpoint::RTMP { endpoint } => { ListenerEndpoint::RTMP { endpoint } => {
let addr: SocketAddr = endpoint.parse().ok()?; if let Ok(addr) = endpoint.parse::<SocketAddr>() {
Some(Endpoint { for ingest in &db_ingest_endpoints {
name: "RTMP".to_string(), endpoints.push(Endpoint {
url: format!( name: format!("RTMP-{}", ingest.name),
"rtmp://{}:{}", url: format!(
self.settings.endpoints_public_hostname, "rtmp://{}:{}",
addr.port() self.settings.endpoints_public_hostname,
), addr.port()
key: user.stream_key.clone(), ),
capabilities: vec![], key: user.stream_key.clone(),
}) capabilities: ingest
.capabilities
.as_ref()
.map(|c| {
c.split(',').map(|s| s.trim().to_string()).collect()
})
.unwrap_or_else(Vec::new),
cost: EndpointCost {
unit: "min".to_string(),
rate: ingest.cost as f32 / 1000.0,
},
});
}
}
} }
ListenerEndpoint::TCP { endpoint } => { ListenerEndpoint::TCP { endpoint } => {
let addr: SocketAddr = endpoint.parse().ok()?; if let Ok(addr) = endpoint.parse::<SocketAddr>() {
Some(Endpoint { for ingest in &db_ingest_endpoints {
name: "TCP".to_string(), endpoints.push(Endpoint {
url: format!( name: format!("TCP-{}", ingest.name),
"tcp://{}:{}", url: format!(
self.settings.endpoints_public_hostname, "tcp://{}:{}",
addr.port() self.settings.endpoints_public_hostname,
), addr.port()
key: user.stream_key.clone(), ),
capabilities: vec![], key: user.stream_key.clone(),
}) capabilities: ingest
.capabilities
.as_ref()
.map(|c| {
c.split(',').map(|s| s.trim().to_string()).collect()
})
.unwrap_or_else(Vec::new),
cost: EndpointCost {
unit: "min".to_string(),
rate: ingest.cost as f32 / 1000.0,
},
});
}
}
} }
ListenerEndpoint::File { .. } => None, ListenerEndpoint::File { .. } => {}
ListenerEndpoint::TestPattern => None, ListenerEndpoint::TestPattern => {}
}) }
.collect(), }
event: None, }
Ok(AccountInfo {
endpoints,
balance: user.balance as u64, balance: user.balance as u64,
tos: AccountTos { tos: AccountTos {
accepted: user.tos_accepted.is_some(), accepted: user.tos_accepted.is_some(),
link: "https://zap.stream/tos".to_string(), link: "https://zap.stream/tos".to_string(),
}, },
forwards: forwards
.into_iter()
.map(|f| ForwardDest {
id: f.id,
name: f.name,
})
.collect(),
details: Some(PatchEventDetails {
title: user.title,
summary: user.summary,
image: user.image,
tags: user
.tags
.map(|t| t.split(',').map(|s| s.to_string()).collect()),
content_warning: user.content_warning,
goal: user.goal,
}),
}) })
} }
async fn update_account(&self, pubkey: &PublicKey, account: PatchAccount) -> Result<()> { async fn update_account(&self, pubkey: &PublicKey, account: PatchAccount) -> Result<()> {
bail!("Not implemented") let uid = self.db.upsert_user(&pubkey.to_bytes()).await?;
if let Some(accept_tos) = account.accept_tos {
if accept_tos {
let user = self.db.get_user(uid).await?;
if user.tos_accepted.is_none() {
self.db.accept_tos(uid).await?;
}
}
}
Ok(())
} }
async fn topup(&self, pubkey: &PublicKey, amount: usize) -> Result<TopupResponse> { async fn topup(&self, pubkey: &PublicKey, amount: usize) -> Result<TopupResponse> {
bail!("Not implemented") let uid = self.db.upsert_user(&pubkey.to_bytes()).await?;
// Create Lightning invoice
let invoice_req = fedimint_tonic_lnd::lnrpc::Invoice {
value: amount as i64,
memo: format!(
"zap.stream topup for user {}",
hex::encode(pubkey.to_bytes())
),
..Default::default()
};
let response = self
.lnd
.clone()
.lightning()
.add_invoice(invoice_req)
.await?;
let invoice_response = response.into_inner();
// Create payment entry for this topup invoice
let payment_hash = hex::decode(&invoice_response.r_hash)?;
self.db
.create_payment(
&payment_hash,
uid,
Some(&invoice_response.payment_request),
amount as u64 * 1000, // Convert to milli-sats
zap_stream_db::PaymentType::TopUp,
0,
)
.await?;
Ok(TopupResponse {
pr: invoice_response.payment_request,
})
}
/// Apply a partial metadata update, either to one owned stream (when
/// `patch_event.id` is set) or to the user's default stream settings.
async fn update_event(&self, pubkey: &PublicKey, patch_event: PatchEvent) -> Result<()> {
    let uid = self.db.upsert_user(&pubkey.to_bytes()).await?;

    let Some(stream_id) = patch_event.id else {
        // No stream id: replace the account-level default stream info.
        self.db
            .update_user_defaults(
                uid,
                patch_event.title.as_deref(),
                patch_event.summary.as_deref(),
                patch_event.image.as_deref(),
                patch_event.tags.as_ref().map(|t| t.join(",")).as_deref(),
                patch_event.content_warning.as_deref(),
                patch_event.goal.as_deref(),
            )
            .await?;
        return Ok(());
    };

    // Patch a specific stream; the caller must own it.
    let stream_uuid = Uuid::parse_str(&stream_id)?;
    let mut stream = self.db.get_stream(&stream_uuid).await?;
    if stream.user_id != uid {
        bail!("Unauthorized: Stream belongs to different user");
    }

    // Only fields present in the patch overwrite stored values; absent
    // fields are left untouched.
    if patch_event.title.is_some() {
        stream.title = patch_event.title;
    }
    if patch_event.summary.is_some() {
        stream.summary = patch_event.summary;
    }
    if patch_event.image.is_some() {
        stream.image = patch_event.image;
    }
    if let Some(tags) = patch_event.tags {
        stream.tags = Some(tags.join(","));
    }
    if patch_event.content_warning.is_some() {
        stream.content_warning = patch_event.content_warning;
    }
    if patch_event.goal.is_some() {
        stream.goal = patch_event.goal;
    }

    self.db.update_stream(&stream).await?;
    // TODO: Update the nostr event and republish like C# version
    Ok(())
}
/// Pay a user-supplied BOLT11 invoice out of the user's balance.
///
/// Flow: decode invoice -> check balance -> deduct balance -> record the
/// payment -> attempt the LND payment; on any payment failure the balance
/// deduction is reversed. The deduct-first ordering prevents double-spend
/// if the process dies mid-payment (funds stay deducted, not duplicated).
///
/// NOTE(review): the balance check and deduction are two separate queries,
/// not an atomic compare-and-swap — concurrent withdrawals could race;
/// confirm whether `update_user_balance` guards against going negative.
async fn withdraw(&self, pubkey: &PublicKey, invoice: String) -> Result<WithdrawResponse> {
let uid = self.db.upsert_user(&pubkey.to_bytes()).await?;
let user = self.db.get_user(uid).await?;
let mut lnd = self.lnd.clone();
// Decode invoice to get amount and payment hash
let decode_req = fedimint_tonic_lnd::lnrpc::PayReqString {
pay_req: invoice.clone(),
};
let decode_response = lnd.lightning().decode_pay_req(decode_req).await?;
let decoded = decode_response.into_inner();
// Amounts are handled in milli-sats (num_msat); balance is assumed to
// use the same unit — TODO confirm against the topup credit path.
let invoice_amount = decoded.num_msat as u64;
// decode_pay_req returns the payment hash hex-encoded, so decode to raw bytes
let payment_hash = hex::decode(decoded.payment_hash)?;
// Check if user has sufficient balance
if user.balance < invoice_amount as i64 {
bail!("Insufficient balance");
}
// 1. Deduct balance first (safer approach)
self.db
.update_user_balance(uid, -(invoice_amount as i64))
.await?;
// 2. Create payment record
self.db
.create_payment(
&payment_hash,
uid,
Some(&invoice),
invoice_amount,
zap_stream_db::PaymentType::Withdrawal,
0,
)
.await?;
// 3. Attempt Lightning payment
let send_req = fedimint_tonic_lnd::lnrpc::SendRequest {
payment_request: invoice.clone(),
..Default::default()
};
let response = lnd.lightning().send_payment_sync(send_req).await;
match response {
Ok(resp) => {
let payment_response = resp.into_inner();
// LND signals failure via a non-empty payment_error string, not an Err
if payment_response.payment_error.is_empty() {
// Payment successful
let fee = payment_response
.payment_route
.map(|r| r.total_fees_msat)
.unwrap_or(0);
// Update payment record with fee and mark as paid
self.db.complete_payment(&payment_hash, fee as u64).await?;
// Deduct additional fee if any
// (routing fee is charged on top of the invoice amount)
if fee > 0 {
self.db.update_user_balance(uid, -fee).await?;
}
Ok(WithdrawResponse {
fee,
preimage: hex::encode(payment_response.payment_preimage),
})
} else {
// Payment failed, reverse balance deduction
// NOTE(review): the payment row is left behind as unpaid
self.db
.update_user_balance(uid, invoice_amount as i64)
.await?;
bail!("Payment failed: {}", payment_response.payment_error);
}
}
Err(e) => {
// Payment failed, reverse balance deduction
self.db
.update_user_balance(uid, invoice_amount as i64)
.await?;
bail!("Payment failed: {}", e);
}
}
}
/// Register a payment forward target for the authenticated user.
async fn create_forward(
    &self,
    pubkey: &PublicKey,
    req: ForwardRequest,
) -> Result<ForwardResponse> {
    let uid = self.db.upsert_user(&pubkey.to_bytes()).await?;
    let id = self.db.create_forward(uid, &req.name, &req.target).await?;
    Ok(ForwardResponse { id })
}
/// Delete one of the authenticated user's forwards; the id arrives as a
/// path-parameter string and must parse as u64.
async fn delete_forward(&self, pubkey: &PublicKey, forward_id: &str) -> Result<()> {
    let uid = self.db.upsert_user(&pubkey.to_bytes()).await?;
    let id: u64 = forward_id.parse()?;
    self.db.delete_forward(uid, id).await?;
    Ok(())
}
/// Return the user's payment history (first page, fixed page size of 100).
async fn get_account_history(&self, pubkey: &PublicKey) -> Result<HistoryResponse> {
    let uid = self.db.upsert_user(&pubkey.to_bytes()).await?;

    // For now, just get first page with default page size
    let payments = self.db.get_payment_history(uid, 0, 100).await?;

    let mut items = Vec::with_capacity(payments.len());
    for p in payments {
        // Serialize the enum discriminant as its wire string.
        let payment_type = match p.payment_type {
            zap_stream_db::PaymentType::TopUp => "topup",
            zap_stream_db::PaymentType::Zap => "zap",
            zap_stream_db::PaymentType::Credit => "credit",
            zap_stream_db::PaymentType::Withdrawal => "withdrawal",
            zap_stream_db::PaymentType::AdmissionFee => "admission_fee",
        }
        .to_string();
        items.push(HistoryEntry {
            payment_hash: hex::encode(p.payment_hash),
            amount: p.amount as i64,
            timestamp: p.created.timestamp(),
            payment_type,
            is_paid: p.is_paid,
            fee: p.fee,
        });
    }

    Ok(HistoryResponse {
        items,
        page: 0,
        page_size: 100,
    })
}
/// List the user's stream keys with unix-second timestamps for the API.
async fn get_account_keys(&self, pubkey: &PublicKey) -> Result<Vec<StreamKey>> {
    let uid = self.db.upsert_user(&pubkey.to_bytes()).await?;
    let keys = self.db.get_user_stream_keys(uid).await?;

    let mut out = Vec::with_capacity(keys.len());
    for k in keys {
        out.push(StreamKey {
            id: k.id,
            key: k.key,
            created: k.created.timestamp(),
            expires: k.expires.map(|e| e.timestamp()),
            stream_id: k.stream_id,
        });
    }
    Ok(out)
}
/// Create a planned stream plus a fresh temporary stream key bound to it.
/// Returns the generated key; the nostr event is not yet built (see TODO).
async fn create_stream_key(
&self,
pubkey: &PublicKey,
req: CreateStreamKeyRequest,
) -> Result<CreateStreamKeyResponse> {
let uid = self.db.upsert_user(&pubkey.to_bytes()).await?;
// Create a new stream record for this key
let stream_id = Uuid::new_v4();
let new_stream = zap_stream_db::UserStream {
id: stream_id.to_string(),
user_id: uid,
starts: Utc::now(),
// Planned: the stream exists but has not gone live yet
state: zap_stream_db::UserStreamState::Planned,
title: req.event.title,
summary: req.event.summary,
image: req.event.image,
// Tags are stored comma-joined in a single text column
tags: req.event.tags.map(|t| t.join(",")),
content_warning: req.event.content_warning,
goal: req.event.goal,
..Default::default()
};
// Create the stream record
self.db.insert_stream(&new_stream).await?;
// Generate a new stream key
// (random UUIDv4 string; uniqueness is probabilistic, not DB-enforced)
let key = Uuid::new_v4().to_string();
let _key_id = self
.db
.create_stream_key(uid, &key, req.expires, &stream_id.to_string())
.await?;
// For now, return minimal response - event building would require nostr integration
Ok(CreateStreamKeyResponse {
key,
event: None, // TODO: Build proper nostr event like C# version
})
} }
} }
#[derive(Deserialize, Serialize)] #[derive(Deserialize, Serialize)]
struct AccountInfo { struct AccountInfo {
pub endpoints: Vec<Endpoint>, pub endpoints: Vec<Endpoint>,
pub event: Option<Event>,
pub balance: u64, pub balance: u64,
pub tos: AccountTos, pub tos: AccountTos,
pub forwards: Vec<ForwardDest>,
pub details: Option<PatchEventDetails>,
} }
#[derive(Deserialize, Serialize)] #[derive(Deserialize, Serialize)]
@ -179,12 +621,13 @@ struct Endpoint {
pub url: String, pub url: String,
pub key: String, pub key: String,
pub capabilities: Vec<String>, pub capabilities: Vec<String>,
pub cost: EndpointCost,
} }
#[derive(Deserialize, Serialize)] #[derive(Deserialize, Serialize)]
struct EndpointCost { struct EndpointCost {
pub unit: String, pub unit: String,
pub rate: u16, pub rate: f32,
} }
#[derive(Deserialize, Serialize)] #[derive(Deserialize, Serialize)]
@ -202,3 +645,91 @@ struct PatchAccount {
struct TopupResponse { struct TopupResponse {
pub pr: String, pub pr: String,
} }
/// Request body for a Lightning withdrawal from the user's balance.
#[derive(Deserialize, Serialize)]
struct WithdrawRequest {
    // BOLT11 payment request (invoice) to pay out to.
    pub payment_request: String,
    // Amount to withdraw — unit not visible in this chunk; presumably
    // milli-sats given the settings docs elsewhere, TODO confirm in handler.
    pub amount: u64,
}
/// Response returned after a withdrawal payment completes.
#[derive(Deserialize, Serialize)]
struct WithdrawResponse {
    // Fee paid for the outgoing payment.
    pub fee: i64,
    // Payment preimage proving settlement — presumably hex-encoded; verify
    // against the handler that populates it.
    pub preimage: String,
}
/// Request body for creating a payment forward (see `create_forward`).
#[derive(Deserialize, Serialize)]
struct ForwardRequest {
    // Display name for this forward destination.
    pub name: String,
    // Forward target — stored verbatim in the DB; exact format (LN address,
    // invoice, etc.) is not visible here, TODO confirm.
    pub target: String,
}
/// Response for `create_forward`: the id of the newly created forward row.
#[derive(Deserialize, Serialize)]
struct ForwardResponse {
    pub id: u64,
}
/// One row of a user's payment history (see `get_account_history`).
#[derive(Deserialize, Serialize)]
struct HistoryEntry {
    // Hex-encoded payment hash.
    pub payment_hash: String,
    pub amount: i64,
    // Unix timestamp (seconds) of payment creation.
    pub timestamp: i64,
    // One of: "topup", "zap", "credit", "withdrawal", "admission_fee".
    pub payment_type: String,
    pub is_paid: bool,
    pub fee: u64,
}
/// Paged payment-history payload. Currently always page 0 with page_size 100
/// (pagination is not yet exposed by `get_account_history`).
#[derive(Deserialize, Serialize)]
struct HistoryResponse {
    pub items: Vec<HistoryEntry>,
    pub page: i32,
    pub page_size: i32,
}
/// A user's stream key as exposed over the API (see `get_account_keys`).
#[derive(Deserialize, Serialize)]
struct StreamKey {
    pub id: u64,
    pub key: String,
    // Unix timestamp (seconds).
    pub created: i64,
    // Unix timestamp (seconds); None when the key never expires.
    pub expires: Option<i64>,
    // Id of the stream this key is bound to.
    pub stream_id: String,
}
/// Request body for `create_stream_key`: initial stream metadata plus an
/// optional key expiry.
#[derive(Deserialize, Serialize)]
struct CreateStreamKeyRequest {
    pub event: PatchEventDetails,
    pub expires: Option<DateTime<Utc>>,
}
/// Response for `create_stream_key`.
#[derive(Deserialize, Serialize)]
struct CreateStreamKeyResponse {
    // The newly generated stream key (a UUID string).
    pub key: String,
    // Serialized nostr event for the planned stream; currently always None
    // (event building not implemented yet — see TODO in create_stream_key).
    pub event: Option<String>,
}
/// Partial update of a stream's metadata; all fields optional so callers can
/// patch only what changed. Includes the target stream `id`, unlike
/// `PatchEventDetails`.
#[derive(Deserialize, Serialize)]
struct PatchEvent {
    pub id: Option<String>,
    pub title: Option<String>,
    pub summary: Option<String>,
    pub image: Option<String>,
    pub tags: Option<Vec<String>>,
    pub content_warning: Option<String>,
    pub goal: Option<String>,
}
/// Stream metadata fields without a stream id; used where the target stream is
/// implied by context (e.g. `CreateStreamKeyRequest`).
#[derive(Deserialize, Serialize)]
struct PatchEventDetails {
    pub title: Option<String>,
    pub summary: Option<String>,
    pub image: Option<String>,
    pub tags: Option<Vec<String>>,
    pub content_warning: Option<String>,
    pub goal: Option<String>,
}
/// A configured payment forward as listed in the account info response.
#[derive(Deserialize, Serialize)]
struct ForwardDest {
    pub id: u64,
    pub name: String,
}

View File

@ -1,23 +1,21 @@
use crate::api::Api; use crate::api::Api;
use crate::overseer::ZapStreamOverseer;
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use base64::Engine; use base64::Engine;
use bytes::Bytes; use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures_util::TryStreamExt; use futures_util::TryStreamExt;
use http_body_util::combinators::BoxBody; use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full, StreamBody}; use http_body_util::{BodyExt, Full, StreamBody};
use hyper::body::{Frame, Incoming}; use hyper::body::{Frame, Incoming};
use hyper::service::Service; use hyper::service::Service;
use hyper::{Method, Request, Response}; use hyper::{Method, Request, Response};
use log::{error, info}; use log::error;
use nostr_sdk::{serde_json, Event}; use nostr_sdk::{serde_json, Alphabet, Event, Kind, PublicKey, SingleLetterTag, TagKind};
use std::future::Future; use std::future::Future;
use std::path::PathBuf; use std::path::PathBuf;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc;
use tokio::fs::File; use tokio::fs::File;
use tokio_util::io::ReaderStream; use tokio_util::io::ReaderStream;
use zap_stream_core::overseer::Overseer;
#[derive(Clone)] #[derive(Clone)]
pub struct HttpServer { pub struct HttpServer {
@ -98,7 +96,13 @@ impl Service<Request<Incoming>> for HttpServer {
} }
} }
pub fn check_nip98_auth(req: &Request<Incoming>) -> Result<Event> { #[derive(Debug, Clone)]
pub struct AuthResult {
pub pubkey: PublicKey,
pub event: Event,
}
pub fn check_nip98_auth(req: &Request<Incoming>, public_url: &str) -> Result<AuthResult> {
let auth = if let Some(a) = req.headers().get("authorization") { let auth = if let Some(a) = req.headers().get("authorization") {
a.to_str()? a.to_str()?
} else { } else {
@ -109,10 +113,68 @@ pub fn check_nip98_auth(req: &Request<Incoming>) -> Result<Event> {
bail!("Invalid authorization scheme"); bail!("Invalid authorization scheme");
} }
let json = let token = &auth[6..];
String::from_utf8(base64::engine::general_purpose::STANDARD.decode(auth[6..].as_bytes())?)?; let decoded = base64::engine::general_purpose::STANDARD.decode(token.as_bytes())?;
info!("{}", json);
// TODO: check tags // Check if decoded data starts with '{'
Ok(serde_json::from_str::<Event>(&json)?) if decoded.is_empty() || decoded[0] != b'{' {
bail!("Invalid token");
}
let json = String::from_utf8(decoded)?;
let event: Event = serde_json::from_str(&json)?;
// Verify signature
if !event.verify().is_ok() {
bail!("Invalid nostr event, invalid signature");
}
// Check event kind (NIP-98: HTTP Auth, kind 27235)
if event.kind != Kind::Custom(27235) {
bail!("Invalid nostr event, wrong kind");
}
// Check timestamp (within 120 seconds)
let now = Utc::now();
let event_time = DateTime::from_timestamp(event.created_at.as_u64() as i64, 0)
.ok_or_else(|| anyhow::anyhow!("Invalid timestamp"))?;
let diff_seconds = (now - event_time).num_seconds().abs();
if diff_seconds > 120 {
bail!("Invalid nostr event, timestamp out of range");
}
// Check URL tag (full URI)
let url_tag = event
.tags
.iter()
.find(|tag| tag.kind() == TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::U)))
.and_then(|tag| tag.content())
.ok_or_else(|| anyhow::anyhow!("Missing URL tag"))?;
// Construct full URI using public_url + path + query
let request_uri = match req.uri().query() {
Some(query) => format!("{}{}?{}", public_url.trim_end_matches('/'), req.uri().path(), query),
None => format!("{}{}", public_url.trim_end_matches('/'), req.uri().path()),
};
if !url_tag.eq_ignore_ascii_case(&request_uri) {
bail!("Invalid nostr event, URL tag invalid. Expected: {}, Got: {}", request_uri, url_tag);
}
// Check method tag
let method_tag = event
.tags
.iter()
.find(|tag| tag.kind() == TagKind::Method)
.and_then(|tag| tag.content())
.ok_or_else(|| anyhow::anyhow!("Missing method tag"))?;
if !method_tag.eq_ignore_ascii_case(req.method().as_str()) {
bail!("Invalid nostr event, method tag invalid");
}
Ok(AuthResult {
pubkey: event.pubkey.clone(),
event,
})
} }

View File

@ -71,7 +71,7 @@ async fn main() -> Result<()> {
let http_addr: SocketAddr = settings.listen_http.parse()?; let http_addr: SocketAddr = settings.listen_http.parse()?;
let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url); let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url);
let api = Api::new(overseer.database(), settings.clone()); let api = Api::new(overseer.database(), settings.clone(), overseer.lnd_client());
// HTTP server // HTTP server
let server = HttpServer::new(index_html, PathBuf::from(settings.output_dir), api); let server = HttpServer::new(index_html, PathBuf::from(settings.output_dir), api);
tasks.push(tokio::spawn(async move { tasks.push(tokio::spawn(async move {

View File

@ -8,7 +8,6 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use log::{error, info, warn}; use log::{error, info, warn};
use nostr_sdk::prelude::Coordinate; use nostr_sdk::prelude::Coordinate;
use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32}; use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32};
use serde::Serialize;
use std::collections::HashSet; use std::collections::HashSet;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
@ -44,8 +43,6 @@ pub struct ZapStreamOverseer {
blossom_servers: Vec<Blossom>, blossom_servers: Vec<Blossom>,
/// Public facing URL pointing to [out_dir] /// Public facing URL pointing to [out_dir]
public_url: String, public_url: String,
/// Cost / second / variant
cost: i64,
/// Currently active streams /// Currently active streams
/// Any streams which are not contained in this set are dead /// Any streams which are not contained in this set are dead
active_streams: Arc<RwLock<HashSet<Uuid>>>, active_streams: Arc<RwLock<HashSet<Uuid>>>,
@ -60,7 +57,6 @@ impl ZapStreamOverseer {
lnd: &LndSettings, lnd: &LndSettings,
relays: &Vec<String>, relays: &Vec<String>,
blossom_servers: &Option<Vec<String>>, blossom_servers: &Option<Vec<String>>,
cost: i64,
) -> Result<Self> { ) -> Result<Self> {
let db = ZapStreamDb::new(db).await?; let db = ZapStreamDb::new(db).await?;
db.migrate().await?; db.migrate().await?;
@ -112,15 +108,18 @@ impl ZapStreamOverseer {
.map(|b| Blossom::new(b)) .map(|b| Blossom::new(b))
.collect(), .collect(),
public_url: public_url.clone(), public_url: public_url.clone(),
cost,
active_streams: Arc::new(RwLock::new(HashSet::new())), active_streams: Arc::new(RwLock::new(HashSet::new())),
}) })
} }
pub(crate) fn database(&self) -> ZapStreamDb { pub fn database(&self) -> ZapStreamDb {
self.db.clone() self.db.clone()
} }
pub fn lnd_client(&self) -> fedimint_tonic_lnd::Client {
self.lnd.clone()
}
fn stream_to_event_builder(&self, stream: &UserStream) -> Result<EventBuilder> { fn stream_to_event_builder(&self, stream: &UserStream) -> Result<EventBuilder> {
let mut tags = vec![ let mut tags = vec![
Tag::parse(&["d".to_string(), stream.id.to_string()])?, Tag::parse(&["d".to_string(), stream.id.to_string()])?,
@ -224,15 +223,6 @@ impl ZapStreamOverseer {
} }
} }
#[derive(Serialize)]
struct Endpoint {}
#[derive(Serialize)]
struct AccountInfo {
pub endpoints: Vec<Endpoint>,
pub event: Event,
pub balance: u64,
}
#[async_trait] #[async_trait]
impl Overseer for ZapStreamOverseer { impl Overseer for ZapStreamOverseer {
async fn check_streams(&self) -> Result<()> { async fn check_streams(&self) -> Result<()> {
@ -270,7 +260,19 @@ impl Overseer for ZapStreamOverseer {
bail!("Not enough balance"); bail!("Not enough balance");
} }
let variants = get_default_variants(&stream_info)?; // Get ingest endpoint configuration based on connection type
let endpoint_id = self.detect_endpoint(&connection).await?;
let endpoint = if let Some(id) = endpoint_id {
self.db.get_ingest_endpoint(id).await?
} else {
None
};
let variants = if let Some(endpoint) = &endpoint {
get_variants_from_endpoint(&stream_info, endpoint)?
} else {
get_default_variants(&stream_info)?
};
let mut egress = vec![]; let mut egress = vec![];
egress.push(EgressType::HLS(EgressConfig { egress.push(EgressType::HLS(EgressConfig {
@ -285,6 +287,7 @@ impl Overseer for ZapStreamOverseer {
user_id: uid, user_id: uid,
starts: Utc::now(), starts: Utc::now(),
state: UserStreamState::Live, state: UserStreamState::Live,
endpoint_id,
..Default::default() ..Default::default()
}; };
let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?; let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?;
@ -312,7 +315,21 @@ impl Overseer for ZapStreamOverseer {
let stream = self.db.get_stream(pipeline_id).await?; let stream = self.db.get_stream(pipeline_id).await?;
let duration = added.iter().fold(0.0, |acc, v| acc + v.duration); let duration = added.iter().fold(0.0, |acc, v| acc + v.duration);
let cost = self.cost * duration.round() as i64;
// Get the cost per minute from the ingest endpoint, or use default
let cost_per_minute = if let Some(endpoint_id) = stream.endpoint_id {
if let Some(endpoint) = self.db.get_ingest_endpoint(endpoint_id).await? {
endpoint.cost
} else {
0
}
} else {
0
};
// Convert duration from seconds to minutes and calculate cost
let duration_minutes = duration / 60.0;
let cost = (cost_per_minute as f32 * duration_minutes).round() as i64;
let bal = self let bal = self
.db .db
.tick_stream(pipeline_id, stream.user_id, duration, cost) .tick_stream(pipeline_id, stream.user_id, duration, cost)
@ -455,3 +472,141 @@ fn get_default_variants(info: &IngressInfo) -> Result<Vec<VariantStream>> {
Ok(vars) Ok(vars)
} }
impl ZapStreamOverseer {
    /// Detect which ingest endpoint should be used based on connection info.
    ///
    /// Matches the connection's endpoint name against the configured ingest
    /// endpoints; returns `Ok(None)` when nothing matches.
    async fn detect_endpoint(&self, connection: &ConnectionInfo) -> Result<Option<u64>> {
        let endpoints = self.db.get_ingest_endpoints().await?;
        // First endpoint whose name matches the connection's endpoint wins.
        Ok(endpoints
            .into_iter()
            .find(|e| e.name == connection.endpoint)
            .map(|e| e.id))
    }
}
/// Build the transcoder variant list from an ingest endpoint's capability string.
///
/// Capabilities are comma-separated entries:
///   - `variant:source`             → passthrough (copy) video+audio group
///   - `variant:<height>:<bitrate>` → scaled H.264 + AAC group
///
/// Unknown entries (e.g. `dvr:720h`) are currently ignored. Each recognized
/// entry produces one output group; `dst_index` numbers output streams across
/// all groups.
fn get_variants_from_endpoint(
    info: &IngressInfo,
    endpoint: &zap_stream_db::IngestEndpoint,
) -> Result<Vec<VariantStream>> {
    let capabilities_str = endpoint.capabilities.as_deref().unwrap_or("");
    // Robustness: trim tokens so "variant:source, variant:720:2000000"
    // (spaces after commas) parses the same as the unspaced form.
    let capabilities: Vec<&str> = capabilities_str.split(',').map(str::trim).collect();

    let mut vars: Vec<VariantStream> = vec![];

    let video_src = info
        .streams
        .iter()
        .find(|c| c.stream_type == IngressStreamType::Video);
    let audio_src = info
        .streams
        .iter()
        .find(|c| c.stream_type == IngressStreamType::Audio);

    // Parse all variant capabilities and create grouped variants.
    let mut group_id = 0usize;
    let mut dst_index = 0;

    for capability in capabilities {
        let parts: Vec<&str> = capability.split(':').collect();

        if parts.len() >= 2 && parts[0] == "variant" && parts[1] == "source" {
            // Passthrough group: copy source video/audio without re-encoding.
            if let Some(video_src) = video_src {
                vars.push(VariantStream::CopyVideo(VariantMapping {
                    id: Uuid::new_v4(),
                    src_index: video_src.index,
                    dst_index,
                    group_id,
                }));
                dst_index += 1;
            }
            if let Some(audio_src) = audio_src {
                vars.push(VariantStream::CopyAudio(VariantMapping {
                    id: Uuid::new_v4(),
                    src_index: audio_src.index,
                    dst_index,
                    group_id,
                }));
                dst_index += 1;
            }
            group_id += 1;
        } else if parts.len() >= 3 && parts[0] == "variant" {
            // Transcoded group; entries with unparseable height/bitrate are skipped.
            if let (Ok(target_height), Ok(bitrate)) =
                (parts[1].parse::<u32>(), parts[2].parse::<u32>())
            {
                if let Some(video_src) = video_src {
                    // Calculate dimensions maintaining the source aspect ratio.
                    let input_width = video_src.width as f32;
                    let input_height = video_src.height as f32;
                    let aspect_ratio = input_width / input_height;

                    let output_height = target_height;
                    let output_width = (output_height as f32 * aspect_ratio).round() as u16;

                    // H.264 requires even dimensions; round odd values up.
                    let output_width = if output_width % 2 == 1 {
                        output_width + 1
                    } else {
                        output_width
                    };
                    let output_height = if output_height % 2 == 1 {
                        output_height + 1
                    } else {
                        output_height
                    } as u16;

                    vars.push(VariantStream::Video(VideoVariant {
                        mapping: VariantMapping {
                            id: Uuid::new_v4(),
                            src_index: video_src.index,
                            dst_index,
                            group_id,
                        },
                        width: output_width,
                        height: output_height,
                        fps: video_src.fps,
                        bitrate: bitrate as u64,
                        codec: "libx264".to_string(),
                        profile: 77, // AV_PROFILE_H264_MAIN
                        level: 51,
                        // ~2 second GOP at the source frame rate.
                        keyframe_interval: video_src.fps as u16 * 2,
                        pixel_format: AV_PIX_FMT_YUV420P as u32,
                    }));
                    dst_index += 1;
                }
                // Pair each video rendition with a fixed-quality AAC track.
                if let Some(audio_src) = audio_src {
                    vars.push(VariantStream::Audio(AudioVariant {
                        mapping: VariantMapping {
                            id: Uuid::new_v4(),
                            src_index: audio_src.index,
                            dst_index,
                            group_id,
                        },
                        bitrate: 192_000,
                        codec: "aac".to_string(),
                        channels: 2,
                        sample_rate: 48_000,
                        sample_fmt: "fltp".to_owned(),
                    }));
                    dst_index += 1;
                }
                group_id += 1;
            }
        }
        // Handle other capabilities like dvr:720h here if needed
    }

    Ok(vars)
}

View File

@ -49,8 +49,6 @@ pub enum OverseerConfig {
nsec: String, nsec: String,
/// Blossom servers /// Blossom servers
blossom: Option<Vec<String>>, blossom: Option<Vec<String>>,
/// Cost (milli-sats) / second / variant
cost: i64,
}, },
} }
@ -70,7 +68,6 @@ impl Settings {
lnd, lnd,
relays, relays,
blossom, blossom,
cost,
} => Ok(Arc::new( } => Ok(Arc::new(
ZapStreamOverseer::new( ZapStreamOverseer::new(
&self.output_dir, &self.output_dir,
@ -80,7 +77,6 @@ impl Settings {
lnd, lnd,
relays, relays,
blossom, blossom,
*cost,
) )
.await?, .await?,
)), )),