diff --git a/Cargo.lock b/Cargo.lock
index 7acb312..81d2534 100755
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -473,6 +473,15 @@ dependencies = [
"serde",
]
+[[package]]
+name = "block-buffer"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
+dependencies = [
+ "generic-array",
+]
+
[[package]]
name = "block-buffer"
version = "0.10.4"
@@ -843,6 +852,16 @@ dependencies = [
"typenum",
]
+[[package]]
+name = "crypto-mac"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4857fd85a0c34b3c3297875b747c1e02e06b6a0ea32dd892d8192b9ce0813ea6"
+dependencies = [
+ "generic-array",
+ "subtle",
+]
+
[[package]]
name = "ctr"
version = "0.9.2"
@@ -888,13 +907,22 @@ dependencies = [
"syn 1.0.109",
]
+[[package]]
+name = "digest"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
+dependencies = [
+ "generic-array",
+]
+
[[package]]
name = "digest"
version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
- "block-buffer",
+ "block-buffer 0.10.4",
"const-oid",
"crypto-common",
"subtle",
@@ -1022,7 +1050,7 @@ dependencies = [
[[package]]
name = "ffmpeg-rs-raw"
version = "0.1.0"
-source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=c2ae78acbcbe315137aea94c77b0db7ea538a709#c2ae78acbcbe315137aea94c77b0db7ea538a709"
+source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0#8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0"
dependencies = [
"anyhow",
"ffmpeg-sys-the-third",
@@ -1034,7 +1062,7 @@ dependencies = [
[[package]]
name = "ffmpeg-sys-the-third"
version = "2.1.0+ffmpeg-7.1"
-source = "git+https://git.v0l.io/ffmpeg/ffmpeg-the-third.git?branch=master#0fdfa9ab506f5c92aad5a175db081c8a2c1579a1"
+source = "git+https://git.v0l.io/Kieran/ffmpeg-the-third.git?rev=e5f8e077b04b10d5887bce4df1eb1a71738a6c66#e5f8e077b04b10d5887bce4df1eb1a71738a6c66"
dependencies = [
"bindgen",
"cc",
@@ -1451,7 +1479,17 @@ version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7"
dependencies = [
- "hmac",
+ "hmac 0.12.1",
+]
+
+[[package]]
+name = "hmac"
+version = "0.10.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c1441c6b1e930e2817404b5046f1f989899143a12bf92de603b69f4e0aee1e15"
+dependencies = [
+ "crypto-mac",
+ "digest 0.9.0",
]
[[package]]
@@ -1460,7 +1498,7 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
dependencies = [
- "digest",
+ "digest 0.10.7",
]
[[package]]
@@ -1966,7 +2004,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
dependencies = [
"cfg-if",
- "digest",
+ "digest 0.10.7",
]
[[package]]
@@ -2390,8 +2428,8 @@ version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2"
dependencies = [
- "digest",
- "hmac",
+ "digest 0.10.7",
+ "hmac 0.12.1",
]
[[package]]
@@ -2451,7 +2489,7 @@ checksum = "934cd7631c050f4674352a6e835d5f6711ffbfb9345c2fc0107155ac495ae293"
dependencies = [
"once_cell",
"pest",
- "sha2",
+ "sha2 0.10.8",
]
[[package]]
@@ -2884,6 +2922,31 @@ dependencies = [
"portable-atomic",
]
+[[package]]
+name = "rml_amf0"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "63551cfcd4d1f42733c190e4b58dd268b1eacb73410d9afbf62784aa12cac240"
+dependencies = [
+ "byteorder",
+ "thiserror 1.0.57",
+]
+
+[[package]]
+name = "rml_rtmp"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a354e80eb7aa2a6fed09b3bd25c19bcfd32cf51f81f1219f4ec04f34519989da"
+dependencies = [
+ "byteorder",
+ "bytes",
+ "hmac 0.10.1",
+ "rand",
+ "rml_amf0",
+ "sha2 0.9.9",
+ "thiserror 1.0.57",
+]
+
[[package]]
name = "ron"
version = "0.8.1"
@@ -2909,7 +2972,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc"
dependencies = [
"const-oid",
- "digest",
+ "digest 0.10.7",
"num-bigint-dig",
"num-integer",
"num-traits",
@@ -3115,7 +3178,7 @@ dependencies = [
"password-hash",
"pbkdf2",
"salsa20",
- "sha2",
+ "sha2 0.10.8",
]
[[package]]
@@ -3239,7 +3302,7 @@ checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c"
dependencies = [
"cfg-if",
"cpufeatures",
- "digest",
+ "digest 0.10.7",
]
[[package]]
@@ -3250,7 +3313,20 @@ checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
- "digest",
+ "digest 0.10.7",
+]
+
+[[package]]
+name = "sha2"
+version = "0.9.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800"
+dependencies = [
+ "block-buffer 0.9.0",
+ "cfg-if",
+ "cpufeatures",
+ "digest 0.9.0",
+ "opaque-debug",
]
[[package]]
@@ -3261,7 +3337,7 @@ checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8"
dependencies = [
"cfg-if",
"cpufeatures",
- "digest",
+ "digest 0.10.7",
]
[[package]]
@@ -3276,7 +3352,7 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de"
dependencies = [
- "digest",
+ "digest 0.10.7",
"rand_core",
]
@@ -3422,7 +3498,7 @@ dependencies = [
"percent-encoding",
"serde",
"serde_json",
- "sha2",
+ "sha2 0.10.8",
"smallvec",
"sqlformat",
"thiserror 1.0.57",
@@ -3460,7 +3536,7 @@ dependencies = [
"quote",
"serde",
"serde_json",
- "sha2",
+ "sha2 0.10.8",
"sqlx-core",
"sqlx-mysql",
"sqlx-postgres",
@@ -3484,7 +3560,7 @@ dependencies = [
"bytes",
"chrono",
"crc",
- "digest",
+ "digest 0.10.7",
"dotenvy",
"either",
"futures-channel",
@@ -3494,7 +3570,7 @@ dependencies = [
"generic-array",
"hex",
"hkdf",
- "hmac",
+ "hmac 0.12.1",
"itoa",
"log",
"md-5",
@@ -3505,7 +3581,7 @@ dependencies = [
"rsa",
"serde",
"sha1",
- "sha2",
+ "sha2 0.10.8",
"smallvec",
"sqlx-core",
"stringprep",
@@ -3534,7 +3610,7 @@ dependencies = [
"futures-util",
"hex",
"hkdf",
- "hmac",
+ "hmac 0.12.1",
"home",
"itoa",
"log",
@@ -3544,7 +3620,7 @@ dependencies = [
"rand",
"serde",
"serde_json",
- "sha2",
+ "sha2 0.10.8",
"smallvec",
"sqlx-core",
"stringprep",
@@ -3592,7 +3668,7 @@ dependencies = [
"ctr",
"derive_more",
"hex",
- "hmac",
+ "hmac 0.12.1",
"keyed_priority_queue",
"log",
"pbkdf2",
@@ -4777,6 +4853,7 @@ dependencies = [
"anyhow",
"async-trait",
"base64 0.22.1",
+ "bytes",
"chrono",
"clap",
"config",
@@ -4795,8 +4872,9 @@ dependencies = [
"reqwest",
"resvg",
"ringbuf",
+ "rml_rtmp",
"serde",
- "sha2",
+ "sha2 0.10.8",
"srt-tokio",
"tiny-skia",
"tokio",
diff --git a/Cargo.toml b/Cargo.toml
index 0a86d96..7a1e292 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,8 +8,11 @@ name = "zap-stream-core"
path = "src/bin/zap_stream_core.rs"
[features]
-default = ["test-pattern", "srt"]
+default = ["test-pattern", "srt", "rtmp"]
srt = ["dep:srt-tokio"]
+rtmp = ["dep:rml_rtmp"]
+local-overseer = [] # WIP
+webhook-overseer = [] # WIP
zap-stream = [
"dep:nostr-sdk",
"dep:zap-stream-db",
@@ -19,10 +22,17 @@ zap-stream = [
"dep:sha2",
"tokio/fs",
]
-test-pattern = ["dep:resvg", "dep:usvg", "dep:tiny-skia", "dep:fontdue", "dep:ringbuf", "zap-stream-db/test-pattern"]
+test-pattern = [
+ "dep:resvg",
+ "dep:usvg",
+ "dep:tiny-skia",
+ "dep:fontdue",
+ "dep:ringbuf",
+ "zap-stream-db/test-pattern"
+]
[dependencies]
-ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "c2ae78acbcbe315137aea94c77b0db7ea538a709" }
+ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0" }
tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] }
anyhow = { version = "^1.0.91", features = ["backtrace"] }
pretty_env_logger = "0.5.0"
@@ -46,6 +56,9 @@ hex = "0.4.3"
# srt
srt-tokio = { version = "0.4.3", optional = true }
+# rtmp
+rml_rtmp = { version = "0.8.0", optional = true }
+
# test-pattern
resvg = { version = "0.44.0", optional = true }
usvg = { version = "0.44.0", optional = true }
@@ -60,3 +73,5 @@ fedimint-tonic-lnd = { version = "0.2.0", optional = true, default-features = fa
reqwest = { version = "0.12.9", optional = true, features = ["stream"] }
base64 = { version = "0.22.1", optional = true }
sha2 = { version = "0.10.8", optional = true }
+bytes = "1.8.0"
+
diff --git a/TODO.md b/TODO.md
index b491287..cb0f270 100644
--- a/TODO.md
+++ b/TODO.md
@@ -1,7 +1,5 @@
-- Store user preference for (rates and recording) [DB]
-- Setup multi-variant output
-- Manage event lifecycle (close stream)
- RTMP?
+- Setup multi-variant output
- API parity
- fMP4 instead of MPEG-TS segments
- HLS-LL
\ No newline at end of file
diff --git a/config.yaml b/config.yaml
index d3e225a..370ff3c 100755
--- a/config.yaml
+++ b/config.yaml
@@ -2,6 +2,7 @@
# currently supporting srt/tcp/file/test-pattern
# All the endpoints must be valid URI's
endpoints:
+ - "rtmp://127.0.0.1:3336"
- "srt://127.0.0.1:3335"
- "tcp://127.0.0.1:3334"
@@ -38,8 +39,8 @@ listen_http: "127.0.0.1:8080"
overseer:
zap-stream:
nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4"
- blossom:
- - "http://localhost:8881"
+ #blossom:
+ # - "http://localhost:8881"
relays:
- "ws://localhost:7766"
database: "mysql://root:root@localhost:3368/zap_stream?max_connections=2"
diff --git a/src/bin/zap_stream_core.rs b/src/bin/zap_stream_core.rs
index 019ab94..58f871a 100644
--- a/src/bin/zap_stream_core.rs
+++ b/src/bin/zap_stream_core.rs
@@ -4,12 +4,14 @@ use config::Config;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_log_set_callback, av_version_info};
use ffmpeg_rs_raw::{av_log_redirect, rstr};
use log::{error, info};
+use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::task::JoinHandle;
use url::Url;
-
-use zap_stream_core::egress::http::listen_out_dir;
+use warp::{cors, Filter};
+#[cfg(feature = "rtmp")]
+use zap_stream_core::ingress::rtmp;
#[cfg(feature = "srt")]
use zap_stream_core::ingress::srt;
#[cfg(feature = "test-pattern")]
@@ -48,10 +50,26 @@ async fn main() -> Result<()> {
Err(e) => error!("{}", e),
}
}
- listeners.push(tokio::spawn(listen_out_dir(
- settings.listen_http,
- settings.output_dir,
- )));
+
+ let http_addr: SocketAddr = settings.listen_http.parse()?;
+ let http_dir = settings.output_dir.clone();
+ let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url);
+
+ listeners.push(tokio::spawn(async move {
+ let cors = cors().allow_any_origin().allow_methods(vec!["GET"]);
+
+ let index_handle = warp::get()
+ .or(warp::path("index.html"))
+ .and(warp::path::end())
+ .map(move |_| warp::reply::html(index_html.clone()));
+
+ let dir_handle = warp::get().and(warp::fs::dir(http_dir)).with(cors);
+
+ warp::serve(index_handle.or(dir_handle))
+ .run(http_addr)
+ .await;
+ Ok(())
+ }));
for handle in listeners {
if let Err(e) = handle.await? {
@@ -75,6 +93,12 @@ fn try_create_listener(
format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
overseer.clone(),
))),
+ #[cfg(feature = "srt")]
+ "rtmp" => Ok(tokio::spawn(rtmp::listen(
+ out_dir.to_string(),
+ format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
+ overseer.clone(),
+ ))),
"tcp" => Ok(tokio::spawn(tcp::listen(
out_dir.to_string(),
format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
diff --git a/src/egress/http.rs b/src/egress/http.rs
deleted file mode 100644
index 10937fe..0000000
--- a/src/egress/http.rs
+++ /dev/null
@@ -1,14 +0,0 @@
-use std::net::SocketAddr;
-
-use anyhow::Error;
-use warp::{cors, Filter};
-
-pub async fn listen_out_dir(addr: String, dir: String) -> Result<(), Error> {
- let addr: SocketAddr = addr.parse()?;
- let cors = cors().allow_any_origin().allow_methods(vec!["GET"]);
-
- let warp_out = warp::get().and(warp::fs::dir(dir)).with(cors);
-
- warp::serve(warp_out).run(addr).await;
- Ok(())
-}
diff --git a/src/egress/mod.rs b/src/egress/mod.rs
index 8fcf0ac..4a5f8eb 100644
--- a/src/egress/mod.rs
+++ b/src/egress/mod.rs
@@ -6,7 +6,6 @@ use std::path::PathBuf;
use uuid::Uuid;
pub mod hls;
-pub mod http;
pub mod recorder;
#[derive(Clone, Debug, Serialize, Deserialize)]
diff --git a/src/index.html b/src/index.html
new file mode 100644
index 0000000..b952b71
--- /dev/null
+++ b/src/index.html
@@ -0,0 +1,17 @@
+
+
+
+ zap-stream-core
+
+
+
+Welcome to %%PUBLIC_URL%%
+
+
\ No newline at end of file
diff --git a/src/ingress/file.rs b/src/ingress/file.rs
index ed4e836..206b7b0 100644
--- a/src/ingress/file.rs
+++ b/src/ingress/file.rs
@@ -4,6 +4,7 @@ use anyhow::Result;
use log::info;
use std::path::PathBuf;
use std::sync::Arc;
+use tokio::runtime::Handle;
pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc) -> Result<()> {
info!("Sending file: {}", path.display());
@@ -11,10 +12,17 @@ pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc)
let info = ConnectionInfo {
ip_addr: "127.0.0.1:6969".to_string(),
endpoint: "file-input".to_owned(),
+ app_name: "".to_string(),
key: "test".to_string(),
};
let file = std::fs::File::open(path)?;
- spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(file)).await;
+ spawn_pipeline(
+ Handle::current(),
+ info,
+ out_dir.clone(),
+ overseer.clone(),
+ Box::new(file),
+ );
Ok(())
}
diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs
index 0ad806b..1fe2861 100644
--- a/src/ingress/mod.rs
+++ b/src/ingress/mod.rs
@@ -7,6 +7,8 @@ use std::sync::Arc;
use tokio::runtime::Handle;
pub mod file;
+#[cfg(feature = "rtmp")]
+pub mod rtmp;
#[cfg(feature = "srt")]
pub mod srt;
pub mod tcp;
@@ -21,18 +23,21 @@ pub struct ConnectionInfo {
/// IP address of the connection
pub ip_addr: String,
+ /// App name, empty unless RTMP ingress
+ pub app_name: String,
+
/// Stream key
pub key: String,
}
-pub async fn spawn_pipeline(
+pub fn spawn_pipeline(
+ handle: Handle,
info: ConnectionInfo,
out_dir: String,
seer: Arc,
reader: Box,
) {
info!("New client connected: {}", &info.ip_addr);
- let handle = Handle::current();
let seer = seer.clone();
let out_dir = out_dir.to_string();
std::thread::spawn(move || unsafe {
@@ -41,10 +46,16 @@ pub async fn spawn_pipeline(
match pl.run() {
Ok(c) => {
if !c {
+ if let Err(e) = pl.flush() {
+ error!("Pipeline flush failed: {}", e);
+ }
break;
}
}
Err(e) => {
+ if let Err(e) = pl.flush() {
+ error!("Pipeline flush failed: {}", e);
+ }
error!("Pipeline run failed: {}", e);
break;
}
diff --git a/src/ingress/rtmp.rs b/src/ingress/rtmp.rs
new file mode 100644
index 0000000..8297a2c
--- /dev/null
+++ b/src/ingress/rtmp.rs
@@ -0,0 +1,239 @@
+use crate::ingress::{spawn_pipeline, ConnectionInfo};
+use crate::overseer::Overseer;
+use anyhow::{bail, Result};
+use log::{error, info};
+use rml_rtmp::handshake::{Handshake, HandshakeProcessResult, PeerType};
+use rml_rtmp::sessions::{
+ ServerSession, ServerSessionConfig, ServerSessionEvent, ServerSessionResult,
+};
+use std::collections::VecDeque;
+use std::io::{ErrorKind, Read, Write};
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::net::{TcpListener, TcpStream};
+use tokio::runtime::Handle;
+use tokio::time::Instant;
+#[derive(PartialEq, Eq, Clone, Hash)]
+struct RtmpPublishedStream(String, String);
+
+struct RtmpClient {
+ socket: std::net::TcpStream,
+ media_buf: Vec,
+ session: ServerSession,
+ msg_queue: VecDeque,
+ reader_buf: [u8; 4096],
+ pub published_stream: Option,
+}
+
+impl RtmpClient {
+ async fn start(mut socket: TcpStream) -> Result {
+ let mut hs = Handshake::new(PeerType::Server);
+
+ let exchange = hs.generate_outbound_p0_and_p1()?;
+ socket.write_all(&exchange).await?;
+
+ let mut buf = [0; 4096];
+ loop {
+ let r = socket.read(&mut buf).await?;
+ if r == 0 {
+ bail!("EOF reached while reading");
+ }
+
+ match hs.process_bytes(&buf[..r])? {
+ HandshakeProcessResult::InProgress { response_bytes } => {
+ socket.write_all(&response_bytes).await?;
+ }
+ HandshakeProcessResult::Completed {
+ response_bytes,
+ remaining_bytes,
+ } => {
+ socket.write_all(&response_bytes).await?;
+
+ let cfg = ServerSessionConfig::new();
+ let (mut ses, mut res) = ServerSession::new(cfg)?;
+ let q = ses.handle_input(&remaining_bytes)?;
+ res.extend(q);
+
+ let ret = Self {
+ socket: socket.into_std()?,
+ media_buf: vec![],
+ session: ses,
+ msg_queue: VecDeque::from(res),
+ reader_buf: [0; 4096],
+ published_stream: None,
+ };
+
+ return Ok(ret);
+ }
+ }
+ }
+ }
+
+ /// Read data until we get the publish request
+ pub fn read_until_publish_request(&mut self, timeout: Duration) -> Result<()> {
+ let start = Instant::now();
+ while self.published_stream.is_none() {
+ if (Instant::now() - start) > timeout {
+ bail!("Timed out waiting for publish request");
+ }
+ self.read_data()?;
+ }
+ Ok(())
+ }
+
+ fn read_data(&mut self) -> Result<()> {
+ let r = match self.socket.read(&mut self.reader_buf) {
+ Ok(r) => r,
+ Err(e) => {
+ return match e.kind() {
+ ErrorKind::WouldBlock => Ok(()),
+ ErrorKind::Interrupted => Ok(()),
+ _ => Err(anyhow::Error::new(e)),
+ };
+ }
+ };
+ if r == 0 {
+ bail!("EOF");
+ }
+
+ let mx = self.session.handle_input(&self.reader_buf[..r])?;
+ if mx.len() > 0 {
+ self.msg_queue.extend(mx);
+ self.process_msg_queue()?;
+ }
+ Ok(())
+ }
+
+ fn process_msg_queue(&mut self) -> Result<()> {
+ while let Some(msg) = self.msg_queue.pop_front() {
+ match msg {
+ ServerSessionResult::OutboundResponse(data) => {
+ self.socket.write_all(&data.bytes)?
+ }
+ ServerSessionResult::RaisedEvent(ev) => self.handle_event(ev)?,
+ ServerSessionResult::UnhandleableMessageReceived(m) => {
+ // treat any non-flv streams as raw media stream in rtmp
+ self.media_buf.extend(&m.data);
+ }
+ }
+ }
+ Ok(())
+ }
+
+ fn handle_event(&mut self, event: ServerSessionEvent) -> Result<()> {
+ match event {
+ ServerSessionEvent::ClientChunkSizeChanged { new_chunk_size } => {
+ info!("New client chunk size: {}", new_chunk_size);
+ }
+ ServerSessionEvent::ConnectionRequested { request_id, .. } => {
+ let mx = self.session.accept_request(request_id)?;
+ self.msg_queue.extend(mx);
+ }
+ ServerSessionEvent::ReleaseStreamRequested { .. } => {}
+ ServerSessionEvent::PublishStreamRequested {
+ request_id,
+ app_name,
+ stream_key,
+ mode,
+ } => {
+ if self.published_stream.is_some() {
+ let mx =
+ self.session
+ .reject_request(request_id, "0", "stream already published")?;
+ self.msg_queue.extend(mx);
+ } else {
+ let mx = self.session.accept_request(request_id)?;
+ self.msg_queue.extend(mx);
+ info!(
+ "Published stream request: {app_name}/{stream_key} [{:?}]",
+ mode
+ );
+ self.published_stream = Some(RtmpPublishedStream(app_name, stream_key));
+ }
+ }
+ ServerSessionEvent::PublishStreamFinished { .. } => {}
+ ServerSessionEvent::StreamMetadataChanged {
+ app_name,
+ stream_key,
+ metadata,
+ } => {
+ info!(
+ "Metadata configured: {}/{} {:?}",
+ app_name, stream_key, metadata
+ );
+ }
+ ServerSessionEvent::AudioDataReceived { data, .. } => {
+ self.media_buf.extend(data);
+ }
+ ServerSessionEvent::VideoDataReceived { data, .. } => {
+ self.media_buf.extend(data);
+ }
+ ServerSessionEvent::UnhandleableAmf0Command { .. } => {}
+ ServerSessionEvent::PlayStreamRequested { request_id, .. } => {
+ let mx = self
+ .session
+ .reject_request(request_id, "0", "playback not supported")?;
+ self.msg_queue.extend(mx);
+ }
+ ServerSessionEvent::PlayStreamFinished { .. } => {}
+ ServerSessionEvent::AcknowledgementReceived { .. } => {}
+ ServerSessionEvent::PingResponseReceived { .. } => {}
+ }
+ Ok(())
+ }
+}
+
+impl Read for RtmpClient {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result {
+ // block this thread until something comes into [media_buf]
+ while self.media_buf.len() == 0 {
+ if let Err(e) = self.read_data() {
+ error!("Error reading data: {}", e);
+ return Ok(0);
+ };
+ }
+
+ let to_read = buf.len().min(self.media_buf.len());
+ let drain = self.media_buf.drain(..to_read);
+ buf[..to_read].copy_from_slice(drain.as_slice());
+ Ok(to_read)
+ }
+}
+
+pub async fn listen(out_dir: String, addr: String, overseer: Arc) -> Result<()> {
+ let listener = TcpListener::bind(&addr).await?;
+
+ info!("RTMP listening on: {}", &addr);
+ while let Ok((socket, ip)) = listener.accept().await {
+ let mut cc = RtmpClient::start(socket).await?;
+ let addr = addr.clone();
+ let overseer = overseer.clone();
+ let out_dir = out_dir.clone();
+ let handle = Handle::current();
+ std::thread::Builder::new()
+ .name("rtmp-client".to_string())
+ .spawn(move || {
+ if let Err(e) = cc.read_until_publish_request(Duration::from_secs(10)) {
+ error!("{}", e);
+ return;
+ } else {
+ let pr = cc.published_stream.as_ref().unwrap();
+ let info = ConnectionInfo {
+ ip_addr: ip.to_string(),
+ endpoint: addr.clone(),
+ app_name: pr.0.clone(),
+ key: pr.1.clone(),
+ };
+ spawn_pipeline(
+ handle,
+ info,
+ out_dir.clone(),
+ overseer.clone(),
+ Box::new(cc),
+ );
+ }
+ })?;
+ }
+ Ok(())
+}
diff --git a/src/ingress/srt.rs b/src/ingress/srt.rs
index 75e9074..f24d7f6 100644
--- a/src/ingress/srt.rs
+++ b/src/ingress/srt.rs
@@ -1,18 +1,14 @@
use crate::ingress::{spawn_pipeline, ConnectionInfo};
use crate::overseer::Overseer;
-use crate::pipeline::runner::PipelineRunner;
-use crate::settings::Settings;
use anyhow::Result;
use futures_util::stream::FusedStream;
-use futures_util::{SinkExt, StreamExt, TryStreamExt};
-use log::{error, info, warn};
+use futures_util::StreamExt;
+use log::info;
use srt_tokio::{SrtListener, SrtSocket};
use std::io::Read;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::runtime::Handle;
-use tokio::sync::mpsc::unbounded_channel;
-use warp::Buf;
pub async fn listen(out_dir: String, addr: String, overseer: Arc) -> Result<()> {
let binder: SocketAddr = addr.parse()?;
@@ -20,10 +16,11 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc)
info!("SRT listening on: {}", &addr);
while let Some(request) = packets.incoming().next().await {
- let mut socket = request.accept(None).await?;
+ let socket = request.accept(None).await?;
let info = ConnectionInfo {
endpoint: addr.clone(),
ip_addr: socket.settings().remote.to_string(),
+ app_name: "".to_string(),
key: socket
.settings()
.stream_id
@@ -31,6 +28,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc)
.map_or(String::new(), |s| s.to_string()),
};
spawn_pipeline(
+ Handle::current(),
info,
out_dir.clone(),
overseer.clone(),
@@ -39,8 +37,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc)
socket,
buf: Vec::with_capacity(4096),
}),
- )
- .await;
+ );
}
Ok(())
}
@@ -58,7 +55,7 @@ impl Read for SrtReader {
if rx.is_terminated() {
return Ok(0);
}
- if let Some((_, mut data)) = self.handle.block_on(rx.next()) {
+ if let Some((_, data)) = self.handle.block_on(rx.next()) {
self.buf.extend(data.iter().as_slice());
}
}
diff --git a/src/ingress/tcp.rs b/src/ingress/tcp.rs
index 4569490..47b014a 100644
--- a/src/ingress/tcp.rs
+++ b/src/ingress/tcp.rs
@@ -1,10 +1,10 @@
+use crate::ingress::{spawn_pipeline, ConnectionInfo};
+use crate::overseer::Overseer;
use anyhow::Result;
use log::info;
use std::sync::Arc;
use tokio::net::TcpListener;
-
-use crate::ingress::{spawn_pipeline, ConnectionInfo};
-use crate::overseer::Overseer;
+use tokio::runtime::Handle;
pub async fn listen(out_dir: String, addr: String, overseer: Arc) -> Result<()> {
let listener = TcpListener::bind(&addr).await?;
@@ -14,10 +14,17 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc)
let info = ConnectionInfo {
ip_addr: ip.to_string(),
endpoint: addr.clone(),
+ app_name: "".to_string(),
key: "no-key-tcp".to_string(),
};
let socket = socket.into_std()?;
- spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(socket)).await;
+ spawn_pipeline(
+ Handle::current(),
+ info,
+ out_dir.clone(),
+ overseer.clone(),
+ Box::new(socket),
+ );
}
Ok(())
}
diff --git a/src/ingress/test.rs b/src/ingress/test.rs
index 9502a3c..469071f 100644
--- a/src/ingress/test.rs
+++ b/src/ingress/test.rs
@@ -17,6 +17,7 @@ use std::io::Read;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tiny_skia::Pixmap;
+use tokio::runtime::Handle;
pub async fn listen(out_dir: String, overseer: Arc) -> Result<()> {
info!("Test pattern enabled");
@@ -24,10 +25,17 @@ pub async fn listen(out_dir: String, overseer: Arc) -> Result<()>
let info = ConnectionInfo {
endpoint: "test-pattern".to_string(),
ip_addr: "test-pattern".to_string(),
+ app_name: "".to_string(),
key: "test".to_string(),
};
let src = TestPatternSrc::new()?;
- spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(src)).await;
+ spawn_pipeline(
+ Handle::current(),
+ info,
+ out_dir.clone(),
+ overseer.clone(),
+ Box::new(src),
+ );
Ok(())
}
diff --git a/src/overseer/local.rs b/src/overseer/local.rs
new file mode 100644
index 0000000..d9f54b3
--- /dev/null
+++ b/src/overseer/local.rs
@@ -0,0 +1,67 @@
+use crate::egress::EgressConfig;
+use crate::ingress::ConnectionInfo;
+use crate::overseer::{get_default_variants, IngressInfo, Overseer};
+use crate::pipeline::{EgressType, PipelineConfig};
+use crate::variant::StreamMapping;
+use anyhow::Result;
+use async_trait::async_trait;
+use std::path::PathBuf;
+use uuid::Uuid;
+
+/// Simple static file output without any access controls
+/// Useful for testing or self-hosting
+pub struct LocalOverseer;
+
+impl LocalOverseer {
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+#[async_trait]
+impl Overseer for LocalOverseer {
+ async fn start_stream(
+ &self,
+ _connection: &ConnectionInfo,
+ stream_info: &IngressInfo,
+ ) -> Result {
+ let vars = get_default_variants(stream_info)?;
+ let var_ids = vars.iter().map(|v| v.id()).collect();
+ Ok(PipelineConfig {
+ id: Uuid::new_v4(),
+ variants: vars,
+ egress: vec![EgressType::HLS(EgressConfig {
+ name: "HLS".to_owned(),
+ variants: var_ids,
+ })],
+ })
+ }
+
+ async fn on_segment(
+ &self,
+ pipeline_id: &Uuid,
+ variant_id: &Uuid,
+ index: u64,
+ duration: f32,
+ path: &PathBuf,
+ ) -> Result<()> {
+ // nothing to do here
+ Ok(())
+ }
+
+ async fn on_thumbnail(
+ &self,
+ pipeline_id: &Uuid,
+ width: usize,
+ height: usize,
+ path: &PathBuf,
+ ) -> Result<()> {
+ // nothing to do here
+ Ok(())
+ }
+
+ async fn on_end(&self, pipeline_id: &Uuid) -> Result<()> {
+ // nothing to do here
+ Ok(())
+ }
+}
diff --git a/src/overseer/mod.rs b/src/overseer/mod.rs
index 8a0aa6a..2b3cf0e 100644
--- a/src/overseer/mod.rs
+++ b/src/overseer/mod.rs
@@ -1,14 +1,15 @@
-use crate::egress::EgressConfig;
use crate::ingress::ConnectionInfo;
+
+#[cfg(feature = "local-overseer")]
+use crate::overseer::local::LocalOverseer;
+#[cfg(feature = "webhook-overseer")]
use crate::overseer::webhook::WebhookOverseer;
-#[cfg(feature = "zap-stream")]
-use crate::overseer::zap_stream::ZapStreamOverseer;
-use crate::pipeline::{EgressType, PipelineConfig};
-use crate::settings::{OverseerConfig, Settings};
+use crate::pipeline::PipelineConfig;
+use crate::settings::Settings;
use crate::variant::audio::AudioVariant;
use crate::variant::mapping::VariantMapping;
use crate::variant::video::VideoVariant;
-use crate::variant::{StreamMapping, VariantStream};
+use crate::variant::VariantStream;
use anyhow::Result;
use async_trait::async_trait;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
@@ -16,8 +17,14 @@ use std::cmp::PartialEq;
use std::path::PathBuf;
use std::sync::Arc;
use uuid::Uuid;
-use warp::Filter;
+#[cfg(feature = "zap-stream")]
+use crate::overseer::zap_stream::ZapStreamOverseer;
+
+#[cfg(feature = "local-overseer")]
+mod local;
+
+#[cfg(feature = "webhook-overseer")]
mod webhook;
#[cfg(feature = "zap-stream")]
@@ -90,34 +97,31 @@ pub trait Overseer: Send + Sync {
impl Settings {
pub async fn get_overseer(&self) -> Result> {
match &self.overseer {
- OverseerConfig::Static { egress_types } => Ok(Arc::new(StaticOverseer::new(
- &self.output_dir,
- egress_types,
- ))),
+ #[cfg(feature = "local-overseer")]
+ OverseerConfig::Local => Ok(Arc::new(LocalOverseer::new())),
+ #[cfg(feature = "webhook-overseer")]
OverseerConfig::Webhook { url } => Ok(Arc::new(WebhookOverseer::new(&url))),
+ #[cfg(feature = "zap-stream")]
OverseerConfig::ZapStream {
nsec: private_key,
database,
lnd,
relays,
blossom,
- } => {
- #[cfg(not(feature = "zap-stream"))]
- panic!("zap.stream overseer is not enabled");
-
- #[cfg(feature = "zap-stream")]
- Ok(Arc::new(
- ZapStreamOverseer::new(
- &self.output_dir,
- &self.public_url,
- private_key,
- database,
- lnd,
- relays,
- blossom,
- )
- .await?,
- ))
+ } => Ok(Arc::new(
+ ZapStreamOverseer::new(
+ &self.output_dir,
+ &self.public_url,
+ private_key,
+ database,
+ lnd,
+ relays,
+ blossom,
+ )
+ .await?,
+ )),
+ _ => {
+ panic!("Unsupported overseer");
}
}
}
@@ -183,61 +187,3 @@ pub(crate) fn get_default_variants(info: &IngressInfo) -> Result) -> Self {
- Self {}
- }
-}
-
-#[async_trait]
-impl Overseer for StaticOverseer {
- async fn start_stream(
- &self,
- _connection: &ConnectionInfo,
- stream_info: &IngressInfo,
- ) -> Result {
- let vars = get_default_variants(stream_info)?;
- let var_ids = vars.iter().map(|v| v.id()).collect();
- Ok(PipelineConfig {
- id: Uuid::new_v4(),
- variants: vars,
- egress: vec![EgressType::HLS(EgressConfig {
- name: "HLS".to_owned(),
- variants: var_ids,
- })],
- })
- }
-
- async fn on_segment(
- &self,
- pipeline_id: &Uuid,
- variant_id: &Uuid,
- index: u64,
- duration: f32,
- path: &PathBuf,
- ) -> Result<()> {
- // nothing to do here
- Ok(())
- }
-
- async fn on_thumbnail(
- &self,
- pipeline_id: &Uuid,
- width: usize,
- height: usize,
- path: &PathBuf,
- ) -> Result<()> {
- // nothing to do here
- Ok(())
- }
-
- async fn on_end(&self, pipeline_id: &Uuid) -> Result<()> {
-
- // nothing to do here
- Ok(())
- }
-}
diff --git a/src/overseer/zap_stream.rs b/src/overseer/zap_stream.rs
index 8c97dc2..cd33ec2 100644
--- a/src/overseer/zap_stream.rs
+++ b/src/overseer/zap_stream.rs
@@ -14,7 +14,7 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_MJPEG;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVFrame;
use ffmpeg_rs_raw::Encoder;
use futures_util::FutureExt;
-use log::info;
+use log::{info, warn};
use nostr_sdk::bitcoin::PrivateKey;
use nostr_sdk::prelude::Coordinate;
use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32};
@@ -276,7 +276,12 @@ impl Overseer for ZapStreamOverseer {
n94 = n94.add_tags(Tag::parse(&["url", &b.url]));
}
let n94 = n94.sign_with_keys(&self.keys)?;
- self.client.send_event(n94).await?;
+ let cc = self.client.clone();
+ tokio::spawn(async move {
+ if let Err(e) = cc.send_event(n94).await {
+ warn!("Error sending event: {}", e);
+ }
+ });
info!("Published N94 segment to {}", blob.url);
}
diff --git a/src/pipeline/runner.rs b/src/pipeline/runner.rs
index b87beac..b7c5801 100644
--- a/src/pipeline/runner.rs
+++ b/src/pipeline/runner.rs
@@ -20,8 +20,7 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_WEBP;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
- av_frame_free, av_get_sample_fmt, av_packet_free, av_pkt_dump_log2, av_q2d, av_rescale_q,
- AVMediaType,
+ av_frame_free, av_get_sample_fmt, av_packet_free, av_q2d, av_rescale_q, AVMediaType,
};
use ffmpeg_rs_raw::{
cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler,
@@ -106,7 +105,7 @@ impl PipelineRunner {
}
/// EOF, cleanup
- unsafe fn flush(&mut self) -> Result<()> {
+ pub unsafe fn flush(&mut self) -> Result<()> {
for (var, enc) in &mut self.encoders {
for mut pkt in enc.encode_frame(ptr::null_mut())? {
for eg in self.egress.iter_mut() {
@@ -118,6 +117,14 @@ impl PipelineRunner {
for eg in self.egress.iter_mut() {
eg.reset()?;
}
+
+ if let Some(config) = &self.config {
+ self.handle.block_on(async {
+ if let Err(e) = self.overseer.on_end(&config.id).await {
+ error!("Failed to end stream: {e}");
+ }
+ });
+ }
Ok(())
}
@@ -135,12 +142,6 @@ impl PipelineRunner {
// run transcoder pipeline
let (mut pkt, stream) = self.demuxer.get_packet()?;
if pkt.is_null() {
- self.handle.block_on(async {
- if let Err(e) = self.overseer.on_end(&config.id).await {
- error!("Failed to end stream: {e}");
- }
- });
- self.flush()?;
return Ok(false);
}
@@ -227,7 +228,7 @@ impl PipelineRunner {
if let Some((r, f)) = self.resampler.get_mut(&a.id()) {
let frame_size = (*enc.codec_context()).frame_size;
new_frame = true;
- let mut resampled_frame = r.process_frame(frame, frame_size)?;
+ let mut resampled_frame = r.process_frame(frame)?;
if let Some(ret) =
f.buffer_frame(resampled_frame, frame_size as usize)?
{
diff --git a/src/settings.rs b/src/settings.rs
index e1d9173..044bb36 100644
--- a/src/settings.rs
+++ b/src/settings.rs
@@ -26,10 +26,7 @@ pub struct Settings {
#[serde(rename_all = "kebab-case")]
pub enum OverseerConfig {
/// Static output
- Static {
- /// Types of output
- egress_types: Vec,
- },
+ Local,
/// Control system via external API
Webhook {
/// Webhook service URL