feat: filter templates by available capacity
@@ -4,7 +4,7 @@ use crate::api::model::{
 };
 use crate::host::{get_host_client, FullVmInfo};
 use crate::nip98::Nip98Auth;
-use crate::provisioner::LNVpsProvisioner;
+use crate::provisioner::{HostCapacityService, LNVpsProvisioner};
 use crate::settings::Settings;
 use crate::status::{VmState, VmStateCache};
 use crate::worker::WorkJob;
@@ -231,8 +231,8 @@ async fn v1_patch_vm(
         let mut ips = db.list_vm_ip_assignments(vm.id).await?;
         for mut ip in ips.iter_mut() {
             ip.dns_reverse = Some(ptr.to_string());
-            provisioner.update_reverse_ip_dns(&mut ip).await?;
-            db.update_vm_ip_assignment(&ip).await?;
+            provisioner.update_reverse_ip_dns(ip).await?;
+            db.update_vm_ip_assignment(ip).await?;
         }
     }

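Note on the hunk above: `ips.iter_mut()` already yields `&mut VmIpAssignment`, so the old `&mut ip` and `&ip` arguments added an extra layer of borrowing that auto-deref had to undo; the same needless-borrow cleanup recurs in the worker changes at the end of this commit. A minimal standalone illustration (the `update` function is hypothetical, not from this repo):

```rust
fn update(ip: &mut String) {
    ip.push('!');
}

fn main() {
    let mut ips = vec![String::from("10.0.0.1")];
    for ip in ips.iter_mut() {
        // `ip` is already `&mut String`; writing `update(&mut ip)` would
        // create a `&mut &mut String` for auto-deref to unwrap
        update(ip);
    }
    assert_eq!(ips[0], "10.0.0.1!");
}
```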
@@ -264,7 +264,8 @@ async fn v1_list_vm_images(db: &State<Arc<dyn LNVpsDb>>) -> ApiResult<Vec<ApiVmO
 #[openapi(tag = "Template")]
 #[get("/api/v1/vm/templates")]
 async fn v1_list_vm_templates(db: &State<Arc<dyn LNVpsDb>>) -> ApiResult<Vec<ApiVmTemplate>> {
-    let templates = db.list_vm_templates().await?;
+    let hc = HostCapacityService::new((*db).clone());
+    let templates = hc.list_available_vm_templates().await?;

     let cost_plans: HashSet<u64> = templates.iter().map(|t| t.cost_plan_id).collect();
     let regions: HashSet<u64> = templates.iter().map(|t| t.region_id).collect();
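Note on the new endpoint body: `(*db).clone()` clones the `Arc` out of rocket's managed `State`, a cheap reference-count bump rather than a copy of the database. A minimal sketch of the ownership flow, with a hypothetical `Db` trait standing in for `LNVpsDb` and a plain `&Arc` standing in for `&State<Arc<...>>`:

```rust
use std::sync::Arc;

trait Db: Send + Sync {}

struct MemDb;
impl Db for MemDb {}

struct HostCapacityService {
    db: Arc<dyn Db>,
}

impl HostCapacityService {
    fn new(db: Arc<dyn Db>) -> Self {
        Self { db }
    }
}

// deref the wrapper to reach the Arc, then clone the Arc itself
fn from_state(db: &Arc<dyn Db>) -> HostCapacityService {
    HostCapacityService::new((*db).clone())
}

fn main() {
    let db: Arc<dyn Db> = Arc::new(MemDb);
    let svc = from_state(&db);
    assert_eq!(Arc::strong_count(&svc.db), 2); // shared, not copied
}
```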
@@ -294,7 +295,6 @@ async fn v1_list_vm_templates(db: &State<Arc<dyn LNVpsDb>>) -> ApiResult<Vec<Api

     let ret = templates
         .into_iter()
-        .filter(|v| v.enabled)
         .filter_map(|i| {
             let cp = cost_plans.get(&i.cost_plan_id)?;
             let hr = regions.get(&i.region_id)?;
@@ -361,7 +361,7 @@ async fn v1_create_vm_order(
     auth: Nip98Auth,
     db: &State<Arc<dyn LNVpsDb>>,
     provisioner: &State<Arc<LNVpsProvisioner>>,
-    req: Json<CreateVmRequest>
+    req: Json<CreateVmRequest>,
 ) -> ApiResult<ApiVmStatus> {
     let pubkey = auth.event.pubkey.to_bytes();
     let uid = db.upsert_user(&pubkey).await?;

@@ -1,15 +1,13 @@
-use log::warn;
 use reqwest::header::HeaderMap;
 use rocket::data::{FromData, ToByteUnit};
 use rocket::http::Status;
 use rocket::{post, routes, Data, Route};
-use std::collections::HashMap;
 use std::sync::LazyLock;
 use tokio::io::AsyncReadExt;
 use tokio::sync::broadcast;

 /// Messaging bridge for webhooks to other parts of the system (bitvora)
-pub static WEBHOOK_BRIDGE: LazyLock<WebhookBridge> = LazyLock::new(|| WebhookBridge::new());
+pub static WEBHOOK_BRIDGE: LazyLock<WebhookBridge> = LazyLock::new(WebhookBridge::new);

 pub fn routes() -> Vec<Route> {
     if cfg!(feature = "bitvora") {
@@ -61,6 +59,12 @@ pub struct WebhookBridge {
     tx: broadcast::Sender<WebhookMessage>,
 }

+impl Default for WebhookBridge {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl WebhookBridge {
     pub fn new() -> Self {
         let (tx, _rx) = broadcast::channel(100);

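Note: both webhook changes are clippy-driven. `LazyLock::new(WebhookBridge::new)` removes a redundant closure, and the `Default` impl satisfies `clippy::new_without_default` by delegating to `new`. The pattern in isolation (hypothetical `Bridge` type):

```rust
struct Bridge {
    capacity: usize,
}

impl Bridge {
    pub fn new() -> Self {
        Self { capacity: 100 }
    }
}

// types with a zero-argument `new` should also be constructible via Default,
// so generic code and `..Default::default()` syntax can use them
impl Default for Bridge {
    fn default() -> Self {
        Self::new()
    }
}

fn main() {
    let b = Bridge::default();
    assert_eq!(b.capacity, 100);
}
```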
@@ -5,9 +5,8 @@ use crate::ssh_client::SshClient;
 use crate::status::{VmRunningState, VmState};
 use anyhow::{anyhow, bail, ensure, Result};
 use chrono::Utc;
 use futures::future::join_all;
-use ipnetwork::IpNetwork;
-use lnvps_db::{async_trait, DiskType, IpRange, LNVpsDb, Vm, VmIpAssignment, VmOsImage};
+use lnvps_db::{async_trait, DiskType, Vm, VmOsImage};
 use log::{info, warn};
 use rand::random;
 use reqwest::header::{HeaderMap, AUTHORIZATION};
@@ -469,7 +468,7 @@ impl VmHostClient for ProxmoxClient {
     }

     async fn create_vm(&self, req: &FullVmInfo) -> Result<()> {
-        let config = self.make_config(&req)?;
+        let config = self.make_config(req)?;
         let vm_id = req.vm.id.into();
         let t_create = self
             .create_vm(CreateVm {
@@ -481,7 +480,7 @@ impl VmHostClient for ProxmoxClient {
         self.wait_for_task(&t_create).await?;

         // import primary disk from image (scsi0)
-        if let Err(e) = self
+        self
             .import_disk_image(ImportDiskImageRequest {
                 vm_id,
                 node: self.node.clone(),
@@ -490,11 +489,7 @@ impl VmHostClient for ProxmoxClient {
                 image: req.image.filename()?,
                 is_ssd: matches!(req.disk.kind, DiskType::SSD),
             })
-            .await
-        {
-            // TODO: rollback
-            return Err(e);
-        }
+            .await?;

         // resize disk to match template
         let j_resize = self
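Note: the removed `if let Err(e)` block only re-returned the error (the `// TODO: rollback` was never implemented), so collapsing it to `?` is behavior-preserving; the rollback TODO is simply dropped rather than resolved. The equivalence, demonstrated with a hypothetical fallible `step`:

```rust
fn step() -> Result<(), String> {
    Ok(())
}

// old shape: match on the error and propagate it by hand
fn create_old() -> Result<(), String> {
    if let Err(e) = step() {
        return Err(e);
    }
    Ok(())
}

// new shape: `?` performs the same early return on Err
fn create_new() -> Result<(), String> {
    step()?;
    Ok(())
}

fn main() {
    assert_eq!(create_old(), create_new());
}
```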
@@ -537,7 +532,7 @@ impl VmHostClient for ProxmoxClient {
     }

     async fn configure_vm(&self, cfg: &FullVmInfo) -> Result<()> {
-        let mut config = self.make_config(&cfg)?;
+        let mut config = self.make_config(cfg)?;

         // dont re-create the disks
         config.scsi_0 = None;
@@ -560,9 +555,9 @@
 #[derive(Debug, Copy, Clone, Default)]
 pub struct ProxmoxVmId(u64);

-impl Into<i32> for ProxmoxVmId {
-    fn into(self) -> i32 {
-        self.0 as i32 + 100
+impl From<ProxmoxVmId> for i32 {
+    fn from(val: ProxmoxVmId) -> Self {
+        val.0 as i32 + 100
     }
 }

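Note: `clippy::from_over_into` prefers `From` because the standard library's blanket `impl<T, U: From<T>> Into<U> for T` derives `Into` from it, so existing `.into()` call sites compile unchanged. A runnable sketch of the converted impl (the `+ 100` offset matches Proxmox reserving VMIDs below 100 for its own use):

```rust
struct ProxmoxVmId(u64);

impl From<ProxmoxVmId> for i32 {
    fn from(val: ProxmoxVmId) -> Self {
        val.0 as i32 + 100 // internal id 0 maps to Proxmox VMID 100
    }
}

fn main() {
    let raw: i32 = ProxmoxVmId(1).into(); // Into provided by the blanket impl
    assert_eq!(raw, 101);
}
```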
@@ -4,7 +4,7 @@ use lnvps_db::{DiskType, LNVpsDb, VmHost, VmHostDisk, VmTemplate};
 use std::collections::HashMap;
 use std::sync::Arc;

-/// Simple capacity reporting per node
+/// Simple capacity management
 #[derive(Clone)]
 pub struct HostCapacityService {
     /// Database
@@ -16,6 +16,27 @@ impl HostCapacityService {
         Self { db }
     }

+    /// List templates which can be sold, based on available capacity
+    pub async fn list_available_vm_templates(&self) -> Result<Vec<VmTemplate>> {
+        let templates = self.db.list_vm_templates().await?;
+
+        // TODO: list hosts in regions where templates are active?
+        // use all hosts since we dont expect there to be many
+        let hosts = self.db.list_hosts().await?;
+        let caps: Vec<Result<HostCapacity>> =
+            join_all(hosts.iter().map(|h| self.get_host_capacity(h, None))).await;
+        let caps: Vec<HostCapacity> = caps.into_iter().filter_map(Result::ok).collect();
+
+        Ok(templates
+            .into_iter()
+            .filter(|t| {
+                caps.iter()
+                    .filter(|c| c.host.region_id == t.region_id)
+                    .any(|c| c.can_accommodate(t))
+            })
+            .collect())
+    }
+
     /// Pick a host for the purposes of provisioning a new VM
     pub async fn get_host_for_template(&self, template: &VmTemplate) -> Result<HostCapacity> {
         let hosts = self.db.list_hosts().await?;
@@ -30,13 +51,7 @@ impl HostCapacityService {
         let mut host_cap: Vec<HostCapacity> = caps
             .into_iter()
             .filter_map(|v| v.ok())
-            .filter(|v| {
-                v.available_cpu() >= template.cpu
-                    && v.available_memory() >= template.memory
-                    && v.disks
-                        .iter()
-                        .any(|d| d.available_capacity() >= template.disk_size)
-            })
+            .filter(|v| v.can_accommodate(template))
             .collect();

         host_cap.sort_by(|a, b| a.load().partial_cmp(&b.load()).unwrap());
@@ -65,8 +80,7 @@ impl HostCapacityService {
             .filter_map(|v| {
                 templates
                     .iter()
-                    .find(|t| t.id == v.template_id)
-                    .and_then(|t| Some((v.id, t)))
+                    .find(|t| t.id == v.template_id).map(|t| (v.id, t))
             })
             .collect();

@@ -147,6 +161,16 @@ impl HostCapacity {
     pub fn disk_load(&self) -> f32 {
         self.disks.iter().fold(0.0, |acc, disk| acc + disk.load()) / self.disks.len() as f32
     }
+
+    /// Can this host and its available capacity accommodate the given template
+    pub fn can_accommodate(&self, template: &VmTemplate) -> bool {
+        self.available_cpu() >= template.cpu
+            && self.available_memory() >= template.memory
+            && self
+                .disks
+                .iter()
+                .any(|d| d.available_capacity() >= template.disk_size)
+    }
 }

 #[derive(Debug, Clone)]
@@ -230,6 +254,10 @@ mod tests {
         let host = hc.get_host_for_template(&template).await?;
         assert_eq!(host.host.id, 1);

+        // all templates should be available
+        let templates = hc.list_available_vm_templates().await?;
+        assert_eq!(templates.len(), db.list_vm_templates().await?.len());
+
         Ok(())
     }
 }

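A hedged aside on the capacity code above: `host_cap.sort_by(|a, b| a.load().partial_cmp(&b.load()).unwrap())` panics if any `load()` is NaN, and `disk_load()` can produce NaN for a host with zero disks (`0.0 / 0.0`). If that case is ever reachable, `f32::total_cmp` gives a total order without the unwrap:

```rust
fn main() {
    let mut loads: Vec<f32> = vec![0.5, f32::NAN, 0.2];
    // partial_cmp().unwrap() would panic on the NaN entry;
    // total_cmp orders NaN after all other values instead
    loads.sort_by(|a, b| a.total_cmp(b));
    assert_eq!(loads[0], 0.2);
    assert!(loads[2].is_nan());
}
```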
@@ -3,21 +3,16 @@ use crate::exchange::{ExchangeRateService, Ticker};
 use crate::host::{get_host_client, FullVmInfo};
 use crate::lightning::{AddInvoiceRequest, LightningNode};
 use crate::provisioner::{
-    HostCapacity, HostCapacityService, NetworkProvisioner, ProvisionerMethod,
+    HostCapacityService, NetworkProvisioner, ProvisionerMethod,
 };
 use crate::router::{ArpEntry, Router};
 use crate::settings::{NetworkAccessPolicy, NetworkPolicy, ProvisionerConfig, Settings};
 use anyhow::{bail, ensure, Context, Result};
 use chrono::{Days, Months, Utc};
-use futures::future::join_all;
-use lnvps_db::{IpRange, LNVpsDb, Vm, VmCostPlanIntervalType, VmIpAssignment, VmPayment};
+use lnvps_db::{LNVpsDb, Vm, VmCostPlanIntervalType, VmIpAssignment, VmPayment};
 use log::{info, warn};
 use nostr::util::hex;
 use rand::random;
 use std::collections::HashSet;
 use std::net::IpAddr;
 use std::ops::Add;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
@@ -63,7 +58,7 @@ impl LNVpsProvisioner {
         if let NetworkAccessPolicy::StaticArp { interface } = &self.network_policy.access {
             if let Some(r) = self.router.as_ref() {
                 let vm = self.db.get_vm(assignment.vm_id).await?;
-                let entry = ArpEntry::new(&vm, &assignment, Some(interface.clone()))?;
+                let entry = ArpEntry::new(&vm, assignment, Some(interface.clone()))?;
                 let arp = if let Some(_id) = &assignment.arp_ref {
                     r.update_arp_entry(&entry).await?
                 } else {

@@ -7,7 +7,6 @@ use log::debug;
 use reqwest::Method;
 use rocket::async_trait;
 use serde::{Deserialize, Serialize};
-use std::net::IpAddr;

 pub struct MikrotikRouter {
     api: JsonApi,
@@ -78,26 +77,26 @@ pub struct MikrotikArpEntry {
     pub comment: Option<String>,
 }

-impl Into<ArpEntry> for MikrotikArpEntry {
-    fn into(self) -> ArpEntry {
+impl From<MikrotikArpEntry> for ArpEntry {
+    fn from(val: MikrotikArpEntry) -> Self {
         ArpEntry {
-            id: self.id,
-            address: self.address,
-            mac_address: self.mac_address.unwrap(),
-            interface: Some(self.interface),
-            comment: self.comment,
+            id: val.id,
+            address: val.address,
+            mac_address: val.mac_address.unwrap(),
+            interface: Some(val.interface),
+            comment: val.comment,
         }
     }
 }

-impl Into<MikrotikArpEntry> for ArpEntry {
-    fn into(self) -> MikrotikArpEntry {
+impl From<ArpEntry> for MikrotikArpEntry {
+    fn from(val: ArpEntry) -> Self {
         MikrotikArpEntry {
-            id: self.id,
-            address: self.address,
-            mac_address: Some(self.mac_address),
-            interface: self.interface.unwrap(),
-            comment: self.comment,
+            id: val.id,
+            address: val.address,
+            mac_address: Some(val.mac_address),
+            interface: val.interface.unwrap(),
+            comment: val.comment,
         }
     }
 }

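Note: the rewritten impls keep the `unwrap()` calls on the optional fields (`mac_address` in one direction, `interface` in the other), so conversion still panics when a field is absent. If RouterOS can return such entries, `TryFrom` models the failure as an error instead; a sketch with hypothetical, simplified types:

```rust
struct RouterEntry {
    mac_address: Option<String>,
}

struct ArpEntry {
    mac_address: String,
}

impl TryFrom<RouterEntry> for ArpEntry {
    type Error = String;

    fn try_from(val: RouterEntry) -> Result<Self, Self::Error> {
        Ok(ArpEntry {
            // surface the missing field as an Err instead of panicking
            mac_address: val.mac_address.ok_or("ARP entry has no MAC address")?,
        })
    }
}

fn main() {
    let ok = ArpEntry::try_from(RouterEntry { mac_address: Some("aa:bb:cc:dd:ee:ff".into()) });
    assert!(ok.is_ok());
    let missing = ArpEntry::try_from(RouterEntry { mac_address: None });
    assert!(missing.is_err());
}
```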
@@ -146,9 +146,9 @@ impl Worker {
         let host = self.db.get_host(vm.host_id).await?;
         let client = get_host_client(&host, &self.settings.provisioner_config)?;

-        match client.get_vm_state(&vm).await {
+        match client.get_vm_state(vm).await {
             Ok(s) => {
-                self.handle_vm_state(&vm, &s).await?;
+                self.handle_vm_state(vm, &s).await?;
                 self.vm_state_cache.set_state(vm.id, s).await?;
             }
             Err(_) => {
@@ -191,7 +191,7 @@ impl Worker {
         let db_vms = self.db.list_vms().await?;
         for vm in &db_vms {
             // Refresh VM status if active
-            self.check_vm(&vm).await?;
+            self.check_vm(vm).await?;

             // delete vm if not paid (in new state)
             if vm.expires < Utc::now().sub(Days::new(1)) {