From 3c17d49124fa984b73489a28f13025b3ffda5774 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Sat, 13 May 2023 19:10:37 +1200 Subject: [PATCH] Add a job_id to minions and overlords can communicate about finished jobs --- src/comms.rs | 10 +++- src/feed.rs | 37 +++++++++--- src/overlord/minion/mod.rs | 30 +++++----- src/overlord/mod.rs | 115 ++++++++++++++++++++++++++++--------- 4 files changed, 142 insertions(+), 50 deletions(-) diff --git a/src/comms.rs b/src/comms.rs index 1b428876..108f316c 100644 --- a/src/comms.rs +++ b/src/comms.rs @@ -55,7 +55,15 @@ pub struct ToMinionMessage { } #[derive(Debug, Clone)] -pub enum ToMinionPayload { +pub struct ToMinionPayload { + /// A job id, so the minion and overlord can talk about the job. + pub job_id: u64, + + pub detail: ToMinionPayloadDetail, +} + +#[derive(Debug, Clone)] +pub enum ToMinionPayloadDetail { FetchEvent(IdHex), PostEvent(Box), PullFollowing, diff --git a/src/feed.rs b/src/feed.rs index 294cf53b..4c048e53 100644 --- a/src/feed.rs +++ b/src/feed.rs @@ -1,4 +1,4 @@ -use crate::comms::{ToMinionMessage, ToMinionPayload, ToOverlordMessage}; +use crate::comms::{ToMinionMessage, ToMinionPayload, ToMinionPayloadDetail, ToOverlordMessage}; use crate::error::Error; use crate::globals::GLOBALS; use nostr_types::{EventDelegation, EventKind, Id, PublicKeyHex, RelayUrl, Unixtime}; @@ -68,11 +68,17 @@ impl Feed { // When going to Followed or Inbox, we stop listening for Thread/Person events let _ = GLOBALS.to_minions.send(ToMinionMessage { target: "all".to_string(), - payload: ToMinionPayload::UnsubscribeThreadFeed, + payload: ToMinionPayload { + job_id: 0, + detail: ToMinionPayloadDetail::UnsubscribeThreadFeed, + } }); let _ = GLOBALS.to_minions.send(ToMinionMessage { target: "all".to_string(), - payload: ToMinionPayload::UnsubscribePersonFeed, + payload: ToMinionPayload { + job_id: 0, + detail: ToMinionPayloadDetail::UnsubscribePersonFeed, + } }); } @@ -86,11 +92,17 @@ impl Feed { // When going to Followed or Inbox, we stop listening for Thread/Person events let _ = GLOBALS.to_minions.send(ToMinionMessage { target: "all".to_string(), - payload: ToMinionPayload::UnsubscribeThreadFeed, + payload: ToMinionPayload { + job_id: 0, + detail: ToMinionPayloadDetail::UnsubscribeThreadFeed, + } }); let _ = GLOBALS.to_minions.send(ToMinionMessage { target: "all".to_string(), - payload: ToMinionPayload::UnsubscribePersonFeed, + payload: ToMinionPayload { + job_id: 0, + detail: ToMinionPayloadDetail::UnsubscribePersonFeed, + } }); } @@ -106,7 +118,10 @@ impl Feed { let _ = GLOBALS.to_minions.send(ToMinionMessage { target: "all".to_string(), - payload: ToMinionPayload::UnsubscribePersonFeed, + payload: ToMinionPayload { + job_id: 0, + detail: ToMinionPayloadDetail::UnsubscribePersonFeed, + } }); let _ = GLOBALS @@ -123,11 +138,17 @@ impl Feed { let _ = GLOBALS.to_minions.send(ToMinionMessage { target: "all".to_string(), - payload: ToMinionPayload::UnsubscribeThreadFeed, + payload: ToMinionPayload { + job_id: 0, + detail: ToMinionPayloadDetail::UnsubscribeThreadFeed, + } }); let _ = GLOBALS.to_minions.send(ToMinionMessage { target: "all".to_string(), - payload: ToMinionPayload::SubscribePersonFeed(pubkey), + payload: ToMinionPayload { + job_id: 0, + detail: ToMinionPayloadDetail::SubscribePersonFeed(pubkey), + } }); } diff --git a/src/overlord/minion/mod.rs b/src/overlord/minion/mod.rs index 910d8cce..2f288562 100644 --- a/src/overlord/minion/mod.rs +++ b/src/overlord/minion/mod.rs @@ -1,7 +1,7 @@ mod handle_websocket; mod subscription; -use crate::comms::{ToMinionMessage, ToMinionPayload, ToOverlordMessage}; +use crate::comms::{ToMinionMessage, ToMinionPayload, ToMinionPayloadDetail, ToOverlordMessage}; use crate::db::DbRelay; use crate::error::Error; use crate::globals::GLOBALS; @@ -301,11 +301,11 @@ impl Minion { } pub async fn handle_overlord_message(&mut self, message: ToMinionPayload) -> Result<(), Error> { - match message { - ToMinionPayload::FetchEvent(id) => { + match message.detail { + ToMinionPayloadDetail::FetchEvent(id) => { self.get_event(id).await?; } - ToMinionPayload::PostEvent(event) => { + ToMinionPayloadDetail::PostEvent(event) => { let id = event.id; self.postings.insert(id); let msg = ClientMessage::Event(event); @@ -314,38 +314,38 @@ impl Minion { ws_stream.send(WsMessage::Text(wire)).await?; tracing::info!("Posted event to {}", &self.url); } - ToMinionPayload::PullFollowing => { + ToMinionPayloadDetail::PullFollowing => { self.pull_following().await?; } - ToMinionPayload::Shutdown => { + ToMinionPayloadDetail::Shutdown => { tracing::info!("{}: Websocket listener shutting down", &self.url); self.keepgoing = false; } - ToMinionPayload::SubscribeGeneralFeed(pubkeys) => { + ToMinionPayloadDetail::SubscribeGeneralFeed(pubkeys) => { self.subscribe_general_feed(pubkeys).await?; } - ToMinionPayload::SubscribeMentions => { + ToMinionPayloadDetail::SubscribeMentions => { self.subscribe_mentions().await?; } - ToMinionPayload::SubscribeConfig => { + ToMinionPayloadDetail::SubscribeConfig => { self.subscribe_config().await?; } - ToMinionPayload::SubscribeDiscover(pubkeys) => { + ToMinionPayloadDetail::SubscribeDiscover(pubkeys) => { self.subscribe_discover(pubkeys).await?; } - ToMinionPayload::SubscribePersonFeed(pubkeyhex) => { + ToMinionPayloadDetail::SubscribePersonFeed(pubkeyhex) => { self.subscribe_person_feed(pubkeyhex).await?; } - ToMinionPayload::SubscribeThreadFeed(main, parents) => { + ToMinionPayloadDetail::SubscribeThreadFeed(main, parents) => { self.subscribe_thread_feed(main, parents).await?; } - ToMinionPayload::TempSubscribeMetadata(pubkeyhexs) => { + ToMinionPayloadDetail::TempSubscribeMetadata(pubkeyhexs) => { self.temp_subscribe_metadata(pubkeyhexs).await?; } - ToMinionPayload::UnsubscribePersonFeed => { + ToMinionPayloadDetail::UnsubscribePersonFeed => { self.unsubscribe("person_feed").await?; } - ToMinionPayload::UnsubscribeThreadFeed => { + ToMinionPayloadDetail::UnsubscribeThreadFeed => { self.unsubscribe("thread_feed").await?; } } diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs index e7a43380..0f7a6cd9 100644 --- a/src/overlord/mod.rs +++ b/src/overlord/mod.rs @@ -1,6 +1,6 @@ mod minion; -use crate::comms::{RelayJob, ToMinionMessage, ToMinionPayload, ToOverlordMessage}; +use crate::comms::{RelayJob, ToMinionMessage, ToMinionPayload, ToMinionPayloadDetail, ToOverlordMessage}; use crate::db::{DbEvent, DbEventFlags, DbEventRelay, DbPersonRelay, DbRelay}; use crate::error::{Error, ErrorKind}; use crate::globals::GLOBALS; @@ -61,7 +61,10 @@ impl Overlord { // so just ignore it and keep shutting down. let _ = self.to_minions.send(ToMinionMessage { target: "all".to_string(), - payload: ToMinionPayload::Shutdown, + payload: ToMinionPayload { + job_id: 0, + detail: ToMinionPayloadDetail::Shutdown, + } }); tracing::info!("Overlord waiting for minions to all shutdown"); @@ -255,7 +258,10 @@ impl Overlord { relay_url.to_owned(), vec![RelayJob { reason: "discovery", - payload: ToMinionPayload::SubscribeDiscover(followed.clone()), + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::SubscribeDiscover(followed.clone()), + }, persistent: true }]).await?; } @@ -268,7 +274,10 @@ impl Overlord { relay_url.to_owned(), vec![RelayJob { reason: "config", - payload: ToMinionPayload::SubscribeConfig, + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::SubscribeConfig, + }, persistent: true }]).await?; } @@ -283,7 +292,10 @@ impl Overlord { relay_url.to_owned(), vec![RelayJob { reason: "mentions", - payload: ToMinionPayload::SubscribeMentions, + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::SubscribeMentions, + }, persistent: true, }]).await?; } @@ -339,13 +351,19 @@ impl Overlord { assignment.relay_url.clone(), vec![RelayJob { reason: "follow", - payload: ToMinionPayload::SubscribeGeneralFeed(assignment.pubkeys.clone()), + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::SubscribeGeneralFeed(assignment.pubkeys.clone()), + }, persistent: false },RelayJob { // Until NIP-65 is in widespread use, we should listen for mentions // of us on all these relays too reason: "mentions", - payload: ToMinionPayload::SubscribeMentions, + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::SubscribeMentions, + }, persistent: false }]).await?; @@ -367,7 +385,7 @@ impl Overlord { for job in jobs.drain(..) { let _ = self.to_minions.send(ToMinionMessage { target: url.0.clone(), - payload: job.payload.clone() + payload: job.payload.clone(), }); // And record @@ -563,7 +581,10 @@ impl Overlord { ToOverlordMessage::DropRelay(relay_url) => { let _ = self.to_minions.send(ToMinionMessage { target: relay_url.0, - payload: ToMinionPayload::Shutdown, + payload: ToMinionPayload { + job_id: 0, + detail: ToMinionPayloadDetail::Shutdown, + } }); } ToOverlordMessage::FetchEvent(id, relay_urls) => { @@ -573,7 +594,10 @@ impl Overlord { url.to_owned(), vec![RelayJob { reason: "fetch-event", - payload: ToMinionPayload::FetchEvent(id.into()), + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::FetchEvent(id.into()), + }, persistent: false }]).await?; } @@ -742,7 +766,10 @@ impl Overlord { relay_url.to_owned(), vec![RelayJob { reason: "tmp-metadata", - payload: ToMinionPayload::TempSubscribeMetadata(vec![pubkey.clone()]), + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::TempSubscribeMetadata(vec![pubkey.clone()]), + }, persistent: false }]).await?; } @@ -770,7 +797,10 @@ impl Overlord { relay_url.clone(), vec![RelayJob { reason: "tmp-metadata", - payload: ToMinionPayload::TempSubscribeMetadata(pubkeys), + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::TempSubscribeMetadata(pubkeys), + }, persistent: false }]).await?; } @@ -995,7 +1025,10 @@ impl Overlord { url.clone(), vec![RelayJob { reason: "posting", - payload: ToMinionPayload::PostEvent(Box::new(event.clone())), + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::PostEvent(Box::new(event.clone())), + }, persistent: false }]).await?; } @@ -1055,7 +1088,10 @@ impl Overlord { relay_url.to_owned(), vec![RelayJob { reason: "advertising", - payload: ToMinionPayload::PostEvent(Box::new(event.clone())), + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::PostEvent(Box::new(event.clone())), + }, persistent: false, }]).await?; } @@ -1128,7 +1164,10 @@ impl Overlord { relay.url.clone(), vec![RelayJob { reason: "post-like", - payload: ToMinionPayload::PostEvent(Box::new(event.clone())), + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::PostEvent(Box::new(event.clone())), + }, persistent: false }]).await?; } @@ -1151,7 +1190,10 @@ impl Overlord { relay.url.clone(), vec![RelayJob { reason: "pull-contacts", - payload: ToMinionPayload::PullFollowing, + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::PullFollowing, + }, persistent: false, }]).await?; } @@ -1173,7 +1215,10 @@ impl Overlord { relay.url.clone(), vec![RelayJob { reason: "pushing-contacts", - payload: ToMinionPayload::PostEvent(Box::new(event.clone())), + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::PostEvent(Box::new(event.clone())), + }, persistent: false }]).await?; } @@ -1214,7 +1259,10 @@ impl Overlord { relay.url.clone(), vec![RelayJob { reason: "write-metadata", - payload: ToMinionPayload::PostEvent(Box::new(event.clone())), + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::PostEvent(Box::new(event.clone())), + }, persistent: false }]).await?; } @@ -1248,7 +1296,10 @@ impl Overlord { url.clone(), vec![RelayJob { reason: "tmp-metadata", - payload: ToMinionPayload::TempSubscribeMetadata(pubkeys), + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::TempSubscribeMetadata(pubkeys), + }, persistent: false, }]).await?; } @@ -1344,7 +1395,10 @@ impl Overlord { url.clone(), vec![RelayJob { reason: "reposting", - payload: ToMinionPayload::PostEvent(Box::new(event.clone())), + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::PostEvent(Box::new(event.clone())), + }, persistent: false }]).await?; } @@ -1483,7 +1537,10 @@ impl Overlord { // Cancel current thread subscriptions, if any let _ = self.to_minions.send(ToMinionMessage { target: "all".to_string(), - payload: ToMinionPayload::UnsubscribeThreadFeed, + payload: ToMinionPayload { + job_id: 0, + detail: ToMinionPayloadDetail::UnsubscribeThreadFeed, + } }); for url in relays.iter() { @@ -1492,10 +1549,13 @@ impl Overlord { url.to_owned(), vec![RelayJob { reason: "read-thread", - payload: ToMinionPayload::SubscribeThreadFeed( - id.into(), - missing_ancestors_hex.clone(), - ), + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::SubscribeThreadFeed( + id.into(), + missing_ancestors_hex.clone(), + ), + }, persistent: false, }]).await?; } @@ -1598,7 +1658,10 @@ impl Overlord { url.to_owned(), vec![RelayJob { reason: "deleting", - payload: ToMinionPayload::PostEvent(Box::new(event.clone())), + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::PostEvent(Box::new(event.clone())), + }, persistent: false }]).await?; }