feat: RTMP input (WIP)

This commit is contained in:
kieran 2024-11-22 16:54:32 +00:00
parent 9937f9a6f9
commit 54bebc5170
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
19 changed files with 577 additions and 173 deletions

126
Cargo.lock generated
View File

@ -473,6 +473,15 @@ dependencies = [
"serde",
]
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array",
]
[[package]]
name = "block-buffer"
version = "0.10.4"
@ -843,6 +852,16 @@ dependencies = [
"typenum",
]
[[package]]
name = "crypto-mac"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4857fd85a0c34b3c3297875b747c1e02e06b6a0ea32dd892d8192b9ce0813ea6"
dependencies = [
"generic-array",
"subtle",
]
[[package]]
name = "ctr"
version = "0.9.2"
@ -888,13 +907,22 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array",
]
[[package]]
name = "digest"
version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer",
"block-buffer 0.10.4",
"const-oid",
"crypto-common",
"subtle",
@ -1022,7 +1050,7 @@ dependencies = [
[[package]]
name = "ffmpeg-rs-raw"
version = "0.1.0"
source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=c2ae78acbcbe315137aea94c77b0db7ea538a709#c2ae78acbcbe315137aea94c77b0db7ea538a709"
source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0#8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0"
dependencies = [
"anyhow",
"ffmpeg-sys-the-third",
@ -1034,7 +1062,7 @@ dependencies = [
[[package]]
name = "ffmpeg-sys-the-third"
version = "2.1.0+ffmpeg-7.1"
source = "git+https://git.v0l.io/ffmpeg/ffmpeg-the-third.git?branch=master#0fdfa9ab506f5c92aad5a175db081c8a2c1579a1"
source = "git+https://git.v0l.io/Kieran/ffmpeg-the-third.git?rev=e5f8e077b04b10d5887bce4df1eb1a71738a6c66#e5f8e077b04b10d5887bce4df1eb1a71738a6c66"
dependencies = [
"bindgen",
"cc",
@ -1451,7 +1479,17 @@ version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7"
dependencies = [
"hmac",
"hmac 0.12.1",
]
[[package]]
name = "hmac"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1441c6b1e930e2817404b5046f1f989899143a12bf92de603b69f4e0aee1e15"
dependencies = [
"crypto-mac",
"digest 0.9.0",
]
[[package]]
@ -1460,7 +1498,7 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
dependencies = [
"digest",
"digest 0.10.7",
]
[[package]]
@ -1966,7 +2004,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
dependencies = [
"cfg-if",
"digest",
"digest 0.10.7",
]
[[package]]
@ -2390,8 +2428,8 @@ version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2"
dependencies = [
"digest",
"hmac",
"digest 0.10.7",
"hmac 0.12.1",
]
[[package]]
@ -2451,7 +2489,7 @@ checksum = "934cd7631c050f4674352a6e835d5f6711ffbfb9345c2fc0107155ac495ae293"
dependencies = [
"once_cell",
"pest",
"sha2",
"sha2 0.10.8",
]
[[package]]
@ -2884,6 +2922,31 @@ dependencies = [
"portable-atomic",
]
[[package]]
name = "rml_amf0"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63551cfcd4d1f42733c190e4b58dd268b1eacb73410d9afbf62784aa12cac240"
dependencies = [
"byteorder",
"thiserror 1.0.57",
]
[[package]]
name = "rml_rtmp"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a354e80eb7aa2a6fed09b3bd25c19bcfd32cf51f81f1219f4ec04f34519989da"
dependencies = [
"byteorder",
"bytes",
"hmac 0.10.1",
"rand",
"rml_amf0",
"sha2 0.9.9",
"thiserror 1.0.57",
]
[[package]]
name = "ron"
version = "0.8.1"
@ -2909,7 +2972,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc"
dependencies = [
"const-oid",
"digest",
"digest 0.10.7",
"num-bigint-dig",
"num-integer",
"num-traits",
@ -3115,7 +3178,7 @@ dependencies = [
"password-hash",
"pbkdf2",
"salsa20",
"sha2",
"sha2 0.10.8",
]
[[package]]
@ -3239,7 +3302,7 @@ checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
"digest 0.10.7",
]
[[package]]
@ -3250,7 +3313,20 @@ checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
"digest 0.10.7",
]
[[package]]
name = "sha2"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800"
dependencies = [
"block-buffer 0.9.0",
"cfg-if",
"cpufeatures",
"digest 0.9.0",
"opaque-debug",
]
[[package]]
@ -3261,7 +3337,7 @@ checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
"digest 0.10.7",
]
[[package]]
@ -3276,7 +3352,7 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de"
dependencies = [
"digest",
"digest 0.10.7",
"rand_core",
]
@ -3422,7 +3498,7 @@ dependencies = [
"percent-encoding",
"serde",
"serde_json",
"sha2",
"sha2 0.10.8",
"smallvec",
"sqlformat",
"thiserror 1.0.57",
@ -3460,7 +3536,7 @@ dependencies = [
"quote",
"serde",
"serde_json",
"sha2",
"sha2 0.10.8",
"sqlx-core",
"sqlx-mysql",
"sqlx-postgres",
@ -3484,7 +3560,7 @@ dependencies = [
"bytes",
"chrono",
"crc",
"digest",
"digest 0.10.7",
"dotenvy",
"either",
"futures-channel",
@ -3494,7 +3570,7 @@ dependencies = [
"generic-array",
"hex",
"hkdf",
"hmac",
"hmac 0.12.1",
"itoa",
"log",
"md-5",
@ -3505,7 +3581,7 @@ dependencies = [
"rsa",
"serde",
"sha1",
"sha2",
"sha2 0.10.8",
"smallvec",
"sqlx-core",
"stringprep",
@ -3534,7 +3610,7 @@ dependencies = [
"futures-util",
"hex",
"hkdf",
"hmac",
"hmac 0.12.1",
"home",
"itoa",
"log",
@ -3544,7 +3620,7 @@ dependencies = [
"rand",
"serde",
"serde_json",
"sha2",
"sha2 0.10.8",
"smallvec",
"sqlx-core",
"stringprep",
@ -3592,7 +3668,7 @@ dependencies = [
"ctr",
"derive_more",
"hex",
"hmac",
"hmac 0.12.1",
"keyed_priority_queue",
"log",
"pbkdf2",
@ -4777,6 +4853,7 @@ dependencies = [
"anyhow",
"async-trait",
"base64 0.22.1",
"bytes",
"chrono",
"clap",
"config",
@ -4795,8 +4872,9 @@ dependencies = [
"reqwest",
"resvg",
"ringbuf",
"rml_rtmp",
"serde",
"sha2",
"sha2 0.10.8",
"srt-tokio",
"tiny-skia",
"tokio",

View File

@ -8,8 +8,11 @@ name = "zap-stream-core"
path = "src/bin/zap_stream_core.rs"
[features]
default = ["test-pattern", "srt"]
default = ["test-pattern", "srt", "rtmp"]
srt = ["dep:srt-tokio"]
rtmp = ["dep:rml_rtmp"]
local-overseer = [] # WIP
webhook-overseer = [] # WIP
zap-stream = [
"dep:nostr-sdk",
"dep:zap-stream-db",
@ -19,10 +22,17 @@ zap-stream = [
"dep:sha2",
"tokio/fs",
]
test-pattern = ["dep:resvg", "dep:usvg", "dep:tiny-skia", "dep:fontdue", "dep:ringbuf", "zap-stream-db/test-pattern"]
test-pattern = [
"dep:resvg",
"dep:usvg",
"dep:tiny-skia",
"dep:fontdue",
"dep:ringbuf",
"zap-stream-db/test-pattern"
]
[dependencies]
ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "c2ae78acbcbe315137aea94c77b0db7ea538a709" }
ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0" }
tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] }
anyhow = { version = "^1.0.91", features = ["backtrace"] }
pretty_env_logger = "0.5.0"
@ -46,6 +56,9 @@ hex = "0.4.3"
# srt
srt-tokio = { version = "0.4.3", optional = true }
# rtmp
rml_rtmp = { version = "0.8.0", optional = true }
# test-pattern
resvg = { version = "0.44.0", optional = true }
usvg = { version = "0.44.0", optional = true }
@ -60,3 +73,5 @@ fedimint-tonic-lnd = { version = "0.2.0", optional = true, default-features = fa
reqwest = { version = "0.12.9", optional = true, features = ["stream"] }
base64 = { version = "0.22.1", optional = true }
sha2 = { version = "0.10.8", optional = true }
bytes = "1.8.0"

View File

@ -1,7 +1,5 @@
- Store user preference for (rates and recording) [DB]
- Setup multi-variant output
- Manage event lifecycle (close stream)
- RTMP?
- Setup multi-variant output
- API parity
- fMP4 instead of MPEG-TS segments
- HLS-LL

View File

@ -2,6 +2,7 @@
# currently supporting srt/tcp/file/test-pattern
# All the endpoints must be valid URI's
endpoints:
- "rtmp://127.0.0.1:3336"
- "srt://127.0.0.1:3335"
- "tcp://127.0.0.1:3334"
@ -38,8 +39,8 @@ listen_http: "127.0.0.1:8080"
overseer:
zap-stream:
nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4"
blossom:
- "http://localhost:8881"
#blossom:
# - "http://localhost:8881"
relays:
- "ws://localhost:7766"
database: "mysql://root:root@localhost:3368/zap_stream?max_connections=2"

View File

@ -4,12 +4,14 @@ use config::Config;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_log_set_callback, av_version_info};
use ffmpeg_rs_raw::{av_log_redirect, rstr};
use log::{error, info};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::task::JoinHandle;
use url::Url;
use zap_stream_core::egress::http::listen_out_dir;
use warp::{cors, Filter};
#[cfg(feature = "rtmp")]
use zap_stream_core::ingress::rtmp;
#[cfg(feature = "srt")]
use zap_stream_core::ingress::srt;
#[cfg(feature = "test-pattern")]
@ -48,10 +50,26 @@ async fn main() -> Result<()> {
Err(e) => error!("{}", e),
}
}
listeners.push(tokio::spawn(listen_out_dir(
settings.listen_http,
settings.output_dir,
)));
let http_addr: SocketAddr = settings.listen_http.parse()?;
let http_dir = settings.output_dir.clone();
let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url);
listeners.push(tokio::spawn(async move {
let cors = cors().allow_any_origin().allow_methods(vec!["GET"]);
let index_handle = warp::get()
.or(warp::path("index.html"))
.and(warp::path::end())
.map(move |_| warp::reply::html(index_html.clone()));
let dir_handle = warp::get().and(warp::fs::dir(http_dir)).with(cors);
warp::serve(index_handle.or(dir_handle))
.run(http_addr)
.await;
Ok(())
}));
for handle in listeners {
if let Err(e) = handle.await? {
@ -75,6 +93,12 @@ fn try_create_listener(
format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
overseer.clone(),
))),
#[cfg(feature = "srt")]
"rtmp" => Ok(tokio::spawn(rtmp::listen(
out_dir.to_string(),
format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
overseer.clone(),
))),
"tcp" => Ok(tokio::spawn(tcp::listen(
out_dir.to_string(),
format!("{}:{}", url.host().unwrap(), url.port().unwrap()),

View File

@ -1,14 +0,0 @@
use std::net::SocketAddr;
use anyhow::Error;
use warp::{cors, Filter};
pub async fn listen_out_dir(addr: String, dir: String) -> Result<(), Error> {
let addr: SocketAddr = addr.parse()?;
let cors = cors().allow_any_origin().allow_methods(vec!["GET"]);
let warp_out = warp::get().and(warp::fs::dir(dir)).with(cors);
warp::serve(warp_out).run(addr).await;
Ok(())
}

View File

@ -6,7 +6,6 @@ use std::path::PathBuf;
use uuid::Uuid;
pub mod hls;
pub mod http;
pub mod recorder;
#[derive(Clone, Debug, Serialize, Deserialize)]

17
src/index.html Normal file
View File

@ -0,0 +1,17 @@
<!DOCTYPE html>
<html lang="en">
<head>
<title>zap-stream-core</title>
<style>
html, body {
margin: 0;
background: black;
color: white;
font-family: monospace;
}
</style>
</head>
<body>
<h1>Welcome to %%PUBLIC_URL%%</h1>
</body>
</html>

View File

@ -4,6 +4,7 @@ use anyhow::Result;
use log::info;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::runtime::Handle;
pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc<dyn Overseer>) -> Result<()> {
info!("Sending file: {}", path.display());
@ -11,10 +12,17 @@ pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc<dyn Overseer>)
let info = ConnectionInfo {
ip_addr: "127.0.0.1:6969".to_string(),
endpoint: "file-input".to_owned(),
app_name: "".to_string(),
key: "test".to_string(),
};
let file = std::fs::File::open(path)?;
spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(file)).await;
spawn_pipeline(
Handle::current(),
info,
out_dir.clone(),
overseer.clone(),
Box::new(file),
);
Ok(())
}

View File

@ -7,6 +7,8 @@ use std::sync::Arc;
use tokio::runtime::Handle;
pub mod file;
#[cfg(feature = "rtmp")]
pub mod rtmp;
#[cfg(feature = "srt")]
pub mod srt;
pub mod tcp;
@ -21,18 +23,21 @@ pub struct ConnectionInfo {
/// IP address of the connection
pub ip_addr: String,
/// App name, empty unless RTMP ingress
pub app_name: String,
/// Stream key
pub key: String,
}
pub async fn spawn_pipeline(
pub fn spawn_pipeline(
handle: Handle,
info: ConnectionInfo,
out_dir: String,
seer: Arc<dyn Overseer>,
reader: Box<dyn Read + Send>,
) {
info!("New client connected: {}", &info.ip_addr);
let handle = Handle::current();
let seer = seer.clone();
let out_dir = out_dir.to_string();
std::thread::spawn(move || unsafe {
@ -41,10 +46,16 @@ pub async fn spawn_pipeline(
match pl.run() {
Ok(c) => {
if !c {
if let Err(e) = pl.flush() {
error!("Pipeline flush failed: {}", e);
}
break;
}
}
Err(e) => {
if let Err(e) = pl.flush() {
error!("Pipeline flush failed: {}", e);
}
error!("Pipeline run failed: {}", e);
break;
}

239
src/ingress/rtmp.rs Normal file
View File

@ -0,0 +1,239 @@
use crate::ingress::{spawn_pipeline, ConnectionInfo};
use crate::overseer::Overseer;
use anyhow::{bail, Result};
use log::{error, info};
use rml_rtmp::handshake::{Handshake, HandshakeProcessResult, PeerType};
use rml_rtmp::sessions::{
ServerSession, ServerSessionConfig, ServerSessionEvent, ServerSessionResult,
};
use std::collections::VecDeque;
use std::io::{ErrorKind, Read, Write};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::Handle;
use tokio::time::Instant;
#[derive(PartialEq, Eq, Clone, Hash)]
struct RtmpPublishedStream(String, String);
struct RtmpClient {
socket: std::net::TcpStream,
media_buf: Vec<u8>,
session: ServerSession,
msg_queue: VecDeque<ServerSessionResult>,
reader_buf: [u8; 4096],
pub published_stream: Option<RtmpPublishedStream>,
}
impl RtmpClient {
async fn start(mut socket: TcpStream) -> Result<Self> {
let mut hs = Handshake::new(PeerType::Server);
let exchange = hs.generate_outbound_p0_and_p1()?;
socket.write_all(&exchange).await?;
let mut buf = [0; 4096];
loop {
let r = socket.read(&mut buf).await?;
if r == 0 {
bail!("EOF reached while reading");
}
match hs.process_bytes(&buf[..r])? {
HandshakeProcessResult::InProgress { response_bytes } => {
socket.write_all(&response_bytes).await?;
}
HandshakeProcessResult::Completed {
response_bytes,
remaining_bytes,
} => {
socket.write_all(&response_bytes).await?;
let cfg = ServerSessionConfig::new();
let (mut ses, mut res) = ServerSession::new(cfg)?;
let q = ses.handle_input(&remaining_bytes)?;
res.extend(q);
let ret = Self {
socket: socket.into_std()?,
media_buf: vec![],
session: ses,
msg_queue: VecDeque::from(res),
reader_buf: [0; 4096],
published_stream: None,
};
return Ok(ret);
}
}
}
}
/// Read data until we get the publish request
pub fn read_until_publish_request(&mut self, timeout: Duration) -> Result<()> {
let start = Instant::now();
while self.published_stream.is_none() {
if (Instant::now() - start) > timeout {
bail!("Timed out waiting for publish request");
}
self.read_data()?;
}
Ok(())
}
fn read_data(&mut self) -> Result<()> {
let r = match self.socket.read(&mut self.reader_buf) {
Ok(r) => r,
Err(e) => {
return match e.kind() {
ErrorKind::WouldBlock => Ok(()),
ErrorKind::Interrupted => Ok(()),
_ => Err(anyhow::Error::new(e)),
};
}
};
if r == 0 {
bail!("EOF");
}
let mx = self.session.handle_input(&self.reader_buf[..r])?;
if mx.len() > 0 {
self.msg_queue.extend(mx);
self.process_msg_queue()?;
}
Ok(())
}
fn process_msg_queue(&mut self) -> Result<()> {
while let Some(msg) = self.msg_queue.pop_front() {
match msg {
ServerSessionResult::OutboundResponse(data) => {
self.socket.write_all(&data.bytes)?
}
ServerSessionResult::RaisedEvent(ev) => self.handle_event(ev)?,
ServerSessionResult::UnhandleableMessageReceived(m) => {
// treat any non-flv streams as raw media stream in rtmp
self.media_buf.extend(&m.data);
}
}
}
Ok(())
}
fn handle_event(&mut self, event: ServerSessionEvent) -> Result<()> {
match event {
ServerSessionEvent::ClientChunkSizeChanged { new_chunk_size } => {
info!("New client chunk size: {}", new_chunk_size);
}
ServerSessionEvent::ConnectionRequested { request_id, .. } => {
let mx = self.session.accept_request(request_id)?;
self.msg_queue.extend(mx);
}
ServerSessionEvent::ReleaseStreamRequested { .. } => {}
ServerSessionEvent::PublishStreamRequested {
request_id,
app_name,
stream_key,
mode,
} => {
if self.published_stream.is_some() {
let mx =
self.session
.reject_request(request_id, "0", "stream already published")?;
self.msg_queue.extend(mx);
} else {
let mx = self.session.accept_request(request_id)?;
self.msg_queue.extend(mx);
info!(
"Published stream request: {app_name}/{stream_key} [{:?}]",
mode
);
self.published_stream = Some(RtmpPublishedStream(app_name, stream_key));
}
}
ServerSessionEvent::PublishStreamFinished { .. } => {}
ServerSessionEvent::StreamMetadataChanged {
app_name,
stream_key,
metadata,
} => {
info!(
"Metadata configured: {}/{} {:?}",
app_name, stream_key, metadata
);
}
ServerSessionEvent::AudioDataReceived { data, .. } => {
self.media_buf.extend(data);
}
ServerSessionEvent::VideoDataReceived { data, .. } => {
self.media_buf.extend(data);
}
ServerSessionEvent::UnhandleableAmf0Command { .. } => {}
ServerSessionEvent::PlayStreamRequested { request_id, .. } => {
let mx = self
.session
.reject_request(request_id, "0", "playback not supported")?;
self.msg_queue.extend(mx);
}
ServerSessionEvent::PlayStreamFinished { .. } => {}
ServerSessionEvent::AcknowledgementReceived { .. } => {}
ServerSessionEvent::PingResponseReceived { .. } => {}
}
Ok(())
}
}
impl Read for RtmpClient {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
// block this thread until something comes into [media_buf]
while self.media_buf.len() == 0 {
if let Err(e) = self.read_data() {
error!("Error reading data: {}", e);
return Ok(0);
};
}
let to_read = buf.len().min(self.media_buf.len());
let drain = self.media_buf.drain(..to_read);
buf[..to_read].copy_from_slice(drain.as_slice());
Ok(to_read)
}
}
pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>) -> Result<()> {
let listener = TcpListener::bind(&addr).await?;
info!("RTMP listening on: {}", &addr);
while let Ok((socket, ip)) = listener.accept().await {
let mut cc = RtmpClient::start(socket).await?;
let addr = addr.clone();
let overseer = overseer.clone();
let out_dir = out_dir.clone();
let handle = Handle::current();
std::thread::Builder::new()
.name("rtmp-client".to_string())
.spawn(move || {
if let Err(e) = cc.read_until_publish_request(Duration::from_secs(10)) {
error!("{}", e);
return;
} else {
let pr = cc.published_stream.as_ref().unwrap();
let info = ConnectionInfo {
ip_addr: ip.to_string(),
endpoint: addr.clone(),
app_name: pr.0.clone(),
key: pr.1.clone(),
};
spawn_pipeline(
handle,
info,
out_dir.clone(),
overseer.clone(),
Box::new(cc),
);
}
})?;
}
Ok(())
}

View File

@ -1,18 +1,14 @@
use crate::ingress::{spawn_pipeline, ConnectionInfo};
use crate::overseer::Overseer;
use crate::pipeline::runner::PipelineRunner;
use crate::settings::Settings;
use anyhow::Result;
use futures_util::stream::FusedStream;
use futures_util::{SinkExt, StreamExt, TryStreamExt};
use log::{error, info, warn};
use futures_util::StreamExt;
use log::info;
use srt_tokio::{SrtListener, SrtSocket};
use std::io::Read;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::sync::mpsc::unbounded_channel;
use warp::Buf;
pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>) -> Result<()> {
let binder: SocketAddr = addr.parse()?;
@ -20,10 +16,11 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
info!("SRT listening on: {}", &addr);
while let Some(request) = packets.incoming().next().await {
let mut socket = request.accept(None).await?;
let socket = request.accept(None).await?;
let info = ConnectionInfo {
endpoint: addr.clone(),
ip_addr: socket.settings().remote.to_string(),
app_name: "".to_string(),
key: socket
.settings()
.stream_id
@ -31,6 +28,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
.map_or(String::new(), |s| s.to_string()),
};
spawn_pipeline(
Handle::current(),
info,
out_dir.clone(),
overseer.clone(),
@ -39,8 +37,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
socket,
buf: Vec::with_capacity(4096),
}),
)
.await;
);
}
Ok(())
}
@ -58,7 +55,7 @@ impl Read for SrtReader {
if rx.is_terminated() {
return Ok(0);
}
if let Some((_, mut data)) = self.handle.block_on(rx.next()) {
if let Some((_, data)) = self.handle.block_on(rx.next()) {
self.buf.extend(data.iter().as_slice());
}
}

View File

@ -1,10 +1,10 @@
use crate::ingress::{spawn_pipeline, ConnectionInfo};
use crate::overseer::Overseer;
use anyhow::Result;
use log::info;
use std::sync::Arc;
use tokio::net::TcpListener;
use crate::ingress::{spawn_pipeline, ConnectionInfo};
use crate::overseer::Overseer;
use tokio::runtime::Handle;
pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>) -> Result<()> {
let listener = TcpListener::bind(&addr).await?;
@ -14,10 +14,17 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
let info = ConnectionInfo {
ip_addr: ip.to_string(),
endpoint: addr.clone(),
app_name: "".to_string(),
key: "no-key-tcp".to_string(),
};
let socket = socket.into_std()?;
spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(socket)).await;
spawn_pipeline(
Handle::current(),
info,
out_dir.clone(),
overseer.clone(),
Box::new(socket),
);
}
Ok(())
}

View File

@ -17,6 +17,7 @@ use std::io::Read;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tiny_skia::Pixmap;
use tokio::runtime::Handle;
pub async fn listen(out_dir: String, overseer: Arc<dyn Overseer>) -> Result<()> {
info!("Test pattern enabled");
@ -24,10 +25,17 @@ pub async fn listen(out_dir: String, overseer: Arc<dyn Overseer>) -> Result<()>
let info = ConnectionInfo {
endpoint: "test-pattern".to_string(),
ip_addr: "test-pattern".to_string(),
app_name: "".to_string(),
key: "test".to_string(),
};
let src = TestPatternSrc::new()?;
spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(src)).await;
spawn_pipeline(
Handle::current(),
info,
out_dir.clone(),
overseer.clone(),
Box::new(src),
);
Ok(())
}

67
src/overseer/local.rs Normal file
View File

@ -0,0 +1,67 @@
use crate::egress::EgressConfig;
use crate::ingress::ConnectionInfo;
use crate::overseer::{get_default_variants, IngressInfo, Overseer};
use crate::pipeline::{EgressType, PipelineConfig};
use crate::variant::StreamMapping;
use anyhow::Result;
use async_trait::async_trait;
use std::path::PathBuf;
use uuid::Uuid;
/// Simple static file output without any access controls
/// Useful for testing or self-hosting
pub struct LocalOverseer;
impl LocalOverseer {
pub fn new() -> Self {
Self {}
}
}
#[async_trait]
impl Overseer for LocalOverseer {
async fn start_stream(
&self,
_connection: &ConnectionInfo,
stream_info: &IngressInfo,
) -> Result<PipelineConfig> {
let vars = get_default_variants(stream_info)?;
let var_ids = vars.iter().map(|v| v.id()).collect();
Ok(PipelineConfig {
id: Uuid::new_v4(),
variants: vars,
egress: vec![EgressType::HLS(EgressConfig {
name: "HLS".to_owned(),
variants: var_ids,
})],
})
}
async fn on_segment(
&self,
pipeline_id: &Uuid,
variant_id: &Uuid,
index: u64,
duration: f32,
path: &PathBuf,
) -> Result<()> {
// nothing to do here
Ok(())
}
async fn on_thumbnail(
&self,
pipeline_id: &Uuid,
width: usize,
height: usize,
path: &PathBuf,
) -> Result<()> {
// nothing to do here
Ok(())
}
async fn on_end(&self, pipeline_id: &Uuid) -> Result<()> {
// nothing to do here
Ok(())
}
}

View File

@ -1,14 +1,15 @@
use crate::egress::EgressConfig;
use crate::ingress::ConnectionInfo;
#[cfg(feature = "local-overseer")]
use crate::overseer::local::LocalOverseer;
#[cfg(feature = "webhook-overseer")]
use crate::overseer::webhook::WebhookOverseer;
#[cfg(feature = "zap-stream")]
use crate::overseer::zap_stream::ZapStreamOverseer;
use crate::pipeline::{EgressType, PipelineConfig};
use crate::settings::{OverseerConfig, Settings};
use crate::pipeline::PipelineConfig;
use crate::settings::Settings;
use crate::variant::audio::AudioVariant;
use crate::variant::mapping::VariantMapping;
use crate::variant::video::VideoVariant;
use crate::variant::{StreamMapping, VariantStream};
use crate::variant::VariantStream;
use anyhow::Result;
use async_trait::async_trait;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
@ -16,8 +17,14 @@ use std::cmp::PartialEq;
use std::path::PathBuf;
use std::sync::Arc;
use uuid::Uuid;
use warp::Filter;
#[cfg(feature = "zap-stream")]
use crate::overseer::zap_stream::ZapStreamOverseer;
#[cfg(feature = "local-overseer")]
mod local;
#[cfg(feature = "webhook-overseer")]
mod webhook;
#[cfg(feature = "zap-stream")]
@ -90,23 +97,18 @@ pub trait Overseer: Send + Sync {
impl Settings {
pub async fn get_overseer(&self) -> Result<Arc<dyn Overseer>> {
match &self.overseer {
OverseerConfig::Static { egress_types } => Ok(Arc::new(StaticOverseer::new(
&self.output_dir,
egress_types,
))),
#[cfg(feature = "local-overseer")]
OverseerConfig::Local => Ok(Arc::new(LocalOverseer::new())),
#[cfg(feature = "webhook-overseer")]
OverseerConfig::Webhook { url } => Ok(Arc::new(WebhookOverseer::new(&url))),
#[cfg(feature = "zap-stream")]
OverseerConfig::ZapStream {
nsec: private_key,
database,
lnd,
relays,
blossom,
} => {
#[cfg(not(feature = "zap-stream"))]
panic!("zap.stream overseer is not enabled");
#[cfg(feature = "zap-stream")]
Ok(Arc::new(
} => Ok(Arc::new(
ZapStreamOverseer::new(
&self.output_dir,
&self.public_url,
@ -117,7 +119,9 @@ impl Settings {
blossom,
)
.await?,
))
)),
_ => {
panic!("Unsupported overseer");
}
}
}
@ -183,61 +187,3 @@ pub(crate) fn get_default_variants(info: &IngressInfo) -> Result<Vec<VariantStre
Ok(vars)
}
/// Simple static file output without any access controls
struct StaticOverseer;
impl StaticOverseer {
fn new(out_dir: &str, egress_types: &Vec<String>) -> Self {
Self {}
}
}
#[async_trait]
impl Overseer for StaticOverseer {
async fn start_stream(
&self,
_connection: &ConnectionInfo,
stream_info: &IngressInfo,
) -> Result<PipelineConfig> {
let vars = get_default_variants(stream_info)?;
let var_ids = vars.iter().map(|v| v.id()).collect();
Ok(PipelineConfig {
id: Uuid::new_v4(),
variants: vars,
egress: vec![EgressType::HLS(EgressConfig {
name: "HLS".to_owned(),
variants: var_ids,
})],
})
}
async fn on_segment(
&self,
pipeline_id: &Uuid,
variant_id: &Uuid,
index: u64,
duration: f32,
path: &PathBuf,
) -> Result<()> {
// nothing to do here
Ok(())
}
async fn on_thumbnail(
&self,
pipeline_id: &Uuid,
width: usize,
height: usize,
path: &PathBuf,
) -> Result<()> {
// nothing to do here
Ok(())
}
async fn on_end(&self, pipeline_id: &Uuid) -> Result<()> {
// nothing to do here
Ok(())
}
}

View File

@ -14,7 +14,7 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_MJPEG;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVFrame;
use ffmpeg_rs_raw::Encoder;
use futures_util::FutureExt;
use log::info;
use log::{info, warn};
use nostr_sdk::bitcoin::PrivateKey;
use nostr_sdk::prelude::Coordinate;
use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32};
@ -276,7 +276,12 @@ impl Overseer for ZapStreamOverseer {
n94 = n94.add_tags(Tag::parse(&["url", &b.url]));
}
let n94 = n94.sign_with_keys(&self.keys)?;
self.client.send_event(n94).await?;
let cc = self.client.clone();
tokio::spawn(async move {
if let Err(e) = cc.send_event(n94).await {
warn!("Error sending event: {}", e);
}
});
info!("Published N94 segment to {}", blob.url);
}

View File

@ -20,8 +20,7 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_WEBP;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_frame_free, av_get_sample_fmt, av_packet_free, av_pkt_dump_log2, av_q2d, av_rescale_q,
AVMediaType,
av_frame_free, av_get_sample_fmt, av_packet_free, av_q2d, av_rescale_q, AVMediaType,
};
use ffmpeg_rs_raw::{
cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler,
@ -106,7 +105,7 @@ impl PipelineRunner {
}
/// EOF, cleanup
unsafe fn flush(&mut self) -> Result<()> {
pub unsafe fn flush(&mut self) -> Result<()> {
for (var, enc) in &mut self.encoders {
for mut pkt in enc.encode_frame(ptr::null_mut())? {
for eg in self.egress.iter_mut() {
@ -118,6 +117,14 @@ impl PipelineRunner {
for eg in self.egress.iter_mut() {
eg.reset()?;
}
if let Some(config) = &self.config {
self.handle.block_on(async {
if let Err(e) = self.overseer.on_end(&config.id).await {
error!("Failed to end stream: {e}");
}
});
}
Ok(())
}
@ -135,12 +142,6 @@ impl PipelineRunner {
// run transcoder pipeline
let (mut pkt, stream) = self.demuxer.get_packet()?;
if pkt.is_null() {
self.handle.block_on(async {
if let Err(e) = self.overseer.on_end(&config.id).await {
error!("Failed to end stream: {e}");
}
});
self.flush()?;
return Ok(false);
}
@ -227,7 +228,7 @@ impl PipelineRunner {
if let Some((r, f)) = self.resampler.get_mut(&a.id()) {
let frame_size = (*enc.codec_context()).frame_size;
new_frame = true;
let mut resampled_frame = r.process_frame(frame, frame_size)?;
let mut resampled_frame = r.process_frame(frame)?;
if let Some(ret) =
f.buffer_frame(resampled_frame, frame_size as usize)?
{

View File

@ -26,10 +26,7 @@ pub struct Settings {
#[serde(rename_all = "kebab-case")]
pub enum OverseerConfig {
/// Static output
Static {
/// Types of output
egress_types: Vec<String>,
},
Local,
/// Control system via external API
Webhook {
/// Webhook service URL