feat: terminal (wip)

This commit is contained in:
2025-03-24 09:57:30 +00:00
parent 9ee4232706
commit cbafca8da7
7 changed files with 255 additions and 25 deletions

25
Cargo.lock generated
View File

@ -2079,6 +2079,12 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "lru"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "227748d55f2f0ab4735d87fd623798cb6b664512fe979705f829c9f81c934465"
[[package]]
name = "matchers"
version = "0.1.0"
@ -2201,9 +2207,9 @@ dependencies = [
[[package]]
name = "nostr"
version = "0.39.0"
version = "0.40.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d90b55eff1f0747d9e423972179672e1aacac3d3ccee4c1281147eaa90d6491e"
checksum = "2f900ddcdc28395759fcd44b18a03255e7deee8858551bfe5d5d5a07311d82ea"
dependencies = [
"base64 0.22.1",
"bech32",
@ -2214,6 +2220,7 @@ dependencies = [
"chacha20poly1305",
"getrandom 0.2.15",
"instant",
"regex",
"scrypt",
"secp256k1",
"serde",
@ -2224,23 +2231,25 @@ dependencies = [
[[package]]
name = "nostr-database"
version = "0.39.0"
version = "0.40.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce07b47c77b8e5a856727885fe0ae47b9aa53d8d853a2190dd479b5a0d6e4f52"
checksum = "714512e4653f4e7c4f4abb50a0ac82257541b22087dee780b9e3d787276e88d4"
dependencies = [
"lru",
"nostr",
"tokio",
]
[[package]]
name = "nostr-relay-pool"
version = "0.39.0"
version = "0.40.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "211ac5bbdda1a8eec0c21814a838da832038767a5d354fe2fcc1ca438cae56fd"
checksum = "5bde07a729e0a1b306c9a07da81a0d1d55d0487316017090906f3b6660741b8d"
dependencies = [
"async-utility",
"async-wsocket",
"atomic-destructor",
"lru",
"negentropy 0.3.1",
"negentropy 0.5.0",
"nostr",
@ -2251,9 +2260,9 @@ dependencies = [
[[package]]
name = "nostr-sdk"
version = "0.39.0"
version = "0.40.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5baca581deb810a88bb51c54d1d7980f4506a64a3e9a19270829b406e47adf31"
checksum = "26238eee805d7dc3abcc8d570068c81cb4285b08e9db4d7999e54e20748c472e"
dependencies = [
"async-utility",
"nostr",

View File

@ -7,9 +7,10 @@ edition = "2021"
name = "api"
[features]
default = ["mikrotik", "nostr-dm", "proxmox", "lnd", "cloudflare", "revolut", "bitvora"]
default = ["mikrotik", "nostr-dm", "nostr-dvm", "proxmox", "lnd", "cloudflare", "revolut", "bitvora"]
mikrotik = ["dep:reqwest"]
nostr-dm = ["dep:nostr-sdk"]
nostr-dvm = ["dep:nostr-sdk"]
proxmox = ["dep:reqwest", "dep:ssh2", "dep:tokio-tungstenite"]
libvirt = ["dep:virt"]
lnd = ["dep:fedimint-tonic-lnd"]
@ -19,7 +20,7 @@ revolut = ["dep:reqwest", "dep:sha2", "dep:hmac"]
[dependencies]
lnvps_db = { path = "lnvps_db" }
tokio = { version = "1.37.0", features = ["rt", "rt-multi-thread", "macros", "sync"] }
tokio = { version = "1.37.0", features = ["rt", "rt-multi-thread", "macros", "sync", "io-util"] }
anyhow = "1.0.83"
config = { version = "0.15.8", features = ["yaml"] }
log = "0.4.21"
@ -37,15 +38,15 @@ rand = "0.9.0"
clap = { version = "4.5.21", features = ["derive"] }
ssh-key = "0.6.7"
lettre = { version = "0.11.10", features = ["tokio1-native-tls"] }
ws = { package = "rocket_ws", version = "0.1.0" }
ws = { package = "rocket_ws", version = "0.1.1" }
native-tls = "0.2.12"
hex = "0.4.3"
futures = "0.3.31"
isocountry = "0.3.2"
#nostr-dm
nostr = { version = "0.39.0", default-features = false, features = ["std"] }
nostr-sdk = { version = "0.39.0", optional = true, default-features = false, features = ["nip44", "nip59"] }
nostr = { version = "0.40.0", default-features = false, features = ["std"] }
nostr-sdk = { version = "0.40.0", optional = true, default-features = false, features = ["nip44", "nip59"] }
#proxmox
tokio-tungstenite = { version = "^0.21", features = ["native-tls"], optional = true }

View File

@ -11,15 +11,17 @@ use crate::provisioner::{HostCapacityService, LNVpsProvisioner, PricingEngine};
use crate::settings::Settings;
use crate::status::{VmState, VmStateCache};
use crate::worker::WorkJob;
use anyhow::Result;
use anyhow::{bail, Result};
use futures::future::join_all;
use futures::{SinkExt, StreamExt};
use isocountry::CountryCode;
use lnvps_db::{
IpRange, LNVpsDb, PaymentMethod, VmCustomPricing, VmCustomPricingDisk, VmCustomTemplate,
};
use log::{error, info};
use nostr::util::hex;
use rocket::serde::json::Json;
use rocket::{get, patch, post, Responder, Route, State};
use rocket::{get, patch, post, routes, Responder, Route, State};
use rocket_okapi::gen::OpenApiGenerator;
use rocket_okapi::okapi::openapi3::Responses;
use rocket_okapi::response::OpenApiResponderInner;
@ -28,12 +30,15 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use ssh_key::PublicKey;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{Sender, UnboundedSender};
pub fn routes() -> Vec<Route> {
openapi_get_routes![
let mut routes = vec![];
routes.append(&mut openapi_get_routes![
v1_get_account,
v1_patch_account,
v1_list_vms,
@ -54,7 +59,11 @@ pub fn routes() -> Vec<Route> {
v1_custom_template_calc,
v1_create_custom_vm_order,
v1_get_payment_methods
]
]);
routes.append(&mut routes![v1_terminal_proxy]);
routes
}
type ApiResult<T> = Result<Json<ApiData<T>>, ApiError>;
@ -639,6 +648,117 @@ async fn v1_time_series(
ApiData::ok(client.get_time_series_data(&vm, TimeSeries::Hourly).await?)
}
#[get("/api/v1/vm/<id>/console?<auth>")]
async fn v1_terminal_proxy(
auth: &str,
db: &State<Arc<dyn LNVpsDb>>,
settings: &State<Settings>,
id: u64,
ws: ws::WebSocket,
) -> Result<ws::Channel<'static>, &'static str> {
let auth = Nip98Auth::from_base64(auth).map_err(|e| "Missing or invalid auth param")?;
if auth
.check(&format!("/api/v1/vm/{id}/console"), "GET")
.is_err()
{
return Err("Invalid auth event");
}
let pubkey = auth.event.pubkey.to_bytes();
let uid = db.upsert_user(&pubkey).await.map_err(|_| "Insert failed")?;
let vm = db.get_vm(id).await.map_err(|_| "VM not found")?;
if uid != vm.user_id {
return Err("VM does not belong to you");
}
let host = db
.get_host(vm.host_id)
.await
.map_err(|_| "VM host not found")?;
let client =
get_host_client(&host, &settings.provisioner).map_err(|_| "Failed to get host client")?;
let mut ws_upstream = client.connect_terminal(&vm).await.map_err(|e| {
error!("Failed to start terminal proxy: {}", e);
"Failed to open terminal proxy"
})?;
let ws = ws.config(Default::default());
Ok(ws.channel(move |mut stream| {
use ws::*;
Box::pin(async move {
async fn process_client<E>(
msg: Result<Message, E>,
ws_upstream: &mut Sender<Vec<u8>>,
) -> Result<()>
where
E: Display,
{
match msg {
Ok(m) => {
let m_up = match m {
Message::Text(t) => {
info!("Got msg: {}", t);
t.as_bytes().to_vec()
}
_ => panic!("todo"),
};
if let Err(e) = ws_upstream.send(m_up).await {
bail!("Failed to send msg to upstream: {}", e);
}
}
Err(e) => {
bail!("Failed to read from client: {}", e);
}
}
Ok(())
}
async fn process_upstream<E>(
msg: Result<Vec<u8>, E>,
tx_client: &mut stream::DuplexStream,
) -> Result<()>
where
E: Display,
{
match msg {
Ok(m) => {
let down = String::from_utf8_lossy(&m).into_owned();
info!("Got down msg: {}", &down);
let m_down = Message::Text(down);
if let Err(e) = tx_client.send(m_down).await {
bail!("Failed to msg to client: {}", e);
}
}
Err(e) => {
bail!("Failed to read from upstream: {}", e);
}
}
Ok(())
}
loop {
tokio::select! {
Some(msg) = stream.next() => {
if let Err(e) = process_client(msg, &mut ws_upstream.tx).await {
error!("{}", e);
break;
}
},
Some(r) = ws_upstream.rx.recv() => {
let msg: Result<Vec<u8>, anyhow::Error> = Ok(r);
if let Err(e) = process_upstream(msg, &mut stream).await {
error!("{}", e);
break;
}
}
}
}
info!("Websocket closed");
Ok(())
})
}))
}
#[openapi(tag = "Payment")]
#[get("/api/v1/payment/methods")]
async fn v1_get_payment_methods(settings: &State<Settings>) -> ApiResult<Vec<ApiPaymentInfo>> {

View File

@ -2,6 +2,7 @@ use crate::settings::ProvisionerConfig;
use crate::status::VmState;
use anyhow::{bail, Result};
use futures::future::join_all;
use futures::{Sink, Stream};
use lnvps_db::{
async_trait, IpRange, LNVpsDb, UserSshKey, Vm, VmCustomTemplate, VmHost, VmHostDisk,
VmHostKind, VmIpAssignment, VmOsImage, VmTemplate,
@ -9,13 +10,24 @@ use lnvps_db::{
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Semaphore;
#[cfg(feature = "libvirt")]
mod libvirt;
//#[cfg(feature = "libvirt")]
//mod libvirt;
#[cfg(feature = "proxmox")]
mod proxmox;
pub struct TerminalStream {
pub shutdown: Arc<AtomicBool>,
pub rx: Receiver<Vec<u8>>,
pub tx: Sender<Vec<u8>>,
}
/// Generic type for creating VM's
#[async_trait]
pub trait VmHostClient: Send + Sync {
@ -52,6 +64,9 @@ pub trait VmHostClient: Send + Sync {
vm: &Vm,
series: TimeSeries,
) -> Result<Vec<TimeSeriesData>>;
/// Connect to terminal serial port
async fn connect_terminal(&self, vm: &Vm) -> Result<TerminalStream>;
}
pub fn get_host_client(host: &VmHost, cfg: &ProvisionerConfig) -> Result<Arc<dyn VmHostClient>> {

View File

@ -1,23 +1,35 @@
use crate::host::{FullVmInfo, TimeSeries, TimeSeriesData, VmHostClient};
use crate::host::{FullVmInfo, TerminalStream, TimeSeries, TimeSeriesData, VmHostClient};
use crate::json_api::JsonApi;
use crate::settings::{QemuConfig, SshConfig};
use crate::ssh_client::SshClient;
use crate::status::{VmRunningState, VmState};
use anyhow::{anyhow, bail, ensure, Result};
use chrono::Utc;
use futures::{Stream, StreamExt};
use ipnetwork::IpNetwork;
use lnvps_db::{async_trait, DiskType, Vm, VmOsImage};
use log::{info, warn};
use log::{error, info, warn};
use rand::random;
use reqwest::header::{HeaderMap, AUTHORIZATION};
use reqwest::{ClientBuilder, Method, Url};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::collections::HashMap;
use std::fmt::{Debug, Display, Formatter};
use std::io::{Read, Write};
use std::net::IpAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::channel;
use tokio::sync::Semaphore;
use tokio::time;
use tokio::time::sleep;
use tokio_tungstenite::tungstenite::protocol::Role;
use tokio_tungstenite::WebSocketStream;
use ws::stream::DuplexStream;
pub struct ProxmoxClient {
api: JsonApi,
@ -650,6 +662,70 @@ impl VmHostClient for ProxmoxClient {
.await?;
Ok(r.into_iter().map(TimeSeriesData::from).collect())
}
async fn connect_terminal(&self, vm: &Vm) -> Result<TerminalStream> {
// the proxmox api for terminal connection is weird and doesn't work
// when I tested it, using ssh instead to run qm terminal command
if let Some(ssh_config) = &self.ssh {
let mut ses = SshClient::new()?;
ses.connect(
(self.api.base.host().unwrap().to_string(), 22),
&ssh_config.user,
&ssh_config.key,
)
.await?;
let vm_id: ProxmoxVmId = vm.id.into();
let sock_path = PathBuf::from(&format!("/var/run/qemu-server/{}.serial0", vm_id));
let mut chan = ses.tunnel_unix_socket(&sock_path)?;
let (mut client_tx, client_rx) = channel::<Vec<u8>>(1024);
let (server_tx, mut server_rx) = channel::<Vec<u8>>(1024);
let shutdown = Arc::new(AtomicBool::new(false));
let shut_chan = shutdown.clone();
tokio::spawn(async move {
let mut w_buf = vec![0; 4096];
// fire calls to read every 100ms
let mut chan_timer = time::interval(Duration::from_millis(100));
loop {
tokio::select! {
Some(buf) = server_rx.recv() => {
if let Err(e) = chan.write_all(&buf) {
error!("Failed to send data: {}", e);
}
}
_ = chan_timer.tick() => {
if chan.eof() {
info!("SSH connection terminated!");
shut_chan.store(true, Ordering::Relaxed);
break;
}
let r_window = chan.read_window();
let mut stream = chan.stream(0);
if let Ok(r) = stream.read(w_buf.as_mut_slice()) {
if r > 0 {
if let Err(e) = client_tx.send(w_buf[..r].to_vec()).await {
error!("Failed to write data: {}", e);
}
}
}
}
}
}
info!("SSH connection terminated!");
});
return Ok(TerminalStream{
shutdown,
rx: client_rx,
tx: server_tx,
});
}
bail!("Cannot use terminal proxy without ssh")
}
}
/// Wrap a database vm id

View File

@ -1,8 +1,8 @@
use anyhow::Result;
use anyhow::{anyhow, Result};
use log::info;
use ssh2::Channel;
use std::io::Read;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use tokio::net::{TcpStream, ToSocketAddrs};
pub struct SshClient {
@ -34,6 +34,15 @@ impl SshClient {
Ok(channel)
}
pub fn tunnel_unix_socket(&mut self, remote_path: &Path) -> Result<Channel> {
self.session
.channel_direct_streamlocal(
remote_path.to_str().unwrap(),
None,
)
.map_err(|e| anyhow!(e))
}
pub async fn execute(&mut self, command: &str) -> Result<(i32, String)> {
info!("Executing command: {}", command);
let mut channel = self.session.channel_session()?;

View File

@ -251,7 +251,7 @@ impl Worker {
None,
)
.await?;
c.send_event(ev).await?;
c.send_event(&ev).await?;
}
}
Ok(())