From 56dac884046ec7e7426c70849308a4ada8ee8f3b Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Mon, 2 Jan 2023 11:14:54 +1300 Subject: [PATCH] New subscription work, not yet in use --- src/overlord/minion/handle_bus.rs | 38 ++++++++- src/overlord/minion/mod.rs | 128 ++++++++++++++++++++++++++++-- 2 files changed, 158 insertions(+), 8 deletions(-) diff --git a/src/overlord/minion/handle_bus.rs b/src/overlord/minion/handle_bus.rs index 9cd10836..7bb73b68 100644 --- a/src/overlord/minion/handle_bus.rs +++ b/src/overlord/minion/handle_bus.rs @@ -19,9 +19,6 @@ impl Minion { let v: Vec = serde_json::from_str(&bus_message.json_payload)?; self.get_events(v).await?; } - "follow_event_reactions" => { - warn!("{}: follow event reactions unimplemented", &self.url); - } "post_event" => { let event: Event = serde_json::from_str(&bus_message.json_payload)?; let msg = ClientMessage::Event(Box::new(event)); @@ -30,6 +27,41 @@ impl Minion { ws_sink.send(WsMessage::Text(wire)).await?; info!("Posted event to {}", &self.url); } + // + // NEW handling + // + "subscribe_ephemeral_for_all" => { + let data: Vec = serde_json::from_str(&bus_message.json_payload)?; + self.subscribe_ephemeral_for_all(data).await?; + } + "subscribe_posts_by_me" => { + let data: PublicKeyHex = serde_json::from_str(&bus_message.json_payload)?; + self.subscribe_posts_by_me(data).await?; + } + "subscribe_posts_by_followed" => { + let data: Vec = serde_json::from_str(&bus_message.json_payload)?; + self.subscribe_posts_by_followed(data).await?; + } + "subscribe_ancestors" => { + let data: Vec = serde_json::from_str(&bus_message.json_payload)?; + self.subscribe_ancestors(data).await?; + } + "subscribe_my_descendants" => { + let data: Vec = serde_json::from_str(&bus_message.json_payload)?; + self.subscribe_my_descendants(data).await?; + } + "subscribe_follower_descendants" => { + let data: Vec = serde_json::from_str(&bus_message.json_payload)?; + self.subscribe_follower_descendants(data).await?; + } + "subscribe_my_mentions" => { + let data: PublicKeyHex = serde_json::from_str(&bus_message.json_payload)?; + self.subscribe_my_mentions(data).await?; + } + "subscribe_follower_mentions" => { + let data: Vec = serde_json::from_str(&bus_message.json_payload)?; + self.subscribe_follower_mentions(data).await?; + } _ => { warn!( "{} Unrecognized bus message kind received by minion: {}", diff --git a/src/overlord/minion/mod.rs b/src/overlord/minion/mod.rs index 6e2907d8..ac49ed0d 100644 --- a/src/overlord/minion/mod.rs +++ b/src/overlord/minion/mod.rs @@ -12,6 +12,7 @@ use http::Uri; use nostr_types::{ EventKind, Filter, IdHex, PublicKeyHex, RelayInformationDocument, Unixtime, Url, }; +use std::time::Duration; use subscription::Subscriptions; use tokio::net::TcpStream; use tokio::select; @@ -386,10 +387,127 @@ impl Minion { Ok(()) } - /* - async fn follow_event_reactions(&mut self, _ids: Vec) -> Result<(), Error> { - // Create or extend the "reactions" subscription - unimplemented!() + async fn subscribe_ephemeral_for_all( + &mut self, + people: Vec, + ) -> Result<(), Error> { + let filter = Filter { + authors: people, + kinds: vec![EventKind::Metadata, EventKind::ContactList], + // No since. The list of people changes so we can't even use 'last checked' here. + ..Default::default() + }; + + self.subscribe(vec![filter], "ephemeral_for_all").await + } + + async fn subscribe_posts_by_me(&mut self, me: PublicKeyHex) -> Result<(), Error> { + let feed_chunk = GLOBALS.settings.read().await.feed_chunk; + + let filter = Filter { + authors: vec![me], + // leave ALL kinds + since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)), + ..Default::default() + }; + + self.subscribe(vec![filter], "posts_by_me").await + } + + async fn subscribe_posts_by_followed( + &mut self, + followed: Vec, + ) -> Result<(), Error> { + let feed_chunk = GLOBALS.settings.read().await.feed_chunk; + + let filter = Filter { + authors: followed, + // leave ALL kinds + since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)), + ..Default::default() + }; + + self.subscribe(vec![filter], "posts_by_followed").await + } + + async fn subscribe_ancestors(&mut self, ancestors: Vec) -> Result<(), Error> { + let filter = Filter { + ids: ancestors, + // leave ALL kinds + // no since filter + ..Default::default() + }; + + self.subscribe(vec![filter], "ancestors").await + } + + async fn subscribe_my_descendants(&mut self, my_posts: Vec) -> Result<(), Error> { + let filter = Filter { + e: my_posts, + // leave ALL kinds + // no since filter + ..Default::default() + }; + + self.subscribe(vec![filter], "my_descendants").await + } + + async fn subscribe_follower_descendants( + &mut self, + follower_posts: Vec, + ) -> Result<(), Error> { + let filter = Filter { + e: follower_posts, + // leave ALL kinds + // no since filter + ..Default::default() + }; + + self.subscribe(vec![filter], "follower_descendants").await + } + + async fn subscribe_my_mentions(&mut self, me: PublicKeyHex) -> Result<(), Error> { + let feed_chunk = GLOBALS.settings.read().await.feed_chunk; + + let filter = Filter { + p: vec![me], + // leave ALL kinds + since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)), + ..Default::default() + }; + + self.subscribe(vec![filter], "my_mentions").await + } + + async fn subscribe_follower_mentions( + &mut self, + followers: Vec, + ) -> Result<(), Error> { + let feed_chunk = GLOBALS.settings.read().await.feed_chunk; + + let filter = Filter { + p: followers, + // leave ALL kinds + since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)), + ..Default::default() + }; + + self.subscribe(vec![filter], "follower_mentions").await + } + + async fn subscribe(&mut self, filters: Vec, handle: &str) -> Result<(), Error> { + let req_message = if self.subscriptions.has(handle) { + let sub = self.subscriptions.get_mut(handle).unwrap(); + *sub.get_mut() = filters; + sub.req_message() + } else { + self.subscriptions.add(handle, filters); + self.subscriptions.get(handle).unwrap().req_message() + }; + let wire = serde_json::to_string(&req_message)?; + let websocket_sink = self.sink.as_mut().unwrap(); + websocket_sink.send(WsMessage::Text(wire.clone())).await?; + trace!("{}: Sent {}", &self.url, &wire); + Ok(()) } - */ }