feat: extend host load factors
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2025-03-24 15:18:18 +00:00
parent af36d4e586
commit ec7fa92010
13 changed files with 138 additions and 46 deletions

View File

@ -0,0 +1,5 @@
-- Add migration script here
alter table vm_host
add column load_memory float not null default 1.0,
add column load_disk float not null default 1.0,
change column load_factor load_cpu float not null default 1.0

View File

@ -80,8 +80,12 @@ pub struct VmHost {
pub enabled: bool,
/// API token used to control this host via [ip]
pub api_token: String,
/// Load factor for provisioning
pub load_factor: f32,
/// CPU load factor for provisioning
pub load_cpu: f32,
/// Memory load factor
pub load_memory: f32,
/// Disk load factor
pub load_disk:f32,
}
#[derive(FromRow, Clone, Debug, Default)]

View File

@ -1,5 +1,6 @@
use crate::dvm::{build_status_for_job, DVMHandler, DVMJobRequest};
use crate::provisioner::LNVpsProvisioner;
use crate::{GB, MB};
use anyhow::Context;
use lnvps_db::{
DiskInterface, DiskType, LNVpsDb, OsDistribution, PaymentMethod, UserSshKey, VmCustomTemplate,
@ -12,7 +13,6 @@ use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use crate::{GB, MB};
pub struct LnvpsDvm {
client: Client,
@ -57,10 +57,7 @@ impl DVMHandler for LnvpsDvm {
.get("ssh_key")
.context("missing ssh_key parameter")?;
let ssh_key_name = request.params.get("ssh_key_name");
let os_image = request
.params
.get("os")
.context("missing os parameter")?;
let os_image = request.params.get("os").context("missing os parameter")?;
let os_version = request
.params
.get("os_version")

View File

@ -247,10 +247,7 @@ fn parse_job_request(event: &Event) -> Result<DVMJobRequest> {
})
}
pub fn start_dvms(
client: Client,
provisioner: Arc<LNVpsProvisioner>,
) -> JoinHandle<()> {
pub fn start_dvms(client: Client, provisioner: Arc<LNVpsProvisioner>) -> JoinHandle<()> {
tokio::spawn(async move {
let dvm = LnvpsDvm::new(provisioner, client.clone());
if let Err(e) = listen_for_jobs(client, Kind::from_u16(5999), Box::new(dvm)).await {

View File

@ -159,7 +159,7 @@ pub struct RevolutOrderPayment {
#[serde(skip_serializing_if = "Option::is_none")]
pub billing_address: Option<RevolutBillingAddress>,
#[serde(skip_serializing_if = "Option::is_none")]
pub risk_level: Option<RevolutRiskLevel>
pub risk_level: Option<RevolutRiskLevel>,
}
#[derive(Clone, Deserialize, Serialize)]
@ -198,7 +198,7 @@ pub enum RevolutPaymentMethodType {
#[serde(rename_all = "snake_case")]
pub enum RevolutRiskLevel {
High,
Low
Low,
}
#[derive(Clone, Deserialize, Serialize)]

View File

@ -12,7 +12,6 @@ use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Semaphore;
@ -23,7 +22,6 @@ use tokio::sync::Semaphore;
mod proxmox;
pub struct TerminalStream {
pub shutdown: Arc<AtomicBool>,
pub rx: Receiver<Vec<u8>>,
pub tx: Sender<Vec<u8>>,
}

View File

@ -668,7 +668,6 @@ impl VmHostClient for ProxmoxClient {
let (mut client_tx, client_rx) = channel::<Vec<u8>>(1024);
let (server_tx, mut server_rx) = channel::<Vec<u8>>(1024);
let shutdown = Arc::new(AtomicBool::new(false));
tokio::spawn(async move {
// fire calls to read every 100ms
loop {
@ -684,7 +683,6 @@ impl VmHostClient for ProxmoxClient {
Ok::<(), anyhow::Error>(())
});
Ok(TerminalStream {
shutdown,
rx: client_rx,
tx: server_tx,
})

View File

@ -28,4 +28,4 @@ pub const BTC_SATS: f64 = 100_000_000.0;
pub const KB: u64 = 1024;
pub const MB: u64 = KB * 1024;
pub const GB: u64 = MB * 1024;
pub const TB: u64 = GB * 1024;
pub const TB: u64 = GB * 1024;

View File

@ -68,11 +68,10 @@ impl LightningNode for BitvoraNode {
}
let r_body = r.body.as_slice();
info!("Received webhook {}", String::from_utf8_lossy(r_body));
let body: BitvoraWebhook =
match serde_json::from_slice(r_body) {
Ok(b) => b,
Err(e) => return InvoiceUpdate::Error(e.to_string()),
};
let body: BitvoraWebhook = match serde_json::from_slice(r_body) {
Ok(b) => b,
Err(e) => return InvoiceUpdate::Error(e.to_string()),
};
if let Err(e) = verify_webhook(&secret, &r) {
return InvoiceUpdate::Error(e.to_string());

View File

@ -130,7 +130,9 @@ impl Default for MockDb {
memory: 8 * crate::GB,
enabled: true,
api_token: "".to_string(),
load_factor: 1.5,
load_cpu: 1.5,
load_memory: 2.0,
load_disk: 3.0,
},
);
let mut host_disks = HashMap::new();

View File

@ -36,8 +36,14 @@ impl NodeInvoiceHandler {
async fn mark_payment_paid(&self, payment: &VmPayment) -> Result<()> {
self.db.vm_payment_paid(&payment).await?;
info!("VM payment {} for {}, paid", hex::encode(&payment.id), payment.vm_id);
self.tx.send(WorkJob::CheckVm { vm_id: payment.vm_id })?;
info!(
"VM payment {} for {}, paid",
hex::encode(&payment.id),
payment.vm_id
);
self.tx.send(WorkJob::CheckVm {
vm_id: payment.vm_id,
})?;
Ok(())
}

View File

@ -2,8 +2,10 @@ use crate::provisioner::Template;
use anyhow::{bail, Result};
use chrono::Utc;
use futures::future::join_all;
use ipnetwork::{IpNetwork, NetworkSize};
use lnvps_db::{
DiskInterface, DiskType, LNVpsDb, VmCustomTemplate, VmHost, VmHostDisk, VmTemplate,
DiskInterface, DiskType, IpRange, LNVpsDb, VmCustomTemplate, VmHost, VmHostDisk,
VmIpAssignment, VmTemplate,
};
use std::collections::HashMap;
use std::sync::Arc;
@ -80,8 +82,25 @@ impl HostCapacityService {
disk_interface: Option<DiskInterface>,
) -> Result<HostCapacity> {
let vms = self.db.list_vms_on_host(host.id).await?;
// load ip ranges
let ip_ranges = self.db.list_ip_range_in_region(host.region_id).await?;
// TODO: handle very large number of assignments, maybe just count assignments
let ip_range_assigned: Vec<VmIpAssignment> = join_all(
ip_ranges
.iter()
.map(|r| self.db.list_vm_ip_assignments_in_range(r.id)),
)
.await
.into_iter()
.filter_map(|r| r.ok())
.flatten()
.collect();
// TODO: filter disks from DB? Should be very few disks anyway
let storage = self.db.list_host_disks(host.id).await?;
// load templates
let templates = self.db.list_vm_templates().await?;
let custom_templates: Vec<Result<VmCustomTemplate>> = join_all(
vms.iter()
@ -148,7 +167,7 @@ impl HostCapacityService {
.filter(|(_k, v)| s.id == v.disk_id)
.fold(0, |acc, (_k, v)| acc + v.disk);
DiskCapacity {
load_factor: host.load_factor,
load_factor: host.load_disk,
disk: s.clone(),
usage,
}
@ -161,19 +180,40 @@ impl HostCapacityService {
let memory_consumed = vm_resources.values().fold(0, |acc, vm| acc + vm.memory);
Ok(HostCapacity {
load_factor: host.load_factor,
load_factor: LoadFactors {
cpu: host.load_cpu,
memory: host.load_memory,
disk: host.load_disk,
},
host: host.clone(),
cpu: cpu_consumed,
memory: memory_consumed,
disks: storage_disks,
ranges: ip_ranges
.into_iter()
.map(|r| IPRangeCapacity {
usage: ip_range_assigned
.iter()
.filter(|z| z.ip_range_id == r.id)
.count() as u128,
range: r,
})
.collect(),
})
}
}
#[derive(Debug, Clone)]
pub struct LoadFactors {
pub cpu: f32,
pub memory: f32,
pub disk: f32,
}
#[derive(Debug, Clone)]
pub struct HostCapacity {
/// Load factor applied to resource consumption
pub load_factor: f32,
pub load_factor: LoadFactors,
/// The host
pub host: VmHost,
/// Number of consumed CPU cores
@ -182,6 +222,8 @@ pub struct HostCapacity {
pub memory: u64,
/// List of disks on the host and its used space
pub disks: Vec<DiskCapacity>,
/// List of IP ranges and its usage
pub ranges: Vec<IPRangeCapacity>,
}
impl HostCapacity {
@ -192,23 +234,24 @@ impl HostCapacity {
/// CPU usage as a percentage
pub fn cpu_load(&self) -> f32 {
self.cpu as f32 / (self.host.cpu as f32 * self.load_factor)
self.cpu as f32 / (self.host.cpu as f32 * self.load_factor.cpu)
}
/// Total number of available CPUs
pub fn available_cpu(&self) -> u16 {
let loaded_host_cpu = (self.host.cpu as f32 * self.load_factor).floor() as u16;
let loaded_host_cpu = (self.host.cpu as f32 * self.load_factor.cpu).floor() as u16;
loaded_host_cpu.saturating_sub(self.cpu)
}
/// Memory usage as a percentage
pub fn memory_load(&self) -> f32 {
self.memory as f32 / (self.host.memory as f32 * self.load_factor)
self.memory as f32 / (self.host.memory as f32 * self.load_factor.memory)
}
/// Total available bytes of memory
pub fn available_memory(&self) -> u64 {
let loaded_host_memory = (self.host.memory as f64 * self.load_factor as f64).floor() as u64;
let loaded_host_memory =
(self.host.memory as f64 * self.load_factor.memory as f64).floor() as u64;
loaded_host_memory.saturating_sub(self.memory)
}
@ -225,6 +268,7 @@ impl HostCapacity {
.disks
.iter()
.any(|d| d.available_capacity() >= template.disk_size())
&& self.ranges.iter().any(|r| r.available_capacity() >= 1)
}
}
@ -251,6 +295,30 @@ impl DiskCapacity {
}
}
#[derive(Debug, Clone)]
pub struct IPRangeCapacity {
/// IP Range
pub range: IpRange,
/// Number of allocated IPs
pub usage: u128,
}
impl IPRangeCapacity {
// first/last/gw
const RESERVED: u128 = 3;
/// Total number of IPs free
pub fn available_capacity(&self) -> u128 {
let net: IpNetwork = self.range.cidr.parse().unwrap();
match net.size() {
NetworkSize::V4(s) => (s as u128).saturating_sub(self.usage),
NetworkSize::V6(s) => s.saturating_sub(self.usage),
}
.saturating_sub(Self::RESERVED)
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -259,7 +327,11 @@ mod tests {
#[test]
fn loads() {
let cap = HostCapacity {
load_factor: 2.0,
load_factor: LoadFactors {
cpu: 2.0,
memory: 3.0,
disk: 4.0,
},
host: VmHost {
cpu: 100,
memory: 100,
@ -268,23 +340,40 @@ mod tests {
cpu: 8,
memory: 8,
disks: vec![DiskCapacity {
load_factor: 2.0,
load_factor: 4.0,
disk: VmHostDisk {
size: 100,
..Default::default()
},
usage: 8,
}],
ranges: vec![IPRangeCapacity {
range: IpRange {
id: 1,
cidr: "10.0.0.0/24".to_string(),
gateway: "10.0.0.1".to_string(),
enabled: true,
region_id: 1,
},
usage: 69,
}],
};
// load factor halves load values 8/100 * (1/load_factor)
assert_eq!(cap.load(), 0.04);
assert_eq!(cap.cpu_load(), 0.04);
assert_eq!(cap.memory_load(), 0.04);
assert_eq!(cap.disk_load(), 0.04);
// load factor doubles memory to 200, 200 - 8
assert_eq!(cap.available_memory(), 192);
assert_eq!(cap.cpu_load(), 8.0 / 200.0);
assert_eq!(cap.memory_load(), 8.0 / 300.0);
assert_eq!(cap.disk_load(), 8.0 / 400.0);
assert_eq!(
cap.load(),
((8.0 / 200.0) + (8.0 / 300.0) + (8.0 / 400.0)) / 3.0
);
// load factor doubles memory to 300, 300 - 8
assert_eq!(cap.available_memory(), 292);
assert_eq!(cap.available_cpu(), 192);
for r in cap.ranges {
assert_eq!(r.usage, 69);
assert_eq!(r.available_capacity(), 256 - 3 - 69);
}
}
#[tokio::test]

View File

@ -36,10 +36,7 @@ impl SshClient {
pub fn tunnel_unix_socket(&mut self, remote_path: &Path) -> Result<Channel> {
self.session
.channel_direct_streamlocal(
remote_path.to_str().unwrap(),
None,
)
.channel_direct_streamlocal(remote_path.to_str().unwrap(), None)
.map_err(|e| anyhow!(e))
}