feat: payments

feat: spawn vm after payment
This commit is contained in:
2024-11-26 13:19:28 +00:00
parent a0e49d83bd
commit ae2af2feb1
16 changed files with 646 additions and 60 deletions

View File

@ -2,6 +2,7 @@ use crate::nip98::Nip98Auth;
use crate::provisioner::Provisioner;
use lnvps_db::hydrate::Hydrate;
use lnvps_db::{LNVpsDb, UserSshKey, Vm, VmOsImage, VmPayment, VmTemplate};
use nostr::util::hex;
use rocket::serde::json::Json;
use rocket::{get, post, routes, Responder, Route, State};
use serde::{Deserialize, Serialize};
@ -10,12 +11,14 @@ use ssh_key::PublicKey;
pub fn routes() -> Vec<Route> {
routes![
v1_list_vms,
v1_get_vm,
v1_list_vm_templates,
v1_list_vm_images,
v1_list_ssh_keys,
v1_add_ssh_key,
v1_create_vm_order,
v1_renew_vm
v1_renew_vm,
v1_get_payment
]
}
@ -64,10 +67,28 @@ async fn v1_list_vms(auth: Nip98Auth, db: &State<Box<dyn LNVpsDb>>) -> ApiResult
let mut vms = db.list_user_vms(uid).await?;
for vm in &mut vms {
vm.hydrate_up(db).await?;
if let Some(t) = &mut vm.template {
t.hydrate_up(db).await?;
}
}
ApiData::ok(vms)
}
#[get("/api/v1/vm/<id>")]
async fn v1_get_vm(auth: Nip98Auth, db: &State<Box<dyn LNVpsDb>>, id: u64) -> ApiResult<Vm> {
let pubkey = auth.event.pubkey.to_bytes();
let uid = db.upsert_user(&pubkey).await?;
let mut vm = db.get_vm(id).await?;
if vm.user_id != uid {
return ApiData::err("VM doesnt belong to you");
}
vm.hydrate_up(db).await?;
if let Some(t) = &mut vm.template {
t.hydrate_up(db).await?;
}
ApiData::ok(vm)
}
#[get("/api/v1/image")]
async fn v1_list_vm_images(db: &State<Box<dyn LNVpsDb>>) -> ApiResult<Vec<VmOsImage>> {
let vms = db.list_os_image().await?;
@ -156,6 +177,27 @@ async fn v1_renew_vm(
ApiData::ok(rsp)
}
#[get("/api/v1/payment/<id>")]
async fn v1_get_payment(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
id: &str,
) -> ApiResult<VmPayment> {
let pubkey = auth.event.pubkey.to_bytes();
let uid = db.upsert_user(&pubkey).await?;
let id = if let Ok(i) = hex::decode(id) {
i
} else {
return ApiData::err("Invalid payment id");
};
let payment = db.get_vm_payment(&id).await?;
let vm = db.get_vm(payment.vm_id).await?;
if vm.user_id != uid {
return ApiData::err("VM does not belong to you");
}
ApiData::ok(payment)
}
#[derive(Deserialize)]
struct CreateVmRequest {
template_id: u64,

View File

@ -3,11 +3,15 @@ use config::{Config, File};
use fedimint_tonic_lnd::connect;
use lnvps::api;
use lnvps::cors::CORS;
use lnvps::provisioner::{LNVpsProvisioner, Provisioner};
use lnvps::invoice::InvoiceHandler;
use lnvps::provisioner::lnvps::LNVpsProvisioner;
use lnvps::provisioner::Provisioner;
use lnvps::worker::{WorkJob, Worker};
use lnvps_db::{LNVpsDb, LNVpsDbMysql};
use log::error;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::time::Duration;
#[derive(Debug, Deserialize, Serialize)]
pub struct Settings {
@ -43,6 +47,39 @@ async fn main() -> Result<(), Error> {
provisioner.auto_discover().await?;
}
let mut worker = Worker::new(db.clone(), lnd.clone());
let sender = worker.sender();
tokio::spawn(async move {
loop {
if let Err(e) = worker.handle().await {
error!("worker-error: {}", e);
}
}
});
let mut handler = InvoiceHandler::new(lnd.clone(), db.clone(), sender.clone());
tokio::spawn(async move {
loop {
if let Err(e) = handler.listen().await {
error!("invoice-error: {}", e);
}
}
});
// request work every 30s to check vm status
let db_clone = db.clone();
let sender_clone = sender.clone();
tokio::spawn(async move {
loop {
if let Ok(vms) = db_clone.list_vms().await {
for vm in vms {
if let Err(e) = sender_clone.send(WorkJob::CheckVm { vm_id: vm.id }) {
error!("failed to send check vm: {}", e);
}
}
}
tokio::time::sleep(Duration::from_secs(30)).await;
}
});
let db: Box<dyn LNVpsDb> = Box::new(db.clone());
let pv: Box<dyn Provisioner> = Box::new(provisioner);
if let Err(e) = rocket::build()

View File

@ -1,4 +1,4 @@
use anyhow::Result;
use anyhow::{bail, Result};
use log::info;
use reqwest::{ClientBuilder, Url};
use serde::de::DeserializeOwned;
@ -72,14 +72,20 @@ impl ProxmoxClient {
}
async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
self.client
let rsp = self
.client
.get(self.base.join(path)?)
.header("Authorization", format!("PVEAPIToken={}", self.token))
.send()
.await?
.json::<T>()
.await
.map_err(anyhow::Error::new)
.await?;
let status = rsp.status();
let text = rsp.text().await?;
info!("<< {}", text);
if status.is_success() {
Ok(serde_json::from_str(&text)?)
} else {
bail!("{}", status);
}
}
async fn post<T: DeserializeOwned, R: Serialize>(&self, path: &str, body: R) -> Result<T> {
@ -87,12 +93,19 @@ impl ProxmoxClient {
.client
.post(self.base.join(path)?)
.header("Authorization", format!("PVEAPIToken={}", self.token))
.json::<R>(&body)
.header("Content-Type", "application/json")
.header("Accept", "application/json")
.body(serde_json::to_string(&body)?)
.send()
.await?;
let rsp = rsp.text().await?;
info!("<< {}", rsp);
Ok(serde_json::from_str(&rsp)?)
let status = rsp.status();
let text = rsp.text().await?;
info!("<< {}", text);
if status.is_success() {
Ok(serde_json::from_str(&text)?)
} else {
bail!("{}", status);
}
}
}

70
src/invoice.rs Normal file
View File

@ -0,0 +1,70 @@
use crate::worker::WorkJob;
use anyhow::Result;
use fedimint_tonic_lnd::lnrpc::invoice::InvoiceState;
use fedimint_tonic_lnd::lnrpc::InvoiceSubscription;
use fedimint_tonic_lnd::Client;
use lnvps_db::LNVpsDb;
use log::{error, info};
use nostr::util::hex;
use rocket::futures::StreamExt;
use tokio::sync::mpsc::UnboundedSender;
pub struct InvoiceHandler {
lnd: Client,
db: Box<dyn LNVpsDb>,
tx: UnboundedSender<WorkJob>,
}
impl InvoiceHandler {
pub fn new<D: LNVpsDb + 'static>(lnd: Client, db: D, tx: UnboundedSender<WorkJob>) -> Self {
Self {
lnd,
tx,
db: Box::new(db),
}
}
async fn mark_paid(&self, settle_index: u64, id: &Vec<u8>) -> Result<()> {
let mut p = self.db.get_vm_payment(id).await?;
p.settle_index = Some(settle_index);
self.db.vm_payment_paid(&p).await?;
info!("VM payment {} for {}, paid", hex::encode(p.id), p.vm_id);
self.tx.send(WorkJob::CheckVm { vm_id: p.vm_id })?;
Ok(())
}
pub async fn listen(&mut self) -> Result<()> {
let from_settle_index = if let Some(p) = self.db.last_paid_invoice().await? {
p.settle_index.unwrap_or(0)
} else {
0
};
info!("Listening for invoices from {from_settle_index}");
let handler = self
.lnd
.lightning()
.subscribe_invoices(InvoiceSubscription {
add_index: 0,
settle_index: from_settle_index,
})
.await?;
let mut stream = handler.into_inner();
while let Some(msg) = stream.next().await {
match msg {
Ok(i) => {
if i.state == InvoiceState::Settled as i32 {
if let Err(e) = self.mark_paid(i.settle_index, &i.r_hash).await {
error!("{}", e);
}
}
}
Err(e) => error!("{}", e),
}
}
Ok(())
}
}

View File

@ -1,5 +1,7 @@
pub mod api;
pub mod cors;
pub mod host;
mod nip98;
pub mod invoice;
pub mod nip98;
pub mod provisioner;
pub mod worker;

View File

@ -1,38 +1,30 @@
use crate::host::proxmox::ProxmoxClient;
use crate::provisioner::Provisioner;
use anyhow::{bail, Result};
use chrono::{Days, Months, Utc};
use fedimint_tonic_lnd::lnrpc::Invoice;
use fedimint_tonic_lnd::tonic::async_trait;
use fedimint_tonic_lnd::Client;
use lnvps_db::{LNVpsDb, Vm, VmCostPlanIntervalType, VmOsImage, VmPayment};
use ipnetwork::IpNetwork;
use lnvps_db::hydrate::Hydrate;
use lnvps_db::{
IpRange, LNVpsDb, Vm, VmCostPlanIntervalType, VmIpAssignment, VmOsImage, VmPayment,
};
use log::{info, warn};
use rocket::async_trait;
use rocket::yansi::Paint;
use rand::seq::IteratorRandom;
use std::collections::HashSet;
use std::net::IpAddr;
use std::ops::Add;
use std::path::PathBuf;
use std::time::Duration;
#[async_trait]
pub trait Provisioner: Send + Sync {
/// Provision a new VM
async fn provision(
&self,
user_id: u64,
template_id: u64,
image_id: u64,
ssh_key_id: u64,
) -> Result<Vm>;
/// Create a renewal payment
async fn renew(&self, vm_id: u64) -> Result<VmPayment>;
}
pub struct LNVpsProvisioner {
db: Box<dyn LNVpsDb>,
lnd: Client,
}
impl LNVpsProvisioner {
pub fn new(db: impl LNVpsDb + 'static, lnd: Client) -> Self {
pub fn new<D: LNVpsDb + 'static>(db: D, lnd: Client) -> Self {
Self {
db: Box::new(db),
lnd,
@ -143,6 +135,15 @@ impl Provisioner for LNVpsProvisioner {
let template = self.db.get_vm_template(vm.template_id).await?;
let cost_plan = self.db.get_cost_plan(template.cost_plan_id).await?;
/// Reuse existing payment until expired
let payments = self.db.list_vm_payment(vm.id).await?;
if let Some(px) = payments
.into_iter()
.find(|p| p.expires > Utc::now() && !p.is_paid)
{
return Ok(px);
}
// push the expiration forward by cost plan interval amount
let new_expire = match cost_plan.interval_type {
VmCostPlanIntervalType::Day => vm.expires.add(Days::new(cost_plan.interval_amount)),
@ -175,19 +176,74 @@ impl Provisioner for LNVpsProvisioner {
})
.await?;
let mut vm_payment = VmPayment {
id: 0,
let invoice = invoice.into_inner();
let vm_payment = VmPayment {
id: invoice.r_hash.clone(),
vm_id,
created: Utc::now(),
expires: Utc::now().add(Duration::from_secs(INVOICE_EXPIRE as u64)),
amount: cost,
invoice: invoice.into_inner().payment_request,
invoice: invoice.payment_request.clone(),
time_value: (new_expire - vm.expires).num_seconds() as u64,
is_paid: false,
..Default::default()
};
let payment_id = self.db.insert_vm_payment(&vm_payment).await?;
vm_payment.id = payment_id;
self.db.insert_vm_payment(&vm_payment).await?;
Ok(vm_payment)
}
async fn allocate_ips(&self, vm_id: u64) -> Result<Vec<VmIpAssignment>> {
let mut vm = self.db.get_vm(vm_id).await?;
let ips = self.db.get_vm_ip_assignments(vm.id).await?;
if !ips.is_empty() {
bail!("IP resources are already assigned");
}
vm.hydrate_up(&self.db).await?;
let ip_ranges = self.db.list_ip_range().await?;
let ip_ranges: Vec<IpRange> = ip_ranges
.into_iter()
.filter(|i| i.region_id == vm.template.as_ref().unwrap().region_id)
.collect();
if ip_ranges.is_empty() {
bail!("No ip range found in this region");
}
let mut ret = vec![];
/// Try all ranges
// TODO: pick round-robin ranges
for range in ip_ranges {
let range_cidr: IpNetwork = range.cidr.parse()?;
let ips = self.db.get_vm_ip_assignments_in_range(range.id).await?;
let ips: HashSet<IpAddr> = ips.iter().map(|i| i.ip.parse().unwrap()).collect();
// pick an IP at random
let cidr: Vec<IpAddr> = {
let mut rng = rand::thread_rng();
range_cidr.iter().choose(&mut rng).into_iter().collect()
};
for ip in cidr {
if !ips.contains(&ip) {
info!("Attempting to allocate IP for {vm_id} to {ip}");
let mut assignment = VmIpAssignment {
id: 0,
vm_id,
ip_range_id: range.id,
ip: IpNetwork::new(ip, range_cidr.prefix())?.to_string(),
};
let id = self.db.insert_vm_ip_assignment(&assignment).await?;
assignment.id = id;
ret.push(assignment);
break;
}
}
}
Ok(ret)
}
}

27
src/provisioner/mod.rs Normal file
View File

@ -0,0 +1,27 @@
use anyhow::Result;
use lnvps_db::{Vm, VmIpAssignment, VmPayment};
use rocket::async_trait;
pub mod lnvps;
#[async_trait]
pub trait Provisioner: Send + Sync {
/// Provision a new VM for a user on the database
///
/// Note:
/// 1. Does not create a VM on the host machine
/// 2. Does not assign any IP resources
async fn provision(
&self,
user_id: u64,
template_id: u64,
image_id: u64,
ssh_key_id: u64,
) -> Result<Vm>;
/// Create a renewal payment
async fn renew(&self, vm_id: u64) -> Result<VmPayment>;
/// Allocate ips for a VM
async fn allocate_ips(&self, vm_id: u64) -> Result<Vec<VmIpAssignment>>;
}

132
src/worker.rs Normal file
View File

@ -0,0 +1,132 @@
use crate::host::proxmox::{CreateVm, ProxmoxClient, VmBios};
use crate::provisioner::lnvps::LNVpsProvisioner;
use crate::provisioner::Provisioner;
use anyhow::{bail, Result};
use fedimint_tonic_lnd::Client;
use ipnetwork::IpNetwork;
use lnvps_db::{LNVpsDb, Vm, VmHost};
use log::{error, info, warn};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
pub enum WorkJob {
/// Check the VM status matches database state
/// This job starts a vm if stopped and also creates the vm if it doesn't exist yet
CheckVm { vm_id: u64 },
/// Send a notification to the users chosen contact preferences
SendNotification { user_id: u64, message: String },
}
pub struct Worker {
db: Box<dyn LNVpsDb>,
lnd: Client,
provisioner: Box<dyn Provisioner>,
tx: UnboundedSender<WorkJob>,
rx: UnboundedReceiver<WorkJob>,
}
impl Worker {
pub fn new<D: LNVpsDb + Clone + 'static>(db: D, lnd: Client) -> Self {
let (tx, rx) = unbounded_channel();
let p = LNVpsProvisioner::new(db.clone(), lnd.clone());
Self {
db: Box::new(db),
provisioner: Box::new(p),
lnd,
tx,
rx,
}
}
pub fn sender(&self) -> UnboundedSender<WorkJob> {
self.tx.clone()
}
/// Spawn a VM on the host
async fn spawn_vm(&self, vm: &Vm, vm_host: &VmHost, client: &ProxmoxClient) -> Result<()> {
let mut ips = self.db.get_vm_ip_assignments(vm.id).await?;
if ips.is_empty() {
ips = self.provisioner.allocate_ips(vm.id).await?;
}
let ip_config = ips
.iter()
.map_while(|ip| {
if let Ok(net) = ip.ip.parse::<IpNetwork>() {
Some(match net {
IpNetwork::V4(addr) => format!("ip={}", addr),
IpNetwork::V6(addr) => format!("ip6={}", addr),
})
} else {
None
}
})
.collect::<Vec<_>>()
.join(",");
let drives = self.db.list_host_disks(vm.host_id).await?;
let drive = if let Some(d) = drives.iter().find(|d| d.enabled) {
d
} else {
bail!("No host drive found!")
};
let ssh_key = self.db.get_user_ssh_key(vm.ssh_key_id).await?;
client
.create_vm(CreateVm {
node: vm_host.name.clone(),
vm_id: (vm.id + 100) as i32,
bios: Some(VmBios::OVMF),
boot: Some("order=scsi0".to_string()),
cores: Some(vm.cpu as i32),
cpu: Some("kvm64".to_string()),
ip_config: Some(ip_config),
machine: Some("q35".to_string()),
memory: Some((vm.memory / 1024 / 1024).to_string()),
net: Some("virtio,bridge=vmbr0,tag=100".to_string()),
os_type: Some("l26".to_string()),
scsi_1: Some(format!("{}:cloudinit", &drive.name)),
scsi_hw: Some("virtio-scsi-pci".to_string()),
ssh_keys: Some(urlencoding::encode(&ssh_key.key_data).to_string()),
tags: Some("lnvps.net".to_string()),
efi_disk_0: Some(format!("{}:0,efitype=4m", &drive.name)),
..Default::default()
})
.await?;
Ok(())
}
/// Check a VM's status
async fn check_vm(&self, vm_id: u64) -> Result<()> {
info!("Checking VM {}", vm_id);
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = ProxmoxClient::new(host.ip.parse()?).with_api_token(&host.api_token);
match client.get_vm_status(&host.name, (vm.id + 100) as i32).await {
Ok(s) => {
info!("VM {} status: {:?}", vm_id, s.status);
}
Err(e) => {
warn!("Failed to get VM status: {}", e);
self.spawn_vm(&vm, &host, &client).await?;
}
}
Ok(())
}
pub async fn handle(&mut self) -> Result<()> {
while let Some(job) = self.rx.recv().await {
match job {
WorkJob::CheckVm { vm_id } => {
if let Err(e) = self.check_vm(vm_id).await {
error!("Failed to check VM {}: {:?}", vm_id, e);
}
}
WorkJob::SendNotification { .. } => {}
}
}
Ok(())
}
}