From 0da9bd996feaa123ebeb46cded32260ffa10a468 Mon Sep 17 00:00:00 2001
From: kieran
Date: Fri, 15 Nov 2024 18:26:23 +0000
Subject: [PATCH] feat: publish n96 segments

---
 Cargo.lock                             |  15 +-
 config.yaml                            |   2 +-
 db.sql                                 |   2 -
 dev-setup/db.sql                       |   2 +
 route96.toml => dev-setup/route96.toml |   2 +-
 dev-setup/strfry.conf                  | 144 ++++++++++++++++++
 docker-compose.yml                     |  11 +-
 src/blossom.rs                         |  10 +-
 src/egress/hls.rs                      |   1 -
 src/ingress/file.rs                    |   1 -
 src/ingress/mod.rs                     |   2 -
 src/ingress/tcp.rs                     |   1 -
 src/ingress/test.rs                    |   1 -
 src/lib.rs                             |   3 +-
 src/mux/hls.rs                         |   8 +-
 src/overseer/mod.rs                    |   7 +-
 src/overseer/webhook.rs                |   2 +-
 src/overseer/zap_stream.rs             |  60 ++++++--
 src/pipeline/mod.rs                    |   2 +-
 src/pipeline/runner.rs                 |  30 +++-
 src/settings.rs                        |   1 -
 zap-stream-db/Cargo.lock               |  14 ++
 zap-stream-db/Cargo.toml               |   5 +-
 .../migrations/20241115120541_init.sql |   2 +-
 zap-stream-db/src/db.rs                |  22 ++-
 zap-stream-db/src/model.rs             |   3 +-
 26 files changed, 278 insertions(+), 75 deletions(-)
 delete mode 100644 db.sql
 create mode 100644 dev-setup/db.sql
 rename route96.toml => dev-setup/route96.toml (96%)
 create mode 100644 dev-setup/strfry.conf

diff --git a/Cargo.lock b/Cargo.lock
index 503f3ec..0d01e8b 100755
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1874,7 +1874,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
 dependencies = [
  "cfg-if",
- "windows-targets 0.48.5",
+ "windows-targets 0.52.6",
 ]
 
 [[package]]
@@ -2702,7 +2702,7 @@ dependencies = [
  "once_cell",
  "socket2",
  "tracing",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -3430,6 +3430,7 @@ dependencies = [
  "tokio-stream",
  "tracing",
  "url",
+ "uuid",
 ]
 
 [[package]]
@@ -3511,6 +3512,7 @@
  "stringprep",
  "thiserror 1.0.57",
  "tracing",
+ "uuid",
  "whoami",
 ]
@@ -3550,6 +3552,7 @@
  "stringprep",
  "thiserror 1.0.57",
  "tracing",
+ "uuid",
  "whoami",
 ]
@@ -3575,6 +3578,7 @@
  "sqlx-core",
  "tracing",
  "url",
+ "uuid",
 ]
 
 [[package]]
