PART 3 of globals/process revamp: Switch processing over to the new process

This commit is contained in:
Mike Dilger 2022-12-24 15:39:40 +13:00
parent 9302b36a57
commit b544603d5e
7 changed files with 19 additions and 419 deletions

View File

@ -1,7 +1,6 @@
use super::{DbEventSeen, DbEventTag};
use crate::error::Error; use crate::error::Error;
use crate::globals::GLOBALS; use crate::globals::GLOBALS;
use nostr_types::{Event, IdHex, PublicKeyHex, Unixtime, Url}; use nostr_types::{IdHex, PublicKeyHex};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
@ -140,49 +139,4 @@ impl DbEvent {
}) })
.await? .await?
} }
pub async fn save_nostr_event(event: &Event, seen_on: Option<Url>) -> Result<(), Error> {
// Convert a nostr Event into a DbEvent
let db_event = DbEvent {
id: event.id.into(),
raw: serde_json::to_string(&event)?,
pubkey: event.pubkey.into(),
created_at: event.created_at.0,
kind: event.kind.into(),
content: event.content.clone(),
ots: event.ots.clone(),
};
// Save into event table
DbEvent::insert(db_event).await?;
// Save the tags into event_tag table
for (seq, tag) in event.tags.iter().enumerate() {
// convert to vec of strings
let v: Vec<String> = serde_json::from_str(&serde_json::to_string(&tag)?)?;
let db_event_tag = DbEventTag {
event: event.id.as_hex_string(),
seq: seq as u64,
label: v.get(0).cloned(),
field0: v.get(1).cloned(),
field1: v.get(2).cloned(),
field2: v.get(3).cloned(),
field3: v.get(4).cloned(),
};
DbEventTag::insert(db_event_tag).await?;
}
// Save the event into event_seen table
if let Some(url) = seen_on {
let db_event_seen = DbEventSeen {
event: event.id.as_hex_string(),
relay: url.0,
when_seen: Unixtime::now()?.0 as u64,
};
DbEventSeen::replace(db_event_seen).await?;
}
Ok(())
}
} }

View File

