Compare commits

..

10 Commits

Author SHA1 Message Date
06f7beb26f
fix: remove stop command on delete 2025-01-06 15:42:26 +00:00
de55d34086
feat: delete unpaid vms 2025-01-06 15:35:59 +00:00
265d91dd83
feat: patch vm 2024-12-29 19:16:02 +00:00
e51bd2722e
chore: log cleanup 2024-12-21 18:28:43 +00:00
7bfeba0ad1
feat: expire soon notification 2024-12-21 18:14:31 +00:00
3e7e0a789b
feat: terminal proxy 2024-12-21 18:01:41 +00:00
8641eeeca8
feat: readme 2024-12-06 10:00:22 +00:00
7270ecf6ba
feat: NIP17 dms 2024-12-05 16:20:53 +00:00
0f9b439b78
feat: email template 2024-12-05 15:01:41 +00:00
81b233a047
feat: router arp entry 2024-12-05 14:13:39 +00:00
17 changed files with 1169 additions and 195 deletions

243
Cargo.lock generated
View File

@ -169,6 +169,37 @@ dependencies = [
"syn",
]
[[package]]
name = "async-utility"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a349201d80b4aa18d17a34a182bdd7f8ddf845e9e57d2ea130a12e10ef1e3a47"
dependencies = [
"futures-util",
"gloo-timers",
"tokio",
"wasm-bindgen-futures",
]
[[package]]
name = "async-wsocket"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d50cb541e6d09e119e717c64c46ed33f49be7fa592fa805d56c11d6a7ff093c"
dependencies = [
"async-utility",
"futures",
"futures-util",
"js-sys",
"tokio",
"tokio-rustls 0.26.0",
"tokio-socks",
"tokio-tungstenite 0.24.0",
"url",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "atoi"
version = "2.0.0"
@ -193,6 +224,15 @@ dependencies = [
"bytemuck",
]
[[package]]
name = "atomic-destructor"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d919cb60ba95c87ba42777e9e246c4e8d658057299b437b7512531ce0a09a23"
dependencies = [
"tracing",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
@ -324,9 +364,9 @@ dependencies = [
[[package]]
name = "bitcoin"
version = "0.32.4"
version = "0.32.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "788902099d47c8682efe6a7afb01c8d58b9794ba66c06affd81c3d6b560743eb"
checksum = "ce6bc65742dea50536e35ad42492b234c27904a27f0abdcbce605015cb4ea026"
dependencies = [
"base58ck",
"bech32",
@ -772,6 +812,12 @@ dependencies = [
"syn",
]
[[package]]
name = "data-encoding"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2"
[[package]]
name = "der"
version = "0.7.9"
@ -1055,6 +1101,12 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foldhash"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2"
[[package]]
name = "foreign-types"
version = "0.3.2"
@ -1215,6 +1267,18 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "gloo-timers"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "group"
version = "0.13.0"
@ -1285,6 +1349,11 @@ name = "hashbrown"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
dependencies = [
"allocator-api2",
"equivalent",
"foldhash",
]
[[package]]
name = "hashlink"
@ -1980,16 +2049,20 @@ dependencies = [
"lettre",
"lnvps_db",
"log",
"native-tls",
"nostr",
"nostr-sdk",
"pretty_env_logger",
"rand",
"reqwest",
"rocket",
"rocket_ws",
"serde",
"serde_json",
"ssh-key",
"ssh2",
"tokio",
"tokio-tungstenite 0.21.0",
"urlencoding",
]
@ -2037,6 +2110,15 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "lru"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38"
dependencies = [
"hashbrown 0.15.2",
]
[[package]]
name = "matchers"
version = "0.1.0"
@ -2191,6 +2273,55 @@ dependencies = [
"url",
]
[[package]]
name = "nostr-database"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23696338d51e45cd44e061823847f4b0d1d362eca80d5033facf9c184149f72f"
dependencies = [
"async-trait",
"lru",
"nostr",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "nostr-relay-pool"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15fcc6e3f0ca54d0fc779009bc5f2684cea9147be3b6aa68a7d301ea590f95f5"
dependencies = [
"async-utility",
"async-wsocket",
"atomic-destructor",
"negentropy 0.3.1",
"negentropy 0.4.3",
"nostr",
"nostr-database",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]
name = "nostr-sdk"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "491221fc89b1aa189a0de640127127d68b4e7c5c1d44371b04d9a6d10694b5af"
dependencies = [
"async-utility",
"atomic-destructor",
"nostr",
"nostr-database",
"nostr-relay-pool",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@ -3038,6 +3169,16 @@ dependencies = [
"uncased",
]
[[package]]
name = "rocket_ws"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25f1877668c937b701177c349f21383c556cd3bb4ba8fa1d07fa96ccb3a8782e"
dependencies = [
"rocket",
"tokio-tungstenite 0.21.0",
]
[[package]]
name = "ron"
version = "0.8.1"
@ -3119,6 +3260,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "934b404430bb06b3fae2cba809eb45a1ab1aecd64491213d7c3301b88393f8d1"
dependencies = [
"once_cell",
"ring",
"rustls-pki-types",
"rustls-webpki 0.102.8",
"subtle",
@ -4044,6 +4186,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-socks"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d4770b8024672c1101b3f6733eab95b18007dbe0847a8afe341fcf79e06043f"
dependencies = [
"either",
"futures-util",
"thiserror",
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.16"
@ -4055,6 +4209,36 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38"
dependencies = [
"futures-util",
"log",
"native-tls",
"tokio",
"tokio-native-tls",
"tungstenite 0.21.0",
]
[[package]]
name = "tokio-tungstenite"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9"
dependencies = [
"futures-util",
"log",
"rustls 0.23.19",
"rustls-pki-types",
"tokio",
"tokio-rustls 0.26.0",
"tungstenite 0.24.0",
"webpki-roots",
]
[[package]]
name = "tokio-util"
version = "0.7.12"
@ -4245,6 +4429,46 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http 1.1.0",
"httparse",
"log",
"native-tls",
"rand",
"sha1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "tungstenite"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http 1.1.0",
"httparse",
"log",
"rand",
"rustls 0.23.19",
"rustls-pki-types",
"sha1",
"thiserror",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.17.0"
@ -4355,6 +4579,12 @@ version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da"
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "utf16_iter"
version = "1.0.5"
@ -4489,6 +4719,15 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki-roots"
version = "0.26.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d642ff16b7e79272ae451b7322067cdc17cadf68c23264be9d94a32319efe7e"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "whoami"
version = "1.5.2"

View File

@ -6,6 +6,10 @@ edition = "2021"
[[bin]]
name = "api"
[features]
default = ["mikrotik", "nostr-dm"]
mikrotik = []
nostr-dm = ["dep:nostr-sdk"]
[dependencies]
lnvps_db = { path = "lnvps_db" }
@ -20,7 +24,7 @@ serde_json = "1.0.132"
rocket = { version = "0.5.1", features = ["json"] }
chrono = { version = "0.4.38", features = ["serde"] }
nostr = { version = "0.37.0", default-features = false, features = ["std"] }
base64 = "0.22.1"
base64 = { version = "0.22.1", features = ["alloc"] }
urlencoding = "2.1.3"
fedimint-tonic-lnd = { version = "0.2.0", default-features = false, features = ["invoicesrpc"] }
ipnetwork = "0.20.0"
@ -29,3 +33,10 @@ clap = { version = "4.5.21", features = ["derive"] }
ssh2 = "0.9.4"
ssh-key = "0.6.7"
lettre = { version = "0.11.10", features = ["tokio1-native-tls"] }
ws = { package = "rocket_ws", version = "0.1.0" }
tokio-tungstenite = { version = "^0.21", features = ["native-tls"] }
native-tls = "0.2.12"
#nostr-dm
nostr-sdk = { version = "0.37.0", optional = true, default-features = false, features = ["nip44", "nip59"] }

94
README.md Normal file
View File

@ -0,0 +1,94 @@
## LNVPS
A bitcoin powered VPS system.
## Requirements
- MySql database
- LND node
- Proxmox server
## Required Config
```yaml
# MySql database connection string
db: "mysql://root:root@localhost:3376/lnvps"
# LND node connection details
lnd:
url: "https://127.0.0.1:10003"
cert: "$HOME/.lnd/tls.cert"
macaroon: "$HOME/.lnd/data/chain/bitcoin/mainnet/admin.macaroon"
# Number of days after a VM expires to delete
delete_after: 3
# Provisioner is the main process which handles creating/deleting VM's
# Currently supports: Proxmox
provisioner:
proxmox:
# Read-only mode prevents spawning VM's
read_only: false
# Proxmox (QEMU) settings used for spawning VM's
qemu:
bios: "ovmf"
machine: "q35"
os_type: "l26"
bridge: "vmbr0"
cpu: "kvm64"
vlan: 100
kvm: false
```
### Email notifications
Email notifications can be enabled, this is primarily intended for admin notifications.
```yaml
# (Optional)
# Email notifications settings
smtp:
# Admin user id, used to send notifications of failed jobs etc. (optional)
admin: 1
# SMTP server url
server: "smtp.gmail.com"
# From header used in the email (optional)
from: "LNVPS <no-reply@example.com>"
username: "no-reply@example.com"
password: "mypassword123"
```
### Nostr notifications (NIP-17)
```yaml
# (Optional)
# Nostr connection settings for notifications
nostr:
# Nostr relays to publish notifications to
relays:
- "wss://relay.snort.social"
- "wss://relay.damus.io"
- "wss://nos.lol"
# Private key used to sign notifications
nsec: "nsec1234xxx"
```
### Network Setup (Advanced)
When ARP is disabled (reply-only) on your router you may need to create static ARP entries when allocating
IPs, we support managing ARP entries on routers directly as part of the provisioning process.
```yaml
# (Optional)
# When allocating IPs for VM's it may be necessary to create static ARP entries on
# your router, at least one router can be configured
#
# Currently supports: Mikrotik
router:
mikrotik:
# !! MAKE SURE TO USE HTTPS !!
url: "https://my-router.net"
username: "admin"
password: "admin"
# Interface where the static ARP entry is added
arp_interface: "bridge1"
```

54
email.html Normal file
View File

@ -0,0 +1,54 @@
<!doctype html>
<html lang="en">
<head>
<title>LNVPS</title>
<meta charset="UTF-8"/>
<link rel="preconnect" href="https://fonts.googleapis.com"/>
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin/>
<link
href="https://fonts.googleapis.com/css2?family=Source+Code+Pro:ital,wght@0,200..900;1,200..900&display=swap"
rel="stylesheet"
/>
<style>
html, body {
margin: 0;
font-size: 12px;
font-family: "Source Code Pro", monospace;
color: white;
background-color: black;
}
.page {
margin-left: 4rem;
margin-right: 4rem;
}
.header {
display: flex;
gap: 2rem;
align-items: center;
justify-content: space-between;
font-size: 3rem;
margin: 2rem 0;
}
p {
min-height: 200px;
}
</style>
</head>
<body>
<div class="page">
<div class="header">
LNVPS
<img height="48" src="https://lnvps.net/logo.jpg" alt="logo"/>
</div>
<hr/>
<p>%%_MESSAGE_%%</p>
<hr/>
<small>
(c) 2024 LNVPS.net
</small>
</div>
</body>
</html>

View File

@ -93,6 +93,9 @@ pub trait LNVpsDb: Sync + Send {
/// Delete a VM by id
async fn delete_vm(&self, vm_id: u64) -> Result<()>;
/// Update a VM
async fn update_vm(&self, vm: &Vm) -> Result<()>;
/// List VM ip assignments
async fn insert_vm_ip_assignment(&self, ip_assignment: &VmIpAssignment) -> Result<u64>;

View File

@ -31,7 +31,8 @@ impl LNVpsDb for LNVpsDbMysql {
}
async fn upsert_user(&self, pubkey: &[u8; 32]) -> Result<u64> {
let res = sqlx::query("insert ignore into users(pubkey) values(?) returning id")
let res =
sqlx::query("insert ignore into users(pubkey,contact_nip17) values(?,1) returning id")
.bind(pubkey.as_slice())
.fetch_optional(&self.db)
.await?;
@ -249,6 +250,23 @@ impl LNVpsDb for LNVpsDbMysql {
Ok(())
}
async fn update_vm(&self, vm: &Vm) -> Result<()> {
sqlx::query("update vm set image_id=?,template_id=?,ssh_key_id=?,expires=?,cpu=?,memory=?,disk_size=?,disk_id=? where id=?")
.bind(vm.image_id)
.bind(vm.template_id)
.bind(vm.ssh_key_id)
.bind(vm.expires)
.bind(vm.cpu)
.bind(vm.memory)
.bind(vm.disk_size)
.bind(vm.disk_id)
.bind(vm.id)
.execute(&self.db)
.await
.map_err(Error::new)?;
Ok(())
}
async fn insert_vm_ip_assignment(&self, ip_assignment: &VmIpAssignment) -> Result<u64> {
Ok(sqlx::query(
"insert into vm_ip_assignment(vm_id,ip_range_id,ip) values(?, ?, ?) returning id",

View File

@ -2,14 +2,19 @@ use crate::nip98::Nip98Auth;
use crate::provisioner::Provisioner;
use crate::status::{VmState, VmStateCache};
use crate::worker::WorkJob;
use anyhow::{bail, Result};
use lnvps_db::hydrate::Hydrate;
use lnvps_db::{LNVpsDb, UserSshKey, Vm, VmOsImage, VmPayment, VmTemplate};
use log::{debug, error};
use nostr::util::hex;
use rocket::futures::{Sink, SinkExt, StreamExt};
use rocket::serde::json::Json;
use rocket::{get, patch, post, routes, Responder, Route, State};
use serde::{Deserialize, Serialize};
use ssh_key::PublicKey;
use std::fmt::Display;
use tokio::sync::mpsc::UnboundedSender;
use ws::Message;
pub fn routes() -> Vec<Route> {
routes![
@ -24,7 +29,9 @@ pub fn routes() -> Vec<Route> {
v1_get_payment,
v1_start_vm,
v1_stop_vm,
v1_restart_vm
v1_restart_vm,
v1_terminal_proxy,
v1_patch_vm
]
}
@ -65,6 +72,11 @@ struct ApiVmStatus {
pub status: VmState,
}
#[derive(Serialize, Deserialize)]
struct VMPatchRequest {
pub ssh_key_id: Option<u64>,
}
#[get("/api/v1/vm")]
async fn v1_list_vms(
auth: Nip98Auth,
@ -117,6 +129,35 @@ async fn v1_get_vm(
})
}
#[patch("/api/v1/vm/<id>", data = "<data>", format = "json")]
async fn v1_patch_vm(
auth: Nip98Auth,
db: &State<Box<dyn LNVpsDb>>,
provisioner: &State<Box<dyn Provisioner>>,
id: u64,
data: Json<VMPatchRequest>,
) -> ApiResult<()> {
let pubkey = auth.event.pubkey.to_bytes();
let uid = db.upsert_user(&pubkey).await?;
let mut vm = db.get_vm(id).await?;
if vm.user_id != uid {
return ApiData::err("VM doesnt belong to you");
}
if let Some(k) = data.ssh_key_id {
let ssh_key = db.get_user_ssh_key(k).await?;
if ssh_key.user_id != uid {
return ApiData::err("SSH key doesnt belong to you");
}
vm.ssh_key_id = ssh_key.id;
}
db.update_vm(&vm).await?;
provisioner.patch_vm(vm.id).await?;
ApiData::ok(())
}
#[get("/api/v1/image")]
async fn v1_list_vm_images(db: &State<Box<dyn LNVpsDb>>) -> ApiResult<Vec<VmOsImage>> {
let vms = db.list_os_image().await?;
@ -289,6 +330,112 @@ async fn v1_get_payment(
ApiData::ok(payment)
}
#[get("/api/v1/console/<id>?<auth>")]
async fn v1_terminal_proxy(
auth: &str,
db: &State<Box<dyn LNVpsDb>>,
provisioner: &State<Box<dyn Provisioner>>,
id: u64,
ws: ws::WebSocket,
) -> Result<ws::Channel<'static>, &'static str> {
let auth = Nip98Auth::from_base64(auth).map_err(|_| "Missing or invalid auth param")?;
if auth.check(&format!("/api/v1/console/{id}"), "GET").is_err() {
return Err("Invalid auth event");
}
let pubkey = auth.event.pubkey.to_bytes();
let uid = db.upsert_user(&pubkey).await.map_err(|_| "Insert failed")?;
let vm = db.get_vm(id).await.map_err(|_| "VM not found")?;
if uid != vm.user_id {
return Err("VM does not belong to you");
}
let ws_upstream = provisioner.terminal_proxy(vm.id).await.map_err(|e| {
error!("Failed to start terminal proxy: {}", e);
"Failed to open terminal proxy"
})?;
let ws = ws.config(Default::default());
Ok(ws.channel(move |stream| {
Box::pin(async move {
let (mut tx_upstream, mut rx_upstream) = ws_upstream.split();
let (mut tx_client, mut rx_client) = stream.split();
async fn process_client<S, E>(
msg: Result<Message, E>,
tx_upstream: &mut S,
) -> Result<()>
where
S: SinkExt<Message> + Unpin,
<S as Sink<Message>>::Error: Display,
E: Display,
{
match msg {
Ok(m) => {
let m_up = match m {
Message::Text(t) => Message::Text(format!("0:{}:{}", t.len(), t)),
_ => panic!("todo"),
};
debug!("Sending data to upstream: {:?}", m_up);
if let Err(e) = tx_upstream.send(m_up).await {
bail!("Failed to send msg to upstream: {}", e);
}
}
Err(e) => {
bail!("Failed to read from client: {}", e);
}
}
Ok(())
}
async fn process_upstream<S, E>(
msg: Result<Message, E>,
tx_client: &mut S,
) -> Result<()>
where
S: SinkExt<Message> + Unpin,
<S as Sink<Message>>::Error: Display,
E: Display,
{
match msg {
Ok(m) => {
let m_down = match m {
Message::Binary(data) => {
Message::Text(String::from_utf8_lossy(&data).to_string())
}
_ => panic!("todo"),
};
debug!("Sending data to downstream: {:?}", m_down);
if let Err(e) = tx_client.send(m_down).await {
bail!("Failed to msg to client: {}", e);
}
}
Err(e) => {
bail!("Failed to read from upstream: {}", e);
}
}
Ok(())
}
loop {
tokio::select! {
Some(msg) = rx_client.next() => {
if let Err(e) = process_client(msg, &mut tx_upstream).await {
error!("{}", e);
break;
}
},
Some(msg) = rx_upstream.next() => {
if let Err(e) = process_upstream(msg, &mut tx_client).await {
error!("{}", e);
break;
}
}
}
}
Ok(())
})
}))
}
#[derive(Deserialize)]
struct CreateVmRequest {
template_id: u64,

View File

@ -12,8 +12,11 @@ use lnvps::status::VmStateCache;
use lnvps::worker::{WorkJob, Worker};
use lnvps_db::{LNVpsDb, LNVpsDbMysql};
use log::error;
use nostr::Keys;
use nostr_sdk::Client;
use std::net::{IpAddr, SocketAddr};
use std::time::Duration;
use tokio::time::sleep;
#[derive(Parser)]
#[clap(about, version, author)]
@ -50,14 +53,31 @@ async fn main() -> Result<(), Error> {
db.execute(setup_script).await?;
}
let nostr_client = if let Some(ref c) = settings.nostr {
let cx = Client::builder().signer(Keys::parse(&c.nsec)?).build();
for r in &c.relays {
cx.add_relay(r.clone()).await?;
}
cx.connect().await;
Some(cx)
} else {
None
};
let router = settings.router.as_ref().map(|r| r.get_router());
let status = VmStateCache::new();
let worker_provisioner =
settings
.provisioner
.get_provisioner(db.clone(), lnd.clone(), exchange.clone());
.get_provisioner(db.clone(), router, lnd.clone(), exchange.clone());
worker_provisioner.init().await?;
let mut worker = Worker::new(db.clone(), worker_provisioner, &settings, status.clone());
let mut worker = Worker::new(
db.clone(),
worker_provisioner,
&settings,
status.clone(),
nostr_client.clone(),
);
let sender = worker.sender();
// send a startup notification
@ -82,6 +102,7 @@ async fn main() -> Result<(), Error> {
if let Err(e) = handler.listen().await {
error!("invoice-error: {}", e);
}
sleep(Duration::from_secs(5)).await;
}
});
// request work every 30s to check vm status
@ -110,10 +131,11 @@ async fn main() -> Result<(), Error> {
}
});
let router = settings.router.as_ref().map(|r| r.get_router());
let provisioner =
settings
.provisioner
.get_provisioner(db.clone(), lnd.clone(), exchange.clone());
.get_provisioner(db.clone(), router, lnd.clone(), exchange.clone());
let db: Box<dyn LNVpsDb> = Box::new(db.clone());
let pv: Box<dyn Provisioner> = Box::new(provisioner);

View File

@ -6,7 +6,10 @@ use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::str::FromStr;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::sleep;
use tokio_tungstenite::tungstenite::handshake::client::{generate_key, Request};
use tokio_tungstenite::{Connector, MaybeTlsStream, WebSocketStream};
pub struct ProxmoxClient {
base: Url,
@ -253,7 +256,68 @@ impl ProxmoxClient {
})
}
/// Create terminal proxy session
pub async fn terminal_proxy(&self, node: &str, vm: u64) -> Result<TerminalProxyTicket> {
let rsp: ResponseBase<TerminalProxyTicket> = self
.post(
&format!("/api2/json/nodes/{}/qemu/{}/termproxy", node, vm),
(),
)
.await?;
Ok(rsp.data)
}
/// Open websocket connection to terminal proxy
pub async fn open_terminal_proxy(
&self,
node: &str,
vm: u64,
req: TerminalProxyTicket,
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
self.get_task_status(&TaskId {
id: req.upid,
node: node.to_string(),
})
.await?;
let mut url: Url = self.base.join(&format!(
"/api2/json/nodes/{}/qemu/{}/vncwebsocket",
node, vm
))?;
url.set_scheme("wss").unwrap();
url.query_pairs_mut().append_pair("port", &req.port);
url.query_pairs_mut().append_pair("vncticket", &req.ticket);
let r = Request::builder()
.method("GET")
.header("Host", url.host().unwrap().to_string())
.header("Connection", "Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Key", generate_key())
.header("Sec-WebSocket-Protocol", "binary")
.header("Authorization", format!("PVEAPIToken={}", self.token))
.uri(url.as_str())
.body(())?;
debug!("Connecting terminal proxy: {:?}", &r);
let (ws, _rsp) = tokio_tungstenite::connect_async_tls_with_config(
r,
None,
false,
Some(Connector::NativeTls(
native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(true)
.build()?,
)),
)
.await?;
Ok(ws)
}
async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
debug!(">> GET {}", path);
let rsp = self
.client
.get(self.base.join(path)?)
@ -282,10 +346,10 @@ impl ProxmoxClient {
body: R,
) -> Result<T> {
let body = serde_json::to_string(&body)?;
debug!("{}", &body);
debug!(">> {} {}: {}", method.clone(), path, &body);
let rsp = self
.client
.request(method, self.base.join(path)?)
.request(method.clone(), self.base.join(path)?)
.header("Authorization", format!("PVEAPIToken={}", self.token))
.header("Content-Type", "application/json")
.header("Accept", "application/json")
@ -299,11 +363,19 @@ impl ProxmoxClient {
if status.is_success() {
Ok(serde_json::from_str(&text)?)
} else {
bail!("{}", status);
bail!("{} {}: {}", method, path, status);
}
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct TerminalProxyTicket {
pub port: String,
pub ticket: String,
pub upid: String,
pub user: String,
}
#[derive(Debug, Clone)]
pub struct TaskId {
pub id: String,
@ -579,5 +651,5 @@ pub struct VmConfig {
pub kvm: Option<bool>,
#[serde(rename = "serial0")]
#[serde(skip_serializing_if = "Option::is_none")]
pub serial_0: Option<String>
pub serial_0: Option<String>,
}

View File

@ -1,6 +1,7 @@
use anyhow::bail;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use log::{debug, info};
use log::debug;
use nostr::{Event, JsonUtil, Kind, Timestamp};
use rocket::http::uri::{Absolute, Uri};
use rocket::http::Status;
@ -11,37 +12,23 @@ pub struct Nip98Auth {
pub event: Event,
}
#[async_trait]
impl<'r> FromRequest<'r> for Nip98Auth {
type Error = &'static str;
async fn from_request(request: &'r Request<'_>) -> Outcome<Self, Self::Error> {
if let Some(auth) = request.headers().get_one("authorization") {
if auth.starts_with("Nostr ") {
let event = if let Ok(j) = BASE64_STANDARD.decode(&auth[6..]) {
if let Ok(ev) = Event::from_json(j) {
ev
} else {
return Outcome::Error((Status::new(403), "Invalid nostr event"));
impl Nip98Auth {
pub fn check(&self, path: &str, method: &str) -> anyhow::Result<()> {
if self.event.kind != Kind::HttpAuth {
bail!("Wrong event kind");
}
} else {
return Outcome::Error((Status::new(403), "Invalid auth string"));
};
if event.kind != Kind::HttpAuth {
return Outcome::Error((Status::new(401), "Wrong event kind"));
}
if event
if self
.event
.created_at
.as_u64()
.abs_diff(Timestamp::now().as_u64())
> 600
{
return Outcome::Error((Status::new(401), "Created timestamp is out of range"));
bail!("Created timestamp is out of range");
}
// check url tag
if let Some(url) = event.tags.iter().find_map(|t| {
if let Some(url) = self.event.tags.iter().find_map(|t| {
let vec = t.as_slice();
if vec[0] == "u" {
Some(vec[1].clone())
@ -50,18 +37,18 @@ impl<'r> FromRequest<'r> for Nip98Auth {
}
}) {
if let Ok(u_req) = Uri::parse::<Absolute>(&url) {
if request.uri().path() != u_req.absolute().unwrap().path() {
return Outcome::Error((Status::new(401), "U tag does not match"));
if path != u_req.absolute().unwrap().path() {
bail!("U tag does not match");
}
} else {
return Outcome::Error((Status::new(401), "Invalid U tag"));
bail!("Invalid U tag");
}
} else {
return Outcome::Error((Status::new(401), "Missing url tag"));
bail!("Missing url tag");
}
// check method tag
if let Some(method) = event.tags.iter().find_map(|t| {
if let Some(t_method) = self.event.tags.iter().find_map(|t| {
let vec = t.as_slice();
if vec[0] == "method" {
Some(vec[1].clone())
@ -69,24 +56,53 @@ impl<'r> FromRequest<'r> for Nip98Auth {
None
}
}) {
if request.method().to_string() != *method {
return Outcome::Error((Status::new(401), "Method tag incorrect"));
if method != t_method {
bail!("Method tag incorrect")
}
} else {
return Outcome::Error((Status::new(401), "Missing method tag"));
bail!("Missing method tag")
}
if let Err(_err) = event.verify() {
return Outcome::Error((Status::new(401), "Event signature invalid"));
if let Err(_err) = self.event.verify() {
bail!("Event signature invalid");
}
debug!("{}", event.as_json());
Outcome::Success(Nip98Auth { event })
debug!("{}", self.event.as_json());
Ok(())
}
pub fn from_base64(i: &str) -> anyhow::Result<Self> {
if let Ok(j) = BASE64_STANDARD.decode(i) {
if let Ok(ev) = Event::from_json(j) {
Ok(Self { event: ev })
} else {
Outcome::Error((Status::new(403), "Auth scheme must be Nostr"))
bail!("Invalid nostr event")
}
} else {
Outcome::Error((Status::new(403), "Auth header not found"))
bail!("Invalid auth string");
}
}
}
#[async_trait]
impl<'r> FromRequest<'r> for Nip98Auth {
type Error = String;
async fn from_request(request: &'r Request<'_>) -> Outcome<Self, Self::Error> {
if let Some(auth) = request.headers().get_one("authorization") {
if !auth.starts_with("Nostr ") {
return Outcome::Error((Status::new(403), "Auth scheme must be Nostr".to_string()));
}
let auth = Nip98Auth::from_base64(&auth[6..]).unwrap();
match auth.check(
request.uri().to_string().as_str(),
request.method().as_str(),
) {
Ok(_) => Outcome::Success(auth),
Err(e) => Outcome::Error((Status::new(401), e.to_string())),
}
} else {
Outcome::Error((Status::new(403), "Auth header not found".to_string()))
}
}
}

View File

@ -1,10 +1,11 @@
use crate::exchange::{ExchangeRateCache, Ticker};
use crate::host::get_host_client;
use crate::host::proxmox::{
CreateVm, DownloadUrlRequest, ProxmoxClient, ResizeDiskRequest, StorageContent, VmBios,
VmConfig,
ConfigureVm, CreateVm, DownloadUrlRequest, ProxmoxClient, ResizeDiskRequest, StorageContent,
VmBios, VmConfig,
};
use crate::provisioner::Provisioner;
use crate::router::Router;
use crate::settings::{QemuConfig, SshConfig};
use crate::ssh_client::SshClient;
use anyhow::{bail, Result};
@ -15,18 +16,23 @@ use fedimint_tonic_lnd::Client;
use ipnetwork::IpNetwork;
use lnvps_db::hydrate::Hydrate;
use lnvps_db::{IpRange, LNVpsDb, Vm, VmCostPlanIntervalType, VmIpAssignment, VmPayment};
use log::info;
use log::{debug, info, warn};
use nostr::util::hex;
use rand::random;
use rand::seq::IteratorRandom;
use reqwest::Url;
use rocket::futures::{SinkExt, StreamExt};
use std::collections::HashSet;
use std::net::IpAddr;
use std::ops::Add;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
pub struct LNVpsProvisioner {
db: Box<dyn LNVpsDb>,
router: Option<Box<dyn Router>>,
lnd: Client,
rates: ExchangeRateCache,
read_only: bool,
@ -40,11 +46,13 @@ impl LNVpsProvisioner {
config: QemuConfig,
ssh: Option<SshConfig>,
db: impl LNVpsDb + 'static,
router: Option<impl Router + 'static>,
lnd: Client,
rates: ExchangeRateCache,
) -> Self {
Self {
db: Box::new(db),
router: router.map(|r| Box::new(r) as Box<dyn Router>),
lnd,
rates,
config,
@ -64,6 +72,76 @@ impl LNVpsProvisioner {
bail!("No image storage found");
}
}
async fn get_vm_config(&self, vm: &Vm) -> Result<VmConfig> {
let ssh_key = self.db.get_user_ssh_key(vm.ssh_key_id).await?;
let mut ips = self.db.list_vm_ip_assignments(vm.id).await?;
if ips.is_empty() {
ips = self.allocate_ips(vm.id).await?;
}
// load ranges
for ip in &mut ips {
ip.hydrate_up(&self.db).await?;
}
let mut ip_config = ips
.iter()
.map_while(|ip| {
if let Ok(net) = ip.ip.parse::<IpNetwork>() {
Some(match net {
IpNetwork::V4(addr) => {
format!(
"ip={},gw={}",
addr,
ip.ip_range.as_ref().map(|r| &r.gateway).unwrap()
)
}
IpNetwork::V6(addr) => format!("ip6={}", addr),
})
} else {
None
}
})
.collect::<Vec<_>>();
ip_config.push("ip6=auto".to_string());
let mut net = vec![
format!("virtio={}", vm.mac_address),
format!("bridge={}", self.config.bridge),
];
if let Some(t) = self.config.vlan {
net.push(format!("tag={}", t));
}
let drives = self.db.list_host_disks(vm.host_id).await?;
let drive = if let Some(d) = drives.iter().find(|d| d.enabled) {
d
} else {
bail!("No host drive found!")
};
Ok(VmConfig {
cpu: Some(self.config.cpu.clone()),
kvm: Some(self.config.kvm),
ip_config: Some(ip_config.join(",")),
machine: Some(self.config.machine.clone()),
net: Some(net.join(",")),
os_type: Some(self.config.os_type.clone()),
on_boot: Some(true),
bios: Some(VmBios::OVMF),
boot: Some("order=scsi0".to_string()),
cores: Some(vm.cpu as i32),
memory: Some((vm.memory / 1024 / 1024).to_string()),
scsi_hw: Some("virtio-scsi-pci".to_string()),
serial_0: Some("socket".to_string()),
scsi_1: Some(format!("{}:cloudinit", &drive.name)),
ssh_keys: Some(urlencoding::encode(&ssh_key.key_data).to_string()),
efi_disk_0: Some(format!("{}:0,efitype=4m", &drive.name)),
..Default::default()
})
}
}
#[async_trait]
@ -265,6 +343,12 @@ impl Provisioner for LNVpsProvisioner {
ip: ip_net.to_string(),
..Default::default()
};
// add arp entry for router
if let Some(r) = self.router.as_ref() {
r.add_arp_entry(ip, &vm.mac_address, Some(&format!("VM{}", vm.id)))
.await?;
}
let id = self.db.insert_vm_ip_assignment(&assignment).await?;
assignment.id = id;
@ -274,6 +358,10 @@ impl Provisioner for LNVpsProvisioner {
}
}
if ret.is_empty() {
bail!("No ip ranges found in this region");
}
Ok(ret)
}
@ -284,55 +372,6 @@ impl Provisioner for LNVpsProvisioner {
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host)?;
let mut ips = self.db.list_vm_ip_assignments(vm.id).await?;
if ips.is_empty() {
ips = self.allocate_ips(vm.id).await?;
}
// load ranges
for ip in &mut ips {
ip.hydrate_up(&self.db).await?;
}
let mut ip_config = ips
.iter()
.map_while(|ip| {
if let Ok(net) = ip.ip.parse::<IpNetwork>() {
Some(match net {
IpNetwork::V4(addr) => {
format!(
"ip={},gw={}",
addr,
ip.ip_range.as_ref().map(|r| &r.gateway).unwrap()
)
}
IpNetwork::V6(addr) => format!("ip6={}", addr),
})
} else {
None
}
})
.collect::<Vec<_>>();
ip_config.push("ip6=auto".to_string());
let drives = self.db.list_host_disks(vm.host_id).await?;
let drive = if let Some(d) = drives.iter().find(|d| d.enabled) {
d
} else {
bail!("No host drive found!")
};
let ssh_key = self.db.get_user_ssh_key(vm.ssh_key_id).await?;
let mut net = vec![
format!("virtio={}", vm.mac_address),
format!("bridge={}", self.config.bridge),
];
if let Some(t) = self.config.vlan {
net.push(format!("tag={}", t));
}
let vm_id = 100 + vm.id as i32;
// create VM
@ -340,25 +379,7 @@ impl Provisioner for LNVpsProvisioner {
.create_vm(CreateVm {
node: host.name.clone(),
vm_id,
config: VmConfig {
on_boot: Some(true),
bios: Some(VmBios::OVMF),
boot: Some("order=scsi0".to_string()),
cores: Some(vm.cpu as i32),
cpu: Some(self.config.cpu.clone()),
kvm: Some(self.config.kvm),
ip_config: Some(ip_config.join(",")),
machine: Some(self.config.machine.clone()),
memory: Some((vm.memory / 1024 / 1024).to_string()),
net: Some(net.join(",")),
os_type: Some(self.config.os_type.clone()),
scsi_1: Some(format!("{}:cloudinit", &drive.name)),
scsi_hw: Some("virtio-scsi-pci".to_string()),
ssh_keys: Some(urlencoding::encode(&ssh_key.key_data).to_string()),
efi_disk_0: Some(format!("{}:0,efitype=4m", &drive.name)),
serial_0: Some("socket".to_string()),
..Default::default()
},
config: self.get_vm_config(&vm).await?,
})
.await?;
client.wait_for_task(&t_create).await?;
@ -376,6 +397,12 @@ impl Provisioner for LNVpsProvisioner {
)
.await?;
let drives = self.db.list_host_disks(vm.host_id).await?;
let drive = if let Some(d) = drives.iter().find(|d| d.enabled) {
d
} else {
bail!("No host drive found!")
};
let cmd = format!(
"/usr/sbin/qm set {} --scsi0 {}:0,import-from=/var/lib/vz/template/iso/{}",
vm_id,
@ -443,17 +470,82 @@ impl Provisioner for LNVpsProvisioner {
async fn delete_vm(&self, vm_id: u64) -> Result<()> {
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
//let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host)?;
// TODO: delete not implemented, stop only
//let client = get_host_client(&host)?;
//let j_start = client.delete_vm(&host.name, vm.id + 100).await?;
let j_stop = client.stop_vm(&host.name, vm.id + 100).await?;
client.wait_for_task(&j_stop).await?;
//let j_stop = client.stop_vm(&host.name, vm.id + 100).await?;
//client.wait_for_task(&j_stop).await?;
if let Some(r) = self.router.as_ref() {
let ent = r.list_arp_entry().await?;
if let Some(ent) = ent.iter().find(|e| {
e.mac_address
.as_ref()
.map(|m| m.eq_ignore_ascii_case(&vm.mac_address))
.unwrap_or(false)
}) {
r.remove_arp_entry(ent.id.as_ref().unwrap().as_str())
.await?;
} else {
warn!("ARP entry not found, skipping")
}
}
self.db.delete_vm_ip_assignment(vm.id).await?;
self.db.delete_vm(vm.id).await?;
Ok(())
}
async fn terminal_proxy(
&self,
vm_id: u64,
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host)?;
let host_vm_id = vm.id + 100;
let term = client.terminal_proxy(&host.name, host_vm_id).await?;
let login_msg = format!("{}:{}\n", term.user, term.ticket);
let mut ws = client
.open_terminal_proxy(&host.name, host_vm_id, term)
.await?;
debug!("Sending login msg: {}", login_msg);
ws.send(Message::Text(login_msg)).await?;
if let Some(n) = ws.next().await {
debug!("{:?}", n);
} else {
bail!("No response from terminal_proxy");
}
ws.send(Message::Text("1:86:24:".to_string())).await?;
Ok(ws)
}
async fn patch_vm(&self, vm_id: u64) -> Result<()> {
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host)?;
let host_vm_id = vm.id + 100;
let t = client
.configure_vm(ConfigureVm {
node: host.name.clone(),
vm_id: host_vm_id as i32,
current: None,
snapshot: None,
config: VmConfig {
scsi_0: None,
scsi_1: None,
efi_disk_0: None,
..self.get_vm_config(&vm).await?
},
})
.await?;
client.wait_for_task(&t).await?;
Ok(())
}
}

View File

@ -1,6 +1,8 @@
use anyhow::Result;
use lnvps_db::{Vm, VmIpAssignment, VmPayment};
use rocket::async_trait;
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
pub mod lnvps;
@ -42,4 +44,13 @@ pub trait Provisioner: Send + Sync {
/// Delete a VM
async fn delete_vm(&self, vm_id: u64) -> Result<()>;
/// Open terminal proxy connection
async fn terminal_proxy(
&self,
vm_id: u64,
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>>;
/// Re-Configure VM
async fn patch_vm(&self, vm_id: u64) -> Result<()>;
}

View File

@ -1,29 +1,101 @@
use crate::router::Router;
use crate::router::{ArpEntry, Router};
use anyhow::{bail, Result};
use base64::engine::general_purpose::STANDARD;
use base64::Engine;
use log::debug;
use reqwest::{Client, Method, Url};
use rocket::async_trait;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::net::IpAddr;
pub struct MikrotikRouter {
url: String,
token: String,
url: Url,
username: String,
password: String,
client: Client,
arp_interface: String,
}
impl MikrotikRouter {
pub fn new(url: &str, token: &str) -> Self {
pub fn new(url: &str, username: &str, password: &str, arp_interface: &str) -> Self {
Self {
url: url.to_string(),
token: token.to_string(),
url: url.parse().unwrap(),
username: username.to_string(),
password: password.to_string(),
client: Client::builder()
.danger_accept_invalid_certs(true)
.build()
.unwrap(),
arp_interface: arp_interface.to_string(),
}
}
async fn req<T: DeserializeOwned, R: Serialize>(
&self,
method: Method,
path: &str,
body: R,
) -> Result<T> {
let body = serde_json::to_string(&body)?;
debug!(">> {} {}: {}", method.clone(), path, &body);
let rsp = self
.client
.request(method.clone(), self.url.join(path)?)
.header(
"Authorization",
format!(
"Basic {}",
STANDARD.encode(format!("{}:{}", self.username, self.password))
),
)
.header("Content-Type", "application/json")
.header("Accept", "application/json")
.body(body)
.send()
.await?;
let status = rsp.status();
let text = rsp.text().await?;
#[cfg(debug_assertions)]
debug!("<< {}", text);
if status.is_success() {
Ok(serde_json::from_str(&text)?)
} else {
bail!("{} {}: {}", method, path, status);
}
}
}
#[async_trait]
impl Router for MikrotikRouter {
async fn add_arp_entry(
&self,
ip: IpAddr,
mac: &[u8; 6],
comment: Option<&str>,
) -> anyhow::Result<()> {
todo!()
async fn list_arp_entry(&self) -> Result<Vec<ArpEntry>> {
let rsp: Vec<ArpEntry> = self.req(Method::GET, "/rest/ip/arp", ()).await?;
Ok(rsp)
}
async fn add_arp_entry(&self, ip: IpAddr, mac: &str, comment: Option<&str>) -> Result<()> {
let _rsp: ArpEntry = self
.req(
Method::PUT,
"/rest/ip/arp",
ArpEntry {
address: ip.to_string(),
mac_address: Some(mac.to_string()),
interface: self.arp_interface.to_string(),
comment: comment.map(|c| c.to_string()),
..Default::default()
},
)
.await?;
Ok(())
}
async fn remove_arp_entry(&self, id: &str) -> Result<()> {
let _rsp: ArpEntry = self
.req(Method::DELETE, &format!("/rest/ip/arp/{id}"), ())
.await?;
Ok(())
}
}

View File

@ -1,5 +1,6 @@
use anyhow::Result;
use rocket::async_trait;
use rocket::serde::{Deserialize, Serialize};
use std::net::IpAddr;
/// Router defines a network device used to access the hosts
@ -10,9 +11,27 @@ use std::net::IpAddr;
///
/// It also prevents people from re-assigning their IP to another in the range,
#[async_trait]
pub trait Router {
async fn add_arp_entry(&self, ip: IpAddr, mac: &[u8; 6], comment: Option<&str>) -> Result<()>;
pub trait Router: Send + Sync {
async fn list_arp_entry(&self) -> Result<Vec<ArpEntry>>;
async fn add_arp_entry(&self, ip: IpAddr, mac: &str, comment: Option<&str>) -> Result<()>;
async fn remove_arp_entry(&self, id: &str) -> Result<()>;
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ArpEntry {
#[serde(rename = ".id")]
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
pub address: String,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "mac-address")]
pub mac_address: Option<String>,
pub interface: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub comment: Option<String>,
}
#[cfg(feature = "mikrotik")]
mod mikrotik;
#[cfg(feature = "mikrotik")]
pub use mikrotik::*;

View File

@ -1,6 +1,7 @@
use crate::exchange::ExchangeRateCache;
use crate::provisioner::lnvps::LNVpsProvisioner;
use crate::provisioner::Provisioner;
use crate::router::{MikrotikRouter, Router};
use fedimint_tonic_lnd::Client;
use lnvps_db::LNVpsDb;
use serde::{Deserialize, Serialize};
@ -20,6 +21,12 @@ pub struct Settings {
/// SMTP settings for sending emails
pub smtp: Option<SmtpConfig>,
/// Network router config
pub router: Option<RouterConfig>,
/// Nostr config for sending DM's
pub nostr: Option<NostrConfig>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
@ -29,6 +36,24 @@ pub struct LndConfig {
pub macaroon: PathBuf,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct NostrConfig {
pub relays: Vec<String>,
pub nsec: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum RouterConfig {
Mikrotik {
url: String,
username: String,
password: String,
/// Interface used to add arp entries
arp_interface: String,
},
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SmtpConfig {
/// Admin user id, for sending system notifications
@ -92,6 +117,7 @@ impl ProvisionerConfig {
pub fn get_provisioner(
&self,
db: impl LNVpsDb + 'static,
router: Option<impl Router + 'static>,
lnd: Client,
exchange: ExchangeRateCache,
) -> impl Provisioner + 'static {
@ -100,7 +126,28 @@ impl ProvisionerConfig {
qemu,
ssh,
read_only,
} => LNVpsProvisioner::new(*read_only, qemu.clone(), ssh.clone(), db, lnd, exchange),
} => LNVpsProvisioner::new(
*read_only,
qemu.clone(),
ssh.clone(),
db,
router,
lnd,
exchange,
),
}
}
}
impl RouterConfig {
pub fn get_router(&self) -> impl Router + 'static {
match self {
RouterConfig::Mikrotik {
url,
username,
password,
arp_interface,
} => MikrotikRouter::new(&url, &username, &password, &arp_interface),
}
}
}

View File

@ -1,5 +1,6 @@
use anyhow::Result;
use log::info;
use ssh2::Channel;
use std::io::Read;
use std::path::PathBuf;
use tokio::net::{TcpStream, ToSocketAddrs};
@ -28,6 +29,11 @@ impl SshClient {
Ok(())
}
pub async fn open_channel(&mut self) -> Result<Channel> {
let channel = self.session.channel_session()?;
Ok(channel)
}
pub async fn execute(&mut self, command: &str) -> Result<(i32, String)> {
info!("Executing command: {}", command);
let mut channel = self.session.channel_session()?;

View File

@ -5,15 +5,15 @@ use crate::settings::{Settings, SmtpConfig};
use crate::status::{VmRunningState, VmState, VmStateCache};
use anyhow::Result;
use chrono::{DateTime, Days, Utc};
use lettre::message::MessageBuilder;
use lettre::message::{MessageBuilder, MultiPart};
use lettre::transport::smtp::authentication::Credentials;
use lettre::transport::smtp::SmtpTransportBuilder;
use lettre::AsyncTransport;
use lettre::{AsyncSmtpTransport, SmtpTransport, Tokio1Executor, Transport};
use lettre::{AsyncSmtpTransport, Tokio1Executor};
use lnvps_db::LNVpsDb;
use log::{debug, error, info};
use rocket::futures::SinkExt;
use std::ops::Add;
use nostr::{EventBuilder, PublicKey};
use nostr_sdk::Client;
use std::ops::{Add, Sub};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
#[derive(Debug)]
@ -39,7 +39,8 @@ pub struct Worker {
vm_state_cache: VmStateCache,
tx: UnboundedSender<WorkJob>,
rx: UnboundedReceiver<WorkJob>,
last_check_vms: u64,
client: Option<Client>,
last_check_vms: DateTime<Utc>,
}
pub struct WorkerSettings {
@ -62,6 +63,7 @@ impl Worker {
provisioner: P,
settings: impl Into<WorkerSettings>,
vm_state_cache: VmStateCache,
client: Option<Client>,
) -> Self {
let (tx, rx) = unbounded_channel();
Self {
@ -71,7 +73,8 @@ impl Worker {
settings: settings.into(),
tx,
rx,
last_check_vms: Utc::now().timestamp() as u64,
client,
last_check_vms: Utc::now(),
}
}
@ -99,6 +102,23 @@ impl Worker {
self.vm_state_cache.set_state(db_id, state).await?;
if let Ok(db_vm) = self.db.get_vm(db_id).await {
const BEFORE_EXPIRE_NOTIFICATION: u64 = 1;
// Send notification of VM expiring soon
if db_vm.expires < Utc::now().add(Days::new(BEFORE_EXPIRE_NOTIFICATION))
&& db_vm.expires
> self
.last_check_vms
.add(Days::new(BEFORE_EXPIRE_NOTIFICATION))
{
info!("Sending expire soon notification VM {}", db_vm.id);
self.tx.send(WorkJob::SendNotification {
user_id: db_vm.user_id,
title: Some(format!("[VM{}] Expiring Soon", db_vm.id)),
message: format!("Your VM #{} will expire soon, please renew in the next {} days or your VM will be stopped.", db_vm.id, BEFORE_EXPIRE_NOTIFICATION)
})?;
}
// Stop VM if expired and is running
if db_vm.expires < Utc::now() && s.status == VmStatus::Running {
info!("Stopping expired VM {}", db_vm.id);
@ -118,11 +138,16 @@ impl Worker {
{
info!("Deleting expired VM {}", db_vm.id);
self.provisioner.delete_vm(db_vm.id).await?;
let title = Some(format!("[VM{}] Deleted", db_vm.id));
self.tx.send(WorkJob::SendNotification {
user_id: db_vm.user_id,
title: Some(format!("[VM{}] Deleted", db_vm.id)),
title: title.clone(),
message: format!("Your VM #{} has been deleted!", db_vm.id),
})?;
self.queue_admin_notification(
format!("VM{} is ready for deletion", db_vm.id),
title,
)?;
}
}
@ -163,16 +188,16 @@ impl Worker {
Ok(())
}
pub async fn check_vms(&self) -> Result<()> {
pub async fn check_vms(&mut self) -> Result<()> {
let hosts = self.db.list_hosts().await?;
for host in hosts {
let client = get_host_client(&host)?;
for node in client.list_nodes().await? {
info!("Checking vms for {}", node.name);
debug!("Checking vms for {}", node.name);
for vm in client.list_vms(&node.name).await? {
let vm_id = vm.vm_id;
info!("\t{}: {:?}", vm_id, vm.status);
debug!("\t{}: {:?}", vm_id, vm.status);
if let Err(e) = self.handle_vm_info(vm).await {
error!("{}", e);
self.queue_admin_notification(
@ -184,6 +209,7 @@ impl Worker {
}
}
// check VM status from db vm list
let db_vms = self.db.list_vms().await?;
for vm in db_vms {
let state = if let Some(s) = self.vm_state_cache.get_state(vm.id).await {
@ -196,10 +222,19 @@ impl Worker {
None
};
if state.is_none() && vm.expires > Utc::now() {
// create VM if not spawned yet
if vm.expires > Utc::now() && state.is_none() {
self.check_vm(vm.id).await?;
}
// delete vm if not paid (in new state)
if !vm.deleted && vm.expires < Utc::now().sub(Days::new(1)) && state.is_none() {
info!("Deleting unpaid VM {}", vm.id);
self.provisioner.delete_vm(vm.id).await?;
}
}
self.last_check_vms = Utc::now();
Ok(())
}
@ -220,9 +255,15 @@ impl Worker {
if let Some(f) = &smtp.from {
b = b.from(f.parse()?);
}
let msg = b.body(message)?;
let template = include_str!("../email.html");
let html = MultiPart::alternative_plain_html(
message.clone(),
template.replace("%%_MESSAGE_%%", &message),
);
let mut sender = AsyncSmtpTransport::<Tokio1Executor>::relay(&smtp.server)?
let msg = b.multipart(html)?;
let sender = AsyncSmtpTransport::<Tokio1Executor>::relay(&smtp.server)?
.credentials(Credentials::new(
smtp.username.to_string(),
smtp.password.to_string(),
@ -233,10 +274,20 @@ impl Worker {
}
}
if user.contact_nip4 {
// send DM
// send dm
}
if user.contact_nip17 {
// send dm
if let Some(c) = self.client.as_ref() {
let sig = c.signer().await?;
let ev = EventBuilder::private_msg(
&sig,
PublicKey::from_slice(&user.pubkey)?,
message,
None,
)
.await?;
c.send_event(ev).await?;
}
}
Ok(())
}