feat: setup webhook bridge for bitvora
This commit is contained in:
@ -1,4 +1,13 @@
|
||||
use rocket::{routes, Route};
|
||||
|
||||
mod model;
|
||||
mod routes;
|
||||
mod webhook;
|
||||
|
||||
pub use routes::routes;
|
||||
pub fn routes() -> Vec<Route> {
|
||||
let mut r = routes::routes();
|
||||
r.append(&mut webhook::routes());
|
||||
r
|
||||
}
|
||||
|
||||
pub use webhook::WEBHOOK_BRIDGE;
|
84
src/api/webhook.rs
Normal file
84
src/api/webhook.rs
Normal file
@ -0,0 +1,84 @@
|
||||
use anyhow::anyhow;
|
||||
use lettre::message::header::Headers;
|
||||
use log::warn;
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::Request;
|
||||
use rocket::data::{ByteUnit, FromData, ToByteUnit};
|
||||
use rocket::http::Status;
|
||||
use rocket::outcome::IntoOutcome;
|
||||
use rocket::request::{FromRequest, Outcome};
|
||||
use rocket::{post, routes, Data, Route};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::LazyLock;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
/// Messaging bridge for webhooks to other parts of the system (bitvora)
|
||||
pub static WEBHOOK_BRIDGE: LazyLock<WebhookBridge> = LazyLock::new(|| WebhookBridge::new());
|
||||
|
||||
pub fn routes() -> Vec<Route> {
|
||||
if cfg!(feature = "bitvora") {
|
||||
routes![bitvora_webhook]
|
||||
} else {
|
||||
routes![]
|
||||
}
|
||||
}
|
||||
|
||||
#[post("/api/v1/webhook/bitvora", data = "<req>")]
|
||||
async fn bitvora_webhook(req: WebhookMessage) -> Status {
|
||||
WEBHOOK_BRIDGE.send(req);
|
||||
Status::Ok
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct WebhookMessage {
|
||||
pub body: Vec<u8>,
|
||||
pub headers: HashMap<String, String>,
|
||||
}
|
||||
|
||||
#[rocket::async_trait]
|
||||
impl<'r> FromData<'r> for WebhookMessage {
|
||||
type Error = ();
|
||||
|
||||
async fn from_data(
|
||||
req: &'r rocket::Request<'_>,
|
||||
data: Data<'r>,
|
||||
) -> rocket::data::Outcome<'r, Self, Self::Error> {
|
||||
let header = req
|
||||
.headers()
|
||||
.iter()
|
||||
.map(|v| (v.name.to_string(), v.value.to_string()))
|
||||
.collect();
|
||||
let body = if let Ok(d) = data.open(4.megabytes()).into_bytes().await {
|
||||
d
|
||||
} else {
|
||||
return rocket::data::Outcome::Error((Status::BadRequest, ()));
|
||||
};
|
||||
let msg = WebhookMessage {
|
||||
headers: header,
|
||||
body: body.value.to_vec(),
|
||||
};
|
||||
rocket::data::Outcome::Success(msg)
|
||||
}
|
||||
}
|
||||
#[derive(Debug)]
|
||||
pub struct WebhookBridge {
|
||||
tx: broadcast::Sender<WebhookMessage>,
|
||||
}
|
||||
|
||||
impl WebhookBridge {
|
||||
pub fn new() -> Self {
|
||||
let (tx, _rx) = broadcast::channel(100);
|
||||
Self { tx }
|
||||
}
|
||||
|
||||
pub fn send(&self, message: WebhookMessage) {
|
||||
if let Err(e) = self.tx.send(message) {
|
||||
warn!("Failed to send webhook message: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn listen(&self) -> broadcast::Receiver<WebhookMessage> {
|
||||
self.tx.subscribe()
|
||||
}
|
||||
}
|
@ -1,21 +1,25 @@
|
||||
use crate::api::WEBHOOK_BRIDGE;
|
||||
use crate::lightning::{AddInvoiceRequest, AddInvoiceResult, InvoiceUpdate, LightningNode};
|
||||
use anyhow::bail;
|
||||
use futures::Stream;
|
||||
use futures::{Stream, StreamExt};
|
||||
use lnvps_db::async_trait;
|
||||
use log::debug;
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::{Method, Url};
|
||||
use rocket::http::ext::IntoCollection;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::pin::Pin;
|
||||
use tokio_stream::wrappers::BroadcastStream;
|
||||
|
||||
pub struct BitvoraNode {
|
||||
base: Url,
|
||||
client: reqwest::Client,
|
||||
webhook_secret: String,
|
||||
}
|
||||
|
||||
impl BitvoraNode {
|
||||
pub fn new(api_token: &str) -> Self {
|
||||
pub fn new(api_token: &str, webhook_secret: &str) -> Self {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(
|
||||
"Authorization",
|
||||
@ -30,6 +34,7 @@ impl BitvoraNode {
|
||||
Self {
|
||||
base: Url::parse("https://api.bitvora.com/").unwrap(),
|
||||
client,
|
||||
webhook_secret: webhook_secret.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -95,6 +100,13 @@ impl LightningNode for BitvoraNode {
|
||||
let rsp: BitvoraResponse<CreateInvoiceResponse> = self
|
||||
.req(Method::POST, "/v1/bitcoin/deposit/lightning-invoice", req)
|
||||
.await?;
|
||||
if rsp.status >= 400 {
|
||||
bail!(
|
||||
"API error: {} {}",
|
||||
rsp.status,
|
||||
rsp.message.unwrap_or_default()
|
||||
);
|
||||
}
|
||||
Ok(AddInvoiceResult {
|
||||
pr: rsp.data.payment_request,
|
||||
payment_hash: rsp.data.r_hash,
|
||||
@ -103,9 +115,11 @@ impl LightningNode for BitvoraNode {
|
||||
|
||||
async fn subscribe_invoices(
|
||||
&self,
|
||||
from_payment_hash: Option<Vec<u8>>,
|
||||
_from_payment_hash: Option<Vec<u8>>,
|
||||
) -> anyhow::Result<Pin<Box<dyn Stream<Item = InvoiceUpdate> + Send>>> {
|
||||
todo!()
|
||||
let rx = BroadcastStream::new(WEBHOOK_BRIDGE.listen());
|
||||
let mapped = rx.then(|r| async move { InvoiceUpdate::Unknown });
|
||||
Ok(Box::pin(mapped))
|
||||
}
|
||||
}
|
||||
|
||||
@ -118,8 +132,7 @@ struct CreateInvoiceRequest {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
struct BitvoraResponse<T>
|
||||
{
|
||||
struct BitvoraResponse<T> {
|
||||
pub status: isize,
|
||||
pub message: Option<String>,
|
||||
pub data: T,
|
||||
|
@ -1,6 +1,5 @@
|
||||
use std::path::Path;
|
||||
use crate::lightning::{AddInvoiceRequest, AddInvoiceResult, InvoiceUpdate, LightningNode};
|
||||
use crate::settings::LndConfig;
|
||||
use anyhow::Result;
|
||||
use fedimint_tonic_lnd::invoicesrpc::lookup_invoice_msg::InvoiceRef;
|
||||
use fedimint_tonic_lnd::invoicesrpc::LookupInvoiceMsg;
|
||||
|
@ -53,7 +53,7 @@ pub async fn get_node(settings: &Settings) -> Result<Arc<dyn LightningNode>> {
|
||||
macaroon,
|
||||
} => Ok(Arc::new(lnd::LndNode::new(url, cert, macaroon).await?)),
|
||||
#[cfg(feature = "bitvora")]
|
||||
LightningConfig::Bitvora { token } => Ok(Arc::new(bitvora::BitvoraNode::new(token))),
|
||||
LightningConfig::Bitvora { token, webhook_secret } => Ok(Arc::new(bitvora::BitvoraNode::new(token, webhook_secret))),
|
||||
_ => anyhow::bail!("Unsupported lightning config!"),
|
||||
}
|
||||
}
|
||||
|
@ -52,8 +52,10 @@ pub enum LightningConfig {
|
||||
cert: PathBuf,
|
||||
macaroon: PathBuf,
|
||||
},
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
Bitvora {
|
||||
token: String,
|
||||
webhook_secret: String,
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user