Subscriptions

This commit is contained in:
Mike Dilger 2022-12-21 10:05:55 +13:00
parent 319e6eb9bf
commit 8911270edc
3 changed files with 92 additions and 35 deletions

View File

@ -2,7 +2,7 @@ use super::Minion;
use crate::db::{DbEvent, DbPersonRelay};
use crate::{BusMessage, Error};
use nostr_proto::{Event, RelayMessage, Unixtime};
use tracing::{error, info, warn};
use tracing::{error, info, trace, warn};
impl Minion {
pub(super) async fn handle_nostr_message(&mut self, ws_message: String) -> Result<(), Error> {
@ -15,10 +15,11 @@ impl Minion {
maxtime.0 += 60 * 15; // 15 minutes into the future
match relay_message {
RelayMessage::Event(_subid, event) => {
RelayMessage::Event(subid, event) => {
if let Err(e) = event.verify(Some(maxtime)) {
error!("VERIFY ERROR: {}, {}", e, serde_json::to_string(&event)?)
} else {
trace!("NEW EVENT ON {}", subid.0);
DbEvent::save_nostr_event(&event, Some(self.url.clone())).await?;
self.send_overlord_newevent(*event).await?;
}
@ -32,18 +33,13 @@ impl Minion {
DbPersonRelay::update_last_fetched(self.url.0.clone(), now).await?;
// Update the matching subscription
match self.subscription_id_to_ourname.get(&subid.0) {
Some(ourname) => match self.subscriptions_by_ourname.get_mut(ourname) {
match self.subscriptions.get_mut_by_id(&subid.0) {
Some(sub) => {
sub.set_eose();
info!("EOSE: {} {:?}", &self.url, subid);
}
None => {
warn!("EOSE for unknown subscription ourname={}", ourname);
}
},
None => {
warn!("EOSE for unknown subsription: {} {:?}", &self.url, subid);
warn!("EOSE for unknown subscription: {} {:?}", &self.url, subid);
}
}
}

View File

@ -1,3 +1,7 @@
mod handle_bus;
mod handle_websocket;
mod subscription;
use crate::comms::BusMessage;
use crate::db::{DbPersonRelay, DbRelay};
use crate::error::Error;
@ -6,7 +10,7 @@ use crate::settings::Settings;
use futures::{SinkExt, StreamExt};
use http::Uri;
use nostr_proto::{EventKind, Filters, PublicKeyHex, RelayInformationDocument, Unixtime, Url};
use std::collections::HashMap;
use subscription::Subscriptions;
use tokio::net::TcpStream;
use tokio::select;
use tokio::sync::broadcast::Receiver;
@ -15,11 +19,6 @@ use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tracing::{debug, error, info, trace, warn};
use tungstenite::protocol::{Message as WsMessage, WebSocketConfig};
mod handle_bus;
mod handle_websocket;
mod subscription;
use subscription::Subscription;
pub struct Minion {
url: Url,
pubkeys: Vec<PublicKeyHex>,
@ -29,8 +28,7 @@ pub struct Minion {
dbrelay: DbRelay,
nip11: Option<RelayInformationDocument>,
stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
subscriptions_by_ourname: HashMap<String, Subscription>,
subscription_id_to_ourname: HashMap<String, String>,
subscriptions: Subscriptions,
}
impl Minion {
@ -56,8 +54,7 @@ impl Minion {
dbrelay,
nip11: None,
stream: None,
subscriptions_by_ourname: HashMap::new(),
subscription_id_to_ourname: HashMap::new(),
subscriptions: Subscriptions::new(),
})
}
}
@ -237,14 +234,13 @@ impl Minion {
let websocket_stream = self.stream.as_mut().unwrap();
if self.pubkeys.is_empty() {
if let Some(sub) = self.subscriptions_by_ourname.get("following") {
if let Some(sub) = self.subscriptions.get("following") {
// Close the subscription
let wire = serde_json::to_string(&sub.close_message())?;
websocket_stream.send(WsMessage::Text(wire.clone())).await?;
// Remove the subscription from the map
self.subscription_id_to_ourname.remove(&sub.get_id());
self.subscriptions_by_ourname.remove("following");
self.subscriptions.remove("following");
}
// Since pubkeys is empty, nothing to subscribe to.
@ -300,21 +296,22 @@ impl Minion {
);
// Get the subscription
let sub = self
.subscriptions_by_ourname
.entry("following".to_string())
.or_insert_with(Subscription::new);
// Write our filters into it
{
let req_message = if self.subscriptions.has("following") {
let sub = self.subscriptions.get_mut("following").unwrap();
let vec: &mut Vec<Filters> = sub.get_mut();
vec.clear();
vec.push(feed_filter);
vec.push(special_filter);
}
sub.req_message()
} else {
self.subscriptions
.add("following", vec![feed_filter, special_filter]);
self.subscriptions.get("following").unwrap().req_message()
};
// Subscribe (or resubscribe) to the subscription
let wire = serde_json::to_string(&sub.req_message())?;
let wire = serde_json::to_string(&req_message)?;
websocket_stream.send(WsMessage::Text(wire.clone())).await?;
trace!("Sent {}", &wire);

View File

@ -1,6 +1,70 @@
use nostr_proto::{ClientMessage, Filters, SubscriptionId};
use std::collections::HashMap;
#[derive(Debug)]
pub struct Subscriptions {
handle_to_id: HashMap<String, String>,
by_id: HashMap<String, Subscription>,
}
impl Subscriptions {
pub fn new() -> Subscriptions {
Subscriptions {
handle_to_id: HashMap::new(),
by_id: HashMap::new(),
}
}
pub fn add(&mut self, handle: &str, filters: Vec<Filters>) {
let mut sub = Subscription::new();
sub.filters = filters;
self.handle_to_id.insert(handle.to_owned(), sub.get_id());
self.by_id.insert(sub.get_id(), sub);
}
pub fn has(&self, handle: &str) -> bool {
match self.handle_to_id.get(handle) {
None => false,
Some(id) => self.by_id.contains_key(id),
}
}
pub fn get(&self, handle: &str) -> Option<Subscription> {
match self.handle_to_id.get(handle) {
None => None,
Some(id) => self.by_id.get(id).cloned(),
}
}
#[allow(dead_code)]
pub fn get_by_id(&self, id: &str) -> Option<Subscription> {
self.by_id.get(id).cloned()
}
pub fn get_mut(&mut self, handle: &str) -> Option<&mut Subscription> {
match self.handle_to_id.get(handle) {
None => None,
Some(id) => self.by_id.get_mut(id),
}
}
pub fn get_mut_by_id(&mut self, id: &str) -> Option<&mut Subscription> {
self.by_id.get_mut(id)
}
pub fn remove(&mut self, handle: &str) {
if let Some(id) = self.handle_to_id.get(handle) {
self.by_id.remove(id);
self.handle_to_id.remove(handle);
}
}
#[allow(dead_code)]
pub fn remove_by_id(&mut self, id: &str) {
self.by_id.remove(id);
}
}
#[derive(Clone, Debug)]
pub struct Subscription {
id: String,
filters: Vec<Filters>,