feat: notifications
This commit is contained in:
parent
119493e850
commit
7ffff1e698
5
lnvps_db/migrations/20241204142919_delete-vm.sql
Normal file
5
lnvps_db/migrations/20241204142919_delete-vm.sql
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
-- Add migration script here
|
||||||
|
alter table vm
|
||||||
|
add column deleted bit(1) not null default 0;
|
||||||
|
alter table vm_ip_assignment
|
||||||
|
add column deleted bit(1) not null default 0;
|
@ -102,6 +102,9 @@ pub trait LNVpsDb: Sync + Send {
|
|||||||
/// List VM ip assignments by IP range
|
/// List VM ip assignments by IP range
|
||||||
async fn list_vm_ip_assignments_in_range(&self, range_id: u64) -> Result<Vec<VmIpAssignment>>;
|
async fn list_vm_ip_assignments_in_range(&self, range_id: u64) -> Result<Vec<VmIpAssignment>>;
|
||||||
|
|
||||||
|
/// Delete assigned VM ips
|
||||||
|
async fn delete_vm_ip_assignment(&self, vm_id: u64) -> Result<()>;
|
||||||
|
|
||||||
/// List payments by VM id
|
/// List payments by VM id
|
||||||
async fn list_vm_payment(&self, vm_id: u64) -> Result<Vec<VmPayment>>;
|
async fn list_vm_payment(&self, vm_id: u64) -> Result<Vec<VmPayment>>;
|
||||||
|
|
||||||
|
@ -3,6 +3,7 @@ use chrono::{DateTime, Utc};
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_with::serde_as;
|
use serde_with::serde_as;
|
||||||
use sqlx::FromRow;
|
use sqlx::FromRow;
|
||||||
|
use std::fmt::{Display, Formatter};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
@ -151,6 +152,12 @@ impl VmOsImage {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Display for VmOsImage {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "{:?} {}", self.distribution, self.version)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, FromRow, Clone, Debug)]
|
#[derive(Serialize, Deserialize, FromRow, Clone, Debug)]
|
||||||
pub struct IpRange {
|
pub struct IpRange {
|
||||||
pub id: u64,
|
pub id: u64,
|
||||||
@ -234,6 +241,8 @@ pub struct Vm {
|
|||||||
pub disk_id: u64,
|
pub disk_id: u64,
|
||||||
/// Network MAC address
|
/// Network MAC address
|
||||||
pub mac_address: String,
|
pub mac_address: String,
|
||||||
|
/// Is the VM deleted
|
||||||
|
pub deleted: bool,
|
||||||
|
|
||||||
#[sqlx(skip)]
|
#[sqlx(skip)]
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
@ -265,6 +274,12 @@ pub struct VmIpAssignment {
|
|||||||
pub ip_range: Option<IpRange>,
|
pub ip_range: Option<IpRange>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Display for VmIpAssignment {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "{}", self.ip)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[serde_as]
|
#[serde_as]
|
||||||
#[derive(Serialize, Deserialize, FromRow, Clone, Debug, Default)]
|
#[derive(Serialize, Deserialize, FromRow, Clone, Debug, Default)]
|
||||||
pub struct VmPayment {
|
pub struct VmPayment {
|
||||||
|
@ -191,21 +191,21 @@ impl LNVpsDb for LNVpsDbMysql {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn list_vms(&self) -> Result<Vec<Vm>> {
|
async fn list_vms(&self) -> Result<Vec<Vm>> {
|
||||||
sqlx::query_as("select * from vm")
|
sqlx::query_as("select * from vm ")
|
||||||
.fetch_all(&self.db)
|
.fetch_all(&self.db)
|
||||||
.await
|
.await
|
||||||
.map_err(Error::new)
|
.map_err(Error::new)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn list_expired_vms(&self) -> Result<Vec<Vm>> {
|
async fn list_expired_vms(&self) -> Result<Vec<Vm>> {
|
||||||
sqlx::query_as("select * from vm where expires > current_timestamp()")
|
sqlx::query_as("select * from vm where expires > current_timestamp() and deleted = 0")
|
||||||
.fetch_all(&self.db)
|
.fetch_all(&self.db)
|
||||||
.await
|
.await
|
||||||
.map_err(Error::new)
|
.map_err(Error::new)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn list_user_vms(&self, id: u64) -> Result<Vec<Vm>> {
|
async fn list_user_vms(&self, id: u64) -> Result<Vec<Vm>> {
|
||||||
sqlx::query_as("select * from vm where user_id = ?")
|
sqlx::query_as("select * from vm where user_id = ? and deleted = 0")
|
||||||
.bind(id)
|
.bind(id)
|
||||||
.fetch_all(&self.db)
|
.fetch_all(&self.db)
|
||||||
.await
|
.await
|
||||||
@ -241,7 +241,7 @@ impl LNVpsDb for LNVpsDbMysql {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_vm(&self, vm_id: u64) -> Result<()> {
|
async fn delete_vm(&self, vm_id: u64) -> Result<()> {
|
||||||
sqlx::query("delete from vm where id = ?")
|
sqlx::query("update vm set deleted = 1 where id = ?")
|
||||||
.bind(vm_id)
|
.bind(vm_id)
|
||||||
.execute(&self.db)
|
.execute(&self.db)
|
||||||
.await
|
.await
|
||||||
@ -263,7 +263,7 @@ impl LNVpsDb for LNVpsDbMysql {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn list_vm_ip_assignments(&self, vm_id: u64) -> Result<Vec<VmIpAssignment>> {
|
async fn list_vm_ip_assignments(&self, vm_id: u64) -> Result<Vec<VmIpAssignment>> {
|
||||||
sqlx::query_as("select * from vm_ip_assignment where vm_id = ?")
|
sqlx::query_as("select * from vm_ip_assignment where vm_id = ? and deleted = 0")
|
||||||
.bind(vm_id)
|
.bind(vm_id)
|
||||||
.fetch_all(&self.db)
|
.fetch_all(&self.db)
|
||||||
.await
|
.await
|
||||||
@ -271,13 +271,21 @@ impl LNVpsDb for LNVpsDbMysql {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn list_vm_ip_assignments_in_range(&self, range_id: u64) -> Result<Vec<VmIpAssignment>> {
|
async fn list_vm_ip_assignments_in_range(&self, range_id: u64) -> Result<Vec<VmIpAssignment>> {
|
||||||
sqlx::query_as("select * from vm_ip_assignment where ip_range_id = ?")
|
sqlx::query_as("select * from vm_ip_assignment where ip_range_id = ? and deleted = 0")
|
||||||
.bind(range_id)
|
.bind(range_id)
|
||||||
.fetch_all(&self.db)
|
.fetch_all(&self.db)
|
||||||
.await
|
.await
|
||||||
.map_err(Error::new)
|
.map_err(Error::new)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn delete_vm_ip_assignment(&self, vm_id: u64) -> Result<()> {
|
||||||
|
sqlx::query("update vm_ip_assignment set deleted = 1 where vm_id = ?")
|
||||||
|
.bind(&vm_id)
|
||||||
|
.execute(&self.db)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn list_vm_payment(&self, vm_id: u64) -> Result<Vec<VmPayment>> {
|
async fn list_vm_payment(&self, vm_id: u64) -> Result<Vec<VmPayment>> {
|
||||||
sqlx::query_as("select * from vm_payment where vm_id = ?")
|
sqlx::query_as("select * from vm_payment where vm_id = ?")
|
||||||
.bind(vm_id)
|
.bind(vm_id)
|
||||||
|
@ -577,4 +577,7 @@ pub struct VmConfig {
|
|||||||
pub efi_disk_0: Option<String>,
|
pub efi_disk_0: Option<String>,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub kvm: Option<bool>,
|
pub kvm: Option<bool>,
|
||||||
|
#[serde(rename = "serial0")]
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub serial_0: Option<String>
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
use base64::prelude::BASE64_STANDARD;
|
use base64::prelude::BASE64_STANDARD;
|
||||||
use base64::Engine;
|
use base64::Engine;
|
||||||
use log::info;
|
use log::{debug, info};
|
||||||
use nostr::{Event, JsonUtil, Kind, Timestamp};
|
use nostr::{Event, JsonUtil, Kind, Timestamp};
|
||||||
use rocket::http::uri::{Absolute, Uri};
|
use rocket::http::uri::{Absolute, Uri};
|
||||||
use rocket::http::Status;
|
use rocket::http::Status;
|
||||||
@ -80,7 +80,7 @@ impl<'r> FromRequest<'r> for Nip98Auth {
|
|||||||
return Outcome::Error((Status::new(401), "Event signature invalid"));
|
return Outcome::Error((Status::new(401), "Event signature invalid"));
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("{}", event.as_json());
|
debug!("{}", event.as_json());
|
||||||
Outcome::Success(Nip98Auth { event })
|
Outcome::Success(Nip98Auth { event })
|
||||||
} else {
|
} else {
|
||||||
Outcome::Error((Status::new(403), "Auth scheme must be Nostr"))
|
Outcome::Error((Status::new(403), "Auth scheme must be Nostr"))
|
||||||
|
@ -356,6 +356,7 @@ impl Provisioner for LNVpsProvisioner {
|
|||||||
scsi_hw: Some("virtio-scsi-pci".to_string()),
|
scsi_hw: Some("virtio-scsi-pci".to_string()),
|
||||||
ssh_keys: Some(urlencoding::encode(&ssh_key.key_data).to_string()),
|
ssh_keys: Some(urlencoding::encode(&ssh_key.key_data).to_string()),
|
||||||
efi_disk_0: Some(format!("{}:0,efitype=4m", &drive.name)),
|
efi_disk_0: Some(format!("{}:0,efitype=4m", &drive.name)),
|
||||||
|
serial_0: Some("socket".to_string()),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@ -445,9 +446,12 @@ impl Provisioner for LNVpsProvisioner {
|
|||||||
let host = self.db.get_host(vm.host_id).await?;
|
let host = self.db.get_host(vm.host_id).await?;
|
||||||
|
|
||||||
let client = get_host_client(&host)?;
|
let client = get_host_client(&host)?;
|
||||||
let j_start = client.delete_vm(&host.name, vm.id + 100).await?;
|
// TODO: delete not implemented, stop only
|
||||||
client.wait_for_task(&j_start).await?;
|
//let j_start = client.delete_vm(&host.name, vm.id + 100).await?;
|
||||||
|
let j_stop = client.stop_vm(&host.name, vm.id + 100).await?;
|
||||||
|
client.wait_for_task(&j_stop).await?;
|
||||||
|
|
||||||
|
self.db.delete_vm_ip_assignment(vm.id).await?;
|
||||||
self.db.delete_vm(vm.id).await?;
|
self.db.delete_vm(vm.id).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -4,7 +4,7 @@ use crate::provisioner::Provisioner;
|
|||||||
use crate::settings::{Settings, SmtpConfig};
|
use crate::settings::{Settings, SmtpConfig};
|
||||||
use crate::status::{VmRunningState, VmState, VmStateCache};
|
use crate::status::{VmRunningState, VmState, VmStateCache};
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use chrono::{Days, Utc};
|
use chrono::{DateTime, Days, Utc};
|
||||||
use lettre::message::MessageBuilder;
|
use lettre::message::MessageBuilder;
|
||||||
use lettre::transport::smtp::authentication::Credentials;
|
use lettre::transport::smtp::authentication::Credentials;
|
||||||
use lettre::transport::smtp::SmtpTransportBuilder;
|
use lettre::transport::smtp::SmtpTransportBuilder;
|
||||||
@ -16,6 +16,7 @@ use rocket::futures::SinkExt;
|
|||||||
use std::ops::Add;
|
use std::ops::Add;
|
||||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
|
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub enum WorkJob {
|
pub enum WorkJob {
|
||||||
/// Check all running VMS
|
/// Check all running VMS
|
||||||
CheckVms,
|
CheckVms,
|
||||||
@ -38,6 +39,7 @@ pub struct Worker {
|
|||||||
vm_state_cache: VmStateCache,
|
vm_state_cache: VmStateCache,
|
||||||
tx: UnboundedSender<WorkJob>,
|
tx: UnboundedSender<WorkJob>,
|
||||||
rx: UnboundedReceiver<WorkJob>,
|
rx: UnboundedReceiver<WorkJob>,
|
||||||
|
last_check_vms: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct WorkerSettings {
|
pub struct WorkerSettings {
|
||||||
@ -69,6 +71,7 @@ impl Worker {
|
|||||||
settings: settings.into(),
|
settings: settings.into(),
|
||||||
tx,
|
tx,
|
||||||
rx,
|
rx,
|
||||||
|
last_check_vms: Utc::now().timestamp() as u64,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -100,15 +103,26 @@ impl Worker {
|
|||||||
if db_vm.expires < Utc::now() && s.status == VmStatus::Running {
|
if db_vm.expires < Utc::now() && s.status == VmStatus::Running {
|
||||||
info!("Stopping expired VM {}", db_vm.id);
|
info!("Stopping expired VM {}", db_vm.id);
|
||||||
self.provisioner.stop_vm(db_vm.id).await?;
|
self.provisioner.stop_vm(db_vm.id).await?;
|
||||||
|
self.tx.send(WorkJob::SendNotification {
|
||||||
|
user_id: db_vm.user_id,
|
||||||
|
title: Some(format!("[VM{}] Expired", db_vm.id)),
|
||||||
|
message: format!("Your VM #{} has expired and is now stopped, please renew in the next {} days or your VM will be deleted.", db_vm.id, self.settings.delete_after)
|
||||||
|
})?;
|
||||||
}
|
}
|
||||||
// Delete VM if expired > 3 days
|
// Delete VM if expired > self.settings.delete_after days
|
||||||
if db_vm
|
if db_vm
|
||||||
.expires
|
.expires
|
||||||
.add(Days::new(self.settings.delete_after as u64))
|
.add(Days::new(self.settings.delete_after as u64))
|
||||||
< Utc::now()
|
< Utc::now()
|
||||||
|
&& !db_vm.deleted
|
||||||
{
|
{
|
||||||
info!("Deleting expired VM {}", db_vm.id);
|
info!("Deleting expired VM {}", db_vm.id);
|
||||||
self.provisioner.delete_vm(db_vm.id).await?;
|
self.provisioner.delete_vm(db_vm.id).await?;
|
||||||
|
self.tx.send(WorkJob::SendNotification {
|
||||||
|
user_id: db_vm.user_id,
|
||||||
|
title: Some(format!("[VM{}] Deleted", db_vm.id)),
|
||||||
|
message: format!("Your VM #{} has been deleted!", db_vm.id),
|
||||||
|
})?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -127,6 +141,22 @@ impl Worker {
|
|||||||
Err(_) => {
|
Err(_) => {
|
||||||
if vm.expires > Utc::now() {
|
if vm.expires > Utc::now() {
|
||||||
self.provisioner.spawn_vm(vm.id).await?;
|
self.provisioner.spawn_vm(vm.id).await?;
|
||||||
|
let vm_ips = self.db.list_vm_ip_assignments(vm.id).await?;
|
||||||
|
let image = self.db.get_os_image(vm.image_id).await?;
|
||||||
|
self.tx.send(WorkJob::SendNotification {
|
||||||
|
user_id: vm.user_id,
|
||||||
|
title: Some(format!("[VM{}] Created", vm.id)),
|
||||||
|
message: format!(
|
||||||
|
"Your VM #{} been created!\nOS: {}\nIPs: {}",
|
||||||
|
vm.id,
|
||||||
|
image,
|
||||||
|
vm_ips
|
||||||
|
.iter()
|
||||||
|
.map(|i| i.to_string())
|
||||||
|
.collect::<Vec<String>>()
|
||||||
|
.join(", ")
|
||||||
|
),
|
||||||
|
})?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -141,9 +171,14 @@ impl Worker {
|
|||||||
for node in client.list_nodes().await? {
|
for node in client.list_nodes().await? {
|
||||||
info!("Checking vms for {}", node.name);
|
info!("Checking vms for {}", node.name);
|
||||||
for vm in client.list_vms(&node.name).await? {
|
for vm in client.list_vms(&node.name).await? {
|
||||||
info!("\t{}: {:?}", vm.vm_id, vm.status);
|
let vm_id = vm.vm_id;
|
||||||
|
info!("\t{}: {:?}", vm_id, vm.status);
|
||||||
if let Err(e) = self.handle_vm_info(vm).await {
|
if let Err(e) = self.handle_vm_info(vm).await {
|
||||||
error!("{}", e);
|
error!("{}", e);
|
||||||
|
self.queue_admin_notification(
|
||||||
|
format!("Failed to check VM {}:\n{}", vm_id, e.to_string()),
|
||||||
|
Some("Job Failed".to_string()),
|
||||||
|
)?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -206,12 +241,42 @@ impl Worker {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn queue_notification(
|
||||||
|
&self,
|
||||||
|
user_id: u64,
|
||||||
|
message: String,
|
||||||
|
title: Option<String>,
|
||||||
|
) -> Result<()> {
|
||||||
|
self.tx.send(WorkJob::SendNotification {
|
||||||
|
user_id,
|
||||||
|
message,
|
||||||
|
title,
|
||||||
|
})?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn queue_admin_notification(&self, message: String, title: Option<String>) -> Result<()> {
|
||||||
|
if let Some(a) = self.settings.smtp.as_ref().and_then(|s| s.admin) {
|
||||||
|
self.queue_notification(a, message, title)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn handle(&mut self) -> Result<()> {
|
pub async fn handle(&mut self) -> Result<()> {
|
||||||
while let Some(job) = self.rx.recv().await {
|
while let Some(ref job) = self.rx.recv().await {
|
||||||
match job {
|
match job {
|
||||||
WorkJob::CheckVm { vm_id } => {
|
WorkJob::CheckVm { vm_id } => {
|
||||||
if let Err(e) = self.check_vm(vm_id).await {
|
if let Err(e) = self.check_vm(*vm_id).await {
|
||||||
error!("Failed to check VM {}: {}", vm_id, e);
|
error!("Failed to check VM {}: {}", vm_id, e);
|
||||||
|
self.queue_admin_notification(
|
||||||
|
format!(
|
||||||
|
"Failed to check VM {}:\n{:?}\n{}",
|
||||||
|
vm_id,
|
||||||
|
&job,
|
||||||
|
e.to_string()
|
||||||
|
),
|
||||||
|
Some("Job Failed".to_string()),
|
||||||
|
)?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
WorkJob::SendNotification {
|
WorkJob::SendNotification {
|
||||||
@ -219,13 +284,28 @@ impl Worker {
|
|||||||
message,
|
message,
|
||||||
title,
|
title,
|
||||||
} => {
|
} => {
|
||||||
if let Err(e) = self.send_notification(user_id, message, title).await {
|
if let Err(e) = self
|
||||||
|
.send_notification(*user_id, message.clone(), title.clone())
|
||||||
|
.await
|
||||||
|
{
|
||||||
error!("Failed to send notification {}: {}", user_id, e);
|
error!("Failed to send notification {}: {}", user_id, e);
|
||||||
|
self.queue_admin_notification(
|
||||||
|
format!(
|
||||||
|
"Failed to send notification:\n{:?}\n{}",
|
||||||
|
&job,
|
||||||
|
e.to_string()
|
||||||
|
),
|
||||||
|
Some("Job Failed".to_string()),
|
||||||
|
)?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
WorkJob::CheckVms => {
|
WorkJob::CheckVms => {
|
||||||
if let Err(e) = self.check_vms().await {
|
if let Err(e) = self.check_vms().await {
|
||||||
error!("Failed to check VMs: {}", e);
|
error!("Failed to check VMs: {}", e);
|
||||||
|
self.queue_admin_notification(
|
||||||
|
format!("Failed to check VM's:\n{:?}\n{}", &job, e.to_string()),
|
||||||
|
Some("Job Failed".to_string()),
|
||||||
|
)?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user