diff --git a/src/comms.rs b/src/comms.rs index 43ef0607..3710d30c 100644 --- a/src/comms.rs +++ b/src/comms.rs @@ -1,10 +1,9 @@ -use std::ops::Drop; - +use nostr_types::{Event, Id, IdHex, PublicKeyHex}; use serde::Serialize; +use std::ops::Drop; use zeroize::Zeroize; -/// This is a message sent between the Overlord and Minions -/// in either direction +/// This is a message sent to the Overlord #[derive(Debug, Clone, Serialize)] pub struct BusMessage { /// Indended recipient of the message @@ -24,3 +23,24 @@ impl Drop for BusMessage { self.json_payload.zeroize(); } } + +/// This is a message sent to the minions +#[derive(Debug, Clone)] +pub struct ToMinionMessage { + /// The minion we are addressing, based on the URL they are listening to + /// as a String. "all" means all minions. + pub target: String, + + pub payload: ToMinionPayload, +} + +#[derive(Debug, Clone)] +pub enum ToMinionPayload { + Shutdown, + SubscribeGeneralFeed, + SubscribePersonFeed(PublicKeyHex), + SubscribeThreadFeed(Id), + TempSubscribeMetadata(PublicKeyHex), + FetchEvents(Vec), + PostEvent(Box), +} diff --git a/src/error.rs b/src/error.rs index 54de72a2..3b0d21ee 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,10 +1,10 @@ -use crate::comms::BusMessage; +use crate::comms::{BusMessage, ToMinionMessage}; use thiserror::Error; #[derive(Error, Debug)] pub enum Error { #[error("Error broadcasting: {0}")] - BroadcastSend(#[from] tokio::sync::broadcast::error::SendError), + BroadcastSend(#[from] tokio::sync::broadcast::error::SendError), #[error("Error receiving broadcast: {0}")] BroadcastReceive(#[from] tokio::sync::broadcast::error::RecvError), diff --git a/src/feed.rs b/src/feed.rs index 09cc9c64..ab77991a 100644 --- a/src/feed.rs +++ b/src/feed.rs @@ -1,4 +1,4 @@ -use crate::comms::BusMessage; +use crate::comms::{ToMinionMessage, ToMinionPayload}; use crate::globals::GLOBALS; use nostr_types::PublicKeyHex; use nostr_types::{Event, EventKind, Id}; @@ -54,19 +54,17 @@ impl Feed { } pub fn set_feed_to_thread(&self, id: Id) { - let _ = GLOBALS.to_minions.send(BusMessage { + let _ = GLOBALS.to_minions.send(ToMinionMessage { target: "all".to_string(), - kind: "subscribe_thread_feed".to_string(), - json_payload: serde_json::to_string(&id).unwrap(), + payload: ToMinionPayload::SubscribeThreadFeed(id), }); *self.current_feed_kind.write() = FeedKind::Thread(id); } pub fn set_feed_to_person(&self, pubkey: PublicKeyHex) { - let _ = GLOBALS.to_minions.send(BusMessage { + let _ = GLOBALS.to_minions.send(ToMinionMessage { target: "all".to_string(), - kind: "subscribe_person_feed".to_string(), - json_payload: serde_json::to_string(&pubkey).unwrap(), + payload: ToMinionPayload::SubscribePersonFeed(pubkey.clone()), }); *self.current_feed_kind.write() = FeedKind::Person(pubkey); } diff --git a/src/globals.rs b/src/globals.rs index 149ffa8f..87fad4f9 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -1,4 +1,4 @@ -use crate::comms::BusMessage; +use crate::comms::{BusMessage, ToMinionMessage}; use crate::db::{DbEvent, DbRelay}; use crate::error::Error; use crate::feed::Feed; @@ -21,7 +21,7 @@ pub struct Globals { /// This is a broadcast channel. All Minions should listen on it. /// To create a receiver, just run .subscribe() on it. - pub to_minions: broadcast::Sender, + pub to_minions: broadcast::Sender, /// This is a mpsc channel. The Overlord listens on it. /// To create a sender, just clone() it. diff --git a/src/overlord/minion/handle_bus.rs b/src/overlord/minion/handle_bus.rs deleted file mode 100644 index 70a7d4a3..00000000 --- a/src/overlord/minion/handle_bus.rs +++ /dev/null @@ -1,58 +0,0 @@ -use super::Minion; -use crate::{BusMessage, Error}; -use futures::SinkExt; -use nostr_types::{ClientMessage, Event, Id, IdHex, PublicKeyHex}; -use tungstenite::protocol::Message as WsMessage; - -impl Minion { - pub(super) async fn handle_bus_message( - &mut self, - bus_message: BusMessage, - ) -> Result { - match &*bus_message.kind { - "shutdown" => { - tracing::info!("{}: Websocket listener shutting down", &self.url); - return Ok(false); - } - //"set_followed_people" => { - // let v: Vec = serde_json::from_str(&bus_message.json_payload)?; - // self.upsert_following(v).await?; - //} - "subscribe_general_feed" => { - self.subscribe_general_feed().await?; - } - "subscribe_person_feed" => { - let pubkeyhex: PublicKeyHex = serde_json::from_str(&bus_message.json_payload)?; - self.subscribe_person_feed(pubkeyhex).await?; - } - "subscribe_thread_feed" => { - let id: Id = serde_json::from_str(&bus_message.json_payload)?; - self.subscribe_thread_feed(id).await?; - } - "fetch_events" => { - let v: Vec = serde_json::from_str(&bus_message.json_payload)?; - self.get_events(v).await?; - } - "post_event" => { - let event: Event = serde_json::from_str(&bus_message.json_payload)?; - let msg = ClientMessage::Event(Box::new(event)); - let wire = serde_json::to_string(&msg)?; - let ws_sink = self.sink.as_mut().unwrap(); - ws_sink.send(WsMessage::Text(wire)).await?; - tracing::info!("Posted event to {}", &self.url); - } - "temp_subscribe_metadata" => { - let pubkeyhex: PublicKeyHex = serde_json::from_str(&bus_message.json_payload)?; - self.temp_subscribe_metadata(pubkeyhex).await?; - } - _ => { - tracing::warn!( - "{} Unrecognized bus message kind received by minion: {}", - &self.url, - bus_message.kind - ); - } - } - Ok(true) - } -} diff --git a/src/overlord/minion/mod.rs b/src/overlord/minion/mod.rs index aff411cf..7c5d552f 100644 --- a/src/overlord/minion/mod.rs +++ b/src/overlord/minion/mod.rs @@ -1,8 +1,7 @@ -mod handle_bus; mod handle_websocket; mod subscription; -use crate::comms::BusMessage; +use crate::comms::{BusMessage, ToMinionMessage, ToMinionPayload}; use crate::db::DbRelay; use crate::error::Error; use crate::globals::GLOBALS; @@ -10,7 +9,8 @@ use futures::{SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; use http::Uri; use nostr_types::{ - EventKind, Filter, Id, IdHex, PublicKeyHex, RelayInformationDocument, Unixtime, Url, + ClientMessage, EventKind, Filter, Id, IdHex, PublicKeyHex, RelayInformationDocument, Unixtime, + Url, }; use std::time::Duration; use subscription::Subscriptions; @@ -24,7 +24,7 @@ use tungstenite::protocol::{Message as WsMessage, WebSocketConfig}; pub struct Minion { url: Url, to_overlord: UnboundedSender, - from_overlord: Receiver, + from_overlord: Receiver, dbrelay: DbRelay, nip11: Option, stream: Option>>>, @@ -211,17 +211,17 @@ impl Minion { WsMessage::Frame(_) => tracing::warn!("{}: Unexpected frame message", &self.url), } }, - bus_message = self.from_overlord.recv() => { - let bus_message = match bus_message { - Ok(bm) => bm, + to_minion_message = self.from_overlord.recv() => { + let to_minion_message = match to_minion_message { + Ok(m) => m, Err(tokio::sync::broadcast::error::RecvError::Closed) => { return Ok(false); }, Err(e) => return Err(e.into()) }; #[allow(clippy::collapsible_if)] - if bus_message.target == self.url.inner() || bus_message.target == "all" { - keepgoing = self.handle_bus_message(bus_message).await?; + if to_minion_message.target == self.url.inner() || to_minion_message.target == "all" { + keepgoing = self.handle_bus_message(to_minion_message).await?; } }, } @@ -229,6 +229,38 @@ impl Minion { Ok(keepgoing) } + pub async fn handle_bus_message(&mut self, message: ToMinionMessage) -> Result { + match message.payload { + ToMinionPayload::Shutdown => { + tracing::info!("{}: Websocket listener shutting down", &self.url); + return Ok(false); + } + ToMinionPayload::SubscribeGeneralFeed => { + self.subscribe_general_feed().await?; + } + ToMinionPayload::SubscribePersonFeed(pubkeyhex) => { + self.subscribe_person_feed(pubkeyhex).await?; + } + ToMinionPayload::SubscribeThreadFeed(id) => { + self.subscribe_thread_feed(id).await?; + } + ToMinionPayload::FetchEvents(vec) => { + self.get_events(vec).await?; + } + ToMinionPayload::PostEvent(event) => { + let msg = ClientMessage::Event(event); + let wire = serde_json::to_string(&msg)?; + let ws_sink = self.sink.as_mut().unwrap(); + ws_sink.send(WsMessage::Text(wire)).await?; + tracing::info!("Posted event to {}", &self.url); + } + ToMinionPayload::TempSubscribeMetadata(pubkeyhex) => { + self.temp_subscribe_metadata(pubkeyhex).await?; + } + } + Ok(true) + } + async fn tell_overlord_we_are_ready(&self) -> Result<(), Error> { self.to_overlord.send(BusMessage { target: "overlord".to_string(), diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs index 2b1a0939..e3888579 100644 --- a/src/overlord/mod.rs +++ b/src/overlord/mod.rs @@ -1,14 +1,15 @@ mod minion; mod relay_picker; -use crate::comms::BusMessage; +use crate::comms::{BusMessage, ToMinionMessage, ToMinionPayload}; use crate::db::{DbEvent, DbPersonRelay, DbRelay}; use crate::error::Error; use crate::globals::{Globals, GLOBALS}; use crate::people::People; use minion::Minion; use nostr_types::{ - Event, EventKind, Id, Nip05, PreEvent, PrivateKey, PublicKey, PublicKeyHex, Tag, Unixtime, Url, + Event, EventKind, Id, IdHex, Nip05, PreEvent, PrivateKey, PublicKey, PublicKeyHex, Tag, + Unixtime, Url, }; use relay_picker::{BestRelay, RelayPicker}; use std::collections::HashMap; @@ -18,7 +19,7 @@ use tokio::{select, task}; use zeroize::Zeroize; pub struct Overlord { - to_minions: Sender, + to_minions: Sender, inbox: UnboundedReceiver, // All the minion tasks running. @@ -59,10 +60,9 @@ impl Overlord { // Send shutdown message to all minions (and ui) // If this fails, it's probably because there are no more listeners // so just ignore it and keep shutting down. - let _ = self.to_minions.send(BusMessage { + let _ = self.to_minions.send(ToMinionMessage { target: "all".to_string(), - kind: "shutdown".to_string(), - json_payload: serde_json::to_string("shutdown").unwrap(), + payload: ToMinionPayload::Shutdown, }); tracing::info!("Overlord waiting for minions to all shutdown"); @@ -212,10 +212,11 @@ impl Overlord { self.start_minion(best_relay.relay.url.clone()).await?; // Subscribe to the general feed - let _ = self.to_minions.send(BusMessage { + // FIXME: older code sent in &best_relay.pubkeys, but minions + // stopped doing anything with that. + let _ = self.to_minions.send(ToMinionMessage { target: best_relay.relay.url.clone(), - kind: "subscribe_general_feed".to_string(), - json_payload: serde_json::to_string(&best_relay.pubkeys).unwrap(), + payload: ToMinionPayload::SubscribeGeneralFeed, }); tracing::info!( @@ -514,10 +515,9 @@ impl Overlord { } // Subscribe to metadata and contact lists for this person - let _ = self.to_minions.send(BusMessage { + let _ = self.to_minions.send(ToMinionMessage { target: person_relay.relay.to_string(), - kind: "temp_subscribe_metadata".to_string(), - json_payload: serde_json::to_string(&pubkey).unwrap(), + payload: ToMinionPayload::TempSubscribeMetadata(pubkey.clone()), }); } } @@ -566,11 +566,11 @@ impl Overlord { tracing::debug!("{}: Asking to fetch {} events", url.inner(), ids.len()); + let ids: Vec = ids.iter().map(|id| (*id).into()).collect(); // Tell it to get these events - let _ = self.to_minions.send(BusMessage { + let _ = self.to_minions.send(ToMinionMessage { target: url.inner().to_owned(), - kind: "fetch_events".to_string(), - json_payload: serde_json::to_string(&ids).unwrap(), + payload: ToMinionPayload::FetchEvents(ids), }); } @@ -766,10 +766,9 @@ impl Overlord { // Send it the event to post tracing::debug!("Asking {} to post", &relay.url); - let _ = self.to_minions.send(BusMessage { + let _ = self.to_minions.send(ToMinionMessage { target: relay.url.clone(), - kind: "post_event".to_string(), - json_payload: serde_json::to_string(&event).unwrap(), + payload: ToMinionPayload::PostEvent(Box::new(event.clone())), }); } @@ -824,10 +823,9 @@ impl Overlord { // Send it the event to post tracing::debug!("Asking {} to post", &relay.url); - let _ = self.to_minions.send(BusMessage { + let _ = self.to_minions.send(ToMinionMessage { target: relay.url.clone(), - kind: "post_event".to_string(), - json_payload: serde_json::to_string(&event).unwrap(), + payload: ToMinionPayload::PostEvent(Box::new(event.clone())), }); } @@ -889,10 +887,9 @@ impl Overlord { // Send it the event to post tracing::debug!("Asking {} to post", &relay.url); - let _ = self.to_minions.send(BusMessage { + let _ = self.to_minions.send(ToMinionMessage { target: relay.url.clone(), - kind: "post_event".to_string(), - json_payload: serde_json::to_string(&event).unwrap(), + payload: ToMinionPayload::PostEvent(Box::new(event.clone())), }); }