mirror of
https://github.com/mikedilger/gossip.git
synced 2024-09-19 19:46:50 +00:00
Improve event processing/threading code
This commit is contained in:
parent
52ae2236db
commit
dd04a5df19
176
src/globals.rs
176
src/globals.rs
@ -2,12 +2,12 @@ use crate::comms::BusMessage;
|
|||||||
use crate::db::{DbPerson, DbPersonRelay, DbRelay};
|
use crate::db::{DbPerson, DbPersonRelay, DbRelay};
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::feed_event::FeedEvent;
|
use crate::feed_event::FeedEvent;
|
||||||
use nostr_proto::{Event, EventKind, Id, Metadata, PublicKey, PublicKeyHex, Tag, Unixtime};
|
use nostr_proto::{Event, EventKind, Id, Metadata, PublicKey, PublicKeyHex, Tag, Unixtime, Url};
|
||||||
use rusqlite::Connection;
|
use rusqlite::Connection;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::atomic::AtomicBool;
|
use std::sync::atomic::AtomicBool;
|
||||||
use tokio::sync::{broadcast, mpsc, Mutex};
|
use tokio::sync::{broadcast, mpsc, Mutex};
|
||||||
use tracing::warn;
|
use tracing::{trace, warn};
|
||||||
|
|
||||||
/// Only one of these is ever created, via lazy_static!, and represents
|
/// Only one of these is ever created, via lazy_static!, and represents
|
||||||
/// global state for the rust application
|
/// global state for the rust application
|
||||||
@ -30,6 +30,10 @@ pub struct Globals {
|
|||||||
/// All nostr event related data, keyed by the event Id
|
/// All nostr event related data, keyed by the event Id
|
||||||
pub feed_events: Mutex<HashMap<Id, FeedEvent>>,
|
pub feed_events: Mutex<HashMap<Id, FeedEvent>>,
|
||||||
|
|
||||||
|
/// Desired events, referred to by others, with possible URLs where we can
|
||||||
|
/// get them. We may already have these, but if not we should ask for them.
|
||||||
|
pub desired_events: Mutex<HashMap<Id, Vec<Url>>>,
|
||||||
|
|
||||||
/// All nostr people records currently loaded into memory, keyed by pubkey
|
/// All nostr people records currently loaded into memory, keyed by pubkey
|
||||||
pub people: Mutex<HashMap<PublicKey, DbPerson>>,
|
pub people: Mutex<HashMap<PublicKey, DbPerson>>,
|
||||||
|
|
||||||
@ -53,6 +57,7 @@ lazy_static! {
|
|||||||
to_overlord,
|
to_overlord,
|
||||||
from_minions: Mutex::new(Some(from_minions)),
|
from_minions: Mutex::new(Some(from_minions)),
|
||||||
feed_events: Mutex::new(HashMap::new()),
|
feed_events: Mutex::new(HashMap::new()),
|
||||||
|
desired_events: Mutex::new(HashMap::new()),
|
||||||
people: Mutex::new(HashMap::new()),
|
people: Mutex::new(HashMap::new()),
|
||||||
need_password: AtomicBool::new(false),
|
need_password: AtomicBool::new(false),
|
||||||
}
|
}
|
||||||
@ -118,6 +123,118 @@ pub async fn add_event(event: &Event) -> Result<(), Error> {
|
|||||||
// Insert the event
|
// Insert the event
|
||||||
insert_event(event).await;
|
insert_event(event).await;
|
||||||
|
|
||||||
|
// Deal with 'e' tags on kind 1 5 and 7 (they have no meaning elsewhere)
|
||||||
|
// we dont support EncryptedDirectMessage
|
||||||
|
if event.kind == EventKind::TextNote || event.kind == EventKind::Reaction {
|
||||||
|
let mut root: Option<Id> = None;
|
||||||
|
let mut reply: Option<Id> = None;
|
||||||
|
let mut refer: Vec<Id> = Vec::new(); // this is not used yet
|
||||||
|
|
||||||
|
// Count the 'e' tags first
|
||||||
|
let count = event
|
||||||
|
.tags
|
||||||
|
.iter()
|
||||||
|
.filter(|t| match t {
|
||||||
|
Tag::Event { .. } => true,
|
||||||
|
_ => false,
|
||||||
|
})
|
||||||
|
.count();
|
||||||
|
|
||||||
|
for (n, tag) in event
|
||||||
|
.tags
|
||||||
|
.iter()
|
||||||
|
.filter(|t| match t {
|
||||||
|
Tag::Event { .. } => true,
|
||||||
|
_ => false,
|
||||||
|
})
|
||||||
|
.enumerate()
|
||||||
|
{
|
||||||
|
if let Tag::Event {
|
||||||
|
id,
|
||||||
|
recommended_relay_url,
|
||||||
|
marker,
|
||||||
|
} = tag
|
||||||
|
{
|
||||||
|
// Add the id to desired events list
|
||||||
|
{
|
||||||
|
let mut desired_events = GLOBALS.desired_events.lock().await;
|
||||||
|
desired_events
|
||||||
|
.entry(*id)
|
||||||
|
.and_modify(|urls| {
|
||||||
|
if let Some(url) = recommended_relay_url {
|
||||||
|
urls.push(url.to_owned());
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.or_insert_with(|| {
|
||||||
|
if let Some(url) = recommended_relay_url {
|
||||||
|
vec![url.to_owned()]
|
||||||
|
} else {
|
||||||
|
vec![]
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort out root, reply and refer
|
||||||
|
if marker.is_some() {
|
||||||
|
if marker.as_ref().unwrap() == "root" {
|
||||||
|
root = Some(*id);
|
||||||
|
} else if marker.as_ref().unwrap() == "reply" {
|
||||||
|
reply = Some(*id);
|
||||||
|
} else {
|
||||||
|
warn!("Unknown event tag marker: {}", marker.as_ref().unwrap());
|
||||||
|
}
|
||||||
|
} else if n == count - 1 {
|
||||||
|
reply = Some(*id);
|
||||||
|
} else if n == 0 {
|
||||||
|
root = Some(*id);
|
||||||
|
} else {
|
||||||
|
refer.push(*id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(id) = reply {
|
||||||
|
// Mark our event in reply to
|
||||||
|
update_feed_event(event.id, |this_event| {
|
||||||
|
this_event.in_reply_to = Some(id);
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Mark the parent event as having us as a reply
|
||||||
|
update_feed_event(id, |parent_event| {
|
||||||
|
parent_event.replies.push(event.id);
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Get our last_reply_at for propogating upwards
|
||||||
|
let mut last_reply_at = event.created_at.0;
|
||||||
|
|
||||||
|
// Update the last_reply_at all the way up the chain
|
||||||
|
let mut xid = id;
|
||||||
|
loop {
|
||||||
|
let mut in_reply_to: Option<Id> = None;
|
||||||
|
update_feed_event(xid, |ancestor_event| {
|
||||||
|
if let Some(other) = ancestor_event.last_reply_at {
|
||||||
|
last_reply_at = other.max(last_reply_at);
|
||||||
|
ancestor_event.last_reply_at = Some(last_reply_at);
|
||||||
|
} else {
|
||||||
|
ancestor_event.last_reply_at = Some(last_reply_at);
|
||||||
|
}
|
||||||
|
in_reply_to = ancestor_event.in_reply_to; // next up the chain
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
xid = match in_reply_to {
|
||||||
|
Some(ref id) => *id,
|
||||||
|
None => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We ignore 'root' and 'refer'.
|
||||||
|
if let Some(id) = root { trace!("event root = {}", id.as_hex_string()); }
|
||||||
|
}
|
||||||
|
|
||||||
// Some kinds seen in the wild:
|
// Some kinds seen in the wild:
|
||||||
// nonce, p, e, t, client, content-warning,
|
// nonce, p, e, t, client, content-warning,
|
||||||
// subject, h, i, nostril, r, hashtag
|
// subject, h, i, nostril, r, hashtag
|
||||||
@ -129,45 +246,9 @@ pub async fn add_event(event: &Event) -> Result<(), Error> {
|
|||||||
Tag::Event {
|
Tag::Event {
|
||||||
id,
|
id,
|
||||||
recommended_relay_url: _,
|
recommended_relay_url: _,
|
||||||
marker,
|
marker: _,
|
||||||
} => {
|
} => {
|
||||||
if event.kind == EventKind::TextNote {
|
if event.kind == EventKind::EventDeletion {
|
||||||
if let Some(m) = marker {
|
|
||||||
if m == "reply" {
|
|
||||||
// Mark our 'in_reply_to'
|
|
||||||
update_feed_event(event.id, |er| {
|
|
||||||
er.in_reply_to = Some(*id);
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
// Add ourself to the parent's replies
|
|
||||||
update_feed_event(*id, |er| {
|
|
||||||
er.replies.push(event.id);
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
// Update the last_reply_at all the way up the chain
|
|
||||||
let mut xid = *id;
|
|
||||||
loop {
|
|
||||||
let mut in_reply_to: Option<Id> = None;
|
|
||||||
update_feed_event(xid, |er| {
|
|
||||||
if let Some(other) = er.last_reply_at {
|
|
||||||
er.last_reply_at = Some(other.max(event.created_at.0));
|
|
||||||
} else {
|
|
||||||
er.last_reply_at = Some(event.created_at.0);
|
|
||||||
}
|
|
||||||
in_reply_to = er.in_reply_to;
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
xid = match in_reply_to {
|
|
||||||
Some(ref id) => *id,
|
|
||||||
None => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if event.kind == EventKind::EventDeletion {
|
|
||||||
// Find the other event
|
// Find the other event
|
||||||
let maybe_other_event = GLOBALS.feed_events.lock().await.get(id).cloned();
|
let maybe_other_event = GLOBALS.feed_events.lock().await.get(id).cloned();
|
||||||
if let Some(deleted_feed_event) = maybe_other_event {
|
if let Some(deleted_feed_event) = maybe_other_event {
|
||||||
@ -279,7 +360,20 @@ pub async fn add_event(event: &Event) -> Result<(), Error> {
|
|||||||
|
|
||||||
async fn insert_event(event: &Event) {
|
async fn insert_event(event: &Event) {
|
||||||
let mut feed_events = GLOBALS.feed_events.lock().await;
|
let mut feed_events = GLOBALS.feed_events.lock().await;
|
||||||
feed_events.insert(event.id, event.into());
|
|
||||||
|
feed_events.entry(event.id)
|
||||||
|
.and_modify(|feed_event| {
|
||||||
|
// If the event already exists, update it's base data.
|
||||||
|
// (sometimes it is created to add feed data, but doesn't have base data yet)
|
||||||
|
feed_event.feed_related = event.kind==EventKind::TextNote;
|
||||||
|
if feed_event.last_reply_at.is_none() {
|
||||||
|
feed_event.last_reply_at = Some(event.created_at.0)
|
||||||
|
}
|
||||||
|
if feed_event.event.is_none() {
|
||||||
|
feed_event.event = Some(event.to_owned());
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.or_insert(event.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn update_feed_event<F>(id: Id, mut f: F)
|
async fn update_feed_event<F>(id: Id, mut f: F)
|
||||||
|
Loading…
Reference in New Issue
Block a user