refactor: move spawn_vm out of worker into provisioner
feat: spawn vm params to settings
This commit is contained in:
@ -1,6 +1,10 @@
|
||||
use crate::exchange::{Currency, ExchangeRateCache, Ticker};
|
||||
use crate::host::proxmox::ProxmoxClient;
|
||||
use crate::exchange::{ExchangeRateCache, Ticker};
|
||||
use crate::host::proxmox::{
|
||||
ConfigureVm, CreateVm, DownloadUrlRequest, ProxmoxClient, StorageContent, TaskState, VmBios,
|
||||
VmConfig,
|
||||
};
|
||||
use crate::provisioner::Provisioner;
|
||||
use crate::settings::QemuConfig;
|
||||
use anyhow::{bail, Result};
|
||||
use chrono::{Days, Months, Utc};
|
||||
use fedimint_tonic_lnd::lnrpc::Invoice;
|
||||
@ -9,84 +13,92 @@ use fedimint_tonic_lnd::Client;
|
||||
use ipnetwork::IpNetwork;
|
||||
use lnvps_db::hydrate::Hydrate;
|
||||
use lnvps_db::{
|
||||
IpRange, LNVpsDb, Vm, VmCostPlanIntervalType, VmIpAssignment, VmOsImage, VmPayment,
|
||||
IpRange, LNVpsDb, Vm, VmCostPlanIntervalType, VmHost, VmHostKind, VmIpAssignment, VmOsImage,
|
||||
VmPayment,
|
||||
};
|
||||
use log::{info, warn};
|
||||
use log::{error, info, warn};
|
||||
use rand::seq::IteratorRandom;
|
||||
use reqwest::Url;
|
||||
use std::collections::HashSet;
|
||||
use std::net::IpAddr;
|
||||
use std::ops::Add;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
|
||||
pub struct LNVpsProvisioner {
|
||||
db: Box<dyn LNVpsDb>,
|
||||
lnd: Client,
|
||||
rates: ExchangeRateCache,
|
||||
config: QemuConfig,
|
||||
}
|
||||
|
||||
impl LNVpsProvisioner {
|
||||
pub fn new<D: LNVpsDb + 'static>(db: D, lnd: Client, rates: ExchangeRateCache) -> Self {
|
||||
pub fn new(
|
||||
config: QemuConfig,
|
||||
db: impl LNVpsDb + 'static,
|
||||
lnd: Client,
|
||||
rates: ExchangeRateCache,
|
||||
) -> Self {
|
||||
Self {
|
||||
db: Box::new(db),
|
||||
lnd,
|
||||
rates,
|
||||
config,
|
||||
}
|
||||
}
|
||||
|
||||
/// Auto-discover resources
|
||||
pub async fn auto_discover(&self) -> Result<()> {
|
||||
let hosts = self.db.list_hosts().await?;
|
||||
for host in hosts {
|
||||
let api = ProxmoxClient::new(host.ip.parse()?).with_api_token(&host.api_token);
|
||||
|
||||
let nodes = api.list_nodes().await?;
|
||||
if let Some(node) = nodes.iter().find(|n| n.name == host.name) {
|
||||
// Update host resources
|
||||
if node.max_cpu.unwrap_or(host.cpu) != host.cpu
|
||||
|| node.max_mem.unwrap_or(host.memory) != host.memory
|
||||
{
|
||||
let mut host = host.clone();
|
||||
host.cpu = node.max_cpu.unwrap_or(host.cpu);
|
||||
host.memory = node.max_mem.unwrap_or(host.memory);
|
||||
info!("Patching host: {:?}", host);
|
||||
self.db.update_host(&host).await?;
|
||||
}
|
||||
// Update disk info
|
||||
let storages = api.list_storage().await?;
|
||||
let host_disks = self.db.list_host_disks(host.id).await?;
|
||||
for storage in storages {
|
||||
let host_storage =
|
||||
if let Some(s) = host_disks.iter().find(|d| d.name == storage.storage) {
|
||||
s
|
||||
} else {
|
||||
warn!("Disk not found: {} on {}", storage.storage, host.name);
|
||||
continue;
|
||||
};
|
||||
|
||||
// TODO: patch host storage info
|
||||
}
|
||||
fn get_host_client(host: &VmHost) -> Result<ProxmoxClient> {
|
||||
Ok(match host.kind {
|
||||
VmHostKind::Proxmox => {
|
||||
ProxmoxClient::new(host.ip.parse()?).with_api_token(&host.api_token)
|
||||
}
|
||||
info!(
|
||||
"Discovering resources from: {} v{}",
|
||||
&host.name,
|
||||
api.version().await?.version
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn map_os_image(image: &VmOsImage) -> PathBuf {
|
||||
PathBuf::from("/var/lib/vz/images/").join(format!(
|
||||
"{:?}_{}_{}.img",
|
||||
image.distribution, image.flavour, image.version
|
||||
))
|
||||
async fn get_iso_storage(node: &str, client: &ProxmoxClient) -> Result<String> {
|
||||
let storages = client.list_storage(node).await?;
|
||||
if let Some(s) = storages
|
||||
.iter()
|
||||
.find(|s| s.contents().contains(&StorageContent::Import))
|
||||
{
|
||||
Ok(s.storage.clone())
|
||||
} else {
|
||||
bail!("No image storage found");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Provisioner for LNVpsProvisioner {
|
||||
async fn init(&self) -> Result<()> {
|
||||
// tell hosts to download images
|
||||
let hosts = self.db.list_hosts().await?;
|
||||
for host in hosts {
|
||||
let client = Self::get_host_client(&host)?;
|
||||
let iso_storage = Self::get_iso_storage(&host.name, &client).await?;
|
||||
let files = client.list_storage_files(&host.name, &iso_storage).await?;
|
||||
|
||||
for image in self.db.list_os_image().await? {
|
||||
info!("Downloading image {} on {}", image.url, host.name);
|
||||
let i_name = image.filename()?;
|
||||
if files.iter().any(|v| v.vol_id.ends_with(&format!("iso/{i_name}"))) {
|
||||
info!("Already downloaded, skipping");
|
||||
continue;
|
||||
}
|
||||
let t_download = client.download_image(DownloadUrlRequest {
|
||||
content: StorageContent::Import,
|
||||
node: host.name.clone(),
|
||||
storage: iso_storage.clone(),
|
||||
url: image.url.clone(),
|
||||
filename: i_name,
|
||||
}).await?;
|
||||
client.wait_for_task(&t_download).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn provision(
|
||||
&self,
|
||||
user_id: u64,
|
||||
@ -138,7 +150,7 @@ impl Provisioner for LNVpsProvisioner {
|
||||
let template = self.db.get_vm_template(vm.template_id).await?;
|
||||
let cost_plan = self.db.get_cost_plan(template.cost_plan_id).await?;
|
||||
|
||||
/// Reuse existing payment until expired
|
||||
// Reuse existing payment until expired
|
||||
let payments = self.db.list_vm_payment(vm.id).await?;
|
||||
if let Some(px) = payments
|
||||
.into_iter()
|
||||
@ -220,12 +232,12 @@ impl Provisioner for LNVpsProvisioner {
|
||||
}
|
||||
|
||||
let mut ret = vec![];
|
||||
/// Try all ranges
|
||||
// Try all ranges
|
||||
// TODO: pick round-robin ranges
|
||||
for range in ip_ranges {
|
||||
let range_cidr: IpNetwork = range.cidr.parse()?;
|
||||
let ips = self.db.list_vm_ip_assignments_in_range(range.id).await?;
|
||||
let ips: HashSet<IpAddr> = ips.iter().map(|i| i.ip.parse().unwrap()).collect();
|
||||
let ips: HashSet<IpAddr> = ips.iter().map_while(|i| i.ip.parse().ok()).collect();
|
||||
|
||||
// pick an IP at random
|
||||
let cidr: Vec<IpAddr> = {
|
||||
@ -253,4 +265,79 @@ impl Provisioner for LNVpsProvisioner {
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn spawn_vm(&self, vm_id: u64) -> Result<()> {
|
||||
if self.config.read_only {
|
||||
bail!("Cant spawn VM's in read-only mode");
|
||||
}
|
||||
let vm = self.db.get_vm(vm_id).await?;
|
||||
let host = self.db.get_host(vm.host_id).await?;
|
||||
let client = Self::get_host_client(&host)?;
|
||||
|
||||
let mut ips = self.db.list_vm_ip_assignments(vm.id).await?;
|
||||
if ips.is_empty() {
|
||||
ips = self.allocate_ips(vm.id).await?;
|
||||
}
|
||||
|
||||
let mut ip_config = ips
|
||||
.iter()
|
||||
.map_while(|ip| {
|
||||
if let Ok(net) = ip.ip.parse::<IpNetwork>() {
|
||||
Some(match net {
|
||||
IpNetwork::V4(addr) => format!("ip={}", addr),
|
||||
IpNetwork::V6(addr) => format!("ip6={}", addr),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
ip_config.push("ip6=auto".to_string());
|
||||
|
||||
let drives = self.db.list_host_disks(vm.host_id).await?;
|
||||
let drive = if let Some(d) = drives.iter().find(|d| d.enabled) {
|
||||
d
|
||||
} else {
|
||||
bail!("No host drive found!")
|
||||
};
|
||||
|
||||
let ssh_key = self.db.get_user_ssh_key(vm.ssh_key_id).await?;
|
||||
|
||||
let mut net = vec![
|
||||
"virtio".to_string(),
|
||||
format!("bridge={}", self.config.bridge),
|
||||
];
|
||||
if let Some(t) = self.config.vlan {
|
||||
net.push(format!("tag={}", t));
|
||||
}
|
||||
|
||||
// create VM
|
||||
let t_create = client
|
||||
.create_vm(CreateVm {
|
||||
node: host.name.clone(),
|
||||
vm_id: (vm.id + 100) as i32,
|
||||
config: VmConfig {
|
||||
on_boot: Some(true),
|
||||
bios: Some(VmBios::OVMF),
|
||||
boot: Some("order=scsi0".to_string()),
|
||||
cores: Some(vm.cpu as i32),
|
||||
cpu: Some(self.config.cpu.clone()),
|
||||
ip_config: Some(ip_config.join(",")),
|
||||
machine: Some(self.config.machine.clone()),
|
||||
memory: Some((vm.memory / 1024 / 1024).to_string()),
|
||||
net: Some(net.join(",")),
|
||||
os_type: Some(self.config.os_type.clone()),
|
||||
scsi_1: Some(format!("{}:cloudinit", &drive.name)),
|
||||
scsi_hw: Some("virtio-scsi-pci".to_string()),
|
||||
ssh_keys: Some(urlencoding::encode(&ssh_key.key_data).to_string()),
|
||||
efi_disk_0: Some(format!("{}:0,efitype=4m", &drive.name)),
|
||||
..Default::default()
|
||||
},
|
||||
})
|
||||
.await?;
|
||||
client.wait_for_task(&t_create).await?;
|
||||
|
||||
// TODO: Find a way to automate importing disk
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user