Merge branch 'filter_set' into unstable

Mike Dilger 2024-08-19 05:18:52 +12:00
commit ace9b9c0c2
7 changed files with 694 additions and 927 deletions

View File

@@ -1,11 +1,12 @@
use crate::dm_channel::DmChannel;
use crate::filter_set::FilterSet;
use crate::misc::Private;
use crate::nip46::{Approval, ParsedCommand};
use crate::people::PersonList;
use crate::relay::Relay;
use nostr_types::{
Event, EventReference, Id, IdHex, Metadata, MilliSatoshi, NAddr, Profile, PublicKey, RelayUrl,
Tag, UncheckedUrl, Unixtime,
Event, EventReference, Id, Metadata, MilliSatoshi, NAddr, Profile, PublicKey, RelayUrl, Tag,
UncheckedUrl, Unixtime,
};
use std::fmt;
use std::hash::{Hash, Hasher};
@@ -262,24 +263,8 @@ pub(crate) enum ToMinionPayloadDetail {
FetchNAddr(NAddr),
PostEvents(Vec<Event>),
Shutdown,
SubscribeAugments(Vec<IdHex>),
SubscribeConfig,
SubscribeDiscover(Vec<PublicKey>),
SubscribeGeneralFeed(Vec<PublicKey>, Unixtime),
SubscribeGiftwraps(Unixtime),
SubscribeGlobalFeed(Unixtime),
SubscribeInbox(Unixtime),
SubscribePersonFeed(PublicKey, Unixtime),
SubscribeReplies(IdHex),
SubscribeRootReplies(EventReference),
SubscribeDmChannel(DmChannel),
SubscribeNip46,
TempSubscribeGeneralFeedChunk(Unixtime),
TempSubscribePersonFeedChunk { pubkey: PublicKey, anchor: Unixtime },
TempSubscribeInboxFeedChunk(Unixtime),
TempSubscribeMetadata(Vec<PublicKey>),
UnsubscribeGlobalFeed,
UnsubscribePersonFeed,
Subscribe(FilterSet),
Unsubscribe(FilterSet),
UnsubscribeReplies,
}
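In other words, the many per-purpose Subscribe*/Unsubscribe* message variants collapse into two variants parameterized by FilterSet. A small sketch of what a caller's construction looks like before and after (pubkey and anchor are assumed to be a PublicKey and a Unixtime already in scope):

    // before: one enum variant per subscription kind
    let detail = ToMinionPayloadDetail::SubscribePersonFeed(pubkey, anchor);

    // after: a single variant carrying a FilterSet
    let detail = ToMinionPayloadDetail::Subscribe(FilterSet::PersonFeedFuture { pubkey, anchor });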

View File

@@ -3,15 +3,24 @@ pub use feed_kind::FeedKind;
use crate::comms::{ToMinionMessage, ToMinionPayload, ToMinionPayloadDetail, ToOverlordMessage};
use crate::error::{Error, ErrorKind};
use crate::filter_set::FilterSet;
use crate::globals::GLOBALS;
use crate::people::PersonList;
use dashmap::DashMap;
use nostr_types::{Event, EventKind, EventReference, Filter, Id, NAddr, Unixtime};
use nostr_types::{Event, EventKind, EventReference, Filter, Id, NAddr, PublicKey, Unixtime};
use parking_lot::RwLock;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio::task;
lazy_static! {
static ref DUMMY_PUBKEY: PublicKey = PublicKey::try_from_hex_string(
"ce4e68468c717280aa2fdd9db282897c969c172ba06fd7096b785c3c3ce79903",
false
)
.unwrap();
}
/// The system that computes feeds as an ordered list of event Ids.
pub struct Feed {
/// Consumers of gossip-lib should only read this, not write to it.
@@ -105,7 +114,10 @@ impl Feed {
target: "all".to_string(),
payload: ToMinionPayload {
job_id: 0,
detail: ToMinionPayloadDetail::UnsubscribePersonFeed,
detail: ToMinionPayloadDetail::Unsubscribe(FilterSet::PersonFeedFuture {
pubkey: *DUMMY_PUBKEY,
anchor: Unixtime::now(), // does not matter
}),
},
});
}
@@ -117,7 +129,9 @@ impl Feed {
target: "all".to_string(),
payload: ToMinionPayload {
job_id: 0,
detail: ToMinionPayloadDetail::UnsubscribeGlobalFeed,
detail: ToMinionPayloadDetail::Unsubscribe(FilterSet::GlobalFeedFuture(
Unixtime::now(),
)),
},
});
}
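The placeholder values work because the minion's Unsubscribe path (later in this diff) only consults FilterSet::inner_handle(), which keys on the enum variant and ignores its fields; the DUMMY_PUBKEY and the "does not matter" anchor never reach a relay. A quick illustration of that property:

    // inner_handle() depends only on the variant, not on the payload
    let fs = FilterSet::PersonFeedFuture { pubkey: *DUMMY_PUBKEY, anchor: Unixtime::now() };
    assert_eq!(fs.inner_handle(), "person_feed");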

View File

@@ -0,0 +1,512 @@
use crate::dm_channel::DmChannel;
use crate::globals::GLOBALS;
use nostr_types::{EventKind, Filter, IdHex, NAddr, PublicKey, PublicKeyHex, Tag, Unixtime};
use std::time::Duration;
#[derive(Debug, Clone, PartialEq)]
pub enum FeedRange {
// Long-term subscription for anything after the given time
After { since: Unixtime },
// Short-term subscription for up to limit events preceding the until time
ChunkBefore { until: Unixtime, limit: usize },
}
impl FeedRange {
pub fn since_until_limit(&self) -> (Option<Unixtime>, Option<Unixtime>, Option<usize>) {
match *self {
FeedRange::After { since } => (Some(since), None, None),
FeedRange::ChunkBefore { until, limit } => (None, Some(until), Some(limit)),
}
}
}
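FeedRange is just a named mapping onto a nostr filter's since/until/limit fields; a minimal sketch (anchor is an assumed Unixtime in scope):

    // long-lived subscription: only `since` is set
    let (since, until, limit) = FeedRange::After { since: anchor }.since_until_limit();
    // -> (Some(anchor), None, None)

    // one backfill chunk: `until` and `limit` are set
    let (since, until, limit) = FeedRange::ChunkBefore { until: anchor, limit: 50 }.since_until_limit();
    // -> (None, Some(anchor), Some(50))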
#[derive(Debug, Clone, PartialEq)]
pub enum FilterSet {
Augments(Vec<IdHex>),
Config,
Discover(Vec<PublicKey>),
DmChannel(DmChannel),
GeneralFeedFuture {
pubkeys: Vec<PublicKey>,
anchor: Unixtime,
},
GeneralFeedChunk {
pubkeys: Vec<PublicKey>,
anchor: Unixtime,
},
Giftwraps(FeedRange),
GlobalFeedFuture(Unixtime),
GlobalFeedChunk(Unixtime),
InboxFeedFuture(Unixtime),
InboxFeedChunk(Unixtime),
Metadata(Vec<PublicKey>),
Nip46,
PersonFeedFuture {
pubkey: PublicKey,
anchor: Unixtime,
},
PersonFeedChunk {
pubkey: PublicKey,
anchor: Unixtime,
},
RepliesToId(IdHex),
RepliesToAddr(NAddr),
}
impl FilterSet {
pub fn temporary(&self) -> bool {
match self {
FilterSet::Augments(_) => true,
FilterSet::Config => false,
FilterSet::Discover(_) => true,
FilterSet::DmChannel(_) => false,
FilterSet::GeneralFeedFuture { .. } => false,
FilterSet::GeneralFeedChunk { .. } => true,
FilterSet::Giftwraps(_) => false,
FilterSet::GlobalFeedFuture(_) => false,
FilterSet::GlobalFeedChunk(_) => true,
FilterSet::InboxFeedFuture(_) => false,
FilterSet::InboxFeedChunk(_) => true,
FilterSet::Metadata(_) => true,
FilterSet::Nip46 => false,
FilterSet::PersonFeedFuture { .. } => false,
FilterSet::PersonFeedChunk { .. } => true,
FilterSet::RepliesToId(_) => false,
FilterSet::RepliesToAddr(_) => false,
}
}
pub fn can_have_duplicates(&self) -> bool {
match self {
FilterSet::GeneralFeedChunk { .. } => true,
FilterSet::GlobalFeedChunk(_) => true,
FilterSet::InboxFeedChunk(_) => true,
FilterSet::PersonFeedChunk { .. } => true,
_ => false,
}
}
pub fn is_loading_more(&self) -> bool {
match self {
FilterSet::GeneralFeedChunk { .. } => true,
FilterSet::GlobalFeedChunk(_) => true,
FilterSet::InboxFeedChunk(_) => true,
FilterSet::PersonFeedChunk { .. } => true,
_ => false,
}
}
pub fn inner_handle(&self) -> &'static str {
match self {
FilterSet::Augments(_) => "augments",
FilterSet::Config => "config_feed",
FilterSet::Discover(_) => "discover_feed",
FilterSet::DmChannel(_) => "dm_channel",
FilterSet::GeneralFeedFuture { .. } => "general_feed",
FilterSet::GeneralFeedChunk { .. } => "general_feed_chunk",
FilterSet::Giftwraps(_) => "giftwraps",
FilterSet::GlobalFeedFuture(_) => "global_feed",
FilterSet::GlobalFeedChunk(_) => "global_feed_chunk",
FilterSet::InboxFeedFuture(_) => "inbox_feed",
FilterSet::InboxFeedChunk(_) => "inbox_feed_chunk",
FilterSet::Metadata(_) => "subscribe_metadata",
FilterSet::Nip46 => "nip46",
FilterSet::PersonFeedFuture { .. } => "person_feed",
FilterSet::PersonFeedChunk { .. } => "person_feed_chunk",
FilterSet::RepliesToId(_) => "id_replies",
FilterSet::RepliesToAddr(_) => "addr_replies",
}
}
pub fn handle(&self, job_id: u64) -> String {
let mut handle = self.inner_handle().to_owned();
if self.temporary() {
handle = format!("temp_{}", handle);
}
if self.can_have_duplicates() {
handle = format!("{}_{}", handle, job_id)
}
handle
}
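So a subscription handle encodes both lifetime and uniqueness: temporary sets get a "temp_" prefix, and sets that may run concurrently get the job id appended. For example (job id 7 is illustrative; pubkey and anchor are assumed in scope):

    let fs = FilterSet::PersonFeedChunk { pubkey, anchor }; // temporary + can_have_duplicates
    assert_eq!(fs.handle(7), "temp_person_feed_chunk_7");

    let fs = FilterSet::Config;                             // neither, so the handle is stable
    assert_eq!(fs.handle(7), "config_feed");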
pub fn filters(&self, spamsafe: bool) -> Vec<Filter> {
let mut filters: Vec<Filter> = Vec::new();
match self {
FilterSet::Augments(ids) => {
let event_kinds = crate::feed::feed_augment_event_kinds();
let filter = {
let mut filter = Filter {
kinds: event_kinds,
..Default::default()
};
filter.set_tag_values('e', ids.iter().map(|id| id.to_string()).collect());
filter
};
filters.push(filter);
}
FilterSet::Config => {
let since = Unixtime::now() - Duration::from_secs(60 * 60 * 24 * 15);
if let Some(pubkey) = GLOBALS.identity.public_key() {
let pkh: PublicKeyHex = pubkey.into();
if GLOBALS.identity.is_unlocked() {
// GiftWraps to me, recent only
let giftwrap_since = Unixtime(since.0 - 60 * 60 * 24 * 7);
let giftwrap_filter = {
let mut f = Filter {
kinds: vec![EventKind::GiftWrap],
since: Some(giftwrap_since),
..Default::default()
};
f.set_tag_values('p', vec![pkh.to_string()]);
f
};
filters.push(giftwrap_filter);
}
// Actual config stuff
filters.push(Filter {
authors: vec![pkh.clone()],
kinds: vec![
EventKind::Metadata,
//EventKind::RecommendRelay,
EventKind::ContactList,
EventKind::MuteList,
EventKind::FollowSets,
EventKind::RelayList,
EventKind::DmRelayList,
EventKind::BookmarkList,
],
// these are all replaceable, no since required
..Default::default()
});
// Events I posted recently, including feed_displayable and
// augments (deletions, reactions, timestamp, label, reporting, and zap)
filters.push(Filter {
authors: vec![pkh],
kinds: crate::feed::feed_related_event_kinds(false), // not DMs
since: Some(since),
..Default::default()
});
}
}
FilterSet::Discover(pubkeys) => {
let pkp: Vec<PublicKeyHex> = pubkeys.iter().map(|pk| pk.into()).collect();
filters.push(Filter {
authors: pkp,
kinds: vec![EventKind::RelayList, EventKind::DmRelayList],
// these are all replaceable, no since required
..Default::default()
});
}
FilterSet::DmChannel(channel) => {
let pubkey = match GLOBALS.identity.public_key() {
Some(pk) => pk,
None => return vec![],
};
let pkh: PublicKeyHex = pubkey.into();
// note: giftwraps can't be subscribed by channel. they are subscribed more
// globally, and have to be limited to recent ones.
let mut authors: Vec<PublicKeyHex> =
channel.keys().iter().map(|k| k.into()).collect();
authors.push(pkh.clone());
let mut filter = Filter {
authors: authors.clone(),
kinds: vec![EventKind::EncryptedDirectMessage],
..Default::default()
};
// tagging the user
filter.set_tag_values('p', authors.iter().map(|x| x.as_str().to_owned()).collect());
filters.push(filter);
}
FilterSet::GeneralFeedFuture { pubkeys, anchor } => {
if pubkeys.is_empty() {
return vec![];
}
let pkp: Vec<PublicKeyHex> = pubkeys.iter().map(|pk| pk.into()).collect();
// Do not load feed related event kinds, or the limit will be wrong
let event_kinds = crate::feed::feed_displayable_event_kinds(false);
let range = FeedRange::After { since: *anchor };
let (since, until, limit) = range.since_until_limit();
filters.push(Filter {
authors: pkp,
kinds: event_kinds,
since,
until,
limit,
..Default::default()
});
}
FilterSet::GeneralFeedChunk { pubkeys, anchor } => {
if pubkeys.is_empty() {
return vec![];
}
let pkp: Vec<PublicKeyHex> = pubkeys.iter().map(|pk| pk.into()).collect();
// Do not load feed related event kinds, or the limit will be wrong
let event_kinds = crate::feed::feed_displayable_event_kinds(false);
let limit = GLOBALS.storage.read_setting_load_more_count() as usize;
let range = FeedRange::ChunkBefore {
until: *anchor,
limit,
};
let (since, until, limit) = range.since_until_limit();
filters.push(Filter {
authors: pkp,
kinds: event_kinds,
since,
until,
limit,
..Default::default()
});
}
FilterSet::Giftwraps(range) => {
let (since, until, limit) = range.since_until_limit();
if let Some(pubkey) = GLOBALS.identity.public_key() {
let pkh: PublicKeyHex = pubkey.into();
// Giftwraps cannot be filtered by author so we have to take them regardless
// of the spamsafe designation of the relay.
//
// Sure, the TOTAL number of giftwraps being the limit will be MORE than we need,
// but since giftwraps get backdated, this is probably a good thing.
let filter = {
let mut filter = Filter {
kinds: vec![EventKind::GiftWrap],
// giftwraps may be dated 1 week in the past:
since: since.map(|u| Unixtime(*u - (3600 * 24 * 7))),
until,
limit,
..Default::default()
};
let values = vec![pkh.to_string()];
filter.set_tag_values('p', values);
filter
};
filters.push(filter);
}
}
FilterSet::GlobalFeedFuture(anchor) => {
// Allow all feed related event kinds (excluding DMs)
// Do not load feed related or the limit will be wrong
let event_kinds = crate::feed::feed_displayable_event_kinds(false);
let range = FeedRange::After { since: *anchor };
let (since, until, limit) = range.since_until_limit();
filters.push(Filter {
kinds: event_kinds.clone(),
since,
until,
limit,
..Default::default()
});
}
FilterSet::GlobalFeedChunk(anchor) => {
// Allow all feed related event kinds (excluding DMs)
// Do not load feed related or the limit will be wrong
let event_kinds = crate::feed::feed_displayable_event_kinds(false);
let limit = GLOBALS.storage.read_setting_load_more_count() as usize;
let range = FeedRange::ChunkBefore {
until: *anchor,
limit,
};
let (since, until, limit) = range.since_until_limit();
filters.push(Filter {
kinds: event_kinds,
since,
until,
limit,
..Default::default()
});
}
FilterSet::InboxFeedFuture(anchor) => {
if let Some(pubkey) = GLOBALS.identity.public_key() {
let mut filter = Self::inbox_base_filter(pubkey, spamsafe);
let range = FeedRange::After { since: *anchor };
let (since, until, limit) = range.since_until_limit();
filter.since = since;
filter.until = until;
filter.limit = limit;
filters.push(filter);
}
}
FilterSet::InboxFeedChunk(anchor) => {
if let Some(pubkey) = GLOBALS.identity.public_key() {
let mut filter = Self::inbox_base_filter(pubkey, spamsafe);
let limit = GLOBALS.storage.read_setting_load_more_count() as usize;
let range = FeedRange::ChunkBefore {
until: *anchor,
limit,
};
let (since, until, limit) = range.since_until_limit();
filter.since = since;
filter.until = until;
filter.limit = limit;
filters.push(filter.clone());
}
}
FilterSet::Metadata(pubkeys) => {
let pkhp: Vec<PublicKeyHex> = pubkeys.iter().map(|pk| pk.into()).collect();
filters.push(Filter {
authors: pkhp,
kinds: vec![
EventKind::Metadata,
EventKind::RelayList,
EventKind::DmRelayList,
],
// FIXME: we could probably get a since-last-fetched-their-metadata here.
// but relays should just return the latest of these.
..Default::default()
});
}
FilterSet::Nip46 => {
let pubkey = match GLOBALS.identity.public_key() {
Some(pk) => pk,
None => return vec![],
};
let pkh: PublicKeyHex = pubkey.into();
let mut filter = Filter {
kinds: vec![EventKind::NostrConnect],
..Default::default()
};
filter.set_tag_values('p', vec![pkh.to_string()]);
filters.push(filter);
}
FilterSet::PersonFeedFuture { pubkey, anchor } => {
// Allow all feed related event kinds (excluding DMs)
// Do not load feed related or the limit will be wrong
let event_kinds = crate::feed::feed_displayable_event_kinds(false);
let range = FeedRange::After { since: *anchor };
let (since, until, limit) = range.since_until_limit();
filters.push(Filter {
authors: vec![pubkey.into()],
kinds: event_kinds,
since,
until,
limit,
..Default::default()
});
}
FilterSet::PersonFeedChunk { pubkey, anchor } => {
// Allow all feed related event kinds (excluding DMs)
// Do not load feed related or the limit will be wrong
let event_kinds = crate::feed::feed_displayable_event_kinds(false);
let limit = GLOBALS.storage.read_setting_load_more_count() as usize;
let range = FeedRange::ChunkBefore {
until: *anchor,
limit,
};
let (since, until, limit) = range.since_until_limit();
filters.push(Filter {
authors: vec![pubkey.into()],
kinds: event_kinds,
since,
until,
limit,
..Default::default()
});
}
FilterSet::RepliesToId(id) => {
// Allow all feed related event kinds (excluding DMs)
// (related because we want deletion events, and may as well get likes and zaps too)
let event_kinds = crate::feed::feed_related_event_kinds(false);
let filter = {
let mut filter = Filter {
kinds: event_kinds,
..Default::default()
};
let values = vec![id.to_string()];
filter.set_tag_values('e', values);
// Spam prevention:
if !spamsafe && GLOBALS.storage.read_setting_avoid_spam_on_unsafe_relays() {
filter.authors = GLOBALS
.people
.get_subscribed_pubkeys()
.drain(..)
.map(|pk| pk.into())
.collect();
}
filter
};
filters.push(filter);
}
FilterSet::RepliesToAddr(addr) => {
// Allow all feed related event kinds (excluding DMs)
// (related because we want deletion events, and may as well get likes and zaps too)
let event_kinds = crate::feed::feed_related_event_kinds(false);
let filter = {
let mut filter = Filter {
kinds: event_kinds,
..Default::default()
};
let a_tag = Tag::new_address(addr, None);
filter.set_tag_values('a', vec![a_tag.value().to_owned()]);
// Spam prevention:
if !spamsafe && GLOBALS.storage.read_setting_avoid_spam_on_unsafe_relays() {
filter.authors = GLOBALS
.people
.get_subscribed_pubkeys()
.drain(..)
.map(|pk| pk.into())
.collect();
}
filter
};
filters.push(filter);
}
}
filters
}
fn inbox_base_filter(pubkey: PublicKey, spamsafe: bool) -> Filter {
// Allow all feed displayable event kinds (including DMs)
let mut event_kinds = crate::feed::feed_displayable_event_kinds(true);
event_kinds.retain(|f| *f != EventKind::GiftWrap); // gift wrap is not included here
// Any mentions of me (but not in peoples contact lists, for example)
let pkh: PublicKeyHex = pubkey.into();
let mut filter = Filter {
kinds: event_kinds,
..Default::default()
};
let values = vec![pkh.to_string()];
filter.set_tag_values('p', values);
// Spam prevention:
if !spamsafe && GLOBALS.storage.read_setting_avoid_spam_on_unsafe_relays() {
// As the relay is not spam safe, only take mentions from followers
filter.authors = GLOBALS
.people
.get_subscribed_pubkeys()
.drain(..)
.map(|pk| pk.into())
.collect();
}
filter
}
}
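Putting the pieces together, a minion-side consumer needs only two calls to go from a FilterSet to a relay REQ; a rough usage sketch of the API above (job_id and spamsafe are illustrative values; the actual wiring is in the minion changes below):

    let filter_set = FilterSet::InboxFeedChunk(anchor);
    let handle = filter_set.handle(job_id);      // e.g. "temp_inbox_feed_chunk_<job_id>"
    let filters = filter_set.filters(spamsafe);  // empty if no identity public key is set
    if !filters.is_empty() {
        // send ["REQ", handle, ...filters] to the relay (Minion::subscribe does this)
    }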

View File

@@ -90,6 +90,8 @@ pub use fetcher::Fetcher;
mod filter;
mod filter_set;
mod globals;
pub use globals::{Globals, GLOBALS};

View File

@@ -1,358 +0,0 @@
use crate::dm_channel::DmChannel;
use crate::globals::GLOBALS;
use nostr_types::{EventKind, Filter, IdHex, NAddr, PublicKey, PublicKeyHex, Tag, Unixtime};
pub enum FeedRange {
// Long-term subscription for anything after the given time
After {
since: Unixtime,
},
// Short-term subscription for up to limit events preceding the until time
#[allow(dead_code)]
ChunkBefore {
until: Unixtime,
limit: usize,
},
}
impl FeedRange {
pub fn since_until_limit(&self) -> (Option<Unixtime>, Option<Unixtime>, Option<usize>) {
match *self {
FeedRange::After { since } => (Some(since), None, None),
FeedRange::ChunkBefore { until, limit } => (None, Some(until), Some(limit)),
}
}
}
pub fn general_feed(authors: &[PublicKey], range: FeedRange) -> Vec<Filter> {
let mut filters: Vec<Filter> = Vec::new();
if authors.is_empty() {
return vec![];
}
let pkp: Vec<PublicKeyHex> = authors.iter().map(|pk| pk.into()).collect();
// Do not load feed related event kinds, or the limit will be wrong
let event_kinds = crate::feed::feed_displayable_event_kinds(false);
let (since, until, limit) = range.since_until_limit();
// feed related by people followed
filters.push(Filter {
authors: pkp,
kinds: event_kinds.clone(),
since,
until,
limit,
..Default::default()
});
filters
}
pub fn inbox_feed(spamsafe: bool, range: FeedRange) -> Vec<Filter> {
let mut filters: Vec<Filter> = Vec::new();
// Allow all feed displayable event kinds (including DMs)
let mut event_kinds = crate::feed::feed_displayable_event_kinds(true);
event_kinds.retain(|f| *f != EventKind::GiftWrap); // gift wrap is not included here
let (since, until, limit) = range.since_until_limit();
if let Some(pubkey) = GLOBALS.identity.public_key() {
// Any mentions of me
// (but not in peoples contact lists, for example)
let pkh: PublicKeyHex = pubkey.into();
let filter = {
let mut filter = Filter {
kinds: event_kinds,
since,
until,
limit,
..Default::default()
};
let values = vec![pkh.to_string()];
filter.set_tag_values('p', values);
// Spam prevention:
if !spamsafe && GLOBALS.storage.read_setting_avoid_spam_on_unsafe_relays() {
// As the relay is not spam safe, only take mentions from followers
filter.authors = GLOBALS
.people
.get_subscribed_pubkeys()
.drain(..)
.map(|pk| pk.into())
.collect();
}
filter
};
filters.push(filter);
}
filters
}
pub fn giftwraps(range: FeedRange) -> Vec<Filter> {
let mut filters: Vec<Filter> = Vec::new();
let (since, until, limit) = range.since_until_limit();
if let Some(pubkey) = GLOBALS.identity.public_key() {
let pkh: PublicKeyHex = pubkey.into();
// Giftwraps cannot be filtered by author so we have to take them regardless
// of the spamsafe designation of the relay.
//
// Sure, the TOTAL number of giftwraps being the limit will be MORE than we need,
// but since giftwraps get backdated, this is probably a good thing.
let filter = {
let mut filter = Filter {
kinds: vec![EventKind::GiftWrap],
// giftwraps may be dated 1 week in the past:
since: since.map(|u| Unixtime(*u - (3600 * 24 * 7))),
until,
limit,
..Default::default()
};
let values = vec![pkh.to_string()];
filter.set_tag_values('p', values);
filter
};
filters.push(filter);
}
filters
}
pub fn person_feed(pubkey: PublicKey, range: FeedRange) -> Vec<Filter> {
// Allow all feed related event kinds (excluding DMs)
// Do not load feed related or the limit will be wrong
let event_kinds = crate::feed::feed_displayable_event_kinds(false);
let (since, until, limit) = range.since_until_limit();
vec![Filter {
authors: vec![pubkey.into()],
kinds: event_kinds,
since,
until,
limit,
..Default::default()
}]
}
pub fn global_feed(range: FeedRange) -> Vec<Filter> {
// Allow all feed related event kinds (excluding DMs)
// Do not load feed related or the limit will be wrong
let event_kinds = crate::feed::feed_displayable_event_kinds(false);
let (since, until, limit) = range.since_until_limit();
vec![Filter {
kinds: event_kinds,
since,
until,
limit,
..Default::default()
}]
}
pub fn augments(ids: &[IdHex]) -> Vec<Filter> {
let event_kinds = crate::feed::feed_augment_event_kinds();
let filter = {
let mut filter = Filter {
kinds: event_kinds,
..Default::default()
};
filter.set_tag_values('e', ids.iter().map(|id| id.to_string()).collect());
filter
};
vec![filter]
}
pub fn config(since: Unixtime) -> Vec<Filter> {
let mut filters: Vec<Filter> = Vec::new();
if let Some(pubkey) = GLOBALS.identity.public_key() {
let pkh: PublicKeyHex = pubkey.into();
if GLOBALS.identity.is_unlocked() {
// GiftWraps to me, recent only
let giftwrap_since = Unixtime(since.0 - 60 * 60 * 24 * 7);
let giftwrap_filter = {
let mut f = Filter {
kinds: vec![EventKind::GiftWrap],
since: Some(giftwrap_since),
..Default::default()
};
f.set_tag_values('p', vec![pkh.to_string()]);
f
};
filters.push(giftwrap_filter);
}
// Actual config stuff
filters.push(Filter {
authors: vec![pkh.clone()],
kinds: vec![
EventKind::Metadata,
//EventKind::RecommendRelay,
EventKind::ContactList,
EventKind::MuteList,
EventKind::FollowSets,
EventKind::RelayList,
EventKind::DmRelayList,
EventKind::BookmarkList,
],
// these are all replaceable, no since required
..Default::default()
});
// Events I posted recently, including feed_displayable and
// augments (deletions, reactions, timestamp, label, reporting, and zap)
filters.push(Filter {
authors: vec![pkh],
kinds: crate::feed::feed_related_event_kinds(false), // not DMs
since: Some(since),
..Default::default()
});
}
filters
}
// This FORCES the fetch of relay lists without checking if we need them.
// See also relay_lists() which checks if they are needed first.
pub fn discover(pubkeys: &[PublicKey]) -> Vec<Filter> {
let pkp: Vec<PublicKeyHex> = pubkeys.iter().map(|pk| pk.into()).collect();
vec![Filter {
authors: pkp,
kinds: vec![EventKind::RelayList, EventKind::DmRelayList],
// these are all replaceable, no since required
..Default::default()
}]
}
pub fn replies(main: IdHex, spamsafe: bool) -> Vec<Filter> {
let mut filters: Vec<Filter> = Vec::new();
// Allow all feed related event kinds (excluding DMs)
// (related because we want deletion events, and may as well get likes and zaps too)
let event_kinds = crate::feed::feed_related_event_kinds(false);
let filter = {
let mut filter = Filter {
kinds: event_kinds,
..Default::default()
};
let values = vec![main.to_string()];
filter.set_tag_values('e', values);
// Spam prevention:
if !spamsafe && GLOBALS.storage.read_setting_avoid_spam_on_unsafe_relays() {
filter.authors = GLOBALS
.people
.get_subscribed_pubkeys()
.drain(..)
.map(|pk| pk.into())
.collect();
}
filter
};
filters.push(filter);
filters
}
pub fn replies_to_eaddr(ea: &NAddr, spamsafe: bool) -> Vec<Filter> {
let mut filters: Vec<Filter> = Vec::new();
// Allow all feed related event kinds (excluding DMs)
// (related because we want deletion events, and may as well get likes and zaps too)
let event_kinds = crate::feed::feed_related_event_kinds(false);
let filter = {
let mut filter = Filter {
kinds: event_kinds,
..Default::default()
};
let a_tag = Tag::new_address(ea, None);
filter.set_tag_values('a', vec![a_tag.value().to_owned()]);
// Spam prevention:
if !spamsafe && GLOBALS.storage.read_setting_avoid_spam_on_unsafe_relays() {
filter.authors = GLOBALS
.people
.get_subscribed_pubkeys()
.drain(..)
.map(|pk| pk.into())
.collect();
}
filter
};
filters.push(filter);
filters
}
pub fn dm_channel(dmchannel: DmChannel) -> Vec<Filter> {
let pubkey = match GLOBALS.identity.public_key() {
Some(pk) => pk,
None => return vec![],
};
let pkh: PublicKeyHex = pubkey.into();
// note: giftwraps can't be subscribed by channel. they are subscribed more
// globally, and have to be limited to recent ones.
let mut authors: Vec<PublicKeyHex> = dmchannel.keys().iter().map(|k| k.into()).collect();
authors.push(pkh.clone());
let mut filter = Filter {
authors: authors.clone(),
kinds: vec![EventKind::EncryptedDirectMessage],
..Default::default()
};
// tagging the user
filter.set_tag_values('p', authors.iter().map(|x| x.as_str().to_owned()).collect());
vec![filter]
}
pub fn nip46() -> Vec<Filter> {
let pubkey = match GLOBALS.identity.public_key() {
Some(pk) => pk,
None => return vec![],
};
let pkh: PublicKeyHex = pubkey.into();
let mut filter = Filter {
kinds: vec![EventKind::NostrConnect],
..Default::default()
};
filter.set_tag_values('p', vec![pkh.to_string()]);
vec![filter]
}
pub fn metadata(pubkeys: &[PublicKey]) -> Vec<Filter> {
let pkhp: Vec<PublicKeyHex> = pubkeys.iter().map(|pk| pk.into()).collect();
vec![Filter {
authors: pkhp,
kinds: vec![
EventKind::Metadata,
EventKind::RelayList,
EventKind::DmRelayList,
],
// FIXME: we could probably get a since-last-fetched-their-metadata here.
// but relays should just return the latest of these.
..Default::default()
}]
}
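Each of the free functions deleted above has a corresponding FilterSet variant; for example, what was filter_fns::person_feed(pubkey, FeedRange::After { since: anchor }) is now written roughly as shown below (spamsafe is required by the unified filters() signature but unused for this variant):

    let filters = FilterSet::PersonFeedFuture { pubkey, anchor }.filters(spamsafe);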

View File

@@ -1,13 +1,10 @@
mod filter_fns;
use filter_fns::FeedRange;
mod handle_websocket;
mod subscription;
mod subscription_map;
use crate::comms::{ToMinionMessage, ToMinionPayload, ToMinionPayloadDetail, ToOverlordMessage};
use crate::dm_channel::DmChannel;
use crate::error::{Error, ErrorKind};
use crate::filter_set::FilterSet;
use crate::globals::GLOBALS;
use crate::relay::Relay;
use crate::{RunState, USER_AGENT};
@@ -19,8 +16,8 @@ use http::uri::{Parts, Scheme};
use http::Uri;
use mime::Mime;
use nostr_types::{
ClientMessage, EventKind, EventReference, Filter, Id, IdHex, NAddr, PreEvent, PublicKey,
PublicKeyHex, RelayInformationDocument, RelayUrl, Tag, Unixtime,
ClientMessage, EventKind, Filter, Id, IdHex, NAddr, PreEvent, PublicKey, PublicKeyHex,
RelayInformationDocument, RelayUrl, Tag, Unixtime,
};
use reqwest::Response;
use std::borrow::Cow;
@@ -86,7 +83,6 @@ pub struct Minion {
subscriptions_waiting_for_auth: HashMap<String, Unixtime>,
subscriptions_waiting_for_metadata: Vec<(u64, Vec<PublicKey>)>,
subscriptions_rate_limited: Vec<String>,
general_feed_keys: Vec<PublicKey>,
read_runstate: WatchReceiver<RunState>,
exiting: Option<MinionExitReason>,
auth_state: AuthState,
@@ -132,7 +128,6 @@ impl Minion {
subscriptions_waiting_for_auth: HashMap::new(),
subscriptions_waiting_for_metadata: Vec::new(),
subscriptions_rate_limited: Vec::new(),
general_feed_keys: Vec::new(),
read_runstate,
exiting: None,
auth_state: AuthState::None,
@@ -164,19 +159,11 @@ impl Minion {
// Optimization: before connecting to the relay, handle any 'loading_more' bumps
// that would happen after connecting to the relay.
for message in &messages {
let loading_more = matches!(
message.detail,
ToMinionPayloadDetail::TempSubscribeGeneralFeedChunk(_)
) || matches!(
message.detail,
ToMinionPayloadDetail::TempSubscribePersonFeedChunk { .. }
) || matches!(
message.detail,
ToMinionPayloadDetail::TempSubscribeInboxFeedChunk(_)
);
if loading_more {
self.loading_more += 1;
let _ = GLOBALS.loading_more.fetch_add(1, Ordering::SeqCst);
if let ToMinionPayloadDetail::Subscribe(filter_set) = &message.detail {
if filter_set.is_loading_more() {
self.loading_more += 1;
let _ = GLOBALS.loading_more.fetch_add(1, Ordering::SeqCst);
}
}
}
@@ -618,71 +605,34 @@ impl Minion {
tracing::debug!("{}: Websocket listener shutting down", &self.url);
self.exiting = Some(MinionExitReason::GotShutdownMessage);
}
ToMinionPayloadDetail::SubscribeAugments(ids) => {
self.subscribe_augments(message.job_id, ids).await?;
}
ToMinionPayloadDetail::SubscribeGeneralFeed(pubkeys, anchor) => {
if self.general_feed_keys.is_empty() {
self.general_feed_keys = pubkeys;
self.subscribe_general_feed_initial(message.job_id, anchor)
.await?;
ToMinionPayloadDetail::Subscribe(filter_set) => {
let handle = filter_set.handle(message.job_id);
// If we aren't running it already, OR if it can have duplicates
if !self.subscription_map.has(&handle) || filter_set.can_have_duplicates() {
let spamsafe = self.dbrelay.has_usage_bits(Relay::SPAMSAFE);
let filters = filter_set.filters(spamsafe);
if !filters.is_empty() {
self.subscribe(filters, &handle, message.job_id).await?;
}
} else {
self.subscribe_general_feed_additional_keys(message.job_id, pubkeys, anchor)
.await?;
// It does not allow duplicates and we are already running it,
// but maybe we can save it for later...
if let FilterSet::Metadata(pubkeys) = filter_set {
// Save for later
self.subscriptions_waiting_for_metadata
.push((message.job_id, pubkeys));
}
}
}
ToMinionPayloadDetail::SubscribeInbox(anchor) => {
self.subscribe_inbox(message.job_id, anchor).await?;
}
ToMinionPayloadDetail::SubscribeConfig => {
self.subscribe_config(message.job_id).await?;
}
ToMinionPayloadDetail::SubscribeDiscover(pubkeys) => {
self.subscribe_discover(message.job_id, pubkeys).await?;
}
ToMinionPayloadDetail::SubscribeGiftwraps(anchor) => {
self.subscribe_giftwraps(message.job_id, anchor).await?;
}
ToMinionPayloadDetail::SubscribeGlobalFeed(anchor) => {
self.subscribe_global_feed(message.job_id, anchor).await?;
}
ToMinionPayloadDetail::SubscribePersonFeed(pubkey, anchor) => {
self.subscribe_person_feed(message.job_id, pubkey, anchor)
.await?;
}
ToMinionPayloadDetail::SubscribeReplies(main) => {
self.subscribe_replies(message.job_id, main).await?;
}
ToMinionPayloadDetail::SubscribeRootReplies(main) => {
self.subscribe_root_replies(message.job_id, main).await?;
}
ToMinionPayloadDetail::SubscribeDmChannel(dmchannel) => {
self.subscribe_dm_channel(message.job_id, dmchannel).await?;
}
ToMinionPayloadDetail::SubscribeNip46 => {
self.subscribe_nip46(message.job_id).await?;
}
ToMinionPayloadDetail::TempSubscribeGeneralFeedChunk(anchor) => {
self.temp_subscribe_general_feed_chunk(message.job_id, anchor)
.await?;
}
ToMinionPayloadDetail::TempSubscribePersonFeedChunk { pubkey, anchor } => {
self.temp_subscribe_person_feed_chunk(message.job_id, pubkey, anchor)
.await?;
}
ToMinionPayloadDetail::TempSubscribeInboxFeedChunk(anchor) => {
self.temp_subscribe_inbox_feed_chunk(message.job_id, anchor)
.await?;
}
ToMinionPayloadDetail::TempSubscribeMetadata(pubkeys) => {
self.temp_subscribe_metadata(message.job_id, pubkeys)
.await?;
}
ToMinionPayloadDetail::UnsubscribeGlobalFeed => {
self.unsubscribe("global_feed").await?;
}
ToMinionPayloadDetail::UnsubscribePersonFeed => {
self.unsubscribe("person_feed").await?;
ToMinionPayloadDetail::Unsubscribe(filter_set) => {
let handles = self
.subscription_map
.get_all_handles_matching(filter_set.inner_handle());
for handle in handles {
self.unsubscribe(&handle).await?;
}
}
ToMinionPayloadDetail::UnsubscribeReplies => {
self.unsubscribe("replies").await?;
@@ -693,370 +643,6 @@ impl Minion {
Ok(())
}
async fn subscribe_augments(&mut self, job_id: u64, ids: Vec<IdHex>) -> Result<(), Error> {
let filters = filter_fns::augments(&ids);
self.subscribe(filters, "temp_augments", job_id).await?;
Ok(())
}
// Subscribe to the user's followers on the relays they write to
async fn subscribe_general_feed_initial(
&mut self,
job_id: u64,
anchor: Unixtime,
) -> Result<(), Error> {
tracing::debug!(
"Following {} people at {}",
self.general_feed_keys.len(),
&self.url
);
let limit = GLOBALS.storage.read_setting_load_more_count() as usize;
let mut filters =
filter_fns::general_feed(&self.general_feed_keys, FeedRange::After { since: anchor });
let filters2 = filter_fns::general_feed(
&self.general_feed_keys,
FeedRange::ChunkBefore {
until: anchor,
limit,
},
);
filters.extend(filters2);
if filters.is_empty() {
self.unsubscribe("general_feed").await?;
self.to_overlord.send(ToOverlordMessage::MinionJobComplete(
self.url.clone(),
job_id,
))?;
} else {
self.subscribe(filters, "general_feed", job_id).await?;
}
Ok(())
}
/// Subscribe to general feed with change of pubkeys
async fn subscribe_general_feed_additional_keys(
&mut self,
job_id: u64,
pubkeys: Vec<PublicKey>,
anchor: Unixtime,
) -> Result<(), Error> {
// Figure out who the new people are (if any)
let mut new_keys = pubkeys.clone();
new_keys.retain(|key| !self.general_feed_keys.contains(key));
if !new_keys.is_empty() {
let limit = GLOBALS.storage.read_setting_load_more_count() as usize;
let mut filters =
filter_fns::general_feed(&new_keys, FeedRange::After { since: anchor });
let filters2 = filter_fns::general_feed(
&new_keys,
FeedRange::ChunkBefore {
until: anchor,
limit,
},
);
filters.extend(filters2);
if !filters.is_empty() {
self.subscribe(filters, "temp_general_feed_update", job_id)
.await?;
}
}
self.general_feed_keys = pubkeys;
Ok(())
}
async fn subscribe_giftwraps(&mut self, job_id: u64, after: Unixtime) -> Result<(), Error> {
// If we have already subscribed to giftwraps, do not resubscribe
if self.subscription_map.has("giftwraps") {
return Ok(());
}
let filters = filter_fns::giftwraps(FeedRange::After { since: after });
if filters.is_empty() {
return Ok(());
}
self.subscribe(filters, "giftwraps", job_id).await?;
Ok(())
}
// Subscribe to anybody mentioning the user on the relays the user reads from
// (and any other relay for the time being until nip65 is in widespread use)
async fn subscribe_inbox(&mut self, job_id: u64, anchor: Unixtime) -> Result<(), Error> {
// If we have already subscribed to inbox, do not resubscribe
if self.subscription_map.has("inbox_feed") {
return Ok(());
}
let spamsafe = self.dbrelay.has_usage_bits(Relay::SPAMSAFE);
let limit = GLOBALS.storage.read_setting_load_more_count() as usize;
let mut filters = filter_fns::inbox_feed(
spamsafe,
FeedRange::ChunkBefore {
until: anchor,
limit,
},
);
filters.extend(filter_fns::inbox_feed(
spamsafe,
FeedRange::After { since: anchor },
));
if filters.is_empty() {
return Ok(());
}
self.subscribe(filters, "inbox_feed", job_id).await?;
Ok(())
}
// Subscribe to the user's config (config, DMs, etc) which is on their own write relays
async fn subscribe_config(&mut self, job_id: u64) -> Result<(), Error> {
let since = Unixtime::now() - Duration::from_secs(60 * 60 * 24 * 15);
let filters = filter_fns::config(since);
if filters.is_empty() {
return Ok(());
} else {
self.subscribe(filters, "config_feed", job_id).await?;
}
Ok(())
}
// Discover relay lists
async fn subscribe_discover(
&mut self,
job_id: u64,
pubkeys: Vec<PublicKey>,
) -> Result<(), Error> {
if !pubkeys.is_empty() {
let filters = filter_fns::discover(&pubkeys);
self.subscribe(filters, "temp_discover_feed", job_id)
.await?;
}
Ok(())
}
// Subscribe to the posts a person generates on the relays they write to
async fn subscribe_person_feed(
&mut self,
job_id: u64,
pubkey: PublicKey,
anchor: Unixtime,
) -> Result<(), Error> {
// NOTE we do not unsubscribe to the general feed
let limit = GLOBALS.storage.read_setting_load_more_count() as usize;
let mut filters = filter_fns::person_feed(pubkey, FeedRange::After { since: anchor });
let filters2 = filter_fns::person_feed(
pubkey,
FeedRange::ChunkBefore {
until: anchor,
limit,
},
);
filters.extend(filters2);
if filters.is_empty() {
self.unsubscribe_person_feed().await?;
self.to_overlord.send(ToOverlordMessage::MinionJobComplete(
self.url.clone(),
job_id,
))?;
} else {
self.subscribe(filters, "person_feed", job_id).await?;
}
Ok(())
}
async fn subscribe_global_feed(&mut self, job_id: u64, anchor: Unixtime) -> Result<(), Error> {
// NOTE we do not unsubscribe to the general feed
let limit = GLOBALS.storage.read_setting_load_more_count() as usize;
let mut filters = filter_fns::global_feed(FeedRange::After { since: anchor });
let filters2 = filter_fns::global_feed(FeedRange::ChunkBefore {
until: anchor,
limit,
});
filters.extend(filters2);
if filters.is_empty() {
self.unsubscribe_global_feed().await?;
self.to_overlord.send(ToOverlordMessage::MinionJobComplete(
self.url.clone(),
job_id,
))?;
} else {
self.subscribe(filters, "global_feed", job_id).await?;
}
Ok(())
}
async fn temp_subscribe_person_feed_chunk(
&mut self,
job_id: u64,
pubkey: PublicKey,
anchor: Unixtime,
) -> Result<(), Error> {
let limit = GLOBALS.storage.read_setting_load_more_count() as usize;
let filters = filter_fns::person_feed(
pubkey,
FeedRange::ChunkBefore {
until: anchor,
limit,
},
);
if filters.is_empty() {
self.unsubscribe_person_feed().await?;
self.to_overlord.send(ToOverlordMessage::MinionJobComplete(
self.url.clone(),
job_id,
))?;
} else {
let sub_name = format!("temp_person_feed_chunk_{}", job_id);
if !self.initial_handling {
self.loading_more += 1;
let _ = GLOBALS.loading_more.fetch_add(1, Ordering::SeqCst);
}
self.subscribe(filters, &sub_name, job_id).await?;
}
Ok(())
}
async fn temp_subscribe_inbox_feed_chunk(
&mut self,
job_id: u64,
anchor: Unixtime,
) -> Result<(), Error> {
let limit = GLOBALS.storage.read_setting_load_more_count() as usize;
let spamsafe = self.dbrelay.has_usage_bits(Relay::SPAMSAFE);
let filters = filter_fns::inbox_feed(
spamsafe,
FeedRange::ChunkBefore {
until: anchor,
limit,
},
);
if filters.is_empty() {
self.to_overlord.send(ToOverlordMessage::MinionJobComplete(
self.url.clone(),
job_id,
))?;
return Ok(());
}
let sub_name = format!("temp_inbox_feed_chunk_{}", job_id);
if !self.initial_handling {
self.loading_more += 1;
let _ = GLOBALS.loading_more.fetch_add(1, Ordering::SeqCst);
}
self.subscribe(filters, &sub_name, job_id).await?;
Ok(())
}
async fn unsubscribe_global_feed(&mut self) -> Result<(), Error> {
// Unsubscribe global_feed and all person feed chunks
let handles = self
.subscription_map
.get_all_handles_matching("global_feed");
for handle in handles {
self.unsubscribe(&handle).await?;
}
Ok(())
}
async fn unsubscribe_person_feed(&mut self) -> Result<(), Error> {
// Unsubscribe person_feed and all person feed chunks
let handles = self
.subscription_map
.get_all_handles_matching("person_feed");
for handle in handles {
self.unsubscribe(&handle).await?;
}
Ok(())
}
async fn subscribe_root_replies(
&mut self,
job_id: u64,
main: EventReference,
) -> Result<(), Error> {
// NOTE we do not unsubscribe to the general feed
// Replies
let spamsafe = self.dbrelay.has_usage_bits(Relay::SPAMSAFE);
let filters = match main {
EventReference::Id { id, .. } => filter_fns::replies(id.into(), spamsafe),
EventReference::Addr(ref eaddr) => filter_fns::replies_to_eaddr(eaddr, spamsafe),
};
self.subscribe(filters, "root_replies", job_id).await?;
Ok(())
}
async fn subscribe_replies(&mut self, job_id: u64, main: IdHex) -> Result<(), Error> {
// NOTE we do not unsubscribe to the general feed
// Replies
let spamsafe = self.dbrelay.has_usage_bits(Relay::SPAMSAFE);
let filters = filter_fns::replies(main, spamsafe);
self.subscribe(filters, "replies", job_id).await?;
Ok(())
}
async fn subscribe_dm_channel(
&mut self,
job_id: u64,
dmchannel: DmChannel,
) -> Result<(), Error> {
// We will need the private key to auth to the relay for this
if !GLOBALS.identity.is_unlocked() {
return Err(ErrorKind::NoPrivateKey.into());
}
let filters = filter_fns::dm_channel(dmchannel);
if !filters.is_empty() {
self.subscribe(filters, "dm_channel", job_id).await?;
}
Ok(())
}
async fn subscribe_nip46(&mut self, job_id: u64) -> Result<(), Error> {
let filters = filter_fns::nip46();
if !filters.is_empty() {
self.subscribe(filters, "nip46", job_id).await?;
}
Ok(())
}
async fn get_events(&mut self) -> Result<(), Error> {
// Collect all the sought events we have not yet asked for, and
// presumptively mark them as having been asked for.
@@ -1094,9 +680,10 @@ impl Minion {
// This is run every tick
async fn try_subscribe_waiting(&mut self) -> Result<(), Error> {
// Subscribe to metadata
if !self.subscription_map.has("temp_subscribe_metadata")
&& !self.subscriptions_waiting_for_metadata.is_empty()
// Subscribe to metadata that is waiting (unless we already have a
// metadata subscription running in which case we just keep waiting)
if !self.subscriptions_waiting_for_metadata.is_empty()
&& !self.subscription_map.has("temp_subscribe_metadata")
{
let mut subscriptions_waiting_for_metadata =
std::mem::take(&mut self.subscriptions_waiting_for_metadata);
@@ -1116,7 +703,11 @@ impl Minion {
combined_pubkeys.extend(pubkeys);
}
self.temp_subscribe_metadata(combined_job_id.unwrap(), combined_pubkeys)
let handle = "temp_subscribe_metadata".to_string();
let filter_set = FilterSet::Metadata(combined_pubkeys);
let spamsafe = self.dbrelay.has_usage_bits(Relay::SPAMSAFE);
let filters = filter_set.filters(spamsafe);
self.subscribe(filters, &handle, combined_job_id.unwrap())
.await?;
}
@@ -1169,57 +760,6 @@ impl Minion {
self.subscribe(vec![filter], &handle, job_id).await
}
// Load more, one more chunk back
async fn temp_subscribe_general_feed_chunk(
&mut self,
job_id: u64,
anchor: Unixtime,
) -> Result<(), Error> {
let limit = GLOBALS.storage.read_setting_load_more_count() as usize;
let filters = filter_fns::general_feed(
&self.general_feed_keys,
FeedRange::ChunkBefore {
until: anchor,
limit,
},
);
if !filters.is_empty() {
// We include the job_id so that if the user presses "load more" yet again,
// the new chunk subscription doesn't clobber this subscription which might
// not have run to completion yet.
let sub_name = format!("temp_general_feed_chunk_{}", job_id);
if !self.initial_handling {
self.loading_more += 1;
let _ = GLOBALS.loading_more.fetch_add(1, Ordering::SeqCst);
}
self.subscribe(filters, &sub_name, job_id).await?;
}
Ok(())
}
async fn temp_subscribe_metadata(
&mut self,
job_id: u64,
pubkeys: Vec<PublicKey>,
) -> Result<(), Error> {
if self.subscription_map.has("temp_subscribe_metadata") {
// Save for later
self.subscriptions_waiting_for_metadata
.push((job_id, pubkeys));
return Ok(());
}
tracing::trace!("Temporarily subscribing to metadata on {}", &self.url);
let handle = "temp_subscribe_metadata".to_string();
let filters = filter_fns::metadata(&pubkeys);
self.subscribe(filters, &handle, job_id).await
}
async fn subscribe(
&mut self,
filters: Vec<Filter>,

View File

@@ -5,6 +5,7 @@ use crate::comms::{
use crate::dm_channel::DmChannel;
use crate::error::{Error, ErrorKind};
use crate::feed::FeedKind;
use crate::filter_set::{FeedRange, FilterSet};
use crate::globals::GLOBALS;
use crate::manager;
use crate::minion::MinionExitReason;
@@ -325,16 +326,28 @@ impl Overlord {
async fn apply_relay_assignment(&mut self, assignment: RelayAssignment) -> Result<(), Error> {
let anchor = GLOBALS.feed.current_anchor();
let mut jobs = vec![RelayJob {
reason: RelayConnectionReason::Follow,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeGeneralFeed(
assignment.pubkeys.clone(),
anchor,
),
let mut jobs = vec![
RelayJob {
reason: RelayConnectionReason::Follow,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::GeneralFeedFuture {
pubkeys: assignment.pubkeys.clone(),
anchor,
}),
},
},
}];
RelayJob {
reason: RelayConnectionReason::Follow,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::GeneralFeedChunk {
pubkeys: assignment.pubkeys.clone(),
anchor,
}),
},
},
];
// Until NIP-65 is in widespread use, we should listen to inbox
// of us on all these relays too
@@ -353,7 +366,14 @@ impl Overlord {
reason: RelayConnectionReason::FetchInbox,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeInbox(anchor),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::InboxFeedFuture(anchor)),
},
});
jobs.push(RelayJob {
reason: RelayConnectionReason::FetchInbox,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::InboxFeedChunk(anchor)),
},
});
}
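A pattern worth noting in the overlord changes: where the old code sent one subscribe message and let the minion build both the open-ended and the backfill filter internally (see the deleted subscribe_inbox and subscribe_general_feed_initial), the new code enqueues an explicit pair of jobs, one "...Future" set starting at the anchor and one "...Chunk" set ending at the same anchor. Sketched in isolation:

    // everything after the anchor (long-lived)
    let future = ToMinionPayloadDetail::Subscribe(FilterSet::InboxFeedFuture(anchor));
    // one chunk of history ending at the anchor (temporary, counts as "loading more")
    let chunk = ToMinionPayloadDetail::Subscribe(FilterSet::InboxFeedChunk(anchor));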
@@ -1523,7 +1543,10 @@ impl Overlord {
target: relay_assignment.relay_url.as_str().to_owned(),
payload: ToMinionPayload {
job_id: 0,
detail: ToMinionPayloadDetail::TempSubscribeGeneralFeedChunk(anchor),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::GeneralFeedChunk {
pubkeys: relay_assignment.pubkeys.clone(),
anchor,
}),
},
});
}
@@ -1537,7 +1560,9 @@ impl Overlord {
reason: RelayConnectionReason::FetchInbox,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::TempSubscribeInboxFeedChunk(anchor),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::InboxFeedChunk(
anchor,
)),
},
}],
);
@@ -1553,10 +1578,10 @@ impl Overlord {
reason: RelayConnectionReason::SubscribePerson,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::TempSubscribePersonFeedChunk {
detail: ToMinionPayloadDetail::Subscribe(FilterSet::PersonFeedChunk {
pubkey,
anchor,
},
}),
},
}],
);
@@ -1569,7 +1594,9 @@ impl Overlord {
reason: RelayConnectionReason::SubscribeGlobal,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeGlobalFeed(anchor),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::GlobalFeedChunk(
anchor,
)),
},
}],
);
@@ -1973,7 +2000,7 @@ impl Overlord {
reason: RelayConnectionReason::FetchMetadata,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::TempSubscribeMetadata(pubkeys),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::Metadata(pubkeys)),
},
}],
);
@@ -2251,7 +2278,9 @@ impl Overlord {
reason: RelayConnectionReason::FetchDirectMessages,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeDmChannel(dmchannel.clone()),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::DmChannel(
dmchannel.clone(),
)),
},
}],
);
@@ -2263,13 +2292,26 @@ impl Overlord {
let relay_urls = Relay::choose_relay_urls(Relay::GLOBAL, |_| true)?;
manager::run_jobs_on_all_relays(
relay_urls,
vec![RelayJob {
reason: RelayConnectionReason::SubscribeGlobal,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeGlobalFeed(anchor),
vec![
RelayJob {
reason: RelayConnectionReason::SubscribeGlobal,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::GlobalFeedFuture(
anchor,
)),
},
},
}],
RelayJob {
reason: RelayConnectionReason::SubscribeGlobal,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::GlobalFeedChunk(
anchor,
)),
},
},
],
);
Ok(())
@@ -2279,13 +2321,28 @@ impl Overlord {
let relays: Vec<RelayUrl> = relay::get_some_pubkey_outboxes(pubkey)?;
manager::run_jobs_on_all_relays(
relays,
vec![RelayJob {
reason: RelayConnectionReason::SubscribePerson,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribePersonFeed(pubkey, anchor),
vec![
RelayJob {
reason: RelayConnectionReason::SubscribePerson,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::PersonFeedFuture {
pubkey,
anchor,
}),
},
},
}],
RelayJob {
reason: RelayConnectionReason::SubscribePerson,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::PersonFeedChunk {
pubkey,
anchor,
}),
},
},
],
);
Ok(())
@@ -2434,6 +2491,10 @@ impl Overlord {
// Subscribe to replies to root
if let Some(ref root_eref) = ancestors.root {
let filter_set = match root_eref {
EventReference::Id { id, .. } => FilterSet::RepliesToId((*id).into()),
EventReference::Addr(naddr) => FilterSet::RepliesToAddr(naddr.clone()),
};
let relays = root_eref.copy_relays();
for url in relays.iter() {
// Subscribe root replies
@@ -2441,7 +2502,7 @@ impl Overlord {
reason: RelayConnectionReason::ReadThread,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeRootReplies(root_eref.clone()),
detail: ToMinionPayloadDetail::Subscribe(filter_set.clone()),
},
}];
@@ -2486,7 +2547,7 @@ impl Overlord {
reason: RelayConnectionReason::ReadThread,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeReplies(id.into()),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::RepliesToId(id.into())),
},
}];
@@ -2553,7 +2614,7 @@ impl Overlord {
reason: RelayConnectionReason::Config,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeConfig,
detail: ToMinionPayloadDetail::Subscribe(FilterSet::Config),
},
}],
);
@@ -2597,7 +2658,7 @@ impl Overlord {
reason: RelayConnectionReason::Discovery,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeDiscover(pubkeys.clone()),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::Discover(pubkeys.clone())),
},
}],
);
@@ -2614,13 +2675,22 @@ impl Overlord {
};
manager::run_jobs_on_all_relays(
mention_relays,
vec![RelayJob {
reason: RelayConnectionReason::FetchInbox,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeInbox(now),
vec![
RelayJob {
reason: RelayConnectionReason::FetchInbox,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::InboxFeedFuture(now)),
},
},
}],
RelayJob {
reason: RelayConnectionReason::FetchInbox,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::InboxFeedChunk(now)),
},
},
],
);
Ok(())
@@ -2642,7 +2712,9 @@ impl Overlord {
reason: RelayConnectionReason::Giftwraps,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeGiftwraps(after),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::Giftwraps(
FeedRange::After { since: after },
)),
},
}],
);
@@ -2658,7 +2730,7 @@ impl Overlord {
reason: RelayConnectionReason::NostrConnect,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeNip46,
detail: ToMinionPayloadDetail::Subscribe(FilterSet::Nip46),
},
}],
);
@@ -2704,7 +2776,7 @@ impl Overlord {
reason: RelayConnectionReason::FetchMetadata,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::TempSubscribeMetadata(vec![pubkey]),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::Metadata(vec![pubkey])),
},
}],
);
@@ -2741,7 +2813,7 @@ impl Overlord {
reason: RelayConnectionReason::FetchMetadata,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::TempSubscribeMetadata(pubkeys),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::Metadata(pubkeys)),
},
}],
);
@@ -3074,7 +3146,7 @@ impl Overlord {
reason: RelayConnectionReason::FetchAugments,
payload: ToMinionPayload {
job_id: rand::random::<u64>(),
detail: ToMinionPayloadDetail::SubscribeAugments(ids_hex),
detail: ToMinionPayloadDetail::Subscribe(FilterSet::Augments(ids_hex)),
},
}],
);