From f9b54572e32e5b8dc0c8b28c4ecab6c51f9cad15 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Wed, 21 Dec 2022 08:23:30 +1300 Subject: [PATCH] Bring in minions --- Cargo.lock | 8 +- src/globals.rs | 3 +- src/overlord/minion/handle_bus.rs | 13 + src/overlord/minion/handle_websocket.rs | 62 +++++ src/overlord/minion/mod.rs | 321 ++++++++++++++++++++++++ src/overlord/minion/subscription.rs | 43 ++++ src/overlord/mod.rs | 68 ++++- 7 files changed, 509 insertions(+), 9 deletions(-) create mode 100644 src/overlord/minion/handle_bus.rs create mode 100644 src/overlord/minion/handle_websocket.rs create mode 100644 src/overlord/minion/mod.rs create mode 100644 src/overlord/minion/subscription.rs diff --git a/Cargo.lock b/Cargo.lock index 32f48935..7de4cf8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1705,9 +1705,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.1.19" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" dependencies = [ "libc", ] @@ -2309,9 +2309,9 @@ dependencies = [ [[package]] name = "num_cpus" -version = "1.14.0" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6058e64324c71e02bc2b150e4f3bc8286db6c83092132ffa3f6b1eab0f9def5" +checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" dependencies = [ "hermit-abi", "libc", diff --git a/src/globals.rs b/src/globals.rs index 2979f972..122a310f 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -268,8 +268,7 @@ async fn save_person(pubkey: PublicKey) -> Result<(), Error> { Ok(()) } -#[allow(dead_code)] -async fn followed_pubkeys() -> Vec { +pub async fn followed_pubkeys() -> Vec { let people = GLOBALS.people.lock().await; people .iter() diff --git a/src/overlord/minion/handle_bus.rs b/src/overlord/minion/handle_bus.rs new file mode 100644 index 00000000..d7e99447 --- /dev/null +++ b/src/overlord/minion/handle_bus.rs @@ -0,0 +1,13 @@ +use super::Minion; +use crate::{BusMessage, Error}; +use tracing::warn; + +impl Minion { + pub(super) async fn handle_bus_message(&self, bus_message: BusMessage) -> Result<(), Error> { + warn!( + "Websocket task got message, unimplemented: {}", + bus_message.kind + ); + Ok(()) + } +} diff --git a/src/overlord/minion/handle_websocket.rs b/src/overlord/minion/handle_websocket.rs new file mode 100644 index 00000000..eb9a9fbe --- /dev/null +++ b/src/overlord/minion/handle_websocket.rs @@ -0,0 +1,62 @@ +use super::Minion; +use crate::db::{DbEvent, DbPersonRelay}; +use crate::{BusMessage, Error}; +use nostr_proto::{Event, RelayMessage, Unixtime}; +use tracing::{error, info, warn}; + +impl Minion { + pub(super) async fn handle_nostr_message(&mut self, ws_message: String) -> Result<(), Error> { + // TODO: pull out the raw event without any deserialization to be sure we don't mangle + // it. + + let relay_message: RelayMessage = serde_json::from_str(&ws_message)?; + + let mut maxtime = Unixtime::now()?; + maxtime.0 += 60 * 15; // 15 minutes into the future + + match relay_message { + RelayMessage::Event(_subid, event) => { + if let Err(e) = event.verify(Some(maxtime)) { + error!("VERIFY ERROR: {}, {}", e, serde_json::to_string(&event)?) + } else { + DbEvent::save_nostr_event(&event, Some(self.url.clone())).await?; + self.send_overlord_newevent(*event).await?; + } + } + RelayMessage::Notice(msg) => { + info!("NOTICE: {} {}", &self.url, msg); + } + RelayMessage::Eose(subid) => { + // We should update last_fetched + let now = Unixtime::now().unwrap().0 as u64; + DbPersonRelay::update_last_fetched(self.url.0.clone(), now).await?; + + // Update the matching subscription + match self.subscriptions.get_mut(&subid.0) { + Some(sub) => { + sub.set_eose(); + info!("EOSE: {} {:?}", &self.url, subid); + } + None => { + warn!("EOSE for unknown subsription: {} {:?}", &self.url, subid); + } + } + } + RelayMessage::Ok(id, ok, ok_message) => { + // These don't have to be processed. + info!("OK: {} {:?} {} {}", &self.url, id, ok, ok_message); + } + } + + Ok(()) + } + + async fn send_overlord_newevent(&self, event: Event) -> Result<(), Error> { + self.to_overlord.send(BusMessage { + target: "overlord".to_string(), + kind: "new_event".to_string(), + json_payload: serde_json::to_string(&event)?, + })?; + Ok(()) + } +} diff --git a/src/overlord/minion/mod.rs b/src/overlord/minion/mod.rs new file mode 100644 index 00000000..0478550d --- /dev/null +++ b/src/overlord/minion/mod.rs @@ -0,0 +1,321 @@ +use crate::comms::BusMessage; +use crate::db::{DbPersonRelay, DbRelay}; +use crate::error::Error; +use crate::globals::GLOBALS; +use crate::settings::Settings; +use futures::{SinkExt, StreamExt}; +use http::Uri; +use nostr_proto::{EventKind, Filters, PublicKeyHex, RelayInformationDocument, Unixtime, Url}; +use std::collections::HashMap; +use tokio::net::TcpStream; +use tokio::select; +use tokio::sync::broadcast::Receiver; +use tokio::sync::mpsc::UnboundedSender; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; +use tracing::{debug, error, info, trace, warn}; +use tungstenite::protocol::{Message as WsMessage, WebSocketConfig}; + +mod handle_bus; +mod handle_websocket; +mod subscription; +use subscription::Subscription; + +pub struct Minion { + url: Url, + pubkeys: Vec, + to_overlord: UnboundedSender, + from_overlord: Receiver, + settings: Settings, + dbrelay: DbRelay, + nip11: Option, + stream: Option>>, + subscriptions: HashMap, +} + +impl Minion { + pub async fn new(url: Url, pubkeys: Vec) -> Result { + let to_overlord = GLOBALS.to_overlord.clone(); + let from_overlord = GLOBALS.to_minions.subscribe(); + let settings = Settings::load().await?; + let dbrelay = match DbRelay::fetch_one(&url).await? { + Some(dbrelay) => dbrelay, + None => { + let dbrelay = DbRelay::new(url.0.clone()); + DbRelay::insert(dbrelay.clone()).await?; + dbrelay + } + }; + + Ok(Minion { + url, + pubkeys, + to_overlord, + from_overlord, + settings, + dbrelay, + nip11: None, + stream: None, + subscriptions: HashMap::new(), + }) + } +} + +impl Minion { + pub async fn handle(&mut self) { + // Catch errors, Return nothing. + if let Err(e) = self.handle_inner().await { + error!("ERROR handling {}: {}", &self.url, e); + } + + // Bump the failure count for the relay. + self.dbrelay.failure_count += 1; + if let Err(e) = DbRelay::update(self.dbrelay.clone()).await { + error!("ERROR bumping relay failure count {}: {}", &self.url, e); + } + + debug!("Minion exiting: {}", self.url); + } + + async fn handle_inner(&mut self) -> Result<(), Error> { + info!("Task started to handle relay at {}", &self.url); + + // Connect to the relay + let websocket_stream = { + let uri: http::Uri = self.url.0.parse::()?; + let authority = uri.authority().ok_or(Error::UrlHasNoHostname)?.as_str(); + let host = authority + .find('@') + .map(|idx| authority.split_at(idx + 1).1) + .unwrap_or_else(|| authority); + if host.is_empty() { + return Err(Error::UrlHasEmptyHostname); + } + + // Read NIP-11 information + if let Ok(response) = reqwest::Client::new() + .get(&format!("https://{}", host)) + .header("Host", host) + .header("Accept", "application/nostr+json") + .send() + .await + { + match response.json::().await { + Ok(nip11) => { + info!("{:?}", &nip11); + self.nip11 = Some(nip11); + } + Err(e) => { + error!("Unable to parse response as NIP-11 {}", e); + } + } + } + + let key: [u8; 16] = rand::random(); + + let req = http::request::Request::builder() + .method("GET") + .header("Host", host) + .header("Connection", "Upgrade") + .header("Upgrade", "websocket") + .header("Sec-WebSocket-Version", "13") + .header("Sec-WebSocket-Key", base64::encode(key)) + .uri(uri) + .body(())?; + + let config: WebSocketConfig = WebSocketConfig { + max_send_queue: None, + max_message_size: Some(1024 * 1024 * 16), // their default is 64 MiB, I choose 16 MiB + max_frame_size: Some(1024 * 1024 * 16), // their default is 16 MiB. + accept_unmasked_frames: true, // default is false which is the standard + }; + + let (websocket_stream, _response) = + tokio_tungstenite::connect_async_with_config(req, Some(config)).await?; + info!("Connected to {}", &self.url); + + websocket_stream + }; + + self.stream = Some(websocket_stream); + + // Bump the success count for the relay + { + self.dbrelay.success_count += 1; + DbRelay::update(self.dbrelay.clone()).await?; + } + + // Subscribe to the people we follow + if !self.pubkeys.is_empty() { + self.update_following_subscription().await?; + } + + // Tell the overlord we are ready to receive commands + self.tell_overlord_we_are_ready().await?; + + 'relayloop: loop { + match self.loop_handler().await { + Ok(keepgoing) => { + if !keepgoing { + break 'relayloop; + } + } + Err(e) => { + // Log them and keep going + error!("{}", e); + } + } + } + + Ok(()) + } + + async fn loop_handler(&mut self) -> Result { + let mut keepgoing: bool = true; + + let ws_stream = self.stream.as_mut().unwrap(); + + select! { + ws_message = ws_stream.next() => { + let ws_message = match ws_message { + Some(m) => m, + None => return Ok(false), // probably connection reset + }?; + + trace!("Handling message from {}", &self.url); + match ws_message { + WsMessage::Text(t) => { + self.handle_nostr_message(t).await?; + // FIXME: some errors we should probably bail on. + // For now, try to continue. + }, + WsMessage::Binary(_) => warn!("Unexpected binary message"), + WsMessage::Ping(x) => ws_stream.send(WsMessage::Pong(x)).await?, + WsMessage::Pong(_) => warn!("Unexpected pong message"), + WsMessage::Close(_) => keepgoing = false, + WsMessage::Frame(_) => warn!("Unexpected frame message"), + } + }, + bus_message = self.from_overlord.recv() => { + let bus_message = match bus_message { + Ok(bm) => bm, + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + return Ok(false); + }, + Err(e) => return Err(e.into()) + }; + if bus_message.target == self.url.0 { + self.handle_bus_message(bus_message).await?; + } else if &*bus_message.target == "all" { + if &*bus_message.kind == "shutdown" { + info!("Websocket listener {} shutting down", &self.url); + keepgoing = false; + } else if &*bus_message.kind == "settings_changed" { + self.settings = serde_json::from_str(&bus_message.json_payload)?; + } + } + }, + } + + Ok(keepgoing) + } + + async fn tell_overlord_we_are_ready(&self) -> Result<(), Error> { + self.to_overlord.send(BusMessage { + target: "overlord".to_string(), + kind: "minion_is_ready".to_string(), + json_payload: "".to_owned(), + })?; + + Ok(()) + } + + // This updates a subscription named "following" which watches for events + // from the people we follow. + async fn update_following_subscription(&mut self) -> Result<(), Error> { + let websocket_stream = self.stream.as_mut().unwrap(); + + if self.pubkeys.is_empty() { + if let Some(sub) = self.subscriptions.get("following") { + // Close the subscription + let wire = serde_json::to_string(&sub.close_message())?; + websocket_stream.send(WsMessage::Text(wire.clone())).await?; + + // Remove the subscription from the map + self.subscriptions.remove("following"); + } + + // Since pubkeys is empty, nothing to subscribe to. + return Ok(()); + } + + // Compute how far to look back + let (feed_since, special_since) = { + // Find the oldest 'last_fetched' among the 'person_relay' table. + // Null values will come through as 0. + let mut special_since: i64 = + DbPersonRelay::fetch_oldest_last_fetched(&self.pubkeys, &self.url.0).await? as i64; + + // Subtract overlap to avoid gaps due to clock sync and event + // propogation delay + special_since -= self.settings.overlap as i64; + + // For feed related events, don't look back more than one feed_chunk ago + let one_feedchunk_ago = Unixtime::now().unwrap().0 - self.settings.feed_chunk as i64; + let feed_since = special_since.max(one_feedchunk_ago); + + (Unixtime(feed_since), Unixtime(special_since)) + }; + + // Create the author filter + let mut feed_filter: Filters = Filters::new(); + for pk in self.pubkeys.iter() { + feed_filter.add_author(pk, None); + } + feed_filter.add_event_kind(EventKind::TextNote); + feed_filter.add_event_kind(EventKind::Reaction); + feed_filter.add_event_kind(EventKind::EventDeletion); + feed_filter.since = Some(feed_since); + debug!( + "Feed Filter {}: {}", + &self.url, + serde_json::to_string(&feed_filter)? + ); + + // Create the lookback filter + let mut special_filter: Filters = Filters::new(); + for pk in self.pubkeys.iter() { + special_filter.add_author(pk, None); + } + special_filter.add_event_kind(EventKind::Metadata); + //special_filter.add_event_kind(EventKind::RecommendRelay); + //special_filter.add_event_kind(EventKind::ContactList); + special_filter.since = Some(special_since); + debug!( + "Special Filter {}: {}", + &self.url, + serde_json::to_string(&special_filter)? + ); + + // Get the subscription + let sub = self + .subscriptions + .entry("following".to_string()) + .or_insert_with(|| Subscription::new("following".to_string())); + + // Write our filters into it + { + let vec: &mut Vec = sub.get_mut(); + vec.clear(); + vec.push(feed_filter); + vec.push(special_filter); + } + + // Subscribe (or resubscribe) to the subscription + let wire = serde_json::to_string(&sub.req_message())?; + websocket_stream.send(WsMessage::Text(wire.clone())).await?; + + trace!("Sent {}", &wire); + + Ok(()) + } +} diff --git a/src/overlord/minion/subscription.rs b/src/overlord/minion/subscription.rs new file mode 100644 index 00000000..695a1b48 --- /dev/null +++ b/src/overlord/minion/subscription.rs @@ -0,0 +1,43 @@ +use nostr_proto::{ClientMessage, Filters, SubscriptionId}; + +#[derive(Debug)] +pub struct Subscription { + id: String, + filters: Vec, + eose: bool, +} + +impl Subscription { + pub fn new(id: String) -> Subscription { + Subscription { + id, + filters: vec![], + eose: false, + } + } + + pub fn get_id(&self) -> String { + self.id.clone() + } + + pub fn get_mut(&mut self) -> &mut Vec { + &mut self.filters + } + + pub fn set_eose(&mut self) { + self.eose = true; + } + + #[allow(dead_code)] + pub fn eose(&self) -> bool { + self.eose + } + + pub fn req_message(&self) -> ClientMessage { + ClientMessage::Req(SubscriptionId(self.get_id()), self.filters.clone()) + } + + pub fn close_message(&self) -> ClientMessage { + ClientMessage::Close(SubscriptionId(self.get_id())) + } +} diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs index 6f8bba6c..451283e1 100644 --- a/src/overlord/mod.rs +++ b/src/overlord/mod.rs @@ -1,14 +1,18 @@ +mod minion; mod relay_picker; use crate::comms::BusMessage; -use crate::db::{DbEvent, DbPerson, DbRelay}; +use crate::db::{DbEvent, DbPerson, DbPersonRelay, DbRelay}; use crate::error::Error; use crate::globals::GLOBALS; use crate::settings::Settings; -use nostr_proto::{Event, Unixtime}; -use tokio::select; +use minion::Minion; +use nostr_proto::{Event, PublicKey, PublicKeyHex, Unixtime, Url}; +use relay_picker::{BestRelay, RelayPicker}; +use std::collections::HashMap; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::UnboundedReceiver; +use tokio::{select, task}; use tracing::{error, info}; pub struct Overlord { @@ -16,6 +20,8 @@ pub struct Overlord { to_minions: Sender, #[allow(dead_code)] from_minions: UnboundedReceiver, + minions: task::JoinSet<()>, + minions_task_url: HashMap, } impl Overlord { @@ -25,6 +31,8 @@ impl Overlord { settings: Settings::default(), to_minions, from_minions, + minions: task::JoinSet::new(), + minions_task_url: HashMap::new(), } } @@ -67,6 +75,15 @@ impl Overlord { // updated from events without necessarily updating our relays list) DbRelay::populate_new_relays().await?; + // Load people from the database + { + let mut dbpeople = DbPerson::fetch(None).await?; + for dbperson in dbpeople.drain(..) { + let pubkey = PublicKey::try_from(dbperson.pubkey.clone())?; + GLOBALS.people.lock().await.insert(pubkey, dbperson); + } + } + // Load feed-related events from database and process (TextNote, EventDeletion, Reaction) { let now = Unixtime::now().unwrap(); @@ -93,6 +110,41 @@ impl Overlord { info!("Loaded {} events from the database", count); } + // Pick Relays and start Minions + { + let pubkeys: Vec = crate::globals::followed_pubkeys().await; + + let mut relay_picker = RelayPicker { + relays: DbRelay::fetch(None).await?, + pubkeys: pubkeys.clone(), + person_relays: DbPersonRelay::fetch_for_pubkeys(&pubkeys).await?, + }; + let mut best_relay: BestRelay; + loop { + if relay_picker.is_degenerate() { + break; + } + + let (rd, rp) = relay_picker.best()?; + best_relay = rd; + relay_picker = rp; + + if best_relay.is_degenerate() { + break; + } + + // Fire off a minion to handle this relay + self.start_minion(best_relay.relay.url.clone(), best_relay.pubkeys.clone()) + .await?; + + info!( + "Picked relay {}, {} people left", + best_relay.relay.url, + relay_picker.pubkeys.len() + ); + } + } + 'mainloop: loop { match self.loop_handler().await { Ok(keepgoing) => { @@ -110,6 +162,16 @@ impl Overlord { Ok(()) } + async fn start_minion(&mut self, url: String, pubkeys: Vec) -> Result<(), Error> { + let moved_url = Url(url.clone()); + let mut minion = Minion::new(moved_url, pubkeys).await?; + let abort_handle = self.minions.spawn(async move { minion.handle().await }); + let id = abort_handle.id(); + self.minions_task_url.insert(id, Url(url)); + + Ok(()) + } + #[allow(unused_assignments)] async fn loop_handler(&mut self) -> Result { let mut keepgoing: bool = true;