From 222198fcb656287171c321083f4fe822d9449180 Mon Sep 17 00:00:00 2001 From: kieran Date: Wed, 5 Feb 2025 16:28:37 +0000 Subject: [PATCH] wip --- migrations/20250202135844_payments.sql | 2 +- src/background/media_metadata.rs | 7 +++- src/background/mod.rs | 25 +++++++----- src/background/payments.rs | 55 +++++++++++++++++++++++++- src/bin/main.rs | 21 +++++----- src/db.rs | 53 +++++++++++++++++++++---- src/routes/admin.rs | 2 +- src/routes/payment.rs | 6 +-- 8 files changed, 137 insertions(+), 34 deletions(-) diff --git a/migrations/20250202135844_payments.sql b/migrations/20250202135844_payments.sql index 387ea04..c86a593 100644 --- a/migrations/20250202135844_payments.sql +++ b/migrations/20250202135844_payments.sql @@ -1,7 +1,7 @@ -- Add migration script here alter table users add column paid_until timestamp, - add column paid_space integer unsigned not null; + add column paid_size integer unsigned not null; create table payments ( diff --git a/src/background/media_metadata.rs b/src/background/media_metadata.rs index be35fe6..71cf9e2 100644 --- a/src/background/media_metadata.rs +++ b/src/background/media_metadata.rs @@ -3,6 +3,7 @@ use crate::filesystem::FileStore; use crate::processing::probe_file; use anyhow::Result; use log::{error, info, warn}; +use tokio::sync::broadcast; pub struct MediaMetadata { db: Database, @@ -14,12 +15,16 @@ impl MediaMetadata { Self { db, fs } } - pub async fn process(&mut self) -> Result<()> { + pub async fn process(&mut self, mut rx: broadcast::Receiver<()>) -> Result<()> { let to_migrate = self.db.get_missing_media_metadata().await?; info!("{} files are missing metadata", to_migrate.len()); for file in to_migrate { + if rx.try_recv().is_ok() { + info!("Shutting down MediaMetadata process"); + break; + } // probe file and update metadata let path = self.fs.get(&file.id); match probe_file(&path) { diff --git a/src/background/mod.rs b/src/background/mod.rs index 0b3b691..9b7057a 100644 --- a/src/background/mod.rs +++ b/src/background/mod.rs @@ -1,7 +1,7 @@ use crate::db::Database; use crate::filesystem::FileStore; -use anyhow::Result; -use log::{info, warn}; +use log::{error, info, warn}; +use tokio::sync::broadcast; use tokio::task::JoinHandle; #[cfg(feature = "media-compression")] @@ -13,31 +13,38 @@ mod payments; pub fn start_background_tasks( db: Database, file_store: FileStore, + shutdown_rx: broadcast::Receiver<()>, #[cfg(feature = "payments")] client: Option, -) -> Vec>> { +) -> Vec> { let mut ret = vec![]; #[cfg(feature = "media-compression")] { let db = db.clone(); + let rx = shutdown_rx.resubscribe(); ret.push(tokio::spawn(async move { info!("Starting MediaMetadata background task"); let mut m = media_metadata::MediaMetadata::new(db, file_store.clone()); - m.process().await?; - info!("MediaMetadata background task completed"); - Ok(()) + if let Err(e) = m.process(rx).await { + error!("MediaMetadata failed: {}", e); + } else { + info!("MediaMetadata background task completed"); + } })); } #[cfg(feature = "payments")] { if let Some(client) = client { let db = db.clone(); + let rx = shutdown_rx.resubscribe(); ret.push(tokio::spawn(async move { info!("Starting PaymentsHandler background task"); let mut m = payments::PaymentsHandler::new(client, db); - m.process().await?; - info!("PaymentsHandler background task completed"); - Ok(()) + if let Err(e) = m.process(rx).await { + error!("PaymentsHandler failed: {}", e); + } else { + info!("PaymentsHandler background task completed"); + } })); } else { warn!("Not starting PaymentsHandler, configuration missing") diff --git a/src/background/payments.rs b/src/background/payments.rs index 9595418..cd8b324 100644 --- a/src/background/payments.rs +++ b/src/background/payments.rs @@ -1,6 +1,12 @@ use crate::db::Database; use anyhow::Result; +use fedimint_tonic_lnd::lnrpc::invoice::InvoiceState; +use fedimint_tonic_lnd::lnrpc::InvoiceSubscription; use fedimint_tonic_lnd::Client; +use log::{error, info}; +use rocket::futures::StreamExt; +use sqlx::Row; +use tokio::sync::broadcast; pub struct PaymentsHandler { client: Client, @@ -12,7 +18,54 @@ impl PaymentsHandler { PaymentsHandler { client, database } } - pub async fn process(&mut self) -> Result<()> { + pub async fn process(&mut self, mut rx: broadcast::Receiver<()>) -> Result<()> { + let start_idx = self.database.get_last_settle_index().await?; + let mut invoices = self + .client + .lightning() + .subscribe_invoices(InvoiceSubscription { + add_index: 0, + settle_index: start_idx, + }) + .await?; + info!("Starting invoice subscription from {}", start_idx); + + let invoices = invoices.get_mut(); + loop { + tokio::select! { + Ok(_) = rx.recv() => { + break; + } + Some(Ok(msg)) = invoices.next() => { + if msg.state == InvoiceState::Settled as i32 { + if let Ok(Some(mut p)) = self.database.get_payment(&msg.r_hash).await { + p.settle_index = Some(msg.settle_index); + p.is_paid = true; + match self.database.complete_payment(&p).await { + Ok(()) => info!( + "Successfully completed payment: {}", + hex::encode(&msg.r_hash) + ), + Err(e) => error!("Failed to complete payment: {}", e), + } + } + } + } + } + } + Ok(()) } } + +impl Database { + async fn get_last_settle_index(&self) -> Result { + Ok( + sqlx::query("select max(settle_index) from payments where is_paid = true") + .fetch_one(&self.pool) + .await? + .try_get(0) + .unwrap_or(0), + ) + } +} diff --git a/src/bin/main.rs b/src/bin/main.rs index ce6cc3b..d047766 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -21,6 +21,7 @@ use route96::filesystem::FileStore; use route96::routes; use route96::routes::{get_blob, head_blob, root}; use route96::settings::Settings; +use tokio::sync::broadcast; #[derive(Parser, Debug)] #[command(version, about)] @@ -132,18 +133,18 @@ async fn main() -> Result<(), Error> { } }; - let jh = start_background_tasks(db, fs, lnd); + let (shutdown_tx, shutdown_rx) = broadcast::channel(1); + let jh = start_background_tasks(db, fs, shutdown_rx, lnd); if let Err(e) = rocket.launch().await { error!("Rocker error {}", e); - for j in jh { - let _ = j.await?; - } - Err(Error::from(e)) - } else { - for j in jh { - let _ = j.await?; - } - Ok(()) } + shutdown_tx + .send(()) + .expect("Failed to send shutdown signal"); + + for j in jh { + j.await?; + } + Ok(()) } diff --git a/src/db.rs b/src/db.rs index 63b5dd6..7c1e0de 100644 --- a/src/db.rs +++ b/src/db.rs @@ -64,7 +64,7 @@ pub struct User { #[cfg(feature = "payments")] pub paid_until: Option>, #[cfg(feature = "payments")] - pub paid_space: u64, + pub paid_size: u64, } #[cfg(feature = "labels")] @@ -104,8 +104,8 @@ pub struct Payment { pub is_paid: bool, pub days_value: u64, pub size_value: u64, - pub settle_index: u64, - pub rate: f32, + pub settle_index: Option, + pub rate: Option, } #[derive(Clone)] @@ -294,13 +294,50 @@ impl Database { pub async fn insert_payment(&self, payment: &Payment) -> Result<(), Error> { sqlx::query("insert into payments(payment_hash,user_id,amount,days_value,size_value,rate) values(?,?,?,?,?,?)") .bind(&payment.payment_hash) - .bind(&payment.user_id) - .bind(&payment.amount) - .bind(&payment.days_value) - .bind(&payment.size_value) - .bind(&payment.rate) + .bind(payment.user_id) + .bind(payment.amount) + .bind(payment.days_value) + .bind(payment.size_value) + .bind(payment.rate) .execute(&self.pool) .await?; Ok(()) } + + pub async fn get_payment(&self, payment_hash: &Vec) -> Result, Error> { + sqlx::query_as("select * from payments where payment_hash = ?") + .bind(payment_hash) + .fetch_optional(&self.pool) + .await + } + + pub async fn get_user_payments(&self, uid: u64) -> Result, Error> { + sqlx::query_as("select * from payments where user_id = ?") + .bind(uid) + .fetch_all(&self.pool) + .await + } + + pub async fn complete_payment(&self, payment: &Payment) -> Result<(), Error> { + let mut tx = self.pool.begin().await?; + + sqlx::query("update payments set is_paid = true, settle_index = ? where payment_hash = ?") + .bind(payment.settle_index) + .bind(&payment.payment_hash) + .execute(&mut *tx) + .await?; + + // TODO: check space is not downgraded + + sqlx::query("update users set paid_until = TIMESTAMPADD(DAY, ?, IFNULL(paid_until, current_timestamp)), paid_size = ? where id = ?") + .bind(payment.days_value) + .bind(payment.size_value) + .bind(payment.user_id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + + Ok(()) + } } diff --git a/src/routes/admin.rs b/src/routes/admin.rs index 8672ee8..cde1ec1 100644 --- a/src/routes/admin.rs +++ b/src/routes/admin.rs @@ -86,7 +86,7 @@ async fn admin_get_self(auth: Nip98Auth, db: &State) -> AdminResponse< } else { 0 }, - quota: user.paid_space, + quota: user.paid_size, }) } Err(_) => AdminResponse::error("User not found"), diff --git a/src/routes/payment.rs b/src/routes/payment.rs index 53fc88b..9d47bc0 100644 --- a/src/routes/payment.rs +++ b/src/routes/payment.rs @@ -109,15 +109,15 @@ async fn req_payment( }; let record = Payment { - payment_hash: invoice.get_ref().payment_addr.clone(), + payment_hash: invoice.get_ref().r_hash.clone(), user_id: uid, created: Default::default(), amount: msat, is_paid: false, days_value, size_value: cfg.unit.to_size(req.units), - settle_index: 0, - rate: 0.0, + settle_index: None, + rate: None, }; if let Err(e) = db.insert_payment(&record).await {