From a212ed661a8828be307525fca823a6c917a5ab2c Mon Sep 17 00:00:00 2001
From: kieran
Date: Wed, 5 Mar 2025 13:25:27 +0000
Subject: [PATCH] feat: get host for template

feat: apply load factor
---
 .../migrations/20250305132119_load_factor.sql |   3 +
 lnvps_db/src/lib.rs                           |   3 +
 lnvps_db/src/model.rs                         |  15 +-
 lnvps_db/src/mysql.rs                         |  21 ++-
 src/api/webhook.rs                            |   6 +-
 src/dns/mod.rs                                |   1 -
 src/mocks.rs                                  |  43 ++---
 src/provisioner/capacity.rs                   | 166 ++++++++++++++++--
 src/provisioner/lnvps.rs                      |  99 ++++++++---
 9 files changed, 273 insertions(+), 84 deletions(-)
 create mode 100644 lnvps_db/migrations/20250305132119_load_factor.sql

diff --git a/lnvps_db/migrations/20250305132119_load_factor.sql b/lnvps_db/migrations/20250305132119_load_factor.sql
new file mode 100644
index 0000000..4c3b9fb
--- /dev/null
+++ b/lnvps_db/migrations/20250305132119_load_factor.sql
@@ -0,0 +1,3 @@
+-- Add migration script here
+alter table vm_host
+    add column load_factor float not null default 1.0;
\ No newline at end of file

diff --git a/lnvps_db/src/lib.rs b/lnvps_db/src/lib.rs
index 9c3cd29..1008d8d 100644
--- a/lnvps_db/src/lib.rs
+++ b/lnvps_db/src/lib.rs
@@ -80,6 +80,9 @@ pub trait LNVpsDb: Sync + Send {
     /// List VM templates
     async fn list_vm_templates(&self) -> Result<Vec<VmTemplate>>;
 
+    /// Insert a new VM template
+    async fn insert_vm_template(&self, template: &VmTemplate) -> Result<u64>;
+
     /// List all VM's
     async fn list_vms(&self) -> Result<Vec<Vm>>;
 
diff --git a/lnvps_db/src/model.rs b/lnvps_db/src/model.rs
index dff679a..ed14d03 100644
--- a/lnvps_db/src/model.rs
+++ b/lnvps_db/src/model.rs
@@ -31,10 +31,11 @@ pub struct UserSshKey {
     pub key_data: String,
 }
 
-#[derive(Clone, Debug, sqlx::Type)]
+#[derive(Clone, Debug, sqlx::Type, Default, PartialEq, Eq)]
 #[repr(u16)]
 /// The type of VM host
 pub enum VmHostKind {
+    #[default]
     Proxmox = 0,
     LibVirt = 1,
 }
@@ -55,7 +56,7 @@ pub struct VmHostRegion {
     pub enabled: bool,
 }
 
-#[derive(FromRow, Clone, Debug)]
+#[derive(FromRow, Clone, Debug, Default)]
 /// A VM host
 pub struct VmHost {
     /// Unique id of this host
@@ -76,9 +77,11 @@ pub struct VmHost {
     pub enabled: bool,
     /// API token used to control this host via [ip]
     pub api_token: String,
+    /// Load factor for provisioning
+    pub load_factor: f32,
 }
 
-#[derive(FromRow, Clone, Debug)]
+#[derive(FromRow, Clone, Debug, Default)]
 pub struct VmHostDisk {
     pub id: u64,
     pub host_id: u64,
@@ -89,7 +92,7 @@ pub struct VmHostDisk {
     pub enabled: bool,
 }
 
-#[derive(Clone, Debug, sqlx::Type, Default)]
+#[derive(Clone, Debug, sqlx::Type, Default, PartialEq, Eq)]
 #[repr(u16)]
 pub enum DiskType {
     #[default]
     HDD = 0,
@@ -97,7 +100,7 @@ pub enum DiskType {
     SSD = 1,
 }
 
-#[derive(Clone, Debug, sqlx::Type, Default)]
+#[derive(Clone, Debug, sqlx::Type, Default, PartialEq, Eq)]
 #[repr(u16)]
 pub enum DiskInterface {
     #[default]
     SATA = 0,
     SCSI = 1,
@@ -106,7 +109,7 @@ pub enum DiskInterface {
     PCIe = 2,
 }
 
-#[derive(Clone, Debug, sqlx::Type, Default)]
+#[derive(Clone, Debug, sqlx::Type, Default, PartialEq, Eq)]
 #[repr(u16)]
 pub enum OsDistribution {
     #[default]
 
diff --git a/lnvps_db/src/mysql.rs b/lnvps_db/src/mysql.rs
index 6701d97..089028b 100644
--- a/lnvps_db/src/mysql.rs
+++ b/lnvps_db/src/mysql.rs
@@ -114,7 +114,7 @@ impl LNVpsDb for LNVpsDbMysql {
     }
 
     async fn list_hosts(&self) -> Result<Vec<VmHost>> {
-        sqlx::query_as("select * from vm_host")
+        sqlx::query_as("select * from vm_host where enabled = 1")
            .fetch_all(&self.db)
            .await
            .map_err(Error::new)
@@ -216,6 +216,25 @@ impl LNVpsDb for LNVpsDbMysql {
            .map_err(Error::new)
     }
 
+    async fn insert_vm_template(&self, template: &VmTemplate) -> Result<u64> {
+        Ok(sqlx::query("insert into vm_template(name,enabled,created,expires,cpu,memory,disk_size,disk_type,disk_interface,cost_plan_id,region_id) values(?,?,?,?,?,?,?,?,?,?,?) returning id")
+            .bind(&template.name)
+            .bind(&template.enabled)
+            .bind(&template.created)
+            .bind(&template.expires)
+            .bind(template.cpu)
+            .bind(template.memory)
+            .bind(template.disk_size)
+            .bind(&template.disk_type)
+            .bind(&template.disk_interface)
+            .bind(template.cost_plan_id)
+            .bind(template.region_id)
+            .fetch_one(&self.db)
+            .await
+            .map_err(Error::new)?
+            .try_get(0)?)
+    }
+
     async fn list_vms(&self) -> Result<Vec<Vm>> {
         sqlx::query_as("select * from vm where deleted = 0")
            .fetch_all(&self.db)
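Aside on the `returning id` clause above: `INSERT ... RETURNING` is MariaDB syntax (10.5+) and is not supported by stock MySQL. If plain MySQL has to stay supported, a minimal alternative sketch (same trait method, still assuming sqlx's MySQL driver) would read the generated key via `last_insert_id()` instead:

    // Sketch only: fetch the auto-increment id with last_insert_id()
    // rather than a RETURNING clause (works on both MySQL and MariaDB).
    async fn insert_vm_template(&self, template: &VmTemplate) -> Result<u64> {
        let res = sqlx::query("insert into vm_template(name,enabled,created,expires,cpu,memory,disk_size,disk_type,disk_interface,cost_plan_id,region_id) values(?,?,?,?,?,?,?,?,?,?,?)")
            .bind(&template.name)
            .bind(&template.enabled)
            .bind(&template.created)
            .bind(&template.expires)
            .bind(template.cpu)
            .bind(template.memory)
            .bind(template.disk_size)
            .bind(&template.disk_type)
            .bind(&template.disk_interface)
            .bind(template.cost_plan_id)
            .bind(template.region_id)
            .execute(&self.db)
            .await
            .map_err(Error::new)?;
        Ok(res.last_insert_id())
    }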
diff --git a/src/api/webhook.rs b/src/api/webhook.rs
index 4eccd34..5dc3bbf 100644
--- a/src/api/webhook.rs
+++ b/src/api/webhook.rs
@@ -1,11 +1,7 @@
-use lettre::message::header::Headers;
 use log::warn;
 use reqwest::header::HeaderMap;
-use reqwest::Request;
-use rocket::data::{ByteUnit, FromData, ToByteUnit};
+use rocket::data::{FromData, ToByteUnit};
 use rocket::http::Status;
-use rocket::outcome::IntoOutcome;
-use rocket::request::{FromRequest, Outcome};
 use rocket::{post, routes, Data, Route};
 use std::collections::HashMap;
 use std::sync::LazyLock;

diff --git a/src/dns/mod.rs b/src/dns/mod.rs
index d1027db..957e5fc 100644
--- a/src/dns/mod.rs
+++ b/src/dns/mod.rs
@@ -1,6 +1,5 @@
 use anyhow::{bail, Context, Result};
 use lnvps_db::{async_trait, VmIpAssignment};
-use serde::{Deserialize, Serialize};
 use std::fmt::{Display, Formatter};
 use std::net::IpAddr;
 use std::str::FromStr;

diff --git a/src/mocks.rs b/src/mocks.rs
index 2e3e7aa..4af7089 100644
--- a/src/mocks.rs
+++ b/src/mocks.rs
@@ -76,9 +76,10 @@ impl Default for MockDb {
                 name: "mock-host".to_string(),
                 ip: "https://localhost".to_string(),
                 cpu: 4,
-                memory: 8192,
+                memory: 8 * GB,
                 enabled: true,
                 api_token: "".to_string(),
+                load_factor: 1.5,
             },
         );
         let mut host_disks = HashMap::new();
@@ -209,10 +210,7 @@ impl LNVpsDb for MockDb {
             max_keys + 1,
             UserSshKey {
                 id: max_keys + 1,
-                name: new_key.name.clone(),
-                user_id: new_key.user_id,
-                created: Utc::now(),
-                key_data: new_key.key_data.clone(),
+                ..new_key.clone()
             },
         );
         Ok(max_keys + 1)
@@ -321,6 +319,19 @@ impl LNVpsDb for MockDb {
            .collect())
     }
 
+    async fn insert_vm_template(&self, template: &VmTemplate) -> anyhow::Result<u64> {
+        let mut templates = self.templates.lock().await;
+        let max_id = *templates.keys().max().unwrap_or(&0);
+        templates.insert(
+            max_id + 1,
+            VmTemplate {
+                id: max_id + 1,
+                ..template.clone()
+            },
+        );
+        Ok(max_id + 1)
+    }
+
     async fn list_vms(&self) -> anyhow::Result<Vec<Vm>> {
         let vms = self.vms.lock().await;
         Ok(vms.values().filter(|v| !v.deleted).cloned().collect())
@@ -374,17 +385,7 @@ impl LNVpsDb for MockDb {
             max_id + 1,
             Vm {
                 id: max_id + 1,
-                host_id: vm.host_id,
-                user_id: vm.user_id,
-                image_id: vm.image_id,
-                template_id: vm.template_id,
-                ssh_key_id: vm.ssh_key_id,
-                created: Utc::now(),
-                expires: Utc::now(),
-                disk_id: vm.disk_id,
-                mac_address: vm.mac_address.clone(),
-                deleted: false,
-                ref_code: vm.ref_code.clone(),
+                ..vm.clone()
             },
         );
         Ok(max_id + 1)
@@ -411,15 +412,7 @@ impl LNVpsDb for MockDb {
             max + 1,
             VmIpAssignment {
                 id: max + 1,
-                vm_id: ip_assignment.vm_id,
-                ip_range_id: ip_assignment.ip_range_id,
-                ip: ip_assignment.ip.clone(),
-                deleted: false,
-                arp_ref: ip_assignment.arp_ref.clone(),
-                dns_forward: ip_assignment.dns_forward.clone(),
-                dns_forward_ref: ip_assignment.dns_forward_ref.clone(),
-                dns_reverse: ip_assignment.dns_reverse.clone(),
-                dns_reverse_ref: ip_assignment.dns_reverse_ref.clone(),
+                ..ip_assignment.clone()
             },
         );
         Ok(max + 1)
     }
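Aside on the mocks.rs changes above: the long field-by-field copies are collapsed with Rust's struct update syntax, which fills every remaining field from an existing value. A tiny illustration (hypothetical `new_id` and `existing` names, not from the patch):

    // `id` is overridden, all other fields are cloned from `existing`
    let vm = Vm { id: new_id, ..existing.clone() };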
diff --git a/src/provisioner/capacity.rs b/src/provisioner/capacity.rs
index f9b6b6e..97f5d64 100644
--- a/src/provisioner/capacity.rs
+++ b/src/provisioner/capacity.rs
@@ -1,21 +1,61 @@
-use anyhow::Result;
-use lnvps_db::{LNVpsDb, VmHost, VmHostDisk, VmTemplate};
+use anyhow::{bail, Result};
+use futures::future::join_all;
+use lnvps_db::{DiskType, LNVpsDb, VmHost, VmHostDisk, VmTemplate};
 use std::collections::HashMap;
 use std::sync::Arc;
 
 /// Simple capacity reporting per node
 #[derive(Clone)]
-pub struct HostCapacity {
+pub struct HostCapacityService {
+    /// Database
     db: Arc<dyn LNVpsDb>,
 }
 
-impl HostCapacity {
+impl HostCapacityService {
     pub fn new(db: Arc<dyn LNVpsDb>) -> Self {
         Self { db }
     }
 
-    pub async fn get_available_capacity(&self, host: &VmHost) -> Result<AvailableCapacity> {
+    /// Pick a host for the purposes of provisioning a new VM
+    pub async fn get_host_for_template(&self, template: &VmTemplate) -> Result<HostCapacity> {
+        let hosts = self.db.list_hosts().await?;
+        let caps: Vec<Result<HostCapacity>> = join_all(
+            hosts
+                .iter()
+                .filter(|h| h.region_id == template.region_id)
+                // TODO: filter disk interface?
+                .map(|h| self.get_host_capacity(h, Some(template.disk_type.clone()))),
+        )
+        .await;
+        let mut host_cap: Vec<HostCapacity> = caps
+            .into_iter()
+            .filter_map(|v| v.ok())
+            .filter(|v| {
+                v.available_cpu() >= template.cpu
+                    && v.available_memory() >= template.memory
+                    && v.disks
+                        .iter()
+                        .any(|d| d.available_capacity() >= template.disk_size)
+            })
+            .collect();
+
+        host_cap.sort_by(|a, b| a.load().partial_cmp(&b.load()).unwrap());
+
+        if let Some(f) = host_cap.into_iter().next() {
+            Ok(f)
+        } else {
+            bail!("No available hosts found");
+        }
+    }
+
+    /// Get available capacity of a given host
+    pub async fn get_host_capacity(
+        &self,
+        host: &VmHost,
+        disk_type: Option<DiskType>,
+    ) -> Result<HostCapacity> {
         let vms = self.db.list_vms_on_host(host.id).await?;
+        // TODO: filter disks from DB? Should be very few disks anyway
         let storage = self.db.list_host_disks(host.id).await?;
         let templates = self.db.list_vm_templates().await?;
 
@@ -30,43 +70,89 @@ impl HostCapacity {
             })
             .collect();
 
-        let storage_disks: Vec<DiskCapacity> = storage
+        let mut storage_disks: Vec<DiskCapacity> = storage
             .iter()
+            .filter(|d| disk_type.as_ref().map(|t| d.kind == *t).unwrap_or(true))
             .map(|s| {
                 let usage = vm_template
                     .iter()
                     .filter(|(k, v)| v.id == s.id)
                     .fold(0, |acc, (k, v)| acc + v.disk_size);
                 DiskCapacity {
+                    load_factor: host.load_factor,
                     disk: s.clone(),
                     usage,
                 }
             })
             .collect();
 
+        // least-loaded disk first, so callers can simply take the first entry
+        storage_disks.sort_by(|a, b| a.load().partial_cmp(&b.load()).unwrap());
+
         let cpu_consumed = vm_template.values().fold(0, |acc, vm| acc + vm.cpu);
         let memory_consumed = vm_template.values().fold(0, |acc, vm| acc + vm.memory);
 
-        Ok(AvailableCapacity {
-            cpu: host.cpu.saturating_sub(cpu_consumed),
-            memory: host.memory.saturating_sub(memory_consumed),
+        Ok(HostCapacity {
+            load_factor: host.load_factor,
+            host: host.clone(),
+            cpu: cpu_consumed,
+            memory: memory_consumed,
             disks: storage_disks,
         })
     }
 }
 
 #[derive(Debug, Clone)]
-pub struct AvailableCapacity {
-    /// Number of CPU cores available
+pub struct HostCapacity {
+    /// Load factor applied to resource consumption
+    pub load_factor: f32,
+    /// The host
+    pub host: VmHost,
+    /// Number of consumed CPU cores
     pub cpu: u16,
-    /// Number of bytes of memory available
+    /// Number of consumed bytes of memory
     pub memory: u64,
-    /// List of disks on the host and its available space
+    /// List of disks on the host and their used space
     pub disks: Vec<DiskCapacity>,
 }
 
+impl HostCapacity {
+    /// Total average usage as a percentage
+    pub fn load(&self) -> f32 {
+        (self.cpu_load() + self.memory_load() + self.disk_load()) / 3.0
+    }
+
+    /// CPU usage as a percentage
+    pub fn cpu_load(&self) -> f32 {
+        self.cpu as f32 / (self.host.cpu as f32 * self.load_factor)
+    }
+
+    /// Total number of available CPUs
+    pub fn available_cpu(&self) -> u16 {
+        let loaded_host_cpu = (self.host.cpu as f32 * self.load_factor).floor() as u16;
+        loaded_host_cpu.saturating_sub(self.cpu)
+    }
+
+    /// Memory usage as a percentage
+    pub fn memory_load(&self) -> f32 {
+        self.memory as f32 / (self.host.memory as f32 * self.load_factor)
+    }
+
+    /// Total available bytes of memory
+    pub fn available_memory(&self) -> u64 {
+        let loaded_host_memory = (self.host.memory as f64 * self.load_factor as f64).floor() as u64;
+        loaded_host_memory.saturating_sub(self.memory)
+    }
+
+    /// Disk usage as a percentage (average over all disks)
+    pub fn disk_load(&self) -> f32 {
+        // max(1) guards against dividing by zero when no disks match the filter
+        self.disks.iter().fold(0.0, |acc, disk| acc + disk.load()) / self.disks.len().max(1) as f32
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct DiskCapacity {
+    /// Load factor applied to resource consumption
+    pub load_factor: f32,
     /// Disk ID
     pub disk: VmHostDisk,
     /// Space consumed by VMs
     pub usage: u64,
 }
 
 impl DiskCapacity {
+    /// Total available bytes of disk space
     pub fn available_capacity(&self) -> u64 {
-        self.disk.size.saturating_sub(self.usage)
+        let loaded_disk_size = (self.disk.size as f64 * self.load_factor as f64).floor() as u64;
+        loaded_disk_size.saturating_sub(self.usage)
+    }
+
+    /// Disk usage as percentage
+    pub fn load(&self) -> f32 {
+        (self.usage as f32 / self.disk.size as f32) * (1.0 / self.load_factor)
     }
 }
 
@@ -84,22 +177,59 @@ mod tests {
     use super::*;
     use crate::mocks::MockDb;
 
+    #[test]
+    fn loads() {
+        let cap = HostCapacity {
+            load_factor: 2.0,
+            host: VmHost {
+                cpu: 100,
+                memory: 100,
+                ..Default::default()
+            },
+            cpu: 8,
+            memory: 8,
+            disks: vec![DiskCapacity {
+                load_factor: 2.0,
+                disk: VmHostDisk {
+                    size: 100,
+                    ..Default::default()
+                },
+                usage: 8,
+            }],
+        };
+
+        // load factor halves the load values: 8/100 * (1/load_factor)
+        assert_eq!(cap.load(), 0.04);
+        assert_eq!(cap.cpu_load(), 0.04);
+        assert_eq!(cap.memory_load(), 0.04);
+        assert_eq!(cap.disk_load(), 0.04);
+        // load factor doubles memory to 200, 200 - 8
+        assert_eq!(cap.available_memory(), 192);
+        assert_eq!(cap.available_cpu(), 192);
+    }
+
     #[tokio::test]
     async fn empty_available_capacity() -> Result<()> {
         let db = Arc::new(MockDb::default());
-        let hc = HostCapacity::new(db.clone());
+        let hc = HostCapacityService::new(db.clone());
         let host = db.get_host(1).await?;
-        let cap = hc.get_available_capacity(&host).await?;
+        let cap = hc.get_host_capacity(&host, None).await?;
         let disks = db.list_host_disks(1).await?;
 
-        /// check all resources are available
-        assert_eq!(cap.cpu, host.cpu);
-        assert_eq!(cap.memory, host.memory);
+        // check that nothing is consumed yet
+        assert_eq!(cap.cpu, 0);
+        assert_eq!(cap.memory, 0);
         assert_eq!(cap.disks.len(), disks.len());
+        assert_eq!(cap.load(), 0.0);
         for disk in cap.disks {
             assert_eq!(0, disk.usage);
+            assert_eq!(disk.load(), 0.0);
         }
 
+        let template = db.get_vm_template(1).await?;
+        let host = hc.get_host_for_template(&template).await?;
+        assert_eq!(host.host.id, 1);
+
         Ok(())
     }
 }
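To make the load-factor arithmetic above concrete, a worked example using the mock host defaults from this patch (4 cores, 8 GB memory, load_factor 1.5): the scheduler treats the host as offering floor(4 x 1.5) = 6 schedulable cores and floor(8 GB x 1.5) = 12 GB of schedulable memory. A host already running 2 cores' worth of VMs therefore reports cpu_load = 2 / (4 x 1.5) = 0.33 and available_cpu = 6 - 2 = 4, and get_host_for_template ranks the surviving candidates by the mean of cpu_load, memory_load and disk_load, taking the least loaded.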
diff --git a/src/provisioner/lnvps.rs b/src/provisioner/lnvps.rs
index 6e2025b..7baab4f 100644
--- a/src/provisioner/lnvps.rs
+++ b/src/provisioner/lnvps.rs
@@ -2,7 +2,9 @@ use crate::dns::{BasicRecord, DnsServer};
 use crate::exchange::{ExchangeRateService, Ticker};
 use crate::host::{get_host_client, FullVmInfo};
 use crate::lightning::{AddInvoiceRequest, LightningNode};
-use crate::provisioner::{NetworkProvisioner, ProvisionerMethod};
+use crate::provisioner::{
+    HostCapacity, HostCapacityService, NetworkProvisioner, ProvisionerMethod,
+};
 use crate::router::{ArpEntry, Router};
 use crate::settings::{NetworkAccessPolicy, NetworkPolicy, ProvisionerConfig, Settings};
 use anyhow::{bail, ensure, Context, Result};
@@ -255,33 +257,28 @@ impl LNVpsProvisioner {
         let template = self.db.get_vm_template(template_id).await?;
         let image = self.db.get_os_image(image_id).await?;
         let ssh_key = self.db.get_user_ssh_key(ssh_key_id).await?;
-        let hosts = self.db.list_hosts().await?;
 
-        // TODO: impl resource usage based provisioning
-        let pick_host = if let Some(h) = hosts.first() {
-            h
-        } else {
-            bail!("No host found")
-        };
+        // TODO: cache capacity somewhere
+        let cap = HostCapacityService::new(self.db.clone());
+        let host = cap.get_host_for_template(&template).await?;
 
-        // TODO: impl resource usage based provisioning (disk)
-        let host_disks = self.db.list_host_disks(pick_host.id).await?;
-        let pick_disk = if let Some(hd) = host_disks.first() {
+        let pick_disk = if let Some(hd) = host.disks.first() {
             hd
         } else {
             bail!("No host disk found")
         };
 
-        let client = get_host_client(&pick_host, &self.provisioner_config)?;
+        let client = get_host_client(&host.host, &self.provisioner_config)?;
         let mut new_vm = Vm {
             id: 0,
-            host_id: pick_host.id,
+            host_id: host.host.id,
             user_id: user.id,
             image_id: image.id,
             template_id: template.id,
             ssh_key_id: ssh_key.id,
             created: Utc::now(),
             expires: Utc::now(),
-            disk_id: pick_disk.id,
+            disk_id: pick_disk.disk.id,
             mac_address: "NOT FILLED YET".to_string(),
             deleted: false,
             ref_code,
@@ -406,13 +403,14 @@ mod tests {
     use crate::exchange::DefaultRateCache;
     use crate::mocks::{MockDb, MockDnsServer, MockNode, MockRouter};
     use crate::settings::{DnsServerConfig, LightningConfig, QemuConfig, RouterConfig};
-    use lnvps_db::UserSshKey;
+    use lnvps_db::{DiskInterface, DiskType, User, UserSshKey, VmTemplate};
 
-    #[tokio::test]
-    async fn test_basic_provisioner() -> Result<()> {
-        const ROUTER_BRIDGE: &str = "bridge1";
+    const ROUTER_BRIDGE: &str = "bridge1";
+    const GB: u64 = 1024 * 1024 * 1024;
+    const TB: u64 = GB * 1024;
 
-        let settings = Settings {
+    fn settings() -> Settings {
+        Settings {
             listen: None,
             db: "".to_string(),
             lightning: LightningConfig::LND {
                 url: "".to_string(),
                 cert: Default::default(),
                 macaroon: Default::default(),
             },
@@ -452,18 +450,14 @@ mod tests {
                 reverse_zone_id: "456".to_string(),
             }),
             nostr: None,
-        };
-        let db = Arc::new(MockDb::default());
-        let node = Arc::new(MockNode::default());
-        let rates = Arc::new(DefaultRateCache::default());
-        let router = MockRouter::new(settings.network_policy.clone());
-        let dns = MockDnsServer::new();
-        let provisioner = LNVpsProvisioner::new(settings, db.clone(), node.clone(), rates.clone());
+        }
+    }
 
+    async fn add_user(db: &Arc<MockDb>) -> Result<(User, UserSshKey)> {
         let pubkey: [u8; 32] = random();
         let user_id = db.upsert_user(&pubkey).await?;
-        let new_key = UserSshKey {
+        let mut new_key = UserSshKey {
             id: 0,
             name: "test-key".to_string(),
             user_id,
             created: Utc::now(),
             key_data: "ssh-rsa AAA==".to_string(),
         };
         let ssh_key = db.insert_user_ssh_key(&new_key).await?;
+        new_key.id = ssh_key;
+        Ok((db.get_user(user_id).await?, new_key))
+    }
 
+    #[tokio::test]
+    async fn basic() -> Result<()> {
+        let settings = settings();
+        let db = Arc::new(MockDb::default());
+        let node = Arc::new(MockNode::default());
+        let rates = Arc::new(DefaultRateCache::default());
+        let router = MockRouter::new(settings.network_policy.clone());
+        let dns = MockDnsServer::new();
+        let provisioner = LNVpsProvisioner::new(settings, db.clone(), node.clone(), rates.clone());
+
+        let (user, ssh_key) = add_user(&db).await?;
         let vm = provisioner
-            .provision(user_id, 1, 1, ssh_key, Some("mock-ref".to_string()))
+            .provision(user.id, 1, 1, ssh_key.id, Some("mock-ref".to_string()))
             .await?;
         println!("{:?}", vm);
         provisioner.spawn_vm(vm.id).await?;
@@ -527,4 +535,39 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_no_capacity() -> Result<()> {
+        let settings = settings();
+        let db = Arc::new(MockDb::default());
+        let node = Arc::new(MockNode::default());
+        let rates = Arc::new(DefaultRateCache::default());
+        let prov = LNVpsProvisioner::new(settings.clone(), db.clone(), node.clone(), rates.clone());
+
+        let large_template = VmTemplate {
+            id: 0,
+            name: "mock-large-template".to_string(),
+            enabled: true,
+            created: Default::default(),
+            expires: None,
+            cpu: 64,
+            memory: 512 * GB,
+            disk_size: 20 * TB,
+            disk_type: DiskType::SSD,
+            disk_interface: DiskInterface::PCIe,
+            cost_plan_id: 1,
+            region_id: 1,
+        };
+        let id = db.insert_vm_template(&large_template).await?;
+
+        let (user, ssh_key) = add_user(&db).await?;
+
+        // provisioning must fail: the template cannot fit on any host
+        let result = prov.provision(user.id, id, 1, ssh_key.id, None).await;
+        assert!(result.is_err());
+        if let Err(e) = result {
+            println!("{}", e);
+            assert!(e.to_string().to_lowercase().contains("no available host"))
+        }
+        Ok(())
+    }
 }
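For reference, the new test fails provisioning by construction: under the capacity math above, the mock host exposes only floor(4 x 1.5) = 6 schedulable cores and 12 GB of schedulable memory, so the 64-core / 512 GB / 20 TB template can never be placed; get_host_for_template bails with "No available hosts found", which the lowercased assertion then matches.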