diff --git a/src/main.rs b/src/main.rs
index 76e474b1..e2729177 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -8,18 +8,60 @@ mod db;
 mod error;
 mod event_related;
 mod globals;
+mod overlord;
 mod settings;
 mod ui;
 
+use crate::comms::BusMessage;
+use crate::error::Error;
+use crate::globals::GLOBALS;
+use std::ops::DerefMut;
+use std::{mem, thread};
+
 fn main() {
     tracing_subscriber::fmt::init();
 
-    // TBD: start async code
+    // Start async code
+    // We do this on a separate thread because egui is most portable when
+    // it runs on the main thread.
+    let async_thread = thread::spawn(|| {
+        let rt = tokio::runtime::Runtime::new().unwrap();
+        rt.block_on(tokio_main());
+    });
 
     if let Err(e) = ui::run() {
         tracing::error!("{}", e);
     }
 
-    // TBD: Tell the async parties to close down
-    // TBD: wait for the async parties to close down
+    // Tell the async parties to close down
+    if let Err(e) = initiate_shutdown() {
+        tracing::error!("{}", e);
+    }
+
+    // Wait for the async thread to complete
+    async_thread.join().unwrap();
+}
+
+async fn tokio_main() {
+    // Steal `from_minions` from the GLOBALS, and give it to a new Overlord
+    let from_minions = {
+        let mut mutex_option = GLOBALS.from_minions.lock().await;
+        mem::replace(mutex_option.deref_mut(), None)
+    }
+    .unwrap();
+
+    // Run the overlord
+    let mut overlord = crate::overlord::Overlord::new(from_minions);
+    overlord.run().await;
+}
+
+// Any task can call this to initiate a shutdown
+pub fn initiate_shutdown() -> Result<(), Error> {
+    let to_overlord = GLOBALS.to_overlord.clone();
+    to_overlord.send(BusMessage {
+        target: "all".to_string(),
+        kind: "shutdown".to_string(),
+        json_payload: serde_json::to_string("").unwrap(),
+    })?;
+    Ok(())
 }
diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs
new file mode 100644
index 00000000..6f8bba6c
--- /dev/null
+++ b/src/overlord/mod.rs
@@ -0,0 +1,159 @@
+mod relay_picker;
+
+use crate::comms::BusMessage;
+use crate::db::{DbEvent, DbPerson, DbRelay};
+use crate::error::Error;
+use crate::globals::GLOBALS;
+use crate::settings::Settings;
+use nostr_proto::{Event, Unixtime};
+use tokio::select;
+use tokio::sync::broadcast::Sender;
+use tokio::sync::mpsc::UnboundedReceiver;
+use tracing::{error, info};
+
+pub struct Overlord {
+    settings: Settings,
+    to_minions: Sender<BusMessage>,
+    #[allow(dead_code)]
+    from_minions: UnboundedReceiver<BusMessage>,
+}
+
+impl Overlord {
+    pub fn new(from_minions: UnboundedReceiver<BusMessage>) -> Overlord {
+        let to_minions = GLOBALS.to_minions.clone();
+        Overlord {
+            settings: Settings::default(),
+            to_minions,
+            from_minions,
+        }
+    }
+
+    pub async fn run(&mut self) {
+        if let Err(e) = self.run_inner().await {
+            error!("{}", e);
+        }
+
+        // Send shutdown message to all minions (and ui)
+        // If this fails, it's probably because there are no more listeners,
+        // so just ignore it and keep shutting down.
+        let _ = self.to_minions.send(BusMessage {
+            target: "all".to_string(),
+            kind: "shutdown".to_string(),
+            json_payload: serde_json::to_string("shutdown").unwrap(),
+        });
+
+        // Wait on all minions to finish. When there are no more senders
+        // sending to `from_minions`, they have all completed. In that
+        // case this call will return None, but we don't care; we just
+        // finish.
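+        // (tokio's `UnboundedReceiver::recv()` resolves to `None` only
+        // after every sender handle has been dropped.)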
+        let _ = self.from_minions.recv().await;
+    }
+
+    pub async fn run_inner(&mut self) -> Result<(), Error> {
+        // Set up the database (possibly create, possibly upgrade)
+        crate::db::setup_database().await?;
+
+        // Load settings
+        self.settings = Settings::load().await?;
+
+        // FIXME - if this needs doing, it should be done dynamically as
+        // new people are encountered, not batch-style on startup.
+        // Create a person record for every person seen, possibly autofollow
+        DbPerson::populate_new_people(self.settings.autofollow != 0).await?;
+
+        // FIXME - if this needs doing, it should be done dynamically as
+        // new people are encountered, not batch-style on startup.
+        // Create a relay record for every relay in the person_relay map (these
+        // get updated from events without necessarily updating our relays list)
+        DbRelay::populate_new_relays().await?;
+
+        // Load feed-related events from the database and process them (TextNote, EventDeletion, Reaction)
+        {
+            let now = Unixtime::now().unwrap();
+            let then = now.0 - self.settings.feed_chunk as i64;
+            let db_events = DbEvent::fetch(Some(&format!(
+                " (kind=1 OR kind=5 OR kind=7) AND created_at > {} ORDER BY created_at ASC",
+                then
+            )))
+            .await?;
+
+            // Map db events into Events
+            let mut events: Vec<Event> = Vec::with_capacity(db_events.len());
+            for dbevent in db_events.iter() {
+                let e = serde_json::from_str(&dbevent.raw)?;
+                events.push(e);
+            }
+
+            // Process these events
+            let mut count = 0;
+            for event in events.iter() {
+                count += 1;
+                crate::globals::add_event(event).await?;
+            }
+            info!("Loaded {} events from the database", count);
+        }
+
+        'mainloop: loop {
+            match self.loop_handler().await {
+                Ok(keepgoing) => {
+                    if !keepgoing {
+                        break 'mainloop;
+                    }
+                }
+                Err(e) => {
+                    // Log the error and keep looping
+                    error!("{}", e);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    #[allow(unused_assignments)]
+    async fn loop_handler(&mut self) -> Result<bool, Error> {
+        let mut keepgoing: bool = true;
+
+        select! {
+            bus_message = self.from_minions.recv() => {
+                let bus_message = match bus_message {
+                    Some(bm) => bm,
+                    None => {
+                        // All senders dropped, or the channel was closed.
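+                        // Returning false ends 'mainloop in run_inner(),
+                        // which lets run() proceed with the shutdown
+                        // broadcast.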
+                        return Ok(false);
+                    }
+                };
+                keepgoing = self.handle_bus_message(bus_message).await?;
+            },
+        }
+
+        Ok(keepgoing)
+    }
+
+    async fn handle_bus_message(&mut self, bus_message: BusMessage) -> Result<bool, Error> {
+        #[allow(clippy::single_match)] // because this is temporary
+        match &*bus_message.target {
+            "all" => match &*bus_message.kind {
+                "shutdown" => {
+                    info!("Overlord shutting down");
+                    return Ok(false);
+                }
+                "settings_changed" => {
+                    self.settings = serde_json::from_str(&bus_message.json_payload)?;
+                    // We need to inform the minions
+                    self.to_minions.send(BusMessage {
+                        target: "all".to_string(),
+                        kind: "settings_changed".to_string(),
+                        json_payload: bus_message.json_payload.clone(),
+                    })?;
+                }
+                _ => {}
+            },
+            //"overlord" => match &*bus_message.kind {
+            //}
+            _ => {}
+        }
+
+        Ok(true)
+    }
+}
diff --git a/src/overlord/relay_picker.rs b/src/overlord/relay_picker.rs
new file mode 100644
index 00000000..3f8241eb
--- /dev/null
+++ b/src/overlord/relay_picker.rs
@@ -0,0 +1,110 @@
+use crate::db::{DbPersonRelay, DbRelay};
+use crate::error::Error;
+use nostr_proto::PublicKeyHex;
+use tracing::info;
+
+/// See RelayPicker::best()
+#[allow(dead_code)]
+pub struct RelayPicker {
+    pub relays: Vec<DbRelay>,
+    pub pubkeys: Vec<PublicKeyHex>,
+    pub person_relays: Vec<DbPersonRelay>,
+}
+
+impl RelayPicker {
+    #[allow(dead_code)]
+    pub fn is_degenerate(&self) -> bool {
+        self.relays.is_empty() || self.pubkeys.is_empty() || self.person_relays.is_empty()
+    }
+
+    /// This function takes a RelayPicker, which consists of a list of relays,
+    /// a list of public keys, and a mapping between them. It outputs a
+    /// BestRelay structure which includes the best relay to listen to and
+    /// the public keys such a relay will cover. It also outputs a new
+    /// RelayPicker that contains only the remaining relays and public keys.
+    #[allow(dead_code)]
+    pub fn best(mut self) -> Result<(BestRelay, RelayPicker), Error> {
+        if self.pubkeys.is_empty() {
+            return Err(Error::General(
+                "best_relay called for zero people".to_owned(),
+            ));
+        }
+        if self.relays.is_empty() {
+            return Err(Error::General(
+                "best_relay called for zero relays".to_owned(),
+            ));
+        }
+
+        info!(
+            "Searching for the best relay among {} for {} people",
+            self.relays.len(),
+            self.pubkeys.len()
+        );
+
+        // Keep score
+        let mut score: Vec<u64> = [0].repeat(self.relays.len());
+
+        // Count how many keys a relay covers, to use as part of its score
+        for person_relay in self.person_relays.iter() {
+            let i = match self
+                .relays
+                .iter()
+                .position(|relay| relay.url == person_relay.relay)
+            {
+                Some(index) => index,
+                None => continue, // we don't have that relay?
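+                // (A person_relay row can reference a relay we have no
+                // DbRelay record for; such rows are simply skipped.)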
+            };
+            score[i] += 1;
+        }
+
+        // Multiply scores by relay rank
+        #[allow(clippy::needless_range_loop)]
+        for i in 0..self.relays.len() {
+            score[i] *= self.relays[i].rank.unwrap_or(1);
+        }
+
+        let winner_index = score
+            .iter()
+            .enumerate()
+            .max_by(|x: &(usize, &u64), y: &(usize, &u64)| x.1.cmp(y.1))
+            .unwrap()
+            .0;
+
+        let winner = self.relays.swap_remove(winner_index);
+
+        let covered_public_keys: Vec<PublicKeyHex> = self
+            .person_relays
+            .iter()
+            .filter(|x| x.relay == winner.url)
+            .map(|x| PublicKeyHex(x.person.clone()))
+            .collect();
+
+        self.pubkeys.retain(|x| !covered_public_keys.contains(x));
+
+        self.person_relays.retain(|pr| {
+            !covered_public_keys.contains(&PublicKeyHex(pr.person.clone()))
+                && pr.relay != winner.url
+        });
+
+        Ok((
+            BestRelay {
+                relay: winner,
+                pubkeys: covered_public_keys,
+            },
+            self,
+        ))
+    }
+}
+
+/// See RelayPicker::best()
+pub struct BestRelay {
+    pub relay: DbRelay,
+    pub pubkeys: Vec<PublicKeyHex>,
+}
+
+impl BestRelay {
+    #[allow(dead_code)]
+    pub fn is_degenerate(&self) -> bool {
+        self.pubkeys.is_empty() || self.relay.rank == Some(0)
+    }
+}
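+
+// A rough usage sketch (nothing in this change calls best() yet, hence the
+// dead_code allowances): best() consumes the picker and is meant to be
+// called repeatedly, greedily peeling off one BestRelay per pass until the
+// picker goes degenerate. Variable names below are illustrative only:
+//
+//     let mut picker = RelayPicker { relays, pubkeys, person_relays };
+//     let mut choices: Vec<BestRelay> = Vec::new();
+//     while !picker.is_degenerate() {
+//         let (best, rest) = picker.best()?;
+//         choices.push(best);
+//         picker = rest;
+//     }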