From 7ffff1e698f2a68b5de0801e159b2447893c6685 Mon Sep 17 00:00:00 2001 From: kieran Date: Wed, 4 Dec 2024 14:57:35 +0000 Subject: [PATCH] feat: notifications --- .../migrations/20241204142919_delete-vm.sql | 5 + lnvps_db/src/lib.rs | 3 + lnvps_db/src/model.rs | 15 +++ lnvps_db/src/mysql.rs | 20 ++-- src/host/proxmox.rs | 3 + src/nip98.rs | 4 +- src/provisioner/lnvps.rs | 8 +- src/worker.rs | 92 +++++++++++++++++-- 8 files changed, 134 insertions(+), 16 deletions(-) create mode 100644 lnvps_db/migrations/20241204142919_delete-vm.sql diff --git a/lnvps_db/migrations/20241204142919_delete-vm.sql b/lnvps_db/migrations/20241204142919_delete-vm.sql new file mode 100644 index 0000000..80a2648 --- /dev/null +++ b/lnvps_db/migrations/20241204142919_delete-vm.sql @@ -0,0 +1,5 @@ +-- Add migration script here +alter table vm + add column deleted bit(1) not null default 0; +alter table vm_ip_assignment + add column deleted bit(1) not null default 0; \ No newline at end of file diff --git a/lnvps_db/src/lib.rs b/lnvps_db/src/lib.rs index cf3136d..cc1f704 100644 --- a/lnvps_db/src/lib.rs +++ b/lnvps_db/src/lib.rs @@ -102,6 +102,9 @@ pub trait LNVpsDb: Sync + Send { /// List VM ip assignments by IP range async fn list_vm_ip_assignments_in_range(&self, range_id: u64) -> Result>; + /// Delete assigned VM ips + async fn delete_vm_ip_assignment(&self, vm_id: u64) -> Result<()>; + /// List payments by VM id async fn list_vm_payment(&self, vm_id: u64) -> Result>; diff --git a/lnvps_db/src/model.rs b/lnvps_db/src/model.rs index 31e5689..51b0dd9 100644 --- a/lnvps_db/src/model.rs +++ b/lnvps_db/src/model.rs @@ -3,6 +3,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use sqlx::FromRow; +use std::fmt::{Display, Formatter}; use std::path::PathBuf; use url::Url; @@ -151,6 +152,12 @@ impl VmOsImage { } } +impl Display for VmOsImage { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?} {}", self.distribution, self.version) + } +} + #[derive(Serialize, Deserialize, FromRow, Clone, Debug)] pub struct IpRange { pub id: u64, @@ -234,6 +241,8 @@ pub struct Vm { pub disk_id: u64, /// Network MAC address pub mac_address: String, + /// Is the VM deleted + pub deleted: bool, #[sqlx(skip)] #[serde(skip_serializing_if = "Option::is_none")] @@ -265,6 +274,12 @@ pub struct VmIpAssignment { pub ip_range: Option, } +impl Display for VmIpAssignment { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.ip) + } +} + #[serde_as] #[derive(Serialize, Deserialize, FromRow, Clone, Debug, Default)] pub struct VmPayment { diff --git a/lnvps_db/src/mysql.rs b/lnvps_db/src/mysql.rs index b05891d..26d1712 100644 --- a/lnvps_db/src/mysql.rs +++ b/lnvps_db/src/mysql.rs @@ -191,21 +191,21 @@ impl LNVpsDb for LNVpsDbMysql { } async fn list_vms(&self) -> Result> { - sqlx::query_as("select * from vm") + sqlx::query_as("select * from vm ") .fetch_all(&self.db) .await .map_err(Error::new) } async fn list_expired_vms(&self) -> Result> { - sqlx::query_as("select * from vm where expires > current_timestamp()") + sqlx::query_as("select * from vm where expires > current_timestamp() and deleted = 0") .fetch_all(&self.db) .await .map_err(Error::new) } async fn list_user_vms(&self, id: u64) -> Result> { - sqlx::query_as("select * from vm where user_id = ?") + sqlx::query_as("select * from vm where user_id = ? and deleted = 0") .bind(id) .fetch_all(&self.db) .await @@ -241,7 +241,7 @@ impl LNVpsDb for LNVpsDbMysql { } async fn delete_vm(&self, vm_id: u64) -> Result<()> { - sqlx::query("delete from vm where id = ?") + sqlx::query("update vm set deleted = 1 where id = ?") .bind(vm_id) .execute(&self.db) .await @@ -263,7 +263,7 @@ impl LNVpsDb for LNVpsDbMysql { } async fn list_vm_ip_assignments(&self, vm_id: u64) -> Result> { - sqlx::query_as("select * from vm_ip_assignment where vm_id = ?") + sqlx::query_as("select * from vm_ip_assignment where vm_id = ? and deleted = 0") .bind(vm_id) .fetch_all(&self.db) .await @@ -271,13 +271,21 @@ impl LNVpsDb for LNVpsDbMysql { } async fn list_vm_ip_assignments_in_range(&self, range_id: u64) -> Result> { - sqlx::query_as("select * from vm_ip_assignment where ip_range_id = ?") + sqlx::query_as("select * from vm_ip_assignment where ip_range_id = ? and deleted = 0") .bind(range_id) .fetch_all(&self.db) .await .map_err(Error::new) } + async fn delete_vm_ip_assignment(&self, vm_id: u64) -> Result<()> { + sqlx::query("update vm_ip_assignment set deleted = 1 where vm_id = ?") + .bind(&vm_id) + .execute(&self.db) + .await?; + Ok(()) + } + async fn list_vm_payment(&self, vm_id: u64) -> Result> { sqlx::query_as("select * from vm_payment where vm_id = ?") .bind(vm_id) diff --git a/src/host/proxmox.rs b/src/host/proxmox.rs index a640610..ad9cc24 100644 --- a/src/host/proxmox.rs +++ b/src/host/proxmox.rs @@ -577,4 +577,7 @@ pub struct VmConfig { pub efi_disk_0: Option, #[serde(skip_serializing_if = "Option::is_none")] pub kvm: Option, + #[serde(rename = "serial0")] + #[serde(skip_serializing_if = "Option::is_none")] + pub serial_0: Option } diff --git a/src/nip98.rs b/src/nip98.rs index 3f30fe9..2e57bf5 100644 --- a/src/nip98.rs +++ b/src/nip98.rs @@ -1,6 +1,6 @@ use base64::prelude::BASE64_STANDARD; use base64::Engine; -use log::info; +use log::{debug, info}; use nostr::{Event, JsonUtil, Kind, Timestamp}; use rocket::http::uri::{Absolute, Uri}; use rocket::http::Status; @@ -80,7 +80,7 @@ impl<'r> FromRequest<'r> for Nip98Auth { return Outcome::Error((Status::new(401), "Event signature invalid")); } - info!("{}", event.as_json()); + debug!("{}", event.as_json()); Outcome::Success(Nip98Auth { event }) } else { Outcome::Error((Status::new(403), "Auth scheme must be Nostr")) diff --git a/src/provisioner/lnvps.rs b/src/provisioner/lnvps.rs index 6902ae5..e1aecdd 100644 --- a/src/provisioner/lnvps.rs +++ b/src/provisioner/lnvps.rs @@ -356,6 +356,7 @@ impl Provisioner for LNVpsProvisioner { scsi_hw: Some("virtio-scsi-pci".to_string()), ssh_keys: Some(urlencoding::encode(&ssh_key.key_data).to_string()), efi_disk_0: Some(format!("{}:0,efitype=4m", &drive.name)), + serial_0: Some("socket".to_string()), ..Default::default() }, }) @@ -445,9 +446,12 @@ impl Provisioner for LNVpsProvisioner { let host = self.db.get_host(vm.host_id).await?; let client = get_host_client(&host)?; - let j_start = client.delete_vm(&host.name, vm.id + 100).await?; - client.wait_for_task(&j_start).await?; + // TODO: delete not implemented, stop only + //let j_start = client.delete_vm(&host.name, vm.id + 100).await?; + let j_stop = client.stop_vm(&host.name, vm.id + 100).await?; + client.wait_for_task(&j_stop).await?; + self.db.delete_vm_ip_assignment(vm.id).await?; self.db.delete_vm(vm.id).await?; Ok(()) diff --git a/src/worker.rs b/src/worker.rs index 0236636..a6a26fa 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -4,7 +4,7 @@ use crate::provisioner::Provisioner; use crate::settings::{Settings, SmtpConfig}; use crate::status::{VmRunningState, VmState, VmStateCache}; use anyhow::Result; -use chrono::{Days, Utc}; +use chrono::{DateTime, Days, Utc}; use lettre::message::MessageBuilder; use lettre::transport::smtp::authentication::Credentials; use lettre::transport::smtp::SmtpTransportBuilder; @@ -16,6 +16,7 @@ use rocket::futures::SinkExt; use std::ops::Add; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +#[derive(Debug)] pub enum WorkJob { /// Check all running VMS CheckVms, @@ -38,6 +39,7 @@ pub struct Worker { vm_state_cache: VmStateCache, tx: UnboundedSender, rx: UnboundedReceiver, + last_check_vms: u64, } pub struct WorkerSettings { @@ -69,6 +71,7 @@ impl Worker { settings: settings.into(), tx, rx, + last_check_vms: Utc::now().timestamp() as u64, } } @@ -100,15 +103,26 @@ impl Worker { if db_vm.expires < Utc::now() && s.status == VmStatus::Running { info!("Stopping expired VM {}", db_vm.id); self.provisioner.stop_vm(db_vm.id).await?; + self.tx.send(WorkJob::SendNotification { + user_id: db_vm.user_id, + title: Some(format!("[VM{}] Expired", db_vm.id)), + message: format!("Your VM #{} has expired and is now stopped, please renew in the next {} days or your VM will be deleted.", db_vm.id, self.settings.delete_after) + })?; } - // Delete VM if expired > 3 days + // Delete VM if expired > self.settings.delete_after days if db_vm .expires .add(Days::new(self.settings.delete_after as u64)) < Utc::now() + && !db_vm.deleted { info!("Deleting expired VM {}", db_vm.id); self.provisioner.delete_vm(db_vm.id).await?; + self.tx.send(WorkJob::SendNotification { + user_id: db_vm.user_id, + title: Some(format!("[VM{}] Deleted", db_vm.id)), + message: format!("Your VM #{} has been deleted!", db_vm.id), + })?; } } @@ -127,6 +141,22 @@ impl Worker { Err(_) => { if vm.expires > Utc::now() { self.provisioner.spawn_vm(vm.id).await?; + let vm_ips = self.db.list_vm_ip_assignments(vm.id).await?; + let image = self.db.get_os_image(vm.image_id).await?; + self.tx.send(WorkJob::SendNotification { + user_id: vm.user_id, + title: Some(format!("[VM{}] Created", vm.id)), + message: format!( + "Your VM #{} been created!\nOS: {}\nIPs: {}", + vm.id, + image, + vm_ips + .iter() + .map(|i| i.to_string()) + .collect::>() + .join(", ") + ), + })?; } } } @@ -141,9 +171,14 @@ impl Worker { for node in client.list_nodes().await? { info!("Checking vms for {}", node.name); for vm in client.list_vms(&node.name).await? { - info!("\t{}: {:?}", vm.vm_id, vm.status); + let vm_id = vm.vm_id; + info!("\t{}: {:?}", vm_id, vm.status); if let Err(e) = self.handle_vm_info(vm).await { error!("{}", e); + self.queue_admin_notification( + format!("Failed to check VM {}:\n{}", vm_id, e.to_string()), + Some("Job Failed".to_string()), + )? } } } @@ -206,12 +241,42 @@ impl Worker { Ok(()) } + fn queue_notification( + &self, + user_id: u64, + message: String, + title: Option, + ) -> Result<()> { + self.tx.send(WorkJob::SendNotification { + user_id, + message, + title, + })?; + Ok(()) + } + + fn queue_admin_notification(&self, message: String, title: Option) -> Result<()> { + if let Some(a) = self.settings.smtp.as_ref().and_then(|s| s.admin) { + self.queue_notification(a, message, title)?; + } + Ok(()) + } + pub async fn handle(&mut self) -> Result<()> { - while let Some(job) = self.rx.recv().await { + while let Some(ref job) = self.rx.recv().await { match job { WorkJob::CheckVm { vm_id } => { - if let Err(e) = self.check_vm(vm_id).await { + if let Err(e) = self.check_vm(*vm_id).await { error!("Failed to check VM {}: {}", vm_id, e); + self.queue_admin_notification( + format!( + "Failed to check VM {}:\n{:?}\n{}", + vm_id, + &job, + e.to_string() + ), + Some("Job Failed".to_string()), + )? } } WorkJob::SendNotification { @@ -219,13 +284,28 @@ impl Worker { message, title, } => { - if let Err(e) = self.send_notification(user_id, message, title).await { + if let Err(e) = self + .send_notification(*user_id, message.clone(), title.clone()) + .await + { error!("Failed to send notification {}: {}", user_id, e); + self.queue_admin_notification( + format!( + "Failed to send notification:\n{:?}\n{}", + &job, + e.to_string() + ), + Some("Job Failed".to_string()), + )? } } WorkJob::CheckVms => { if let Err(e) = self.check_vms().await { error!("Failed to check VMs: {}", e); + self.queue_admin_notification( + format!("Failed to check VM's:\n{:?}\n{}", &job, e.to_string()), + Some("Job Failed".to_string()), + )? } } }