diff --git a/config.yaml b/config.yaml
index a7d8809..11c769c 100644
--- a/config.yaml
+++ b/config.yaml
@@ -3,6 +3,7 @@ lnd:
   url: "https://127.0.0.1:10003"
   cert: "/home/kieran/.polar/networks/2/volumes/lnd/alice/tls.cert"
   macaroon: "/home/kieran/.polar/networks/2/volumes/lnd/alice/data/chain/bitcoin/regtest/admin.macaroon"
+delete_after: 3
 provisioner:
   proxmox:
     read_only: false
diff --git a/lnvps_db/src/lib.rs b/lnvps_db/src/lib.rs
index 408582d..cf3136d 100644
--- a/lnvps_db/src/lib.rs
+++ b/lnvps_db/src/lib.rs
@@ -78,6 +78,9 @@ pub trait LNVpsDb: Sync + Send {
     /// List all VM's
     async fn list_vms(&self) -> Result<Vec<Vm>>;
 
+    /// List expired VM's
+    async fn list_expired_vms(&self) -> Result<Vec<Vm>>;
+
     /// List VM's owned by a specific user
     async fn list_user_vms(&self, id: u64) -> Result<Vec<Vm>>;
 
@@ -87,6 +90,9 @@ pub trait LNVpsDb: Sync + Send {
     /// Insert a new VM record
     async fn insert_vm(&self, vm: &Vm) -> Result<u64>;
 
+    /// Delete a VM by id
+    async fn delete_vm(&self, vm_id: u64) -> Result<()>;
+
     /// List VM ip assignments
     async fn insert_vm_ip_assignment(&self, ip_assignment: &VmIpAssignment) -> Result<u64>;
 
diff --git a/lnvps_db/src/mysql.rs b/lnvps_db/src/mysql.rs
index ee58b27..b05891d 100644
--- a/lnvps_db/src/mysql.rs
+++ b/lnvps_db/src/mysql.rs
@@ -197,6 +197,13 @@ impl LNVpsDb for LNVpsDbMysql {
             .map_err(Error::new)
     }
 
+    async fn list_expired_vms(&self) -> Result<Vec<Vm>> {
+        sqlx::query_as("select * from vm where expires < current_timestamp()")
+            .fetch_all(&self.db)
+            .await
+            .map_err(Error::new)
+    }
+
     async fn list_user_vms(&self, id: u64) -> Result<Vec<Vm>> {
         sqlx::query_as("select * from vm where user_id = ?")
             .bind(id)
@@ -233,6 +240,15 @@
             .try_get(0)?)
     }
 
+    async fn delete_vm(&self, vm_id: u64) -> Result<()> {
+        sqlx::query("delete from vm where id = ?")
+            .bind(vm_id)
+            .execute(&self.db)
+            .await
+            .map_err(Error::new)?;
+        Ok(())
+    }
+
     async fn insert_vm_ip_assignment(&self, ip_assignment: &VmIpAssignment) -> Result<u64> {
         Ok(sqlx::query(
             "insert into vm_ip_assignment(vm_id,ip_range_id,ip) values(?, ?, ?) returning id",
returning id", diff --git a/src/api.rs b/src/api.rs index 6896c5e..c949a2a 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,13 +1,15 @@ use crate::nip98::Nip98Auth; use crate::provisioner::Provisioner; use crate::status::{VmState, VmStateCache}; +use crate::worker::WorkJob; use lnvps_db::hydrate::Hydrate; use lnvps_db::{LNVpsDb, UserSshKey, Vm, VmOsImage, VmPayment, VmTemplate}; use nostr::util::hex; use rocket::serde::json::Json; -use rocket::{get, post, routes, Responder, Route, State}; +use rocket::{get, patch, post, routes, Responder, Route, State}; use serde::{Deserialize, Serialize}; use ssh_key::PublicKey; +use tokio::sync::mpsc::UnboundedSender; pub fn routes() -> Vec { routes![ @@ -19,7 +21,10 @@ pub fn routes() -> Vec { v1_add_ssh_key, v1_create_vm_order, v1_renew_vm, - v1_get_payment + v1_get_payment, + v1_start_vm, + v1_stop_vm, + v1_restart_vm ] } @@ -195,6 +200,66 @@ async fn v1_renew_vm( ApiData::ok(rsp) } +#[patch("/api/v1/vm//start")] +async fn v1_start_vm( + auth: Nip98Auth, + db: &State>, + provisioner: &State>, + worker: &State>, + id: u64, +) -> ApiResult<()> { + let pubkey = auth.event.pubkey.to_bytes(); + let uid = db.upsert_user(&pubkey).await?; + let vm = db.get_vm(id).await?; + if uid != vm.user_id { + return ApiData::err("VM does not belong to you"); + } + + provisioner.start_vm(id).await?; + worker.send(WorkJob::CheckVm { vm_id: id })?; + ApiData::ok(()) +} + +#[patch("/api/v1/vm//stop")] +async fn v1_stop_vm( + auth: Nip98Auth, + db: &State>, + provisioner: &State>, + worker: &State>, + id: u64, +) -> ApiResult<()> { + let pubkey = auth.event.pubkey.to_bytes(); + let uid = db.upsert_user(&pubkey).await?; + let vm = db.get_vm(id).await?; + if uid != vm.user_id { + return ApiData::err("VM does not belong to you"); + } + + provisioner.stop_vm(id).await?; + worker.send(WorkJob::CheckVm { vm_id: id })?; + ApiData::ok(()) +} + +#[patch("/api/v1/vm//restart")] +async fn v1_restart_vm( + auth: Nip98Auth, + db: &State>, + provisioner: &State>, + worker: &State>, + id: u64, +) -> ApiResult<()> { + let pubkey = auth.event.pubkey.to_bytes(); + let uid = db.upsert_user(&pubkey).await?; + let vm = db.get_vm(id).await?; + if uid != vm.user_id { + return ApiData::err("VM does not belong to you"); + } + + provisioner.restart_vm(id).await?; + worker.send(WorkJob::CheckVm { vm_id: id })?; + ApiData::ok(()) +} + #[get("/api/v1/payment/")] async fn v1_get_payment( auth: Nip98Auth, @@ -216,6 +281,7 @@ async fn v1_get_payment( ApiData::ok(payment) } + #[derive(Deserialize)] struct CreateVmRequest { template_id: u64, diff --git a/src/bin/api.rs b/src/bin/api.rs index 0d53b4b..28497f4 100644 --- a/src/bin/api.rs +++ b/src/bin/api.rs @@ -1,4 +1,5 @@ use anyhow::Error; +use chrono::Utc; use clap::Parser; use config::{Config, File}; use fedimint_tonic_lnd::connect; @@ -28,7 +29,9 @@ async fn main() -> Result<(), Error> { let args = Args::parse(); let settings: Settings = Config::builder() - .add_source(File::with_name(&args.config.unwrap_or("config.yaml".to_string()))) + .add_source(File::with_name( + &args.config.unwrap_or("config.yaml".to_string()), + )) .build()? 
        .try_deserialize()?;
@@ -68,16 +71,11 @@ async fn main() -> Result<(), Error> {
         }
     });
     // request work every 30s to check vm status
-    let db_clone = db.clone();
     let sender_clone = sender.clone();
     tokio::spawn(async move {
         loop {
-            if let Ok(vms) = db_clone.list_vms().await {
-                for vm in vms {
-                    if let Err(e) = sender_clone.send(WorkJob::CheckVm { vm_id: vm.id }) {
-                        error!("failed to send check vm: {}", e);
-                    }
-                }
+            if let Err(e) = sender_clone.send(WorkJob::CheckVms) {
+                error!("failed to send check vm: {}", e);
             }
             tokio::time::sleep(Duration::from_secs(30)).await;
         }
@@ -120,6 +118,7 @@ async fn main() -> Result<(), Error> {
         .manage(pv)
         .manage(status)
         .manage(exchange)
+        .manage(sender)
         .mount("/", api::routes())
         .launch()
         .await
diff --git a/src/host/mod.rs b/src/host/mod.rs
index f1a8841..6a26153 100644
--- a/src/host/mod.rs
+++ b/src/host/mod.rs
@@ -1,2 +1,12 @@
+use crate::host::proxmox::ProxmoxClient;
+use anyhow::Result;
+use lnvps_db::{VmHost, VmHostKind};
+
 pub mod proxmox;
 pub trait VmHostClient {}
+
+pub fn get_host_client(host: &VmHost) -> Result<ProxmoxClient> {
+    Ok(match host.kind {
+        VmHostKind::Proxmox => ProxmoxClient::new(host.ip.parse()?).with_api_token(&host.api_token),
+    })
+}
diff --git a/src/host/proxmox.rs b/src/host/proxmox.rs
index 802a34f..8e25191 100644
--- a/src/host/proxmox.rs
+++ b/src/host/proxmox.rs
@@ -113,6 +113,27 @@ impl ProxmoxClient {
         }
     }
 
+    /// Delete VM
+    ///
+    /// https://pve.proxmox.com/pve-docs/api-viewer/?ref=public_apis#/nodes/{node}/qemu
+    pub async fn delete_vm(&self, node: &str, vm: u64) -> Result<TaskId> {
+        let rsp: ResponseBase<Option<String>> = self
+            .req(
+                Method::DELETE,
+                &format!("/api2/json/nodes/{node}/qemu/{vm}"),
+                (),
+            )
+            .await?;
+        if let Some(id) = rsp.data {
+            Ok(TaskId {
+                id,
+                node: node.to_string(),
+            })
+        } else {
+            Err(anyhow!("Failed to delete VM"))
+        }
+    }
+
     /// Get the current status of a running task
     ///
     /// https://pve.proxmox.com/pve-docs/api-viewer/?ref=public_apis#/nodes/{node}/tasks/{upid}/status
@@ -164,7 +185,7 @@ impl ProxmoxClient {
     /// Resize a disk on a VM
     pub async fn resize_disk(&self, req: ResizeDiskRequest) -> Result<TaskId> {
         let rsp: ResponseBase<String> = self
-            .put(
+            .req(
                 Method::PUT,
                 &format!("/api2/json/nodes/{}/qemu/{}/resize", &req.node, &req.vm_id),
                 &req,
@@ -190,6 +211,48 @@ impl ProxmoxClient {
         })
     }
 
+    /// Stop a VM
+    pub async fn stop_vm(&self, node: &str, vm: u64) -> Result<TaskId> {
+        let rsp: ResponseBase<String> = self
+            .post(
+                &format!("/api2/json/nodes/{}/qemu/{}/status/stop", node, vm),
+                (),
+            )
+            .await?;
+        Ok(TaskId {
+            id: rsp.data,
+            node: node.to_string(),
+        })
+    }
+
+    /// Shutdown a VM (graceful stop)
+    pub async fn shutdown_vm(&self, node: &str, vm: u64) -> Result<TaskId> {
+        let rsp: ResponseBase<String> = self
+            .post(
+                &format!("/api2/json/nodes/{}/qemu/{}/status/shutdown", node, vm),
+                (),
+            )
+            .await?;
+        Ok(TaskId {
+            id: rsp.data,
+            node: node.to_string(),
+        })
+    }
+
+    /// Reset a VM (hard restart)
+    pub async fn reset_vm(&self, node: &str, vm: u64) -> Result<TaskId> {
+        let rsp: ResponseBase<String> = self
+            .post(
+                &format!("/api2/json/nodes/{}/qemu/{}/status/reset", node, vm),
+                (),
+            )
+            .await?;
+        Ok(TaskId {
+            id: rsp.data,
+            node: node.to_string(),
+        })
+    }
+
     async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
         let rsp = self
             .client
@@ -209,10 +272,10 @@ impl ProxmoxClient {
     }
 
     async fn post<T: DeserializeOwned, R: Serialize>(&self, path: &str, body: R) -> Result<T> {
-        self.put(Method::POST, path, body).await
+        self.req(Method::POST, path, body).await
     }
 
-    async fn put<T: DeserializeOwned, R: Serialize>(
+    async fn req<T: DeserializeOwned, R: Serialize>(
         &self,
         method: Method,
         path: &str,
@@ -319,7 +382,7 @@ pub struct NodeResponse {
     pub uptime: Option<u64>,
 }
 
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, PartialEq)]
 #[serde(rename_all = "lowercase")]
 pub enum VmStatus {
     Stopped,
diff --git a/src/provisioner/lnvps.rs b/src/provisioner/lnvps.rs
index da4c614..ddfd91c 100644
--- a/src/provisioner/lnvps.rs
+++ b/src/provisioner/lnvps.rs
@@ -1,4 +1,5 @@
 use crate::exchange::{ExchangeRateCache, Ticker};
+use crate::host::get_host_client;
 use crate::host::proxmox::{
     ConfigureVm, CreateVm, DownloadUrlRequest, ProxmoxClient, ResizeDiskRequest, StorageContent,
     TaskState, VmBios, VmConfig,
@@ -57,14 +58,6 @@ impl LNVpsProvisioner {
         }
     }
 
-    fn get_host_client(host: &VmHost) -> Result<ProxmoxClient> {
-        Ok(match host.kind {
-            VmHostKind::Proxmox => {
-                ProxmoxClient::new(host.ip.parse()?).with_api_token(&host.api_token)
-            }
-        })
-    }
-
     async fn get_iso_storage(node: &str, client: &ProxmoxClient) -> Result<String> {
         let storages = client.list_storage(node).await?;
         if let Some(s) = storages
@@ -84,7 +77,7 @@ impl Provisioner for LNVpsProvisioner {
         // tell hosts to download images
         let hosts = self.db.list_hosts().await?;
         for host in hosts {
-            let client = Self::get_host_client(&host)?;
+            let client = get_host_client(&host)?;
             let iso_storage = Self::get_iso_storage(&host.name, &client).await?;
             let files = client.list_storage_files(&host.name, &iso_storage).await?;
 
@@ -294,7 +287,7 @@ impl Provisioner for LNVpsProvisioner {
         }
         let vm = self.db.get_vm(vm_id).await?;
         let host = self.db.get_host(vm.host_id).await?;
-        let client = Self::get_host_client(&host)?;
+        let client = get_host_client(&host)?;
 
         let mut ips = self.db.list_vm_ip_assignments(vm.id).await?;
         if ips.is_empty() {
@@ -418,4 +411,49 @@ impl Provisioner for LNVpsProvisioner {
 
         Ok(())
     }
+
+    async fn start_vm(&self, vm_id: u64) -> Result<()> {
+        let vm = self.db.get_vm(vm_id).await?;
+        let host = self.db.get_host(vm.host_id).await?;
+
+        let client = get_host_client(&host)?;
+        let j_start = client.start_vm(&host.name, vm.id + 100).await?;
+        client.wait_for_task(&j_start).await?;
+        Ok(())
+    }
+
+    async fn stop_vm(&self, vm_id: u64) -> Result<()> {
+        let vm = self.db.get_vm(vm_id).await?;
+        let host = self.db.get_host(vm.host_id).await?;
+
+        let client = get_host_client(&host)?;
+        let j_start = client.shutdown_vm(&host.name, vm.id + 100).await?;
+        client.wait_for_task(&j_start).await?;
+
+        Ok(())
+    }
+
+    async fn restart_vm(&self, vm_id: u64) -> Result<()> {
+        let vm = self.db.get_vm(vm_id).await?;
+        let host = self.db.get_host(vm.host_id).await?;
+
+        let client = get_host_client(&host)?;
+        let j_start = client.reset_vm(&host.name, vm.id + 100).await?;
+        client.wait_for_task(&j_start).await?;
+
+        Ok(())
+    }
+
+    async fn delete_vm(&self, vm_id: u64) -> Result<()> {
+        let vm = self.db.get_vm(vm_id).await?;
+        let host = self.db.get_host(vm.host_id).await?;
+
+        let client = get_host_client(&host)?;
+        let j_start = client.delete_vm(&host.name, vm.id + 100).await?;
+        client.wait_for_task(&j_start).await?;
+
+        self.db.delete_vm(vm.id).await?;
+
+        Ok(())
+    }
 }
diff --git a/src/provisioner/mod.rs b/src/provisioner/mod.rs
index ef00d22..e02e7fe 100644
--- a/src/provisioner/mod.rs
+++ b/src/provisioner/mod.rs
@@ -30,4 +30,16 @@ pub trait Provisioner: Send + Sync {
 
     /// Spawn a VM on the host
     async fn spawn_vm(&self, vm_id: u64) -> Result<()>;
+
+    /// Start a VM
+    async fn start_vm(&self, vm_id: u64) -> Result<()>;
+
+    /// Stop a running VM
+    async fn stop_vm(&self, vm_id: u64) -> Result<()>;
+
+    /// Restart a VM
+    async fn restart_vm(&self, vm_id: u64) -> Result<()>;
+
+    /// Delete a VM
+    async fn delete_vm(&self, vm_id: u64) -> Result<()>;
 }
diff --git a/src/settings.rs b/src/settings.rs
index 0bc0b8d..78af7c7 100644
--- a/src/settings.rs
+++ b/src/settings.rs
@@ -12,6 +12,9 @@ pub struct Settings {
     pub db: String,
     pub lnd: LndConfig,
     pub provisioner: ProvisionerConfig,
+
+    /// Number of days after expiry before a VM is deleted
+    pub delete_after: u16,
 }
 
 #[derive(Debug, Deserialize, Serialize)]
diff --git a/src/worker.rs b/src/worker.rs
index 0a95979..aebafe4 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -1,13 +1,17 @@
-use crate::host::proxmox::{ProxmoxClient, VmStatus};
+use crate::host::get_host_client;
+use crate::host::proxmox::{ProxmoxClient, VmInfo, VmStatus};
 use crate::provisioner::Provisioner;
 use crate::status::{VmRunningState, VmState, VmStateCache};
 use anyhow::Result;
-use chrono::Utc;
+use chrono::{Days, Utc};
 use lnvps_db::LNVpsDb;
 use log::{debug, error, info, warn};
+use std::ops::Add;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
 
 pub enum WorkJob {
+    /// Check all running VMs
+    CheckVms,
     /// Check the VM status matches database state
     ///
     /// This job starts a vm if stopped and also creates the vm if it doesn't exist yet
@@ -44,32 +48,50 @@ impl Worker {
         self.tx.clone()
     }
 
+    async fn handle_vm_info(&self, s: VmInfo) -> Result<()> {
+        // TODO: remove assumption
+        let db_id = (s.vm_id - 100) as u64;
+        let state = VmState {
+            state: match s.status {
+                VmStatus::Stopped => VmRunningState::Stopped,
+                VmStatus::Running => VmRunningState::Running,
+            },
+            cpu_usage: s.cpu.unwrap_or(0.0),
+            mem_usage: s.mem.unwrap_or(0) as f32 / s.max_mem.unwrap_or(1) as f32,
+            uptime: s.uptime.unwrap_or(0),
+            net_in: s.net_in.unwrap_or(0),
+            net_out: s.net_out.unwrap_or(0),
+            disk_write: s.disk_write.unwrap_or(0),
+            disk_read: s.disk_read.unwrap_or(0),
+        };
+        self.vm_state_cache.set_state(db_id, state).await?;
+
+        if let Ok(db_vm) = self.db.get_vm(db_id).await {
+            // Stop VM if expired and is running
+            if db_vm.expires < Utc::now() && s.status == VmStatus::Running {
+                info!("Stopping expired VM {}", db_vm.id);
+                self.provisioner.stop_vm(db_vm.id).await?;
+            }
+            // Delete VM if expired > 3 days
+            if db_vm.expires.add(Days::new(3)) < Utc::now() {
+                info!("Deleting expired VM {}", db_vm.id);
+                self.provisioner.delete_vm(db_vm.id).await?;
+            }
+        }
+
+        Ok(())
+    }
+
     /// Check a VM's status
     async fn check_vm(&self, vm_id: u64) -> Result<()> {
         debug!("Checking VM: {}", vm_id);
         let vm = self.db.get_vm(vm_id).await?;
         let host = self.db.get_host(vm.host_id).await?;
-        let client = ProxmoxClient::new(host.ip.parse()?).with_api_token(&host.api_token);
+        let client = get_host_client(&host)?;
 
         match client.get_vm_status(&host.name, (vm.id + 100) as i32).await {
-            Ok(s) => {
-                let state = VmState {
-                    state: match s.status {
-                        VmStatus::Stopped => VmRunningState::Stopped,
-                        VmStatus::Running => VmRunningState::Running,
-                    },
-                    cpu_usage: s.cpu.unwrap_or(0.0),
-                    mem_usage: s.mem.unwrap_or(0) as f32 / s.max_mem.unwrap_or(1) as f32,
-                    uptime: s.uptime.unwrap_or(0),
-                    net_in: s.net_in.unwrap_or(0),
-                    net_out: s.net_out.unwrap_or(0),
-                    disk_write: s.disk_write.unwrap_or(0),
-                    disk_read: s.disk_read.unwrap_or(0),
-                };
-                self.vm_state_cache.set_state(vm_id, state).await?;
-            }
-            Err(e) => {
-                warn!("Failed to get VM {} status: {}", vm.id, e);
+            Ok(s) => self.handle_vm_info(s).await?,
+            Err(_) => {
                 if vm.expires > Utc::now() {
                     self.provisioner.spawn_vm(vm.id).await?;
                 }
@@ -78,6 +100,24 @@
         Ok(())
     }
 
+    pub async fn check_vms(&self) -> Result<()> {
+        let hosts = self.db.list_hosts().await?;
+        for host in hosts {
+            let client = get_host_client(&host)?;
+
+            for node in client.list_nodes().await? {
+                info!("Checking vms for {}", node.name);
+                for vm in client.list_vms(&node.name).await? {
+                    info!("\t{}: {:?}", vm.vm_id, vm.status);
+                    if let Err(e) = self.handle_vm_info(vm).await {
+                        error!("{}", e);
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+
     pub async fn handle(&mut self) -> Result<()> {
         while let Some(job) = self.rx.recv().await {
             match job {
@@ -87,6 +127,11 @@ pub async fn handle(&mut self) -> Result<()> {
                     }
                 }
                 WorkJob::SendNotification { .. } => {}
+                WorkJob::CheckVms => {
+                    if let Err(e) = self.check_vms().await {
+                        error!("Failed to check VMs: {}", e);
+                    }
+                }
             }
         }
         Ok(())