When viewing a thread, pull relevant events from local database

This commit is contained in:
Mike Dilger 2023-02-19 10:27:32 +13:00
parent 80f03a57fe
commit 3c4fde2aee
4 changed files with 110 additions and 55 deletions

View File

@ -31,7 +31,7 @@ pub enum ToOverlordMessage {
SetActivePerson(PublicKeyHex), SetActivePerson(PublicKeyHex),
SetRelayReadWrite(RelayUrl, bool, bool), SetRelayReadWrite(RelayUrl, bool, bool),
SetRelayAdvertise(RelayUrl, bool), SetRelayAdvertise(RelayUrl, bool),
SetThreadFeed(Id, Id, Option<Id>), SetThreadFeed(Id, Id),
Shutdown, Shutdown,
UnlockKey(String), UnlockKey(String),
UpdateMetadata(PublicKeyHex), UpdateMetadata(PublicKeyHex),

View File

@ -63,7 +63,6 @@ impl Events {
} }
/// Get event from database, by Filter /// Get event from database, by Filter
#[allow(dead_code)]
pub async fn get_local_events_by_filter(&self, filter: Filter) -> Result<Vec<Event>, Error> { pub async fn get_local_events_by_filter(&self, filter: Filter) -> Result<Vec<Event>, Error> {
let mut conditions: Vec<String> = Vec::new(); let mut conditions: Vec<String> = Vec::new();
if !filter.ids.is_empty() { if !filter.ids.is_empty() {

View File

@ -82,18 +82,15 @@ impl Feed {
// Parent starts with the post itself // Parent starts with the post itself
// Overlord will climb it, and recompute will climb it // Overlord will climb it, and recompute will climb it
let previous_thread_parent = *self.thread_parent.read();
*self.thread_parent.write() = Some(id); *self.thread_parent.write() = Some(id);
let _ = GLOBALS.to_minions.send(ToMinionMessage { let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(), target: "all".to_string(),
payload: ToMinionPayload::UnsubscribePersonFeed, payload: ToMinionPayload::UnsubscribePersonFeed,
}); });
let _ = GLOBALS.to_overlord.send(ToOverlordMessage::SetThreadFeed( let _ = GLOBALS
id, .to_overlord
referenced_by, .send(ToOverlordMessage::SetThreadFeed(id, referenced_by));
previous_thread_parent,
));
} }
pub fn set_feed_to_person(&self, pubkey: PublicKeyHex) { pub fn set_feed_to_person(&self, pubkey: PublicKeyHex) {

View File

@ -13,8 +13,8 @@ use crate::tags::{
use dashmap::mapref::entry::Entry; use dashmap::mapref::entry::Entry;
use minion::Minion; use minion::Minion;
use nostr_types::{ use nostr_types::{
EncryptedPrivateKey, Event, EventKind, Id, IdHex, Metadata, PreEvent, PrivateKey, Profile, EncryptedPrivateKey, Event, EventKind, Filter, Id, IdHex, IdHexPrefix, Metadata, PreEvent,
PublicKey, PublicKeyHex, RelayUrl, Tag, Unixtime, PrivateKey, Profile, PublicKey, PublicKeyHex, RelayUrl, Tag, Unixtime,
}; };
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
@ -607,9 +607,8 @@ impl Overlord {
} }
DbRelay::update_advertise(relay_url, advertise).await?; DbRelay::update_advertise(relay_url, advertise).await?;
} }
ToOverlordMessage::SetThreadFeed(id, referenced_by, previous_thread_parent) => { ToOverlordMessage::SetThreadFeed(id, referenced_by) => {
self.set_thread_feed(id, referenced_by, previous_thread_parent) self.set_thread_feed(id, referenced_by).await?;
.await?;
} }
ToOverlordMessage::Shutdown => { ToOverlordMessage::Shutdown => {
tracing::info!("Overlord shutting down"); tracing::info!("Overlord shutting down");
@ -1114,26 +1113,26 @@ impl Overlord {
Ok(()) Ok(())
} }
async fn set_thread_feed( async fn set_thread_feed(&mut self, id: Id, referenced_by: Id) -> Result<(), Error> {
&mut self, // We are responsible for loading all the ancestors and all the replies, and
id: Id, // process.rs is responsible for building the relationships.
referenced_by: Id, // The UI can only show events if they are loaded into memory and the relationships
previous_thread_parent: Option<Id>, // exist in memory.
) -> Result<(), Error> {
// Collect missing ancestors and relays they might be at. // Our task is fourfold:
// We will ask all the relays about all the ancestors, which is more than we need to // ancestors from sqlite, replies from sqlite
// but isn't too much to ask for. // ancestors from relays, replies from relays,
// We simplify things by asking for this data from every relay we are
// connected to, as well as any relays we discover might know. This is
// more than strictly necessary, but not too expensive.
let mut missing_ancestors: Vec<Id> = Vec::new(); let mut missing_ancestors: Vec<Id> = Vec::new();
let mut relays: Vec<RelayUrl> = Vec::new(); let mut relays: Vec<RelayUrl> = Vec::new();
// Include the relays where the referenced_by event was seen // Include the relays where the referenced_by event was seen
relays.extend(DbEventSeen::get_relays_for_event(referenced_by).await?); relays.extend(DbEventSeen::get_relays_for_event(referenced_by).await?);
relays.extend(DbEventSeen::get_relays_for_event(id).await?); relays.extend(DbEventSeen::get_relays_for_event(id).await?);
if relays.is_empty() {
*GLOBALS.status_message.write().await =
"Could not find any relays for that event".to_owned();
return Ok(());
}
// Climb the tree as high as we can, and if there are higher events, // Climb the tree as high as we can, and if there are higher events,
// we will ask for those in the initial subscription // we will ask for those in the initial subscription
@ -1141,17 +1140,16 @@ impl Overlord {
if let Some(hpid) = GLOBALS.events.get_highest_local_parent(&id).await? { if let Some(hpid) = GLOBALS.events.get_highest_local_parent(&id).await? {
hpid hpid
} else { } else {
// we don't have the event itself!
missing_ancestors.push(id); missing_ancestors.push(id);
id id
}; };
if previous_thread_parent == Some(highest_parent_id) { // Set the thread feed to the highest parent that we have, or to the event itself
tracing::debug!("Same thread, not resubscribing."); // even if we don't have it (it might be coming in soon)
return Ok(());
}
GLOBALS.feed.set_thread_parent(highest_parent_id); GLOBALS.feed.set_thread_parent(highest_parent_id);
// Collect missing ancestors and potential relays further up the chain
if let Some(highest_parent) = GLOBALS.events.get_local(highest_parent_id).await? { if let Some(highest_parent) = GLOBALS.events.get_local(highest_parent_id).await? {
// Use relays in 'e' tags // Use relays in 'e' tags
for (id, opturl) in highest_parent.replies_to_ancestors() { for (id, opturl) in highest_parent.replies_to_ancestors() {
@ -1174,36 +1172,97 @@ impl Overlord {
let missing_ancestors_hex: Vec<IdHex> = let missing_ancestors_hex: Vec<IdHex> =
missing_ancestors.iter().map(|id| (*id).into()).collect(); missing_ancestors.iter().map(|id| (*id).into()).collect();
if missing_ancestors_hex.is_empty() { // Load events from local database
return Ok(()); // FIXME: This replicates filters that the minion also builds. We should
} // instead build the filters, then both send them to the minion and
// also query them locally.
{
let enable_reactions = GLOBALS.settings.read().await.reactions;
tracing::debug!("Seeking ancestors {:?}", missing_ancestors_hex); if !missing_ancestors_hex.is_empty() {
let idhp: Vec<IdHexPrefix> = missing_ancestors_hex
.iter()
.map(|id| id.to_owned().into())
.collect();
let _ = GLOBALS
.events
.get_local_events_by_filter(Filter {
ids: idhp,
..Default::default()
})
.await?;
// Clean up relays let mut kinds = vec![EventKind::EventDeletion];
relays.sort(); if enable_reactions {
relays.dedup(); kinds.push(EventKind::Reaction);
}
// Cancel current thread subscriptions, if any let e = GLOBALS
let _ = self.to_minions.send(ToMinionMessage { .events
target: "all".to_string(), .get_local_events_by_filter(Filter {
payload: ToMinionPayload::UnsubscribeThreadFeed, e: missing_ancestors_hex.clone(),
}); kinds,
..Default::default()
for url in relays.iter() { })
// Start minion if needed .await?;
if !GLOBALS.relay_is_connected(url) { if !e.is_empty() {
self.start_minion(url.clone()).await?; tracing::debug!("Loaded {} local ancestor events", e.len());
}
} }
// Subscribe let mut kinds = vec![
EventKind::TextNote,
EventKind::Repost,
EventKind::EventDeletion,
];
if enable_reactions {
kinds.push(EventKind::Reaction);
}
let e = GLOBALS
.events
.get_local_events_by_filter(Filter {
e: vec![id.into()],
kinds,
..Default::default()
})
.await?;
if !e.is_empty() {
tracing::debug!("Loaded {} local reply events", e.len());
}
}
// Subscribe on relays
if relays.is_empty() {
*GLOBALS.status_message.write().await =
"Could not find any relays for that event".to_owned();
return Ok(());
} else {
// Clean up relays
relays.sort();
relays.dedup();
// Cancel current thread subscriptions, if any
let _ = self.to_minions.send(ToMinionMessage { let _ = self.to_minions.send(ToMinionMessage {
target: url.0.to_string(), target: "all".to_string(),
payload: ToMinionPayload::SubscribeThreadFeed( payload: ToMinionPayload::UnsubscribeThreadFeed,
id.into(),
missing_ancestors_hex.clone(),
),
}); });
for url in relays.iter() {
// Start minion if needed
if !GLOBALS.relay_is_connected(url) {
self.start_minion(url.clone()).await?;
}
// Subscribe
let _ = self.to_minions.send(ToMinionMessage {
target: url.0.to_string(),
payload: ToMinionPayload::SubscribeThreadFeed(
id.into(),
missing_ancestors_hex.clone(),
),
});
}
} }
Ok(()) Ok(())