feat: exchange rates

This commit is contained in:
2024-11-26 21:08:20 +00:00
parent 7d8956e7c7
commit 088f22cea4
12 changed files with 383 additions and 74 deletions

View File

@ -3,6 +3,7 @@ use config::{Config, File};
use fedimint_tonic_lnd::connect;
use lnvps::api;
use lnvps::cors::CORS;
use lnvps::exchange::ExchangeRateCache;
use lnvps::invoice::InvoiceHandler;
use lnvps::provisioner::lnvps::LNVpsProvisioner;
use lnvps::provisioner::Provisioner;
@ -42,8 +43,10 @@ async fn main() -> Result<(), Error> {
let db = LNVpsDbMysql::new(&settings.db).await?;
db.migrate().await?;
let exchange = ExchangeRateCache::new();
let lnd = connect(settings.lnd.url, settings.lnd.cert, settings.lnd.macaroon).await?;
let provisioner = LNVpsProvisioner::new(db.clone(), lnd.clone());
let provisioner = LNVpsProvisioner::new(db.clone(), lnd.clone(), exchange.clone());
#[cfg(debug_assertions)]
{
let setup_script = include_str!("../../dev_setup.sql");
@ -52,7 +55,7 @@ async fn main() -> Result<(), Error> {
}
let status = VmStateCache::new();
let mut worker = Worker::new(settings.read_only, db.clone(), lnd.clone(), status.clone());
let mut worker = Worker::new(settings.read_only, db.clone(), lnd.clone(), status.clone(), exchange.clone());
let sender = worker.sender();
tokio::spawn(async move {
loop {
@ -84,6 +87,21 @@ async fn main() -> Result<(), Error> {
tokio::time::sleep(Duration::from_secs(30)).await;
}
});
// refresh rates every 1min
let rates = exchange.clone();
tokio::spawn(async move {
loop {
match rates.fetch_rates().await {
Ok(z) => {
for r in z {
rates.set_rate(r.0, r.1).await;
}
}
Err(e) => error!("Failed to fetch rates: {}", e)
}
tokio::time::sleep(Duration::from_secs(60)).await;
}
});
let db: Box<dyn LNVpsDb> = Box::new(db.clone());
let pv: Box<dyn Provisioner> = Box::new(provisioner);
@ -101,6 +119,7 @@ async fn main() -> Result<(), Error> {
.manage(db)
.manage(pv)
.manage(status)
.manage(exchange)
.mount("/", api::routes())
.launch()
.await

105
src/exchange.rs Normal file
View File

@ -0,0 +1,105 @@
use anyhow::{Error, Result};
use log::info;
use rocket::serde::Deserialize;
use std::collections::HashMap;
use std::fmt::{write, Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Debug, PartialEq, Eq, Hash)]
pub enum Currency {
EUR,
BTC,
USD,
}
impl Display for Currency {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Currency::EUR => write!(f, "EUR"),
Currency::BTC => write!(f, "BTC"),
Currency::USD => write!(f, "USD"),
}
}
}
impl FromStr for Currency {
type Err = ();
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"eur" => Ok(Currency::EUR),
"usd" => Ok(Currency::USD),
"btc" => Ok(Currency::BTC),
_ => Err(()),
}
}
}
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Ticker(Currency, Currency);
impl Ticker {
pub fn btc_rate(cur: &str) -> Result<Self> {
let to_cur: Currency = cur.parse().map_err(|_| Error::msg(""))?;
Ok(Ticker(Currency::BTC, to_cur))
}
}
impl Display for Ticker {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.0, self.1)
}
}
#[derive(Debug, PartialEq)]
pub struct TickerRate(pub Ticker, pub f32);
#[derive(Clone)]
pub struct ExchangeRateCache {
cache: Arc<RwLock<HashMap<Ticker, f32>>>,
}
#[derive(Deserialize)]
struct MempoolRates {
pub time: u64,
#[serde(rename = "USD")]
pub usd: Option<f32>,
#[serde(rename = "EUR")]
pub eur: Option<f32>,
}
impl ExchangeRateCache {
pub fn new() -> Self {
Self { cache: Arc::new(RwLock::new(HashMap::new())) }
}
pub async fn fetch_rates(&self) -> Result<Vec<TickerRate>> {
let rsp = reqwest::get("https://mempool.space/api/v1/prices")
.await?
.text().await?;
let rates: MempoolRates = serde_json::from_str(&rsp)?;
let mut ret = vec![];
if let Some(usd) = rates.usd {
ret.push(TickerRate(Ticker(Currency::BTC, Currency::USD), usd));
}
if let Some(eur) = rates.eur {
ret.push(TickerRate(Ticker(Currency::BTC, Currency::EUR), eur));
}
Ok(ret)
}
pub async fn set_rate(&self, ticker: Ticker, amount: f32) {
let mut cache = self.cache.write().await;
info!("{}: {}", &ticker, amount);
cache.insert(ticker, amount);
}
pub async fn get_rate(&self, ticker: Ticker) -> Option<f32> {
let cache = self.cache.read().await;
cache.get(&ticker).cloned()
}
}

