From f0a50918a84ef662294d4e6c0e3d3e1fa02bc4d0 Mon Sep 17 00:00:00 2001 From: kieran Date: Wed, 16 Oct 2024 10:59:28 +0100 Subject: [PATCH] feat: query system --- .gitignore | 3 + Cargo.lock | 21 +++- Cargo.toml | 4 + src/app.rs | 13 ++- src/link.rs | 62 ++++++++---- src/main.rs | 18 ++-- src/note_util.rs | 3 +- src/route/home.rs | 40 ++++---- src/route/mod.rs | 43 ++++----- src/route/stream.rs | 26 ++--- src/services/mod.rs | 4 +- src/services/ndb_wrapper.rs | 119 ++++++++++++++++++----- src/services/profile.rs | 85 ----------------- src/services/query.rs | 158 +++++++++++++++++++++++++++++++ src/stream_info.rs | 2 +- src/widgets/avatar.rs | 35 ++++--- src/widgets/chat.rs | 53 +++++++---- src/widgets/chat_message.rs | 9 +- src/widgets/header.rs | 51 +++++----- src/widgets/mod.rs | 22 ++--- src/widgets/profile.rs | 31 +++--- src/widgets/stream.rs | 34 ++++--- src/widgets/stream_list.rs | 5 +- src/widgets/stream_player.rs | 4 +- src/widgets/video_placeholder.rs | 8 +- 25 files changed, 536 insertions(+), 317 deletions(-) delete mode 100644 src/services/profile.rs create mode 100644 src/services/query.rs diff --git a/.gitignore b/.gitignore index ea8c4bf..adec2cb 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ /target +/lock.mdb +/data.mdb +/.idea \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index fe35670..bc1b26c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -138,9 +138,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.88" +version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e1496f8fb1fbf272686b8d37f523dab3e4a7443300055e74cdaa449f3114356" +checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" [[package]] name = "arbitrary" @@ -203,9 +203,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.82" +version = "0.1.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" +checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", @@ -4403,6 +4403,15 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "uuid" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +dependencies = [ + "getrandom", +] + [[package]] name = "v_frame" version = "0.3.8" @@ -5237,7 +5246,10 @@ checksum = "ec7a2a501ed189703dba8b08142f057e887dfc4b2cc4db2d343ac6376ba3e0b9" name = "zap_stream_app" version = "0.1.0" dependencies = [ + "anyhow", + "async-trait", "bech32", + "chrono", "eframe", "egui", "egui-video", @@ -5250,6 +5262,7 @@ dependencies = [ "nostrdb", "pretty_env_logger", "tokio", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9e92344..f01e44e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,3 +18,7 @@ egui_inbox = "0.6.0" bech32 = "0.11.0" libc = "0.2.158" egui-video = { git = "https://github.com/v0l/egui-video.git", rev = "4766d939ce4d34b5a3a57b2fbe750ea10f389f39" } +uuid = { version = "1.11.0", features = ["v4"] } +chrono = "0.4.38" +anyhow = "1.0.89" +async-trait = "0.1.83" diff --git a/src/app.rs b/src/app.rs index 1cbc00e..c50cd2a 100644 --- a/src/app.rs +++ b/src/app.rs @@ -14,13 +14,18 @@ pub struct ZapStreamApp { impl ZapStreamApp { pub fn new(cc: &CreationContext) -> Self { - let client = Client::builder().database(MemoryDatabase::with_opts(Default::default())).build(); + let client = Client::builder() + .database(MemoryDatabase::with_opts(Default::default())) + .build(); let notifications = client.notifications(); let ctx_clone = cc.egui_ctx.clone(); let client_clone = client.clone(); tokio::spawn(async move { - client_clone.add_relay("wss://nos.lol").await.expect("Failed to add relay"); + client_clone + .add_relay("wss://nos.lol") + .await + .expect("Failed to add relay"); client_clone.connect().await; let mut notifications = client_clone.notifications(); while let Ok(_) = notifications.recv().await { @@ -48,8 +53,6 @@ impl App for ZapStreamApp { egui::CentralPanel::default() .frame(app_frame) - .show(ctx, |ui| { - self.router.show(ui) - }); + .show(ctx, |ui| self.router.show(ui)); } } diff --git a/src/link.rs b/src/link.rs index 9ca0d54..7140350 100644 --- a/src/link.rs +++ b/src/link.rs @@ -42,7 +42,13 @@ pub enum NostrLinkType { } impl NostrLink { - pub fn new(hrp: NostrLinkType, id: IdOrStr, kind: Option, author: Option<[u8; 32]>, relays: Vec) -> Self { + pub fn new( + hrp: NostrLinkType, + id: IdOrStr, + kind: Option, + author: Option<[u8; 32]>, + relays: Vec, + ) -> Self { Self { hrp, id, @@ -56,7 +62,14 @@ impl NostrLink { if note.kind() >= 30_000 && note.kind() < 40_000 { Self { hrp: NostrLinkType::Coordinate, - id: IdOrStr::Str(note.get_tag_value("d").unwrap().variant().str().unwrap().to_string()), + id: IdOrStr::Str( + note.get_tag_value("d") + .unwrap() + .variant() + .str() + .unwrap() + .to_string(), + ), kind: Some(note.kind()), author: Some(note.pubkey().clone()), relays: vec![], @@ -82,7 +95,12 @@ impl NostrLink { pub fn to_tag_value(&self) -> String { if self.hrp == NostrLinkType::Coordinate { - format!("{}:{}:{}", self.kind.unwrap(), hex::encode(self.author.unwrap()), self.id) + format!( + "{}:{}:{}", + self.kind.unwrap(), + hex::encode(self.author.unwrap()), + self.id + ) } else { self.id.to_string() } @@ -93,18 +111,17 @@ impl TryInto for &NostrLink { type Error = (); fn try_into(self) -> Result { match self.hrp { - NostrLinkType::Coordinate => { - Ok(Filter::new() - .tags([match self.id { + NostrLinkType::Coordinate => Ok(Filter::new() + .tags( + [match self.id { IdOrStr::Str(ref s) => s.to_owned(), - IdOrStr::Id(ref i) => hex::encode(i) - }], 'd') - .build()) - } - NostrLinkType::Event | NostrLinkType::Note => { - Ok(Filter::new().build()) - } - _ => Err(()) + IdOrStr::Id(ref i) => hex::encode(i), + }], + 'd', + ) + .build()), + NostrLinkType::Event | NostrLinkType::Note => Ok(Filter::new().build()), + _ => Err(()), } } } @@ -133,12 +150,17 @@ impl Display for NostrLink { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self.hrp { NostrLinkType::Note | NostrLinkType::PrivateKey | NostrLinkType::PublicKey => { - Ok(bech32::encode_to_fmt::(f, self.hrp.to_hrp(), match &self.id { - IdOrStr::Str(s) => s.as_bytes(), - IdOrStr::Id(i) => i - }).map_err(|e| std::fmt::Error)?) + Ok(bech32::encode_to_fmt::( + f, + self.hrp.to_hrp(), + match &self.id { + IdOrStr::Str(s) => s.as_bytes(), + IdOrStr::Id(i) => i, + }, + ) + .map_err(|e| std::fmt::Error)?) } - NostrLinkType::Event | NostrLinkType::Profile | NostrLinkType::Coordinate => todo!() + NostrLinkType::Event | NostrLinkType::Profile | NostrLinkType::Coordinate => todo!(), } } -} \ No newline at end of file +} diff --git a/src/main.rs b/src/main.rs index c66d754..f0ac737 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,28 +1,28 @@ -use eframe::Renderer; use crate::app::ZapStreamApp; +use eframe::Renderer; use egui::Vec2; -use nostrdb::Note; mod app; -pub mod widgets; -mod services; -mod route; -mod note_util; mod link; +mod note_util; +mod route; +mod services; mod stream_info; +pub mod widgets; #[tokio::main] async fn main() { pretty_env_logger::init(); + // TODO: redirect FFMPEG logs to log file (noisy) + let mut options = eframe::NativeOptions::default(); options.renderer = Renderer::Glow; - options.viewport = options.viewport - .with_inner_size(Vec2::new(360., 720.)); + options.viewport = options.viewport.with_inner_size(Vec2::new(360., 720.)); let _res = eframe::run_native( "zap.stream", options, Box::new(move |cc| Ok(Box::new(ZapStreamApp::new(cc)))), ); -} \ No newline at end of file +} diff --git a/src/note_util.rs b/src/note_util.rs index 3903a43..ce1f974 100644 --- a/src/note_util.rs +++ b/src/note_util.rs @@ -35,7 +35,6 @@ impl<'a> NoteUtil for Note<'a> { } } - #[derive(Debug, Clone)] pub struct TagIterBorrow<'a> { tag: &'a Tag<'a>, @@ -68,4 +67,4 @@ impl<'a> Iterator for TagIterBorrow<'a> { } #[derive(Eq, PartialEq)] -pub struct OwnedNote(pub u64); \ No newline at end of file +pub struct OwnedNote(pub u64); diff --git a/src/route/home.rs b/src/route/home.rs index ef6e256..0d5bd01 100644 --- a/src/route/home.rs +++ b/src/route/home.rs @@ -1,47 +1,41 @@ use crate::note_util::OwnedNote; use crate::route::RouteServices; -use crate::services::ndb_wrapper::NDBWrapper; +use crate::services::ndb_wrapper::{NDBWrapper, SubWrapper}; use crate::widgets; use crate::widgets::NostrWidget; use egui::{Response, Ui, Widget}; use log::{error, info}; -use nostrdb::{Filter, Ndb, Note, NoteKey, Subscription, Transaction}; +use nostrdb::{Filter, Ndb, Note, NoteKey, Transaction}; pub struct HomePage { - sub: Subscription, + sub: SubWrapper, events: Vec, - ndb: NDBWrapper, } impl HomePage { pub fn new(ndb: &NDBWrapper, tx: &Transaction) -> Self { - let filter = [ - Filter::new() - .kinds([30_311]) - .limit(10) - .build() - ]; - let (sub, events) = ndb.subscribe_with_results(&filter, tx, 100); + let filter = [Filter::new().kinds([30_311]).limit(10).build()]; + let (sub, events) = ndb.subscribe_with_results("home-page", &filter, tx, 100); Self { sub, - events: events.iter().map(|e| OwnedNote(e.note_key.as_u64())).collect(), - ndb: ndb.clone(), + events: events + .iter() + .map(|e| OwnedNote(e.note_key.as_u64())) + .collect(), } } } -impl Drop for HomePage { - fn drop(&mut self) { - self.ndb.unsubscribe(self.sub); - } -} - impl NostrWidget for HomePage { fn render(&mut self, ui: &mut Ui, services: &RouteServices<'_>) -> Response { - let new_notes = services.ndb.poll(self.sub, 100); - new_notes.iter().for_each(|n| self.events.push(OwnedNote(n.as_u64()))); + let new_notes = services.ndb.poll(&self.sub, 100); + new_notes + .iter() + .for_each(|n| self.events.push(OwnedNote(n.as_u64()))); - let events: Vec> = self.events.iter() + let events: Vec> = self + .events + .iter() .map(|n| services.ndb.get_note_by_key(services.tx, NoteKey::new(n.0))) .map_while(|f| f.map_or(None, |f| Some(f))) .collect(); @@ -49,4 +43,4 @@ impl NostrWidget for HomePage { info!("HomePage events: {}", events.len()); widgets::StreamList::new(&events, &services).ui(ui) } -} \ No newline at end of file +} diff --git a/src/route/mod.rs b/src/route/mod.rs index 0aaf068..06122d5 100644 --- a/src/route/mod.rs +++ b/src/route/mod.rs @@ -4,7 +4,6 @@ use crate::route; use crate::route::home::HomePage; use crate::route::stream::StreamPage; use crate::services::ndb_wrapper::NDBWrapper; -use crate::services::profile::ProfileService; use crate::widgets::{Header, NostrWidget, StreamList}; use egui::{Context, Response, ScrollArea, Ui, Widget}; use egui_inbox::{UiInbox, UiInboxSender}; @@ -15,8 +14,8 @@ use nostr_sdk::{Client, Kind, PublicKey}; use nostrdb::{Filter, Ndb, Note, Transaction}; use std::borrow::Borrow; -mod stream; mod home; +mod stream; #[derive(PartialEq)] pub enum Routes { @@ -45,7 +44,6 @@ pub struct Router { router: UiInbox, ctx: Context, - profile_service: ProfileService, ndb: NDBWrapper, login: Option<[u8; 32]>, client: Client, @@ -58,7 +56,6 @@ impl Router { current_widget: None, router: UiInbox::new(), ctx: ctx.clone(), - profile_service: ProfileService::new(client.clone(), ctx.clone()), ndb: NDBWrapper::new(ctx.clone(), ndb.clone(), client.clone()), client, login: None, @@ -75,7 +72,7 @@ impl Router { let w = StreamPage::new_from_link(&self.ndb, tx, link.clone()); self.current_widget = Some(Box::new(w)); } - _ => warn!("Not implemented") + _ => warn!("Not implemented"), } self.current = route; } @@ -87,10 +84,8 @@ impl Router { while let Some(r) = self.router.read(ui).next() { if let Routes::Action(a) = &r { match a { - RouteAction::Login(k) => { - self.login = Some(k.clone()) - } - _ => info!("Not implemented") + RouteAction::Login(k) => self.login = Some(k.clone()), + _ => info!("Not implemented"), } } else { self.load_widget(r, &tx); @@ -104,32 +99,32 @@ impl Router { let svc = RouteServices { context: self.ctx.clone(), - profile: &self.profile_service, router: self.router.sender(), - ndb: self.ndb.clone(), + ndb: &self.ndb, tx: &tx, login: &self.login, }; // display app - ScrollArea::vertical().show(ui, |ui| { - ui.add(Header::new(&svc)); - if let Some(w) = self.current_widget.as_mut() { - w.render(ui, &svc) - } else { - ui.label("No widget") - } - }).inner + ScrollArea::vertical() + .show(ui, |ui| { + Header::new().render(ui, &svc); + if let Some(w) = self.current_widget.as_mut() { + w.render(ui, &svc) + } else { + ui.label("No widget") + } + }) + .inner } } pub struct RouteServices<'a> { - pub context: Context, //cloned + pub context: Context, //cloned pub router: UiInboxSender, //cloned - pub ndb: NDBWrapper, //cloned - pub profile: &'a ProfileService, //ref - pub tx: &'a Transaction, //ref + pub ndb: &'a NDBWrapper, //ref + pub tx: &'a Transaction, //ref pub login: &'a Option<[u8; 32]>, //ref } @@ -139,4 +134,4 @@ impl<'a> RouteServices<'a> { warn!("Failed to navigate"); } } -} \ No newline at end of file +} diff --git a/src/route/stream.rs b/src/route/stream.rs index 2b2fdac..2ececee 100644 --- a/src/route/stream.rs +++ b/src/route/stream.rs @@ -1,7 +1,7 @@ use crate::link::NostrLink; use crate::note_util::{NoteUtil, OwnedNote}; use crate::route::RouteServices; -use crate::services::ndb_wrapper::NDBWrapper; +use crate::services::ndb_wrapper::{NDBWrapper, SubWrapper}; use crate::stream_info::StreamInfo; use crate::widgets::{Chat, NostrWidget, StreamPlayer}; use egui::{Color32, Label, Response, RichText, TextWrapMode, Ui, Widget}; @@ -13,20 +13,20 @@ pub struct StreamPage { event: Option, player: Option, chat: Option, - sub: Subscription, + sub: SubWrapper, } impl StreamPage { pub fn new_from_link(ndb: &NDBWrapper, tx: &Transaction, link: NostrLink) -> Self { let f: Filter = link.borrow().try_into().unwrap(); - let f = [ - f.limit_mut(1) - ]; - let (sub, events) = ndb.subscribe_with_results(&f, tx, 1); + let f = [f.limit_mut(1)]; + let (sub, events) = ndb.subscribe_with_results("streams", &f, tx, 1); Self { link, sub, - event: events.first().map_or(None, |n| Some(OwnedNote(n.note_key.as_u64()))), + event: events + .first() + .map_or(None, |n| Some(OwnedNote(n.note_key.as_u64()))), chat: None, player: None, } @@ -35,13 +35,15 @@ impl StreamPage { impl NostrWidget for StreamPage { fn render(&mut self, ui: &mut Ui, services: &RouteServices<'_>) -> Response { - let poll = services.ndb.poll(self.sub, 1); + let poll = services.ndb.poll(&self.sub, 1); if let Some(k) = poll.first() { self.event = Some(OwnedNote(k.as_u64())) } let event = if let Some(k) = &self.event { - services.ndb.get_note_by_key(services.tx, NoteKey::new(k.0)) + services + .ndb + .get_note_by_key(services.tx, NoteKey::new(k.0)) .map_or(None, |f| Some(f)) } else { None @@ -62,8 +64,8 @@ impl NostrWidget for StreamPage { Some(s) => s.variant().str().unwrap_or("Unknown"), None => "Unknown", }) - .size(16.) - .color(Color32::WHITE); + .size(16.) + .color(Color32::WHITE); ui.add(Label::new(title).wrap_mode(TextWrapMode::Truncate)); if self.chat.is_none() { @@ -80,4 +82,4 @@ impl NostrWidget for StreamPage { ui.label("Loading..") } } -} \ No newline at end of file +} diff --git a/src/services/mod.rs b/src/services/mod.rs index 7b874d9..08975b8 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -1,2 +1,2 @@ -pub mod profile; -pub mod ndb_wrapper; \ No newline at end of file +pub mod ndb_wrapper; +pub mod query; diff --git a/src/services/ndb_wrapper.rs b/src/services/ndb_wrapper.rs index f23c3bc..39686cd 100644 --- a/src/services/ndb_wrapper.rs +++ b/src/services/ndb_wrapper.rs @@ -1,15 +1,44 @@ +use crate::services::query::QueryManager; use egui::CursorIcon::Default; use log::{info, warn}; use nostr_sdk::secp256k1::Context; use nostr_sdk::{Client, JsonUtil, Kind, RelayPoolNotification}; -use nostrdb::{Error, Filter, Ndb, Note, NoteKey, ProfileRecord, QueryResult, Subscription, Transaction}; +use nostrdb::{ + Error, Filter, Ndb, NdbProfile, Note, NoteKey, ProfileRecord, QueryResult, Subscription, + Transaction, +}; +use std::sync::{Arc, RwLock}; use tokio::sync::mpsc::UnboundedSender; -#[derive(Debug, Clone)] pub struct NDBWrapper { ctx: egui::Context, ndb: Ndb, client: Client, + query_manager: QueryManager, +} + +/// Automatic cleanup for subscriptions +pub struct SubWrapper { + ndb: Ndb, + subscription: Subscription, +} + +impl SubWrapper { + pub fn new(ndb: Ndb, subscription: Subscription) -> Self { + Self { ndb, subscription } + } +} + +impl Into for &SubWrapper { + fn into(self) -> u64 { + self.subscription.id() + } +} + +impl Drop for SubWrapper { + fn drop(&mut self) { + self.ndb.unsubscribe(self.subscription).unwrap() + } } impl NDBWrapper { @@ -34,49 +63,93 @@ impl NDBWrapper { } } }); - Self { ctx, ndb, client } + let qm = QueryManager::new(client.clone()); + + Self { + ctx, + ndb, + client, + query_manager: qm, + } } pub fn start_transaction(&self) -> Transaction { Transaction::new(&self.ndb).unwrap() } - pub fn subscribe(&self, filters: &[Filter]) -> Subscription { + pub fn subscribe(&self, id: &str, filters: &[Filter]) -> SubWrapper { let sub = self.ndb.subscribe(filters).unwrap(); - let c_clone = self.client.clone(); - let filters = filters.iter().map(|f| nostr_sdk::Filter::from_json(f.json().unwrap()).unwrap()).collect(); - let id_clone = sub.id(); - tokio::spawn(async move { - let nostr_sub = c_clone.subscribe(filters, None).await.unwrap(); - info!("Sub mapping {}->{}", id_clone, nostr_sub.id()) - }); - sub + // very lazy conversion + let filters: Vec = filters + .iter() + .map(|f| nostr_sdk::Filter::from_json(f.json().unwrap()).unwrap()) + .collect(); + self.query_manager.queue_query(id, filters); + SubWrapper::new(self.ndb.clone(), sub) } - pub fn unsubscribe(&self, sub: Subscription) { - self.ndb.unsubscribe(sub).unwrap() + pub fn unsubscribe(&self, sub: &SubWrapper) { + self.ndb.unsubscribe(sub.subscription).unwrap() } - pub fn subscribe_with_results<'a>(&self, filters: &[Filter], tx: &'a Transaction, max_results: i32) -> (Subscription, Vec>) { - let sub = self.subscribe(filters); + pub fn subscribe_with_results<'a>( + &self, + id: &str, + filters: &[Filter], + tx: &'a Transaction, + max_results: i32, + ) -> (SubWrapper, Vec>) { + let sub = self.subscribe(id, filters); let q = self.query(tx, filters, max_results); (sub, q) } - - pub fn query<'a>(&self, tx: &'a Transaction, filters: &[Filter], max_results: i32) -> Vec> { + pub fn query<'a>( + &self, + tx: &'a Transaction, + filters: &[Filter], + max_results: i32, + ) -> Vec> { self.ndb.query(tx, filters, max_results).unwrap() } - pub fn poll(&self, sub: Subscription, max_results: u32) -> Vec { - self.ndb.poll_for_notes(sub, max_results) + pub fn poll(&self, sub: &SubWrapper, max_results: u32) -> Vec { + self.ndb.poll_for_notes(sub.subscription, max_results) } - pub fn get_note_by_key<'a>(&self, tx: &'a Transaction, key: NoteKey) -> Result, Error> { + pub fn get_note_by_key<'a>( + &self, + tx: &'a Transaction, + key: NoteKey, + ) -> Result, Error> { self.ndb.get_note_by_key(tx, key) } - pub fn get_profile_by_pubkey<'a>(&self, tx: &'a Transaction, pubkey: &[u8; 32]) -> Result, Error> { + pub fn get_profile_by_pubkey<'a>( + &self, + tx: &'a Transaction, + pubkey: &[u8; 32], + ) -> Result, Error> { self.ndb.get_profile_by_pubkey(tx, pubkey) } -} \ No newline at end of file + + pub fn fetch_profile<'a>( + &self, + tx: &'a Transaction, + pubkey: &[u8; 32], + ) -> (Option>, Option) { + let p = self + .get_profile_by_pubkey(tx, pubkey) + .map_or(None, |p| p.record().profile()); + + let sub = if p.is_none() { + Some(self.subscribe( + "profile", + &[Filter::new().kinds([0]).authors([pubkey]).build()], + )) + } else { + None + }; + (p, sub) + } +} diff --git a/src/services/profile.rs b/src/services/profile.rs deleted file mode 100644 index 744f00d..0000000 --- a/src/services/profile.rs +++ /dev/null @@ -1,85 +0,0 @@ -use egui::load::BytesLoader; -use log::{info, warn}; -use nostr_sdk::prelude::hex; -use nostr_sdk::{Client, Metadata, PublicKey}; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::sync::Mutex; - -pub struct ProfileService { - client: Client, - pub profiles: Arc>>>, - ctx: egui::Context, - request_profile: UnboundedSender<[u8; 32]>, -} - -struct Loader { - client: Client, - ctx: egui::Context, - profiles: Arc>>>, - queue: UnboundedReceiver<[u8; 32]>, -} - -impl Loader { - pub async fn run(&mut self) { - while let Some(key) = self.queue.recv().await { - let mut profiles = self.profiles.lock().await; - if !profiles.contains_key(&key) { - info!("Requesting profile {}", hex::encode(key)); - match self.client.fetch_metadata(PublicKey::from_slice(&key).unwrap(), - Some(Duration::from_secs(3))).await { - Ok(meta) => { - profiles.insert(key, Some(meta)); - self.ctx.request_repaint(); - } - Err(e) => { - warn!("Error getting metadata: {}", e); - } - } - } - } - } -} - -impl ProfileService { - pub fn new(client: Client, ctx: egui::Context) -> ProfileService - { - let profiles = Arc::new(Mutex::new(HashMap::new())); - let (tx, rx) = unbounded_channel(); - let mut loader = Loader { - client: client.clone(), - ctx: ctx.clone(), - profiles: profiles.clone(), - queue: rx, - }; - - tokio::spawn(async move { - loader.run().await; - }); - - Self { - client, - ctx, - profiles, - request_profile: tx, - } - } - - pub fn get_profile(&self, public_key: &[u8; 32]) -> Option { - if let Ok(profiles) = self.profiles.try_lock() { - return if let Some(p) = profiles.get(public_key) { - if let Some(p) = p { - Some(p.clone()) - } else { - None - } - } else { - self.request_profile.send(*public_key).expect("Failed request"); - None - }; - } - None - } -} \ No newline at end of file diff --git a/src/services/query.rs b/src/services/query.rs new file mode 100644 index 0000000..8f01ad3 --- /dev/null +++ b/src/services/query.rs @@ -0,0 +1,158 @@ +use anyhow::Error; +use chrono::Utc; +use log::{error, info}; +use nostr_sdk::prelude::StreamExt; +use nostr_sdk::{Client, Filter, SubscriptionId}; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::RwLock; +use tokio::task::JoinHandle; +use uuid::Uuid; + +#[async_trait::async_trait] +pub trait QueryClient { + async fn subscribe(&self, id: &str, filters: &[QueryFilter]) -> Result<(), Error>; +} + +pub type QueryFilter = Filter; + +pub struct Query { + pub id: String, + queue: HashSet, + traces: HashSet, +} + +#[derive(Hash, Eq, PartialEq, Debug)] +pub struct QueryTrace { + /// Subscription id on the relay + pub id: Uuid, + /// Filters associated with this subscription + pub filters: Vec, + /// When the query was created + pub queued: u64, + /// When the query was sent to the relay + pub sent: Option, + /// When EOSE was received + pub eose: Option, +} + +impl Query { + pub fn new(id: &str) -> Self { + Self { + id: id.to_string(), + queue: HashSet::new(), + traces: HashSet::new(), + } + } + + /// Add filters to query + pub fn add(&mut self, filter: Vec) { + for f in filter { + self.queue.insert(f); + } + } + + /// Return next query batch + pub fn next(&mut self) -> Option { + let next: Vec = self.queue.drain().collect(); + if next.len() == 0 { + return None; + } + let now = Utc::now(); + let id = Uuid::new_v4(); + Some(QueryTrace { + id, + filters: next, + queued: now.timestamp() as u64, + sent: None, + eose: None, + }) + } +} + +struct QueueDefer { + id: String, + filters: Vec, +} + +pub struct QueryManager { + client: C, + queries: Arc>>, + queue_into_queries: UnboundedSender, + sender: JoinHandle<()>, +} + +impl QueryManager +where + C: QueryClient + Clone + Send + Sync + 'static, +{ + pub(crate) fn new(client: C) -> Self { + let queries = Arc::new(RwLock::new(HashMap::new())); + let (tx, mut rx) = unbounded_channel::(); + Self { + client: client.clone(), + queries: queries.clone(), + queue_into_queries: tx, + sender: tokio::spawn(async move { + loop { + { + let mut q = queries.write().await; + while let Ok(x) = rx.try_recv() { + Self::push_filters(&mut q, &x.id, x.filters); + } + for (k, v) in q.iter_mut() { + if let Some(qt) = v.next() { + info!("Sending trace: {:?}", qt); + match client + .subscribe(&qt.id.to_string(), qt.filters.as_slice()) + .await + { + Ok(_) => {} + Err(e) => { + error!("Failed to subscribe to query filters: {}", e); + } + } + } + } + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }), + } + } + + pub async fn query(&mut self, id: &str, filters: F) + where + F: Into>, + { + let mut qq = self.queries.write().await; + Self::push_filters(&mut qq, id, filters.into()); + } + + fn push_filters(qq: &mut HashMap, id: &str, filters: Vec) { + if let Some(q) = qq.get_mut(id) { + q.add(filters.into()); + } else { + let mut q = Query::new(id); + q.add(filters.into()); + qq.insert(id.to_string(), q); + } + } + + pub fn queue_query(&self, id: &str, filters: F) + where + F: Into>, + { + } +} + +#[async_trait::async_trait] +impl QueryClient for Client { + async fn subscribe(&self, id: &str, filters: &[QueryFilter]) -> Result<(), Error> { + self.subscribe_with_id(SubscriptionId::new(id), filters.into(), None) + .await?; + Ok(()) + } +} diff --git a/src/stream_info.rs b/src/stream_info.rs index a63ea37..4f41c5e 100644 --- a/src/stream_info.rs +++ b/src/stream_info.rs @@ -35,4 +35,4 @@ impl<'a> StreamInfo for Note<'a> { None } } -} \ No newline at end of file +} diff --git a/src/widgets/avatar.rs b/src/widgets/avatar.rs index c677376..7f40945 100644 --- a/src/widgets/avatar.rs +++ b/src/widgets/avatar.rs @@ -1,26 +1,35 @@ -use crate::services::profile::ProfileService; +use crate::route::RouteServices; +use crate::services::ndb_wrapper::SubWrapper; use egui::{Color32, Image, Rect, Response, Rounding, Sense, Ui, Vec2, Widget}; pub struct Avatar<'a> { image: Option>, + sub: Option, } impl<'a> Avatar<'a> { pub fn new(img: Image<'a>) -> Self { - Self { image: Some(img) } + Self { + image: Some(img), + sub: None, + } } pub fn new_optional(img: Option>) -> Self { - Self { image: img } + Self { + image: img, + sub: None, + } } - pub fn public_key(svc: &'a ProfileService, pk: &[u8; 32]) -> Self { - if let Some(meta) = svc.get_profile(pk) { - if let Some(img) = &meta.picture { - return Self { image: Some(Image::from_uri(img.clone())) }; - } + pub fn pubkey(pk: &[u8; 32], svc: &RouteServices<'a>) -> Self { + let (img, sub) = svc.ndb.fetch_profile(svc.tx, pk); + Self { + image: img + .map_or(None, |p| p.picture()) + .map(|p| Image::from_uri(p)), + sub, } - Self { image: None } } pub fn max_size(mut self, size: f32) -> Self { @@ -45,16 +54,14 @@ impl<'a> Avatar<'a> { impl<'a> Widget for Avatar<'a> { fn ui(self, ui: &mut Ui) -> Response { match self.image { - Some(img) => { - img.rounding(Rounding::same(ui.available_height())).ui(ui) - } + Some(img) => img.rounding(Rounding::same(ui.available_height())).ui(ui), None => { let h = ui.available_height(); let rnd = Rounding::same(h); - let (response, painter) = ui.allocate_painter(Vec2::new(h,h), Sense::click()); + let (response, painter) = ui.allocate_painter(Vec2::new(h, h), Sense::click()); painter.rect_filled(Rect::EVERYTHING, rnd, Color32::from_rgb(200, 200, 200)); response } } } -} \ No newline at end of file +} diff --git a/src/widgets/chat.rs b/src/widgets/chat.rs index 0835521..cab9037 100644 --- a/src/widgets/chat.rs +++ b/src/widgets/chat.rs @@ -1,17 +1,16 @@ use crate::link::NostrLink; use crate::note_util::OwnedNote; use crate::route::RouteServices; -use crate::services::ndb_wrapper::NDBWrapper; +use crate::services::ndb_wrapper::{NDBWrapper, SubWrapper}; use crate::widgets::chat_message::ChatMessage; use crate::widgets::NostrWidget; use egui::{Response, ScrollArea, Ui, Widget}; -use nostrdb::{Filter, Note, NoteKey, Subscription, Transaction}; -use std::borrow::Borrow; +use nostrdb::{Filter, Note, NoteKey, Transaction}; pub struct Chat { link: NostrLink, events: Vec, - sub: Subscription, + sub: SubWrapper, } impl Chat { @@ -22,33 +21,45 @@ impl Chat { .build(); let filter = [filter]; - let (sub, events) = ndb.subscribe_with_results(&filter, tx, 500); + let (sub, events) = ndb.subscribe_with_results("live-chat", &filter, tx, 500); Self { link, sub, - events: events.iter().map(|n| OwnedNote(n.note_key.as_u64())).collect(), + events: events + .iter() + .map(|n| OwnedNote(n.note_key.as_u64())) + .collect(), } } } impl NostrWidget for Chat { fn render(&mut self, ui: &mut Ui, services: &RouteServices<'_>) -> Response { - let poll = services.ndb.poll(self.sub, 500); - poll.iter().for_each(|n| self.events.push(OwnedNote(n.as_u64()))); + let poll = services.ndb.poll(&self.sub, 500); + poll.iter() + .for_each(|n| self.events.push(OwnedNote(n.as_u64()))); - let events: Vec = self.events.iter().map_while(|n| - services.ndb - .get_note_by_key(services.tx, NoteKey::new(n.0)) - .map_or(None, |n| Some(n)) - ).collect(); + let events: Vec = self + .events + .iter() + .map_while(|n| { + services + .ndb + .get_note_by_key(services.tx, NoteKey::new(n.0)) + .map_or(None, |n| Some(n)) + }) + .collect(); - ScrollArea::vertical().show(ui, |ui| { - ui.vertical(|ui| { - for ev in events { - ChatMessage::new(&ev, services).ui(ui); - } - }).response - }).inner + ScrollArea::vertical() + .show(ui, |ui| { + ui.vertical(|ui| { + for ev in events { + ChatMessage::new(&ev, services).ui(ui); + } + }) + .response + }) + .inner } -} \ No newline at end of file +} diff --git a/src/widgets/chat_message.rs b/src/widgets/chat_message.rs index 2d66c94..25b71aa 100644 --- a/src/widgets/chat_message.rs +++ b/src/widgets/chat_message.rs @@ -20,12 +20,11 @@ impl<'a> Widget for ChatMessage<'a> { ui.horizontal(|ui| { ui.spacing_mut().item_spacing = Vec2::new(8., 2.); let author = self.ev.pubkey(); - Profile::new(author, self.services) - .size(24.) - .ui(ui); + Profile::new(author, self.services).size(24.).ui(ui); let content = self.ev.content(); ui.label(content); - }).response + }) + .response } -} \ No newline at end of file +} diff --git a/src/widgets/header.rs b/src/widgets/header.rs index f6763c2..dc7595a 100644 --- a/src/widgets/header.rs +++ b/src/widgets/header.rs @@ -1,38 +1,43 @@ use crate::route::{RouteServices, Routes}; use crate::widgets::avatar::Avatar; +use crate::widgets::NostrWidget; use eframe::emath::Align; use eframe::epaint::Vec2; use egui::{Frame, Image, Layout, Margin, Response, Sense, Ui, Widget}; -use nostr_sdk::util::hex; -pub struct Header<'a> { - services: &'a RouteServices<'a>, -} +pub struct Header; -impl<'a> Header<'a> { - pub fn new(services: &'a RouteServices) -> Self { - Self { services } +impl Header { + pub fn new() -> Self { + Self {} } } -impl Widget for Header<'_> { - fn ui(self, ui: &mut Ui) -> Response { - let login: [u8; 32] = hex::decode("63fe6318dc58583cfe16810f86dd09e18bfd76aabc24a0081ce2856f330504ed").unwrap().try_into().unwrap(); +impl NostrWidget for Header { + fn render(&mut self, ui: &mut Ui, services: &RouteServices<'_>) -> Response { let logo_bytes = include_bytes!("../logo.svg"); Frame::none() .outer_margin(Margin::symmetric(16., 8.)) .show(ui, |ui| { - ui.allocate_ui_with_layout(Vec2::new(ui.available_width(), 32.), Layout::left_to_right(Align::Center), |ui| { - ui.style_mut() - .spacing.item_spacing.x = 16.; - if Image::from_bytes("logo.svg", logo_bytes) - .max_height(22.62) - .sense(Sense::click()) - .ui(ui).clicked() { - self.services.navigate(Routes::HomePage); - } - ui.add(Avatar::public_key(&self.services.profile, &login)); - }) - }).response + ui.allocate_ui_with_layout( + Vec2::new(ui.available_width(), 32.), + Layout::left_to_right(Align::Center), + |ui| { + ui.style_mut().spacing.item_spacing.x = 16.; + if Image::from_bytes("logo.svg", logo_bytes) + .max_height(22.62) + .sense(Sense::click()) + .ui(ui) + .clicked() + { + services.navigate(Routes::HomePage); + } + if let Some(pk) = services.login { + ui.add(Avatar::pubkey(pk, services)); + } + }, + ) + }) + .response } -} \ No newline at end of file +} diff --git a/src/widgets/mod.rs b/src/widgets/mod.rs index 2eb42f3..519346c 100644 --- a/src/widgets/mod.rs +++ b/src/widgets/mod.rs @@ -1,24 +1,24 @@ -mod header; -mod stream; -mod stream_list; mod avatar; -mod stream_player; -mod video_placeholder; mod chat; mod chat_message; +mod header; mod profile; +mod stream; +mod stream_list; +mod stream_player; +mod video_placeholder; -use egui::{Response, Ui}; use crate::route::RouteServices; +use egui::{Response, Ui}; pub trait NostrWidget { fn render(&mut self, ui: &mut Ui, services: &RouteServices<'_>) -> Response; } pub use self::avatar::Avatar; -pub use self::header::Header; -pub use self::stream_list::StreamList; -pub use self::video_placeholder::VideoPlaceholder; -pub use self::stream_player::StreamPlayer; -pub use self::profile::Profile; pub use self::chat::Chat; +pub use self::header::Header; +pub use self::profile::Profile; +pub use self::stream_list::StreamList; +pub use self::stream_player::StreamPlayer; +pub use self::video_placeholder::VideoPlaceholder; diff --git a/src/widgets/profile.rs b/src/widgets/profile.rs index 133181b..d40a586 100644 --- a/src/widgets/profile.rs +++ b/src/widgets/profile.rs @@ -1,4 +1,5 @@ use crate::route::RouteServices; +use crate::services::ndb_wrapper::SubWrapper; use crate::widgets::Avatar; use egui::{Color32, Image, Label, Response, RichText, TextWrapMode, Ui, Widget}; use nostrdb::NdbProfile; @@ -7,15 +8,18 @@ pub struct Profile<'a> { size: f32, pubkey: &'a [u8; 32], profile: Option>, + sub: Option, } impl<'a> Profile<'a> { pub fn new(pubkey: &'a [u8; 32], services: &'a RouteServices<'a>) -> Self { - let p = services.ndb.get_profile_by_pubkey(services.tx, &pubkey) - .map(|f| f.record().profile()) - .map_or(None, |r| r); - - Self { pubkey, size: 40., profile: p } + let (p, sub) = services.ndb.fetch_profile(services.tx, pubkey); + Self { + pubkey, + size: 40., + profile: p, + sub, + } } pub fn size(self, size: f32) -> Self { @@ -28,14 +32,17 @@ impl<'a> Widget for Profile<'a> { ui.horizontal(|ui| { ui.spacing_mut().item_spacing.x = 8.; - let img = self.profile.map_or(None, |f| f.picture().map(|f| Image::from_uri(f))); + let img = self + .profile + .map_or(None, |f| f.picture().map(|f| Image::from_uri(f))); ui.add(Avatar::new_optional(img).size(self.size)); - let name = self.profile.map_or("Nostrich", |f| f.name().map_or("Nostrich", |f| f)); - let name = RichText::new(name) - .size(13.) - .color(Color32::WHITE); + let name = self + .profile + .map_or("Nostrich", |f| f.name().map_or("Nostrich", |f| f)); + let name = RichText::new(name).size(13.).color(Color32::WHITE); ui.add(Label::new(name).wrap_mode(TextWrapMode::Truncate)); - }).response + }) + .response } -} \ No newline at end of file +} diff --git a/src/widgets/stream.rs b/src/widgets/stream.rs index d896da3..02bc8a5 100644 --- a/src/widgets/stream.rs +++ b/src/widgets/stream.rs @@ -20,24 +20,29 @@ impl<'a> StreamEvent<'a> { Some(i) => match i.variant().str() { Some(i) => Some(Image::from_uri(i)), None => None, - } + }, None => None, }; - Self { event, picture: cover, services } + Self { + event, + picture: cover, + services, + } } } impl Widget for StreamEvent<'_> { fn ui(self, ui: &mut Ui) -> Response { ui.vertical(|ui| { - ui.style_mut() - .spacing.item_spacing = Vec2::new(12., 16.); + ui.style_mut().spacing.item_spacing = Vec2::new(12., 16.); - let host = match self.event.find_tag_value(|t| t[0].variant().str() == Some("p") && t[3].variant().str() == Some("host")) { + let host = match self.event.find_tag_value(|t| { + t[0].variant().str() == Some("p") && t[3].variant().str() == Some("host") + }) { Some(t) => match t.variant() { NdbStrVariant::Id(i) => i, NdbStrVariant::Str(s) => self.event.pubkey(), - } - None => self.event.pubkey() + }, + None => self.event.pubkey(), }; let w = ui.available_width(); let h = (w / 16.0) * 9.0; @@ -49,9 +54,7 @@ impl Widget for StreamEvent<'_> { .rounding(Rounding::same(12.)) .sense(Sense::click()) .ui(ui), - None => { - VideoPlaceholder.ui(ui) - } + None => VideoPlaceholder.ui(ui), }; if img.clicked() { self.services.navigate(Routes::Event { @@ -60,15 +63,16 @@ impl Widget for StreamEvent<'_> { }); } ui.horizontal(|ui| { - ui.add(Avatar::public_key(self.services.profile, &host).size(40.)); + ui.add(Avatar::pubkey(&host, self.services).size(40.)); let title = RichText::new(match self.event.get_tag_value("title") { Some(s) => s.variant().str().unwrap_or("Unknown"), None => "Unknown", }) - .size(16.) - .color(Color32::WHITE); + .size(16.) + .color(Color32::WHITE); ui.add(Label::new(title).wrap_mode(TextWrapMode::Truncate)); }) - }).response + }) + .response } -} \ No newline at end of file +} diff --git a/src/widgets/stream_list.rs b/src/widgets/stream_list.rs index 05ef6aa..8eceec1 100644 --- a/src/widgets/stream_list.rs +++ b/src/widgets/stream_list.rs @@ -25,6 +25,7 @@ impl Widget for StreamList<'_> { ui.add(StreamEvent::new(event, self.services)); } }) - }).response + }) + .response } -} \ No newline at end of file +} diff --git a/src/widgets/stream_player.rs b/src/widgets/stream_player.rs index 1232a4b..03b112d 100644 --- a/src/widgets/stream_player.rs +++ b/src/widgets/stream_player.rs @@ -12,7 +12,7 @@ impl StreamPlayer { player: Player::new(ctx, url).map_or(None, |mut f| { f.start(); Some(f) - }) + }), } } } @@ -29,4 +29,4 @@ impl Widget for &mut StreamPlayer { VideoPlaceholder.ui(ui) } } -} \ No newline at end of file +} diff --git a/src/widgets/video_placeholder.rs b/src/widgets/video_placeholder.rs index b650b6b..9165f20 100644 --- a/src/widgets/video_placeholder.rs +++ b/src/widgets/video_placeholder.rs @@ -9,7 +9,11 @@ impl Widget for VideoPlaceholder { let img_size = Vec2::new(w, h); let (response, painter) = ui.allocate_painter(img_size, Sense::click()); - painter.rect_filled(Rect::EVERYTHING, Rounding::same(12.), Color32::from_rgb(200, 200, 200)); + painter.rect_filled( + Rect::EVERYTHING, + Rounding::same(12.), + Color32::from_rgb(200, 200, 200), + ); response } -} \ No newline at end of file +}