diff --git a/Cargo.toml b/Cargo.toml index 28a5c3e..6b215c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" name = "api" [features] -default = ["mikrotik", "nostr-dm", "proxmox", "lnd", "cloudflare", "revolut"] +default = ["mikrotik", "nostr-dm", "proxmox", "lnd", "cloudflare", "revolut", "bitvora"] mikrotik = ["dep:reqwest"] nostr-dm = ["dep:nostr-sdk"] proxmox = ["dep:reqwest", "dep:ssh2", "dep:tokio-tungstenite"] diff --git a/docker-compose.yaml b/docker-compose.yaml index b65e96e..fd5b401 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -2,7 +2,7 @@ volumes: db: services: db: - image: mariadb + image: docker.io/mariadb restart: unless-stopped environment: - "MARIADB_ROOT_PASSWORD=root" diff --git a/src/api/routes.rs b/src/api/routes.rs index 83e060e..940bd19 100644 --- a/src/api/routes.rs +++ b/src/api/routes.rs @@ -18,7 +18,6 @@ use lnvps_db::{ IpRange, LNVpsDb, PaymentMethod, VmCustomPricing, VmCustomPricingDisk, VmCustomTemplate, }; use nostr::util::hex; -use rocket::futures::{SinkExt, StreamExt}; use rocket::serde::json::Json; use rocket::{get, patch, post, Responder, Route, State}; use rocket_okapi::gen::OpenApiGenerator; diff --git a/src/api/webhook.rs b/src/api/webhook.rs index 6f34718..52f6bcf 100644 --- a/src/api/webhook.rs +++ b/src/api/webhook.rs @@ -37,6 +37,7 @@ async fn revolut_webhook(req: WebhookMessage) -> Status { #[derive(Debug, Clone)] pub struct WebhookMessage { + pub endpoint: String, pub body: Vec, pub headers: HashMap, } @@ -60,6 +61,7 @@ impl<'r> FromData<'r> for WebhookMessage { return rocket::data::Outcome::Error((Status::BadRequest, ())); }; let msg = WebhookMessage { + endpoint: req.uri().path().to_string(), headers: header, body: body.value.to_vec(), }; diff --git a/src/lightning/bitvora.rs b/src/lightning/bitvora.rs index 2a538e8..2cf6462 100644 --- a/src/lightning/bitvora.rs +++ b/src/lightning/bitvora.rs @@ -1,9 +1,11 @@ -use crate::api::WEBHOOK_BRIDGE; +use crate::api::{WebhookMessage, WEBHOOK_BRIDGE}; use crate::json_api::JsonApi; use crate::lightning::{AddInvoiceRequest, AddInvoiceResult, InvoiceUpdate, LightningNode}; -use anyhow::bail; +use anyhow::{anyhow, bail}; use futures::{Stream, StreamExt}; +use hmac::{Hmac, Mac}; use lnvps_db::async_trait; +use log::{info, warn}; use serde::{Deserialize, Serialize}; use std::pin::Pin; use tokio_stream::wrappers::BroadcastStream; @@ -46,6 +48,7 @@ impl LightningNode for BitvoraNode { Ok(AddInvoiceResult { pr: rsp.data.payment_request, payment_hash: rsp.data.r_hash, + external_id: Some(rsp.data.id), }) } @@ -54,7 +57,45 @@ impl LightningNode for BitvoraNode { _from_payment_hash: Option>, ) -> anyhow::Result + Send>>> { let rx = BroadcastStream::new(WEBHOOK_BRIDGE.listen()); - let mapped = rx.then(|r| async move { InvoiceUpdate::Unknown }); + let secret = self.webhook_secret.clone(); + let mapped = rx.then(move |r| { + let secret = secret.clone(); + async move { + match r { + Ok(r) => { + if r.endpoint != "/api/v1/webhook/bitvora" { + return InvoiceUpdate::Unknown; + } + let body: BitvoraWebhookPayload = + match serde_json::from_slice(r.body.as_slice()) { + Ok(b) => b, + Err(e) => return InvoiceUpdate::Error(e.to_string()), + }; + info!("Received webhook {:?}", body); + let body = body.payload; + if let Err(e) = verify_webhook(&secret, &r) { + return InvoiceUpdate::Error(e.to_string()); + } + + match body.event { + BitvoraWebhookEvent::DepositLightningComplete => { + InvoiceUpdate::Settled { + payment_hash: None, + external_id: Some(body.data.id), + } + } + BitvoraWebhookEvent::DepositLightningFailed => { + InvoiceUpdate::Error("Payment failed".to_string()) + } + } + } + Err(e) => { + warn!("Error handling webhook: {}", e); + InvoiceUpdate::Error(e.to_string()) + } + } + } + }); Ok(Box::pin(mapped)) } } @@ -80,3 +121,47 @@ struct CreateInvoiceResponse { pub r_hash: String, pub payment_request: String, } + +#[derive(Deserialize, Debug, Clone)] +struct BitvoraWebhookPayload { + pub payload: T, +} + +#[derive(Deserialize, Debug, Clone)] +struct BitvoraWebhook { + pub event: BitvoraWebhookEvent, + pub data: BitvoraPayment, +} + +#[derive(Deserialize, Debug, Clone)] +enum BitvoraWebhookEvent { + #[serde(rename = "deposit.lightning.completed")] + DepositLightningComplete, + #[serde(rename = "deposit.lightning.failed")] + DepositLightningFailed, +} + +#[derive(Deserialize, Debug, Clone)] +struct BitvoraPayment { + pub id: String, +} + +type HmacSha256 = Hmac; +fn verify_webhook(secret: &str, msg: &WebhookMessage) -> anyhow::Result<()> { + let sig = msg + .headers + .get("bitvora-signature") + .ok_or_else(|| anyhow!("Missing bitvora-signature header"))?; + + let mut mac = HmacSha256::new_from_slice(secret.as_bytes())?; + mac.update(msg.body.as_slice()); + let result = mac.finalize().into_bytes(); + + if hex::encode(result) == *sig { + return Ok(()); + } else { + warn!("Invalid signature found {} != {}", sig, hex::encode(result)); + } + + bail!("No valid signature found!"); +} diff --git a/src/lightning/lnd.rs b/src/lightning/lnd.rs index 2df7e0b..f996eb7 100644 --- a/src/lightning/lnd.rs +++ b/src/lightning/lnd.rs @@ -40,6 +40,7 @@ impl LightningNode for LndNode { Ok(AddInvoiceResult { pr: inner.payment_request, payment_hash: hex::encode(inner.r_hash), + external_id: None, }) } @@ -78,7 +79,8 @@ impl LightningNode for LndNode { Ok(m) => { if m.state == InvoiceState::Settled as i32 { InvoiceUpdate::Settled { - payment_hash: hex::encode(m.r_hash), + payment_hash: Some(hex::encode(m.r_hash)), + external_id: None, } } else { InvoiceUpdate::Unknown diff --git a/src/lightning/mod.rs b/src/lightning/mod.rs index 2f81f16..e455c48 100644 --- a/src/lightning/mod.rs +++ b/src/lightning/mod.rs @@ -31,6 +31,7 @@ pub struct AddInvoiceRequest { pub struct AddInvoiceResult { pub pr: String, pub payment_hash: String, + pub external_id: Option, } #[derive(Debug, Clone)] @@ -39,7 +40,8 @@ pub enum InvoiceUpdate { Unknown, Error(String), Settled { - payment_hash: String, + payment_hash: Option, + external_id: Option, }, } diff --git a/src/payments/invoice.rs b/src/payments/invoice.rs index 09c47c8..4d76d13 100644 --- a/src/payments/invoice.rs +++ b/src/payments/invoice.rs @@ -1,7 +1,7 @@ use crate::lightning::{InvoiceUpdate, LightningNode}; use crate::worker::WorkJob; use anyhow::Result; -use lnvps_db::LNVpsDb; +use lnvps_db::{LNVpsDb, VmPayment}; use log::{error, info, warn}; use nostr::util::hex; use rocket::futures::StreamExt; @@ -25,10 +25,19 @@ impl NodeInvoiceHandler { async fn mark_paid(&self, id: &Vec) -> Result<()> { let p = self.db.get_vm_payment(id).await?; - self.db.vm_payment_paid(&p).await?; + self.mark_payment_paid(&p).await + } - info!("VM payment {} for {}, paid", hex::encode(p.id), p.vm_id); - self.tx.send(WorkJob::CheckVm { vm_id: p.vm_id })?; + async fn mark_paid_ext_id(&self, external_id: &str) -> Result<()> { + let p = self.db.get_vm_payment_by_ext_id(external_id).await?; + self.mark_payment_paid(&p).await + } + + async fn mark_payment_paid(&self, payment: &VmPayment) -> Result<()> { + self.db.vm_payment_paid(&payment).await?; + + info!("VM payment {} for {}, paid", hex::encode(&payment.id), payment.vm_id); + self.tx.send(WorkJob::CheckVm { vm_id: payment.vm_id })?; Ok(()) } @@ -46,10 +55,22 @@ impl NodeInvoiceHandler { let mut handler = self.node.subscribe_invoices(from_ph).await?; while let Some(msg) = handler.next().await { match msg { - InvoiceUpdate::Settled { payment_hash } => { - let r_hash = hex::decode(payment_hash)?; - if let Err(e) = self.mark_paid(&r_hash).await { - error!("{}", e); + InvoiceUpdate::Settled { + payment_hash, + external_id, + } => { + if let Some(h) = payment_hash { + let r_hash = hex::decode(h)?; + if let Err(e) = self.mark_paid(&r_hash).await { + error!("{}", e); + } + continue; + } + if let Some(e) = external_id { + if let Err(e) = self.mark_paid_ext_id(&e).await { + error!("{}", e); + } + continue; } } v => warn!("Unknown invoice update: {:?}", v), diff --git a/src/payments/mod.rs b/src/payments/mod.rs index 3749b8a..6c70287 100644 --- a/src/payments/mod.rs +++ b/src/payments/mod.rs @@ -32,7 +32,6 @@ pub fn listen_all_payments( #[cfg(feature = "revolut")] { - use crate::payments::revolut::RevolutPaymentHandler; if let Some(r) = &settings.revolut { let mut handler = RevolutPaymentHandler::new( diff --git a/src/payments/revolut.rs b/src/payments/revolut.rs index b82c91c..f5a29db 100644 --- a/src/payments/revolut.rs +++ b/src/payments/revolut.rs @@ -57,6 +57,9 @@ impl RevolutPaymentHandler { // listen to events let mut listenr = WEBHOOK_BRIDGE.listen(); while let Ok(m) = listenr.recv().await { + if m.endpoint != "/api/v1/webhook/revolut" { + continue; + } let body: RevolutWebhook = serde_json::from_slice(m.body.as_slice())?; info!("Received webhook {:?}", body); if let Err(e) = verify_webhook(&secret, &m) { diff --git a/src/provisioner/capacity.rs b/src/provisioner/capacity.rs index 067140c..cc3ee29 100644 --- a/src/provisioner/capacity.rs +++ b/src/provisioner/capacity.rs @@ -145,8 +145,8 @@ impl HostCapacityService { .map(|s| { let usage = vm_resources .iter() - .filter(|(k, v)| s.id == v.disk_id) - .fold(0, |acc, (k, v)| acc + v.disk); + .filter(|(_k, v)| s.id == v.disk_id) + .fold(0, |acc, (_k, v)| acc + v.disk); DiskCapacity { load_factor: host.load_factor, disk: s.clone(), diff --git a/src/provisioner/lnvps.rs b/src/provisioner/lnvps.rs index bc5f553..5b09bf5 100644 --- a/src/provisioner/lnvps.rs +++ b/src/provisioner/lnvps.rs @@ -404,7 +404,7 @@ impl LNVpsProvisioner { is_paid: false, rate, external_data: invoice.pr, - external_id: None, + external_id: invoice.external_id, } } PaymentMethod::Revolut => { diff --git a/src/provisioner/pricing.rs b/src/provisioner/pricing.rs index 66b2520..57fada1 100644 --- a/src/provisioner/pricing.rs +++ b/src/provisioner/pricing.rs @@ -1,5 +1,5 @@ use crate::exchange::{Currency, CurrencyAmount, ExchangeRateService, Ticker, TickerRate}; -use anyhow::{bail, Context, Result}; +use anyhow::{bail, Result}; use chrono::{DateTime, Days, Months, TimeDelta, Utc}; use ipnetwork::IpNetwork; use isocountry::CountryCode;