diff --git a/lnvps_db/migrations/20250324143556_load_factors.sql b/lnvps_db/migrations/20250324143556_load_factors.sql new file mode 100644 index 0000000..4a54d11 --- /dev/null +++ b/lnvps_db/migrations/20250324143556_load_factors.sql @@ -0,0 +1,5 @@ +-- Add migration script here +alter table vm_host + add column load_memory float not null default 1.0, + add column load_disk float not null default 1.0, + change column load_factor load_cpu float not null default 1.0 \ No newline at end of file diff --git a/lnvps_db/src/model.rs b/lnvps_db/src/model.rs index 5f5d4ae..610de76 100644 --- a/lnvps_db/src/model.rs +++ b/lnvps_db/src/model.rs @@ -80,8 +80,12 @@ pub struct VmHost { pub enabled: bool, /// API token used to control this host via [ip] pub api_token: String, - /// Load factor for provisioning - pub load_factor: f32, + /// CPU load factor for provisioning + pub load_cpu: f32, + /// Memory load factor + pub load_memory: f32, + /// Disk load factor + pub load_disk:f32, } #[derive(FromRow, Clone, Debug, Default)] diff --git a/src/dvm/lnvps.rs b/src/dvm/lnvps.rs index a7e3bd6..1279a32 100644 --- a/src/dvm/lnvps.rs +++ b/src/dvm/lnvps.rs @@ -1,5 +1,6 @@ use crate::dvm::{build_status_for_job, DVMHandler, DVMJobRequest}; use crate::provisioner::LNVpsProvisioner; +use crate::{GB, MB}; use anyhow::Context; use lnvps_db::{ DiskInterface, DiskType, LNVpsDb, OsDistribution, PaymentMethod, UserSshKey, VmCustomTemplate, @@ -12,7 +13,6 @@ use std::future::Future; use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; -use crate::{GB, MB}; pub struct LnvpsDvm { client: Client, @@ -57,10 +57,7 @@ impl DVMHandler for LnvpsDvm { .get("ssh_key") .context("missing ssh_key parameter")?; let ssh_key_name = request.params.get("ssh_key_name"); - let os_image = request - .params - .get("os") - .context("missing os parameter")?; + let os_image = request.params.get("os").context("missing os parameter")?; let os_version = request .params .get("os_version") diff --git a/src/dvm/mod.rs b/src/dvm/mod.rs index ccd7bc8..5ec2330 100644 --- a/src/dvm/mod.rs +++ b/src/dvm/mod.rs @@ -247,10 +247,7 @@ fn parse_job_request(event: &Event) -> Result { }) } -pub fn start_dvms( - client: Client, - provisioner: Arc, -) -> JoinHandle<()> { +pub fn start_dvms(client: Client, provisioner: Arc) -> JoinHandle<()> { tokio::spawn(async move { let dvm = LnvpsDvm::new(provisioner, client.clone()); if let Err(e) = listen_for_jobs(client, Kind::from_u16(5999), Box::new(dvm)).await { diff --git a/src/fiat/revolut.rs b/src/fiat/revolut.rs index b2a5c76..be76e3a 100644 --- a/src/fiat/revolut.rs +++ b/src/fiat/revolut.rs @@ -159,7 +159,7 @@ pub struct RevolutOrderPayment { #[serde(skip_serializing_if = "Option::is_none")] pub billing_address: Option, #[serde(skip_serializing_if = "Option::is_none")] - pub risk_level: Option + pub risk_level: Option, } #[derive(Clone, Deserialize, Serialize)] @@ -198,7 +198,7 @@ pub enum RevolutPaymentMethodType { #[serde(rename_all = "snake_case")] pub enum RevolutRiskLevel { High, - Low + Low, } #[derive(Clone, Deserialize, Serialize)] diff --git a/src/host/mod.rs b/src/host/mod.rs index 305b544..3cbcffa 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -12,7 +12,6 @@ use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::pin::Pin; use std::sync::Arc; -use std::sync::atomic::AtomicBool; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::Semaphore; @@ -23,7 +22,6 @@ use tokio::sync::Semaphore; mod proxmox; pub struct TerminalStream { - pub shutdown: Arc, pub rx: Receiver>, pub tx: Sender>, } diff --git a/src/host/proxmox.rs b/src/host/proxmox.rs index b48268d..ce0ab52 100644 --- a/src/host/proxmox.rs +++ b/src/host/proxmox.rs @@ -668,7 +668,6 @@ impl VmHostClient for ProxmoxClient { let (mut client_tx, client_rx) = channel::>(1024); let (server_tx, mut server_rx) = channel::>(1024); - let shutdown = Arc::new(AtomicBool::new(false)); tokio::spawn(async move { // fire calls to read every 100ms loop { @@ -684,7 +683,6 @@ impl VmHostClient for ProxmoxClient { Ok::<(), anyhow::Error>(()) }); Ok(TerminalStream { - shutdown, rx: client_rx, tx: server_tx, }) diff --git a/src/lib.rs b/src/lib.rs index 9afe08c..7537386 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,4 +28,4 @@ pub const BTC_SATS: f64 = 100_000_000.0; pub const KB: u64 = 1024; pub const MB: u64 = KB * 1024; pub const GB: u64 = MB * 1024; -pub const TB: u64 = GB * 1024; \ No newline at end of file +pub const TB: u64 = GB * 1024; diff --git a/src/lightning/bitvora.rs b/src/lightning/bitvora.rs index c6e9788..e31bc00 100644 --- a/src/lightning/bitvora.rs +++ b/src/lightning/bitvora.rs @@ -68,11 +68,10 @@ impl LightningNode for BitvoraNode { } let r_body = r.body.as_slice(); info!("Received webhook {}", String::from_utf8_lossy(r_body)); - let body: BitvoraWebhook = - match serde_json::from_slice(r_body) { - Ok(b) => b, - Err(e) => return InvoiceUpdate::Error(e.to_string()), - }; + let body: BitvoraWebhook = match serde_json::from_slice(r_body) { + Ok(b) => b, + Err(e) => return InvoiceUpdate::Error(e.to_string()), + }; if let Err(e) = verify_webhook(&secret, &r) { return InvoiceUpdate::Error(e.to_string()); diff --git a/src/mocks.rs b/src/mocks.rs index 48ce9c7..cbcddda 100644 --- a/src/mocks.rs +++ b/src/mocks.rs @@ -130,7 +130,9 @@ impl Default for MockDb { memory: 8 * crate::GB, enabled: true, api_token: "".to_string(), - load_factor: 1.5, + load_cpu: 1.5, + load_memory: 2.0, + load_disk: 3.0, }, ); let mut host_disks = HashMap::new(); diff --git a/src/payments/invoice.rs b/src/payments/invoice.rs index 4d76d13..d3de1e4 100644 --- a/src/payments/invoice.rs +++ b/src/payments/invoice.rs @@ -36,8 +36,14 @@ impl NodeInvoiceHandler { async fn mark_payment_paid(&self, payment: &VmPayment) -> Result<()> { self.db.vm_payment_paid(&payment).await?; - info!("VM payment {} for {}, paid", hex::encode(&payment.id), payment.vm_id); - self.tx.send(WorkJob::CheckVm { vm_id: payment.vm_id })?; + info!( + "VM payment {} for {}, paid", + hex::encode(&payment.id), + payment.vm_id + ); + self.tx.send(WorkJob::CheckVm { + vm_id: payment.vm_id, + })?; Ok(()) } diff --git a/src/provisioner/capacity.rs b/src/provisioner/capacity.rs index cc3ee29..b06072f 100644 --- a/src/provisioner/capacity.rs +++ b/src/provisioner/capacity.rs @@ -2,8 +2,10 @@ use crate::provisioner::Template; use anyhow::{bail, Result}; use chrono::Utc; use futures::future::join_all; +use ipnetwork::{IpNetwork, NetworkSize}; use lnvps_db::{ - DiskInterface, DiskType, LNVpsDb, VmCustomTemplate, VmHost, VmHostDisk, VmTemplate, + DiskInterface, DiskType, IpRange, LNVpsDb, VmCustomTemplate, VmHost, VmHostDisk, + VmIpAssignment, VmTemplate, }; use std::collections::HashMap; use std::sync::Arc; @@ -80,8 +82,25 @@ impl HostCapacityService { disk_interface: Option, ) -> Result { let vms = self.db.list_vms_on_host(host.id).await?; + + // load ip ranges + let ip_ranges = self.db.list_ip_range_in_region(host.region_id).await?; + // TODO: handle very large number of assignments, maybe just count assignments + let ip_range_assigned: Vec = join_all( + ip_ranges + .iter() + .map(|r| self.db.list_vm_ip_assignments_in_range(r.id)), + ) + .await + .into_iter() + .filter_map(|r| r.ok()) + .flatten() + .collect(); + // TODO: filter disks from DB? Should be very few disks anyway let storage = self.db.list_host_disks(host.id).await?; + + // load templates let templates = self.db.list_vm_templates().await?; let custom_templates: Vec> = join_all( vms.iter() @@ -148,7 +167,7 @@ impl HostCapacityService { .filter(|(_k, v)| s.id == v.disk_id) .fold(0, |acc, (_k, v)| acc + v.disk); DiskCapacity { - load_factor: host.load_factor, + load_factor: host.load_disk, disk: s.clone(), usage, } @@ -161,19 +180,40 @@ impl HostCapacityService { let memory_consumed = vm_resources.values().fold(0, |acc, vm| acc + vm.memory); Ok(HostCapacity { - load_factor: host.load_factor, + load_factor: LoadFactors { + cpu: host.load_cpu, + memory: host.load_memory, + disk: host.load_disk, + }, host: host.clone(), cpu: cpu_consumed, memory: memory_consumed, disks: storage_disks, + ranges: ip_ranges + .into_iter() + .map(|r| IPRangeCapacity { + usage: ip_range_assigned + .iter() + .filter(|z| z.ip_range_id == r.id) + .count() as u128, + range: r, + }) + .collect(), }) } } +#[derive(Debug, Clone)] +pub struct LoadFactors { + pub cpu: f32, + pub memory: f32, + pub disk: f32, +} + #[derive(Debug, Clone)] pub struct HostCapacity { /// Load factor applied to resource consumption - pub load_factor: f32, + pub load_factor: LoadFactors, /// The host pub host: VmHost, /// Number of consumed CPU cores @@ -182,6 +222,8 @@ pub struct HostCapacity { pub memory: u64, /// List of disks on the host and its used space pub disks: Vec, + /// List of IP ranges and its usage + pub ranges: Vec, } impl HostCapacity { @@ -192,23 +234,24 @@ impl HostCapacity { /// CPU usage as a percentage pub fn cpu_load(&self) -> f32 { - self.cpu as f32 / (self.host.cpu as f32 * self.load_factor) + self.cpu as f32 / (self.host.cpu as f32 * self.load_factor.cpu) } /// Total number of available CPUs pub fn available_cpu(&self) -> u16 { - let loaded_host_cpu = (self.host.cpu as f32 * self.load_factor).floor() as u16; + let loaded_host_cpu = (self.host.cpu as f32 * self.load_factor.cpu).floor() as u16; loaded_host_cpu.saturating_sub(self.cpu) } /// Memory usage as a percentage pub fn memory_load(&self) -> f32 { - self.memory as f32 / (self.host.memory as f32 * self.load_factor) + self.memory as f32 / (self.host.memory as f32 * self.load_factor.memory) } /// Total available bytes of memory pub fn available_memory(&self) -> u64 { - let loaded_host_memory = (self.host.memory as f64 * self.load_factor as f64).floor() as u64; + let loaded_host_memory = + (self.host.memory as f64 * self.load_factor.memory as f64).floor() as u64; loaded_host_memory.saturating_sub(self.memory) } @@ -225,6 +268,7 @@ impl HostCapacity { .disks .iter() .any(|d| d.available_capacity() >= template.disk_size()) + && self.ranges.iter().any(|r| r.available_capacity() >= 1) } } @@ -251,6 +295,30 @@ impl DiskCapacity { } } +#[derive(Debug, Clone)] +pub struct IPRangeCapacity { + /// IP Range + pub range: IpRange, + /// Number of allocated IPs + pub usage: u128, +} + +impl IPRangeCapacity { + // first/last/gw + const RESERVED: u128 = 3; + + /// Total number of IPs free + pub fn available_capacity(&self) -> u128 { + let net: IpNetwork = self.range.cidr.parse().unwrap(); + + match net.size() { + NetworkSize::V4(s) => (s as u128).saturating_sub(self.usage), + NetworkSize::V6(s) => s.saturating_sub(self.usage), + } + .saturating_sub(Self::RESERVED) + } +} + #[cfg(test)] mod tests { use super::*; @@ -259,7 +327,11 @@ mod tests { #[test] fn loads() { let cap = HostCapacity { - load_factor: 2.0, + load_factor: LoadFactors { + cpu: 2.0, + memory: 3.0, + disk: 4.0, + }, host: VmHost { cpu: 100, memory: 100, @@ -268,23 +340,40 @@ mod tests { cpu: 8, memory: 8, disks: vec![DiskCapacity { - load_factor: 2.0, + load_factor: 4.0, disk: VmHostDisk { size: 100, ..Default::default() }, usage: 8, }], + ranges: vec![IPRangeCapacity { + range: IpRange { + id: 1, + cidr: "10.0.0.0/24".to_string(), + gateway: "10.0.0.1".to_string(), + enabled: true, + region_id: 1, + }, + usage: 69, + }], }; // load factor halves load values 8/100 * (1/load_factor) - assert_eq!(cap.load(), 0.04); - assert_eq!(cap.cpu_load(), 0.04); - assert_eq!(cap.memory_load(), 0.04); - assert_eq!(cap.disk_load(), 0.04); - // load factor doubles memory to 200, 200 - 8 - assert_eq!(cap.available_memory(), 192); + assert_eq!(cap.cpu_load(), 8.0 / 200.0); + assert_eq!(cap.memory_load(), 8.0 / 300.0); + assert_eq!(cap.disk_load(), 8.0 / 400.0); + assert_eq!( + cap.load(), + ((8.0 / 200.0) + (8.0 / 300.0) + (8.0 / 400.0)) / 3.0 + ); + // load factor doubles memory to 300, 300 - 8 + assert_eq!(cap.available_memory(), 292); assert_eq!(cap.available_cpu(), 192); + for r in cap.ranges { + assert_eq!(r.usage, 69); + assert_eq!(r.available_capacity(), 256 - 3 - 69); + } } #[tokio::test] diff --git a/src/ssh_client.rs b/src/ssh_client.rs index 308e996..f98b91f 100644 --- a/src/ssh_client.rs +++ b/src/ssh_client.rs @@ -36,10 +36,7 @@ impl SshClient { pub fn tunnel_unix_socket(&mut self, remote_path: &Path) -> Result { self.session - .channel_direct_streamlocal( - remote_path.to_str().unwrap(), - None, - ) + .channel_direct_streamlocal(remote_path.to_str().unwrap(), None) .map_err(|e| anyhow!(e)) }