feat: vm actions start/stop/delete

kieran 2024-11-29 16:43:14 +00:00
parent 632a5aaa87
commit 2370204546
11 changed files with 304 additions and 45 deletions

View File

@@ -3,6 +3,7 @@ lnd:
url: "https://127.0.0.1:10003"
cert: "/home/kieran/.polar/networks/2/volumes/lnd/alice/tls.cert"
macaroon: "/home/kieran/.polar/networks/2/volumes/lnd/alice/data/chain/bitcoin/regtest/admin.macaroon"
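# number of days after a VM expires before it is deleted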
delete_after: 3
provisioner:
proxmox:
read_only: false

View File

@@ -78,6 +78,9 @@ pub trait LNVpsDb: Sync + Send {
/// List all VMs
async fn list_vms(&self) -> Result<Vec<Vm>>;
/// List expired VMs
async fn list_expired_vms(&self) -> Result<Vec<Vm>>;
/// List VMs owned by a specific user
async fn list_user_vms(&self, id: u64) -> Result<Vec<Vm>>;
@@ -87,6 +90,9 @@ pub trait LNVpsDb: Sync + Send {
/// Insert a new VM record
async fn insert_vm(&self, vm: &Vm) -> Result<u64>;
/// Delete a VM by id
async fn delete_vm(&self, vm_id: u64) -> Result<()>;
/// Insert a new VM ip assignment
async fn insert_vm_ip_assignment(&self, ip_assignment: &VmIpAssignment) -> Result<u64>;

View File

@@ -197,6 +197,13 @@ impl LNVpsDb for LNVpsDbMysql {
.map_err(Error::new)
}
async fn list_expired_vms(&self) -> Result<Vec<Vm>> {
sqlx::query_as("select * from vm where expires > current_timestamp()")
.fetch_all(&self.db)
.await
.map_err(Error::new)
}
async fn list_user_vms(&self, id: u64) -> Result<Vec<Vm>> {
sqlx::query_as("select * from vm where user_id = ?")
.bind(id)
@@ -233,6 +240,15 @@ impl LNVpsDb for LNVpsDbMysql {
.try_get(0)?)
}
async fn delete_vm(&self, vm_id: u64) -> Result<()> {
sqlx::query("delete from vm where id = ?")
.bind(vm_id)
.execute(&self.db)
.await
.map_err(Error::new)?;
Ok(())
}
async fn insert_vm_ip_assignment(&self, ip_assignment: &VmIpAssignment) -> Result<u64> {
Ok(sqlx::query(
"insert into vm_ip_assignment(vm_id,ip_range_id,ip) values(?, ?, ?) returning id",

View File

@@ -1,13 +1,15 @@
use crate::nip98::Nip98Auth;
use crate::provisioner::Provisioner;
use crate::status::{VmState, VmStateCache};
use crate::worker::WorkJob;
use lnvps_db::hydrate::Hydrate;
use lnvps_db::{LNVpsDb, UserSshKey, Vm, VmOsImage, VmPayment, VmTemplate};
use nostr::util::hex;
use rocket::serde::json::Json;
use rocket::{get, patch, post, routes, Responder, Route, State};
use serde::{Deserialize, Serialize};
use ssh_key::PublicKey;
use tokio::sync::mpsc::UnboundedSender;
pub fn routes() -> Vec<Route> {
routes![
@@ -19,7 +21,10 @@ pub fn routes() -> Vec<Route> {
v1_add_ssh_key,
v1_create_vm_order,
v1_renew_vm,
v1_get_payment,
v1_start_vm,
v1_stop_vm,
v1_restart_vm
]
}
@@ -195,6 +200,66 @@ async fn v1_renew_vm(
ApiData::ok(rsp)
}
#[patch("/api/v1/vm/<id>/start")]
async fn v1_start_vm(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
provisioner: &State<Box<dyn Provisioner>>,
worker: &State<UnboundedSender<WorkJob>>,
id: u64,
) -> ApiResult<()> {
let pubkey = auth.event.pubkey.to_bytes();
let uid = db.upsert_user(&pubkey).await?;
let vm = db.get_vm(id).await?;
if uid != vm.user_id {
return ApiData::err("VM does not belong to you");
}
provisioner.start_vm(id).await?;
worker.send(WorkJob::CheckVm { vm_id: id })?;
ApiData::ok(())
}
#[patch("/api/v1/vm/<id>/stop")]
async fn v1_stop_vm(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
provisioner: &State<Box<dyn Provisioner>>,
worker: &State<UnboundedSender<WorkJob>>,
id: u64,
) -> ApiResult<()> {
let pubkey = auth.event.pubkey.to_bytes();
let uid = db.upsert_user(&pubkey).await?;
let vm = db.get_vm(id).await?;
if uid != vm.user_id {
return ApiData::err("VM does not belong to you");
}
provisioner.stop_vm(id).await?;
worker.send(WorkJob::CheckVm { vm_id: id })?;
ApiData::ok(())
}
#[patch("/api/v1/vm/<id>/restart")]
async fn v1_restart_vm(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
provisioner: &State<Box<dyn Provisioner>>,
worker: &State<UnboundedSender<WorkJob>>,
id: u64,
) -> ApiResult<()> {
let pubkey = auth.event.pubkey.to_bytes();
let uid = db.upsert_user(&pubkey).await?;
let vm = db.get_vm(id).await?;
if uid != vm.user_id {
return ApiData::err("VM does not belong to you");
}
provisioner.restart_vm(id).await?;
worker.send(WorkJob::CheckVm { vm_id: id })?;
ApiData::ok(())
}
#[get("/api/v1/payment/<id>")]
async fn v1_get_payment(
auth: Nip98Auth,
@@ -216,6 +281,7 @@ async fn v1_get_payment(
ApiData::ok(payment)
}
#[derive(Deserialize)]
struct CreateVmRequest {
template_id: u64,

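The handlers above rely on ApiData/ApiResult wrappers that this commit does not touch. As a hedged sketch only, the assumed shape is roughly as follows (hypothetical; the real definitions live elsewhere in this file, and the real ApiResult also absorbs anyhow::Error so the handlers can use the ? operator):

use rocket::serde::json::Json;
use serde::Serialize;

// Hypothetical sketch of the response envelope used by the handlers above.
#[derive(Serialize)]
pub struct ApiData<T: Serialize> {
    pub data: Option<T>,
    pub error: Option<String>,
}

pub type ApiResult<T> = Result<Json<ApiData<T>>, Json<ApiData<T>>>;

impl<T: Serialize> ApiData<T> {
    pub fn ok(data: T) -> ApiResult<T> {
        Ok(Json(ApiData { data: Some(data), error: None }))
    }

    pub fn err(msg: &str) -> ApiResult<T> {
        Err(Json(ApiData { data: None, error: Some(msg.to_owned()) }))
    }
}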
View File

@@ -1,4 +1,5 @@
use anyhow::Error;
use chrono::Utc;
use clap::Parser;
use config::{Config, File};
use fedimint_tonic_lnd::connect;
@@ -28,7 +29,9 @@ async fn main() -> Result<(), Error> {
let args = Args::parse();
let settings: Settings = Config::builder()
.add_source(File::with_name(
&args.config.unwrap_or("config.yaml".to_string()),
))
.build()?
.try_deserialize()?;
@@ -68,17 +71,12 @@ async fn main() -> Result<(), Error> {
}
});
// request work every 30s to check vm status
let sender_clone = sender.clone();
tokio::spawn(async move {
loop {
if let Err(e) = sender_clone.send(WorkJob::CheckVms) {
error!("failed to send check vm: {}", e);
}
tokio::time::sleep(Duration::from_secs(30)).await;
}
});
@@ -120,6 +118,7 @@ async fn main() -> Result<(), Error> {
.manage(pv)
.manage(status)
.manage(exchange)
.manage(sender)
.mount("/", api::routes())
.launch()
.await

View File

@@ -1,2 +1,12 @@
use crate::host::proxmox::ProxmoxClient;
use anyhow::Result;
use lnvps_db::{VmHost, VmHostKind};
pub mod proxmox;
pub trait VmHostClient {}
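/// Get a client for the given host based on its kind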
pub fn get_host_client(host: &VmHost) -> Result<ProxmoxClient> {
Ok(match host.kind {
VmHostKind::Proxmox => ProxmoxClient::new(host.ip.parse()?).with_api_token(&host.api_token),
})
}

View File

@@ -113,6 +113,27 @@ impl ProxmoxClient {
}
}
/// Delete VM
///
/// https://pve.proxmox.com/pve-docs/api-viewer/?ref=public_apis#/nodes/{node}/qemu
pub async fn delete_vm(&self, node: &str, vm: u64) -> Result<TaskId> {
let rsp: ResponseBase<Option<String>> = self
.req(
Method::DELETE,
&format!("/api2/json/nodes/{node}/qemu/{vm}"),
(),
)
.await?;
if let Some(id) = rsp.data {
Ok(TaskId {
id,
node: node.to_string(),
})
} else {
Err(anyhow!("Failed to delete VM"))
}
}
/// Get the current status of a running task
///
/// https://pve.proxmox.com/pve-docs/api-viewer/?ref=public_apis#/nodes/{node}/tasks/{upid}/status
@@ -164,7 +185,7 @@ impl ProxmoxClient {
/// Resize a disk on a VM
pub async fn resize_disk(&self, req: ResizeDiskRequest) -> Result<TaskId> {
let rsp: ResponseBase<String> = self
.req(
Method::PUT,
&format!("/api2/json/nodes/{}/qemu/{}/resize", &req.node, &req.vm_id),
&req,
@@ -190,6 +211,48 @@ impl ProxmoxClient {
})
}
/// Stop a VM
pub async fn stop_vm(&self, node: &str, vm: u64) -> Result<TaskId> {
let rsp: ResponseBase<String> = self
.post(
&format!("/api2/json/nodes/{}/qemu/{}/status/stop", node, vm),
(),
)
.await?;
Ok(TaskId {
id: rsp.data,
node: node.to_string(),
})
}
/// Shut down a VM (graceful stop)
pub async fn shutdown_vm(&self, node: &str, vm: u64) -> Result<TaskId> {
let rsp: ResponseBase<String> = self
.post(
&format!("/api2/json/nodes/{}/qemu/{}/status/shutdown", node, vm),
(),
)
.await?;
Ok(TaskId {
id: rsp.data,
node: node.to_string(),
})
}
/// Reset (hard restart) a VM
pub async fn reset_vm(&self, node: &str, vm: u64) -> Result<TaskId> {
let rsp: ResponseBase<String> = self
.post(
&format!("/api2/json/nodes/{}/qemu/{}/status/reset", node, vm),
(),
)
.await?;
Ok(TaskId {
id: rsp.data,
node: node.to_string(),
})
}
async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
let rsp = self
.client
@@ -209,10 +272,10 @@ impl ProxmoxClient {
}
async fn post<T: DeserializeOwned, R: Serialize>(&self, path: &str, body: R) -> Result<T> {
self.req(Method::POST, path, body).await
}
async fn req<T: DeserializeOwned, R: Serialize>(
&self,
method: Method,
path: &str,
@@ -319,7 +382,7 @@ pub struct NodeResponse {
pub uptime: Option<u64>,
}
#[derive(Debug, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum VmStatus {
Stopped,

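Note that start_vm, stop_vm, shutdown_vm and reset_vm all return a TaskId immediately rather than blocking until the action completes; callers pair them with wait_for_task, as the provisioner below does. A minimal sketch, where the node name "pve1" and guest id 142 are stand-ins:

// Gracefully shut down a guest and block until Proxmox reports the task done.
// Assumes `client` is a ProxmoxClient obtained via get_host_client.
let task = client.shutdown_vm("pve1", 142).await?;
client.wait_for_task(&task).await?;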
View File

@@ -1,4 +1,5 @@
use crate::exchange::{ExchangeRateCache, Ticker};
use crate::host::get_host_client;
use crate::host::proxmox::{
ConfigureVm, CreateVm, DownloadUrlRequest, ProxmoxClient, ResizeDiskRequest, StorageContent,
TaskState, VmBios, VmConfig,
@@ -57,14 +58,6 @@ impl LNVpsProvisioner {
}
}
async fn get_iso_storage(node: &str, client: &ProxmoxClient) -> Result<String> {
let storages = client.list_storage(node).await?;
if let Some(s) = storages
@@ -84,7 +77,7 @@ impl Provisioner for LNVpsProvisioner {
// tell hosts to download images
let hosts = self.db.list_hosts().await?;
for host in hosts {
let client = get_host_client(&host)?;
let iso_storage = Self::get_iso_storage(&host.name, &client).await?;
let files = client.list_storage_files(&host.name, &iso_storage).await?;
@@ -294,7 +287,7 @@ impl Provisioner for LNVpsProvisioner {
}
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host)?;
let mut ips = self.db.list_vm_ip_assignments(vm.id).await?;
if ips.is_empty() {
@@ -418,4 +411,49 @@ impl Provisioner for LNVpsProvisioner {
Ok(())
}
async fn start_vm(&self, vm_id: u64) -> Result<()> {
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host)?;
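// db ids are offset by +100 to form the Proxmox VMID (Proxmox reserves VMIDs below 100); the worker reverses this mapping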
let j_start = client.start_vm(&host.name, vm.id + 100).await?;
client.wait_for_task(&j_start).await?;
Ok(())
}
async fn stop_vm(&self, vm_id: u64) -> Result<()> {
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host)?;
let j_stop = client.shutdown_vm(&host.name, vm.id + 100).await?;
client.wait_for_task(&j_stop).await?;
Ok(())
}
async fn restart_vm(&self, vm_id: u64) -> Result<()> {
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host)?;
let j_restart = client.reset_vm(&host.name, vm.id + 100).await?;
client.wait_for_task(&j_restart).await?;
Ok(())
}
async fn delete_vm(&self, vm_id: u64) -> Result<()> {
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host)?;
let j_delete = client.delete_vm(&host.name, vm.id + 100).await?;
client.wait_for_task(&j_delete).await?;
self.db.delete_vm(vm.id).await?;
Ok(())
}
}

View File

@@ -30,4 +30,16 @@ pub trait Provisioner: Send + Sync {
/// Spawn a VM on the host
async fn spawn_vm(&self, vm_id: u64) -> Result<()>;
/// Start a VM
async fn start_vm(&self, vm_id: u64) -> Result<()>;
/// Stop a running VM
async fn stop_vm(&self, vm_id: u64) -> Result<()>;
/// Restart a VM
async fn restart_vm(&self, vm_id: u64) -> Result<()>;
/// Delete a VM
async fn delete_vm(&self, vm_id: u64) -> Result<()>;
}

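For orientation, api.rs above holds this trait as Box<dyn Provisioner> in Rocket state. A minimal sketch of driving the new methods through the trait object (the helper name is hypothetical; assumes the trait is object-safe, e.g. via async_trait, as the boxed usage suggests):

// Power-cycle a VM through any Provisioner implementation.
async fn power_cycle(p: &dyn Provisioner, vm_id: u64) -> anyhow::Result<()> {
    p.stop_vm(vm_id).await?;
    p.start_vm(vm_id).await?;
    Ok(())
}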
View File

@@ -12,6 +12,9 @@ pub struct Settings {
pub db: String,
pub lnd: LndConfig,
pub provisioner: ProvisionerConfig,
/// Number of days after expiry before a VM is deleted
pub delete_after: u16,
}
#[derive(Debug, Deserialize, Serialize)]

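The worker below still hardcodes a 3-day window (Days::new(3)); a hedged sketch of how delete_after could drive it instead, assuming the settings were threaded into the worker (the self.settings field is hypothetical):

// Derive the deletion cutoff from configuration instead of a constant.
if db_vm.expires.add(Days::new(self.settings.delete_after as u64)) < Utc::now() {
    self.provisioner.delete_vm(db_vm.id).await?;
}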
View File

@@ -1,13 +1,17 @@
use crate::host::get_host_client;
use crate::host::proxmox::{ProxmoxClient, VmInfo, VmStatus};
use crate::provisioner::Provisioner;
use crate::status::{VmRunningState, VmState, VmStateCache};
use anyhow::Result;
use chrono::{Days, Utc};
use lnvps_db::LNVpsDb;
use log::{debug, error, info, warn};
use std::ops::Add;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
pub enum WorkJob {
/// Check all running VMs
CheckVms,
/// Check the VM status matches database state
///
/// This job starts a vm if stopped and also creates the vm if it doesn't exist yet
@@ -44,15 +48,9 @@ impl Worker {
self.tx.clone()
}
async fn handle_vm_info(&self, s: VmInfo) -> Result<()> {
// TODO: remove assumption
let db_id = (s.vm_id - 100) as u64;
let state = VmState {
state: match s.status {
VmStatus::Stopped => VmRunningState::Stopped,
@@ -66,10 +64,34 @@ impl Worker {
disk_write: s.disk_write.unwrap_or(0),
disk_read: s.disk_read.unwrap_or(0),
};
self.vm_state_cache.set_state(db_id, state).await?;
if let Ok(db_vm) = self.db.get_vm(db_id).await {
// Stop VM if expired and is running
if db_vm.expires < Utc::now() && s.status == VmStatus::Running {
info!("Stopping expired VM {}", db_vm.id);
self.provisioner.stop_vm(db_vm.id).await?;
}
// Delete VM if expired > 3 days
if db_vm.expires.add(Days::new(3)) < Utc::now() {
info!("Deleting expired VM {}", db_vm.id);
self.provisioner.delete_vm(db_vm.id).await?;
}
}
Ok(())
}
/// Check a VM's status
async fn check_vm(&self, vm_id: u64) -> Result<()> {
debug!("Checking VM: {}", vm_id);
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host)?;
match client.get_vm_status(&host.name, (vm.id + 100) as i32).await {
Ok(s) => self.handle_vm_info(s).await?,
Err(_) => {
if vm.expires > Utc::now() {
self.provisioner.spawn_vm(vm.id).await?;
}
@@ -78,6 +100,24 @@ impl Worker {
Ok(())
}
pub async fn check_vms(&self) -> Result<()> {
let hosts = self.db.list_hosts().await?;
for host in hosts {
let client = get_host_client(&host)?;
for node in client.list_nodes().await? {
info!("Checking vms for {}", node.name);
for vm in client.list_vms(&node.name).await? {
info!("\t{}: {:?}", vm.vm_id, vm.status);
if let Err(e) = self.handle_vm_info(vm).await {
error!("{}", e);
}
}
}
}
Ok(())
}
pub async fn handle(&mut self) -> Result<()> {
while let Some(job) = self.rx.recv().await {
match job {
@@ -87,6 +127,11 @@ impl Worker {
}
}
WorkJob::SendNotification { .. } => {}
WorkJob::CheckVms => {
if let Err(e) = self.check_vms().await {
error!("Failed to check VMs: {}", e);
}
}
}
}
Ok(())
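
Callers reach this loop through the worker's unbounded channel; main.rs above sends WorkJob::CheckVms every 30 seconds. A minimal sketch of queueing work by hand (vm_id 1 is a stand-in):

// The loop above routes CheckVms to check_vms() and CheckVm to check_vm().
sender.send(WorkJob::CheckVms)?;
sender.send(WorkJob::CheckVm { vm_id: 1 })?;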