From b544603d5e55c0edc8b81f6db3ec6d29f72c6175 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Sat, 24 Dec 2022 15:39:40 +1300 Subject: [PATCH] PART 3 of globals/process revamp: Switch processing over to the new process --- src/db/event.rs | 48 +--- src/db/person.rs | 1 + src/feed_event.rs | 1 + src/globals.rs | 296 +----------------------- src/overlord/minion/handle_websocket.rs | 22 +- src/overlord/mod.rs | 64 +---- src/process.rs | 6 +- 7 files changed, 19 insertions(+), 419 deletions(-) diff --git a/src/db/event.rs b/src/db/event.rs index 10f4ece5..dbe28c9e 100644 --- a/src/db/event.rs +++ b/src/db/event.rs @@ -1,7 +1,6 @@ -use super::{DbEventSeen, DbEventTag}; use crate::error::Error; use crate::globals::GLOBALS; -use nostr_types::{Event, IdHex, PublicKeyHex, Unixtime, Url}; +use nostr_types::{IdHex, PublicKeyHex}; use serde::{Deserialize, Serialize}; use tokio::task::spawn_blocking; @@ -140,49 +139,4 @@ impl DbEvent { }) .await? } - - pub async fn save_nostr_event(event: &Event, seen_on: Option) -> Result<(), Error> { - // Convert a nostr Event into a DbEvent - let db_event = DbEvent { - id: event.id.into(), - raw: serde_json::to_string(&event)?, - pubkey: event.pubkey.into(), - created_at: event.created_at.0, - kind: event.kind.into(), - content: event.content.clone(), - ots: event.ots.clone(), - }; - - // Save into event table - DbEvent::insert(db_event).await?; - - // Save the tags into event_tag table - for (seq, tag) in event.tags.iter().enumerate() { - // convert to vec of strings - let v: Vec = serde_json::from_str(&serde_json::to_string(&tag)?)?; - - let db_event_tag = DbEventTag { - event: event.id.as_hex_string(), - seq: seq as u64, - label: v.get(0).cloned(), - field0: v.get(1).cloned(), - field1: v.get(2).cloned(), - field2: v.get(3).cloned(), - field3: v.get(4).cloned(), - }; - DbEventTag::insert(db_event_tag).await?; - } - - // Save the event into event_seen table - if let Some(url) = seen_on { - let db_event_seen = DbEventSeen { - event: event.id.as_hex_string(), - relay: url.0, - when_seen: Unixtime::now()?.0 as u64, - }; - DbEventSeen::replace(db_event_seen).await?; - } - - Ok(()) - } } diff --git a/src/db/person.rs b/src/db/person.rs index 384bfe1c..58648404 100644 --- a/src/db/person.rs +++ b/src/db/person.rs @@ -135,6 +135,7 @@ impl DbPerson { Ok(()) } + #[allow(dead_code)] pub async fn update_metadata( pubkey: PublicKeyHex, metadata: Metadata, diff --git a/src/feed_event.rs b/src/feed_event.rs index c659616e..1ec7e4fd 100644 --- a/src/feed_event.rs +++ b/src/feed_event.rs @@ -30,6 +30,7 @@ pub struct FeedEvent { } impl FeedEvent { + #[allow(dead_code)] pub fn new(id: Id) -> FeedEvent { FeedEvent { id, diff --git a/src/globals.rs b/src/globals.rs index b5e5e760..750e8e24 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -5,12 +5,11 @@ use crate::feed_event::FeedEvent; use crate::relationship::Relationship; use crate::settings::Settings; use async_recursion::async_recursion; -use nostr_types::{Event, EventKind, Id, Metadata, PublicKey, PublicKeyHex, Tag, Unixtime, Url}; +use nostr_types::{Event, EventKind, Id, PublicKey, PublicKeyHex, Unixtime, Url}; use rusqlite::Connection; use std::collections::HashMap; use std::sync::atomic::AtomicBool; use tokio::sync::{broadcast, mpsc, Mutex}; -use tracing::{trace, warn}; /// Only one of these is ever created, via lazy_static!, and represents /// global state for the rust application @@ -232,299 +231,6 @@ impl Globals { } } -pub async fn add_event(event: &Event) -> Result<(), Error> { - // Insert the event - insert_event(event).await; - - // Deal with 'e' tags on kind 1 5 and 7 (they have no meaning elsewhere) - // we dont support EncryptedDirectMessage - if event.kind == EventKind::TextNote || event.kind == EventKind::Reaction { - let mut root: Option = None; - let mut reply: Option = None; - let mut refer: Vec = Vec::new(); // this is not used yet - - // Count the 'e' tags first - let count = event - .tags - .iter() - .filter(|t| matches!(t, Tag::Event { .. })) - .count(); - - for (n, tag) in event - .tags - .iter() - .filter(|t| matches!(t, Tag::Event { .. })) - .enumerate() - { - if let Tag::Event { - id, - recommended_relay_url, - marker, - } = tag - { - // Add the id to desired events list - { - let mut desired_events = GLOBALS.desired_events.lock().await; - desired_events - .entry(*id) - .and_modify(|urls| { - if let Some(url) = recommended_relay_url { - urls.push(url.to_owned()); - } - }) - .or_insert_with(|| { - if let Some(url) = recommended_relay_url { - vec![url.to_owned()] - } else { - vec![] - } - }); - } - - // Sort out root, reply and refer - if marker.is_some() { - if marker.as_ref().unwrap() == "root" { - root = Some(*id); - } else if marker.as_ref().unwrap() == "reply" { - reply = Some(*id); - } else { - warn!("Unknown event tag marker: {}", marker.as_ref().unwrap()); - } - } else if n == count - 1 { - reply = Some(*id); - } else if n == 0 { - root = Some(*id); - } else { - refer.push(*id); - } - } - } - - if let Some(id) = reply { - // Mark our event in reply to - update_feed_event(event.id, |this_event| { - this_event.in_reply_to = Some(id); - }) - .await; - - // Mark the parent event as having us as a reply - update_feed_event(id, |parent_event| { - parent_event.replies.push(event.id); - }) - .await; - - // Get our last_reply_at for propogating upwards - let mut last_reply_at = event.created_at.0; - - // Update the last_reply_at all the way up the chain - let mut xid = id; - loop { - let mut in_reply_to: Option = None; - update_feed_event(xid, |ancestor_event| { - if let Some(other) = ancestor_event.last_reply_at { - last_reply_at = other.max(last_reply_at); - ancestor_event.last_reply_at = Some(last_reply_at); - } else { - ancestor_event.last_reply_at = Some(last_reply_at); - } - in_reply_to = ancestor_event.in_reply_to; // next up the chain - }) - .await; - - xid = match in_reply_to { - Some(ref id) => *id, - None => break, - } - } - } - - // We ignore 'root' and 'refer'. - if let Some(id) = root { - trace!("event root = {}", id.as_hex_string()); - } - } - - // Some kinds seen in the wild: - // nonce, p, e, t, client, content-warning, - // subject, h, i, nostril, r, hashtag - for tag in event.tags.iter() { - // Get some metadata from tags that could apply to multiple - // kinds of events - - match tag { - Tag::Event { - id, - recommended_relay_url: _, - marker: _, - } => { - if event.kind == EventKind::EventDeletion { - // Find the other event - let maybe_other_event = GLOBALS.feed_events.lock().await.get(id).cloned(); - if let Some(deleted_feed_event) = maybe_other_event { - match &deleted_feed_event.event { - None => { - // Can't verify the author. Take no action - } - Some(deleted_event) => { - if deleted_event.pubkey != event.pubkey { - // Invalid delete event, author does not match - warn!("Somebody tried to delete someone elses event"); - GLOBALS.feed_events.lock().await.remove(id); - return Ok(()); - } else { - update_feed_event(*id, |er| { - er.deleted_reason = Some(event.content.clone()); - }) - .await; - } - } - } - } else { - // FIXME - currently we don't apply this deletion event - // if we don't have the event it refers to because we cannot - // check that the authors match. - // but if we get the event it refers to later, nothing will - // trigger us to reapply it. - } - } - } - Tag::Pubkey { .. } => { - // Maybe we can generally handle these? - // Maybe it is too specific to certain event types. - // For now we process these under specific event types. - } - Tag::Hashtag(s) => { - update_feed_event(event.id, |er| { - er.hashtags.push(s.to_string()); - }) - .await; - } - Tag::Reference(r) => { - update_feed_event(event.id, |er| { - er.urls.push(r.to_string()); - }) - .await; - } - Tag::Geohash(_) => {} // not implemented - Tag::Subject(s) => { - update_feed_event(event.id, |er| { - er.subject = Some(s.to_string()); - }) - .await; - } - Tag::Nonce { .. } => {} // not implemented - Tag::Other { tag, data } => { - if tag == "client" && !data.is_empty() { - update_feed_event(event.id, |er| { - er.client = Some(data[0].to_string()); - }) - .await; - } - } - Tag::Empty => {} // nothing to do - } - } - - if event.kind == EventKind::Reaction { - for tag in event.tags.iter() { - if let Tag::Event { - id, - recommended_relay_url: _, - marker: _, - } = tag - { - // last 'e' is the id reacted to - if event.content.starts_with('+') { - update_feed_event(id.to_owned(), |er| { - er.reactions.upvotes += 1; - }) - .await; - } else if event.content.starts_with('-') { - update_feed_event(id.to_owned(), |er| { - er.reactions.downvotes += 1; - }) - .await; - } else if event.content.is_empty() { - // consider it an upvote - update_feed_event(id.to_owned(), |er| { - er.reactions.upvotes += 1; - }) - .await; - } else { - // consider it an emoji - update_feed_event(id.to_owned(), |er| { - // FIXME: If it exists, increment it - er.reactions - .emojis - .push((event.content.chars().next().unwrap(), 1)) - }) - .await; - } - } - } - } - - Ok(()) -} - -async fn insert_event(event: &Event) { - let mut feed_events = GLOBALS.feed_events.lock().await; - - feed_events - .entry(event.id) - .and_modify(|feed_event| { - // If the event already exists, update it's base data. - // (sometimes it is created to add feed data, but doesn't have base data yet) - feed_event.feed_related = event.kind == EventKind::TextNote; - if feed_event.last_reply_at.is_none() { - feed_event.last_reply_at = Some(event.created_at.0) - } - if feed_event.event.is_none() { - feed_event.event = Some(event.to_owned()); - } - }) - .or_insert_with(|| event.into()); -} - -async fn update_feed_event(id: Id, mut f: F) -where - F: FnMut(&mut FeedEvent), -{ - let mut feed_events = GLOBALS.feed_events.lock().await; - let feed_event = feed_events.entry(id).or_insert_with(|| FeedEvent::new(id)); - f(feed_event); -} - -pub async fn update_person_from_event_metadata( - pubkey: PublicKey, - created_at: Unixtime, - metadata: Metadata, -) { - let mut people = GLOBALS.people.lock().await; - let person = people - .entry(pubkey) - .or_insert_with(|| DbPerson::new(pubkey.into())); - - // Do not update the metadata if ours is newer - if let Some(metadata_at) = person.metadata_at { - if created_at.0 <= metadata_at { - // Old metadata. Ignore it - return; - } - } - - // Update the metadata - person.name = metadata.name; - person.about = metadata.about; - person.picture = metadata.picture; - if person.dns_id != metadata.nip05 { - person.dns_id = metadata.nip05; - person.dns_id_valid = 0; // changed, so reset to invalid - person.dns_id_last_checked = None; // we haven't checked this one yet - } - person.metadata_at = Some(created_at.0); -} - #[allow(dead_code)] async fn save_person(pubkey: PublicKey) -> Result<(), Error> { let mut people = GLOBALS.people.lock().await; diff --git a/src/overlord/minion/handle_websocket.rs b/src/overlord/minion/handle_websocket.rs index c3d84dba..53ddc89f 100644 --- a/src/overlord/minion/handle_websocket.rs +++ b/src/overlord/minion/handle_websocket.rs @@ -1,8 +1,8 @@ use super::Minion; -use crate::db::{DbEvent, DbPersonRelay}; -use crate::{BusMessage, Error}; -use nostr_types::{Event, RelayMessage, Unixtime}; -use tracing::{error, info, trace, warn}; +use crate::db::DbPersonRelay; +use crate::Error; +use nostr_types::{RelayMessage, Unixtime}; +use tracing::{debug, error, info, warn}; impl Minion { pub(super) async fn handle_nostr_message(&mut self, ws_message: String) -> Result<(), Error> { @@ -19,9 +19,8 @@ impl Minion { if let Err(e) = event.verify(Some(maxtime)) { error!("VERIFY ERROR: {}, {}", e, serde_json::to_string(&event)?) } else { - trace!("NEW EVENT ON {}", subid.0); - DbEvent::save_nostr_event(&event, Some(self.url.clone())).await?; - self.send_overlord_newevent(*event).await?; + debug!("NEW EVENT ON {}", subid.0); + crate::process::process_new_event(&event, true, Some(self.url.clone())).await?; } } RelayMessage::Notice(msg) => { @@ -51,13 +50,4 @@ impl Minion { Ok(()) } - - async fn send_overlord_newevent(&self, event: Event) -> Result<(), Error> { - self.to_overlord.send(BusMessage { - target: "overlord".to_string(), - kind: "new_event".to_string(), - json_payload: serde_json::to_string(&event)?, - })?; - Ok(()) - } } diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs index d79c2a50..aa71527b 100644 --- a/src/overlord/mod.rs +++ b/src/overlord/mod.rs @@ -7,7 +7,7 @@ use crate::error::Error; use crate::globals::GLOBALS; use crate::settings::Settings; use minion::Minion; -use nostr_types::{Event, EventKind, Metadata, PrivateKey, PublicKey, PublicKeyHex, Unixtime, Url}; +use nostr_types::{Event, PrivateKey, PublicKey, PublicKeyHex, Unixtime, Url}; use relay_picker::{BestRelay, RelayPicker}; use std::collections::HashMap; use tokio::sync::broadcast::Sender; @@ -104,32 +104,9 @@ impl Overlord { continue; } }; - let metadata: Metadata = match serde_json::from_str(&dbevent.content) { - Ok(e) => e, - Err(_) => { - error!( - "Bad metadata: id={}, content={}", - dbevent.id, dbevent.content - ); - continue; - } - }; - // Update in globals - crate::globals::update_person_from_event_metadata( - e.pubkey, - e.created_at, - metadata.clone(), - ) - .await; - - // Update in database - DbPerson::update_metadata( - PublicKeyHex(e.pubkey.as_hex_string()), - metadata, - e.created_at, - ) - .await?; + // Process this metadata event to update people + crate::process::process_new_event(&e, false, None).await?; } } @@ -154,7 +131,7 @@ impl Overlord { let mut count = 0; for event in events.iter() { count += 1; - crate::globals::add_event(event).await?; + crate::process::process_new_event(event, false, None).await?; } info!("Loaded {} events from the database", count); } @@ -261,39 +238,6 @@ impl Overlord { _ => {} }, "overlord" => match &*bus_message.kind { - "new_event" => { - let event: Event = serde_json::from_str(&bus_message.json_payload)?; - - // If feed-related, send to the feed event processor - if event.kind == EventKind::TextNote - || event.kind == EventKind::EncryptedDirectMessage - || event.kind == EventKind::EventDeletion - || event.kind == EventKind::Reaction - { - crate::globals::add_event(&event).await?; - - debug!("new feed event arrived: {}...", event.id.as_hex_string()); - } else { - // Not Feed Related: Metadata, RecommendRelay, ContactList - debug!( - "new non-feed event arrived: {}...", - event.id.as_hex_string() - ); - - if event.kind == EventKind::Metadata { - let metadata: Metadata = serde_json::from_str(&event.content)?; - crate::globals::update_person_from_event_metadata( - event.pubkey, - event.created_at, - metadata, - ) - .await; - } - - // FIXME: Handle EventKind::RecommendedRelay - // FIXME: Handle EventKind::ContactList - } - } "minion_is_ready" => {} "save_settings" => { let settings: Settings = serde_json::from_str(&bus_message.json_payload)?; diff --git a/src/process.rs b/src/process.rs index 4c2abbf9..51bd4d94 100644 --- a/src/process.rs +++ b/src/process.rs @@ -7,7 +7,11 @@ use nostr_types::{Event, EventKind, Metadata, Unixtime, Url}; // This processes a new event, saving the results into the database // and also populating the GLOBALS maps. #[allow(dead_code)] -pub async fn process_new_event(event: &Event, from_relay: bool, seen_on: Option) -> Result<(), Error> { +pub async fn process_new_event( + event: &Event, + from_relay: bool, + seen_on: Option, +) -> Result<(), Error> { // Save the event into the database if from_relay { // Convert a nostr Event into a DbEvent