Compare commits

...

21 Commits

Author SHA1 Message Date
9fb4a38e72 feat: vmdvm
Some checks failed
continuous-integration/drone/push Build is failing
2025-03-24 11:42:14 +00:00
cbafca8da7 feat: terminal (wip) 2025-03-24 09:57:30 +00:00
9ee4232706 fix: use lightning_invoice_id
All checks were successful
continuous-integration/drone/push Build is passing
2025-03-20 21:06:24 +00:00
39622315be fix: add missing trait impl
All checks were successful
continuous-integration/drone/push Build is passing
2025-03-20 12:33:20 +00:00
b190fcdd1c feat: re-install vm
Some checks failed
continuous-integration/drone/push Build is failing
closes #10
2025-03-20 12:30:34 +00:00
6b12a9bddb chore: cargo update
Some checks failed
continuous-integration/drone/push Build is failing
2025-03-20 10:50:06 +00:00
6de4471861 fix: ip assignment prefix size
All checks were successful
continuous-integration/drone/push Build is passing
2025-03-20 10:22:57 +00:00
3527742992 fix: webhook
All checks were successful
continuous-integration/drone/push Build is passing
2025-03-20 09:36:38 +00:00
be4a981bea chore: update readme
All checks were successful
continuous-integration/drone/push Build is passing
2025-03-19 12:38:57 +00:00
f934bb3132 fix: mock
All checks were successful
continuous-integration/drone/push Build is passing
2025-03-18 21:58:12 +00:00
6c7ae6ac89 feat: complete bitvora webhook
Some checks failed
continuous-integration/drone/push Build is failing
2025-03-18 21:54:41 +00:00
5c57abb9c1 fix: config
All checks were successful
continuous-integration/drone/push Build is passing
2025-03-18 15:53:02 +00:00
2d55392050 feat: update revolut order data
Some checks failed
continuous-integration/drone Build is failing
2025-03-14 10:03:52 +00:00
02d606d60c feat: taxes
closes #18
2025-03-11 15:58:34 +00:00
029f2cb6e1 fix: fiat rate 0.01 2025-03-11 12:44:01 +00:00
45dd0c4398 feat: fiat payments (revolut)
ref: #24
2025-03-11 12:42:25 +00:00
1c282e460f Merge branch 'prices' 2025-03-10 15:10:08 +00:00
a2e08c5965 fix: use reverse value as reverse name 2025-03-10 12:40:25 +00:00
b9f21c09bd fix: use rev_2_fwd 2025-03-10 12:37:29 +00:00
d94ca9e1bb feat: add data migration for dns entries 2025-03-10 12:33:11 +00:00
9606b91e6d fix: allow invalid certs 2025-03-08 18:31:44 +00:00
48 changed files with 3187 additions and 879 deletions

643
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -7,18 +7,20 @@ edition = "2021"
name = "api"
[features]
default = ["mikrotik", "nostr-dm", "proxmox", "lnd", "cloudflare"]
default = ["mikrotik", "nostr-dm", "nostr-dvm", "proxmox", "lnd", "cloudflare", "revolut", "bitvora"]
mikrotik = ["dep:reqwest"]
nostr-dm = ["dep:nostr-sdk"]
nostr-dvm = ["dep:nostr-sdk"]
proxmox = ["dep:reqwest", "dep:ssh2", "dep:tokio-tungstenite"]
libvirt = ["dep:virt"]
lnd = ["dep:fedimint-tonic-lnd"]
bitvora = ["dep:reqwest", "dep:tokio-stream"]
cloudflare = ["dep:reqwest"]
revolut = ["dep:reqwest", "dep:sha2", "dep:hmac"]
[dependencies]
lnvps_db = { path = "lnvps_db" }
tokio = { version = "1.37.0", features = ["rt", "rt-multi-thread", "macros", "sync"] }
tokio = { version = "1.37.0", features = ["rt", "rt-multi-thread", "macros", "sync", "io-util"] }
anyhow = "1.0.83"
config = { version = "0.15.8", features = ["yaml"] }
log = "0.4.21"
@ -36,14 +38,15 @@ rand = "0.9.0"
clap = { version = "4.5.21", features = ["derive"] }
ssh-key = "0.6.7"
lettre = { version = "0.11.10", features = ["tokio1-native-tls"] }
ws = { package = "rocket_ws", version = "0.1.0" }
ws = { package = "rocket_ws", version = "0.1.1" }
native-tls = "0.2.12"
hex = "0.4.3"
futures = "0.3.31"
isocountry = "0.3.2"
#nostr-dm
nostr = { version = "0.39.0", default-features = false, features = ["std"] }
nostr-sdk = { version = "0.39.0", optional = true, default-features = false, features = ["nip44", "nip59"] }
nostr = { version = "0.40.0", default-features = false, features = ["std"] }
nostr-sdk = { version = "0.40.0", optional = true, default-features = false, features = ["nip44", "nip59"] }
#proxmox
tokio-tungstenite = { version = "^0.21", features = ["native-tls"], optional = true }
@ -57,4 +60,8 @@ virt = { version = "0.4.2", optional = true }
fedimint-tonic-lnd = { version = "0.2.0", default-features = false, features = ["invoicesrpc"], optional = true }
#bitvora
tokio-stream = { version = "0.1.17", features = ["sync"], optional = true }
tokio-stream = { version = "0.1.17", features = ["sync"], optional = true }
#revolut
sha2 = { version = "0.10.8", optional = true }
hmac = { version = "0.12.1", optional = true }

View File

