diff --git a/src/api.rs b/src/api.rs
index 32b3f21..00fa3be 100644
--- a/src/api.rs
+++ b/src/api.rs
@@ -1,5 +1,6 @@
 use crate::nip98::Nip98Auth;
 use crate::provisioner::Provisioner;
+use crate::status::{VmState, VmStateCache};
 use lnvps_db::hydrate::Hydrate;
 use lnvps_db::{LNVpsDb, UserSshKey, Vm, VmOsImage, VmPayment, VmTemplate};
 use nostr::util::hex;
@@ -60,22 +61,37 @@ impl<T: ToString> From<T> for ApiError {
     }
 }
 
+#[derive(Serialize)]
+struct ApiVmStatus {
+    #[serde(flatten)]
+    pub vm: Vm,
+    pub status: VmState,
+}
+
 #[get("/api/v1/vm")]
-async fn v1_list_vms(auth: Nip98Auth, db: &State<Box<dyn LNVpsDb>>) -> ApiResult<Vec<Vm>> {
+async fn v1_list_vms(auth: Nip98Auth, db: &State<Box<dyn LNVpsDb>>, vm_state: &State<VmStateCache>) -> ApiResult<Vec<ApiVmStatus>> {
     let pubkey = auth.event.pubkey.to_bytes();
     let uid = db.upsert_user(&pubkey).await?;
     let mut vms = db.list_user_vms(uid).await?;
-    for vm in &mut vms {
+    let mut ret = vec![];
+    for mut vm in vms {
         vm.hydrate_up(db).await?;
         if let Some(t) = &mut vm.template {
             t.hydrate_up(db).await?;
         }
+
+        let state = vm_state.get_state(vm.id).await;
+        ret.push(ApiVmStatus {
+            vm,
+            status: state,
+        });
     }
-    ApiData::ok(vms)
+
+    ApiData::ok(ret)
 }
 
 #[get("/api/v1/vm/<id>")]
-async fn v1_get_vm(auth: Nip98Auth, db: &State<Box<dyn LNVpsDb>>, id: u64) -> ApiResult<Vm> {
+async fn v1_get_vm(auth: Nip98Auth, db: &State<Box<dyn LNVpsDb>>, vm_state: &State<VmStateCache>, id: u64) -> ApiResult<ApiVmStatus> {
     let pubkey = auth.event.pubkey.to_bytes();
     let uid = db.upsert_user(&pubkey).await?;
     let mut vm = db.get_vm(id).await?;
@@ -86,7 +102,11 @@ async fn v1_get_vm(auth: Nip98Auth, db: &State<Box<dyn LNVpsDb>>, id: u64) -> ApiResult<Vm> {
     if let Some(t) = &mut vm.template {
         t.hydrate_up(db).await?;
     }
-    ApiData::ok(vm)
+    let state = vm_state.get_state(vm.id).await;
+    ApiData::ok(ApiVmStatus {
+        vm,
+        status: state,
+    })
 }
 
 #[get("/api/v1/image")]
diff --git a/src/bin/api.rs b/src/bin/api.rs
index 218cf93..86b74b7 100644
--- a/src/bin/api.rs
+++ b/src/bin/api.rs
@@ -6,6 +6,7 @@ use lnvps::cors::CORS;
 use lnvps::invoice::InvoiceHandler;
 use lnvps::provisioner::lnvps::LNVpsProvisioner;
 use lnvps::provisioner::Provisioner;
+use lnvps::status::VmStateCache;
 use lnvps::worker::{WorkJob, Worker};
 use lnvps_db::{LNVpsDb, LNVpsDbMysql};
 use log::error;
@@ -47,7 +48,8 @@ async fn main() -> Result<(), Error> {
         provisioner.auto_discover().await?;
     }
 
-    let mut worker = Worker::new(db.clone(), lnd.clone());
+    let status = VmStateCache::new();
+    let mut worker = Worker::new(db.clone(), lnd.clone(), status.clone());
     let sender = worker.sender();
     tokio::spawn(async move {
         loop {
@@ -86,6 +88,7 @@
         .attach(CORS)
         .manage(db)
         .manage(pv)
+        .manage(status)
         .mount("/", api::routes())
         .launch()
         .await
diff --git a/src/host/proxmox.rs b/src/host/proxmox.rs
index fda37c3..fa35273 100644
--- a/src/host/proxmox.rs
+++ b/src/host/proxmox.rs
@@ -165,6 +165,16 @@ pub struct VmInfo {
     pub name: Option<String>,
     pub tags: Option<String>,
     pub uptime: Option<u64>,
+    pub cpu: Option<f32>,
+    pub mem: Option<u64>,
+    #[serde(rename = "netin")]
+    pub net_in: Option<u64>,
+    #[serde(rename = "netout")]
+    pub net_out: Option<u64>,
+    #[serde(rename = "diskwrite")]
+    pub disk_write: Option<u64>,
+    #[serde(rename = "diskread")]
+    pub disk_read: Option<u64>,
 }
 
 #[derive(Debug, Deserialize)]
diff --git a/src/lib.rs b/src/lib.rs
index 8227cdf..a12a2df 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -5,3 +5,4 @@ pub mod invoice;
 pub mod nip98;
 pub mod provisioner;
 pub mod worker;
+pub mod status;
diff --git a/src/status.rs b/src/status.rs
new file mode 100644
index 0000000..60df68c
--- /dev/null
+++ b/src/status.rs
@@ -0,0 +1,52 @@
+use anyhow::Result;
+use serde::Serialize;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+
+#[derive(Clone, Serialize, Default)]
+#[serde(rename_all = "lowercase")]
+pub enum VmRunningState {
+    Running,
+    #[default]
+    Stopped,
+    Starting,
+    Deleting,
+}
+
+#[derive(Clone, Serialize, Default)]
+pub struct VmState {
+    pub state: VmRunningState,
+    pub cpu_usage: f32,
+    pub mem_usage: f32,
+    pub uptime: u64,
+    pub net_in: u64,
+    pub net_out: u64,
+    pub disk_write: u64,
+    pub disk_read: u64,
+}
+
+/// Stores a cached vm status which is used to serve to api clients
+#[derive(Clone)]
+pub struct VmStateCache {
+    state: Arc<RwLock<HashMap<u64, VmState>>>,
+}
+
+impl VmStateCache {
+    pub fn new() -> Self {
+        Self {
+            state: Arc::new(RwLock::new(HashMap::new())),
+        }
+    }
+
+    pub async fn set_state(&self, id: u64, state: VmState) -> Result<()> {
+        let mut guard = self.state.write().await;
+        guard.insert(id, state);
+        Ok(())
+    }
+
+    pub async fn get_state(&self, id: u64) -> VmState {
+        let guard = self.state.read().await;
+        guard.get(&id).cloned().unwrap_or_default()
+    }
+}
\ No newline at end of file
diff --git a/src/worker.rs b/src/worker.rs
index 0bbc04b..e786bd3 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -1,6 +1,7 @@
-use crate::host::proxmox::{CreateVm, ProxmoxClient, VmBios};
+use crate::host::proxmox::{CreateVm, ProxmoxClient, VmBios, VmStatus};
 use crate::provisioner::lnvps::LNVpsProvisioner;
 use crate::provisioner::Provisioner;
+use crate::status::{VmRunningState, VmState, VmStateCache};
 use anyhow::{bail, Result};
 use fedimint_tonic_lnd::Client;
 use ipnetwork::IpNetwork;
@@ -10,6 +11,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
 
 pub enum WorkJob {
     /// Check the VM status matches database state
+    ///
     /// This job starts a vm if stopped and also creates the vm if it doesn't exist yet
     CheckVm { vm_id: u64 },
     /// Send a notification to the users chosen contact preferences
@@ -20,17 +22,19 @@ pub struct Worker {
    db: Box<dyn LNVpsDb>,
     lnd: Client,
     provisioner: Box<dyn Provisioner>,
+    vm_state_cache: VmStateCache,
     tx: UnboundedSender<WorkJob>,
     rx: UnboundedReceiver<WorkJob>,
 }
 
 impl Worker {
-    pub fn new<D: LNVpsDb + Clone + 'static>(db: D, lnd: Client) -> Self {
+    pub fn new<D: LNVpsDb + Clone + 'static>(db: D, lnd: Client, vm_state_cache: VmStateCache) -> Self {
         let (tx, rx) = unbounded_channel();
         let p = LNVpsProvisioner::new(db.clone(), lnd.clone());
         Self {
             db: Box::new(db),
             provisioner: Box::new(p),
+            vm_state_cache,
             lnd,
             tx,
             rx,
@@ -88,7 +92,6 @@ impl Worker {
                 scsi_1: Some(format!("{}:cloudinit", &drive.name)),
                 scsi_hw: Some("virtio-scsi-pci".to_string()),
                 ssh_keys: Some(urlencoding::encode(&ssh_key.key_data).to_string()),
-                tags: Some("lnvps.net".to_string()),
                 efi_disk_0: Some(format!("{}:0,efitype=4m", &drive.name)),
                 ..Default::default()
             })
@@ -107,6 +110,20 @@
         match client.get_vm_status(&host.name, (vm.id + 100) as i32).await {
             Ok(s) => {
                 info!("VM {} status: {:?}", vm_id, s.status);
+                let state = VmState {
+                    state: match s.status {
+                        VmStatus::Stopped => VmRunningState::Stopped,
+                        VmStatus::Running => VmRunningState::Running
+                    },
+                    cpu_usage: s.cpu.unwrap_or(0.0),
+                    mem_usage: s.mem.unwrap_or(0) as f32 / s.max_mem.unwrap_or(1) as f32,
+                    uptime: s.uptime.unwrap_or(0),
+                    net_in: s.net_in.unwrap_or(0),
+                    net_out: s.net_out.unwrap_or(0),
+                    disk_write: s.disk_write.unwrap_or(0),
+                    disk_read: s.disk_read.unwrap_or(0),
+                };
+                self.vm_state_cache.set_state(vm_id, state).await?;
             }
             Err(e) => {
                 warn!("Failed to get VM status: {}", e);