closes #23
@@ -62,6 +62,9 @@ pub trait LNVpsDb: Sync + Send {
    /// Get a specific host disk
    async fn get_host_disk(&self, disk_id: u64) -> Result<VmHostDisk>;

    /// Update a host disk
    async fn update_host_disk(&self, disk: &VmHostDisk) -> Result<()>;

    /// Get OS image by id
    async fn get_os_image(&self, id: u64) -> Result<VmOsImage>;
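For orientation, a minimal caller-side sketch of the two new trait methods (not part of this commit; the helper name, the u64 size field, and the anyhow-style Result alias are assumptions):

// Hypothetical helper: fetch a host disk record, bump its size, persist it.
async fn grow_host_disk(db: &impl LNVpsDb, disk_id: u64, new_size: u64) -> Result<()> {
    let mut disk = db.get_host_disk(disk_id).await?;
    disk.size = new_size; // assumes VmHostDisk.size is a byte count (u64)
    db.update_host_disk(&disk).await?;
    Ok(())
}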
@@ -121,6 +121,15 @@ impl FromStr for DiskType {
    }
}

impl Display for DiskType {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            DiskType::HDD => write!(f, "hdd"),
            DiskType::SSD => write!(f, "ssd"),
        }
    }
}

#[derive(Clone, Copy, Debug, sqlx::Type, Default, PartialEq, Eq)]
#[repr(u16)]
pub enum DiskInterface {
@@ -143,6 +152,16 @@ impl FromStr for DiskInterface {
    }
}

impl Display for DiskInterface {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            DiskInterface::SATA => write!(f, "sata"),
            DiskInterface::SCSI => write!(f, "scsi"),
            DiskInterface::PCIe => write!(f, "pcie"),
        }
    }
}

#[derive(Clone, Copy, Debug, sqlx::Type, Default, PartialEq, Eq)]
#[repr(u16)]
pub enum OsDistribution {
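A quick illustration of what the two new Display impls produce (the worker change later in this diff logs these values via type={} and interface={}):

// Output of the Display impls above.
assert_eq!(DiskType::HDD.to_string(), "hdd");
assert_eq!(DiskType::SSD.to_string(), "ssd");
assert_eq!(DiskInterface::PCIe.to_string(), "pcie");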
@@ -62,13 +62,13 @@ impl LNVpsDb for LNVpsDbMysql {
        sqlx::query(
            "update users set email=?, contact_nip17=?, contact_email=?, country_code=? where id = ?",
        )
        .bind(&user.email)
        .bind(user.contact_nip17)
        .bind(user.contact_email)
        .bind(&user.country_code)
        .bind(user.id)
        .execute(&self.db)
        .await?;
        Ok(())
    }
@@ -80,13 +80,13 @@ impl LNVpsDb for LNVpsDbMysql {
        Ok(sqlx::query(
            "insert into user_ssh_key(name,user_id,key_data) values(?, ?, ?) returning id",
        )
        .bind(&new_key.name)
        .bind(new_key.user_id)
        .bind(&new_key.key_data)
        .fetch_one(&self.db)
        .await
        .map_err(Error::new)?
        .try_get(0)?)
    }

    async fn get_user_ssh_key(&self, id: u64) -> Result<UserSshKey> {
@@ -174,6 +174,18 @@ impl LNVpsDb for LNVpsDbMysql {
            .map_err(Error::new)
    }

    async fn update_host_disk(&self, disk: &VmHostDisk) -> Result<()> {
        sqlx::query("update vm_host_disk set size=?,kind=?,interface=? where id=?")
            .bind(disk.size)
            .bind(disk.kind)
            .bind(disk.interface)
            .bind(disk.id)
            .execute(&self.db)
            .await
            .map_err(Error::new)?;
        Ok(())
    }

    async fn get_os_image(&self, id: u64) -> Result<VmOsImage> {
        sqlx::query_as("select * from vm_os_image where id=?")
            .bind(id)
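Side note on the kind/interface binds above: both enums derive sqlx::Type over #[repr(u16)], so sqlx stores them as integers rather than the strings produced by Display. A rough illustration (the concrete discriminant values depend on the enum declarations, which this diff does not show):

// The value written to vm_host_disk.kind / vm_host_disk.interface is the enum's u16 discriminant.
let kind_as_stored = DiskType::SSD as u16;
let interface_as_stored = DiskInterface::SCSI as u16;
println!("kind={}, interface={}", kind_as_stored, interface_as_stored);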
@@ -324,16 +336,16 @@ impl LNVpsDb for LNVpsDbMysql {
        sqlx::query(
            "update vm set image_id=?,template_id=?,ssh_key_id=?,expires=?,disk_id=?,mac_address=? where id=?",
        )
        .bind(vm.image_id)
        .bind(vm.template_id)
        .bind(vm.ssh_key_id)
        .bind(vm.expires)
        .bind(vm.disk_id)
        .bind(&vm.mac_address)
        .bind(vm.id)
        .execute(&self.db)
        .await
        .map_err(Error::new)?;
        Ok(())
    }
@@ -341,18 +353,18 @@ impl LNVpsDb for LNVpsDbMysql {
        Ok(sqlx::query(
            "insert into vm_ip_assignment(vm_id,ip_range_id,ip,arp_ref,dns_forward,dns_forward_ref,dns_reverse,dns_reverse_ref) values(?,?,?,?,?,?,?,?) returning id",
        )
        .bind(ip_assignment.vm_id)
        .bind(ip_assignment.ip_range_id)
        .bind(&ip_assignment.ip)
        .bind(&ip_assignment.arp_ref)
        .bind(&ip_assignment.dns_forward)
        .bind(&ip_assignment.dns_forward_ref)
        .bind(&ip_assignment.dns_reverse)
        .bind(&ip_assignment.dns_reverse_ref)
        .fetch_one(&self.db)
        .await
        .map_err(Error::new)?
        .try_get(0)?)
    }

    async fn update_vm_ip_assignment(&self, ip_assignment: &VmIpAssignment) -> Result<()> {
@@ -477,9 +489,9 @@ impl LNVpsDb for LNVpsDbMysql {
        sqlx::query_as(
            "select * from vm_payment where is_paid = true order by created desc limit 1",
        )
        .fetch_optional(&self.db)
        .await
        .map_err(Error::new)
    }

    async fn list_custom_pricing(&self, region_id: u64) -> Result<Vec<VmCustomPricing>> {
@@ -158,6 +158,9 @@ async fn main() -> Result<(), Error> {
        start_dvms(nostr_client.clone(), provisioner.clone());
    }

    // request for host info to be patched
    sender.send(WorkJob::PatchHosts)?;

    let mut config = rocket::Config::default();
    let ip: SocketAddr = match &settings.listen {
        Some(i) => i.parse()?,
@@ -25,6 +25,8 @@ pub struct TerminalStream {
/// Generic type for creating VMs
#[async_trait]
pub trait VmHostClient: Send + Sync {
    /// Get information about the host (CPU, memory, disks)
    async fn get_info(&self) -> Result<VmHostInfo>;

    /// Download OS image to the host
    async fn download_os_image(&self, image: &VmOsImage) -> Result<()>;
@@ -202,3 +204,17 @@ pub enum TimeSeries {
    Monthly,
    Yearly,
}

#[derive(Debug, Clone)]
pub struct VmHostInfo {
    pub cpu: u16,
    pub memory: u64,
    pub disks: Vec<VmHostDiskInfo>,
}

#[derive(Debug, Clone)]
pub struct VmHostDiskInfo {
    pub name: String,
    pub size: u64,
    pub used: u64,
}
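Illustrative only: how a caller might consume VmHostClient::get_info and these structs (assumes client implements VmHostClient and the crate's anyhow-style Result; the function itself is hypothetical):

// Hypothetical snippet, not part of this commit.
async fn print_capacity(client: &dyn VmHostClient) -> Result<()> {
    let info = client.get_info().await?;
    println!("cpu={} memory={}", info.cpu, info.memory);
    for d in &info.disks {
        let free = d.size.saturating_sub(d.used);
        println!("disk {}: {} of {} bytes used ({} free)", d.name, d.used, d.size, free);
    }
    Ok(())
}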
@@ -1,4 +1,7 @@
use crate::host::{
    FullVmInfo, TerminalStream, TimeSeries, TimeSeriesData, VmHostClient, VmHostDiskInfo,
    VmHostInfo,
};
use crate::json_api::JsonApi;
use crate::settings::{QemuConfig, SshConfig};
use crate::ssh_client::SshClient;
@@ -86,6 +89,14 @@ impl ProxmoxClient {
        Ok(rsp.data)
    }

    /// List physical disks on a node
    pub async fn list_disks(&self, node: &str) -> Result<Vec<NodeDisk>> {
        let rsp: ResponseBase<Vec<NodeDisk>> = self
            .api
            .get(&format!("/api2/json/nodes/{node}/disks/list"))
            .await?;
        Ok(rsp.data)
    }

    /// List files in a storage pool
    pub async fn list_storage_files(
        &self,
@@ -477,6 +488,29 @@ impl ProxmoxClient {

#[async_trait]
impl VmHostClient for ProxmoxClient {
    async fn get_info(&self) -> Result<VmHostInfo> {
        let nodes = self.list_nodes().await?;
        if let Some(n) = nodes.iter().find(|n| n.name == self.node) {
            let storages = self.list_storage(&n.name).await?;
            let info = VmHostInfo {
                cpu: n.max_cpu.unwrap_or(0),
                memory: n.max_mem.unwrap_or(0),
                disks: storages
                    .into_iter()
                    .map(|s| VmHostDiskInfo {
                        name: s.storage,
                        size: s.total.unwrap_or(0),
                        used: s.used.unwrap_or(0),
                    })
                    .collect(),
            };

            Ok(info)
        } else {
            bail!("Could not find node {}", self.node);
        }
    }

    async fn download_os_image(&self, image: &VmOsImage) -> Result<()> {
        let iso_storage = self.get_iso_storage(&self.node).await?;
        let files = self.list_storage_files(&self.node, &iso_storage).await?;
@@ -878,9 +912,14 @@ pub struct NodeStorage {
    pub content: String,
    pub storage: String,
    #[serde(rename = "type")]
    pub kind: StorageType,
    /// Available storage space in bytes
    #[serde(rename = "avail")]
    pub available: Option<u64>,
    /// Total storage space in bytes
    pub total: Option<u64>,
    /// Used storage space in bytes
    pub used: Option<u64>,
}

impl NodeStorage {
@@ -891,6 +930,11 @@ impl NodeStorage {
            .collect()
    }
}

#[derive(Debug, Deserialize)]
pub struct NodeDisk {
}

#[derive(Debug, Serialize)]
pub struct DownloadUrlRequest {
    pub content: StorageContent,
src/worker.rs
@@ -18,6 +18,8 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

#[derive(Debug)]
pub enum WorkJob {
    /// Sync resources from hosts to database
    PatchHosts,
    /// Check all running VMs
    CheckVms,
    /// Check the VM status matches database state
@@ -278,46 +280,92 @@ impl Worker {
        Ok(())
    }

    async fn try_job(&mut self, job: &WorkJob) -> Result<()> {
        match job {
            WorkJob::PatchHosts => {
                let mut hosts = self.db.list_hosts().await?;
                for mut host in &mut hosts {
                    let client = match get_host_client(host, &self.settings.provisioner_config) {
                        Ok(h) => h,
                        Err(e) => {
                            warn!("Failed to get host client: {} {}", host.name, e);
                            continue;
                        }
                    };
                    let info = client.get_info().await?;
                    let needs_update = info.cpu != host.cpu || info.memory != host.memory;
                    if needs_update {
                        host.cpu = info.cpu;
                        host.memory = info.memory;
                        self.db.update_host(host).await?;
                        info!(
                            "Updated host {}: cpu={}, memory={}",
                            host.name, host.cpu, host.memory
                        );
                    }

                    let mut host_disks = self.db.list_host_disks(host.id).await?;
                    for disk in &info.disks {
                        if let Some(mut hd) = host_disks.iter_mut().find(|d| d.name == disk.name) {
                            if hd.size != disk.size {
                                hd.size = disk.size;
                                self.db.update_host_disk(hd).await?;
                                info!(
                                    "Updated host disk {}: size={},type={},interface={}",
                                    hd.name, hd.size, hd.kind, hd.interface
                                );
                            }
                        } else {
                            warn!("Un-mapped host disk {}", disk.name);
                        }
                    }
                }
            }
            WorkJob::CheckVm { vm_id } => {
                let vm = self.db.get_vm(*vm_id).await?;
                if let Err(e) = self.check_vm(&vm).await {
                    error!("Failed to check VM {}: {}", vm_id, e);
                    self.queue_admin_notification(
                        format!("Failed to check VM {}:\n{:?}\n{}", vm_id, &job, e),
                        Some("Job Failed".to_string()),
                    )?
                }
            }
            WorkJob::SendNotification {
                user_id,
                message,
                title,
            } => {
                if let Err(e) = self
                    .send_notification(*user_id, message.clone(), title.clone())
                    .await
                {
                    error!("Failed to send notification {}: {}", user_id, e);
                    self.queue_admin_notification(
                        format!("Failed to send notification:\n{:?}\n{}", &job, e),
                        Some("Job Failed".to_string()),
                    )?
                }
            }
            WorkJob::CheckVms => {
                if let Err(e) = self.check_vms().await {
                    error!("Failed to check VMs: {}", e);
                    self.queue_admin_notification(
                        format!("Failed to check VM's:\n{:?}\n{}", &job, e),
                        Some("Job Failed".to_string()),
                    )?
                }
            }
        }
        Ok(())
    }

    pub async fn handle(&mut self) -> Result<()> {
        while let Some(job) = self.rx.recv().await {
            if let Err(e) = self.try_job(&job).await {
                error!("Job failed to execute: {:?} {}", job, e);
            }
        }
        Ok(())
    }
}
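For context, a self-contained toy of the dispatch pattern handle/try_job now follow: one loop drains the channel, and a per-job function returns Result so a failed job is logged without killing the loop. The types below are simplified stand-ins, not the project's real Worker/WorkJob.

// Toy model of the refactored worker loop; not the project's actual code.
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};

#[derive(Debug)]
enum Job {
    PatchHosts,
    CheckVms,
}

async fn try_job(job: &Job) -> anyhow::Result<()> {
    // Real work would happen here; errors bubble up to the loop below.
    println!("processing {:?}", job);
    Ok(())
}

async fn handle(mut rx: UnboundedReceiver<Job>) {
    while let Some(job) = rx.recv().await {
        if let Err(e) = try_job(&job).await {
            eprintln!("Job failed to execute: {:?} {}", job, e);
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, rx) = unbounded_channel();
    let worker = tokio::spawn(handle(rx));
    tx.send(Job::PatchHosts)?;
    tx.send(Job::CheckVms)?;
    drop(tx); // closing the sender lets the worker loop finish
    worker.await?;
    Ok(())
}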