Buffer up incoming events, require "Process N incoming events" button to process them.

This commit is contained in:
Mike Dilger 2022-12-30 09:13:52 +13:00
parent 0adfe4329f
commit 477fdbcf80
4 changed files with 34 additions and 2 deletions

View File

@ -33,6 +33,9 @@ pub struct Globals {
/// All nostr events, keyed by the event Id /// All nostr events, keyed by the event Id
pub events: RwLock<HashMap<Id, Event>>, pub events: RwLock<HashMap<Id, Event>>,
/// Events coming in from relays that are not processed yet
pub incoming_events: RwLock<Vec<(Event, Url)>>,
/// All relationships between events /// All relationships between events
pub relationships: RwLock<HashMap<Id, Vec<(Id, Relationship)>>>, pub relationships: RwLock<HashMap<Id, Vec<(Id, Relationship)>>>,
@ -82,6 +85,7 @@ lazy_static! {
to_overlord, to_overlord,
from_minions: Mutex::new(Some(from_minions)), from_minions: Mutex::new(Some(from_minions)),
events: RwLock::new(HashMap::new()), events: RwLock::new(HashMap::new()),
incoming_events: RwLock::new(Vec::new()),
relationships: RwLock::new(HashMap::new()), relationships: RwLock::new(HashMap::new()),
last_reply: RwLock::new(HashMap::new()), last_reply: RwLock::new(HashMap::new()),
desired_events: RwLock::new(HashMap::new()), desired_events: RwLock::new(HashMap::new()),

View File

@ -1,4 +1,5 @@
use super::Minion; use super::Minion;
use crate::globals::GLOBALS;
use crate::Error; use crate::Error;
use futures::SinkExt; use futures::SinkExt;
use nostr_types::{RelayMessage, Unixtime}; use nostr_types::{RelayMessage, Unixtime};
@ -31,7 +32,11 @@ impl Minion {
.unwrap_or_else(|| "_".to_owned()); .unwrap_or_else(|| "_".to_owned());
debug!("{}: {}: NEW EVENT", &self.url, handle); debug!("{}: {}: NEW EVENT", &self.url, handle);
crate::process::process_new_event(&event, true, Some(self.url.clone())).await?; GLOBALS
.incoming_events
.write()
.await
.push((*event, self.url.clone()));
} }
} }
RelayMessage::Notice(msg) => { RelayMessage::Notice(msg) => {

View File

@ -470,6 +470,11 @@ impl Overlord {
serde_json::from_str(&bus_message.json_payload)?; serde_json::from_str(&bus_message.json_payload)?;
self.post_reply(content, reply_to).await?; self.post_reply(content, reply_to).await?;
} }
"process_incoming_events" => {
for (event, url) in GLOBALS.incoming_events.write().await.drain(..) {
crate::process::process_new_event(&event, true, Some(url)).await?;
}
}
_ => {} _ => {}
}, },
_ => {} _ => {}

View File

@ -15,11 +15,15 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram
Globals::trim_desired_events_sync(); Globals::trim_desired_events_sync();
GLOBALS.desired_events.blocking_read().len() GLOBALS.desired_events.blocking_read().len()
}; };
let incoming_count = GLOBALS.incoming_events.blocking_read().len();
ui.with_layout(Layout::right_to_left(Align::TOP), |ui| { ui.with_layout(Layout::right_to_left(Align::TOP), |ui| {
ui.with_layout(Layout::top_down(Align::Max), |ui| { ui.with_layout(Layout::top_down(Align::Max), |ui| {
if ui if ui
.button(&format!("Get {} missing events", desired_count)) .button(&format!(
"Query relays for {} missing events",
desired_count
))
.clicked() .clicked()
{ {
let tx = GLOBALS.to_overlord.clone(); let tx = GLOBALS.to_overlord.clone();
@ -30,6 +34,20 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram
}); });
} }
}); });
ui.with_layout(Layout::top_down(Align::Max), |ui| {
if ui
.button(&format!("Process {} incoming events", incoming_count))
.clicked()
{
let tx = GLOBALS.to_overlord.clone();
let _ = tx.send(BusMessage {
target: "overlord".to_string(),
kind: "process_incoming_events".to_string(),
json_payload: serde_json::to_string("").unwrap(),
});
}
});
}); });
ui.vertical(|ui| { ui.vertical(|ui| {