diff --git a/Cargo.lock b/Cargo.lock index 34402b8..a1fe2ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -632,6 +632,41 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "der" version = "0.7.9" @@ -650,6 +685,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -1556,6 +1592,12 @@ dependencies = [ "syn", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "1.0.3" @@ -1585,6 +1627,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -1632,6 +1675,15 @@ version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" +[[package]] +name = "ipnetwork" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf466541e9d546596ee94f9f69590f89473455f88372423e0008fc1a7daf100e" +dependencies = [ + "serde", +] + [[package]] name = "is-terminal" version = "0.4.13" @@ -1731,10 +1783,12 @@ dependencies = [ "chrono", "config", "fedimint-tonic-lnd", + "ipnetwork", "lnvps_db", "log", "nostr", "pretty_env_logger", + "rand", "reqwest", "rocket", "serde", @@ -1752,6 +1806,7 @@ dependencies = [ "async-trait", "chrono", "serde", + "serde_with", "sqlx", ] @@ -3046,6 +3101,36 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e28bdad6db2b8340e449f7108f020b3b092e8583a9e3fb82713e1d4e71fe817" +dependencies = [ + "base64 0.22.1", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.6.0", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d846214a9854ef724f3da161b426242d8de7c1fc7de2f89bb1efcb154dca79d" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "sha1" version = "0.10.6" @@ -3433,6 +3518,12 @@ dependencies = [ "unicode-properties", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "subtle" version = "2.6.1" diff --git a/Cargo.toml b/Cargo.toml index 71e32d4..459d7f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ log = "0.4.21" config = { version = "0.14.0", features = ["yaml"] } pretty_env_logger = "0.5.0" serde = { version = "1.0.213", features = ["derive"] } -reqwest = { version = "0.12.8", features = ["json"] } +reqwest = { version = "0.12.8" } serde_json = "1.0.132" rocket = { version = "0.5.1", features = ["json"] } chrono = { version = "0.4.38", features = ["serde"] } @@ -23,3 +23,5 @@ base64 = "0.22.1" ssh-key = "0.6.7" urlencoding = "2.1.3" fedimint-tonic-lnd = { version = "0.2.0", default-features = false, features = ["invoicesrpc"] } +ipnetwork = "0.20.0" +rand = "0.8.5" diff --git a/dev_setup.sql b/dev_setup.sql index 496c6eb..11bc8a3 100644 --- a/dev_setup.sql +++ b/dev_setup.sql @@ -22,8 +22,8 @@ insert ignore into vm_os_image(id,distribution,flavour,version,enabled,url,release_date) values(5, 1,"Server","11",1,"https://cloud.debian.org/images/cloud/bullseye/latest/debian-11-genericcloud-amd64.raw","2021-08-14"); insert -ignore into ip_range(id,cidr,enabled) -values(1,"185.18.221.80/28",1); +ignore into ip_range(id,cidr,enabled,region_id) +values(1,"185.18.221.80/28",1,1); insert ignore into vm_cost_plan(id,name,amount,currency,interval_amount,interval_type) values(1,"tiny_monthly",2,"EUR",1,1); diff --git a/lnvps_db/Cargo.toml b/lnvps_db/Cargo.toml index 8aefae0..4a30af9 100644 --- a/lnvps_db/Cargo.toml +++ b/lnvps_db/Cargo.toml @@ -11,5 +11,6 @@ mysql = ["sqlx/mysql"] anyhow = "1.0.83" sqlx = { version = "0.8.2", features = ["chrono", "migrate", "runtime-tokio"] } serde = { version = "1.0.213", features = ["derive"] } +serde_with = { version = "3.11.0", features = ["macros", "hex"] } chrono = { version = "0.4.38", features = ["serde"] } async-trait = "0.1.83" \ No newline at end of file diff --git a/lnvps_db/migrations/20241103155733_init.sql b/lnvps_db/migrations/20241103155733_init.sql index 9eadd19..7702f5b 100644 --- a/lnvps_db/migrations/20241103155733_init.sql +++ b/lnvps_db/migrations/20241103155733_init.sql @@ -130,20 +130,24 @@ create table vm_ip_assignment id integer unsigned not null auto_increment primary key, vm_id integer unsigned not null, ip_range_id integer unsigned not null, + ip varchar(255) not null, constraint fk_vm_ip_assignment_vm foreign key (vm_id) references vm (id), constraint fk_vm_ip_range foreign key (ip_range_id) references ip_range (id) ); +create unique index ix_vm_ip_assignment_ip on vm_ip_assignment (ip); create table vm_payment ( - id binary(32) not null, - vm_id integer unsigned not null, - created timestamp default current_timestamp, - expires timestamp not null, - amount bigint unsigned not null, - invoice varchar(2048) not null, - time_value bigint unsigned not null, - is_paid bit(1) not null, + id binary(32) not null, + vm_id integer unsigned not null, + created timestamp default current_timestamp, + expires timestamp not null, + amount bigint unsigned not null, + invoice varchar(2048) not null, + time_value bigint unsigned not null, + is_paid bit(1) not null, + settle_index bigint unsigned, constraint fk_vm_payment_vm foreign key (vm_id) references vm (id) -); \ No newline at end of file +); +create unique index ix_vm_payment_id on vm_payment (id); \ No newline at end of file diff --git a/lnvps_db/src/lib.rs b/lnvps_db/src/lib.rs index 3f7e95b..76ad201 100644 --- a/lnvps_db/src/lib.rs +++ b/lnvps_db/src/lib.rs @@ -45,6 +45,9 @@ pub trait LNVpsDb: Sync + Send { /// List VM's owned by a specific user async fn list_hosts(&self) -> Result>; + /// List VM's owned by a specific user + async fn get_host(&self, id: u64) -> Result; + /// Update host resources (usually from [auto_discover]) async fn update_host(&self, host: &VmHost) -> Result<()>; @@ -69,6 +72,9 @@ pub trait LNVpsDb: Sync + Send { /// List VM templates async fn list_vm_templates(&self) -> Result>; + /// List all VM's + async fn list_vms(&self) -> Result>; + /// List VM's owned by a specific user async fn list_user_vms(&self, id: u64) -> Result>; @@ -78,15 +84,30 @@ pub trait LNVpsDb: Sync + Send { /// Insert a new VM record async fn insert_vm(&self, vm: &Vm) -> Result; + /// List VM ip assignments + async fn insert_vm_ip_assignment(&self, ip_assignment: &VmIpAssignment) -> Result; + /// List VM ip assignments async fn get_vm_ip_assignments(&self, vm_id: u64) -> Result>; + /// List VM ip assignments by IP range + async fn get_vm_ip_assignments_in_range(&self, range_id: u64) -> Result>; + /// List payments by VM id async fn list_vm_payment(&self, vm_id: u64) -> Result>; /// Insert a new VM payment record - async fn insert_vm_payment(&self, vm_payment: &VmPayment) -> Result; + async fn insert_vm_payment(&self, vm_payment: &VmPayment) -> Result<()>; + + /// Get VM payment by payment id + async fn get_vm_payment(&self, id: &Vec) -> Result; /// Update a VM payment record async fn update_vm_payment(&self, vm_payment: &VmPayment) -> Result<()>; + + /// Mark a payment as paid and update the vm expiry + async fn vm_payment_paid(&self, id: &VmPayment) -> Result<()>; + + /// Return the most recently settled invoice + async fn last_paid_invoice(&self) -> Result>; } \ No newline at end of file diff --git a/lnvps_db/src/model.rs b/lnvps_db/src/model.rs index ce3c671..27f929c 100644 --- a/lnvps_db/src/model.rs +++ b/lnvps_db/src/model.rs @@ -1,14 +1,19 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use serde_with::serde_as; use sqlx::FromRow; +#[serde_as] #[derive(Serialize, Deserialize, FromRow, Clone, Debug)] /// Users who buy VM's pub struct User { /// Unique ID of this user (database generated) pub id: u64, + /// The nostr public key for this user + #[serde_as(as = "serde_with::hex::Hex")] pub pubkey: Vec, + /// When this user first started using the service (first login) pub created: DateTime, @@ -222,16 +227,26 @@ pub struct VmIpAssignment { pub id: u64, pub vm_id: u64, pub ip_range_id: u64, + pub ip: String, } -#[derive(Serialize, Deserialize, FromRow, Clone, Debug)] +#[serde_as] +#[derive(Serialize, Deserialize, FromRow, Clone, Debug, Default)] pub struct VmPayment { - pub id: u64, + /// Payment hash + #[serde_as(as = "serde_with::hex::Hex")] + pub id: Vec, pub vm_id: u64, pub created: DateTime, pub expires: DateTime, pub amount: u64, pub invoice: String, - pub time_value: u64, pub is_paid: bool, + + /// Number of seconds this payment will add to vm expiry + #[serde(skip_serializing)] + pub time_value: u64, + + #[serde(skip_serializing)] + pub settle_index: Option, } \ No newline at end of file diff --git a/lnvps_db/src/mysql.rs b/lnvps_db/src/mysql.rs index cbff71f..2364bd6 100644 --- a/lnvps_db/src/mysql.rs +++ b/lnvps_db/src/mysql.rs @@ -1,5 +1,5 @@ use crate::{IpRange, LNVpsDb, User, UserSshKey, Vm, VmCostPlan, VmHost, VmHostDisk, VmHostRegion, VmIpAssignment, VmOsImage, VmPayment, VmTemplate}; -use anyhow::{Error, Result}; +use anyhow::{bail, Error, Result}; use async_trait::async_trait; use sqlx::{Executor, MySqlPool, Row}; @@ -107,6 +107,14 @@ impl LNVpsDb for LNVpsDbMysql { .map_err(Error::new) } + async fn get_host(&self, id: u64) -> Result { + sqlx::query_as("select * from vm_host where id = ?") + .bind(&id) + .fetch_one(&self.db) + .await + .map_err(Error::new) + } + async fn update_host(&self, host: &VmHost) -> Result<()> { sqlx::query("update vm_host set name = ?, cpu = ?, memory = ? where id = ?") .bind(&host.name) @@ -171,6 +179,13 @@ impl LNVpsDb for LNVpsDbMysql { .map_err(Error::new) } + async fn list_vms(&self) -> Result> { + sqlx::query_as("select * from vm") + .fetch_all(&self.db) + .await + .map_err(Error::new) + } + async fn list_user_vms(&self, id: u64) -> Result> { sqlx::query_as("select * from vm where user_id = ?") .bind(&id) @@ -206,24 +221,44 @@ impl LNVpsDb for LNVpsDbMysql { .try_get(0)?) } + async fn insert_vm_ip_assignment(&self, ip_assignment: &VmIpAssignment) -> Result { + Ok(sqlx::query("insert into vm_ip_assignment(vm_id,ip_range_id,ip) values(?, ?, ?) returning id") + .bind(&ip_assignment.vm_id) + .bind(&ip_assignment.ip_range_id) + .bind(&ip_assignment.ip) + .fetch_one(&self.db) + .await + .map_err(Error::new)? + .try_get(0)?) + } + async fn get_vm_ip_assignments(&self, vm_id: u64) -> Result> { - sqlx::query_as("select * from vm_ip_assignment where vm_id=?") + sqlx::query_as("select * from vm_ip_assignment where vm_id = ?") .bind(vm_id) .fetch_all(&self.db) .await .map_err(Error::new) } + async fn get_vm_ip_assignments_in_range(&self, range_id: u64) -> Result> { + sqlx::query_as("select * from vm_ip_assignment where ip_range_id = ?") + .bind(range_id) + .fetch_all(&self.db) + .await + .map_err(Error::new) + } + async fn list_vm_payment(&self, vm_id: u64) -> Result> { - sqlx::query_as("select * from vm_payment where vm_id=?") + sqlx::query_as("select * from vm_payment where vm_id = ?") .bind(vm_id) .fetch_all(&self.db) .await .map_err(Error::new) } - async fn insert_vm_payment(&self, vm_payment: &VmPayment) -> Result { - Ok(sqlx::query("insert into vm_payment(vm_id,created,expires,amount,invoice,time_value,is_paid) values(?,?,?,?,?,?,?) returning id") + async fn insert_vm_payment(&self, vm_payment: &VmPayment) -> Result<()> { + sqlx::query("insert into vm_payment(id,vm_id,created,expires,amount,invoice,time_value,is_paid) values(?,?,?,?,?,?,?,?)") + .bind(&vm_payment.id) .bind(&vm_payment.vm_id) .bind(&vm_payment.created) .bind(&vm_payment.expires) @@ -231,10 +266,18 @@ impl LNVpsDb for LNVpsDbMysql { .bind(&vm_payment.invoice) .bind(&vm_payment.time_value) .bind(&vm_payment.is_paid) + .execute(&self.db) + .await + .map_err(Error::new)?; + Ok(()) + } + + async fn get_vm_payment(&self, id: &Vec) -> Result { + sqlx::query_as("select * from vm_payment where id=?") + .bind(&id) .fetch_one(&self.db) .await - .map_err(Error::new)? - .try_get(0)?) + .map_err(Error::new) } async fn update_vm_payment(&self, vm_payment: &VmPayment) -> Result<()> { @@ -246,4 +289,34 @@ impl LNVpsDb for LNVpsDbMysql { .map_err(Error::new)?; Ok(()) } + + async fn vm_payment_paid(&self, vm_payment: &VmPayment) -> Result<()> { + if vm_payment.is_paid { + bail!("Invoice already paid"); + } + + let mut tx = self.db.begin().await?; + + sqlx::query("update vm_payment set is_paid = true, settle_index = ? where id = ?") + .bind(&vm_payment.settle_index) + .bind(&vm_payment.id) + .execute(&mut *tx) + .await?; + + sqlx::query("update vm set expires = TIMESTAMPADD(SECOND, ?, expires) where id = ?") + .bind(&vm_payment.time_value) + .bind(&vm_payment.vm_id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + async fn last_paid_invoice(&self) -> Result> { + sqlx::query_as("select * from vm_payment where is_paid = true order by settle_index desc limit 1") + .fetch_optional(&self.db) + .await + .map_err(Error::new) + } } \ No newline at end of file diff --git a/src/api.rs b/src/api.rs index 8ff4ef6..32b3f21 100644 --- a/src/api.rs +++ b/src/api.rs @@ -2,6 +2,7 @@ use crate::nip98::Nip98Auth; use crate::provisioner::Provisioner; use lnvps_db::hydrate::Hydrate; use lnvps_db::{LNVpsDb, UserSshKey, Vm, VmOsImage, VmPayment, VmTemplate}; +use nostr::util::hex; use rocket::serde::json::Json; use rocket::{get, post, routes, Responder, Route, State}; use serde::{Deserialize, Serialize}; @@ -10,12 +11,14 @@ use ssh_key::PublicKey; pub fn routes() -> Vec { routes![ v1_list_vms, + v1_get_vm, v1_list_vm_templates, v1_list_vm_images, v1_list_ssh_keys, v1_add_ssh_key, v1_create_vm_order, - v1_renew_vm + v1_renew_vm, + v1_get_payment ] } @@ -64,10 +67,28 @@ async fn v1_list_vms(auth: Nip98Auth, db: &State>) -> ApiResult let mut vms = db.list_user_vms(uid).await?; for vm in &mut vms { vm.hydrate_up(db).await?; + if let Some(t) = &mut vm.template { + t.hydrate_up(db).await?; + } } ApiData::ok(vms) } +#[get("/api/v1/vm/")] +async fn v1_get_vm(auth: Nip98Auth, db: &State>, id: u64) -> ApiResult { + let pubkey = auth.event.pubkey.to_bytes(); + let uid = db.upsert_user(&pubkey).await?; + let mut vm = db.get_vm(id).await?; + if vm.user_id != uid { + return ApiData::err("VM doesnt belong to you"); + } + vm.hydrate_up(db).await?; + if let Some(t) = &mut vm.template { + t.hydrate_up(db).await?; + } + ApiData::ok(vm) +} + #[get("/api/v1/image")] async fn v1_list_vm_images(db: &State>) -> ApiResult> { let vms = db.list_os_image().await?; @@ -156,6 +177,27 @@ async fn v1_renew_vm( ApiData::ok(rsp) } +#[get("/api/v1/payment/")] +async fn v1_get_payment( + auth: Nip98Auth, + db: &State>, + id: &str, +) -> ApiResult { + let pubkey = auth.event.pubkey.to_bytes(); + let uid = db.upsert_user(&pubkey).await?; + let id = if let Ok(i) = hex::decode(id) { + i + } else { + return ApiData::err("Invalid payment id"); + }; + let payment = db.get_vm_payment(&id).await?; + let vm = db.get_vm(payment.vm_id).await?; + if vm.user_id != uid { + return ApiData::err("VM does not belong to you"); + } + + ApiData::ok(payment) +} #[derive(Deserialize)] struct CreateVmRequest { template_id: u64, diff --git a/src/bin/api.rs b/src/bin/api.rs index 70512fe..218cf93 100644 --- a/src/bin/api.rs +++ b/src/bin/api.rs @@ -3,11 +3,15 @@ use config::{Config, File}; use fedimint_tonic_lnd::connect; use lnvps::api; use lnvps::cors::CORS; -use lnvps::provisioner::{LNVpsProvisioner, Provisioner}; +use lnvps::invoice::InvoiceHandler; +use lnvps::provisioner::lnvps::LNVpsProvisioner; +use lnvps::provisioner::Provisioner; +use lnvps::worker::{WorkJob, Worker}; use lnvps_db::{LNVpsDb, LNVpsDbMysql}; use log::error; use serde::{Deserialize, Serialize}; use std::path::PathBuf; +use std::time::Duration; #[derive(Debug, Deserialize, Serialize)] pub struct Settings { @@ -43,6 +47,39 @@ async fn main() -> Result<(), Error> { provisioner.auto_discover().await?; } + let mut worker = Worker::new(db.clone(), lnd.clone()); + let sender = worker.sender(); + tokio::spawn(async move { + loop { + if let Err(e) = worker.handle().await { + error!("worker-error: {}", e); + } + } + }); + let mut handler = InvoiceHandler::new(lnd.clone(), db.clone(), sender.clone()); + tokio::spawn(async move { + loop { + if let Err(e) = handler.listen().await { + error!("invoice-error: {}", e); + } + } + }); + // request work every 30s to check vm status + let db_clone = db.clone(); + let sender_clone = sender.clone(); + tokio::spawn(async move { + loop { + if let Ok(vms) = db_clone.list_vms().await { + for vm in vms { + if let Err(e) = sender_clone.send(WorkJob::CheckVm { vm_id: vm.id }) { + error!("failed to send check vm: {}", e); + } + } + } + tokio::time::sleep(Duration::from_secs(30)).await; + } + }); + let db: Box = Box::new(db.clone()); let pv: Box = Box::new(provisioner); if let Err(e) = rocket::build() diff --git a/src/host/proxmox.rs b/src/host/proxmox.rs index 5620e3e..fda37c3 100644 --- a/src/host/proxmox.rs +++ b/src/host/proxmox.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{bail, Result}; use log::info; use reqwest::{ClientBuilder, Url}; use serde::de::DeserializeOwned; @@ -72,14 +72,20 @@ impl ProxmoxClient { } async fn get(&self, path: &str) -> Result { - self.client + let rsp = self + .client .get(self.base.join(path)?) .header("Authorization", format!("PVEAPIToken={}", self.token)) .send() - .await? - .json::() - .await - .map_err(anyhow::Error::new) + .await?; + let status = rsp.status(); + let text = rsp.text().await?; + info!("<< {}", text); + if status.is_success() { + Ok(serde_json::from_str(&text)?) + } else { + bail!("{}", status); + } } async fn post(&self, path: &str, body: R) -> Result { @@ -87,12 +93,19 @@ impl ProxmoxClient { .client .post(self.base.join(path)?) .header("Authorization", format!("PVEAPIToken={}", self.token)) - .json::(&body) + .header("Content-Type", "application/json") + .header("Accept", "application/json") + .body(serde_json::to_string(&body)?) .send() .await?; - let rsp = rsp.text().await?; - info!("<< {}", rsp); - Ok(serde_json::from_str(&rsp)?) + let status = rsp.status(); + let text = rsp.text().await?; + info!("<< {}", text); + if status.is_success() { + Ok(serde_json::from_str(&text)?) + } else { + bail!("{}", status); + } } } diff --git a/src/invoice.rs b/src/invoice.rs new file mode 100644 index 0000000..7d8d4ad --- /dev/null +++ b/src/invoice.rs @@ -0,0 +1,70 @@ +use crate::worker::WorkJob; +use anyhow::Result; +use fedimint_tonic_lnd::lnrpc::invoice::InvoiceState; +use fedimint_tonic_lnd::lnrpc::InvoiceSubscription; +use fedimint_tonic_lnd::Client; +use lnvps_db::LNVpsDb; +use log::{error, info}; +use nostr::util::hex; +use rocket::futures::StreamExt; +use tokio::sync::mpsc::UnboundedSender; + +pub struct InvoiceHandler { + lnd: Client, + db: Box, + tx: UnboundedSender, +} + +impl InvoiceHandler { + pub fn new(lnd: Client, db: D, tx: UnboundedSender) -> Self { + Self { + lnd, + tx, + db: Box::new(db), + } + } + + async fn mark_paid(&self, settle_index: u64, id: &Vec) -> Result<()> { + let mut p = self.db.get_vm_payment(id).await?; + p.settle_index = Some(settle_index); + self.db.vm_payment_paid(&p).await?; + + info!("VM payment {} for {}, paid", hex::encode(p.id), p.vm_id); + self.tx.send(WorkJob::CheckVm { vm_id: p.vm_id })?; + + Ok(()) + } + + pub async fn listen(&mut self) -> Result<()> { + let from_settle_index = if let Some(p) = self.db.last_paid_invoice().await? { + p.settle_index.unwrap_or(0) + } else { + 0 + }; + info!("Listening for invoices from {from_settle_index}"); + + let handler = self + .lnd + .lightning() + .subscribe_invoices(InvoiceSubscription { + add_index: 0, + settle_index: from_settle_index, + }) + .await?; + + let mut stream = handler.into_inner(); + while let Some(msg) = stream.next().await { + match msg { + Ok(i) => { + if i.state == InvoiceState::Settled as i32 { + if let Err(e) = self.mark_paid(i.settle_index, &i.r_hash).await { + error!("{}", e); + } + } + } + Err(e) => error!("{}", e), + } + } + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 412786c..8227cdf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,7 @@ pub mod api; pub mod cors; pub mod host; -mod nip98; +pub mod invoice; +pub mod nip98; pub mod provisioner; +pub mod worker; diff --git a/src/provisioner.rs b/src/provisioner/lnvps.rs similarity index 66% rename from src/provisioner.rs rename to src/provisioner/lnvps.rs index 82c9e13..29f156b 100644 --- a/src/provisioner.rs +++ b/src/provisioner/lnvps.rs @@ -1,38 +1,30 @@ use crate::host::proxmox::ProxmoxClient; +use crate::provisioner::Provisioner; use anyhow::{bail, Result}; use chrono::{Days, Months, Utc}; use fedimint_tonic_lnd::lnrpc::Invoice; +use fedimint_tonic_lnd::tonic::async_trait; use fedimint_tonic_lnd::Client; -use lnvps_db::{LNVpsDb, Vm, VmCostPlanIntervalType, VmOsImage, VmPayment}; +use ipnetwork::IpNetwork; +use lnvps_db::hydrate::Hydrate; +use lnvps_db::{ + IpRange, LNVpsDb, Vm, VmCostPlanIntervalType, VmIpAssignment, VmOsImage, VmPayment, +}; use log::{info, warn}; -use rocket::async_trait; -use rocket::yansi::Paint; +use rand::seq::IteratorRandom; +use std::collections::HashSet; +use std::net::IpAddr; use std::ops::Add; use std::path::PathBuf; use std::time::Duration; -#[async_trait] -pub trait Provisioner: Send + Sync { - /// Provision a new VM - async fn provision( - &self, - user_id: u64, - template_id: u64, - image_id: u64, - ssh_key_id: u64, - ) -> Result; - - /// Create a renewal payment - async fn renew(&self, vm_id: u64) -> Result; -} - pub struct LNVpsProvisioner { db: Box, lnd: Client, } impl LNVpsProvisioner { - pub fn new(db: impl LNVpsDb + 'static, lnd: Client) -> Self { + pub fn new(db: D, lnd: Client) -> Self { Self { db: Box::new(db), lnd, @@ -143,6 +135,15 @@ impl Provisioner for LNVpsProvisioner { let template = self.db.get_vm_template(vm.template_id).await?; let cost_plan = self.db.get_cost_plan(template.cost_plan_id).await?; + /// Reuse existing payment until expired + let payments = self.db.list_vm_payment(vm.id).await?; + if let Some(px) = payments + .into_iter() + .find(|p| p.expires > Utc::now() && !p.is_paid) + { + return Ok(px); + } + // push the expiration forward by cost plan interval amount let new_expire = match cost_plan.interval_type { VmCostPlanIntervalType::Day => vm.expires.add(Days::new(cost_plan.interval_amount)), @@ -175,19 +176,74 @@ impl Provisioner for LNVpsProvisioner { }) .await?; - let mut vm_payment = VmPayment { - id: 0, + let invoice = invoice.into_inner(); + let vm_payment = VmPayment { + id: invoice.r_hash.clone(), vm_id, created: Utc::now(), expires: Utc::now().add(Duration::from_secs(INVOICE_EXPIRE as u64)), amount: cost, - invoice: invoice.into_inner().payment_request, + invoice: invoice.payment_request.clone(), time_value: (new_expire - vm.expires).num_seconds() as u64, is_paid: false, + ..Default::default() }; - let payment_id = self.db.insert_vm_payment(&vm_payment).await?; - vm_payment.id = payment_id; + self.db.insert_vm_payment(&vm_payment).await?; Ok(vm_payment) } + + async fn allocate_ips(&self, vm_id: u64) -> Result> { + let mut vm = self.db.get_vm(vm_id).await?; + let ips = self.db.get_vm_ip_assignments(vm.id).await?; + + if !ips.is_empty() { + bail!("IP resources are already assigned"); + } + + vm.hydrate_up(&self.db).await?; + let ip_ranges = self.db.list_ip_range().await?; + let ip_ranges: Vec = ip_ranges + .into_iter() + .filter(|i| i.region_id == vm.template.as_ref().unwrap().region_id) + .collect(); + + if ip_ranges.is_empty() { + bail!("No ip range found in this region"); + } + + let mut ret = vec![]; + /// Try all ranges + // TODO: pick round-robin ranges + for range in ip_ranges { + let range_cidr: IpNetwork = range.cidr.parse()?; + let ips = self.db.get_vm_ip_assignments_in_range(range.id).await?; + let ips: HashSet = ips.iter().map(|i| i.ip.parse().unwrap()).collect(); + + // pick an IP at random + let cidr: Vec = { + let mut rng = rand::thread_rng(); + range_cidr.iter().choose(&mut rng).into_iter().collect() + }; + + for ip in cidr { + if !ips.contains(&ip) { + info!("Attempting to allocate IP for {vm_id} to {ip}"); + let mut assignment = VmIpAssignment { + id: 0, + vm_id, + ip_range_id: range.id, + ip: IpNetwork::new(ip, range_cidr.prefix())?.to_string(), + }; + let id = self.db.insert_vm_ip_assignment(&assignment).await?; + assignment.id = id; + + ret.push(assignment); + break; + } + } + } + + Ok(ret) + } } diff --git a/src/provisioner/mod.rs b/src/provisioner/mod.rs new file mode 100644 index 0000000..70a4771 --- /dev/null +++ b/src/provisioner/mod.rs @@ -0,0 +1,27 @@ +use anyhow::Result; +use lnvps_db::{Vm, VmIpAssignment, VmPayment}; +use rocket::async_trait; + +pub mod lnvps; + +#[async_trait] +pub trait Provisioner: Send + Sync { + /// Provision a new VM for a user on the database + /// + /// Note: + /// 1. Does not create a VM on the host machine + /// 2. Does not assign any IP resources + async fn provision( + &self, + user_id: u64, + template_id: u64, + image_id: u64, + ssh_key_id: u64, + ) -> Result; + + /// Create a renewal payment + async fn renew(&self, vm_id: u64) -> Result; + + /// Allocate ips for a VM + async fn allocate_ips(&self, vm_id: u64) -> Result>; +} diff --git a/src/worker.rs b/src/worker.rs new file mode 100644 index 0000000..0bbc04b --- /dev/null +++ b/src/worker.rs @@ -0,0 +1,132 @@ +use crate::host::proxmox::{CreateVm, ProxmoxClient, VmBios}; +use crate::provisioner::lnvps::LNVpsProvisioner; +use crate::provisioner::Provisioner; +use anyhow::{bail, Result}; +use fedimint_tonic_lnd::Client; +use ipnetwork::IpNetwork; +use lnvps_db::{LNVpsDb, Vm, VmHost}; +use log::{error, info, warn}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + +pub enum WorkJob { + /// Check the VM status matches database state + /// This job starts a vm if stopped and also creates the vm if it doesn't exist yet + CheckVm { vm_id: u64 }, + /// Send a notification to the users chosen contact preferences + SendNotification { user_id: u64, message: String }, +} + +pub struct Worker { + db: Box, + lnd: Client, + provisioner: Box, + tx: UnboundedSender, + rx: UnboundedReceiver, +} + +impl Worker { + pub fn new(db: D, lnd: Client) -> Self { + let (tx, rx) = unbounded_channel(); + let p = LNVpsProvisioner::new(db.clone(), lnd.clone()); + Self { + db: Box::new(db), + provisioner: Box::new(p), + lnd, + tx, + rx, + } + } + + pub fn sender(&self) -> UnboundedSender { + self.tx.clone() + } + + /// Spawn a VM on the host + async fn spawn_vm(&self, vm: &Vm, vm_host: &VmHost, client: &ProxmoxClient) -> Result<()> { + let mut ips = self.db.get_vm_ip_assignments(vm.id).await?; + if ips.is_empty() { + ips = self.provisioner.allocate_ips(vm.id).await?; + } + + let ip_config = ips + .iter() + .map_while(|ip| { + if let Ok(net) = ip.ip.parse::() { + Some(match net { + IpNetwork::V4(addr) => format!("ip={}", addr), + IpNetwork::V6(addr) => format!("ip6={}", addr), + }) + } else { + None + } + }) + .collect::>() + .join(","); + + let drives = self.db.list_host_disks(vm.host_id).await?; + let drive = if let Some(d) = drives.iter().find(|d| d.enabled) { + d + } else { + bail!("No host drive found!") + }; + + let ssh_key = self.db.get_user_ssh_key(vm.ssh_key_id).await?; + + client + .create_vm(CreateVm { + node: vm_host.name.clone(), + vm_id: (vm.id + 100) as i32, + bios: Some(VmBios::OVMF), + boot: Some("order=scsi0".to_string()), + cores: Some(vm.cpu as i32), + cpu: Some("kvm64".to_string()), + ip_config: Some(ip_config), + machine: Some("q35".to_string()), + memory: Some((vm.memory / 1024 / 1024).to_string()), + net: Some("virtio,bridge=vmbr0,tag=100".to_string()), + os_type: Some("l26".to_string()), + scsi_1: Some(format!("{}:cloudinit", &drive.name)), + scsi_hw: Some("virtio-scsi-pci".to_string()), + ssh_keys: Some(urlencoding::encode(&ssh_key.key_data).to_string()), + tags: Some("lnvps.net".to_string()), + efi_disk_0: Some(format!("{}:0,efitype=4m", &drive.name)), + ..Default::default() + }) + .await?; + + Ok(()) + } + + /// Check a VM's status + async fn check_vm(&self, vm_id: u64) -> Result<()> { + info!("Checking VM {}", vm_id); + let vm = self.db.get_vm(vm_id).await?; + let host = self.db.get_host(vm.host_id).await?; + let client = ProxmoxClient::new(host.ip.parse()?).with_api_token(&host.api_token); + + match client.get_vm_status(&host.name, (vm.id + 100) as i32).await { + Ok(s) => { + info!("VM {} status: {:?}", vm_id, s.status); + } + Err(e) => { + warn!("Failed to get VM status: {}", e); + self.spawn_vm(&vm, &host, &client).await?; + } + } + Ok(()) + } + + pub async fn handle(&mut self) -> Result<()> { + while let Some(job) = self.rx.recv().await { + match job { + WorkJob::CheckVm { vm_id } => { + if let Err(e) = self.check_vm(vm_id).await { + error!("Failed to check VM {}: {:?}", vm_id, e); + } + } + WorkJob::SendNotification { .. } => {} + } + } + Ok(()) + } +}