From 3c4fde2aee6283d170577433cded528a315c21ae Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Sun, 19 Feb 2023 10:27:32 +1300 Subject: [PATCH] When viewing a thread, pull relevant events from local database --- src/comms.rs | 2 +- src/events.rs | 1 - src/feed.rs | 9 +-- src/overlord/mod.rs | 153 ++++++++++++++++++++++++++++++-------------- 4 files changed, 110 insertions(+), 55 deletions(-) diff --git a/src/comms.rs b/src/comms.rs index 9863f75b..a8436bae 100644 --- a/src/comms.rs +++ b/src/comms.rs @@ -31,7 +31,7 @@ pub enum ToOverlordMessage { SetActivePerson(PublicKeyHex), SetRelayReadWrite(RelayUrl, bool, bool), SetRelayAdvertise(RelayUrl, bool), - SetThreadFeed(Id, Id, Option), + SetThreadFeed(Id, Id), Shutdown, UnlockKey(String), UpdateMetadata(PublicKeyHex), diff --git a/src/events.rs b/src/events.rs index 14e33020..1d629b05 100644 --- a/src/events.rs +++ b/src/events.rs @@ -63,7 +63,6 @@ impl Events { } /// Get event from database, by Filter - #[allow(dead_code)] pub async fn get_local_events_by_filter(&self, filter: Filter) -> Result, Error> { let mut conditions: Vec = Vec::new(); if !filter.ids.is_empty() { diff --git a/src/feed.rs b/src/feed.rs index 13650769..20aab76d 100644 --- a/src/feed.rs +++ b/src/feed.rs @@ -82,18 +82,15 @@ impl Feed { // Parent starts with the post itself // Overlord will climb it, and recompute will climb it - let previous_thread_parent = *self.thread_parent.read(); *self.thread_parent.write() = Some(id); let _ = GLOBALS.to_minions.send(ToMinionMessage { target: "all".to_string(), payload: ToMinionPayload::UnsubscribePersonFeed, }); - let _ = GLOBALS.to_overlord.send(ToOverlordMessage::SetThreadFeed( - id, - referenced_by, - previous_thread_parent, - )); + let _ = GLOBALS + .to_overlord + .send(ToOverlordMessage::SetThreadFeed(id, referenced_by)); } pub fn set_feed_to_person(&self, pubkey: PublicKeyHex) { diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs index 3766d9b8..6282cc72 100644 --- a/src/overlord/mod.rs +++ b/src/overlord/mod.rs @@ -13,8 +13,8 @@ use crate::tags::{ use dashmap::mapref::entry::Entry; use minion::Minion; use nostr_types::{ - EncryptedPrivateKey, Event, EventKind, Id, IdHex, Metadata, PreEvent, PrivateKey, Profile, - PublicKey, PublicKeyHex, RelayUrl, Tag, Unixtime, + EncryptedPrivateKey, Event, EventKind, Filter, Id, IdHex, IdHexPrefix, Metadata, PreEvent, + PrivateKey, Profile, PublicKey, PublicKeyHex, RelayUrl, Tag, Unixtime, }; use std::collections::HashMap; use std::sync::atomic::Ordering; @@ -607,9 +607,8 @@ impl Overlord { } DbRelay::update_advertise(relay_url, advertise).await?; } - ToOverlordMessage::SetThreadFeed(id, referenced_by, previous_thread_parent) => { - self.set_thread_feed(id, referenced_by, previous_thread_parent) - .await?; + ToOverlordMessage::SetThreadFeed(id, referenced_by) => { + self.set_thread_feed(id, referenced_by).await?; } ToOverlordMessage::Shutdown => { tracing::info!("Overlord shutting down"); @@ -1114,26 +1113,26 @@ impl Overlord { Ok(()) } - async fn set_thread_feed( - &mut self, - id: Id, - referenced_by: Id, - previous_thread_parent: Option, - ) -> Result<(), Error> { - // Collect missing ancestors and relays they might be at. - // We will ask all the relays about all the ancestors, which is more than we need to - // but isn't too much to ask for. + async fn set_thread_feed(&mut self, id: Id, referenced_by: Id) -> Result<(), Error> { + // We are responsible for loading all the ancestors and all the replies, and + // process.rs is responsible for building the relationships. + // The UI can only show events if they are loaded into memory and the relationships + // exist in memory. + + // Our task is fourfold: + // ancestors from sqlite, replies from sqlite + // ancestors from relays, replies from relays, + + // We simplify things by asking for this data from every relay we are + // connected to, as well as any relays we discover might know. This is + // more than strictly necessary, but not too expensive. + let mut missing_ancestors: Vec = Vec::new(); let mut relays: Vec = Vec::new(); // Include the relays where the referenced_by event was seen relays.extend(DbEventSeen::get_relays_for_event(referenced_by).await?); relays.extend(DbEventSeen::get_relays_for_event(id).await?); - if relays.is_empty() { - *GLOBALS.status_message.write().await = - "Could not find any relays for that event".to_owned(); - return Ok(()); - } // Climb the tree as high as we can, and if there are higher events, // we will ask for those in the initial subscription @@ -1141,17 +1140,16 @@ impl Overlord { if let Some(hpid) = GLOBALS.events.get_highest_local_parent(&id).await? { hpid } else { + // we don't have the event itself! missing_ancestors.push(id); id }; - if previous_thread_parent == Some(highest_parent_id) { - tracing::debug!("Same thread, not resubscribing."); - return Ok(()); - } - + // Set the thread feed to the highest parent that we have, or to the event itself + // even if we don't have it (it might be coming in soon) GLOBALS.feed.set_thread_parent(highest_parent_id); + // Collect missing ancestors and potential relays further up the chain if let Some(highest_parent) = GLOBALS.events.get_local(highest_parent_id).await? { // Use relays in 'e' tags for (id, opturl) in highest_parent.replies_to_ancestors() { @@ -1174,36 +1172,97 @@ impl Overlord { let missing_ancestors_hex: Vec = missing_ancestors.iter().map(|id| (*id).into()).collect(); - if missing_ancestors_hex.is_empty() { - return Ok(()); - } + // Load events from local database + // FIXME: This replicates filters that the minion also builds. We should + // instead build the filters, then both send them to the minion and + // also query them locally. + { + let enable_reactions = GLOBALS.settings.read().await.reactions; - tracing::debug!("Seeking ancestors {:?}", missing_ancestors_hex); + if !missing_ancestors_hex.is_empty() { + let idhp: Vec = missing_ancestors_hex + .iter() + .map(|id| id.to_owned().into()) + .collect(); + let _ = GLOBALS + .events + .get_local_events_by_filter(Filter { + ids: idhp, + ..Default::default() + }) + .await?; - // Clean up relays - relays.sort(); - relays.dedup(); + let mut kinds = vec![EventKind::EventDeletion]; + if enable_reactions { + kinds.push(EventKind::Reaction); + } - // Cancel current thread subscriptions, if any - let _ = self.to_minions.send(ToMinionMessage { - target: "all".to_string(), - payload: ToMinionPayload::UnsubscribeThreadFeed, - }); - - for url in relays.iter() { - // Start minion if needed - if !GLOBALS.relay_is_connected(url) { - self.start_minion(url.clone()).await?; + let e = GLOBALS + .events + .get_local_events_by_filter(Filter { + e: missing_ancestors_hex.clone(), + kinds, + ..Default::default() + }) + .await?; + if !e.is_empty() { + tracing::debug!("Loaded {} local ancestor events", e.len()); + } } - // Subscribe + let mut kinds = vec![ + EventKind::TextNote, + EventKind::Repost, + EventKind::EventDeletion, + ]; + if enable_reactions { + kinds.push(EventKind::Reaction); + } + + let e = GLOBALS + .events + .get_local_events_by_filter(Filter { + e: vec![id.into()], + kinds, + ..Default::default() + }) + .await?; + if !e.is_empty() { + tracing::debug!("Loaded {} local reply events", e.len()); + } + } + + // Subscribe on relays + if relays.is_empty() { + *GLOBALS.status_message.write().await = + "Could not find any relays for that event".to_owned(); + return Ok(()); + } else { + // Clean up relays + relays.sort(); + relays.dedup(); + + // Cancel current thread subscriptions, if any let _ = self.to_minions.send(ToMinionMessage { - target: url.0.to_string(), - payload: ToMinionPayload::SubscribeThreadFeed( - id.into(), - missing_ancestors_hex.clone(), - ), + target: "all".to_string(), + payload: ToMinionPayload::UnsubscribeThreadFeed, }); + + for url in relays.iter() { + // Start minion if needed + if !GLOBALS.relay_is_connected(url) { + self.start_minion(url.clone()).await?; + } + + // Subscribe + let _ = self.to_minions.send(ToMinionMessage { + target: url.0.to_string(), + payload: ToMinionPayload::SubscribeThreadFeed( + id.into(), + missing_ancestors_hex.clone(), + ), + }); + } } Ok(())