From 0202a7da5fb230558d05c1b5fcc7805c044a201f Mon Sep 17 00:00:00 2001 From: kieran Date: Mon, 9 Dec 2024 14:31:26 +0000 Subject: [PATCH] feat: overseer API setup --- Cargo.lock | 151 ++++--------------------------------- Cargo.toml | 8 +- src/bin/zap_stream_core.rs | 34 +++++---- src/http.rs | 90 ++++++++++++++++++++++ src/lib.rs | 1 + src/overseer/mod.rs | 8 ++ src/overseer/zap_stream.rs | 18 ++++- 7 files changed, 157 insertions(+), 153 deletions(-) create mode 100644 src/http.rs diff --git a/Cargo.lock b/Cargo.lock index 5b75c09..0cb0096 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -227,7 +227,7 @@ dependencies = [ "tokio", "tokio-rustls 0.26.0", "tokio-socks", - "tokio-tungstenite 0.24.0", + "tokio-tungstenite", "url", "wasm-bindgen", "web-sys", @@ -1410,30 +1410,6 @@ dependencies = [ "hashbrown 0.14.5", ] -[[package]] -name = "headers" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" -dependencies = [ - "base64 0.21.7", - "bytes", - "headers-core", - "http 0.2.12", - "httpdate", - "mime", - "sha1", -] - -[[package]] -name = "headers-core" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" -dependencies = [ - "http 0.2.12", -] - [[package]] name = "heck" version = "0.5.0" @@ -1610,9 +1586,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" +checksum = "97818827ef4f364230e16705d4706e2897df2bb60617d6ca15d598025a3c481f" dependencies = [ "bytes", "futures-channel", @@ -1621,6 +1597,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -1650,7 +1627,7 @@ checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.5.0", + "hyper 1.5.1", "hyper-util", "rustls 0.23.16", "rustls-pki-types", @@ -1680,7 +1657,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper 1.5.0", + "hyper 1.5.1", "hyper-util", "native-tls", "tokio", @@ -1699,7 +1676,7 @@ dependencies = [ "futures-util", "http 1.1.0", "http-body 1.0.1", - "hyper 1.5.0", + "hyper 1.5.1", "pin-project-lite", "socket2", "tokio", @@ -2028,16 +2005,6 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" -[[package]] -name = "mime_guess" -version = "2.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" -dependencies = [ - "mime", - "unicase", -] - [[package]] name = "minimal-lexical" version = "0.2.1" @@ -2075,24 +2042,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "multer" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" -dependencies = [ - "bytes", - "encoding_rs", - "futures-util", - "http 0.2.12", - "httparse", - "log", - "memchr", - "mime", - "spin 0.9.8", - "version_check", -] - [[package]] name = "multimap" version = "0.10.0" @@ -2835,7 +2784,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "hyper 1.5.0", + "hyper 1.5.1", "hyper-rustls 0.27.3", "hyper-tls", "hyper-util", @@ -3157,12 +3106,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "scoped-tls" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" - [[package]] name = "scopeguard" version = "1.2.0" @@ -4016,18 +3959,6 @@ dependencies = [ "tokio-util", ] -[[package]] -name = "tokio-tungstenite" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" -dependencies = [ - "futures-util", - "log", - "tokio", - "tungstenite 0.21.0", -] - [[package]] name = "tokio-tungstenite" version = "0.24.0" @@ -4040,22 +3971,21 @@ dependencies = [ "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", - "tungstenite 0.24.0", + "tungstenite", "webpki-roots", ] [[package]] name = "tokio-util" -version = "0.7.10" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" dependencies = [ "bytes", "futures-core", "futures-sink", "pin-project-lite", "tokio", - "tracing", ] [[package]] @@ -4220,25 +4150,6 @@ dependencies = [ "core_maths", ] -[[package]] -name = "tungstenite" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" -dependencies = [ - "byteorder", - "bytes", - "data-encoding", - "http 1.1.0", - "httparse", - "log", - "rand", - "sha1", - "thiserror 1.0.57", - "url", - "utf-8", -] - [[package]] name = "tungstenite" version = "0.24.0" @@ -4271,12 +4182,6 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" -[[package]] -name = "unicase" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e51b68083f157f853b6379db119d1c1be0e6e4dec98101079dec41f6f5cf6df" - [[package]] name = "unicode-bidi" version = "0.3.15" @@ -4438,35 +4343,6 @@ dependencies = [ "try-lock", ] -[[package]] -name = "warp" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c" -dependencies = [ - "bytes", - "futures-channel", - "futures-util", - "headers", - "http 0.2.12", - "hyper 0.14.31", - "log", - "mime", - "mime_guess", - "multer", - "percent-encoding", - "pin-project", - "scoped-tls", - "serde", - "serde_json", - "serde_urlencoded", - "tokio", - "tokio-tungstenite 0.21.0", - "tokio-util", - "tower-service", - "tracing", -] - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -4862,6 +4738,9 @@ dependencies = [ "fontdue", "futures-util", "hex", + "http-body-util", + "hyper 1.5.1", + "hyper-util", "itertools 0.13.0", "libc", "log", @@ -4879,10 +4758,10 @@ dependencies = [ "tiny-skia", "tokio", "tokio-stream", + "tokio-util", "url", "usvg", "uuid", - "warp", "zap-stream-db", ] diff --git a/Cargo.toml b/Cargo.toml index 07f8cf6..0d23d1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,11 +47,15 @@ url = "2.5.0" itertools = "0.13.0" rand = "0.8.5" clap = { version = "4.5.16", features = ["derive"] } -warp = "0.3.7" libc = "0.2.162" m3u8-rs = "6.0.0" chrono = "^0.4.38" hex = "0.4.3" +hyper = { version = "1.5.1", features = ["server"] } +hyper-util = { version = "0.1.10", features = ["tokio"] } +bytes = "1.8.0" +http-body-util = "0.1.2" +tokio-util = "0.7.13" # srt srt-tokio = { version = "0.4.3", optional = true } @@ -73,5 +77,5 @@ fedimint-tonic-lnd = { version = "0.2.0", optional = true, default-features = fa reqwest = { version = "0.12.9", optional = true, features = ["stream"] } base64 = { version = "0.22.1", optional = true } sha2 = { version = "0.10.8", optional = true } -bytes = "1.8.0" + diff --git a/src/bin/zap_stream_core.rs b/src/bin/zap_stream_core.rs index 6f7f6f1..8b4fed2 100644 --- a/src/bin/zap_stream_core.rs +++ b/src/bin/zap_stream_core.rs @@ -3,16 +3,19 @@ use clap::Parser; use config::Config; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_log_set_callback, av_version_info}; use ffmpeg_rs_raw::{av_log_redirect, rstr}; +use hyper::server::conn::http1; +use hyper_util::rt::TokioIo; use log::{error, info}; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +use tokio::net::TcpListener; use tokio::task::JoinHandle; use tokio::time::sleep; use url::Url; -use warp::{cors, Filter}; use zap_stream_core::background::BackgroundMonitor; +use zap_stream_core::http::HttpServer; #[cfg(feature = "rtmp")] use zap_stream_core::ingress::rtmp; #[cfg(feature = "srt")] @@ -55,23 +58,26 @@ async fn main() -> Result<()> { } let http_addr: SocketAddr = settings.listen_http.parse()?; - let http_dir = settings.output_dir.clone(); let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url); + let server = HttpServer::new( + index_html, + PathBuf::from(settings.output_dir), + overseer.clone(), + ); tasks.push(tokio::spawn(async move { - let cors = cors().allow_any_origin().allow_methods(vec!["GET"]); + let listener = TcpListener::bind(&http_addr).await?; - let index_handle = warp::get() - .or(warp::path("index.html")) - .and(warp::path::end()) - .map(move |_| warp::reply::html(index_html.clone())); - - let dir_handle = warp::get().and(warp::fs::dir(http_dir)).with(cors); - - warp::serve(index_handle.or(dir_handle)) - .run(http_addr) - .await; - Ok(()) + loop { + let (socket, _) = listener.accept().await?; + let io = TokioIo::new(socket); + let server = server.clone(); + tokio::spawn(async move { + if let Err(e) = http1::Builder::new().serve_connection(io, server).await { + error!("Failed to handle request: {}", e); + } + }); + } })); // spawn background job diff --git a/src/http.rs b/src/http.rs new file mode 100644 index 0000000..da85bc3 --- /dev/null +++ b/src/http.rs @@ -0,0 +1,90 @@ +use crate::overseer::Overseer; +use bytes::Bytes; +use futures_util::TryStreamExt; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Full, StreamBody}; +use hyper::body::{Frame, Incoming}; +use hyper::service::Service; +use hyper::{Method, Request, Response}; +use std::future::Future; +use std::path::PathBuf; +use std::pin::Pin; +use std::sync::Arc; +use tokio::fs::File; +use tokio_util::io::ReaderStream; + +#[derive(Clone)] +pub struct HttpServer { + index: String, + files_dir: PathBuf, + overseer: Arc, +} + +impl HttpServer { + pub fn new(index: String, files_dir: PathBuf, overseer: Arc) -> Self { + Self { + index, + files_dir, + overseer, + } + } +} + +impl Service> for HttpServer { + type Response = Response>; + type Error = anyhow::Error; + type Future = Pin> + Send>>; + + fn call(&self, req: Request) -> Self::Future { + // check is index.html + if req.method() == Method::GET && req.uri().path() == "/" + || req.uri().path() == "/index.html" + { + let index = self.index.clone(); + return Box::pin(async move { + Ok(Response::builder() + .header("content-type", "text/html") + .header("server", "zap-stream-core") + .body( + Full::new(Bytes::from(index)) + .map_err(|e| match e {}) + .boxed(), + )?) + }); + } + + // check if mapped to file + let mut dst_path = self.files_dir.join(req.uri().path()[1..].to_string()); + if dst_path.exists() { + return Box::pin(async move { + let mut rsp = Response::builder() + .header("server", "zap-stream-core") + .header("access-control-allow-origin", "*") + .header("access-control-allow-headers", "*") + .header("access-control-allow-methods", "HEAD, GET"); + + if req.method() == Method::HEAD { + return Ok(rsp.body(BoxBody::default())?); + } + let f = File::open(&dst_path).await?; + let f_stream = ReaderStream::new(f); + let body = StreamBody::new( + f_stream + .map_ok(Frame::data) + .map_err(|e| Self::Error::new(e)), + ) + .boxed(); + Ok(rsp.body(body)?) + }); + } + + // otherwise handle in overseer + let overseer = self.overseer.clone(); + Box::pin(async move { + match overseer.api(req).await { + Ok(res) => Ok(res), + Err(e) => Err(e), + } + }) + } +} diff --git a/src/lib.rs b/src/lib.rs index c3c8e5d..ce8b570 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod background; #[cfg(feature = "zap-stream")] pub mod blossom; pub mod egress; +pub mod http; pub mod ingress; pub mod mux; pub mod overseer; diff --git a/src/overseer/mod.rs b/src/overseer/mod.rs index f1ba8a8..b36d19a 100644 --- a/src/overseer/mod.rs +++ b/src/overseer/mod.rs @@ -20,7 +20,12 @@ use crate::variant::video::VideoVariant; use crate::variant::VariantStream; use anyhow::Result; use async_trait::async_trait; +use bytes::Bytes; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; +use http_body_util::combinators::BoxBody; +use http_body_util::Full; +use hyper::body::Incoming; +use hyper::{Request, Response}; use std::cmp::PartialEq; use std::path::PathBuf; use std::sync::Arc; @@ -66,6 +71,9 @@ pub enum IngressStreamType { #[async_trait] /// The control process that oversees streaming operations pub trait Overseer: Send + Sync { + /// Add any API routes to the web server + async fn api(&self, req: Request) -> Result>>; + /// Check all streams async fn check_streams(&self) -> Result<()>; diff --git a/src/overseer/zap_stream.rs b/src/overseer/zap_stream.rs index 3966cd8..6da6f23 100644 --- a/src/overseer/zap_stream.rs +++ b/src/overseer/zap_stream.rs @@ -8,12 +8,17 @@ use crate::settings::LndSettings; use crate::variant::StreamMapping; use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; +use bytes::Bytes; use chrono::Utc; use fedimint_tonic_lnd::verrpc::VersionRequest; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_MJPEG; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVFrame; use ffmpeg_rs_raw::Encoder; use futures_util::FutureExt; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Full}; +use hyper::body::Incoming; +use hyper::{Method, Request, Response}; use log::{error, info, warn}; use nostr_sdk::bitcoin::PrivateKey; use nostr_sdk::prelude::Coordinate; @@ -27,7 +32,6 @@ use std::sync::Arc; use tokio::sync::RwLock; use url::Url; use uuid::Uuid; -use warp::Filter; use zap_stream_db::sqlx::Encode; use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb}; @@ -217,6 +221,18 @@ impl ZapStreamOverseer { #[async_trait] impl Overseer for ZapStreamOverseer { + async fn api(&self, req: Request) -> Result>> { + Ok(match (req.method(), req.uri().path()) { + (&Method::GET, "/api/v1/account") => { + bail!("Not implemented") + } + _ => Response::builder() + .header("server", "zap-stream-core") + .status(404) + .body(Full::from("").map_err(anyhow::Error::new).boxed())?, + }) + } + async fn check_streams(&self) -> Result<()> { let active_streams = self.db.list_live_streams().await?; for stream in active_streams {