From a5c5627749001282949f022087d99aabbc959095 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Mon, 26 Dec 2022 14:40:15 +1300 Subject: [PATCH] Start driving minions with messages --- src/overlord/minion/handle_bus.rs | 24 +++++++++++++---- src/overlord/minion/mod.rs | 43 +++++++++++++++++++------------ src/overlord/mod.rs | 10 +++++-- 3 files changed, 53 insertions(+), 24 deletions(-) diff --git a/src/overlord/minion/handle_bus.rs b/src/overlord/minion/handle_bus.rs index d7e99447..f66109ae 100644 --- a/src/overlord/minion/handle_bus.rs +++ b/src/overlord/minion/handle_bus.rs @@ -1,13 +1,27 @@ use super::Minion; use crate::{BusMessage, Error}; +use nostr_types::PublicKeyHex; use tracing::warn; impl Minion { - pub(super) async fn handle_bus_message(&self, bus_message: BusMessage) -> Result<(), Error> { - warn!( - "Websocket task got message, unimplemented: {}", - bus_message.kind - ); + pub(super) async fn handle_bus_message( + &mut self, + bus_message: BusMessage, + ) -> Result<(), Error> { + match &*bus_message.kind { + "set_followed_people" => { + let v: Vec = serde_json::from_str(&bus_message.json_payload)?; + self.upsert_following(v).await?; + } + "fetch_events" => {} + "follow_event_reactions" => {} + _ => { + warn!( + "Unrecognized bus message kind received by minion: {}", + bus_message.kind + ); + } + } Ok(()) } } diff --git a/src/overlord/minion/mod.rs b/src/overlord/minion/mod.rs index 96821859..22a29537 100644 --- a/src/overlord/minion/mod.rs +++ b/src/overlord/minion/mod.rs @@ -9,7 +9,9 @@ use crate::globals::GLOBALS; use futures::{SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; use http::Uri; -use nostr_types::{EventKind, Filters, PublicKeyHex, RelayInformationDocument, Unixtime, Url}; +use nostr_types::{ + EventKind, Filters, IdHex, PublicKeyHex, RelayInformationDocument, Unixtime, Url, +}; use subscription::Subscriptions; use tokio::net::TcpStream; use tokio::select; @@ -21,7 +23,6 @@ use tungstenite::protocol::{Message as WsMessage, WebSocketConfig}; pub struct Minion { url: Url, - pubkeys: Vec, to_overlord: UnboundedSender, from_overlord: Receiver, dbrelay: DbRelay, @@ -29,10 +30,12 @@ pub struct Minion { stream: Option>>>, sink: Option>, WsMessage>>, subscriptions: Subscriptions, + #[allow(dead_code)] + next_events_subscription_id: u32, } impl Minion { - pub async fn new(url: Url, pubkeys: Vec) -> Result { + pub async fn new(url: Url) -> Result { let to_overlord = GLOBALS.to_overlord.clone(); let from_overlord = GLOBALS.to_minions.subscribe(); let dbrelay = match DbRelay::fetch_one(&url).await? { @@ -46,7 +49,6 @@ impl Minion { Ok(Minion { url, - pubkeys, to_overlord, from_overlord, dbrelay, @@ -54,6 +56,7 @@ impl Minion { stream: None, sink: None, subscriptions: Subscriptions::new(), + next_events_subscription_id: 0, }) } } @@ -144,11 +147,6 @@ impl Minion { DbRelay::update_success(self.dbrelay.url.clone(), now).await? } - // Subscribe to the people we follow - if !self.pubkeys.is_empty() { - self.update_following_subscription().await?; - } - // Tell the overlord we are ready to receive commands self.tell_overlord_we_are_ready().await?; @@ -239,12 +237,11 @@ impl Minion { Ok(()) } - // This updates a subscription named "following" which watches for events - // from the people we follow. - async fn update_following_subscription(&mut self) -> Result<(), Error> { + // Create or replace the following subscription + async fn upsert_following(&mut self, pubkeys: Vec) -> Result<(), Error> { let websocket_sink = self.sink.as_mut().unwrap(); - if self.pubkeys.is_empty() { + if pubkeys.is_empty() { if let Some(sub) = self.subscriptions.get("following") { // Close the subscription let wire = serde_json::to_string(&sub.close_message())?; @@ -263,7 +260,7 @@ impl Minion { // Find the oldest 'last_fetched' among the 'person_relay' table. // Null values will come through as 0. let mut special_since: i64 = - DbPersonRelay::fetch_oldest_last_fetched(&self.pubkeys, &self.url.0).await? as i64; + DbPersonRelay::fetch_oldest_last_fetched(&pubkeys, &self.url.0).await? as i64; let settings = GLOBALS.settings.lock().await.clone(); @@ -280,7 +277,7 @@ impl Minion { // Create the author filter let mut feed_filter: Filters = Filters::new(); - for pk in self.pubkeys.iter() { + for pk in pubkeys.iter() { feed_filter.add_author(pk, None); } feed_filter.add_event_kind(EventKind::TextNote); @@ -295,7 +292,7 @@ impl Minion { // Create the lookback filter let mut special_filter: Filters = Filters::new(); - for pk in self.pubkeys.iter() { + for pk in pubkeys.iter() { special_filter.add_author(pk, None); } special_filter.add_event_kind(EventKind::Metadata); @@ -310,7 +307,6 @@ impl Minion { ); // Get the subscription - let req_message = if self.subscriptions.has("following") { let sub = self.subscriptions.get_mut("following").unwrap(); let vec: &mut Vec = sub.get_mut(); @@ -332,4 +328,17 @@ impl Minion { Ok(()) } + + #[allow(dead_code)] + async fn get_events(&mut self, _ids: Vec) -> Result<(), Error> { + // Create a new "events" subscription which closes on EOSE + // Use and bump next_events_subscription_id so they are independent subscriptions + unimplemented!() + } + + #[allow(dead_code)] + async fn follow_event_reactions(&mut self, _ids: Vec) -> Result<(), Error> { + // Create or extend the "reactions" subscription + unimplemented!() + } } diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs index 0b9e0b9e..dbb3ca5b 100644 --- a/src/overlord/mod.rs +++ b/src/overlord/mod.rs @@ -224,11 +224,17 @@ impl Overlord { async fn start_minion(&mut self, url: String, pubkeys: Vec) -> Result<(), Error> { let moved_url = Url(url.clone()); - let mut minion = Minion::new(moved_url, pubkeys).await?; + let mut minion = Minion::new(moved_url).await?; let abort_handle = self.minions.spawn(async move { minion.handle().await }); let id = abort_handle.id(); self.minions_task_url.insert(id, Url(url.clone())); - self.urls_watching.push(Url(url)); + self.urls_watching.push(Url(url.clone())); + + let _ = self.to_minions.send(BusMessage { + target: url.clone(), + kind: "set_followed_people".to_string(), + json_payload: serde_json::to_string(&pubkeys).unwrap(), + }); Ok(()) }