From 477fdbcf80e6e471c10ed0211b2b2ce324a09ce0 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Fri, 30 Dec 2022 09:13:52 +1300 Subject: [PATCH] Buffer up incoming events, require "Process N incoming events" button to process them. --- src/globals.rs | 4 ++++ src/overlord/minion/handle_websocket.rs | 7 ++++++- src/overlord/mod.rs | 5 +++++ src/ui/feed.rs | 20 +++++++++++++++++++- 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/src/globals.rs b/src/globals.rs index 8dc9e192..5071a680 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -33,6 +33,9 @@ pub struct Globals { /// All nostr events, keyed by the event Id pub events: RwLock>, + /// Events coming in from relays that are not processed yet + pub incoming_events: RwLock>, + /// All relationships between events pub relationships: RwLock>>, @@ -82,6 +85,7 @@ lazy_static! { to_overlord, from_minions: Mutex::new(Some(from_minions)), events: RwLock::new(HashMap::new()), + incoming_events: RwLock::new(Vec::new()), relationships: RwLock::new(HashMap::new()), last_reply: RwLock::new(HashMap::new()), desired_events: RwLock::new(HashMap::new()), diff --git a/src/overlord/minion/handle_websocket.rs b/src/overlord/minion/handle_websocket.rs index 700ec4d0..37095c8a 100644 --- a/src/overlord/minion/handle_websocket.rs +++ b/src/overlord/minion/handle_websocket.rs @@ -1,4 +1,5 @@ use super::Minion; +use crate::globals::GLOBALS; use crate::Error; use futures::SinkExt; use nostr_types::{RelayMessage, Unixtime}; @@ -31,7 +32,11 @@ impl Minion { .unwrap_or_else(|| "_".to_owned()); debug!("{}: {}: NEW EVENT", &self.url, handle); - crate::process::process_new_event(&event, true, Some(self.url.clone())).await?; + GLOBALS + .incoming_events + .write() + .await + .push((*event, self.url.clone())); } } RelayMessage::Notice(msg) => { diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs index 17db4787..72b5fd0c 100644 --- a/src/overlord/mod.rs +++ b/src/overlord/mod.rs @@ -470,6 +470,11 @@ impl Overlord { serde_json::from_str(&bus_message.json_payload)?; self.post_reply(content, reply_to).await?; } + "process_incoming_events" => { + for (event, url) in GLOBALS.incoming_events.write().await.drain(..) { + crate::process::process_new_event(&event, true, Some(url)).await?; + } + } _ => {} }, _ => {} diff --git a/src/ui/feed.rs b/src/ui/feed.rs index 3ab63556..b0b458c8 100644 --- a/src/ui/feed.rs +++ b/src/ui/feed.rs @@ -15,11 +15,15 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram Globals::trim_desired_events_sync(); GLOBALS.desired_events.blocking_read().len() }; + let incoming_count = GLOBALS.incoming_events.blocking_read().len(); ui.with_layout(Layout::right_to_left(Align::TOP), |ui| { ui.with_layout(Layout::top_down(Align::Max), |ui| { if ui - .button(&format!("Get {} missing events", desired_count)) + .button(&format!( + "Query relays for {} missing events", + desired_count + )) .clicked() { let tx = GLOBALS.to_overlord.clone(); @@ -30,6 +34,20 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram }); } }); + + ui.with_layout(Layout::top_down(Align::Max), |ui| { + if ui + .button(&format!("Process {} incoming events", incoming_count)) + .clicked() + { + let tx = GLOBALS.to_overlord.clone(); + let _ = tx.send(BusMessage { + target: "overlord".to_string(), + kind: "process_incoming_events".to_string(), + json_payload: serde_json::to_string("").unwrap(), + }); + } + }); }); ui.vertical(|ui| {