feat: complete bitvora webhook
Some checks failed
continuous-integration/drone/push Build is failing
Some checks failed
continuous-integration/drone/push Build is failing
This commit is contained in:
@ -7,7 +7,7 @@ edition = "2021"
|
|||||||
name = "api"
|
name = "api"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["mikrotik", "nostr-dm", "proxmox", "lnd", "cloudflare", "revolut"]
|
default = ["mikrotik", "nostr-dm", "proxmox", "lnd", "cloudflare", "revolut", "bitvora"]
|
||||||
mikrotik = ["dep:reqwest"]
|
mikrotik = ["dep:reqwest"]
|
||||||
nostr-dm = ["dep:nostr-sdk"]
|
nostr-dm = ["dep:nostr-sdk"]
|
||||||
proxmox = ["dep:reqwest", "dep:ssh2", "dep:tokio-tungstenite"]
|
proxmox = ["dep:reqwest", "dep:ssh2", "dep:tokio-tungstenite"]
|
||||||
|
@ -2,7 +2,7 @@ volumes:
|
|||||||
db:
|
db:
|
||||||
services:
|
services:
|
||||||
db:
|
db:
|
||||||
image: mariadb
|
image: docker.io/mariadb
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
environment:
|
environment:
|
||||||
- "MARIADB_ROOT_PASSWORD=root"
|
- "MARIADB_ROOT_PASSWORD=root"
|
||||||
|
@ -18,7 +18,6 @@ use lnvps_db::{
|
|||||||
IpRange, LNVpsDb, PaymentMethod, VmCustomPricing, VmCustomPricingDisk, VmCustomTemplate,
|
IpRange, LNVpsDb, PaymentMethod, VmCustomPricing, VmCustomPricingDisk, VmCustomTemplate,
|
||||||
};
|
};
|
||||||
use nostr::util::hex;
|
use nostr::util::hex;
|
||||||
use rocket::futures::{SinkExt, StreamExt};
|
|
||||||
use rocket::serde::json::Json;
|
use rocket::serde::json::Json;
|
||||||
use rocket::{get, patch, post, Responder, Route, State};
|
use rocket::{get, patch, post, Responder, Route, State};
|
||||||
use rocket_okapi::gen::OpenApiGenerator;
|
use rocket_okapi::gen::OpenApiGenerator;
|
||||||
|
@ -37,6 +37,7 @@ async fn revolut_webhook(req: WebhookMessage) -> Status {
|
|||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct WebhookMessage {
|
pub struct WebhookMessage {
|
||||||
|
pub endpoint: String,
|
||||||
pub body: Vec<u8>,
|
pub body: Vec<u8>,
|
||||||
pub headers: HashMap<String, String>,
|
pub headers: HashMap<String, String>,
|
||||||
}
|
}
|
||||||
@ -60,6 +61,7 @@ impl<'r> FromData<'r> for WebhookMessage {
|
|||||||
return rocket::data::Outcome::Error((Status::BadRequest, ()));
|
return rocket::data::Outcome::Error((Status::BadRequest, ()));
|
||||||
};
|
};
|
||||||
let msg = WebhookMessage {
|
let msg = WebhookMessage {
|
||||||
|
endpoint: req.uri().path().to_string(),
|
||||||
headers: header,
|
headers: header,
|
||||||
body: body.value.to_vec(),
|
body: body.value.to_vec(),
|
||||||
};
|
};
|
||||||
|
@ -1,9 +1,11 @@
|
|||||||
use crate::api::WEBHOOK_BRIDGE;
|
use crate::api::{WebhookMessage, WEBHOOK_BRIDGE};
|
||||||
use crate::json_api::JsonApi;
|
use crate::json_api::JsonApi;
|
||||||
use crate::lightning::{AddInvoiceRequest, AddInvoiceResult, InvoiceUpdate, LightningNode};
|
use crate::lightning::{AddInvoiceRequest, AddInvoiceResult, InvoiceUpdate, LightningNode};
|
||||||
use anyhow::bail;
|
use anyhow::{anyhow, bail};
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{Stream, StreamExt};
|
||||||
|
use hmac::{Hmac, Mac};
|
||||||
use lnvps_db::async_trait;
|
use lnvps_db::async_trait;
|
||||||
|
use log::{info, warn};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use tokio_stream::wrappers::BroadcastStream;
|
use tokio_stream::wrappers::BroadcastStream;
|
||||||
@ -46,6 +48,7 @@ impl LightningNode for BitvoraNode {
|
|||||||
Ok(AddInvoiceResult {
|
Ok(AddInvoiceResult {
|
||||||
pr: rsp.data.payment_request,
|
pr: rsp.data.payment_request,
|
||||||
payment_hash: rsp.data.r_hash,
|
payment_hash: rsp.data.r_hash,
|
||||||
|
external_id: Some(rsp.data.id),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -54,7 +57,45 @@ impl LightningNode for BitvoraNode {
|
|||||||
_from_payment_hash: Option<Vec<u8>>,
|
_from_payment_hash: Option<Vec<u8>>,
|
||||||
) -> anyhow::Result<Pin<Box<dyn Stream<Item = InvoiceUpdate> + Send>>> {
|
) -> anyhow::Result<Pin<Box<dyn Stream<Item = InvoiceUpdate> + Send>>> {
|
||||||
let rx = BroadcastStream::new(WEBHOOK_BRIDGE.listen());
|
let rx = BroadcastStream::new(WEBHOOK_BRIDGE.listen());
|
||||||
let mapped = rx.then(|r| async move { InvoiceUpdate::Unknown });
|
let secret = self.webhook_secret.clone();
|
||||||
|
let mapped = rx.then(move |r| {
|
||||||
|
let secret = secret.clone();
|
||||||
|
async move {
|
||||||
|
match r {
|
||||||
|
Ok(r) => {
|
||||||
|
if r.endpoint != "/api/v1/webhook/bitvora" {
|
||||||
|
return InvoiceUpdate::Unknown;
|
||||||
|
}
|
||||||
|
let body: BitvoraWebhookPayload<BitvoraWebhook> =
|
||||||
|
match serde_json::from_slice(r.body.as_slice()) {
|
||||||
|
Ok(b) => b,
|
||||||
|
Err(e) => return InvoiceUpdate::Error(e.to_string()),
|
||||||
|
};
|
||||||
|
info!("Received webhook {:?}", body);
|
||||||
|
let body = body.payload;
|
||||||
|
if let Err(e) = verify_webhook(&secret, &r) {
|
||||||
|
return InvoiceUpdate::Error(e.to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
match body.event {
|
||||||
|
BitvoraWebhookEvent::DepositLightningComplete => {
|
||||||
|
InvoiceUpdate::Settled {
|
||||||
|
payment_hash: None,
|
||||||
|
external_id: Some(body.data.id),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
BitvoraWebhookEvent::DepositLightningFailed => {
|
||||||
|
InvoiceUpdate::Error("Payment failed".to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Error handling webhook: {}", e);
|
||||||
|
InvoiceUpdate::Error(e.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
Ok(Box::pin(mapped))
|
Ok(Box::pin(mapped))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -80,3 +121,47 @@ struct CreateInvoiceResponse {
|
|||||||
pub r_hash: String,
|
pub r_hash: String,
|
||||||
pub payment_request: String,
|
pub payment_request: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
|
struct BitvoraWebhookPayload<T> {
|
||||||
|
pub payload: T,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
|
struct BitvoraWebhook {
|
||||||
|
pub event: BitvoraWebhookEvent,
|
||||||
|
pub data: BitvoraPayment,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
|
enum BitvoraWebhookEvent {
|
||||||
|
#[serde(rename = "deposit.lightning.completed")]
|
||||||
|
DepositLightningComplete,
|
||||||
|
#[serde(rename = "deposit.lightning.failed")]
|
||||||
|
DepositLightningFailed,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
|
struct BitvoraPayment {
|
||||||
|
pub id: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
type HmacSha256 = Hmac<sha2::Sha256>;
|
||||||
|
fn verify_webhook(secret: &str, msg: &WebhookMessage) -> anyhow::Result<()> {
|
||||||
|
let sig = msg
|
||||||
|
.headers
|
||||||
|
.get("bitvora-signature")
|
||||||
|
.ok_or_else(|| anyhow!("Missing bitvora-signature header"))?;
|
||||||
|
|
||||||
|
let mut mac = HmacSha256::new_from_slice(secret.as_bytes())?;
|
||||||
|
mac.update(msg.body.as_slice());
|
||||||
|
let result = mac.finalize().into_bytes();
|
||||||
|
|
||||||
|
if hex::encode(result) == *sig {
|
||||||
|
return Ok(());
|
||||||
|
} else {
|
||||||
|
warn!("Invalid signature found {} != {}", sig, hex::encode(result));
|
||||||
|
}
|
||||||
|
|
||||||
|
bail!("No valid signature found!");
|
||||||
|
}
|
||||||
|
@ -40,6 +40,7 @@ impl LightningNode for LndNode {
|
|||||||
Ok(AddInvoiceResult {
|
Ok(AddInvoiceResult {
|
||||||
pr: inner.payment_request,
|
pr: inner.payment_request,
|
||||||
payment_hash: hex::encode(inner.r_hash),
|
payment_hash: hex::encode(inner.r_hash),
|
||||||
|
external_id: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -78,7 +79,8 @@ impl LightningNode for LndNode {
|
|||||||
Ok(m) => {
|
Ok(m) => {
|
||||||
if m.state == InvoiceState::Settled as i32 {
|
if m.state == InvoiceState::Settled as i32 {
|
||||||
InvoiceUpdate::Settled {
|
InvoiceUpdate::Settled {
|
||||||
payment_hash: hex::encode(m.r_hash),
|
payment_hash: Some(hex::encode(m.r_hash)),
|
||||||
|
external_id: None,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
InvoiceUpdate::Unknown
|
InvoiceUpdate::Unknown
|
||||||
|
@ -31,6 +31,7 @@ pub struct AddInvoiceRequest {
|
|||||||
pub struct AddInvoiceResult {
|
pub struct AddInvoiceResult {
|
||||||
pub pr: String,
|
pub pr: String,
|
||||||
pub payment_hash: String,
|
pub payment_hash: String,
|
||||||
|
pub external_id: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@ -39,7 +40,8 @@ pub enum InvoiceUpdate {
|
|||||||
Unknown,
|
Unknown,
|
||||||
Error(String),
|
Error(String),
|
||||||
Settled {
|
Settled {
|
||||||
payment_hash: String,
|
payment_hash: Option<String>,
|
||||||
|
external_id: Option<String>,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
use crate::lightning::{InvoiceUpdate, LightningNode};
|
use crate::lightning::{InvoiceUpdate, LightningNode};
|
||||||
use crate::worker::WorkJob;
|
use crate::worker::WorkJob;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use lnvps_db::LNVpsDb;
|
use lnvps_db::{LNVpsDb, VmPayment};
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use nostr::util::hex;
|
use nostr::util::hex;
|
||||||
use rocket::futures::StreamExt;
|
use rocket::futures::StreamExt;
|
||||||
@ -25,10 +25,19 @@ impl NodeInvoiceHandler {
|
|||||||
|
|
||||||
async fn mark_paid(&self, id: &Vec<u8>) -> Result<()> {
|
async fn mark_paid(&self, id: &Vec<u8>) -> Result<()> {
|
||||||
let p = self.db.get_vm_payment(id).await?;
|
let p = self.db.get_vm_payment(id).await?;
|
||||||
self.db.vm_payment_paid(&p).await?;
|
self.mark_payment_paid(&p).await
|
||||||
|
}
|
||||||
|
|
||||||
info!("VM payment {} for {}, paid", hex::encode(p.id), p.vm_id);
|
async fn mark_paid_ext_id(&self, external_id: &str) -> Result<()> {
|
||||||
self.tx.send(WorkJob::CheckVm { vm_id: p.vm_id })?;
|
let p = self.db.get_vm_payment_by_ext_id(external_id).await?;
|
||||||
|
self.mark_payment_paid(&p).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn mark_payment_paid(&self, payment: &VmPayment) -> Result<()> {
|
||||||
|
self.db.vm_payment_paid(&payment).await?;
|
||||||
|
|
||||||
|
info!("VM payment {} for {}, paid", hex::encode(&payment.id), payment.vm_id);
|
||||||
|
self.tx.send(WorkJob::CheckVm { vm_id: payment.vm_id })?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -46,11 +55,23 @@ impl NodeInvoiceHandler {
|
|||||||
let mut handler = self.node.subscribe_invoices(from_ph).await?;
|
let mut handler = self.node.subscribe_invoices(from_ph).await?;
|
||||||
while let Some(msg) = handler.next().await {
|
while let Some(msg) = handler.next().await {
|
||||||
match msg {
|
match msg {
|
||||||
InvoiceUpdate::Settled { payment_hash } => {
|
InvoiceUpdate::Settled {
|
||||||
let r_hash = hex::decode(payment_hash)?;
|
payment_hash,
|
||||||
|
external_id,
|
||||||
|
} => {
|
||||||
|
if let Some(h) = payment_hash {
|
||||||
|
let r_hash = hex::decode(h)?;
|
||||||
if let Err(e) = self.mark_paid(&r_hash).await {
|
if let Err(e) = self.mark_paid(&r_hash).await {
|
||||||
error!("{}", e);
|
error!("{}", e);
|
||||||
}
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if let Some(e) = external_id {
|
||||||
|
if let Err(e) = self.mark_paid_ext_id(&e).await {
|
||||||
|
error!("{}", e);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
v => warn!("Unknown invoice update: {:?}", v),
|
v => warn!("Unknown invoice update: {:?}", v),
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,6 @@ pub fn listen_all_payments(
|
|||||||
|
|
||||||
#[cfg(feature = "revolut")]
|
#[cfg(feature = "revolut")]
|
||||||
{
|
{
|
||||||
|
|
||||||
use crate::payments::revolut::RevolutPaymentHandler;
|
use crate::payments::revolut::RevolutPaymentHandler;
|
||||||
if let Some(r) = &settings.revolut {
|
if let Some(r) = &settings.revolut {
|
||||||
let mut handler = RevolutPaymentHandler::new(
|
let mut handler = RevolutPaymentHandler::new(
|
||||||
|
@ -57,6 +57,9 @@ impl RevolutPaymentHandler {
|
|||||||
// listen to events
|
// listen to events
|
||||||
let mut listenr = WEBHOOK_BRIDGE.listen();
|
let mut listenr = WEBHOOK_BRIDGE.listen();
|
||||||
while let Ok(m) = listenr.recv().await {
|
while let Ok(m) = listenr.recv().await {
|
||||||
|
if m.endpoint != "/api/v1/webhook/revolut" {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
let body: RevolutWebhook = serde_json::from_slice(m.body.as_slice())?;
|
let body: RevolutWebhook = serde_json::from_slice(m.body.as_slice())?;
|
||||||
info!("Received webhook {:?}", body);
|
info!("Received webhook {:?}", body);
|
||||||
if let Err(e) = verify_webhook(&secret, &m) {
|
if let Err(e) = verify_webhook(&secret, &m) {
|
||||||
|
@ -145,8 +145,8 @@ impl HostCapacityService {
|
|||||||
.map(|s| {
|
.map(|s| {
|
||||||
let usage = vm_resources
|
let usage = vm_resources
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|(k, v)| s.id == v.disk_id)
|
.filter(|(_k, v)| s.id == v.disk_id)
|
||||||
.fold(0, |acc, (k, v)| acc + v.disk);
|
.fold(0, |acc, (_k, v)| acc + v.disk);
|
||||||
DiskCapacity {
|
DiskCapacity {
|
||||||
load_factor: host.load_factor,
|
load_factor: host.load_factor,
|
||||||
disk: s.clone(),
|
disk: s.clone(),
|
||||||
|
@ -404,7 +404,7 @@ impl LNVpsProvisioner {
|
|||||||
is_paid: false,
|
is_paid: false,
|
||||||
rate,
|
rate,
|
||||||
external_data: invoice.pr,
|
external_data: invoice.pr,
|
||||||
external_id: None,
|
external_id: invoice.external_id,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
PaymentMethod::Revolut => {
|
PaymentMethod::Revolut => {
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
use crate::exchange::{Currency, CurrencyAmount, ExchangeRateService, Ticker, TickerRate};
|
use crate::exchange::{Currency, CurrencyAmount, ExchangeRateService, Ticker, TickerRate};
|
||||||
use anyhow::{bail, Context, Result};
|
use anyhow::{bail, Result};
|
||||||
use chrono::{DateTime, Days, Months, TimeDelta, Utc};
|
use chrono::{DateTime, Days, Months, TimeDelta, Utc};
|
||||||
use ipnetwork::IpNetwork;
|
use ipnetwork::IpNetwork;
|
||||||
use isocountry::CountryCode;
|
use isocountry::CountryCode;
|
||||||
|
Reference in New Issue
Block a user