From dd04a5df196340a10f8d4e91387e4a6d79151498 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Fri, 23 Dec 2022 09:28:04 +1300 Subject: [PATCH] Improve event processing/threading code --- src/globals.rs | 176 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 135 insertions(+), 41 deletions(-) diff --git a/src/globals.rs b/src/globals.rs index 590539f6..624accb7 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -2,12 +2,12 @@ use crate::comms::BusMessage; use crate::db::{DbPerson, DbPersonRelay, DbRelay}; use crate::error::Error; use crate::feed_event::FeedEvent; -use nostr_proto::{Event, EventKind, Id, Metadata, PublicKey, PublicKeyHex, Tag, Unixtime}; +use nostr_proto::{Event, EventKind, Id, Metadata, PublicKey, PublicKeyHex, Tag, Unixtime, Url}; use rusqlite::Connection; use std::collections::HashMap; use std::sync::atomic::AtomicBool; use tokio::sync::{broadcast, mpsc, Mutex}; -use tracing::warn; +use tracing::{trace, warn}; /// Only one of these is ever created, via lazy_static!, and represents /// global state for the rust application @@ -30,6 +30,10 @@ pub struct Globals { /// All nostr event related data, keyed by the event Id pub feed_events: Mutex>, + /// Desired events, referred to by others, with possible URLs where we can + /// get them. We may already have these, but if not we should ask for them. + pub desired_events: Mutex>>, + /// All nostr people records currently loaded into memory, keyed by pubkey pub people: Mutex>, @@ -53,6 +57,7 @@ lazy_static! { to_overlord, from_minions: Mutex::new(Some(from_minions)), feed_events: Mutex::new(HashMap::new()), + desired_events: Mutex::new(HashMap::new()), people: Mutex::new(HashMap::new()), need_password: AtomicBool::new(false), } @@ -118,6 +123,118 @@ pub async fn add_event(event: &Event) -> Result<(), Error> { // Insert the event insert_event(event).await; + // Deal with 'e' tags on kind 1 5 and 7 (they have no meaning elsewhere) + // we dont support EncryptedDirectMessage + if event.kind == EventKind::TextNote || event.kind == EventKind::Reaction { + let mut root: Option = None; + let mut reply: Option = None; + let mut refer: Vec = Vec::new(); // this is not used yet + + // Count the 'e' tags first + let count = event + .tags + .iter() + .filter(|t| match t { + Tag::Event { .. } => true, + _ => false, + }) + .count(); + + for (n, tag) in event + .tags + .iter() + .filter(|t| match t { + Tag::Event { .. } => true, + _ => false, + }) + .enumerate() + { + if let Tag::Event { + id, + recommended_relay_url, + marker, + } = tag + { + // Add the id to desired events list + { + let mut desired_events = GLOBALS.desired_events.lock().await; + desired_events + .entry(*id) + .and_modify(|urls| { + if let Some(url) = recommended_relay_url { + urls.push(url.to_owned()); + } + }) + .or_insert_with(|| { + if let Some(url) = recommended_relay_url { + vec![url.to_owned()] + } else { + vec![] + } + }); + } + + // Sort out root, reply and refer + if marker.is_some() { + if marker.as_ref().unwrap() == "root" { + root = Some(*id); + } else if marker.as_ref().unwrap() == "reply" { + reply = Some(*id); + } else { + warn!("Unknown event tag marker: {}", marker.as_ref().unwrap()); + } + } else if n == count - 1 { + reply = Some(*id); + } else if n == 0 { + root = Some(*id); + } else { + refer.push(*id); + } + } + } + + if let Some(id) = reply { + // Mark our event in reply to + update_feed_event(event.id, |this_event| { + this_event.in_reply_to = Some(id); + }) + .await; + + // Mark the parent event as having us as a reply + update_feed_event(id, |parent_event| { + parent_event.replies.push(event.id); + }) + .await; + + // Get our last_reply_at for propogating upwards + let mut last_reply_at = event.created_at.0; + + // Update the last_reply_at all the way up the chain + let mut xid = id; + loop { + let mut in_reply_to: Option = None; + update_feed_event(xid, |ancestor_event| { + if let Some(other) = ancestor_event.last_reply_at { + last_reply_at = other.max(last_reply_at); + ancestor_event.last_reply_at = Some(last_reply_at); + } else { + ancestor_event.last_reply_at = Some(last_reply_at); + } + in_reply_to = ancestor_event.in_reply_to; // next up the chain + }) + .await; + + xid = match in_reply_to { + Some(ref id) => *id, + None => break, + } + } + } + + // We ignore 'root' and 'refer'. + if let Some(id) = root { trace!("event root = {}", id.as_hex_string()); } + } + // Some kinds seen in the wild: // nonce, p, e, t, client, content-warning, // subject, h, i, nostril, r, hashtag @@ -129,45 +246,9 @@ pub async fn add_event(event: &Event) -> Result<(), Error> { Tag::Event { id, recommended_relay_url: _, - marker, + marker: _, } => { - if event.kind == EventKind::TextNote { - if let Some(m) = marker { - if m == "reply" { - // Mark our 'in_reply_to' - update_feed_event(event.id, |er| { - er.in_reply_to = Some(*id); - }) - .await; - - // Add ourself to the parent's replies - update_feed_event(*id, |er| { - er.replies.push(event.id); - }) - .await; - - // Update the last_reply_at all the way up the chain - let mut xid = *id; - loop { - let mut in_reply_to: Option = None; - update_feed_event(xid, |er| { - if let Some(other) = er.last_reply_at { - er.last_reply_at = Some(other.max(event.created_at.0)); - } else { - er.last_reply_at = Some(event.created_at.0); - } - in_reply_to = er.in_reply_to; - }) - .await; - - xid = match in_reply_to { - Some(ref id) => *id, - None => break, - } - } - } - } - } else if event.kind == EventKind::EventDeletion { + if event.kind == EventKind::EventDeletion { // Find the other event let maybe_other_event = GLOBALS.feed_events.lock().await.get(id).cloned(); if let Some(deleted_feed_event) = maybe_other_event { @@ -279,7 +360,20 @@ pub async fn add_event(event: &Event) -> Result<(), Error> { async fn insert_event(event: &Event) { let mut feed_events = GLOBALS.feed_events.lock().await; - feed_events.insert(event.id, event.into()); + + feed_events.entry(event.id) + .and_modify(|feed_event| { + // If the event already exists, update it's base data. + // (sometimes it is created to add feed data, but doesn't have base data yet) + feed_event.feed_related = event.kind==EventKind::TextNote; + if feed_event.last_reply_at.is_none() { + feed_event.last_reply_at = Some(event.created_at.0) + } + if feed_event.event.is_none() { + feed_event.event = Some(event.to_owned()); + } + }) + .or_insert(event.into()); } async fn update_feed_event(id: Id, mut f: F)