refactor: remove Provisioner trait

refactor: abstract VmHostClient from ProxmoxClient
tests: add integration test for LnVpsProvisioner
This commit is contained in:
2025-03-03 11:07:09 +00:00
parent b7b940abff
commit 834ed44408
21 changed files with 957 additions and 709 deletions

29
Cargo.lock generated
View File

@ -2027,6 +2027,7 @@ dependencies = [
"nostr-sdk",
"pretty_env_logger",
"rand 0.9.0",
"regex",
"reqwest",
"rocket",
"rocket_okapi",
@ -2039,6 +2040,7 @@ dependencies = [
"tokio",
"tokio-tungstenite 0.21.0",
"urlencoding",
"virt",
]
[[package]]
@ -4619,6 +4621,12 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0f540e3240398cce6128b64ba83fdbdd86129c16a3aa1a3a252efd66eb3d587"
[[package]]
name = "valuable"
version = "0.1.1"
@ -4637,6 +4645,27 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "virt"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a05f77c836efa9be343b5419663cf829d75203b813579993cdd9c44f51767e"
dependencies = [
"libc",
"uuid",
"virt-sys",
]
[[package]]
name = "virt-sys"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c504e459878f09177f41bf2f8bb3e9a8af4fca7a09e73152fee02535d501601c"
dependencies = [
"libc",
"pkg-config",
]
[[package]]
name = "want"
version = "0.3.1"

View File

@ -7,9 +7,11 @@ edition = "2021"
name = "api"
[features]
default = ["mikrotik", "nostr-dm"]
mikrotik = []
default = ["mikrotik", "nostr-dm", "proxmox"]
mikrotik = ["dep:reqwest"]
nostr-dm = ["dep:nostr-sdk"]
proxmox = ["dep:reqwest", "dep:ssh2", "dep:tokio-tungstenite"]
libvirt = ["dep:virt"]
[dependencies]
lnvps_db = { path = "lnvps_db" }
@ -20,29 +22,32 @@ log = "0.4.21"
pretty_env_logger = "0.5.0"
serde = { version = "1.0.213", features = ["derive"] }
serde_json = "1.0.132"
reqwest = { version = "0.12.8" }
rocket = { version = "0.5.1", features = ["json"] }
rocket_okapi = { version = "0.9.0", features = ["swagger", "rapidoc"] }
schemars = { version = "0.8.22", features = ["chrono"] }
chrono = { version = "0.4.38", features = ["serde"] }
nostr = { version = "0.39.0", default-features = false, features = ["std"] }
base64 = { version = "0.22.1", features = ["alloc"] }
urlencoding = "2.1.3"
fedimint-tonic-lnd = { version = "0.2.0", default-features = false, features = ["invoicesrpc"] }
ipnetwork = { git = "https://git.v0l.io/Kieran/ipnetwork.git", rev = "35977adc8103cfc232bc95fbc32f4e34f2b6a6d7" }
rand = "0.9.0"
clap = { version = "4.5.21", features = ["derive"] }
ssh2 = "0.9.4"
ssh-key = "0.6.7"
lettre = { version = "0.11.10", features = ["tokio1-native-tls"] }
ws = { package = "rocket_ws", version = "0.1.0" }
tokio-tungstenite = { version = "^0.21", features = ["native-tls"] }
native-tls = "0.2.12"
hex = "0.4.3"
futures = "0.3.31"
#nostr-dm
nostr = { version = "0.39.0", default-features = false, features = ["std"] }
nostr-sdk = { version = "0.39.0", optional = true, default-features = false, features = ["nip44", "nip59"] }
#proxmox
tokio-tungstenite = { version = "^0.21", features = ["native-tls"], optional = true}
ssh2 = { version = "0.9.4", optional = true }
reqwest = { version = "0.12.8", optional = true }
#libvirt
virt = { version = "0.4.2", optional = true }
regex = "1.11.1"

View File

@ -4,7 +4,7 @@ FROM $IMAGE AS build
WORKDIR /app/src
COPY . .
RUN apt update && apt -y install protobuf-compiler
RUN cargo install --path . --root /app/build
RUN cargo test && cargo install --path . --root /app/build
FROM $IMAGE AS runner
WORKDIR /app

View File

@ -2,7 +2,7 @@ insert
ignore into vm_host_region(id,name,enabled) values(1,"uat",1);
insert
ignore into vm_host(id,kind,region_id,name,ip,cpu,memory,enabled,api_token)
values(1, 0, 1, "lab", "https://185.18.221.8:8006", 4, 4096*1024, 1, "root@pam!tester=c82f8a57-f876-4ca4-8610-c086d8d9d51c");
values(1, 0, 1, "lab", "https://10.100.1.5:8006", 4, 4096*1024, 1, "root@pam!tester=c82f8a57-f876-4ca4-8610-c086d8d9d51c");
insert
ignore into vm_host_disk(id,host_id,name,size,kind,interface,enabled)
values(1,1,"local-zfs",1000*1000*1000*1000, 0, 0, 1);
@ -22,8 +22,8 @@ insert
ignore into vm_os_image(id,distribution,flavour,version,enabled,url,release_date)
values(5, 1,"Server","11",1,"https://cloud.debian.org/images/cloud/bullseye/latest/debian-11-genericcloud-amd64.raw","2021-08-14");
insert
ignore into ip_range(id,cidr,enabled,region_id)
values(1,"185.18.221.80/28",1,1);
ignore into ip_range(id,cidr,enabled,region_id,gateway)
values(1,"10.100.1.128/25",1,1,"10.100.1.1/24");
insert
ignore into vm_cost_plan(id,name,amount,currency,interval_amount,interval_type)
values(1,"tiny_monthly",2,"EUR",1,1);

View File

@ -50,9 +50,12 @@ pub trait LNVpsDb: Sync + Send {
/// Update host resources (usually from [auto_discover])
async fn update_host(&self, host: &VmHost) -> Result<()>;
/// List VM's owned by a specific user
/// List enabled storage disks on the host
async fn list_host_disks(&self, host_id: u64) -> Result<Vec<VmHostDisk>>;
/// Get a specific host disk
async fn get_host_disk(&self, disk_id: u64) -> Result<VmHostDisk>;
/// Get OS image by id
async fn get_os_image(&self, id: u64) -> Result<VmOsImage>;

View File

@ -34,6 +34,16 @@ pub struct UserSshKey {
/// The type of VM host
pub enum VmHostKind {
Proxmox = 0,
LibVirt = 1,
}
impl Display for VmHostKind {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
VmHostKind::Proxmox => write!(f, "proxmox"),
VmHostKind::LibVirt => write!(f, "libvirt"),
}
}
}
#[derive(FromRow, Clone, Debug)]

View File

@ -140,13 +140,21 @@ impl LNVpsDb for LNVpsDbMysql {
}
async fn list_host_disks(&self, host_id: u64) -> Result<Vec<VmHostDisk>> {
sqlx::query_as("select * from vm_host_disk where host_id = ?")
sqlx::query_as("select * from vm_host_disk where host_id = ? and enabled = 1")
.bind(host_id)
.fetch_all(&self.db)
.await
.map_err(Error::new)
}
async fn get_host_disk(&self, disk_id: u64) -> Result<VmHostDisk> {
sqlx::query_as("select * from vm_host_disk where id = ?")
.bind(disk_id)
.fetch_one(&self.db)
.await
.map_err(Error::new)
}
async fn get_os_image(&self, id: u64) -> Result<VmOsImage> {
sqlx::query_as("select * from vm_os_image where id=?")
.bind(id)

View File

@ -2,15 +2,17 @@ use crate::api::model::{
AccountPatchRequest, ApiUserSshKey, ApiVmIpAssignment, ApiVmOsImage, ApiVmPayment, ApiVmStatus,
ApiVmTemplate, CreateSshKey, CreateVmRequest, VMPatchRequest,
};
use crate::host::get_host_client;
use crate::nip98::Nip98Auth;
use crate::provisioner::Provisioner;
use crate::provisioner::LNVpsProvisioner;
use crate::settings::Settings;
use crate::status::{VmState, VmStateCache};
use crate::worker::WorkJob;
use anyhow::{bail, Result};
use futures::future::join_all;
use lnvps_db::{IpRange, LNVpsDb};
use log::{debug, error};
use nostr::util::hex;
use nostr_sdk::async_utility::futures_util::future::join_all;
use rocket::futures::{Sink, SinkExt, StreamExt};
use rocket::serde::json::Json;
use rocket::{get, patch, post, Responder, Route, State};
@ -23,6 +25,7 @@ use serde::{Deserialize, Serialize};
use ssh_key::PublicKey;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use ws::Message;
@ -87,7 +90,7 @@ impl OpenApiResponderInner for ApiError {
#[patch("/api/v1/account", format = "json", data = "<req>")]
async fn v1_patch_account(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
db: &State<Arc<dyn LNVpsDb>>,
req: Json<AccountPatchRequest>,
) -> ApiResult<()> {
let pubkey = auth.event.pubkey.to_bytes();
@ -107,7 +110,7 @@ async fn v1_patch_account(
#[get("/api/v1/account")]
async fn v1_get_account(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
db: &State<Arc<dyn LNVpsDb>>,
) -> ApiResult<AccountPatchRequest> {
let pubkey = auth.event.pubkey.to_bytes();
let uid = db.upsert_user(&pubkey).await?;
@ -121,7 +124,7 @@ async fn v1_get_account(
}
async fn vm_to_status(
db: &Box<dyn LNVpsDb>,
db: &Arc<dyn LNVpsDb>,
vm: lnvps_db::Vm,
state: Option<VmState>,
) -> Result<ApiVmStatus> {
@ -166,7 +169,7 @@ async fn vm_to_status(
#[get("/api/v1/vm")]
async fn v1_list_vms(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
db: &State<Arc<dyn LNVpsDb>>,
vm_state: &State<VmStateCache>,
) -> ApiResult<Vec<ApiVmStatus>> {
let pubkey = auth.event.pubkey.to_bytes();
@ -186,7 +189,7 @@ async fn v1_list_vms(
#[get("/api/v1/vm/<id>")]
async fn v1_get_vm(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
db: &State<Arc<dyn LNVpsDb>>,
vm_state: &State<VmStateCache>,
id: u64,
) -> ApiResult<ApiVmStatus> {
@ -204,8 +207,8 @@ async fn v1_get_vm(
#[patch("/api/v1/vm/<id>", data = "<data>", format = "json")]
async fn v1_patch_vm(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
provisioner: &State<Box<dyn Provisioner>>,
db: &State<Arc<dyn LNVpsDb>>,
settings: &State<Settings>,
id: u64,
data: Json<VMPatchRequest>,
) -> ApiResult<()> {
@ -225,7 +228,10 @@ async fn v1_patch_vm(
}
db.update_vm(&vm).await?;
provisioner.patch_vm(vm.id).await?;
let host = db.get_host(vm.host_id).await?;
let client = get_host_client(&host, &settings.provisioner)?;
client.configure_vm(&vm).await?;
ApiData::ok(())
}
@ -233,7 +239,7 @@ async fn v1_patch_vm(
/// List available VM OS images
#[openapi(tag = "Image")]
#[get("/api/v1/image")]
async fn v1_list_vm_images(db: &State<Box<dyn LNVpsDb>>) -> ApiResult<Vec<ApiVmOsImage>> {
async fn v1_list_vm_images(db: &State<Arc<dyn LNVpsDb>>) -> ApiResult<Vec<ApiVmOsImage>> {
let images = db.list_os_image().await?;
let ret = images
.into_iter()
@ -246,7 +252,7 @@ async fn v1_list_vm_images(db: &State<Box<dyn LNVpsDb>>) -> ApiResult<Vec<ApiVmO
/// List available VM templates (Offers)
#[openapi(tag = "Template")]
#[get("/api/v1/vm/templates")]
async fn v1_list_vm_templates(db: &State<Box<dyn LNVpsDb>>) -> ApiResult<Vec<ApiVmTemplate>> {
async fn v1_list_vm_templates(db: &State<Arc<dyn LNVpsDb>>) -> ApiResult<Vec<ApiVmTemplate>> {
let templates = db.list_vm_templates().await?;
let cost_plans: HashSet<u64> = templates.iter().map(|t| t.cost_plan_id).collect();
@ -292,7 +298,7 @@ async fn v1_list_vm_templates(db: &State<Box<dyn LNVpsDb>>) -> ApiResult<Vec<Api
#[get("/api/v1/ssh-key")]
async fn v1_list_ssh_keys(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
db: &State<Arc<dyn LNVpsDb>>,
) -> ApiResult<Vec<ApiUserSshKey>> {
let uid = db.upsert_user(&auth.event.pubkey.to_bytes()).await?;
let ret = db
@ -309,7 +315,7 @@ async fn v1_list_ssh_keys(
#[post("/api/v1/ssh-key", data = "<req>", format = "json")]
async fn v1_add_ssh_key(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
db: &State<Arc<dyn LNVpsDb>>,
req: Json<CreateSshKey>,
) -> ApiResult<ApiUserSshKey> {
let uid = db.upsert_user(&auth.event.pubkey.to_bytes()).await?;
@ -342,8 +348,8 @@ async fn v1_add_ssh_key(
#[post("/api/v1/vm", data = "<req>", format = "json")]
async fn v1_create_vm_order(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
provisioner: &State<Box<dyn Provisioner>>,
db: &State<Arc<dyn LNVpsDb>>,
provisioner: &State<Arc<LNVpsProvisioner>>,
req: Json<CreateVmRequest>,
) -> ApiResult<ApiVmStatus> {
let pubkey = auth.event.pubkey.to_bytes();
@ -361,8 +367,8 @@ async fn v1_create_vm_order(
#[get("/api/v1/vm/<id>/renew")]
async fn v1_renew_vm(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
provisioner: &State<Box<dyn Provisioner>>,
db: &State<Arc<dyn LNVpsDb>>,
provisioner: &State<Arc<LNVpsProvisioner>>,
id: u64,
) -> ApiResult<ApiVmPayment> {
let pubkey = auth.event.pubkey.to_bytes();
@ -381,8 +387,8 @@ async fn v1_renew_vm(
#[patch("/api/v1/vm/<id>/start")]
async fn v1_start_vm(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
provisioner: &State<Box<dyn Provisioner>>,
db: &State<Arc<dyn LNVpsDb>>,
settings: &State<Settings>,
worker: &State<UnboundedSender<WorkJob>>,
id: u64,
) -> ApiResult<()> {
@ -392,8 +398,10 @@ async fn v1_start_vm(
if uid != vm.user_id {
return ApiData::err("VM does not belong to you");
}
let host = db.get_host(vm.host_id).await?;
let client = get_host_client(&host, &settings.provisioner)?;
client.start_vm(&vm).await?;
provisioner.start_vm(id).await?;
worker.send(WorkJob::CheckVm { vm_id: id })?;
ApiData::ok(())
}
@ -403,8 +411,8 @@ async fn v1_start_vm(
#[patch("/api/v1/vm/<id>/stop")]
async fn v1_stop_vm(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
provisioner: &State<Box<dyn Provisioner>>,
db: &State<Arc<dyn LNVpsDb>>,
settings: &State<Settings>,
worker: &State<UnboundedSender<WorkJob>>,
id: u64,
) -> ApiResult<()> {
@ -415,7 +423,10 @@ async fn v1_stop_vm(
return ApiData::err("VM does not belong to you");
}
provisioner.stop_vm(id).await?;
let host = db.get_host(vm.host_id).await?;
let client = get_host_client(&host, &settings.provisioner)?;
client.stop_vm(&vm).await?;
worker.send(WorkJob::CheckVm { vm_id: id })?;
ApiData::ok(())
}
@ -425,8 +436,8 @@ async fn v1_stop_vm(
#[patch("/api/v1/vm/<id>/restart")]
async fn v1_restart_vm(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
provisioner: &State<Box<dyn Provisioner>>,
db: &State<Arc<dyn LNVpsDb>>,
settings: &State<Settings>,
worker: &State<UnboundedSender<WorkJob>>,
id: u64,
) -> ApiResult<()> {
@ -437,7 +448,10 @@ async fn v1_restart_vm(
return ApiData::err("VM does not belong to you");
}
provisioner.restart_vm(id).await?;
let host = db.get_host(vm.host_id).await?;
let client = get_host_client(&host, &settings.provisioner)?;
client.stop_vm(&vm).await?;
worker.send(WorkJob::CheckVm { vm_id: id })?;
ApiData::ok(())
}
@ -447,7 +461,7 @@ async fn v1_restart_vm(
#[get("/api/v1/payment/<id>")]
async fn v1_get_payment(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
db: &State<Arc<dyn LNVpsDb>>,
id: &str,
) -> ApiResult<ApiVmPayment> {
let pubkey = auth.event.pubkey.to_bytes();
@ -470,10 +484,10 @@ async fn v1_get_payment(
#[get("/api/v1/console/<id>?<auth>")]
async fn v1_terminal_proxy(
auth: &str,
db: &State<Box<dyn LNVpsDb>>,
provisioner: &State<Box<dyn Provisioner>>,
db: &State<Arc<dyn LNVpsDb>>,
_provisioner: &State<Arc<LNVpsProvisioner>>,
id: u64,
ws: ws::WebSocket,
_ws: ws::WebSocket,
) -> Result<ws::Channel<'static>, &'static str> {
let auth = Nip98Auth::from_base64(auth).map_err(|_| "Missing or invalid auth param")?;
if auth.check(&format!("/api/v1/console/{id}"), "GET").is_err() {
@ -486,89 +500,5 @@ async fn v1_terminal_proxy(
return Err("VM does not belong to you");
}
let ws_upstream = provisioner.terminal_proxy(vm.id).await.map_err(|e| {
error!("Failed to start terminal proxy: {}", e);
"Failed to open terminal proxy"
})?;
let ws = ws.config(Default::default());
Ok(ws.channel(move |stream| {
Box::pin(async move {
let (mut tx_upstream, mut rx_upstream) = ws_upstream.split();
let (mut tx_client, mut rx_client) = stream.split();
async fn process_client<S, E>(
msg: Result<Message, E>,
tx_upstream: &mut S,
) -> Result<()>
where
S: SinkExt<Message> + Unpin,
<S as Sink<Message>>::Error: Display,
E: Display,
{
match msg {
Ok(m) => {
let m_up = match m {
Message::Text(t) => Message::Text(format!("0:{}:{}", t.len(), t)),
_ => panic!("todo"),
};
debug!("Sending data to upstream: {:?}", m_up);
if let Err(e) = tx_upstream.send(m_up).await {
bail!("Failed to send msg to upstream: {}", e);
}
}
Err(e) => {
bail!("Failed to read from client: {}", e);
}
}
Ok(())
}
async fn process_upstream<S, E>(
msg: Result<Message, E>,
tx_client: &mut S,
) -> Result<()>
where
S: SinkExt<Message> + Unpin,
<S as Sink<Message>>::Error: Display,
E: Display,
{
match msg {
Ok(m) => {
let m_down = match m {
Message::Binary(data) => {
Message::Text(String::from_utf8_lossy(&data).to_string())
}
_ => panic!("todo"),
};
debug!("Sending data to downstream: {:?}", m_down);
if let Err(e) = tx_client.send(m_down).await {
bail!("Failed to msg to client: {}", e);
}
}
Err(e) => {
bail!("Failed to read from upstream: {}", e);
}
}
Ok(())
}
loop {
tokio::select! {
Some(msg) = rx_client.next() => {
if let Err(e) = process_client(msg, &mut tx_upstream).await {
error!("{}", e);
break;
}
},
Some(msg) = rx_upstream.next() => {
if let Err(e) = process_upstream(msg, &mut tx_client).await {
error!("{}", e);
break;
}
}
}
}
Ok(())
})
}))
Err("Not implemented")
}

View File

@ -5,6 +5,7 @@ use lnvps::api;
use lnvps::cors::CORS;
use lnvps::exchange::{DefaultRateCache, ExchangeRateService};
use lnvps::invoice::InvoiceHandler;
use lnvps::lightning::get_node;
use lnvps::settings::Settings;
use lnvps::status::VmStateCache;
use lnvps::worker::{WorkJob, Worker};
@ -17,7 +18,6 @@ use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use lnvps::lightning::get_node;
#[derive(Parser)]
#[clap(about, version, author)]
@ -38,13 +38,15 @@ async fn main() -> Result<(), Error> {
.build()?
.try_deserialize()?;
let db = Arc::new(LNVpsDbMysql::new(&settings.db).await?);
// Connect database and migrate
let db = LNVpsDbMysql::new(&settings.db).await?;
db.migrate().await?;
#[cfg(debug_assertions)]
{
let setup_script = include_str!("../../dev_setup.sql");
db.execute(setup_script).await?;
}
let db: Arc<dyn LNVpsDb> = Arc::new(db);
let nostr_client = if let Some(ref c) = settings.nostr {
let cx = Client::builder().signer(Keys::parse(&c.nsec)?).build();
@ -61,8 +63,7 @@ async fn main() -> Result<(), Error> {
let node = get_node(&settings).await?;
let status = VmStateCache::new();
let provisioner =
settings.get_provisioner(db.clone(), node.clone(), exchange.clone());
let provisioner = settings.get_provisioner(db.clone(), node.clone(), exchange.clone());
provisioner.init().await?;
let mut worker = Worker::new(
@ -121,7 +122,7 @@ async fn main() -> Result<(), Error> {
}
Err(e) => error!("Failed to fetch rates: {}", e),
}
tokio::time::sleep(Duration::from_secs(60)).await;
tokio::time::sleep(Duration::from_secs(120)).await;
}
});
@ -135,10 +136,11 @@ async fn main() -> Result<(), Error> {
if let Err(e) = rocket::Rocket::custom(config)
.attach(CORS)
.manage(db)
.manage(provisioner)
.manage(status)
.manage(exchange)
.manage(db.clone())
.manage(provisioner.clone())
.manage(status.clone())
.manage(exchange.clone())
.manage(settings.clone())
.manage(sender)
.mount("/", api::routes())
.mount(

40
src/host/libvirt.rs Normal file
View File

@ -0,0 +1,40 @@
use crate::host::{CreateVmRequest, VmHostClient};
use crate::status::VmState;
use lnvps_db::{async_trait, Vm, VmOsImage};
pub struct LibVirt {}
#[async_trait]
impl VmHostClient for LibVirt {
async fn download_os_image(&self, image: &VmOsImage) -> anyhow::Result<()> {
todo!()
}
async fn generate_mac(&self, vm: &Vm) -> anyhow::Result<String> {
todo!()
}
async fn start_vm(&self, vm: &Vm) -> anyhow::Result<()> {
todo!()
}
async fn stop_vm(&self, vm: &Vm) -> anyhow::Result<()> {
todo!()
}
async fn reset_vm(&self, vm: &Vm) -> anyhow::Result<()> {
todo!()
}
async fn create_vm(&self, cfg: &CreateVmRequest) -> anyhow::Result<()> {
todo!()
}
async fn get_vm_state(&self, vm: &Vm) -> anyhow::Result<VmState> {
todo!()
}
async fn configure_vm(&self, vm: &Vm) -> anyhow::Result<()> {
todo!()
}
}

View File

@ -1,28 +1,91 @@
use crate::host::proxmox::ProxmoxClient;
use crate::settings::ProvisionerConfig;
use crate::status::VmState;
use anyhow::{bail, Result};
use lnvps_db::{async_trait, VmHost, VmHostKind};
use lnvps_db::{
async_trait, IpRange, UserSshKey, Vm, VmHost, VmHostDisk, VmHostKind, VmIpAssignment,
VmOsImage, VmTemplate,
};
use std::sync::Arc;
pub mod proxmox;
#[cfg(feature = "libvirt")]
mod libvirt;
#[cfg(feature = "proxmox")]
mod proxmox;
/// Generic type for creating VM's
#[async_trait]
pub trait VmHostClient {
pub trait VmHostClient: Send + Sync {
/// Download OS image to the host
async fn download_os_image(&self, image: &VmOsImage) -> Result<()>;
/// Create a random MAC address for the NIC
async fn generate_mac(&self, vm: &Vm) -> Result<String>;
/// Start a VM
async fn start_vm(&self, vm: &Vm) -> Result<()>;
/// Stop a VM
async fn stop_vm(&self, vm: &Vm) -> Result<()>;
/// Reset VM (Hard)
async fn reset_vm(&self, vm: &Vm) -> Result<()>;
/// Spawn a VM
async fn create_vm(&self, cfg: &CreateVmRequest) -> Result<()>;
/// Get the running status of a VM
async fn get_vm_state(&self, vm: &Vm) -> Result<VmState>;
/// Apply vm configuration (update)
async fn configure_vm(&self, vm: &Vm) -> Result<()>;
}
#[cfg(not(test))]
pub fn get_host_client(host: &VmHost, cfg: &ProvisionerConfig) -> Result<ProxmoxClient> {
Ok(match (host.kind.clone(), &cfg) {
(VmHostKind::Proxmox, ProvisionerConfig::Proxmox { qemu, ssh, .. }) => {
ProxmoxClient::new(host.ip.parse()?, qemu.clone(), ssh.clone())
.with_api_token(&host.api_token)
}
_ => bail!("Unsupported host type"),
})
pub fn get_host_client(host: &VmHost, cfg: &ProvisionerConfig) -> Result<Arc<dyn VmHostClient>> {
#[cfg(test)]
{
Ok(Arc::new(crate::mocks::MockVmHost::default()))
}
#[cfg(not(test))]
{
Ok(match (host.kind.clone(), &cfg) {
#[cfg(feature = "proxmox")]
(
VmHostKind::Proxmox,
ProvisionerConfig::Proxmox {
qemu,
ssh,
mac_prefix,
},
) => Arc::new(
proxmox::ProxmoxClient::new(
host.ip.parse()?,
&host.name,
mac_prefix.clone(),
qemu.clone(),
ssh.clone(),
)
.with_api_token(&host.api_token),
),
_ => bail!("Unknown host config: {}", host.kind),
})
}
}
#[cfg(test)]
pub fn get_host_client(host: &VmHost, cfg: &ProvisionerConfig) -> Result<ProxmoxClient> {
todo!()
}
/// Generic VM create request, host impl decides how VMs are created
/// based on app settings
pub struct CreateVmRequest {
/// Instance to create
pub vm: Vm,
/// Disk where this VM will be saved on the host
pub disk: VmHostDisk,
/// VM template resources
pub template: VmTemplate,
/// The OS image used to create the VM
pub image: VmOsImage,
/// List of IP resources assigned to this VM
pub ips: Vec<VmIpAssignment>,
/// Ranges associated with [ips]
pub ranges: Vec<IpRange>,
/// SSH key to access the VM
pub ssh_key: UserSshKey,
}

View File

@ -1,15 +1,21 @@
use crate::host::{CreateVmRequest, VmHostClient};
use crate::settings::{QemuConfig, SshConfig};
use crate::ssh_client::SshClient;
use anyhow::{anyhow, bail, Result};
use crate::status::{VmRunningState, VmState};
use anyhow::{anyhow, bail, ensure, Result};
use chrono::Utc;
use futures::future::join_all;
use ipnetwork::IpNetwork;
use lnvps_db::{IpRange, LNVpsDb, Vm, VmIpAssignment};
use lnvps_db::{async_trait, DiskType, IpRange, LNVpsDb, Vm, VmIpAssignment, VmOsImage};
use log::{debug, info};
use nostr_sdk::async_utility::futures_util::future::join_all;
use rand::random;
use reqwest::{ClientBuilder, Method, Url};
use serde::de::value::I32Deserializer;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::fmt::{Debug, Display, Formatter};
use std::net::IpAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@ -24,10 +30,18 @@ pub struct ProxmoxClient {
client: reqwest::Client,
config: QemuConfig,
ssh: Option<SshConfig>,
mac_prefix: String,
node: String,
}
impl ProxmoxClient {
pub fn new(base: Url, config: QemuConfig, ssh: Option<SshConfig>) -> Self {
pub fn new(
base: Url,
node: &str,
mac_prefix: Option<String>,
config: QemuConfig,
ssh: Option<SshConfig>,
) -> Self {
let client = ClientBuilder::new()
.danger_accept_invalid_certs(true)
.build()
@ -39,84 +53,12 @@ impl ProxmoxClient {
client,
config,
ssh,
node: node.to_string(),
mac_prefix: mac_prefix.unwrap_or("bc:24:11".to_string()),
}
}
/// Create [VmConfig] for a given VM and list of IPs
pub async fn make_vm_config(
&self,
db: &Arc<dyn LNVpsDb>,
vm: &Vm,
ips: &Vec<VmIpAssignment>,
) -> Result<VmConfig> {
let ssh_key = db.get_user_ssh_key(vm.ssh_key_id).await?;
let ip_range_ids: HashSet<u64> = ips.iter().map(|i| i.ip_range_id).collect();
let ip_ranges: Vec<_> = ip_range_ids.iter().map(|i| db.get_ip_range(*i)).collect();
let ip_ranges: HashMap<u64, IpRange> = join_all(ip_ranges)
.await
.into_iter()
.filter_map(Result::ok)
.map(|i| (i.id, i))
.collect();
let mut ip_config = ips
.iter()
.map_while(|ip| {
if let Ok(net) = ip.ip.parse::<IpNetwork>() {
Some(match net {
IpNetwork::V4(addr) => {
let range = ip_ranges.get(&ip.ip_range_id)?;
format!("ip={},gw={}", addr, range.gateway)
}
IpNetwork::V6(addr) => format!("ip6={}", addr),
})
} else {
None
}
})
.collect::<Vec<_>>();
ip_config.push("ip6=auto".to_string());
let mut net = vec![
format!("virtio={}", vm.mac_address),
format!("bridge={}", self.config.bridge),
];
if let Some(t) = self.config.vlan {
net.push(format!("tag={}", t));
}
let drives = db.list_host_disks(vm.host_id).await?;
let drive = if let Some(d) = drives.iter().find(|d| d.enabled) {
d
} else {
bail!("No host drive found!")
};
let template = db.get_vm_template(vm.template_id).await?;
Ok(VmConfig {
cpu: Some(self.config.cpu.clone()),
kvm: Some(self.config.kvm),
ip_config: Some(ip_config.join(",")),
machine: Some(self.config.machine.clone()),
net: Some(net.join(",")),
os_type: Some(self.config.os_type.clone()),
on_boot: Some(true),
bios: Some(VmBios::OVMF),
boot: Some("order=scsi0".to_string()),
cores: Some(template.cpu as i32),
memory: Some((template.memory / 1024 / 1024).to_string()),
scsi_hw: Some("virtio-scsi-pci".to_string()),
serial_0: Some("socket".to_string()),
scsi_1: Some(format!("{}:cloudinit", &drive.name)),
ssh_keys: Some(urlencoding::encode(&ssh_key.key_data).to_string()),
efi_disk_0: Some(format!("{}:0,efitype=4m", &drive.name)),
..Default::default()
})
}
pub fn with_api_token(mut self, token: &str) -> Self {
// PVEAPIToken=USER@REALM!TOKENID=UUID
self.token = token.to_string();
self
}
@ -133,7 +75,7 @@ impl ProxmoxClient {
Ok(rsp.data)
}
pub async fn get_vm_status(&self, node: &str, vm_id: i32) -> Result<VmInfo> {
pub async fn get_vm_status(&self, node: &str, vm_id: ProxmoxVmId) -> Result<VmInfo> {
let rsp: ResponseBase<VmInfo> = self
.get(&format!(
"/api2/json/nodes/{node}/qemu/{vm_id}/status/current"
@ -203,7 +145,7 @@ impl ProxmoxClient {
/// Delete VM
///
/// https://pve.proxmox.com/pve-docs/api-viewer/?ref=public_apis#/nodes/{node}/qemu
pub async fn delete_vm(&self, node: &str, vm: u64) -> Result<TaskId> {
pub async fn delete_vm(&self, node: &str, vm: ProxmoxVmId) -> Result<TaskId> {
let rsp: ResponseBase<Option<String>> = self
.req(
Method::DELETE,
@ -252,6 +194,18 @@ impl ProxmoxClient {
}
}
async fn get_iso_storage(&self, node: &str) -> Result<String> {
let storages = self.list_storage(node).await?;
if let Some(s) = storages
.iter()
.find(|s| s.contents().contains(&StorageContent::ISO))
{
Ok(s.storage.clone())
} else {
bail!("No image storage found");
}
}
/// Download an image to the host disk
pub async fn download_image(&self, req: DownloadUrlRequest) -> Result<TaskId> {
let rsp: ResponseBase<String> = self
@ -333,7 +287,7 @@ impl ProxmoxClient {
}
/// Start a VM
pub async fn start_vm(&self, node: &str, vm: u64) -> Result<TaskId> {
pub async fn start_vm(&self, node: &str, vm: ProxmoxVmId) -> Result<TaskId> {
let rsp: ResponseBase<String> = self
.post(
&format!("/api2/json/nodes/{}/qemu/{}/status/start", node, vm),
@ -347,7 +301,7 @@ impl ProxmoxClient {
}
/// Stop a VM
pub async fn stop_vm(&self, node: &str, vm: u64) -> Result<TaskId> {
pub async fn stop_vm(&self, node: &str, vm: ProxmoxVmId) -> Result<TaskId> {
let rsp: ResponseBase<String> = self
.post(
&format!("/api2/json/nodes/{}/qemu/{}/status/stop", node, vm),
@ -361,7 +315,7 @@ impl ProxmoxClient {
}
/// Stop a VM
pub async fn shutdown_vm(&self, node: &str, vm: u64) -> Result<TaskId> {
pub async fn shutdown_vm(&self, node: &str, vm: ProxmoxVmId) -> Result<TaskId> {
let rsp: ResponseBase<String> = self
.post(
&format!("/api2/json/nodes/{}/qemu/{}/status/shutdown", node, vm),
@ -375,7 +329,7 @@ impl ProxmoxClient {
}
/// Stop a VM
pub async fn reset_vm(&self, node: &str, vm: u64) -> Result<TaskId> {
pub async fn reset_vm(&self, node: &str, vm: ProxmoxVmId) -> Result<TaskId> {
let rsp: ResponseBase<String> = self
.post(
&format!("/api2/json/nodes/{}/qemu/{}/status/reset", node, vm),
@ -389,7 +343,7 @@ impl ProxmoxClient {
}
/// Create terminal proxy session
pub async fn terminal_proxy(&self, node: &str, vm: u64) -> Result<TerminalProxyTicket> {
pub async fn terminal_proxy(&self, node: &str, vm: ProxmoxVmId) -> Result<TerminalProxyTicket> {
let rsp: ResponseBase<TerminalProxyTicket> = self
.post(
&format!("/api2/json/nodes/{}/qemu/{}/termproxy", node, vm),
@ -403,7 +357,7 @@ impl ProxmoxClient {
pub async fn open_terminal_proxy(
&self,
node: &str,
vm: u64,
vm: ProxmoxVmId,
req: TerminalProxyTicket,
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
self.get_task_status(&TaskId {
@ -500,6 +454,241 @@ impl ProxmoxClient {
}
}
impl ProxmoxClient {
fn make_config(&self, value: &CreateVmRequest) -> Result<VmConfig> {
let mut ip_config = value
.ips
.iter()
.map_while(|ip| {
if let Ok(net) = ip.ip.parse::<IpAddr>() {
Some(match net {
IpAddr::V4(addr) => {
let range = value.ranges.iter().find(|r| r.id == ip.ip_range_id)?;
let range: IpNetwork = range.gateway.parse().ok()?;
format!(
"ip={},gw={}",
IpNetwork::new(addr.into(), range.prefix()).ok()?,
range.ip()
)
}
IpAddr::V6(addr) => format!("ip6={}", addr),
})
} else {
None
}
})
.collect::<Vec<_>>();
// TODO: make this configurable
ip_config.push("ip6=auto".to_string());
let mut net = vec![
format!("virtio={}", value.vm.mac_address),
format!("bridge={}", self.config.bridge),
];
if let Some(t) = self.config.vlan {
net.push(format!("tag={}", t));
}
Ok(VmConfig {
cpu: Some(self.config.cpu.clone()),
kvm: Some(self.config.kvm),
ip_config: Some(ip_config.join(",")),
machine: Some(self.config.machine.clone()),
net: Some(net.join(",")),
os_type: Some(self.config.os_type.clone()),
on_boot: Some(true),
bios: Some(VmBios::OVMF),
boot: Some("order=scsi0".to_string()),
cores: Some(value.template.cpu as i32),
memory: Some((value.template.memory / 1024 / 1024).to_string()),
scsi_hw: Some("virtio-scsi-pci".to_string()),
serial_0: Some("socket".to_string()),
scsi_1: Some(format!("{}:cloudinit", &value.disk.name)),
ssh_keys: Some(urlencoding::encode(&value.ssh_key.key_data).to_string()),
efi_disk_0: Some(format!("{}:0,efitype=4m", &value.disk.name)),
..Default::default()
})
}
}
#[async_trait]
impl VmHostClient for ProxmoxClient {
async fn download_os_image(&self, image: &VmOsImage) -> Result<()> {
let iso_storage = self.get_iso_storage(&self.node).await?;
let files = self.list_storage_files(&self.node, &iso_storage).await?;
info!("Downloading image {} on {}", image.url, &self.node);
let i_name = image.filename()?;
if files
.iter()
.any(|v| v.vol_id.ends_with(&format!("iso/{i_name}")))
{
info!("Already downloaded, skipping");
return Ok(());
}
let t_download = self
.download_image(DownloadUrlRequest {
content: StorageContent::ISO,
node: self.node.clone(),
storage: iso_storage.clone(),
url: image.url.clone(),
filename: i_name,
})
.await?;
self.wait_for_task(&t_download).await?;
Ok(())
}
async fn generate_mac(&self, _vm: &Vm) -> Result<String> {
ensure!(self.mac_prefix.len() == 8, "Invalid mac prefix");
ensure!(self.mac_prefix.contains(":"), "Invalid mac prefix");
Ok(format!(
"{}:{}:{}:{}",
self.mac_prefix,
hex::encode([random::<u8>()]),
hex::encode([random::<u8>()]),
hex::encode([random::<u8>()])
))
}
async fn start_vm(&self, vm: &Vm) -> Result<()> {
let task = self.start_vm(&self.node, vm.id.into()).await?;
self.wait_for_task(&task).await?;
Ok(())
}
async fn stop_vm(&self, vm: &Vm) -> Result<()> {
let task = self.stop_vm(&self.node, vm.id.into()).await?;
self.wait_for_task(&task).await?;
Ok(())
}
async fn reset_vm(&self, vm: &Vm) -> Result<()> {
let task = self.reset_vm(&self.node, vm.id.into()).await?;
self.wait_for_task(&task).await?;
Ok(())
}
async fn create_vm(&self, req: &CreateVmRequest) -> Result<()> {
let config = self.make_config(&req)?;
let vm_id = req.vm.id.into();
let t_create = self
.create_vm(CreateVm {
node: self.node.clone(),
vm_id,
config,
})
.await?;
self.wait_for_task(&t_create).await?;
// import primary disk from image (scsi0)
if let Err(e) = self
.import_disk_image(ImportDiskImageRequest {
vm_id,
node: self.node.clone(),
storage: req.disk.name.clone(),
disk: "scsi0".to_string(),
image: req.image.filename()?,
is_ssd: matches!(req.disk.kind, DiskType::SSD),
})
.await
{
// TODO: rollback
return Err(e);
}
// resize disk to match template
let j_resize = self
.resize_disk(ResizeDiskRequest {
node: self.node.clone(),
vm_id,
disk: "scsi0".to_string(),
size: req.template.disk_size.to_string(),
})
.await?;
self.wait_for_task(&j_resize).await?;
// try start, otherwise ignore error (maybe its already running)
if let Ok(j_start) = self.start_vm(&self.node, vm_id).await {
self.wait_for_task(&j_start).await?;
}
Ok(())
}
async fn get_vm_state(&self, vm: &Vm) -> Result<VmState> {
let s = self.get_vm_status(&self.node, vm.id.into()).await?;
Ok(VmState {
timestamp: Utc::now().timestamp() as u64,
state: match s.status {
VmStatus::Stopped => VmRunningState::Stopped,
VmStatus::Running => VmRunningState::Running,
},
cpu_usage: s.cpu.unwrap_or(0.0),
mem_usage: s.mem.unwrap_or(0) as f32 / s.max_mem.unwrap_or(1) as f32,
uptime: s.uptime.unwrap_or(0),
net_in: s.net_in.unwrap_or(0),
net_out: s.net_out.unwrap_or(0),
disk_write: s.disk_write.unwrap_or(0),
disk_read: s.disk_read.unwrap_or(0),
})
}
async fn configure_vm(&self, vm: &Vm) -> Result<()> {
todo!()
}
}
/// Wrap a database vm id
#[derive(Debug, Copy, Clone, Default)]
pub struct ProxmoxVmId(u64);
impl Into<i32> for ProxmoxVmId {
fn into(self) -> i32 {
self.0 as i32 + 100
}
}
impl From<u64> for ProxmoxVmId {
fn from(value: u64) -> Self {
Self(value)
}
}
impl From<i32> for ProxmoxVmId {
fn from(value: i32) -> Self {
Self(value as u64 - 100)
}
}
impl Display for ProxmoxVmId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let id: i32 = (*self).into();
write!(f, "{}", id)
}
}
impl Serialize for ProxmoxVmId {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
let id: i32 = (*self).into();
serializer.serialize_i32(id)
}
}
impl<'de> Deserialize<'de> for ProxmoxVmId {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let id = i32::deserialize(deserializer)?;
Ok(id.into())
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct TerminalProxyTicket {
pub port: String,
@ -694,7 +883,7 @@ pub struct StorageContentEntry {
pub struct ResizeDiskRequest {
pub node: String,
#[serde(rename = "vmid")]
pub vm_id: i32,
pub vm_id: ProxmoxVmId,
pub disk: String,
/// The new size.
///
@ -706,7 +895,7 @@ pub struct ResizeDiskRequest {
#[derive(Debug, Deserialize, Serialize, Default)]
pub struct ImportDiskImageRequest {
/// VM id
pub vm_id: i32,
pub vm_id: ProxmoxVmId,
/// Node name
pub node: String,
/// Storage pool to import disk to
@ -730,7 +919,7 @@ pub enum VmBios {
pub struct CreateVm {
pub node: String,
#[serde(rename = "vmid")]
pub vm_id: i32,
pub vm_id: ProxmoxVmId,
#[serde(flatten)]
pub config: VmConfig,
}
@ -739,7 +928,7 @@ pub struct CreateVm {
pub struct ConfigureVm {
pub node: String,
#[serde(rename = "vmid")]
pub vm_id: i32,
pub vm_id: ProxmoxVmId,
#[serde(skip_serializing_if = "Option::is_none")]
pub current: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]

View File

@ -3,14 +3,15 @@ pub mod cors;
pub mod exchange;
pub mod host;
pub mod invoice;
pub mod lightning;
pub mod nip98;
pub mod provisioner;
pub mod router;
pub mod settings;
#[cfg(feature = "proxmox")]
pub mod ssh_client;
pub mod status;
pub mod worker;
pub mod lightning;
#[cfg(test)]
pub mod mocks;

View File

@ -1,6 +1,8 @@
use crate::host::{CreateVmRequest, VmHostClient};
use crate::lightning::{AddInvoiceRequest, AddInvoiceResult, InvoiceUpdate, LightningNode};
use crate::router::{ArpEntry, Router};
use crate::settings::NetworkPolicy;
use crate::status::{VmRunningState, VmState};
use anyhow::{anyhow, bail};
use chrono::{DateTime, Utc};
use fedimint_tonic_lnd::tonic::codegen::tokio_stream::Stream;
@ -12,14 +14,16 @@ use lnvps_db::{
use std::collections::HashMap;
use std::net::IpAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, LazyLock};
use tokio::sync::Mutex;
#[derive(Debug, Clone)]
pub struct MockDb {
pub regions: Arc<Mutex<HashMap<u64, VmHostRegion>>>,
pub hosts: Arc<Mutex<HashMap<u64, VmHost>>>,
pub host_disks: Arc<Mutex<HashMap<u64, VmHostDisk>>>,
pub users: Arc<Mutex<HashMap<u64, User>>>,
pub user_ssh_keys: Arc<Mutex<HashMap<u64, UserSshKey>>>,
pub cost_plans: Arc<Mutex<HashMap<u64, VmCostPlan>>>,
pub os_images: Arc<Mutex<HashMap<u64, VmOsImage>>>,
pub templates: Arc<Mutex<HashMap<u64, VmTemplate>>>,
@ -38,6 +42,9 @@ impl MockDb {
impl Default for MockDb {
fn default() -> Self {
const GB: u64 = 1024 * 1024 * 1024;
const TB: u64 = GB * 1024;
let mut regions = HashMap::new();
regions.insert(
1,
@ -52,8 +59,8 @@ impl Default for MockDb {
1,
IpRange {
id: 1,
cidr: "10.0.0.0/8".to_string(),
gateway: "10.0.0.1".to_string(),
cidr: "10.0.0.0/24".to_string(),
gateway: "10.0.0.1/8".to_string(),
enabled: true,
region_id: 1,
},
@ -73,6 +80,19 @@ impl Default for MockDb {
api_token: "".to_string(),
},
);
let mut host_disks = HashMap::new();
host_disks.insert(
1,
VmHostDisk {
id: 1,
host_id: 1,
name: "mock-disk".to_string(),
size: TB * 10,
kind: DiskType::SSD,
interface: DiskInterface::PCIe,
enabled: true,
},
);
let mut cost_plans = HashMap::new();
cost_plans.insert(
1,
@ -96,8 +116,8 @@ impl Default for MockDb {
created: Utc::now(),
expires: None,
cpu: 2,
memory: 1024 * 1024 * 1024 * 2,
disk_size: 1024 * 1024 * 1024 * 64,
memory: GB * 2,
disk_size: GB * 64,
disk_type: DiskType::SSD,
disk_interface: DiskInterface::PCIe,
cost_plan_id: 1,
@ -121,12 +141,14 @@ impl Default for MockDb {
regions: Arc::new(Mutex::new(regions)),
ip_range: Arc::new(Mutex::new(ip_ranges)),
hosts: Arc::new(Mutex::new(hosts)),
host_disks: Arc::new(Mutex::new(host_disks)),
cost_plans: Arc::new(Mutex::new(cost_plans)),
templates: Arc::new(Mutex::new(templates)),
os_images: Arc::new(Mutex::new(os_images)),
users: Arc::new(Default::default()),
vms: Arc::new(Default::default()),
ip_assignments: Arc::new(Default::default()),
user_ssh_keys: Arc::new(Mutex::new(Default::default())),
}
}
}
@ -182,19 +204,39 @@ impl LNVpsDb for MockDb {
}
async fn insert_user_ssh_key(&self, new_key: &UserSshKey) -> anyhow::Result<u64> {
todo!()
let mut ssh_keys = self.user_ssh_keys.lock().await;
let max_keys = *ssh_keys.keys().max().unwrap_or(&0);
ssh_keys.insert(
max_keys + 1,
UserSshKey {
id: max_keys + 1,
name: new_key.name.clone(),
user_id: new_key.user_id,
created: Utc::now(),
key_data: new_key.key_data.clone(),
},
);
Ok(max_keys + 1)
}
async fn get_user_ssh_key(&self, id: u64) -> anyhow::Result<UserSshKey> {
todo!()
let keys = self.user_ssh_keys.lock().await;
Ok(keys.get(&id).ok_or(anyhow!("no key"))?.clone())
}
async fn delete_user_ssh_key(&self, id: u64) -> anyhow::Result<()> {
todo!()
let mut keys = self.user_ssh_keys.lock().await;
keys.remove(&id);
Ok(())
}
async fn list_user_ssh_key(&self, user_id: u64) -> anyhow::Result<Vec<UserSshKey>> {
todo!()
let keys = self.user_ssh_keys.lock().await;
Ok(keys
.values()
.filter(|u| u.user_id == user_id)
.cloned()
.collect())
}
async fn get_host_region(&self, id: u64) -> anyhow::Result<VmHostRegion> {
@ -223,7 +265,13 @@ impl LNVpsDb for MockDb {
}
async fn list_host_disks(&self, host_id: u64) -> anyhow::Result<Vec<VmHostDisk>> {
todo!()
let disks = self.host_disks.lock().await;
Ok(disks.values().filter(|d| d.enabled).cloned().collect())
}
async fn get_host_disk(&self, disk_id: u64) -> anyhow::Result<VmHostDisk> {
let disks = self.host_disks.lock().await;
Ok(disks.get(&disk_id).ok_or(anyhow!("no disk"))?.clone())
}
async fn get_os_image(&self, id: u64) -> anyhow::Result<VmOsImage> {
@ -305,7 +353,31 @@ impl LNVpsDb for MockDb {
async fn insert_vm(&self, vm: &Vm) -> anyhow::Result<u64> {
let mut vms = self.vms.lock().await;
let max_id = *vms.keys().max().unwrap_or(&0);
vms.insert(max_id + 1, vm.clone());
// lazy test FK
self.get_host(vm.host_id).await?;
self.get_user(vm.user_id).await?;
self.get_os_image(vm.image_id).await?;
self.get_vm_template(vm.template_id).await?;
self.get_user_ssh_key(vm.ssh_key_id).await?;
self.get_host_disk(vm.disk_id).await?;
vms.insert(
max_id + 1,
Vm {
id: max_id + 1,
host_id: vm.host_id,
user_id: vm.user_id,
image_id: vm.image_id,
template_id: vm.template_id,
ssh_key_id: vm.ssh_key_id,
created: Utc::now(),
expires: Utc::now(),
disk_id: vm.disk_id,
mac_address: vm.mac_address.clone(),
deleted: false,
},
);
Ok(max_id + 1)
}
@ -398,9 +470,20 @@ impl LNVpsDb for MockDb {
#[derive(Debug, Clone)]
pub struct MockRouter {
pub policy: NetworkPolicy,
pub arp: Arc<Mutex<HashMap<String, ArpEntry>>>,
arp: Arc<Mutex<HashMap<u64, ArpEntry>>>,
}
impl MockRouter {
pub fn new(policy: NetworkPolicy) -> Self {
static ARP: LazyLock<Arc<Mutex<HashMap<u64, ArpEntry>>>> =
LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
Self {
policy,
arp: ARP.clone(),
}
}
}
#[async_trait]
impl Router for MockRouter {
async fn list_arp_entry(&self) -> anyhow::Result<Vec<ArpEntry>> {
@ -419,12 +502,13 @@ impl Router for MockRouter {
if arp.iter().any(|(k, v)| v.address == ip.to_string()) {
bail!("Address is already in use");
}
let max_id = *arp.keys().max().unwrap_or(&0);
arp.insert(
mac.to_string(),
max_id + 1,
ArpEntry {
id: Some(mac.to_string()),
id: Some((max_id + 1).to_string()),
address: ip.to_string(),
mac_address: None,
mac_address: Some(mac.to_string()),
interface: interface.to_string(),
comment: comment.map(|s| s.to_string()),
},
@ -434,7 +518,7 @@ impl Router for MockRouter {
async fn remove_arp_entry(&self, id: &str) -> anyhow::Result<()> {
let mut arp = self.arp.lock().await;
arp.remove(id);
arp.remove(&id.parse::<u64>()?);
Ok(())
}
}
@ -464,3 +548,88 @@ impl LightningNode for MockNode {
todo!()
}
}
#[derive(Debug, Clone, Default)]
pub struct MockVmHost {
vms: Arc<Mutex<HashMap<u64, MockVm>>>,
}
#[derive(Debug, Clone)]
struct MockVm {
pub state: VmRunningState,
}
#[async_trait]
impl VmHostClient for MockVmHost {
async fn download_os_image(&self, image: &VmOsImage) -> anyhow::Result<()> {
Ok(())
}
async fn generate_mac(&self, vm: &Vm) -> anyhow::Result<String> {
Ok(format!(
"ff:ff:ff:{}:{}:{}",
hex::encode([rand::random::<u8>()]),
hex::encode([rand::random::<u8>()]),
hex::encode([rand::random::<u8>()]),
))
}
async fn start_vm(&self, vm: &Vm) -> anyhow::Result<()> {
let mut vms = self.vms.lock().await;
if let Some(mut vm) = vms.get_mut(&vm.id) {
vm.state = VmRunningState::Running;
}
Ok(())
}
async fn stop_vm(&self, vm: &Vm) -> anyhow::Result<()> {
let mut vms = self.vms.lock().await;
if let Some(mut vm) = vms.get_mut(&vm.id) {
vm.state = VmRunningState::Stopped;
}
Ok(())
}
async fn reset_vm(&self, vm: &Vm) -> anyhow::Result<()> {
let mut vms = self.vms.lock().await;
if let Some(mut vm) = vms.get_mut(&vm.id) {
vm.state = VmRunningState::Running;
}
Ok(())
}
async fn create_vm(&self, cfg: &CreateVmRequest) -> anyhow::Result<()> {
let mut vms = self.vms.lock().await;
let max_id = *vms.keys().max().unwrap_or(&0);
vms.insert(
max_id + 1,
MockVm {
state: VmRunningState::Stopped,
},
);
Ok(())
}
async fn get_vm_state(&self, vm: &Vm) -> anyhow::Result<VmState> {
let vms = self.vms.lock().await;
if let Some(vm) = vms.get(&vm.id) {
Ok(VmState {
timestamp: Utc::now().timestamp() as u64,
state: vm.state.clone(),
cpu_usage: 69.0,
mem_usage: 69.0,
uptime: 100,
net_in: 69,
net_out: 69,
disk_write: 69,
disk_read: 69,
})
} else {
bail!("No vm with id {}", vm.id)
}
}
async fn configure_vm(&self, vm: &Vm) -> anyhow::Result<()> {
Ok(())
}
}

View File

@ -1,29 +1,25 @@
use crate::exchange::{ExchangeRateService, Ticker};
use crate::host::get_host_client;
use crate::host::proxmox::{
ConfigureVm, CreateVm, DownloadUrlRequest, ImportDiskImageRequest, ProxmoxClient,
ResizeDiskRequest, StorageContent, VmConfig,
};
use crate::host::{get_host_client, CreateVmRequest, VmHostClient};
use crate::lightning::{AddInvoiceRequest, LightningNode};
use crate::provisioner::{NetworkProvisioner, Provisioner, ProvisionerMethod};
use crate::provisioner::{NetworkProvisioner, ProvisionerMethod};
use crate::router::Router;
use crate::settings::{NetworkAccessPolicy, NetworkPolicy, ProvisionerConfig, Settings};
use anyhow::{bail, Result};
use chrono::{Days, Months, Utc};
use fedimint_tonic_lnd::tonic::async_trait;
use lnvps_db::{DiskType, LNVpsDb, Vm, VmCostPlanIntervalType, VmIpAssignment, VmPayment};
use futures::future::join_all;
use lnvps_db::{DiskType, IpRange, LNVpsDb, Vm, VmCostPlanIntervalType, VmIpAssignment, VmPayment};
use log::{debug, info, warn};
use nostr::util::hex;
use rand::random;
use rocket::futures::{SinkExt, StreamExt};
use std::collections::{HashMap, HashSet};
use std::net::IpAddr;
use std::ops::Add;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
/// Main provisioner class for LNVPS
///
@ -58,16 +54,24 @@ impl LNVpsProvisioner {
}
}
async fn get_iso_storage(node: &str, client: &ProxmoxClient) -> Result<String> {
let storages = client.list_storage(node).await?;
if let Some(s) = storages
.iter()
.find(|s| s.contents().contains(&StorageContent::ISO))
{
Ok(s.storage.clone())
} else {
bail!("No image storage found");
async fn delete_ip_assignment(&self, vm: &Vm) -> Result<()> {
if let NetworkAccessPolicy::StaticArp { .. } = &self.network_policy.access {
if let Some(r) = self.router.as_ref() {
let ent = r.list_arp_entry().await?;
if let Some(ent) = ent.iter().find(|e| {
e.mac_address
.as_ref()
.map(|m| m.eq_ignore_ascii_case(&vm.mac_address))
.unwrap_or(false)
}) {
r.remove_arp_entry(ent.id.as_ref().unwrap().as_str())
.await?;
} else {
warn!("ARP entry not found, skipping")
}
}
}
Ok(())
}
async fn save_ip_assignment(&self, vm: &Vm, assignment: &VmIpAssignment) -> Result<()> {
@ -90,44 +94,55 @@ impl LNVpsProvisioner {
self.db.insert_vm_ip_assignment(assignment).await?;
Ok(())
}
}
#[async_trait]
impl Provisioner for LNVpsProvisioner {
async fn init(&self) -> Result<()> {
// tell hosts to download images
async fn allocate_ips(&self, vm_id: u64) -> Result<Vec<VmIpAssignment>> {
let vm = self.db.get_vm(vm_id).await?;
let existing_ips = self.db.list_vm_ip_assignments(vm_id).await?;
if !existing_ips.is_empty() {
return Ok(existing_ips);
}
// Use random network provisioner
let network = NetworkProvisioner::new(ProvisionerMethod::Random, self.db.clone());
let template = self.db.get_vm_template(vm.template_id).await?;
let ip = network.pick_ip_for_region(template.region_id).await?;
let assignment = VmIpAssignment {
id: 0,
vm_id,
ip_range_id: ip.range_id,
ip: ip.ip.to_string(),
deleted: false,
};
self.save_ip_assignment(&vm, &assignment).await?;
Ok(vec![assignment])
}
/// Do any necessary initialization
pub async fn init(&self) -> Result<()> {
let hosts = self.db.list_hosts().await?;
let images = self.db.list_os_image().await?;
for host in hosts {
let client = get_host_client(&host, &self.provisioner_config)?;
let iso_storage = Self::get_iso_storage(&host.name, &client).await?;
let files = client.list_storage_files(&host.name, &iso_storage).await?;
for image in self.db.list_os_image().await? {
info!("Downloading image {} on {}", image.url, host.name);
let i_name = image.filename()?;
if files
.iter()
.any(|v| v.vol_id.ends_with(&format!("iso/{i_name}")))
{
info!("Already downloaded, skipping");
continue;
for image in &images {
if let Err(e) = client.download_os_image(image).await {
warn!(
"Error downloading image {} on {}: {}",
image.url, host.name, e
);
}
let t_download = client
.download_image(DownloadUrlRequest {
content: StorageContent::ISO,
node: host.name.clone(),
storage: iso_storage.clone(),
url: image.url.clone(),
filename: i_name,
})
.await?;
client.wait_for_task(&t_download).await?;
}
}
Ok(())
}
async fn provision(
/// Provision a new VM for a user on the database
///
/// Note:
/// 1. Does not create a VM on the host machine
/// 2. Does not assign any IP resources
pub async fn provision(
&self,
user_id: u64,
template_id: u64,
@ -146,6 +161,7 @@ impl Provisioner for LNVpsProvisioner {
} else {
bail!("No host found")
};
// TODO: impl resource usage based provisioning (disk)
let host_disks = self.db.list_host_disks(pick_host.id).await?;
let pick_disk = if let Some(hd) = host_disks.first() {
hd
@ -153,6 +169,7 @@ impl Provisioner for LNVpsProvisioner {
bail!("No host disk found")
};
let client = get_host_client(&pick_host, &self.provisioner_config)?;
let mut new_vm = Vm {
host_id: pick_host.id,
user_id: user.id,
@ -162,21 +179,19 @@ impl Provisioner for LNVpsProvisioner {
created: Utc::now(),
expires: Utc::now(),
disk_id: pick_disk.id,
mac_address: format!(
"bc:24:11:{}:{}:{}",
hex::encode([random::<u8>()]),
hex::encode([random::<u8>()]),
hex::encode([random::<u8>()])
),
..Default::default()
};
// ask host client to generate the mac address
new_vm.mac_address = client.generate_mac(&new_vm).await?;
let new_id = self.db.insert_vm(&new_vm).await?;
new_vm.id = new_id;
Ok(new_vm)
}
async fn renew(&self, vm_id: u64) -> Result<VmPayment> {
/// Create a renewal payment
pub async fn renew(&self, vm_id: u64) -> Result<VmPayment> {
let vm = self.db.get_vm(vm_id).await?;
let template = self.db.get_vm_template(vm.template_id).await?;
let cost_plan = self.db.get_cost_plan(template.cost_plan_id).await?;
@ -239,212 +254,70 @@ impl Provisioner for LNVpsProvisioner {
Ok(vm_payment)
}
async fn allocate_ips(&self, vm_id: u64) -> Result<Vec<VmIpAssignment>> {
let vm = self.db.get_vm(vm_id).await?;
let existing_ips = self.db.list_vm_ip_assignments(vm_id).await?;
if !existing_ips.is_empty() {
return Ok(existing_ips);
}
// Use random network provisioner
let prov = NetworkProvisioner::new(ProvisionerMethod::Random, self.db.clone());
let template = self.db.get_vm_template(vm.template_id).await?;
let ip = prov.pick_ip_for_region(template.region_id).await?;
let assignment = VmIpAssignment {
id: 0,
vm_id,
ip_range_id: ip.range_id,
ip: ip.ip.to_string(),
deleted: false,
};
self.save_ip_assignment(&vm, &assignment).await?;
Ok(vec![assignment])
}
/// Create a vm on the host as configured by the template
async fn spawn_vm(&self, vm_id: u64) -> Result<()> {
pub async fn spawn_vm(&self, vm_id: u64) -> Result<()> {
if self.read_only {
bail!("Cant spawn VM's in read-only mode")
}
let vm = self.db.get_vm(vm_id).await?;
let template = self.db.get_vm_template(vm.template_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host, &self.provisioner_config)?;
let image = self.db.get_os_image(vm.image_id).await?;
let disk = self.db.get_host_disk(vm.disk_id).await?;
let ssh_key = self.db.get_user_ssh_key(vm.ssh_key_id).await?;
// TODO: remove +100 nonsense (proxmox specific)
let vm_id = 100 + vm.id as i32;
let client = get_host_client(&host, &self.provisioner_config)?;
// setup network by allocating some IP space
let ips = self.allocate_ips(vm.id).await?;
let ip_range_ids: HashSet<u64> = ips.iter().map(|i| i.ip_range_id).collect();
let ip_ranges: Vec<_> = ip_range_ids
.iter()
.map(|i| self.db.get_ip_range(*i))
.collect();
let ranges: Vec<IpRange> = join_all(ip_ranges)
.await
.into_iter()
.filter_map(Result::ok)
.collect();
// create VM
let config = client.make_vm_config(&self.db, &vm, &ips).await?;
let t_create = client
.create_vm(CreateVm {
node: host.name.clone(),
vm_id,
config,
})
.await?;
client.wait_for_task(&t_create).await?;
// TODO: pick disk based on available space
// TODO: build external module to manage picking disks
// pick disk
let drives = self.db.list_host_disks(vm.host_id).await?;
let drive = if let Some(d) = drives.iter().find(|d| d.enabled) {
d
} else {
bail!("No host drive found!")
let req = CreateVmRequest {
vm,
template,
image,
ips,
disk,
ranges,
ssh_key,
};
// TODO: remove scsi0 terms (proxmox specific)
// import primary disk from image (scsi0)?
client
.import_disk_image(ImportDiskImageRequest {
vm_id,
node: host.name.clone(),
storage: drive.name.clone(),
disk: "scsi0".to_string(),
image: image.filename()?,
is_ssd: matches!(drive.kind, DiskType::SSD),
})
.await?;
// TODO: remove scsi0 terms (proxmox specific)
// resize disk to match template
let j_resize = client
.resize_disk(ResizeDiskRequest {
node: host.name.clone(),
vm_id,
disk: "scsi0".to_string(),
size: template.disk_size.to_string(),
})
.await?;
client.wait_for_task(&j_resize).await?;
// try start, otherwise ignore error (maybe its already running)
if let Ok(j_start) = client.start_vm(&host.name, vm_id as u64).await {
client.wait_for_task(&j_start).await?;
}
client.create_vm(&req).await?;
Ok(())
}
async fn start_vm(&self, vm_id: u64) -> Result<()> {
/// Delete a VM and its associated resources
pub async fn delete_vm(&self, vm_id: u64) -> Result<()> {
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host, &self.provisioner_config)?;
let j_start = client.start_vm(&host.name, vm.id + 100).await?;
client.wait_for_task(&j_start).await?;
Ok(())
}
async fn stop_vm(&self, vm_id: u64) -> Result<()> {
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host, &self.provisioner_config)?;
let j_start = client.shutdown_vm(&host.name, vm.id + 100).await?;
client.wait_for_task(&j_start).await?;
Ok(())
}
async fn restart_vm(&self, vm_id: u64) -> Result<()> {
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host, &self.provisioner_config)?;
let j_start = client.reset_vm(&host.name, vm.id + 100).await?;
client.wait_for_task(&j_start).await?;
Ok(())
}
async fn delete_vm(&self, vm_id: u64) -> Result<()> {
let vm = self.db.get_vm(vm_id).await?;
//let host = self.db.get_host(vm.host_id).await?;
// TODO: delete not implemented, stop only
//let client = get_host_client(&host)?;
//let j_start = client.delete_vm(&host.name, vm.id + 100).await?;
//let j_stop = client.stop_vm(&host.name, vm.id + 100).await?;
//client.wait_for_task(&j_stop).await?;
if let Some(r) = self.router.as_ref() {
let ent = r.list_arp_entry().await?;
if let Some(ent) = ent.iter().find(|e| {
e.mac_address
.as_ref()
.map(|m| m.eq_ignore_ascii_case(&vm.mac_address))
.unwrap_or(false)
}) {
r.remove_arp_entry(ent.id.as_ref().unwrap().as_str())
.await?;
} else {
warn!("ARP entry not found, skipping")
}
}
// host client currently doesn't support delete (proxmox)
// VM should already be stopped by [Worker]
self.delete_ip_assignment(&vm).await?;
self.db.delete_vm_ip_assignment(vm.id).await?;
self.db.delete_vm(vm.id).await?;
Ok(())
}
async fn terminal_proxy(
&self,
vm_id: u64,
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
/// Stop a running VM
pub async fn stop_vm(&self, vm_id: u64) -> Result<()> {
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host, &self.provisioner_config)?;
let host_vm_id = vm.id + 100;
let term = client.terminal_proxy(&host.name, host_vm_id).await?;
let login_msg = format!("{}:{}\n", term.user, term.ticket);
let mut ws = client
.open_terminal_proxy(&host.name, host_vm_id, term)
.await?;
debug!("Sending login msg: {}", login_msg);
ws.send(Message::Text(login_msg)).await?;
if let Some(n) = ws.next().await {
debug!("{:?}", n);
} else {
bail!("No response from terminal_proxy");
}
ws.send(Message::Text("1:86:24:".to_string())).await?;
Ok(ws)
}
async fn patch_vm(&self, vm_id: u64) -> Result<()> {
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let ips = self.db.list_vm_ip_assignments(vm.id).await?;
let client = get_host_client(&host, &self.provisioner_config)?;
let host_vm_id = vm.id + 100;
let t = client
.configure_vm(ConfigureVm {
node: host.name.clone(),
vm_id: host_vm_id as i32,
current: None,
snapshot: None,
config: VmConfig {
scsi_0: None,
scsi_1: None,
efi_disk_0: None,
..client.make_vm_config(&self.db, &vm, &ips).await?
},
})
.await?;
client.wait_for_task(&t).await?;
client.stop_vm(&vm).await?;
Ok(())
}
}
@ -457,10 +330,12 @@ mod tests {
use crate::settings::{
ApiConfig, Credentials, LndConfig, ProvisionerConfig, QemuConfig, RouterConfig,
};
use lnvps_db::UserSshKey;
#[ignore]
#[tokio::test]
async fn test_basic_provisioner() -> Result<()> {
const ROUTER_BRIDGE: &str = "bridge1";
let settings = Settings {
listen: None,
db: "".to_string(),
@ -473,18 +348,20 @@ mod tests {
provisioner: ProvisionerConfig::Proxmox {
qemu: QemuConfig {
machine: "q35".to_string(),
os_type: "linux26".to_string(),
os_type: "l26".to_string(),
bridge: "vmbr1".to_string(),
cpu: "kvm64".to_string(),
vlan: None,
kvm: false,
},
ssh: None,
mac_prefix: Some("ff:ff:ff".to_string()),
},
network_policy: NetworkPolicy {
access: NetworkAccessPolicy::StaticArp {
interface: "bridge1".to_string(),
interface: ROUTER_BRIDGE.to_string(),
},
ip6_slaac: None,
},
delete_after: 0,
smtp: None,
@ -502,24 +379,45 @@ mod tests {
let db = Arc::new(MockDb::default());
let node = Arc::new(MockNode::default());
let rates = Arc::new(DefaultRateCache::default());
let router = settings.get_router().expect("router").unwrap();
let provisioner = LNVpsProvisioner::new(settings, db.clone(), node.clone(), rates.clone());
let vm = db
.insert_vm(&Vm {
id: 1,
host_id: 1,
user_id: 1,
image_id: 1,
template_id: 1,
ssh_key_id: 1,
created: Utc::now(),
expires: Utc::now() + Duration::from_secs(30),
disk_id: 1,
mac_address: "00:00:00:00:00:00".to_string(),
deleted: false,
})
.await?;
provisioner.spawn_vm(1).await?;
let pubkey: [u8; 32] = random();
let user_id = db.upsert_user(&pubkey).await?;
let new_key = UserSshKey {
id: 0,
name: "test-key".to_string(),
user_id,
created: Default::default(),
key_data: "ssh-rsa AAA==".to_string(),
};
let ssh_key = db.insert_user_ssh_key(&new_key).await?;
let vm = provisioner.provision(user_id, 1, 1, ssh_key).await?;
println!("{:?}", vm);
provisioner.spawn_vm(vm.id).await?;
// check resources
let arp = router.list_arp_entry().await?;
assert_eq!(1, arp.len());
let arp = arp.first().unwrap();
assert_eq!(&vm.mac_address, arp.mac_address.as_ref().unwrap());
assert_eq!(ROUTER_BRIDGE, &arp.interface);
println!("{:?}", arp);
let ips = db.list_vm_ip_assignments(vm.id).await?;
assert_eq!(1, ips.len());
let ip = ips.first().unwrap();
assert_eq!(ip.ip, arp.address);
assert_eq!(ip.ip_range_id, 1);
assert_eq!(ip.vm_id, vm.id);
// assert IP address is not CIDR
assert!(IpAddr::from_str(&ip.ip).is_ok());
assert!(!ip.ip.ends_with("/8"));
assert!(!ip.ip.ends_with("/24"));
Ok(())
}
}

View File

@ -1,60 +1,5 @@
use anyhow::Result;
use lnvps_db::{Vm, VmIpAssignment, VmPayment};
use rocket::async_trait;
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
mod lnvps;
mod network;
pub use lnvps::*;
pub use network::*;
#[async_trait]
pub trait Provisioner: Send + Sync {
/// Do any necessary initialization
async fn init(&self) -> Result<()>;
/// Provision a new VM for a user on the database
///
/// Note:
/// 1. Does not create a VM on the host machine
/// 2. Does not assign any IP resources
async fn provision(
&self,
user_id: u64,
template_id: u64,
image_id: u64,
ssh_key_id: u64,
) -> Result<Vm>;
/// Create a renewal payment
async fn renew(&self, vm_id: u64) -> Result<VmPayment>;
/// Allocate ips for a VM
async fn allocate_ips(&self, vm_id: u64) -> Result<Vec<VmIpAssignment>>;
/// Spawn a VM on the host
async fn spawn_vm(&self, vm_id: u64) -> Result<()>;
/// Start a VM
async fn start_vm(&self, vm_id: u64) -> Result<()>;
/// Stop a running VM
async fn stop_vm(&self, vm_id: u64) -> Result<()>;
/// Restart a VM
async fn restart_vm(&self, vm_id: u64) -> Result<()>;
/// Delete a VM
async fn delete_vm(&self, vm_id: u64) -> Result<()>;
/// Open terminal proxy connection
async fn terminal_proxy(
&self,
vm_id: u64,
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>>;
/// Re-Configure VM
async fn patch_vm(&self, vm_id: u64) -> Result<()>;
}

View File

@ -15,6 +15,7 @@ pub enum ProvisionerMethod {
#[derive(Debug, Clone, Copy)]
pub struct AvailableIp {
pub ip: IpAddr,
pub gateway: IpNetwork,
pub range_id: u64,
pub region_id: u64,
}
@ -28,10 +29,7 @@ pub struct NetworkProvisioner {
impl NetworkProvisioner {
pub fn new(method: ProvisionerMethod, db: Arc<dyn LNVpsDb>) -> Self {
Self {
method,
db,
}
Self { method, db }
}
/// Pick an IP from one of the available ip ranges
@ -45,21 +43,27 @@ impl NetworkProvisioner {
for range in ip_ranges {
let range_cidr: IpNetwork = range.cidr.parse()?;
let ips = self.db.list_vm_ip_assignments_in_range(range.id).await?;
let ips: HashSet<IpAddr> = ips.iter().map_while(|i| i.ip.parse().ok()).collect();
let mut ips: HashSet<IpAddr> = ips.iter().map_while(|i| i.ip.parse().ok()).collect();
let gateway: IpNetwork = range.gateway.parse()?;
// mark some IPS as always used
// Namely:
// .0 & .255 of /24 (first and last)
// gateway ip of the range
ips.insert(range_cidr.iter().next().unwrap());
ips.insert(range_cidr.iter().last().unwrap());
ips.insert(gateway.ip());
// pick an IP at random
let ip_pick = {
let first_ip = range_cidr.iter().next().unwrap();
let last_ip = range_cidr.iter().last().unwrap();
match self.method {
ProvisionerMethod::Sequential => range_cidr
.iter()
.find(|i| *i != first_ip && *i != last_ip && !ips.contains(i)),
ProvisionerMethod::Sequential => range_cidr.iter().find(|i| !ips.contains(i)),
ProvisionerMethod::Random => {
let mut rng = rand::rng();
loop {
if let Some(i) = range_cidr.iter().choose(&mut rng) {
if i != first_ip && i != last_ip && !ips.contains(&i) {
if !ips.contains(&i) {
break Some(i);
}
} else {
@ -73,6 +77,7 @@ impl NetworkProvisioner {
if let Some(ip_pick) = ip_pick {
return Ok(AvailableIp {
range_id: range.id,
gateway,
ip: ip_pick,
region_id,
});
@ -86,23 +91,22 @@ impl NetworkProvisioner {
mod tests {
use super::*;
use crate::mocks::*;
use lnvps_db::VmIpAssignment;
use std::str::FromStr;
#[tokio::test]
async fn pick_seq_ip_for_region_test() {
let db: Arc<dyn LNVpsDb> = Arc::new(MockDb::default());
let mgr = NetworkProvisioner::new(
ProvisionerMethod::Sequential,
db.clone(),
);
let mgr = NetworkProvisioner::new(ProvisionerMethod::Sequential, db.clone());
let first = IpAddr::from_str("10.0.0.1").unwrap();
let second = IpAddr::from_str("10.0.0.2").unwrap();
let gateway = IpNetwork::from_str("10.0.0.1/8").unwrap();
let first = IpAddr::from_str("10.0.0.2").unwrap();
let second = IpAddr::from_str("10.0.0.3").unwrap();
let ip = mgr.pick_ip_for_region(1).await.expect("No ip found in db");
assert_eq!(1, ip.region_id);
assert_eq!(first, ip.ip);
assert_eq!(gateway, ip.gateway);
let ip = mgr.pick_ip_for_region(1).await.expect("No ip found in db");
assert_eq!(1, ip.region_id);
@ -123,10 +127,7 @@ mod tests {
#[tokio::test]
async fn pick_rng_ip_for_region_test() {
let db: Arc<dyn LNVpsDb> = Arc::new(MockDb::default());
let mgr = NetworkProvisioner::new(
ProvisionerMethod::Random,
db,
);
let mgr = NetworkProvisioner::new(ProvisionerMethod::Random, db);
let ip = mgr.pick_ip_for_region(1).await.expect("No ip found in db");
assert_eq!(1, ip.region_id);

View File

@ -40,4 +40,4 @@ pub struct ArpEntry {
#[cfg(feature = "mikrotik")]
mod mikrotik;
#[cfg(feature = "mikrotik")]
pub use mikrotik::*;
pub use mikrotik::*;

View File

@ -1,7 +1,6 @@
use crate::exchange::ExchangeRateService;
use crate::lightning::LightningNode;
use crate::provisioner::LNVpsProvisioner;
use crate::provisioner::Provisioner;
use crate::router::{MikrotikRouter, Router};
use anyhow::{bail, Result};
use lnvps_db::LNVpsDb;
@ -103,7 +102,11 @@ pub enum NetworkAccessPolicy {
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub struct NetworkPolicy {
/// Policy that determines how packets arrive at the VM
pub access: NetworkAccessPolicy,
/// Use SLAAC for IPv6 allocation
pub ip6_slaac: Option<bool>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
@ -133,6 +136,8 @@ pub enum ProvisionerConfig {
qemu: QemuConfig,
/// SSH config for issuing commands via CLI
ssh: Option<SshConfig>,
/// MAC address prefix for NIC (eg. bc:24:11)
mac_prefix: Option<String>,
},
}
@ -167,7 +172,7 @@ impl Settings {
db: Arc<dyn LNVpsDb>,
node: Arc<dyn LightningNode>,
exchange: Arc<dyn ExchangeRateService>,
) -> Arc<dyn Provisioner> {
) -> Arc<LNVpsProvisioner> {
Arc::new(LNVpsProvisioner::new(self.clone(), db, node, exchange))
}
@ -187,10 +192,7 @@ impl Settings {
#[cfg(test)]
pub fn get_router(&self) -> Result<Option<Arc<dyn Router>>> {
if self.router.is_some() {
let router = crate::mocks::MockRouter {
policy: self.network_policy.clone(),
arp: Arc::new(Default::default()),
};
let router = crate::mocks::MockRouter::new(self.network_policy.clone());
Ok(Some(Arc::new(router)))
} else {
Ok(None)

View File

@ -6,7 +6,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Clone, Serialize, Deserialize, Default, JsonSchema)]
#[derive(Clone, Serialize, Deserialize, Default, JsonSchema, PartialEq, Debug)]
#[serde(rename_all = "lowercase")]
pub enum VmRunningState {
Running,

View File

@ -1,6 +1,5 @@
use crate::host::get_host_client;
use crate::host::proxmox::{VmInfo, VmStatus};
use crate::provisioner::Provisioner;
use crate::provisioner::LNVpsProvisioner;
use crate::settings::{ProvisionerConfig, Settings, SmtpConfig};
use crate::status::{VmRunningState, VmState, VmStateCache};
use anyhow::Result;
@ -9,8 +8,8 @@ use lettre::message::{MessageBuilder, MultiPart};
use lettre::transport::smtp::authentication::Credentials;
use lettre::AsyncTransport;
use lettre::{AsyncSmtpTransport, Tokio1Executor};
use lnvps_db::LNVpsDb;
use log::{debug, error, info};
use lnvps_db::{LNVpsDb, Vm};
use log::{debug, error, info, warn};
use nostr::{EventBuilder, PublicKey, ToBech32};
use nostr_sdk::Client;
use std::ops::{Add, Sub};
@ -33,11 +32,13 @@ pub enum WorkJob {
},
}
/// Primary background worker logic
/// Handles deleting expired VMs and sending notifications
pub struct Worker {
settings: WorkerSettings,
db: Arc<dyn LNVpsDb>,
provisioner: Arc<dyn Provisioner>,
provisioner: Arc<LNVpsProvisioner>,
nostr: Option<Client>,
vm_state_cache: VmStateCache,
@ -65,7 +66,7 @@ impl From<&Settings> for WorkerSettings {
impl Worker {
pub fn new(
db: Arc<dyn LNVpsDb>,
provisioner: Arc<dyn Provisioner>,
provisioner: Arc<LNVpsProvisioner>,
settings: impl Into<WorkerSettings>,
vm_state_cache: VmStateCache,
nostr: Option<Client>,
@ -87,88 +88,71 @@ impl Worker {
self.tx.clone()
}
async fn handle_vm_info(&self, s: VmInfo) -> Result<()> {
// TODO: remove assumption
let db_id = (s.vm_id - 100) as u64;
let state = VmState {
timestamp: Utc::now().timestamp() as u64,
state: match s.status {
VmStatus::Stopped => VmRunningState::Stopped,
VmStatus::Running => VmRunningState::Running,
},
cpu_usage: s.cpu.unwrap_or(0.0),
mem_usage: s.mem.unwrap_or(0) as f32 / s.max_mem.unwrap_or(1) as f32,
uptime: s.uptime.unwrap_or(0),
net_in: s.net_in.unwrap_or(0),
net_out: s.net_out.unwrap_or(0),
disk_write: s.disk_write.unwrap_or(0),
disk_read: s.disk_read.unwrap_or(0),
};
self.vm_state_cache.set_state(db_id, state).await?;
/// Handle VM state
/// 1. Expire VM and send notification
/// 2. Stop VM if expired and still running
/// 3. Send notification for expiring soon
async fn handle_vm_state(&self, vm: &Vm, state: &VmState) -> Result<()> {
const BEFORE_EXPIRE_NOTIFICATION: u64 = 1;
if let Ok(db_vm) = self.db.get_vm(db_id).await {
const BEFORE_EXPIRE_NOTIFICATION: u64 = 1;
// Send notification of VM expiring soon
if vm.expires < Utc::now().add(Days::new(BEFORE_EXPIRE_NOTIFICATION))
&& vm.expires
> self
.last_check_vms
.add(Days::new(BEFORE_EXPIRE_NOTIFICATION))
{
info!("Sending expire soon notification VM {}", vm.id);
self.tx.send(WorkJob::SendNotification {
user_id: vm.user_id,
title: Some(format!("[VM{}] Expiring Soon", vm.id)),
message: format!("Your VM #{} will expire soon, please renew in the next {} days or your VM will be stopped.", vm.id, BEFORE_EXPIRE_NOTIFICATION)
})?;
}
// Send notification of VM expiring soon
if db_vm.expires < Utc::now().add(Days::new(BEFORE_EXPIRE_NOTIFICATION))
&& db_vm.expires
> self
.last_check_vms
.add(Days::new(BEFORE_EXPIRE_NOTIFICATION))
{
info!("Sending expire soon notification VM {}", db_vm.id);
self.tx.send(WorkJob::SendNotification {
user_id: db_vm.user_id,
title: Some(format!("[VM{}] Expiring Soon", db_vm.id)),
message: format!("Your VM #{} will expire soon, please renew in the next {} days or your VM will be stopped.", db_vm.id, BEFORE_EXPIRE_NOTIFICATION)
})?;
// Stop VM if expired and is running
if vm.expires < Utc::now() && state.state == VmRunningState::Running {
info!("Stopping expired VM {}", vm.id);
if let Err(e) = self.provisioner.stop_vm(vm.id).await {
warn!("Failed to stop VM {}: {}", vm.id, e);
}
self.tx.send(WorkJob::SendNotification {
user_id: vm.user_id,
title: Some(format!("[VM{}] Expired", vm.id)),
message: format!("Your VM #{} has expired and is now stopped, please renew in the next {} days or your VM will be deleted.", vm.id, self.settings.delete_after)
})?;
}
// Stop VM if expired and is running
if db_vm.expires < Utc::now() && s.status == VmStatus::Running {
info!("Stopping expired VM {}", db_vm.id);
self.provisioner.stop_vm(db_vm.id).await?;
self.tx.send(WorkJob::SendNotification {
user_id: db_vm.user_id,
title: Some(format!("[VM{}] Expired", db_vm.id)),
message: format!("Your VM #{} has expired and is now stopped, please renew in the next {} days or your VM will be deleted.", db_vm.id, self.settings.delete_after)
})?;
}
// Delete VM if expired > self.settings.delete_after days
if db_vm
.expires
.add(Days::new(self.settings.delete_after as u64))
< Utc::now()
&& !db_vm.deleted
{
info!("Deleting expired VM {}", db_vm.id);
self.provisioner.delete_vm(db_vm.id).await?;
let title = Some(format!("[VM{}] Deleted", db_vm.id));
self.tx.send(WorkJob::SendNotification {
user_id: db_vm.user_id,
title: title.clone(),
message: format!("Your VM #{} has been deleted!", db_vm.id),
})?;
self.queue_admin_notification(
format!("VM{} is ready for deletion", db_vm.id),
title,
)?;
}
// Delete VM if expired > self.settings.delete_after days
if vm.expires.add(Days::new(self.settings.delete_after as u64)) < Utc::now() && !vm.deleted
{
info!("Deleting expired VM {}", vm.id);
self.provisioner.delete_vm(vm.id).await?;
let title = Some(format!("[VM{}] Deleted", vm.id));
self.tx.send(WorkJob::SendNotification {
user_id: vm.user_id,
title: title.clone(),
message: format!("Your VM #{} has been deleted!", vm.id),
})?;
self.queue_admin_notification(format!("VM{} is ready for deletion", vm.id), title)?;
}
Ok(())
}
/// Check a VM's status
async fn check_vm(&self, vm_id: u64) -> Result<()> {
debug!("Checking VM: {}", vm_id);
let vm = self.db.get_vm(vm_id).await?;
async fn check_vm(&self, vm: &Vm) -> Result<()> {
debug!("Checking VM: {}", vm.id);
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host, &self.settings.provisioner_config)?;
match client.get_vm_status(&host.name, (vm.id + 100) as i32).await {
Ok(s) => self.handle_vm_info(s).await?,
match client.get_vm_state(&vm).await {
Ok(s) => {
self.handle_vm_state(&vm, &s).await?;
self.vm_state_cache.set_state(vm.id, s).await?;
}
Err(_) => {
// spawn VM if doesnt exist
if vm.expires > Utc::now() {
self.provisioner.spawn_vm(vm.id).await?;
let vm_ips = self.db.list_vm_ip_assignments(vm.id).await?;
@ -199,46 +183,14 @@ impl Worker {
}
pub async fn check_vms(&mut self) -> Result<()> {
let hosts = self.db.list_hosts().await?;
for host in hosts {
let client = get_host_client(&host, &self.settings.provisioner_config)?;
for node in client.list_nodes().await? {
debug!("Checking vms for {}", node.name);
for vm in client.list_vms(&node.name).await? {
let vm_id = vm.vm_id;
debug!("\t{}: {:?}", vm_id, vm.status);
if let Err(e) = self.handle_vm_info(vm).await {
error!("{}", e);
self.queue_admin_notification(
format!("Failed to check VM {}:\n{}", vm_id, e),
Some("Job Failed".to_string()),
)?
}
}
}
}
// check VM status from db vm list
let db_vms = self.db.list_vms().await?;
for vm in db_vms {
let state = if let Some(s) = self.vm_state_cache.get_state(vm.id).await {
if s.timestamp > Utc::now().timestamp() as u64 - 120 {
Some(s)
} else {
None
}
} else {
None
};
// create VM if not spawned yet
if vm.expires > Utc::now() && state.is_none() {
self.check_vm(vm.id).await?;
}
for vm in &db_vms {
// Refresh VM status if active
self.check_vm(&vm).await?;
// delete vm if not paid (in new state)
if vm.expires < Utc::now().sub(Days::new(1)) && state.is_none() {
if vm.expires < Utc::now().sub(Days::new(1)) {
info!("Deleting unpaid VM {}", vm.id);
self.provisioner.delete_vm(vm.id).await?;
}
@ -326,10 +278,11 @@ impl Worker {
}
pub async fn handle(&mut self) -> Result<()> {
while let Some(ref job) = self.rx.recv().await {
match job {
while let Some(job) = self.rx.recv().await {
match &job {
WorkJob::CheckVm { vm_id } => {
if let Err(e) = self.check_vm(*vm_id).await {
let vm = self.db.get_vm(*vm_id).await?;
if let Err(e) = self.check_vm(&vm).await {
error!("Failed to check VM {}: {}", vm_id, e);
self.queue_admin_notification(
format!("Failed to check VM {}:\n{:?}\n{}", vm_id, &job, e),