Compare commits: afbc2fc8b2...ccb2add607 (10 commits)

Commits (newest first):
  ccb2add607
  32f342efde
  3b151563a7
  6d1edb1b21
  70b8cd3c55
  e11d7dc787
  9045bb93e4
  20c9d107b7
  0202a7da5f
  f38f436b6c

@@ -1,3 +1,3 @@
-target/
-.git/
-out/
+**/target/
+**/.git/
+**/out/

@@ -19,6 +19,6 @@ steps:
   - docker login -u kieran -p $TOKEN git.v0l.io
   - docker login -u voidic -p $DOCKER_TOKEN
   - docker buildx create --name mybuilder --bootstrap --use
-  - docker buildx build --push --platform linux/amd64 -t git.v0l.io/kieran/zap-stream-core:latest -t voidic/zap-stream-core:latest .
+  - docker buildx build --push --platform linux/amd64 -t git.v0l.io/kieran/zap-stream-core:latest -t voidic/zap-stream-core:latest -f ./crates/zap-stream/Dockerfile .
   - kill $(cat /var/run/docker.pid)
 

Cargo.lock (generated, 1690 lines; executable file → normal file)
File diff suppressed because it is too large.

Cargo.toml (75 lines changed)
@@ -1,77 +1,20 @@
-[package]
-name = "zap-stream-core"
-version = "0.1.0"
-edition = "2021"
-
-[[bin]]
-name = "zap-stream-core"
-path = "src/bin/zap_stream_core.rs"
-
-[features]
-default = ["test-pattern", "srt", "rtmp"]
-srt = ["dep:srt-tokio"]
-rtmp = ["dep:rml_rtmp"]
-local-overseer = [] # WIP
-webhook-overseer = [] # WIP
-zap-stream = [
-    "dep:nostr-sdk",
-    "dep:zap-stream-db",
-    "dep:fedimint-tonic-lnd",
-    "dep:reqwest",
-    "dep:base64",
-    "dep:sha2",
-    "tokio/fs",
-]
-test-pattern = [
-    "dep:resvg",
-    "dep:usvg",
-    "dep:tiny-skia",
-    "dep:fontdue",
-    "dep:ringbuf",
-    "zap-stream-db/test-pattern"
-]
+[workspace]
+resolver = "2"
+members = [
+    "crates/core",
+    "crates/zap-stream",
+    "crates/zap-stream-db"
+]
 
-[dependencies]
-ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0" }
+[workspace.dependencies]
+ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385" }
 tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] }
 anyhow = { version = "^1.0.91", features = ["backtrace"] }
-pretty_env_logger = "0.5.0"
-tokio-stream = "0.1.14"
-futures-util = "0.3.30"
 async-trait = "0.1.77"
 log = "0.4.21"
 uuid = { version = "1.8.0", features = ["v4", "serde"] }
 serde = { version = "1.0.197", features = ["derive"] }
-config = { version = "0.14.0", features = ["yaml"] }
 url = "2.5.0"
-itertools = "0.13.0"
-rand = "0.8.5"
-clap = { version = "4.5.16", features = ["derive"] }
-warp = "0.3.7"
-libc = "0.2.162"
-m3u8-rs = "6.0.0"
+itertools = "0.14.0"
 chrono = "^0.4.38"
 hex = "0.4.3"
-
-# srt
-srt-tokio = { version = "0.4.3", optional = true }
-
-# rtmp
-rml_rtmp = { version = "0.8.0", optional = true }
-
-# test-pattern
-resvg = { version = "0.44.0", optional = true }
-usvg = { version = "0.44.0", optional = true }
-tiny-skia = { version = "0.11.4", optional = true }
-fontdue = { version = "0.9.2", optional = true }
-ringbuf = { version = "0.4.7", optional = true }
-
-# zap-stream
-zap-stream-db = { path = "zap-stream-db", optional = true }
-nostr-sdk = { version = "0.36.0", optional = true }
-fedimint-tonic-lnd = { version = "0.2.0", optional = true, default-features = false, features = ["invoicesrpc", "versionrpc"] }
-reqwest = { version = "0.12.9", optional = true, features = ["stream"] }
-base64 = { version = "0.22.1", optional = true }
-sha2 = { version = "0.10.8", optional = true }
-bytes = "1.8.0"

TODO.md (5 lines changed)
@@ -1,5 +1,4 @@
-- RTMP?
 - Setup multi-variant output
-- API parity
+- API parity https://git.v0l.io/Kieran/zap.stream/issues/7
-- fMP4 instead of MPEG-TS segments
 - HLS-LL
+- Delete old segments (N94)

crates/core/Cargo.lock (generated, executable file, 4818 lines)
File diff suppressed because it is too large.

crates/core/Cargo.toml (new file, 44 lines)
@@ -0,0 +1,44 @@
+[package]
+name = "zap-stream-core"
+version = "0.1.0"
+edition = "2021"
+
+[features]
+default = ["test-pattern", "srt", "rtmp"]
+srt = ["dep:srt-tokio"]
+rtmp = ["dep:rml_rtmp"]
+local-overseer = [] # WIP
+webhook-overseer = [] # WIP
+test-pattern = [
+    "dep:resvg",
+    "dep:usvg",
+    "dep:tiny-skia",
+    "dep:fontdue",
+    "dep:ringbuf",
+]
+
+[dependencies]
+ffmpeg-rs-raw.workspace = true
+tokio.workspace = true
+anyhow.workspace = true
+async-trait.workspace = true
+log.workspace = true
+uuid.workspace = true
+serde.workspace = true
+hex.workspace = true
+itertools.workspace = true
+futures-util = "0.3.30"
+m3u8-rs = "6.0.0"
+
+# srt
+srt-tokio = { version = "0.4.3", optional = true }
+
+# rtmp
+rml_rtmp = { version = "0.8.0", optional = true }
+
+# test-pattern
+resvg = { version = "0.44.0", optional = true }
+usvg = { version = "0.44.0", optional = true }
+tiny-skia = { version = "0.11.4", optional = true }
+fontdue = { version = "0.9.2", optional = true }
+ringbuf = { version = "0.4.7", optional = true }

@@ -47,7 +47,7 @@ relay {
     port = 7777
 
     # Set OS-limit on maximum number of open files/sockets (if 0, don't attempt to set) (restart required)
-    nofiles = 1000000
+    nofiles = 0
 
     # HTTP header that contains the client's real IP, before reverse proxying (ie x-real-ip) (MUST be all lower-case)
     realIpHeader = ""

@@ -14,11 +14,7 @@ impl Egress for HlsMuxer {
         packet: *mut AVPacket,
         variant: &Uuid,
     ) -> Result<EgressResult> {
-        if let Some(ns) = self.mux_packet(packet, variant)? {
-            Ok(EgressResult::NewSegment(ns))
-        } else {
-            Ok(EgressResult::None)
-        }
+        self.mux_packet(packet, variant)
     }
 
     unsafe fn reset(&mut self) -> Result<()> {

@@ -25,13 +25,16 @@ pub trait Egress {
 pub enum EgressResult {
     /// Nothing to report
     None,
-    /// A new segment was created
-    NewSegment(NewSegment),
+    /// Egress created/deleted some segments
+    Segments {
+        created: Vec<EgressSegment>,
+        deleted: Vec<EgressSegment>,
+    },
 }
 
 /// Basic details of new segment created by a muxer
 #[derive(Debug, Clone)]
-pub struct NewSegment {
+pub struct EgressSegment {
     /// The id of the variant (video or audio)
     pub variant: Uuid,
     /// Segment index
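
For orientation, a minimal sketch of how a caller consumes the new variant (the handle_result function is illustrative, not part of this diff; it assumes the EgressResult and EgressSegment types defined above):

    fn handle_result(result: EgressResult) {
        match result {
            EgressResult::None => {}
            EgressResult::Segments { created, deleted } => {
                // both halves of the sliding window arrive in one event,
                // so billing (created) and cleanup (deleted) share a code path
                for seg in &created {
                    println!("segment {} created for variant {}", seg.idx, seg.variant);
                }
                for seg in &deleted {
                    println!("segment {} expired", seg.idx);
                }
            }
        }
    }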

@@ -65,6 +65,6 @@ impl Egress for RecorderEgress {
     }
 
     unsafe fn reset(&mut self) -> Result<()> {
-        self.muxer.reset()
+        self.muxer.close()
     }
 }

@@ -1,9 +1,6 @@
-#[cfg(feature = "zap-stream")]
-pub mod blossom;
 pub mod egress;
 pub mod ingress;
 pub mod mux;
 pub mod overseer;
 pub mod pipeline;
-pub mod settings;
 pub mod variant;

@@ -1,4 +1,4 @@
-use crate::egress::NewSegment;
+use crate::egress::{EgressResult, EgressSegment};
 use crate::variant::{StreamMapping, VariantStream};
 use anyhow::{bail, Result};
 use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;

@@ -79,6 +79,8 @@ pub struct HlsVariant {
     pub streams: Vec<HlsVariantStream>,
     /// Segment length in seconds
     pub segment_length: f32,
+    /// Total number of segments to store for this variant
+    pub segment_window: Option<u16>,
     /// Current segment index
     pub idx: u64,
     /// Current segment start time in seconds (duration)

@@ -91,20 +93,24 @@ pub struct HlsVariant {
     pub segment_type: SegmentType,
 }
 
-struct SegmentInfo(u64, f32, SegmentType);
+struct SegmentInfo {
+    pub index: u64,
+    pub duration: f32,
+    pub kind: SegmentType,
+}
 
 impl SegmentInfo {
     fn to_media_segment(&self) -> MediaSegment {
         MediaSegment {
             uri: self.filename(),
-            duration: self.1,
+            duration: self.duration,
             title: None,
             ..MediaSegment::default()
         }
     }
 
     fn filename(&self) -> String {
-        HlsVariant::segment_name(self.2, self.0)
+        HlsVariant::segment_name(self.kind, self.index)
     }
 }
 

@@ -175,11 +181,16 @@ impl HlsVariant {
         Ok(Self {
             name: name.clone(),
             segment_length,
+            segment_window: Some(10), //TODO: configure window
             mux,
             streams,
             idx: 1,
             pkt_start: 0.0,
-            segments: Vec::from([SegmentInfo(1, segment_length, segment_type)]),
+            segments: Vec::from([SegmentInfo {
+                index: 1,
+                duration: segment_length,
+                kind: segment_type,
+            }]),
             out_dir: out_dir.to_string(),
             segment_type,
         })

@@ -205,31 +216,32 @@ impl HlsVariant {
     }
 
     /// Mux a packet created by the encoder for this variant
-    pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result<Option<NewSegment>> {
+    pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result<EgressResult> {
         let pkt_q = av_q2d((*pkt).time_base);
         // time of this packet in seconds
         let pkt_time = (*pkt).pts as f32 * pkt_q as f32;
         // what segment this pkt should be in (index)
         let pkt_seg = 1 + (pkt_time / self.segment_length).floor() as u64;
 
-        let mut result = None;
+        let mut result = EgressResult::None;
         let pkt_stream = *(*self.mux.context())
             .streams
             .add((*pkt).stream_index as usize);
         let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY
             && (*(*pkt_stream).codecpar).codec_type == AVMEDIA_TYPE_VIDEO;
         if pkt_seg != self.idx && can_split {
-            result = Some(self.split_next_seg(pkt_time)?);
+            result = self.split_next_seg(pkt_time)?;
         }
         self.mux.write_packet(pkt)?;
         Ok(result)
     }
 
     pub unsafe fn reset(&mut self) -> Result<()> {
-        self.mux.reset()
+        self.mux.close()
     }
 
-    unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result<NewSegment> {
+    /// Reset the muxer state and start the next segment
+    unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result<EgressResult> {
         self.idx += 1;
 
         // Manually reset muxer avio
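
The segment-index arithmetic above maps a packet timestamp to its target segment as pkt_seg = 1 + floor(pkt_time / segment_length); a self-contained sketch of that math (the values are illustrative):

    fn segment_index(pkt_time: f32, segment_length: f32) -> u64 {
        1 + (pkt_time / segment_length).floor() as u64
    }

    fn main() {
        assert_eq!(segment_index(0.0, 2.0), 1); // first segment
        assert_eq!(segment_index(7.5, 2.0), 4); // 7.5s into 2s segments
        // the actual split still waits for the next video keyframe (can_split),
        // so real segments may run slightly longer than segment_length
    }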

@@ -257,19 +269,40 @@ impl HlsVariant {
 
         let duration = pkt_time - self.pkt_start;
         info!("Writing segment {} [{}s]", &next_seg_url, duration);
-        if let Err(e) = self.add_segment(self.idx, duration) {
+        if let Err(e) = self.push_segment(self.idx, duration) {
             warn!("Failed to update playlist: {}", e);
         }
 
         /// Get the video variant for this group
         /// since this could actually be audio which would not be useful for
         /// [Overseer] impl
-        let video_var = self.video_stream().unwrap_or(self.streams.first().unwrap());
+        let video_var_id = self
+            .video_stream()
+            .unwrap_or(self.streams.first().unwrap())
+            .id()
+            .clone();
+
+        // cleanup old segments
+        let deleted = self
+            .clean_segments()?
+            .into_iter()
+            .map(|seg| EgressSegment {
+                variant: video_var_id,
+                idx: seg.index,
+                duration: seg.duration,
+                path: PathBuf::from(Self::map_segment_path(
+                    &self.out_dir,
+                    &self.name,
+                    seg.index,
+                    self.segment_type,
+                )),
+            })
+            .collect();
 
         // emit result of the previously completed segment,
         let prev_seg = self.idx - 1;
-        let ret = NewSegment {
-            variant: *video_var.id(),
+        let created = EgressSegment {
+            variant: video_var_id,
             idx: prev_seg,
             duration,
             path: PathBuf::from(Self::map_segment_path(

@@ -280,7 +313,10 @@ impl HlsVariant {
             )),
         };
         self.pkt_start = pkt_time;
-        Ok(ret)
+        Ok(EgressResult::Segments {
+            created: vec![created],
+            deleted,
+        })
     }
 
     fn video_stream(&self) -> Option<&HlsVariantStream> {

@@ -289,22 +325,39 @@ impl HlsVariant {
             .find(|a| matches!(*a, HlsVariantStream::Video { .. }))
     }
 
-    fn add_segment(&mut self, idx: u64, duration: f32) -> Result<()> {
-        self.segments
-            .push(SegmentInfo(idx, duration, self.segment_type));
+    /// Add a new segment to the variant and return a list of deleted segments
+    fn push_segment(&mut self, idx: u64, duration: f32) -> Result<()> {
+        self.segments.push(SegmentInfo {
+            index: idx,
+            duration,
+            kind: self.segment_type,
+        });
+
+        self.write_playlist()
+    }
+
+    /// Delete segments which are too old
+    fn clean_segments(&mut self) -> Result<Vec<SegmentInfo>> {
         const MAX_SEGMENTS: usize = 10;
 
+        let mut ret = vec![];
         if self.segments.len() > MAX_SEGMENTS {
             let n_drain = self.segments.len() - MAX_SEGMENTS;
             let seg_dir = self.out_dir();
             for seg in self.segments.drain(..n_drain) {
                 // delete file
                 let seg_path = seg_dir.join(seg.filename());
-                std::fs::remove_file(seg_path)?;
+                if let Err(e) = std::fs::remove_file(&seg_path) {
+                    warn!(
+                        "Failed to remove segment file: {} {}",
+                        seg_path.display(),
+                        e
+                    );
+                }
+                ret.push(seg);
             }
         }
-        self.write_playlist()
+        Ok(ret)
     }
 
     fn write_playlist(&mut self) -> Result<()> {
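
A sketch of the sliding-window behaviour of clean_segments under the MAX_SEGMENTS = 10 cap (a Vec<u64> stands in for the SegmentInfo list; illustrative only):

    fn main() {
        const MAX_SEGMENTS: usize = 10;
        let mut segments: Vec<u64> = (1..=13).collect(); // 13 queued segment indexes
        if segments.len() > MAX_SEGMENTS {
            let n_drain = segments.len() - MAX_SEGMENTS;
            let removed: Vec<u64> = segments.drain(..n_drain).collect();
            assert_eq!(removed, vec![1, 2, 3]); // oldest three are deleted from disk
            assert_eq!(segments.first(), Some(&4)); // playlist media_sequence follows
        }
    }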
||||||
@ -312,7 +365,7 @@ impl HlsVariant {
|
|||||||
pl.target_duration = self.segment_length as u64;
|
pl.target_duration = self.segment_length as u64;
|
||||||
pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect();
|
pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect();
|
||||||
pl.version = Some(3);
|
pl.version = Some(3);
|
||||||
pl.media_sequence = self.segments.first().map(|s| s.0).unwrap_or(0);
|
pl.media_sequence = self.segments.first().map(|s| s.index).unwrap_or(0);
|
||||||
|
|
||||||
let mut f_out = File::create(self.out_dir().join("live.m3u8"))?;
|
let mut f_out = File::create(self.out_dir().join("live.m3u8"))?;
|
||||||
pl.write_to(&mut f_out)?;
|
pl.write_to(&mut f_out)?;
|
||||||

@@ -430,7 +483,7 @@ impl HlsMuxer {
         &mut self,
         pkt: *mut AVPacket,
         variant: &Uuid,
-    ) -> Result<Option<NewSegment>> {
+    ) -> Result<EgressResult> {
         for var in self.variants.iter_mut() {
             if let Some(vs) = var.streams.iter().find(|s| s.id() == variant) {
                 // very important for muxer to know which stream this pkt belongs to

@@ -20,6 +20,10 @@ impl LocalOverseer {
 
 #[async_trait]
 impl Overseer for LocalOverseer {
+    async fn check_streams(&self) -> Result<()> {
+        todo!()
+    }
+
     async fn start_stream(
         &self,
         _connection: &ConnectionInfo,

crates/core/src/overseer/mod.rs (new file, 80 lines)
@@ -0,0 +1,80 @@
+use crate::ingress::ConnectionInfo;
+
+use crate::egress::EgressSegment;
+use crate::pipeline::PipelineConfig;
+use anyhow::Result;
+use async_trait::async_trait;
+use std::cmp::PartialEq;
+use std::path::PathBuf;
+use uuid::Uuid;
+
+#[cfg(feature = "local-overseer")]
+mod local;
+
+#[cfg(feature = "webhook-overseer")]
+mod webhook;
+
+/// A copy of [ffmpeg_rs_raw::DemuxerInfo] without internal ptr
+#[derive(PartialEq, Clone)]
+pub struct IngressInfo {
+    pub bitrate: usize,
+    pub streams: Vec<IngressStream>,
+}
+
+/// A copy of [ffmpeg_rs_raw::StreamInfo] without ptr
+#[derive(PartialEq, Clone)]
+pub struct IngressStream {
+    pub index: usize,
+    pub stream_type: IngressStreamType,
+    pub codec: isize,
+    pub format: isize,
+    pub width: usize,
+    pub height: usize,
+    pub fps: f32,
+    pub sample_rate: usize,
+    pub language: String,
+}
+
+#[derive(PartialEq, Eq, Clone)]
+pub enum IngressStreamType {
+    Video,
+    Audio,
+    Subtitle,
+}
+
+#[async_trait]
+/// The control process that oversees streaming operations
+pub trait Overseer: Send + Sync {
+    /// Check all streams
+    async fn check_streams(&self) -> Result<()>;
+
+    /// Set up a new streaming pipeline
+    async fn start_stream(
+        &self,
+        connection: &ConnectionInfo,
+        stream_info: &IngressInfo,
+    ) -> Result<PipelineConfig>;
+
+    /// A new segment(s) (HLS etc.) was generated for a stream variant
+    ///
+    /// This handler is usually used for distribution / billing
+    async fn on_segments(
+        &self,
+        pipeline_id: &Uuid,
+        added: &Vec<EgressSegment>,
+        deleted: &Vec<EgressSegment>,
+    ) -> Result<()>;
+
+    /// At a regular interval, pipeline will emit one of the frames for processing as a
+    /// thumbnail
+    async fn on_thumbnail(
+        &self,
+        pipeline_id: &Uuid,
+        width: usize,
+        height: usize,
+        path: &PathBuf,
+    ) -> Result<()>;
+
+    /// Stream is finished
+    async fn on_end(&self, pipeline_id: &Uuid) -> Result<()>;
+}
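
For orientation, a minimal no-op implementation of the trait might look like the sketch below (the NullOverseer name is hypothetical; it assumes the ConnectionInfo, IngressInfo, PipelineConfig and EgressSegment types from the crate above):

    struct NullOverseer;

    #[async_trait]
    impl Overseer for NullOverseer {
        async fn check_streams(&self) -> Result<()> {
            Ok(())
        }
        async fn start_stream(
            &self,
            _connection: &ConnectionInfo,
            _stream_info: &IngressInfo,
        ) -> Result<PipelineConfig> {
            anyhow::bail!("no pipeline configured")
        }
        async fn on_segments(
            &self,
            _pipeline_id: &Uuid,
            _added: &Vec<EgressSegment>,
            _deleted: &Vec<EgressSegment>,
        ) -> Result<()> {
            Ok(())
        }
        async fn on_thumbnail(
            &self,
            _pipeline_id: &Uuid,
            _width: usize,
            _height: usize,
            _path: &PathBuf,
        ) -> Result<()> {
            Ok(())
        }
        async fn on_end(&self, _pipeline_id: &Uuid) -> Result<()> {
            Ok(())
        }
    }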

@@ -21,6 +21,10 @@ impl WebhookOverseer {
 
 #[async_trait]
 impl Overseer for WebhookOverseer {
+    async fn check_streams(&self) -> Result<()> {
+        todo!()
+    }
+
     async fn start_stream(
         &self,
         connection: &ConnectionInfo,

@@ -73,6 +73,9 @@ pub struct PipelineRunner {
     overseer: Arc<dyn Overseer>,
 
     fps_counter_start: Instant,
+    fps_last_frame_ctr: u64,
+
+    /// Total number of frames produced
     frame_ctr: u64,
     out_dir: String,
 }

@@ -100,6 +103,7 @@ impl PipelineRunner {
             fps_counter_start: Instant::now(),
             egress: Vec::new(),
             frame_ctr: 0,
+            fps_last_frame_ctr: 0,
             info: None,
         })
     }

@@ -140,7 +144,7 @@ impl PipelineRunner {
         };
 
         // run transcoder pipeline
-        let (mut pkt, stream) = self.demuxer.get_packet()?;
+        let (mut pkt, _stream) = self.demuxer.get_packet()?;
         if pkt.is_null() {
             return Ok(false);
         }

@@ -155,16 +159,14 @@ impl PipelineRunner {
         };
 
         let mut egress_results = vec![];
-        for frame in frames {
+        for (frame, stream) in frames {
             // Copy frame from GPU if using hwaccel decoding
             let mut frame = get_frame_from_hw(frame)?;
             (*frame).time_base = (*stream).time_base;
 
             let p = (*stream).codecpar;
             if (*p).codec_type == AVMediaType::AVMEDIA_TYPE_VIDEO {
-                let pts_sec = ((*frame).pts as f64 * av_q2d((*stream).time_base)).floor() as u64;
-                // write thumbnail every 1min
-                if pts_sec % 60 == 0 && pts_sec != 0 {
+                if (self.frame_ctr % 1800) == 0 {
                     let dst_pic = PathBuf::from(&self.out_dir)
                         .join(config.id.to_string())
                         .join("thumb.webp");

@@ -268,22 +270,24 @@ impl PipelineRunner {
         // egress results
         self.handle.block_on(async {
             for er in egress_results {
-                if let EgressResult::NewSegment(seg) = er {
+                if let EgressResult::Segments { created, deleted } = er {
                     if let Err(e) = self
                         .overseer
-                        .on_segment(&config.id, &seg.variant, seg.idx, seg.duration, &seg.path)
+                        .on_segments(&config.id, &created, &deleted)
                         .await
                     {
-                        error!("Failed to process segment: {}", e);
+                        bail!("Failed to process segment {}", e.to_string());
                     }
                 }
             }
-        });
+            Ok(())
+        })?;
 
         let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32();
         if elapsed >= 2f32 {
-            info!("Average fps: {:.2}", self.frame_ctr as f32 / elapsed);
+            let n_frames = self.frame_ctr - self.fps_last_frame_ctr;
+            info!("Average fps: {:.2}", n_frames as f32 / elapsed);
             self.fps_counter_start = Instant::now();
-            self.frame_ctr = 0;
+            self.fps_last_frame_ctr = self.frame_ctr;
         }
         Ok(true)
     }
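
The FPS fix above stops dividing the lifetime frame counter by a 2-second window (which inflated the average over time) and stops zeroing frame_ctr, which the new thumbnail cadence (frame_ctr % 1800) now relies on; a sketch of the corrected window:

    struct FpsWindow {
        frame_ctr: u64,          // total frames, never reset
        fps_last_frame_ctr: u64, // snapshot at the previous report
    }

    impl FpsWindow {
        fn report(&mut self, elapsed_secs: f32) -> f32 {
            let n_frames = self.frame_ctr - self.fps_last_frame_ctr;
            self.fps_last_frame_ctr = self.frame_ctr;
            n_frames as f32 / elapsed_secs
        }
    }

    fn main() {
        let mut w = FpsWindow { frame_ctr: 3600, fps_last_frame_ctr: 3540 };
        assert_eq!(w.report(2.0), 30.0); // 60 new frames over 2s => 30 fps
    }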

Image file (binary): 39 KiB before, 39 KiB after.

@@ -8,8 +8,8 @@ default = []
 test-pattern = []
 
 [dependencies]
-anyhow = "^1.0.70"
-chrono = { version = "0.4.38", features = ["serde"] }
+anyhow.workspace = true
+chrono.workspace = true
+uuid.workspace = true
+
 sqlx = { version = "0.8.1", features = ["runtime-tokio", "migrate", "mysql", "chrono"] }
-log = "0.4.22"
-uuid = { version = "1.11.0", features = ["v4"] }

@@ -1,8 +1,9 @@
 use crate::{User, UserStream};
 use anyhow::Result;
-use sqlx::{MySqlPool, Row};
+use sqlx::{Executor, MySqlPool, Row};
 use uuid::Uuid;
 
+#[derive(Clone)]
 pub struct ZapStreamDb {
     db: MySqlPool,
 }

@@ -42,6 +43,16 @@ impl ZapStreamDb {
             .map_err(anyhow::Error::new)?)
     }
 
+    /// Update a users balance
+    pub async fn update_user_balance(&self, uid: u64, diff: i64) -> Result<()> {
+        sqlx::query("update user set balance = balance + ? where id = ?")
+            .bind(diff)
+            .bind(uid)
+            .execute(&self.db)
+            .await?;
+        Ok(())
+    }
+
     pub async fn upsert_user(&self, pubkey: &[u8; 32]) -> Result<u64> {
         let res = sqlx::query("insert ignore into user(pubkey) values(?) returning id")
             .bind(pubkey.as_slice())

@@ -101,4 +112,45 @@ impl ZapStreamDb {
             .await
             .map_err(anyhow::Error::new)?)
     }
+
+    /// Get the list of active streams
+    pub async fn list_live_streams(&self) -> Result<Vec<UserStream>> {
+        Ok(sqlx::query_as("select * from user_stream where state = 2")
+            .fetch_all(&self.db)
+            .await?)
+    }
+
+    /// Add [duration] & [cost] to a stream and return the new user balance
+    pub async fn tick_stream(
+        &self,
+        stream_id: &Uuid,
+        user_id: u64,
+        duration: f32,
+        cost: i64,
+    ) -> Result<i64> {
+        let mut tx = self.db.begin().await?;
+
+        sqlx::query("update user_stream set duration = duration + ?, cost = cost + ? where id = ?")
+            .bind(&duration)
+            .bind(&cost)
+            .bind(stream_id.to_string())
+            .execute(&mut *tx)
+            .await?;
+
+        sqlx::query("update user set balance = balance - ? where id = ?")
+            .bind(&cost)
+            .bind(&user_id)
+            .execute(&mut *tx)
+            .await?;
+
+        let balance: i64 = sqlx::query("select balance from user where id = ?")
+            .bind(&user_id)
+            .fetch_one(&mut *tx)
+            .await?
+            .try_get(0)?;
+
+        tx.commit().await?;
+
+        Ok(balance)
+    }
 }
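
A hypothetical caller of the new billing helpers (not part of this diff; it assumes a ZapStreamDb handle and a per-segment cost):

    async fn bill_segment(
        db: &ZapStreamDb,
        stream_id: &uuid::Uuid,
        user_id: u64,
        seg_duration: f32,
        cost: i64,
    ) -> anyhow::Result<()> {
        // tick_stream runs in one transaction: it bumps the stream's
        // duration/cost, debits the user, and returns the new balance
        let balance = db.tick_stream(stream_id, user_id, seg_duration, cost).await?;
        if balance <= 0 {
            // a real caller would stop the stream here (see Overseer::check_streams)
            log::warn!("user {} is out of balance", user_id);
        }
        Ok(())
    }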

@@ -1,7 +1,6 @@
 use chrono::{DateTime, Utc};
 use sqlx::{FromRow, Type};
 use std::fmt::{Display, Formatter};
-use uuid::Uuid;
 
 #[derive(Debug, Clone, FromRow)]
 pub struct User {

crates/zap-stream/Cargo.lock (generated, new file, 7 lines)
@@ -0,0 +1,7 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+version = 4
+
+[[package]]
+name = "zap-stream"
+version = "0.1.0"

crates/zap-stream/Cargo.toml (new file, 43 lines)
@@ -0,0 +1,43 @@
+[package]
+name = "zap-stream"
+version = "0.1.0"
+edition = "2021"
+
+[features]
+default = ["srt", "rtmp", "test-pattern"]
+srt = ["zap-stream-core/srt"]
+rtmp = ["zap-stream-core/rtmp"]
+test-pattern = ["zap-stream-core/test-pattern", "zap-stream-db/test-pattern"]
+
+[dependencies]
+zap-stream-db = { path = "../zap-stream-db" }
+zap-stream-core = { path = "../core" }
+
+uuid.workspace = true
+ffmpeg-rs-raw.workspace = true
+anyhow.workspace = true
+log.workspace = true
+tokio.workspace = true
+async-trait.workspace = true
+serde.workspace = true
+chrono.workspace = true
+hex.workspace = true
+url.workspace = true
+
+# http setuff
+hyper = { version = "1.5.1", features = ["server"] }
+bytes = "1.8.0"
+http-body-util = "0.1.2"
+tokio-util = "0.7.13"
+hyper-util = "0.1.10"
+
+# direct deps
+config = { version = "0.15.6", features = ["yaml"] }
+nostr-sdk = { version = "0.38.0" }
+fedimint-tonic-lnd = { version = "0.2.0", default-features = false, features = ["invoicesrpc", "versionrpc"] }
+reqwest = { version = "0.12.9", features = ["stream", "json"] }
+base64 = { version = "0.22.1" }
+sha2 = { version = "0.10.8" }
+pretty_env_logger = "0.5.0"
+clap = { version = "4.5.16", features = ["derive"] }
+futures-util = "0.3.31"

@@ -31,7 +31,7 @@ RUN git clone --single-branch --branch release/7.1 https://git.ffmpeg.org/ffmpeg
     --disable-static \
     --enable-shared && \
     make -j$(nproc) && make install
-RUN cargo install --path . --bin zap-stream-core --root /app/build --features zap-stream
+RUN cargo install --path ./crates/zap-stream --root /app/build
 
 FROM $IMAGE AS runner
 WORKDIR /app
@@ -40,4 +40,4 @@ RUN apt update && \
     rm -rf /var/lib/apt/lists/*
 COPY --from=build /app/build .
 COPY --from=build /app/ffmpeg/lib/ /lib
-ENTRYPOINT ["/app/bin/zap-stream-core"]
+ENTRYPOINT ["/app/bin/zap-stream"]

@@ -5,6 +5,10 @@ endpoints:
   - "rtmp://127.0.0.1:3336"
   - "srt://127.0.0.1:3335"
   - "tcp://127.0.0.1:3334"
+  - "test-pattern://"
+
+# Public hostname which points to the IP address used to listen for all [endpoints]
+endpoints_public_hostname: "localhost"
 
 # Output directory for recording / hls
 output_dir: "./out"
@@ -38,6 +42,7 @@ listen_http: "127.0.0.1:8080"
 #
 overseer:
   zap-stream:
+    cost: 16
     nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4"
     #blossom:
     #  - "http://localhost:8881"

crates/zap-stream/dev-setup/db.sql (new file, 2 lines)
@@ -0,0 +1,2 @@
+create database route96;
+create database zap_stream;

crates/zap-stream/dev-setup/route96.yaml (new file, 5 lines)
@@ -0,0 +1,5 @@
+listen: "0.0.0.0:8000"
+database: "mysql://root:root@db:3306/route96"
+storage_dir: "./data"
+max_upload_bytes: 5e+9
+public_url: "http://localhost:8881"

crates/zap-stream/dev-setup/strfry.conf (new file, 144 lines)
@@ -0,0 +1,144 @@
+##
+## Default strfry config
+##
+
+# Directory that contains the strfry LMDB database (restart required)
+db = "./strfry-db/"
+
+dbParams {
+    # Maximum number of threads/processes that can simultaneously have LMDB transactions open (restart required)
+    maxreaders = 256
+
+    # Size of mmap() to use when loading LMDB (default is 10TB, does *not* correspond to disk-space used) (restart required)
+    mapsize = 10995116277760
+
+    # Disables read-ahead when accessing the LMDB mapping. Reduces IO activity when DB size is larger than RAM. (restart required)
+    noReadAhead = false
+}
+
+events {
+    # Maximum size of normalised JSON, in bytes
+    maxEventSize = 65536
+
+    # Events newer than this will be rejected
+    rejectEventsNewerThanSeconds = 900
+
+    # Events older than this will be rejected
+    rejectEventsOlderThanSeconds = 94608000
+
+    # Ephemeral events older than this will be rejected
+    rejectEphemeralEventsOlderThanSeconds = 60
+
+    # Ephemeral events will be deleted from the DB when older than this
+    ephemeralEventsLifetimeSeconds = 300
+
+    # Maximum number of tags allowed
+    maxNumTags = 2000
+
+    # Maximum size for tag values, in bytes
+    maxTagValSize = 1024
+}
+
+relay {
+    # Interface to listen on. Use 0.0.0.0 to listen on all interfaces (restart required)
+    bind = "0.0.0.0"
+
+    # Port to open for the nostr websocket protocol (restart required)
+    port = 7777
+
+    # Set OS-limit on maximum number of open files/sockets (if 0, don't attempt to set) (restart required)
+    nofiles = 0
+
+    # HTTP header that contains the client's real IP, before reverse proxying (ie x-real-ip) (MUST be all lower-case)
+    realIpHeader = ""
+
+    info {
+        # NIP-11: Name of this server. Short/descriptive (< 30 characters)
+        name = "strfry default"
+
+        # NIP-11: Detailed information about relay, free-form
+        description = "This is a strfry instance."
+
+        # NIP-11: Administrative nostr pubkey, for contact purposes
+        pubkey = ""
+
+        # NIP-11: Alternative administrative contact (email, website, etc)
+        contact = ""
+
+        # NIP-11: URL pointing to an image to be used as an icon for the relay
+        icon = ""
+
+        # List of supported lists as JSON array, or empty string to use default. Example: "[1,2]"
+        nips = ""
+    }
+
+    # Maximum accepted incoming websocket frame size (should be larger than max event) (restart required)
+    maxWebsocketPayloadSize = 131072
+
+    # Websocket-level PING message frequency (should be less than any reverse proxy idle timeouts) (restart required)
+    autoPingSeconds = 55
+
+    # If TCP keep-alive should be enabled (detect dropped connections to upstream reverse proxy)
+    enableTcpKeepalive = false
+
+    # How much uninterrupted CPU time a REQ query should get during its DB scan
+    queryTimesliceBudgetMicroseconds = 10000
+
+    # Maximum records that can be returned per filter
+    maxFilterLimit = 500
+
+    # Maximum number of subscriptions (concurrent REQs) a connection can have open at any time
+    maxSubsPerConnection = 20
+
+    writePolicy {
+        # If non-empty, path to an executable script that implements the writePolicy plugin logic
+        plugin = ""
+    }
+
+    compression {
+        # Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU (restart required)
+        enabled = true
+
+        # Maintain a sliding window buffer for each connection. Improves compression, but uses more memory (restart required)
+        slidingWindow = true
+    }
+
+    logging {
+        # Dump all incoming messages
+        dumpInAll = false
+
+        # Dump all incoming EVENT messages
+        dumpInEvents = false
+
+        # Dump all incoming REQ/CLOSE messages
+        dumpInReqs = false
+
+        # Log performance metrics for initial REQ database scans
+        dbScanPerf = false
+
+        # Log reason for invalid event rejection? Can be disabled to silence excessive logging
+        invalidEvents = true
+    }
+
+    numThreads {
+        # Ingester threads: route incoming requests, validate events/sigs (restart required)
+        ingester = 3
+
+        # reqWorker threads: Handle initial DB scan for events (restart required)
+        reqWorker = 3
+
+        # reqMonitor threads: Handle filtering of new events (restart required)
+        reqMonitor = 3
+
+        # negentropy threads: Handle negentropy protocol messages (restart required)
+        negentropy = 2
+    }
+
+    negentropy {
+        # Support negentropy protocol messages
+        enabled = true
+
+        # Maximum records that sync will process before returning an error
+        maxSyncEvents = 1000000
+    }
+}

@@ -18,14 +18,14 @@ services:
   blossom:
     depends_on:
       - db
-    image: voidic/route96
+    image: voidic/route96:latest
     environment:
       - "RUST_LOG=info"
     ports:
       - "8881:8000"
     volumes:
       - "blossom:/app/data"
-      - "./dev-setup/route96.toml:/app/config.toml"
+      - "./dev-setup/route96.yaml:/app/config.yaml"
 volumes:
   db:
   blossom:

crates/zap-stream/src/api.rs (new file, 204 lines)
@@ -0,0 +1,204 @@
+use crate::http::check_nip98_auth;
+use crate::settings::Settings;
+use crate::ListenerEndpoint;
+use anyhow::{anyhow, bail, Result};
+use bytes::Bytes;
+use fedimint_tonic_lnd::tonic::codegen::Body;
+use http_body_util::combinators::BoxBody;
+use http_body_util::{BodyExt, Full};
+use hyper::body::Incoming;
+use hyper::{Method, Request, Response};
+use nostr_sdk::{serde_json, Event, PublicKey};
+use serde::{Deserialize, Serialize};
+use std::net::SocketAddr;
+use std::str::FromStr;
+use url::Url;
+use zap_stream_db::ZapStreamDb;
+
+#[derive(Clone)]
+pub struct Api {
+    db: ZapStreamDb,
+    settings: Settings,
+}
+
+impl Api {
+    pub fn new(db: ZapStreamDb, settings: Settings) -> Self {
+        Self { db, settings }
+    }
+
+    pub async fn handler(
+        self,
+        req: Request<Incoming>,
+    ) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
+        let base = Response::builder()
+            .header("server", "zap-stream")
+            .header("access-control-allow-origin", "*")
+            .header("access-control-allow-headers", "*")
+            .header("access-control-allow-methods", "HEAD, GET");
+
+        Ok(match (req.method(), req.uri().path()) {
+            (&Method::GET, "/api/v1/account") => {
+                let auth = check_nip98_auth(&req)?;
+                let rsp = self.get_account(&auth.pubkey).await?;
+                return Ok(base.body(Self::body_json(&rsp)?)?);
+            }
+            (&Method::PATCH, "/api/v1/account") => {
+                let auth = check_nip98_auth(&req)?;
+                let body = req.collect().await?.to_bytes();
+                let r_body: PatchAccount = serde_json::from_slice(&body)?;
+                let rsp = self.update_account(&auth.pubkey, r_body).await?;
+                return Ok(base.body(Self::body_json(&rsp)?)?);
+            }
+            (&Method::GET, "/api/v1/topup") => {
+                let auth = check_nip98_auth(&req)?;
+                let url: Url = req.uri().to_string().parse()?;
+                let amount: usize = url
+                    .query_pairs()
+                    .find_map(|(k, v)| if k == "amount" { Some(v) } else { None })
+                    .and_then(|v| v.parse().ok())
+                    .ok_or(anyhow!("Missing amount"))?;
+                let rsp = self.topup(&auth.pubkey, amount).await?;
+                return Ok(base.body(Self::body_json(&rsp)?)?);
+            }
+            (&Method::PATCH, "/api/v1/event") => {
+                bail!("Not implemented")
+            }
+            (&Method::POST, "/api/v1/withdraw") => {
+                bail!("Not implemented")
+            }
+            (&Method::POST, "/api/v1/account/forward") => {
+                bail!("Not implemented")
+            }
+            (&Method::DELETE, "/api/v1/account/forward/<id>") => {
+                bail!("Not implemented")
+            }
+            (&Method::GET, "/api/v1/account/history") => {
+                bail!("Not implemented")
+            }
+            (&Method::GET, "/api/v1/account/keys") => {
+                bail!("Not implemented")
+            }
+            _ => {
+                if req.method() == Method::OPTIONS {
+                    base.body(Default::default())?
+                } else {
+                    base.status(404).body(Default::default())?
+                }
+            }
+        })
+    }
+
+    fn body_json<T: Serialize>(obj: &T) -> Result<BoxBody<Bytes, anyhow::Error>> {
+        Ok(Full::from(serde_json::to_string(obj)?)
+            .map_err(|e| match e {})
+            .boxed())
+    }
+
+    async fn get_account(&self, pubkey: &PublicKey) -> Result<AccountInfo> {
+        let uid = self.db.upsert_user(&pubkey.to_bytes()).await?;
+        let user = self.db.get_user(uid).await?;
+
+        Ok(AccountInfo {
+            endpoints: self
+                .settings
+                .endpoints
+                .iter()
+                .filter_map(|e| match ListenerEndpoint::from_str(&e).ok()? {
+                    ListenerEndpoint::SRT { endpoint } => {
+                        let addr: SocketAddr = endpoint.parse().ok()?;
+                        Some(Endpoint {
+                            name: "SRT".to_string(),
+                            url: format!(
+                                "srt://{}:{}",
+                                self.settings.endpoints_public_hostname,
+                                addr.port()
+                            ),
+                            key: user.stream_key.clone(),
+                            capabilities: vec![],
+                        })
+                    }
+                    ListenerEndpoint::RTMP { endpoint } => {
+                        let addr: SocketAddr = endpoint.parse().ok()?;
+                        Some(Endpoint {
+                            name: "RTMP".to_string(),
+                            url: format!(
+                                "rtmp://{}:{}",
+                                self.settings.endpoints_public_hostname,
+                                addr.port()
+                            ),
+                            key: user.stream_key.clone(),
+                            capabilities: vec![],
+                        })
+                    }
+                    ListenerEndpoint::TCP { endpoint } => {
+                        let addr: SocketAddr = endpoint.parse().ok()?;
+                        Some(Endpoint {
+                            name: "TCP".to_string(),
+                            url: format!(
+                                "tcp://{}:{}",
+                                self.settings.endpoints_public_hostname,
+                                addr.port()
+                            ),
+                            key: user.stream_key.clone(),
+                            capabilities: vec![],
+                        })
+                    }
+                    ListenerEndpoint::File { .. } => None,
+                    ListenerEndpoint::TestPattern => None,
+                })
+                .collect(),
+            event: None,
+            balance: user.balance as u64,
+            tos: AccountTos {
+                accepted: user.tos_accepted.is_some(),
+                link: "https://zap.stream/tos".to_string(),
+            },
+        })
+    }
+
+    async fn update_account(&self, pubkey: &PublicKey, account: PatchAccount) -> Result<()> {
+        bail!("Not implemented")
+    }
+
+    async fn topup(&self, pubkey: &PublicKey, amount: usize) -> Result<TopupResponse> {
+        bail!("Not implemented")
+    }
+}
+
+#[derive(Deserialize, Serialize)]
+struct AccountInfo {
+    pub endpoints: Vec<Endpoint>,
+    pub event: Option<Event>,
+    pub balance: u64,
+    pub tos: AccountTos,
+}
+
+#[derive(Deserialize, Serialize)]
+struct Endpoint {
+    pub name: String,
+    pub url: String,
+    pub key: String,
+    pub capabilities: Vec<String>,
+}
+
+#[derive(Deserialize, Serialize)]
+struct EndpointCost {
+    pub unit: String,
+    pub rate: u16,
+}
+
+#[derive(Deserialize, Serialize)]
+struct AccountTos {
+    pub accepted: bool,
+    pub link: String,
+}
+
+#[derive(Deserialize, Serialize)]
+struct PatchAccount {
+    pub accept_tos: Option<bool>,
+}
+
+#[derive(Deserialize, Serialize)]
+struct TopupResponse {
+    pub pr: String,
+}

@@ -59,15 +59,11 @@ impl Blossom {
     ) -> Result<BlobDescriptor> {
         let mut f = File::open(from_file).await?;
         let hash = Self::hash_file(&mut f).await?;
-        let auth_event = EventBuilder::new(
-            Kind::Custom(24242),
-            "Upload blob",
-            [
-                Tag::hashtag("upload"),
-                Tag::parse(&["x", &hash])?,
-                Tag::expiration(Timestamp::now().add(60)),
-            ],
-        );
+        let auth_event = EventBuilder::new(Kind::Custom(24242), "Upload blob").tags([
+            Tag::hashtag("upload"),
+            Tag::parse(["x", &hash])?,
+            Tag::expiration(Timestamp::now().add(60)),
+        ]);
 
         let auth_event = auth_event.sign_with_keys(keys)?;
 

crates/zap-stream/src/http.rs (new file, 118 lines)
@@ -0,0 +1,118 @@
+use crate::api::Api;
+use crate::overseer::ZapStreamOverseer;
+use anyhow::{bail, Result};
+use base64::Engine;
+use bytes::Bytes;
+use futures_util::TryStreamExt;
+use http_body_util::combinators::BoxBody;
+use http_body_util::{BodyExt, Full, StreamBody};
+use hyper::body::{Frame, Incoming};
+use hyper::service::Service;
+use hyper::{Method, Request, Response};
+use log::{error, info};
+use nostr_sdk::{serde_json, Event};
+use std::future::Future;
+use std::path::PathBuf;
+use std::pin::Pin;
+use std::sync::Arc;
+use tokio::fs::File;
+use tokio_util::io::ReaderStream;
+use zap_stream_core::overseer::Overseer;
+
+#[derive(Clone)]
+pub struct HttpServer {
+    index: String,
+    files_dir: PathBuf,
+    api: Api,
+}
+
+impl HttpServer {
+    pub fn new(index: String, files_dir: PathBuf, api: Api) -> Self {
+        Self {
+            index,
+            files_dir,
+            api,
+        }
+    }
+}
+
+impl Service<Request<Incoming>> for HttpServer {
+    type Response = Response<BoxBody<Bytes, Self::Error>>;
+    type Error = anyhow::Error;
+    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
+
+    fn call(&self, req: Request<Incoming>) -> Self::Future {
+        // check is index.html
+        if req.method() == Method::GET && req.uri().path() == "/"
+            || req.uri().path() == "/index.html"
+        {
+            let index = self.index.clone();
+            return Box::pin(async move {
+                Ok(Response::builder()
+                    .header("content-type", "text/html")
+                    .header("server", "zap-stream-core")
+                    .body(
+                        Full::new(Bytes::from(index))
+                            .map_err(|e| match e {})
+                            .boxed(),
+                    )?)
+            });
+        }
+
+        // check if mapped to file
+        let mut dst_path = self.files_dir.join(req.uri().path()[1..].to_string());
+        if dst_path.exists() {
+            return Box::pin(async move {
+                let mut rsp = Response::builder()
+                    .header("server", "zap-stream-core")
+                    .header("access-control-allow-origin", "*")
+                    .header("access-control-allow-headers", "*")
+                    .header("access-control-allow-methods", "HEAD, GET");
+
+                if req.method() == Method::HEAD {
+                    return Ok(rsp.body(BoxBody::default())?);
+                }
+                let f = File::open(&dst_path).await?;
+                let f_stream = ReaderStream::new(f);
+                let body = StreamBody::new(
+                    f_stream
+                        .map_ok(Frame::data)
+                        .map_err(|e| Self::Error::new(e)),
+                )
+                .boxed();
+                Ok(rsp.body(body)?)
+            });
+        }
+
+        // otherwise handle in overseer
+        let mut api = self.api.clone();
+        Box::pin(async move {
+            match api.handler(req).await {
+                Ok(res) => Ok(res),
+                Err(e) => {
+                    error!("{}", e);
+                    Ok(Response::builder().status(500).body(BoxBody::default())?)
+                }
+            }
+        })
+    }
+}
+
+pub fn check_nip98_auth(req: &Request<Incoming>) -> Result<Event> {
+    let auth = if let Some(a) = req.headers().get("authorization") {
+        a.to_str()?
+    } else {
+        bail!("Authorization header missing");
+    };
+
+    if !auth.starts_with("Nostr ") {
+        bail!("Invalid authorization scheme");
+    }
+
+    let json =
+        String::from_utf8(base64::engine::general_purpose::STANDARD.decode(auth[6..].as_bytes())?)?;
+    info!("{}", json);
+
+    // TODO: check tags
+    Ok(serde_json::from_str::<Event>(&json)?)
+}
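
check_nip98_auth expects a base64-encoded Nostr event after the "Nostr " scheme; a sketch of the header shape the server accepts (the event JSON is a placeholder, not a validly signed NIP-98 event):

    use base64::Engine;

    fn main() {
        // a real client signs a kind-27235 (NIP-98) event scoped to the request
        let event_json = r#"{"kind":27235,"content":"","tags":[]}"#;
        let b64 = base64::engine::general_purpose::STANDARD.encode(event_json);
        let header = format!("Nostr {}", b64);
        // the server strips the 6-byte "Nostr " prefix, base64-decodes the rest,
        // and parses the JSON into an Event
        println!("authorization: {}", header);
    }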

crates/zap-stream/src/main.rs (new file, 183 lines)
@@ -0,0 +1,183 @@
+use anyhow::{bail, Result};
+use clap::Parser;
+use config::Config;
+use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_log_set_callback, av_version_info};
+use ffmpeg_rs_raw::{av_log_redirect, rstr};
+use hyper::server::conn::http1;
+use hyper_util::rt::TokioIo;
+use log::{error, info};
+use std::net::SocketAddr;
+use std::path::PathBuf;
+use std::str::FromStr;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::net::TcpListener;
+use tokio::task::JoinHandle;
+use tokio::time::sleep;
+use url::Url;
+#[cfg(feature = "rtmp")]
+use zap_stream_core::ingress::rtmp;
+#[cfg(feature = "srt")]
+use zap_stream_core::ingress::srt;
+#[cfg(feature = "test-pattern")]
+use zap_stream_core::ingress::test;
+
+use crate::api::Api;
+use crate::http::HttpServer;
+use crate::monitor::BackgroundMonitor;
+use crate::overseer::ZapStreamOverseer;
+use crate::settings::Settings;
+use zap_stream_core::ingress::{file, tcp};
+use zap_stream_core::overseer::Overseer;
+
+mod api;
+mod blossom;
+mod http;
+mod monitor;
+mod overseer;
+mod settings;
+
+#[derive(Parser, Debug)]
+struct Args {}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    pretty_env_logger::init();
+
+    let _args = Args::parse();
+
+    unsafe {
+        av_log_set_callback(Some(av_log_redirect));
+        info!("FFMPEG version={}", rstr!(av_version_info()));
+    }
+
+    let builder = Config::builder()
+        .add_source(config::File::with_name("config.yaml"))
+        .add_source(config::Environment::with_prefix("APP"))
+        .build()?;
+
+    let settings: Settings = builder.try_deserialize()?;
+    let overseer = settings.get_overseer().await?;
+
+    // Create ingress listeners
+    let mut tasks = vec![];
+    for e in &settings.endpoints {
+        match try_create_listener(e, &settings.output_dir, &overseer) {
+            Ok(l) => tasks.push(l),
+            Err(e) => error!("{}", e),
+        }
+    }
+
+    let http_addr: SocketAddr = settings.listen_http.parse()?;
+    let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url);
+
+    let api = Api::new(overseer.database(), settings.clone());
+    // HTTP server
+    let server = HttpServer::new(index_html, PathBuf::from(settings.output_dir), api);
+    tasks.push(tokio::spawn(async move {
+        let listener = TcpListener::bind(&http_addr).await?;
+
+        loop {
+            let (socket, _) = listener.accept().await?;
+            let io = TokioIo::new(socket);
+            let server = server.clone();
+            tokio::spawn(async move {
+                if let Err(e) = http1::Builder::new().serve_connection(io, server).await {
+                    error!("Failed to handle request: {}", e);
+                }
+            });
+        }
+    }));
+
+    // Background worker
+    let mut bg = BackgroundMonitor::new(overseer.clone());
+    tasks.push(tokio::spawn(async move {
+        loop {
+            if let Err(e) = bg.check().await {
+                error!("{}", e);
+            }
+            sleep(Duration::from_secs(10)).await;
+        }
+    }));
+
+    // Join tasks and get errors
+    for handle in tasks {
+        if let Err(e) = handle.await? {
+            error!("{e}");
+        }
+    }
+    info!("Server closed");
+    Ok(())
+}
+
+pub enum ListenerEndpoint {
+    SRT { endpoint: String },
+    RTMP { endpoint: String },
+    TCP { endpoint: String },
+    File { path: PathBuf },
+    TestPattern,
+}
+
+impl FromStr for ListenerEndpoint {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        let url: Url = s.parse()?;
+        match url.scheme() {
+            "srt" => Ok(Self::SRT {
+                endpoint: format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
+            }),
+            "rtmp" => Ok(Self::RTMP {
+                endpoint: format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
+            }),
+            "tcp" => Ok(Self::TCP {
+                endpoint: format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
+            }),
+            "file" => Ok(Self::File {
+                path: PathBuf::from(url.path()),
+            }),
+            "test-pattern" => Ok(Self::TestPattern),
+            _ => bail!("Unsupported endpoint scheme: {}", url.scheme()),
+        }
+    }
+}
+
+fn try_create_listener(
+    u: &str,
+    out_dir: &str,
+    overseer: &Arc<ZapStreamOverseer>,
+) -> Result<JoinHandle<Result<()>>> {
+    let ep = ListenerEndpoint::from_str(u)?;
+    match ep {
+        #[cfg(feature = "srt")]
+        ListenerEndpoint::SRT { endpoint } => Ok(tokio::spawn(srt::listen(
+            out_dir.to_string(),
+            endpoint,
+            overseer.clone(),
+        ))),
+        #[cfg(feature = "rtmp")]
+        ListenerEndpoint::RTMP { endpoint } => Ok(tokio::spawn(rtmp::listen(
ListenerEndpoint::RTMP { endpoint } => Ok(tokio::spawn(rtmp::listen(
|
||||||
|
out_dir.to_string(),
|
||||||
|
endpoint,
|
||||||
|
overseer.clone(),
|
||||||
|
))),
|
||||||
|
ListenerEndpoint::TCP { endpoint } => Ok(tokio::spawn(tcp::listen(
|
||||||
|
out_dir.to_string(),
|
||||||
|
endpoint,
|
||||||
|
overseer.clone(),
|
||||||
|
))),
|
||||||
|
ListenerEndpoint::File { path } => Ok(tokio::spawn(file::listen(
|
||||||
|
out_dir.to_string(),
|
||||||
|
path,
|
||||||
|
overseer.clone(),
|
||||||
|
))),
|
||||||
|
#[cfg(feature = "test-pattern")]
|
||||||
|
ListenerEndpoint::TestPattern => Ok(tokio::spawn(test::listen(
|
||||||
|
out_dir.to_string(),
|
||||||
|
overseer.clone(),
|
||||||
|
))),
|
||||||
|
_ => {
|
||||||
|
bail!("Unknown endpoint config: {u}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
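A quick sketch of how entries in `settings.endpoints` map through `ListenerEndpoint::from_str` (values illustrative):

use std::str::FromStr;

fn parse_demo() -> anyhow::Result<()> {
    // the URL scheme selects the listener; host:port becomes the bind address
    match ListenerEndpoint::from_str("srt://127.0.0.1:3333")? {
        ListenerEndpoint::SRT { endpoint } => assert_eq!(endpoint, "127.0.0.1:3333"),
        _ => unreachable!(),
    }
    // file sources take their path from the URL
    match ListenerEndpoint::from_str("file:///tmp/input.ts")? {
        ListenerEndpoint::File { path } => assert_eq!(path.to_str(), Some("/tmp/input.ts")),
        _ => unreachable!(),
    }
    Ok(())
}

One thing worth tightening here: `from_str` calls `url.host().unwrap()` and `url.port().unwrap()`, so an endpoint without an explicit port (e.g. `srt://localhost`) panics instead of returning an error.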
crates/zap-stream/src/monitor.rs (new file, 19 lines)
@@ -0,0 +1,19 @@
use crate::overseer::ZapStreamOverseer;
use anyhow::Result;
use std::sync::Arc;
use zap_stream_core::overseer::Overseer;

/// Monitor stream status, perform any necessary cleanup
pub struct BackgroundMonitor {
    overseer: Arc<ZapStreamOverseer>,
}

impl BackgroundMonitor {
    pub fn new(overseer: Arc<ZapStreamOverseer>) -> Self {
        Self { overseer }
    }

    pub async fn check(&mut self) -> Result<()> {
        self.overseer.check_streams().await
    }
}
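BackgroundMonitor is deliberately thin: main() ticks it every 10 seconds and it defers to `check_streams` on the overseer (see the diff below), which ends any stream the database still marks live but the in-memory active set no longer tracks. The sweep boils down to a set difference, sketched here with simplified types:

use std::collections::HashSet;
use uuid::Uuid;

/// Simplified sketch of the dead-stream sweep in check_streams: live ids in
/// the DB that are absent from the in-memory set get on_end() called.
fn find_dead(db_live: &[Uuid], in_memory: &HashSet<Uuid>) -> Vec<Uuid> {
    db_live
        .iter()
        .filter(|id| !in_memory.contains(id))
        .copied()
        .collect()
}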
@@ -1,30 +1,44 @@
 use crate::blossom::{BlobDescriptor, Blossom};
-use crate::egress::hls::HlsEgress;
-use crate::egress::EgressConfig;
-use crate::ingress::ConnectionInfo;
-use crate::overseer::{get_default_variants, IngressInfo, Overseer};
-use crate::pipeline::{EgressType, PipelineConfig};
 use crate::settings::LndSettings;
-use crate::variant::StreamMapping;
 use anyhow::{anyhow, bail, Result};
 use async_trait::async_trait;
+use base64::alphabet::STANDARD;
+use base64::Engine;
+use bytes::Bytes;
 use chrono::Utc;
 use fedimint_tonic_lnd::verrpc::VersionRequest;
 use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_MJPEG;
 use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVFrame;
+use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
 use ffmpeg_rs_raw::Encoder;
 use futures_util::FutureExt;
-use log::{info, warn};
+use http_body_util::combinators::BoxBody;
+use http_body_util::{BodyExt, Full};
+use hyper::body::Incoming;
+use hyper::{Method, Request, Response};
+use log::{error, info, warn};
 use nostr_sdk::bitcoin::PrivateKey;
 use nostr_sdk::prelude::Coordinate;
 use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32};
+use serde::Serialize;
+use std::collections::HashSet;
 use std::env::temp_dir;
 use std::fs::create_dir_all;
 use std::path::PathBuf;
 use std::str::FromStr;
+use std::sync::Arc;
+use tokio::sync::RwLock;
 use url::Url;
 use uuid::Uuid;
-use warp::Filter;
+use zap_stream_core::egress::hls::HlsEgress;
+use zap_stream_core::egress::{EgressConfig, EgressSegment};
+use zap_stream_core::ingress::ConnectionInfo;
+use zap_stream_core::overseer::{IngressInfo, IngressStreamType, Overseer};
+use zap_stream_core::pipeline::{EgressType, PipelineConfig};
+use zap_stream_core::variant::audio::AudioVariant;
+use zap_stream_core::variant::mapping::VariantMapping;
+use zap_stream_core::variant::video::VideoVariant;
+use zap_stream_core::variant::{StreamMapping, VariantStream};
 use zap_stream_db::sqlx::Encode;
 use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb};
@@ -46,6 +60,11 @@ pub struct ZapStreamOverseer {
     blossom_servers: Vec<Blossom>,
     /// Public facing URL pointing to [out_dir]
     public_url: String,
+    /// Cost / second / variant
+    cost: i64,
+    /// Currently active streams
+    /// Any streams which are not contained in this set are dead
+    active_streams: Arc<RwLock<HashSet<Uuid>>>,
 }
 
 impl ZapStreamOverseer {
@@ -57,10 +76,25 @@ impl ZapStreamOverseer {
         lnd: &LndSettings,
         relays: &Vec<String>,
         blossom_servers: &Option<Vec<String>>,
+        cost: i64,
     ) -> Result<Self> {
         let db = ZapStreamDb::new(db).await?;
         db.migrate().await?;
+
+        #[cfg(debug_assertions)]
+        {
+            let uid = db.upsert_user(&[0; 32]).await?;
+            db.update_user_balance(uid, 100_000_000).await?;
+            let user = db.get_user(uid).await?;
+
+            info!(
+                "ZERO pubkey: uid={},key={},balance={}",
+                user.id,
+                user.stream_key,
+                user.balance / 1000
+            );
+        }
+
         let mut lnd = fedimint_tonic_lnd::connect(
             lnd.address.clone(),
             PathBuf::from(&lnd.cert),
@@ -94,9 +128,15 @@ impl ZapStreamOverseer {
                 .map(|b| Blossom::new(b))
                 .collect(),
             public_url: public_url.clone(),
+            cost,
+            active_streams: Arc::new(RwLock::new(HashSet::new())),
         })
     }
 
+    pub(crate) fn database(&self) -> ZapStreamDb {
+        self.db.clone()
+    }
+
     fn stream_to_event_builder(&self, stream: &UserStream) -> Result<EventBuilder> {
         let mut tags = vec![
             Tag::parse(&["d".to_string(), stream.id.to_string()])?,
@@ -141,71 +181,95 @@ impl ZapStreamOverseer {
 
         let kind = Kind::from(STREAM_EVENT_KIND);
         let coord = Coordinate::new(kind, self.keys.public_key).identifier(&stream.id);
-        tags.push(Tag::parse(&[
+        tags.push(Tag::parse([
             "alt",
             &format!("Watch live on https://zap.stream/{}", coord.to_bech32()?),
         ])?);
-        Ok(EventBuilder::new(kind, "", tags))
+        Ok(EventBuilder::new(kind, "").tags(tags))
     }
 
     fn blob_to_event_builder(&self, stream: &BlobDescriptor) -> Result<EventBuilder> {
         let tags = if let Some(tags) = stream.nip94.as_ref() {
             tags.iter()
-                .map_while(|(k, v)| Tag::parse(&[k, v]).ok())
+                .map_while(|(k, v)| Tag::parse([k, v]).ok())
                 .collect()
         } else {
             let mut tags = vec![
-                Tag::parse(&["x", &stream.sha256])?,
-                Tag::parse(&["url", &stream.url])?,
-                Tag::parse(&["size", &stream.size.to_string()])?,
+                Tag::parse(["x", &stream.sha256])?,
+                Tag::parse(["url", &stream.url])?,
+                Tag::parse(["size", &stream.size.to_string()])?,
             ];
             if let Some(m) = stream.mime_type.as_ref() {
-                tags.push(Tag::parse(&["m", m])?)
+                tags.push(Tag::parse(["m", m])?)
             }
             tags
         };
 
-        Ok(EventBuilder::new(Kind::FileMetadata, "", tags))
+        Ok(EventBuilder::new(Kind::FileMetadata, "").tags(tags))
     }
 
     async fn publish_stream_event(&self, stream: &UserStream, pubkey: &Vec<u8>) -> Result<Event> {
-        let mut extra_tags = vec![
-            Tag::parse(&["p", hex::encode(pubkey).as_str(), "", "host"])?,
-            Tag::parse(&[
+        let extra_tags = vec![
+            Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?,
+            Tag::parse([
                 "streaming",
-                self.map_to_public_url(stream, "live.m3u8")?.as_str(),
+                self.map_to_stream_public_url(stream, "live.m3u8")?.as_str(),
             ])?,
-            Tag::parse(&[
+            Tag::parse([
                 "image",
-                self.map_to_public_url(stream, "thumb.webp")?.as_str(),
+                self.map_to_stream_public_url(stream, "thumb.webp")?
+                    .as_str(),
             ])?,
+            Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?,
         ];
-        // flag NIP94 streaming when using blossom servers
-        if self.blossom_servers.len() > 0 {
-            extra_tags.push(Tag::parse(&["streaming", "nip94"])?);
-        }
         let ev = self
             .stream_to_event_builder(stream)?
-            .add_tags(extra_tags)
+            .tags(extra_tags)
             .sign_with_keys(&self.keys)?;
         self.client.send_event(ev.clone()).await?;
         Ok(ev)
     }
 
-    fn map_to_public_url<'a>(
-        &self,
-        stream: &UserStream,
-        path: impl Into<&'a str>,
-    ) -> Result<String> {
+    fn map_to_stream_public_url(&self, stream: &UserStream, path: &str) -> Result<String> {
+        self.map_to_public_url(&format!("{}/{}", stream.id, path))
+    }
+
+    fn map_to_public_url(&self, path: &str) -> Result<String> {
         let u: Url = self.public_url.parse()?;
-        Ok(u.join(&format!("/{}/", stream.id))?
-            .join(path.into())?
-            .to_string())
+        Ok(u.join(path)?.to_string())
     }
 }
 
+#[derive(Serialize)]
+struct Endpoint {}
+
+#[derive(Serialize)]
+struct AccountInfo {
+    pub endpoints: Vec<Endpoint>,
+    pub event: Event,
+    pub balance: u64,
+}
+
 #[async_trait]
 impl Overseer for ZapStreamOverseer {
+    async fn check_streams(&self) -> Result<()> {
+        let active_streams = self.db.list_live_streams().await?;
+        for stream in active_streams {
+            // check
+            let id = Uuid::parse_str(&stream.id)?;
+            info!("Checking stream is alive: {}", stream.id);
+            let is_active = {
+                let streams = self.active_streams.read().await;
+                streams.contains(&id)
+            };
+            if !is_active {
+                if let Err(e) = self.on_end(&id).await {
+                    error!("Failed to end dead stream {}: {}", &id, e);
+                }
+            }
+        }
+        Ok(())
+    }
+
     async fn start_stream(
         &self,
         connection: &ConnectionInfo,
@@ -217,6 +281,11 @@ impl Overseer for ZapStreamOverseer {
             .await?
             .ok_or_else(|| anyhow::anyhow!("User not found"))?;
+
+        let user = self.db.get_user(uid).await?;
+        if user.balance <= 0 {
+            bail!("Not enough balance");
+        }
+
         let variants = get_default_variants(&stream_info)?;
 
         let mut egress = vec![];
@@ -225,7 +294,6 @@ impl Overseer for ZapStreamOverseer {
             variants: variants.iter().map(|v| v.id()).collect(),
         }));
 
-        let user = self.db.get_user(uid).await?;
         let stream_id = Uuid::new_v4();
         // insert new stream record
         let mut new_stream = UserStream {
@@ -238,8 +306,12 @@ impl Overseer for ZapStreamOverseer {
         let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?;
         new_stream.event = Some(stream_event.as_json());
+
+        let mut streams = self.active_streams.write().await;
+        streams.insert(stream_id.clone());
+
         self.db.insert_stream(&new_stream).await?;
         self.db.update_stream(&new_stream).await?;
 
         Ok(PipelineConfig {
             id: stream_id,
             variants,
@@ -247,18 +319,29 @@ impl Overseer for ZapStreamOverseer {
         })
     }
 
-    async fn on_segment(
+    async fn on_segments(
         &self,
         pipeline_id: &Uuid,
-        variant_id: &Uuid,
-        index: u64,
-        duration: f32,
-        path: &PathBuf,
+        added: &Vec<EgressSegment>,
+        deleted: &Vec<EgressSegment>,
     ) -> Result<()> {
-        // Upload to blossom servers if configured
+        let stream = self.db.get_stream(pipeline_id).await?;
+
+        let duration = added.iter().fold(0.0, |acc, v| acc + v.duration);
+        let cost = self.cost * duration.round() as i64;
+        let bal = self
+            .db
+            .tick_stream(pipeline_id, stream.user_id, duration, cost)
+            .await?;
+        if bal <= 0 {
+            bail!("Not enough balance");
+        }
+
+        // Upload to blossom servers if configured (N94)
         let mut blobs = vec![];
+        for seg in added {
             for b in &self.blossom_servers {
-                blobs.push(b.upload(path, &self.keys, Some("video/mp2t")).await?);
+                blobs.push(b.upload(&seg.path, &self.keys, Some("video/mp2t")).await?);
             }
             if let Some(blob) = blobs.first() {
                 let a_tag = format!(
@@ -267,13 +350,24 @@ impl Overseer for ZapStreamOverseer {
                     self.keys.public_key.to_hex(),
                     pipeline_id
                 );
-                let mut n94 = self.blob_to_event_builder(blob)?.add_tags([
-                    Tag::parse(&["a", &a_tag])?,
-                    Tag::parse(&["d", variant_id.to_string().as_str()])?,
-                    Tag::parse(&["duration", duration.to_string().as_str()])?,
+                let mut n94 = self.blob_to_event_builder(blob)?.tags([
+                    Tag::parse(["a", &a_tag])?,
+                    Tag::parse(["d", seg.variant.to_string().as_str()])?,
+                    Tag::parse(["index", seg.idx.to_string().as_str()])?,
                 ]);
+
+                // some servers add duration tag
+                if blob
+                    .nip94
+                    .as_ref()
+                    .map(|a| a.contains_key("duration"))
+                    .is_none()
+                {
+                    n94 = n94.tag(Tag::parse(["duration", seg.duration.to_string().as_str()])?);
+                }
+
                 for b in blobs.iter().skip(1) {
-                    n94 = n94.add_tags(Tag::parse(&["url", &b.url]));
+                    n94 = n94.tag(Tag::parse(["url", &b.url])?);
                 }
                 let n94 = n94.sign_with_keys(&self.keys)?;
                 let cc = self.client.clone();
@@ -284,6 +378,7 @@ impl Overseer for ZapStreamOverseer {
                 });
                 info!("Published N94 segment to {}", blob.url);
             }
+        }
 
         Ok(())
     }
@@ -303,6 +398,9 @@ impl Overseer for ZapStreamOverseer {
         let mut stream = self.db.get_stream(pipeline_id).await?;
         let user = self.db.get_user(stream.user_id).await?;
 
+        let mut streams = self.active_streams.write().await;
+        streams.remove(pipeline_id);
+
         stream.state = UserStreamState::Ended;
         let event = self.publish_stream_event(&stream, &user.pubkey).await?;
         stream.event = Some(event.as_json());
@@ -312,3 +410,64 @@ impl Overseer for ZapStreamOverseer {
         Ok(())
     }
 }
+
+fn get_default_variants(info: &IngressInfo) -> Result<Vec<VariantStream>> {
+    let mut vars: Vec<VariantStream> = vec![];
+    if let Some(video_src) = info
+        .streams
+        .iter()
+        .find(|c| c.stream_type == IngressStreamType::Video)
+    {
+        vars.push(VariantStream::CopyVideo(VariantMapping {
+            id: Uuid::new_v4(),
+            src_index: video_src.index,
+            dst_index: 0,
+            group_id: 0,
+        }));
+        vars.push(VariantStream::Video(VideoVariant {
+            mapping: VariantMapping {
+                id: Uuid::new_v4(),
+                src_index: video_src.index,
+                dst_index: 1,
+                group_id: 1,
+            },
+            width: 1280,
+            height: 720,
+            fps: video_src.fps,
+            bitrate: 3_000_000,
+            codec: "libx264".to_string(),
+            profile: 100,
+            level: 51,
+            keyframe_interval: video_src.fps as u16 * 2,
+            pixel_format: AV_PIX_FMT_YUV420P as u32,
+        }));
+    }
+
+    if let Some(audio_src) = info
+        .streams
+        .iter()
+        .find(|c| c.stream_type == IngressStreamType::Audio)
+    {
+        vars.push(VariantStream::CopyAudio(VariantMapping {
+            id: Uuid::new_v4(),
+            src_index: audio_src.index,
+            dst_index: 2,
+            group_id: 0,
+        }));
+        vars.push(VariantStream::Audio(AudioVariant {
+            mapping: VariantMapping {
+                id: Uuid::new_v4(),
+                src_index: audio_src.index,
+                dst_index: 3,
+                group_id: 1,
+            },
+            bitrate: 192_000,
+            codec: "aac".to_string(),
+            channels: 2,
+            sample_rate: 48_000,
+            sample_fmt: "fltp".to_owned(),
+        }));
+    }
+
+    Ok(vars)
+}
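Worth calling out in the hunk above: `on_segments` is now also the billing tick. The durations of newly added segments are summed, charged through `tick_stream`, and the stream is rejected once the user balance reaches zero. A minimal sketch of just that arithmetic, assuming `cost` is milli-sats per second as the settings doc comment below states:

/// Sketch of the charge computed in on_segments: total added duration,
/// rounded to whole seconds, times the per-second msat rate.
fn segment_charge(cost_msat_per_sec: i64, added_durations: &[f32]) -> i64 {
    let duration: f32 = added_durations.iter().sum();
    cost_msat_per_sec * duration.round() as i64
}

// e.g. three 2-second HLS segments at 10 msat/s:
// segment_charge(10, &[2.0, 2.0, 2.0]) == 60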
@@ -1,4 +1,6 @@
+use crate::overseer::ZapStreamOverseer;
 use serde::{Deserialize, Serialize};
+use std::sync::Arc;
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Settings {
@@ -9,6 +11,9 @@ pub struct Settings {
     /// - rtmp://localhost:1935
     pub endpoints: Vec<String>,
 
+    /// Public facing hostname that maps to [endpoints]
+    pub endpoints_public_hostname: String,
+
     /// Where to store output (static files)
     pub output_dir: String,
 
@@ -18,7 +23,7 @@ pub struct Settings {
     /// Binding address for http server serving files from [output_dir]
     pub listen_http: String,
 
-    /// Overseer service see [crate::overseer::Overseer] for more info
+    /// Overseer service see [Overseer] for more info
     pub overseer: OverseerConfig,
 }
 
@@ -44,6 +49,8 @@ pub enum OverseerConfig {
         nsec: String,
         /// Blossom servers
         blossom: Option<Vec<String>>,
+        /// Cost (milli-sats) / second / variant
+        cost: i64,
     },
 }
 
@@ -53,3 +60,33 @@ pub struct LndSettings {
     pub cert: String,
     pub macaroon: String,
 }
+
+impl Settings {
+    pub async fn get_overseer(&self) -> anyhow::Result<Arc<ZapStreamOverseer>> {
+        match &self.overseer {
+            OverseerConfig::ZapStream {
+                nsec: private_key,
+                database,
+                lnd,
+                relays,
+                blossom,
+                cost,
+            } => Ok(Arc::new(
+                ZapStreamOverseer::new(
+                    &self.output_dir,
+                    &self.public_url,
+                    private_key,
+                    database,
+                    lnd,
+                    relays,
+                    blossom,
+                    *cost,
+                )
+                .await?,
+            )),
+            _ => {
+                panic!("Unsupported overseer");
+            }
+        }
+    }
+}
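One practical note on these settings: main() layers config::Environment::with_prefix("APP") over config.yaml, so any flat field here can be overridden per deployment without editing the file. A small self-contained sketch of that config-crate mechanic (variable and value are illustrative):

use config::{Config, Environment};

fn env_override_demo() -> anyhow::Result<String> {
    // APP_LISTEN_HTTP maps onto the `listen_http` key
    std::env::set_var("APP_LISTEN_HTTP", "0.0.0.0:8080");
    let cfg = Config::builder()
        .add_source(Environment::with_prefix("APP"))
        .build()?;
    Ok(cfg.get_string("listen_http")?)
}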
(binary image diff: 118 KiB before, 118 KiB after)
@@ -1,121 +0,0 @@ (deleted file)
use anyhow::{bail, Result};
use clap::Parser;
use config::Config;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_log_set_callback, av_version_info};
use ffmpeg_rs_raw::{av_log_redirect, rstr};
use log::{error, info};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::task::JoinHandle;
use url::Url;
use warp::{cors, Filter};
#[cfg(feature = "rtmp")]
use zap_stream_core::ingress::rtmp;
#[cfg(feature = "srt")]
use zap_stream_core::ingress::srt;
#[cfg(feature = "test-pattern")]
use zap_stream_core::ingress::test;

use zap_stream_core::ingress::{file, tcp};
use zap_stream_core::overseer::Overseer;
use zap_stream_core::settings::Settings;

#[derive(Parser, Debug)]
struct Args {}

#[tokio::main]
async fn main() -> Result<()> {
    pretty_env_logger::init();

    let _args = Args::parse();

    unsafe {
        av_log_set_callback(Some(av_log_redirect));
        info!("FFMPEG version={}", rstr!(av_version_info()));
    }

    let builder = Config::builder()
        .add_source(config::File::with_name("config.yaml"))
        .add_source(config::Environment::with_prefix("APP"))
        .build()?;

    let settings: Settings = builder.try_deserialize()?;
    let overseer = settings.get_overseer().await?;

    let mut listeners = vec![];
    for e in &settings.endpoints {
        match try_create_listener(e, &settings.output_dir, &overseer) {
            Ok(l) => listeners.push(l),
            Err(e) => error!("{}", e),
        }
    }

    let http_addr: SocketAddr = settings.listen_http.parse()?;
    let http_dir = settings.output_dir.clone();
    let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url);

    listeners.push(tokio::spawn(async move {
        let cors = cors().allow_any_origin().allow_methods(vec!["GET"]);

        let index_handle = warp::get()
            .or(warp::path("index.html"))
            .and(warp::path::end())
            .map(move |_| warp::reply::html(index_html.clone()));

        let dir_handle = warp::get().and(warp::fs::dir(http_dir)).with(cors);

        warp::serve(index_handle.or(dir_handle))
            .run(http_addr)
            .await;
        Ok(())
    }));

    for handle in listeners {
        if let Err(e) = handle.await? {
            error!("{e}");
        }
    }
    info!("Server closed");
    Ok(())
}

fn try_create_listener(
    u: &str,
    out_dir: &str,
    overseer: &Arc<dyn Overseer>,
) -> Result<JoinHandle<Result<()>>> {
    let url: Url = u.parse()?;
    match url.scheme() {
        #[cfg(feature = "srt")]
        "srt" => Ok(tokio::spawn(srt::listen(
            out_dir.to_string(),
            format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
            overseer.clone(),
        ))),
        #[cfg(feature = "srt")]
        "rtmp" => Ok(tokio::spawn(rtmp::listen(
            out_dir.to_string(),
            format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
            overseer.clone(),
        ))),
        "tcp" => Ok(tokio::spawn(tcp::listen(
            out_dir.to_string(),
            format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
            overseer.clone(),
        ))),
        "file" => Ok(tokio::spawn(file::listen(
            out_dir.to_string(),
            PathBuf::from(url.path()),
            overseer.clone(),
        ))),
        #[cfg(feature = "test-pattern")]
        "test-pattern" => Ok(tokio::spawn(test::listen(
            out_dir.to_string(),
            overseer.clone(),
        ))),
        _ => {
            bail!("Unknown endpoint config: {u}");
        }
    }
}
@@ -1,194 +0,0 @@ (deleted file)
use crate::ingress::ConnectionInfo;

#[cfg(feature = "local-overseer")]
use crate::overseer::local::LocalOverseer;
#[cfg(feature = "webhook-overseer")]
use crate::overseer::webhook::WebhookOverseer;
#[cfg(feature = "zap-stream")]
use crate::overseer::zap_stream::ZapStreamOverseer;
use crate::pipeline::PipelineConfig;
#[cfg(any(
    feature = "local-overseer",
    feature = "webhook-overseer",
    feature = "zap-stream"
))]
use crate::settings::OverseerConfig;
use crate::settings::Settings;
use crate::variant::audio::AudioVariant;
use crate::variant::mapping::VariantMapping;
use crate::variant::video::VideoVariant;
use crate::variant::VariantStream;
use anyhow::Result;
use async_trait::async_trait;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use std::cmp::PartialEq;
use std::path::PathBuf;
use std::sync::Arc;
use uuid::Uuid;

#[cfg(feature = "local-overseer")]
mod local;

#[cfg(feature = "webhook-overseer")]
mod webhook;

#[cfg(feature = "zap-stream")]
mod zap_stream;

/// A copy of [ffmpeg_rs_raw::DemuxerInfo] without internal ptr
#[derive(PartialEq, Clone)]
pub struct IngressInfo {
    pub bitrate: usize,
    pub streams: Vec<IngressStream>,
}

/// A copy of [ffmpeg_rs_raw::StreamInfo] without ptr
#[derive(PartialEq, Clone)]
pub struct IngressStream {
    pub index: usize,
    pub stream_type: IngressStreamType,
    pub codec: isize,
    pub format: isize,
    pub width: usize,
    pub height: usize,
    pub fps: f32,
    pub sample_rate: usize,
    pub language: String,
}

#[derive(PartialEq, Eq, Clone)]
pub enum IngressStreamType {
    Video,
    Audio,
    Subtitle,
}

#[async_trait]
/// The control process that oversees streaming operations
pub trait Overseer: Send + Sync {
    /// Set up a new streaming pipeline
    async fn start_stream(
        &self,
        connection: &ConnectionInfo,
        stream_info: &IngressInfo,
    ) -> Result<PipelineConfig>;

    /// A new segment (HLS etc.) was generated for a stream variant
    ///
    /// This handler is usually used for distribution / billing
    async fn on_segment(
        &self,
        pipeline_id: &Uuid,
        variant_id: &Uuid,
        index: u64,
        duration: f32,
        path: &PathBuf,
    ) -> Result<()>;

    /// At a regular interval, pipeline will emit one of the frames for processing as a
    /// thumbnail
    async fn on_thumbnail(
        &self,
        pipeline_id: &Uuid,
        width: usize,
        height: usize,
        path: &PathBuf,
    ) -> Result<()>;

    /// Stream is finished
    async fn on_end(&self, pipeline_id: &Uuid) -> Result<()>;
}

impl Settings {
    pub async fn get_overseer(&self) -> Result<Arc<dyn Overseer>> {
        match &self.overseer {
            #[cfg(feature = "local-overseer")]
            OverseerConfig::Local => Ok(Arc::new(LocalOverseer::new())),
            #[cfg(feature = "webhook-overseer")]
            OverseerConfig::Webhook { url } => Ok(Arc::new(WebhookOverseer::new(&url))),
            #[cfg(feature = "zap-stream")]
            OverseerConfig::ZapStream {
                nsec: private_key,
                database,
                lnd,
                relays,
                blossom,
            } => Ok(Arc::new(
                ZapStreamOverseer::new(
                    &self.output_dir,
                    &self.public_url,
                    private_key,
                    database,
                    lnd,
                    relays,
                    blossom,
                )
                .await?,
            )),
            _ => {
                panic!("Unsupported overseer");
            }
        }
    }
}

pub(crate) fn get_default_variants(info: &IngressInfo) -> Result<Vec<VariantStream>> {
    let mut vars: Vec<VariantStream> = vec![];
    if let Some(video_src) = info
        .streams
        .iter()
        .find(|c| c.stream_type == IngressStreamType::Video)
    {
        vars.push(VariantStream::CopyVideo(VariantMapping {
            id: Uuid::new_v4(),
            src_index: video_src.index,
            dst_index: 0,
            group_id: 0,
        }));
        vars.push(VariantStream::Video(VideoVariant {
            mapping: VariantMapping {
                id: Uuid::new_v4(),
                src_index: video_src.index,
                dst_index: 1,
                group_id: 1,
            },
            width: 1280,
            height: 720,
            fps: video_src.fps,
            bitrate: 3_000_000,
            codec: "libx264".to_string(),
            profile: 100,
            level: 51,
            keyframe_interval: video_src.fps as u16 * 2,
            pixel_format: AV_PIX_FMT_YUV420P as u32,
        }));
    }

    if let Some(audio_src) = info
        .streams
        .iter()
        .find(|c| c.stream_type == IngressStreamType::Audio)
    {
        vars.push(VariantStream::CopyAudio(VariantMapping {
            id: Uuid::new_v4(),
            src_index: audio_src.index,
            dst_index: 2,
            group_id: 0,
        }));
        vars.push(VariantStream::Audio(AudioVariant {
            mapping: VariantMapping {
                id: Uuid::new_v4(),
                src_index: audio_src.index,
                dst_index: 3,
                group_id: 1,
            },
            bitrate: 192_000,
            codec: "aac".to_string(),
            channels: 2,
            sample_rate: 48_000,
            sample_fmt: "fltp".to_owned(),
        }));
    }

    Ok(vars)
}
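Note the deleted trait above still carries the per-segment `on_segment` callback; this PR replaces it with the batched form implemented by the new overseer.rs earlier in this diff:

/// Replacement signature (from the new code in this PR): segments arrive in
/// added/deleted batches, each EgressSegment carrying its variant id, idx,
/// duration and path.
async fn on_segments(
    &self,
    pipeline_id: &Uuid,
    added: &Vec<EgressSegment>,
    deleted: &Vec<EgressSegment>,
) -> Result<()>;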