feat: terminal proxy

This commit is contained in:
2024-12-21 18:01:41 +00:00
parent 8641eeeca8
commit 3e7e0a789b
9 changed files with 377 additions and 77 deletions

51
Cargo.lock generated
View File

@ -194,7 +194,7 @@ dependencies = [
"tokio",
"tokio-rustls 0.26.0",
"tokio-socks",
"tokio-tungstenite",
"tokio-tungstenite 0.24.0",
"url",
"wasm-bindgen",
"web-sys",
@ -2049,17 +2049,20 @@ dependencies = [
"lettre",
"lnvps_db",
"log",
"native-tls",
"nostr",
"nostr-sdk",
"pretty_env_logger",
"rand",
"reqwest",
"rocket",
"rocket_ws",
"serde",
"serde_json",
"ssh-key",
"ssh2",
"tokio",
"tokio-tungstenite 0.21.0",
"urlencoding",
]
@ -3166,6 +3169,16 @@ dependencies = [
"uncased",
]
[[package]]
name = "rocket_ws"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25f1877668c937b701177c349f21383c556cd3bb4ba8fa1d07fa96ccb3a8782e"
dependencies = [
"rocket",
"tokio-tungstenite 0.21.0",
]
[[package]]
name = "ron"
version = "0.8.1"
@ -4196,6 +4209,20 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38"
dependencies = [
"futures-util",
"log",
"native-tls",
"tokio",
"tokio-native-tls",
"tungstenite 0.21.0",
]
[[package]]
name = "tokio-tungstenite"
version = "0.24.0"
@ -4208,7 +4235,7 @@ dependencies = [
"rustls-pki-types",
"tokio",
"tokio-rustls 0.26.0",
"tungstenite",
"tungstenite 0.24.0",
"webpki-roots",
]
@ -4402,6 +4429,26 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http 1.1.0",
"httparse",
"log",
"native-tls",
"rand",
"sha1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "tungstenite"
version = "0.24.0"

View File

@ -33,6 +33,10 @@ clap = { version = "4.5.21", features = ["derive"] }
ssh2 = "0.9.4"
ssh-key = "0.6.7"
lettre = { version = "0.11.10", features = ["tokio1-native-tls"] }
ws = { package = "rocket_ws", version = "0.1.0" }
tokio-tungstenite = { version = "^0.21", features = ["native-tls"] }
native-tls = "0.2.12"
#nostr-dm
nostr-sdk = { version = "0.37.0", optional = true, default-features = false, features = ["nip44", "nip59"] }
nostr-sdk = { version = "0.37.0", optional = true, default-features = false, features = ["nip44", "nip59"] }

View File

@ -2,14 +2,21 @@ use crate::nip98::Nip98Auth;
use crate::provisioner::Provisioner;
use crate::status::{VmState, VmStateCache};
use crate::worker::WorkJob;
use anyhow::{bail, Result};
use lnvps_db::hydrate::Hydrate;
use lnvps_db::{LNVpsDb, UserSshKey, Vm, VmOsImage, VmPayment, VmTemplate};
use log::{debug, error, warn};
use nostr::util::hex;
use rocket::futures::{Sink, SinkExt, StreamExt};
use rocket::serde::json::Json;
use rocket::{get, patch, post, routes, Responder, Route, State};
use serde::{Deserialize, Serialize};
use ssh_key::PublicKey;
use std::error::Error;
use std::fmt::Display;
use std::mem::transmute;
use tokio::sync::mpsc::UnboundedSender;
use ws::Message;
pub fn routes() -> Vec<Route> {
routes![
@ -24,7 +31,8 @@ pub fn routes() -> Vec<Route> {
v1_get_payment,
v1_start_vm,
v1_stop_vm,
v1_restart_vm
v1_restart_vm,
v1_terminal_proxy
]
}
@ -289,6 +297,112 @@ async fn v1_get_payment(
ApiData::ok(payment)
}
#[get("/api/v1/console/<id>?<auth>")]
async fn v1_terminal_proxy(
auth: &str,
db: &State<Box<dyn LNVpsDb>>,
provisioner: &State<Box<dyn Provisioner>>,
id: u64,
ws: ws::WebSocket,
) -> Result<ws::Channel<'static>, &'static str> {
let auth = Nip98Auth::from_base64(auth).map_err(|e| "Missing or invalid auth param")?;
if auth.check(&format!("/api/v1/console/{id}"), "GET").is_err() {
return Err("Invalid auth event");
}
let pubkey = auth.event.pubkey.to_bytes();
let uid = db.upsert_user(&pubkey).await.map_err(|_| "Insert failed")?;
let vm = db.get_vm(id).await.map_err(|_| "VM not found")?;
if uid != vm.user_id {
return Err("VM does not belong to you");
}
let ws_upstream = provisioner.terminal_proxy(vm.id).await.map_err(|e| {
error!("Failed to start terminal proxy: {}", e);
"Failed to open terminal proxy"
})?;
let ws = ws.config(Default::default());
Ok(ws.channel(move |mut stream| {
Box::pin(async move {
let (mut tx_upstream, mut rx_upstream) = ws_upstream.split();
let (mut tx_client, mut rx_client) = stream.split();
async fn process_client<S, E>(
msg: Result<Message, E>,
tx_upstream: &mut S,
) -> Result<()>
where
S: SinkExt<Message> + Unpin,
<S as Sink<Message>>::Error: Display,
E: Display,
{
match msg {
Ok(m) => {
let m_up = match m {
Message::Text(t) => Message::Text(format!("0:{}:{}", t.len(), t)),
_ => panic!("todo"),
};
debug!("Sending data to upstream: {:?}", m_up);
if let Err(e) = tx_upstream.send(m_up).await {
bail!("Failed to send msg to upstream: {}", e);
}
}
Err(e) => {
bail!("Failed to read from client: {}", e);
}
}
Ok(())
}
async fn process_upstream<S, E>(
msg: Result<Message, E>,
tx_client: &mut S,
) -> Result<()>
where
S: SinkExt<Message> + Unpin,
<S as Sink<Message>>::Error: Display,
E: Display,
{
match msg {
Ok(m) => {
let m_down = match m {
Message::Binary(data) => {
Message::Text(String::from_utf8_lossy(&data).to_string())
}
_ => panic!("todo"),
};
debug!("Sending data to downstream: {:?}", m_down);
if let Err(e) = tx_client.send(m_down).await {
bail!("Failed to msg to client: {}", e);
}
}
Err(e) => {
bail!("Failed to read from upstream: {}", e);
}
}
Ok(())
}
loop {
tokio::select! {
Some(msg) = rx_client.next() => {
if let Err(e) = process_client(msg, &mut tx_upstream).await {
error!("{}", e);
break;
}
},
Some(msg) = rx_upstream.next() => {
if let Err(e) = process_upstream(msg, &mut tx_client).await {
error!("{}", e);
break;
}
}
}
}
Ok(())
})
}))
}
#[derive(Deserialize)]
struct CreateVmRequest {
template_id: u64,

View File

@ -16,6 +16,7 @@ use nostr::Keys;
use nostr_sdk::Client;
use std::net::{IpAddr, SocketAddr};
use std::time::Duration;
use tokio::time::sleep;
#[derive(Parser)]
#[clap(about, version, author)]
@ -101,6 +102,7 @@ async fn main() -> Result<(), Error> {
if let Err(e) = handler.listen().await {
error!("invoice-error: {}", e);
}
sleep(Duration::from_secs(5)).await;
}
});
// request work every 30s to check vm status

View File

@ -1,12 +1,18 @@
use anyhow::{anyhow, bail, Result};
use log::debug;
use reqwest::{ClientBuilder, Method, Url};
use rocket::futures::{SinkExt, StreamExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::sleep;
use tokio_tungstenite::tungstenite::handshake::client::{generate_key, Request};
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{Connector, MaybeTlsStream, WebSocketStream};
pub struct ProxmoxClient {
base: Url,
@ -253,7 +259,68 @@ impl ProxmoxClient {
})
}
/// Create terminal proxy session
pub async fn terminal_proxy(&self, node: &str, vm: u64) -> Result<TerminalProxyTicket> {
let rsp: ResponseBase<TerminalProxyTicket> = self
.post(
&format!("/api2/json/nodes/{}/qemu/{}/termproxy", node, vm),
(),
)
.await?;
Ok(rsp.data)
}
/// Open websocket connection to terminal proxy
pub async fn open_terminal_proxy(
&self,
node: &str,
vm: u64,
req: TerminalProxyTicket,
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
self.get_task_status(&TaskId {
id: req.upid,
node: node.to_string(),
})
.await?;
let mut url: Url = self.base.join(&format!(
"/api2/json/nodes/{}/qemu/{}/vncwebsocket",
node, vm
))?;
url.set_scheme("wss").unwrap();
url.query_pairs_mut().append_pair("port", &req.port);
url.query_pairs_mut().append_pair("vncticket", &req.ticket);
let r = Request::builder()
.method("GET")
.header("Host", url.host().unwrap().to_string())
.header("Connection", "Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Key", generate_key())
.header("Sec-WebSocket-Protocol", "binary")
.header("Authorization", format!("PVEAPIToken={}", self.token))
.uri(url.as_str())
.body(())?;
debug!("Connecting terminal proxy: {:?}", &r);
let (ws, _rsp) = tokio_tungstenite::connect_async_tls_with_config(
r,
None,
false,
Some(Connector::NativeTls(
native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(true)
.build()?,
)),
)
.await?;
Ok(ws)
}
async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
debug!(">> GET {}", path);
let rsp = self
.client
.get(self.base.join(path)?)
@ -304,6 +371,14 @@ impl ProxmoxClient {
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct TerminalProxyTicket {
pub port: String,
pub ticket: String,
pub upid: String,
pub user: String,
}
#[derive(Debug, Clone)]
pub struct TaskId {
pub id: String,

View File

@ -1,7 +1,9 @@
use anyhow::bail;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use log::debug;
use nostr::{Event, JsonUtil, Kind, Timestamp};
use reqwest::Url;
use rocket::http::uri::{Absolute, Uri};
use rocket::http::Status;
use rocket::request::{FromRequest, Outcome};
@ -11,82 +13,97 @@ pub struct Nip98Auth {
pub event: Event,
}
#[async_trait]
impl<'r> FromRequest<'r> for Nip98Auth {
type Error = &'static str;
impl Nip98Auth {
pub fn check(&self, path: &str, method: &str) -> anyhow::Result<()> {
if self.event.kind != Kind::HttpAuth {
bail!("Wrong event kind");
}
if self
.event
.created_at
.as_u64()
.abs_diff(Timestamp::now().as_u64())
> 600
{
bail!("Created timestamp is out of range");
}
async fn from_request(request: &'r Request<'_>) -> Outcome<Self, Self::Error> {
if let Some(auth) = request.headers().get_one("authorization") {
if auth.starts_with("Nostr ") {
let event = if let Ok(j) = BASE64_STANDARD.decode(&auth[6..]) {
if let Ok(ev) = Event::from_json(j) {
ev
} else {
return Outcome::Error((Status::new(403), "Invalid nostr event"));
}
} else {
return Outcome::Error((Status::new(403), "Invalid auth string"));
};
if event.kind != Kind::HttpAuth {
return Outcome::Error((Status::new(401), "Wrong event kind"));
}
if event
.created_at
.as_u64()
.abs_diff(Timestamp::now().as_u64())
> 600
{
return Outcome::Error((Status::new(401), "Created timestamp is out of range"));
}
// check url tag
if let Some(url) = event.tags.iter().find_map(|t| {
let vec = t.as_slice();
if vec[0] == "u" {
Some(vec[1].clone())
} else {
None
}
}) {
if let Ok(u_req) = Uri::parse::<Absolute>(&url) {
if request.uri().path() != u_req.absolute().unwrap().path() {
return Outcome::Error((Status::new(401), "U tag does not match"));
}
} else {
return Outcome::Error((Status::new(401), "Invalid U tag"));
}
} else {
return Outcome::Error((Status::new(401), "Missing url tag"));
}
// check method tag
if let Some(method) = event.tags.iter().find_map(|t| {
let vec = t.as_slice();
if vec[0] == "method" {
Some(vec[1].clone())
} else {
None
}
}) {
if request.method().to_string() != *method {
return Outcome::Error((Status::new(401), "Method tag incorrect"));
}
} else {
return Outcome::Error((Status::new(401), "Missing method tag"));
}
if let Err(_err) = event.verify() {
return Outcome::Error((Status::new(401), "Event signature invalid"));
}
debug!("{}", event.as_json());
Outcome::Success(Nip98Auth { event })
// check url tag
if let Some(url) = self.event.tags.iter().find_map(|t| {
let vec = t.as_slice();
if vec[0] == "u" {
Some(vec[1].clone())
} else {
Outcome::Error((Status::new(403), "Auth scheme must be Nostr"))
None
}
}) {
if let Ok(u_req) = Uri::parse::<Absolute>(&url) {
if path != u_req.absolute().unwrap().path() {
bail!("U tag does not match");
}
} else {
bail!("Invalid U tag");
}
} else {
Outcome::Error((Status::new(403), "Auth header not found"))
bail!("Missing url tag");
}
// check method tag
if let Some(t_method) = self.event.tags.iter().find_map(|t| {
let vec = t.as_slice();
if vec[0] == "method" {
Some(vec[1].clone())
} else {
None
}
}) {
if method != t_method {
bail!("Method tag incorrect")
}
} else {
bail!("Missing method tag")
}
if let Err(_err) = self.event.verify() {
bail!("Event signature invalid");
}
debug!("{}", self.event.as_json());
Ok(())
}
pub fn from_base64(i: &str) -> anyhow::Result<Self> {
if let Ok(j) = BASE64_STANDARD.decode(i) {
if let Ok(ev) = Event::from_json(j) {
Ok(Self { event: ev })
} else {
bail!("Invalid nostr event")
}
} else {
bail!("Invalid auth string");
}
}
}
#[async_trait]
impl<'r> FromRequest<'r> for Nip98Auth {
type Error = String;
async fn from_request(request: &'r Request<'_>) -> Outcome<Self, Self::Error> {
if let Some(auth) = request.headers().get_one("authorization") {
if !auth.starts_with("Nostr ") {
return Outcome::Error((Status::new(403), "Auth scheme must be Nostr".to_string()));
}
let auth = Nip98Auth::from_base64(&auth[6..]).unwrap();
match auth.check(
request.uri().to_string().as_str(),
request.method().as_str(),
) {
Ok(_) => Outcome::Success(auth),
Err(e) => Outcome::Error((Status::new(401), e.to_string())),
}
} else {
Outcome::Error((Status::new(403), "Auth header not found".to_string()))
}
}
}

View File

@ -16,15 +16,19 @@ use fedimint_tonic_lnd::Client;
use ipnetwork::IpNetwork;
use lnvps_db::hydrate::Hydrate;
use lnvps_db::{IpRange, LNVpsDb, Vm, VmCostPlanIntervalType, VmIpAssignment, VmPayment};
use log::{info, warn};
use log::{debug, info, warn};
use nostr::util::hex;
use rand::random;
use rand::seq::IteratorRandom;
use reqwest::Url;
use rocket::futures::{SinkExt, StreamExt};
use std::collections::HashSet;
use std::net::IpAddr;
use std::ops::Add;
use std::time::Duration;
use tokio::net::{TcpSocket, TcpStream};
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
pub struct LNVpsProvisioner {
db: Box<dyn LNVpsDb>,
@ -485,4 +489,30 @@ impl Provisioner for LNVpsProvisioner {
Ok(())
}
async fn terminal_proxy(
&self,
vm_id: u64,
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
let vm = self.db.get_vm(vm_id).await?;
let host = self.db.get_host(vm.host_id).await?;
let client = get_host_client(&host)?;
let host_vm_id = vm.id + 100;
let term = client.terminal_proxy(&host.name, host_vm_id).await?;
let login_msg = format!("{}:{}\n", term.user, term.ticket);
let mut ws = client
.open_terminal_proxy(&host.name, host_vm_id, term)
.await?;
debug!("Sending login msg: {}", login_msg);
ws.send(Message::Text(login_msg)).await?;
if let Some(n) = ws.next().await {
debug!("{:?}", n);
} else {
bail!("No response from terminal_proxy");
}
ws.send(Message::Text("1:86:24:".to_string())).await?;
Ok(ws)
}
}

View File

@ -1,6 +1,8 @@
use anyhow::Result;
use lnvps_db::{Vm, VmIpAssignment, VmPayment};
use rocket::async_trait;
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
pub mod lnvps;
@ -42,4 +44,7 @@ pub trait Provisioner: Send + Sync {
/// Delete a VM
async fn delete_vm(&self, vm_id: u64) -> Result<()>;
/// Open terminal proxy connection
async fn terminal_proxy(&self, vm_id: u64) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>>;
}

View File

@ -1,5 +1,6 @@
use anyhow::Result;
use log::info;
use ssh2::Channel;
use std::io::Read;
use std::path::PathBuf;
use tokio::net::{TcpStream, ToSocketAddrs};
@ -28,6 +29,11 @@ impl SshClient {
Ok(())
}
pub async fn open_channel(&mut self) -> Result<Channel> {
let channel = self.session.channel_session()?;
Ok(channel)
}
pub async fn execute(&mut self, command: &str) -> Result<(i32, String)> {
info!("Executing command: {}", command);
let mut channel = self.session.channel_session()?;