diff --git a/Cargo.lock b/Cargo.lock index 2c64039..6e5e0f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1044,7 +1044,7 @@ dependencies = [ [[package]] name = "ffmpeg-rs-raw" version = "0.1.0" -source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=df69b2f05da4279e36ad55086d77b45b2caf5174#df69b2f05da4279e36ad55086d77b45b2caf5174" +source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385#a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385" dependencies = [ "anyhow", "ffmpeg-sys-the-third", diff --git a/Cargo.toml b/Cargo.toml index 8181b02..8695427 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ members = [ ] [workspace.dependencies] -ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "df69b2f05da4279e36ad55086d77b45b2caf5174" } +ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385" } tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] } anyhow = { version = "^1.0.91", features = ["backtrace"] } async-trait = "0.1.77" diff --git a/TODO.md b/TODO.md index 5a0492e..65f16ba 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,4 @@ -- RTMP? - Setup multi-variant output - API parity https://git.v0l.io/Kieran/zap.stream/issues/7 - HLS-LL -- Delete old segments (HLS+N94) \ No newline at end of file +- Delete old segments (N94) \ No newline at end of file diff --git a/crates/core/src/pipeline/runner.rs b/crates/core/src/pipeline/runner.rs index 4454d9c..21978d7 100644 --- a/crates/core/src/pipeline/runner.rs +++ b/crates/core/src/pipeline/runner.rs @@ -144,7 +144,7 @@ impl PipelineRunner { }; // run transcoder pipeline - let (mut pkt, stream) = self.demuxer.get_packet()?; + let (mut pkt, _stream) = self.demuxer.get_packet()?; if pkt.is_null() { return Ok(false); } @@ -159,7 +159,7 @@ impl PipelineRunner { }; let mut egress_results = vec![]; - for frame in frames { + for (frame, stream) in frames { // Copy frame from GPU if using hwaccel decoding let mut frame = get_frame_from_hw(frame)?; (*frame).time_base = (*stream).time_base; diff --git a/crates/zap-stream-db/src/db.rs b/crates/zap-stream-db/src/db.rs index bc4a077..9293bc3 100644 --- a/crates/zap-stream-db/src/db.rs +++ b/crates/zap-stream-db/src/db.rs @@ -3,6 +3,7 @@ use anyhow::Result; use sqlx::{Executor, MySqlPool, Row}; use uuid::Uuid; +#[derive(Clone)] pub struct ZapStreamDb { db: MySqlPool, } diff --git a/crates/zap-stream/Cargo.toml b/crates/zap-stream/Cargo.toml index 49baf8c..b64f042 100644 --- a/crates/zap-stream/Cargo.toml +++ b/crates/zap-stream/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [features] -default = ["srt", "rtmp"] +default = ["srt", "rtmp", "test-pattern"] srt = ["zap-stream-core/srt"] rtmp = ["zap-stream-core/rtmp"] test-pattern = ["zap-stream-core/test-pattern", "zap-stream-db/test-pattern"] diff --git a/crates/zap-stream/config.yaml b/crates/zap-stream/config.yaml index e88e4a3..9c5e042 100755 --- a/crates/zap-stream/config.yaml +++ b/crates/zap-stream/config.yaml @@ -5,6 +5,10 @@ endpoints: - "rtmp://127.0.0.1:3336" - "srt://127.0.0.1:3335" - "tcp://127.0.0.1:3334" + - "test-pattern://" + +# Public hostname which points to the IP address used to listen for all [endpoints] +endpoints_public_hostname: "localhost" # Output directory for recording / hls output_dir: "./out" @@ -40,8 +44,8 @@ overseer: zap-stream: cost: 16 nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4" - blossom: - - "http://localhost:8881" + #blossom: + # - "http://localhost:8881" relays: - "ws://localhost:7766" database: "mysql://root:root@localhost:3368/zap_stream?max_connections=2" diff --git a/crates/zap-stream/dev-setup/db.sql b/crates/zap-stream/dev-setup/db.sql new file mode 100644 index 0000000..adfe77a --- /dev/null +++ b/crates/zap-stream/dev-setup/db.sql @@ -0,0 +1,2 @@ +create database route96; +create database zap_stream; \ No newline at end of file diff --git a/crates/zap-stream/dev-setup/route96.yaml b/crates/zap-stream/dev-setup/route96.yaml new file mode 100644 index 0000000..e4aba73 --- /dev/null +++ b/crates/zap-stream/dev-setup/route96.yaml @@ -0,0 +1,5 @@ +listen: "0.0.0.0:8000" +database: "mysql://root:root@db:3306/route96" +storage_dir: "./data" +max_upload_bytes: 5e+9 +public_url: "http://localhost:8881" diff --git a/crates/zap-stream/dev-setup/strfry.conf/strfry.conf.default b/crates/zap-stream/dev-setup/strfry.conf similarity index 94% rename from crates/zap-stream/dev-setup/strfry.conf/strfry.conf.default rename to crates/zap-stream/dev-setup/strfry.conf index dc26e4c..6ecffb5 100644 --- a/crates/zap-stream/dev-setup/strfry.conf/strfry.conf.default +++ b/crates/zap-stream/dev-setup/strfry.conf @@ -47,7 +47,7 @@ relay { port = 7777 # Set OS-limit on maximum number of open files/sockets (if 0, don't attempt to set) (restart required) - nofiles = 1000000 + nofiles = 0 # HTTP header that contains the client's real IP, before reverse proxying (ie x-real-ip) (MUST be all lower-case) realIpHeader = "" @@ -64,6 +64,12 @@ relay { # NIP-11: Alternative administrative contact (email, website, etc) contact = "" + + # NIP-11: URL pointing to an image to be used as an icon for the relay + icon = "" + + # List of supported lists as JSON array, or empty string to use default. Example: "[1,2]" + nips = "" } # Maximum accepted incoming websocket frame size (should be larger than max event) (restart required) @@ -86,7 +92,7 @@ relay { writePolicy { # If non-empty, path to an executable script that implements the writePolicy plugin logic - plugin = "/app/write-policy.py" + plugin = "" } compression { @@ -135,4 +141,4 @@ relay { # Maximum records that sync will process before returning an error maxSyncEvents = 1000000 } -} +} \ No newline at end of file diff --git a/crates/zap-stream/docker-compose.yml b/crates/zap-stream/docker-compose.yml index 87a08da..5620f92 100644 --- a/crates/zap-stream/docker-compose.yml +++ b/crates/zap-stream/docker-compose.yml @@ -18,14 +18,14 @@ services: blossom: depends_on: - db - image: voidic/route96 + image: voidic/route96:latest environment: - "RUST_LOG=info" ports: - "8881:8000" volumes: - "blossom:/app/data" - - "./dev-setup/route96.toml:/app/config.toml" + - "./dev-setup/route96.yaml:/app/config.yaml" volumes: db: blossom: diff --git a/crates/zap-stream/src/api.rs b/crates/zap-stream/src/api.rs new file mode 100644 index 0000000..9229960 --- /dev/null +++ b/crates/zap-stream/src/api.rs @@ -0,0 +1,192 @@ +use crate::http::check_nip98_auth; +use crate::settings::Settings; +use crate::ListenerEndpoint; +use anyhow::{anyhow, bail, Result}; +use bytes::Bytes; +use fedimint_tonic_lnd::tonic::codegen::Body; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Full}; +use hyper::body::Incoming; +use hyper::{Method, Request, Response}; +use nostr_sdk::{serde_json, Event, PublicKey}; +use serde::{Deserialize, Serialize}; +use std::net::SocketAddr; +use std::str::FromStr; +use url::Url; +use zap_stream_db::ZapStreamDb; + +#[derive(Clone)] +pub struct Api { + db: ZapStreamDb, + settings: Settings, +} + +impl Api { + pub fn new(db: ZapStreamDb, settings: Settings) -> Self { + Self { db, settings } + } + + pub async fn handler( + self, + req: Request, + ) -> Result>, anyhow::Error> { + let base = Response::builder() + .header("server", "zap-stream") + .header("access-control-allow-origin", "*") + .header("access-control-allow-headers", "*") + .header("access-control-allow-methods", "HEAD, GET"); + + Ok(match (req.method(), req.uri().path()) { + (&Method::GET, "/api/v1/account") => { + let auth = check_nip98_auth(&req)?; + let rsp = self.get_account(&auth.pubkey).await?; + return Ok(base.body(Self::body_json(&rsp)?)?); + } + (&Method::PATCH, "/api/v1/account") => { + let auth = check_nip98_auth(&req)?; + let body = req.collect().await?.to_bytes(); + let r_body: PatchAccount = serde_json::from_slice(&body)?; + let rsp = self.update_account(&auth.pubkey, r_body).await?; + return Ok(base.body(Self::body_json(&rsp)?)?); + } + (&Method::GET, "/api/v1/topup") => { + let auth = check_nip98_auth(&req)?; + let url: Url = req.uri().to_string().parse()?; + let amount: usize = url + .query_pairs() + .find_map(|(k, v)| if k == "amount" { Some(v) } else { None }) + .and_then(|v| v.parse().ok()) + .ok_or(anyhow!("Missing amount"))?; + let rsp = self.topup(&auth.pubkey, amount).await?; + return Ok(base.body(Self::body_json(&rsp)?)?); + } + (&Method::PATCH, "/api/v1/event") => { + bail!("Not implemented") + } + (&Method::POST, "/api/v1/withdraw") => { + bail!("Not implemented") + } + (&Method::POST, "/api/v1/account/forward") => { + bail!("Not implemented") + } + (&Method::DELETE, "/api/v1/account/forward/") => { + bail!("Not implemented") + } + (&Method::GET, "/api/v1/account/history") => { + bail!("Not implemented") + } + (&Method::GET, "/api/v1/account/keys") => { + bail!("Not implemented") + } + _ => { + if req.method() == Method::OPTIONS { + base.body(Default::default())? + } else { + base.status(404).body(Default::default())? + } + } + }) + } + + fn body_json(obj: &T) -> Result> { + Ok(Full::from(serde_json::to_string(obj)?) + .map_err(|e| match e {}) + .boxed()) + } + + async fn get_account(&self, pubkey: &PublicKey) -> Result { + let uid = self.db.upsert_user(&pubkey.to_bytes()).await?; + let user = self.db.get_user(uid).await?; + + Ok(AccountInfo { + endpoints: self + .settings + .endpoints + .iter() + .filter_map(|e| match ListenerEndpoint::from_str(&e).ok()? { + ListenerEndpoint::SRT { endpoint } => { + let addr: SocketAddr = endpoint.parse().ok()?; + Some(Endpoint { + name: "SRT".to_string(), + url: format!("srt://{}:{}", self.settings.endpoints_public_hostname, addr.port()), + key: user.stream_key.clone(), + capabilities: vec![], + }) + } + ListenerEndpoint::RTMP { endpoint } => { + let addr: SocketAddr = endpoint.parse().ok()?; + Some(Endpoint { + name: "RTMP".to_string(), + url: format!("rtmp://{}:{}", self.settings.endpoints_public_hostname, addr.port()), + key: user.stream_key.clone(), + capabilities: vec![], + }) + } + ListenerEndpoint::TCP { endpoint } => { + let addr: SocketAddr = endpoint.parse().ok()?; + Some(Endpoint { + name: "TCP".to_string(), + url: format!("tcp://{}:{}", self.settings.endpoints_public_hostname, addr.port()), + key: user.stream_key.clone(), + capabilities: vec![], + }) + } + ListenerEndpoint::File { .. } => None, + ListenerEndpoint::TestPattern => None, + }) + .collect(), + event: None, + balance: user.balance as u64, + tos: AccountTos { + accepted: user.tos_accepted.is_some(), + link: "https://zap.stream/tos".to_string(), + }, + }) + } + + async fn update_account(&self, pubkey: &PublicKey, account: PatchAccount) -> Result<()> { + bail!("Not implemented") + } + + async fn topup(&self, pubkey: &PublicKey, amount: usize) -> Result { + bail!("Not implemented") + } +} + +#[derive(Deserialize, Serialize)] +struct AccountInfo { + pub endpoints: Vec, + pub event: Option, + pub balance: u64, + pub tos: AccountTos, +} + +#[derive(Deserialize, Serialize)] +struct Endpoint { + pub name: String, + pub url: String, + pub key: String, + pub capabilities: Vec, +} + +#[derive(Deserialize, Serialize)] +struct EndpointCost { + pub unit: String, + pub rate: u16, +} + +#[derive(Deserialize, Serialize)] +struct AccountTos { + pub accepted: bool, + pub link: String, +} + +#[derive(Deserialize, Serialize)] +struct PatchAccount { + pub accept_tos: Option, +} + +#[derive(Deserialize, Serialize)] +struct TopupResponse { + pub pr: String, +} diff --git a/crates/zap-stream/src/http.rs b/crates/zap-stream/src/http.rs index f50ca1a..b949596 100644 --- a/crates/zap-stream/src/http.rs +++ b/crates/zap-stream/src/http.rs @@ -1,3 +1,7 @@ +use crate::api::Api; +use crate::overseer::ZapStreamOverseer; +use anyhow::{bail, Result}; +use base64::Engine; use bytes::Bytes; use futures_util::TryStreamExt; use http_body_util::combinators::BoxBody; @@ -5,7 +9,8 @@ use http_body_util::{BodyExt, Full, StreamBody}; use hyper::body::{Frame, Incoming}; use hyper::service::Service; use hyper::{Method, Request, Response}; -use log::error; +use log::{error, info}; +use nostr_sdk::{serde_json, Event}; use std::future::Future; use std::path::PathBuf; use std::pin::Pin; @@ -13,21 +18,20 @@ use std::sync::Arc; use tokio::fs::File; use tokio_util::io::ReaderStream; use zap_stream_core::overseer::Overseer; -use crate::overseer::ZapStreamOverseer; #[derive(Clone)] pub struct HttpServer { index: String, files_dir: PathBuf, - overseer: Arc, + api: Api, } impl HttpServer { - pub fn new(index: String, files_dir: PathBuf, overseer: Arc) -> Self { + pub fn new(index: String, files_dir: PathBuf, api: Api) -> Self { Self { index, files_dir, - overseer, + api, } } } @@ -81,9 +85,9 @@ impl Service> for HttpServer { } // otherwise handle in overseer - let overseer = self.overseer.clone(); + let mut api = self.api.clone(); Box::pin(async move { - match overseer.api(req).await { + match api.handler(req).await { Ok(res) => Ok(res), Err(e) => { error!("{}", e); @@ -93,3 +97,22 @@ impl Service> for HttpServer { }) } } + +pub fn check_nip98_auth(req: &Request) -> Result { + let auth = if let Some(a) = req.headers().get("authorization") { + a.to_str()? + } else { + bail!("Authorization header missing"); + }; + + if !auth.starts_with("Nostr ") { + bail!("Invalid authorization scheme"); + } + + let json = + String::from_utf8(base64::engine::general_purpose::STANDARD.decode(auth[6..].as_bytes())?)?; + info!("{}", json); + + // TODO: check tags + Ok(serde_json::from_str::(&json)?) +} diff --git a/crates/zap-stream/src/main.rs b/crates/zap-stream/src/main.rs index 2b3677c..10f339b 100644 --- a/crates/zap-stream/src/main.rs +++ b/crates/zap-stream/src/main.rs @@ -8,6 +8,7 @@ use hyper_util::rt::TokioIo; use log::{error, info}; use std::net::SocketAddr; use std::path::PathBuf; +use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use tokio::net::TcpListener; @@ -21,13 +22,15 @@ use zap_stream_core::ingress::srt; #[cfg(feature = "test-pattern")] use zap_stream_core::ingress::test; -use zap_stream_core::ingress::{file, tcp}; -use zap_stream_core::overseer::Overseer; +use crate::api::Api; use crate::http::HttpServer; use crate::monitor::BackgroundMonitor; use crate::overseer::ZapStreamOverseer; use crate::settings::Settings; +use zap_stream_core::ingress::{file, tcp}; +use zap_stream_core::overseer::Overseer; +mod api; mod blossom; mod http; mod monitor; @@ -56,6 +59,7 @@ async fn main() -> Result<()> { let settings: Settings = builder.try_deserialize()?; let overseer = settings.get_overseer().await?; + // Create ingress listeners let mut tasks = vec![]; for e in &settings.endpoints { match try_create_listener(e, &settings.output_dir, &overseer) { @@ -67,10 +71,12 @@ async fn main() -> Result<()> { let http_addr: SocketAddr = settings.listen_http.parse()?; let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url); + let api = Api::new(overseer.database(), settings.clone()); + // HTTP server let server = HttpServer::new( index_html, PathBuf::from(settings.output_dir), - overseer.clone(), + api, ); tasks.push(tokio::spawn(async move { let listener = TcpListener::bind(&http_addr).await?; @@ -87,7 +93,7 @@ async fn main() -> Result<()> { } })); - // spawn background job + // Background worker let mut bg = BackgroundMonitor::new(overseer.clone()); tasks.push(tokio::spawn(async move { loop { @@ -98,6 +104,7 @@ async fn main() -> Result<()> { } })); + // Join tasks and get errors for handle in tasks { if let Err(e) = handle.await? { error!("{e}"); @@ -107,37 +114,69 @@ async fn main() -> Result<()> { Ok(()) } +pub enum ListenerEndpoint { + SRT { endpoint: String }, + RTMP { endpoint: String }, + TCP { endpoint: String }, + File { path: PathBuf }, + TestPattern, +} + +impl FromStr for ListenerEndpoint { + type Err = anyhow::Error; + + fn from_str(s: &str) -> std::result::Result { + let url: Url = s.parse()?; + match url.scheme() { + "srt" => Ok(Self::SRT { + endpoint: format!("{}:{}", url.host().unwrap(), url.port().unwrap()), + }), + "rtmp" => Ok(Self::RTMP { + endpoint: format!("{}:{}", url.host().unwrap(), url.port().unwrap()), + }), + "tcp" => Ok(Self::TCP { + endpoint: format!("{}:{}", url.host().unwrap(), url.port().unwrap()), + }), + "file" => Ok(Self::File { + path: PathBuf::from(url.path()), + }), + "test-pattern" => Ok(Self::TestPattern), + _ => bail!("Unsupported endpoint scheme: {}", url.scheme()), + } + } +} + fn try_create_listener( u: &str, out_dir: &str, overseer: &Arc, ) -> Result>> { - let url: Url = u.parse()?; - match url.scheme() { + let ep = ListenerEndpoint::from_str(u)?; + match ep { #[cfg(feature = "srt")] - "srt" => Ok(tokio::spawn(srt::listen( + ListenerEndpoint::SRT { endpoint } => Ok(tokio::spawn(srt::listen( out_dir.to_string(), - format!("{}:{}", url.host().unwrap(), url.port().unwrap()), + endpoint, overseer.clone(), ))), #[cfg(feature = "rtmp")] - "rtmp" => Ok(tokio::spawn(rtmp::listen( + ListenerEndpoint::RTMP { endpoint } => Ok(tokio::spawn(rtmp::listen( out_dir.to_string(), - format!("{}:{}", url.host().unwrap(), url.port().unwrap()), + endpoint, overseer.clone(), ))), - "tcp" => Ok(tokio::spawn(tcp::listen( + ListenerEndpoint::TCP { endpoint } => Ok(tokio::spawn(tcp::listen( out_dir.to_string(), - format!("{}:{}", url.host().unwrap(), url.port().unwrap()), + endpoint, overseer.clone(), ))), - "file" => Ok(tokio::spawn(file::listen( + ListenerEndpoint::File { path } => Ok(tokio::spawn(file::listen( out_dir.to_string(), - PathBuf::from(url.path()), + path, overseer.clone(), ))), #[cfg(feature = "test-pattern")] - "test-pattern" => Ok(tokio::spawn(test::listen( + ListenerEndpoint::TestPattern => Ok(tokio::spawn(test::listen( out_dir.to_string(), overseer.clone(), ))), diff --git a/crates/zap-stream/src/overseer.rs b/crates/zap-stream/src/overseer.rs index 232da9e..edfa040 100644 --- a/crates/zap-stream/src/overseer.rs +++ b/crates/zap-stream/src/overseer.rs @@ -1,10 +1,5 @@ use crate::blossom::{BlobDescriptor, Blossom}; -use zap_stream_core::egress::hls::HlsEgress; -use zap_stream_core::egress::EgressConfig; -use zap_stream_core::ingress::ConnectionInfo; -use zap_stream_core::overseer::{IngressInfo, IngressStreamType, Overseer}; -use zap_stream_core::pipeline::{EgressType, PipelineConfig}; -use zap_stream_core::variant::{StreamMapping, VariantStream}; +use crate::settings::LndSettings; use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; use base64::alphabet::STANDARD; @@ -14,6 +9,7 @@ use chrono::Utc; use fedimint_tonic_lnd::verrpc::VersionRequest; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_MJPEG; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVFrame; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use ffmpeg_rs_raw::Encoder; use futures_util::FutureExt; use http_body_util::combinators::BoxBody; @@ -31,16 +27,20 @@ use std::fs::create_dir_all; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; -use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use tokio::sync::RwLock; use url::Url; use uuid::Uuid; +use zap_stream_core::egress::hls::HlsEgress; +use zap_stream_core::egress::EgressConfig; +use zap_stream_core::ingress::ConnectionInfo; +use zap_stream_core::overseer::{IngressInfo, IngressStreamType, Overseer}; +use zap_stream_core::pipeline::{EgressType, PipelineConfig}; use zap_stream_core::variant::audio::AudioVariant; use zap_stream_core::variant::mapping::VariantMapping; use zap_stream_core::variant::video::VideoVariant; +use zap_stream_core::variant::{StreamMapping, VariantStream}; use zap_stream_db::sqlx::Encode; use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb}; -use crate::settings::LndSettings; const STREAM_EVENT_KIND: u16 = 30_313; @@ -100,7 +100,7 @@ impl ZapStreamOverseer { PathBuf::from(&lnd.cert), PathBuf::from(&lnd.macaroon), ) - .await?; + .await?; let version = lnd .versioner() @@ -133,50 +133,8 @@ impl ZapStreamOverseer { }) } - pub(crate) async fn api(&self, req: Request) -> Result>> { - let base = Response::builder() - .header("server", "zap-stream-core") - .header("access-control-allow-origin", "*") - .header("access-control-allow-headers", "*") - .header("access-control-allow-methods", "HEAD, GET"); - - Ok(match (req.method(), req.uri().path()) { - (&Method::GET, "/api/v1/account") => { - self.check_nip98_auth(req)?; - base.body(Default::default())? - } - (&Method::PATCH, "/api/v1/account") => { - bail!("Not implemented") - } - (&Method::GET, "/api/v1/topup") => { - bail!("Not implemented") - } - (&Method::PATCH, "/api/v1/event") => { - bail!("Not implemented") - } - (&Method::POST, "/api/v1/withdraw") => { - bail!("Not implemented") - } - (&Method::POST, "/api/v1/account/forward") => { - bail!("Not implemented") - } - (&Method::DELETE, "/api/v1/account/forward/") => { - bail!("Not implemented") - } - (&Method::GET, "/api/v1/account/history") => { - bail!("Not implemented") - } - (&Method::GET, "/api/v1/account/keys") => { - bail!("Not implemented") - } - _ => { - if req.method() == Method::OPTIONS { - base.body(Default::default())? - } else { - base.status(404).body(Default::default())? - } - } - }) + pub(crate) fn database(&self) -> ZapStreamDb { + self.db.clone() } fn stream_to_event_builder(&self, stream: &UserStream) -> Result { @@ -280,25 +238,6 @@ impl ZapStreamOverseer { let u: Url = self.public_url.parse()?; Ok(u.join(path)?.to_string()) } - - fn check_nip98_auth(&self, req: Request) -> Result<()> { - let auth = if let Some(a) = req.headers().get("authorization") { - a.to_str()? - } else { - bail!("Authorization header missing"); - }; - - if !auth.starts_with("Nostr ") { - bail!("Invalid authorization scheme"); - } - - let json = String::from_utf8( - base64::engine::general_purpose::STANDARD.decode(auth[6..].as_bytes())?, - )?; - info!("{}", json); - - Ok(()) - } } #[derive(Serialize)] @@ -459,7 +398,6 @@ impl Overseer for ZapStreamOverseer { } } - fn get_default_variants(info: &IngressInfo) -> Result> { let mut vars: Vec = vec![]; if let Some(video_src) = info diff --git a/crates/zap-stream/src/settings.rs b/crates/zap-stream/src/settings.rs index 003af15..6e2ac79 100644 --- a/crates/zap-stream/src/settings.rs +++ b/crates/zap-stream/src/settings.rs @@ -1,7 +1,6 @@ use crate::overseer::ZapStreamOverseer; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use zap_stream_core::overseer::Overseer; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Settings { @@ -12,6 +11,9 @@ pub struct Settings { /// - rtmp://localhost:1935 pub endpoints: Vec, + /// Public facing hostname that maps to [endpoints] + pub endpoints_public_hostname: String, + /// Where to store output (static files) pub output_dir: String, @@ -21,7 +23,7 @@ pub struct Settings { /// Binding address for http server serving files from [output_dir] pub listen_http: String, - /// Overseer service see [crate::overseer::Overseer] for more info + /// Overseer service see [Overseer] for more info pub overseer: OverseerConfig, }