View File

@ -6,3 +6,4 @@ pub mod nip98;
pub mod provisioner;
pub mod status;
pub mod worker;
pub mod exchange;

View File

@ -1,3 +1,4 @@
use crate::exchange::{Currency, ExchangeRateCache, Ticker};
use crate::host::proxmox::ProxmoxClient;
use crate::provisioner::Provisioner;
use anyhow::{bail, Result};
@ -21,13 +22,15 @@ use std::time::Duration;
pub struct LNVpsProvisioner {
db: Box<dyn LNVpsDb>,
lnd: Client,
rates: ExchangeRateCache,
}
impl LNVpsProvisioner {
pub fn new<D: LNVpsDb + 'static>(db: D, lnd: Client) -> Self {
pub fn new<D: LNVpsDb + 'static>(db: D, lnd: Client, rates: ExchangeRateCache) -> Self {
Self {
db: Box::new(db),
lnd,
rates,
}
}
@ -155,22 +158,25 @@ impl Provisioner for LNVpsProvisioner {
.add(Months::new((12 * cost_plan.interval_amount) as u32)),
};
const BTC_MILLI_SATS: u64 = 100_000_000_000;
const BTC_SATS: f64 = 100_000_000.0;
const INVOICE_EXPIRE: i64 = 3600;
let cost = cost_plan.amount
* match cost_plan.currency.as_str() {
"EUR" => 1_100_000, //TODO: rates
"BTC" => 1, // BTC amounts are always millisats
c => bail!("Unknown currency {c}"),
};
info!("Creating invoice for {vm_id} for {cost} mSats");
let ticker = Ticker::btc_rate(cost_plan.currency.as_str())?;
let rate = if let Some(r) = self.rates.get_rate(ticker).await {
r
} else {
bail!("No exchange rate found")
};
let cost_btc = cost_plan.amount as f32 / rate;
let cost_msat = (cost_btc as f64 * BTC_SATS) as i64 * 1000;
info!("Creating invoice for {vm_id} for {} sats", cost_msat / 1000);
let mut lnd = self.lnd.clone();
let invoice = lnd
.lightning()
.add_invoice(Invoice {
memo: format!("VM renewal {vm_id} to {new_expire}"),
value_msat: cost as i64,
value_msat: cost_msat,
expiry: INVOICE_EXPIRE,
..Default::default()
})
@ -182,10 +188,11 @@ impl Provisioner for LNVpsProvisioner {
vm_id,
created: Utc::now(),
expires: Utc::now().add(Duration::from_secs(INVOICE_EXPIRE as u64)),
amount: cost,
amount: cost_msat as u64,
invoice: invoice.payment_request.clone(),
time_value: (new_expire - vm.expires).num_seconds() as u64,
is_paid: false,
rate,
..Default::default()
};
self.db.insert_vm_payment(&vm_payment).await?;

View File

@ -1,3 +1,4 @@
use crate::exchange::ExchangeRateCache;
use crate::host::proxmox::{CreateVm, ProxmoxClient, VmBios, VmStatus};
use crate::provisioner::lnvps::LNVpsProvisioner;
use crate::provisioner::Provisioner;
@ -35,9 +36,10 @@ impl Worker {
db: D,
lnd: Client,
vm_state_cache: VmStateCache,
rates: ExchangeRateCache,
) -> Self {
let (tx, rx) = unbounded_channel();
let p = LNVpsProvisioner::new(db.clone(), lnd.clone());
let p = LNVpsProvisioner::new(db.clone(), lnd.clone(), rates);
Self {
read_only,
db: Box::new(db),