From 8911270edc2daa9754a0d406dd23098af16cf875 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Wed, 21 Dec 2022 10:05:55 +1300 Subject: [PATCH] Subscriptions --- src/overlord/minion/handle_websocket.rs | 22 ++++----- src/overlord/minion/mod.rs | 39 +++++++-------- src/overlord/minion/subscription.rs | 66 ++++++++++++++++++++++++- 3 files changed, 92 insertions(+), 35 deletions(-) diff --git a/src/overlord/minion/handle_websocket.rs b/src/overlord/minion/handle_websocket.rs index 9b4ca4d2..2f423047 100644 --- a/src/overlord/minion/handle_websocket.rs +++ b/src/overlord/minion/handle_websocket.rs @@ -2,7 +2,7 @@ use super::Minion; use crate::db::{DbEvent, DbPersonRelay}; use crate::{BusMessage, Error}; use nostr_proto::{Event, RelayMessage, Unixtime}; -use tracing::{error, info, warn}; +use tracing::{error, info, trace, warn}; impl Minion { pub(super) async fn handle_nostr_message(&mut self, ws_message: String) -> Result<(), Error> { @@ -15,10 +15,11 @@ impl Minion { maxtime.0 += 60 * 15; // 15 minutes into the future match relay_message { - RelayMessage::Event(_subid, event) => { + RelayMessage::Event(subid, event) => { if let Err(e) = event.verify(Some(maxtime)) { error!("VERIFY ERROR: {}, {}", e, serde_json::to_string(&event)?) } else { + trace!("NEW EVENT ON {}", subid.0); DbEvent::save_nostr_event(&event, Some(self.url.clone())).await?; self.send_overlord_newevent(*event).await?; } @@ -32,18 +33,13 @@ impl Minion { DbPersonRelay::update_last_fetched(self.url.0.clone(), now).await?; // Update the matching subscription - match self.subscription_id_to_ourname.get(&subid.0) { - Some(ourname) => match self.subscriptions_by_ourname.get_mut(ourname) { - Some(sub) => { - sub.set_eose(); - info!("EOSE: {} {:?}", &self.url, subid); - } - None => { - warn!("EOSE for unknown subscription ourname={}", ourname); - } - }, + match self.subscriptions.get_mut_by_id(&subid.0) { + Some(sub) => { + sub.set_eose(); + info!("EOSE: {} {:?}", &self.url, subid); + } None => { - warn!("EOSE for unknown subsription: {} {:?}", &self.url, subid); + warn!("EOSE for unknown subscription: {} {:?}", &self.url, subid); } } } diff --git a/src/overlord/minion/mod.rs b/src/overlord/minion/mod.rs index d1c32437..6720d8ab 100644 --- a/src/overlord/minion/mod.rs +++ b/src/overlord/minion/mod.rs @@ -1,3 +1,7 @@ +mod handle_bus; +mod handle_websocket; +mod subscription; + use crate::comms::BusMessage; use crate::db::{DbPersonRelay, DbRelay}; use crate::error::Error; @@ -6,7 +10,7 @@ use crate::settings::Settings; use futures::{SinkExt, StreamExt}; use http::Uri; use nostr_proto::{EventKind, Filters, PublicKeyHex, RelayInformationDocument, Unixtime, Url}; -use std::collections::HashMap; +use subscription::Subscriptions; use tokio::net::TcpStream; use tokio::select; use tokio::sync::broadcast::Receiver; @@ -15,11 +19,6 @@ use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; use tracing::{debug, error, info, trace, warn}; use tungstenite::protocol::{Message as WsMessage, WebSocketConfig}; -mod handle_bus; -mod handle_websocket; -mod subscription; -use subscription::Subscription; - pub struct Minion { url: Url, pubkeys: Vec, @@ -29,8 +28,7 @@ pub struct Minion { dbrelay: DbRelay, nip11: Option, stream: Option>>, - subscriptions_by_ourname: HashMap, - subscription_id_to_ourname: HashMap, + subscriptions: Subscriptions, } impl Minion { @@ -56,8 +54,7 @@ impl Minion { dbrelay, nip11: None, stream: None, - subscriptions_by_ourname: HashMap::new(), - subscription_id_to_ourname: HashMap::new(), + subscriptions: Subscriptions::new(), }) } } @@ -237,14 +234,13 @@ impl Minion { let websocket_stream = self.stream.as_mut().unwrap(); if self.pubkeys.is_empty() { - if let Some(sub) = self.subscriptions_by_ourname.get("following") { + if let Some(sub) = self.subscriptions.get("following") { // Close the subscription let wire = serde_json::to_string(&sub.close_message())?; websocket_stream.send(WsMessage::Text(wire.clone())).await?; // Remove the subscription from the map - self.subscription_id_to_ourname.remove(&sub.get_id()); - self.subscriptions_by_ourname.remove("following"); + self.subscriptions.remove("following"); } // Since pubkeys is empty, nothing to subscribe to. @@ -300,21 +296,22 @@ impl Minion { ); // Get the subscription - let sub = self - .subscriptions_by_ourname - .entry("following".to_string()) - .or_insert_with(Subscription::new); - // Write our filters into it - { + let req_message = if self.subscriptions.has("following") { + let sub = self.subscriptions.get_mut("following").unwrap(); let vec: &mut Vec = sub.get_mut(); vec.clear(); vec.push(feed_filter); vec.push(special_filter); - } + sub.req_message() + } else { + self.subscriptions + .add("following", vec![feed_filter, special_filter]); + self.subscriptions.get("following").unwrap().req_message() + }; // Subscribe (or resubscribe) to the subscription - let wire = serde_json::to_string(&sub.req_message())?; + let wire = serde_json::to_string(&req_message)?; websocket_stream.send(WsMessage::Text(wire.clone())).await?; trace!("Sent {}", &wire); diff --git a/src/overlord/minion/subscription.rs b/src/overlord/minion/subscription.rs index 379b139c..6bc5ce6a 100644 --- a/src/overlord/minion/subscription.rs +++ b/src/overlord/minion/subscription.rs @@ -1,6 +1,70 @@ use nostr_proto::{ClientMessage, Filters, SubscriptionId}; +use std::collections::HashMap; -#[derive(Debug)] +pub struct Subscriptions { + handle_to_id: HashMap, + by_id: HashMap, +} + +impl Subscriptions { + pub fn new() -> Subscriptions { + Subscriptions { + handle_to_id: HashMap::new(), + by_id: HashMap::new(), + } + } + + pub fn add(&mut self, handle: &str, filters: Vec) { + let mut sub = Subscription::new(); + sub.filters = filters; + self.handle_to_id.insert(handle.to_owned(), sub.get_id()); + self.by_id.insert(sub.get_id(), sub); + } + + pub fn has(&self, handle: &str) -> bool { + match self.handle_to_id.get(handle) { + None => false, + Some(id) => self.by_id.contains_key(id), + } + } + + pub fn get(&self, handle: &str) -> Option { + match self.handle_to_id.get(handle) { + None => None, + Some(id) => self.by_id.get(id).cloned(), + } + } + + #[allow(dead_code)] + pub fn get_by_id(&self, id: &str) -> Option { + self.by_id.get(id).cloned() + } + + pub fn get_mut(&mut self, handle: &str) -> Option<&mut Subscription> { + match self.handle_to_id.get(handle) { + None => None, + Some(id) => self.by_id.get_mut(id), + } + } + + pub fn get_mut_by_id(&mut self, id: &str) -> Option<&mut Subscription> { + self.by_id.get_mut(id) + } + + pub fn remove(&mut self, handle: &str) { + if let Some(id) = self.handle_to_id.get(handle) { + self.by_id.remove(id); + self.handle_to_id.remove(handle); + } + } + + #[allow(dead_code)] + pub fn remove_by_id(&mut self, id: &str) { + self.by_id.remove(id); + } +} + +#[derive(Clone, Debug)] pub struct Subscription { id: String, filters: Vec,