diff --git a/src/db/event.rs b/src/db/event.rs index dbe28c9e..74e46511 100644 --- a/src/db/event.rs +++ b/src/db/event.rs @@ -51,6 +51,42 @@ impl DbEvent { output } + pub async fn fetch_by_ids(ids: Vec) -> Result, Error> { + let sql = format!( + "SELECT id, raw, pubkey, created_at, kind, content, ots FROM event WHERE id IN ({})", + repeat_vars(ids.len()) + ); + + let output: Result, Error> = spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let id_strings: Vec = ids.iter().map(|p| p.0.clone()).collect(); + + let mut stmt = db.prepare(&sql)?; + let rows = stmt.query_map(rusqlite::params_from_iter(id_strings), |row| { + Ok(DbEvent { + id: IdHex(row.get(0)?), + raw: row.get(1)?, + pubkey: PublicKeyHex(row.get(2)?), + created_at: row.get(3)?, + kind: row.get(4)?, + content: row.get(5)?, + ots: row.get(6)?, + }) + })?; + + let mut output: Vec = Vec::new(); + for row in rows { + output.push(row?); + } + Ok(output) + }) + .await?; + + output + } + pub async fn fetch_latest_metadata() -> Result, Error> { // THIS SQL MIGHT WORK, NEEDS REVIEW let sql = "SELECT id, LAST_VALUE(raw) OVER (ORDER BY created_at desc) as last_raw, pubkey, LAST_VALUE(created_at) OVER (ORDER BY created_at desc), kind, content, ots FROM event WHERE kind=0".to_owned(); @@ -140,3 +176,11 @@ impl DbEvent { .await? } } + +fn repeat_vars(count: usize) -> String { + assert_ne!(count, 0); + let mut s = "?,".repeat(count); + // Remove trailing comma + s.pop(); + s +} diff --git a/src/globals.rs b/src/globals.rs index cb8cad70..e09761ea 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -1,13 +1,15 @@ use crate::comms::BusMessage; -use crate::db::{DbPerson, DbPersonRelay, DbRelay}; +use crate::db::{DbEvent, DbPerson, DbPersonRelay, DbRelay}; +use crate::error::Error; use crate::relationship::Relationship; use crate::settings::Settings; use async_recursion::async_recursion; -use nostr_types::{Event, EventKind, Id, PublicKey, PublicKeyHex, Unixtime, Url}; +use nostr_types::{Event, EventKind, Id, IdHex, PublicKey, PublicKeyHex, Unixtime, Url}; use rusqlite::Connection; use std::collections::HashMap; use std::sync::atomic::AtomicBool; use tokio::sync::{broadcast, mpsc, Mutex}; +use tracing::info; /// Only one of these is ever created, via lazy_static!, and represents /// global state for the rust application @@ -136,6 +138,86 @@ impl Globals { .or_insert_with(|| if let Some(u) = url { vec![u] } else { vec![] }); } + async fn get_desired_events_prelude() -> Result<(), Error> { + // Strip out Ids of events that we already have + { + // danger - two locks could lead to deadlock, check other code locking these + let mut desired_events = GLOBALS.desired_events.lock().await; + let events = GLOBALS.events.lock().await; + desired_events.retain(|&id, _| !events.contains_key(&id)); + } + + // Load from database + { + let ids: Vec = GLOBALS + .desired_events + .lock() + .await + .iter() + .map(|(id, _)| Into::::into(*id)) + .collect(); + let db_events = DbEvent::fetch_by_ids(ids).await?; + let mut events: Vec = Vec::with_capacity(db_events.len()); + for dbevent in db_events.iter() { + let e = serde_json::from_str(&dbevent.raw)?; + events.push(e); + } + let mut count = 0; + for event in events.iter() { + count += 1; + crate::process::process_new_event(event, false, None).await?; + } + info!("Loaded {} desired events from the database", count); + } + + // Strip out Ids of events that we already have (again, we just loaded from db) + { + // danger - two locks could lead to deadlock, check other code locking these + let mut desired_events = GLOBALS.desired_events.lock().await; + let events = GLOBALS.events.lock().await; + desired_events.retain(|&id, _| !events.contains_key(&id)); + } + + Ok(()) + } + + pub async fn get_desired_events() -> Result<(HashMap>, Vec), Error> { + Globals::get_desired_events_prelude().await?; + + let desired_events = GLOBALS.desired_events.lock().await; + let mut output: HashMap> = HashMap::new(); + let mut orphans: Vec = Vec::new(); + for (id, vec_url) in desired_events.iter() { + if vec_url.is_empty() { + orphans.push(*id); + } else { + for url in vec_url.iter() { + output + .entry(url.to_owned()) + .and_modify(|vec| vec.push(*id)) + .or_insert_with(|| vec![*id]); + } + } + } + + Ok((output, orphans)) + } + + #[allow(dead_code)] + pub async fn get_desired_events_for_url(url: Url) -> Result, Error> { + Globals::get_desired_events_prelude().await?; + + let desired_events = GLOBALS.desired_events.lock().await; + let mut output: Vec = Vec::new(); + for (id, vec_url) in desired_events.iter() { + if vec_url.is_empty() || vec_url.contains(&url) { + output.push(*id); + } + } + + Ok(output) + } + pub async fn add_relationship(id: Id, related: Id, relationship: Relationship) { let r = (related, relationship); let mut relationships = GLOBALS.relationships.lock().await; diff --git a/src/overlord/minion/handle_bus.rs b/src/overlord/minion/handle_bus.rs index f66109ae..68487c3d 100644 --- a/src/overlord/minion/handle_bus.rs +++ b/src/overlord/minion/handle_bus.rs @@ -1,6 +1,6 @@ use super::Minion; use crate::{BusMessage, Error}; -use nostr_types::PublicKeyHex; +use nostr_types::{IdHex, PublicKeyHex}; use tracing::warn; impl Minion { @@ -13,12 +13,17 @@ impl Minion { let v: Vec = serde_json::from_str(&bus_message.json_payload)?; self.upsert_following(v).await?; } - "fetch_events" => {} - "follow_event_reactions" => {} + "fetch_events" => { + let v: Vec = serde_json::from_str(&bus_message.json_payload)?; + self.get_events(v).await?; + } + "follow_event_reactions" => { + warn!("{}: follow event reactions unimplemented", &self.url); + } _ => { warn!( - "Unrecognized bus message kind received by minion: {}", - bus_message.kind + "{} Unrecognized bus message kind received by minion: {}", + &self.url, bus_message.kind ); } } diff --git a/src/overlord/minion/handle_websocket.rs b/src/overlord/minion/handle_websocket.rs index e0e848da..3a3efb66 100644 --- a/src/overlord/minion/handle_websocket.rs +++ b/src/overlord/minion/handle_websocket.rs @@ -1,7 +1,9 @@ use super::Minion; use crate::Error; +use futures::SinkExt; use nostr_types::{RelayMessage, Unixtime}; use tracing::{debug, error, info, warn}; +use tungstenite::protocol::Message as WsMessage; impl Minion { pub(super) async fn handle_nostr_message(&mut self, ws_message: String) -> Result<(), Error> { @@ -16,35 +18,57 @@ impl Minion { match relay_message { RelayMessage::Event(subid, event) => { if let Err(e) = event.verify(Some(maxtime)) { - error!("VERIFY ERROR: {}, {}", e, serde_json::to_string(&event)?) + error!( + "{}: VERIFY ERROR: {}, {}", + &self.url, + e, + serde_json::to_string(&event)? + ) } else { let handle = self .subscriptions .get_handle_by_id(&subid.0) .unwrap_or_else(|| "_".to_owned()); - debug!("NEW EVENT on {} [{}]", &self.url, handle); + debug!("{}: {}: NEW EVENT", &self.url, handle); crate::process::process_new_event(&event, true, Some(self.url.clone())).await?; } } RelayMessage::Notice(msg) => { - info!("NOTICE: {} {}", &self.url, msg); + info!("{}: NOTICE: {}", &self.url, msg); } RelayMessage::Eose(subid) => { + let handle = self + .subscriptions + .get_handle_by_id(&subid.0) + .unwrap_or_else(|| "_".to_owned()); + + let close: bool = &handle[0..5] == "event"; + // Update the matching subscription match self.subscriptions.get_mut_by_id(&subid.0) { Some(sub) => { - sub.set_eose(); - info!("EOSE: {} {:?}", &self.url, subid); + info!("{}: {}: EOSE: {:?}", &self.url, handle, subid); + if close { + let close_message = sub.close_message(); + let websocket_sink = self.sink.as_mut().unwrap(); + let wire = serde_json::to_string(&close_message)?; + websocket_sink.send(WsMessage::Text(wire.clone())).await?; + } else { + sub.set_eose(); + } } None => { - warn!("EOSE for unknown subscription: {} {:?}", &self.url, subid); + warn!( + "{}: {} EOSE for unknown subscription {:?}", + &self.url, handle, subid + ); } } } RelayMessage::Ok(id, ok, ok_message) => { // These don't have to be processed. - info!("OK: {} {:?} {} {}", &self.url, id, ok, ok_message); + info!("{}: OK: {:?} {} {}", &self.url, id, ok, ok_message); } } diff --git a/src/overlord/minion/mod.rs b/src/overlord/minion/mod.rs index ecfa1770..695761f4 100644 --- a/src/overlord/minion/mod.rs +++ b/src/overlord/minion/mod.rs @@ -65,20 +65,20 @@ impl Minion { pub async fn handle(&mut self) { // Catch errors, Return nothing. if let Err(e) = self.handle_inner().await { - error!("ERROR handling {}: {}", &self.url, e); + error!("{}: ERROR: {}", &self.url, e); // Bump the failure count for the relay. self.dbrelay.failure_count += 1; if let Err(e) = DbRelay::update(self.dbrelay.clone()).await { - error!("ERROR bumping relay failure count {}: {}", &self.url, e); + error!("{}: ERROR bumping relay failure count: {}", &self.url, e); } } - debug!("Minion exiting: {}", self.url); + debug!("{}: minion exiting", self.url); } async fn handle_inner(&mut self) -> Result<(), Error> { - info!("Task started to handle relay at {}", &self.url); + info!("{}: Minion handling started", &self.url); // Connect to the relay let websocket_stream = { @@ -104,11 +104,11 @@ impl Minion { { match response.json::().await { Ok(nip11) => { - info!("{:?}", &nip11); + info!("{}: {:?}", &self.url, &nip11); self.nip11 = Some(nip11); } Err(e) => { - error!("Unable to parse response as NIP-11 {}", e); + error!("{}: Unable to parse response as NIP-11: {}", &self.url, e); } } } @@ -137,7 +137,7 @@ impl Minion { tokio_tungstenite::connect_async_with_config(req, Some(config)), ) .await??; - info!("Connected to {}", &self.url); + info!("{}: Connected", &self.url); websocket_stream }; @@ -164,7 +164,7 @@ impl Minion { } Err(e) => { // Log them and keep going - error!("{}", e); + error!("{}: {}", &self.url, e); } } } @@ -192,7 +192,7 @@ impl Minion { None => return Ok(false), // probably connection reset }?; - trace!("Handling message from {}", &self.url); + trace!("{}: Handling message", &self.url); match ws_message { WsMessage::Text(t) => { // MAYBE FIXME, spawn a separate task here so that @@ -201,11 +201,11 @@ impl Minion { // FIXME: some errors we should probably bail on. // For now, try to continue. }, - WsMessage::Binary(_) => warn!("Unexpected binary message"), + WsMessage::Binary(_) => warn!("{}, Unexpected binary message", &self.url), WsMessage::Ping(x) => ws_sink.send(WsMessage::Pong(x)).await?, WsMessage::Pong(_) => { }, // we just ignore pongs WsMessage::Close(_) => keepgoing = false, - WsMessage::Frame(_) => warn!("Unexpected frame message"), + WsMessage::Frame(_) => warn!("{}: Unexpected frame message", &self.url), } }, bus_message = self.from_overlord.recv() => { @@ -220,7 +220,7 @@ impl Minion { self.handle_bus_message(bus_message).await?; } else if &*bus_message.target == "all" { if &*bus_message.kind == "shutdown" { - info!("Websocket listener {} shutting down", &self.url); + info!("{}: Websocket listener shutting down", &self.url); keepgoing = false; } else if &*bus_message.kind == "settings_changed" { // TBD: possibly redo filters based on overlap, feed_chunk, etc. @@ -289,10 +289,11 @@ impl Minion { feed_filter.add_event_kind(EventKind::Reaction); feed_filter.add_event_kind(EventKind::EventDeletion); feed_filter.since = Some(feed_since); + debug!( - "Feed Filter {}: {}", + "{}: Feed Filter: {} authors", &self.url, - serde_json::to_string(&feed_filter)? + feed_filter.authors.len() ); // Create the lookback filter @@ -306,9 +307,9 @@ impl Minion { special_filter.add_event_kind(EventKind::RelaysList); special_filter.since = Some(special_since); debug!( - "Special Filter {}: {}", + "{}: Special Filter: {} authors", &self.url, - serde_json::to_string(&special_filter)? + special_filter.authors.len() ); // Get the subscription @@ -329,16 +330,41 @@ impl Minion { let wire = serde_json::to_string(&req_message)?; websocket_sink.send(WsMessage::Text(wire.clone())).await?; - trace!("Sent {}", &wire); + trace!("{}: Sent {}", &self.url, &wire); Ok(()) } #[allow(dead_code)] - async fn get_events(&mut self, _ids: Vec) -> Result<(), Error> { - // Create a new "events" subscription which closes on EOSE - // Use and bump next_events_subscription_id so they are independent subscriptions - unimplemented!() + async fn get_events(&mut self, ids: Vec) -> Result<(), Error> { + if ids.is_empty() { + return Ok(()); + } + + // create the filter + let mut filter = Filters::new(); + filter.ids = ids; + + debug!("{}: Event Filter: {} events", &self.url, filter.ids.len()); + + // create a handle for ourselves + let handle = format!("events{}", self.next_events_subscription_id); + self.next_events_subscription_id += 1; + + // save the subscription + self.subscriptions.add(&handle, vec![filter]); + + // get the request message + let req_message = self.subscriptions.get(&handle).unwrap().req_message(); + + // Subscribe on the relay + let websocket_sink = self.sink.as_mut().unwrap(); + let wire = serde_json::to_string(&req_message)?; + websocket_sink.send(WsMessage::Text(wire.clone())).await?; + + trace!("{}: Sent {}", &self.url, &wire); + + Ok(()) } #[allow(dead_code)] diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs index dbb3ca5b..d9a05278 100644 --- a/src/overlord/mod.rs +++ b/src/overlord/mod.rs @@ -4,7 +4,7 @@ mod relay_picker; use crate::comms::BusMessage; use crate::db::{DbEvent, DbPerson, DbPersonRelay, DbRelay, DbSetting}; use crate::error::Error; -use crate::globals::GLOBALS; +use crate::globals::{Globals, GLOBALS}; use crate::settings::Settings; use minion::Minion; use nostr_types::{Event, PrivateKey, PublicKey, PublicKeyHex, Unixtime, Url}; @@ -179,6 +179,7 @@ impl Overlord { pubkeys: pubkeys.clone(), person_relays: DbPersonRelay::fetch_for_pubkeys(&pubkeys).await?, }; + let mut best_relay: BestRelay; loop { if relay_picker.is_degenerate() { @@ -194,17 +195,51 @@ impl Overlord { } // Fire off a minion to handle this relay - self.start_minion(best_relay.relay.url.clone(), best_relay.pubkeys.clone()) - .await?; + self.start_minion(best_relay.relay.url.clone()).await?; + + // Tell it to follow the chosen people + let _ = self.to_minions.send(BusMessage { + target: best_relay.relay.url.clone(), + kind: "set_followed_people".to_string(), + json_payload: serde_json::to_string(&best_relay.pubkeys).unwrap(), + }); info!( "Picked relay {}, {} people left", - best_relay.relay.url, + &best_relay.relay.url, relay_picker.pubkeys.len() ); } } + // Get desired events from relays + { + let (desired_events_map, desired_events_vec) = Globals::get_desired_events().await?; + + info!( + "Seeking {} events", + desired_events_map.len() + desired_events_vec.len() + ); + + for (url, mut ids) in desired_events_map { + // Add the orphans + ids.extend(&desired_events_vec); + + // If we don't have such a minion, start one + if !self.urls_watching.contains(&url) { + // Start a minion + self.start_minion(url.0.clone()).await?; + } + + // Tell it to get these events + let _ = self.to_minions.send(BusMessage { + target: url.0.clone(), + kind: "fetch_events".to_string(), + json_payload: serde_json::to_string(&ids).unwrap(), + }); + } + } + 'mainloop: loop { match self.loop_handler().await { Ok(keepgoing) => { @@ -222,20 +257,13 @@ impl Overlord { Ok(()) } - async fn start_minion(&mut self, url: String, pubkeys: Vec) -> Result<(), Error> { + async fn start_minion(&mut self, url: String) -> Result<(), Error> { let moved_url = Url(url.clone()); let mut minion = Minion::new(moved_url).await?; let abort_handle = self.minions.spawn(async move { minion.handle().await }); let id = abort_handle.id(); self.minions_task_url.insert(id, Url(url.clone())); self.urls_watching.push(Url(url.clone())); - - let _ = self.to_minions.send(BusMessage { - target: url.clone(), - kind: "set_followed_people".to_string(), - json_payload: serde_json::to_string(&pubkeys).unwrap(), - }); - Ok(()) } @@ -296,7 +324,7 @@ impl Overlord { let maybe_url = self.minions_task_url.get(&id); match maybe_url { Some(url) => { - warn!("Relay Task {} completed", &url); + info!("Relay Task {} completed", &url); // Remove from our urls_watching vec self.urls_watching.retain(|value| value != url);