Add a job_id to minions and overlords can communicate about finished jobs

This commit is contained in:
Mike Dilger 2023-05-13 19:10:37 +12:00
parent 0a707c2cf3
commit 3c17d49124
4 changed files with 142 additions and 50 deletions

View File

@ -55,7 +55,15 @@ pub struct ToMinionMessage {
}
#[derive(Debug, Clone)]
pub enum ToMinionPayload {
pub struct ToMinionPayload {
/// A job id, so the minion and overlord can talk about the job.
pub job_id: u64,
pub detail: ToMinionPayloadDetail,
}
#[derive(Debug, Clone)]
pub enum ToMinionPayloadDetail {
FetchEvent(IdHex),
PostEvent(Box<Event>),
PullFollowing,

View File

@ -1,4 +1,4 @@
use crate::comms::{ToMinionMessage, ToMinionPayload, ToOverlordMessage};
use crate::comms::{ToMinionMessage, ToMinionPayload, ToMinionPayloadDetail, ToOverlordMessage};
use crate::error::Error;
use crate::globals::GLOBALS;
use nostr_types::{EventDelegation, EventKind, Id, PublicKeyHex, RelayUrl, Unixtime};
@ -68,11 +68,17 @@ impl Feed {
// When going to Followed or Inbox, we stop listening for Thread/Person events
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribeThreadFeed,
payload: ToMinionPayload {
job_id: 0,
detail: ToMinionPayloadDetail::UnsubscribeThreadFeed,
}
});
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribePersonFeed,
payload: ToMinionPayload {
job_id: 0,
detail: ToMinionPayloadDetail::UnsubscribePersonFeed,
}
});
}
@ -86,11 +92,17 @@ impl Feed {
// When going to Followed or Inbox, we stop listening for Thread/Person events
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribeThreadFeed,
payload: ToMinionPayload {
job_id: 0,
detail: ToMinionPayloadDetail::UnsubscribeThreadFeed,
}
});
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribePersonFeed,
payload: ToMinionPayload {
job_id: 0,
detail: ToMinionPayloadDetail::UnsubscribePersonFeed,
}
});
}
@ -106,7 +118,10 @@ impl Feed {
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribePersonFeed,
payload: ToMinionPayload {
job_id: 0,
detail: ToMinionPayloadDetail::UnsubscribePersonFeed,
}
});
let _ =
GLOBALS
@ -123,11 +138,17 @@ impl Feed {
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribeThreadFeed,
payload: ToMinionPayload {
job_id: 0,
detail: ToMinionPayloadDetail::UnsubscribeThreadFeed,
}
});
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::SubscribePersonFeed(pubkey),
payload: ToMinionPayload {
job_id: 0,
detail: ToMinionPayloadDetail::SubscribePersonFeed(pubkey),
}
});
}

View File

