Compare commits
2 Commits
9ee4232706
...
9fb4a38e72
Author | SHA1 | Date | |
---|---|---|---|
9fb4a38e72 | |||
cbafca8da7 |
25
Cargo.lock
generated
25
Cargo.lock
generated
@ -2079,6 +2079,12 @@ dependencies = [
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lru"
|
||||
version = "0.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "227748d55f2f0ab4735d87fd623798cb6b664512fe979705f829c9f81c934465"
|
||||
|
||||
[[package]]
|
||||
name = "matchers"
|
||||
version = "0.1.0"
|
||||
@ -2201,9 +2207,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nostr"
|
||||
version = "0.39.0"
|
||||
version = "0.40.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d90b55eff1f0747d9e423972179672e1aacac3d3ccee4c1281147eaa90d6491e"
|
||||
checksum = "2f900ddcdc28395759fcd44b18a03255e7deee8858551bfe5d5d5a07311d82ea"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"bech32",
|
||||
@ -2214,6 +2220,7 @@ dependencies = [
|
||||
"chacha20poly1305",
|
||||
"getrandom 0.2.15",
|
||||
"instant",
|
||||
"regex",
|
||||
"scrypt",
|
||||
"secp256k1",
|
||||
"serde",
|
||||
@ -2224,23 +2231,25 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nostr-database"
|
||||
version = "0.39.0"
|
||||
version = "0.40.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ce07b47c77b8e5a856727885fe0ae47b9aa53d8d853a2190dd479b5a0d6e4f52"
|
||||
checksum = "714512e4653f4e7c4f4abb50a0ac82257541b22087dee780b9e3d787276e88d4"
|
||||
dependencies = [
|
||||
"lru",
|
||||
"nostr",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nostr-relay-pool"
|
||||
version = "0.39.0"
|
||||
version = "0.40.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "211ac5bbdda1a8eec0c21814a838da832038767a5d354fe2fcc1ca438cae56fd"
|
||||
checksum = "5bde07a729e0a1b306c9a07da81a0d1d55d0487316017090906f3b6660741b8d"
|
||||
dependencies = [
|
||||
"async-utility",
|
||||
"async-wsocket",
|
||||
"atomic-destructor",
|
||||
"lru",
|
||||
"negentropy 0.3.1",
|
||||
"negentropy 0.5.0",
|
||||
"nostr",
|
||||
@ -2251,9 +2260,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nostr-sdk"
|
||||
version = "0.39.0"
|
||||
version = "0.40.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5baca581deb810a88bb51c54d1d7980f4506a64a3e9a19270829b406e47adf31"
|
||||
checksum = "26238eee805d7dc3abcc8d570068c81cb4285b08e9db4d7999e54e20748c472e"
|
||||
dependencies = [
|
||||
"async-utility",
|
||||
"nostr",
|
||||
|
11
Cargo.toml
11
Cargo.toml
@ -7,9 +7,10 @@ edition = "2021"
|
||||
name = "api"
|
||||
|
||||
[features]
|
||||
default = ["mikrotik", "nostr-dm", "proxmox", "lnd", "cloudflare", "revolut", "bitvora"]
|
||||
default = ["mikrotik", "nostr-dm", "nostr-dvm", "proxmox", "lnd", "cloudflare", "revolut", "bitvora"]
|
||||
mikrotik = ["dep:reqwest"]
|
||||
nostr-dm = ["dep:nostr-sdk"]
|
||||
nostr-dvm = ["dep:nostr-sdk"]
|
||||
proxmox = ["dep:reqwest", "dep:ssh2", "dep:tokio-tungstenite"]
|
||||
libvirt = ["dep:virt"]
|
||||
lnd = ["dep:fedimint-tonic-lnd"]
|
||||
@ -19,7 +20,7 @@ revolut = ["dep:reqwest", "dep:sha2", "dep:hmac"]
|
||||
|
||||
[dependencies]
|
||||
lnvps_db = { path = "lnvps_db" }
|
||||
tokio = { version = "1.37.0", features = ["rt", "rt-multi-thread", "macros", "sync"] }
|
||||
tokio = { version = "1.37.0", features = ["rt", "rt-multi-thread", "macros", "sync", "io-util"] }
|
||||
anyhow = "1.0.83"
|
||||
config = { version = "0.15.8", features = ["yaml"] }
|
||||
log = "0.4.21"
|
||||
@ -37,15 +38,15 @@ rand = "0.9.0"
|
||||
clap = { version = "4.5.21", features = ["derive"] }
|
||||
ssh-key = "0.6.7"
|
||||
lettre = { version = "0.11.10", features = ["tokio1-native-tls"] }
|
||||
ws = { package = "rocket_ws", version = "0.1.0" }
|
||||
ws = { package = "rocket_ws", version = "0.1.1" }
|
||||
native-tls = "0.2.12"
|
||||
hex = "0.4.3"
|
||||
futures = "0.3.31"
|
||||
isocountry = "0.3.2"
|
||||
|
||||
#nostr-dm
|
||||
nostr = { version = "0.39.0", default-features = false, features = ["std"] }
|
||||
nostr-sdk = { version = "0.39.0", optional = true, default-features = false, features = ["nip44", "nip59"] }
|
||||
nostr = { version = "0.40.0", default-features = false, features = ["std"] }
|
||||
nostr-sdk = { version = "0.40.0", optional = true, default-features = false, features = ["nip44", "nip59"] }
|
||||
|
||||
#proxmox
|
||||
tokio-tungstenite = { version = "^0.21", features = ["native-tls"], optional = true }
|
||||
|
@ -38,9 +38,15 @@ pub trait LNVpsDb: Sync + Send {
|
||||
/// List a users ssh keys
|
||||
async fn list_user_ssh_key(&self, user_id: u64) -> Result<Vec<UserSshKey>>;
|
||||
|
||||
/// Get VM host regions
|
||||
async fn list_host_region(&self) -> Result<Vec<VmHostRegion>>;
|
||||
|
||||
/// Get VM host region by id
|
||||
async fn get_host_region(&self, id: u64) -> Result<VmHostRegion>;
|
||||
|
||||
/// Get VM host region by name
|
||||
async fn get_host_region_by_name(&self, name: &str) -> Result<VmHostRegion>;
|
||||
|
||||
/// List VM's owned by a specific user
|
||||
async fn list_hosts(&self) -> Result<Vec<VmHost>>;
|
||||
|
||||
|
@ -103,6 +103,18 @@ pub enum DiskType {
|
||||
SSD = 1,
|
||||
}
|
||||
|
||||
impl FromStr for DiskType {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
|
||||
match s.to_lowercase().as_str() {
|
||||
"hdd" => Ok(DiskType::HDD),
|
||||
"ssd" => Ok(DiskType::SSD),
|
||||
_ => Err(anyhow!("unknown disk type {}", s)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, sqlx::Type, Default, PartialEq, Eq)]
|
||||
#[repr(u16)]
|
||||
pub enum DiskInterface {
|
||||
@ -112,6 +124,19 @@ pub enum DiskInterface {
|
||||
PCIe = 2,
|
||||
}
|
||||
|
||||
impl FromStr for DiskInterface {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
|
||||
match s.to_lowercase().as_str() {
|
||||
"sata" => Ok(DiskInterface::SATA),
|
||||
"scsi" => Ok(DiskInterface::SCSI),
|
||||
"pcie" => Ok(DiskInterface::PCIe),
|
||||
_ => Err(anyhow!("unknown disk interface {}", s)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, sqlx::Type, Default, PartialEq, Eq)]
|
||||
#[repr(u16)]
|
||||
pub enum OsDistribution {
|
||||
|
@ -109,6 +109,13 @@ impl LNVpsDb for LNVpsDbMysql {
|
||||
.map_err(Error::new)
|
||||
}
|
||||
|
||||
async fn list_host_region(&self) -> Result<Vec<VmHostRegion>> {
|
||||
sqlx::query_as("select * from vm_host_region where enabled=1")
|
||||
.fetch_all(&self.db)
|
||||
.await
|
||||
.map_err(Error::new)
|
||||
}
|
||||
|
||||
async fn get_host_region(&self, id: u64) -> Result<VmHostRegion> {
|
||||
sqlx::query_as("select * from vm_host_region where id=?")
|
||||
.bind(id)
|
||||
@ -117,6 +124,14 @@ impl LNVpsDb for LNVpsDbMysql {
|
||||
.map_err(Error::new)
|
||||
}
|
||||
|
||||
async fn get_host_region_by_name(&self, name: &str) -> Result<VmHostRegion> {
|
||||
sqlx::query_as("select * from vm_host_region where name like ?")
|
||||
.bind(name)
|
||||
.fetch_one(&self.db)
|
||||
.await
|
||||
.map_err(Error::new)
|
||||
}
|
||||
|
||||
async fn list_hosts(&self) -> Result<Vec<VmHost>> {
|
||||
sqlx::query_as("select * from vm_host where enabled = 1")
|
||||
.fetch_all(&self.db)
|
||||
|
@ -11,15 +11,17 @@ use crate::provisioner::{HostCapacityService, LNVpsProvisioner, PricingEngine};
|
||||
use crate::settings::Settings;
|
||||
use crate::status::{VmState, VmStateCache};
|
||||
use crate::worker::WorkJob;
|
||||
use anyhow::Result;
|
||||
use anyhow::{bail, Result};
|
||||
use futures::future::join_all;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use isocountry::CountryCode;
|
||||
use lnvps_db::{
|
||||
IpRange, LNVpsDb, PaymentMethod, VmCustomPricing, VmCustomPricingDisk, VmCustomTemplate,
|
||||
};
|
||||
use log::{error, info};
|
||||
use nostr::util::hex;
|
||||
use rocket::serde::json::Json;
|
||||
use rocket::{get, patch, post, Responder, Route, State};
|
||||
use rocket::{get, patch, post, routes, Responder, Route, State};
|
||||
use rocket_okapi::gen::OpenApiGenerator;
|
||||
use rocket_okapi::okapi::openapi3::Responses;
|
||||
use rocket_okapi::response::OpenApiResponderInner;
|
||||
@ -28,12 +30,15 @@ use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use ssh_key::PublicKey;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt::Display;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tokio::sync::mpsc::{Sender, UnboundedSender};
|
||||
|
||||
pub fn routes() -> Vec<Route> {
|
||||
openapi_get_routes![
|
||||
let mut routes = vec![];
|
||||
|
||||
routes.append(&mut openapi_get_routes![
|
||||
v1_get_account,
|
||||
v1_patch_account,
|
||||
v1_list_vms,
|
||||
@ -54,7 +59,11 @@ pub fn routes() -> Vec<Route> {
|
||||
v1_custom_template_calc,
|
||||
v1_create_custom_vm_order,
|
||||
v1_get_payment_methods
|
||||
]
|
||||
]);
|
||||
|
||||
routes.append(&mut routes![v1_terminal_proxy]);
|
||||
|
||||
routes
|
||||
}
|
||||
|
||||
type ApiResult<T> = Result<Json<ApiData<T>>, ApiError>;
|
||||
@ -639,6 +648,117 @@ async fn v1_time_series(
|
||||
ApiData::ok(client.get_time_series_data(&vm, TimeSeries::Hourly).await?)
|
||||
}
|
||||
|
||||
#[get("/api/v1/vm/<id>/console?<auth>")]
|
||||
async fn v1_terminal_proxy(
|
||||
auth: &str,
|
||||
db: &State<Arc<dyn LNVpsDb>>,
|
||||
settings: &State<Settings>,
|
||||
id: u64,
|
||||
ws: ws::WebSocket,
|
||||
) -> Result<ws::Channel<'static>, &'static str> {
|
||||
let auth = Nip98Auth::from_base64(auth).map_err(|e| "Missing or invalid auth param")?;
|
||||
if auth
|
||||
.check(&format!("/api/v1/vm/{id}/console"), "GET")
|
||||
.is_err()
|
||||
{
|
||||
return Err("Invalid auth event");
|
||||
}
|
||||
let pubkey = auth.event.pubkey.to_bytes();
|
||||
let uid = db.upsert_user(&pubkey).await.map_err(|_| "Insert failed")?;
|
||||
let vm = db.get_vm(id).await.map_err(|_| "VM not found")?;
|
||||
if uid != vm.user_id {
|
||||
return Err("VM does not belong to you");
|
||||
}
|
||||
|
||||
let host = db
|
||||
.get_host(vm.host_id)
|
||||
.await
|
||||
.map_err(|_| "VM host not found")?;
|
||||
let client =
|
||||
get_host_client(&host, &settings.provisioner).map_err(|_| "Failed to get host client")?;
|
||||
|
||||
let mut ws_upstream = client.connect_terminal(&vm).await.map_err(|e| {
|
||||
error!("Failed to start terminal proxy: {}", e);
|
||||
"Failed to open terminal proxy"
|
||||
})?;
|
||||
let ws = ws.config(Default::default());
|
||||
Ok(ws.channel(move |mut stream| {
|
||||
use ws::*;
|
||||
|
||||
Box::pin(async move {
|
||||
async fn process_client<E>(
|
||||
msg: Result<Message, E>,
|
||||
ws_upstream: &mut Sender<Vec<u8>>,
|
||||
) -> Result<()>
|
||||
where
|
||||
E: Display,
|
||||
{
|
||||
match msg {
|
||||
Ok(m) => {
|
||||
let m_up = match m {
|
||||
Message::Text(t) => {
|
||||
info!("Got msg: {}", t);
|
||||
t.as_bytes().to_vec()
|
||||
}
|
||||
_ => panic!("todo"),
|
||||
};
|
||||
if let Err(e) = ws_upstream.send(m_up).await {
|
||||
bail!("Failed to send msg to upstream: {}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
bail!("Failed to read from client: {}", e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process_upstream<E>(
|
||||
msg: Result<Vec<u8>, E>,
|
||||
tx_client: &mut stream::DuplexStream,
|
||||
) -> Result<()>
|
||||
where
|
||||
E: Display,
|
||||
{
|
||||
match msg {
|
||||
Ok(m) => {
|
||||
let down = String::from_utf8_lossy(&m).into_owned();
|
||||
info!("Got down msg: {}", &down);
|
||||
let m_down = Message::Text(down);
|
||||
if let Err(e) = tx_client.send(m_down).await {
|
||||
bail!("Failed to msg to client: {}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
bail!("Failed to read from upstream: {}", e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(msg) = stream.next() => {
|
||||
if let Err(e) = process_client(msg, &mut ws_upstream.tx).await {
|
||||
error!("{}", e);
|
||||
break;
|
||||
}
|
||||
},
|
||||
Some(r) = ws_upstream.rx.recv() => {
|
||||
let msg: Result<Vec<u8>, anyhow::Error> = Ok(r);
|
||||
if let Err(e) = process_upstream(msg, &mut stream).await {
|
||||
error!("{}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("Websocket closed");
|
||||
Ok(())
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
#[openapi(tag = "Payment")]
|
||||
#[get("/api/v1/payment/methods")]
|
||||
async fn v1_get_payment_methods(settings: &State<Settings>) -> ApiResult<Vec<ApiPaymentInfo>> {
|
||||
|
@ -5,6 +5,7 @@ use config::{Config, File};
|
||||
use lnvps::api;
|
||||
use lnvps::cors::CORS;
|
||||
use lnvps::data_migration::run_data_migrations;
|
||||
use lnvps::dvm::start_dvms;
|
||||
use lnvps::exchange::{DefaultRateCache, ExchangeRateService};
|
||||
use lnvps::lightning::get_node;
|
||||
use lnvps::payments::listen_all_payments;
|
||||
@ -15,12 +16,12 @@ use lnvps_db::{LNVpsDb, LNVpsDbMysql};
|
||||
use log::{error, LevelFilter};
|
||||
use nostr::Keys;
|
||||
use nostr_sdk::Client;
|
||||
use rocket::http::Method;
|
||||
use rocket_okapi::swagger_ui::{make_swagger_ui, SwaggerUIConfig};
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use rocket::http::Method;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[clap(about, version, author)]
|
||||
@ -151,6 +152,12 @@ async fn main() -> Result<(), Error> {
|
||||
}
|
||||
});
|
||||
|
||||
#[cfg(feature = "nostr-dvm")]
|
||||
{
|
||||
let nostr_client = nostr_client.unwrap();
|
||||
start_dvms(nostr_client.clone(), provisioner.clone());
|
||||
}
|
||||
|
||||
let mut config = rocket::Config::default();
|
||||
let ip: SocketAddr = match &settings.listen {
|
||||
Some(i) => i.parse()?,
|
||||
@ -175,14 +182,15 @@ async fn main() -> Result<(), Error> {
|
||||
}),
|
||||
)
|
||||
.attach(CORS)
|
||||
.mount("/", vec![
|
||||
rocket::Route::ranked(
|
||||
.mount(
|
||||
"/",
|
||||
vec![rocket::Route::ranked(
|
||||
isize::MAX,
|
||||
Method::Options,
|
||||
"/<catch_all_options_route..>",
|
||||
CORS,
|
||||
)],
|
||||
)
|
||||
])
|
||||
.launch()
|
||||
.await
|
||||
{
|
||||
|
215
src/dvm/lnvps.rs
Normal file
215
src/dvm/lnvps.rs
Normal file
@ -0,0 +1,215 @@
|
||||
use crate::dvm::{build_status_for_job, DVMHandler, DVMJobRequest};
|
||||
use crate::provisioner::LNVpsProvisioner;
|
||||
use anyhow::Context;
|
||||
use lnvps_db::{DiskInterface, DiskType, LNVpsDb, PaymentMethod, UserSshKey, VmCustomTemplate};
|
||||
use nostr::prelude::DataVendingMachineStatus;
|
||||
use nostr::Tag;
|
||||
use nostr_sdk::Client;
|
||||
use ssh_key::PublicKey;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct LnvpsDvm {
|
||||
client: Client,
|
||||
provisioner: Arc<LNVpsProvisioner>,
|
||||
}
|
||||
|
||||
impl LnvpsDvm {
|
||||
pub fn new(provisioner: Arc<LNVpsProvisioner>, client: Client) -> LnvpsDvm {
|
||||
Self {
|
||||
provisioner,
|
||||
client,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DVMHandler for LnvpsDvm {
|
||||
fn handle_request(
|
||||
&mut self,
|
||||
request: DVMJobRequest,
|
||||
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
|
||||
let provisioner = self.provisioner.clone();
|
||||
let client = self.client.clone();
|
||||
Box::pin(async move {
|
||||
let default_disk = "ssd".to_string();
|
||||
let default_interface = "pcie".to_string();
|
||||
let cpu = request.params.get("cpu").context("missing cpu parameter")?;
|
||||
let memory = request
|
||||
.params
|
||||
.get("memory")
|
||||
.context("missing memory parameter")?;
|
||||
let disk = request
|
||||
.params
|
||||
.get("disk")
|
||||
.context("missing disk parameter")?;
|
||||
let disk_type = request.params.get("disk_type").unwrap_or(&default_disk);
|
||||
let disk_interface = request
|
||||
.params
|
||||
.get("disk_interface")
|
||||
.unwrap_or(&default_interface);
|
||||
let ssh_key = request
|
||||
.params
|
||||
.get("ssh_key")
|
||||
.context("missing ssh_key parameter")?;
|
||||
let ssh_key_name = request.params.get("ssh_key_name");
|
||||
let region = request.params.get("region");
|
||||
|
||||
let db = provisioner.get_db();
|
||||
let host_region = if let Some(r) = region {
|
||||
db.get_host_region_by_name(r).await?
|
||||
} else {
|
||||
db.list_host_region()
|
||||
.await?
|
||||
.into_iter()
|
||||
.next()
|
||||
.context("no host region")?
|
||||
};
|
||||
let pricing = db.list_custom_pricing(host_region.id).await?;
|
||||
|
||||
// we expect only 1 pricing per region
|
||||
let pricing = pricing
|
||||
.first()
|
||||
.context("no custom pricing found in region")?;
|
||||
|
||||
let template = VmCustomTemplate {
|
||||
id: 0,
|
||||
cpu: cpu.parse()?,
|
||||
memory: memory.parse()?,
|
||||
disk_size: disk.parse()?,
|
||||
disk_type: DiskType::from_str(disk_type)?,
|
||||
disk_interface: DiskInterface::from_str(disk_interface)?,
|
||||
pricing_id: pricing.id,
|
||||
};
|
||||
let uid = db.upsert_user(request.event.pubkey.as_bytes()).await?;
|
||||
|
||||
let pk: PublicKey = ssh_key.parse()?;
|
||||
let key_name = if let Some(n) = ssh_key_name {
|
||||
n.clone()
|
||||
} else {
|
||||
pk.comment().to_string()
|
||||
};
|
||||
let new_key = UserSshKey {
|
||||
name: key_name,
|
||||
user_id: uid,
|
||||
key_data: pk.to_openssh()?,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
// report as started if params are valid
|
||||
let processing =
|
||||
build_status_for_job(&request, DataVendingMachineStatus::Processing, None, None);
|
||||
client.send_event_builder(processing).await?;
|
||||
|
||||
let existing_keys = db.list_user_ssh_key(uid).await?;
|
||||
let ssh_key_id = if let Some(k) = existing_keys.iter().find(|k| {
|
||||
let ek: PublicKey = k.key_data.parse().unwrap();
|
||||
ek.eq(&pk)
|
||||
}) {
|
||||
k.id
|
||||
} else {
|
||||
db.insert_user_ssh_key(&new_key).await?
|
||||
};
|
||||
|
||||
let vm = provisioner
|
||||
.provision_custom(uid, template, 0, ssh_key_id, None)
|
||||
.await?;
|
||||
let invoice = provisioner.renew(vm.id, PaymentMethod::Lightning).await?;
|
||||
|
||||
let mut payment = build_status_for_job(
|
||||
&request,
|
||||
DataVendingMachineStatus::PaymentRequired,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
payment = payment.tag(Tag::parse([
|
||||
"amount",
|
||||
invoice.amount.to_string().as_str(),
|
||||
&invoice.external_data,
|
||||
])?);
|
||||
client.send_event_builder(payment).await?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::dvm::parse_job_request;
|
||||
use crate::exchange::{ExchangeRateService, Ticker};
|
||||
use crate::mocks::{MockDb, MockExchangeRate, MockNode};
|
||||
use crate::settings::mock_settings;
|
||||
use lnvps_db::{VmCustomPricing, VmCustomPricingDisk};
|
||||
use nostr::{EventBuilder, Keys, Kind};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_dvm() -> anyhow::Result<()> {
|
||||
let db = Arc::new(MockDb::default());
|
||||
let node = Arc::new(MockNode::new());
|
||||
let exch = Arc::new(MockExchangeRate::new());
|
||||
exch.set_rate(Ticker::btc_rate("EUR")?, 69_420.0).await;
|
||||
|
||||
{
|
||||
let mut cp = db.custom_pricing.lock().await;
|
||||
cp.insert(
|
||||
1,
|
||||
VmCustomPricing {
|
||||
id: 1,
|
||||
name: "mock".to_string(),
|
||||
enabled: true,
|
||||
created: Default::default(),
|
||||
expires: None,
|
||||
region_id: 1,
|
||||
currency: "EUR".to_string(),
|
||||
cpu_cost: 1.5,
|
||||
memory_cost: 0.5,
|
||||
ip4_cost: 1.5,
|
||||
ip6_cost: 0.05,
|
||||
},
|
||||
);
|
||||
let mut cpd = db.custom_pricing_disk.lock().await;
|
||||
cpd.insert(
|
||||
1,
|
||||
VmCustomPricingDisk {
|
||||
id: 1,
|
||||
pricing_id: 1,
|
||||
kind: DiskType::SSD,
|
||||
interface: DiskInterface::PCIe,
|
||||
cost: 0.05,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
let settings = mock_settings();
|
||||
let provisioner = Arc::new(LNVpsProvisioner::new(
|
||||
settings,
|
||||
db.clone(),
|
||||
node.clone(),
|
||||
exch.clone(),
|
||||
));
|
||||
let keys = Keys::generate();
|
||||
let empty_client = Client::new(keys.clone());
|
||||
empty_client.add_relay("wss://nos.lol").await?;
|
||||
empty_client.connect().await;
|
||||
|
||||
let mut dvm = LnvpsDvm::new(provisioner.clone(), empty_client.clone());
|
||||
|
||||
let ev = EventBuilder::new(Kind::from_u16(5999), "")
|
||||
.tags([
|
||||
Tag::parse(["param", "cpu", "1"])?,
|
||||
Tag::parse(["param", "memory", "1024"])?,
|
||||
Tag::parse(["param", "disk", "50"])?,
|
||||
Tag::parse(["param", "disk_type", "ssd"])?,
|
||||
Tag::parse(["param", "ssh_key", "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIGUSrwzZfbjqY81RRC7eg3zRvg0D53HOhjbG6h0SY3f3"])?,
|
||||
])
|
||||
.sign(&keys)
|
||||
.await?;
|
||||
let req = parse_job_request(&ev)?;
|
||||
dvm.handle_request(req).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
260
src/dvm/mod.rs
Normal file
260
src/dvm/mod.rs
Normal file
@ -0,0 +1,260 @@
|
||||
mod lnvps;
|
||||
|
||||
use crate::dvm::lnvps::LnvpsDvm;
|
||||
use crate::provisioner::LNVpsProvisioner;
|
||||
use anyhow::Result;
|
||||
use futures::FutureExt;
|
||||
use log::{error, info, warn};
|
||||
use nostr::Filter;
|
||||
use nostr_sdk::prelude::DataVendingMachineStatus;
|
||||
use nostr_sdk::{
|
||||
Client, Event, EventBuilder, EventId, Kind, RelayPoolNotification, Tag, Timestamp, Url,
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DVMJobRequest {
|
||||
/// The source event
|
||||
pub event: Event,
|
||||
/// Input data for the job (zero or more inputs)
|
||||
pub inputs: Vec<DVMInput>,
|
||||
/// Expected output format. Different job request kind defines this more precisely.
|
||||
pub output_type: Option<String>,
|
||||
/// Optional parameters for the job as key (first argument)/value (second argument).
|
||||
/// Different job request kind defines this more precisely. (e.g. [ "param", "lang", "es" ])
|
||||
pub params: HashMap<String, String>,
|
||||
/// Customer MAY specify a maximum amount (in millisats) they are willing to pay
|
||||
pub bid: Option<u64>,
|
||||
/// List of relays where Service Providers SHOULD publish responses to
|
||||
pub relays: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum DVMInput {
|
||||
Url {
|
||||
url: Url,
|
||||
relay: Option<String>,
|
||||
marker: Option<String>,
|
||||
},
|
||||
Event {
|
||||
event: EventId,
|
||||
relay: Option<String>,
|
||||
marker: Option<String>,
|
||||
},
|
||||
Job {
|
||||
event: EventId,
|
||||
relay: Option<String>,
|
||||
marker: Option<String>,
|
||||
},
|
||||
Text {
|
||||
data: String,
|
||||
relay: Option<String>,
|
||||
marker: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Basic DVM handler that accepts a job request
|
||||
pub trait DVMHandler: Send + Sync {
|
||||
fn handle_request(
|
||||
&mut self,
|
||||
request: DVMJobRequest,
|
||||
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
|
||||
}
|
||||
|
||||
pub(crate) fn build_status_for_job(
|
||||
req: &DVMJobRequest,
|
||||
status: DataVendingMachineStatus,
|
||||
extra: Option<&str>,
|
||||
content: Option<&str>,
|
||||
) -> EventBuilder {
|
||||
EventBuilder::new(Kind::JobFeedback, content.unwrap_or("")).tags([
|
||||
Tag::parse(["status", status.to_string().as_str(), extra.unwrap_or("")]).unwrap(),
|
||||
Tag::expiration(Timestamp::now() + Duration::from_secs(30)),
|
||||
Tag::event(req.event.id),
|
||||
Tag::public_key(req.event.pubkey),
|
||||
])
|
||||
}
|
||||
|
||||
/// Start listening for jobs with a specific handler
|
||||
fn listen_for_jobs(
|
||||
client: Client,
|
||||
kind: Kind,
|
||||
mut dvm: Box<dyn DVMHandler>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
|
||||
Box::pin(async move {
|
||||
let sub = client
|
||||
.subscribe(Filter::new().kind(kind).since(Timestamp::now()), None)
|
||||
.await?;
|
||||
|
||||
info!("Listening for jobs: {}", kind);
|
||||
let mut rx = client.notifications();
|
||||
while let Ok(e) = rx.recv().await {
|
||||
match e {
|
||||
RelayPoolNotification::Event { event, .. } if event.kind == kind => {
|
||||
match parse_job_request(&event) {
|
||||
Ok(req) => {
|
||||
if let Err(e) = dvm.handle_request(req.clone()).await {
|
||||
error!("Error handling job request: {}", e);
|
||||
|
||||
let data = build_status_for_job(
|
||||
&req,
|
||||
DataVendingMachineStatus::Error,
|
||||
Some(e.to_string().as_str()),
|
||||
None,
|
||||
);
|
||||
client.send_event_builder(data).await?;
|
||||
}
|
||||
}
|
||||
Err(e) => warn!("Invalid job request: {:?}", e),
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
client.unsubscribe(&sub).await;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn parse_job_request(event: &Event) -> Result<DVMJobRequest> {
|
||||
let mut inputs = vec![];
|
||||
for i_tag in event
|
||||
.tags
|
||||
.iter()
|
||||
.filter(|t| t.kind().as_str() == "i")
|
||||
.map(|t| t.as_slice())
|
||||
{
|
||||
let input = match i_tag[2].as_str() {
|
||||
"url" => DVMInput::Url {
|
||||
url: if let Ok(u) = i_tag[1].parse() {
|
||||
u
|
||||
} else {
|
||||
warn!("Invalid url: {}", i_tag[1]);
|
||||
continue;
|
||||
},
|
||||
relay: if i_tag.len() > 3 {
|
||||
Some(i_tag[3].to_string())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
marker: if i_tag.len() > 4 {
|
||||
Some(i_tag[4].to_string())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
},
|
||||
"event" => DVMInput::Event {
|
||||
event: if let Ok(t) = EventId::parse(&i_tag[1]) {
|
||||
t
|
||||
} else {
|
||||
warn!("Invalid event id: {}", i_tag[1]);
|
||||
continue;
|
||||
},
|
||||
relay: if i_tag.len() > 3 {
|
||||
Some(i_tag[3].to_string())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
marker: if i_tag.len() > 4 {
|
||||
Some(i_tag[4].to_string())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
},
|
||||
"job" => DVMInput::Job {
|
||||
event: if let Ok(t) = EventId::parse(&i_tag[1]) {
|
||||
t
|
||||
} else {
|
||||
warn!("Invalid event id in job: {}", i_tag[1]);
|
||||
continue;
|
||||
},
|
||||
relay: if i_tag.len() > 3 {
|
||||
Some(i_tag[3].to_string())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
marker: if i_tag.len() > 4 {
|
||||
Some(i_tag[4].to_string())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
},
|
||||
"text" => DVMInput::Text {
|
||||
data: i_tag[1].to_string(),
|
||||
relay: if i_tag.len() > 3 {
|
||||
Some(i_tag[3].to_string())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
marker: if i_tag.len() > 4 {
|
||||
Some(i_tag[4].to_string())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
},
|
||||
t => {
|
||||
warn!("unknown tag: {}", t);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
inputs.push(input);
|
||||
}
|
||||
|
||||
let params: HashMap<String, String> = event
|
||||
.tags
|
||||
.iter()
|
||||
.filter(|t| t.kind().as_str() == "param")
|
||||
.filter_map(|p| {
|
||||
let p = p.as_slice();
|
||||
if p.len() == 3 {
|
||||
Some((p[1].clone(), p[2].clone()))
|
||||
} else {
|
||||
warn!("Invalid param: {}", p.join(", "));
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
Ok(DVMJobRequest {
|
||||
event: event.clone(),
|
||||
inputs,
|
||||
output_type: event
|
||||
.tags
|
||||
.iter()
|
||||
.find(|t| t.kind().as_str() == "output")
|
||||
.and_then(|t| t.content())
|
||||
.map(|s| s.to_string()),
|
||||
params,
|
||||
bid: event
|
||||
.tags
|
||||
.iter()
|
||||
.find(|t| t.kind().as_str() == "bid")
|
||||
.and_then(|t| t.content())
|
||||
.and_then(|t| t.parse::<u64>().ok()),
|
||||
relays: event
|
||||
.tags
|
||||
.iter()
|
||||
.filter(|t| t.kind().as_str() == "relay")
|
||||
.map(|c| &c.as_slice()[1..])
|
||||
.flatten()
|
||||
.map(|s| s.to_string())
|
||||
.collect(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn start_dvms(
|
||||
client: Client,
|
||||
provisioner: Arc<LNVpsProvisioner>,
|
||||
) -> JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
let dvm = LnvpsDvm::new(provisioner, client.clone());
|
||||
if let Err(e) = listen_for_jobs(client, Kind::from_u16(5999), Box::new(dvm)).await {
|
||||
error!("Error listening jobs: {}", e);
|
||||
}
|
||||
})
|
||||
}
|
@ -2,6 +2,7 @@ use crate::settings::ProvisionerConfig;
|
||||
use crate::status::VmState;
|
||||
use anyhow::{bail, Result};
|
||||
use futures::future::join_all;
|
||||
use futures::{Sink, Stream};
|
||||
use lnvps_db::{
|
||||
async_trait, IpRange, LNVpsDb, UserSshKey, Vm, VmCustomTemplate, VmHost, VmHostDisk,
|
||||
VmHostKind, VmIpAssignment, VmOsImage, VmTemplate,
|
||||
@ -9,13 +10,24 @@ use lnvps_db::{
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashSet;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
#[cfg(feature = "libvirt")]
|
||||
mod libvirt;
|
||||
//#[cfg(feature = "libvirt")]
|
||||
//mod libvirt;
|
||||
#[cfg(feature = "proxmox")]
|
||||
mod proxmox;
|
||||
|
||||
pub struct TerminalStream {
|
||||
pub shutdown: Arc<AtomicBool>,
|
||||
pub rx: Receiver<Vec<u8>>,
|
||||
pub tx: Sender<Vec<u8>>,
|
||||
}
|
||||
|
||||
/// Generic type for creating VM's
|
||||
#[async_trait]
|
||||
pub trait VmHostClient: Send + Sync {
|
||||
@ -52,6 +64,9 @@ pub trait VmHostClient: Send + Sync {
|
||||
vm: &Vm,
|
||||
series: TimeSeries,
|
||||
) -> Result<Vec<TimeSeriesData>>;
|
||||
|
||||
/// Connect to terminal serial port
|
||||
async fn connect_terminal(&self, vm: &Vm) -> Result<TerminalStream>;
|
||||
}
|
||||
|
||||
pub fn get_host_client(host: &VmHost, cfg: &ProvisionerConfig) -> Result<Arc<dyn VmHostClient>> {
|
||||
|
@ -1,23 +1,35 @@
|
||||
use crate::host::{FullVmInfo, TimeSeries, TimeSeriesData, VmHostClient};
|
||||
use crate::host::{FullVmInfo, TerminalStream, TimeSeries, TimeSeriesData, VmHostClient};
|
||||
use crate::json_api::JsonApi;
|
||||
use crate::settings::{QemuConfig, SshConfig};
|
||||
use crate::ssh_client::SshClient;
|
||||
use crate::status::{VmRunningState, VmState};
|
||||
use anyhow::{anyhow, bail, ensure, Result};
|
||||
use chrono::Utc;
|
||||
use futures::{Stream, StreamExt};
|
||||
use ipnetwork::IpNetwork;
|
||||
use lnvps_db::{async_trait, DiskType, Vm, VmOsImage};
|
||||
use log::{info, warn};
|
||||
use log::{error, info, warn};
|
||||
use rand::random;
|
||||
use reqwest::header::{HeaderMap, AUTHORIZATION};
|
||||
use reqwest::{ClientBuilder, Method, Url};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::{Debug, Display, Formatter};
|
||||
use std::io::{Read, Write};
|
||||
use std::net::IpAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||
use tokio::sync::mpsc::channel;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio::time;
|
||||
use tokio::time::sleep;
|
||||
use tokio_tungstenite::tungstenite::protocol::Role;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use ws::stream::DuplexStream;
|
||||
|
||||
pub struct ProxmoxClient {
|
||||
api: JsonApi,
|
||||
@ -650,6 +662,70 @@ impl VmHostClient for ProxmoxClient {
|
||||
.await?;
|
||||
Ok(r.into_iter().map(TimeSeriesData::from).collect())
|
||||
}
|
||||
|
||||
async fn connect_terminal(&self, vm: &Vm) -> Result<TerminalStream> {
|
||||
// the proxmox api for terminal connection is weird and doesn't work
|
||||
// when I tested it, using ssh instead to run qm terminal command
|
||||
|
||||
if let Some(ssh_config) = &self.ssh {
|
||||
let mut ses = SshClient::new()?;
|
||||
ses.connect(
|
||||
(self.api.base.host().unwrap().to_string(), 22),
|
||||
&ssh_config.user,
|
||||
&ssh_config.key,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let vm_id: ProxmoxVmId = vm.id.into();
|
||||
let sock_path = PathBuf::from(&format!("/var/run/qemu-server/{}.serial0", vm_id));
|
||||
let mut chan = ses.tunnel_unix_socket(&sock_path)?;
|
||||
|
||||
let (mut client_tx, client_rx) = channel::<Vec<u8>>(1024);
|
||||
let (server_tx, mut server_rx) = channel::<Vec<u8>>(1024);
|
||||
let shutdown = Arc::new(AtomicBool::new(false));
|
||||
let shut_chan = shutdown.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut w_buf = vec![0; 4096];
|
||||
|
||||
// fire calls to read every 100ms
|
||||
let mut chan_timer = time::interval(Duration::from_millis(100));
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(buf) = server_rx.recv() => {
|
||||
if let Err(e) = chan.write_all(&buf) {
|
||||
error!("Failed to send data: {}", e);
|
||||
}
|
||||
}
|
||||
_ = chan_timer.tick() => {
|
||||
if chan.eof() {
|
||||
info!("SSH connection terminated!");
|
||||
shut_chan.store(true, Ordering::Relaxed);
|
||||
break;
|
||||
}
|
||||
let r_window = chan.read_window();
|
||||
|
||||
let mut stream = chan.stream(0);
|
||||
if let Ok(r) = stream.read(w_buf.as_mut_slice()) {
|
||||
if r > 0 {
|
||||
if let Err(e) = client_tx.send(w_buf[..r].to_vec()).await {
|
||||
error!("Failed to write data: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("SSH connection terminated!");
|
||||
});
|
||||
return Ok(TerminalStream{
|
||||
shutdown,
|
||||
rx: client_rx,
|
||||
tx: server_tx,
|
||||
});
|
||||
}
|
||||
bail!("Cannot use terminal proxy without ssh")
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap a database vm id
|
||||
|
@ -20,6 +20,8 @@ pub mod worker;
|
||||
#[cfg(test)]
|
||||
pub mod mocks;
|
||||
|
||||
#[cfg(feature = "nostr-dvm")]
|
||||
pub mod dvm;
|
||||
|
||||
/// SATS per BTC
|
||||
pub const BTC_SATS: f64 = 100_000_000.0;
|
||||
|
21
src/mocks.rs
21
src/mocks.rs
@ -1,7 +1,7 @@
|
||||
#![allow(unused)]
|
||||
use crate::dns::{BasicRecord, DnsServer, RecordType};
|
||||
use crate::exchange::{ExchangeRateService, Ticker, TickerRate};
|
||||
use crate::host::{FullVmInfo, TimeSeries, TimeSeriesData, VmHostClient};
|
||||
use crate::host::{FullVmInfo, TerminalStream, TimeSeries, TimeSeriesData, VmHostClient};
|
||||
use crate::lightning::{AddInvoiceRequest, AddInvoiceResult, InvoiceUpdate, LightningNode};
|
||||
use crate::router::{ArpEntry, Router};
|
||||
use crate::settings::NetworkPolicy;
|
||||
@ -265,11 +265,26 @@ impl LNVpsDb for MockDb {
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn list_host_region(&self) -> anyhow::Result<Vec<VmHostRegion>> {
|
||||
let regions = self.regions.lock().await;
|
||||
Ok(regions.values().filter(|r| r.enabled).cloned().collect())
|
||||
}
|
||||
|
||||
async fn get_host_region(&self, id: u64) -> anyhow::Result<VmHostRegion> {
|
||||
let regions = self.regions.lock().await;
|
||||
Ok(regions.get(&id).ok_or(anyhow!("no region"))?.clone())
|
||||
}
|
||||
|
||||
async fn get_host_region_by_name(&self, name: &str) -> anyhow::Result<VmHostRegion> {
|
||||
let regions = self.regions.lock().await;
|
||||
Ok(regions
|
||||
.iter()
|
||||
.find(|(_, v)| v.name == name)
|
||||
.ok_or(anyhow!("no region"))?
|
||||
.1
|
||||
.clone())
|
||||
}
|
||||
|
||||
async fn list_hosts(&self) -> anyhow::Result<Vec<VmHost>> {
|
||||
let hosts = self.hosts.lock().await;
|
||||
Ok(hosts.values().filter(|h| h.enabled).cloned().collect())
|
||||
@ -802,6 +817,10 @@ impl VmHostClient for MockVmHost {
|
||||
) -> anyhow::Result<Vec<TimeSeriesData>> {
|
||||
Ok(vec![])
|
||||
}
|
||||
|
||||
async fn connect_terminal(&self, vm: &Vm) -> anyhow::Result<TerminalStream> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MockDnsServer {
|
||||
|
@ -11,7 +11,7 @@ use crate::settings::{NetworkAccessPolicy, NetworkPolicy, ProvisionerConfig, Set
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use chrono::Utc;
|
||||
use isocountry::CountryCode;
|
||||
use lnvps_db::{LNVpsDb, PaymentMethod, Vm, VmCustomTemplate, VmIpAssignment, VmPayment};
|
||||
use lnvps_db::{LNVpsDb, PaymentMethod, User, Vm, VmCustomTemplate, VmIpAssignment, VmPayment};
|
||||
use log::{info, warn};
|
||||
use nostr::util::hex;
|
||||
use std::collections::HashMap;
|
||||
@ -242,6 +242,11 @@ impl LNVpsProvisioner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get database handle
|
||||
pub fn get_db(&self) -> Arc<dyn LNVpsDb> {
|
||||
self.db.clone()
|
||||
}
|
||||
|
||||
/// Provision a new VM for a user on the database
|
||||
///
|
||||
/// Note:
|
||||
@ -493,58 +498,21 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::exchange::{DefaultRateCache, Ticker};
|
||||
use crate::mocks::{MockDb, MockDnsServer, MockExchangeRate, MockNode, MockRouter};
|
||||
use crate::settings::{DnsServerConfig, LightningConfig, QemuConfig, RouterConfig};
|
||||
use crate::settings::{
|
||||
mock_settings, DnsServerConfig, LightningConfig, QemuConfig, RouterConfig,
|
||||
};
|
||||
use lnvps_db::{DiskInterface, DiskType, User, UserSshKey, VmTemplate};
|
||||
use std::net::IpAddr;
|
||||
use std::str::FromStr;
|
||||
|
||||
const ROUTER_BRIDGE: &str = "bridge1";
|
||||
|
||||
fn settings() -> Settings {
|
||||
Settings {
|
||||
listen: None,
|
||||
db: "".to_string(),
|
||||
public_url: "http://localhost:8000".to_string(),
|
||||
lightning: LightningConfig::LND {
|
||||
url: "".to_string(),
|
||||
cert: Default::default(),
|
||||
macaroon: Default::default(),
|
||||
},
|
||||
read_only: false,
|
||||
provisioner: ProvisionerConfig::Proxmox {
|
||||
qemu: QemuConfig {
|
||||
machine: "q35".to_string(),
|
||||
os_type: "l26".to_string(),
|
||||
bridge: "vmbr1".to_string(),
|
||||
cpu: "kvm64".to_string(),
|
||||
vlan: None,
|
||||
kvm: false,
|
||||
},
|
||||
ssh: None,
|
||||
mac_prefix: Some("ff:ff:ff".to_string()),
|
||||
},
|
||||
network_policy: NetworkPolicy {
|
||||
access: NetworkAccessPolicy::StaticArp {
|
||||
pub fn settings() -> Settings {
|
||||
let mut settings = mock_settings();
|
||||
settings.network_policy.access = NetworkAccessPolicy::StaticArp {
|
||||
interface: ROUTER_BRIDGE.to_string(),
|
||||
},
|
||||
ip6_slaac: None,
|
||||
},
|
||||
delete_after: 0,
|
||||
smtp: None,
|
||||
router: Some(RouterConfig::Mikrotik {
|
||||
url: "https://localhost".to_string(),
|
||||
username: "admin".to_string(),
|
||||
password: "password123".to_string(),
|
||||
}),
|
||||
dns: Some(DnsServerConfig::Cloudflare {
|
||||
token: "abc".to_string(),
|
||||
forward_zone_id: "123".to_string(),
|
||||
reverse_zone_id: "456".to_string(),
|
||||
}),
|
||||
nostr: None,
|
||||
revolut: None,
|
||||
tax_rate: HashMap::from([(CountryCode::IRL, 23.0), (CountryCode::USA, 1.0)]),
|
||||
}
|
||||
};
|
||||
settings
|
||||
}
|
||||
|
||||
async fn add_user(db: &Arc<MockDb>) -> Result<(User, UserSshKey)> {
|
||||
|
@ -1,4 +1,3 @@
|
||||
use std::collections::HashMap;
|
||||
use crate::dns::DnsServer;
|
||||
use crate::exchange::ExchangeRateService;
|
||||
use crate::fiat::FiatPaymentService;
|
||||
@ -6,11 +5,12 @@ use crate::lightning::LightningNode;
|
||||
use crate::provisioner::LNVpsProvisioner;
|
||||
use crate::router::Router;
|
||||
use anyhow::Result;
|
||||
use isocountry::CountryCode;
|
||||
use lnvps_db::LNVpsDb;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use isocountry::CountryCode;
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
@ -260,3 +260,49 @@ impl Settings {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn mock_settings() -> Settings {
|
||||
Settings {
|
||||
listen: None,
|
||||
db: "".to_string(),
|
||||
public_url: "http://localhost:8000".to_string(),
|
||||
lightning: LightningConfig::LND {
|
||||
url: "".to_string(),
|
||||
cert: Default::default(),
|
||||
macaroon: Default::default(),
|
||||
},
|
||||
read_only: false,
|
||||
provisioner: ProvisionerConfig::Proxmox {
|
||||
qemu: QemuConfig {
|
||||
machine: "q35".to_string(),
|
||||
os_type: "l26".to_string(),
|
||||
bridge: "vmbr1".to_string(),
|
||||
cpu: "kvm64".to_string(),
|
||||
vlan: None,
|
||||
kvm: false,
|
||||
},
|
||||
ssh: None,
|
||||
mac_prefix: Some("ff:ff:ff".to_string()),
|
||||
},
|
||||
network_policy: NetworkPolicy {
|
||||
access: NetworkAccessPolicy::Auto,
|
||||
ip6_slaac: None,
|
||||
},
|
||||
delete_after: 0,
|
||||
smtp: None,
|
||||
router: Some(RouterConfig::Mikrotik {
|
||||
url: "https://localhost".to_string(),
|
||||
username: "admin".to_string(),
|
||||
password: "password123".to_string(),
|
||||
}),
|
||||
dns: Some(DnsServerConfig::Cloudflare {
|
||||
token: "abc".to_string(),
|
||||
forward_zone_id: "123".to_string(),
|
||||
reverse_zone_id: "456".to_string(),
|
||||
}),
|
||||
nostr: None,
|
||||
revolut: None,
|
||||
tax_rate: HashMap::from([(CountryCode::IRL, 23.0), (CountryCode::USA, 1.0)]),
|
||||
}
|
||||
}
|
||||
|
@ -1,8 +1,8 @@
|
||||
use anyhow::Result;
|
||||
use anyhow::{anyhow, Result};
|
||||
use log::info;
|
||||
use ssh2::Channel;
|
||||
use std::io::Read;
|
||||
use std::path::PathBuf;
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::net::{TcpStream, ToSocketAddrs};
|
||||
|
||||
pub struct SshClient {
|
||||
@ -34,6 +34,15 @@ impl SshClient {
|
||||
Ok(channel)
|
||||
}
|
||||
|
||||
pub fn tunnel_unix_socket(&mut self, remote_path: &Path) -> Result<Channel> {
|
||||
self.session
|
||||
.channel_direct_streamlocal(
|
||||
remote_path.to_str().unwrap(),
|
||||
None,
|
||||
)
|
||||
.map_err(|e| anyhow!(e))
|
||||
}
|
||||
|
||||
pub async fn execute(&mut self, command: &str) -> Result<(i32, String)> {
|
||||
info!("Executing command: {}", command);
|
||||
let mut channel = self.session.channel_session()?;
|
||||
|
@ -251,7 +251,7 @@ impl Worker {
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
c.send_event(ev).await?;
|
||||
c.send_event(&ev).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
Reference in New Issue
Block a user