diff --git a/lnvps_db/src/lib.rs b/lnvps_db/src/lib.rs index 7b11639..b50c655 100644 --- a/lnvps_db/src/lib.rs +++ b/lnvps_db/src/lib.rs @@ -62,6 +62,9 @@ pub trait LNVpsDb: Sync + Send { /// Get a specific host disk async fn get_host_disk(&self, disk_id: u64) -> Result; + /// Update a host disk + async fn update_host_disk(&self, disk: &VmHostDisk) -> Result<()>; + /// Get OS image by id async fn get_os_image(&self, id: u64) -> Result; diff --git a/lnvps_db/src/model.rs b/lnvps_db/src/model.rs index 02d6e2c..ffe275b 100644 --- a/lnvps_db/src/model.rs +++ b/lnvps_db/src/model.rs @@ -121,6 +121,15 @@ impl FromStr for DiskType { } } +impl Display for DiskType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + DiskType::HDD => write!(f, "hdd"), + DiskType::SSD => write!(f, "ssd"), + } + } +} + #[derive(Clone, Copy, Debug, sqlx::Type, Default, PartialEq, Eq)] #[repr(u16)] pub enum DiskInterface { @@ -143,6 +152,16 @@ impl FromStr for DiskInterface { } } +impl Display for DiskInterface { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + DiskInterface::SATA => write!(f, "sata"), + DiskInterface::SCSI => write!(f, "scsi"), + DiskInterface::PCIe => write!(f, "pcie"), + } + } +} + #[derive(Clone, Copy, Debug, sqlx::Type, Default, PartialEq, Eq)] #[repr(u16)] pub enum OsDistribution { diff --git a/lnvps_db/src/mysql.rs b/lnvps_db/src/mysql.rs index e7a9915..bc3f644 100644 --- a/lnvps_db/src/mysql.rs +++ b/lnvps_db/src/mysql.rs @@ -62,13 +62,13 @@ impl LNVpsDb for LNVpsDbMysql { sqlx::query( "update users set email=?, contact_nip17=?, contact_email=?, country_code=? where id = ?", ) - .bind(&user.email) - .bind(user.contact_nip17) - .bind(user.contact_email) + .bind(&user.email) + .bind(user.contact_nip17) + .bind(user.contact_email) .bind(&user.country_code) - .bind(user.id) - .execute(&self.db) - .await?; + .bind(user.id) + .execute(&self.db) + .await?; Ok(()) } @@ -80,13 +80,13 @@ impl LNVpsDb for LNVpsDbMysql { Ok(sqlx::query( "insert into user_ssh_key(name,user_id,key_data) values(?, ?, ?) returning id", ) - .bind(&new_key.name) - .bind(new_key.user_id) - .bind(&new_key.key_data) - .fetch_one(&self.db) - .await - .map_err(Error::new)? - .try_get(0)?) + .bind(&new_key.name) + .bind(new_key.user_id) + .bind(&new_key.key_data) + .fetch_one(&self.db) + .await + .map_err(Error::new)? + .try_get(0)?) } async fn get_user_ssh_key(&self, id: u64) -> Result { @@ -174,6 +174,18 @@ impl LNVpsDb for LNVpsDbMysql { .map_err(Error::new) } + async fn update_host_disk(&self, disk: &VmHostDisk) -> Result<()> { + sqlx::query("update vm_host_disk set size=?,kind=?,interface=? where id=?") + .bind(disk.size) + .bind(disk.kind) + .bind(disk.interface) + .bind(disk.id) + .execute(&self.db) + .await + .map_err(Error::new)?; + Ok(()) + } + async fn get_os_image(&self, id: u64) -> Result { sqlx::query_as("select * from vm_os_image where id=?") .bind(id) @@ -324,16 +336,16 @@ impl LNVpsDb for LNVpsDbMysql { sqlx::query( "update vm set image_id=?,template_id=?,ssh_key_id=?,expires=?,disk_id=?,mac_address=? where id=?", ) - .bind(vm.image_id) - .bind(vm.template_id) - .bind(vm.ssh_key_id) - .bind(vm.expires) - .bind(vm.disk_id) - .bind(&vm.mac_address) - .bind(vm.id) - .execute(&self.db) - .await - .map_err(Error::new)?; + .bind(vm.image_id) + .bind(vm.template_id) + .bind(vm.ssh_key_id) + .bind(vm.expires) + .bind(vm.disk_id) + .bind(&vm.mac_address) + .bind(vm.id) + .execute(&self.db) + .await + .map_err(Error::new)?; Ok(()) } @@ -341,18 +353,18 @@ impl LNVpsDb for LNVpsDbMysql { Ok(sqlx::query( "insert into vm_ip_assignment(vm_id,ip_range_id,ip,arp_ref,dns_forward,dns_forward_ref,dns_reverse,dns_reverse_ref) values(?,?,?,?,?,?,?,?) returning id", ) - .bind(ip_assignment.vm_id) - .bind(ip_assignment.ip_range_id) - .bind(&ip_assignment.ip) - .bind(&ip_assignment.arp_ref) - .bind(&ip_assignment.dns_forward) - .bind(&ip_assignment.dns_forward_ref) - .bind(&ip_assignment.dns_reverse) - .bind(&ip_assignment.dns_reverse_ref) - .fetch_one(&self.db) - .await - .map_err(Error::new)? - .try_get(0)?) + .bind(ip_assignment.vm_id) + .bind(ip_assignment.ip_range_id) + .bind(&ip_assignment.ip) + .bind(&ip_assignment.arp_ref) + .bind(&ip_assignment.dns_forward) + .bind(&ip_assignment.dns_forward_ref) + .bind(&ip_assignment.dns_reverse) + .bind(&ip_assignment.dns_reverse_ref) + .fetch_one(&self.db) + .await + .map_err(Error::new)? + .try_get(0)?) } async fn update_vm_ip_assignment(&self, ip_assignment: &VmIpAssignment) -> Result<()> { @@ -477,9 +489,9 @@ impl LNVpsDb for LNVpsDbMysql { sqlx::query_as( "select * from vm_payment where is_paid = true order by created desc limit 1", ) - .fetch_optional(&self.db) - .await - .map_err(Error::new) + .fetch_optional(&self.db) + .await + .map_err(Error::new) } async fn list_custom_pricing(&self, region_id: u64) -> Result> { diff --git a/src/bin/api.rs b/src/bin/api.rs index 743b116..103d5f2 100644 --- a/src/bin/api.rs +++ b/src/bin/api.rs @@ -158,6 +158,9 @@ async fn main() -> Result<(), Error> { start_dvms(nostr_client.clone(), provisioner.clone()); } + // request for host info to be patched + sender.send(WorkJob::PatchHosts)?; + let mut config = rocket::Config::default(); let ip: SocketAddr = match &settings.listen { Some(i) => i.parse()?, diff --git a/src/host/mod.rs b/src/host/mod.rs index 4806172..b26d367 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -25,6 +25,8 @@ pub struct TerminalStream { /// Generic type for creating VM's #[async_trait] pub trait VmHostClient: Send + Sync { + async fn get_info(&self) -> Result; + /// Download OS image to the host async fn download_os_image(&self, image: &VmOsImage) -> Result<()>; @@ -202,3 +204,17 @@ pub enum TimeSeries { Monthly, Yearly, } + +#[derive(Debug, Clone)] +pub struct VmHostInfo { + pub cpu: u16, + pub memory: u64, + pub disks: Vec, +} + +#[derive(Debug, Clone)] +pub struct VmHostDiskInfo { + pub name: String, + pub size: u64, + pub used: u64, +} diff --git a/src/host/proxmox.rs b/src/host/proxmox.rs index 0174c26..cae8c38 100644 --- a/src/host/proxmox.rs +++ b/src/host/proxmox.rs @@ -1,4 +1,7 @@ -use crate::host::{FullVmInfo, TerminalStream, TimeSeries, TimeSeriesData, VmHostClient}; +use crate::host::{ + FullVmInfo, TerminalStream, TimeSeries, TimeSeriesData, VmHostClient, VmHostDiskInfo, + VmHostInfo, +}; use crate::json_api::JsonApi; use crate::settings::{QemuConfig, SshConfig}; use crate::ssh_client::SshClient; @@ -86,6 +89,14 @@ impl ProxmoxClient { Ok(rsp.data) } + pub async fn list_disks(&self, node: &str) -> Result> { + let rsp: ResponseBase> = self + .api + .get(&format!("/api2/json/nodes/{node}/disks/list")) + .await?; + Ok(rsp.data) + } + /// List files in a storage pool pub async fn list_storage_files( &self, @@ -477,6 +488,29 @@ impl ProxmoxClient { #[async_trait] impl VmHostClient for ProxmoxClient { + async fn get_info(&self) -> Result { + let nodes = self.list_nodes().await?; + if let Some(n) = nodes.iter().find(|n| n.name == self.node) { + let storages = self.list_storage(&n.name).await?; + let info = VmHostInfo { + cpu: n.max_cpu.unwrap_or(0), + memory: n.max_mem.unwrap_or(0), + disks: storages + .into_iter() + .map(|s| VmHostDiskInfo { + name: s.storage, + size: s.total.unwrap_or(0), + used: s.used.unwrap_or(0), + }) + .collect(), + }; + + Ok(info) + } else { + bail!("Could not find node {}", self.node); + } + } + async fn download_os_image(&self, image: &VmOsImage) -> Result<()> { let iso_storage = self.get_iso_storage(&self.node).await?; let files = self.list_storage_files(&self.node, &iso_storage).await?; @@ -878,9 +912,14 @@ pub struct NodeStorage { pub content: String, pub storage: String, #[serde(rename = "type")] - pub kind: Option, - #[serde(rename = "thinpool")] - pub thin_pool: Option, + pub kind: StorageType, + /// Available storage space in bytes + #[serde(rename = "avial")] + pub available: Option, + /// Total storage space in bytes + pub total: Option, + /// Used storage space in bytes + pub used: Option, } impl NodeStorage { @@ -891,6 +930,11 @@ impl NodeStorage { .collect() } } + +#[derive(Debug, Deserialize)] +pub struct NodeDisk { +} + #[derive(Debug, Serialize)] pub struct DownloadUrlRequest { pub content: StorageContent, diff --git a/src/worker.rs b/src/worker.rs index 32adfa4..c9a7cde 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -18,6 +18,8 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; #[derive(Debug)] pub enum WorkJob { + /// Sync resources from hosts to database + PatchHosts, /// Check all running VMS CheckVms, /// Check the VM status matches database state @@ -278,46 +280,92 @@ impl Worker { Ok(()) } - pub async fn handle(&mut self) -> Result<()> { - while let Some(job) = self.rx.recv().await { - match &job { - WorkJob::CheckVm { vm_id } => { - let vm = self.db.get_vm(*vm_id).await?; - if let Err(e) = self.check_vm(&vm).await { - error!("Failed to check VM {}: {}", vm_id, e); - self.queue_admin_notification( - format!("Failed to check VM {}:\n{:?}\n{}", vm_id, &job, e), - Some("Job Failed".to_string()), - )? + async fn try_job(&mut self, job: &WorkJob) -> Result<()> { + match job { + WorkJob::PatchHosts => { + let mut hosts = self.db.list_hosts().await?; + for mut host in &mut hosts { + let client = match get_host_client(host, &self.settings.provisioner_config) { + Ok(h) => h, + Err(e) => { + warn!("Failed to get host client: {} {}", host.name, e); + continue; + } + }; + let info = client.get_info().await?; + let needs_update = info.cpu != host.cpu || info.memory != host.memory; + if needs_update { + host.cpu = info.cpu; + host.memory = info.memory; + self.db.update_host(host).await?; + info!( + "Updated host {}: cpu={}, memory={}", + host.name, host.cpu, host.memory + ); + } + + let mut host_disks = self.db.list_host_disks(host.id).await?; + for disk in &info.disks { + if let Some(mut hd) = host_disks.iter_mut().find(|d| d.name == disk.name) { + if hd.size != disk.size { + hd.size = disk.size; + self.db.update_host_disk(hd).await?; + info!( + "Updated host disk {}: size={},type={},interface={}", + hd.name, hd.size, hd.kind, hd.interface + ); + } + } else { + warn!("Un-mapped host disk {}", disk.name); + } } } - WorkJob::SendNotification { - user_id, - message, - title, - } => { - if let Err(e) = self - .send_notification(*user_id, message.clone(), title.clone()) - .await - { - error!("Failed to send notification {}: {}", user_id, e); - self.queue_admin_notification( - format!("Failed to send notification:\n{:?}\n{}", &job, e), - Some("Job Failed".to_string()), - )? - } + } + WorkJob::CheckVm { vm_id } => { + let vm = self.db.get_vm(*vm_id).await?; + if let Err(e) = self.check_vm(&vm).await { + error!("Failed to check VM {}: {}", vm_id, e); + self.queue_admin_notification( + format!("Failed to check VM {}:\n{:?}\n{}", vm_id, &job, e), + Some("Job Failed".to_string()), + )? } - WorkJob::CheckVms => { - if let Err(e) = self.check_vms().await { - error!("Failed to check VMs: {}", e); - self.queue_admin_notification( - format!("Failed to check VM's:\n{:?}\n{}", &job, e), - Some("Job Failed".to_string()), - )? - } + } + WorkJob::SendNotification { + user_id, + message, + title, + } => { + if let Err(e) = self + .send_notification(*user_id, message.clone(), title.clone()) + .await + { + error!("Failed to send notification {}: {}", user_id, e); + self.queue_admin_notification( + format!("Failed to send notification:\n{:?}\n{}", &job, e), + Some("Job Failed".to_string()), + )? + } + } + WorkJob::CheckVms => { + if let Err(e) = self.check_vms().await { + error!("Failed to check VMs: {}", e); + self.queue_admin_notification( + format!("Failed to check VM's:\n{:?}\n{}", &job, e), + Some("Job Failed".to_string()), + )? } } } Ok(()) } + + pub async fn handle(&mut self) -> Result<()> { + while let Some(job) = self.rx.recv().await { + if let Err(e) = self.try_job(&job).await { + error!("Job failed to execute: {:?} {}", job, e); + } + } + Ok(()) + } }