This commit is contained in:
kieran 2025-02-05 16:28:37 +00:00
parent be52a49c02
commit 222198fcb6
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
8 changed files with 137 additions and 34 deletions

View File

@ -1,7 +1,7 @@
-- Add migration script here -- Add migration script here
alter table users alter table users
add column paid_until timestamp, add column paid_until timestamp,
add column paid_space integer unsigned not null; add column paid_size integer unsigned not null;
create table payments create table payments
( (

View File

@ -3,6 +3,7 @@ use crate::filesystem::FileStore;
use crate::processing::probe_file; use crate::processing::probe_file;
use anyhow::Result; use anyhow::Result;
use log::{error, info, warn}; use log::{error, info, warn};
use tokio::sync::broadcast;
pub struct MediaMetadata { pub struct MediaMetadata {
db: Database, db: Database,
@ -14,12 +15,16 @@ impl MediaMetadata {
Self { db, fs } Self { db, fs }
} }
pub async fn process(&mut self) -> Result<()> { pub async fn process(&mut self, mut rx: broadcast::Receiver<()>) -> Result<()> {
let to_migrate = self.db.get_missing_media_metadata().await?; let to_migrate = self.db.get_missing_media_metadata().await?;
info!("{} files are missing metadata", to_migrate.len()); info!("{} files are missing metadata", to_migrate.len());
for file in to_migrate { for file in to_migrate {
if rx.try_recv().is_ok() {
info!("Shutting down MediaMetadata process");
break;
}
// probe file and update metadata // probe file and update metadata
let path = self.fs.get(&file.id); let path = self.fs.get(&file.id);
match probe_file(&path) { match probe_file(&path) {

View File

@ -1,7 +1,7 @@
use crate::db::Database; use crate::db::Database;
use crate::filesystem::FileStore; use crate::filesystem::FileStore;
use anyhow::Result; use log::{error, info, warn};
use log::{info, warn}; use tokio::sync::broadcast;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
#[cfg(feature = "media-compression")] #[cfg(feature = "media-compression")]
@ -13,31 +13,38 @@ mod payments;
pub fn start_background_tasks( pub fn start_background_tasks(
db: Database, db: Database,
file_store: FileStore, file_store: FileStore,
shutdown_rx: broadcast::Receiver<()>,
#[cfg(feature = "payments")] client: Option<fedimint_tonic_lnd::Client>, #[cfg(feature = "payments")] client: Option<fedimint_tonic_lnd::Client>,
) -> Vec<JoinHandle<Result<()>>> { ) -> Vec<JoinHandle<()>> {
let mut ret = vec![]; let mut ret = vec![];
#[cfg(feature = "media-compression")] #[cfg(feature = "media-compression")]
{ {
let db = db.clone(); let db = db.clone();
let rx = shutdown_rx.resubscribe();
ret.push(tokio::spawn(async move { ret.push(tokio::spawn(async move {
info!("Starting MediaMetadata background task"); info!("Starting MediaMetadata background task");
let mut m = media_metadata::MediaMetadata::new(db, file_store.clone()); let mut m = media_metadata::MediaMetadata::new(db, file_store.clone());
m.process().await?; if let Err(e) = m.process(rx).await {
error!("MediaMetadata failed: {}", e);
} else {
info!("MediaMetadata background task completed"); info!("MediaMetadata background task completed");
Ok(()) }
})); }));
} }
#[cfg(feature = "payments")] #[cfg(feature = "payments")]
{ {
if let Some(client) = client { if let Some(client) = client {
let db = db.clone(); let db = db.clone();
let rx = shutdown_rx.resubscribe();
ret.push(tokio::spawn(async move { ret.push(tokio::spawn(async move {
info!("Starting PaymentsHandler background task"); info!("Starting PaymentsHandler background task");
let mut m = payments::PaymentsHandler::new(client, db); let mut m = payments::PaymentsHandler::new(client, db);
m.process().await?; if let Err(e) = m.process(rx).await {
error!("PaymentsHandler failed: {}", e);
} else {
info!("PaymentsHandler background task completed"); info!("PaymentsHandler background task completed");
Ok(()) }
})); }));
} else { } else {
warn!("Not starting PaymentsHandler, configuration missing") warn!("Not starting PaymentsHandler, configuration missing")

View File

@ -1,6 +1,12 @@
use crate::db::Database; use crate::db::Database;
use anyhow::Result; use anyhow::Result;
use fedimint_tonic_lnd::lnrpc::invoice::InvoiceState;
use fedimint_tonic_lnd::lnrpc::InvoiceSubscription;
use fedimint_tonic_lnd::Client; use fedimint_tonic_lnd::Client;
use log::{error, info};
use rocket::futures::StreamExt;
use sqlx::Row;
use tokio::sync::broadcast;
pub struct PaymentsHandler { pub struct PaymentsHandler {
client: Client, client: Client,
@ -12,7 +18,54 @@ impl PaymentsHandler {
PaymentsHandler { client, database } PaymentsHandler { client, database }
} }
pub async fn process(&mut self) -> Result<()> { pub async fn process(&mut self, mut rx: broadcast::Receiver<()>) -> Result<()> {
let start_idx = self.database.get_last_settle_index().await?;
let mut invoices = self
.client
.lightning()
.subscribe_invoices(InvoiceSubscription {
add_index: 0,
settle_index: start_idx,
})
.await?;
info!("Starting invoice subscription from {}", start_idx);
let invoices = invoices.get_mut();
loop {
tokio::select! {
Ok(_) = rx.recv() => {
break;
}
Some(Ok(msg)) = invoices.next() => {
if msg.state == InvoiceState::Settled as i32 {
if let Ok(Some(mut p)) = self.database.get_payment(&msg.r_hash).await {
p.settle_index = Some(msg.settle_index);
p.is_paid = true;
match self.database.complete_payment(&p).await {
Ok(()) => info!(
"Successfully completed payment: {}",
hex::encode(&msg.r_hash)
),
Err(e) => error!("Failed to complete payment: {}", e),
}
}
}
}
}
}
Ok(()) Ok(())
} }
} }
impl Database {
async fn get_last_settle_index(&self) -> Result<u64> {
Ok(
sqlx::query("select max(settle_index) from payments where is_paid = true")
.fetch_one(&self.pool)
.await?
.try_get(0)
.unwrap_or(0),
)
}
}

View File

@ -21,6 +21,7 @@ use route96::filesystem::FileStore;
use route96::routes; use route96::routes;
use route96::routes::{get_blob, head_blob, root}; use route96::routes::{get_blob, head_blob, root};
use route96::settings::Settings; use route96::settings::Settings;
use tokio::sync::broadcast;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(version, about)] #[command(version, about)]
@ -132,18 +133,18 @@ async fn main() -> Result<(), Error> {
} }
}; };
let jh = start_background_tasks(db, fs, lnd); let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
let jh = start_background_tasks(db, fs, shutdown_rx, lnd);
if let Err(e) = rocket.launch().await { if let Err(e) = rocket.launch().await {
error!("Rocker error {}", e); error!("Rocker error {}", e);
for j in jh {
let _ = j.await?;
} }
Err(Error::from(e)) shutdown_tx
} else { .send(())
.expect("Failed to send shutdown signal");
for j in jh { for j in jh {
let _ = j.await?; j.await?;
} }
Ok(()) Ok(())
} }
}

View File

@ -64,7 +64,7 @@ pub struct User {
#[cfg(feature = "payments")] #[cfg(feature = "payments")]
pub paid_until: Option<DateTime<Utc>>, pub paid_until: Option<DateTime<Utc>>,
#[cfg(feature = "payments")] #[cfg(feature = "payments")]
pub paid_space: u64, pub paid_size: u64,
} }
#[cfg(feature = "labels")] #[cfg(feature = "labels")]
@ -104,8 +104,8 @@ pub struct Payment {
pub is_paid: bool, pub is_paid: bool,
pub days_value: u64, pub days_value: u64,
pub size_value: u64, pub size_value: u64,
pub settle_index: u64, pub settle_index: Option<u64>,
pub rate: f32, pub rate: Option<f32>,
} }
#[derive(Clone)] #[derive(Clone)]
@ -294,13 +294,50 @@ impl Database {
pub async fn insert_payment(&self, payment: &Payment) -> Result<(), Error> { pub async fn insert_payment(&self, payment: &Payment) -> Result<(), Error> {
sqlx::query("insert into payments(payment_hash,user_id,amount,days_value,size_value,rate) values(?,?,?,?,?,?)") sqlx::query("insert into payments(payment_hash,user_id,amount,days_value,size_value,rate) values(?,?,?,?,?,?)")
.bind(&payment.payment_hash) .bind(&payment.payment_hash)
.bind(&payment.user_id) .bind(payment.user_id)
.bind(&payment.amount) .bind(payment.amount)
.bind(&payment.days_value) .bind(payment.days_value)
.bind(&payment.size_value) .bind(payment.size_value)
.bind(&payment.rate) .bind(payment.rate)
.execute(&self.pool) .execute(&self.pool)
.await?; .await?;
Ok(()) Ok(())
} }
pub async fn get_payment(&self, payment_hash: &Vec<u8>) -> Result<Option<Payment>, Error> {
sqlx::query_as("select * from payments where payment_hash = ?")
.bind(payment_hash)
.fetch_optional(&self.pool)
.await
}
pub async fn get_user_payments(&self, uid: u64) -> Result<Vec<Payment>, Error> {
sqlx::query_as("select * from payments where user_id = ?")
.bind(uid)
.fetch_all(&self.pool)
.await
}
pub async fn complete_payment(&self, payment: &Payment) -> Result<(), Error> {
let mut tx = self.pool.begin().await?;
sqlx::query("update payments set is_paid = true, settle_index = ? where payment_hash = ?")
.bind(payment.settle_index)
.bind(&payment.payment_hash)
.execute(&mut *tx)
.await?;
// TODO: check space is not downgraded
sqlx::query("update users set paid_until = TIMESTAMPADD(DAY, ?, IFNULL(paid_until, current_timestamp)), paid_size = ? where id = ?")
.bind(payment.days_value)
.bind(payment.size_value)
.bind(payment.user_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
} }

View File

@ -86,7 +86,7 @@ async fn admin_get_self(auth: Nip98Auth, db: &State<Database>) -> AdminResponse<
} else { } else {
0 0
}, },
quota: user.paid_space, quota: user.paid_size,
}) })
} }
Err(_) => AdminResponse::error("User not found"), Err(_) => AdminResponse::error("User not found"),

View File

@ -109,15 +109,15 @@ async fn req_payment(
}; };
let record = Payment { let record = Payment {
payment_hash: invoice.get_ref().payment_addr.clone(), payment_hash: invoice.get_ref().r_hash.clone(),
user_id: uid, user_id: uid,
created: Default::default(), created: Default::default(),
amount: msat, amount: msat,
is_paid: false, is_paid: false,
days_value, days_value,
size_value: cfg.unit.to_size(req.units), size_value: cfg.unit.to_size(req.units),
settle_index: 0, settle_index: None,
rate: 0.0, rate: None,
}; };
if let Err(e) = db.insert_payment(&record).await { if let Err(e) = db.insert_payment(&record).await {