ToMinionMessage to replace BusMessage in that direction (3 benefits):

1. Allows sending types that aren't serializable (main reason)
2. Provides stronger typing
3. Avoids the cost of serialization and deserialization
This commit is contained in:
Mike Dilger 2023-01-06 15:02:41 +13:00
parent dbba43c87d
commit 6daf59c781
7 changed files with 95 additions and 106 deletions

View File

@ -1,10 +1,9 @@
use std::ops::Drop;
use nostr_types::{Event, Id, IdHex, PublicKeyHex};
use serde::Serialize;
use std::ops::Drop;
use zeroize::Zeroize;
/// This is a message sent between the Overlord and Minions
/// in either direction
/// This is a message sent to the Overlord
#[derive(Debug, Clone, Serialize)]
pub struct BusMessage {
/// Indended recipient of the message
@ -24,3 +23,24 @@ impl Drop for BusMessage {
self.json_payload.zeroize();
}
}
/// This is a message sent to the minions
#[derive(Debug, Clone)]
pub struct ToMinionMessage {
/// The minion we are addressing, based on the URL they are listening to
/// as a String. "all" means all minions.
pub target: String,
pub payload: ToMinionPayload,
}
#[derive(Debug, Clone)]
pub enum ToMinionPayload {
Shutdown,
SubscribeGeneralFeed,
SubscribePersonFeed(PublicKeyHex),
SubscribeThreadFeed(Id),
TempSubscribeMetadata(PublicKeyHex),
FetchEvents(Vec<IdHex>),
PostEvent(Box<Event>),
}

View File