@ -1,7 +1,7 @@
mod handle_websocket;
mod subscription;
use crate::comms::{ToMinionMessage, ToMinionPayload, ToOverlordMessage};
use crate::comms::{ToMinionMessage, ToMinionPayload, ToMinionPayloadDetail, ToOverlordMessage};
use crate::db::DbRelay;
use crate::error::Error;
use crate::globals::GLOBALS;
@ -301,11 +301,11 @@ impl Minion {
}
pub async fn handle_overlord_message(&mut self, message: ToMinionPayload) -> Result<(), Error> {
match message {
ToMinionPayload::FetchEvent(id) => {
match message.detail {
ToMinionPayloadDetail::FetchEvent(id) => {
self.get_event(id).await?;
}
ToMinionPayload::PostEvent(event) => {
ToMinionPayloadDetail::PostEvent(event) => {
let id = event.id;
self.postings.insert(id);
let msg = ClientMessage::Event(event);
@ -314,38 +314,38 @@ impl Minion {
ws_stream.send(WsMessage::Text(wire)).await?;
tracing::info!("Posted event to {}", &self.url);
}
ToMinionPayload::PullFollowing => {
ToMinionPayloadDetail::PullFollowing => {
self.pull_following().await?;
}
ToMinionPayload::Shutdown => {
ToMinionPayloadDetail::Shutdown => {
tracing::info!("{}: Websocket listener shutting down", &self.url);
self.keepgoing = false;
}
ToMinionPayload::SubscribeGeneralFeed(pubkeys) => {
ToMinionPayloadDetail::SubscribeGeneralFeed(pubkeys) => {
self.subscribe_general_feed(pubkeys).await?;
}
ToMinionPayload::SubscribeMentions => {
ToMinionPayloadDetail::SubscribeMentions => {
self.subscribe_mentions().await?;
}
ToMinionPayload::SubscribeConfig => {
ToMinionPayloadDetail::SubscribeConfig => {
self.subscribe_config().await?;
}
ToMinionPayload::SubscribeDiscover(pubkeys) => {
ToMinionPayloadDetail::SubscribeDiscover(pubkeys) => {
self.subscribe_discover(pubkeys).await?;
}
ToMinionPayload::SubscribePersonFeed(pubkeyhex) => {
ToMinionPayloadDetail::SubscribePersonFeed(pubkeyhex) => {
self.subscribe_person_feed(pubkeyhex).await?;
}
ToMinionPayload::SubscribeThreadFeed(main, parents) => {
ToMinionPayloadDetail::SubscribeThreadFeed(main, parents) => {
self.subscribe_thread_feed(main, parents).await?;
}
ToMinionPayload::TempSubscribeMetadata(pubkeyhexs) => {
ToMinionPayloadDetail::TempSubscribeMetadata(pubkeyhexs) => {
self.temp_subscribe_metadata(pubkeyhexs).await?;
}
ToMinionPayload::UnsubscribePersonFeed => {
ToMinionPayloadDetail::UnsubscribePersonFeed => {
self.unsubscribe("person_feed").await?;
}
ToMinionPayload::UnsubscribeThreadFeed => {
ToMinionPayloadDetail::UnsubscribeThreadFeed => {
self.unsubscribe("thread_feed").await?;
}
}

View File

@ -1,6 +1,6 @@
mod minion;
use crate::comms::{RelayJob, ToMinionMessage, ToMinionPayload, ToOverlordMessage};
use crate::comms::{RelayJob, ToMinionMessage, ToMinionPayload, ToMinionPayloadDetail, ToOverlordMessage};
use crate::db::{DbEvent, DbEventFlags, DbEventRelay, DbPersonRelay, DbRelay};
use crate::error::{Error, ErrorKind};
use crate::globals::GLOBALS;
@ -61,7 +61,10 @@ impl Overlord {
// so just ignore it and keep shutting down.
let _ = self.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::Shutdown,
payload: ToMinionPayload {
job_id: 0,
detail: ToMinionPayloadDetail::Shutdown,
}
});
tracing::info!("Overlord waiting for minions to all shutdown");
@ -255,7 +258,10 @@ impl Overlord {
relay_url.to_owned(),
vec![RelayJob {
reason: "discovery",
payload: ToMinionPayload::SubscribeDiscover(followed.clone()),
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeDiscover(followed.clone()),
},
persistent: true
}]).await?;
}
@ -268,7 +274,10 @@ impl Overlord {
relay_url.to_owned(),
vec![RelayJob {
reason: "config",
payload: ToMinionPayload::SubscribeConfig,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeConfig,
},
persistent: true
}]).await?;
}
@ -283,7 +292,10 @@ impl Overlord {
relay_url.to_owned(),
vec![RelayJob {
reason: "mentions",
payload: ToMinionPayload::SubscribeMentions,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeMentions,
},
persistent: true,
}]).await?;
}
@ -339,13 +351,19 @@ impl Overlord {
assignment.relay_url.clone(),
vec![RelayJob {
reason: "follow",
payload: ToMinionPayload::SubscribeGeneralFeed(assignment.pubkeys.clone()),
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeGeneralFeed(assignment.pubkeys.clone()),
},
persistent: false
},RelayJob {
// Until NIP-65 is in widespread use, we should listen for mentions
// of us on all these relays too
reason: "mentions",
payload: ToMinionPayload::SubscribeMentions,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeMentions,
},
persistent: false
}]).await?;
@ -367,7 +385,7 @@ impl Overlord {
for job in jobs.drain(..) {
let _ = self.to_minions.send(ToMinionMessage {
target: url.0.clone(),
payload: job.payload.clone()
payload: job.payload.clone(),
});
// And record
@ -563,7 +581,10 @@ impl Overlord {
ToOverlordMessage::DropRelay(relay_url) => {
let _ = self.to_minions.send(ToMinionMessage {
target: relay_url.0,
payload: ToMinionPayload::Shutdown,
payload: ToMinionPayload {
job_id: 0,
detail: ToMinionPayloadDetail::Shutdown,
}
});
}
ToOverlordMessage::FetchEvent(id, relay_urls) => {
@ -573,7 +594,10 @@ impl Overlord {
url.to_owned(),
vec![RelayJob {
reason: "fetch-event",
payload: ToMinionPayload::FetchEvent(id.into()),
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::FetchEvent(id.into()),
},
persistent: false
}]).await?;
}
@ -742,7 +766,10 @@ impl Overlord {
relay_url.to_owned(),
vec![RelayJob {
reason: "tmp-metadata",
payload: ToMinionPayload::TempSubscribeMetadata(vec![pubkey.clone()]),
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::TempSubscribeMetadata(vec![pubkey.clone()]),
},
persistent: false
}]).await?;
}
@ -770,7 +797,10 @@ impl Overlord {
relay_url.clone(),
vec![RelayJob {
reason: "tmp-metadata",
payload: ToMinionPayload::TempSubscribeMetadata(pubkeys),
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::TempSubscribeMetadata(pubkeys),
},
persistent: false
}]).await?;
}
@ -995,7 +1025,10 @@ impl Overlord {
url.clone(),
vec![RelayJob {
reason: "posting",
payload: ToMinionPayload::PostEvent(Box::new(event.clone())),
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::PostEvent(Box::new(event.clone())),
},
persistent: false
}]).await?;
}
@ -1055,7 +1088,10 @@ impl Overlord {
relay_url.to_owned(),
vec![RelayJob {
reason: "advertising",
payload: ToMinionPayload::PostEvent(Box::new(event.clone())),
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::PostEvent(Box::new(event.clone())),
},
persistent: false,
}]).await?;
}
@ -1128,7 +1164,10 @@ impl Overlord {
relay.url.clone(),
vec![RelayJob {
reason: "post-like",
payload: ToMinionPayload::PostEvent(Box::new(event.clone())),
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::PostEvent(Box::new(event.clone())),
},
persistent: false
}]).await?;
}
@ -1151,7 +1190,10 @@ impl Overlord {
relay.url.clone(),
vec![RelayJob {
reason: "pull-contacts",
payload: ToMinionPayload::PullFollowing,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::PullFollowing,
},
persistent: false,
}]).await?;
}
@ -1173,7 +1215,10 @@ impl Overlord {
relay.url.clone(),
vec![RelayJob {
reason: "pushing-contacts",
payload: ToMinionPayload::PostEvent(Box::new(event.clone())),
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::PostEvent(Box::new(event.clone())),
},
persistent: false
}]).await?;
}
@ -1214,7 +1259,10 @@ impl Overlord {
relay.url.clone(),
vec![RelayJob {
reason: "write-metadata",
payload: ToMinionPayload::PostEvent(Box::new(event.clone())),
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::PostEvent(Box::new(event.clone())),
},
persistent: false
}]).await?;
}
@ -1248,7 +1296,10 @@ impl Overlord {
url.clone(),
vec![RelayJob {
reason: "tmp-metadata",
payload: ToMinionPayload::TempSubscribeMetadata(pubkeys),
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::TempSubscribeMetadata(pubkeys),
},
persistent: false,
}]).await?;
}
@ -1344,7 +1395,10 @@ impl Overlord {
url.clone(),
vec![RelayJob {
reason: "reposting",
payload: ToMinionPayload::PostEvent(Box::new(event.clone())),
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::PostEvent(Box::new(event.clone())),
},
persistent: false
}]).await?;
}
@ -1483,7 +1537,10 @@ impl Overlord {
// Cancel current thread subscriptions, if any
let _ = self.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribeThreadFeed,
payload: ToMinionPayload {
job_id: 0,
detail: ToMinionPayloadDetail::UnsubscribeThreadFeed,
}
});
for url in relays.iter() {
@ -1492,10 +1549,13 @@ impl Overlord {
url.to_owned(),
vec![RelayJob {
reason: "read-thread",
payload: ToMinionPayload::SubscribeThreadFeed(
id.into(),
missing_ancestors_hex.clone(),
),
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeThreadFeed(
id.into(),
missing_ancestors_hex.clone(),
),
},
persistent: false,
}]).await?;
}
@ -1598,7 +1658,10 @@ impl Overlord {
url.to_owned(),
vec![RelayJob {
reason: "deleting",
payload: ToMinionPayload::PostEvent(Box::new(event.clone())),
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::PostEvent(Box::new(event.clone())),
},
persistent: false
}]).await?;
}