From d28ca286fb69b33dda7b7a69d7e125c15c183514 Mon Sep 17 00:00:00 2001 From: kieran Date: Wed, 27 Nov 2024 14:38:23 +0000 Subject: [PATCH] refactor: move spawn_vm out of worker into provisioner feat: spawn vm params to settings --- Cargo.lock | 1 + config.yaml | 12 ++- lnvps_db/Cargo.lock | 1 + lnvps_db/Cargo.toml | 3 +- lnvps_db/src/hydrate.rs | 19 ++-- lnvps_db/src/model.rs | 18 ++++ src/api.rs | 26 ++--- src/bin/api.rs | 37 +++---- src/exchange.rs | 12 ++- src/host/proxmox.rs | 211 +++++++++++++++++++++++++++++++++++++-- src/lib.rs | 3 +- src/provisioner/lnvps.rs | 193 +++++++++++++++++++++++++---------- src/provisioner/mod.rs | 6 ++ src/settings.rs | 61 +++++++++++ src/worker.rs | 90 ++--------------- 15 files changed, 491 insertions(+), 202 deletions(-) create mode 100644 src/settings.rs diff --git a/Cargo.lock b/Cargo.lock index a1fe2ec..13a3e0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1808,6 +1808,7 @@ dependencies = [ "serde", "serde_with", "sqlx", + "url", ] [[package]] diff --git a/config.yaml b/config.yaml index b7f3ce3..16d23bb 100644 --- a/config.yaml +++ b/config.yaml @@ -1,7 +1,15 @@ # MySQL database connection string db: "mysql://root:root@localhost:3376/lnvps" -read_only: true lnd: url: "https://127.0.0.1:10003" cert: "/home/kieran/.polar/networks/2/volumes/lnd/alice/tls.cert" - macaroon: "/home/kieran/.polar/networks/2/volumes/lnd/alice/data/chain/bitcoin/regtest/admin.macaroon" \ No newline at end of file + macaroon: "/home/kieran/.polar/networks/2/volumes/lnd/alice/data/chain/bitcoin/regtest/admin.macaroon" +provisioner: + proxmox: + read_only: false + bios: "ovmf" + machine: "q35" + os_type: "l26" + bridge: "vmbr0" + cpu: "kvm64" + vlan: 100 \ No newline at end of file diff --git a/lnvps_db/Cargo.lock b/lnvps_db/Cargo.lock index 5ac9848..9d814f2 100644 --- a/lnvps_db/Cargo.lock +++ b/lnvps_db/Cargo.lock @@ -843,6 +843,7 @@ dependencies = [ "serde", "serde_with", "sqlx", + "url", ] [[package]] diff --git a/lnvps_db/Cargo.toml b/lnvps_db/Cargo.toml index 4a30af9..77669df 100644 --- a/lnvps_db/Cargo.toml +++ b/lnvps_db/Cargo.toml @@ -13,4 +13,5 @@ sqlx = { version = "0.8.2", features = ["chrono", "migrate", "runtime-tokio"] } serde = { version = "1.0.213", features = ["derive"] } serde_with = { version = "3.11.0", features = ["macros", "hex"] } chrono = { version = "0.4.38", features = ["serde"] } -async-trait = "0.1.83" \ No newline at end of file +async-trait = "0.1.83" +url = "2.5.4" \ No newline at end of file diff --git a/lnvps_db/src/hydrate.rs b/lnvps_db/src/hydrate.rs index 62e87cc..37d574e 100644 --- a/lnvps_db/src/hydrate.rs +++ b/lnvps_db/src/hydrate.rs @@ -1,19 +1,20 @@ use crate::{LNVpsDb, Vm, VmTemplate}; use anyhow::Result; use async_trait::async_trait; +use std::ops::Deref; #[async_trait] -pub trait Hydrate { +pub trait Hydrate { /// Load parent resources - async fn hydrate_up(&mut self, db: &Box) -> Result<()>; + async fn hydrate_up(&mut self, db: &D) -> Result<()>; /// Load child resources - async fn hydrate_down(&mut self, db: &Box) -> Result<()>; + async fn hydrate_down(&mut self, db: &D) -> Result<()>; } #[async_trait] -impl Hydrate for Vm { - async fn hydrate_up(&mut self, db: &Box) -> Result<()> { +impl + Sync> Hydrate for Vm { + async fn hydrate_up(&mut self, db: &D) -> Result<()> { let image = db.get_os_image(self.image_id).await?; let template = db.get_vm_template(self.template_id).await?; let ssh_key = db.get_user_ssh_key(self.ssh_key_id).await?; @@ -24,7 +25,7 @@ impl Hydrate for Vm { Ok(()) } - async fn hydrate_down(&mut self, db: &Box) -> Result<()> { + async fn hydrate_down(&mut self, db: &D) -> Result<()> { //let payments = db.list_vm_payment(self.id).await?; let ips = db.list_vm_ip_assignments(self.id).await?; @@ -35,8 +36,8 @@ impl Hydrate for Vm { } #[async_trait] -impl Hydrate for VmTemplate { - async fn hydrate_up(&mut self, db: &Box) -> Result<()> { +impl + Sync> Hydrate for VmTemplate { + async fn hydrate_up(&mut self, db: &D) -> Result<()> { let cost_plan = db.get_cost_plan(self.cost_plan_id).await?; let region = db.get_host_region(self.region_id).await?; self.cost_plan = Some(cost_plan); @@ -44,7 +45,7 @@ impl Hydrate for VmTemplate { Ok(()) } - async fn hydrate_down(&mut self, db: &Box) -> Result<()> { + async fn hydrate_down(&mut self, db: &D) -> Result<()> { todo!() } } diff --git a/lnvps_db/src/model.rs b/lnvps_db/src/model.rs index cc20ef7..8861eb0 100644 --- a/lnvps_db/src/model.rs +++ b/lnvps_db/src/model.rs @@ -1,7 +1,10 @@ +use anyhow::{anyhow, Result}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use sqlx::FromRow; +use std::path::PathBuf; +use url::Url; #[serde_as] #[derive(Serialize, Deserialize, FromRow, Clone, Debug)] @@ -122,11 +125,26 @@ pub struct VmOsImage { pub version: String, pub enabled: bool, pub release_date: DateTime, + #[serde(skip_serializing)] /// URL location of cloud image pub url: String, } +impl VmOsImage { + pub fn filename(&self) -> Result { + let u: Url = self.url.parse()?; + let mut name: PathBuf = u + .path_segments() + .ok_or(anyhow!("Invalid URL"))? + .last() + .ok_or(anyhow!("Invalid URL"))? + .parse()?; + name.set_extension("img"); + Ok(name.to_string_lossy().to_string()) + } +} + #[derive(Serialize, Deserialize, FromRow, Clone, Debug)] pub struct IpRange { pub id: u64, diff --git a/src/api.rs b/src/api.rs index 4f09b06..6896c5e 100644 --- a/src/api.rs +++ b/src/api.rs @@ -45,14 +45,6 @@ struct ApiError { pub error: String, } -impl ApiError { - pub fn new(error: &str) -> Self { - Self { - error: error.to_owned(), - } - } -} - impl From for ApiError { fn from(value: T) -> Self { Self { @@ -76,13 +68,13 @@ async fn v1_list_vms( ) -> ApiResult> { let pubkey = auth.event.pubkey.to_bytes(); let uid = db.upsert_user(&pubkey).await?; - let mut vms = db.list_user_vms(uid).await?; + let vms = db.list_user_vms(uid).await?; let mut ret = vec![]; for mut vm in vms { - vm.hydrate_up(db).await?; - vm.hydrate_down(db).await?; + vm.hydrate_up(db.inner()).await?; + vm.hydrate_down(db.inner()).await?; if let Some(t) = &mut vm.template { - t.hydrate_up(db).await?; + t.hydrate_up(db.inner()).await?; } let state = vm_state.get_state(vm.id).await; @@ -105,10 +97,10 @@ async fn v1_get_vm( if vm.user_id != uid { return ApiData::err("VM doesnt belong to you"); } - vm.hydrate_up(db).await?; - vm.hydrate_down(db).await?; + vm.hydrate_up(db.inner()).await?; + vm.hydrate_down(db.inner()).await?; if let Some(t) = &mut vm.template { - t.hydrate_up(db).await?; + t.hydrate_up(db.inner()).await?; } let state = vm_state.get_state(vm.id).await; ApiData::ok(ApiVmStatus { vm, status: state }) @@ -124,7 +116,7 @@ async fn v1_list_vm_images(db: &State>) -> ApiResult>) -> ApiResult> { let mut vms = db.list_vm_templates().await?; for vm in &mut vms { - vm.hydrate_up(db).await?; + vm.hydrate_up(db.inner()).await?; } let ret: Vec = vms.into_iter().filter(|v| v.enabled).collect(); ApiData::ok(ret) @@ -180,7 +172,7 @@ async fn v1_create_vm_order( let mut rsp = provisioner .provision(uid, req.template_id, req.image_id, req.ssh_key_id) .await?; - rsp.hydrate_up(db).await?; + rsp.hydrate_up(db.inner()).await?; ApiData::ok(rsp) } diff --git a/src/bin/api.rs b/src/bin/api.rs index 162b8a2..5a13940 100644 --- a/src/bin/api.rs +++ b/src/bin/api.rs @@ -5,32 +5,15 @@ use lnvps::api; use lnvps::cors::CORS; use lnvps::exchange::ExchangeRateCache; use lnvps::invoice::InvoiceHandler; -use lnvps::provisioner::lnvps::LNVpsProvisioner; use lnvps::provisioner::Provisioner; +use lnvps::settings::Settings; use lnvps::status::VmStateCache; use lnvps::worker::{WorkJob, Worker}; use lnvps_db::{LNVpsDb, LNVpsDbMysql}; use log::error; -use serde::{Deserialize, Serialize}; use std::net::{IpAddr, SocketAddr}; -use std::path::PathBuf; use std::time::Duration; -#[derive(Debug, Deserialize, Serialize)] -pub struct Settings { - pub listen: Option, - pub db: String, - pub lnd: LndConfig, - pub read_only: bool, -} - -#[derive(Debug, Deserialize, Serialize)] -pub struct LndConfig { - pub url: String, - pub cert: PathBuf, - pub macaroon: PathBuf, -} - #[rocket::main] async fn main() -> Result<(), Error> { pretty_env_logger::init(); @@ -43,19 +26,22 @@ async fn main() -> Result<(), Error> { let db = LNVpsDbMysql::new(&settings.db).await?; db.migrate().await?; - let exchange = ExchangeRateCache::new(); let lnd = connect(settings.lnd.url, settings.lnd.cert, settings.lnd.macaroon).await?; - let provisioner = LNVpsProvisioner::new(db.clone(), lnd.clone(), exchange.clone()); #[cfg(debug_assertions)] { let setup_script = include_str!("../../dev_setup.sql"); db.execute(setup_script).await?; - provisioner.auto_discover().await?; } let status = VmStateCache::new(); - let mut worker = Worker::new(settings.read_only, db.clone(), lnd.clone(), status.clone(), exchange.clone()); + let worker_provisioner = + settings + .provisioner + .get_provisioner(db.clone(), lnd.clone(), exchange.clone()); + worker_provisioner.init().await?; + + let mut worker = Worker::new(db.clone(), worker_provisioner, status.clone()); let sender = worker.sender(); tokio::spawn(async move { loop { @@ -97,12 +83,17 @@ async fn main() -> Result<(), Error> { rates.set_rate(r.0, r.1).await; } } - Err(e) => error!("Failed to fetch rates: {}", e) + Err(e) => error!("Failed to fetch rates: {}", e), } tokio::time::sleep(Duration::from_secs(60)).await; } }); + let provisioner = + settings + .provisioner + .get_provisioner(db.clone(), lnd.clone(), exchange.clone()); + let db: Box = Box::new(db.clone()); let pv: Box = Box::new(provisioner); diff --git a/src/exchange.rs b/src/exchange.rs index 99dbe89..a1e1c22 100644 --- a/src/exchange.rs +++ b/src/exchange.rs @@ -2,7 +2,7 @@ use anyhow::{Error, Result}; use log::info; use rocket::serde::Deserialize; use std::collections::HashMap; -use std::fmt::{write, Display, Formatter}; +use std::fmt::{Display, Formatter}; use std::str::FromStr; use std::sync::Arc; use tokio::sync::RwLock; @@ -63,7 +63,6 @@ pub struct ExchangeRateCache { #[derive(Deserialize)] struct MempoolRates { - pub time: u64, #[serde(rename = "USD")] pub usd: Option, #[serde(rename = "EUR")] @@ -72,13 +71,16 @@ struct MempoolRates { impl ExchangeRateCache { pub fn new() -> Self { - Self { cache: Arc::new(RwLock::new(HashMap::new())) } + Self { + cache: Arc::new(RwLock::new(HashMap::new())), + } } pub async fn fetch_rates(&self) -> Result> { let rsp = reqwest::get("https://mempool.space/api/v1/prices") .await? - .text().await?; + .text() + .await?; let rates: MempoolRates = serde_json::from_str(&rsp)?; let mut ret = vec![]; @@ -102,4 +104,4 @@ impl ExchangeRateCache { let cache = self.cache.read().await; cache.get(&ticker).cloned() } -} \ No newline at end of file +} diff --git a/src/host/proxmox.rs b/src/host/proxmox.rs index ee84146..7347a3f 100644 --- a/src/host/proxmox.rs +++ b/src/host/proxmox.rs @@ -1,9 +1,12 @@ -use anyhow::{bail, Result}; -use log::info; +use anyhow::{anyhow, bail, Result}; +use log::{error, info}; use reqwest::{ClientBuilder, Url}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::fmt::Debug; +use std::str::FromStr; +use std::time::Duration; +use tokio::time::sleep; pub struct ProxmoxClient { base: Url, @@ -52,23 +55,101 @@ impl ProxmoxClient { Ok(rsp.data) } - pub async fn list_vms(&self, node: &str, full: bool) -> Result> { + pub async fn list_vms(&self, node: &str) -> Result> { let rsp: ResponseBase> = self.get(&format!("/api2/json/nodes/{node}/qemu")).await?; Ok(rsp.data) } - pub async fn list_storage(&self) -> Result> { - let rsp: ResponseBase> = self.get("/api2/json/storage").await?; + pub async fn list_storage(&self, node: &str) -> Result> { + let rsp: ResponseBase> = + self.get(&format!("/api2/json/nodes/{node}/storage")).await?; Ok(rsp.data) } - pub async fn create_vm(&self, req: CreateVm) -> Result { + /// List files in a storage pool + pub async fn list_storage_files(&self, node: &str, storage: &str) -> Result> { + let rsp: ResponseBase> = + self.get(&format!("/api2/json/nodes/{node}/storage/{storage}/content")).await?; + Ok(rsp.data) + } + + /// Create a new VM + /// + /// https://pve.proxmox.com/pve-docs/api-viewer/?ref=public_apis#/nodes/{node}/qemu + pub async fn create_vm(&self, req: CreateVm) -> Result { info!("{}", serde_json::to_string_pretty(&req)?); - let _rsp: ResponseBase> = self + let rsp: ResponseBase> = self .post(&format!("/api2/json/nodes/{}/qemu", req.node), &req) .await?; - self.get_vm_status(&req.node, req.vm_id).await + if let Some(id) = rsp.data { + Ok(TaskId { id, node: req.node }) + } else { + Err(anyhow!("Failed to configure VM")) + } + } + + /// Configure a VM + /// + /// https://pve.proxmox.com/pve-docs/api-viewer/?ref=public_apis#/nodes/{node}/qemu/{vmid}/config + pub async fn configure_vm(&self, req: ConfigureVm) -> Result { + info!("{}", serde_json::to_string_pretty(&req)?); + let rsp: ResponseBase> = self + .post( + &format!("/api2/json/nodes/{}/qemu/{}/config", req.node, req.vm_id), + &req, + ) + .await?; + if let Some(id) = rsp.data { + Ok(TaskId { id, node: req.node }) + } else { + Err(anyhow!("Failed to configure VM")) + } + } + + /// Get the current status of a running task + /// + /// https://pve.proxmox.com/pve-docs/api-viewer/?ref=public_apis#/nodes/{node}/tasks/{upid}/status + pub async fn get_task_status(&self, task: &TaskId) -> Result { + let rsp: ResponseBase = self + .get(&format!( + "/api2/json/nodes/{}/tasks/{}/status", + task.node, task.id + )) + .await?; + Ok(rsp.data) + } + + /// Helper function to wait for a task to complete + pub async fn wait_for_task(&self, task: &TaskId) -> Result { + loop { + let s = self.get_task_status(task).await?; + if s.is_finished() { + if s.is_success() { + return Ok(s); + } else { + bail!( + "Task finished with error: {}", + s.exit_status.unwrap_or("no error message".to_string()) + ); + } + } + sleep(Duration::from_secs(1)).await; + } + } + + /// Download an image to the host disk + pub async fn download_image(&self, req: DownloadUrlRequest) -> Result { + let rsp: ResponseBase = self + .post( + &format!( + "/api2/json/nodes/{}/storage/{}/download-url", + req.node, req.storage + ), + &req, + ) + .await?; + Ok(TaskId { id: rsp.data, node: req.node }) } async fn get(&self, path: &str) -> Result { @@ -80,7 +161,8 @@ impl ProxmoxClient { .await?; let status = rsp.status(); let text = rsp.text().await?; - //info!("<< {}", text); + #[cfg(debug_assertions)] + info!("<< {}", text); if status.is_success() { Ok(serde_json::from_str(&text)?) } else { @@ -100,7 +182,8 @@ impl ProxmoxClient { .await?; let status = rsp.status(); let text = rsp.text().await?; - //info!("<< {}", text); + #[cfg(debug_assertions)] + info!("<< {}", text); if status.is_success() { Ok(serde_json::from_str(&text)?) } else { @@ -109,6 +192,48 @@ impl ProxmoxClient { } } +#[derive(Debug, Clone)] +pub struct TaskId { + pub id: String, + pub node: String, +} + +#[derive(Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum TaskState { + Running, + Stopped, +} + +#[derive(Deserialize)] +pub struct TaskStatus { + pub id: String, + pub node: String, + pub pid: u32, + #[serde(rename = "pstart")] + pub p_start: u64, + #[serde(rename = "starttime")] + pub start_time: u64, + pub status: TaskState, + #[serde(rename = "type")] + pub task_type: String, + #[serde(rename = "upid")] + pub up_id: String, + pub user: String, + #[serde(rename = "exitstatus")] + pub exit_status: Option, +} + +impl TaskStatus { + pub fn is_finished(&self) -> bool { + self.status == TaskState::Stopped + } + + pub fn is_success(&self) -> bool { + self.is_finished() && self.exit_status == Some("OK".to_string()) + } +} + #[derive(Deserialize)] pub struct ResponseBase { pub data: T, @@ -184,7 +309,7 @@ pub enum StorageType { Dir, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum StorageContent { Images, @@ -192,10 +317,28 @@ pub enum StorageContent { Backup, ISO, VZTmpL, + Import, +} + +impl FromStr for StorageContent { + type Err = (); + + fn from_str(s: &str) -> std::result::Result { + match s.to_lowercase().as_str() { + "images" => Ok(StorageContent::Images), + "rootdir" => Ok(StorageContent::RootDir), + "backup" => Ok(StorageContent::Backup), + "iso" => Ok(StorageContent::ISO), + "vztmpl" => Ok(StorageContent::VZTmpL), + "import" => Ok(StorageContent::Import), + _ => Err(()), + } + } } #[derive(Debug, Deserialize)] pub struct NodeStorage { + pub content: String, pub storage: String, #[serde(rename = "type")] pub kind: Option, @@ -203,6 +346,33 @@ pub struct NodeStorage { pub thin_pool: Option, } +impl NodeStorage { + pub fn contents(&self) -> Vec { + self.content + .split(",") + .map_while(|s| s.parse().ok()) + .collect() + } +} +#[derive(Debug, Serialize)] +pub struct DownloadUrlRequest { + pub content: StorageContent, + pub node: String, + pub storage: String, + pub url: String, + pub filename: String, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct StorageContentEntry { + pub format: String, + pub size: u64, + #[serde(rename = "volid")] + pub vol_id: String, + #[serde(rename = "vmid")] + pub vm_id: Option, +} + #[derive(Debug, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] pub enum VmBios { @@ -215,6 +385,25 @@ pub struct CreateVm { pub node: String, #[serde(rename = "vmid")] pub vm_id: i32, + #[serde(flatten)] + pub config: VmConfig, +} + +#[derive(Debug, Deserialize, Serialize, Default)] +pub struct ConfigureVm { + pub node: String, + #[serde(rename = "vmid")] + pub vm_id: i32, + #[serde(skip_serializing_if = "Option::is_none")] + pub current: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub snapshot: Option, + #[serde(flatten)] + pub config: VmConfig, +} + +#[derive(Debug, Deserialize, Serialize, Default)] +pub struct VmConfig { #[serde(rename = "onboot")] #[serde(skip_serializing_if = "Option::is_none")] pub on_boot: Option, diff --git a/src/lib.rs b/src/lib.rs index b730c6e..5bdb162 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,10 @@ pub mod api; pub mod cors; +pub mod exchange; pub mod host; pub mod invoice; pub mod nip98; pub mod provisioner; +pub mod settings; pub mod status; pub mod worker; -pub mod exchange; diff --git a/src/provisioner/lnvps.rs b/src/provisioner/lnvps.rs index 548d277..7e390cd 100644 --- a/src/provisioner/lnvps.rs +++ b/src/provisioner/lnvps.rs @@ -1,6 +1,10 @@ -use crate::exchange::{Currency, ExchangeRateCache, Ticker}; -use crate::host::proxmox::ProxmoxClient; +use crate::exchange::{ExchangeRateCache, Ticker}; +use crate::host::proxmox::{ + ConfigureVm, CreateVm, DownloadUrlRequest, ProxmoxClient, StorageContent, TaskState, VmBios, + VmConfig, +}; use crate::provisioner::Provisioner; +use crate::settings::QemuConfig; use anyhow::{bail, Result}; use chrono::{Days, Months, Utc}; use fedimint_tonic_lnd::lnrpc::Invoice; @@ -9,84 +13,92 @@ use fedimint_tonic_lnd::Client; use ipnetwork::IpNetwork; use lnvps_db::hydrate::Hydrate; use lnvps_db::{ - IpRange, LNVpsDb, Vm, VmCostPlanIntervalType, VmIpAssignment, VmOsImage, VmPayment, + IpRange, LNVpsDb, Vm, VmCostPlanIntervalType, VmHost, VmHostKind, VmIpAssignment, VmOsImage, + VmPayment, }; -use log::{info, warn}; +use log::{error, info, warn}; use rand::seq::IteratorRandom; +use reqwest::Url; use std::collections::HashSet; use std::net::IpAddr; use std::ops::Add; use std::path::PathBuf; use std::time::Duration; +use tokio::time::sleep; pub struct LNVpsProvisioner { db: Box, lnd: Client, rates: ExchangeRateCache, + config: QemuConfig, } impl LNVpsProvisioner { - pub fn new(db: D, lnd: Client, rates: ExchangeRateCache) -> Self { + pub fn new( + config: QemuConfig, + db: impl LNVpsDb + 'static, + lnd: Client, + rates: ExchangeRateCache, + ) -> Self { Self { db: Box::new(db), lnd, rates, + config, } } - /// Auto-discover resources - pub async fn auto_discover(&self) -> Result<()> { - let hosts = self.db.list_hosts().await?; - for host in hosts { - let api = ProxmoxClient::new(host.ip.parse()?).with_api_token(&host.api_token); - - let nodes = api.list_nodes().await?; - if let Some(node) = nodes.iter().find(|n| n.name == host.name) { - // Update host resources - if node.max_cpu.unwrap_or(host.cpu) != host.cpu - || node.max_mem.unwrap_or(host.memory) != host.memory - { - let mut host = host.clone(); - host.cpu = node.max_cpu.unwrap_or(host.cpu); - host.memory = node.max_mem.unwrap_or(host.memory); - info!("Patching host: {:?}", host); - self.db.update_host(&host).await?; - } - // Update disk info - let storages = api.list_storage().await?; - let host_disks = self.db.list_host_disks(host.id).await?; - for storage in storages { - let host_storage = - if let Some(s) = host_disks.iter().find(|d| d.name == storage.storage) { - s - } else { - warn!("Disk not found: {} on {}", storage.storage, host.name); - continue; - }; - - // TODO: patch host storage info - } + fn get_host_client(host: &VmHost) -> Result { + Ok(match host.kind { + VmHostKind::Proxmox => { + ProxmoxClient::new(host.ip.parse()?).with_api_token(&host.api_token) } - info!( - "Discovering resources from: {} v{}", - &host.name, - api.version().await?.version - ); - } - - Ok(()) + }) } - fn map_os_image(image: &VmOsImage) -> PathBuf { - PathBuf::from("/var/lib/vz/images/").join(format!( - "{:?}_{}_{}.img", - image.distribution, image.flavour, image.version - )) + async fn get_iso_storage(node: &str, client: &ProxmoxClient) -> Result { + let storages = client.list_storage(node).await?; + if let Some(s) = storages + .iter() + .find(|s| s.contents().contains(&StorageContent::Import)) + { + Ok(s.storage.clone()) + } else { + bail!("No image storage found"); + } } } #[async_trait] impl Provisioner for LNVpsProvisioner { + async fn init(&self) -> Result<()> { + // tell hosts to download images + let hosts = self.db.list_hosts().await?; + for host in hosts { + let client = Self::get_host_client(&host)?; + let iso_storage = Self::get_iso_storage(&host.name, &client).await?; + let files = client.list_storage_files(&host.name, &iso_storage).await?; + + for image in self.db.list_os_image().await? { + info!("Downloading image {} on {}", image.url, host.name); + let i_name = image.filename()?; + if files.iter().any(|v| v.vol_id.ends_with(&format!("iso/{i_name}"))) { + info!("Already downloaded, skipping"); + continue; + } + let t_download = client.download_image(DownloadUrlRequest { + content: StorageContent::Import, + node: host.name.clone(), + storage: iso_storage.clone(), + url: image.url.clone(), + filename: i_name, + }).await?; + client.wait_for_task(&t_download).await?; + } + } + Ok(()) + } + async fn provision( &self, user_id: u64, @@ -138,7 +150,7 @@ impl Provisioner for LNVpsProvisioner { let template = self.db.get_vm_template(vm.template_id).await?; let cost_plan = self.db.get_cost_plan(template.cost_plan_id).await?; - /// Reuse existing payment until expired + // Reuse existing payment until expired let payments = self.db.list_vm_payment(vm.id).await?; if let Some(px) = payments .into_iter() @@ -220,12 +232,12 @@ impl Provisioner for LNVpsProvisioner { } let mut ret = vec![]; - /// Try all ranges + // Try all ranges // TODO: pick round-robin ranges for range in ip_ranges { let range_cidr: IpNetwork = range.cidr.parse()?; let ips = self.db.list_vm_ip_assignments_in_range(range.id).await?; - let ips: HashSet = ips.iter().map(|i| i.ip.parse().unwrap()).collect(); + let ips: HashSet = ips.iter().map_while(|i| i.ip.parse().ok()).collect(); // pick an IP at random let cidr: Vec = { @@ -253,4 +265,79 @@ impl Provisioner for LNVpsProvisioner { Ok(ret) } + + async fn spawn_vm(&self, vm_id: u64) -> Result<()> { + if self.config.read_only { + bail!("Cant spawn VM's in read-only mode"); + } + let vm = self.db.get_vm(vm_id).await?; + let host = self.db.get_host(vm.host_id).await?; + let client = Self::get_host_client(&host)?; + + let mut ips = self.db.list_vm_ip_assignments(vm.id).await?; + if ips.is_empty() { + ips = self.allocate_ips(vm.id).await?; + } + + let mut ip_config = ips + .iter() + .map_while(|ip| { + if let Ok(net) = ip.ip.parse::() { + Some(match net { + IpNetwork::V4(addr) => format!("ip={}", addr), + IpNetwork::V6(addr) => format!("ip6={}", addr), + }) + } else { + None + } + }) + .collect::>(); + ip_config.push("ip6=auto".to_string()); + + let drives = self.db.list_host_disks(vm.host_id).await?; + let drive = if let Some(d) = drives.iter().find(|d| d.enabled) { + d + } else { + bail!("No host drive found!") + }; + + let ssh_key = self.db.get_user_ssh_key(vm.ssh_key_id).await?; + + let mut net = vec![ + "virtio".to_string(), + format!("bridge={}", self.config.bridge), + ]; + if let Some(t) = self.config.vlan { + net.push(format!("tag={}", t)); + } + + // create VM + let t_create = client + .create_vm(CreateVm { + node: host.name.clone(), + vm_id: (vm.id + 100) as i32, + config: VmConfig { + on_boot: Some(true), + bios: Some(VmBios::OVMF), + boot: Some("order=scsi0".to_string()), + cores: Some(vm.cpu as i32), + cpu: Some(self.config.cpu.clone()), + ip_config: Some(ip_config.join(",")), + machine: Some(self.config.machine.clone()), + memory: Some((vm.memory / 1024 / 1024).to_string()), + net: Some(net.join(",")), + os_type: Some(self.config.os_type.clone()), + scsi_1: Some(format!("{}:cloudinit", &drive.name)), + scsi_hw: Some("virtio-scsi-pci".to_string()), + ssh_keys: Some(urlencoding::encode(&ssh_key.key_data).to_string()), + efi_disk_0: Some(format!("{}:0,efitype=4m", &drive.name)), + ..Default::default() + }, + }) + .await?; + client.wait_for_task(&t_create).await?; + + // TODO: Find a way to automate importing disk + Ok(()) + } } diff --git a/src/provisioner/mod.rs b/src/provisioner/mod.rs index 70a4771..ef00d22 100644 --- a/src/provisioner/mod.rs +++ b/src/provisioner/mod.rs @@ -6,6 +6,9 @@ pub mod lnvps; #[async_trait] pub trait Provisioner: Send + Sync { + /// Do any necessary initialization + async fn init(&self) -> Result<()>; + /// Provision a new VM for a user on the database /// /// Note: @@ -24,4 +27,7 @@ pub trait Provisioner: Send + Sync { /// Allocate ips for a VM async fn allocate_ips(&self, vm_id: u64) -> Result>; + + /// Spawn a VM on the host + async fn spawn_vm(&self, vm_id: u64) -> Result<()>; } diff --git a/src/settings.rs b/src/settings.rs new file mode 100644 index 0000000..f709775 --- /dev/null +++ b/src/settings.rs @@ -0,0 +1,61 @@ +use crate::exchange::ExchangeRateCache; +use crate::provisioner::lnvps::LNVpsProvisioner; +use crate::provisioner::Provisioner; +use fedimint_tonic_lnd::Client; +use lnvps_db::LNVpsDb; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +#[derive(Debug, Deserialize, Serialize)] +pub struct Settings { + pub listen: Option, + pub db: String, + pub lnd: LndConfig, + pub provisioner: ProvisionerConfig, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct LndConfig { + pub url: String, + pub cert: PathBuf, + pub macaroon: PathBuf, +} + +#[derive(Debug, Deserialize, Serialize)] +pub enum ProvisionerConfig { + Proxmox(QemuConfig), +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct QemuConfig { + /// Readonly mode, don't spawn any VM's + pub read_only: bool, + + /// Machine type (q35) + pub machine: String, + + /// OS Type + pub os_type: String, + + /// Network bridge used for the networking interface + pub bridge: String, + + /// CPU type + pub cpu: String, + + /// VLAN tag all spawned VM's + pub vlan: Option +} + +impl ProvisionerConfig { + pub fn get_provisioner( + &self, + db: impl LNVpsDb + 'static, + lnd: Client, + exchange: ExchangeRateCache, + ) -> impl Provisioner + 'static { + match self { + ProvisionerConfig::Proxmox(c) => LNVpsProvisioner::new(c.clone(), db, lnd, exchange), + } + } +} diff --git a/src/worker.rs b/src/worker.rs index c6e1fa1..0a95979 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,14 +1,10 @@ -use crate::exchange::ExchangeRateCache; -use crate::host::proxmox::{CreateVm, ProxmoxClient, VmBios, VmStatus}; -use crate::provisioner::lnvps::LNVpsProvisioner; +use crate::host::proxmox::{ProxmoxClient, VmStatus}; use crate::provisioner::Provisioner; use crate::status::{VmRunningState, VmState, VmStateCache}; -use anyhow::{bail, Result}; +use anyhow::Result; use chrono::Utc; -use fedimint_tonic_lnd::Client; -use ipnetwork::IpNetwork; -use lnvps_db::{LNVpsDb, Vm, VmHost}; -use log::{error, info, warn}; +use lnvps_db::LNVpsDb; +use log::{debug, error, info, warn}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; pub enum WorkJob { @@ -21,9 +17,7 @@ pub enum WorkJob { } pub struct Worker { - read_only: bool, db: Box, - lnd: Client, provisioner: Box, vm_state_cache: VmStateCache, tx: UnboundedSender, @@ -31,21 +25,16 @@ pub struct Worker { } impl Worker { - pub fn new( - read_only: bool, + pub fn new( db: D, - lnd: Client, + provisioner: P, vm_state_cache: VmStateCache, - rates: ExchangeRateCache, ) -> Self { let (tx, rx) = unbounded_channel(); - let p = LNVpsProvisioner::new(db.clone(), lnd.clone(), rates); Self { - read_only, db: Box::new(db), - provisioner: Box::new(p), + provisioner: Box::new(provisioner), vm_state_cache, - lnd, tx, rx, } @@ -55,74 +44,15 @@ impl Worker { self.tx.clone() } - /// Spawn a VM on the host - async fn spawn_vm(&self, vm: &Vm, vm_host: &VmHost, client: &ProxmoxClient) -> Result<()> { - if self.read_only { - bail!("Cant spawn VM's in read-only mode"); - } - let mut ips = self.db.list_vm_ip_assignments(vm.id).await?; - if ips.is_empty() { - ips = self.provisioner.allocate_ips(vm.id).await?; - } - - let ip_config = ips - .iter() - .map_while(|ip| { - if let Ok(net) = ip.ip.parse::() { - Some(match net { - IpNetwork::V4(addr) => format!("ip={}", addr), - IpNetwork::V6(addr) => format!("ip6={}", addr), - }) - } else { - None - } - }) - .collect::>() - .join(","); - - let drives = self.db.list_host_disks(vm.host_id).await?; - let drive = if let Some(d) = drives.iter().find(|d| d.enabled) { - d - } else { - bail!("No host drive found!") - }; - - let ssh_key = self.db.get_user_ssh_key(vm.ssh_key_id).await?; - - client - .create_vm(CreateVm { - node: vm_host.name.clone(), - vm_id: (vm.id + 100) as i32, - bios: Some(VmBios::OVMF), - boot: Some("order=scsi0".to_string()), - cores: Some(vm.cpu as i32), - cpu: Some("kvm64".to_string()), - ip_config: Some(ip_config), - machine: Some("q35".to_string()), - memory: Some((vm.memory / 1024 / 1024).to_string()), - net: Some("virtio,bridge=vmbr0,tag=100".to_string()), - os_type: Some("l26".to_string()), - scsi_1: Some(format!("{}:cloudinit", &drive.name)), - scsi_hw: Some("virtio-scsi-pci".to_string()), - ssh_keys: Some(urlencoding::encode(&ssh_key.key_data).to_string()), - efi_disk_0: Some(format!("{}:0,efitype=4m", &drive.name)), - ..Default::default() - }) - .await?; - - Ok(()) - } - /// Check a VM's status async fn check_vm(&self, vm_id: u64) -> Result<()> { - info!("Checking VM {}", vm_id); + debug!("Checking VM: {}", vm_id); let vm = self.db.get_vm(vm_id).await?; let host = self.db.get_host(vm.host_id).await?; let client = ProxmoxClient::new(host.ip.parse()?).with_api_token(&host.api_token); match client.get_vm_status(&host.name, (vm.id + 100) as i32).await { Ok(s) => { - info!("VM {} status: {:?}", vm_id, s.status); let state = VmState { state: match s.status { VmStatus::Stopped => VmRunningState::Stopped, @@ -139,9 +69,9 @@ impl Worker { self.vm_state_cache.set_state(vm_id, state).await?; } Err(e) => { - warn!("Failed to get VM status: {}", e); + warn!("Failed to get VM {} status: {}", vm.id, e); if vm.expires > Utc::now() { - self.spawn_vm(&vm, &host, &client).await?; + self.provisioner.spawn_vm(vm.id).await?; } } }