@ -135,6 +135,7 @@ impl DbPerson {
Ok(()) Ok(())
} }
#[allow(dead_code)]
pub async fn update_metadata( pub async fn update_metadata(
pubkey: PublicKeyHex, pubkey: PublicKeyHex,
metadata: Metadata, metadata: Metadata,

View File

@ -30,6 +30,7 @@ pub struct FeedEvent {
} }
impl FeedEvent { impl FeedEvent {
#[allow(dead_code)]
pub fn new(id: Id) -> FeedEvent { pub fn new(id: Id) -> FeedEvent {
FeedEvent { FeedEvent {
id, id,

View File

@ -5,12 +5,11 @@ use crate::feed_event::FeedEvent;
use crate::relationship::Relationship; use crate::relationship::Relationship;
use crate::settings::Settings; use crate::settings::Settings;
use async_recursion::async_recursion; use async_recursion::async_recursion;
use nostr_types::{Event, EventKind, Id, Metadata, PublicKey, PublicKeyHex, Tag, Unixtime, Url}; use nostr_types::{Event, EventKind, Id, PublicKey, PublicKeyHex, Unixtime, Url};
use rusqlite::Connection; use rusqlite::Connection;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use tokio::sync::{broadcast, mpsc, Mutex}; use tokio::sync::{broadcast, mpsc, Mutex};
use tracing::{trace, warn};
/// Only one of these is ever created, via lazy_static!, and represents /// Only one of these is ever created, via lazy_static!, and represents
/// global state for the rust application /// global state for the rust application
@ -232,299 +231,6 @@ impl Globals {
} }
} }
pub async fn add_event(event: &Event) -> Result<(), Error> {
// Insert the event
insert_event(event).await;
// Deal with 'e' tags on kind 1 5 and 7 (they have no meaning elsewhere)
// we dont support EncryptedDirectMessage
if event.kind == EventKind::TextNote || event.kind == EventKind::Reaction {
let mut root: Option<Id> = None;
let mut reply: Option<Id> = None;
let mut refer: Vec<Id> = Vec::new(); // this is not used yet
// Count the 'e' tags first
let count = event
.tags
.iter()
.filter(|t| matches!(t, Tag::Event { .. }))
.count();
for (n, tag) in event
.tags
.iter()
.filter(|t| matches!(t, Tag::Event { .. }))
.enumerate()
{
if let Tag::Event {
id,
recommended_relay_url,
marker,
} = tag
{
// Add the id to desired events list
{
let mut desired_events = GLOBALS.desired_events.lock().await;
desired_events
.entry(*id)
.and_modify(|urls| {
if let Some(url) = recommended_relay_url {
urls.push(url.to_owned());
}
})
.or_insert_with(|| {
if let Some(url) = recommended_relay_url {
vec![url.to_owned()]
} else {
vec![]
}
});
}
// Sort out root, reply and refer
if marker.is_some() {
if marker.as_ref().unwrap() == "root" {
root = Some(*id);
} else if marker.as_ref().unwrap() == "reply" {
reply = Some(*id);
} else {
warn!("Unknown event tag marker: {}", marker.as_ref().unwrap());
}
} else if n == count - 1 {
reply = Some(*id);
} else if n == 0 {
root = Some(*id);
} else {
refer.push(*id);
}
}
}
if let Some(id) = reply {
// Mark our event in reply to
update_feed_event(event.id, |this_event| {
this_event.in_reply_to = Some(id);
})
.await;
// Mark the parent event as having us as a reply
update_feed_event(id, |parent_event| {
parent_event.replies.push(event.id);
})
.await;
// Get our last_reply_at for propogating upwards
let mut last_reply_at = event.created_at.0;
// Update the last_reply_at all the way up the chain
let mut xid = id;
loop {
let mut in_reply_to: Option<Id> = None;
update_feed_event(xid, |ancestor_event| {
if let Some(other) = ancestor_event.last_reply_at {
last_reply_at = other.max(last_reply_at);
ancestor_event.last_reply_at = Some(last_reply_at);
} else {
ancestor_event.last_reply_at = Some(last_reply_at);
}
in_reply_to = ancestor_event.in_reply_to; // next up the chain
})
.await;
xid = match in_reply_to {
Some(ref id) => *id,
None => break,
}
}
}
// We ignore 'root' and 'refer'.
if let Some(id) = root {
trace!("event root = {}", id.as_hex_string());
}
}
// Some kinds seen in the wild:
// nonce, p, e, t, client, content-warning,
// subject, h, i, nostril, r, hashtag
for tag in event.tags.iter() {
// Get some metadata from tags that could apply to multiple
// kinds of events
match tag {
Tag::Event {
id,
recommended_relay_url: _,
marker: _,
} => {
if event.kind == EventKind::EventDeletion {
// Find the other event
let maybe_other_event = GLOBALS.feed_events.lock().await.get(id).cloned();
if let Some(deleted_feed_event) = maybe_other_event {
match &deleted_feed_event.event {
None => {
// Can't verify the author. Take no action
}
Some(deleted_event) => {
if deleted_event.pubkey != event.pubkey {
// Invalid delete event, author does not match
warn!("Somebody tried to delete someone elses event");
GLOBALS.feed_events.lock().await.remove(id);
return Ok(());
} else {
update_feed_event(*id, |er| {
er.deleted_reason = Some(event.content.clone());
})
.await;
}
}
}
} else {
// FIXME - currently we don't apply this deletion event
// if we don't have the event it refers to because we cannot
// check that the authors match.
// but if we get the event it refers to later, nothing will
// trigger us to reapply it.
}
}
}
Tag::Pubkey { .. } => {
// Maybe we can generally handle these?
// Maybe it is too specific to certain event types.
// For now we process these under specific event types.
}
Tag::Hashtag(s) => {
update_feed_event(event.id, |er| {
er.hashtags.push(s.to_string());
})
.await;
}
Tag::Reference(r) => {
update_feed_event(event.id, |er| {
er.urls.push(r.to_string());
})
.await;
}
Tag::Geohash(_) => {} // not implemented
Tag::Subject(s) => {
update_feed_event(event.id, |er| {
er.subject = Some(s.to_string());
})
.await;
}
Tag::Nonce { .. } => {} // not implemented
Tag::Other { tag, data } => {
if tag == "client" && !data.is_empty() {
update_feed_event(event.id, |er| {
er.client = Some(data[0].to_string());
})
.await;
}
}
Tag::Empty => {} // nothing to do
}
}
if event.kind == EventKind::Reaction {
for tag in event.tags.iter() {
if let Tag::Event {
id,
recommended_relay_url: _,
marker: _,
} = tag
{
// last 'e' is the id reacted to
if event.content.starts_with('+') {
update_feed_event(id.to_owned(), |er| {
er.reactions.upvotes += 1;
})
.await;
} else if event.content.starts_with('-') {
update_feed_event(id.to_owned(), |er| {
er.reactions.downvotes += 1;
})
.await;
} else if event.content.is_empty() {
// consider it an upvote
update_feed_event(id.to_owned(), |er| {
er.reactions.upvotes += 1;
})
.await;
} else {
// consider it an emoji
update_feed_event(id.to_owned(), |er| {
// FIXME: If it exists, increment it
er.reactions
.emojis
.push((event.content.chars().next().unwrap(), 1))
})
.await;
}
}
}
}
Ok(())
}
async fn insert_event(event: &Event) {
let mut feed_events = GLOBALS.feed_events.lock().await;
feed_events
.entry(event.id)
.and_modify(|feed_event| {
// If the event already exists, update it's base data.
// (sometimes it is created to add feed data, but doesn't have base data yet)
feed_event.feed_related = event.kind == EventKind::TextNote;
if feed_event.last_reply_at.is_none() {
feed_event.last_reply_at = Some(event.created_at.0)
}
if feed_event.event.is_none() {
feed_event.event = Some(event.to_owned());
}
})
.or_insert_with(|| event.into());
}
async fn update_feed_event<F>(id: Id, mut f: F)
where
F: FnMut(&mut FeedEvent),
{
let mut feed_events = GLOBALS.feed_events.lock().await;
let feed_event = feed_events.entry(id).or_insert_with(|| FeedEvent::new(id));
f(feed_event);
}
pub async fn update_person_from_event_metadata(
pubkey: PublicKey,
created_at: Unixtime,
metadata: Metadata,
) {
let mut people = GLOBALS.people.lock().await;
let person = people
.entry(pubkey)
.or_insert_with(|| DbPerson::new(pubkey.into()));
// Do not update the metadata if ours is newer
if let Some(metadata_at) = person.metadata_at {
if created_at.0 <= metadata_at {
// Old metadata. Ignore it
return;
}
}
// Update the metadata
person.name = metadata.name;
person.about = metadata.about;
person.picture = metadata.picture;
if person.dns_id != metadata.nip05 {
person.dns_id = metadata.nip05;
person.dns_id_valid = 0; // changed, so reset to invalid
person.dns_id_last_checked = None; // we haven't checked this one yet
}
person.metadata_at = Some(created_at.0);
}
#[allow(dead_code)] #[allow(dead_code)]
async fn save_person(pubkey: PublicKey) -> Result<(), Error> { async fn save_person(pubkey: PublicKey) -> Result<(), Error> {
let mut people = GLOBALS.people.lock().await; let mut people = GLOBALS.people.lock().await;

View File

@ -1,8 +1,8 @@
use super::Minion; use super::Minion;
use crate::db::{DbEvent, DbPersonRelay}; use crate::db::DbPersonRelay;
use crate::{BusMessage, Error}; use crate::Error;
use nostr_types::{Event, RelayMessage, Unixtime}; use nostr_types::{RelayMessage, Unixtime};
use tracing::{error, info, trace, warn}; use tracing::{debug, error, info, warn};
impl Minion { impl Minion {
pub(super) async fn handle_nostr_message(&mut self, ws_message: String) -> Result<(), Error> { pub(super) async fn handle_nostr_message(&mut self, ws_message: String) -> Result<(), Error> {
@ -19,9 +19,8 @@ impl Minion {
if let Err(e) = event.verify(Some(maxtime)) { if let Err(e) = event.verify(Some(maxtime)) {
error!("VERIFY ERROR: {}, {}", e, serde_json::to_string(&event)?) error!("VERIFY ERROR: {}, {}", e, serde_json::to_string(&event)?)
} else { } else {
trace!("NEW EVENT ON {}", subid.0); debug!("NEW EVENT ON {}", subid.0);
DbEvent::save_nostr_event(&event, Some(self.url.clone())).await?; crate::process::process_new_event(&event, true, Some(self.url.clone())).await?;
self.send_overlord_newevent(*event).await?;
} }
} }
RelayMessage::Notice(msg) => { RelayMessage::Notice(msg) => {
@ -51,13 +50,4 @@ impl Minion {
Ok(()) Ok(())
} }
async fn send_overlord_newevent(&self, event: Event) -> Result<(), Error> {
self.to_overlord.send(BusMessage {
target: "overlord".to_string(),
kind: "new_event".to_string(),
json_payload: serde_json::to_string(&event)?,
})?;
Ok(())
}
} }

View File

@ -7,7 +7,7 @@ use crate::error::Error;
use crate::globals::GLOBALS; use crate::globals::GLOBALS;
use crate::settings::Settings; use crate::settings::Settings;
use minion::Minion; use minion::Minion;
use nostr_types::{Event, EventKind, Metadata, PrivateKey, PublicKey, PublicKeyHex, Unixtime, Url}; use nostr_types::{Event, PrivateKey, PublicKey, PublicKeyHex, Unixtime, Url};
use relay_picker::{BestRelay, RelayPicker}; use relay_picker::{BestRelay, RelayPicker};
use std::collections::HashMap; use std::collections::HashMap;
use tokio::sync::broadcast::Sender; use tokio::sync::broadcast::Sender;
@ -104,32 +104,9 @@ impl Overlord {
continue; continue;
} }
}; };
let metadata: Metadata = match serde_json::from_str(&dbevent.content) {
Ok(e) => e,
Err(_) => {
error!(
"Bad metadata: id={}, content={}",
dbevent.id, dbevent.content
);
continue;
}
};
// Update in globals // Process this metadata event to update people
crate::globals::update_person_from_event_metadata( crate::process::process_new_event(&e, false, None).await?;
e.pubkey,
e.created_at,
metadata.clone(),
)
.await;
// Update in database
DbPerson::update_metadata(
PublicKeyHex(e.pubkey.as_hex_string()),
metadata,
e.created_at,
)
.await?;
} }
} }
@ -154,7 +131,7 @@ impl Overlord {
let mut count = 0; let mut count = 0;
for event in events.iter() { for event in events.iter() {
count += 1; count += 1;
crate::globals::add_event(event).await?; crate::process::process_new_event(event, false, None).await?;
} }
info!("Loaded {} events from the database", count); info!("Loaded {} events from the database", count);
} }
@ -261,39 +238,6 @@ impl Overlord {
_ => {} _ => {}
}, },
"overlord" => match &*bus_message.kind { "overlord" => match &*bus_message.kind {
"new_event" => {
let event: Event = serde_json::from_str(&bus_message.json_payload)?;
// If feed-related, send to the feed event processor
if event.kind == EventKind::TextNote
|| event.kind == EventKind::EncryptedDirectMessage
|| event.kind == EventKind::EventDeletion
|| event.kind == EventKind::Reaction
{
crate::globals::add_event(&event).await?;
debug!("new feed event arrived: {}...", event.id.as_hex_string());
} else {
// Not Feed Related: Metadata, RecommendRelay, ContactList
debug!(
"new non-feed event arrived: {}...",
event.id.as_hex_string()
);
if event.kind == EventKind::Metadata {
let metadata: Metadata = serde_json::from_str(&event.content)?;
crate::globals::update_person_from_event_metadata(
event.pubkey,
event.created_at,
metadata,
)
.await;
}
// FIXME: Handle EventKind::RecommendedRelay
// FIXME: Handle EventKind::ContactList
}
}
"minion_is_ready" => {} "minion_is_ready" => {}
"save_settings" => { "save_settings" => {
let settings: Settings = serde_json::from_str(&bus_message.json_payload)?; let settings: Settings = serde_json::from_str(&bus_message.json_payload)?;

View File

@ -7,7 +7,11 @@ use nostr_types::{Event, EventKind, Metadata, Unixtime, Url};
// This processes a new event, saving the results into the database // This processes a new event, saving the results into the database
// and also populating the GLOBALS maps. // and also populating the GLOBALS maps.
#[allow(dead_code)] #[allow(dead_code)]
pub async fn process_new_event(event: &Event, from_relay: bool, seen_on: Option<Url>) -> Result<(), Error> { pub async fn process_new_event(
event: &Event,
from_relay: bool,
seen_on: Option<Url>,
) -> Result<(), Error> {
// Save the event into the database // Save the event into the database
if from_relay { if from_relay {
// Convert a nostr Event into a DbEvent // Convert a nostr Event into a DbEvent