@ -4,11 +4,15 @@ A bitcoin powered VPS system.
## Requirements
- MySql database
- Lightning node:
- LND
- [Bitvora](https://bitvora.com?r=lnvps)
- Proxmox server
- MySQL database
- Payments:
- Bitcoin:
- LND
- [Bitvora](https://bitvora.com?r=lnvps)
- Fiat:
- [RevolutPay](https://www.revolut.com/business/revolut-pay/)
- VM Backend:
- Proxmox
## Required Config
@ -127,4 +131,14 @@ dns:
forward-zone-id: "my-forward-zone-id"
# API token to add/remove DNS records to this zone
token: "my-api-token"
```
```
### Taxes
To charge taxes add the following config, the values are percentage whole numbers:
```yaml
tax-rate:
IE: 23
US: 15
```
Taxes are charged based on the users specified country

View File

@ -5,6 +5,7 @@ lightning:
cert: "/home/kieran/.polar/networks/2/volumes/lnd/alice/tls.cert"
macaroon: "/home/kieran/.polar/networks/2/volumes/lnd/alice/data/chain/bitcoin/regtest/admin.macaroon"
delete-after: 3
public-url: "https://api.lnvps.net"
provisioner:
proxmox:
read-only: false

View File

@ -2,7 +2,7 @@ volumes:
db:
services:
db:
image: mariadb
image: docker.io/mariadb
restart: unless-stopped
environment:
- "MARIADB_ROOT_PASSWORD=root"

455
lnvps_db/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,6 @@
alter table vm_payment
add column currency varchar(5) not null default 'BTC',
add column payment_method smallint unsigned not null default 0,
add column external_id varchar(255),
change invoice external_data varchar (4096) NOT NULL,
drop column settle_index;

View File

@ -0,0 +1,4 @@
alter table vm_payment
add column tax bigint unsigned not null;
alter table users
add column country_code varchar(3) not null default 'USA';

View File

@ -0,0 +1,5 @@
alter table users
change column country_code country_code varchar (3);
-- assume country_code was not actually set until now
update users
set country_code = null;

View File

@ -38,9 +38,15 @@ pub trait LNVpsDb: Sync + Send {
/// List a users ssh keys
async fn list_user_ssh_key(&self, user_id: u64) -> Result<Vec<UserSshKey>>;
/// Get VM host regions
async fn list_host_region(&self) -> Result<Vec<VmHostRegion>>;
/// Get VM host region by id
async fn get_host_region(&self, id: u64) -> Result<VmHostRegion>;
/// Get VM host region by name
async fn get_host_region_by_name(&self, name: &str) -> Result<VmHostRegion>;
/// List VM's owned by a specific user
async fn list_hosts(&self) -> Result<Vec<VmHost>>;
@ -131,6 +137,9 @@ pub trait LNVpsDb: Sync + Send {
/// Get VM payment by payment id
async fn get_vm_payment(&self, id: &Vec<u8>) -> Result<VmPayment>;
/// Get VM payment by payment id
async fn get_vm_payment_by_ext_id(&self, id: &str) -> Result<VmPayment>;
/// Update a VM payment record
async fn update_vm_payment(&self, vm_payment: &VmPayment) -> Result<()>;

View File

@ -1,8 +1,9 @@
use anyhow::{anyhow, Result};
use anyhow::{anyhow, bail, Result};
use chrono::{DateTime, Utc};
use sqlx::FromRow;
use sqlx::{FromRow, Type};
use std::fmt::{Display, Formatter};
use std::path::PathBuf;
use std::str::FromStr;
use url::Url;
#[derive(FromRow, Clone, Debug)]
@ -20,6 +21,8 @@ pub struct User {
pub contact_nip17: bool,
/// If user should be contacted via email for notifications
pub contact_email: bool,
/// Users country
pub country_code: Option<String>,
}
#[derive(FromRow, Clone, Debug, Default)]
@ -100,6 +103,18 @@ pub enum DiskType {
SSD = 1,
}
impl FromStr for DiskType {
type Err = anyhow::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"hdd" => Ok(DiskType::HDD),
"ssd" => Ok(DiskType::SSD),
_ => Err(anyhow!("unknown disk type {}", s)),
}
}
}
#[derive(Clone, Copy, Debug, sqlx::Type, Default, PartialEq, Eq)]
#[repr(u16)]
pub enum DiskInterface {
@ -109,6 +124,19 @@ pub enum DiskInterface {
PCIe = 2,
}
impl FromStr for DiskInterface {
type Err = anyhow::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"sata" => Ok(DiskInterface::SATA),
"scsi" => Ok(DiskInterface::SCSI),
"pcie" => Ok(DiskInterface::PCIe),
_ => Err(anyhow!("unknown disk interface {}", s)),
}
}
}
#[derive(Clone, Copy, Debug, sqlx::Type, Default, PartialEq, Eq)]
#[repr(u16)]
pub enum OsDistribution {
@ -309,17 +337,55 @@ impl Display for VmIpAssignment {
#[derive(FromRow, Clone, Debug, Default)]
pub struct VmPayment {
/// Payment hash
pub id: Vec<u8>,
pub vm_id: u64,
pub created: DateTime<Utc>,
pub expires: DateTime<Utc>,
pub amount: u64,
pub invoice: String,
pub currency: String,
pub payment_method: PaymentMethod,
/// External data (invoice / json)
pub external_data: String,
/// External id on other system
pub external_id: Option<String>,
pub is_paid: bool,
/// Exchange rate
/// TODO: handle other base currencies
/// Exchange rate back to base currency (EUR)
pub rate: f32,
/// Number of seconds this payment will add to vm expiry
pub time_value: u64,
pub settle_index: Option<u64>,
/// Taxes to charge on payment
pub tax: u64,
}
#[derive(Type, Clone, Copy, Debug, Default, PartialEq)]
#[repr(u16)]
pub enum PaymentMethod {
#[default]
Lightning,
Revolut,
Paypal,
}
impl Display for PaymentMethod {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
PaymentMethod::Lightning => write!(f, "Lightning"),
PaymentMethod::Revolut => write!(f, "Revolut"),
PaymentMethod::Paypal => write!(f, "PayPal"),
}
}
}
impl FromStr for PaymentMethod {
type Err = anyhow::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s {
"lightning" => Ok(PaymentMethod::Lightning),
"revolut" => Ok(PaymentMethod::Revolut),
"paypal" => Ok(PaymentMethod::Paypal),
_ => bail!("Unknown payment method: {}", s),
}
}
}

View File

@ -60,11 +60,12 @@ impl LNVpsDb for LNVpsDbMysql {
async fn update_user(&self, user: &User) -> Result<()> {
sqlx::query(
"update users set email = ?, contact_nip17 = ?, contact_email = ? where id = ?",
"update users set email=?, contact_nip17=?, contact_email=?, country_code=? where id = ?",
)
.bind(&user.email)
.bind(user.contact_nip17)
.bind(user.contact_email)
.bind(&user.country_code)
.bind(user.id)
.execute(&self.db)
.await?;
@ -108,6 +109,13 @@ impl LNVpsDb for LNVpsDbMysql {
.map_err(Error::new)
}
async fn list_host_region(&self) -> Result<Vec<VmHostRegion>> {
sqlx::query_as("select * from vm_host_region where enabled=1")
.fetch_all(&self.db)
.await
.map_err(Error::new)
}
async fn get_host_region(&self, id: u64) -> Result<VmHostRegion> {
sqlx::query_as("select * from vm_host_region where id=?")
.bind(id)
@ -116,6 +124,14 @@ impl LNVpsDb for LNVpsDbMysql {
.map_err(Error::new)
}
async fn get_host_region_by_name(&self, name: &str) -> Result<VmHostRegion> {
sqlx::query_as("select * from vm_host_region where name like ?")
.bind(name)
.fetch_one(&self.db)
.await
.map_err(Error::new)
}
async fn list_hosts(&self) -> Result<Vec<VmHost>> {
sqlx::query_as("select * from vm_host where enabled = 1")
.fetch_all(&self.db)
@ -387,16 +403,20 @@ impl LNVpsDb for LNVpsDbMysql {
}
async fn insert_vm_payment(&self, vm_payment: &VmPayment) -> Result<()> {
sqlx::query("insert into vm_payment(id,vm_id,created,expires,amount,invoice,time_value,is_paid,rate) values(?,?,?,?,?,?,?,?,?)")
sqlx::query("insert into vm_payment(id,vm_id,created,expires,amount,tax,currency,payment_method,time_value,is_paid,rate,external_id,external_data) values(?,?,?,?,?,?,?,?,?,?,?,?,?)")
.bind(&vm_payment.id)
.bind(vm_payment.vm_id)
.bind(vm_payment.created)
.bind(vm_payment.expires)
.bind(vm_payment.amount)
.bind(&vm_payment.invoice)
.bind(vm_payment.tax)
.bind(&vm_payment.currency)
.bind(&vm_payment.payment_method)
.bind(vm_payment.time_value)
.bind(vm_payment.is_paid)
.bind(vm_payment.rate)
.bind(&vm_payment.external_id)
.bind(&vm_payment.external_data)
.execute(&self.db)
.await
.map_err(Error::new)?;
@ -411,6 +431,14 @@ impl LNVpsDb for LNVpsDbMysql {
.map_err(Error::new)
}
async fn get_vm_payment_by_ext_id(&self, id: &str) -> Result<VmPayment> {
sqlx::query_as("select * from vm_payment where external_id=?")
.bind(id)
.fetch_one(&self.db)
.await
.map_err(Error::new)
}
async fn update_vm_payment(&self, vm_payment: &VmPayment) -> Result<()> {
sqlx::query("update vm_payment set is_paid = ? where id = ?")
.bind(vm_payment.is_paid)
@ -428,8 +456,8 @@ impl LNVpsDb for LNVpsDbMysql {
let mut tx = self.db.begin().await?;
sqlx::query("update vm_payment set is_paid = true, settle_index = ? where id = ?")
.bind(vm_payment.settle_index)
sqlx::query("update vm_payment set is_paid = true, external_data = ? where id = ?")
.bind(&vm_payment.external_data)
.bind(&vm_payment.id)
.execute(&mut *tx)
.await?;
@ -446,7 +474,7 @@ impl LNVpsDb for LNVpsDbMysql {
async fn last_paid_invoice(&self) -> Result<Option<VmPayment>> {
sqlx::query_as(
"select * from vm_payment where is_paid = true order by settle_index desc limit 1",
"select * from vm_payment where is_paid = true order by created desc limit 1",
)
.fetch_optional(&self.db)
.await

View File

@ -10,4 +10,5 @@ pub fn routes() -> Vec<Route> {
r
}
pub use webhook::WebhookMessage;
pub use webhook::WEBHOOK_BRIDGE;

View File

@ -1,16 +1,17 @@
use crate::exchange::{alt_prices, Currency, CurrencyAmount, ExchangeRateService};
use crate::provisioner::{PricingData, PricingEngine};
use crate::provisioner::PricingEngine;
use crate::status::VmState;
use anyhow::{anyhow, bail, Context, Result};
use anyhow::{anyhow, bail, Result};
use chrono::{DateTime, Utc};
use ipnetwork::IpNetwork;
use lnvps_db::{
LNVpsDb, Vm, VmCostPlan, VmCustomPricing, VmCustomPricingDisk, VmCustomTemplate, VmHost,
LNVpsDb, PaymentMethod, Vm, VmCostPlan, VmCustomPricing, VmCustomPricingDisk, VmCustomTemplate,
VmHostRegion, VmTemplate,
};
use nostr::util::hex;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
@ -95,9 +96,9 @@ impl From<lnvps_db::DiskType> for DiskType {
}
}
impl Into<lnvps_db::DiskType> for DiskType {
fn into(self) -> lnvps_db::DiskType {
match self {
impl From<DiskType> for lnvps_db::DiskType {
fn from(val: DiskType) -> Self {
match val {
DiskType::HDD => lnvps_db::DiskType::HDD,
DiskType::SSD => lnvps_db::DiskType::SSD,
}
@ -143,12 +144,13 @@ impl ApiTemplatesResponse {
pub async fn expand_pricing(&mut self, rates: &Arc<dyn ExchangeRateService>) -> Result<()> {
let rates = rates.list_rates().await?;
for mut template in &mut self.templates {
let list_price = CurrencyAmount(template.cost_plan.currency, template.cost_plan.amount);
for template in &mut self.templates {
let list_price =
CurrencyAmount::from_f32(template.cost_plan.currency, template.cost_plan.amount);
for alt_price in alt_prices(&rates, list_price) {
template.cost_plan.other_price.push(ApiPrice {
currency: alt_price.0,
amount: alt_price.1,
amount: alt_price.value_f32(),
});
}
}
@ -252,7 +254,7 @@ impl From<CurrencyAmount> for ApiPrice {
fn from(value: CurrencyAmount) -> Self {
Self {
currency: value.0,
amount: value.1,
amount: value.value_f32(),
}
}
}
@ -335,8 +337,8 @@ impl ApiVmTemplate {
cpu: template.cpu,
memory: template.memory,
disk_size: template.disk_size,
disk_type: template.disk_type.clone().into(),
disk_interface: template.disk_interface.clone().into(),
disk_type: template.disk_type.into(),
disk_interface: template.disk_interface.into(),
cost_plan: ApiVmCostPlan {
id: cost_plan.id,
name: cost_plan.name.clone(),
@ -402,6 +404,7 @@ pub struct AccountPatchRequest {
pub email: Option<String>,
pub contact_nip17: bool,
pub contact_email: bool,
pub country_code: Option<String>,
}
#[derive(Serialize, Deserialize, JsonSchema)]
@ -468,14 +471,15 @@ impl From<lnvps_db::VmOsImage> for ApiVmOsImage {
#[derive(Serialize, Deserialize, JsonSchema)]
pub struct ApiVmPayment {
/// Payment hash hex
pub id: String,
pub vm_id: u64,
pub created: DateTime<Utc>,
pub expires: DateTime<Utc>,
pub amount: u64,
pub invoice: String,
pub tax: u64,
pub currency: String,
pub is_paid: bool,
pub data: ApiPaymentData,
}
impl From<lnvps_db::VmPayment> for ApiVmPayment {
@ -486,8 +490,65 @@ impl From<lnvps_db::VmPayment> for ApiVmPayment {
created: value.created,
expires: value.expires,
amount: value.amount,
invoice: value.invoice,
tax: value.tax,
currency: value.currency,
is_paid: value.is_paid,
data: match &value.payment_method {
PaymentMethod::Lightning => ApiPaymentData::Lightning(value.external_data),
PaymentMethod::Revolut => {
#[derive(Deserialize)]
struct RevolutData {
pub token: String,
}
let data: RevolutData = serde_json::from_str(&value.external_data).unwrap();
ApiPaymentData::Revolut { token: data.token }
}
PaymentMethod::Paypal => {
todo!()
}
},
}
}
}
#[derive(Serialize, Deserialize, JsonSchema)]
pub struct ApiPaymentInfo {
pub name: ApiPaymentMethod,
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub metadata: HashMap<String, String>,
pub currencies: Vec<Currency>,
}
/// Payment data related to the payment method
#[derive(Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum ApiPaymentData {
/// Just an LN invoice
Lightning(String),
/// Revolut order data
Revolut {
/// Order token
token: String,
},
}
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum ApiPaymentMethod {
#[default]
Lightning,
Revolut,
Paypal,
}
impl From<PaymentMethod> for ApiPaymentMethod {
fn from(value: PaymentMethod) -> Self {
match value {
PaymentMethod::Lightning => ApiPaymentMethod::Lightning,
PaymentMethod::Revolut => ApiPaymentMethod::Revolut,
PaymentMethod::Paypal => ApiPaymentMethod::Paypal,
}
}
}

View File

@ -1,23 +1,27 @@
use crate::api::model::{
AccountPatchRequest, ApiCustomTemplateDiskParam, ApiCustomTemplateParams, ApiCustomVmOrder,
ApiCustomVmRequest, ApiPrice, ApiTemplatesResponse, ApiUserSshKey, ApiVmHostRegion,
AccountPatchRequest, ApiCustomTemplateParams, ApiCustomVmOrder, ApiCustomVmRequest,
ApiPaymentInfo, ApiPaymentMethod, ApiPrice, ApiTemplatesResponse, ApiUserSshKey,
ApiVmIpAssignment, ApiVmOsImage, ApiVmPayment, ApiVmStatus, ApiVmTemplate, CreateSshKey,
CreateVmRequest, VMPatchRequest,
};
use crate::exchange::ExchangeRateService;
use crate::exchange::{Currency, ExchangeRateService};
use crate::host::{get_host_client, FullVmInfo, TimeSeries, TimeSeriesData};
use crate::nip98::Nip98Auth;
use crate::provisioner::{HostCapacityService, LNVpsProvisioner, PricingEngine};
use crate::settings::Settings;
use crate::status::{VmState, VmStateCache};
use crate::worker::WorkJob;
use anyhow::{Context, Result};
use anyhow::{bail, Result};
use futures::future::join_all;
use lnvps_db::{IpRange, LNVpsDb, VmCustomPricing, VmCustomPricingDisk, VmCustomTemplate};
use futures::{SinkExt, StreamExt};
use isocountry::CountryCode;
use lnvps_db::{
IpRange, LNVpsDb, PaymentMethod, VmCustomPricing, VmCustomPricingDisk, VmCustomTemplate,
};
use log::{error, info};
use nostr::util::hex;
use rocket::futures::{SinkExt, StreamExt};
use rocket::serde::json::Json;
use rocket::{get, patch, post, Responder, Route, State};
use rocket::{get, patch, post, routes, Responder, Route, State};
use rocket_okapi::gen::OpenApiGenerator;
use rocket_okapi::okapi::openapi3::Responses;
use rocket_okapi::response::OpenApiResponderInner;
@ -26,11 +30,15 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use ssh_key::PublicKey;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{Sender, UnboundedSender};
pub fn routes() -> Vec<Route> {
openapi_get_routes![
let mut routes = vec![];
routes.append(&mut openapi_get_routes![
v1_get_account,
v1_patch_account,
v1_list_vms,
@ -45,11 +53,17 @@ pub fn routes() -> Vec<Route> {
v1_start_vm,
v1_stop_vm,
v1_restart_vm,
v1_reinstall_vm,
v1_patch_vm,
v1_time_series,
v1_custom_template_calc,
v1_create_custom_vm_order
]
v1_create_custom_vm_order,
v1_get_payment_methods
]);
routes.append(&mut routes![v1_terminal_proxy]);
routes
}
type ApiResult<T> = Result<Json<ApiData<T>>, ApiError>;
@ -103,6 +117,11 @@ async fn v1_patch_account(
user.email = req.email.clone();
user.contact_nip17 = req.contact_nip17;
user.contact_email = req.contact_email;
user.country_code = req
.country_code
.as_ref()
.and_then(|c| CountryCode::for_alpha3(c).ok())
.map(|c| c.alpha3().to_string());
db.update_user(&user).await?;
ApiData::ok(())
@ -123,6 +142,7 @@ async fn v1_get_account(
email: user.email,
contact_nip17: user.contact_nip17,
contact_email: user.contact_email,
country_code: user.country_code,
})
}
@ -143,7 +163,7 @@ async fn vm_to_status(
.map(|i| (i.id, i))
.collect();
let template = ApiVmTemplate::from_vm(&db, &vm).await?;
let template = ApiVmTemplate::from_vm(db, &vm).await?;
Ok(ApiVmStatus {
id: vm.id,
created: vm.created,
@ -309,7 +329,7 @@ async fn v1_list_vm_templates(
})
.collect();
let custom_templates: Vec<VmCustomPricing> =
join_all(regions.iter().map(|(k, _)| db.list_custom_pricing(*k)))
join_all(regions.keys().map(|k| db.list_custom_pricing(*k)))
.await
.into_iter()
.filter_map(|r| r.ok())
@ -344,17 +364,15 @@ async fn v1_list_vm_templates(
.into_iter()
.filter_map(|t| {
let region = regions.get(&t.region_id)?;
Some(
ApiCustomTemplateParams::from(
&t,
&custom_template_disks,
region,
max_cpu,
max_memory,
max_disk,
)
.ok()?,
ApiCustomTemplateParams::from(
&t,
&custom_template_disks,
region,
max_cpu,
max_memory,
max_disk,
)
.ok()
})
.collect(),
)
@ -376,7 +394,7 @@ async fn v1_custom_template_calc(
let price = PricingEngine::get_custom_vm_cost_amount(db, 0, &template).await?;
ApiData::ok(ApiPrice {
currency: price.currency.clone(),
currency: price.currency,
amount: price.total(),
})
}
@ -484,12 +502,13 @@ async fn v1_create_vm_order(
/// Renew(Extend) a VM
#[openapi(tag = "VM")]
#[get("/api/v1/vm/<id>/renew")]
#[get("/api/v1/vm/<id>/renew?<method>")]
async fn v1_renew_vm(
auth: Nip98Auth,
db: &State<Arc<dyn LNVpsDb>>,
provisioner: &State<Arc<LNVpsProvisioner>>,
id: u64,
method: Option<&str>,
) -> ApiResult<ApiVmPayment> {
let pubkey = auth.event.pubkey.to_bytes();
let uid = db.upsert_user(&pubkey).await?;
@ -498,7 +517,14 @@ async fn v1_renew_vm(
return ApiData::err("VM does not belong to you");
}
let rsp = provisioner.renew(id).await?;
let rsp = provisioner
.renew(
id,
method
.and_then(|m| PaymentMethod::from_str(m).ok())
.unwrap_or(PaymentMethod::Lightning),
)
.await?;
ApiData::ok(rsp.into())
}
@ -576,6 +602,32 @@ async fn v1_restart_vm(
ApiData::ok(())
}
/// Re-install a VM
#[openapi(tag = "VM")]
#[patch("/api/v1/vm/<id>/re-install")]
async fn v1_reinstall_vm(
auth: Nip98Auth,
db: &State<Arc<dyn LNVpsDb>>,
settings: &State<Settings>,
worker: &State<UnboundedSender<WorkJob>>,
id: u64,
) -> ApiResult<()> {
let pubkey = auth.event.pubkey.to_bytes();
let uid = db.upsert_user(&pubkey).await?;
let vm = db.get_vm(id).await?;
if uid != vm.user_id {
return ApiData::err("VM does not belong to you");
}
let host = db.get_host(vm.host_id).await?;
let client = get_host_client(&host, &settings.provisioner)?;
let info = FullVmInfo::load(vm.id, (*db).clone()).await?;
client.reinstall_vm(&info).await?;
worker.send(WorkJob::CheckVm { vm_id: id })?;
ApiData::ok(())
}
#[openapi(tag = "VM")]
#[get("/api/v1/vm/<id>/time-series")]
async fn v1_time_series(
@ -596,6 +648,137 @@ async fn v1_time_series(
ApiData::ok(client.get_time_series_data(&vm, TimeSeries::Hourly).await?)
}
#[get("/api/v1/vm/<id>/console?<auth>")]
async fn v1_terminal_proxy(
auth: &str,
db: &State<Arc<dyn LNVpsDb>>,
settings: &State<Settings>,
id: u64,
ws: ws::WebSocket,
) -> Result<ws::Channel<'static>, &'static str> {
let auth = Nip98Auth::from_base64(auth).map_err(|e| "Missing or invalid auth param")?;
if auth
.check(&format!("/api/v1/vm/{id}/console"), "GET")
.is_err()
{
return Err("Invalid auth event");
}
let pubkey = auth.event.pubkey.to_bytes();
let uid = db.upsert_user(&pubkey).await.map_err(|_| "Insert failed")?;
let vm = db.get_vm(id).await.map_err(|_| "VM not found")?;
if uid != vm.user_id {
return Err("VM does not belong to you");
}
let host = db
.get_host(vm.host_id)
.await
.map_err(|_| "VM host not found")?;
let client =
get_host_client(&host, &settings.provisioner).map_err(|_| "Failed to get host client")?;
let mut ws_upstream = client.connect_terminal(&vm).await.map_err(|e| {
error!("Failed to start terminal proxy: {}", e);
"Failed to open terminal proxy"
})?;
let ws = ws.config(Default::default());
Ok(ws.channel(move |mut stream| {
use ws::*;
Box::pin(async move {
async fn process_client<E>(
msg: Result<Message, E>,
ws_upstream: &mut Sender<Vec<u8>>,
) -> Result<()>
where
E: Display,
{
match msg {
Ok(m) => {
let m_up = match m {
Message::Text(t) => {
info!("Got msg: {}", t);
t.as_bytes().to_vec()
}
_ => panic!("todo"),
};
if let Err(e) = ws_upstream.send(m_up).await {
bail!("Failed to send msg to upstream: {}", e);
}
}
Err(e) => {
bail!("Failed to read from client: {}", e);
}
}
Ok(())
}
async fn process_upstream<E>(
msg: Result<Vec<u8>, E>,
tx_client: &mut stream::DuplexStream,
) -> Result<()>
where
E: Display,
{
match msg {
Ok(m) => {
let down = String::from_utf8_lossy(&m).into_owned();
info!("Got down msg: {}", &down);
let m_down = Message::Text(down);
if let Err(e) = tx_client.send(m_down).await {
bail!("Failed to msg to client: {}", e);
}
}
Err(e) => {
bail!("Failed to read from upstream: {}", e);
}
}
Ok(())
}
loop {
tokio::select! {
Some(msg) = stream.next() => {
if let Err(e) = process_client(msg, &mut ws_upstream.tx).await {
error!("{}", e);
break;
}
},
Some(r) = ws_upstream.rx.recv() => {
let msg: Result<Vec<u8>, anyhow::Error> = Ok(r);
if let Err(e) = process_upstream(msg, &mut stream).await {
error!("{}", e);
break;
}
}
}
}
info!("Websocket closed");
Ok(())
})
}))
}
#[openapi(tag = "Payment")]
#[get("/api/v1/payment/methods")]
async fn v1_get_payment_methods(settings: &State<Settings>) -> ApiResult<Vec<ApiPaymentInfo>> {
let mut ret = vec![ApiPaymentInfo {
name: ApiPaymentMethod::Lightning,
metadata: HashMap::new(),
currencies: vec![Currency::BTC],
}];
#[cfg(feature = "revolut")]
if let Some(r) = &settings.revolut {
ret.push(ApiPaymentInfo {
name: ApiPaymentMethod::Revolut,
metadata: HashMap::from([("pubkey".to_string(), r.public_key.to_string())]),
currencies: vec![Currency::EUR, Currency::USD],
})
}
ApiData::ok(ret)
}
/// Get payment status (for polling)
#[openapi(tag = "Payment")]
#[get("/api/v1/payment/<id>")]

View File

@ -6,25 +6,38 @@ use std::collections::HashMap;
use std::sync::LazyLock;
use tokio::sync::broadcast;
/// Messaging bridge for webhooks to other parts of the system (bitvora)
/// Messaging bridge for webhooks to other parts of the system (bitvora/revout)
pub static WEBHOOK_BRIDGE: LazyLock<WebhookBridge> = LazyLock::new(WebhookBridge::new);
pub fn routes() -> Vec<Route> {
if cfg!(feature = "bitvora") {
routes![bitvora_webhook]
} else {
routes![]
}
let mut routes = vec![];
#[cfg(feature = "bitvora")]
routes.append(&mut routes![bitvora_webhook]);
#[cfg(feature = "revolut")]
routes.append(&mut routes![revolut_webhook]);
routes
}
#[cfg(feature = "bitvora")]
#[post("/api/v1/webhook/bitvora", data = "<req>")]
async fn bitvora_webhook(req: WebhookMessage) -> Status {
WEBHOOK_BRIDGE.send(req);
Status::Ok
}
#[cfg(feature = "revolut")]
#[post("/api/v1/webhook/revolut", data = "<req>")]
async fn revolut_webhook(req: WebhookMessage) -> Status {
WEBHOOK_BRIDGE.send(req);
Status::Ok
}
#[derive(Debug, Clone)]
pub struct WebhookMessage {
pub endpoint: String,
pub body: Vec<u8>,
pub headers: HashMap<String, String>,
}
@ -48,6 +61,7 @@ impl<'r> FromData<'r> for WebhookMessage {
return rocket::data::Outcome::Error((Status::BadRequest, ()));
};
let msg = WebhookMessage {
endpoint: req.uri().path().to_string(),
headers: header,
body: body.value.to_vec(),
};

View File

@ -4,9 +4,11 @@ use clap::Parser;
use config::{Config, File};
use lnvps::api;
use lnvps::cors::CORS;
use lnvps::data_migration::run_data_migrations;
use lnvps::dvm::start_dvms;
use lnvps::exchange::{DefaultRateCache, ExchangeRateService};
use lnvps::invoice::InvoiceHandler;
use lnvps::lightning::get_node;
use lnvps::payments::listen_all_payments;
use lnvps::settings::Settings;
use lnvps::status::VmStateCache;
use lnvps::worker::{WorkJob, Worker};
@ -14,12 +16,12 @@ use lnvps_db::{LNVpsDb, LNVpsDbMysql};
use log::{error, LevelFilter};
use nostr::Keys;
use nostr_sdk::Client;
use rocket::http::Method;
use rocket_okapi::swagger_ui::{make_swagger_ui, SwaggerUIConfig};
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
#[derive(Parser)]
#[clap(about, version, author)]
@ -36,7 +38,7 @@ struct Args {
#[rocket::main]
async fn main() -> Result<(), Error> {
let log_level = std::env::var("RUST_LOG")
.unwrap_or_else(|_| "info".to_string()) // Default to "info" if not set
.unwrap_or_else(|_| "info".to_string())
.to_lowercase();
let max_level = match log_level.as_str() {
@ -46,7 +48,7 @@ async fn main() -> Result<(), Error> {
"warn" => LevelFilter::Warn,
"error" => LevelFilter::Error,
"off" => LevelFilter::Off,
_ => LevelFilter::Info,
_ => LevelFilter::Debug,
};
let args = Args::parse();
@ -84,6 +86,9 @@ async fn main() -> Result<(), Error> {
}
let db: Arc<dyn LNVpsDb> = Arc::new(db);
// run data migrations
run_data_migrations(db.clone(), &settings).await?;
let nostr_client = if let Some(ref c) = settings.nostr {
let cx = Client::builder().signer(Keys::parse(&c.nsec)?).build();
for r in &c.relays {
@ -117,15 +122,10 @@ async fn main() -> Result<(), Error> {
}
}
});
let mut handler = InvoiceHandler::new(node.clone(), db.clone(), sender.clone());
tokio::spawn(async move {
loop {
if let Err(e) = handler.listen().await {
error!("invoice-error: {}", e);
}
sleep(Duration::from_secs(5)).await;
}
});
// setup payment handlers
listen_all_payments(&settings, node.clone(), db.clone(), sender.clone())?;
// request work every 30s to check vm status
let sender_clone = sender.clone();
tokio::spawn(async move {
@ -152,6 +152,12 @@ async fn main() -> Result<(), Error> {
}
});
#[cfg(feature = "nostr-dvm")]
{
let nostr_client = nostr_client.unwrap();
start_dvms(nostr_client.clone(), provisioner.clone());
}
let mut config = rocket::Config::default();
let ip: SocketAddr = match &settings.listen {
Some(i) => i.parse()?,
@ -161,7 +167,6 @@ async fn main() -> Result<(), Error> {
config.port = ip.port();
if let Err(e) = rocket::Rocket::custom(config)
.attach(CORS)
.manage(db.clone())
.manage(provisioner.clone())
.manage(status.clone())
@ -176,6 +181,16 @@ async fn main() -> Result<(), Error> {
..Default::default()
}),
)
.attach(CORS)
.mount(
"/",
vec![rocket::Route::ranked(
isize::MAX,
Method::Options,
"/<catch_all_options_route..>",
CORS,
)],
)
.launch()
.await
{

View File

@ -1,8 +1,9 @@
use rocket::fairing::{Fairing, Info, Kind};
use rocket::http::{Header, Method, Status};
use rocket::{Request, Response};
use std::io::Cursor;
use rocket::http::Header;
use rocket::route::{Handler, Outcome};
use rocket::{Data, Request, Response};
#[derive(Clone)]
pub struct CORS;
#[rocket::async_trait]
@ -14,7 +15,7 @@ impl Fairing for CORS {
}
}
async fn on_response<'r>(&self, req: &'r Request<'_>, response: &mut Response<'r>) {
async fn on_response<'r>(&self, _req: &'r Request<'_>, response: &mut Response<'r>) {
response.set_header(Header::new("Access-Control-Allow-Origin", "*"));
response.set_header(Header::new(
"Access-Control-Allow-Methods",
@ -22,11 +23,12 @@ impl Fairing for CORS {
));
response.set_header(Header::new("Access-Control-Allow-Headers", "*"));
response.set_header(Header::new("Access-Control-Allow-Credentials", "true"));
// force status 200 for options requests
if req.method() == Method::Options {
response.set_status(Status::Ok);
response.set_sized_body(None, Cursor::new(""))
}
}
}
#[rocket::async_trait]
impl Handler for CORS {
async fn handle<'r>(&self, _request: &'r Request<'_>, _data: Data<'r>) -> Outcome<'r> {
Outcome::Success(Response::new())
}
}

55
src/data_migration/dns.rs Normal file
View File

@ -0,0 +1,55 @@
use crate::data_migration::DataMigration;
use crate::dns::{BasicRecord, DnsServer};
use crate::settings::Settings;
use anyhow::Result;
use lnvps_db::LNVpsDb;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
pub struct DnsDataMigration {
db: Arc<dyn LNVpsDb>,
dns: Arc<dyn DnsServer>,
}
impl DnsDataMigration {
pub fn new(db: Arc<dyn LNVpsDb>, settings: &Settings) -> Option<Self> {
let dns = settings.get_dns().ok().flatten()?;
Some(Self { db, dns })
}
}
impl DataMigration for DnsDataMigration {
fn migrate(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
let db = self.db.clone();
let dns = self.dns.clone();
Box::pin(async move {
let vms = db.list_vms().await?;
for vm in vms {
let mut ips = db.list_vm_ip_assignments(vm.id).await?;
for ip in &mut ips {
let mut did_change = false;
if ip.dns_forward.is_none() {
let rec = BasicRecord::forward(ip)?;
let r = dns.add_record(&rec).await?;
ip.dns_forward = Some(r.name);
ip.dns_forward_ref = r.id;
did_change = true;
}
if ip.dns_reverse.is_none() {
let rec = BasicRecord::reverse_to_fwd(ip)?;
let r = dns.add_record(&rec).await?;
ip.dns_reverse = Some(r.value);
ip.dns_reverse_ref = r.id;
did_change = true;
}
if did_change {
db.update_vm_ip_assignment(ip).await?;
}
}
}
Ok(())
})
}
}

31
src/data_migration/mod.rs Normal file
View File

@ -0,0 +1,31 @@
use crate::data_migration::dns::DnsDataMigration;
use crate::settings::Settings;
use anyhow::Result;
use lnvps_db::LNVpsDb;
use log::{error, info};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
mod dns;
/// Basic data migration to run at startup
pub trait DataMigration: Send + Sync {
fn migrate(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
}
pub async fn run_data_migrations(db: Arc<dyn LNVpsDb>, settings: &Settings) -> Result<()> {
let mut migrations: Vec<Arc<dyn DataMigration>> = vec![];
if let Some(d) = DnsDataMigration::new(db.clone(), settings) {
migrations.push(Arc::new(d));
}
info!("Running {} data migrations", migrations.len());
for migration in migrations {
if let Err(e) = migration.migrate().await {
error!("Error running data migration: {}", e);
}
}
Ok(())
}

View File

@ -14,8 +14,12 @@ pub struct Cloudflare {
impl Cloudflare {
pub fn new(token: &str, reverse_zone_id: &str, forward_zone_id: &str) -> Cloudflare {
Self {
api: JsonApi::token("https://api.cloudflare.com", &format!("Bearer {}", token))
.unwrap(),
api: JsonApi::token(
"https://api.cloudflare.com",
&format!("Bearer {}", token),
false,
)
.unwrap(),
reverse_zone_id: reverse_zone_id.to_owned(),
forward_zone_id: forward_zone_id.to_owned(),
}

215
src/dvm/lnvps.rs Normal file
View File

@ -0,0 +1,215 @@
use crate::dvm::{build_status_for_job, DVMHandler, DVMJobRequest};
use crate::provisioner::LNVpsProvisioner;
use anyhow::Context;
use lnvps_db::{DiskInterface, DiskType, LNVpsDb, PaymentMethod, UserSshKey, VmCustomTemplate};
use nostr::prelude::DataVendingMachineStatus;
use nostr::Tag;
use nostr_sdk::Client;
use ssh_key::PublicKey;
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
pub struct LnvpsDvm {
client: Client,
provisioner: Arc<LNVpsProvisioner>,
}
impl LnvpsDvm {
pub fn new(provisioner: Arc<LNVpsProvisioner>, client: Client) -> LnvpsDvm {
Self {
provisioner,
client,
}
}
}
impl DVMHandler for LnvpsDvm {
fn handle_request(
&mut self,
request: DVMJobRequest,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
let provisioner = self.provisioner.clone();
let client = self.client.clone();
Box::pin(async move {
let default_disk = "ssd".to_string();
let default_interface = "pcie".to_string();
let cpu = request.params.get("cpu").context("missing cpu parameter")?;
let memory = request
.params
.get("memory")
.context("missing memory parameter")?;
let disk = request
.params
.get("disk")
.context("missing disk parameter")?;
let disk_type = request.params.get("disk_type").unwrap_or(&default_disk);
let disk_interface = request
.params
.get("disk_interface")
.unwrap_or(&default_interface);
let ssh_key = request
.params
.get("ssh_key")
.context("missing ssh_key parameter")?;
let ssh_key_name = request.params.get("ssh_key_name");
let region = request.params.get("region");
let db = provisioner.get_db();
let host_region = if let Some(r) = region {
db.get_host_region_by_name(r).await?
} else {
db.list_host_region()
.await?
.into_iter()
.next()
.context("no host region")?
};
let pricing = db.list_custom_pricing(host_region.id).await?;
// we expect only 1 pricing per region
let pricing = pricing
.first()
.context("no custom pricing found in region")?;
let template = VmCustomTemplate {
id: 0,
cpu: cpu.parse()?,
memory: memory.parse()?,
disk_size: disk.parse()?,
disk_type: DiskType::from_str(disk_type)?,
disk_interface: DiskInterface::from_str(disk_interface)?,
pricing_id: pricing.id,
};
let uid = db.upsert_user(request.event.pubkey.as_bytes()).await?;
let pk: PublicKey = ssh_key.parse()?;
let key_name = if let Some(n) = ssh_key_name {
n.clone()
} else {
pk.comment().to_string()
};
let new_key = UserSshKey {
name: key_name,
user_id: uid,
key_data: pk.to_openssh()?,
..Default::default()
};
// report as started if params are valid
let processing =
build_status_for_job(&request, DataVendingMachineStatus::Processing, None, None);
client.send_event_builder(processing).await?;
let existing_keys = db.list_user_ssh_key(uid).await?;
let ssh_key_id = if let Some(k) = existing_keys.iter().find(|k| {
let ek: PublicKey = k.key_data.parse().unwrap();
ek.eq(&pk)
}) {
k.id
} else {
db.insert_user_ssh_key(&new_key).await?
};
let vm = provisioner
.provision_custom(uid, template, 0, ssh_key_id, None)
.await?;
let invoice = provisioner.renew(vm.id, PaymentMethod::Lightning).await?;
let mut payment = build_status_for_job(
&request,
DataVendingMachineStatus::PaymentRequired,
None,
None,
);
payment = payment.tag(Tag::parse([
"amount",
invoice.amount.to_string().as_str(),
&invoice.external_data,
])?);
client.send_event_builder(payment).await?;
Ok(())
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::dvm::parse_job_request;
use crate::exchange::{ExchangeRateService, Ticker};
use crate::mocks::{MockDb, MockExchangeRate, MockNode};
use crate::settings::mock_settings;
use lnvps_db::{VmCustomPricing, VmCustomPricingDisk};
use nostr::{EventBuilder, Keys, Kind};
#[tokio::test]
async fn test_dvm() -> anyhow::Result<()> {
let db = Arc::new(MockDb::default());
let node = Arc::new(MockNode::new());
let exch = Arc::new(MockExchangeRate::new());
exch.set_rate(Ticker::btc_rate("EUR")?, 69_420.0).await;
{
let mut cp = db.custom_pricing.lock().await;
cp.insert(
1,
VmCustomPricing {
id: 1,
name: "mock".to_string(),
enabled: true,
created: Default::default(),
expires: None,
region_id: 1,
currency: "EUR".to_string(),
cpu_cost: 1.5,
memory_cost: 0.5,
ip4_cost: 1.5,
ip6_cost: 0.05,
},
);
let mut cpd = db.custom_pricing_disk.lock().await;
cpd.insert(
1,
VmCustomPricingDisk {
id: 1,
pricing_id: 1,
kind: DiskType::SSD,
interface: DiskInterface::PCIe,
cost: 0.05,
},
);
}
let settings = mock_settings();
let provisioner = Arc::new(LNVpsProvisioner::new(
settings,
db.clone(),
node.clone(),
exch.clone(),
));
let keys = Keys::generate();
let empty_client = Client::new(keys.clone());
empty_client.add_relay("wss://nos.lol").await?;
empty_client.connect().await;
let mut dvm = LnvpsDvm::new(provisioner.clone(), empty_client.clone());
let ev = EventBuilder::new(Kind::from_u16(5999), "")
.tags([
Tag::parse(["param", "cpu", "1"])?,
Tag::parse(["param", "memory", "1024"])?,
Tag::parse(["param", "disk", "50"])?,
Tag::parse(["param", "disk_type", "ssd"])?,
Tag::parse(["param", "ssh_key", "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIGUSrwzZfbjqY81RRC7eg3zRvg0D53HOhjbG6h0SY3f3"])?,
])
.sign(&keys)
.await?;
let req = parse_job_request(&ev)?;
dvm.handle_request(req).await?;
Ok(())
}
}

260
src/dvm/mod.rs Normal file
View File

@ -0,0 +1,260 @@
mod lnvps;
use crate::dvm::lnvps::LnvpsDvm;
use crate::provisioner::LNVpsProvisioner;
use anyhow::Result;
use futures::FutureExt;
use log::{error, info, warn};
use nostr::Filter;
use nostr_sdk::prelude::DataVendingMachineStatus;
use nostr_sdk::{
Client, Event, EventBuilder, EventId, Kind, RelayPoolNotification, Tag, Timestamp, Url,
};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
#[derive(Clone)]
pub struct DVMJobRequest {
/// The source event
pub event: Event,
/// Input data for the job (zero or more inputs)
pub inputs: Vec<DVMInput>,
/// Expected output format. Different job request kind defines this more precisely.
pub output_type: Option<String>,
/// Optional parameters for the job as key (first argument)/value (second argument).
/// Different job request kind defines this more precisely. (e.g. [ "param", "lang", "es" ])
pub params: HashMap<String, String>,
/// Customer MAY specify a maximum amount (in millisats) they are willing to pay
pub bid: Option<u64>,
/// List of relays where Service Providers SHOULD publish responses to
pub relays: Vec<String>,
}
#[derive(Clone)]
pub enum DVMInput {
Url {
url: Url,
relay: Option<String>,
marker: Option<String>,
},
Event {
event: EventId,
relay: Option<String>,
marker: Option<String>,
},
Job {
event: EventId,
relay: Option<String>,
marker: Option<String>,
},
Text {
data: String,
relay: Option<String>,
marker: Option<String>,
},
}
/// Basic DVM handler that accepts a job request
pub trait DVMHandler: Send + Sync {
fn handle_request(
&mut self,
request: DVMJobRequest,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
}
pub(crate) fn build_status_for_job(
req: &DVMJobRequest,
status: DataVendingMachineStatus,
extra: Option<&str>,
content: Option<&str>,
) -> EventBuilder {
EventBuilder::new(Kind::JobFeedback, content.unwrap_or("")).tags([
Tag::parse(["status", status.to_string().as_str(), extra.unwrap_or("")]).unwrap(),
Tag::expiration(Timestamp::now() + Duration::from_secs(30)),
Tag::event(req.event.id),
Tag::public_key(req.event.pubkey),
])
}
/// Start listening for jobs with a specific handler
fn listen_for_jobs(
client: Client,
kind: Kind,
mut dvm: Box<dyn DVMHandler>,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
Box::pin(async move {
let sub = client
.subscribe(Filter::new().kind(kind).since(Timestamp::now()), None)
.await?;
info!("Listening for jobs: {}", kind);
let mut rx = client.notifications();
while let Ok(e) = rx.recv().await {
match e {
RelayPoolNotification::Event { event, .. } if event.kind == kind => {
match parse_job_request(&event) {
Ok(req) => {
if let Err(e) = dvm.handle_request(req.clone()).await {
error!("Error handling job request: {}", e);
let data = build_status_for_job(
&req,
DataVendingMachineStatus::Error,
Some(e.to_string().as_str()),
None,
);
client.send_event_builder(data).await?;
}
}
Err(e) => warn!("Invalid job request: {:?}", e),
}
}
_ => {}
}
}
client.unsubscribe(&sub).await;
Ok(())
})
}
fn parse_job_request(event: &Event) -> Result<DVMJobRequest> {
let mut inputs = vec![];
for i_tag in event
.tags
.iter()
.filter(|t| t.kind().as_str() == "i")
.map(|t| t.as_slice())
{
let input = match i_tag[2].as_str() {
"url" => DVMInput::Url {
url: if let Ok(u) = i_tag[1].parse() {
u
} else {
warn!("Invalid url: {}", i_tag[1]);
continue;
},
relay: if i_tag.len() > 3 {
Some(i_tag[3].to_string())
} else {
None
},
marker: if i_tag.len() > 4 {
Some(i_tag[4].to_string())
} else {
None
},
},
"event" => DVMInput::Event {
event: if let Ok(t) = EventId::parse(&i_tag[1]) {
t
} else {
warn!("Invalid event id: {}", i_tag[1]);
continue;
},
relay: if i_tag.len() > 3 {
Some(i_tag[3].to_string())
} else {
None
},
marker: if i_tag.len() > 4 {
Some(i_tag[4].to_string())
} else {
None
},
},
"job" => DVMInput::Job {
event: if let Ok(t) = EventId::parse(&i_tag[1]) {
t
} else {
warn!("Invalid event id in job: {}", i_tag[1]);
continue;
},
relay: if i_tag.len() > 3 {
Some(i_tag[3].to_string())
} else {
None
},
marker: if i_tag.len() > 4 {
Some(i_tag[4].to_string())
} else {
None
},
},
"text" => DVMInput::Text {
data: i_tag[1].to_string(),
relay: if i_tag.len() > 3 {
Some(i_tag[3].to_string())
} else {
None
},
marker: if i_tag.len() > 4 {
Some(i_tag[4].to_string())
} else {
None
},
},
t => {
warn!("unknown tag: {}", t);
continue;
}
};
inputs.push(input);
}
let params: HashMap<String, String> = event
.tags
.iter()
.filter(|t| t.kind().as_str() == "param")
.filter_map(|p| {
let p = p.as_slice();
if p.len() == 3 {
Some((p[1].clone(), p[2].clone()))
} else {
warn!("Invalid param: {}", p.join(", "));
None
}
})
.collect();
Ok(DVMJobRequest {
event: event.clone(),
inputs,
output_type: event
.tags
.iter()
.find(|t| t.kind().as_str() == "output")
.and_then(|t| t.content())
.map(|s| s.to_string()),
params,
bid: event
.tags
.iter()
.find(|t| t.kind().as_str() == "bid")
.and_then(|t| t.content())
.and_then(|t| t.parse::<u64>().ok()),
relays: event
.tags
.iter()
.filter(|t| t.kind().as_str() == "relay")
.map(|c| &c.as_slice()[1..])
.flatten()
.map(|s| s.to_string())
.collect(),
})
}
pub fn start_dvms(
client: Client,
provisioner: Arc<LNVpsProvisioner>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let dvm = LnvpsDvm::new(provisioner, client.clone());
if let Err(e) = listen_for_jobs(client, Kind::from_u16(5999), Box::new(dvm)).await {
error!("Error listening jobs: {}", e);
}
})
}

View File

@ -1,10 +1,10 @@
use anyhow::{anyhow, ensure, Context, Error, Result};
use anyhow::{anyhow, ensure, Result};
use lnvps_db::async_trait;
use log::info;
use rocket::serde::Deserialize;
use schemars::JsonSchema;
use serde::Serialize;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;
@ -60,7 +60,37 @@ impl Display for Ticker {
pub struct TickerRate(pub Ticker, pub f32);
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct CurrencyAmount(pub Currency, pub f32);
pub struct CurrencyAmount(pub Currency, u64);
impl CurrencyAmount {
const MILLI_SATS: f64 = 1.0e11;
pub fn from_u64(currency: Currency, amount: u64) -> Self {
CurrencyAmount(currency, amount)
}
pub fn from_f32(currency: Currency, amount: f32) -> Self {
CurrencyAmount(
currency,
match currency {
Currency::EUR => (amount * 100.0) as u64, // cents
Currency::BTC => (amount as f64 * Self::MILLI_SATS) as u64, // milli-sats
Currency::USD => (amount * 100.0) as u64, // cents
},
)
}
pub fn value(&self) -> u64 {
self.1
}
pub fn value_f32(&self) -> f32 {
match self.0 {
Currency::EUR => self.1 as f32 / 100.0,
Currency::BTC => (self.1 as f64 / Self::MILLI_SATS) as f32,
Currency::USD => self.1 as f32 / 100.0,
}
}
}
impl TickerRate {
pub fn can_convert(&self, currency: Currency) -> bool {
@ -74,9 +104,15 @@ impl TickerRate {
"Cant convert, currency doesnt match"
);
if source.0 == self.0 .0 {
Ok(CurrencyAmount(self.0 .1, source.1 * self.1))
Ok(CurrencyAmount::from_f32(
self.0 .1,
source.value_f32() * self.1,
))
} else {
Ok(CurrencyAmount(self.0 .0, source.1 / self.1))
Ok(CurrencyAmount::from_f32(
self.0 .0,
source.value_f32() / self.1,
))
}
}
}
@ -99,7 +135,7 @@ pub fn alt_prices(rates: &Vec<TickerRate>, source: CurrencyAmount) -> Vec<Curren
let mut ret2 = vec![];
for y in rates.iter() {
for x in ret.iter() {
if let Ok(r1) = y.convert(x.clone()) {
if let Ok(r1) = y.convert(*x) {
if r1.0 != source.0 {
ret2.push(r1);
}
@ -171,12 +207,14 @@ mod tests {
let f = TickerRate(ticker, RATE);
assert_eq!(
f.convert(CurrencyAmount(Currency::EUR, 5.0)).unwrap(),
CurrencyAmount(Currency::BTC, 5.0 / RATE)
f.convert(CurrencyAmount::from_f32(Currency::EUR, 5.0))
.unwrap(),
CurrencyAmount::from_f32(Currency::BTC, 5.0 / RATE)
);
assert_eq!(
f.convert(CurrencyAmount(Currency::BTC, 0.001)).unwrap(),
CurrencyAmount(Currency::EUR, RATE * 0.001)
f.convert(CurrencyAmount::from_f32(Currency::BTC, 0.001))
.unwrap(),
CurrencyAmount::from_f32(Currency::EUR, RATE * 0.001)
);
assert!(!f.can_convert(Currency::USD));
assert!(f.can_convert(Currency::EUR));

25
src/fiat/mod.rs Normal file
View File

@ -0,0 +1,25 @@
/// Fiat payment integrations
use crate::exchange::CurrencyAmount;
use anyhow::Result;
use rocket::serde::{Deserialize, Serialize};
use std::future::Future;
use std::pin::Pin;
#[cfg(feature = "revolut")]
mod revolut;
#[cfg(feature = "revolut")]
pub use revolut::*;
pub trait FiatPaymentService: Send + Sync {
fn create_order(
&self,
description: &str,
amount: CurrencyAmount,
) -> Pin<Box<dyn Future<Output = Result<FiatPaymentInfo>> + Send>>;
}
#[derive(Debug, Serialize, Deserialize)]
pub struct FiatPaymentInfo {
pub external_id: String,
pub raw_data: String,
}

275
src/fiat/revolut.rs Normal file
View File

@ -0,0 +1,275 @@
use crate::exchange::{Currency, CurrencyAmount};
use crate::fiat::{FiatPaymentInfo, FiatPaymentService};
use crate::json_api::JsonApi;
use crate::settings::RevolutConfig;
use anyhow::{bail, Result};
use chrono::{DateTime, Utc};
use reqwest::header::{HeaderMap, ACCEPT, AUTHORIZATION};
use reqwest::{Client, Method};
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::pin::Pin;
#[derive(Clone)]
pub struct RevolutApi {
api: JsonApi,
}
impl RevolutApi {
pub fn new(config: RevolutConfig) -> Result<Self> {
let mut headers = HeaderMap::new();
headers.insert(AUTHORIZATION, format!("Bearer {}", config.token).parse()?);
headers.insert(ACCEPT, "application/json".parse()?);
headers.insert("Revolut-Api-Version", config.api_version.parse()?);
let client = Client::builder().default_headers(headers).build()?;
Ok(Self {
api: JsonApi {
client,
base: config
.url
.unwrap_or("https://merchant.revolut.com".to_string())
.parse()?,
},
})
}
pub async fn list_webhooks(&self) -> Result<Vec<RevolutWebhook>> {
self.api.get("/api/1.0/webhooks").await
}
pub async fn delete_webhook(&self, webhook_id: &str) -> Result<()> {
self.api
.req_status(
Method::DELETE,
&format!("/api/1.0/webhooks/{}", webhook_id),
(),
)
.await?;
Ok(())
}
pub async fn create_webhook(
&self,
url: &str,
events: Vec<RevolutWebhookEvent>,
) -> Result<RevolutWebhook> {
self.api
.post(
"/api/1.0/webhooks",
CreateWebhookRequest {
url: url.to_string(),
events,
},
)
.await
}
pub async fn create_order(
&self,
amount: CurrencyAmount,
description: Option<String>,
) -> Result<RevolutOrder> {
self.api
.post(
"/api/orders",
CreateOrderRequest {
currency: amount.0.to_string(),
amount: match amount.0 {
Currency::BTC => bail!("Bitcoin amount not allowed for fiat payments"),
Currency::EUR => amount.value(),
Currency::USD => amount.value(),
},
description,
},
)
.await
}
pub async fn get_order(&self, order_id: &str) -> Result<RevolutOrder> {
self.api.get(&format!("/api/orders/{}", order_id)).await
}
}
impl FiatPaymentService for RevolutApi {
fn create_order(
&self,
description: &str,
amount: CurrencyAmount,
) -> Pin<Box<dyn Future<Output = Result<FiatPaymentInfo>> + Send>> {
let s = self.clone();
let desc = description.to_string();
Box::pin(async move {
let rsp = s.create_order(amount, Some(desc)).await?;
Ok(FiatPaymentInfo {
raw_data: serde_json::to_string(&rsp)?,
external_id: rsp.id,
})
})
}
}
#[derive(Clone, Serialize)]
pub struct CreateOrderRequest {
pub amount: u64,
pub currency: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
}
#[derive(Clone, Deserialize, Serialize)]
pub struct RevolutOrder {
pub id: String,
pub token: String,
pub state: RevolutOrderState,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub description: Option<String>,
pub amount: u64,
pub currency: String,
pub outstanding_amount: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub checkout_url: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub payments: Option<Vec<RevolutOrderPayment>>,
}
#[derive(Clone, Deserialize, Serialize)]
pub struct RevolutOrderPayment {
pub id: String,
pub state: RevolutPaymentState,
#[serde(skip_serializing_if = "Option::is_none")]
pub decline_reason: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub bank_message: Option<String>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
#[serde(skip_serializing_if = "Option::is_none")]
pub token: Option<String>,
pub amount: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub currency: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub settled_amount: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub settled_currency: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub payment_method: Option<RevolutPaymentMethod>,
#[serde(skip_serializing_if = "Option::is_none")]
pub billing_address: Option<RevolutBillingAddress>,
#[serde(skip_serializing_if = "Option::is_none")]
pub risk_level: Option<RevolutRiskLevel>
}
#[derive(Clone, Deserialize, Serialize)]
pub struct RevolutPaymentMethod {
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(rename = "type")]
pub kind: RevolutPaymentMethodType,
#[serde(skip_serializing_if = "Option::is_none")]
pub card_brand: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub funding: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub card_country_code: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub card_bin: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub card_last_four: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub card_expiry: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub cardholder_name: Option<String>,
}
#[derive(Clone, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum RevolutPaymentMethodType {
ApplePay,
Card,
GooglePay,
RevolutPayCard,
RevolutPayAccount,
}
#[derive(Clone, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum RevolutRiskLevel {
High,
Low
}
#[derive(Clone, Deserialize, Serialize)]
pub struct RevolutBillingAddress {
#[serde(skip_serializing_if = "Option::is_none")]
pub street_line_1: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub street_line_2: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub region: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub city: Option<String>,
pub country_code: String,
pub postcode: String,
}
#[derive(Clone, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum RevolutOrderState {
Pending,
Processing,
Authorised,
Completed,
Cancelled,
Failed,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum RevolutPaymentState {
Pending,
AuthenticationChallenge,
AuthenticationVerified,
AuthorisationStarted,
AuthorisationPassed,
Authorised,
CaptureStarted,
Captured,
RefundValidated,
RefundStarted,
CancellationStarted,
Declining,
Completing,
Cancelling,
Failing,
Completed,
Declined,
SoftDeclined,
Cancelled,
Failed,
}
#[derive(Clone, Deserialize, Serialize)]
pub struct RevolutWebhook {
pub id: String,
pub url: String,
pub events: Vec<RevolutWebhookEvent>,
pub signing_secret: Option<String>,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum RevolutWebhookEvent {
OrderAuthorised,
OrderCompleted,
OrderCancelled,
}
#[derive(Clone, Deserialize, Serialize)]
pub struct CreateWebhookRequest {
pub url: String,
pub events: Vec<RevolutWebhookEvent>,
}

View File

@ -30,11 +30,15 @@ impl VmHostClient for LibVirt {
todo!()
}
async fn reinstall_vm(&self, cfg: &FullVmInfo) -> anyhow::Result<()> {
todo!()
}
async fn get_vm_state(&self, vm: &Vm) -> anyhow::Result<VmState> {
todo!()
}
async fn configure_vm(&self, vm: &Vm) -> anyhow::Result<()> {
async fn configure_vm(&self, vm: &FullVmInfo) -> anyhow::Result<()> {
todo!()
}

View File

@ -2,6 +2,7 @@ use crate::settings::ProvisionerConfig;
use crate::status::VmState;
use anyhow::{bail, Result};
use futures::future::join_all;
use futures::{Sink, Stream};
use lnvps_db::{
async_trait, IpRange, LNVpsDb, UserSshKey, Vm, VmCustomTemplate, VmHost, VmHostDisk,
VmHostKind, VmIpAssignment, VmOsImage, VmTemplate,
@ -9,13 +10,24 @@ use lnvps_db::{
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Semaphore;
#[cfg(feature = "libvirt")]
mod libvirt;
//#[cfg(feature = "libvirt")]
//mod libvirt;
#[cfg(feature = "proxmox")]
mod proxmox;
pub struct TerminalStream {
pub shutdown: Arc<AtomicBool>,
pub rx: Receiver<Vec<u8>>,
pub tx: Sender<Vec<u8>>,
}
/// Generic type for creating VM's
#[async_trait]
pub trait VmHostClient: Send + Sync {
@ -37,6 +49,9 @@ pub trait VmHostClient: Send + Sync {
/// Spawn a VM
async fn create_vm(&self, cfg: &FullVmInfo) -> Result<()>;
/// Re-install a vm OS
async fn reinstall_vm(&self, cfg: &FullVmInfo) -> Result<()>;
/// Get the running status of a VM
async fn get_vm_state(&self, vm: &Vm) -> Result<VmState>;
@ -49,6 +64,9 @@ pub trait VmHostClient: Send + Sync {
vm: &Vm,
series: TimeSeries,
) -> Result<Vec<TimeSeriesData>>;
/// Connect to terminal serial port
async fn connect_terminal(&self, vm: &Vm) -> Result<TerminalStream>;
}
pub fn get_host_client(host: &VmHost, cfg: &ProvisionerConfig) -> Result<Arc<dyn VmHostClient>> {

View File

@ -1,23 +1,35 @@
use crate::host::{FullVmInfo, TimeSeries, TimeSeriesData, VmHostClient};
use crate::host::{FullVmInfo, TerminalStream, TimeSeries, TimeSeriesData, VmHostClient};
use crate::json_api::JsonApi;
use crate::settings::{QemuConfig, SshConfig};
use crate::ssh_client::SshClient;
use crate::status::{VmRunningState, VmState};
use anyhow::{anyhow, bail, ensure, Result};
use chrono::Utc;
use futures::{Stream, StreamExt};
use ipnetwork::IpNetwork;
use lnvps_db::{async_trait, DiskType, Vm, VmOsImage};
use log::{info, warn};
use log::{error, info, warn};
use rand::random;
use reqwest::header::{HeaderMap, AUTHORIZATION};
use reqwest::{ClientBuilder, Method, Url};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::collections::HashMap;
use std::fmt::{Debug, Display, Formatter};
use std::io::{Read, Write};
use std::net::IpAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::channel;
use tokio::sync::Semaphore;
use tokio::time;
use tokio::time::sleep;
use tokio_tungstenite::tungstenite::protocol::Role;
use tokio_tungstenite::WebSocketStream;
use ws::stream::DuplexStream;
pub struct ProxmoxClient {
api: JsonApi,
@ -365,6 +377,30 @@ impl ProxmoxClient {
node: node.to_string(),
})
}
/// Delete disks from VM
pub async fn unlink_disk(
&self,
node: &str,
vm: ProxmoxVmId,
disks: Vec<String>,
force: bool,
) -> Result<()> {
self.api
.req_status(
Method::PUT,
&format!(
"/api2/json/nodes/{}/qemu/{}/unlink?idlist={}&force={}",
node,
vm,
disks.join(","),
if force { "1" } else { "0" }
),
(),
)
.await?;
Ok(())
}
}
impl ProxmoxClient {
@ -376,12 +412,15 @@ impl ProxmoxClient {
if let Ok(net) = ip.ip.parse::<IpAddr>() {
Some(match net {
IpAddr::V4(addr) => {
let range = value.ranges.iter().find(|r| r.id == ip.ip_range_id)?;
let range: IpNetwork = range.gateway.parse().ok()?;
let ip_range = value.ranges.iter().find(|r| r.id == ip.ip_range_id)?;
let range: IpNetwork = ip_range.cidr.parse().ok()?;
let range_gw: IpNetwork = ip_range.gateway.parse().ok()?;
// take the largest (smallest prefix number) of the network prefixes
let max_net = range.prefix().min(range_gw.prefix());
format!(
"ip={},gw={}",
IpNetwork::new(addr.into(), range.prefix()).ok()?,
range.ip()
IpNetwork::new(addr.into(), max_net).ok()?,
range_gw.ip()
)
}
IpAddr::V6(addr) => format!("ip6={}", addr),
@ -415,7 +454,7 @@ impl ProxmoxClient {
bios: Some(VmBios::OVMF),
boot: Some("order=scsi0".to_string()),
cores: Some(vm_resources.cpu as i32),
memory: Some((vm_resources.memory / 1024 / 1024).to_string()),
memory: Some((vm_resources.memory / crate::MB).to_string()),
scsi_hw: Some("virtio-scsi-pci".to_string()),
serial_0: Some("socket".to_string()),
scsi_1: Some(format!("{}:cloudinit", &value.disk.name)),
@ -424,7 +463,38 @@ impl ProxmoxClient {
..Default::default()
})
}
/// Import main disk image from the template
async fn import_template_disk(&self, req: &FullVmInfo) -> Result<()> {
let vm_id = req.vm.id.into();
// import primary disk from image (scsi0)
self.import_disk_image(ImportDiskImageRequest {
vm_id,
node: self.node.clone(),
storage: req.disk.name.clone(),
disk: "scsi0".to_string(),
image: req.image.filename()?,
is_ssd: matches!(req.disk.kind, DiskType::SSD),
})
.await?;
// resize disk to match template
let j_resize = self
.resize_disk(ResizeDiskRequest {
node: self.node.clone(),
vm_id,
disk: "scsi0".to_string(),
size: req.resources()?.disk_size.to_string(),
})
.await?;
// TODO: rollback
self.wait_for_task(&j_resize).await?;
Ok(())
}
}
#[async_trait]
impl VmHostClient for ProxmoxClient {
async fn download_os_image(&self, image: &VmOsImage) -> Result<()> {
@ -496,28 +566,35 @@ impl VmHostClient for ProxmoxClient {
.await?;
self.wait_for_task(&t_create).await?;
// import primary disk from image (scsi0)
self.import_disk_image(ImportDiskImageRequest {
vm_id,
node: self.node.clone(),
storage: req.disk.name.clone(),
disk: "scsi0".to_string(),
image: req.image.filename()?,
is_ssd: matches!(req.disk.kind, DiskType::SSD),
})
.await?;
// import template image
self.import_template_disk(&req).await?;
// resize disk to match template
let j_resize = self
.resize_disk(ResizeDiskRequest {
node: self.node.clone(),
vm_id,
disk: "scsi0".to_string(),
size: req.resources()?.disk_size.to_string(),
})
// try start, otherwise ignore error (maybe its already running)
if let Ok(j_start) = self.start_vm(&self.node, vm_id).await {
if let Err(e) = self.wait_for_task(&j_start).await {
warn!("Failed to start vm: {}", e);
}
}
Ok(())
}
async fn reinstall_vm(&self, req: &FullVmInfo) -> Result<()> {
let vm_id = req.vm.id.into();
// try stop, otherwise ignore error (maybe its already running)
if let Ok(j_stop) = self.stop_vm(&self.node, vm_id).await {
if let Err(e) = self.wait_for_task(&j_stop).await {
warn!("Failed to stop vm: {}", e);
}
}
// unlink the existing main disk
self.unlink_disk(&self.node, vm_id, vec!["scsi0".to_string()], true)
.await?;
// TODO: rollback
self.wait_for_task(&j_resize).await?;
// import disk from template again
self.import_template_disk(&req).await?;
// try start, otherwise ignore error (maybe its already running)
if let Ok(j_start) = self.start_vm(&self.node, vm_id).await {
@ -585,6 +662,70 @@ impl VmHostClient for ProxmoxClient {
.await?;
Ok(r.into_iter().map(TimeSeriesData::from).collect())
}
async fn connect_terminal(&self, vm: &Vm) -> Result<TerminalStream> {
// the proxmox api for terminal connection is weird and doesn't work
// when I tested it, using ssh instead to run qm terminal command
if let Some(ssh_config) = &self.ssh {
let mut ses = SshClient::new()?;
ses.connect(
(self.api.base.host().unwrap().to_string(), 22),
&ssh_config.user,
&ssh_config.key,
)
.await?;
let vm_id: ProxmoxVmId = vm.id.into();
let sock_path = PathBuf::from(&format!("/var/run/qemu-server/{}.serial0", vm_id));
let mut chan = ses.tunnel_unix_socket(&sock_path)?;
let (mut client_tx, client_rx) = channel::<Vec<u8>>(1024);
let (server_tx, mut server_rx) = channel::<Vec<u8>>(1024);
let shutdown = Arc::new(AtomicBool::new(false));
let shut_chan = shutdown.clone();
tokio::spawn(async move {
let mut w_buf = vec![0; 4096];
// fire calls to read every 100ms
let mut chan_timer = time::interval(Duration::from_millis(100));
loop {
tokio::select! {
Some(buf) = server_rx.recv() => {
if let Err(e) = chan.write_all(&buf) {
error!("Failed to send data: {}", e);
}
}
_ = chan_timer.tick() => {
if chan.eof() {
info!("SSH connection terminated!");
shut_chan.store(true, Ordering::Relaxed);
break;
}
let r_window = chan.read_window();
let mut stream = chan.stream(0);
if let Ok(r) = stream.read(w_buf.as_mut_slice()) {
if r > 0 {
if let Err(e) = client_tx.send(w_buf[..r].to_vec()).await {
error!("Failed to write data: {}", e);
}
}
}
}
}
}
info!("SSH connection terminated!");
});
return Ok(TerminalStream{
shutdown,
rx: client_rx,
tx: server_tx,
});
}
bail!("Cannot use terminal proxy without ssh")
}
}
/// Wrap a database vm id
@ -977,3 +1118,122 @@ impl From<RrdDataPoint> for TimeSeriesData {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{GB, MB, TB};
use lnvps_db::{
DiskInterface, IpRange, OsDistribution, UserSshKey, VmHostDisk, VmIpAssignment, VmTemplate,
};
#[test]
fn test_config() -> Result<()> {
let template = VmTemplate {
id: 1,
name: "example".to_string(),
enabled: true,
created: Default::default(),
expires: None,
cpu: 2,
memory: 2 * GB,
disk_size: 100 * GB,
disk_type: DiskType::SSD,
disk_interface: DiskInterface::PCIe,
cost_plan_id: 1,
region_id: 1,
};
let cfg = FullVmInfo {
vm: Vm {
id: 1,
host_id: 1,
user_id: 1,
image_id: 1,
template_id: Some(template.id),
custom_template_id: None,
ssh_key_id: 1,
created: Default::default(),
expires: Default::default(),
disk_id: 1,
mac_address: "ff:ff:ff:ff:ff:fe".to_string(),
deleted: false,
ref_code: None,
},
disk: VmHostDisk {
id: 1,
host_id: 1,
name: "ssd".to_string(),
size: TB * 20,
kind: DiskType::SSD,
interface: DiskInterface::PCIe,
enabled: true,
},
template: Some(template.clone()),
custom_template: None,
image: VmOsImage {
id: 1,
distribution: OsDistribution::Ubuntu,
flavour: "Server".to_string(),
version: "24.04.03".to_string(),
enabled: true,
release_date: Utc::now(),
url: "http://localhost.com/ubuntu_server_24.04.img".to_string(),
},
ips: vec![VmIpAssignment {
id: 1,
vm_id: 1,
ip_range_id: 1,
ip: "192.168.1.2".to_string(),
deleted: false,
arp_ref: None,
dns_forward: None,
dns_forward_ref: None,
dns_reverse: None,
dns_reverse_ref: None,
}],
ranges: vec![IpRange {
id: 1,
cidr: "192.168.1.0/24".to_string(),
gateway: "192.168.1.1/16".to_string(),
enabled: true,
region_id: 1,
}],
ssh_key: UserSshKey {
id: 1,
name: "test".to_string(),
user_id: 1,
created: Default::default(),
key_data: "ssh-ed25519 AAA=".to_string(),
},
};
let q_cfg = QemuConfig {
machine: "q35".to_string(),
os_type: "l26".to_string(),
bridge: "vmbr1".to_string(),
cpu: "kvm64".to_string(),
vlan: Some(100),
kvm: true,
};
let p = ProxmoxClient::new(
"http://localhost:8006".parse()?,
"",
"",
None,
q_cfg.clone(),
None,
);
let vm = p.make_config(&cfg)?;
assert_eq!(vm.cpu, Some(q_cfg.cpu));
assert_eq!(vm.cores, Some(template.cpu as i32));
assert_eq!(vm.memory, Some((template.memory / MB).to_string()));
assert_eq!(vm.on_boot, Some(true));
assert_eq!(
vm.ip_config,
Some("ip=192.168.1.2/16,gw=192.168.1.1,ip6=auto".to_string())
);
Ok(())
}
}

View File

@ -1,64 +0,0 @@
use crate::lightning::{InvoiceUpdate, LightningNode};
use crate::worker::WorkJob;
use anyhow::Result;
use lnvps_db::LNVpsDb;
use log::{error, info, warn};
use nostr::util::hex;
use rocket::futures::StreamExt;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
pub struct InvoiceHandler {
node: Arc<dyn LightningNode>,
db: Arc<dyn LNVpsDb>,
tx: UnboundedSender<WorkJob>,
}
impl InvoiceHandler {
pub fn new(
node: Arc<dyn LightningNode>,
db: Arc<dyn LNVpsDb>,
tx: UnboundedSender<WorkJob>,
) -> Self {
Self { node, tx, db }
}
async fn mark_paid(&self, settle_index: u64, id: &Vec<u8>) -> Result<()> {
let mut p = self.db.get_vm_payment(id).await?;
p.settle_index = Some(settle_index);
self.db.vm_payment_paid(&p).await?;
info!("VM payment {} for {}, paid", hex::encode(p.id), p.vm_id);
self.tx.send(WorkJob::CheckVm { vm_id: p.vm_id })?;
Ok(())
}
pub async fn listen(&mut self) -> Result<()> {
let from_ph = self.db.last_paid_invoice().await?.map(|i| i.id.clone());
info!(
"Listening for invoices from {}",
from_ph
.as_ref()
.map(hex::encode)
.unwrap_or("NOW".to_string())
);
let mut handler = self.node.subscribe_invoices(from_ph).await?;
while let Some(msg) = handler.next().await {
match msg {
InvoiceUpdate::Settled {
payment_hash,
settle_index,
} => {
let r_hash = hex::decode(payment_hash)?;
if let Err(e) = self.mark_paid(settle_index, &r_hash).await {
error!("{}", e);
}
}
v => warn!("Unknown invoice update: {:?}", v),
}
}
Ok(())
}
}

View File

@ -1,21 +1,26 @@
use anyhow::bail;
use log::debug;
use reqwest::header::{HeaderMap, AUTHORIZATION};
use reqwest::header::{HeaderMap, ACCEPT, AUTHORIZATION};
use reqwest::{Client, Method, Url};
use serde::de::DeserializeOwned;
use serde::Serialize;
#[derive(Clone)]
pub struct JsonApi {
pub client: Client,
pub base: Url,
}
impl JsonApi {
pub fn token(base: &str, token: &str) -> anyhow::Result<Self> {
pub fn token(base: &str, token: &str, allow_invalid_certs: bool) -> anyhow::Result<Self> {
let mut headers = HeaderMap::new();
headers.insert(AUTHORIZATION, token.parse()?);
headers.insert(ACCEPT, "application/json".parse()?);
let client = Client::builder().default_headers(headers).build()?;
let client = Client::builder()
.danger_accept_invalid_certs(allow_invalid_certs)
.default_headers(headers)
.build()?;
Ok(Self {
client,
base: base.parse()?,
@ -56,7 +61,6 @@ impl JsonApi {
.client
.request(method.clone(), self.base.join(path)?)
.header("Content-Type", "application/json")
.header("Accept", "application/json")
.body(body)
.send()
.await?;
@ -70,4 +74,31 @@ impl JsonApi {
bail!("{} {}: {}: {}", method, path, status, &text);
}
}
/// Make a request and only return the status code
pub async fn req_status<R: Serialize>(
&self,
method: Method,
path: &str,
body: R,
) -> anyhow::Result<u16> {
let body = serde_json::to_string(&body)?;
debug!(">> {} {}: {}", method.clone(), path, &body);
let rsp = self
.client
.request(method.clone(), self.base.join(path)?)
.header("Content-Type", "application/json")
.body(body)
.send()
.await?;
let status = rsp.status();
let text = rsp.text().await?;
#[cfg(debug_assertions)]
debug!("<< {}", text);
if status.is_success() {
Ok(status.as_u16())
} else {
bail!("{} {}: {}: {}", method, path, status, &text);
}
}
}

View File

@ -1,12 +1,14 @@
pub mod api;
pub mod cors;
pub mod data_migration;
pub mod dns;
pub mod exchange;
pub mod fiat;
pub mod host;
pub mod invoice;
pub mod json_api;
pub mod lightning;
pub mod nip98;
pub mod payments;
pub mod provisioner;
pub mod router;
pub mod settings;
@ -17,3 +19,13 @@ pub mod worker;
#[cfg(test)]
pub mod mocks;
#[cfg(feature = "nostr-dvm")]
pub mod dvm;
/// SATS per BTC
pub const BTC_SATS: f64 = 100_000_000.0;
pub const KB: u64 = 1024;
pub const MB: u64 = KB * 1024;
pub const GB: u64 = MB * 1024;
pub const TB: u64 = GB * 1024;

View File

@ -1,9 +1,11 @@
use crate::api::WEBHOOK_BRIDGE;
use crate::api::{WebhookMessage, WEBHOOK_BRIDGE};
use crate::json_api::JsonApi;
use crate::lightning::{AddInvoiceRequest, AddInvoiceResult, InvoiceUpdate, LightningNode};
use anyhow::bail;
use anyhow::{anyhow, bail};
use futures::{Stream, StreamExt};
use hmac::{Hmac, Mac};
use lnvps_db::async_trait;
use log::{info, warn};
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use tokio_stream::wrappers::BroadcastStream;
@ -17,7 +19,7 @@ impl BitvoraNode {
pub fn new(api_token: &str, webhook_secret: &str) -> Self {
let auth = format!("Bearer {}", api_token);
Self {
api: JsonApi::token("https://api.bitvora.com/", &auth).unwrap(),
api: JsonApi::token("https://api.bitvora.com/", &auth, false).unwrap(),
webhook_secret: webhook_secret.to_string(),
}
}
@ -46,6 +48,7 @@ impl LightningNode for BitvoraNode {
Ok(AddInvoiceResult {
pr: rsp.data.payment_request,
payment_hash: rsp.data.r_hash,
external_id: Some(rsp.data.id),
})
}
@ -54,7 +57,46 @@ impl LightningNode for BitvoraNode {
_from_payment_hash: Option<Vec<u8>>,
) -> anyhow::Result<Pin<Box<dyn Stream<Item = InvoiceUpdate> + Send>>> {
let rx = BroadcastStream::new(WEBHOOK_BRIDGE.listen());
let mapped = rx.then(|r| async move { InvoiceUpdate::Unknown });
let secret = self.webhook_secret.clone();
let mapped = rx.then(move |r| {
let secret = secret.clone();
async move {
match r {
Ok(r) => {
if r.endpoint != "/api/v1/webhook/bitvora" {
return InvoiceUpdate::Unknown;
}
let r_body = r.body.as_slice();
info!("Received webhook {}", String::from_utf8_lossy(r_body));
let body: BitvoraWebhook =
match serde_json::from_slice(r_body) {
Ok(b) => b,
Err(e) => return InvoiceUpdate::Error(e.to_string()),
};
if let Err(e) = verify_webhook(&secret, &r) {
return InvoiceUpdate::Error(e.to_string());
}
match body.event {
BitvoraWebhookEvent::DepositLightningComplete => {
InvoiceUpdate::Settled {
payment_hash: None,
external_id: Some(body.data.lightning_invoice_id),
}
}
BitvoraWebhookEvent::DepositLightningFailed => {
InvoiceUpdate::Error("Payment failed".to_string())
}
}
}
Err(e) => {
warn!("Error handling webhook: {}", e);
InvoiceUpdate::Error(e.to_string())
}
}
}
});
Ok(Box::pin(mapped))
}
}
@ -80,3 +122,43 @@ struct CreateInvoiceResponse {
pub r_hash: String,
pub payment_request: String,
}
#[derive(Deserialize, Debug, Clone)]
struct BitvoraWebhook {
pub event: BitvoraWebhookEvent,
pub data: BitvoraPayment,
}
#[derive(Deserialize, Debug, Clone)]
enum BitvoraWebhookEvent {
#[serde(rename = "deposit.lightning.completed")]
DepositLightningComplete,
#[serde(rename = "deposit.lightning.failed")]
DepositLightningFailed,
}
#[derive(Deserialize, Debug, Clone)]
struct BitvoraPayment {
pub id: String,
pub lightning_invoice_id: String,
}
type HmacSha256 = Hmac<sha2::Sha256>;
fn verify_webhook(secret: &str, msg: &WebhookMessage) -> anyhow::Result<()> {
let sig = msg
.headers
.get("bitvora-signature")
.ok_or_else(|| anyhow!("Missing bitvora-signature header"))?;
let mut mac = HmacSha256::new_from_slice(secret.as_bytes())?;
mac.update(msg.body.as_slice());
let result = mac.finalize().into_bytes();
if hex::encode(result) == *sig {
return Ok(());
} else {
warn!("Invalid signature found {} != {}", sig, hex::encode(result));
}
bail!("No valid signature found!");
}

View File

@ -40,6 +40,7 @@ impl LightningNode for LndNode {
Ok(AddInvoiceResult {
pr: inner.payment_request,
payment_hash: hex::encode(inner.r_hash),
external_id: None,
})
}
@ -78,8 +79,8 @@ impl LightningNode for LndNode {
Ok(m) => {
if m.state == InvoiceState::Settled as i32 {
InvoiceUpdate::Settled {
settle_index: m.settle_index,
payment_hash: hex::encode(m.r_hash),
payment_hash: Some(hex::encode(m.r_hash)),
external_id: None,
}
} else {
InvoiceUpdate::Unknown

View File

@ -31,6 +31,7 @@ pub struct AddInvoiceRequest {
pub struct AddInvoiceResult {
pub pr: String,
pub payment_hash: String,
pub external_id: Option<String>,
}
#[derive(Debug, Clone)]
@ -39,8 +40,8 @@ pub enum InvoiceUpdate {
Unknown,
Error(String),
Settled {
payment_hash: String,
settle_index: u64,
payment_hash: Option<String>,
external_id: Option<String>,
},
}

View File

@ -1,7 +1,7 @@
#![allow(unused)]
use crate::dns::{BasicRecord, DnsServer, RecordType};
use crate::exchange::{ExchangeRateService, Ticker, TickerRate};
use crate::host::{FullVmInfo, TimeSeries, TimeSeriesData, VmHostClient};
use crate::host::{FullVmInfo, TerminalStream, TimeSeries, TimeSeriesData, VmHostClient};
use crate::lightning::{AddInvoiceRequest, AddInvoiceResult, InvoiceUpdate, LightningNode};
use crate::router::{ArpEntry, Router};
use crate::settings::NetworkPolicy;
@ -40,11 +40,6 @@ pub struct MockDb {
}
impl MockDb {
pub const KB: u64 = 1024;
pub const MB: u64 = Self::KB * 1024;
pub const GB: u64 = Self::MB * 1024;
pub const TB: u64 = Self::GB * 1024;
pub fn empty() -> MockDb {
Self {
..Default::default()
@ -71,8 +66,8 @@ impl MockDb {
created: Utc::now(),
expires: None,
cpu: 2,
memory: Self::GB * 2,
disk_size: Self::GB * 64,
memory: crate::GB * 2,
disk_size: crate::GB * 64,
disk_type: DiskType::SSD,
disk_interface: DiskInterface::PCIe,
cost_plan_id: 1,
@ -132,7 +127,7 @@ impl Default for MockDb {
name: "mock-host".to_string(),
ip: "https://localhost".to_string(),
cpu: 4,
memory: 8 * Self::GB,
memory: 8 * crate::GB,
enabled: true,
api_token: "".to_string(),
load_factor: 1.5,
@ -145,7 +140,7 @@ impl Default for MockDb {
id: 1,
host_id: 1,
name: "mock-disk".to_string(),
size: Self::TB * 10,
size: crate::TB * 10,
kind: DiskType::SSD,
interface: DiskInterface::PCIe,
enabled: true,
@ -209,6 +204,7 @@ impl LNVpsDb for MockDb {
email: None,
contact_nip17: false,
contact_email: false,
country_code: Some("USA".to_string()),
},
);
Ok(max + 1)
@ -269,11 +265,26 @@ impl LNVpsDb for MockDb {
.collect())
}
async fn list_host_region(&self) -> anyhow::Result<Vec<VmHostRegion>> {
let regions = self.regions.lock().await;
Ok(regions.values().filter(|r| r.enabled).cloned().collect())
}
async fn get_host_region(&self, id: u64) -> anyhow::Result<VmHostRegion> {
let regions = self.regions.lock().await;
Ok(regions.get(&id).ok_or(anyhow!("no region"))?.clone())
}
async fn get_host_region_by_name(&self, name: &str) -> anyhow::Result<VmHostRegion> {
let regions = self.regions.lock().await;
Ok(regions
.iter()
.find(|(_, v)| v.name == name)
.ok_or(anyhow!("no region"))?
.1
.clone())
}
async fn list_hosts(&self) -> anyhow::Result<Vec<VmHost>> {
let hosts = self.hosts.lock().await;
Ok(hosts.values().filter(|h| h.enabled).cloned().collect())
@ -518,11 +529,18 @@ impl LNVpsDb for MockDb {
.clone())
}
async fn get_vm_payment_by_ext_id(&self, id: &str) -> anyhow::Result<VmPayment> {
let p = self.payments.lock().await;
Ok(p.iter()
.find(|p| p.external_id == Some(id.to_string()))
.context("no vm_payment")?
.clone())
}
async fn update_vm_payment(&self, vm_payment: &VmPayment) -> anyhow::Result<()> {
let mut p = self.payments.lock().await;
if let Some(p) = p.iter_mut().find(|p| p.id == *vm_payment.id) {
p.is_paid = vm_payment.is_paid.clone();
p.settle_index = vm_payment.settle_index.clone();
}
Ok(())
}
@ -539,7 +557,8 @@ impl LNVpsDb for MockDb {
async fn last_paid_invoice(&self) -> anyhow::Result<Option<VmPayment>> {
let p = self.payments.lock().await;
Ok(p.iter()
.max_by(|a, b| a.settle_index.cmp(&b.settle_index))
.filter(|p| p.is_paid)
.max_by(|a, b| a.created.cmp(&b.created))
.map(|v| v.clone()))
}
@ -642,14 +661,15 @@ impl Router for MockRouter {
#[derive(Clone, Debug, Default)]
pub struct MockNode {
invoices: Arc<Mutex<HashMap<String, MockInvoice>>>,
pub invoices: Arc<Mutex<HashMap<String, MockInvoice>>>,
}
#[derive(Debug, Clone)]
struct MockInvoice {
pr: String,
expiry: DateTime<Utc>,
settle_index: u64,
pub struct MockInvoice {
pub pr: String,
pub amount: u64,
pub expiry: DateTime<Utc>,
pub is_paid: bool,
}
impl MockNode {
@ -665,7 +685,23 @@ impl MockNode {
#[async_trait]
impl LightningNode for MockNode {
async fn add_invoice(&self, req: AddInvoiceRequest) -> anyhow::Result<AddInvoiceResult> {
todo!()
let mut invoices = self.invoices.lock().await;
let id: [u8; 32] = rand::random();
let hex_id = hex::encode(id);
invoices.insert(
hex_id.clone(),
MockInvoice {
pr: format!("lnrt1{}", hex_id),
amount: req.amount,
expiry: Utc::now().add(TimeDelta::seconds(req.expire.unwrap_or(3600) as i64)),
is_paid: false,
},
);
Ok(AddInvoiceResult {
pr: format!("lnrt1{}", hex_id),
payment_hash: hex_id.clone(),
external_id: None,
})
}
async fn subscribe_invoices(
@ -747,6 +783,10 @@ impl VmHostClient for MockVmHost {
Ok(())
}
async fn reinstall_vm(&self, cfg: &FullVmInfo) -> anyhow::Result<()> {
todo!()
}
async fn get_vm_state(&self, vm: &Vm) -> anyhow::Result<VmState> {
let vms = self.vms.lock().await;
if let Some(vm) = vms.get(&vm.id) {
@ -777,6 +817,10 @@ impl VmHostClient for MockVmHost {
) -> anyhow::Result<Vec<TimeSeriesData>> {
Ok(vec![])
}
async fn connect_terminal(&self, vm: &Vm) -> anyhow::Result<TerminalStream> {
todo!()
}
}
pub struct MockDnsServer {

View File

@ -98,7 +98,7 @@ impl<'r> FromRequest<'r> for Nip98Auth {
}
let auth = Nip98Auth::from_base64(&auth[6..]).unwrap();
match auth.check(
request.uri().to_string().as_str(),
request.uri().path().to_string().as_str(),
request.method().as_str(),
) {
Ok(_) => Outcome::Success(auth),

81
src/payments/invoice.rs Normal file
View File

@ -0,0 +1,81 @@
use crate::lightning::{InvoiceUpdate, LightningNode};
use crate::worker::WorkJob;
use anyhow::Result;
use lnvps_db::{LNVpsDb, VmPayment};
use log::{error, info, warn};
use nostr::util::hex;
use rocket::futures::StreamExt;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
pub struct NodeInvoiceHandler {
node: Arc<dyn LightningNode>,
db: Arc<dyn LNVpsDb>,
tx: UnboundedSender<WorkJob>,
}
impl NodeInvoiceHandler {
pub fn new(
node: Arc<dyn LightningNode>,
db: Arc<dyn LNVpsDb>,
tx: UnboundedSender<WorkJob>,
) -> Self {
Self { node, tx, db }
}
async fn mark_paid(&self, id: &Vec<u8>) -> Result<()> {
let p = self.db.get_vm_payment(id).await?;
self.mark_payment_paid(&p).await
}
async fn mark_paid_ext_id(&self, external_id: &str) -> Result<()> {
let p = self.db.get_vm_payment_by_ext_id(external_id).await?;
self.mark_payment_paid(&p).await
}
async fn mark_payment_paid(&self, payment: &VmPayment) -> Result<()> {
self.db.vm_payment_paid(&payment).await?;
info!("VM payment {} for {}, paid", hex::encode(&payment.id), payment.vm_id);
self.tx.send(WorkJob::CheckVm { vm_id: payment.vm_id })?;
Ok(())
}
pub async fn listen(&mut self) -> Result<()> {
let from_ph = self.db.last_paid_invoice().await?.map(|i| i.id.clone());
info!(
"Listening for invoices from {}",
from_ph
.as_ref()
.map(hex::encode)
.unwrap_or("NOW".to_string())
);
let mut handler = self.node.subscribe_invoices(from_ph).await?;
while let Some(msg) = handler.next().await {
match msg {
InvoiceUpdate::Settled {
payment_hash,
external_id,
} => {
if let Some(h) = payment_hash {
let r_hash = hex::decode(h)?;
if let Err(e) = self.mark_paid(&r_hash).await {
error!("{}", e);
}
continue;
}
if let Some(e) = external_id {
if let Err(e) = self.mark_paid_ext_id(&e).await {
error!("{}", e);
}
continue;
}
}
v => warn!("Unknown invoice update: {:?}", v),
}
}
Ok(())
}
}

55
src/payments/mod.rs Normal file
View File

@ -0,0 +1,55 @@
use crate::lightning::LightningNode;
use crate::payments::invoice::NodeInvoiceHandler;
use crate::settings::Settings;
use crate::worker::WorkJob;
use anyhow::Result;
use lnvps_db::LNVpsDb;
use log::error;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::sleep;
mod invoice;
#[cfg(feature = "revolut")]
mod revolut;
pub fn listen_all_payments(
settings: &Settings,
node: Arc<dyn LightningNode>,
db: Arc<dyn LNVpsDb>,
sender: UnboundedSender<WorkJob>,
) -> Result<()> {
let mut handler = NodeInvoiceHandler::new(node.clone(), db.clone(), sender.clone());
tokio::spawn(async move {
loop {
if let Err(e) = handler.listen().await {
error!("invoice-error: {}", e);
}
sleep(Duration::from_secs(30)).await;
}
});
#[cfg(feature = "revolut")]
{
use crate::payments::revolut::RevolutPaymentHandler;
if let Some(r) = &settings.revolut {
let mut handler = RevolutPaymentHandler::new(
r.clone(),
&settings.public_url,
db.clone(),
sender.clone(),
)?;
tokio::spawn(async move {
loop {
if let Err(e) = handler.listen().await {
error!("revolut-error: {}", e);
}
sleep(Duration::from_secs(30)).await;
}
});
}
}
Ok(())
}

156
src/payments/revolut.rs Normal file
View File

@ -0,0 +1,156 @@
use crate::api::{WebhookMessage, WEBHOOK_BRIDGE};
use crate::fiat::{RevolutApi, RevolutWebhookEvent};
use crate::settings::RevolutConfig;
use crate::worker::WorkJob;
use anyhow::{anyhow, bail, Context, Result};
use hmac::{Hmac, Mac};
use isocountry::CountryCode;
use lnvps_db::LNVpsDb;
use log::{error, info, warn};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
pub struct RevolutPaymentHandler {
api: RevolutApi,
db: Arc<dyn LNVpsDb>,
sender: UnboundedSender<WorkJob>,
public_url: String,
}
impl RevolutPaymentHandler {
pub fn new(
settings: RevolutConfig,
public_url: &str,
db: Arc<dyn LNVpsDb>,
sender: UnboundedSender<WorkJob>,
) -> Result<Self> {
Ok(Self {
api: RevolutApi::new(settings)?,
public_url: public_url.to_string(),
db,
sender,
})
}
pub async fn listen(&mut self) -> Result<()> {
let this_webhook = Url::parse(&self.public_url)?.join("/api/v1/webhook/revolut")?;
let webhooks = self.api.list_webhooks().await?;
for wh in webhooks {
info!("Deleting old webhook: {} {}", wh.id, wh.url);
self.api.delete_webhook(&wh.id).await?
}
info!("Setting up webhook for '{}'", this_webhook);
let wh = self
.api
.create_webhook(
this_webhook.as_str(),
vec![
RevolutWebhookEvent::OrderCompleted,
RevolutWebhookEvent::OrderAuthorised,
],
)
.await?;
let secret = wh.signing_secret.context("Signing secret is missing")?;
// listen to events
let mut listenr = WEBHOOK_BRIDGE.listen();
while let Ok(m) = listenr.recv().await {
if m.endpoint != "/api/v1/webhook/revolut" {
continue;
}
let body: RevolutWebhook = serde_json::from_slice(m.body.as_slice())?;
info!("Received webhook {:?}", body);
if let Err(e) = verify_webhook(&secret, &m) {
error!("Signature verification failed: {}", e);
continue;
}
if let RevolutWebhookEvent::OrderCompleted = body.event {
if let Err(e) = self.try_complete_payment(&body.order_id).await {
error!("Failed to complete order: {}", e);
}
}
}
Ok(())
}
async fn try_complete_payment(&self, ext_id: &str) -> Result<()> {
let mut p = self.db.get_vm_payment_by_ext_id(ext_id).await?;
// save payment state json into external_data
// TODO: encrypt payment_data
let order = self.api.get_order(ext_id).await?;
p.external_data = serde_json::to_string(&order)?;
// check user country matches card country
if let Some(cc) = order
.payments
.and_then(|p| p.first().cloned())
.and_then(|p| p.payment_method)
.and_then(|p| p.card_country_code)
.and_then(|c| CountryCode::for_alpha2(&c).ok())
{
let vm = self.db.get_vm(p.vm_id).await?;
let mut user = self.db.get_user(vm.user_id).await?;
if user.country_code.is_none() {
// update user country code to match card country
user.country_code = Some(cc.alpha3().to_string());
self.db.update_user(&user).await?;
}
}
self.db.vm_payment_paid(&p).await?;
self.sender.send(WorkJob::CheckVm { vm_id: p.vm_id })?;
info!("VM payment {} for {}, paid", hex::encode(p.id), p.vm_id);
Ok(())
}
}
type HmacSha256 = Hmac<sha2::Sha256>;
fn verify_webhook(secret: &str, msg: &WebhookMessage) -> Result<()> {
let sig = msg
.headers
.get("revolut-signature")
.ok_or_else(|| anyhow!("Missing Revolut-Signature header"))?;
let timestamp = msg
.headers
.get("revolut-request-timestamp")
.ok_or_else(|| anyhow!("Missing Revolut-Request-Timestamp header"))?;
// check if any signatures match
for sig in sig.split(",") {
let mut sig_split = sig.split("=");
let (version, code) = (
sig_split.next().context("Invalid signature format")?,
sig_split.next().context("Invalid signature format")?,
);
let mut mac = HmacSha256::new_from_slice(secret.as_bytes())?;
mac.update(version.as_bytes());
mac.update(b".");
mac.update(timestamp.as_bytes());
mac.update(b".");
mac.update(msg.body.as_slice());
let result = mac.finalize().into_bytes();
if hex::encode(result) == code {
return Ok(());
} else {
warn!(
"Invalid signature found {} != {}",
code,
hex::encode(result)
);
}
}
bail!("No valid signature found!");
}
#[derive(Clone, Debug, Deserialize, Serialize)]
struct RevolutWebhook {
pub event: RevolutWebhookEvent,
pub order_id: String,
pub merchant_order_ext_ref: Option<String>,
}

View File

@ -145,8 +145,8 @@ impl HostCapacityService {
.map(|s| {
let usage = vm_resources
.iter()
.filter(|(k, v)| s.id == v.disk_id)
.fold(0, |acc, (k, v)| acc + v.disk);
.filter(|(_k, v)| s.id == v.disk_id)
.fold(0, |acc, (_k, v)| acc + v.disk);
DiskCapacity {
load_factor: host.load_factor,
disk: s.clone(),

View File

@ -1,5 +1,6 @@
use crate::dns::{BasicRecord, DnsServer};
use crate::exchange::{ExchangeRateService, Ticker};
use crate::exchange::{Currency, CurrencyAmount, ExchangeRateService};
use crate::fiat::FiatPaymentService;
use crate::host::{get_host_client, FullVmInfo};
use crate::lightning::{AddInvoiceRequest, LightningNode};
use crate::provisioner::{
@ -8,10 +9,12 @@ use crate::provisioner::{
use crate::router::{ArpEntry, Router};
use crate::settings::{NetworkAccessPolicy, NetworkPolicy, ProvisionerConfig, Settings};
use anyhow::{bail, ensure, Context, Result};
use chrono::{Days, Months, Utc};
use lnvps_db::{DiskType, LNVpsDb, Vm, VmCostPlanIntervalType, VmCustomTemplate, VmIpAssignment, VmPayment};
use chrono::Utc;
use isocountry::CountryCode;
use lnvps_db::{LNVpsDb, PaymentMethod, User, Vm, VmCustomTemplate, VmIpAssignment, VmPayment};
use log::{info, warn};
use nostr::util::hex;
use std::collections::HashMap;
use std::ops::Add;
use std::sync::Arc;
use std::time::Duration;
@ -25,9 +28,11 @@ pub struct LNVpsProvisioner {
db: Arc<dyn LNVpsDb>,
node: Arc<dyn LightningNode>,
rates: Arc<dyn ExchangeRateService>,
tax_rates: HashMap<CountryCode, f32>,
router: Option<Arc<dyn Router>>,
dns: Option<Arc<dyn DnsServer>>,
revolut: Option<Arc<dyn FiatPaymentService>>,
network_policy: NetworkPolicy,
provisioner_config: ProvisionerConfig,
@ -46,6 +51,8 @@ impl LNVpsProvisioner {
rates,
router: settings.get_router().expect("router config"),
dns: settings.get_dns().expect("dns config"),
revolut: settings.get_revolut().expect("revolut config"),
tax_rates: settings.tax_rate,
network_policy: settings.network_policy,
provisioner_config: settings.provisioner,
read_only: settings.read_only,
@ -235,6 +242,11 @@ impl LNVpsProvisioner {
Ok(())
}
/// Get database handle
pub fn get_db(&self) -> Arc<dyn LNVpsDb> {
self.db.clone()
}
/// Provision a new VM for a user on the database
///
/// Note:
@ -255,7 +267,9 @@ impl LNVpsProvisioner {
// TODO: cache capacity somewhere
let cap = HostCapacityService::new(self.db.clone());
let host = cap.get_host_for_template(template.region_id, &template).await?;
let host = cap
.get_host_for_template(template.region_id, &template)
.await?;
let pick_disk = if let Some(hd) = host.disks.first() {
hd
@ -308,7 +322,9 @@ impl LNVpsProvisioner {
// TODO: cache capacity somewhere
let cap = HostCapacityService::new(self.db.clone());
let host = cap.get_host_for_template(pricing.region_id, &template).await?;
let host = cap
.get_host_for_template(pricing.region_id, &template)
.await?;
let pick_disk = if let Some(hd) = host.disks.first() {
hd
@ -345,40 +361,90 @@ impl LNVpsProvisioner {
}
/// Create a renewal payment
pub async fn renew(&self, vm_id: u64) -> Result<VmPayment> {
let pe = PricingEngine::new(self.db.clone(), self.rates.clone());
pub async fn renew(&self, vm_id: u64, method: PaymentMethod) -> Result<VmPayment> {
let pe = PricingEngine::new(self.db.clone(), self.rates.clone(), self.tax_rates.clone());
let price = pe.get_vm_cost(vm_id).await?;
let price = pe.get_vm_cost(vm_id, method).await?;
match price {
CostResult::Existing(p) => Ok(p),
CostResult::New {
msats,
amount,
currency,
time_value,
new_expiry,
rate,
tax,
} => {
const INVOICE_EXPIRE: u64 = 600;
info!("Creating invoice for {vm_id} for {} sats", msats / 1000);
let invoice = self
.node
.add_invoice(AddInvoiceRequest {
memo: Some(format!("VM renewal {vm_id} to {new_expiry}")),
amount: msats,
expire: Some(INVOICE_EXPIRE as u32),
})
.await?;
let vm_payment = VmPayment {
id: hex::decode(invoice.payment_hash)?,
vm_id,
created: Utc::now(),
expires: Utc::now().add(Duration::from_secs(INVOICE_EXPIRE)),
amount: msats,
invoice: invoice.pr,
time_value,
is_paid: false,
rate,
settle_index: None,
let desc = format!("VM renewal {vm_id} to {new_expiry}");
let vm_payment = match method {
PaymentMethod::Lightning => {
ensure!(
currency == Currency::BTC,
"Cannot create invoices for non-BTC currency"
);
const INVOICE_EXPIRE: u64 = 600;
let total_amount = amount + tax;
info!(
"Creating invoice for {vm_id} for {} sats",
total_amount / 1000
);
let invoice = self
.node
.add_invoice(AddInvoiceRequest {
memo: Some(desc),
amount: total_amount,
expire: Some(INVOICE_EXPIRE as u32),
})
.await?;
VmPayment {
id: hex::decode(invoice.payment_hash)?,
vm_id,
created: Utc::now(),
expires: Utc::now().add(Duration::from_secs(INVOICE_EXPIRE)),
amount,
tax,
currency: currency.to_string(),
payment_method: method,
time_value,
is_paid: false,
rate,
external_data: invoice.pr,
external_id: invoice.external_id,
}
}
PaymentMethod::Revolut => {
let rev = if let Some(r) = &self.revolut {
r
} else {
bail!("Revolut not configured")
};
ensure!(
currency != Currency::BTC,
"Cannot create revolut orders for BTC currency"
);
let order = rev
.create_order(&desc, CurrencyAmount::from_u64(currency, amount + tax))
.await?;
let new_id: [u8; 32] = rand::random();
VmPayment {
id: new_id.to_vec(),
vm_id,
created: Utc::now(),
expires: Utc::now().add(Duration::from_secs(3600)),
amount,
tax,
currency: currency.to_string(),
payment_method: method,
time_value,
is_paid: false,
rate,
external_data: order.raw_data,
external_id: Some(order.external_id),
}
}
PaymentMethod::Paypal => todo!(),
};
self.db.insert_vm_payment(&vm_payment).await?;
Ok(vm_payment)
@ -430,57 +496,23 @@ impl LNVpsProvisioner {
#[cfg(test)]
mod tests {
use super::*;
use crate::exchange::DefaultRateCache;
use crate::mocks::{MockDb, MockDnsServer, MockNode, MockRouter};
use crate::settings::{DnsServerConfig, LightningConfig, QemuConfig, RouterConfig};
use crate::exchange::{DefaultRateCache, Ticker};
use crate::mocks::{MockDb, MockDnsServer, MockExchangeRate, MockNode, MockRouter};
use crate::settings::{
mock_settings, DnsServerConfig, LightningConfig, QemuConfig, RouterConfig,
};
use lnvps_db::{DiskInterface, DiskType, User, UserSshKey, VmTemplate};
use std::net::IpAddr;
use std::str::FromStr;
const ROUTER_BRIDGE: &str = "bridge1";
fn settings() -> Settings {
Settings {
listen: None,
db: "".to_string(),
lightning: LightningConfig::LND {
url: "".to_string(),
cert: Default::default(),
macaroon: Default::default(),
},
read_only: false,
provisioner: ProvisionerConfig::Proxmox {
qemu: QemuConfig {
machine: "q35".to_string(),
os_type: "l26".to_string(),
bridge: "vmbr1".to_string(),
cpu: "kvm64".to_string(),
vlan: None,
kvm: false,
},
ssh: None,
mac_prefix: Some("ff:ff:ff".to_string()),
},
network_policy: NetworkPolicy {
access: NetworkAccessPolicy::StaticArp {
interface: ROUTER_BRIDGE.to_string(),
},
ip6_slaac: None,
},
delete_after: 0,
smtp: None,
router: Some(RouterConfig::Mikrotik {
url: "https://localhost".to_string(),
username: "admin".to_string(),
password: "password123".to_string(),
}),
dns: Some(DnsServerConfig::Cloudflare {
token: "abc".to_string(),
forward_zone_id: "123".to_string(),
reverse_zone_id: "456".to_string(),
}),
nostr: None,
}
pub fn settings() -> Settings {
let mut settings = mock_settings();
settings.network_policy.access = NetworkAccessPolicy::StaticArp {
interface: ROUTER_BRIDGE.to_string(),
};
settings
}
async fn add_user(db: &Arc<MockDb>) -> Result<(User, UserSshKey)> {
@ -504,7 +536,10 @@ mod tests {
let settings = settings();
let db = Arc::new(MockDb::default());
let node = Arc::new(MockNode::default());
let rates = Arc::new(DefaultRateCache::default());
let rates = Arc::new(MockExchangeRate::new());
const MOCK_RATE: f32 = 69_420.0;
rates.set_rate(Ticker::btc_rate("EUR")?, MOCK_RATE).await;
let router = MockRouter::new(settings.network_policy.clone());
let dns = MockDnsServer::new();
let provisioner = LNVpsProvisioner::new(settings, db.clone(), node.clone(), rates.clone());
@ -514,6 +549,21 @@ mod tests {
.provision(user.id, 1, 1, ssh_key.id, Some("mock-ref".to_string()))
.await?;
println!("{:?}", vm);
// renew vm
let payment = provisioner.renew(vm.id, PaymentMethod::Lightning).await?;
assert_eq!(vm.id, payment.vm_id);
assert_eq!(payment.tax, (payment.amount as f64 * 0.01).floor() as u64);
// check invoice amount matches amount+tax
let inv = node.invoices.lock().await;
if let Some(i) = inv.get(&hex::encode(payment.id)) {
assert_eq!(i.amount, payment.amount + payment.tax);
} else {
bail!("Invoice doesnt exist");
}
// spawn vm
provisioner.spawn_vm(vm.id).await?;
// check resources
@ -581,8 +631,8 @@ mod tests {
created: Default::default(),
expires: None,
cpu: 64,
memory: 512 * MockDb::GB,
disk_size: 20 * MockDb::TB,
memory: 512 * crate::GB,
disk_size: 20 * crate::TB,
disk_type: DiskType::SSD,
disk_interface: DiskInterface::PCIe,
cost_plan_id: 1,

View File

@ -59,4 +59,4 @@ impl Template for VmCustomTemplate {
fn disk_interface(&self) -> DiskInterface {
self.disk_interface
}
}
}

View File

@ -1,9 +1,13 @@
use crate::exchange::{Currency, ExchangeRateService, Ticker};
use anyhow::{bail, Context, Result};
use crate::exchange::{Currency, CurrencyAmount, ExchangeRateService, Ticker, TickerRate};
use anyhow::{bail, Result};
use chrono::{DateTime, Days, Months, TimeDelta, Utc};
use ipnetwork::IpNetwork;
use lnvps_db::{LNVpsDb, Vm, VmCostPlan, VmCostPlanIntervalType, VmCustomTemplate, VmPayment};
use isocountry::CountryCode;
use lnvps_db::{
LNVpsDb, PaymentMethod, Vm, VmCostPlan, VmCostPlanIntervalType, VmCustomTemplate, VmPayment,
};
use log::info;
use std::collections::HashMap;
use std::ops::Add;
use std::str::FromStr;
use std::sync::Arc;
@ -14,36 +18,39 @@ use std::sync::Arc;
pub struct PricingEngine {
db: Arc<dyn LNVpsDb>,
rates: Arc<dyn ExchangeRateService>,
tax_rates: HashMap<CountryCode, f32>,
}
impl PricingEngine {
/// SATS per BTC
const BTC_SATS: f64 = 100_000_000.0;
const KB: u64 = 1024;
const MB: u64 = Self::KB * 1024;
const GB: u64 = Self::MB * 1024;
pub fn new(db: Arc<dyn LNVpsDb>, rates: Arc<dyn ExchangeRateService>) -> Self {
Self { db, rates }
pub fn new(
db: Arc<dyn LNVpsDb>,
rates: Arc<dyn ExchangeRateService>,
tax_rates: HashMap<CountryCode, f32>,
) -> Self {
Self {
db,
rates,
tax_rates,
}
}
/// Get VM cost (for renewal)
pub async fn get_vm_cost(&self, vm_id: u64) -> Result<CostResult> {
pub async fn get_vm_cost(&self, vm_id: u64, method: PaymentMethod) -> Result<CostResult> {
let vm = self.db.get_vm(vm_id).await?;
// Reuse existing payment until expired
let payments = self.db.list_vm_payment(vm.id).await?;
if let Some(px) = payments
.into_iter()
.find(|p| p.expires > Utc::now() && !p.is_paid)
.find(|p| p.expires > Utc::now() && !p.is_paid && p.payment_method == method)
{
return Ok(CostResult::Existing(px));
}
if vm.template_id.is_some() {
Ok(self.get_template_vm_cost(&vm).await?)
Ok(self.get_template_vm_cost(&vm, method).await?)
} else {
Ok(self.get_custom_vm_cost(&vm).await?)
Ok(self.get_custom_vm_cost(&vm, method).await?)
}
}
@ -80,9 +87,9 @@ impl PricingEngine {
} else {
bail!("No disk price found")
};
let disk_cost = (template.disk_size / Self::GB) as f32 * disk_pricing.cost;
let disk_cost = (template.disk_size / crate::GB) as f32 * disk_pricing.cost;
let cpu_cost = pricing.cpu_cost * template.cpu as f32;
let memory_cost = pricing.memory_cost * (template.memory / Self::GB) as f32;
let memory_cost = pricing.memory_cost * (template.memory / crate::GB) as f32;
let ip4_cost = pricing.ip4_cost * v4s as f32;
let ip6_cost = pricing.ip6_cost * v6s as f32;
@ -101,7 +108,7 @@ impl PricingEngine {
})
}
async fn get_custom_vm_cost(&self, vm: &Vm) -> Result<CostResult> {
async fn get_custom_vm_cost(&self, vm: &Vm, method: PaymentMethod) -> Result<CostResult> {
let template_id = if let Some(i) = vm.custom_template_id {
i
} else {
@ -114,26 +121,49 @@ impl PricingEngine {
// custom templates are always 1-month intervals
let time_value = (vm.expires.add(Months::new(1)) - vm.expires).num_seconds() as u64;
let (cost_msats, rate) = self.get_msats_amount(price.currency, price.total()).await?;
let (currency, amount, rate) = self
.get_amount_and_rate(
CurrencyAmount::from_f32(price.currency, price.total()),
method,
)
.await?;
Ok(CostResult::New {
msats: cost_msats,
amount,
tax: self.get_tax_for_user(vm.user_id, amount).await?,
currency,
rate,
time_value,
new_expiry: vm.expires.add(TimeDelta::seconds(time_value as i64)),
})
}
async fn get_msats_amount(&self, currency: Currency, amount: f32) -> Result<(u64, f32)> {
async fn get_tax_for_user(&self, user_id: u64, amount: u64) -> Result<u64> {
let user = self.db.get_user(user_id).await?;
if let Some(cc) = user
.country_code
.and_then(|c| CountryCode::for_alpha3(&c).ok())
{
if let Some(c) = self.tax_rates.get(&cc) {
return Ok((amount as f64 * (*c as f64 / 100f64)).floor() as u64);
}
}
Ok(0)
}
async fn get_ticker(&self, currency: Currency) -> Result<TickerRate> {
let ticker = Ticker(Currency::BTC, currency);
let rate = if let Some(r) = self.rates.get_rate(ticker).await {
r
if let Some(r) = self.rates.get_rate(ticker).await {
Ok(TickerRate(ticker, r))
} else {
bail!("No exchange rate found")
};
}
}
let cost_btc = amount / rate;
let cost_msats = (cost_btc as f64 * Self::BTC_SATS) as u64 * 1000;
Ok((cost_msats, rate))
async fn get_msats_amount(&self, amount: CurrencyAmount) -> Result<(u64, f32)> {
let rate = self.get_ticker(amount.0).await?;
let cost_btc = amount.value_f32() / rate.1;
let cost_msats = (cost_btc as f64 * crate::BTC_SATS) as u64 * 1000;
Ok((cost_msats, rate.1))
}
fn next_template_expire(vm: &Vm, cost_plan: &VmCostPlan) -> u64 {
@ -150,7 +180,7 @@ impl PricingEngine {
(next_expire - vm.expires).num_seconds() as u64
}
async fn get_template_vm_cost(&self, vm: &Vm) -> Result<CostResult> {
async fn get_template_vm_cost(&self, vm: &Vm, method: PaymentMethod) -> Result<CostResult> {
let template_id = if let Some(i) = vm.template_id {
i
} else {
@ -159,20 +189,37 @@ impl PricingEngine {
let template = self.db.get_vm_template(template_id).await?;
let cost_plan = self.db.get_cost_plan(template.cost_plan_id).await?;
let (cost_msats, rate) = self
.get_msats_amount(
cost_plan.currency.parse().expect("Invalid currency"),
cost_plan.amount,
)
let currency = cost_plan.currency.parse().expect("Invalid currency");
let (currency, amount, rate) = self
.get_amount_and_rate(CurrencyAmount::from_f32(currency, cost_plan.amount), method)
.await?;
let time_value = Self::next_template_expire(&vm, &cost_plan);
let time_value = Self::next_template_expire(vm, &cost_plan);
Ok(CostResult::New {
msats: cost_msats,
amount,
tax: self.get_tax_for_user(vm.user_id, amount).await?,
currency,
rate,
time_value,
new_expiry: vm.expires.add(TimeDelta::seconds(time_value as i64)),
})
}
async fn get_amount_and_rate(
&self,
list_price: CurrencyAmount,
method: PaymentMethod,
) -> Result<(Currency, u64, f32)> {
Ok(match (list_price.0, method) {
(c, PaymentMethod::Lightning) if c != Currency::BTC => {
let new_price = self.get_msats_amount(list_price).await?;
(Currency::BTC, new_price.0, new_price.1)
}
(cur, PaymentMethod::Revolut) if cur != Currency::BTC => {
(cur, list_price.value(), 0.01)
}
(c, m) => bail!("Cannot create payment for method {} and currency {}", m, c),
})
}
}
#[derive(Clone)]
@ -181,14 +228,18 @@ pub enum CostResult {
Existing(VmPayment),
/// A new payment can be created with the specified amount
New {
/// The cost in milli-sats
msats: u64,
/// The cost
amount: u64,
/// Currency
currency: Currency,
/// The exchange rate used to calculate the price
rate: f32,
/// The time to extend the vm expiry in seconds
time_value: u64,
/// The absolute expiry time of the vm if renewed
new_expiry: DateTime<Utc>,
/// Taxes to charge
tax: u64,
},
}
@ -212,8 +263,7 @@ impl PricingData {
mod tests {
use super::*;
use crate::mocks::{MockDb, MockExchangeRate};
use lnvps_db::{DiskType, VmCustomPricing, VmCustomPricingDisk, VmCustomTemplate};
const GB: u64 = 1024 * 1024 * 1024;
use lnvps_db::{DiskType, User, VmCustomPricing, VmCustomPricingDisk, VmCustomTemplate};
const MOCK_RATE: f32 = 100_000.0;
async fn add_custom_pricing(db: &MockDb) {
@ -240,8 +290,8 @@ mod tests {
VmCustomTemplate {
id: 1,
cpu: 2,
memory: 2 * GB,
disk_size: 80 * GB,
memory: 2 * crate::GB,
disk_size: 80 * crate::GB,
disk_type: DiskType::SSD,
disk_interface: Default::default(),
pricing_id: 1,
@ -287,19 +337,67 @@ mod tests {
{
let mut v = db.vms.lock().await;
v.insert(1, MockDb::mock_vm());
v.insert(
2,
Vm {
user_id: 2,
..MockDb::mock_vm()
},
);
let mut u = db.users.lock().await;
u.insert(
1,
User {
id: 1,
pubkey: vec![],
created: Default::default(),
email: None,
contact_nip17: false,
contact_email: false,
country_code: Some("USA".to_string()),
},
);
u.insert(
2,
User {
id: 2,
pubkey: vec![],
created: Default::default(),
email: None,
contact_nip17: false,
contact_email: false,
country_code: Some("IRL".to_string()),
},
);
}
let db: Arc<dyn LNVpsDb> = Arc::new(db);
let pe = PricingEngine::new(db.clone(), rates);
let price = pe.get_vm_cost(1).await?;
let taxes = HashMap::from([(CountryCode::IRL, 23.0)]);
let pe = PricingEngine::new(db.clone(), rates, taxes);
let plan = MockDb::mock_cost_plan();
let price = pe.get_vm_cost(1, PaymentMethod::Lightning).await?;
match price {
CostResult::Existing(_) => bail!("??"),
CostResult::New { msats, .. } => {
CostResult::New { amount, tax, .. } => {
let expect_price = (plan.amount / MOCK_RATE * 1.0e11) as u64;
assert_eq!(expect_price, msats);
assert_eq!(expect_price, amount);
assert_eq!(0, tax);
}
_ => bail!("??"),
}
// with taxes
let price = pe.get_vm_cost(2, PaymentMethod::Lightning).await?;
match price {
CostResult::New { amount, tax, .. } => {
let expect_price = (plan.amount / MOCK_RATE * 1.0e11) as u64;
assert_eq!(expect_price, amount);
assert_eq!((expect_price as f64 * 0.23).floor() as u64, tax);
}
_ => bail!("??"),
}
Ok(())

View File

@ -19,7 +19,7 @@ impl MikrotikRouter {
STANDARD.encode(format!("{}:{}", username, password))
);
Self {
api: JsonApi::token(url, &auth).unwrap(),
api: JsonApi::token(url, &auth, true).unwrap(),
}
}
}

View File

@ -1,20 +1,29 @@
use crate::dns::DnsServer;
use crate::exchange::ExchangeRateService;
use crate::fiat::FiatPaymentService;
use crate::lightning::LightningNode;
use crate::provisioner::LNVpsProvisioner;
use crate::router::Router;
use anyhow::Result;
use isocountry::CountryCode;
use lnvps_db::LNVpsDb;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct Settings {
/// Listen address for http server
pub listen: Option<String>,
/// MYSQL connection string
pub db: String,
/// Public URL mapping to this service
pub public_url: String,
/// Lightning node config for creating LN payments
pub lightning: LightningConfig,
@ -24,8 +33,8 @@ pub struct Settings {
/// Provisioning profiles
pub provisioner: ProvisionerConfig,
/// Network policy
#[serde(default)]
/// Network policy
pub network_policy: NetworkPolicy,
/// Number of days after an expired VM is deleted
@ -42,6 +51,13 @@ pub struct Settings {
/// Nostr config for sending DMs
pub nostr: Option<NostrConfig>,
/// Config for accepting revolut payments
pub revolut: Option<RevolutConfig>,
#[serde(default)]
/// Tax rates to change per country as a percent of the amount
pub tax_rate: HashMap<CountryCode, f32>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
@ -168,6 +184,15 @@ pub struct QemuConfig {
pub kvm: bool,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct RevolutConfig {
pub url: Option<String>,
pub api_version: String,
pub token: String,
pub public_key: String,
}
impl Settings {
pub fn get_provisioner(
&self,
@ -226,4 +251,58 @@ impl Settings {
}
}
}
pub fn get_revolut(&self) -> Result<Option<Arc<dyn FiatPaymentService>>> {
match &self.revolut {
#[cfg(feature = "revolut")]
Some(c) => Ok(Some(Arc::new(crate::fiat::RevolutApi::new(c.clone())?))),
_ => Ok(None),
}
}
}
#[cfg(test)]
pub fn mock_settings() -> Settings {
Settings {
listen: None,
db: "".to_string(),
public_url: "http://localhost:8000".to_string(),
lightning: LightningConfig::LND {
url: "".to_string(),
cert: Default::default(),
macaroon: Default::default(),
},
read_only: false,
provisioner: ProvisionerConfig::Proxmox {
qemu: QemuConfig {
machine: "q35".to_string(),
os_type: "l26".to_string(),
bridge: "vmbr1".to_string(),
cpu: "kvm64".to_string(),
vlan: None,
kvm: false,
},
ssh: None,
mac_prefix: Some("ff:ff:ff".to_string()),
},
network_policy: NetworkPolicy {
access: NetworkAccessPolicy::Auto,
ip6_slaac: None,
},
delete_after: 0,
smtp: None,
router: Some(RouterConfig::Mikrotik {
url: "https://localhost".to_string(),
username: "admin".to_string(),
password: "password123".to_string(),
}),
dns: Some(DnsServerConfig::Cloudflare {
token: "abc".to_string(),
forward_zone_id: "123".to_string(),
reverse_zone_id: "456".to_string(),
}),
nostr: None,
revolut: None,
tax_rate: HashMap::from([(CountryCode::IRL, 23.0), (CountryCode::USA, 1.0)]),
}
}

View File

@ -1,8 +1,8 @@
use anyhow::Result;
use anyhow::{anyhow, Result};
use log::info;
use ssh2::Channel;
use std::io::Read;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use tokio::net::{TcpStream, ToSocketAddrs};
pub struct SshClient {
@ -34,6 +34,15 @@ impl SshClient {
Ok(channel)
}
pub fn tunnel_unix_socket(&mut self, remote_path: &Path) -> Result<Channel> {
self.session
.channel_direct_streamlocal(
remote_path.to_str().unwrap(),
None,
)
.map_err(|e| anyhow!(e))
}
pub async fn execute(&mut self, command: &str) -> Result<(i32, String)> {
info!("Executing command: {}", command);
let mut channel = self.session.channel_session()?;

View File

@ -251,7 +251,7 @@ impl Worker {
None,
)
.await?;
c.send_event(ev).await?;
c.send_event(&ev).await?;
}
}
Ok(())