feat: publish n96 segments

This commit is contained in:
kieran 2024-11-15 18:26:23 +00:00
parent 9fdc1defaa
commit 0da9bd996f
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
26 changed files with 278 additions and 75 deletions

15
Cargo.lock generated
View File

@ -1874,7 +1874,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"windows-targets 0.48.5", "windows-targets 0.52.6",
] ]
[[package]] [[package]]
@ -2702,7 +2702,7 @@ dependencies = [
"once_cell", "once_cell",
"socket2", "socket2",
"tracing", "tracing",
"windows-sys 0.52.0", "windows-sys 0.59.0",
] ]
[[package]] [[package]]
@ -3430,6 +3430,7 @@ dependencies = [
"tokio-stream", "tokio-stream",
"tracing", "tracing",
"url", "url",
"uuid",
] ]
[[package]] [[package]]
@ -3511,6 +3512,7 @@ dependencies = [
"stringprep", "stringprep",
"thiserror 1.0.57", "thiserror 1.0.57",
"tracing", "tracing",
"uuid",
"whoami", "whoami",
] ]
@ -3550,6 +3552,7 @@ dependencies = [
"stringprep", "stringprep",
"thiserror 1.0.57", "thiserror 1.0.57",
"tracing", "tracing",
"uuid",
"whoami", "whoami",
] ]
@ -3575,6 +3578,7 @@ dependencies = [
"sqlx-core", "sqlx-core",
"tracing", "tracing",
"url", "url",
"uuid",
] ]
[[package]] [[package]]
@ -3746,7 +3750,7 @@ dependencies = [
"fastrand", "fastrand",
"once_cell", "once_cell",
"rustix", "rustix",
"windows-sys 0.52.0", "windows-sys 0.59.0",
] ]
[[package]] [[package]]
@ -4333,9 +4337,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.8.0" version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
dependencies = [ dependencies = [
"getrandom", "getrandom",
"serde", "serde",
@ -4816,6 +4820,7 @@ dependencies = [
"chrono", "chrono",
"log", "log",
"sqlx", "sqlx",
"uuid",
] ]
[[package]] [[package]]

View File

@ -35,7 +35,7 @@ overseer:
nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4" nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4"
relays: relays:
- "ws://localhost:7766" - "ws://localhost:7766"
database: "mysql://root:root@localhost:3368/zap-stream?max_connections=2" database: "mysql://root:root@localhost:3368/zap_stream?max_connections=2"
lnd: lnd:
address: "https://127.0.0.1:10001" address: "https://127.0.0.1:10001"
cert: "/home/kieran/.polar/networks/1/volumes/lnd/alice/tls.cert" cert: "/home/kieran/.polar/networks/1/volumes/lnd/alice/tls.cert"

2
db.sql
View File

@ -1,2 +0,0 @@
create database route96;
create database zap-stream;

2
dev-setup/db.sql Normal file
View File

@ -0,0 +1,2 @@
create database route96;
create database zap_stream;

View File

@ -1,5 +1,5 @@
# Listen address for webserver # Listen address for webserver
listen = "127.0.0.1:8000" listen = "0.0.0.0:8000"
# Database connection string (MYSQL) # Database connection string (MYSQL)
database = "mysql://root:root@db:3306/route96" database = "mysql://root:root@db:3306/route96"

144
dev-setup/strfry.conf Normal file
View File

@ -0,0 +1,144 @@
##
## Default strfry config
##
# Directory that contains the strfry LMDB database (restart required)
db = "./strfry-db/"
dbParams {
# Maximum number of threads/processes that can simultaneously have LMDB transactions open (restart required)
maxreaders = 256
# Size of mmap() to use when loading LMDB (default is 10TB, does *not* correspond to disk-space used) (restart required)
mapsize = 10995116277760
# Disables read-ahead when accessing the LMDB mapping. Reduces IO activity when DB size is larger than RAM. (restart required)
noReadAhead = false
}
events {
# Maximum size of normalised JSON, in bytes
maxEventSize = 65536
# Events newer than this will be rejected
rejectEventsNewerThanSeconds = 900
# Events older than this will be rejected
rejectEventsOlderThanSeconds = 94608000
# Ephemeral events older than this will be rejected
rejectEphemeralEventsOlderThanSeconds = 60
# Ephemeral events will be deleted from the DB when older than this
ephemeralEventsLifetimeSeconds = 300
# Maximum number of tags allowed
maxNumTags = 2000
# Maximum size for tag values, in bytes
maxTagValSize = 1024
}
relay {
# Interface to listen on. Use 0.0.0.0 to listen on all interfaces (restart required)
bind = "0.0.0.0"
# Port to open for the nostr websocket protocol (restart required)
port = 7777
# Set OS-limit on maximum number of open files/sockets (if 0, don't attempt to set) (restart required)
nofiles = 1000000
# HTTP header that contains the client's real IP, before reverse proxying (ie x-real-ip) (MUST be all lower-case)
realIpHeader = ""
info {
# NIP-11: Name of this server. Short/descriptive (< 30 characters)
name = "strfry default"
# NIP-11: Detailed information about relay, free-form
description = "This is a strfry instance."
# NIP-11: Administrative nostr pubkey, for contact purposes
pubkey = ""
# NIP-11: Alternative administrative contact (email, website, etc)
contact = ""
# NIP-11: URL pointing to an image to be used as an icon for the relay
icon = ""
# List of supported lists as JSON array, or empty string to use default. Example: "[1,2]"
nips = ""
}
# Maximum accepted incoming websocket frame size (should be larger than max event) (restart required)
maxWebsocketPayloadSize = 131072
# Websocket-level PING message frequency (should be less than any reverse proxy idle timeouts) (restart required)
autoPingSeconds = 55
# If TCP keep-alive should be enabled (detect dropped connections to upstream reverse proxy)
enableTcpKeepalive = false
# How much uninterrupted CPU time a REQ query should get during its DB scan
queryTimesliceBudgetMicroseconds = 10000
# Maximum records that can be returned per filter
maxFilterLimit = 500
# Maximum number of subscriptions (concurrent REQs) a connection can have open at any time
maxSubsPerConnection = 20
writePolicy {
# If non-empty, path to an executable script that implements the writePolicy plugin logic
plugin = ""
}
compression {
# Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU (restart required)
enabled = true
# Maintain a sliding window buffer for each connection. Improves compression, but uses more memory (restart required)
slidingWindow = true
}
logging {
# Dump all incoming messages
dumpInAll = false
# Dump all incoming EVENT messages
dumpInEvents = false
# Dump all incoming REQ/CLOSE messages
dumpInReqs = false
# Log performance metrics for initial REQ database scans
dbScanPerf = false
# Log reason for invalid event rejection? Can be disabled to silence excessive logging
invalidEvents = true
}
numThreads {
# Ingester threads: route incoming requests, validate events/sigs (restart required)
ingester = 3
# reqWorker threads: Handle initial DB scan for events (restart required)
reqWorker = 3
# reqMonitor threads: Handle filtering of new events (restart required)
reqMonitor = 3
# negentropy threads: Handle negentropy protocol messages (restart required)
negentropy = 2
}
negentropy {
# Support negentropy protocol messages
enabled = true
# Maximum records that sync will process before returning an error
maxSyncEvents = 1000000
}
}

View File

@ -7,11 +7,14 @@ services:
ports: ports:
- "3368:3306" - "3368:3306"
volumes: volumes:
- "./db.sql:/docker-entrypoint-initdb.d/00-init.sql" - "./dev-setup/db.sql:/docker-entrypoint-initdb.d/00-init.sql"
relay: relay:
image: scsibug/nostr-rs-relay image: dockurr/strfry
ports: ports:
- "7766:8080" - "7766:7777"
volumes:
- "relay:/app/strfry-db"
- "./dev-setup/strfry.conf:/etc/strfry.conf"
blossom: blossom:
depends_on: depends_on:
- db - db
@ -22,7 +25,7 @@ services:
- "8881:8000" - "8881:8000"
volumes: volumes:
- "blossom:/app/data" - "blossom:/app/data"
- "./route96.toml:/app/config.toml" - "./dev-setup/route96.toml:/app/config.toml"
volumes: volumes:
db: db:
blossom: blossom:

View File

@ -1,9 +1,9 @@
use std::collections::HashMap;
use anyhow::Result; use anyhow::Result;
use base64::Engine; use base64::Engine;
use nostr_sdk::{EventBuilder, JsonUtil, Keys, Kind, Tag, Timestamp}; use nostr_sdk::{EventBuilder, JsonUtil, Keys, Kind, Tag, Timestamp};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::io::SeekFrom; use std::io::SeekFrom;
use std::ops::Add; use std::ops::Add;
use std::path::PathBuf; use std::path::PathBuf;
@ -40,7 +40,7 @@ impl Blossom {
let mut hash = Sha256::new(); let mut hash = Sha256::new();
let mut buf: [u8; 1024] = [0; 1024]; let mut buf: [u8; 1024] = [0; 1024];
f.seek(SeekFrom::Start(0)).await?; f.seek(SeekFrom::Start(0)).await?;
while let Ok(data) = f.read(&mut buf).await { while let Ok(data) = f.read(&mut buf[..]).await {
if data == 0 { if data == 0 {
break; break;
} }
@ -51,11 +51,7 @@ impl Blossom {
Ok(hex::encode(hash)) Ok(hex::encode(hash))
} }
pub async fn upload( pub async fn upload(&self, from_file: &PathBuf, keys: &Keys) -> Result<BlobDescriptor> {
&self,
from_file: &PathBuf,
keys: &Keys,
) -> Result<BlobDescriptor> {
let mut f = File::open(from_file).await?; let mut f = File::open(from_file).await?;
let hash = Self::hash_file(&mut f).await?; let hash = Self::hash_file(&mut f).await?;
let auth_event = EventBuilder::new( let auth_event = EventBuilder::new(

View File

@ -1,6 +1,5 @@
use anyhow::Result; use anyhow::Result;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket;
use std::fmt::Display;
use uuid::Uuid; use uuid::Uuid;
use crate::egress::{Egress, EgressResult}; use crate::egress::{Egress, EgressResult};

View File

@ -1,6 +1,5 @@
use crate::ingress::{spawn_pipeline, ConnectionInfo}; use crate::ingress::{spawn_pipeline, ConnectionInfo};
use crate::overseer::Overseer; use crate::overseer::Overseer;
use crate::settings::Settings;
use anyhow::Result; use anyhow::Result;
use log::info; use log::info;
use std::path::PathBuf; use std::path::PathBuf;

View File

@ -1,7 +1,5 @@
use crate::overseer::Overseer; use crate::overseer::Overseer;
use crate::pipeline::runner::PipelineRunner; use crate::pipeline::runner::PipelineRunner;
use crate::settings::Settings;
use anyhow::Result;
use log::{error, info}; use log::{error, info};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::io::Read; use std::io::Read;

View File

@ -5,7 +5,6 @@ use tokio::net::TcpListener;
use crate::ingress::{spawn_pipeline, ConnectionInfo}; use crate::ingress::{spawn_pipeline, ConnectionInfo};
use crate::overseer::Overseer; use crate::overseer::Overseer;
use crate::settings::Settings;
pub async fn listen(addr: String, overseer: Arc<dyn Overseer>) -> Result<()> { pub async fn listen(addr: String, overseer: Arc<dyn Overseer>) -> Result<()> {
let listener = TcpListener::bind(addr.clone()).await?; let listener = TcpListener::bind(addr.clone()).await?;

View File

@ -1,6 +1,5 @@
use crate::ingress::{spawn_pipeline, ConnectionInfo}; use crate::ingress::{spawn_pipeline, ConnectionInfo};
use crate::overseer::Overseer; use crate::overseer::Overseer;
use crate::settings::Settings;
use anyhow::Result; use anyhow::Result;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVColorSpace::AVCOL_SPC_RGB; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVColorSpace::AVCOL_SPC_RGB;

View File

@ -1,3 +1,5 @@
#[cfg(feature = "zap-stream")]
pub mod blossom;
pub mod egress; pub mod egress;
pub mod ingress; pub mod ingress;
pub mod mux; pub mod mux;
@ -5,4 +7,3 @@ pub mod overseer;
pub mod pipeline; pub mod pipeline;
pub mod settings; pub mod settings;
pub mod variant; pub mod variant;
mod blossom;

View File

@ -1,4 +1,4 @@
use crate::egress::{EgressResult, NewSegment}; use crate::egress::NewSegment;
use crate::variant::{StreamMapping, VariantStream}; use crate::variant::{StreamMapping, VariantStream};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
@ -220,11 +220,13 @@ impl HlsVariant {
.find(|a| matches!(*a, HlsVariantStream::Video { .. })) .find(|a| matches!(*a, HlsVariantStream::Video { .. }))
.map_or(Default::default(), |v| v.id().clone()); .map_or(Default::default(), |v| v.id().clone());
// emit result of the previously completed segment,
let prev_seg = self.idx - 1;
Ok(NewSegment { Ok(NewSegment {
variant: video_var, variant: video_var,
idx: self.idx - 1, // emit result of the previously completed segment, idx: prev_seg,
duration, duration,
path: PathBuf::from(next_seg_url), path: PathBuf::from(Self::map_segment_path(&*self.out_dir, &self.name, prev_seg)),
}) })
} }

View File

@ -11,7 +11,6 @@ use crate::variant::video::VideoVariant;
use crate::variant::{StreamMapping, VariantStream}; use crate::variant::{StreamMapping, VariantStream};
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use chrono::Utc;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use std::cmp::PartialEq; use std::cmp::PartialEq;
use std::path::PathBuf; use std::path::PathBuf;
@ -66,7 +65,7 @@ pub trait Overseer: Send + Sync {
/// This handler is usually used for distribution / billing /// This handler is usually used for distribution / billing
async fn on_segment( async fn on_segment(
&self, &self,
pipeline: &Uuid, pipeline_id: &Uuid,
variant_id: &Uuid, variant_id: &Uuid,
index: u64, index: u64,
duration: f32, duration: f32,
@ -179,7 +178,7 @@ impl Overseer for StaticOverseer {
let vars = get_default_variants(stream_info)?; let vars = get_default_variants(stream_info)?;
let var_ids = vars.iter().map(|v| v.id()).collect(); let var_ids = vars.iter().map(|v| v.id()).collect();
Ok(PipelineConfig { Ok(PipelineConfig {
id: Utc::now().timestamp() as u64, id: Uuid::new_v4(),
variants: vars, variants: vars,
egress: vec![ egress: vec![
/*EgressType::Recorder(EgressConfig { /*EgressType::Recorder(EgressConfig {
@ -199,7 +198,7 @@ impl Overseer for StaticOverseer {
async fn on_segment( async fn on_segment(
&self, &self,
pipeline: &Uuid, pipeline_id: &Uuid,
variant_id: &Uuid, variant_id: &Uuid,
index: u64, index: u64,
duration: f32, duration: f32,

View File

@ -31,7 +31,7 @@ impl Overseer for WebhookOverseer {
async fn on_segment( async fn on_segment(
&self, &self,
pipeline: &Uuid, pipeline_id: &Uuid,
variant_id: &Uuid, variant_id: &Uuid,
index: u64, index: u64,
duration: f32, duration: f32,

View File

@ -1,4 +1,4 @@
use crate::blossom::Blossom; use crate::blossom::{BlobDescriptor, Blossom};
use crate::egress::hls::HlsEgress; use crate::egress::hls::HlsEgress;
use crate::egress::EgressConfig; use crate::egress::EgressConfig;
use crate::ingress::ConnectionInfo; use crate::ingress::ConnectionInfo;
@ -10,6 +10,7 @@ use anyhow::{anyhow, Result};
use async_trait::async_trait; use async_trait::async_trait;
use chrono::Utc; use chrono::Utc;
use fedimint_tonic_lnd::verrpc::VersionRequest; use fedimint_tonic_lnd::verrpc::VersionRequest;
use futures_util::FutureExt;
use log::info; use log::info;
use nostr_sdk::bitcoin::PrivateKey; use nostr_sdk::bitcoin::PrivateKey;
use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag}; use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag};
@ -20,6 +21,8 @@ use std::str::FromStr;
use uuid::Uuid; use uuid::Uuid;
use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb}; use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb};
const STREAM_EVENT_KIND: u16 = 30_313;
/// zap.stream NIP-53 overseer /// zap.stream NIP-53 overseer
pub struct ZapStreamOverseer { pub struct ZapStreamOverseer {
db: ZapStreamDb, db: ZapStreamDb,
@ -36,6 +39,7 @@ impl ZapStreamOverseer {
relays: &Vec<String>, relays: &Vec<String>,
) -> Result<Self> { ) -> Result<Self> {
let db = ZapStreamDb::new(db).await?; let db = ZapStreamDb::new(db).await?;
db.migrate().await?;
let mut lnd = fedimint_tonic_lnd::connect( let mut lnd = fedimint_tonic_lnd::connect(
lnd.address.clone(), lnd.address.clone(),
@ -93,22 +97,18 @@ impl Overseer for ZapStreamOverseer {
// insert new stream record // insert new stream record
let mut new_stream = UserStream { let mut new_stream = UserStream {
id: 0, id: Uuid::new_v4(),
user_id: uid, user_id: uid,
starts: Utc::now(), starts: Utc::now(),
state: UserStreamState::Live, state: UserStreamState::Live,
..Default::default() ..Default::default()
}; };
let stream_id = self.db.insert_stream(&new_stream).await?;
new_stream.id = stream_id;
let stream_event = publish_stream_event(&new_stream, &self.client).await?; let stream_event = publish_stream_event(&new_stream, &self.client).await?;
new_stream.event = Some(stream_event.as_json()); new_stream.event = Some(stream_event.as_json());
self.db.update_stream(&new_stream).await?;
self.db.insert_stream(&new_stream).await?;
Ok(PipelineConfig { Ok(PipelineConfig {
id: stream_id, id: new_stream.id,
variants, variants,
egress, egress,
}) })
@ -116,21 +116,33 @@ impl Overseer for ZapStreamOverseer {
async fn on_segment( async fn on_segment(
&self, &self,
pipeline: &Uuid, pipeline_id: &Uuid,
variant_id: &Uuid, variant_id: &Uuid,
index: u64, index: u64,
duration: f32, duration: f32,
path: &PathBuf, path: &PathBuf,
) -> Result<()> { ) -> Result<()> {
let blossom = Blossom::new("http://localhost:8881/"); let blossom = Blossom::new("http://localhost:8881/");
let blob = blossom.upload(path, &self.keys).await?; let blob = blossom.upload(path, &self.keys).await?;
let a_tag = format!(
"{}:{}:{}",
pipeline_id,
self.keys.public_key.to_hex(),
STREAM_EVENT_KIND
);
// publish nip94 tagged to stream
let n96 = blob_to_event_builder(&blob)?
.add_tags(Tag::parse(&["a", &a_tag]))
.sign_with_keys(&self.keys)?;
self.client.send_event(n96).await?;
info!("Published N96 segment for {}", a_tag);
Ok(()) Ok(())
} }
} }
pub(super) fn to_event_builder(this: &UserStream) -> Result<EventBuilder> { fn stream_to_event_builder(this: &UserStream) -> Result<EventBuilder> {
let mut tags = vec![ let mut tags = vec![
Tag::parse(&["d".to_string(), this.id.to_string()])?, Tag::parse(&["d".to_string(), this.id.to_string()])?,
Tag::parse(&["status".to_string(), this.state.to_string()])?, Tag::parse(&["status".to_string(), this.state.to_string()])?,
@ -171,13 +183,33 @@ pub(super) fn to_event_builder(this: &UserStream) -> Result<EventBuilder> {
tags.push(Tag::parse(&["t".to_string(), tag.to_string()])?); tags.push(Tag::parse(&["t".to_string(), tag.to_string()])?);
} }
} }
Ok(EventBuilder::new(Kind::from(30_313), "", tags)) Ok(EventBuilder::new(Kind::from(STREAM_EVENT_KIND), "", tags))
} }
pub(super) async fn publish_stream_event(this: &UserStream, client: &Client) -> Result<Event> { async fn publish_stream_event(this: &UserStream, client: &Client) -> Result<Event> {
let ev = to_event_builder(this)? let ev = stream_to_event_builder(this)?
.sign(&client.signer().await?) .sign(&client.signer().await?)
.await?; .await?;
client.send_event(ev.clone()).await?; client.send_event(ev.clone()).await?;
Ok(ev) Ok(ev)
} }
fn blob_to_event_builder(this: &BlobDescriptor) -> Result<EventBuilder> {
let tags = if let Some(tags) = this.nip94.as_ref() {
tags.iter()
.map_while(|(k, v)| Tag::parse(&[k, v]).ok())
.collect()
} else {
let mut tags = vec![
Tag::parse(&["x", &this.sha256])?,
Tag::parse(&["url", &this.url])?,
Tag::parse(&["size", &this.size.to_string()])?,
];
if let Some(m) = this.mime_type.as_ref() {
tags.push(Tag::parse(&["m", m])?)
}
tags
};
Ok(EventBuilder::new(Kind::FileMetadata, "", tags))
}

View File

@ -45,7 +45,7 @@ impl Display for EgressType {
#[derive(Clone, Debug, Serialize, Deserialize, Default)] #[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct PipelineConfig { pub struct PipelineConfig {
pub id: u64, pub id: Uuid,
/// Transcoded/Copied stream config /// Transcoded/Copied stream config
pub variants: Vec<VariantStream>, pub variants: Vec<VariantStream>,
/// Output muxers /// Output muxers

View File

@ -7,7 +7,7 @@ use std::time::Instant;
use crate::egress::hls::HlsEgress; use crate::egress::hls::HlsEgress;
use crate::egress::recorder::RecorderEgress; use crate::egress::recorder::RecorderEgress;
use crate::egress::Egress; use crate::egress::{Egress, EgressResult};
use crate::ingress::ConnectionInfo; use crate::ingress::ConnectionInfo;
use crate::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer}; use crate::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer};
use crate::pipeline::{EgressType, PipelineConfig}; use crate::pipeline::{EgressType, PipelineConfig};
@ -20,7 +20,7 @@ use ffmpeg_rs_raw::{
cstr, get_frame_from_hw, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler, StreamType, cstr, get_frame_from_hw, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler, StreamType,
}; };
use itertools::Itertools; use itertools::Itertools;
use log::{info, warn}; use log::{error, info, warn};
use tokio::runtime::Handle; use tokio::runtime::Handle;
use uuid::Uuid; use uuid::Uuid;
@ -116,6 +116,7 @@ impl PipelineRunner {
return Ok(()); return Ok(());
}; };
let mut egress_results = vec![];
for frame in frames { for frame in frames {
self.frame_ctr += 1; self.frame_ctr += 1;
@ -175,7 +176,8 @@ impl PipelineRunner {
// pass new packets to egress // pass new packets to egress
for mut pkt in packets { for mut pkt in packets {
for eg in self.egress.iter_mut() { for eg in self.egress.iter_mut() {
eg.process_pkt(pkt, &var.id())?; let er = eg.process_pkt(pkt, &var.id())?;
egress_results.push(er);
} }
av_packet_free(&mut pkt); av_packet_free(&mut pkt);
} }
@ -190,6 +192,20 @@ impl PipelineRunner {
av_packet_free(&mut pkt); av_packet_free(&mut pkt);
// egress results
self.handle.block_on(async {
for er in egress_results {
if let EgressResult::NewSegment(seg) = er {
if let Err(e) = self
.overseer
.on_segment(&config.id, &seg.variant, seg.idx, seg.duration, &seg.path)
.await
{
error!("Failed to process segment: {}", e);
}
}
}
});
let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32(); let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32();
if elapsed >= 2f32 { if elapsed >= 2f32 {
info!("Average fps: {:.2}", self.frame_ctr as f32 / elapsed); info!("Average fps: {:.2}", self.frame_ctr as f32 / elapsed);
@ -230,11 +246,9 @@ impl PipelineRunner {
.collect(), .collect(),
}; };
let cfg = self.handle.block_on(async { let cfg = self
self.overseer .handle
.start_stream(&self.connection, &i_info) .block_on(async { self.overseer.start_stream(&self.connection, &i_info).await })?;
.await
})?;
self.config = Some(cfg); self.config = Some(cfg);
self.info = Some(i_info); self.info = Some(i_info);

View File

@ -1,4 +1,3 @@
use crate::pipeline::EgressType;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]

View File

@ -1274,6 +1274,7 @@ dependencies = [
"tokio-stream", "tokio-stream",
"tracing", "tracing",
"url", "url",
"uuid",
] ]
[[package]] [[package]]
@ -1355,6 +1356,7 @@ dependencies = [
"stringprep", "stringprep",
"thiserror", "thiserror",
"tracing", "tracing",
"uuid",
"whoami", "whoami",
] ]
@ -1394,6 +1396,7 @@ dependencies = [
"stringprep", "stringprep",
"thiserror", "thiserror",
"tracing", "tracing",
"uuid",
"whoami", "whoami",
] ]
@ -1419,6 +1422,7 @@ dependencies = [
"sqlx-core", "sqlx-core",
"tracing", "tracing",
"url", "url",
"uuid",
] ]
[[package]] [[package]]
@ -1644,6 +1648,15 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "uuid"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
dependencies = [
"getrandom",
]
[[package]] [[package]]
name = "vcpkg" name = "vcpkg"
version = "0.2.15" version = "0.2.15"
@ -1934,6 +1947,7 @@ dependencies = [
"chrono", "chrono",
"log", "log",
"sqlx", "sqlx",
"uuid",
] ]
[[package]] [[package]]

View File

@ -10,5 +10,6 @@ test-pattern = []
[dependencies] [dependencies]
anyhow = "^1.0.70" anyhow = "^1.0.70"
chrono = { version = "0.4.38", features = ["serde"] } chrono = { version = "0.4.38", features = ["serde"] }
sqlx = { version = "0.8.2", features = ["runtime-tokio", "migrate", "mysql", "chrono"] } sqlx = { version = "0.8.1", features = ["runtime-tokio", "migrate", "mysql", "chrono", "uuid"] }
log = "0.4.22" log = "0.4.22"
uuid = { version = "1.11.0", features = ["v4"] }

View File

@ -13,7 +13,7 @@ create table user
create unique index ix_user_pubkey on user (pubkey); create unique index ix_user_pubkey on user (pubkey);
create table user_stream create table user_stream
( (
id integer unsigned not null auto_increment primary key, id UUID not null primary key,
user_id integer unsigned not null, user_id integer unsigned not null,
starts timestamp not null, starts timestamp not null,
ends timestamp, ends timestamp,

View File

@ -1,6 +1,5 @@
use crate::UserStream; use crate::UserStream;
use anyhow::Result; use anyhow::Result;
use log::info;
use sqlx::{MySqlPool, Row}; use sqlx::{MySqlPool, Row};
pub struct ZapStreamDb { pub struct ZapStreamDb {
@ -49,17 +48,16 @@ impl ZapStreamDb {
} }
} }
pub async fn insert_stream(&self, user_stream: &UserStream) -> Result<u64> { pub async fn insert_stream(&self, user_stream: &UserStream) -> Result<()> {
sqlx::query( sqlx::query("insert into user_stream (id, user_id, state, starts) values (?, ?, ?, ?)")
"insert into user_stream (user_id, state, starts) values (?, ?, ?) returning id", .bind(&user_stream.id)
) .bind(&user_stream.user_id)
.bind(&user_stream.user_id) .bind(&user_stream.state)
.bind(&user_stream.state) .bind(&user_stream.starts)
.bind(&user_stream.starts) .execute(&self.db)
.fetch_one(&self.db) .await?;
.await?
.try_get(0) Ok(())
.map_err(anyhow::Error::new)
} }
pub async fn update_stream(&self, user_stream: &UserStream) -> Result<()> { pub async fn update_stream(&self, user_stream: &UserStream) -> Result<()> {

View File

@ -1,6 +1,7 @@
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use sqlx::{FromRow, Type}; use sqlx::{FromRow, Type};
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use uuid::Uuid;
#[derive(Debug, Clone, FromRow)] #[derive(Debug, Clone, FromRow)]
pub struct User { pub struct User {
@ -37,7 +38,7 @@ impl Display for UserStreamState {
#[derive(Debug, Clone, Default, FromRow)] #[derive(Debug, Clone, Default, FromRow)]
pub struct UserStream { pub struct UserStream {
pub id: u64, pub id: Uuid,
pub user_id: u64, pub user_id: u64,
pub starts: DateTime<Utc>, pub starts: DateTime<Utc>,
pub ends: Option<DateTime<Utc>>, pub ends: Option<DateTime<Utc>>,