@@ -3746,7 +3750,7 @@ dependencies = [
  "fastrand",
  "once_cell",
  "rustix",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -4333,9 +4337,9 @@
 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
 
 [[package]]
 name = "uuid"
-version = "1.8.0"
+version = "1.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0"
+checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
 dependencies = [
  "getrandom",
  "serde",
 ]
@@ -4816,6 +4820,7 @@ dependencies = [
  "chrono",
  "log",
  "sqlx",
+ "uuid",
 ]
 
 [[package]]
diff --git a/config.yaml b/config.yaml
index b47170a..081dd57 100755
--- a/config.yaml
+++ b/config.yaml
@@ -35,7 +35,7 @@ overseer:
   nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4"
   relays:
     - "ws://localhost:7766"
-  database: "mysql://root:root@localhost:3368/zap-stream?max_connections=2"
+  database: "mysql://root:root@localhost:3368/zap_stream?max_connections=2"
   lnd:
     address: "https://127.0.0.1:10001"
     cert: "/home/kieran/.polar/networks/1/volumes/lnd/alice/tls.cert"
diff --git a/db.sql b/db.sql
deleted file mode 100644
index 46e16ca..0000000
--- a/db.sql
+++ /dev/null
@@ -1,2 +0,0 @@
-create database route96;
-create database zap-stream;
\ No newline at end of file
diff --git a/dev-setup/db.sql b/dev-setup/db.sql
new file mode 100644
index 0000000..adfe77a
--- /dev/null
+++ b/dev-setup/db.sql
@@ -0,0 +1,2 @@
+create database route96;
+create database zap_stream;
\ No newline at end of file
diff --git a/route96.toml b/dev-setup/route96.toml
similarity index 96%
rename from route96.toml
rename to dev-setup/route96.toml
index 73eee1d..b02004d 100644
--- a/route96.toml
+++ b/dev-setup/route96.toml
@@ -1,5 +1,5 @@
 # Listen address for webserver
-listen = "127.0.0.1:8000"
+listen = "0.0.0.0:8000"
 
 # Database connection string (MYSQL)
 database = "mysql://root:root@db:3306/route96"
diff --git a/dev-setup/strfry.conf b/dev-setup/strfry.conf
new file mode 100644
index 0000000..bd62331
--- /dev/null
+++ b/dev-setup/strfry.conf
@@ -0,0 +1,144 @@
+##
+## Default strfry config
+##
+
+# Directory that contains the strfry LMDB database (restart required)
+db = "./strfry-db/"
+
+dbParams {
+    # Maximum number of threads/processes that can simultaneously have LMDB transactions open (restart required)
+    maxreaders = 256
+
+    # Size of mmap() to use when loading LMDB (default is 10TB, does *not* correspond to disk-space used) (restart required)
+    mapsize = 10995116277760
+
+    # Disables read-ahead when accessing the LMDB mapping. Reduces IO activity when DB size is larger than RAM. (restart required)
+    noReadAhead = false
+}
+
+events {
+    # Maximum size of normalised JSON, in bytes
+    maxEventSize = 65536
+
+    # Events newer than this will be rejected
+    rejectEventsNewerThanSeconds = 900
+
+    # Events older than this will be rejected
+    rejectEventsOlderThanSeconds = 94608000
+
+    # Ephemeral events older than this will be rejected
+    rejectEphemeralEventsOlderThanSeconds = 60
+
+    # Ephemeral events will be deleted from the DB when older than this
+    ephemeralEventsLifetimeSeconds = 300
+
+    # Maximum number of tags allowed
+    maxNumTags = 2000
+
+    # Maximum size for tag values, in bytes
+    maxTagValSize = 1024
+}
+
+relay {
+    # Interface to listen on. Use 0.0.0.0 to listen on all interfaces (restart required)
+    bind = "0.0.0.0"
+
+    # Port to open for the nostr websocket protocol (restart required)
+    port = 7777
+
+    # Set OS-limit on maximum number of open files/sockets (if 0, don't attempt to set) (restart required)
+    nofiles = 1000000
+
+    # HTTP header that contains the client's real IP, before reverse proxying (ie x-real-ip) (MUST be all lower-case)
+    realIpHeader = ""
+
+    info {
+        # NIP-11: Name of this server. Short/descriptive (< 30 characters)
+        name = "strfry default"
+
+        # NIP-11: Detailed information about relay, free-form
+        description = "This is a strfry instance."
+
+        # NIP-11: Administrative nostr pubkey, for contact purposes
+        pubkey = ""
+
+        # NIP-11: Alternative administrative contact (email, website, etc)
+        contact = ""
+
+        # NIP-11: URL pointing to an image to be used as an icon for the relay
+        icon = ""
+
+        # List of supported lists as JSON array, or empty string to use default. Example: "[1,2]"
+        nips = ""
+    }
+
+    # Maximum accepted incoming websocket frame size (should be larger than max event) (restart required)
+    maxWebsocketPayloadSize = 131072
+
+    # Websocket-level PING message frequency (should be less than any reverse proxy idle timeouts) (restart required)
+    autoPingSeconds = 55
+
+    # If TCP keep-alive should be enabled (detect dropped connections to upstream reverse proxy)
+    enableTcpKeepalive = false
+
+    # How much uninterrupted CPU time a REQ query should get during its DB scan
+    queryTimesliceBudgetMicroseconds = 10000
+
+    # Maximum records that can be returned per filter
+    maxFilterLimit = 500
+
+    # Maximum number of subscriptions (concurrent REQs) a connection can have open at any time
+    maxSubsPerConnection = 20
+
+    writePolicy {
+        # If non-empty, path to an executable script that implements the writePolicy plugin logic
+        plugin = ""
+    }
+
+    compression {
+        # Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU (restart required)
+        enabled = true
+
+        # Maintain a sliding window buffer for each connection. Improves compression, but uses more memory (restart required)
+        slidingWindow = true
+    }
+
+    logging {
+        # Dump all incoming messages
+        dumpInAll = false
+
+        # Dump all incoming EVENT messages
+        dumpInEvents = false
+
+        # Dump all incoming REQ/CLOSE messages
+        dumpInReqs = false
+
+        # Log performance metrics for initial REQ database scans
+        dbScanPerf = false
+
+        # Log reason for invalid event rejection? Can be disabled to silence excessive logging
+        invalidEvents = true
+    }
+
+    numThreads {
+        # Ingester threads: route incoming requests, validate events/sigs (restart required)
+        ingester = 3
+
+        # reqWorker threads: Handle initial DB scan for events (restart required)
+        reqWorker = 3
+
+        # reqMonitor threads: Handle filtering of new events (restart required)
+        reqMonitor = 3
+
+        # negentropy threads: Handle negentropy protocol messages (restart required)
+        negentropy = 2
+    }
+
+    negentropy {
+        # Support negentropy protocol messages
+        enabled = true
+
+        # Maximum records that sync will process before returning an error
+        maxSyncEvents = 1000000
+    }
+}
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index 520537e..87a08da 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -7,11 +7,14 @@ services:
     ports:
       - "3368:3306"
     volumes:
-      - "./db.sql:/docker-entrypoint-initdb.d/00-init.sql"
+      - "./dev-setup/db.sql:/docker-entrypoint-initdb.d/00-init.sql"
   relay:
-    image: scsibug/nostr-rs-relay
+    image: dockurr/strfry
     ports:
-      - "7766:8080"
+      - "7766:7777"
+    volumes:
+      - "relay:/app/strfry-db"
+      - "./dev-setup/strfry.conf:/etc/strfry.conf"
   blossom:
     depends_on:
       - db
@@ -22,7 +25,7 @@
       - "8881:8000"
     volumes:
       - "blossom:/app/data"
-      - "./route96.toml:/app/config.toml"
+      - "./dev-setup/route96.toml:/app/config.toml"
 volumes:
   db:
   blossom:
diff --git a/src/blossom.rs b/src/blossom.rs
index 22a9ac6..0d9c148 100644
--- a/src/blossom.rs
+++ b/src/blossom.rs
@@ -1,9 +1,9 @@
-use std::collections::HashMap;
 use anyhow::Result;
 use base64::Engine;
 use nostr_sdk::{EventBuilder, JsonUtil, Keys, Kind, Tag, Timestamp};
 use serde::{Deserialize, Serialize};
 use sha2::{Digest, Sha256};
+use std::collections::HashMap;
 use std::io::SeekFrom;
 use std::ops::Add;
 use std::path::PathBuf;
@@ -40,7 +40,7 @@ impl Blossom {
         let mut hash = Sha256::new();
         let mut buf: [u8; 1024] = [0; 1024];
         f.seek(SeekFrom::Start(0)).await?;
-        while let Ok(data) = f.read(&mut buf).await {
+        while let Ok(data) = f.read(&mut buf[..]).await {
             if data == 0 {
                 break;
             }
@@ -51,11 +51,7 @@ impl Blossom {
         Ok(hex::encode(hash))
     }
 
-    pub async fn upload(
-        &self,
-        from_file: &PathBuf,
-        keys: &Keys,
-    ) -> Result<BlobDescriptor> {
+    pub async fn upload(&self, from_file: &PathBuf, keys: &Keys) -> Result<BlobDescriptor> {
         let mut f = File::open(from_file).await?;
         let hash = Self::hash_file(&mut f).await?;
         let auth_event = EventBuilder::new(
diff --git a/src/egress/hls.rs b/src/egress/hls.rs
index 0e63f8b..682efcd 100644
--- a/src/egress/hls.rs
+++ b/src/egress/hls.rs
@@ -1,6 +1,5 @@
 use anyhow::Result;
 use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket;
-use std::fmt::Display;
 use uuid::Uuid;
 
 use crate::egress::{Egress, EgressResult};
diff --git a/src/ingress/file.rs b/src/ingress/file.rs
index 69f0fbb..2ba8108 100644
--- a/src/ingress/file.rs
+++ b/src/ingress/file.rs
@@ -1,6 +1,5 @@
 use crate::ingress::{spawn_pipeline, ConnectionInfo};
 use crate::overseer::Overseer;
-use crate::settings::Settings;
 use anyhow::Result;
 use log::info;
 use std::path::PathBuf;
diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs
index d2ce2d3..bf23c94 100644
--- a/src/ingress/mod.rs
+++ b/src/ingress/mod.rs
@@ -1,7 +1,5 @@
 use crate::overseer::Overseer;
 use crate::pipeline::runner::PipelineRunner;
-use crate::settings::Settings;
-use anyhow::Result;
 use log::{error, info};
 use serde::{Deserialize, Serialize};
 use std::io::Read;
diff --git a/src/ingress/tcp.rs b/src/ingress/tcp.rs
index 9c14670..c96473e 100644
--- a/src/ingress/tcp.rs
+++ b/src/ingress/tcp.rs
@@ -5,7 +5,6 @@ use tokio::net::TcpListener;
 
 use crate::ingress::{spawn_pipeline, ConnectionInfo};
 use crate::overseer::Overseer;
-use crate::settings::Settings;
 
 pub async fn listen(addr: String, overseer: Arc<dyn Overseer>) -> Result<()> {
     let listener = TcpListener::bind(addr.clone()).await?;
diff --git a/src/ingress/test.rs b/src/ingress/test.rs
index bd3ad47..2aee756 100644
--- a/src/ingress/test.rs
+++ b/src/ingress/test.rs
@@ -1,6 +1,5 @@
 use crate::ingress::{spawn_pipeline, ConnectionInfo};
 use crate::overseer::Overseer;
-use crate::settings::Settings;
 use anyhow::Result;
 use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
 use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVColorSpace::AVCOL_SPC_RGB;
diff --git a/src/lib.rs b/src/lib.rs
index a475872..f476d2a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,5 @@
+#[cfg(feature = "zap-stream")]
+pub mod blossom;
 pub mod egress;
 pub mod ingress;
 pub mod mux;
@@ -5,4 +7,3 @@ pub mod overseer;
 pub mod pipeline;
 pub mod settings;
 pub mod variant;
-mod blossom;
diff --git a/src/mux/hls.rs b/src/mux/hls.rs
index 6ff87e5..bd8e897 100644
--- a/src/mux/hls.rs
+++ b/src/mux/hls.rs
@@ -1,4 +1,4 @@
-use crate::egress::{EgressResult, NewSegment};
+use crate::egress::NewSegment;
 use crate::variant::{StreamMapping, VariantStream};
 use anyhow::{bail, Result};
 use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
@@ -220,11 +220,13 @@ impl HlsVariant {
             .find(|a| matches!(*a, HlsVariantStream::Video { .. }))
             .map_or(Default::default(), |v| v.id().clone());
 
+        // emit result of the previously completed segment,
+        let prev_seg = self.idx - 1;
         Ok(NewSegment {
             variant: video_var,
-            idx: self.idx - 1, // emit result of the previously completed segment,
+            idx: prev_seg,
             duration,
-            path: PathBuf::from(next_seg_url),
+            path: PathBuf::from(Self::map_segment_path(&*self.out_dir, &self.name, prev_seg)),
         })
     }
 
diff --git a/src/overseer/mod.rs b/src/overseer/mod.rs
index e92e8b6..3048514 100644
--- a/src/overseer/mod.rs
+++ b/src/overseer/mod.rs
@@ -11,7 +11,6 @@ use crate::variant::video::VideoVariant;
 use crate::variant::{StreamMapping, VariantStream};
 use anyhow::Result;
 use async_trait::async_trait;
-use chrono::Utc;
 use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
 use std::cmp::PartialEq;
 use std::path::PathBuf;
@@ -66,7 +65,7 @@
     /// This handler is usually used for distribution / billing
     async fn on_segment(
         &self,
-        pipeline: &Uuid,
+        pipeline_id: &Uuid,
         variant_id: &Uuid,
         index: u64,
         duration: f32,
@@ -179,7 +178,7 @@
         let vars = get_default_variants(stream_info)?;
         let var_ids = vars.iter().map(|v| v.id()).collect();
         Ok(PipelineConfig {
-            id: Utc::now().timestamp() as u64,
+            id: Uuid::new_v4(),
            variants: vars,
            egress: vec![
                /*EgressType::Recorder(EgressConfig {
@@ -199,7 +198,7 @@
 
     async fn on_segment(
         &self,
-        pipeline: &Uuid,
+        pipeline_id: &Uuid,
         variant_id: &Uuid,
         index: u64,
         duration: f32,
diff --git a/src/overseer/webhook.rs b/src/overseer/webhook.rs
index 86aab8f..dfe58f8 100644
--- a/src/overseer/webhook.rs
+++ b/src/overseer/webhook.rs
@@ -31,7 +31,7 @@ impl Overseer for WebhookOverseer {
 
     async fn on_segment(
         &self,
-        pipeline: &Uuid,
+        pipeline_id: &Uuid,
         variant_id: &Uuid,
         index: u64,
         duration: f32,
diff --git a/src/overseer/zap_stream.rs b/src/overseer/zap_stream.rs
index c619311..12d33a8 100644
--- a/src/overseer/zap_stream.rs
+++ b/src/overseer/zap_stream.rs
@@ -1,4 +1,4 @@
-use crate::blossom::Blossom;
+use crate::blossom::{BlobDescriptor, Blossom};
 use crate::egress::hls::HlsEgress;
 use crate::egress::EgressConfig;
 use crate::ingress::ConnectionInfo;
@@ -10,6 +10,7 @@ use anyhow::{anyhow, Result};
 use async_trait::async_trait;
 use chrono::Utc;
 use fedimint_tonic_lnd::verrpc::VersionRequest;
+use futures_util::FutureExt;
 use log::info;
 use nostr_sdk::bitcoin::PrivateKey;
 use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag};
@@ -20,6 +21,8 @@ use std::str::FromStr;
 use uuid::Uuid;
 use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb};
 
+const STREAM_EVENT_KIND: u16 = 30_313;
+
 /// zap.stream NIP-53 overseer
 pub struct ZapStreamOverseer {
     db: ZapStreamDb,
@@ -36,6 +39,7 @@ impl ZapStreamOverseer {
         relays: &Vec<String>,
     ) -> Result<Self> {
         let db = ZapStreamDb::new(db).await?;
+        db.migrate().await?;
 
         let mut lnd = fedimint_tonic_lnd::connect(
             lnd.address.clone(),
@@ -93,22 +97,18 @@ impl Overseer for ZapStreamOverseer {
 
         // insert new stream record
         let mut new_stream = UserStream {
-            id: 0,
+            id: Uuid::new_v4(),
             user_id: uid,
             starts: Utc::now(),
             state: UserStreamState::Live,
             ..Default::default()
         };
-
-        let stream_id = self.db.insert_stream(&new_stream).await?;
-        new_stream.id = stream_id;
         let stream_event = publish_stream_event(&new_stream, &self.client).await?;
         new_stream.event = Some(stream_event.as_json());
-        self.db.update_stream(&new_stream).await?;
+        self.db.insert_stream(&new_stream).await?;
 
         Ok(PipelineConfig {
-            id: stream_id,
+            id: new_stream.id,
             variants,
             egress,
         })
@@ -116,21 +116,33 @@ impl Overseer for ZapStreamOverseer {
 
     async fn on_segment(
         &self,
-        pipeline: &Uuid,
+        pipeline_id: &Uuid,
         variant_id: &Uuid,
         index: u64,
         duration: f32,
         path: &PathBuf,
     ) -> Result<()> {
         let blossom = Blossom::new("http://localhost:8881/");
         let blob = blossom.upload(path, &self.keys).await?;
+
+        let a_tag = format!(
+            "{}:{}:{}",
+            STREAM_EVENT_KIND,
+            self.keys.public_key.to_hex(),
+            pipeline_id
+        );
+        // publish nip94 tagged to stream
+        let n96 = blob_to_event_builder(&blob)?
+            .add_tags(Tag::parse(&["a", &a_tag]))
+            .sign_with_keys(&self.keys)?;
+        self.client.send_event(n96).await?;
+        info!("Published N96 segment for {}", a_tag);
+
         Ok(())
     }
 }
 
-pub(super) fn to_event_builder(this: &UserStream) -> Result<EventBuilder> {
+fn stream_to_event_builder(this: &UserStream) -> Result<EventBuilder> {
     let mut tags = vec![
         Tag::parse(&["d".to_string(), this.id.to_string()])?,
         Tag::parse(&["status".to_string(), this.state.to_string()])?,
@@ -171,13 +183,33 @@
             tags.push(Tag::parse(&["t".to_string(), tag.to_string()])?);
         }
     }
-    Ok(EventBuilder::new(Kind::from(30_313), "", tags))
+    Ok(EventBuilder::new(Kind::from(STREAM_EVENT_KIND), "", tags))
 }
 
-pub(super) async fn publish_stream_event(this: &UserStream, client: &Client) -> Result<Event> {
-    let ev = to_event_builder(this)?
+async fn publish_stream_event(this: &UserStream, client: &Client) -> Result<Event> {
+    let ev = stream_to_event_builder(this)?
         .sign(&client.signer().await?)
         .await?;
     client.send_event(ev.clone()).await?;
     Ok(ev)
 }
+
+fn blob_to_event_builder(this: &BlobDescriptor) -> Result<EventBuilder> {
+    let tags = if let Some(tags) = this.nip94.as_ref() {
+        tags.iter()
+            .map_while(|(k, v)| Tag::parse(&[k, v]).ok())
+            .collect()
+    } else {
+        let mut tags = vec![
+            Tag::parse(&["x", &this.sha256])?,
+            Tag::parse(&["url", &this.url])?,
+            Tag::parse(&["size", &this.size.to_string()])?,
+        ];
+        if let Some(m) = this.mime_type.as_ref() {
+            tags.push(Tag::parse(&["m", m])?)
+        }
+        tags
+    };
+
+    Ok(EventBuilder::new(Kind::FileMetadata, "", tags))
+}
diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs
index 29dffaf..7f822b3 100644
--- a/src/pipeline/mod.rs
+++ b/src/pipeline/mod.rs
@@ -45,7 +45,7 @@ impl Display for EgressType {
 
 #[derive(Clone, Debug, Serialize, Deserialize, Default)]
 pub struct PipelineConfig {
-    pub id: u64,
+    pub id: Uuid,
     /// Transcoded/Copied stream config
     pub variants: Vec<VariantStream>,
     /// Output muxers
diff --git a/src/pipeline/runner.rs b/src/pipeline/runner.rs
index 8490a77..1df4357 100644
--- a/src/pipeline/runner.rs
+++ b/src/pipeline/runner.rs
@@ -7,7 +7,7 @@ use std::time::Instant;
 
 use crate::egress::hls::HlsEgress;
 use crate::egress::recorder::RecorderEgress;
-use crate::egress::Egress;
+use crate::egress::{Egress, EgressResult};
 use crate::ingress::ConnectionInfo;
 use crate::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer};
 use crate::pipeline::{EgressType, PipelineConfig};
@@ -20,7 +20,7 @@ use ffmpeg_rs_raw::{
     cstr, get_frame_from_hw, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler, StreamType,
 };
 
 use itertools::Itertools;
-use log::{info, warn};
+use log::{error, info, warn};
 use tokio::runtime::Handle;
 use uuid::Uuid;
 
@@ -116,6 +116,7 @@ impl PipelineRunner {
             return Ok(());
         };
 
+        let mut egress_results = vec![];
         for frame in frames {
             self.frame_ctr += 1;
 
@@ -175,7 +176,8 @@
             // pass new packets to egress
             for mut pkt in packets {
                 for eg in self.egress.iter_mut() {
-                    eg.process_pkt(pkt, &var.id())?;
+                    let er = eg.process_pkt(pkt, &var.id())?;
+                    egress_results.push(er);
                 }
                 av_packet_free(&mut pkt);
             }
@@ -190,6 +192,20 @@
 
         av_packet_free(&mut pkt);
 
+        // egress results
+        self.handle.block_on(async {
+            for er in egress_results {
+                if let EgressResult::NewSegment(seg) = er {
+                    if let Err(e) = self
+                        .overseer
+                        .on_segment(&config.id, &seg.variant, seg.idx, seg.duration, &seg.path)
+                        .await
+                    {
+                        error!("Failed to process segment: {}", e);
+                    }
+                }
+            }
+        });
         let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32();
         if elapsed >= 2f32 {
             info!("Average fps: {:.2}", self.frame_ctr as f32 / elapsed);
@@ -230,11 +246,9 @@
                 .collect(),
         };
 
-        let cfg = self.handle.block_on(async {
-            self.overseer
-                .start_stream(&self.connection, &i_info)
-                .await
-        })?;
+        let cfg = self
+            .handle
+            .block_on(async { self.overseer.start_stream(&self.connection, &i_info).await })?;
         self.config = Some(cfg);
         self.info = Some(i_info);
diff --git a/src/settings.rs b/src/settings.rs
index f186319..e9ea14d 100644
--- a/src/settings.rs
+++ b/src/settings.rs
@@ -1,4 +1,3 @@
-use crate::pipeline::EgressType;
 use serde::{Deserialize, Serialize};
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
diff --git a/zap-stream-db/Cargo.lock b/zap-stream-db/Cargo.lock
index 8786ee4..eaa455f 100644
--- a/zap-stream-db/Cargo.lock
+++ b/zap-stream-db/Cargo.lock
@@ -1274,6 +1274,7 @@ dependencies = [
  "tokio-stream",
  "tracing",
  "url",
+ "uuid",
 ]
 
 [[package]]
@@ -1355,6 +1356,7 @@
  "stringprep",
  "thiserror",
  "tracing",
+ "uuid",
  "whoami",
 ]
@@ -1394,6 +1396,7 @@
  "stringprep",
  "thiserror",
  "tracing",
+ "uuid",
  "whoami",
 ]
@@ -1419,6 +1422,7 @@
  "sqlx-core",
  "tracing",
  "url",
+ "uuid",
 ]
 
 [[package]]
@@ -1644,6 +1648,15 @@ version = "1.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
 
+[[package]]
+name = "uuid"
+version = "1.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
+dependencies = [
+ "getrandom",
+]
+
 [[package]]
 name = "vcpkg"
 version = "0.2.15"
@@ -1934,6 +1947,7 @@ dependencies = [
  "chrono",
  "log",
  "sqlx",
+ "uuid",
 ]
 
 [[package]]
diff --git a/zap-stream-db/Cargo.toml b/zap-stream-db/Cargo.toml
index c16dc04..a68f990 100644
--- a/zap-stream-db/Cargo.toml
+++ b/zap-stream-db/Cargo.toml
@@ -10,5 +10,6 @@ test-pattern = []
 [dependencies]
 anyhow = "^1.0.70"
 chrono = { version = "0.4.38", features = ["serde"] }
-sqlx = { version = "0.8.2", features = ["runtime-tokio", "migrate", "mysql", "chrono"] }
-log = "0.4.22"
\ No newline at end of file
+sqlx = { version = "0.8.1", features = ["runtime-tokio", "migrate", "mysql", "chrono", "uuid"] }
+log = "0.4.22"
+uuid = { version = "1.11.0", features = ["v4"] }
\ No newline at end of file
diff --git a/zap-stream-db/migrations/20241115120541_init.sql b/zap-stream-db/migrations/20241115120541_init.sql
index 1e82534..669b3e1 100644
--- a/zap-stream-db/migrations/20241115120541_init.sql
+++ b/zap-stream-db/migrations/20241115120541_init.sql
@@ -13,7 +13,7 @@ create table user
 create unique index ix_user_pubkey on user (pubkey);
 create table user_stream
 (
-    id      integer unsigned not null auto_increment primary key,
+    id      UUID not null primary key,
     user_id integer unsigned not null,
     starts  timestamp not null,
     ends    timestamp,
diff --git a/zap-stream-db/src/db.rs b/zap-stream-db/src/db.rs
index dab6aa3..dc77c26 100644
--- a/zap-stream-db/src/db.rs
+++ b/zap-stream-db/src/db.rs
@@ -1,6 +1,5 @@
 use crate::UserStream;
 use anyhow::Result;
-use log::info;
 use sqlx::{MySqlPool, Row};
 
 pub struct ZapStreamDb {
@@ -49,17 +48,16 @@
         }
     }
 
-    pub async fn insert_stream(&self, user_stream: &UserStream) -> Result<u64> {
-        sqlx::query(
-            "insert into user_stream (user_id, state, starts) values (?, ?, ?) returning id",
-        )
-        .bind(&user_stream.user_id)
-        .bind(&user_stream.state)
-        .bind(&user_stream.starts)
-        .fetch_one(&self.db)
-        .await?
-        .try_get(0)
-        .map_err(anyhow::Error::new)
+    pub async fn insert_stream(&self, user_stream: &UserStream) -> Result<()> {
+        sqlx::query("insert into user_stream (id, user_id, state, starts) values (?, ?, ?, ?)")
+            .bind(&user_stream.id)
+            .bind(&user_stream.user_id)
+            .bind(&user_stream.state)
+            .bind(&user_stream.starts)
+            .execute(&self.db)
+            .await?;
+
+        Ok(())
     }
 
     pub async fn update_stream(&self, user_stream: &UserStream) -> Result<()> {
diff --git a/zap-stream-db/src/model.rs b/zap-stream-db/src/model.rs
index fb56547..8971239 100644
--- a/zap-stream-db/src/model.rs
+++ b/zap-stream-db/src/model.rs
@@ -1,6 +1,7 @@
 use chrono::{DateTime, Utc};
 use sqlx::{FromRow, Type};
 use std::fmt::{Display, Formatter};
+use uuid::Uuid;
 
 #[derive(Debug, Clone, FromRow)]
 pub struct User {
@@ -37,7 +38,7 @@ impl Display for UserStreamState {
 
 #[derive(Debug, Clone, Default, FromRow)]
 pub struct UserStream {
-    pub id: u64,
+    pub id: Uuid,
     pub user_id: u64,
     pub starts: DateTime<Utc>,
     pub ends: Option<DateTime<Utc>>,