From dbba43c87d05b7c9ac6e0212153698f5a7aa2713 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Fri, 6 Jan 2023 06:56:23 +1300 Subject: [PATCH] Pass subscription (handle) to process_new_events --- src/globals.rs | 5 +++-- src/overlord/minion/handle_websocket.rs | 10 ++++++++-- src/overlord/mod.rs | 16 ++++++++-------- src/process.rs | 1 + 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/globals.rs b/src/globals.rs index ff566029..149ffa8f 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -35,7 +35,8 @@ pub struct Globals { pub events: RwLock>, /// Events coming in from relays that are not processed yet - pub incoming_events: RwLock>, + /// stored with Url they came from and Subscription they came in on + pub incoming_events: RwLock)>>, /// All relationships between events pub relationships: RwLock>>, @@ -165,7 +166,7 @@ impl Globals { let mut count = 0; for event in events.iter() { count += 1; - crate::process::process_new_event(event, false, None).await?; + crate::process::process_new_event(event, false, None, None).await?; } tracing::info!("Loaded {} desired events from the database", count); } diff --git a/src/overlord/minion/handle_websocket.rs b/src/overlord/minion/handle_websocket.rs index 58f3913a..4265de69 100644 --- a/src/overlord/minion/handle_websocket.rs +++ b/src/overlord/minion/handle_websocket.rs @@ -31,7 +31,13 @@ impl Minion { tracing::debug!("{}: {}: NEW EVENT", &self.url, handle); // Try processing everything immediately - crate::process::process_new_event(&event, true, Some(self.url.clone())).await?; + crate::process::process_new_event( + &event, + true, + Some(self.url.clone()), + Some(handle), + ) + .await?; /* if event.kind == EventKind::TextNote { @@ -40,7 +46,7 @@ impl Minion { .incoming_events .write() .await - .push((*event, self.url.clone())); + .push((*event, self.url.clone(), handle)); } else { // Process everything else immediately crate::process::process_new_event(&event, true, Some(self.url.clone())) diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs index 38fe2196..2b1a0939 100644 --- a/src/overlord/mod.rs +++ b/src/overlord/mod.rs @@ -121,7 +121,7 @@ impl Overlord { }; // Process this metadata event to update people - crate::process::process_new_event(&e, false, None).await?; + crate::process::process_new_event(&e, false, None, None).await?; } } @@ -147,7 +147,7 @@ impl Overlord { let mut count = 0; for event in events.iter() { count += 1; - crate::process::process_new_event(event, false, None).await?; + crate::process::process_new_event(event, false, None, None).await?; } tracing::info!("Loaded {} events from the database", count); } @@ -492,9 +492,9 @@ impl Overlord { GLOBALS.event_is_new.write().await.clear(); let _ = tokio::spawn(async move { - for (event, url) in GLOBALS.incoming_events.write().await.drain(..) { - let _ = - crate::process::process_new_event(&event, true, Some(url)).await; + for (event, url, sub) in GLOBALS.incoming_events.write().await.drain(..) { + let _ = crate::process::process_new_event(&event, true, Some(url), sub) + .await; } }); } @@ -774,7 +774,7 @@ impl Overlord { } // Process the message for ourself - crate::process::process_new_event(&event, false, None).await?; + crate::process::process_new_event(&event, false, None, None).await?; Ok(()) } @@ -832,7 +832,7 @@ impl Overlord { } // Process the message for ourself - crate::process::process_new_event(&event, false, None).await?; + crate::process::process_new_event(&event, false, None, None).await?; Ok(()) } @@ -897,7 +897,7 @@ impl Overlord { } // Process the message for ourself - crate::process::process_new_event(&event, false, None).await?; + crate::process::process_new_event(&event, false, None, None).await?; Ok(()) } diff --git a/src/process.rs b/src/process.rs index 1b4c10ff..a63b672b 100644 --- a/src/process.rs +++ b/src/process.rs @@ -12,6 +12,7 @@ pub async fn process_new_event( event: &Event, from_relay: bool, seen_on: Option, + _subscription: Option, ) -> Result<(), Error> { // Save the event into the database if from_relay {