generic Subscribe / Unsubscribe minion messages

This commit is contained in:
Mike Dilger 2024-08-13 14:52:38 +12:00
parent 98d715b8fc
commit ddf3f6f4e4
2 changed files with 38 additions and 1 deletions

View File

@ -1,4 +1,5 @@
use crate::dm_channel::DmChannel;
use crate::filter_set::FilterSet;
use crate::misc::Private;
use crate::nip46::{Approval, ParsedCommand};
use crate::people::PersonList;
@ -262,6 +263,8 @@ pub(crate) enum ToMinionPayloadDetail {
FetchNAddr(NAddr),
PostEvents(Vec<Event>),
Shutdown,
Subscribe(FilterSet),
Unsubscribe(FilterSet),
SubscribeAugments(Vec<IdHex>),
SubscribeConfig,
SubscribeDiscover(Vec<PublicKey>),

View File

@ -7,7 +7,7 @@ mod subscription_map;
use crate::comms::{ToMinionMessage, ToMinionPayload, ToMinionPayloadDetail, ToOverlordMessage};
use crate::dm_channel::DmChannel;
use crate::error::{Error, ErrorKind};
use crate::filter_set::FeedRange;
use crate::filter_set::{FeedRange, FilterSet};
use crate::globals::GLOBALS;
use crate::relay::Relay;
use crate::{RunState, USER_AGENT};
@ -618,6 +618,40 @@ impl Minion {
tracing::debug!("{}: Websocket listener shutting down", &self.url);
self.exiting = Some(MinionExitReason::GotShutdownMessage);
}
ToMinionPayloadDetail::Subscribe(filter_set) => {
let handle = filter_set.handle(message.job_id);
// Remember general_feed_keys for chunk updates
if let FilterSet::GeneralFeedFuture { pubkeys, .. } = &filter_set {
self.general_feed_keys = pubkeys.clone();
}
// If we aren't running it already, OR if it can have duplicates
if !self.subscription_map.has(&handle) || filter_set.can_have_duplicates() {
let spamsafe = self.dbrelay.has_usage_bits(Relay::SPAMSAFE);
let filters = filter_set.filters(spamsafe);
if !filters.is_empty() {
self.subscribe(filters, &handle, message.job_id).await?;
}
} else {
// It does not allow duplicates and we are already running it,
// but maybe we can save it for later...
if let FilterSet::Metadata(pubkeys) = filter_set {
// Save for later
self.subscriptions_waiting_for_metadata
.push((message.job_id, pubkeys));
}
}
}
ToMinionPayloadDetail::Unsubscribe(filter_set) => {
let handles = self
.subscription_map
.get_all_handles_matching(filter_set.inner_handle());
for handle in handles {
self.unsubscribe(&handle).await?;
}
}
ToMinionPayloadDetail::SubscribeAugments(ids) => {
self.subscribe_augments(message.job_id, ids).await?;
}