From 5da01fbd502fd5a077016806ef1ce014986c7c0c Mon Sep 17 00:00:00 2001
From: kieran
Date: Wed, 5 Mar 2025 14:11:16 +0000
Subject: [PATCH] feat: filter templates by available capacity

---
 src/api/routes.rs           | 12 +++++-----
 src/api/webhook.rs          | 10 +++++---
 src/host/proxmox.rs         | 21 +++++++---------
 src/provisioner/capacity.rs | 48 +++++++++++++++++++++++++++++--------
 src/provisioner/lnvps.rs    | 11 +++------
 src/router/mikrotik.rs      | 29 +++++++++++-----------
 src/worker.rs               |  6 ++---
 7 files changed, 79 insertions(+), 58 deletions(-)

diff --git a/src/api/routes.rs b/src/api/routes.rs
index 8f8cbfd..cda130e 100644
--- a/src/api/routes.rs
+++ b/src/api/routes.rs
@@ -4,7 +4,7 @@ use crate::api::model::{
 };
 use crate::host::{get_host_client, FullVmInfo};
 use crate::nip98::Nip98Auth;
-use crate::provisioner::LNVpsProvisioner;
+use crate::provisioner::{HostCapacityService, LNVpsProvisioner};
 use crate::settings::Settings;
 use crate::status::{VmState, VmStateCache};
 use crate::worker::WorkJob;
@@ -231,8 +231,8 @@ async fn v1_patch_vm(
     let mut ips = db.list_vm_ip_assignments(vm.id).await?;
     for mut ip in ips.iter_mut() {
         ip.dns_reverse = Some(ptr.to_string());
-        provisioner.update_reverse_ip_dns(&mut ip).await?;
-        db.update_vm_ip_assignment(&ip).await?;
+        provisioner.update_reverse_ip_dns(ip).await?;
+        db.update_vm_ip_assignment(ip).await?;
     }
 }
@@ -264,7 +264,8 @@ async fn v1_list_vm_images(db: &State<Arc<dyn LNVpsDb>>) -> ApiResult<Vec<ApiVmOsImage>>
 async fn v1_list_vm_templates(db: &State<Arc<dyn LNVpsDb>>) -> ApiResult<Vec<ApiVmTemplate>> {
-    let templates = db.list_vm_templates().await?;
+    let hc = HostCapacityService::new((*db).clone());
+    let templates = hc.list_available_vm_templates().await?;
     let cost_plans: HashSet<u64> = templates.iter().map(|t| t.cost_plan_id).collect();
     let regions: HashSet<u64> = templates.iter().map(|t| t.region_id).collect();
@@ -294,7 +295,6 @@ async fn v1_list_vm_templates(db: &State<Arc<dyn LNVpsDb>>) -> ApiResult<Vec<ApiVmTemplate>>
     db: &State<Arc<dyn LNVpsDb>>,
     provisioner: &State<Arc<LNVpsProvisioner>>,
-    req: Json<CreateVmRequest>
+    req: Json<CreateVmRequest>,
 ) -> ApiResult<ApiVmStatus> {
     let pubkey = auth.event.pubkey.to_bytes();
     let uid = db.upsert_user(&pubkey).await?;
diff --git a/src/api/webhook.rs b/src/api/webhook.rs
index 5dc3bbf..85a9ead 100644
--- a/src/api/webhook.rs
+++ b/src/api/webhook.rs
@@ -1,15 +1,13 @@
 use log::warn;
-use reqwest::header::HeaderMap;
 use rocket::data::{FromData, ToByteUnit};
 use rocket::http::Status;
 use rocket::{post, routes, Data, Route};
 use std::collections::HashMap;
 use std::sync::LazyLock;
-use tokio::io::AsyncReadExt;
 use tokio::sync::broadcast;
 
 /// Messaging bridge for webhooks to other parts of the system (bitvora)
-pub static WEBHOOK_BRIDGE: LazyLock<WebhookBridge> = LazyLock::new(|| WebhookBridge::new());
+pub static WEBHOOK_BRIDGE: LazyLock<WebhookBridge> = LazyLock::new(WebhookBridge::new);
 
 pub fn routes() -> Vec<Route> {
     if cfg!(feature = "bitvora") {
@@ -61,6 +59,12 @@ pub struct WebhookBridge {
     tx: broadcast::Sender<WebhookMessage>,
 }
 
+impl Default for WebhookBridge {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl WebhookBridge {
     pub fn new() -> Self {
         let (tx, _rx) = broadcast::channel(100);
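Reviewer note on the webhook.rs hunks: the `Default` impl satisfies clippy's `new_without_default` lint, and `LazyLock::new(WebhookBridge::new)` just drops a redundant closure. `WEBHOOK_BRIDGE` wraps a tokio broadcast channel, so each consumer subscribes for its own receiver and a slow consumer lags rather than blocking the sender. Below is a minimal, runnable sketch of that pattern; the `WebhookMessage` payload type here is a stand-in for illustration, not the struct from webhook.rs:

```rust
use tokio::sync::broadcast;

// Stand-in payload for illustration only; the real type lives in webhook.rs.
#[derive(Clone, Debug)]
struct WebhookMessage(String);

#[tokio::main]
async fn main() {
    // Same shape as WebhookBridge::new(): a 100-slot ring buffer. A receiver
    // that falls more than 100 messages behind sees RecvError::Lagged on its
    // next recv() instead of stalling the sender.
    let (tx, _rx) = broadcast::channel::<WebhookMessage>(100);

    // Each consumer gets its own receiver, as callers of WEBHOOK_BRIDGE do.
    let mut rx = tx.subscribe();
    tx.send(WebhookMessage("payment.settled".into())).unwrap();
    println!("{:?}", rx.recv().await.unwrap());
}
```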
diff --git a/src/host/proxmox.rs b/src/host/proxmox.rs
index 4571c88..8cee4ab 100644
--- a/src/host/proxmox.rs
+++ b/src/host/proxmox.rs
@@ -5,9 +5,8 @@ use crate::ssh_client::SshClient;
 use crate::status::{VmRunningState, VmState};
 use anyhow::{anyhow, bail, ensure, Result};
 use chrono::Utc;
-use futures::future::join_all;
 use ipnetwork::IpNetwork;
-use lnvps_db::{async_trait, DiskType, IpRange, LNVpsDb, Vm, VmIpAssignment, VmOsImage};
+use lnvps_db::{async_trait, DiskType, Vm, VmOsImage};
 use log::{info, warn};
 use rand::random;
 use reqwest::header::{HeaderMap, AUTHORIZATION};
@@ -469,7 +468,7 @@ impl VmHostClient for ProxmoxClient {
     }
 
     async fn create_vm(&self, req: &FullVmInfo) -> Result<()> {
-        let config = self.make_config(&req)?;
+        let config = self.make_config(req)?;
         let vm_id = req.vm.id.into();
         let t_create = self
             .create_vm(CreateVm {
@@ -481,7 +480,7 @@ impl VmHostClient for ProxmoxClient {
         self.wait_for_task(&t_create).await?;
 
         // import primary disk from image (scsi0)
-        if let Err(e) = self
+        self
             .import_disk_image(ImportDiskImageRequest {
                 vm_id,
                 node: self.node.clone(),
@@ -490,11 +489,7 @@ impl VmHostClient for ProxmoxClient {
                 image: req.image.filename()?,
                 is_ssd: matches!(req.disk.kind, DiskType::SSD),
             })
-            .await
-        {
-            // TODO: rollback
-            return Err(e);
-        }
+            .await?;
 
         // resize disk to match template
         let j_resize = self
@@ -537,7 +532,7 @@ impl VmHostClient for ProxmoxClient {
     }
 
     async fn configure_vm(&self, cfg: &FullVmInfo) -> Result<()> {
-        let mut config = self.make_config(&cfg)?;
+        let mut config = self.make_config(cfg)?;
 
         // dont re-create the disks
         config.scsi_0 = None;
@@ -560,9 +555,9 @@ impl VmHostClient for ProxmoxClient {
 #[derive(Debug, Copy, Clone, Default)]
 pub struct ProxmoxVmId(u64);
 
-impl Into<i32> for ProxmoxVmId {
-    fn into(self) -> i32 {
-        self.0 as i32 + 100
+impl From<ProxmoxVmId> for i32 {
+    fn from(val: ProxmoxVmId) -> Self {
+        val.0 as i32 + 100
     }
 }
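Reviewer note on the proxmox.rs hunks: replacing the `if let Err(e)` block with `?` keeps the early return but drops the `// TODO: rollback` marker, so a failed disk import still leaves a half-created VM behind; worth tracking separately. The `Into` to `From` rewrite is what clippy's `from_over_into` lint asks for, and `From` also gives the `Into` direction for free via the standard library's blanket impl. A small sketch of both call directions; the idea that the +100 offset keeps generated IDs out of Proxmox's reserved low VMID range is my assumption, not something the patch states:

```rust
#[derive(Debug, Copy, Clone, Default)]
pub struct ProxmoxVmId(u64);

impl From<ProxmoxVmId> for i32 {
    fn from(val: ProxmoxVmId) -> Self {
        // Offset assumed to avoid Proxmox's reserved low VMIDs
        // (assumption based on the +100 in the patch).
        val.0 as i32 + 100
    }
}

fn main() {
    let id = ProxmoxVmId(1);
    let a: i32 = id.into(); // works via the blanket impl Into<i32>
    let b = i32::from(id);  // and directly via From
    assert_eq!(a, 101);
    assert_eq!(b, 101);
}
```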
diff --git a/src/provisioner/capacity.rs b/src/provisioner/capacity.rs
index 97f5d64..6240179 100644
--- a/src/provisioner/capacity.rs
+++ b/src/provisioner/capacity.rs
@@ -4,7 +4,7 @@ use lnvps_db::{DiskType, LNVpsDb, VmHost, VmHostDisk, VmTemplate};
 use std::collections::HashMap;
 use std::sync::Arc;
 
-/// Simple capacity reporting per node
+/// Simple capacity management
 #[derive(Clone)]
 pub struct HostCapacityService {
     /// Database
@@ -16,6 +16,27 @@ impl HostCapacityService {
         Self { db }
     }
 
+    /// List templates which can be sold, based on available capacity
+    pub async fn list_available_vm_templates(&self) -> Result<Vec<VmTemplate>> {
+        let templates = self.db.list_vm_templates().await?;
+
+        // TODO: list hosts in regions where templates are active?
+        // use all hosts since we don't expect there to be many
+        let hosts = self.db.list_hosts().await?;
+        let caps: Vec<Result<HostCapacity>> =
+            join_all(hosts.iter().map(|h| self.get_host_capacity(h, None))).await;
+        let caps: Vec<HostCapacity> = caps.into_iter().filter_map(Result::ok).collect();
+
+        Ok(templates
+            .into_iter()
+            .filter(|t| {
+                caps.iter()
+                    .filter(|c| c.host.region_id == t.region_id)
+                    .any(|c| c.can_accommodate(t))
+            })
+            .collect())
+    }
+
     /// Pick a host for the purposes of provisioning a new VM
     pub async fn get_host_for_template(&self, template: &VmTemplate) -> Result<HostCapacity> {
         let hosts = self.db.list_hosts().await?;
@@ -30,13 +51,7 @@ impl HostCapacityService {
         let mut host_cap: Vec<HostCapacity> = caps
             .into_iter()
             .filter_map(|v| v.ok())
-            .filter(|v| {
-                v.available_cpu() >= template.cpu
-                    && v.available_memory() >= template.memory
-                    && v.disks
-                        .iter()
-                        .any(|d| d.available_capacity() >= template.disk_size)
-            })
+            .filter(|v| v.can_accommodate(template))
             .collect();
 
         host_cap.sort_by(|a, b| a.load().partial_cmp(&b.load()).unwrap());
@@ -65,8 +80,7 @@ impl HostCapacityService {
             .filter_map(|v| {
                 templates
                     .iter()
-                    .find(|t| t.id == v.template_id)
-                    .and_then(|t| Some((v.id, t)))
+                    .find(|t| t.id == v.template_id).map(|t| (v.id, t))
             })
             .collect();
@@ -147,6 +161,16 @@ impl HostCapacity {
     pub fn disk_load(&self) -> f32 {
         self.disks.iter().fold(0.0, |acc, disk| acc + disk.load()) / self.disks.len() as f32
     }
+
+    /// Can this host and its available capacity accommodate the given template
+    pub fn can_accommodate(&self, template: &VmTemplate) -> bool {
+        self.available_cpu() >= template.cpu
+            && self.available_memory() >= template.memory
+            && self
+                .disks
+                .iter()
+                .any(|d| d.available_capacity() >= template.disk_size)
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -230,6 +254,10 @@ mod tests {
         let host = hc.get_host_for_template(&template).await?;
         assert_eq!(host.host.id, 1);
 
+        // all templates should be available
+        let templates = hc.list_available_vm_templates().await?;
+        assert_eq!(templates.len(), db.list_vm_templates().await?.len());
+
         Ok(())
     }
 }
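Reviewer note on the capacity.rs hunks, the core of this patch: a template stays purchasable if at least one host in the template's region can currently fit it, where "fit" means enough spare CPU and memory plus at least one disk with enough free space; `get_host_for_template` now reuses the same `can_accommodate` predicate instead of duplicating it inline. A trimmed-down, runnable sketch of the filter with stand-in types (the real `HostCapacity`/`VmTemplate` carry more fields, and the available figures come from `get_host_capacity` after subtracting existing VMs):

```rust
#[derive(Debug, Clone)]
struct Cap { region_id: u64, cpu: u16, memory: u64, disks: Vec<u64> } // free resources

#[derive(Debug, Clone)]
struct Template { region_id: u64, cpu: u16, memory: u64, disk_size: u64 }

fn can_accommodate(c: &Cap, t: &Template) -> bool {
    c.cpu >= t.cpu && c.memory >= t.memory && c.disks.iter().any(|d| *d >= t.disk_size)
}

fn available(templates: Vec<Template>, caps: &[Cap]) -> Vec<Template> {
    templates
        .into_iter()
        .filter(|t| {
            caps.iter()
                .filter(|c| c.region_id == t.region_id) // only hosts in the template's region
                .any(|c| can_accommodate(c, t))
        })
        .collect()
}

fn main() {
    let caps = [Cap { region_id: 1, cpu: 8, memory: 32 << 30, disks: vec![500 << 30] }];
    let templates = vec![
        Template { region_id: 1, cpu: 4, memory: 8 << 30, disk_size: 100 << 30 },
        Template { region_id: 1, cpu: 16, memory: 8 << 30, disk_size: 100 << 30 }, // no host fits
        Template { region_id: 2, cpu: 1, memory: 1 << 30, disk_size: 10 << 30 },   // wrong region
    ];
    assert_eq!(available(templates, &caps).len(), 1);
}
```

One caveat worth flagging in review: capacity is recomputed per request with no reservation step, so two concurrent orders could both pass the check for the last remaining slot.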
diff --git a/src/provisioner/lnvps.rs b/src/provisioner/lnvps.rs
index 7baab4f..5cfa538 100644
--- a/src/provisioner/lnvps.rs
+++ b/src/provisioner/lnvps.rs
@@ -3,21 +3,16 @@ use crate::exchange::{ExchangeRateService, Ticker};
 use crate::host::{get_host_client, FullVmInfo};
 use crate::lightning::{AddInvoiceRequest, LightningNode};
 use crate::provisioner::{
-    HostCapacity, HostCapacityService, NetworkProvisioner, ProvisionerMethod,
+    HostCapacityService, NetworkProvisioner, ProvisionerMethod,
 };
 use crate::router::{ArpEntry, Router};
 use crate::settings::{NetworkAccessPolicy, NetworkPolicy, ProvisionerConfig, Settings};
 use anyhow::{bail, ensure, Context, Result};
 use chrono::{Days, Months, Utc};
-use futures::future::join_all;
-use lnvps_db::{IpRange, LNVpsDb, Vm, VmCostPlanIntervalType, VmIpAssignment, VmPayment};
+use lnvps_db::{LNVpsDb, Vm, VmCostPlanIntervalType, VmIpAssignment, VmPayment};
 use log::{info, warn};
 use nostr::util::hex;
-use rand::random;
-use std::collections::HashSet;
-use std::net::IpAddr;
 use std::ops::Add;
-use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
@@ -63,7 +58,7 @@ impl LNVpsProvisioner {
         if let NetworkAccessPolicy::StaticArp { interface } = &self.network_policy.access {
             if let Some(r) = self.router.as_ref() {
                 let vm = self.db.get_vm(assignment.vm_id).await?;
-                let entry = ArpEntry::new(&vm, &assignment, Some(interface.clone()))?;
+                let entry = ArpEntry::new(&vm, assignment, Some(interface.clone()))?;
                 let arp = if let Some(_id) = &assignment.arp_ref {
                     r.update_arp_entry(&entry).await?
                 } else {
diff --git a/src/router/mikrotik.rs b/src/router/mikrotik.rs
index 89a8df2..74d5af0 100644
--- a/src/router/mikrotik.rs
+++ b/src/router/mikrotik.rs
@@ -7,7 +7,6 @@ use log::debug;
 use reqwest::Method;
 use rocket::async_trait;
 use serde::{Deserialize, Serialize};
-use std::net::IpAddr;
 
 pub struct MikrotikRouter {
     api: JsonApi,
@@ -78,26 +77,26 @@ pub struct MikrotikArpEntry {
     pub comment: Option<String>,
 }
 
-impl Into<ArpEntry> for MikrotikArpEntry {
-    fn into(self) -> ArpEntry {
+impl From<MikrotikArpEntry> for ArpEntry {
+    fn from(val: MikrotikArpEntry) -> Self {
         ArpEntry {
-            id: self.id,
-            address: self.address,
-            mac_address: self.mac_address.unwrap(),
-            interface: Some(self.interface),
-            comment: self.comment,
+            id: val.id,
+            address: val.address,
+            mac_address: val.mac_address.unwrap(),
+            interface: Some(val.interface),
+            comment: val.comment,
         }
     }
 }
 
-impl Into<MikrotikArpEntry> for ArpEntry {
-    fn into(self) -> MikrotikArpEntry {
+impl From<ArpEntry> for MikrotikArpEntry {
+    fn from(val: ArpEntry) -> Self {
         MikrotikArpEntry {
-            id: self.id,
-            address: self.address,
-            mac_address: Some(self.mac_address),
-            interface: self.interface.unwrap(),
-            comment: self.comment,
+            id: val.id,
+            address: val.address,
+            mac_address: Some(val.mac_address),
+            interface: val.interface.unwrap(),
+            comment: val.comment,
         }
     }
 }
diff --git a/src/worker.rs b/src/worker.rs
index 790d9eb..ff56c8a 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -146,9 +146,9 @@ impl Worker {
         let host = self.db.get_host(vm.host_id).await?;
         let client = get_host_client(&host, &self.settings.provisioner_config)?;
 
-        match client.get_vm_state(&vm).await {
+        match client.get_vm_state(vm).await {
             Ok(s) => {
-                self.handle_vm_state(&vm, &s).await?;
+                self.handle_vm_state(vm, &s).await?;
                 self.vm_state_cache.set_state(vm.id, s).await?;
             }
             Err(_) => {
@@ -191,7 +191,7 @@ impl Worker {
         let db_vms = self.db.list_vms().await?;
         for vm in &db_vms {
             // Refresh VM status if active
-            self.check_vm(&vm).await?;
+            self.check_vm(vm).await?;
 
             // delete vm if not paid (in new state)
             if vm.expires < Utc::now().sub(Days::new(1)) {
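Reviewer note on the remaining hunks: the lnvps.rs and worker.rs changes are needless-borrow cleanups (`&vm` to `vm`, `&assignment` to `assignment`) plus dead-import removal, with no behavioural change. The MikroTik `Into` to `From` rewrites are likewise mechanical, which means the pre-existing `unwrap()` on `mac_address` survives: an ARP row arriving without a MAC would still panic. A self-contained sketch with trimmed stand-in structs (the real types carry `id` and `comment` fields too) showing that edge; if it matters, `TryFrom` would express the fallibility without panicking:

```rust
#[derive(Debug, Clone)]
struct ArpEntry {
    address: String,
    mac_address: String,
    interface: Option<String>,
}

#[derive(Debug, Clone)]
struct MikrotikArpEntry {
    address: String,
    mac_address: Option<String>, // the router may omit this field
    interface: String,
}

impl From<MikrotikArpEntry> for ArpEntry {
    fn from(val: MikrotikArpEntry) -> Self {
        ArpEntry {
            address: val.address,
            // Mirrors the patch: an entry without a MAC panics here.
            mac_address: val.mac_address.unwrap(),
            interface: Some(val.interface),
        }
    }
}

fn main() {
    let m = MikrotikArpEntry {
        address: "10.0.0.2/32".into(),
        mac_address: Some("AA:BB:CC:DD:EE:FF".into()),
        interface: "bridge1".into(),
    };
    let a: ArpEntry = m.into();
    println!("{a:?}");
}
```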