use crate::{ IngestEndpoint, Payment, PaymentType, User, UserStream, UserStreamForward, UserStreamKey, }; use anyhow::Result; use sqlx::{MySqlPool, Row}; use uuid::Uuid; #[derive(Clone)] pub struct ZapStreamDb { db: MySqlPool, } impl ZapStreamDb { pub async fn new(db: &str) -> Result { let db = MySqlPool::connect(db).await?; Ok(ZapStreamDb { db }) } pub async fn migrate(&self) -> Result<()> { sqlx::migrate!().run(&self.db).await?; Ok(()) } /// Find user by stream key, typical first lookup from ingress pub async fn find_user_stream_key(&self, key: &str) -> Result> { #[cfg(feature = "test-pattern")] if key == "test" { // use the 00 pubkey for test sources return Ok(Some(self.upsert_user(&[0; 32]).await?)); } Ok(sqlx::query("select id from user where stream_key = ?") .bind(key) .fetch_optional(&self.db) .await? .map(|r| r.try_get(0).unwrap())) } /// Get user by id pub async fn get_user(&self, uid: u64) -> Result { Ok(sqlx::query_as("select * from user where id = ?") .bind(uid) .fetch_one(&self.db) .await .map_err(anyhow::Error::new)?) } /// Update a users balance pub async fn update_user_balance(&self, uid: u64, diff: i64) -> Result<()> { sqlx::query("update user set balance = balance + ? where id = ?") .bind(diff) .bind(uid) .execute(&self.db) .await?; Ok(()) } /// Mark TOS as accepted for a user pub async fn accept_tos(&self, uid: u64) -> Result<()> { sqlx::query("update user set tos_accepted = NOW() where id = ?") .bind(uid) .execute(&self.db) .await?; Ok(()) } pub async fn upsert_user(&self, pubkey: &[u8; 32]) -> Result { let res = sqlx::query("insert ignore into user(pubkey) values(?) returning id") .bind(pubkey.as_slice()) .fetch_optional(&self.db) .await?; match res { None => sqlx::query("select id from user where pubkey = ?") .bind(pubkey.as_slice()) .fetch_one(&self.db) .await? .try_get(0) .map_err(anyhow::Error::new), Some(res) => res.try_get(0).map_err(anyhow::Error::new), } } pub async fn insert_stream(&self, user_stream: &UserStream) -> Result<()> { sqlx::query("insert into user_stream (id, user_id, state, starts) values (?, ?, ?, ?)") .bind(&user_stream.id) .bind(&user_stream.user_id) .bind(&user_stream.state) .bind(&user_stream.starts) .execute(&self.db) .await?; Ok(()) } pub async fn update_stream(&self, user_stream: &UserStream) -> Result<()> { sqlx::query( "update user_stream set state = ?, starts = ?, ends = ?, title = ?, summary = ?, image = ?, thumb = ?, tags = ?, content_warning = ?, goal = ?, pinned = ?, fee = ?, event = ?, endpoint_id = ? where id = ?", ) .bind(&user_stream.state) .bind(&user_stream.starts) .bind(&user_stream.ends) .bind(&user_stream.title) .bind(&user_stream.summary) .bind(&user_stream.image) .bind(&user_stream.thumb) .bind(&user_stream.tags) .bind(&user_stream.content_warning) .bind(&user_stream.goal) .bind(&user_stream.pinned) .bind(&user_stream.fee) .bind(&user_stream.event) .bind(&user_stream.endpoint_id) .bind(&user_stream.id) .execute(&self.db) .await .map_err(anyhow::Error::new)?; Ok(()) } pub async fn get_stream(&self, id: &Uuid) -> Result { Ok(sqlx::query_as("select * from user_stream where id = ?") .bind(id.to_string()) .fetch_one(&self.db) .await .map_err(anyhow::Error::new)?) } /// Get the list of active streams pub async fn list_live_streams(&self) -> Result> { Ok(sqlx::query_as("select * from user_stream where state = 2") .fetch_all(&self.db) .await?) } /// Add [duration] & [cost] to a stream and return the new user balance pub async fn tick_stream( &self, stream_id: &Uuid, user_id: u64, duration: f32, cost: i64, ) -> Result { let mut tx = self.db.begin().await?; sqlx::query("update user_stream set duration = duration + ?, cost = cost + ? where id = ?") .bind(&duration) .bind(&cost) .bind(stream_id.to_string()) .execute(&mut *tx) .await?; sqlx::query("update user set balance = balance - ? where id = ?") .bind(&cost) .bind(&user_id) .execute(&mut *tx) .await?; let balance: i64 = sqlx::query("select balance from user where id = ?") .bind(&user_id) .fetch_one(&mut *tx) .await? .try_get(0)?; tx.commit().await?; Ok(balance) } /// Create a new forward pub async fn create_forward(&self, user_id: u64, name: &str, target: &str) -> Result { let result = sqlx::query("insert into user_stream_forward (user_id, name, target) values (?, ?, ?)") .bind(user_id) .bind(name) .bind(target) .execute(&self.db) .await?; Ok(result.last_insert_id()) } /// Get all forwards for a user pub async fn get_user_forwards(&self, user_id: u64) -> Result> { Ok( sqlx::query_as("select * from user_stream_forward where user_id = ?") .bind(user_id) .fetch_all(&self.db) .await?, ) } /// Delete a forward pub async fn delete_forward(&self, user_id: u64, forward_id: u64) -> Result<()> { sqlx::query("delete from user_stream_forward where id = ? and user_id = ?") .bind(forward_id) .bind(user_id) .execute(&self.db) .await?; Ok(()) } /// Create a new stream key pub async fn create_stream_key( &self, user_id: u64, key: &str, expires: Option>, stream_id: &str, ) -> Result { let result = sqlx::query( "insert into user_stream_key (user_id, key, expires, stream_id) values (?, ?, ?, ?)", ) .bind(user_id) .bind(key) .bind(expires) .bind(stream_id) .execute(&self.db) .await?; Ok(result.last_insert_id()) } /// Get all stream keys for a user pub async fn get_user_stream_keys(&self, user_id: u64) -> Result> { Ok( sqlx::query_as("select * from user_stream_key where user_id = ?") .bind(user_id) .fetch_all(&self.db) .await?, ) } /// Delete a stream key pub async fn delete_stream_key(&self, user_id: u64, key_id: u64) -> Result<()> { sqlx::query("delete from user_stream_key where id = ? and user_id = ?") .bind(key_id) .bind(user_id) .execute(&self.db) .await?; Ok(()) } /// Find user by stream key (including temporary keys) pub async fn find_user_by_any_stream_key(&self, key: &str) -> Result> { #[cfg(feature = "test-pattern")] if key == "test" { return Ok(Some(self.upsert_user(&[0; 32]).await?)); } // First check primary stream key if let Some(uid) = self.find_user_stream_key(key).await? { return Ok(Some(uid)); } // Then check temporary stream keys Ok(sqlx::query("select user_id from user_stream_key where key = ? and (expires is null or expires > now())") .bind(key) .fetch_optional(&self.db) .await? .map(|r| r.try_get(0).unwrap())) } /// Create a payment record pub async fn create_payment( &self, payment_hash: &[u8], user_id: u64, invoice: Option<&str>, amount: u64, payment_type: PaymentType, fee: u64, ) -> Result<()> { sqlx::query("insert into payment (payment_hash, user_id, invoice, amount, payment_type, fee) values (?, ?, ?, ?, ?, ?)") .bind(payment_hash) .bind(user_id) .bind(invoice) .bind(amount) .bind(payment_type) .bind(fee) .execute(&self.db) .await?; Ok(()) } /// Mark payment as paid pub async fn mark_payment_paid(&self, payment_hash: &[u8]) -> Result<()> { sqlx::query("update payment set is_paid = true where payment_hash = ?") .bind(payment_hash) .execute(&self.db) .await?; Ok(()) } /// Update payment fee and mark as paid pub async fn complete_payment(&self, payment_hash: &[u8], fee: u64) -> Result<()> { sqlx::query("update payment set fee = ?, is_paid = true where payment_hash = ?") .bind(fee) .bind(payment_hash) .execute(&self.db) .await?; Ok(()) } /// Get payment by hash pub async fn get_payment(&self, payment_hash: &[u8]) -> Result> { Ok( sqlx::query_as("select * from payment where payment_hash = ?") .bind(payment_hash) .fetch_optional(&self.db) .await?, ) } /// Get payment history for user pub async fn get_payment_history( &self, user_id: u64, offset: u64, limit: u64, ) -> Result> { Ok(sqlx::query_as( "select * from payment where user_id = ? order by created desc limit ? offset ?", ) .bind(user_id) .bind(limit) .bind(offset) .fetch_all(&self.db) .await?) } /// Update user default stream info pub async fn update_user_defaults( &self, user_id: u64, title: Option<&str>, summary: Option<&str>, image: Option<&str>, tags: Option<&str>, content_warning: Option<&str>, goal: Option<&str>, ) -> Result<()> { sqlx::query("update user set title = ?, summary = ?, image = ?, tags = ?, content_warning = ?, goal = ? where id = ?") .bind(title) .bind(summary) .bind(image) .bind(tags) .bind(content_warning) .bind(goal) .bind(user_id) .execute(&self.db) .await?; Ok(()) } /// Get all ingest endpoints pub async fn get_ingest_endpoints(&self) -> Result> { Ok(sqlx::query_as("select * from ingest_endpoint") .fetch_all(&self.db) .await?) } /// Get ingest endpoint by id pub async fn get_ingest_endpoint(&self, endpoint_id: u64) -> Result> { Ok(sqlx::query_as("select * from ingest_endpoint where id = ?") .bind(endpoint_id) .fetch_optional(&self.db) .await?) } /// Create ingest endpoint pub async fn create_ingest_endpoint( &self, name: &str, cost: u64, capabilities: Option<&str>, ) -> Result { let result = sqlx::query("insert into ingest_endpoint (name, cost, capabilities) values (?, ?, ?)") .bind(name) .bind(cost) .bind(capabilities) .execute(&self.db) .await?; Ok(result.last_insert_id()) } }