feat: overseer API setup

This commit is contained in:
kieran 2024-12-09 14:31:26 +00:00
parent f38f436b6c
commit 0202a7da5f
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
7 changed files with 157 additions and 153 deletions

151
Cargo.lock generated
View File

@ -227,7 +227,7 @@ dependencies = [
"tokio",
"tokio-rustls 0.26.0",
"tokio-socks",
"tokio-tungstenite 0.24.0",
"tokio-tungstenite",
"url",
"wasm-bindgen",
"web-sys",
@ -1410,30 +1410,6 @@ dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "headers"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270"
dependencies = [
"base64 0.21.7",
"bytes",
"headers-core",
"http 0.2.12",
"httpdate",
"mime",
"sha1",
]
[[package]]
name = "headers-core"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [
"http 0.2.12",
]
[[package]]
name = "heck"
version = "0.5.0"
@ -1610,9 +1586,9 @@ dependencies = [
[[package]]
name = "hyper"
version = "1.5.0"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a"
checksum = "97818827ef4f364230e16705d4706e2897df2bb60617d6ca15d598025a3c481f"
dependencies = [
"bytes",
"futures-channel",
@ -1621,6 +1597,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.1",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"smallvec",
@ -1650,7 +1627,7 @@ checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333"
dependencies = [
"futures-util",
"http 1.1.0",
"hyper 1.5.0",
"hyper 1.5.1",
"hyper-util",
"rustls 0.23.16",
"rustls-pki-types",
@ -1680,7 +1657,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0"
dependencies = [
"bytes",
"http-body-util",
"hyper 1.5.0",
"hyper 1.5.1",
"hyper-util",
"native-tls",
"tokio",
@ -1699,7 +1676,7 @@ dependencies = [
"futures-util",
"http 1.1.0",
"http-body 1.0.1",
"hyper 1.5.0",
"hyper 1.5.1",
"pin-project-lite",
"socket2",
"tokio",
@ -2028,16 +2005,6 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@ -2075,24 +2042,6 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "multer"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2"
dependencies = [
"bytes",
"encoding_rs",
"futures-util",
"http 0.2.12",
"httparse",
"log",
"memchr",
"mime",
"spin 0.9.8",
"version_check",
]
[[package]]
name = "multimap"
version = "0.10.0"
@ -2835,7 +2784,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.1",
"http-body-util",
"hyper 1.5.0",
"hyper 1.5.1",
"hyper-rustls 0.27.3",
"hyper-tls",
"hyper-util",
@ -3157,12 +3106,6 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "scoped-tls"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
[[package]]
name = "scopeguard"
version = "1.2.0"
@ -4016,18 +3959,6 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "tokio-tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite 0.21.0",
]
[[package]]
name = "tokio-tungstenite"
version = "0.24.0"
@ -4040,22 +3971,21 @@ dependencies = [
"rustls-pki-types",
"tokio",
"tokio-rustls 0.26.0",
"tungstenite 0.24.0",
"tungstenite",
"webpki-roots",
]
[[package]]
name = "tokio-util"
version = "0.7.10"
version = "0.7.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15"
checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
"tracing",
]
[[package]]
@ -4220,25 +4150,6 @@ dependencies = [
"core_maths",
]
[[package]]
name = "tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http 1.1.0",
"httparse",
"log",
"rand",
"sha1",
"thiserror 1.0.57",
"url",
"utf-8",
]
[[package]]
name = "tungstenite"
version = "0.24.0"
@ -4271,12 +4182,6 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9"
[[package]]
name = "unicase"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e51b68083f157f853b6379db119d1c1be0e6e4dec98101079dec41f6f5cf6df"
[[package]]
name = "unicode-bidi"
version = "0.3.15"
@ -4438,35 +4343,6 @@ dependencies = [
"try-lock",
]
[[package]]
name = "warp"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"headers",
"http 0.2.12",
"hyper 0.14.31",
"log",
"mime",
"mime_guess",
"multer",
"percent-encoding",
"pin-project",
"scoped-tls",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-tungstenite 0.21.0",
"tokio-util",
"tower-service",
"tracing",
]
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
@ -4862,6 +4738,9 @@ dependencies = [
"fontdue",
"futures-util",
"hex",
"http-body-util",
"hyper 1.5.1",
"hyper-util",
"itertools 0.13.0",
"libc",
"log",
@ -4879,10 +4758,10 @@ dependencies = [
"tiny-skia",
"tokio",
"tokio-stream",
"tokio-util",
"url",
"usvg",
"uuid",
"warp",
"zap-stream-db",
]

View File

@ -47,11 +47,15 @@ url = "2.5.0"
itertools = "0.13.0"
rand = "0.8.5"
clap = { version = "4.5.16", features = ["derive"] }
warp = "0.3.7"
libc = "0.2.162"
m3u8-rs = "6.0.0"
chrono = "^0.4.38"
hex = "0.4.3"
hyper = { version = "1.5.1", features = ["server"] }
hyper-util = { version = "0.1.10", features = ["tokio"] }
bytes = "1.8.0"
http-body-util = "0.1.2"
tokio-util = "0.7.13"
# srt
srt-tokio = { version = "0.4.3", optional = true }
@ -73,5 +77,5 @@ fedimint-tonic-lnd = { version = "0.2.0", optional = true, default-features = fa
reqwest = { version = "0.12.9", optional = true, features = ["stream"] }
base64 = { version = "0.22.1", optional = true }
sha2 = { version = "0.10.8", optional = true }
bytes = "1.8.0"

View File

@ -3,16 +3,19 @@ use clap::Parser;
use config::Config;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_log_set_callback, av_version_info};
use ffmpeg_rs_raw::{av_log_redirect, rstr};
use hyper::server::conn::http1;
use hyper_util::rt::TokioIo;
use log::{error, info};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use url::Url;
use warp::{cors, Filter};
use zap_stream_core::background::BackgroundMonitor;
use zap_stream_core::http::HttpServer;
#[cfg(feature = "rtmp")]
use zap_stream_core::ingress::rtmp;
#[cfg(feature = "srt")]
@ -55,23 +58,26 @@ async fn main() -> Result<()> {
}
let http_addr: SocketAddr = settings.listen_http.parse()?;
let http_dir = settings.output_dir.clone();
let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url);
let server = HttpServer::new(
index_html,
PathBuf::from(settings.output_dir),
overseer.clone(),
);
tasks.push(tokio::spawn(async move {
let cors = cors().allow_any_origin().allow_methods(vec!["GET"]);
let listener = TcpListener::bind(&http_addr).await?;
let index_handle = warp::get()
.or(warp::path("index.html"))
.and(warp::path::end())
.map(move |_| warp::reply::html(index_html.clone()));
let dir_handle = warp::get().and(warp::fs::dir(http_dir)).with(cors);
warp::serve(index_handle.or(dir_handle))
.run(http_addr)
.await;
Ok(())
loop {
let (socket, _) = listener.accept().await?;
let io = TokioIo::new(socket);
let server = server.clone();
tokio::spawn(async move {
if let Err(e) = http1::Builder::new().serve_connection(io, server).await {
error!("Failed to handle request: {}", e);
}
});
}
}));
// spawn background job

90
src/http.rs Normal file
View File

@ -0,0 +1,90 @@
use crate::overseer::Overseer;
use bytes::Bytes;
use futures_util::TryStreamExt;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full, StreamBody};
use hyper::body::{Frame, Incoming};
use hyper::service::Service;
use hyper::{Method, Request, Response};
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use tokio::fs::File;
use tokio_util::io::ReaderStream;
#[derive(Clone)]
pub struct HttpServer {
index: String,
files_dir: PathBuf,
overseer: Arc<dyn Overseer>,
}
impl HttpServer {
pub fn new(index: String, files_dir: PathBuf, overseer: Arc<dyn Overseer>) -> Self {
Self {
index,
files_dir,
overseer,
}
}
}
impl Service<Request<Incoming>> for HttpServer {
type Response = Response<BoxBody<Bytes, Self::Error>>;
type Error = anyhow::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn call(&self, req: Request<Incoming>) -> Self::Future {
// check is index.html
if req.method() == Method::GET && req.uri().path() == "/"
|| req.uri().path() == "/index.html"
{
let index = self.index.clone();
return Box::pin(async move {
Ok(Response::builder()
.header("content-type", "text/html")
.header("server", "zap-stream-core")
.body(
Full::new(Bytes::from(index))
.map_err(|e| match e {})
.boxed(),
)?)
});
}
// check if mapped to file
let mut dst_path = self.files_dir.join(req.uri().path()[1..].to_string());
if dst_path.exists() {
return Box::pin(async move {
let mut rsp = Response::builder()
.header("server", "zap-stream-core")
.header("access-control-allow-origin", "*")
.header("access-control-allow-headers", "*")
.header("access-control-allow-methods", "HEAD, GET");
if req.method() == Method::HEAD {
return Ok(rsp.body(BoxBody::default())?);
}
let f = File::open(&dst_path).await?;
let f_stream = ReaderStream::new(f);
let body = StreamBody::new(
f_stream
.map_ok(Frame::data)
.map_err(|e| Self::Error::new(e)),
)
.boxed();
Ok(rsp.body(body)?)
});
}
// otherwise handle in overseer
let overseer = self.overseer.clone();
Box::pin(async move {
match overseer.api(req).await {
Ok(res) => Ok(res),
Err(e) => Err(e),
}
})
}
}

View File

@ -2,6 +2,7 @@ pub mod background;
#[cfg(feature = "zap-stream")]
pub mod blossom;
pub mod egress;
pub mod http;
pub mod ingress;
pub mod mux;
pub mod overseer;

View File

@ -20,7 +20,12 @@ use crate::variant::video::VideoVariant;
use crate::variant::VariantStream;
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use http_body_util::combinators::BoxBody;
use http_body_util::Full;
use hyper::body::Incoming;
use hyper::{Request, Response};
use std::cmp::PartialEq;
use std::path::PathBuf;
use std::sync::Arc;
@ -66,6 +71,9 @@ pub enum IngressStreamType {
#[async_trait]
/// The control process that oversees streaming operations
pub trait Overseer: Send + Sync {
/// Add any API routes to the web server
async fn api(&self, req: Request<Incoming>) -> Result<Response<BoxBody<Bytes, anyhow::Error>>>;
/// Check all streams
async fn check_streams(&self) -> Result<()>;

View File

@ -8,12 +8,17 @@ use crate::settings::LndSettings;
use crate::variant::StreamMapping;
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;
use fedimint_tonic_lnd::verrpc::VersionRequest;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_MJPEG;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVFrame;
use ffmpeg_rs_raw::Encoder;
use futures_util::FutureExt;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use hyper::body::Incoming;
use hyper::{Method, Request, Response};
use log::{error, info, warn};
use nostr_sdk::bitcoin::PrivateKey;
use nostr_sdk::prelude::Coordinate;
@ -27,7 +32,6 @@ use std::sync::Arc;
use tokio::sync::RwLock;
use url::Url;
use uuid::Uuid;
use warp::Filter;
use zap_stream_db::sqlx::Encode;
use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb};
@ -217,6 +221,18 @@ impl ZapStreamOverseer {
#[async_trait]
impl Overseer for ZapStreamOverseer {
async fn api(&self, req: Request<Incoming>) -> Result<Response<BoxBody<Bytes, anyhow::Error>>> {
Ok(match (req.method(), req.uri().path()) {
(&Method::GET, "/api/v1/account") => {
bail!("Not implemented")
}
_ => Response::builder()
.header("server", "zap-stream-core")
.status(404)
.body(Full::from("").map_err(anyhow::Error::new).boxed())?,
})
}
async fn check_streams(&self) -> Result<()> {
let active_streams = self.db.list_live_streams().await?;
for stream in active_streams {