From 54bebc51701d0724ce21074ab7d9e1f85132fe41 Mon Sep 17 00:00:00 2001 From: kieran Date: Fri, 22 Nov 2024 16:54:32 +0000 Subject: [PATCH] feat: RTMP input (WIP) --- Cargo.lock | 126 +++++++++++++++---- Cargo.toml | 21 +++- TODO.md | 4 +- config.yaml | 5 +- src/bin/zap_stream_core.rs | 36 +++++- src/egress/http.rs | 14 --- src/egress/mod.rs | 1 - src/index.html | 17 +++ src/ingress/file.rs | 10 +- src/ingress/mod.rs | 15 ++- src/ingress/rtmp.rs | 239 +++++++++++++++++++++++++++++++++++++ src/ingress/srt.rs | 17 ++- src/ingress/tcp.rs | 15 ++- src/ingress/test.rs | 10 +- src/overseer/local.rs | 67 +++++++++++ src/overseer/mod.rs | 118 +++++------------- src/overseer/zap_stream.rs | 9 +- src/pipeline/runner.rs | 21 ++-- src/settings.rs | 5 +- 19 files changed, 577 insertions(+), 173 deletions(-) delete mode 100644 src/egress/http.rs create mode 100644 src/index.html create mode 100644 src/ingress/rtmp.rs create mode 100644 src/overseer/local.rs diff --git a/Cargo.lock b/Cargo.lock index 7acb312..81d2534 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -473,6 +473,15 @@ dependencies = [ "serde", ] +[[package]] +name = "block-buffer" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" +dependencies = [ + "generic-array", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -843,6 +852,16 @@ dependencies = [ "typenum", ] +[[package]] +name = "crypto-mac" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4857fd85a0c34b3c3297875b747c1e02e06b6a0ea32dd892d8192b9ce0813ea6" +dependencies = [ + "generic-array", + "subtle", +] + [[package]] name = "ctr" version = "0.9.2" @@ -888,13 +907,22 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "digest" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" +dependencies = [ + "generic-array", +] + [[package]] name = "digest" version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer", + "block-buffer 0.10.4", "const-oid", "crypto-common", "subtle", @@ -1022,7 +1050,7 @@ dependencies = [ [[package]] name = "ffmpeg-rs-raw" version = "0.1.0" -source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=c2ae78acbcbe315137aea94c77b0db7ea538a709#c2ae78acbcbe315137aea94c77b0db7ea538a709" +source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0#8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0" dependencies = [ "anyhow", "ffmpeg-sys-the-third", @@ -1034,7 +1062,7 @@ dependencies = [ [[package]] name = "ffmpeg-sys-the-third" version = "2.1.0+ffmpeg-7.1" -source = "git+https://git.v0l.io/ffmpeg/ffmpeg-the-third.git?branch=master#0fdfa9ab506f5c92aad5a175db081c8a2c1579a1" +source = "git+https://git.v0l.io/Kieran/ffmpeg-the-third.git?rev=e5f8e077b04b10d5887bce4df1eb1a71738a6c66#e5f8e077b04b10d5887bce4df1eb1a71738a6c66" dependencies = [ "bindgen", "cc", @@ -1451,7 +1479,17 @@ version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" dependencies = [ - "hmac", + "hmac 0.12.1", +] + +[[package]] +name = "hmac" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1441c6b1e930e2817404b5046f1f989899143a12bf92de603b69f4e0aee1e15" +dependencies = [ + "crypto-mac", + "digest 0.9.0", ] [[package]] @@ -1460,7 +1498,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" dependencies = [ - "digest", + "digest 0.10.7", ] [[package]] @@ -1966,7 +2004,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" dependencies = [ "cfg-if", - "digest", + "digest 0.10.7", ] [[package]] @@ -2390,8 +2428,8 @@ version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" dependencies = [ - "digest", - "hmac", + "digest 0.10.7", + "hmac 0.12.1", ] [[package]] @@ -2451,7 +2489,7 @@ checksum = "934cd7631c050f4674352a6e835d5f6711ffbfb9345c2fc0107155ac495ae293" dependencies = [ "once_cell", "pest", - "sha2", + "sha2 0.10.8", ] [[package]] @@ -2884,6 +2922,31 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "rml_amf0" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63551cfcd4d1f42733c190e4b58dd268b1eacb73410d9afbf62784aa12cac240" +dependencies = [ + "byteorder", + "thiserror 1.0.57", +] + +[[package]] +name = "rml_rtmp" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a354e80eb7aa2a6fed09b3bd25c19bcfd32cf51f81f1219f4ec04f34519989da" +dependencies = [ + "byteorder", + "bytes", + "hmac 0.10.1", + "rand", + "rml_amf0", + "sha2 0.9.9", + "thiserror 1.0.57", +] + [[package]] name = "ron" version = "0.8.1" @@ -2909,7 +2972,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc" dependencies = [ "const-oid", - "digest", + "digest 0.10.7", "num-bigint-dig", "num-integer", "num-traits", @@ -3115,7 +3178,7 @@ dependencies = [ "password-hash", "pbkdf2", "salsa20", - "sha2", + "sha2 0.10.8", ] [[package]] @@ -3239,7 +3302,7 @@ checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" dependencies = [ "cfg-if", "cpufeatures", - "digest", + "digest 0.10.7", ] [[package]] @@ -3250,7 +3313,20 @@ checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", "cpufeatures", - "digest", + "digest 0.10.7", +] + +[[package]] +name = "sha2" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if", + "cpufeatures", + "digest 0.9.0", + "opaque-debug", ] [[package]] @@ -3261,7 +3337,7 @@ checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", "cpufeatures", - "digest", + "digest 0.10.7", ] [[package]] @@ -3276,7 +3352,7 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ - "digest", + "digest 0.10.7", "rand_core", ] @@ -3422,7 +3498,7 @@ dependencies = [ "percent-encoding", "serde", "serde_json", - "sha2", + "sha2 0.10.8", "smallvec", "sqlformat", "thiserror 1.0.57", @@ -3460,7 +3536,7 @@ dependencies = [ "quote", "serde", "serde_json", - "sha2", + "sha2 0.10.8", "sqlx-core", "sqlx-mysql", "sqlx-postgres", @@ -3484,7 +3560,7 @@ dependencies = [ "bytes", "chrono", "crc", - "digest", + "digest 0.10.7", "dotenvy", "either", "futures-channel", @@ -3494,7 +3570,7 @@ dependencies = [ "generic-array", "hex", "hkdf", - "hmac", + "hmac 0.12.1", "itoa", "log", "md-5", @@ -3505,7 +3581,7 @@ dependencies = [ "rsa", "serde", "sha1", - "sha2", + "sha2 0.10.8", "smallvec", "sqlx-core", "stringprep", @@ -3534,7 +3610,7 @@ dependencies = [ "futures-util", "hex", "hkdf", - "hmac", + "hmac 0.12.1", "home", "itoa", "log", @@ -3544,7 +3620,7 @@ dependencies = [ "rand", "serde", "serde_json", - "sha2", + "sha2 0.10.8", "smallvec", "sqlx-core", "stringprep", @@ -3592,7 +3668,7 @@ dependencies = [ "ctr", "derive_more", "hex", - "hmac", + "hmac 0.12.1", "keyed_priority_queue", "log", "pbkdf2", @@ -4777,6 +4853,7 @@ dependencies = [ "anyhow", "async-trait", "base64 0.22.1", + "bytes", "chrono", "clap", "config", @@ -4795,8 +4872,9 @@ dependencies = [ "reqwest", "resvg", "ringbuf", + "rml_rtmp", "serde", - "sha2", + "sha2 0.10.8", "srt-tokio", "tiny-skia", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 0a86d96..7a1e292 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,11 @@ name = "zap-stream-core" path = "src/bin/zap_stream_core.rs" [features] -default = ["test-pattern", "srt"] +default = ["test-pattern", "srt", "rtmp"] srt = ["dep:srt-tokio"] +rtmp = ["dep:rml_rtmp"] +local-overseer = [] # WIP +webhook-overseer = [] # WIP zap-stream = [ "dep:nostr-sdk", "dep:zap-stream-db", @@ -19,10 +22,17 @@ zap-stream = [ "dep:sha2", "tokio/fs", ] -test-pattern = ["dep:resvg", "dep:usvg", "dep:tiny-skia", "dep:fontdue", "dep:ringbuf", "zap-stream-db/test-pattern"] +test-pattern = [ + "dep:resvg", + "dep:usvg", + "dep:tiny-skia", + "dep:fontdue", + "dep:ringbuf", + "zap-stream-db/test-pattern" +] [dependencies] -ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "c2ae78acbcbe315137aea94c77b0db7ea538a709" } +ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0" } tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] } anyhow = { version = "^1.0.91", features = ["backtrace"] } pretty_env_logger = "0.5.0" @@ -46,6 +56,9 @@ hex = "0.4.3" # srt srt-tokio = { version = "0.4.3", optional = true } +# rtmp +rml_rtmp = { version = "0.8.0", optional = true } + # test-pattern resvg = { version = "0.44.0", optional = true } usvg = { version = "0.44.0", optional = true } @@ -60,3 +73,5 @@ fedimint-tonic-lnd = { version = "0.2.0", optional = true, default-features = fa reqwest = { version = "0.12.9", optional = true, features = ["stream"] } base64 = { version = "0.22.1", optional = true } sha2 = { version = "0.10.8", optional = true } +bytes = "1.8.0" + diff --git a/TODO.md b/TODO.md index b491287..cb0f270 100644 --- a/TODO.md +++ b/TODO.md @@ -1,7 +1,5 @@ -- Store user preference for (rates and recording) [DB] -- Setup multi-variant output -- Manage event lifecycle (close stream) - RTMP? +- Setup multi-variant output - API parity - fMP4 instead of MPEG-TS segments - HLS-LL \ No newline at end of file diff --git a/config.yaml b/config.yaml index d3e225a..370ff3c 100755 --- a/config.yaml +++ b/config.yaml @@ -2,6 +2,7 @@ # currently supporting srt/tcp/file/test-pattern # All the endpoints must be valid URI's endpoints: + - "rtmp://127.0.0.1:3336" - "srt://127.0.0.1:3335" - "tcp://127.0.0.1:3334" @@ -38,8 +39,8 @@ listen_http: "127.0.0.1:8080" overseer: zap-stream: nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4" - blossom: - - "http://localhost:8881" + #blossom: + # - "http://localhost:8881" relays: - "ws://localhost:7766" database: "mysql://root:root@localhost:3368/zap_stream?max_connections=2" diff --git a/src/bin/zap_stream_core.rs b/src/bin/zap_stream_core.rs index 019ab94..58f871a 100644 --- a/src/bin/zap_stream_core.rs +++ b/src/bin/zap_stream_core.rs @@ -4,12 +4,14 @@ use config::Config; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_log_set_callback, av_version_info}; use ffmpeg_rs_raw::{av_log_redirect, rstr}; use log::{error, info}; +use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use tokio::task::JoinHandle; use url::Url; - -use zap_stream_core::egress::http::listen_out_dir; +use warp::{cors, Filter}; +#[cfg(feature = "rtmp")] +use zap_stream_core::ingress::rtmp; #[cfg(feature = "srt")] use zap_stream_core::ingress::srt; #[cfg(feature = "test-pattern")] @@ -48,10 +50,26 @@ async fn main() -> Result<()> { Err(e) => error!("{}", e), } } - listeners.push(tokio::spawn(listen_out_dir( - settings.listen_http, - settings.output_dir, - ))); + + let http_addr: SocketAddr = settings.listen_http.parse()?; + let http_dir = settings.output_dir.clone(); + let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url); + + listeners.push(tokio::spawn(async move { + let cors = cors().allow_any_origin().allow_methods(vec!["GET"]); + + let index_handle = warp::get() + .or(warp::path("index.html")) + .and(warp::path::end()) + .map(move |_| warp::reply::html(index_html.clone())); + + let dir_handle = warp::get().and(warp::fs::dir(http_dir)).with(cors); + + warp::serve(index_handle.or(dir_handle)) + .run(http_addr) + .await; + Ok(()) + })); for handle in listeners { if let Err(e) = handle.await? { @@ -75,6 +93,12 @@ fn try_create_listener( format!("{}:{}", url.host().unwrap(), url.port().unwrap()), overseer.clone(), ))), + #[cfg(feature = "srt")] + "rtmp" => Ok(tokio::spawn(rtmp::listen( + out_dir.to_string(), + format!("{}:{}", url.host().unwrap(), url.port().unwrap()), + overseer.clone(), + ))), "tcp" => Ok(tokio::spawn(tcp::listen( out_dir.to_string(), format!("{}:{}", url.host().unwrap(), url.port().unwrap()), diff --git a/src/egress/http.rs b/src/egress/http.rs deleted file mode 100644 index 10937fe..0000000 --- a/src/egress/http.rs +++ /dev/null @@ -1,14 +0,0 @@ -use std::net::SocketAddr; - -use anyhow::Error; -use warp::{cors, Filter}; - -pub async fn listen_out_dir(addr: String, dir: String) -> Result<(), Error> { - let addr: SocketAddr = addr.parse()?; - let cors = cors().allow_any_origin().allow_methods(vec!["GET"]); - - let warp_out = warp::get().and(warp::fs::dir(dir)).with(cors); - - warp::serve(warp_out).run(addr).await; - Ok(()) -} diff --git a/src/egress/mod.rs b/src/egress/mod.rs index 8fcf0ac..4a5f8eb 100644 --- a/src/egress/mod.rs +++ b/src/egress/mod.rs @@ -6,7 +6,6 @@ use std::path::PathBuf; use uuid::Uuid; pub mod hls; -pub mod http; pub mod recorder; #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/src/index.html b/src/index.html new file mode 100644 index 0000000..b952b71 --- /dev/null +++ b/src/index.html @@ -0,0 +1,17 @@ + + + + zap-stream-core + + + +

Welcome to %%PUBLIC_URL%%

+ + \ No newline at end of file diff --git a/src/ingress/file.rs b/src/ingress/file.rs index ed4e836..206b7b0 100644 --- a/src/ingress/file.rs +++ b/src/ingress/file.rs @@ -4,6 +4,7 @@ use anyhow::Result; use log::info; use std::path::PathBuf; use std::sync::Arc; +use tokio::runtime::Handle; pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc) -> Result<()> { info!("Sending file: {}", path.display()); @@ -11,10 +12,17 @@ pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc) let info = ConnectionInfo { ip_addr: "127.0.0.1:6969".to_string(), endpoint: "file-input".to_owned(), + app_name: "".to_string(), key: "test".to_string(), }; let file = std::fs::File::open(path)?; - spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(file)).await; + spawn_pipeline( + Handle::current(), + info, + out_dir.clone(), + overseer.clone(), + Box::new(file), + ); Ok(()) } diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs index 0ad806b..1fe2861 100644 --- a/src/ingress/mod.rs +++ b/src/ingress/mod.rs @@ -7,6 +7,8 @@ use std::sync::Arc; use tokio::runtime::Handle; pub mod file; +#[cfg(feature = "rtmp")] +pub mod rtmp; #[cfg(feature = "srt")] pub mod srt; pub mod tcp; @@ -21,18 +23,21 @@ pub struct ConnectionInfo { /// IP address of the connection pub ip_addr: String, + /// App name, empty unless RTMP ingress + pub app_name: String, + /// Stream key pub key: String, } -pub async fn spawn_pipeline( +pub fn spawn_pipeline( + handle: Handle, info: ConnectionInfo, out_dir: String, seer: Arc, reader: Box, ) { info!("New client connected: {}", &info.ip_addr); - let handle = Handle::current(); let seer = seer.clone(); let out_dir = out_dir.to_string(); std::thread::spawn(move || unsafe { @@ -41,10 +46,16 @@ pub async fn spawn_pipeline( match pl.run() { Ok(c) => { if !c { + if let Err(e) = pl.flush() { + error!("Pipeline flush failed: {}", e); + } break; } } Err(e) => { + if let Err(e) = pl.flush() { + error!("Pipeline flush failed: {}", e); + } error!("Pipeline run failed: {}", e); break; } diff --git a/src/ingress/rtmp.rs b/src/ingress/rtmp.rs new file mode 100644 index 0000000..8297a2c --- /dev/null +++ b/src/ingress/rtmp.rs @@ -0,0 +1,239 @@ +use crate::ingress::{spawn_pipeline, ConnectionInfo}; +use crate::overseer::Overseer; +use anyhow::{bail, Result}; +use log::{error, info}; +use rml_rtmp::handshake::{Handshake, HandshakeProcessResult, PeerType}; +use rml_rtmp::sessions::{ + ServerSession, ServerSessionConfig, ServerSessionEvent, ServerSessionResult, +}; +use std::collections::VecDeque; +use std::io::{ErrorKind, Read, Write}; +use std::sync::Arc; +use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::runtime::Handle; +use tokio::time::Instant; +#[derive(PartialEq, Eq, Clone, Hash)] +struct RtmpPublishedStream(String, String); + +struct RtmpClient { + socket: std::net::TcpStream, + media_buf: Vec, + session: ServerSession, + msg_queue: VecDeque, + reader_buf: [u8; 4096], + pub published_stream: Option, +} + +impl RtmpClient { + async fn start(mut socket: TcpStream) -> Result { + let mut hs = Handshake::new(PeerType::Server); + + let exchange = hs.generate_outbound_p0_and_p1()?; + socket.write_all(&exchange).await?; + + let mut buf = [0; 4096]; + loop { + let r = socket.read(&mut buf).await?; + if r == 0 { + bail!("EOF reached while reading"); + } + + match hs.process_bytes(&buf[..r])? { + HandshakeProcessResult::InProgress { response_bytes } => { + socket.write_all(&response_bytes).await?; + } + HandshakeProcessResult::Completed { + response_bytes, + remaining_bytes, + } => { + socket.write_all(&response_bytes).await?; + + let cfg = ServerSessionConfig::new(); + let (mut ses, mut res) = ServerSession::new(cfg)?; + let q = ses.handle_input(&remaining_bytes)?; + res.extend(q); + + let ret = Self { + socket: socket.into_std()?, + media_buf: vec![], + session: ses, + msg_queue: VecDeque::from(res), + reader_buf: [0; 4096], + published_stream: None, + }; + + return Ok(ret); + } + } + } + } + + /// Read data until we get the publish request + pub fn read_until_publish_request(&mut self, timeout: Duration) -> Result<()> { + let start = Instant::now(); + while self.published_stream.is_none() { + if (Instant::now() - start) > timeout { + bail!("Timed out waiting for publish request"); + } + self.read_data()?; + } + Ok(()) + } + + fn read_data(&mut self) -> Result<()> { + let r = match self.socket.read(&mut self.reader_buf) { + Ok(r) => r, + Err(e) => { + return match e.kind() { + ErrorKind::WouldBlock => Ok(()), + ErrorKind::Interrupted => Ok(()), + _ => Err(anyhow::Error::new(e)), + }; + } + }; + if r == 0 { + bail!("EOF"); + } + + let mx = self.session.handle_input(&self.reader_buf[..r])?; + if mx.len() > 0 { + self.msg_queue.extend(mx); + self.process_msg_queue()?; + } + Ok(()) + } + + fn process_msg_queue(&mut self) -> Result<()> { + while let Some(msg) = self.msg_queue.pop_front() { + match msg { + ServerSessionResult::OutboundResponse(data) => { + self.socket.write_all(&data.bytes)? + } + ServerSessionResult::RaisedEvent(ev) => self.handle_event(ev)?, + ServerSessionResult::UnhandleableMessageReceived(m) => { + // treat any non-flv streams as raw media stream in rtmp + self.media_buf.extend(&m.data); + } + } + } + Ok(()) + } + + fn handle_event(&mut self, event: ServerSessionEvent) -> Result<()> { + match event { + ServerSessionEvent::ClientChunkSizeChanged { new_chunk_size } => { + info!("New client chunk size: {}", new_chunk_size); + } + ServerSessionEvent::ConnectionRequested { request_id, .. } => { + let mx = self.session.accept_request(request_id)?; + self.msg_queue.extend(mx); + } + ServerSessionEvent::ReleaseStreamRequested { .. } => {} + ServerSessionEvent::PublishStreamRequested { + request_id, + app_name, + stream_key, + mode, + } => { + if self.published_stream.is_some() { + let mx = + self.session + .reject_request(request_id, "0", "stream already published")?; + self.msg_queue.extend(mx); + } else { + let mx = self.session.accept_request(request_id)?; + self.msg_queue.extend(mx); + info!( + "Published stream request: {app_name}/{stream_key} [{:?}]", + mode + ); + self.published_stream = Some(RtmpPublishedStream(app_name, stream_key)); + } + } + ServerSessionEvent::PublishStreamFinished { .. } => {} + ServerSessionEvent::StreamMetadataChanged { + app_name, + stream_key, + metadata, + } => { + info!( + "Metadata configured: {}/{} {:?}", + app_name, stream_key, metadata + ); + } + ServerSessionEvent::AudioDataReceived { data, .. } => { + self.media_buf.extend(data); + } + ServerSessionEvent::VideoDataReceived { data, .. } => { + self.media_buf.extend(data); + } + ServerSessionEvent::UnhandleableAmf0Command { .. } => {} + ServerSessionEvent::PlayStreamRequested { request_id, .. } => { + let mx = self + .session + .reject_request(request_id, "0", "playback not supported")?; + self.msg_queue.extend(mx); + } + ServerSessionEvent::PlayStreamFinished { .. } => {} + ServerSessionEvent::AcknowledgementReceived { .. } => {} + ServerSessionEvent::PingResponseReceived { .. } => {} + } + Ok(()) + } +} + +impl Read for RtmpClient { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + // block this thread until something comes into [media_buf] + while self.media_buf.len() == 0 { + if let Err(e) = self.read_data() { + error!("Error reading data: {}", e); + return Ok(0); + }; + } + + let to_read = buf.len().min(self.media_buf.len()); + let drain = self.media_buf.drain(..to_read); + buf[..to_read].copy_from_slice(drain.as_slice()); + Ok(to_read) + } +} + +pub async fn listen(out_dir: String, addr: String, overseer: Arc) -> Result<()> { + let listener = TcpListener::bind(&addr).await?; + + info!("RTMP listening on: {}", &addr); + while let Ok((socket, ip)) = listener.accept().await { + let mut cc = RtmpClient::start(socket).await?; + let addr = addr.clone(); + let overseer = overseer.clone(); + let out_dir = out_dir.clone(); + let handle = Handle::current(); + std::thread::Builder::new() + .name("rtmp-client".to_string()) + .spawn(move || { + if let Err(e) = cc.read_until_publish_request(Duration::from_secs(10)) { + error!("{}", e); + return; + } else { + let pr = cc.published_stream.as_ref().unwrap(); + let info = ConnectionInfo { + ip_addr: ip.to_string(), + endpoint: addr.clone(), + app_name: pr.0.clone(), + key: pr.1.clone(), + }; + spawn_pipeline( + handle, + info, + out_dir.clone(), + overseer.clone(), + Box::new(cc), + ); + } + })?; + } + Ok(()) +} diff --git a/src/ingress/srt.rs b/src/ingress/srt.rs index 75e9074..f24d7f6 100644 --- a/src/ingress/srt.rs +++ b/src/ingress/srt.rs @@ -1,18 +1,14 @@ use crate::ingress::{spawn_pipeline, ConnectionInfo}; use crate::overseer::Overseer; -use crate::pipeline::runner::PipelineRunner; -use crate::settings::Settings; use anyhow::Result; use futures_util::stream::FusedStream; -use futures_util::{SinkExt, StreamExt, TryStreamExt}; -use log::{error, info, warn}; +use futures_util::StreamExt; +use log::info; use srt_tokio::{SrtListener, SrtSocket}; use std::io::Read; use std::net::SocketAddr; use std::sync::Arc; use tokio::runtime::Handle; -use tokio::sync::mpsc::unbounded_channel; -use warp::Buf; pub async fn listen(out_dir: String, addr: String, overseer: Arc) -> Result<()> { let binder: SocketAddr = addr.parse()?; @@ -20,10 +16,11 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) info!("SRT listening on: {}", &addr); while let Some(request) = packets.incoming().next().await { - let mut socket = request.accept(None).await?; + let socket = request.accept(None).await?; let info = ConnectionInfo { endpoint: addr.clone(), ip_addr: socket.settings().remote.to_string(), + app_name: "".to_string(), key: socket .settings() .stream_id @@ -31,6 +28,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) .map_or(String::new(), |s| s.to_string()), }; spawn_pipeline( + Handle::current(), info, out_dir.clone(), overseer.clone(), @@ -39,8 +37,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) socket, buf: Vec::with_capacity(4096), }), - ) - .await; + ); } Ok(()) } @@ -58,7 +55,7 @@ impl Read for SrtReader { if rx.is_terminated() { return Ok(0); } - if let Some((_, mut data)) = self.handle.block_on(rx.next()) { + if let Some((_, data)) = self.handle.block_on(rx.next()) { self.buf.extend(data.iter().as_slice()); } } diff --git a/src/ingress/tcp.rs b/src/ingress/tcp.rs index 4569490..47b014a 100644 --- a/src/ingress/tcp.rs +++ b/src/ingress/tcp.rs @@ -1,10 +1,10 @@ +use crate::ingress::{spawn_pipeline, ConnectionInfo}; +use crate::overseer::Overseer; use anyhow::Result; use log::info; use std::sync::Arc; use tokio::net::TcpListener; - -use crate::ingress::{spawn_pipeline, ConnectionInfo}; -use crate::overseer::Overseer; +use tokio::runtime::Handle; pub async fn listen(out_dir: String, addr: String, overseer: Arc) -> Result<()> { let listener = TcpListener::bind(&addr).await?; @@ -14,10 +14,17 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) let info = ConnectionInfo { ip_addr: ip.to_string(), endpoint: addr.clone(), + app_name: "".to_string(), key: "no-key-tcp".to_string(), }; let socket = socket.into_std()?; - spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(socket)).await; + spawn_pipeline( + Handle::current(), + info, + out_dir.clone(), + overseer.clone(), + Box::new(socket), + ); } Ok(()) } diff --git a/src/ingress/test.rs b/src/ingress/test.rs index 9502a3c..469071f 100644 --- a/src/ingress/test.rs +++ b/src/ingress/test.rs @@ -17,6 +17,7 @@ use std::io::Read; use std::sync::Arc; use std::time::{Duration, Instant}; use tiny_skia::Pixmap; +use tokio::runtime::Handle; pub async fn listen(out_dir: String, overseer: Arc) -> Result<()> { info!("Test pattern enabled"); @@ -24,10 +25,17 @@ pub async fn listen(out_dir: String, overseer: Arc) -> Result<()> let info = ConnectionInfo { endpoint: "test-pattern".to_string(), ip_addr: "test-pattern".to_string(), + app_name: "".to_string(), key: "test".to_string(), }; let src = TestPatternSrc::new()?; - spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(src)).await; + spawn_pipeline( + Handle::current(), + info, + out_dir.clone(), + overseer.clone(), + Box::new(src), + ); Ok(()) } diff --git a/src/overseer/local.rs b/src/overseer/local.rs new file mode 100644 index 0000000..d9f54b3 --- /dev/null +++ b/src/overseer/local.rs @@ -0,0 +1,67 @@ +use crate::egress::EgressConfig; +use crate::ingress::ConnectionInfo; +use crate::overseer::{get_default_variants, IngressInfo, Overseer}; +use crate::pipeline::{EgressType, PipelineConfig}; +use crate::variant::StreamMapping; +use anyhow::Result; +use async_trait::async_trait; +use std::path::PathBuf; +use uuid::Uuid; + +/// Simple static file output without any access controls +/// Useful for testing or self-hosting +pub struct LocalOverseer; + +impl LocalOverseer { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl Overseer for LocalOverseer { + async fn start_stream( + &self, + _connection: &ConnectionInfo, + stream_info: &IngressInfo, + ) -> Result { + let vars = get_default_variants(stream_info)?; + let var_ids = vars.iter().map(|v| v.id()).collect(); + Ok(PipelineConfig { + id: Uuid::new_v4(), + variants: vars, + egress: vec![EgressType::HLS(EgressConfig { + name: "HLS".to_owned(), + variants: var_ids, + })], + }) + } + + async fn on_segment( + &self, + pipeline_id: &Uuid, + variant_id: &Uuid, + index: u64, + duration: f32, + path: &PathBuf, + ) -> Result<()> { + // nothing to do here + Ok(()) + } + + async fn on_thumbnail( + &self, + pipeline_id: &Uuid, + width: usize, + height: usize, + path: &PathBuf, + ) -> Result<()> { + // nothing to do here + Ok(()) + } + + async fn on_end(&self, pipeline_id: &Uuid) -> Result<()> { + // nothing to do here + Ok(()) + } +} diff --git a/src/overseer/mod.rs b/src/overseer/mod.rs index 8a0aa6a..2b3cf0e 100644 --- a/src/overseer/mod.rs +++ b/src/overseer/mod.rs @@ -1,14 +1,15 @@ -use crate::egress::EgressConfig; use crate::ingress::ConnectionInfo; + +#[cfg(feature = "local-overseer")] +use crate::overseer::local::LocalOverseer; +#[cfg(feature = "webhook-overseer")] use crate::overseer::webhook::WebhookOverseer; -#[cfg(feature = "zap-stream")] -use crate::overseer::zap_stream::ZapStreamOverseer; -use crate::pipeline::{EgressType, PipelineConfig}; -use crate::settings::{OverseerConfig, Settings}; +use crate::pipeline::PipelineConfig; +use crate::settings::Settings; use crate::variant::audio::AudioVariant; use crate::variant::mapping::VariantMapping; use crate::variant::video::VideoVariant; -use crate::variant::{StreamMapping, VariantStream}; +use crate::variant::VariantStream; use anyhow::Result; use async_trait::async_trait; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; @@ -16,8 +17,14 @@ use std::cmp::PartialEq; use std::path::PathBuf; use std::sync::Arc; use uuid::Uuid; -use warp::Filter; +#[cfg(feature = "zap-stream")] +use crate::overseer::zap_stream::ZapStreamOverseer; + +#[cfg(feature = "local-overseer")] +mod local; + +#[cfg(feature = "webhook-overseer")] mod webhook; #[cfg(feature = "zap-stream")] @@ -90,34 +97,31 @@ pub trait Overseer: Send + Sync { impl Settings { pub async fn get_overseer(&self) -> Result> { match &self.overseer { - OverseerConfig::Static { egress_types } => Ok(Arc::new(StaticOverseer::new( - &self.output_dir, - egress_types, - ))), + #[cfg(feature = "local-overseer")] + OverseerConfig::Local => Ok(Arc::new(LocalOverseer::new())), + #[cfg(feature = "webhook-overseer")] OverseerConfig::Webhook { url } => Ok(Arc::new(WebhookOverseer::new(&url))), + #[cfg(feature = "zap-stream")] OverseerConfig::ZapStream { nsec: private_key, database, lnd, relays, blossom, - } => { - #[cfg(not(feature = "zap-stream"))] - panic!("zap.stream overseer is not enabled"); - - #[cfg(feature = "zap-stream")] - Ok(Arc::new( - ZapStreamOverseer::new( - &self.output_dir, - &self.public_url, - private_key, - database, - lnd, - relays, - blossom, - ) - .await?, - )) + } => Ok(Arc::new( + ZapStreamOverseer::new( + &self.output_dir, + &self.public_url, + private_key, + database, + lnd, + relays, + blossom, + ) + .await?, + )), + _ => { + panic!("Unsupported overseer"); } } } @@ -183,61 +187,3 @@ pub(crate) fn get_default_variants(info: &IngressInfo) -> Result) -> Self { - Self {} - } -} - -#[async_trait] -impl Overseer for StaticOverseer { - async fn start_stream( - &self, - _connection: &ConnectionInfo, - stream_info: &IngressInfo, - ) -> Result { - let vars = get_default_variants(stream_info)?; - let var_ids = vars.iter().map(|v| v.id()).collect(); - Ok(PipelineConfig { - id: Uuid::new_v4(), - variants: vars, - egress: vec![EgressType::HLS(EgressConfig { - name: "HLS".to_owned(), - variants: var_ids, - })], - }) - } - - async fn on_segment( - &self, - pipeline_id: &Uuid, - variant_id: &Uuid, - index: u64, - duration: f32, - path: &PathBuf, - ) -> Result<()> { - // nothing to do here - Ok(()) - } - - async fn on_thumbnail( - &self, - pipeline_id: &Uuid, - width: usize, - height: usize, - path: &PathBuf, - ) -> Result<()> { - // nothing to do here - Ok(()) - } - - async fn on_end(&self, pipeline_id: &Uuid) -> Result<()> { - - // nothing to do here - Ok(()) - } -} diff --git a/src/overseer/zap_stream.rs b/src/overseer/zap_stream.rs index 8c97dc2..cd33ec2 100644 --- a/src/overseer/zap_stream.rs +++ b/src/overseer/zap_stream.rs @@ -14,7 +14,7 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_MJPEG; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVFrame; use ffmpeg_rs_raw::Encoder; use futures_util::FutureExt; -use log::info; +use log::{info, warn}; use nostr_sdk::bitcoin::PrivateKey; use nostr_sdk::prelude::Coordinate; use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32}; @@ -276,7 +276,12 @@ impl Overseer for ZapStreamOverseer { n94 = n94.add_tags(Tag::parse(&["url", &b.url])); } let n94 = n94.sign_with_keys(&self.keys)?; - self.client.send_event(n94).await?; + let cc = self.client.clone(); + tokio::spawn(async move { + if let Err(e) = cc.send_event(n94).await { + warn!("Error sending event: {}", e); + } + }); info!("Published N94 segment to {}", blob.url); } diff --git a/src/pipeline/runner.rs b/src/pipeline/runner.rs index b87beac..b7c5801 100644 --- a/src/pipeline/runner.rs +++ b/src/pipeline/runner.rs @@ -20,8 +20,7 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_WEBP; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_frame_free, av_get_sample_fmt, av_packet_free, av_pkt_dump_log2, av_q2d, av_rescale_q, - AVMediaType, + av_frame_free, av_get_sample_fmt, av_packet_free, av_q2d, av_rescale_q, AVMediaType, }; use ffmpeg_rs_raw::{ cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler, @@ -106,7 +105,7 @@ impl PipelineRunner { } /// EOF, cleanup - unsafe fn flush(&mut self) -> Result<()> { + pub unsafe fn flush(&mut self) -> Result<()> { for (var, enc) in &mut self.encoders { for mut pkt in enc.encode_frame(ptr::null_mut())? { for eg in self.egress.iter_mut() { @@ -118,6 +117,14 @@ impl PipelineRunner { for eg in self.egress.iter_mut() { eg.reset()?; } + + if let Some(config) = &self.config { + self.handle.block_on(async { + if let Err(e) = self.overseer.on_end(&config.id).await { + error!("Failed to end stream: {e}"); + } + }); + } Ok(()) } @@ -135,12 +142,6 @@ impl PipelineRunner { // run transcoder pipeline let (mut pkt, stream) = self.demuxer.get_packet()?; if pkt.is_null() { - self.handle.block_on(async { - if let Err(e) = self.overseer.on_end(&config.id).await { - error!("Failed to end stream: {e}"); - } - }); - self.flush()?; return Ok(false); } @@ -227,7 +228,7 @@ impl PipelineRunner { if let Some((r, f)) = self.resampler.get_mut(&a.id()) { let frame_size = (*enc.codec_context()).frame_size; new_frame = true; - let mut resampled_frame = r.process_frame(frame, frame_size)?; + let mut resampled_frame = r.process_frame(frame)?; if let Some(ret) = f.buffer_frame(resampled_frame, frame_size as usize)? { diff --git a/src/settings.rs b/src/settings.rs index e1d9173..044bb36 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -26,10 +26,7 @@ pub struct Settings { #[serde(rename_all = "kebab-case")] pub enum OverseerConfig { /// Static output - Static { - /// Types of output - egress_types: Vec, - }, + Local, /// Control system via external API Webhook { /// Webhook service URL