@ -1,10 +1,10 @@
use crate::comms::BusMessage;
use crate::comms::{BusMessage, ToMinionMessage};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum Error {
#[error("Error broadcasting: {0}")]
BroadcastSend(#[from] tokio::sync::broadcast::error::SendError<BusMessage>),
BroadcastSend(#[from] tokio::sync::broadcast::error::SendError<ToMinionMessage>),
#[error("Error receiving broadcast: {0}")]
BroadcastReceive(#[from] tokio::sync::broadcast::error::RecvError),

View File

@ -1,4 +1,4 @@
use crate::comms::BusMessage;
use crate::comms::{ToMinionMessage, ToMinionPayload};
use crate::globals::GLOBALS;
use nostr_types::PublicKeyHex;
use nostr_types::{Event, EventKind, Id};
@ -54,19 +54,17 @@ impl Feed {
}
pub fn set_feed_to_thread(&self, id: Id) {
let _ = GLOBALS.to_minions.send(BusMessage {
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
kind: "subscribe_thread_feed".to_string(),
json_payload: serde_json::to_string(&id).unwrap(),
payload: ToMinionPayload::SubscribeThreadFeed(id),
});
*self.current_feed_kind.write() = FeedKind::Thread(id);
}
pub fn set_feed_to_person(&self, pubkey: PublicKeyHex) {
let _ = GLOBALS.to_minions.send(BusMessage {
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
kind: "subscribe_person_feed".to_string(),
json_payload: serde_json::to_string(&pubkey).unwrap(),
payload: ToMinionPayload::SubscribePersonFeed(pubkey.clone()),
});
*self.current_feed_kind.write() = FeedKind::Person(pubkey);
}

View File

@ -1,4 +1,4 @@
use crate::comms::BusMessage;
use crate::comms::{BusMessage, ToMinionMessage};
use crate::db::{DbEvent, DbRelay};
use crate::error::Error;
use crate::feed::Feed;
@ -21,7 +21,7 @@ pub struct Globals {
/// This is a broadcast channel. All Minions should listen on it.
/// To create a receiver, just run .subscribe() on it.
pub to_minions: broadcast::Sender<BusMessage>,
pub to_minions: broadcast::Sender<ToMinionMessage>,
/// This is a mpsc channel. The Overlord listens on it.
/// To create a sender, just clone() it.

View File

@ -1,58 +0,0 @@
use super::Minion;
use crate::{BusMessage, Error};
use futures::SinkExt;
use nostr_types::{ClientMessage, Event, Id, IdHex, PublicKeyHex};
use tungstenite::protocol::Message as WsMessage;
impl Minion {
pub(super) async fn handle_bus_message(
&mut self,
bus_message: BusMessage,
) -> Result<bool, Error> {
match &*bus_message.kind {
"shutdown" => {
tracing::info!("{}: Websocket listener shutting down", &self.url);
return Ok(false);
}
//"set_followed_people" => {
// let v: Vec<PublicKeyHex> = serde_json::from_str(&bus_message.json_payload)?;
// self.upsert_following(v).await?;
//}
"subscribe_general_feed" => {
self.subscribe_general_feed().await?;
}
"subscribe_person_feed" => {
let pubkeyhex: PublicKeyHex = serde_json::from_str(&bus_message.json_payload)?;
self.subscribe_person_feed(pubkeyhex).await?;
}
"subscribe_thread_feed" => {
let id: Id = serde_json::from_str(&bus_message.json_payload)?;
self.subscribe_thread_feed(id).await?;
}
"fetch_events" => {
let v: Vec<IdHex> = serde_json::from_str(&bus_message.json_payload)?;
self.get_events(v).await?;
}
"post_event" => {
let event: Event = serde_json::from_str(&bus_message.json_payload)?;
let msg = ClientMessage::Event(Box::new(event));
let wire = serde_json::to_string(&msg)?;
let ws_sink = self.sink.as_mut().unwrap();
ws_sink.send(WsMessage::Text(wire)).await?;
tracing::info!("Posted event to {}", &self.url);
}
"temp_subscribe_metadata" => {
let pubkeyhex: PublicKeyHex = serde_json::from_str(&bus_message.json_payload)?;
self.temp_subscribe_metadata(pubkeyhex).await?;
}
_ => {
tracing::warn!(
"{} Unrecognized bus message kind received by minion: {}",
&self.url,
bus_message.kind
);
}
}
Ok(true)
}
}

View File

@ -1,8 +1,7 @@
mod handle_bus;
mod handle_websocket;
mod subscription;
use crate::comms::BusMessage;
use crate::comms::{BusMessage, ToMinionMessage, ToMinionPayload};
use crate::db::DbRelay;
use crate::error::Error;
use crate::globals::GLOBALS;
@ -10,7 +9,8 @@ use futures::{SinkExt, StreamExt};
use futures_util::stream::{SplitSink, SplitStream};
use http::Uri;
use nostr_types::{
EventKind, Filter, Id, IdHex, PublicKeyHex, RelayInformationDocument, Unixtime, Url,
ClientMessage, EventKind, Filter, Id, IdHex, PublicKeyHex, RelayInformationDocument, Unixtime,
Url,
};
use std::time::Duration;
use subscription::Subscriptions;
@ -24,7 +24,7 @@ use tungstenite::protocol::{Message as WsMessage, WebSocketConfig};
pub struct Minion {
url: Url,
to_overlord: UnboundedSender<BusMessage>,
from_overlord: Receiver<BusMessage>,
from_overlord: Receiver<ToMinionMessage>,
dbrelay: DbRelay,
nip11: Option<RelayInformationDocument>,
stream: Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
@ -211,17 +211,17 @@ impl Minion {
WsMessage::Frame(_) => tracing::warn!("{}: Unexpected frame message", &self.url),
}
},
bus_message = self.from_overlord.recv() => {
let bus_message = match bus_message {
Ok(bm) => bm,
to_minion_message = self.from_overlord.recv() => {
let to_minion_message = match to_minion_message {
Ok(m) => m,
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
return Ok(false);
},
Err(e) => return Err(e.into())
};
#[allow(clippy::collapsible_if)]
if bus_message.target == self.url.inner() || bus_message.target == "all" {
keepgoing = self.handle_bus_message(bus_message).await?;
if to_minion_message.target == self.url.inner() || to_minion_message.target == "all" {
keepgoing = self.handle_bus_message(to_minion_message).await?;
}
},
}
@ -229,6 +229,38 @@ impl Minion {
Ok(keepgoing)
}
pub async fn handle_bus_message(&mut self, message: ToMinionMessage) -> Result<bool, Error> {
match message.payload {
ToMinionPayload::Shutdown => {
tracing::info!("{}: Websocket listener shutting down", &self.url);
return Ok(false);
}
ToMinionPayload::SubscribeGeneralFeed => {
self.subscribe_general_feed().await?;
}
ToMinionPayload::SubscribePersonFeed(pubkeyhex) => {
self.subscribe_person_feed(pubkeyhex).await?;
}
ToMinionPayload::SubscribeThreadFeed(id) => {
self.subscribe_thread_feed(id).await?;
}
ToMinionPayload::FetchEvents(vec) => {
self.get_events(vec).await?;
}
ToMinionPayload::PostEvent(event) => {
let msg = ClientMessage::Event(event);
let wire = serde_json::to_string(&msg)?;
let ws_sink = self.sink.as_mut().unwrap();
ws_sink.send(WsMessage::Text(wire)).await?;
tracing::info!("Posted event to {}", &self.url);
}
ToMinionPayload::TempSubscribeMetadata(pubkeyhex) => {
self.temp_subscribe_metadata(pubkeyhex).await?;
}
}
Ok(true)
}
async fn tell_overlord_we_are_ready(&self) -> Result<(), Error> {
self.to_overlord.send(BusMessage {
target: "overlord".to_string(),

View File

@ -1,14 +1,15 @@
mod minion;
mod relay_picker;
use crate::comms::BusMessage;
use crate::comms::{BusMessage, ToMinionMessage, ToMinionPayload};
use crate::db::{DbEvent, DbPersonRelay, DbRelay};
use crate::error::Error;
use crate::globals::{Globals, GLOBALS};
use crate::people::People;
use minion::Minion;
use nostr_types::{
Event, EventKind, Id, Nip05, PreEvent, PrivateKey, PublicKey, PublicKeyHex, Tag, Unixtime, Url,
Event, EventKind, Id, IdHex, Nip05, PreEvent, PrivateKey, PublicKey, PublicKeyHex, Tag,
Unixtime, Url,
};
use relay_picker::{BestRelay, RelayPicker};
use std::collections::HashMap;
@ -18,7 +19,7 @@ use tokio::{select, task};
use zeroize::Zeroize;
pub struct Overlord {
to_minions: Sender<BusMessage>,
to_minions: Sender<ToMinionMessage>,
inbox: UnboundedReceiver<BusMessage>,
// All the minion tasks running.
@ -59,10 +60,9 @@ impl Overlord {
// Send shutdown message to all minions (and ui)
// If this fails, it's probably because there are no more listeners
// so just ignore it and keep shutting down.
let _ = self.to_minions.send(BusMessage {
let _ = self.to_minions.send(ToMinionMessage {
target: "all".to_string(),
kind: "shutdown".to_string(),
json_payload: serde_json::to_string("shutdown").unwrap(),
payload: ToMinionPayload::Shutdown,
});
tracing::info!("Overlord waiting for minions to all shutdown");
@ -212,10 +212,11 @@ impl Overlord {
self.start_minion(best_relay.relay.url.clone()).await?;
// Subscribe to the general feed
let _ = self.to_minions.send(BusMessage {
// FIXME: older code sent in &best_relay.pubkeys, but minions
// stopped doing anything with that.
let _ = self.to_minions.send(ToMinionMessage {
target: best_relay.relay.url.clone(),
kind: "subscribe_general_feed".to_string(),
json_payload: serde_json::to_string(&best_relay.pubkeys).unwrap(),
payload: ToMinionPayload::SubscribeGeneralFeed,
});
tracing::info!(
@ -514,10 +515,9 @@ impl Overlord {
}
// Subscribe to metadata and contact lists for this person
let _ = self.to_minions.send(BusMessage {
let _ = self.to_minions.send(ToMinionMessage {
target: person_relay.relay.to_string(),
kind: "temp_subscribe_metadata".to_string(),
json_payload: serde_json::to_string(&pubkey).unwrap(),
payload: ToMinionPayload::TempSubscribeMetadata(pubkey.clone()),
});
}
}
@ -566,11 +566,11 @@ impl Overlord {
tracing::debug!("{}: Asking to fetch {} events", url.inner(), ids.len());
let ids: Vec<IdHex> = ids.iter().map(|id| (*id).into()).collect();
// Tell it to get these events
let _ = self.to_minions.send(BusMessage {
let _ = self.to_minions.send(ToMinionMessage {
target: url.inner().to_owned(),
kind: "fetch_events".to_string(),
json_payload: serde_json::to_string(&ids).unwrap(),
payload: ToMinionPayload::FetchEvents(ids),
});
}
@ -766,10 +766,9 @@ impl Overlord {
// Send it the event to post
tracing::debug!("Asking {} to post", &relay.url);
let _ = self.to_minions.send(BusMessage {
let _ = self.to_minions.send(ToMinionMessage {
target: relay.url.clone(),
kind: "post_event".to_string(),
json_payload: serde_json::to_string(&event).unwrap(),
payload: ToMinionPayload::PostEvent(Box::new(event.clone())),
});
}
@ -824,10 +823,9 @@ impl Overlord {
// Send it the event to post
tracing::debug!("Asking {} to post", &relay.url);
let _ = self.to_minions.send(BusMessage {
let _ = self.to_minions.send(ToMinionMessage {
target: relay.url.clone(),
kind: "post_event".to_string(),
json_payload: serde_json::to_string(&event).unwrap(),
payload: ToMinionPayload::PostEvent(Box::new(event.clone())),
});
}
@ -889,10 +887,9 @@ impl Overlord {
// Send it the event to post
tracing::debug!("Asking {} to post", &relay.url);
let _ = self.to_minions.send(BusMessage {
let _ = self.to_minions.send(ToMinionMessage {
target: relay.url.clone(),
kind: "post_event".to_string(),
json_payload: serde_json::to_string(&event).unwrap(),
payload: ToMinionPayload::PostEvent(Box::new(event.clone())),
});
}