This commit is contained in:
kieran 2024-11-21 16:56:15 +00:00
parent f515dd09d1
commit f192a915e0
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
4 changed files with 50 additions and 10 deletions

View File

@ -8,16 +8,16 @@ name = "zap-stream-core"
path = "src/bin/zap_stream_core.rs" path = "src/bin/zap_stream_core.rs"
[features] [features]
default = ["test-pattern"] default = ["test-pattern", "srt"]
srt = ["dep:srt-tokio"] srt = ["dep:srt-tokio"]
zap-stream = [ zap-stream = [
"dep:nostr-sdk", "dep:nostr-sdk",
"dep:zap-stream-db", "dep:zap-stream-db",
"dep:fedimint-tonic-lnd", "dep:fedimint-tonic-lnd",
"dep:reqwest", "dep:reqwest",
"tokio/fs",
"dep:base64", "dep:base64",
"dep:sha2", "dep:sha2",
"tokio/fs",
] ]
test-pattern = ["dep:resvg", "dep:usvg", "dep:tiny-skia", "dep:fontdue", "dep:ringbuf", "zap-stream-db/test-pattern"] test-pattern = ["dep:resvg", "dep:usvg", "dep:tiny-skia", "dep:fontdue", "dep:ringbuf", "zap-stream-db/test-pattern"]
@ -43,8 +43,10 @@ m3u8-rs = "6.0.0"
chrono = "^0.4.38" chrono = "^0.4.38"
hex = "0.4.3" hex = "0.4.3"
# test-pattern # srt
srt-tokio = { version = "0.4.3", optional = true } srt-tokio = { version = "0.4.3", optional = true }
# test-pattern
resvg = { version = "0.44.0", optional = true } resvg = { version = "0.44.0", optional = true }
usvg = { version = "0.44.0", optional = true } usvg = { version = "0.44.0", optional = true }
tiny-skia = { version = "0.11.4", optional = true } tiny-skia = { version = "0.11.4", optional = true }

View File

@ -2,9 +2,8 @@
# currently supporting srt/tcp/file/test-pattern # currently supporting srt/tcp/file/test-pattern
# All the endpoints must be valid URI's # All the endpoints must be valid URI's
endpoints: endpoints:
- "srt://127.0.0.1:3333" - "srt://127.0.0.1:3335"
- "tcp://127.0.0.1:3334" - "tcp://127.0.0.1:3334"
- "test-pattern:"
# Output directory for recording / hls # Output directory for recording / hls
output_dir: "./out" output_dir: "./out"

View File

@ -3,14 +3,19 @@ use crate::overseer::Overseer;
use crate::pipeline::runner::PipelineRunner; use crate::pipeline::runner::PipelineRunner;
use crate::settings::Settings; use crate::settings::Settings;
use anyhow::Result; use anyhow::Result;
use futures_util::{StreamExt, TryStreamExt}; use futures_util::{SinkExt, StreamExt, TryStreamExt};
use log::{error, info, warn}; use log::{error, info, warn};
use srt_tokio::{SrtListener, SrtSocket}; use srt_tokio::{SrtListener, SrtSocket};
use std::io::Read;
use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::sync::mpsc::unbounded_channel; use tokio::sync::mpsc::unbounded_channel;
use warp::Buf;
pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>) -> Result<()> { pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>) -> Result<()> {
let (_binding, mut packets) = SrtListener::builder().bind(&addr).await?; let binder: SocketAddr = addr.parse()?;
let (_binding, mut packets) = SrtListener::builder().bind(binder).await?;
info!("SRT listening on: {}", &addr); info!("SRT listening on: {}", &addr);
while let Some(request) = packets.incoming().next().await { while let Some(request) = packets.incoming().next().await {
@ -18,9 +23,43 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
let info = ConnectionInfo { let info = ConnectionInfo {
endpoint: addr.clone(), endpoint: addr.clone(),
ip_addr: socket.settings().remote.to_string(), ip_addr: socket.settings().remote.to_string(),
key: "".to_string(), key: socket
.settings()
.stream_id
.as_ref()
.map_or(String::new(), |s| s.to_string()),
}; };
spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(socket)).await; spawn_pipeline(
info,
out_dir.clone(),
overseer.clone(),
Box::new(SrtReader {
handle: Handle::current(),
socket,
buf: Vec::with_capacity(4096),
}),
)
.await;
} }
Ok(()) Ok(())
} }
struct SrtReader {
pub handle: Handle,
pub socket: SrtSocket,
pub buf: Vec<u8>,
}
impl Read for SrtReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let (mut rx, _) = self.socket.split_mut();
while self.buf.len() < buf.len() {
if let Some((_, mut data)) = self.handle.block_on(rx.next()) {
self.buf.extend(data.iter().as_slice());
}
}
let drain = self.buf.drain(..buf.len());
buf.copy_from_slice(drain.as_slice());
Ok(buf.len())
}
}

View File

@ -14,7 +14,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
let info = ConnectionInfo { let info = ConnectionInfo {
ip_addr: ip.to_string(), ip_addr: ip.to_string(),
endpoint: addr.clone(), endpoint: addr.clone(),
key: "".to_string(), key: "no-key-tcp".to_string(),
}; };
let socket = socket.into_std()?; let socket = socket.into_std()?;
spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(socket)).await; spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(socket)).await;