Compare commits
No commits in common. "ccb2add6073e5bb68191c42613c34f66583e34fc" and "afbc2fc8b2f3401fc6e10708adf8cf7cc00e7b4f" have entirely different histories.
ccb2add607...afbc2fc8b2
@@ -1,3 +1,3 @@
-**/target/
-**/.git/
-**/out/
+target/
+.git/
+out/
@@ -19,6 +19,6 @@ steps:
-  - docker login -u kieran -p $TOKEN git.v0l.io
+  - docker login -u voidic -p $DOCKER_TOKEN
   - docker buildx create --name mybuilder --bootstrap --use
-  - docker buildx build --push --platform linux/amd64 -t git.v0l.io/kieran/zap-stream-core:latest -t voidic/zap-stream-core:latest -f ./crates/zap-stream/Dockerfile .
+  - docker buildx build --push --platform linux/amd64 -t git.v0l.io/kieran/zap-stream-core:latest -t voidic/zap-stream-core:latest .
   - kill $(cat /var/run/docker.pid)
Cargo.lock (generated) · 1700 lines changed · Normal file → Executable file
File diff suppressed because it is too large.
Cargo.toml · 75 lines changed
@@ -1,20 +1,77 @@
-[workspace]
-resolver = "2"
-members = [
-    "crates/core",
-    "crates/zap-stream",
-    "crates/zap-stream-db"
-]
+[package]
+name = "zap-stream-core"
+version = "0.1.0"
+edition = "2021"
+
+[[bin]]
+name = "zap-stream-core"
+path = "src/bin/zap_stream_core.rs"
+
+[features]
+default = ["test-pattern", "srt", "rtmp"]
+srt = ["dep:srt-tokio"]
+rtmp = ["dep:rml_rtmp"]
+local-overseer = [] # WIP
+webhook-overseer = [] # WIP
+zap-stream = [
+    "dep:nostr-sdk",
+    "dep:zap-stream-db",
+    "dep:fedimint-tonic-lnd",
+    "dep:reqwest",
+    "dep:base64",
+    "dep:sha2",
+    "tokio/fs",
+]
+test-pattern = [
+    "dep:resvg",
+    "dep:usvg",
+    "dep:tiny-skia",
+    "dep:fontdue",
+    "dep:ringbuf",
+    "zap-stream-db/test-pattern"
+]

-[workspace.dependencies]
-ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385" }
+[dependencies]
+ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "8e102423d46c8fe7dc4dc999e4ce3fcfe6abfee0" }
 tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] }
 anyhow = { version = "^1.0.91", features = ["backtrace"] }
+pretty_env_logger = "0.5.0"
+tokio-stream = "0.1.14"
+futures-util = "0.3.30"
 async-trait = "0.1.77"
 log = "0.4.21"
 uuid = { version = "1.8.0", features = ["v4", "serde"] }
 serde = { version = "1.0.197", features = ["derive"] }
+config = { version = "0.14.0", features = ["yaml"] }
 url = "2.5.0"
-itertools = "0.14.0"
+itertools = "0.13.0"
+rand = "0.8.5"
+clap = { version = "4.5.16", features = ["derive"] }
+warp = "0.3.7"
+libc = "0.2.162"
+m3u8-rs = "6.0.0"
 chrono = "^0.4.38"
 hex = "0.4.3"
+
+# srt
+srt-tokio = { version = "0.4.3", optional = true }
+
+# rtmp
+rml_rtmp = { version = "0.8.0", optional = true }
+
+# test-pattern
+resvg = { version = "0.44.0", optional = true }
+usvg = { version = "0.44.0", optional = true }
+tiny-skia = { version = "0.11.4", optional = true }
+fontdue = { version = "0.9.2", optional = true }
+ringbuf = { version = "0.4.7", optional = true }
+
+# zap-stream
+zap-stream-db = { path = "zap-stream-db", optional = true }
+nostr-sdk = { version = "0.36.0", optional = true }
+fedimint-tonic-lnd = { version = "0.2.0", optional = true, default-features = false, features = ["invoicesrpc", "versionrpc"] }
+reqwest = { version = "0.12.9", optional = true, features = ["stream"] }
+base64 = { version = "0.22.1", optional = true }
+sha2 = { version = "0.10.8", optional = true }
+bytes = "1.8.0"
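The [features] table above drives conditional compilation; default enables test-pattern, srt, and rtmp, while zap-stream pulls in the nostr/LND/database stack. A minimal sketch of how such flags are consumed in Rust code (the module and function names here are hypothetical, not taken from this diff):

// Hypothetical illustration of how the Cargo features above gate code.
// Build with: cargo build --features zap-stream

#[cfg(feature = "srt")]
mod srt_support {
    // Compiled only when `srt` is enabled, which also activates dep:srt-tokio.
}

#[cfg(feature = "zap-stream")]
fn overseer_kind() -> &'static str { "zap-stream" }

#[cfg(not(feature = "zap-stream"))]
fn overseer_kind() -> &'static str { "none" }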
@@ -31,7 +31,7 @@ RUN git clone --single-branch --branch release/7.1 https://git.ffmpeg.org/ffmpeg
     --disable-static \
     --enable-shared && \
     make -j$(nproc) && make install
-RUN cargo install --path ./crates/zap-stream --root /app/build
+RUN cargo install --path . --bin zap-stream-core --root /app/build --features zap-stream

 FROM $IMAGE AS runner
 WORKDIR /app
@@ -40,4 +40,4 @@ RUN apt update && \
     rm -rf /var/lib/apt/lists/*
 COPY --from=build /app/build .
 COPY --from=build /app/ffmpeg/lib/ /lib
-ENTRYPOINT ["/app/bin/zap-stream"]
+ENTRYPOINT ["/app/bin/zap-stream-core"]
TODO.md · 5 lines changed
@@ -1,4 +1,5 @@
+- RTMP?
+- Setup multi-variant output
-- API parity https://git.v0l.io/Kieran/zap.stream/issues/7
+- API parity
 - fMP4 instead of MPEG-TS segments
 - HLS-LL
-- Delete old segments (N94)
@@ -5,10 +5,6 @@ endpoints:
   - "rtmp://127.0.0.1:3336"
   - "srt://127.0.0.1:3335"
   - "tcp://127.0.0.1:3334"
   - "test-pattern://"
-
-# Public hostname which points to the IP address used to listen for all [endpoints]
-endpoints_public_hostname: "localhost"
-
 # Output directory for recording / hls
 output_dir: "./out"
@@ -42,7 +38,6 @@ listen_http: "127.0.0.1:8080"
 #
 overseer:
   zap-stream:
-    cost: 16
     nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4"
     #blossom:
     #  - "http://localhost:8881"
crates/core/Cargo.lock (generated) · 4818 lines changed
File diff suppressed because it is too large.
@@ -1,44 +0,0 @@
[package]
name = "zap-stream-core"
version = "0.1.0"
edition = "2021"

[features]
default = ["test-pattern", "srt", "rtmp"]
srt = ["dep:srt-tokio"]
rtmp = ["dep:rml_rtmp"]
local-overseer = [] # WIP
webhook-overseer = [] # WIP
test-pattern = [
    "dep:resvg",
    "dep:usvg",
    "dep:tiny-skia",
    "dep:fontdue",
    "dep:ringbuf",
]

[dependencies]
ffmpeg-rs-raw.workspace = true
tokio.workspace = true
anyhow.workspace = true
async-trait.workspace = true
log.workspace = true
uuid.workspace = true
serde.workspace = true
hex.workspace = true
itertools.workspace = true
futures-util = "0.3.30"
m3u8-rs = "6.0.0"

# srt
srt-tokio = { version = "0.4.3", optional = true }

# rtmp
rml_rtmp = { version = "0.8.0", optional = true }

# test-pattern
resvg = { version = "0.44.0", optional = true }
usvg = { version = "0.44.0", optional = true }
tiny-skia = { version = "0.11.4", optional = true }
fontdue = { version = "0.9.2", optional = true }
ringbuf = { version = "0.4.7", optional = true }
@@ -1,144 +0,0 @@
##
## Default strfry config
##

# Directory that contains the strfry LMDB database (restart required)
db = "./strfry-db/"

dbParams {
    # Maximum number of threads/processes that can simultaneously have LMDB transactions open (restart required)
    maxreaders = 256

    # Size of mmap() to use when loading LMDB (default is 10TB, does *not* correspond to disk-space used) (restart required)
    mapsize = 10995116277760

    # Disables read-ahead when accessing the LMDB mapping. Reduces IO activity when DB size is larger than RAM. (restart required)
    noReadAhead = false
}

events {
    # Maximum size of normalised JSON, in bytes
    maxEventSize = 65536

    # Events newer than this will be rejected
    rejectEventsNewerThanSeconds = 900

    # Events older than this will be rejected
    rejectEventsOlderThanSeconds = 94608000

    # Ephemeral events older than this will be rejected
    rejectEphemeralEventsOlderThanSeconds = 60

    # Ephemeral events will be deleted from the DB when older than this
    ephemeralEventsLifetimeSeconds = 300

    # Maximum number of tags allowed
    maxNumTags = 2000

    # Maximum size for tag values, in bytes
    maxTagValSize = 1024
}

relay {
    # Interface to listen on. Use 0.0.0.0 to listen on all interfaces (restart required)
    bind = "0.0.0.0"

    # Port to open for the nostr websocket protocol (restart required)
    port = 7777

    # Set OS-limit on maximum number of open files/sockets (if 0, don't attempt to set) (restart required)
    nofiles = 0

    # HTTP header that contains the client's real IP, before reverse proxying (ie x-real-ip) (MUST be all lower-case)
    realIpHeader = ""

    info {
        # NIP-11: Name of this server. Short/descriptive (< 30 characters)
        name = "strfry default"

        # NIP-11: Detailed information about relay, free-form
        description = "This is a strfry instance."

        # NIP-11: Administrative nostr pubkey, for contact purposes
        pubkey = ""

        # NIP-11: Alternative administrative contact (email, website, etc)
        contact = ""

        # NIP-11: URL pointing to an image to be used as an icon for the relay
        icon = ""

        # List of supported lists as JSON array, or empty string to use default. Example: "[1,2]"
        nips = ""
    }

    # Maximum accepted incoming websocket frame size (should be larger than max event) (restart required)
    maxWebsocketPayloadSize = 131072

    # Websocket-level PING message frequency (should be less than any reverse proxy idle timeouts) (restart required)
    autoPingSeconds = 55

    # If TCP keep-alive should be enabled (detect dropped connections to upstream reverse proxy)
    enableTcpKeepalive = false

    # How much uninterrupted CPU time a REQ query should get during its DB scan
    queryTimesliceBudgetMicroseconds = 10000

    # Maximum records that can be returned per filter
    maxFilterLimit = 500

    # Maximum number of subscriptions (concurrent REQs) a connection can have open at any time
    maxSubsPerConnection = 20

    writePolicy {
        # If non-empty, path to an executable script that implements the writePolicy plugin logic
        plugin = ""
    }

    compression {
        # Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU (restart required)
        enabled = true

        # Maintain a sliding window buffer for each connection. Improves compression, but uses more memory (restart required)
        slidingWindow = true
    }

    logging {
        # Dump all incoming messages
        dumpInAll = false

        # Dump all incoming EVENT messages
        dumpInEvents = false

        # Dump all incoming REQ/CLOSE messages
        dumpInReqs = false

        # Log performance metrics for initial REQ database scans
        dbScanPerf = false

        # Log reason for invalid event rejection? Can be disabled to silence excessive logging
        invalidEvents = true
    }

    numThreads {
        # Ingester threads: route incoming requests, validate events/sigs (restart required)
        ingester = 3

        # reqWorker threads: Handle initial DB scan for events (restart required)
        reqWorker = 3

        # reqMonitor threads: Handle filtering of new events (restart required)
        reqMonitor = 3

        # negentropy threads: Handle negentropy protocol messages (restart required)
        negentropy = 2
    }

    negentropy {
        # Support negentropy protocol messages
        enabled = true

        # Maximum records that sync will process before returning an error
        maxSyncEvents = 1000000
    }
}
@@ -1,80 +0,0 @@
use crate::ingress::ConnectionInfo;

use crate::egress::EgressSegment;
use crate::pipeline::PipelineConfig;
use anyhow::Result;
use async_trait::async_trait;
use std::cmp::PartialEq;
use std::path::PathBuf;
use uuid::Uuid;

#[cfg(feature = "local-overseer")]
mod local;

#[cfg(feature = "webhook-overseer")]
mod webhook;

/// A copy of [ffmpeg_rs_raw::DemuxerInfo] without internal ptr
#[derive(PartialEq, Clone)]
pub struct IngressInfo {
    pub bitrate: usize,
    pub streams: Vec<IngressStream>,
}

/// A copy of [ffmpeg_rs_raw::StreamInfo] without ptr
#[derive(PartialEq, Clone)]
pub struct IngressStream {
    pub index: usize,
    pub stream_type: IngressStreamType,
    pub codec: isize,
    pub format: isize,
    pub width: usize,
    pub height: usize,
    pub fps: f32,
    pub sample_rate: usize,
    pub language: String,
}

#[derive(PartialEq, Eq, Clone)]
pub enum IngressStreamType {
    Video,
    Audio,
    Subtitle,
}

#[async_trait]
/// The control process that oversees streaming operations
pub trait Overseer: Send + Sync {
    /// Check all streams
    async fn check_streams(&self) -> Result<()>;

    /// Set up a new streaming pipeline
    async fn start_stream(
        &self,
        connection: &ConnectionInfo,
        stream_info: &IngressInfo,
    ) -> Result<PipelineConfig>;

    /// A new segment(s) (HLS etc.) was generated for a stream variant
    ///
    /// This handler is usually used for distribution / billing
    async fn on_segments(
        &self,
        pipeline_id: &Uuid,
        added: &Vec<EgressSegment>,
        deleted: &Vec<EgressSegment>,
    ) -> Result<()>;

    /// At a regular interval, pipeline will emit one of the frames for processing as a
    /// thumbnail
    async fn on_thumbnail(
        &self,
        pipeline_id: &Uuid,
        width: usize,
        height: usize,
        path: &PathBuf,
    ) -> Result<()>;

    /// Stream is finished
    async fn on_end(&self, pipeline_id: &Uuid) -> Result<()>;
}
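The Overseer trait above (the pre-change side of this diff) is the extension point for stream lifecycle handling. A minimal sketch of an implementation that only logs, assuming the trait and types exactly as shown; the LoggingOverseer name is illustrative and not part of this repository:

use anyhow::{bail, Result};
use async_trait::async_trait;
use std::path::PathBuf;
use uuid::Uuid;

/// Hypothetical overseer that only logs events.
struct LoggingOverseer;

#[async_trait]
impl Overseer for LoggingOverseer {
    async fn check_streams(&self) -> Result<()> {
        Ok(()) // nothing to reconcile in this sketch
    }

    async fn start_stream(
        &self,
        _connection: &ConnectionInfo,
        stream_info: &IngressInfo,
    ) -> Result<PipelineConfig> {
        log::info!("new ingest with {} streams", stream_info.streams.len());
        // a real implementation would construct variants and egress here
        bail!("not implemented in this sketch")
    }

    async fn on_segments(
        &self,
        pipeline_id: &Uuid,
        added: &Vec<EgressSegment>,
        deleted: &Vec<EgressSegment>,
    ) -> Result<()> {
        log::info!("{}: +{} / -{} segments", pipeline_id, added.len(), deleted.len());
        Ok(())
    }

    async fn on_thumbnail(
        &self,
        pipeline_id: &Uuid,
        width: usize,
        height: usize,
        path: &PathBuf,
    ) -> Result<()> {
        log::info!("{}: thumbnail {}x{} at {}", pipeline_id, width, height, path.display());
        Ok(())
    }

    async fn on_end(&self, pipeline_id: &Uuid) -> Result<()> {
        log::info!("{}: stream ended", pipeline_id);
        Ok(())
    }
}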
crates/zap-stream/Cargo.lock (generated) · 7 lines changed
@@ -1,7 +0,0 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4

[[package]]
name = "zap-stream"
version = "0.1.0"
@@ -1,43 +0,0 @@
[package]
name = "zap-stream"
version = "0.1.0"
edition = "2021"

[features]
default = ["srt", "rtmp", "test-pattern"]
srt = ["zap-stream-core/srt"]
rtmp = ["zap-stream-core/rtmp"]
test-pattern = ["zap-stream-core/test-pattern", "zap-stream-db/test-pattern"]

[dependencies]
zap-stream-db = { path = "../zap-stream-db" }
zap-stream-core = { path = "../core" }

uuid.workspace = true
ffmpeg-rs-raw.workspace = true
anyhow.workspace = true
log.workspace = true
tokio.workspace = true
async-trait.workspace = true
serde.workspace = true
chrono.workspace = true
hex.workspace = true
url.workspace = true

# http stuff
hyper = { version = "1.5.1", features = ["server"] }
bytes = "1.8.0"
http-body-util = "0.1.2"
tokio-util = "0.7.13"
hyper-util = "0.1.10"

# direct deps
config = { version = "0.15.6", features = ["yaml"] }
nostr-sdk = { version = "0.38.0" }
fedimint-tonic-lnd = { version = "0.2.0", default-features = false, features = ["invoicesrpc", "versionrpc"] }
reqwest = { version = "0.12.9", features = ["stream", "json"] }
base64 = { version = "0.22.1" }
sha2 = { version = "0.10.8" }
pretty_env_logger = "0.5.0"
clap = { version = "4.5.16", features = ["derive"] }
futures-util = "0.3.31"
@@ -1,2 +0,0 @@
create database route96;
create database zap_stream;
@@ -1,5 +0,0 @@
listen: "0.0.0.0:8000"
database: "mysql://root:root@db:3306/route96"
storage_dir: "./data"
max_upload_bytes: 5e+9
public_url: "http://localhost:8881"
@@ -1,204 +0,0 @@
use crate::http::check_nip98_auth;
use crate::settings::Settings;
use crate::ListenerEndpoint;
use anyhow::{anyhow, bail, Result};
use bytes::Bytes;
use fedimint_tonic_lnd::tonic::codegen::Body;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use hyper::body::Incoming;
use hyper::{Method, Request, Response};
use nostr_sdk::{serde_json, Event, PublicKey};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::str::FromStr;
use url::Url;
use zap_stream_db::ZapStreamDb;

#[derive(Clone)]
pub struct Api {
    db: ZapStreamDb,
    settings: Settings,
}

impl Api {
    pub fn new(db: ZapStreamDb, settings: Settings) -> Self {
        Self { db, settings }
    }

    pub async fn handler(
        self,
        req: Request<Incoming>,
    ) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
        let base = Response::builder()
            .header("server", "zap-stream")
            .header("access-control-allow-origin", "*")
            .header("access-control-allow-headers", "*")
            .header("access-control-allow-methods", "HEAD, GET");

        Ok(match (req.method(), req.uri().path()) {
            (&Method::GET, "/api/v1/account") => {
                let auth = check_nip98_auth(&req)?;
                let rsp = self.get_account(&auth.pubkey).await?;
                return Ok(base.body(Self::body_json(&rsp)?)?);
            }
            (&Method::PATCH, "/api/v1/account") => {
                let auth = check_nip98_auth(&req)?;
                let body = req.collect().await?.to_bytes();
                let r_body: PatchAccount = serde_json::from_slice(&body)?;
                let rsp = self.update_account(&auth.pubkey, r_body).await?;
                return Ok(base.body(Self::body_json(&rsp)?)?);
            }
            (&Method::GET, "/api/v1/topup") => {
                let auth = check_nip98_auth(&req)?;
                let url: Url = req.uri().to_string().parse()?;
                let amount: usize = url
                    .query_pairs()
                    .find_map(|(k, v)| if k == "amount" { Some(v) } else { None })
                    .and_then(|v| v.parse().ok())
                    .ok_or(anyhow!("Missing amount"))?;
                let rsp = self.topup(&auth.pubkey, amount).await?;
                return Ok(base.body(Self::body_json(&rsp)?)?);
            }
            (&Method::PATCH, "/api/v1/event") => {
                bail!("Not implemented")
            }
            (&Method::POST, "/api/v1/withdraw") => {
                bail!("Not implemented")
            }
            (&Method::POST, "/api/v1/account/forward") => {
                bail!("Not implemented")
            }
            (&Method::DELETE, "/api/v1/account/forward/<id>") => {
                bail!("Not implemented")
            }
            (&Method::GET, "/api/v1/account/history") => {
                bail!("Not implemented")
            }
            (&Method::GET, "/api/v1/account/keys") => {
                bail!("Not implemented")
            }
            _ => {
                if req.method() == Method::OPTIONS {
                    base.body(Default::default())?
                } else {
                    base.status(404).body(Default::default())?
                }
            }
        })
    }

    fn body_json<T: Serialize>(obj: &T) -> Result<BoxBody<Bytes, anyhow::Error>> {
        Ok(Full::from(serde_json::to_string(obj)?)
            .map_err(|e| match e {})
            .boxed())
    }

    async fn get_account(&self, pubkey: &PublicKey) -> Result<AccountInfo> {
        let uid = self.db.upsert_user(&pubkey.to_bytes()).await?;
        let user = self.db.get_user(uid).await?;

        Ok(AccountInfo {
            endpoints: self
                .settings
                .endpoints
                .iter()
                .filter_map(|e| match ListenerEndpoint::from_str(&e).ok()? {
                    ListenerEndpoint::SRT { endpoint } => {
                        let addr: SocketAddr = endpoint.parse().ok()?;
                        Some(Endpoint {
                            name: "SRT".to_string(),
                            url: format!(
                                "srt://{}:{}",
                                self.settings.endpoints_public_hostname,
                                addr.port()
                            ),
                            key: user.stream_key.clone(),
                            capabilities: vec![],
                        })
                    }
                    ListenerEndpoint::RTMP { endpoint } => {
                        let addr: SocketAddr = endpoint.parse().ok()?;
                        Some(Endpoint {
                            name: "RTMP".to_string(),
                            url: format!(
                                "rtmp://{}:{}",
                                self.settings.endpoints_public_hostname,
                                addr.port()
                            ),
                            key: user.stream_key.clone(),
                            capabilities: vec![],
                        })
                    }
                    ListenerEndpoint::TCP { endpoint } => {
                        let addr: SocketAddr = endpoint.parse().ok()?;
                        Some(Endpoint {
                            name: "TCP".to_string(),
                            url: format!(
                                "tcp://{}:{}",
                                self.settings.endpoints_public_hostname,
                                addr.port()
                            ),
                            key: user.stream_key.clone(),
                            capabilities: vec![],
                        })
                    }
                    ListenerEndpoint::File { .. } => None,
                    ListenerEndpoint::TestPattern => None,
                })
                .collect(),
            event: None,
            balance: user.balance as u64,
            tos: AccountTos {
                accepted: user.tos_accepted.is_some(),
                link: "https://zap.stream/tos".to_string(),
            },
        })
    }

    async fn update_account(&self, pubkey: &PublicKey, account: PatchAccount) -> Result<()> {
        bail!("Not implemented")
    }

    async fn topup(&self, pubkey: &PublicKey, amount: usize) -> Result<TopupResponse> {
        bail!("Not implemented")
    }
}

#[derive(Deserialize, Serialize)]
struct AccountInfo {
    pub endpoints: Vec<Endpoint>,
    pub event: Option<Event>,
    pub balance: u64,
    pub tos: AccountTos,
}

#[derive(Deserialize, Serialize)]
struct Endpoint {
    pub name: String,
    pub url: String,
    pub key: String,
    pub capabilities: Vec<String>,
}

#[derive(Deserialize, Serialize)]
struct EndpointCost {
    pub unit: String,
    pub rate: u16,
}

#[derive(Deserialize, Serialize)]
struct AccountTos {
    pub accepted: bool,
    pub link: String,
}

#[derive(Deserialize, Serialize)]
struct PatchAccount {
    pub accept_tos: Option<bool>,
}

#[derive(Deserialize, Serialize)]
struct TopupResponse {
    pub pr: String,
}
@@ -1,118 +0,0 @@
use crate::api::Api;
use crate::overseer::ZapStreamOverseer;
use anyhow::{bail, Result};
use base64::Engine;
use bytes::Bytes;
use futures_util::TryStreamExt;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full, StreamBody};
use hyper::body::{Frame, Incoming};
use hyper::service::Service;
use hyper::{Method, Request, Response};
use log::{error, info};
use nostr_sdk::{serde_json, Event};
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use tokio::fs::File;
use tokio_util::io::ReaderStream;
use zap_stream_core::overseer::Overseer;

#[derive(Clone)]
pub struct HttpServer {
    index: String,
    files_dir: PathBuf,
    api: Api,
}

impl HttpServer {
    pub fn new(index: String, files_dir: PathBuf, api: Api) -> Self {
        Self {
            index,
            files_dir,
            api,
        }
    }
}

impl Service<Request<Incoming>> for HttpServer {
    type Response = Response<BoxBody<Bytes, Self::Error>>;
    type Error = anyhow::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn call(&self, req: Request<Incoming>) -> Self::Future {
        // check is index.html
        if req.method() == Method::GET && req.uri().path() == "/"
            || req.uri().path() == "/index.html"
        {
            let index = self.index.clone();
            return Box::pin(async move {
                Ok(Response::builder()
                    .header("content-type", "text/html")
                    .header("server", "zap-stream-core")
                    .body(
                        Full::new(Bytes::from(index))
                            .map_err(|e| match e {})
                            .boxed(),
                    )?)
            });
        }

        // check if mapped to file
        let mut dst_path = self.files_dir.join(req.uri().path()[1..].to_string());
        if dst_path.exists() {
            return Box::pin(async move {
                let mut rsp = Response::builder()
                    .header("server", "zap-stream-core")
                    .header("access-control-allow-origin", "*")
                    .header("access-control-allow-headers", "*")
                    .header("access-control-allow-methods", "HEAD, GET");

                if req.method() == Method::HEAD {
                    return Ok(rsp.body(BoxBody::default())?);
                }
                let f = File::open(&dst_path).await?;
                let f_stream = ReaderStream::new(f);
                let body = StreamBody::new(
                    f_stream
                        .map_ok(Frame::data)
                        .map_err(|e| Self::Error::new(e)),
                )
                .boxed();
                Ok(rsp.body(body)?)
            });
        }

        // otherwise handle in overseer
        let mut api = self.api.clone();
        Box::pin(async move {
            match api.handler(req).await {
                Ok(res) => Ok(res),
                Err(e) => {
                    error!("{}", e);
                    Ok(Response::builder().status(500).body(BoxBody::default())?)
                }
            }
        })
    }
}

pub fn check_nip98_auth(req: &Request<Incoming>) -> Result<Event> {
    let auth = if let Some(a) = req.headers().get("authorization") {
        a.to_str()?
    } else {
        bail!("Authorization header missing");
    };

    if !auth.starts_with("Nostr ") {
        bail!("Invalid authorization scheme");
    }

    let json =
        String::from_utf8(base64::engine::general_purpose::STANDARD.decode(auth[6..].as_bytes())?)?;
    info!("{}", json);

    // TODO: check tags
    Ok(serde_json::from_str::<Event>(&json)?)
}
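check_nip98_auth above expects an `Authorization: Nostr <base64>` header whose payload is a JSON-serialized nostr event (NIP-98 HTTP Auth, kind 27235). A hedged client-side sketch of producing such a header, reusing the EventBuilder/Tag call style visible elsewhere in this diff; treat the exact signatures as assumptions:

use base64::Engine;
use nostr_sdk::{EventBuilder, JsonUtil, Keys, Kind, Tag};

/// Build an `authorization` header value in the form that
/// check_nip98_auth() decodes: "Nostr " + base64(event JSON).
fn nip98_header(keys: &Keys, url: &str, method: &str) -> anyhow::Result<String> {
    let ev = EventBuilder::new(
        Kind::Custom(27235), // NIP-98 HTTP Auth kind
        "",
        [
            Tag::parse(&["u", url])?,
            Tag::parse(&["method", method])?,
        ],
    )
    .sign_with_keys(keys)?;
    Ok(format!(
        "Nostr {}",
        base64::engine::general_purpose::STANDARD.encode(ev.as_json())
    ))
}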
@@ -1,183 +0,0 @@
use anyhow::{bail, Result};
use clap::Parser;
use config::Config;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_log_set_callback, av_version_info};
use ffmpeg_rs_raw::{av_log_redirect, rstr};
use hyper::server::conn::http1;
use hyper_util::rt::TokioIo;
use log::{error, info};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use url::Url;
#[cfg(feature = "rtmp")]
use zap_stream_core::ingress::rtmp;
#[cfg(feature = "srt")]
use zap_stream_core::ingress::srt;
#[cfg(feature = "test-pattern")]
use zap_stream_core::ingress::test;

use crate::api::Api;
use crate::http::HttpServer;
use crate::monitor::BackgroundMonitor;
use crate::overseer::ZapStreamOverseer;
use crate::settings::Settings;
use zap_stream_core::ingress::{file, tcp};
use zap_stream_core::overseer::Overseer;

mod api;
mod blossom;
mod http;
mod monitor;
mod overseer;
mod settings;

#[derive(Parser, Debug)]
struct Args {}

#[tokio::main]
async fn main() -> Result<()> {
    pretty_env_logger::init();

    let _args = Args::parse();

    unsafe {
        av_log_set_callback(Some(av_log_redirect));
        info!("FFMPEG version={}", rstr!(av_version_info()));
    }

    let builder = Config::builder()
        .add_source(config::File::with_name("config.yaml"))
        .add_source(config::Environment::with_prefix("APP"))
        .build()?;

    let settings: Settings = builder.try_deserialize()?;
    let overseer = settings.get_overseer().await?;

    // Create ingress listeners
    let mut tasks = vec![];
    for e in &settings.endpoints {
        match try_create_listener(e, &settings.output_dir, &overseer) {
            Ok(l) => tasks.push(l),
            Err(e) => error!("{}", e),
        }
    }

    let http_addr: SocketAddr = settings.listen_http.parse()?;
    let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url);

    let api = Api::new(overseer.database(), settings.clone());
    // HTTP server
    let server = HttpServer::new(index_html, PathBuf::from(settings.output_dir), api);
    tasks.push(tokio::spawn(async move {
        let listener = TcpListener::bind(&http_addr).await?;

        loop {
            let (socket, _) = listener.accept().await?;
            let io = TokioIo::new(socket);
            let server = server.clone();
            tokio::spawn(async move {
                if let Err(e) = http1::Builder::new().serve_connection(io, server).await {
                    error!("Failed to handle request: {}", e);
                }
            });
        }
    }));

    // Background worker
    let mut bg = BackgroundMonitor::new(overseer.clone());
    tasks.push(tokio::spawn(async move {
        loop {
            if let Err(e) = bg.check().await {
                error!("{}", e);
            }
            sleep(Duration::from_secs(10)).await;
        }
    }));

    // Join tasks and get errors
    for handle in tasks {
        if let Err(e) = handle.await? {
            error!("{e}");
        }
    }
    info!("Server closed");
    Ok(())
}

pub enum ListenerEndpoint {
    SRT { endpoint: String },
    RTMP { endpoint: String },
    TCP { endpoint: String },
    File { path: PathBuf },
    TestPattern,
}

impl FromStr for ListenerEndpoint {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        let url: Url = s.parse()?;
        match url.scheme() {
            "srt" => Ok(Self::SRT {
                endpoint: format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
            }),
            "rtmp" => Ok(Self::RTMP {
                endpoint: format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
            }),
            "tcp" => Ok(Self::TCP {
                endpoint: format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
            }),
            "file" => Ok(Self::File {
                path: PathBuf::from(url.path()),
            }),
            "test-pattern" => Ok(Self::TestPattern),
            _ => bail!("Unsupported endpoint scheme: {}", url.scheme()),
        }
    }
}

fn try_create_listener(
    u: &str,
    out_dir: &str,
    overseer: &Arc<ZapStreamOverseer>,
) -> Result<JoinHandle<Result<()>>> {
    let ep = ListenerEndpoint::from_str(u)?;
    match ep {
        #[cfg(feature = "srt")]
        ListenerEndpoint::SRT { endpoint } => Ok(tokio::spawn(srt::listen(
            out_dir.to_string(),
            endpoint,
            overseer.clone(),
        ))),
        #[cfg(feature = "rtmp")]
        ListenerEndpoint::RTMP { endpoint } => Ok(tokio::spawn(rtmp::listen(
            out_dir.to_string(),
            endpoint,
            overseer.clone(),
        ))),
        ListenerEndpoint::TCP { endpoint } => Ok(tokio::spawn(tcp::listen(
            out_dir.to_string(),
            endpoint,
            overseer.clone(),
        ))),
        ListenerEndpoint::File { path } => Ok(tokio::spawn(file::listen(
            out_dir.to_string(),
            path,
            overseer.clone(),
        ))),
        #[cfg(feature = "test-pattern")]
        ListenerEndpoint::TestPattern => Ok(tokio::spawn(test::listen(
            out_dir.to_string(),
            overseer.clone(),
        ))),
        _ => {
            bail!("Unknown endpoint config: {u}");
        }
    }
}
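Each configured endpoint string is parsed into a ListenerEndpoint before a listener task is spawned. A small usage sketch, with endpoint values taken from the config.yaml shown earlier (the demo function itself is illustrative):

use std::str::FromStr;

fn demo() -> anyhow::Result<()> {
    // Endpoint strings as they appear in config.yaml
    for s in ["srt://127.0.0.1:3335", "rtmp://127.0.0.1:3336", "test-pattern://"] {
        match ListenerEndpoint::from_str(s)? {
            ListenerEndpoint::SRT { endpoint } => println!("srt listener on {endpoint}"),
            ListenerEndpoint::RTMP { endpoint } => println!("rtmp listener on {endpoint}"),
            ListenerEndpoint::TCP { endpoint } => println!("tcp listener on {endpoint}"),
            ListenerEndpoint::File { path } => println!("file source {}", path.display()),
            ListenerEndpoint::TestPattern => println!("test pattern source"),
        }
    }
    Ok(())
}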
@@ -1,19 +0,0 @@
use crate::overseer::ZapStreamOverseer;
use anyhow::Result;
use std::sync::Arc;
use zap_stream_core::overseer::Overseer;

/// Monitor stream status, perform any necessary cleanup
pub struct BackgroundMonitor {
    overseer: Arc<ZapStreamOverseer>,
}

impl BackgroundMonitor {
    pub fn new(overseer: Arc<ZapStreamOverseer>) -> Self {
        Self { overseer }
    }

    pub async fn check(&mut self) -> Result<()> {
        self.overseer.check_streams().await
    }
}
@@ -47,7 +47,7 @@ relay {
     port = 7777

     # Set OS-limit on maximum number of open files/sockets (if 0, don't attempt to set) (restart required)
-    nofiles = 0
+    nofiles = 1000000

     # HTTP header that contains the client's real IP, before reverse proxying (ie x-real-ip) (MUST be all lower-case)
     realIpHeader = ""
@@ -18,14 +18,14 @@ services:
   blossom:
     depends_on:
       - db
-    image: voidic/route96:latest
+    image: voidic/route96
    environment:
      - "RUST_LOG=info"
    ports:
      - "8881:8000"
    volumes:
      - "blossom:/app/data"
-      - "./dev-setup/route96.yaml:/app/config.yaml"
+      - "./dev-setup/route96.toml:/app/config.toml"
 volumes:
   db:
   blossom:
src/bin/zap_stream_core.rs (new file) · 121 lines
@@ -0,0 +1,121 @@
use anyhow::{bail, Result};
use clap::Parser;
use config::Config;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_log_set_callback, av_version_info};
use ffmpeg_rs_raw::{av_log_redirect, rstr};
use log::{error, info};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::task::JoinHandle;
use url::Url;
use warp::{cors, Filter};
#[cfg(feature = "rtmp")]
use zap_stream_core::ingress::rtmp;
#[cfg(feature = "srt")]
use zap_stream_core::ingress::srt;
#[cfg(feature = "test-pattern")]
use zap_stream_core::ingress::test;

use zap_stream_core::ingress::{file, tcp};
use zap_stream_core::overseer::Overseer;
use zap_stream_core::settings::Settings;

#[derive(Parser, Debug)]
struct Args {}

#[tokio::main]
async fn main() -> Result<()> {
    pretty_env_logger::init();

    let _args = Args::parse();

    unsafe {
        av_log_set_callback(Some(av_log_redirect));
        info!("FFMPEG version={}", rstr!(av_version_info()));
    }

    let builder = Config::builder()
        .add_source(config::File::with_name("config.yaml"))
        .add_source(config::Environment::with_prefix("APP"))
        .build()?;

    let settings: Settings = builder.try_deserialize()?;
    let overseer = settings.get_overseer().await?;

    let mut listeners = vec![];
    for e in &settings.endpoints {
        match try_create_listener(e, &settings.output_dir, &overseer) {
            Ok(l) => listeners.push(l),
            Err(e) => error!("{}", e),
        }
    }

    let http_addr: SocketAddr = settings.listen_http.parse()?;
    let http_dir = settings.output_dir.clone();
    let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url);

    listeners.push(tokio::spawn(async move {
        let cors = cors().allow_any_origin().allow_methods(vec!["GET"]);

        let index_handle = warp::get()
            .or(warp::path("index.html"))
            .and(warp::path::end())
            .map(move |_| warp::reply::html(index_html.clone()));

        let dir_handle = warp::get().and(warp::fs::dir(http_dir)).with(cors);

        warp::serve(index_handle.or(dir_handle))
            .run(http_addr)
            .await;
        Ok(())
    }));

    for handle in listeners {
        if let Err(e) = handle.await? {
            error!("{e}");
        }
    }
    info!("Server closed");
    Ok(())
}

fn try_create_listener(
    u: &str,
    out_dir: &str,
    overseer: &Arc<dyn Overseer>,
) -> Result<JoinHandle<Result<()>>> {
    let url: Url = u.parse()?;
    match url.scheme() {
        #[cfg(feature = "srt")]
        "srt" => Ok(tokio::spawn(srt::listen(
            out_dir.to_string(),
            format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
            overseer.clone(),
        ))),
        #[cfg(feature = "srt")]
        "rtmp" => Ok(tokio::spawn(rtmp::listen(
            out_dir.to_string(),
            format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
            overseer.clone(),
        ))),
        "tcp" => Ok(tokio::spawn(tcp::listen(
            out_dir.to_string(),
            format!("{}:{}", url.host().unwrap(), url.port().unwrap()),
            overseer.clone(),
        ))),
        "file" => Ok(tokio::spawn(file::listen(
            out_dir.to_string(),
            PathBuf::from(url.path()),
            overseer.clone(),
        ))),
        #[cfg(feature = "test-pattern")]
        "test-pattern" => Ok(tokio::spawn(test::listen(
            out_dir.to_string(),
            overseer.clone(),
        ))),
        _ => {
            bail!("Unknown endpoint config: {u}");
        }
    }
}
@@ -59,11 +59,15 @@ impl Blossom {
     ) -> Result<BlobDescriptor> {
         let mut f = File::open(from_file).await?;
         let hash = Self::hash_file(&mut f).await?;
-        let auth_event = EventBuilder::new(Kind::Custom(24242), "Upload blob").tags([
-            Tag::hashtag("upload"),
-            Tag::parse(["x", &hash])?,
-            Tag::expiration(Timestamp::now().add(60)),
-        ]);
+        let auth_event = EventBuilder::new(
+            Kind::Custom(24242),
+            "Upload blob",
+            [
+                Tag::hashtag("upload"),
+                Tag::parse(&["x", &hash])?,
+                Tag::expiration(Timestamp::now().add(60)),
+            ],
+        );

         let auth_event = auth_event.sign_with_keys(keys)?;
@@ -14,7 +14,11 @@ impl Egress for HlsMuxer {
         packet: *mut AVPacket,
         variant: &Uuid,
     ) -> Result<EgressResult> {
-        self.mux_packet(packet, variant)
+        if let Some(ns) = self.mux_packet(packet, variant)? {
+            Ok(EgressResult::NewSegment(ns))
+        } else {
+            Ok(EgressResult::None)
+        }
     }

     unsafe fn reset(&mut self) -> Result<()> {
@@ -25,16 +25,13 @@ pub trait Egress {
 pub enum EgressResult {
     /// Nothing to report
     None,
-    /// Egress created/deleted some segments
-    Segments {
-        created: Vec<EgressSegment>,
-        deleted: Vec<EgressSegment>,
-    },
+    /// A new segment was created
+    NewSegment(NewSegment),
 }

 /// Basic details of new segment created by a muxer
 #[derive(Debug, Clone)]
-pub struct EgressSegment {
+pub struct NewSegment {
     /// The id of the variant (video or audio)
     pub variant: Uuid,
     /// Segment index
@@ -65,6 +65,6 @@ impl Egress for RecorderEgress {
     }

     unsafe fn reset(&mut self) -> Result<()> {
-        self.muxer.close()
+        self.muxer.reset()
     }
 }
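On the left side of this diff, EgressResult::Segments carries both created and deleted segments so callers can forward them to Overseer::on_segments. A hedged sketch of how a pipeline might consume that result (the handle_egress function is illustrative, not from this repository):

// Hypothetical pipeline-side handling of an egress result, matching the
// Segments { created, deleted } variant from the left side of this diff.
async fn handle_egress(
    overseer: &dyn Overseer,
    pipeline_id: &uuid::Uuid,
    result: EgressResult,
) -> anyhow::Result<()> {
    match result {
        EgressResult::None => {}
        EgressResult::Segments { created, deleted } => {
            // forward to the overseer for distribution / billing
            overseer.on_segments(pipeline_id, &created, &deleted).await?;
        }
    }
    Ok(())
}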
@@ -1,6 +1,9 @@
+#[cfg(feature = "zap-stream")]
+pub mod blossom;
 pub mod egress;
 pub mod ingress;
 pub mod mux;
 pub mod overseer;
 pub mod pipeline;
+pub mod settings;
 pub mod variant;
@@ -1,4 +1,4 @@
-use crate::egress::{EgressResult, EgressSegment};
+use crate::egress::NewSegment;
 use crate::variant::{StreamMapping, VariantStream};
 use anyhow::{bail, Result};
 use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
@@ -79,8 +79,6 @@ pub struct HlsVariant {
     pub streams: Vec<HlsVariantStream>,
     /// Segment length in seconds
     pub segment_length: f32,
-    /// Total number of segments to store for this variant
-    pub segment_window: Option<u16>,
     /// Current segment index
     pub idx: u64,
     /// Current segment start time in seconds (duration)
@@ -93,24 +91,20 @@ pub struct HlsVariant {
     pub segment_type: SegmentType,
 }

-struct SegmentInfo {
-    pub index: u64,
-    pub duration: f32,
-    pub kind: SegmentType,
-}
+struct SegmentInfo(u64, f32, SegmentType);

 impl SegmentInfo {
     fn to_media_segment(&self) -> MediaSegment {
         MediaSegment {
             uri: self.filename(),
-            duration: self.duration,
+            duration: self.1,
             title: None,
             ..MediaSegment::default()
         }
     }

     fn filename(&self) -> String {
-        HlsVariant::segment_name(self.kind, self.index)
+        HlsVariant::segment_name(self.2, self.0)
     }
 }
@@ -181,16 +175,11 @@ impl HlsVariant {
         Ok(Self {
             name: name.clone(),
             segment_length,
-            segment_window: Some(10), //TODO: configure window
             mux,
             streams,
             idx: 1,
             pkt_start: 0.0,
-            segments: Vec::from([SegmentInfo {
-                index: 1,
-                duration: segment_length,
-                kind: segment_type,
-            }]),
+            segments: Vec::from([SegmentInfo(1, segment_length, segment_type)]),
             out_dir: out_dir.to_string(),
             segment_type,
         })
@@ -216,32 +205,31 @@ impl HlsVariant {
     }

     /// Mux a packet created by the encoder for this variant
-    pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result<EgressResult> {
+    pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result<Option<NewSegment>> {
         let pkt_q = av_q2d((*pkt).time_base);
         // time of this packet in seconds
         let pkt_time = (*pkt).pts as f32 * pkt_q as f32;
         // what segment this pkt should be in (index)
         let pkt_seg = 1 + (pkt_time / self.segment_length).floor() as u64;

-        let mut result = EgressResult::None;
+        let mut result = None;
         let pkt_stream = *(*self.mux.context())
             .streams
             .add((*pkt).stream_index as usize);
         let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY
             && (*(*pkt_stream).codecpar).codec_type == AVMEDIA_TYPE_VIDEO;
         if pkt_seg != self.idx && can_split {
-            result = self.split_next_seg(pkt_time)?;
+            result = Some(self.split_next_seg(pkt_time)?);
         }
         self.mux.write_packet(pkt)?;
         Ok(result)
     }

     pub unsafe fn reset(&mut self) -> Result<()> {
-        self.mux.close()
+        self.mux.reset()
     }

     /// Reset the muxer state and start the next segment
-    unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result<EgressResult> {
+    unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result<NewSegment> {
         self.idx += 1;

         // Manually reset muxer avio
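The pkt_seg arithmetic above decides when to cut a new segment: packet time divided by segment length, floored, plus one; a keyframe whose computed index passes the current one triggers split_next_seg. A small worked example of that calculation (illustrative helper, not from this diff):

// Worked example of the segment-index math used in mux_packet.
// With 2-second segments, a packet at t = 5.3s belongs to segment 3.
fn segment_index(pkt_time: f32, segment_length: f32) -> u64 {
    1 + (pkt_time / segment_length).floor() as u64
}

fn main() {
    assert_eq!(segment_index(0.0, 2.0), 1); // first segment
    assert_eq!(segment_index(1.99, 2.0), 1);
    assert_eq!(segment_index(2.0, 2.0), 2); // crosses the boundary
    assert_eq!(segment_index(5.3, 2.0), 3);
}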
@@ -269,40 +257,19 @@ impl HlsVariant {

         let duration = pkt_time - self.pkt_start;
         info!("Writing segment {} [{}s]", &next_seg_url, duration);
-        if let Err(e) = self.push_segment(self.idx, duration) {
+        if let Err(e) = self.add_segment(self.idx, duration) {
             warn!("Failed to update playlist: {}", e);
         }

-        /// Get the video variant for this group
-        /// since this could actually be audio which would not be useful for
-        /// [Overseer] impl
-        let video_var_id = self
-            .video_stream()
-            .unwrap_or(self.streams.first().unwrap())
-            .id()
-            .clone();
-
-        // cleanup old segments
-        let deleted = self
-            .clean_segments()?
-            .into_iter()
-            .map(|seg| EgressSegment {
-                variant: video_var_id,
-                idx: seg.index,
-                duration: seg.duration,
-                path: PathBuf::from(Self::map_segment_path(
-                    &self.out_dir,
-                    &self.name,
-                    seg.index,
-                    self.segment_type,
-                )),
-            })
-            .collect();
+        let video_var = self.video_stream().unwrap_or(self.streams.first().unwrap());

         // emit result of the previously completed segment,
         let prev_seg = self.idx - 1;
-        let created = EgressSegment {
-            variant: video_var_id,
+        let ret = NewSegment {
+            variant: *video_var.id(),
             idx: prev_seg,
             duration,
             path: PathBuf::from(Self::map_segment_path(
@@ -313,10 +280,7 @@ impl HlsVariant {
             )),
         };
         self.pkt_start = pkt_time;
-        Ok(EgressResult::Segments {
-            created: vec![created],
-            deleted,
-        })
+        Ok(ret)
     }

     fn video_stream(&self) -> Option<&HlsVariantStream> {
@@ -325,39 +289,22 @@ impl HlsVariant {
             .find(|a| matches!(*a, HlsVariantStream::Video { .. }))
     }

-    /// Add a new segment to the variant and return a list of deleted segments
-    fn push_segment(&mut self, idx: u64, duration: f32) -> Result<()> {
-        self.segments.push(SegmentInfo {
-            index: idx,
-            duration,
-            kind: self.segment_type,
-        });
+    fn add_segment(&mut self, idx: u64, duration: f32) -> Result<()> {
+        self.segments
+            .push(SegmentInfo(idx, duration, self.segment_type));

-        self.write_playlist()
-    }
-
-    /// Delete segments which are too old
-    fn clean_segments(&mut self) -> Result<Vec<SegmentInfo>> {
-        const MAX_SEGMENTS: usize = 10;
-
-        let mut ret = vec![];
         if self.segments.len() > MAX_SEGMENTS {
             let n_drain = self.segments.len() - MAX_SEGMENTS;
             let seg_dir = self.out_dir();
             for seg in self.segments.drain(..n_drain) {
                 // delete file
                 let seg_path = seg_dir.join(seg.filename());
-                if let Err(e) = std::fs::remove_file(&seg_path) {
-                    warn!(
-                        "Failed to remove segment file: {} {}",
-                        seg_path.display(),
-                        e
-                    );
-                }
-                ret.push(seg);
+                std::fs::remove_file(seg_path)?;
             }
         }
-        Ok(ret)
+        self.write_playlist()
     }

     fn write_playlist(&mut self) -> Result<()> {
@@ -365,7 +312,7 @@ impl HlsVariant {
         pl.target_duration = self.segment_length as u64;
         pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect();
         pl.version = Some(3);
-        pl.media_sequence = self.segments.first().map(|s| s.index).unwrap_or(0);
+        pl.media_sequence = self.segments.first().map(|s| s.0).unwrap_or(0);

         let mut f_out = File::create(self.out_dir().join("live.m3u8"))?;
         pl.write_to(&mut f_out)?;
@@ -483,7 +430,7 @@ impl HlsMuxer {
         &mut self,
         pkt: *mut AVPacket,
         variant: &Uuid,
-    ) -> Result<EgressResult> {
+    ) -> Result<Option<NewSegment>> {
         for var in self.variants.iter_mut() {
             if let Some(vs) = var.streams.iter().find(|s| s.id() == variant) {
                 // very important for muxer to know which stream this pkt belongs to
@@ -20,10 +20,6 @@ impl LocalOverseer {

 #[async_trait]
 impl Overseer for LocalOverseer {
-    async fn check_streams(&self) -> Result<()> {
-        todo!()
-    }
-
     async fn start_stream(
         &self,
         _connection: &ConnectionInfo,
src/overseer/mod.rs (new file) · 194 lines
@@ -0,0 +1,194 @@
use crate::ingress::ConnectionInfo;

#[cfg(feature = "local-overseer")]
use crate::overseer::local::LocalOverseer;
#[cfg(feature = "webhook-overseer")]
use crate::overseer::webhook::WebhookOverseer;
#[cfg(feature = "zap-stream")]
use crate::overseer::zap_stream::ZapStreamOverseer;
use crate::pipeline::PipelineConfig;
#[cfg(any(
    feature = "local-overseer",
    feature = "webhook-overseer",
    feature = "zap-stream"
))]
use crate::settings::OverseerConfig;
use crate::settings::Settings;
use crate::variant::audio::AudioVariant;
use crate::variant::mapping::VariantMapping;
use crate::variant::video::VideoVariant;
use crate::variant::VariantStream;
use anyhow::Result;
use async_trait::async_trait;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use std::cmp::PartialEq;
use std::path::PathBuf;
use std::sync::Arc;
use uuid::Uuid;

#[cfg(feature = "local-overseer")]
mod local;

#[cfg(feature = "webhook-overseer")]
mod webhook;

#[cfg(feature = "zap-stream")]
mod zap_stream;

/// A copy of [ffmpeg_rs_raw::DemuxerInfo] without internal ptr
#[derive(PartialEq, Clone)]
pub struct IngressInfo {
    pub bitrate: usize,
    pub streams: Vec<IngressStream>,
}

/// A copy of [ffmpeg_rs_raw::StreamInfo] without ptr
#[derive(PartialEq, Clone)]
pub struct IngressStream {
    pub index: usize,
    pub stream_type: IngressStreamType,
    pub codec: isize,
    pub format: isize,
    pub width: usize,
    pub height: usize,
    pub fps: f32,
    pub sample_rate: usize,
    pub language: String,
}

#[derive(PartialEq, Eq, Clone)]
pub enum IngressStreamType {
    Video,
    Audio,
    Subtitle,
}

#[async_trait]
/// The control process that oversees streaming operations
pub trait Overseer: Send + Sync {
    /// Set up a new streaming pipeline
    async fn start_stream(
        &self,
        connection: &ConnectionInfo,
        stream_info: &IngressInfo,
    ) -> Result<PipelineConfig>;

    /// A new segment (HLS etc.) was generated for a stream variant
    ///
    /// This handler is usually used for distribution / billing
    async fn on_segment(
        &self,
        pipeline_id: &Uuid,
        variant_id: &Uuid,
        index: u64,
        duration: f32,
        path: &PathBuf,
    ) -> Result<()>;

    /// At a regular interval, pipeline will emit one of the frames for processing as a
    /// thumbnail
    async fn on_thumbnail(
        &self,
        pipeline_id: &Uuid,
        width: usize,
        height: usize,
        path: &PathBuf,
    ) -> Result<()>;

    /// Stream is finished
    async fn on_end(&self, pipeline_id: &Uuid) -> Result<()>;
}

impl Settings {
    pub async fn get_overseer(&self) -> Result<Arc<dyn Overseer>> {
        match &self.overseer {
            #[cfg(feature = "local-overseer")]
            OverseerConfig::Local => Ok(Arc::new(LocalOverseer::new())),
            #[cfg(feature = "webhook-overseer")]
            OverseerConfig::Webhook { url } => Ok(Arc::new(WebhookOverseer::new(&url))),
            #[cfg(feature = "zap-stream")]
            OverseerConfig::ZapStream {
                nsec: private_key,
                database,
                lnd,
                relays,
                blossom,
            } => Ok(Arc::new(
                ZapStreamOverseer::new(
                    &self.output_dir,
                    &self.public_url,
                    private_key,
                    database,
                    lnd,
                    relays,
                    blossom,
                )
                .await?,
            )),
            _ => {
                panic!("Unsupported overseer");
            }
        }
    }
}

pub(crate) fn get_default_variants(info: &IngressInfo) -> Result<Vec<VariantStream>> {
    let mut vars: Vec<VariantStream> = vec![];
    if let Some(video_src) = info
        .streams
        .iter()
        .find(|c| c.stream_type == IngressStreamType::Video)
    {
        vars.push(VariantStream::CopyVideo(VariantMapping {
            id: Uuid::new_v4(),
            src_index: video_src.index,
            dst_index: 0,
            group_id: 0,
        }));
        vars.push(VariantStream::Video(VideoVariant {
            mapping: VariantMapping {
                id: Uuid::new_v4(),
                src_index: video_src.index,
                dst_index: 1,
                group_id: 1,
            },
            width: 1280,
            height: 720,
            fps: video_src.fps,
            bitrate: 3_000_000,
            codec: "libx264".to_string(),
            profile: 100,
            level: 51,
            keyframe_interval: video_src.fps as u16 * 2,
            pixel_format: AV_PIX_FMT_YUV420P as u32,
        }));
    }

    if let Some(audio_src) = info
        .streams
        .iter()
        .find(|c| c.stream_type == IngressStreamType::Audio)
    {
        vars.push(VariantStream::CopyAudio(VariantMapping {
            id: Uuid::new_v4(),
            src_index: audio_src.index,
            dst_index: 2,
            group_id: 0,
        }));
        vars.push(VariantStream::Audio(AudioVariant {
            mapping: VariantMapping {
                id: Uuid::new_v4(),
                src_index: audio_src.index,
                dst_index: 3,
                group_id: 1,
            },
            bitrate: 192_000,
            codec: "aac".to_string(),
            channels: 2,
            sample_rate: 48_000,
            sample_fmt: "fltp".to_owned(),
        }));
    }

    Ok(vars)
}
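get_default_variants above pairs each source stream with a copy variant and a transcode variant (1280x720 libx264 for video, 48 kHz stereo AAC for audio). A sketch of feeding it a synthetic IngressInfo, using only the types defined above; the probe values are made up:

fn demo_variants() -> anyhow::Result<()> {
    // Synthetic probe result standing in for a real ffmpeg demux
    let info = IngressInfo {
        bitrate: 5_000_000,
        streams: vec![IngressStream {
            index: 0,
            stream_type: IngressStreamType::Video,
            codec: 27, // illustrative: H.264 codec id
            format: 0,
            width: 1920,
            height: 1080,
            fps: 30.0,
            sample_rate: 0,
            language: "eng".to_string(),
        }],
    };
    let vars = get_default_variants(&info)?;
    // One copy variant + one 720p transcode for the single video stream
    assert_eq!(vars.len(), 2);
    Ok(())
}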
@@ -21,10 +21,6 @@ impl WebhookOverseer {

 #[async_trait]
 impl Overseer for WebhookOverseer {
-    async fn check_streams(&self) -> Result<()> {
-        todo!()
-    }
-
     async fn start_stream(
         &self,
         connection: &ConnectionInfo,
@@ -1,44 +1,30 @@
 use crate::blossom::{BlobDescriptor, Blossom};
+use crate::egress::hls::HlsEgress;
+use crate::egress::EgressConfig;
+use crate::ingress::ConnectionInfo;
+use crate::overseer::{get_default_variants, IngressInfo, Overseer};
+use crate::pipeline::{EgressType, PipelineConfig};
+use crate::settings::LndSettings;
+use crate::variant::StreamMapping;
 use anyhow::{anyhow, bail, Result};
 use async_trait::async_trait;
 use base64::alphabet::STANDARD;
 use base64::Engine;
 use bytes::Bytes;
 use chrono::Utc;
 use fedimint_tonic_lnd::verrpc::VersionRequest;
 use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_MJPEG;
 use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVFrame;
 use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
 use ffmpeg_rs_raw::Encoder;
 use futures_util::FutureExt;
-use http_body_util::combinators::BoxBody;
-use http_body_util::{BodyExt, Full};
-use hyper::body::Incoming;
-use hyper::{Method, Request, Response};
-use log::{error, info, warn};
+use log::{info, warn};
 use nostr_sdk::bitcoin::PrivateKey;
 use nostr_sdk::prelude::Coordinate;
 use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32};
 use serde::Serialize;
 use std::collections::HashSet;
 use std::env::temp_dir;
 use std::fs::create_dir_all;
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
 use tokio::sync::RwLock;
 use url::Url;
 use uuid::Uuid;
-use zap_stream_core::egress::hls::HlsEgress;
-use zap_stream_core::egress::{EgressConfig, EgressSegment};
-use zap_stream_core::ingress::ConnectionInfo;
-use zap_stream_core::overseer::{IngressInfo, IngressStreamType, Overseer};
-use zap_stream_core::pipeline::{EgressType, PipelineConfig};
-use zap_stream_core::variant::audio::AudioVariant;
-use zap_stream_core::variant::mapping::VariantMapping;
-use zap_stream_core::variant::video::VideoVariant;
-use zap_stream_core::variant::{StreamMapping, VariantStream};
+use warp::Filter;
 use zap_stream_db::sqlx::Encode;
 use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb};
@ -60,11 +46,6 @@ pub struct ZapStreamOverseer {
blossom_servers: Vec<Blossom>,
/// Public facing URL pointing to [out_dir]
public_url: String,
/// Cost / second / variant
cost: i64,
/// Currently active streams
/// Any streams which are not contained in this set are dead
active_streams: Arc<RwLock<HashSet<Uuid>>>,
}

impl ZapStreamOverseer {
@ -76,25 +57,10 @@ impl ZapStreamOverseer {
lnd: &LndSettings,
relays: &Vec<String>,
blossom_servers: &Option<Vec<String>>,
cost: i64,
) -> Result<Self> {
let db = ZapStreamDb::new(db).await?;
db.migrate().await?;

#[cfg(debug_assertions)]
{
let uid = db.upsert_user(&[0; 32]).await?;
db.update_user_balance(uid, 100_000_000).await?;
let user = db.get_user(uid).await?;

info!(
"ZERO pubkey: uid={},key={},balance={}",
user.id,
user.stream_key,
user.balance / 1000
);
}

let mut lnd = fedimint_tonic_lnd::connect(
lnd.address.clone(),
PathBuf::from(&lnd.cert),
@ -128,15 +94,9 @@ impl ZapStreamOverseer {
.map(|b| Blossom::new(b))
.collect(),
public_url: public_url.clone(),
cost,
active_streams: Arc::new(RwLock::new(HashSet::new())),
})
}

pub(crate) fn database(&self) -> ZapStreamDb {
self.db.clone()
}

fn stream_to_event_builder(&self, stream: &UserStream) -> Result<EventBuilder> {
let mut tags = vec![
Tag::parse(&["d".to_string(), stream.id.to_string()])?,
@ -181,95 +141,71 @@ impl ZapStreamOverseer {

let kind = Kind::from(STREAM_EVENT_KIND);
let coord = Coordinate::new(kind, self.keys.public_key).identifier(&stream.id);
tags.push(Tag::parse([
tags.push(Tag::parse(&[
"alt",
&format!("Watch live on https://zap.stream/{}", coord.to_bech32()?),
])?);
Ok(EventBuilder::new(kind, "").tags(tags))
Ok(EventBuilder::new(kind, "", tags))
}

fn blob_to_event_builder(&self, stream: &BlobDescriptor) -> Result<EventBuilder> {
let tags = if let Some(tags) = stream.nip94.as_ref() {
tags.iter()
.map_while(|(k, v)| Tag::parse([k, v]).ok())
.map_while(|(k, v)| Tag::parse(&[k, v]).ok())
.collect()
} else {
let mut tags = vec![
Tag::parse(["x", &stream.sha256])?,
Tag::parse(["url", &stream.url])?,
Tag::parse(["size", &stream.size.to_string()])?,
Tag::parse(&["x", &stream.sha256])?,
Tag::parse(&["url", &stream.url])?,
Tag::parse(&["size", &stream.size.to_string()])?,
];
if let Some(m) = stream.mime_type.as_ref() {
tags.push(Tag::parse(["m", m])?)
tags.push(Tag::parse(&["m", m])?)
}
tags
};

Ok(EventBuilder::new(Kind::FileMetadata, "").tags(tags))
Ok(EventBuilder::new(Kind::FileMetadata, "", tags))
}

async fn publish_stream_event(&self, stream: &UserStream, pubkey: &Vec<u8>) -> Result<Event> {
let extra_tags = vec![
Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?,
Tag::parse([
let mut extra_tags = vec![
Tag::parse(&["p", hex::encode(pubkey).as_str(), "", "host"])?,
Tag::parse(&[
"streaming",
self.map_to_stream_public_url(stream, "live.m3u8")?.as_str(),
self.map_to_public_url(stream, "live.m3u8")?.as_str(),
])?,
Tag::parse([
Tag::parse(&[
"image",
self.map_to_stream_public_url(stream, "thumb.webp")?
.as_str(),
self.map_to_public_url(stream, "thumb.webp")?.as_str(),
])?,
Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?,
];
// flag NIP94 streaming when using blossom servers
if self.blossom_servers.len() > 0 {
extra_tags.push(Tag::parse(&["streaming", "nip94"])?);
}
let ev = self
.stream_to_event_builder(stream)?
.tags(extra_tags)
.add_tags(extra_tags)
.sign_with_keys(&self.keys)?;
self.client.send_event(ev.clone()).await?;
Ok(ev)
}

fn map_to_stream_public_url(&self, stream: &UserStream, path: &str) -> Result<String> {
self.map_to_public_url(&format!("{}/{}", stream.id, path))
}
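The paired tag-building lines above come from two generations of the nostr-sdk API: on one side Tag::parse accepts any IntoIterator of string-like items and tags are attached with EventBuilder::tags, on the other it takes a slice reference and EventBuilder::new carries the tags as a third argument. A minimal sketch of both shapes, assuming the respective nostr-sdk versions (exact crate versions are not pinned in this hunk):

    use nostr_sdk::{EventBuilder, Kind, Tag};

    fn builder(kind: Kind) -> anyhow::Result<EventBuilder> {
        // Newer shape: IntoIterator argument, tags attached separately.
        let tags = vec![Tag::parse(["d", "stream-id"])?];
        Ok(EventBuilder::new(kind, "").tags(tags))
        // Older shape (the other side of this diff) would read:
        // let tags = vec![Tag::parse(&["d", "stream-id"])?];
        // EventBuilder::new(kind, "", tags)
    }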

fn map_to_public_url(&self, path: &str) -> Result<String> {
fn map_to_public_url<'a>(
&self,
stream: &UserStream,
path: impl Into<&'a str>,
) -> Result<String> {
let u: Url = self.public_url.parse()?;
Ok(u.join(path)?.to_string())
Ok(u.join(&format!("/{}/", stream.id))?
.join(path.into())?
.to_string())
}
}
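Both variants of map_to_public_url lean on url::Url::join, whose resolution depends on trailing slashes: a base segment without one is replaced by the joined path rather than extended. A small self-contained illustration (host and stream id invented for the example):

    use url::Url;

    fn main() -> Result<(), url::ParseError> {
        let base: Url = "https://example.com/".parse()?;
        // Joining "/{id}/" keeps the id as a directory segment...
        let dir = base.join("/abc123/")?;
        assert_eq!(
            dir.join("live.m3u8")?.as_str(),
            "https://example.com/abc123/live.m3u8"
        );
        // ...while without the trailing slash the last segment is replaced.
        assert_eq!(
            base.join("abc123")?.join("live.m3u8")?.as_str(),
            "https://example.com/live.m3u8"
        );
        Ok(())
    }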

#[derive(Serialize)]
struct Endpoint {}

#[derive(Serialize)]
struct AccountInfo {
pub endpoints: Vec<Endpoint>,
pub event: Event,
pub balance: u64,
}
#[async_trait]
impl Overseer for ZapStreamOverseer {
async fn check_streams(&self) -> Result<()> {
let active_streams = self.db.list_live_streams().await?;
for stream in active_streams {
// check
let id = Uuid::parse_str(&stream.id)?;
info!("Checking stream is alive: {}", stream.id);
let is_active = {
let streams = self.active_streams.read().await;
streams.contains(&id)
};
if !is_active {
if let Err(e) = self.on_end(&id).await {
error!("Failed to end dead stream {}: {}", &id, e);
}
}
}
Ok(())
}
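The caller of check_streams is outside this hunk; a plausible driver is a periodic sweep that ends any stream still marked live in the database but absent from active_streams. A sketch under that assumption (the 60-second period is invented, and tokio's "time" feature is assumed):

    use std::{sync::Arc, time::Duration};
    use zap_stream_core::overseer::Overseer;

    // `Overseer` is the trait shown in this diff; it is already used
    // as `Arc<dyn Overseer>` by the pipeline runner further below.
    async fn run_sweeper(overseer: Arc<dyn Overseer>) {
        let mut ticker = tokio::time::interval(Duration::from_secs(60));
        loop {
            ticker.tick().await;
            if let Err(e) = overseer.check_streams().await {
                log::error!("stream sweep failed: {}", e);
            }
        }
    }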

async fn start_stream(
&self,
connection: &ConnectionInfo,
@ -281,11 +217,6 @@ impl Overseer for ZapStreamOverseer {
.await?
.ok_or_else(|| anyhow::anyhow!("User not found"))?;

let user = self.db.get_user(uid).await?;
if user.balance <= 0 {
bail!("Not enough balance");
}

let variants = get_default_variants(&stream_info)?;

let mut egress = vec![];
@ -294,6 +225,7 @@ impl Overseer for ZapStreamOverseer {
variants: variants.iter().map(|v| v.id()).collect(),
}));

let user = self.db.get_user(uid).await?;
let stream_id = Uuid::new_v4();
// insert new stream record
let mut new_stream = UserStream {
@ -306,12 +238,8 @@ impl Overseer for ZapStreamOverseer {
let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?;
new_stream.event = Some(stream_event.as_json());

let mut streams = self.active_streams.write().await;
streams.insert(stream_id.clone());

self.db.insert_stream(&new_stream).await?;
self.db.update_stream(&new_stream).await?;

Ok(PipelineConfig {
id: stream_id,
variants,
@ -319,65 +247,42 @@ impl Overseer for ZapStreamOverseer {
})
}

async fn on_segments(
async fn on_segment(
&self,
pipeline_id: &Uuid,
added: &Vec<EgressSegment>,
deleted: &Vec<EgressSegment>,
variant_id: &Uuid,
index: u64,
duration: f32,
path: &PathBuf,
) -> Result<()> {
let stream = self.db.get_stream(pipeline_id).await?;

let duration = added.iter().fold(0.0, |acc, v| acc + v.duration);
let cost = self.cost * duration.round() as i64;
let bal = self
.db
.tick_stream(pipeline_id, stream.user_id, duration, cost)
.await?;
if bal <= 0 {
bail!("Not enough balance");
}

// Upload to blossom servers if configured (N94)
// Upload to blossom servers if configured
let mut blobs = vec![];
for seg in added {
for b in &self.blossom_servers {
blobs.push(b.upload(&seg.path, &self.keys, Some("video/mp2t")).await?);
for b in &self.blossom_servers {
blobs.push(b.upload(path, &self.keys, Some("video/mp2t")).await?);
}
if let Some(blob) = blobs.first() {
let a_tag = format!(
"{}:{}:{}",
STREAM_EVENT_KIND,
self.keys.public_key.to_hex(),
pipeline_id
);
let mut n94 = self.blob_to_event_builder(blob)?.add_tags([
Tag::parse(&["a", &a_tag])?,
Tag::parse(&["d", variant_id.to_string().as_str()])?,
Tag::parse(&["duration", duration.to_string().as_str()])?,
]);
for b in blobs.iter().skip(1) {
n94 = n94.add_tags(Tag::parse(&["url", &b.url]));
}
if let Some(blob) = blobs.first() {
let a_tag = format!(
"{}:{}:{}",
STREAM_EVENT_KIND,
self.keys.public_key.to_hex(),
pipeline_id
);
let mut n94 = self.blob_to_event_builder(blob)?.tags([
Tag::parse(["a", &a_tag])?,
Tag::parse(["d", seg.variant.to_string().as_str()])?,
Tag::parse(["index", seg.idx.to_string().as_str()])?,
]);

// some servers add duration tag
if blob
.nip94
.as_ref()
.map(|a| a.contains_key("duration"))
.is_none()
{
n94 = n94.tag(Tag::parse(["duration", seg.duration.to_string().as_str()])?);
let n94 = n94.sign_with_keys(&self.keys)?;
let cc = self.client.clone();
tokio::spawn(async move {
if let Err(e) = cc.send_event(n94).await {
warn!("Error sending event: {}", e);
}

for b in blobs.iter().skip(1) {
n94 = n94.tag(Tag::parse(["url", &b.url])?);
}
let n94 = n94.sign_with_keys(&self.keys)?;
let cc = self.client.clone();
tokio::spawn(async move {
if let Err(e) = cc.send_event(n94).await {
warn!("Error sending event: {}", e);
}
});
info!("Published N94 segment to {}", blob.url);
}
});
info!("Published N94 segment to {}", blob.url);
}

Ok(())
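This hunk changes the callback granularity: one side reports a single segment per call (on_segment, with variant, index, duration and path as separate parameters), the other reports playlist updates in batches (on_segments, with added and deleted segment lists). Judging by the field accesses seg.variant, seg.idx, seg.duration and seg.path above, EgressSegment carries roughly the following shape; the struct below is inferred from usage, not the authoritative definition:

    use std::path::PathBuf;
    use uuid::Uuid;

    // Inferred from how `on_segments` consumes its input in this diff.
    pub struct EgressSegment {
        /// Variant this segment belongs to
        pub variant: Uuid,
        /// Segment index within the variant playlist
        pub idx: u64,
        /// Segment length in seconds
        pub duration: f32,
        /// Path to the segment file on disk
        pub path: PathBuf,
    }

Batching also explains the billing line above: the fold over `added` sums the durations, so one tick_stream call covers everything written since the last playlist update.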
@ -398,9 +303,6 @@ impl Overseer for ZapStreamOverseer {
let mut stream = self.db.get_stream(pipeline_id).await?;
let user = self.db.get_user(stream.user_id).await?;

let mut streams = self.active_streams.write().await;
streams.remove(pipeline_id);

stream.state = UserStreamState::Ended;
let event = self.publish_stream_event(&stream, &user.pubkey).await?;
stream.event = Some(event.as_json());
@ -410,64 +312,3 @@ impl Overseer for ZapStreamOverseer {
Ok(())
}
}

fn get_default_variants(info: &IngressInfo) -> Result<Vec<VariantStream>> {
let mut vars: Vec<VariantStream> = vec![];
if let Some(video_src) = info
.streams
.iter()
.find(|c| c.stream_type == IngressStreamType::Video)
{
vars.push(VariantStream::CopyVideo(VariantMapping {
id: Uuid::new_v4(),
src_index: video_src.index,
dst_index: 0,
group_id: 0,
}));
vars.push(VariantStream::Video(VideoVariant {
mapping: VariantMapping {
id: Uuid::new_v4(),
src_index: video_src.index,
dst_index: 1,
group_id: 1,
},
width: 1280,
height: 720,
fps: video_src.fps,
bitrate: 3_000_000,
codec: "libx264".to_string(),
profile: 100,
level: 51,
keyframe_interval: video_src.fps as u16 * 2,
pixel_format: AV_PIX_FMT_YUV420P as u32,
}));
}

if let Some(audio_src) = info
.streams
.iter()
.find(|c| c.stream_type == IngressStreamType::Audio)
{
vars.push(VariantStream::CopyAudio(VariantMapping {
id: Uuid::new_v4(),
src_index: audio_src.index,
dst_index: 2,
group_id: 0,
}));
vars.push(VariantStream::Audio(AudioVariant {
mapping: VariantMapping {
id: Uuid::new_v4(),
src_index: audio_src.index,
dst_index: 3,
group_id: 1,
},
bitrate: 192_000,
codec: "aac".to_string(),
channels: 2,
sample_rate: 48_000,
sample_fmt: "fltp".to_owned(),
}));
}

Ok(vars)
}
@ -73,9 +73,6 @@ pub struct PipelineRunner {
overseer: Arc<dyn Overseer>,

fps_counter_start: Instant,
fps_last_frame_ctr: u64,

/// Total number of frames produced
frame_ctr: u64,
out_dir: String,
}
@ -103,7 +100,6 @@ impl PipelineRunner {
fps_counter_start: Instant::now(),
egress: Vec::new(),
frame_ctr: 0,
fps_last_frame_ctr: 0,
info: None,
})
}
@ -144,7 +140,7 @@ impl PipelineRunner {
};

// run transcoder pipeline
let (mut pkt, _stream) = self.demuxer.get_packet()?;
let (mut pkt, stream) = self.demuxer.get_packet()?;
if pkt.is_null() {
return Ok(false);
}
@ -159,14 +155,16 @@ impl PipelineRunner {
};

let mut egress_results = vec![];
for (frame, stream) in frames {
for frame in frames {
// Copy frame from GPU if using hwaccel decoding
let mut frame = get_frame_from_hw(frame)?;
(*frame).time_base = (*stream).time_base;

let p = (*stream).codecpar;
if (*p).codec_type == AVMediaType::AVMEDIA_TYPE_VIDEO {
if (self.frame_ctr % 1800) == 0 {
let pts_sec = ((*frame).pts as f64 * av_q2d((*stream).time_base)).floor() as u64;
// write thumbnail every 1min
if pts_sec % 60 == 0 && pts_sec != 0 {
let dst_pic = PathBuf::from(&self.out_dir)
.join(config.id.to_string())
.join("thumb.webp");
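The thumbnail gate converts a raw PTS into whole seconds by scaling with av_q2d, i.e. multiplying by num/den of the stream time base, then fires on minute boundaries. The same arithmetic with a hand-rolled q2d (clock values illustrative):

    // av_q2d(r) is simply r.num as f64 / r.den as f64.
    fn q2d(num: i32, den: i32) -> f64 {
        num as f64 / den as f64
    }

    fn main() {
        let time_base = (1, 90_000); // 90 kHz clock, typical for MPEG-TS
        let pts: i64 = 5_400_000;
        let pts_sec = (pts as f64 * q2d(time_base.0, time_base.1)).floor() as u64;
        assert_eq!(pts_sec, 60); // 5_400_000 / 90_000 = 60
        assert!(pts_sec % 60 == 0 && pts_sec != 0); // thumbnail would be written
    }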
@ -270,24 +268,22 @@ impl PipelineRunner {
// egress results
self.handle.block_on(async {
for er in egress_results {
if let EgressResult::Segments { created, deleted } = er {
if let EgressResult::NewSegment(seg) = er {
if let Err(e) = self
.overseer
.on_segments(&config.id, &created, &deleted)
.on_segment(&config.id, &seg.variant, seg.idx, seg.duration, &seg.path)
.await
{
bail!("Failed to process segment {}", e.to_string());
error!("Failed to process segment: {}", e);
}
}
}
Ok(())
})?;
});
let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32();
if elapsed >= 2f32 {
let n_frames = self.frame_ctr - self.fps_last_frame_ctr;
info!("Average fps: {:.2}", n_frames as f32 / elapsed);
info!("Average fps: {:.2}", self.frame_ctr as f32 / elapsed);
self.fps_counter_start = Instant::now();
self.fps_last_frame_ctr = self.frame_ctr;
self.frame_ctr = 0;
}
Ok(true)
}
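Both sides of the fps log average over a window of at least two seconds, but they bookkeep differently: one resets frame_ctr to zero each window, the other keeps frame_ctr monotonic (matching its doc comment "Total number of frames produced") and subtracts a fps_last_frame_ctr snapshot instead. The snapshot variant in isolation, as a sketch rather than the exact struct:

    use std::time::Instant;

    struct FpsWindow {
        start: Instant,
        frame_ctr: u64,          // running total, never reset
        fps_last_frame_ctr: u64, // value of frame_ctr at the last report
    }

    impl FpsWindow {
        /// Count one frame; returns the average fps when a window closes.
        fn on_frame(&mut self) -> Option<f32> {
            self.frame_ctr += 1;
            let elapsed = self.start.elapsed().as_secs_f32();
            if elapsed >= 2.0 {
                let n_frames = self.frame_ctr - self.fps_last_frame_ctr;
                self.fps_last_frame_ctr = self.frame_ctr;
                self.start = Instant::now();
                Some(n_frames as f32 / elapsed)
            } else {
                None
            }
        }
    }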
@ -1,6 +1,4 @@
use crate::overseer::ZapStreamOverseer;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Settings {
@ -11,9 +9,6 @@ pub struct Settings {
/// - rtmp://localhost:1935
pub endpoints: Vec<String>,

/// Public facing hostname that maps to [endpoints]
pub endpoints_public_hostname: String,

/// Where to store output (static files)
pub output_dir: String,

@ -23,7 +18,7 @@ pub struct Settings {
/// Binding address for http server serving files from [output_dir]
pub listen_http: String,

/// Overseer service; see [Overseer] for more info
/// Overseer service; see [crate::overseer::Overseer] for more info
pub overseer: OverseerConfig,
}

@ -49,8 +44,6 @@ pub enum OverseerConfig {
nsec: String,
/// Blossom servers
blossom: Option<Vec<String>>,
/// Cost (milli-sats) / second / variant
cost: i64,
},
}

@ -60,33 +53,3 @@ pub struct LndSettings {
pub cert: String,
pub macaroon: String,
}

impl Settings {
pub async fn get_overseer(&self) -> anyhow::Result<Arc<ZapStreamOverseer>> {
match &self.overseer {
OverseerConfig::ZapStream {
nsec: private_key,
database,
lnd,
relays,
blossom,
cost,
} => Ok(Arc::new(
ZapStreamOverseer::new(
&self.output_dir,
&self.public_url,
private_key,
database,
lnd,
relays,
blossom,
*cost,
)
.await?,
)),
_ => {
panic!("Unsupported overseer");
}
}
}
}
@ -8,8 +8,8 @@ default = []
test-pattern = []

[dependencies]
anyhow.workspace = true
chrono.workspace = true
uuid.workspace = true

anyhow = "^1.0.70"
chrono = { version = "0.4.38", features = ["serde"] }
sqlx = { version = "0.8.1", features = ["runtime-tokio", "migrate", "mysql", "chrono"] }
log = "0.4.22"
uuid = { version = "1.11.0", features = ["v4"] }
@ -1,9 +1,8 @@
use crate::{User, UserStream};
use anyhow::Result;
use sqlx::{Executor, MySqlPool, Row};
use sqlx::{MySqlPool, Row};
use uuid::Uuid;

#[derive(Clone)]
pub struct ZapStreamDb {
db: MySqlPool,
}
@ -43,16 +42,6 @@ impl ZapStreamDb {
.map_err(anyhow::Error::new)?)
}

/// Update a user's balance
pub async fn update_user_balance(&self, uid: u64, diff: i64) -> Result<()> {
sqlx::query("update user set balance = balance + ? where id = ?")
.bind(diff)
.bind(uid)
.execute(&self.db)
.await?;
Ok(())
}

pub async fn upsert_user(&self, pubkey: &[u8; 32]) -> Result<u64> {
let res = sqlx::query("insert ignore into user(pubkey) values(?) returning id")
.bind(pubkey.as_slice())
@ -112,45 +101,4 @@ impl ZapStreamDb {
.await
.map_err(anyhow::Error::new)?)
}

/// Get the list of active streams
pub async fn list_live_streams(&self) -> Result<Vec<UserStream>> {
Ok(sqlx::query_as("select * from user_stream where state = 2")
.fetch_all(&self.db)
.await?)
}

/// Add [duration] & [cost] to a stream and return the new user balance
pub async fn tick_stream(
&self,
stream_id: &Uuid,
user_id: u64,
duration: f32,
cost: i64,
) -> Result<i64> {
let mut tx = self.db.begin().await?;

sqlx::query("update user_stream set duration = duration + ?, cost = cost + ? where id = ?")
.bind(&duration)
.bind(&cost)
.bind(stream_id.to_string())
.execute(&mut *tx)
.await?;

sqlx::query("update user set balance = balance - ? where id = ?")
.bind(&cost)
.bind(&user_id)
.execute(&mut *tx)
.await?;

let balance: i64 = sqlx::query("select balance from user where id = ?")
.bind(&user_id)
.fetch_one(&mut *tx)
.await?
.try_get(0)?;

tx.commit().await?;

Ok(balance)
}
}
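tick_stream folds the per-segment accounting into a single transaction: extend the stream's duration and cost, debit the user, then read the post-debit balance, so concurrent ticks cannot interleave between the debit and the read. A hypothetical caller, with invented numbers, showing the overseer-side arithmetic that feeds it (10 msats per second over a 6.0-second batch debits 60 msats):

    use uuid::Uuid;
    use zap_stream_db::ZapStreamDb;

    // Sketch only: `db` is assumed connected and migrated.
    async fn bill_segments(db: &ZapStreamDb, stream_id: &Uuid, user_id: u64) -> anyhow::Result<()> {
        let cost_per_sec: i64 = 10; // msats / second / variant (invented)
        let duration: f32 = 6.0;    // seconds of newly written segments
        let cost = cost_per_sec * duration.round() as i64; // = 60 msats
        let balance = db.tick_stream(stream_id, user_id, duration, cost).await?;
        if balance <= 0 {
            anyhow::bail!("Not enough balance"); // the overseer ends the stream here
        }
        Ok(())
    }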
@ -1,6 +1,7 @@
use chrono::{DateTime, Utc};
use sqlx::{FromRow, Type};
use std::fmt::{Display, Formatter};
use uuid::Uuid;

#[derive(Debug, Clone, FromRow)]
|