diff --git a/Cargo.lock b/Cargo.lock index 99bf9b09..35349ff9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1601,6 +1601,7 @@ name = "gossip" version = "0.4.90-unstable" dependencies = [ "async-recursion", + "async-trait", "base64 0.21.0", "dashmap", "dirs", @@ -1609,6 +1610,7 @@ dependencies = [ "encoding_rs", "futures", "futures-util", + "gossip-relay-picker", "hex", "http", "humansize", @@ -1637,6 +1639,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "gossip-relay-picker" +version = "0.1.0" +source = "git+https://github.com/mikedilger/gossip-relay-picker#98cd2acc494bf716f9fbefa9e64cbc2322e65c1d" +dependencies = [ + "async-trait", + "dashmap", + "nostr-types", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "group" version = "0.12.1" diff --git a/Cargo.toml b/Cargo.toml index 54e626b9..6741a508 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ edition = "2021" [dependencies] async-recursion = "1.0" +async-trait = "0.1" base64 = "0.21" dashmap = "5.4" dirs = "4.0" @@ -21,6 +22,7 @@ egui_extras = { git = "https://github.com/mikedilger/egui", branch="gossip", fea encoding_rs = "0.8" futures = "0.3" futures-util = "0.3" +gossip-relay-picker = { git = "https://github.com/mikedilger/gossip-relay-picker" } hex = "0.4" http = "0.2" humansize = "2.1" diff --git a/src/db/event.rs b/src/db/event.rs index 3db767da..0ba8429b 100644 --- a/src/db/event.rs +++ b/src/db/event.rs @@ -105,13 +105,13 @@ impl DbEvent { }; let mut kinds = vec![EventKind::TextNote, EventKind::EventDeletion]; - if GLOBALS.settings.read().await.direct_messages { + if GLOBALS.settings.read().direct_messages { kinds.push(EventKind::EncryptedDirectMessage); } - if GLOBALS.settings.read().await.reposts { + if GLOBALS.settings.read().reposts { kinds.push(EventKind::Repost); } - if GLOBALS.settings.read().await.reactions { + if GLOBALS.settings.read().reactions { kinds.push(EventKind::Reaction); } diff --git a/src/db/mod.rs b/src/db/mod.rs index 3ff78ae9..97504bce 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -20,7 +20,7 @@ mod contact; pub use contact::DbContact; mod person_relay; -pub use person_relay::{DbPersonRelay, Direction}; +pub use person_relay::DbPersonRelay; use crate::error::Error; use crate::globals::GLOBALS; diff --git a/src/db/person_relay.rs b/src/db/person_relay.rs index 194710c4..439cd7b8 100644 --- a/src/db/person_relay.rs +++ b/src/db/person_relay.rs @@ -1,14 +1,9 @@ use crate::error::Error; use crate::globals::GLOBALS; +use gossip_relay_picker::Direction; use nostr_types::{PublicKeyHex, RelayUrl, Unixtime}; use tokio::task::spawn_blocking; -#[derive(Debug, Copy, Clone)] -pub enum Direction { - Read, - Write, -} - #[derive(Debug)] pub struct DbPersonRelay { pub person: String, @@ -353,7 +348,7 @@ impl DbPersonRelay { let mut ranked_relays = ranked_relays?; - let num_relays_per_person = GLOBALS.settings.read().await.num_relays_per_person as usize; + let num_relays_per_person = GLOBALS.settings.read().num_relays_per_person as usize; // If we can't get enough of them, extend with some of our relays // at whatever the lowest score of their last one was @@ -368,7 +363,6 @@ impl DbPersonRelay { Direction::Write => { // substitute our read relays let additional: Vec<(RelayUrl, u64)> = GLOBALS - .relay_tracker .all_relays .iter() .filter_map(|r| { @@ -387,7 +381,6 @@ impl DbPersonRelay { Direction::Read => { // substitute our write relays let additional: Vec<(RelayUrl, u64)> = GLOBALS - .relay_tracker .all_relays .iter() .filter_map(|r| { diff --git a/src/error.rs b/src/error.rs index 93cdb057..c9bf3283 100644 --- a/src/error.rs +++ b/src/error.rs @@ -54,6 +54,9 @@ pub enum Error { #[error("Bad integer: {0}")] ParseInt(#[from] std::num::ParseIntError), + #[error("Relay Picker error: {0}")] + RelayPickerError(#[from] gossip_relay_picker::Error), + #[error("HTTP (reqwest) error: {0}")] ReqwestHttpError(#[from] reqwest::Error), diff --git a/src/feed.rs b/src/feed.rs index 55994dd1..ee5e6c65 100644 --- a/src/feed.rs +++ b/src/feed.rs @@ -121,7 +121,7 @@ impl Feed { } pub fn get_person_feed(&self, person: PublicKeyHex) -> Vec { - let enable_reposts = GLOBALS.settings.blocking_read().reposts; + let enable_reposts = GLOBALS.settings.read().reposts; self.maybe_recompute(); let mut events: Vec = GLOBALS @@ -167,7 +167,7 @@ impl Feed { } pub async fn recompute(&self) -> Result<(), Error> { - let settings = GLOBALS.settings.read().await.clone(); + let settings = GLOBALS.settings.read().clone(); *self.interval_ms.write() = settings.feed_recompute_interval_ms; let events: Vec = GLOBALS @@ -223,7 +223,7 @@ impl Feed { *self.general_feed.write() = fevents.iter().map(|e| e.id).collect(); // Filter differently for the replies feed - let direct_only = GLOBALS.settings.read().await.direct_replies_only; + let direct_only = GLOBALS.settings.read().direct_replies_only; if let Some(my_pubkey) = GLOBALS.signer.public_key() { let my_events: HashSet = self.my_event_ids.read().iter().copied().collect(); diff --git a/src/fetcher.rs b/src/fetcher.rs index 693b283c..6bcd4401 100644 --- a/src/fetcher.rs +++ b/src/fetcher.rs @@ -120,7 +120,7 @@ impl Fetcher { } // Do not fetch if offline - if GLOBALS.settings.blocking_read().offline { + if GLOBALS.settings.read().offline { return Ok(None); } diff --git a/src/globals.rs b/src/globals.rs index 33197244..fb9bd77f 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -5,10 +5,13 @@ use crate::feed::Feed; use crate::fetcher::Fetcher; use crate::people::People; use crate::relationship::Relationship; -use crate::relays::RelayTracker; +use crate::relay_picker_hooks::Hooks; use crate::settings::Settings; use crate::signer::Signer; +use dashmap::{DashMap, DashSet}; +use gossip_relay_picker::RelayPicker; use nostr_types::{Event, Id, Profile, PublicKeyHex, RelayUrl}; +use parking_lot::RwLock as PRwLock; use rusqlite::Connection; use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize}; @@ -49,15 +52,21 @@ pub struct Globals { /// All nostr people records currently loaded into memory, keyed by pubkey pub people: People, - /// The relay tracker, used to pick the next relay - pub relay_tracker: RelayTracker, + /// All the relays we know about + pub all_relays: DashMap, + + /// The relays currently connected to + pub connected_relays: DashSet, + + /// The relay picker, used to pick the next relay + pub relay_picker: RelayPicker, /// Whether or not we are shutting down. For the UI (minions will be signaled and /// waited for by the overlord) pub shutting_down: AtomicBool, /// Settings - pub settings: RwLock, + pub settings: PRwLock, /// Signer pub signer: Signer, @@ -103,9 +112,11 @@ lazy_static! { incoming_events: RwLock::new(Vec::new()), relationships: RwLock::new(HashMap::new()), people: People::new(), - relay_tracker: Default::default(), + all_relays: DashMap::new(), + connected_relays: DashSet::new(), + relay_picker: Default::default(), shutting_down: AtomicBool::new(false), - settings: RwLock::new(Settings::default()), + settings: PRwLock::new(Settings::default()), signer: Signer::default(), dismissed: RwLock::new(Vec::new()), feed: Feed::new(), @@ -217,7 +228,6 @@ impl Globals { }; for ri in GLOBALS - .relay_tracker .all_relays .iter() .filter(|ri| ri.value().write) @@ -232,8 +242,7 @@ impl Globals { where F: FnMut(&DbRelay) -> bool, { - self.relay_tracker - .all_relays + self.all_relays .iter() .filter_map(|r| { if f(r.value()) { @@ -249,8 +258,7 @@ impl Globals { where F: FnMut(&DbRelay) -> bool, { - self.relay_tracker - .all_relays + self.all_relays .iter() .filter_map(|r| { if f(r.value()) { @@ -263,6 +271,6 @@ impl Globals { } pub fn relay_is_connected(&self, url: &RelayUrl) -> bool { - self.relay_tracker.connected_relays.contains(url) + self.connected_relays.contains(url) } } diff --git a/src/main.rs b/src/main.rs index d1028e2c..5d4948aa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,7 +21,7 @@ mod overlord; mod people; mod process; mod relationship; -mod relays; +mod relay_picker_hooks; mod settings; mod signer; mod tags; @@ -55,7 +55,7 @@ fn main() -> Result<(), Error> { // Load settings let settings = crate::settings::Settings::blocking_load()?; - *GLOBALS.settings.blocking_write() = settings; + *GLOBALS.settings.write() = settings; // We create and enter the runtime on the main thread so that // non-async code can have a runtime context within which to spawn diff --git a/src/nip05.rs b/src/nip05.rs index 10e91aa3..e49e6161 100644 --- a/src/nip05.rs +++ b/src/nip05.rs @@ -8,7 +8,7 @@ use std::sync::atomic::Ordering; // This updates the people map and the database with the result pub async fn validate_nip05(person: DbPerson) -> Result<(), Error> { - if !GLOBALS.settings.read().await.check_nip05 { + if !GLOBALS.settings.read().check_nip05 { return Ok(()); } @@ -126,7 +126,7 @@ async fn update_relays( let db_relay = DbRelay::new(relay_url.clone()); DbRelay::insert(db_relay.clone()).await?; - if let Entry::Vacant(entry) = GLOBALS.relay_tracker.all_relays.entry(relay_url.clone()) + if let Entry::Vacant(entry) = GLOBALS.all_relays.entry(relay_url.clone()) { entry.insert(db_relay); } diff --git a/src/overlord/minion/handle_websocket.rs b/src/overlord/minion/handle_websocket.rs index cddb78fb..6468b025 100644 --- a/src/overlord/minion/handle_websocket.rs +++ b/src/overlord/minion/handle_websocket.rs @@ -54,7 +54,7 @@ impl Minion { .await?; // set in globals if let Some(mut dbrelay) = - GLOBALS.relay_tracker.all_relays.get_mut(&self.dbrelay.url) + GLOBALS.all_relays.get_mut(&self.dbrelay.url) { dbrelay.last_general_eose_at = Some(event.created_at.0 as u64); } diff --git a/src/overlord/minion/mod.rs b/src/overlord/minion/mod.rs index 2bcf5c2c..432a0763 100644 --- a/src/overlord/minion/mod.rs +++ b/src/overlord/minion/mod.rs @@ -80,7 +80,7 @@ impl Minion { tracing::error!("{}: ERROR bumping relay failure count: {}", &self.url, e); } // Update in globals too - if let Some(mut dbrelay) = GLOBALS.relay_tracker.all_relays.get_mut(&self.dbrelay.url) { + if let Some(mut dbrelay) = GLOBALS.all_relays.get_mut(&self.dbrelay.url) { dbrelay.failure_count += 1; } } @@ -144,7 +144,7 @@ impl Minion { let req = http::request::Request::builder().method("GET"); - let req = if GLOBALS.settings.read().await.set_user_agent { + let req = if GLOBALS.settings.read().set_user_agent { let about = crate::about::about(); req.header("User-Agent", format!("gossip/{}", about.version)) } else { @@ -207,7 +207,7 @@ impl Minion { tracing::error!("{}: ERROR bumping relay success count: {}", &self.url, e); } // set in globals - if let Some(mut dbrelay) = GLOBALS.relay_tracker.all_relays.get_mut(&self.dbrelay.url) { + if let Some(mut dbrelay) = GLOBALS.all_relays.get_mut(&self.dbrelay.url) { dbrelay.last_connected_at = self.dbrelay.last_connected_at; } } @@ -364,7 +364,7 @@ impl Minion { ) -> Result<(), Error> { let mut filters: Vec = Vec::new(); let (overlap, feed_chunk) = { - let settings = GLOBALS.settings.read().await.clone(); + let settings = GLOBALS.settings.read().clone(); ( Duration::from_secs(settings.overlap), Duration::from_secs(settings.feed_chunk), @@ -406,8 +406,8 @@ impl Minion { } }; - let enable_reactions = GLOBALS.settings.read().await.reactions; - let enable_reposts = GLOBALS.settings.read().await.reposts; + let enable_reactions = GLOBALS.settings.read().reactions; + let enable_reposts = GLOBALS.settings.read().reposts; if let Some(pubkey) = GLOBALS.signer.public_key() { let mut kinds = vec![EventKind::TextNote, EventKind::EventDeletion]; @@ -508,7 +508,7 @@ impl Minion { async fn subscribe_mentions(&mut self) -> Result<(), Error> { let mut filters: Vec = Vec::new(); let (overlap, replies_chunk) = { - let settings = GLOBALS.settings.read().await.clone(); + let settings = GLOBALS.settings.read().clone(); ( Duration::from_secs(settings.overlap), Duration::from_secs(settings.replies_chunk), @@ -537,8 +537,8 @@ impl Minion { replies_since.max(one_replieschunk_ago) }; - let enable_reactions = GLOBALS.settings.read().await.reactions; - let enable_reposts = GLOBALS.settings.read().await.reposts; + let enable_reactions = GLOBALS.settings.read().reactions; + let enable_reposts = GLOBALS.settings.read().reposts; if let Some(pubkey) = GLOBALS.signer.public_key() { // Any mentions of me @@ -663,7 +663,7 @@ impl Minion { let mut filters: Vec = Vec::new(); - let enable_reactions = GLOBALS.settings.read().await.reactions; + let enable_reactions = GLOBALS.settings.read().reactions; if !vec_ids.is_empty() { let idhp: Vec = vec_ids.iter().map(|id| id.to_owned().into()).collect(); diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs index 76abb0f2..c88f471f 100644 --- a/src/overlord/mod.rs +++ b/src/overlord/mod.rs @@ -1,16 +1,16 @@ mod minion; use crate::comms::{ToMinionMessage, ToMinionPayload, ToOverlordMessage}; -use crate::db::{DbEvent, DbEventSeen, DbPersonRelay, DbRelay, Direction}; +use crate::db::{DbEvent, DbEventSeen, DbPersonRelay, DbRelay}; use crate::error::Error; use crate::globals::GLOBALS; use crate::people::People; -use crate::relays::RelayAssignment; use crate::tags::{ add_event_to_tags, add_pubkey_hex_to_tags, add_pubkey_to_tags, add_subject_to_tags_if_missing, keys_from_text, notes_from_text, }; use dashmap::mapref::entry::Entry; +use gossip_relay_picker::{Direction, RelayAssignment}; use minion::Minion; use nostr_types::{ EncryptedPrivateKey, Event, EventKind, Filter, Id, IdHex, IdHexPrefix, Metadata, PreEvent, @@ -97,7 +97,6 @@ impl Overlord { let mut all_relays: Vec = DbRelay::fetch(None).await?; for dbrelay in all_relays.drain(..) { GLOBALS - .relay_tracker .all_relays .insert(dbrelay.url.clone(), dbrelay); } @@ -106,12 +105,15 @@ impl Overlord { // Load followed people from the database GLOBALS.people.load_all_followed().await?; + // Initialize the relay picker + GLOBALS.relay_picker.init().await?; + let now = Unixtime::now().unwrap(); // Load reply-related events from database and process // (where you are tagged) { - let replies_chunk = GLOBALS.settings.read().await.replies_chunk; + let replies_chunk = GLOBALS.settings.read().replies_chunk; let then = now.0 - replies_chunk as i64; let db_events = DbEvent::fetch_reply_related(then).await?; @@ -134,15 +136,15 @@ impl Overlord { // Load feed-related events from database and process { - let feed_chunk = GLOBALS.settings.read().await.feed_chunk; + let feed_chunk = GLOBALS.settings.read().feed_chunk; let then = now.0 - feed_chunk as i64; - let reactions = if GLOBALS.settings.read().await.reactions { + let reactions = if GLOBALS.settings.read().reactions { " OR kind=7" } else { "" }; - let reposts = if GLOBALS.settings.read().await.reactions { + let reposts = if GLOBALS.settings.read().reactions { " OR kind=6" } else { "" @@ -185,11 +187,7 @@ impl Overlord { } // Pick Relays and start Minions - if !GLOBALS.settings.read().await.offline { - // Initialize the RelayPicker - GLOBALS.relay_tracker.init().await?; - - // Pick relays + if !GLOBALS.settings.read().offline { self.pick_relays().await; } @@ -246,23 +244,23 @@ impl Overlord { async fn pick_relays(&mut self) { loop { - match GLOBALS.relay_tracker.pick().await { + match GLOBALS.relay_picker.pick().await { Err(failure) => { tracing::info!("Done picking relays: {}", failure); break; } Ok(relay_url) => { - if let Some(elem) = GLOBALS.relay_tracker.relay_assignments.get(&relay_url) { + if let Some(ra) = GLOBALS.relay_picker.get_relay_assignment(&relay_url) { tracing::debug!( "Picked {} covering {} pubkeys", &relay_url, - elem.value().pubkeys.len() + ra.pubkeys.len() ); // Apply the relay assignment - if let Err(e) = self.apply_relay_assignment(elem.value().to_owned()).await { + if let Err(e) = self.apply_relay_assignment(ra.to_owned()).await { tracing::error!("{}", e); // On failure, return it - GLOBALS.relay_tracker.relay_disconnected(&relay_url); + GLOBALS.relay_picker.relay_disconnected(&relay_url); } } else { tracing::warn!("Relay Picker just picked {} but it is already no longer part of it's relay assignments!", &relay_url); @@ -295,7 +293,7 @@ impl Overlord { } async fn start_minion(&mut self, url: RelayUrl) -> Result<(), Error> { - if GLOBALS.settings.read().await.offline { + if GLOBALS.settings.read().offline { return Ok(()); } @@ -303,7 +301,7 @@ impl Overlord { let abort_handle = self.minions.spawn(async move { minion.handle().await }); let id = abort_handle.id(); self.minions_task_url.insert(id, url.clone()); - GLOBALS.relay_tracker.connected_relays.insert(url); + GLOBALS.connected_relays.insert(url); Ok(()) } @@ -366,7 +364,7 @@ impl Overlord { // Minion probably already logged failure in relay table // Set to not connected - GLOBALS.relay_tracker.connected_relays.remove(&url); + GLOBALS.connected_relays.remove(&url); // Remove from our hashmap self.minions_task_url.remove(&id); @@ -388,7 +386,7 @@ impl Overlord { tracing::info!("Relay Task {} completed", &url); // Set to not connected - GLOBALS.relay_tracker.connected_relays.remove(&url); + GLOBALS.connected_relays.remove(&url); // Remove from our hashmap self.minions_task_url.remove(&id); @@ -405,10 +403,10 @@ impl Overlord { } async fn recover_from_minion_exit(&mut self, url: RelayUrl) { - GLOBALS.relay_tracker.relay_disconnected(&url); + GLOBALS.relay_picker.relay_disconnected(&url); if let Err(e) = GLOBALS - .relay_tracker - .refresh_person_relay_scores(false) + .relay_picker + .refresh_person_relay_scores() .await { tracing::error!("Error: {}", e); @@ -421,7 +419,7 @@ impl Overlord { ToOverlordMessage::AddRelay(relay_str) => { let dbrelay = DbRelay::new(relay_str.clone()); DbRelay::insert(dbrelay.clone()).await?; - GLOBALS.relay_tracker.all_relays.insert(relay_str, dbrelay); + GLOBALS.all_relays.insert(relay_str, dbrelay); } ToOverlordMessage::AdvertiseRelayList => { self.advertise_relay_list().await?; @@ -510,8 +508,8 @@ impl Overlord { // When manually doing this, we refresh person_relay scores first which // often change if the user just added follows. GLOBALS - .relay_tracker - .refresh_person_relay_scores(false) + .relay_picker + .refresh_person_relay_scores() .await?; // Then pick @@ -548,7 +546,7 @@ impl Overlord { self.push_metadata(metadata).await?; } ToOverlordMessage::RankRelay(relay_url, rank) => { - if let Some(mut dbrelay) = GLOBALS.relay_tracker.all_relays.get_mut(&relay_url) { + if let Some(mut dbrelay) = GLOBALS.all_relays.get_mut(&relay_url) { dbrelay.rank = rank as u64; } DbRelay::set_rank(relay_url, rank).await?; @@ -557,21 +555,21 @@ impl Overlord { self.refresh_followed_metadata().await?; } ToOverlordMessage::SaveSettings => { - GLOBALS.settings.read().await.save().await?; + GLOBALS.settings.read().save().await?; tracing::debug!("Settings saved."); } ToOverlordMessage::SetActivePerson(pubkey) => { GLOBALS.people.set_active_person(pubkey).await?; } ToOverlordMessage::SetRelayReadWrite(relay_url, read, write) => { - if let Some(mut dbrelay) = GLOBALS.relay_tracker.all_relays.get_mut(&relay_url) { + if let Some(mut dbrelay) = GLOBALS.all_relays.get_mut(&relay_url) { dbrelay.read = read; dbrelay.write = write; } DbRelay::update_read_and_write(relay_url, read, write).await?; } ToOverlordMessage::SetRelayAdvertise(relay_url, advertise) => { - if let Some(mut dbrelay) = GLOBALS.relay_tracker.all_relays.get_mut(&relay_url) { + if let Some(mut dbrelay) = GLOBALS.all_relays.get_mut(&relay_url) { dbrelay.advertise = advertise; } DbRelay::update_advertise(relay_url, advertise).await?; @@ -593,16 +591,13 @@ impl Overlord { // Update public key from private key let public_key = GLOBALS.signer.public_key().unwrap(); - { - let mut settings = GLOBALS.settings.write().await; - settings.public_key = Some(public_key); - settings.save().await?; - } + GLOBALS.settings.write().public_key = Some(public_key); + GLOBALS.settings.read().clone().save().await?; } ToOverlordMessage::UpdateMetadata(pubkey) => { let best_relays = DbPersonRelay::get_best_relays(pubkey.clone(), Direction::Write).await?; - let num_relays_per_person = GLOBALS.settings.read().await.num_relays_per_person; + let num_relays_per_person = GLOBALS.settings.read().num_relays_per_person; // we do 1 more than num_relays_per_person, which is really for main posts, // since metadata is more important and I didn't want to bother with @@ -693,7 +688,7 @@ impl Overlord { } }; - if GLOBALS.settings.read().await.set_client_tag { + if GLOBALS.settings.read().set_client_tag { tags.push(Tag::Other { tag: "client".to_owned(), data: vec!["gossip".to_owned()], @@ -785,7 +780,7 @@ impl Overlord { ots: None, }; - let powint = GLOBALS.settings.read().await.pow; + let powint = GLOBALS.settings.read().pow; let pow = if powint > 0 { Some(powint) } else { None }; GLOBALS.signer.sign_preevent(pre_event, pow)? }; @@ -917,7 +912,7 @@ impl Overlord { }, ]; - if GLOBALS.settings.read().await.set_client_tag { + if GLOBALS.settings.read().set_client_tag { tags.push(Tag::Other { tag: "client".to_owned(), data: vec!["gossip".to_owned()], @@ -933,7 +928,7 @@ impl Overlord { ots: None, }; - let powint = GLOBALS.settings.read().await.pow; + let powint = GLOBALS.settings.read().pow; let pow = if powint > 0 { Some(powint) } else { None }; GLOBALS.signer.sign_preevent(pre_event, pow)? }; @@ -1062,7 +1057,7 @@ impl Overlord { async fn refresh_followed_metadata(&mut self) -> Result<(), Error> { let pubkeys = GLOBALS.people.get_followed_pubkeys(); - let num_relays_per_person = GLOBALS.settings.read().await.num_relays_per_person; + let num_relays_per_person = GLOBALS.settings.read().num_relays_per_person; let mut map: HashMap> = HashMap::new(); @@ -1159,7 +1154,7 @@ impl Overlord { // instead build the filters, then both send them to the minion and // also query them locally. { - let enable_reactions = GLOBALS.settings.read().await.reactions; + let enable_reactions = GLOBALS.settings.read().reactions; if !missing_ancestors_hex.is_empty() { let idhp: Vec = missing_ancestors_hex @@ -1262,7 +1257,7 @@ impl Overlord { DbRelay::insert(db_relay.clone()).await?; if let Entry::Vacant(entry) = - GLOBALS.relay_tracker.all_relays.entry(relay_url.clone()) + GLOBALS.all_relays.entry(relay_url.clone()) { entry.insert(db_relay); } diff --git a/src/people.rs b/src/people.rs index 817d25d2..2727f487 100644 --- a/src/people.rs +++ b/src/people.rs @@ -1,11 +1,12 @@ use crate::comms::ToOverlordMessage; -use crate::db::{DbEvent, DbPersonRelay, Direction}; +use crate::db::{DbEvent, DbPersonRelay}; use crate::error::Error; use crate::globals::GLOBALS; use crate::AVATAR_SIZE; use dashmap::{DashMap, DashSet}; use eframe::egui::ColorImage; use egui_extras::image::FitTo; +use gossip_relay_picker::Direction; use image::imageops::FilterType; use nostr_types::{ Event, EventKind, Metadata, PreEvent, PublicKey, PublicKeyHex, RelayUrl, Tag, UncheckedUrl, @@ -246,7 +247,7 @@ impl People { pub fn person_of_interest(&self, pubkeyhex: PublicKeyHex) { if !GLOBALS .settings - .blocking_read() + .read() .automatically_fetch_metadata { return; @@ -545,7 +546,7 @@ impl People { }; // Do not fetch if disabled - if !GLOBALS.settings.blocking_read().load_avatars { + if !GLOBALS.settings.read().load_avatars { GLOBALS.people.avatars_failed.insert(pubkeyhex.clone()); return Err(()); } @@ -774,10 +775,10 @@ impl People { } if follow > 0 { - // Add the person to the relay_tracker for picking - GLOBALS.relay_tracker.add_someone(pubkeyhex.to_owned())?; + // Add the person to the relay_picker for picking + GLOBALS.relay_picker.add_someone(pubkeyhex.to_owned())?; } else { - GLOBALS.relay_tracker.remove_someone(pubkeyhex.to_owned()); + GLOBALS.relay_picker.remove_someone(pubkeyhex.to_owned()); } Ok(()) @@ -879,9 +880,9 @@ impl People { } } - // Add the people to the relay_tracker for picking + // Add the people to the relay_picker for picking for pubkey in pubkeys.iter() { - GLOBALS.relay_tracker.add_someone(pubkey.to_owned())?; + GLOBALS.relay_picker.add_someone(pubkey.to_owned())?; } Ok(()) diff --git a/src/process.rs b/src/process.rs index 83d8108c..2f4b2ddd 100644 --- a/src/process.rs +++ b/src/process.rs @@ -265,7 +265,7 @@ async fn process_relay_list(event: &Event) -> Result<(), Error> { DbRelay::clear_read_and_write().await?; // in memory - for mut elem in GLOBALS.relay_tracker.all_relays.iter_mut() { + for mut elem in GLOBALS.all_relays.iter_mut() { elem.value_mut().read = false; elem.value_mut().write = false; } @@ -287,7 +287,7 @@ async fn process_relay_list(event: &Event) -> Result<(), Error> { .await?; // set in memory if let Some(mut elem) = - GLOBALS.relay_tracker.all_relays.get_mut(&relay_url) + GLOBALS.all_relays.get_mut(&relay_url) { elem.read = true; elem.write = false; @@ -302,7 +302,7 @@ async fn process_relay_list(event: &Event) -> Result<(), Error> { .await?; // set in memory if let Some(mut elem) = - GLOBALS.relay_tracker.all_relays.get_mut(&relay_url) + GLOBALS.all_relays.get_mut(&relay_url) { elem.read = false; elem.write = true; @@ -318,7 +318,7 @@ async fn process_relay_list(event: &Event) -> Result<(), Error> { // set in database DbRelay::update_read_and_write(relay_url.clone(), true, true).await?; // set in memory - if let Some(mut elem) = GLOBALS.relay_tracker.all_relays.get_mut(&relay_url) + if let Some(mut elem) = GLOBALS.all_relays.get_mut(&relay_url) { elem.read = true; elem.write = true; diff --git a/src/relay_picker_hooks.rs b/src/relay_picker_hooks.rs new file mode 100644 index 00000000..f8396cc9 --- /dev/null +++ b/src/relay_picker_hooks.rs @@ -0,0 +1,61 @@ +use crate::globals::GLOBALS; +use crate::db::DbPersonRelay; +use crate::error::Error; +use async_trait::async_trait; +use gossip_relay_picker::{Direction, RelayPickerHooks}; +use nostr_types::{PublicKeyHex, RelayUrl}; + +#[derive(Default)] +pub struct Hooks { } + +#[async_trait] +impl RelayPickerHooks for Hooks { + type Error = Error; + + /// Returns all relays available to be connected to + fn get_all_relays(&self) -> Vec { + GLOBALS.all_relays.iter().map(|elem| elem.key().to_owned()).collect() + } + + /// Returns all relays that this public key uses in the given Direction + async fn get_relays_for_pubkey( + &self, + pubkey: PublicKeyHex, + direction: Direction, + ) -> Result, Error> { + DbPersonRelay::get_best_relays(pubkey, direction).await + } + + /// Is the relay currently connected? + fn is_relay_connected(&self, relay: &RelayUrl) -> bool { + GLOBALS.connected_relays.contains(relay) + } + + /// Returns the maximum number of relays that should be connected to at one time + fn get_max_relays(&self) -> usize { + GLOBALS.settings.read().max_relays as usize + } + + /// Returns the number of relays each followed person's events should be pulled from + /// Many people use 2 or 3 for redundancy. + fn get_num_relays_per_person(&self) -> usize { + GLOBALS.settings.read().num_relays_per_person as usize + } + + /// Returns the public keys of all the people followed + fn get_followed_pubkeys(&self) -> Vec { + GLOBALS.people.get_followed_pubkeys() + } + + /// Adjusts the score for a given relay, perhaps based on relay-specific metrics + fn adjust_score(&self, relay: RelayUrl, score: u64) -> u64 { + if let Some(relay) = GLOBALS.all_relays.get(&relay) { + let success_rate = relay.success_rate(); + let rank = (relay.rank as f32 * (1.3 * success_rate)) as u64; + score * rank + } else { + score + } + } +} + diff --git a/src/relays.rs b/src/relays.rs deleted file mode 100644 index b42209a1..00000000 --- a/src/relays.rs +++ /dev/null @@ -1,356 +0,0 @@ -use crate::db::{DbPersonRelay, DbRelay, Direction}; -use crate::error::Error; -use crate::globals::GLOBALS; -use dashmap::{DashMap, DashSet}; -use nostr_types::{PublicKeyHex, RelayUrl, Unixtime}; -use std::fmt; -use std::sync::atomic::{AtomicU8, Ordering}; - -/// A RelayAssignment is a record of a relay which is serving (or will serve) the general -/// feed for a set of public keys. -#[derive(Debug, Clone)] -pub struct RelayAssignment { - pub relay_url: RelayUrl, - pub pubkeys: Vec, -} - -impl RelayAssignment { - pub fn merge_in(&mut self, other: RelayAssignment) -> Result<(), Error> { - if self.relay_url != other.relay_url { - return Err(Error::General( - "Attempted to merge relay assignments on different relays".to_owned(), - )); - } - self.pubkeys.extend(other.pubkeys); - Ok(()) - } -} - -/// Ways that pick() can fail -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[allow(clippy::enum_variant_names)] -pub enum RelayPickFailure { - /// No relays to pick from - NoRelays, - - /// No people left to assign. A good result. - NoPeopleLeft, - - /// No progress was made. A stuck result. - NoProgress, -} - -impl fmt::Display for RelayPickFailure { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - RelayPickFailure::NoRelays => write!(f, "No relays to pick from."), - RelayPickFailure::NoPeopleLeft => write!(f, "All people accounted for."), - RelayPickFailure::NoProgress => write!(f, "Unable to make further progress."), - } - } -} - -/// The RelayTracker is a structure that helps assign people we follow to relays we watch. -/// It remembers which publickeys are assigned to which relays, which pubkeys need more -/// relays and how many, which relays need a time out, and person-relay scores for making -/// good assignments dynamically. -#[derive(Debug, Default)] -pub struct RelayTracker { - /// All of the relays we might use - pub all_relays: DashMap, - - /// All of the relays actually connected - pub connected_relays: DashSet, - - /// All of the relays currently connected, with optional assignments. - /// (Sometimes a relay is connected for a different kind of subscription.) - pub relay_assignments: DashMap, - - /// Relays which recently failed and which require a timeout before - /// they can be chosen again. The value is the time when it can be removed - /// from this list. - pub excluded_relays: DashMap, - - /// For each followed pubkey that still needs assignments, the number of relay - /// assignments it is seeking. These start out at settings.num_relays_per_person - /// (if the person doesn't have that many relays, it will do the best it can) - pub pubkey_counts: DashMap, - - /// A ranking of relays per person. - pub person_relay_scores: DashMap>, - - /// Number of relays per person. Taken from settings and fixed. - pub num_relays_per_person: AtomicU8, -} - -impl RelayTracker { - /// This starts a new RelayTracker that has: - /// * All relays - /// * All followed public keys, with count starting at num_relays_per_person - /// * person relay scores for all person-relay pairings - pub async fn init(&self) -> Result<(), Error> { - // just in case it is re-initialized (not sure why it would be) - self.all_relays.clear(); - self.connected_relays.clear(); - self.relay_assignments.clear(); - self.excluded_relays.clear(); - self.pubkey_counts.clear(); - self.person_relay_scores.clear(); - - // Load relays from the database - for (relay_url, dbrelay) in DbRelay::fetch(None) - .await? - .drain(..) - .map(|dbr| (dbr.url.clone(), dbr)) - { - self.all_relays.insert(relay_url, dbrelay); - } - - self.num_relays_per_person.store( - GLOBALS.settings.read().await.num_relays_per_person, - Ordering::Relaxed, - ); - - self.refresh_person_relay_scores(true).await?; - - Ok(()) - } - - pub fn add_someone(&self, pubkey: PublicKeyHex) -> Result<(), Error> { - // Check if we already have them - if self.pubkey_counts.get(&pubkey).is_some() { - return Ok(()); - } - for elem in self.relay_assignments.iter() { - let assignment = elem.value(); - if assignment.pubkeys.contains(&pubkey) { - return Ok(()); - } - } - - self.pubkey_counts - .insert(pubkey, self.num_relays_per_person.load(Ordering::Relaxed)); - Ok(()) - } - - pub fn remove_someone(&self, pubkey: PublicKeyHex) { - // Remove from pubkey counts - self.pubkey_counts.remove(&pubkey); - - // Remove from relay assignments - for mut elem in self.relay_assignments.iter_mut() { - let assignment = elem.value_mut(); - if let Some(pos) = assignment.pubkeys.iter().position(|x| x == &pubkey) { - assignment.pubkeys.remove(pos); - } - } - - // This doesn't indicate that the assignment has changed, so the relay is - // still delivering their events. But the feed shouldn't be showing them. - } - - pub async fn refresh_person_relay_scores(&self, initialize_counts: bool) -> Result<(), Error> { - self.person_relay_scores.clear(); - - if initialize_counts { - self.pubkey_counts.clear(); - } - - // Get all the people we follow - let pubkeys: Vec = GLOBALS - .people - .get_followed_pubkeys() - .iter() - .map(|p| p.to_owned()) - .collect(); - - // Compute scores for each person_relay pairing - for pubkey in &pubkeys { - let best_relays: Vec<(RelayUrl, u64)> = - DbPersonRelay::get_best_relays(pubkey.to_owned(), Direction::Write).await?; - self.person_relay_scores.insert(pubkey.clone(), best_relays); - - if initialize_counts { - self.pubkey_counts.insert( - pubkey.clone(), - self.num_relays_per_person.load(Ordering::Relaxed), - ); - } - } - - Ok(()) - } - - /// When a relay disconnects, call this so that whatever assignments it might have - /// had can be reassigned. Then call pick_relays() again. - pub fn relay_disconnected(&self, url: &RelayUrl) { - // Remove from connected relays list - if let Some((_key, assignment)) = self.relay_assignments.remove(url) { - // Exclude the relay for the next 30 seconds - let hence = Unixtime::now().unwrap().0 + 30; - self.excluded_relays.insert(url.to_owned(), hence); - tracing::debug!("{} goes into the penalty box until {}", url, hence,); - - // Put the public keys back into pubkey_counts - for pubkey in assignment.pubkeys.iter() { - self.pubkey_counts - .entry(pubkey.to_owned()) - .and_modify(|e| *e += 1) - .or_insert(1); - } - } - } - - /// Create the next assignment, and return the RelayUrl that has it. - /// The caller is responsible for making that assignment actually happen. - pub async fn pick(&self) -> Result { - // If we are at max relays, only consider relays we are already - // connected to - let max_relays = GLOBALS.settings.read().await.max_relays as usize; - let at_max_relays = self.relay_assignments.len() >= max_relays; - - // Maybe include excluded relays - let now = Unixtime::now().unwrap().0; - self.excluded_relays.retain(|_, v| *v > now); - - if self.pubkey_counts.is_empty() { - return Err(RelayPickFailure::NoPeopleLeft); - } - - if self.all_relays.is_empty() { - return Err(RelayPickFailure::NoRelays); - } - - // Keep score for each relay - let scoreboard: DashMap = self - .all_relays - .iter() - .map(|x| (x.key().to_owned(), 0)) - .collect(); - - // Assign scores to relays from each pubkey - for elem in self.person_relay_scores.iter() { - let pubkeyhex = elem.key(); - let relay_scores = elem.value(); - - // Skip if this pubkey doesn't need any more assignments - if let Some(pkc) = self.pubkey_counts.get(pubkeyhex) { - if *pkc == 0 { - // person doesn't need anymore - continue; - } - } else { - continue; // person doesn't need any - } - - // Add scores of their relays - for (relay, score) in relay_scores.iter() { - // Skip relays that are excluded - if self.excluded_relays.contains_key(relay) { - continue; - } - - // If at max, skip relays not already connected - if at_max_relays && !self.connected_relays.contains(relay) { - continue; - } - - // Skip if relay is already assigned this pubkey - if let Some(assignment) = self.relay_assignments.get(relay) { - if assignment.pubkeys.contains(pubkeyhex) { - continue; - } - } - - // Add the score - if let Some(mut entry) = scoreboard.get_mut(relay) { - *entry += score; - } - } - } - - // Adjust all scores based on relay rank and relay success rate - for mut score_entry in scoreboard.iter_mut() { - let url = score_entry.key().to_owned(); - let score = score_entry.value_mut(); - if let Some(relay) = self.all_relays.get(&url) { - let success_rate = relay.success_rate(); - let rank = (relay.rank as f32 * (1.3 * success_rate)) as u64; - *score *= rank; - } - } - - let winner = scoreboard - .iter() - .max_by(|x, y| x.value().cmp(y.value())) - .unwrap(); - let winning_url: RelayUrl = winner.key().to_owned(); - let winning_score: u64 = *winner.value(); - - if winning_score == 0 { - return Err(RelayPickFailure::NoProgress); - } - - // Now sort out which public keys go with that relay (we did this already - // above when assigning scores, but in a way which would require a lot of - // storage to keep, so we just do it again) - let covered_public_keys = { - let pubkeys_seeking_relays: Vec = self - .pubkey_counts - .iter() - .filter(|e| *e.value() > 0) - .map(|e| e.key().to_owned()) - .collect(); - - let mut covered_pubkeys: Vec = Vec::new(); - - for pubkey in pubkeys_seeking_relays.iter() { - // Skip if relay is already assigned this pubkey - if let Some(assignment) = self.relay_assignments.get(&winning_url) { - if assignment.pubkeys.contains(pubkey) { - continue; - } - } - - if let Some(elem) = self.person_relay_scores.get(pubkey) { - let relay_scores = elem.value(); - - if relay_scores.iter().any(|e| e.0 == winning_url) { - covered_pubkeys.push(pubkey.to_owned()); - - if let Some(mut count) = self.pubkey_counts.get_mut(pubkey) { - if *count > 0 { - *count -= 1; - } - } - } - } - } - - covered_pubkeys - }; - - if covered_public_keys.is_empty() { - return Err(RelayPickFailure::NoProgress); - } - - // Only keep pubkey_counts that are still > 0 - self.pubkey_counts.retain(|_, count| *count > 0); - - let assignment = RelayAssignment { - relay_url: winning_url.clone(), - pubkeys: covered_public_keys, - }; - - // Put assignment into relay_assignments - if let Some(mut maybe_elem) = self.relay_assignments.get_mut(&winning_url) { - // FIXME this could cause a panic, but it would mean we have bad code. - maybe_elem.value_mut().merge_in(assignment).unwrap(); - } else { - self.relay_assignments - .insert(winning_url.clone(), assignment); - } - - Ok(winning_url) - } -} diff --git a/src/signer.rs b/src/signer.rs index c62d65a9..2dc64cf9 100644 --- a/src/signer.rs +++ b/src/signer.rs @@ -15,9 +15,7 @@ pub struct Signer { impl Signer { pub async fn load_from_settings(&self) { - let settings = GLOBALS.settings.read().await; - - *self.public.write() = settings.public_key; + *self.public.write() = GLOBALS.settings.read().public_key; *self.private.write() = None; let maybe_db = GLOBALS.db.lock().await; @@ -32,8 +30,8 @@ impl Signer { } pub async fn save_through_settings(&self) -> Result<(), Error> { - let mut settings = GLOBALS.settings.write().await; - settings.public_key = *self.public.read(); + GLOBALS.settings.write().public_key = *self.public.read(); + let settings = GLOBALS.settings.read().clone(); settings.save().await?; let epk = self.encrypted.read().clone(); diff --git a/src/ui/feed/mod.rs b/src/ui/feed/mod.rs index f71bae96..813c327e 100644 --- a/src/ui/feed/mod.rs +++ b/src/ui/feed/mod.rs @@ -259,8 +259,8 @@ fn render_post_actual( let top = ui.next_widget_position(); // Only render known relevent events - let enable_reposts = GLOBALS.settings.blocking_read().reposts; - let direct_messages = GLOBALS.settings.blocking_read().direct_messages; + let enable_reposts = GLOBALS.settings.read().reposts; + let direct_messages = GLOBALS.settings.read().direct_messages; if event.kind != EventKind::TextNote && !(enable_reposts && (event.kind == EventKind::Repost)) && !(direct_messages && (event.kind == EventKind::EncryptedDirectMessage)) diff --git a/src/ui/feed/post.rs b/src/ui/feed/post.rs index 80f936cd..f0c6a271 100644 --- a/src/ui/feed/post.rs +++ b/src/ui/feed/post.rs @@ -24,7 +24,6 @@ pub(super) fn posting_area( ui.label(" to post."); }); } else if !GLOBALS - .relay_tracker .all_relays .iter() .any(|r| r.value().write) diff --git a/src/ui/mod.rs b/src/ui/mod.rs index 0d736dec..d573eea0 100644 --- a/src/ui/mod.rs +++ b/src/ui/mod.rs @@ -148,7 +148,7 @@ impl Drop for GossipUi { impl GossipUi { fn new(cctx: &eframe::CreationContext<'_>) -> Self { - let settings = GLOBALS.settings.blocking_read().clone(); + let settings = GLOBALS.settings.read().clone(); if let Some(dpi) = settings.override_dpi { let ppt: f32 = dpi as f32 / 72.0; @@ -329,7 +329,7 @@ impl GossipUi { impl eframe::App for GossipUi { fn update(&mut self, ctx: &Context, frame: &mut eframe::Frame) { - let max_fps = GLOBALS.settings.blocking_read().max_fps as f32; + let max_fps = GLOBALS.settings.read().max_fps as f32; if self.future_scroll_offset != 0.0 { ctx.request_repaint(); diff --git a/src/ui/relays/all.rs b/src/ui/relays/all.rs index 21c5e529..164efd8e 100644 --- a/src/ui/relays/all.rs +++ b/src/ui/relays/all.rs @@ -42,7 +42,6 @@ pub(super) fn update(app: &mut GossipUi, _ctx: &Context, _frame: &mut eframe::Fr // TBD time how long this takes. We don't want expensive code in the UI // FIXME keep more relay info and display it let mut relays: Vec = GLOBALS - .relay_tracker .all_relays .iter() .map(|ri| ri.value().clone()) diff --git a/src/ui/relays/mod.rs b/src/ui/relays/mod.rs index cfb6f848..90744bd8 100644 --- a/src/ui/relays/mod.rs +++ b/src/ui/relays/mod.rs @@ -37,7 +37,6 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram ui.add_space(18.0); let connected_relays: Vec = GLOBALS - .relay_tracker .connected_relays .iter() .map(|r| r.key().clone()) @@ -73,7 +72,7 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram }); row.col(|ui| { if let Some(ref assignment) = - GLOBALS.relay_tracker.relay_assignments.get(relay_url) + GLOBALS.relay_picker.get_relay_assignment(relay_url) { ui.label(format!("{}", assignment.pubkeys.len())); } @@ -100,8 +99,8 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram ui.add_space(12.0); ui.heading("Coverage"); - if !GLOBALS.relay_tracker.pubkey_counts.is_empty() { - for elem in GLOBALS.relay_tracker.pubkey_counts.iter() { + if GLOBALS.relay_picker.pubkey_counts_iter().count() > 0 { + for elem in GLOBALS.relay_picker.pubkey_counts_iter() { let pk = elem.key(); let count = elem.value(); let maybe_person = GLOBALS.people.get(pk); diff --git a/src/ui/settings.rs b/src/ui/settings.rs index 1cc8cee8..921bc446 100644 --- a/src/ui/settings.rs +++ b/src/ui/settings.rs @@ -13,7 +13,7 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, _frame: &mut eframe::Fra ui.add_space(12.0); if ui.button("SAVE CHANGES").clicked() { // Copy local settings to global settings - *GLOBALS.settings.blocking_write() = app.settings.clone(); + *GLOBALS.settings.write() = app.settings.clone(); // Tell the overlord to save them let _ = GLOBALS.to_overlord.send(ToOverlordMessage::SaveSettings); diff --git a/src/ui/you/metadata.rs b/src/ui/you/metadata.rs index fcb0e1af..9a2495e4 100644 --- a/src/ui/you/metadata.rs +++ b/src/ui/you/metadata.rs @@ -119,7 +119,6 @@ pub(super) fn update(app: &mut GossipUi, _ctx: &Context, _frame: &mut eframe::Fr ui.label("to edit/save metadata."); }); } else if !GLOBALS - .relay_tracker .all_relays .iter() .any(|r| r.value().write)