Get desired events (from relays) at startup at least (and minion logging improvements)

Mike Dilger 2022-12-26 13:20:19 +13:00
parent 28c2439989
commit ccf969f717
6 changed files with 257 additions and 48 deletions


@@ -51,6 +51,42 @@ impl DbEvent {
        output
    }
    pub async fn fetch_by_ids(ids: Vec<IdHex>) -> Result<Vec<DbEvent>, Error> {
        if ids.is_empty() {
            // An empty list would produce invalid SQL ("IN ()") and trip the
            // assert in repeat_vars(), so return early.
            return Ok(vec![]);
        }
        let sql = format!(
            "SELECT id, raw, pubkey, created_at, kind, content, ots FROM event WHERE id IN ({})",
            repeat_vars(ids.len())
        );
        let output: Result<Vec<DbEvent>, Error> = spawn_blocking(move || {
            let maybe_db = GLOBALS.db.blocking_lock();
            let db = maybe_db.as_ref().unwrap();
            let id_strings: Vec<String> = ids.iter().map(|p| p.0.clone()).collect();
            let mut stmt = db.prepare(&sql)?;
            let rows = stmt.query_map(rusqlite::params_from_iter(id_strings), |row| {
                Ok(DbEvent {
                    id: IdHex(row.get(0)?),
                    raw: row.get(1)?,
                    pubkey: PublicKeyHex(row.get(2)?),
                    created_at: row.get(3)?,
                    kind: row.get(4)?,
                    content: row.get(5)?,
                    ots: row.get(6)?,
                })
            })?;
            let mut output: Vec<DbEvent> = Vec::new();
            for row in rows {
                output.push(row?);
            }
            Ok(output)
        })
        .await?;
        output
    }
    pub async fn fetch_latest_metadata() -> Result<Vec<DbEvent>, Error> {
        // THIS SQL MIGHT WORK, NEEDS REVIEW
        let sql = "SELECT id, LAST_VALUE(raw) OVER (ORDER BY created_at desc) as last_raw, pubkey, LAST_VALUE(created_at) OVER (ORDER BY created_at desc), kind, content, ots FROM event WHERE kind=0".to_owned();
@@ -140,3 +176,11 @@ impl DbEvent {
        .await?
    }
}

fn repeat_vars(count: usize) -> String {
    assert_ne!(count, 0);
    let mut s = "?,".repeat(count);
    // Remove trailing comma
    s.pop();
    s
}
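A tiny hypothetical check (not part of this commit) of what repeat_vars produces; fetch_by_ids splices the result into its IN (...) clause and binds the ids with rusqlite::params_from_iter.

#[cfg(test)]
mod placeholder_tests {
    use super::repeat_vars;

    #[test]
    fn repeat_vars_builds_sql_placeholders() {
        assert_eq!(repeat_vars(1), "?");
        assert_eq!(repeat_vars(3), "?,?,?");
        // repeat_vars(0) would panic on the assert_ne!, so callers must not
        // pass an empty id list.
    }
}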


@@ -1,13 +1,15 @@
use crate::comms::BusMessage;
use crate::db::{DbEvent, DbPerson, DbPersonRelay, DbRelay};
use crate::error::Error;
use crate::relationship::Relationship;
use crate::settings::Settings;
use async_recursion::async_recursion;
use nostr_types::{Event, EventKind, Id, IdHex, PublicKey, PublicKeyHex, Unixtime, Url};
use rusqlite::Connection;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use tokio::sync::{broadcast, mpsc, Mutex};
use tracing::info;

/// Only one of these is ever created, via lazy_static!, and represents
/// global state for the rust application
@@ -136,6 +138,86 @@ impl Globals {
            .or_insert_with(|| if let Some(u) = url { vec![u] } else { vec![] });
    }
    async fn get_desired_events_prelude() -> Result<(), Error> {
        // Strip out Ids of events that we already have
        {
            // danger - two locks could lead to deadlock, check other code locking these
            let mut desired_events = GLOBALS.desired_events.lock().await;
            let events = GLOBALS.events.lock().await;
            desired_events.retain(|&id, _| !events.contains_key(&id));
        }

        // Load from database
        {
            let ids: Vec<IdHex> = GLOBALS
                .desired_events
                .lock()
                .await
                .iter()
                .map(|(id, _)| Into::<IdHex>::into(*id))
                .collect();
            let db_events = DbEvent::fetch_by_ids(ids).await?;
            let mut events: Vec<Event> = Vec::with_capacity(db_events.len());
            for dbevent in db_events.iter() {
                let e = serde_json::from_str(&dbevent.raw)?;
                events.push(e);
            }
            let mut count = 0;
            for event in events.iter() {
                count += 1;
                crate::process::process_new_event(event, false, None).await?;
            }
            info!("Loaded {} desired events from the database", count);
        }

        // Strip out Ids of events that we already have (again, we just loaded from db)
        {
            // danger - two locks could lead to deadlock, check other code locking these
            let mut desired_events = GLOBALS.desired_events.lock().await;
            let events = GLOBALS.events.lock().await;
            desired_events.retain(|&id, _| !events.contains_key(&id));
        }

        Ok(())
    }

    pub async fn get_desired_events() -> Result<(HashMap<Url, Vec<Id>>, Vec<Id>), Error> {
        Globals::get_desired_events_prelude().await?;

        let desired_events = GLOBALS.desired_events.lock().await;
        let mut output: HashMap<Url, Vec<Id>> = HashMap::new();
        let mut orphans: Vec<Id> = Vec::new();
        for (id, vec_url) in desired_events.iter() {
            if vec_url.is_empty() {
                orphans.push(*id);
            } else {
                for url in vec_url.iter() {
                    output
                        .entry(url.to_owned())
                        .and_modify(|vec| vec.push(*id))
                        .or_insert_with(|| vec![*id]);
                }
            }
        }

        Ok((output, orphans))
    }

    #[allow(dead_code)]
    pub async fn get_desired_events_for_url(url: Url) -> Result<Vec<Id>, Error> {
        Globals::get_desired_events_prelude().await?;

        let desired_events = GLOBALS.desired_events.lock().await;
        let mut output: Vec<Id> = Vec::new();
        for (id, vec_url) in desired_events.iter() {
            if vec_url.is_empty() || vec_url.contains(&url) {
                output.push(*id);
            }
        }

        Ok(output)
    }
    pub async fn add_relationship(id: Id, related: Id, relationship: Relationship) {
        let r = (related, relationship);
        let mut relationships = GLOBALS.relationships.lock().await;
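A minimal sketch (an assumption about intent, not code from this commit) of the locking rule the "danger" comments above depend on: both maps are taken in the same order, desired_events before events, inside a short block so the guards drop before the slow database await.

// Sketch only: mirrors the scoped two-lock pattern in get_desired_events_prelude().
async fn prune_desired_events_sketch() {
    {
        let mut desired_events = GLOBALS.desired_events.lock().await;
        let events = GLOBALS.events.lock().await;
        desired_events.retain(|&id, _| !events.contains_key(&id));
    } // both guards dropped here
    // Only now do we await the slow database call (DbEvent::fetch_by_ids),
    // with neither lock held, so other tasks that lock these maps cannot
    // deadlock against us.
}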


@@ -1,6 +1,6 @@
use super::Minion;
use crate::{BusMessage, Error};
use nostr_types::{IdHex, PublicKeyHex};
use tracing::warn;

impl Minion {
@@ -13,12 +13,17 @@ impl Minion {
                let v: Vec<PublicKeyHex> = serde_json::from_str(&bus_message.json_payload)?;
                self.upsert_following(v).await?;
            }
            "fetch_events" => {
                let v: Vec<IdHex> = serde_json::from_str(&bus_message.json_payload)?;
                self.get_events(v).await?;
            }
            "follow_event_reactions" => {
                warn!("{}: follow event reactions unimplemented", &self.url);
            }
            _ => {
                warn!(
                    "{} Unrecognized bus message kind received by minion: {}",
                    &self.url, bus_message.kind
                );
            }
        }


@@ -1,7 +1,9 @@
use super::Minion;
use crate::Error;
use futures::SinkExt;
use nostr_types::{RelayMessage, Unixtime};
use tracing::{debug, error, info, warn};
use tungstenite::protocol::Message as WsMessage;

impl Minion {
    pub(super) async fn handle_nostr_message(&mut self, ws_message: String) -> Result<(), Error> {
@@ -16,35 +18,57 @@ impl Minion {
        match relay_message {
            RelayMessage::Event(subid, event) => {
                if let Err(e) = event.verify(Some(maxtime)) {
                    error!(
                        "{}: VERIFY ERROR: {}, {}",
                        &self.url,
                        e,
                        serde_json::to_string(&event)?
                    )
                } else {
                    let handle = self
                        .subscriptions
                        .get_handle_by_id(&subid.0)
                        .unwrap_or_else(|| "_".to_owned());
                    debug!("{}: {}: NEW EVENT", &self.url, handle);
                    crate::process::process_new_event(&event, true, Some(self.url.clone())).await?;
                }
            }
            RelayMessage::Notice(msg) => {
                info!("{}: NOTICE: {}", &self.url, msg);
            }
            RelayMessage::Eose(subid) => {
                let handle = self
                    .subscriptions
                    .get_handle_by_id(&subid.0)
                    .unwrap_or_else(|| "_".to_owned());

                // "events*" subscriptions are one-shot; close them at EOSE.
                // (Test the prefix rather than slicing handle[0..5], which
                // would panic on short handles such as "_".)
                let close: bool = handle.starts_with("event");

                // Update the matching subscription
                match self.subscriptions.get_mut_by_id(&subid.0) {
                    Some(sub) => {
                        info!("{}: {}: EOSE: {:?}", &self.url, handle, subid);
                        if close {
                            let close_message = sub.close_message();
                            let websocket_sink = self.sink.as_mut().unwrap();
                            let wire = serde_json::to_string(&close_message)?;
                            websocket_sink.send(WsMessage::Text(wire.clone())).await?;
                        } else {
                            sub.set_eose();
                        }
                    }
                    None => {
                        warn!(
                            "{}: {} EOSE for unknown subscription {:?}",
                            &self.url, handle, subid
                        );
                    }
                }
            }
            RelayMessage::Ok(id, ok, ok_message) => {
                // These don't have to be processed.
                info!("{}: OK: {:?} {} {}", &self.url, id, ok, ok_message);
            }
        }


@@ -65,20 +65,20 @@ impl Minion {
    pub async fn handle(&mut self) {
        // Catch errors, Return nothing.
        if let Err(e) = self.handle_inner().await {
            error!("{}: ERROR: {}", &self.url, e);

            // Bump the failure count for the relay.
            self.dbrelay.failure_count += 1;
            if let Err(e) = DbRelay::update(self.dbrelay.clone()).await {
                error!("{}: ERROR bumping relay failure count: {}", &self.url, e);
            }
        }

        debug!("{}: minion exiting", self.url);
    }

    async fn handle_inner(&mut self) -> Result<(), Error> {
        info!("{}: Minion handling started", &self.url);

        // Connect to the relay
        let websocket_stream = {
@@ -104,11 +104,11 @@ impl Minion {
            {
                match response.json::<RelayInformationDocument>().await {
                    Ok(nip11) => {
                        info!("{}: {:?}", &self.url, &nip11);
                        self.nip11 = Some(nip11);
                    }
                    Err(e) => {
                        error!("{}: Unable to parse response as NIP-11: {}", &self.url, e);
                    }
                }
            }
@@ -137,7 +137,7 @@ impl Minion {
                tokio_tungstenite::connect_async_with_config(req, Some(config)),
            )
            .await??;
            info!("{}: Connected", &self.url);
            websocket_stream
        };
@@ -164,7 +164,7 @@
                }
                Err(e) => {
                    // Log them and keep going
                    error!("{}: {}", &self.url, e);
                }
            }
        }
@@ -192,7 +192,7 @@
                None => return Ok(false), // probably connection reset
            }?;

            trace!("{}: Handling message", &self.url);
            match ws_message {
                WsMessage::Text(t) => {
                    // MAYBE FIXME, spawn a separate task here so that
@@ -201,11 +201,11 @@
                    // FIXME: some errors we should probably bail on.
                    // For now, try to continue.
                },
                WsMessage::Binary(_) => warn!("{}, Unexpected binary message", &self.url),
                WsMessage::Ping(x) => ws_sink.send(WsMessage::Pong(x)).await?,
                WsMessage::Pong(_) => { }, // we just ignore pongs
                WsMessage::Close(_) => keepgoing = false,
                WsMessage::Frame(_) => warn!("{}: Unexpected frame message", &self.url),
            }
        },
        bus_message = self.from_overlord.recv() => {
@@ -220,7 +220,7 @@
            self.handle_bus_message(bus_message).await?;
        } else if &*bus_message.target == "all" {
            if &*bus_message.kind == "shutdown" {
                info!("{}: Websocket listener shutting down", &self.url);
                keepgoing = false;
            } else if &*bus_message.kind == "settings_changed" {
                // TBD: possibly redo filters based on overlap, feed_chunk, etc.
@@ -289,10 +289,11 @@
        feed_filter.add_event_kind(EventKind::Reaction);
        feed_filter.add_event_kind(EventKind::EventDeletion);
        feed_filter.since = Some(feed_since);

        debug!(
            "{}: Feed Filter: {} authors",
            &self.url,
            feed_filter.authors.len()
        );

        // Create the lookback filter
@@ -306,9 +307,9 @@
        special_filter.add_event_kind(EventKind::RelaysList);
        special_filter.since = Some(special_since);

        debug!(
            "{}: Special Filter: {} authors",
            &self.url,
            special_filter.authors.len()
        );

        // Get the subscription
@@ -329,16 +330,41 @@
        let wire = serde_json::to_string(&req_message)?;
        websocket_sink.send(WsMessage::Text(wire.clone())).await?;

        trace!("{}: Sent {}", &self.url, &wire);

        Ok(())
    }
    #[allow(dead_code)]
    async fn get_events(&mut self, ids: Vec<IdHex>) -> Result<(), Error> {
        if ids.is_empty() {
            return Ok(());
        }

        // create the filter
        let mut filter = Filters::new();
        filter.ids = ids;

        debug!("{}: Event Filter: {} events", &self.url, filter.ids.len());

        // create a handle for ourselves
        let handle = format!("events{}", self.next_events_subscription_id);
        self.next_events_subscription_id += 1;

        // save the subscription
        self.subscriptions.add(&handle, vec![filter]);

        // get the request message
        let req_message = self.subscriptions.get(&handle).unwrap().req_message();

        // Subscribe on the relay
        let websocket_sink = self.sink.as_mut().unwrap();
        let wire = serde_json::to_string(&req_message)?;
        websocket_sink.send(WsMessage::Text(wire.clone())).await?;

        trace!("{}: Sent {}", &self.url, &wire);

        Ok(())
    }

    #[allow(dead_code)]

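For context, a minimal sketch (assumed, not part of this commit) of the wire frames an "events" subscription exchanges under standard NIP-01 framing; the handle "events0" follows the format!("events{}", ...) scheme used by get_events above, and the CLOSE is what handle_nostr_message sends back when EOSE arrives.

// Hypothetical helper illustrating the two frames; uses serde_json, which the
// crate already depends on.
fn example_event_subscription_frames(ids: &[String]) -> (String, String) {
    // REQ opens the subscription with an ids filter.
    let req = serde_json::json!(["REQ", "events0", { "ids": ids }]).to_string();
    // CLOSE tears the one-shot subscription down once EOSE is received.
    let close = serde_json::json!(["CLOSE", "events0"]).to_string();
    (req, close)
}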

@@ -4,7 +4,7 @@ mod relay_picker;
use crate::comms::BusMessage;
use crate::db::{DbEvent, DbPerson, DbPersonRelay, DbRelay, DbSetting};
use crate::error::Error;
use crate::globals::{Globals, GLOBALS};
use crate::settings::Settings;
use minion::Minion;
use nostr_types::{Event, PrivateKey, PublicKey, PublicKeyHex, Unixtime, Url};
@@ -179,6 +179,7 @@ impl Overlord {
                pubkeys: pubkeys.clone(),
                person_relays: DbPersonRelay::fetch_for_pubkeys(&pubkeys).await?,
            };

            let mut best_relay: BestRelay;
            loop {
                if relay_picker.is_degenerate() {
@@ -194,17 +195,51 @@
                }

                // Fire off a minion to handle this relay
                self.start_minion(best_relay.relay.url.clone()).await?;

                // Tell it to follow the chosen people
                let _ = self.to_minions.send(BusMessage {
                    target: best_relay.relay.url.clone(),
                    kind: "set_followed_people".to_string(),
                    json_payload: serde_json::to_string(&best_relay.pubkeys).unwrap(),
                });

                info!(
                    "Picked relay {}, {} people left",
                    &best_relay.relay.url,
                    relay_picker.pubkeys.len()
                );
            }
        }

        // Get desired events from relays
        {
            let (desired_events_map, desired_events_vec) = Globals::get_desired_events().await?;
            info!(
                "Seeking {} events",
                desired_events_map.len() + desired_events_vec.len()
            );
            for (url, mut ids) in desired_events_map {
                // Add the orphans
                ids.extend(&desired_events_vec);

                // If we don't have such a minion, start one
                if !self.urls_watching.contains(&url) {
                    // Start a minion
                    self.start_minion(url.0.clone()).await?;
                }

                // Tell it to get these events
                let _ = self.to_minions.send(BusMessage {
                    target: url.0.clone(),
                    kind: "fetch_events".to_string(),
                    json_payload: serde_json::to_string(&ids).unwrap(),
                });
            }
        }
        'mainloop: loop {
            match self.loop_handler().await {
                Ok(keepgoing) => {
@@ -222,20 +257,13 @@

        Ok(())
    }

    async fn start_minion(&mut self, url: String) -> Result<(), Error> {
        let moved_url = Url(url.clone());
        let mut minion = Minion::new(moved_url).await?;
        let abort_handle = self.minions.spawn(async move { minion.handle().await });
        let id = abort_handle.id();
        self.minions_task_url.insert(id, Url(url.clone()));
        self.urls_watching.push(Url(url.clone()));

        Ok(())
    }
@@ -296,7 +324,7 @@
                let maybe_url = self.minions_task_url.get(&id);
                match maybe_url {
                    Some(url) => {
                        info!("Relay Task {} completed", &url);

                        // Remove from our urls_watching vec
                        self.urls_watching.retain(|value| value != url);