Rework subscriptions for these feeds (incomplete but it is generally working)

This commit is contained in:
Mike Dilger 2023-01-03 14:50:30 +13:00
parent d03cf859e1
commit 336db661ec
5 changed files with 216 additions and 20 deletions

View File

@ -1,3 +1,4 @@
use crate::comms::BusMessage;
use crate::globals::GLOBALS; use crate::globals::GLOBALS;
use nostr_types::PublicKeyHex; use nostr_types::PublicKeyHex;
use nostr_types::{Event, EventKind, Id}; use nostr_types::{Event, EventKind, Id};
@ -38,16 +39,27 @@ impl Feed {
} }
pub fn set_feed_to_general(&self) { pub fn set_feed_to_general(&self) {
// We are always subscribed to the general feed. Don't resubscribe here
// because it won't have changed, but the relays will shower you with
// all those events again.
*self.current_feed_kind.write() = FeedKind::General; *self.current_feed_kind.write() = FeedKind::General;
} }
pub fn set_feed_to_thread(&self, id: Id) { pub fn set_feed_to_thread(&self, id: Id) {
// get parent? let _ = GLOBALS.to_minions.send(BusMessage {
target: "all".to_string(),
kind: "subscribe_thread_feed".to_string(),
json_payload: serde_json::to_string(&id).unwrap(),
});
*self.current_feed_kind.write() = FeedKind::Thread(id); *self.current_feed_kind.write() = FeedKind::Thread(id);
} }
pub fn set_feed_to_person(&self, pubkey: PublicKeyHex) { pub fn set_feed_to_person(&self, pubkey: PublicKeyHex) {
// FIXME - TRIGGER OVERLORD TO FETCH THEIR EVENTS FURTHER BACK let _ = GLOBALS.to_minions.send(BusMessage {
target: "all".to_string(),
kind: "subscribe_person_feed".to_string(),
json_payload: serde_json::to_string(&pubkey).unwrap(),
});
*self.current_feed_kind.write() = FeedKind::Person(pubkey); *self.current_feed_kind.write() = FeedKind::Person(pubkey);
} }
@ -67,7 +79,6 @@ impl Feed {
} }
pub fn get_thread_parent(&self, id: Id) -> Id { pub fn get_thread_parent(&self, id: Id) -> Id {
// FIXME - TRIGGER OVERLORD TO FETCH THIS FEED
let mut event = match GLOBALS.events.blocking_read().get(&id).cloned() { let mut event = match GLOBALS.events.blocking_read().get(&id).cloned() {
None => return id, None => return id,
Some(e) => e, Some(e) => e,

View File

@ -1,18 +1,33 @@
use super::Minion; use super::Minion;
use crate::{BusMessage, Error}; use crate::{BusMessage, Error};
use futures::SinkExt; use futures::SinkExt;
use nostr_types::{ClientMessage, Event, IdHex, PublicKeyHex}; use nostr_types::{ClientMessage, Event, Id, IdHex, PublicKeyHex};
use tungstenite::protocol::Message as WsMessage; use tungstenite::protocol::Message as WsMessage;
impl Minion { impl Minion {
pub(super) async fn handle_bus_message( pub(super) async fn handle_bus_message(
&mut self, &mut self,
bus_message: BusMessage, bus_message: BusMessage,
) -> Result<(), Error> { ) -> Result<bool, Error> {
match &*bus_message.kind { match &*bus_message.kind {
"set_followed_people" => { "shutdown" => {
let v: Vec<PublicKeyHex> = serde_json::from_str(&bus_message.json_payload)?; tracing::info!("{}: Websocket listener shutting down", &self.url);
self.upsert_following(v).await?; return Ok(false);
}
//"set_followed_people" => {
// let v: Vec<PublicKeyHex> = serde_json::from_str(&bus_message.json_payload)?;
// self.upsert_following(v).await?;
//}
"subscribe_general_feed" => {
self.subscribe_general_feed().await?;
}
"subscribe_person_feed" => {
let pubkeyhex: PublicKeyHex = serde_json::from_str(&bus_message.json_payload)?;
self.subscribe_person_feed(pubkeyhex).await?;
}
"subscribe_thread_feed" => {
let id: Id = serde_json::from_str(&bus_message.json_payload)?;
self.subscribe_thread_feed(id).await?;
} }
"fetch_events" => { "fetch_events" => {
let v: Vec<IdHex> = serde_json::from_str(&bus_message.json_payload)?; let v: Vec<IdHex> = serde_json::from_str(&bus_message.json_payload)?;
@ -38,6 +53,6 @@ impl Minion {
); );
} }
} }
Ok(()) Ok(true)
} }
} }

View File

@ -29,7 +29,7 @@ impl Minion {
.subscriptions .subscriptions
.get_handle_by_id(&subid.0) .get_handle_by_id(&subid.0)
.unwrap_or_else(|| "_".to_owned()); .unwrap_or_else(|| "_".to_owned());
tracing::trace!("{}: {}: NEW EVENT", &self.url, handle); tracing::debug!("{}: {}: NEW EVENT", &self.url, handle);
if event.kind == EventKind::TextNote { if event.kind == EventKind::TextNote {
// Just store text notes in incoming // Just store text notes in incoming

View File

@ -10,8 +10,9 @@ use futures::{SinkExt, StreamExt};
use futures_util::stream::{SplitSink, SplitStream}; use futures_util::stream::{SplitSink, SplitStream};
use http::Uri; use http::Uri;
use nostr_types::{ use nostr_types::{
EventKind, Filter, IdHex, PublicKeyHex, RelayInformationDocument, Unixtime, Url, EventKind, Filter, Id, IdHex, PublicKeyHex, RelayInformationDocument, Unixtime, Url,
}; };
use std::time::Duration;
use subscription::Subscriptions; use subscription::Subscriptions;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::select; use tokio::select;
@ -219,13 +220,8 @@ impl Minion {
Err(e) => return Err(e.into()) Err(e) => return Err(e.into())
}; };
#[allow(clippy::collapsible_if)] #[allow(clippy::collapsible_if)]
if bus_message.target == self.url.inner() { if bus_message.target == self.url.inner() || bus_message.target == "all" {
self.handle_bus_message(bus_message).await?; keepgoing = self.handle_bus_message(bus_message).await?;
} else if &*bus_message.target == "all" {
if &*bus_message.kind == "shutdown" {
tracing::info!("{}: Websocket listener shutting down", &self.url);
keepgoing = false;
}
} }
}, },
} }
@ -243,7 +239,180 @@ impl Minion {
Ok(()) Ok(())
} }
async fn subscribe_general_feed(&mut self) -> Result<(), Error> {
// NOTE if the general feed is already subscribed we shoudn't do anything
// but we may need to update the subscription
let mut filters: Vec<Filter> = Vec::new();
let feed_chunk = GLOBALS.settings.read().await.feed_chunk;
let followed_pubkeys = GLOBALS.people.read().await.get_followed_pubkeys();
if let Some(pubkey) = GLOBALS.signer.read().await.public_key() {
// feed related by me
filters.push(Filter {
authors: vec![pubkey.into()],
kinds: vec![
EventKind::TextNote,
EventKind::Reaction,
EventKind::EventDeletion,
],
since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)),
..Default::default()
});
// Any mentions of me
filters.push(Filter {
p: vec![pubkey.into()],
since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)),
..Default::default()
});
// my metadata
// FIXME TBD
/*
filters.push(Filter {
authors: vec![pubkey],
kinds: vec![EventKind::Metadata, EventKind::RecommendRelay, EventKind::ContactList, EventKind::RelaysList],
since: // last we last checked
.. Default::default()
});
*/
}
if !followed_pubkeys.is_empty() {
// feed related by people followed
filters.push(Filter {
authors: followed_pubkeys.clone(),
kinds: vec![
EventKind::TextNote,
EventKind::Reaction,
EventKind::EventDeletion,
],
since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)),
..Default::default()
});
// metadata by people followed
// FIXME TBD
/*
filters.push(Filter {
authors: pubkeys.clone(),
kinds: vec![EventKind::Metadata, EventKind::RecommendRelay, EventKind::ContactList, EventKind::RelaysList],
since: // last we last checked
.. Default::default()
});
*/
}
// reactions to posts by me
// FIXME TBD
// reactions to posts by people followed
// FIXME TBD
// NO REPLIES OR ANCESTORS
if filters.is_empty() {
self.unsubscribe("general_feed").await?;
} else {
self.subscribe(filters, "general_feed").await?;
}
Ok(())
}
async fn subscribe_person_feed(&mut self, pubkey: PublicKeyHex) -> Result<(), Error> {
// NOTE we do not unsubscribe to the general feed
let mut filters: Vec<Filter> = Vec::new();
let feed_chunk = GLOBALS.settings.read().await.feed_chunk;
// feed related by person
filters.push(Filter {
authors: vec![pubkey],
kinds: vec![
EventKind::TextNote,
EventKind::Reaction,
EventKind::EventDeletion,
],
since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)),
..Default::default()
});
// persons metadata
// FIXME TBD
/*
filters.push(Filter {
authors: vec![pubkey],
kinds: vec![EventKind::Metadata, EventKind::RecommendRelay, EventKind::ContactList, EventKind::RelaysList],
since: // last we last checked
.. Default::default()
});
*/
// reactions to post by person
// FIXME TBD
// NO REPLIES OR ANCESTORS
if filters.is_empty() {
self.unsubscribe("person_feed").await?;
} else {
self.subscribe(filters, "person_feed").await?;
}
Ok(())
}
async fn subscribe_thread_feed(&mut self, id: Id) -> Result<(), Error> {
// NOTE we do not unsubscribe to the general feed
let mut filters: Vec<Filter> = Vec::new();
let feed_chunk = GLOBALS.settings.read().await.feed_chunk;
// This post and ancestors
let mut ids: Vec<IdHex> = vec![id.into()];
// FIXME - We could have this precalculated like GLOBALS.relationships
// in reverse. It would be potentially more complete having
// iteratively climbed the chain.
if let Some(event) = GLOBALS.events.read().await.get(&id) {
for (id, url) in &event.replies_to_ancestors() {
if let Some(url) = url {
if url == &self.url {
ids.push((*id).into());
}
} else {
ids.push((*id).into());
}
}
}
filters.push(Filter {
ids: ids.clone(),
..Default::default()
});
// Replies and reactions to this post and ancestors
filters.push(Filter {
e: ids,
since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)),
..Default::default()
});
// Metadata for people in those events
// TBD
if filters.is_empty() {
self.unsubscribe("thread_feed").await?;
} else {
self.subscribe(filters, "thread_feed").await?;
}
Ok(())
}
// Create or replace the following subscription // Create or replace the following subscription
/*
async fn upsert_following(&mut self, pubkeys: Vec<PublicKeyHex>) -> Result<(), Error> { async fn upsert_following(&mut self, pubkeys: Vec<PublicKeyHex>) -> Result<(), Error> {
let websocket_sink = self.sink.as_mut().unwrap(); let websocket_sink = self.sink.as_mut().unwrap();
@ -354,6 +523,7 @@ impl Minion {
Ok(()) Ok(())
} }
*/
async fn get_events(&mut self, ids: Vec<IdHex>) -> Result<(), Error> { async fn get_events(&mut self, ids: Vec<IdHex>) -> Result<(), Error> {
if ids.is_empty() { if ids.is_empty() {

View File

@ -220,10 +220,10 @@ impl Overlord {
// Fire off a minion to handle this relay // Fire off a minion to handle this relay
self.start_minion(best_relay.relay.url.clone()).await?; self.start_minion(best_relay.relay.url.clone()).await?;
// Tell it to follow the chosen people // Subscribe to the general feed
let _ = self.to_minions.send(BusMessage { let _ = self.to_minions.send(BusMessage {
target: best_relay.relay.url.clone(), target: best_relay.relay.url.clone(),
kind: "set_followed_people".to_string(), kind: "subscribe_general_feed".to_string(),
json_payload: serde_json::to_string(&best_relay.pubkeys).unwrap(), json_payload: serde_json::to_string(&best_relay.pubkeys).unwrap(),
}); });