diff --git a/src/feed.rs b/src/feed.rs index 87e41514..5b84a066 100644 --- a/src/feed.rs +++ b/src/feed.rs @@ -1,3 +1,4 @@ +use crate::comms::BusMessage; use crate::globals::GLOBALS; use nostr_types::PublicKeyHex; use nostr_types::{Event, EventKind, Id}; @@ -38,16 +39,27 @@ impl Feed { } pub fn set_feed_to_general(&self) { + // We are always subscribed to the general feed. Don't resubscribe here + // because it won't have changed, but the relays will shower you with + // all those events again. *self.current_feed_kind.write() = FeedKind::General; } pub fn set_feed_to_thread(&self, id: Id) { - // get parent? + let _ = GLOBALS.to_minions.send(BusMessage { + target: "all".to_string(), + kind: "subscribe_thread_feed".to_string(), + json_payload: serde_json::to_string(&id).unwrap(), + }); *self.current_feed_kind.write() = FeedKind::Thread(id); } pub fn set_feed_to_person(&self, pubkey: PublicKeyHex) { - // FIXME - TRIGGER OVERLORD TO FETCH THEIR EVENTS FURTHER BACK + let _ = GLOBALS.to_minions.send(BusMessage { + target: "all".to_string(), + kind: "subscribe_person_feed".to_string(), + json_payload: serde_json::to_string(&pubkey).unwrap(), + }); *self.current_feed_kind.write() = FeedKind::Person(pubkey); } @@ -67,7 +79,6 @@ impl Feed { } pub fn get_thread_parent(&self, id: Id) -> Id { - // FIXME - TRIGGER OVERLORD TO FETCH THIS FEED let mut event = match GLOBALS.events.blocking_read().get(&id).cloned() { None => return id, Some(e) => e, diff --git a/src/overlord/minion/handle_bus.rs b/src/overlord/minion/handle_bus.rs index 5cf60422..70a7d4a3 100644 --- a/src/overlord/minion/handle_bus.rs +++ b/src/overlord/minion/handle_bus.rs @@ -1,18 +1,33 @@ use super::Minion; use crate::{BusMessage, Error}; use futures::SinkExt; -use nostr_types::{ClientMessage, Event, IdHex, PublicKeyHex}; +use nostr_types::{ClientMessage, Event, Id, IdHex, PublicKeyHex}; use tungstenite::protocol::Message as WsMessage; impl Minion { pub(super) async fn handle_bus_message( &mut self, bus_message: BusMessage, - ) -> Result<(), Error> { + ) -> Result { match &*bus_message.kind { - "set_followed_people" => { - let v: Vec = serde_json::from_str(&bus_message.json_payload)?; - self.upsert_following(v).await?; + "shutdown" => { + tracing::info!("{}: Websocket listener shutting down", &self.url); + return Ok(false); + } + //"set_followed_people" => { + // let v: Vec = serde_json::from_str(&bus_message.json_payload)?; + // self.upsert_following(v).await?; + //} + "subscribe_general_feed" => { + self.subscribe_general_feed().await?; + } + "subscribe_person_feed" => { + let pubkeyhex: PublicKeyHex = serde_json::from_str(&bus_message.json_payload)?; + self.subscribe_person_feed(pubkeyhex).await?; + } + "subscribe_thread_feed" => { + let id: Id = serde_json::from_str(&bus_message.json_payload)?; + self.subscribe_thread_feed(id).await?; } "fetch_events" => { let v: Vec = serde_json::from_str(&bus_message.json_payload)?; @@ -38,6 +53,6 @@ impl Minion { ); } } - Ok(()) + Ok(true) } } diff --git a/src/overlord/minion/handle_websocket.rs b/src/overlord/minion/handle_websocket.rs index c07fd78c..3e9d0a6f 100644 --- a/src/overlord/minion/handle_websocket.rs +++ b/src/overlord/minion/handle_websocket.rs @@ -29,7 +29,7 @@ impl Minion { .subscriptions .get_handle_by_id(&subid.0) .unwrap_or_else(|| "_".to_owned()); - tracing::trace!("{}: {}: NEW EVENT", &self.url, handle); + tracing::debug!("{}: {}: NEW EVENT", &self.url, handle); if event.kind == EventKind::TextNote { // Just store text notes in incoming diff --git a/src/overlord/minion/mod.rs b/src/overlord/minion/mod.rs index edb2294f..74240005 100644 --- a/src/overlord/minion/mod.rs +++ b/src/overlord/minion/mod.rs @@ -10,8 +10,9 @@ use futures::{SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; use http::Uri; use nostr_types::{ - EventKind, Filter, IdHex, PublicKeyHex, RelayInformationDocument, Unixtime, Url, + EventKind, Filter, Id, IdHex, PublicKeyHex, RelayInformationDocument, Unixtime, Url, }; +use std::time::Duration; use subscription::Subscriptions; use tokio::net::TcpStream; use tokio::select; @@ -219,13 +220,8 @@ impl Minion { Err(e) => return Err(e.into()) }; #[allow(clippy::collapsible_if)] - if bus_message.target == self.url.inner() { - self.handle_bus_message(bus_message).await?; - } else if &*bus_message.target == "all" { - if &*bus_message.kind == "shutdown" { - tracing::info!("{}: Websocket listener shutting down", &self.url); - keepgoing = false; - } + if bus_message.target == self.url.inner() || bus_message.target == "all" { + keepgoing = self.handle_bus_message(bus_message).await?; } }, } @@ -243,7 +239,180 @@ impl Minion { Ok(()) } + async fn subscribe_general_feed(&mut self) -> Result<(), Error> { + // NOTE if the general feed is already subscribed we shoudn't do anything + // but we may need to update the subscription + + let mut filters: Vec = Vec::new(); + let feed_chunk = GLOBALS.settings.read().await.feed_chunk; + + let followed_pubkeys = GLOBALS.people.read().await.get_followed_pubkeys(); + + if let Some(pubkey) = GLOBALS.signer.read().await.public_key() { + // feed related by me + filters.push(Filter { + authors: vec![pubkey.into()], + kinds: vec![ + EventKind::TextNote, + EventKind::Reaction, + EventKind::EventDeletion, + ], + since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)), + ..Default::default() + }); + + // Any mentions of me + filters.push(Filter { + p: vec![pubkey.into()], + since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)), + ..Default::default() + }); + + // my metadata + // FIXME TBD + /* + filters.push(Filter { + authors: vec![pubkey], + kinds: vec![EventKind::Metadata, EventKind::RecommendRelay, EventKind::ContactList, EventKind::RelaysList], + since: // last we last checked + .. Default::default() + }); + */ + } + + if !followed_pubkeys.is_empty() { + // feed related by people followed + filters.push(Filter { + authors: followed_pubkeys.clone(), + kinds: vec![ + EventKind::TextNote, + EventKind::Reaction, + EventKind::EventDeletion, + ], + since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)), + ..Default::default() + }); + + // metadata by people followed + // FIXME TBD + /* + filters.push(Filter { + authors: pubkeys.clone(), + kinds: vec![EventKind::Metadata, EventKind::RecommendRelay, EventKind::ContactList, EventKind::RelaysList], + since: // last we last checked + .. Default::default() + }); + */ + } + + // reactions to posts by me + // FIXME TBD + + // reactions to posts by people followed + // FIXME TBD + + // NO REPLIES OR ANCESTORS + + if filters.is_empty() { + self.unsubscribe("general_feed").await?; + } else { + self.subscribe(filters, "general_feed").await?; + } + + Ok(()) + } + + async fn subscribe_person_feed(&mut self, pubkey: PublicKeyHex) -> Result<(), Error> { + // NOTE we do not unsubscribe to the general feed + + let mut filters: Vec = Vec::new(); + let feed_chunk = GLOBALS.settings.read().await.feed_chunk; + + // feed related by person + filters.push(Filter { + authors: vec![pubkey], + kinds: vec![ + EventKind::TextNote, + EventKind::Reaction, + EventKind::EventDeletion, + ], + since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)), + ..Default::default() + }); + + // persons metadata + // FIXME TBD + /* + filters.push(Filter { + authors: vec![pubkey], + kinds: vec![EventKind::Metadata, EventKind::RecommendRelay, EventKind::ContactList, EventKind::RelaysList], + since: // last we last checked + .. Default::default() + }); + */ + + // reactions to post by person + // FIXME TBD + + // NO REPLIES OR ANCESTORS + + if filters.is_empty() { + self.unsubscribe("person_feed").await?; + } else { + self.subscribe(filters, "person_feed").await?; + } + + Ok(()) + } + + async fn subscribe_thread_feed(&mut self, id: Id) -> Result<(), Error> { + // NOTE we do not unsubscribe to the general feed + + let mut filters: Vec = Vec::new(); + let feed_chunk = GLOBALS.settings.read().await.feed_chunk; + + // This post and ancestors + let mut ids: Vec = vec![id.into()]; + // FIXME - We could have this precalculated like GLOBALS.relationships + // in reverse. It would be potentially more complete having + // iteratively climbed the chain. + if let Some(event) = GLOBALS.events.read().await.get(&id) { + for (id, url) in &event.replies_to_ancestors() { + if let Some(url) = url { + if url == &self.url { + ids.push((*id).into()); + } + } else { + ids.push((*id).into()); + } + } + } + filters.push(Filter { + ids: ids.clone(), + ..Default::default() + }); + + // Replies and reactions to this post and ancestors + filters.push(Filter { + e: ids, + since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)), + ..Default::default() + }); + + // Metadata for people in those events + // TBD + + if filters.is_empty() { + self.unsubscribe("thread_feed").await?; + } else { + self.subscribe(filters, "thread_feed").await?; + } + + Ok(()) + } + // Create or replace the following subscription + /* async fn upsert_following(&mut self, pubkeys: Vec) -> Result<(), Error> { let websocket_sink = self.sink.as_mut().unwrap(); @@ -354,6 +523,7 @@ impl Minion { Ok(()) } + */ async fn get_events(&mut self, ids: Vec) -> Result<(), Error> { if ids.is_empty() { diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs index a7abbaac..930f8725 100644 --- a/src/overlord/mod.rs +++ b/src/overlord/mod.rs @@ -220,10 +220,10 @@ impl Overlord { // Fire off a minion to handle this relay self.start_minion(best_relay.relay.url.clone()).await?; - // Tell it to follow the chosen people + // Subscribe to the general feed let _ = self.to_minions.send(BusMessage { target: best_relay.relay.url.clone(), - kind: "set_followed_people".to_string(), + kind: "subscribe_general_feed".to_string(), json_payload: serde_json::to_string(&best_relay.pubkeys).unwrap(), });