diff --git a/Cargo.toml b/Cargo.toml index 4c9b262..0a86d96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,16 +8,16 @@ name = "zap-stream-core" path = "src/bin/zap_stream_core.rs" [features] -default = ["test-pattern"] +default = ["test-pattern", "srt"] srt = ["dep:srt-tokio"] zap-stream = [ "dep:nostr-sdk", "dep:zap-stream-db", "dep:fedimint-tonic-lnd", "dep:reqwest", - "tokio/fs", "dep:base64", "dep:sha2", + "tokio/fs", ] test-pattern = ["dep:resvg", "dep:usvg", "dep:tiny-skia", "dep:fontdue", "dep:ringbuf", "zap-stream-db/test-pattern"] @@ -43,8 +43,10 @@ m3u8-rs = "6.0.0" chrono = "^0.4.38" hex = "0.4.3" -# test-pattern +# srt srt-tokio = { version = "0.4.3", optional = true } + +# test-pattern resvg = { version = "0.44.0", optional = true } usvg = { version = "0.44.0", optional = true } tiny-skia = { version = "0.11.4", optional = true } diff --git a/config.yaml b/config.yaml index 20b1a3b..d3e225a 100755 --- a/config.yaml +++ b/config.yaml @@ -2,9 +2,8 @@ # currently supporting srt/tcp/file/test-pattern # All the endpoints must be valid URI's endpoints: - - "srt://127.0.0.1:3333" + - "srt://127.0.0.1:3335" - "tcp://127.0.0.1:3334" - - "test-pattern:" # Output directory for recording / hls output_dir: "./out" diff --git a/src/ingress/srt.rs b/src/ingress/srt.rs index 6a5ab9a..53905a1 100644 --- a/src/ingress/srt.rs +++ b/src/ingress/srt.rs @@ -3,14 +3,19 @@ use crate::overseer::Overseer; use crate::pipeline::runner::PipelineRunner; use crate::settings::Settings; use anyhow::Result; -use futures_util::{StreamExt, TryStreamExt}; +use futures_util::{SinkExt, StreamExt, TryStreamExt}; use log::{error, info, warn}; use srt_tokio::{SrtListener, SrtSocket}; +use std::io::Read; +use std::net::SocketAddr; use std::sync::Arc; +use tokio::runtime::Handle; use tokio::sync::mpsc::unbounded_channel; +use warp::Buf; pub async fn listen(out_dir: String, addr: String, overseer: Arc) -> Result<()> { - let (_binding, mut packets) = SrtListener::builder().bind(&addr).await?; + let binder: SocketAddr = addr.parse()?; + let (_binding, mut packets) = SrtListener::builder().bind(binder).await?; info!("SRT listening on: {}", &addr); while let Some(request) = packets.incoming().next().await { @@ -18,9 +23,43 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) let info = ConnectionInfo { endpoint: addr.clone(), ip_addr: socket.settings().remote.to_string(), - key: "".to_string(), + key: socket + .settings() + .stream_id + .as_ref() + .map_or(String::new(), |s| s.to_string()), }; - spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(socket)).await; + spawn_pipeline( + info, + out_dir.clone(), + overseer.clone(), + Box::new(SrtReader { + handle: Handle::current(), + socket, + buf: Vec::with_capacity(4096), + }), + ) + .await; } Ok(()) } + +struct SrtReader { + pub handle: Handle, + pub socket: SrtSocket, + pub buf: Vec, +} + +impl Read for SrtReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let (mut rx, _) = self.socket.split_mut(); + while self.buf.len() < buf.len() { + if let Some((_, mut data)) = self.handle.block_on(rx.next()) { + self.buf.extend(data.iter().as_slice()); + } + } + let drain = self.buf.drain(..buf.len()); + buf.copy_from_slice(drain.as_slice()); + Ok(buf.len()) + } +} diff --git a/src/ingress/tcp.rs b/src/ingress/tcp.rs index d3f0ed8..4569490 100644 --- a/src/ingress/tcp.rs +++ b/src/ingress/tcp.rs @@ -14,7 +14,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc) let info = ConnectionInfo { ip_addr: ip.to_string(), endpoint: addr.clone(), - key: "".to_string(), + key: "no-key-tcp".to_string(), }; let socket = socket.into_std()?; spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(socket)).await;