Merge branch 'picker'

This commit is contained in:
Mike Dilger 2023-02-25 15:38:45 +13:00
commit 2fde1dc6f4
26 changed files with 189 additions and 473 deletions

15
Cargo.lock generated
View File

@ -1601,6 +1601,7 @@ name = "gossip"
version = "0.4.90-unstable"
dependencies = [
"async-recursion",
"async-trait",
"base64 0.21.0",
"dashmap",
"dirs",
@ -1609,6 +1610,7 @@ dependencies = [
"encoding_rs",
"futures",
"futures-util",
"gossip-relay-picker",
"hex",
"http",
"humansize",
@ -1637,6 +1639,19 @@ dependencies = [
"zeroize",
]
[[package]]
name = "gossip-relay-picker"
version = "0.1.0"
source = "git+https://github.com/mikedilger/gossip-relay-picker#98cd2acc494bf716f9fbefa9e64cbc2322e65c1d"
dependencies = [
"async-trait",
"dashmap",
"nostr-types",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "group"
version = "0.12.1"

View File

@ -13,6 +13,7 @@ edition = "2021"
[dependencies]
async-recursion = "1.0"
async-trait = "0.1"
base64 = "0.21"
dashmap = "5.4"
dirs = "4.0"
@ -21,6 +22,7 @@ egui_extras = { git = "https://github.com/mikedilger/egui", branch="gossip", fea
encoding_rs = "0.8"
futures = "0.3"
futures-util = "0.3"
gossip-relay-picker = { git = "https://github.com/mikedilger/gossip-relay-picker" }
hex = "0.4"
http = "0.2"
humansize = "2.1"

View File

@ -105,13 +105,13 @@ impl DbEvent {
};
let mut kinds = vec![EventKind::TextNote, EventKind::EventDeletion];
if GLOBALS.settings.read().await.direct_messages {
if GLOBALS.settings.read().direct_messages {
kinds.push(EventKind::EncryptedDirectMessage);
}
if GLOBALS.settings.read().await.reposts {
if GLOBALS.settings.read().reposts {
kinds.push(EventKind::Repost);
}
if GLOBALS.settings.read().await.reactions {
if GLOBALS.settings.read().reactions {
kinds.push(EventKind::Reaction);
}

View File

@ -20,7 +20,7 @@ mod contact;
pub use contact::DbContact;
mod person_relay;
pub use person_relay::{DbPersonRelay, Direction};
pub use person_relay::DbPersonRelay;
use crate::error::Error;
use crate::globals::GLOBALS;

View File

@ -1,14 +1,9 @@
use crate::error::Error;
use crate::globals::GLOBALS;
use gossip_relay_picker::Direction;
use nostr_types::{PublicKeyHex, RelayUrl, Unixtime};
use tokio::task::spawn_blocking;
#[derive(Debug, Copy, Clone)]
pub enum Direction {
Read,
Write,
}
#[derive(Debug)]
pub struct DbPersonRelay {
pub person: String,
@ -353,7 +348,7 @@ impl DbPersonRelay {
let mut ranked_relays = ranked_relays?;
let num_relays_per_person = GLOBALS.settings.read().await.num_relays_per_person as usize;
let num_relays_per_person = GLOBALS.settings.read().num_relays_per_person as usize;
// If we can't get enough of them, extend with some of our relays
// at whatever the lowest score of their last one was
@ -368,7 +363,6 @@ impl DbPersonRelay {
Direction::Write => {
// substitute our read relays
let additional: Vec<(RelayUrl, u64)> = GLOBALS
.relay_tracker
.all_relays
.iter()
.filter_map(|r| {
@ -387,7 +381,6 @@ impl DbPersonRelay {
Direction::Read => {
// substitute our write relays
let additional: Vec<(RelayUrl, u64)> = GLOBALS
.relay_tracker
.all_relays
.iter()
.filter_map(|r| {

View File

@ -54,6 +54,9 @@ pub enum Error {
#[error("Bad integer: {0}")]
ParseInt(#[from] std::num::ParseIntError),
#[error("Relay Picker error: {0}")]
RelayPickerError(#[from] gossip_relay_picker::Error),
#[error("HTTP (reqwest) error: {0}")]
ReqwestHttpError(#[from] reqwest::Error),

View File

@ -121,7 +121,7 @@ impl Feed {
}
pub fn get_person_feed(&self, person: PublicKeyHex) -> Vec<Id> {
let enable_reposts = GLOBALS.settings.blocking_read().reposts;
let enable_reposts = GLOBALS.settings.read().reposts;
self.maybe_recompute();
let mut events: Vec<Event> = GLOBALS
@ -167,7 +167,7 @@ impl Feed {
}
pub async fn recompute(&self) -> Result<(), Error> {
let settings = GLOBALS.settings.read().await.clone();
let settings = GLOBALS.settings.read().clone();
*self.interval_ms.write() = settings.feed_recompute_interval_ms;
let events: Vec<Event> = GLOBALS
@ -223,7 +223,7 @@ impl Feed {
*self.general_feed.write() = fevents.iter().map(|e| e.id).collect();
// Filter differently for the replies feed
let direct_only = GLOBALS.settings.read().await.direct_replies_only;
let direct_only = GLOBALS.settings.read().direct_replies_only;
if let Some(my_pubkey) = GLOBALS.signer.public_key() {
let my_events: HashSet<Id> = self.my_event_ids.read().iter().copied().collect();

View File

@ -120,7 +120,7 @@ impl Fetcher {
}
// Do not fetch if offline
if GLOBALS.settings.blocking_read().offline {
if GLOBALS.settings.read().offline {
return Ok(None);
}

View File

@ -5,10 +5,13 @@ use crate::feed::Feed;
use crate::fetcher::Fetcher;
use crate::people::People;
use crate::relationship::Relationship;
use crate::relays::RelayTracker;
use crate::relay_picker_hooks::Hooks;
use crate::settings::Settings;
use crate::signer::Signer;
use dashmap::{DashMap, DashSet};
use gossip_relay_picker::RelayPicker;
use nostr_types::{Event, Id, Profile, PublicKeyHex, RelayUrl};
use parking_lot::RwLock as PRwLock;
use rusqlite::Connection;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize};
@ -49,15 +52,21 @@ pub struct Globals {
/// All nostr people records currently loaded into memory, keyed by pubkey
pub people: People,
/// The relay tracker, used to pick the next relay
pub relay_tracker: RelayTracker,
/// All the relays we know about
pub all_relays: DashMap<RelayUrl, DbRelay>,
/// The relays currently connected to
pub connected_relays: DashSet<RelayUrl>,
/// The relay picker, used to pick the next relay
pub relay_picker: RelayPicker<Hooks>,
/// Whether or not we are shutting down. For the UI (minions will be signaled and
/// waited for by the overlord)
pub shutting_down: AtomicBool,
/// Settings
pub settings: RwLock<Settings>,
pub settings: PRwLock<Settings>,
/// Signer
pub signer: Signer,
@ -103,9 +112,11 @@ lazy_static! {
incoming_events: RwLock::new(Vec::new()),
relationships: RwLock::new(HashMap::new()),
people: People::new(),
relay_tracker: Default::default(),
all_relays: DashMap::new(),
connected_relays: DashSet::new(),
relay_picker: Default::default(),
shutting_down: AtomicBool::new(false),
settings: RwLock::new(Settings::default()),
settings: PRwLock::new(Settings::default()),
signer: Signer::default(),
dismissed: RwLock::new(Vec::new()),
feed: Feed::new(),
@ -217,7 +228,6 @@ impl Globals {
};
for ri in GLOBALS
.relay_tracker
.all_relays
.iter()
.filter(|ri| ri.value().write)
@ -232,8 +242,7 @@ impl Globals {
where
F: FnMut(&DbRelay) -> bool,
{
self.relay_tracker
.all_relays
self.all_relays
.iter()
.filter_map(|r| {
if f(r.value()) {
@ -249,8 +258,7 @@ impl Globals {
where
F: FnMut(&DbRelay) -> bool,
{
self.relay_tracker
.all_relays
self.all_relays
.iter()
.filter_map(|r| {
if f(r.value()) {
@ -263,6 +271,6 @@ impl Globals {
}
pub fn relay_is_connected(&self, url: &RelayUrl) -> bool {
self.relay_tracker.connected_relays.contains(url)
self.connected_relays.contains(url)
}
}

View File

@ -21,7 +21,7 @@ mod overlord;
mod people;
mod process;
mod relationship;
mod relays;
mod relay_picker_hooks;
mod settings;
mod signer;
mod tags;
@ -55,7 +55,7 @@ fn main() -> Result<(), Error> {
// Load settings
let settings = crate::settings::Settings::blocking_load()?;
*GLOBALS.settings.blocking_write() = settings;
*GLOBALS.settings.write() = settings;
// We create and enter the runtime on the main thread so that
// non-async code can have a runtime context within which to spawn

View File

@ -8,7 +8,7 @@ use std::sync::atomic::Ordering;
// This updates the people map and the database with the result
pub async fn validate_nip05(person: DbPerson) -> Result<(), Error> {
if !GLOBALS.settings.read().await.check_nip05 {
if !GLOBALS.settings.read().check_nip05 {
return Ok(());
}
@ -126,7 +126,7 @@ async fn update_relays(
let db_relay = DbRelay::new(relay_url.clone());
DbRelay::insert(db_relay.clone()).await?;
if let Entry::Vacant(entry) = GLOBALS.relay_tracker.all_relays.entry(relay_url.clone())
if let Entry::Vacant(entry) = GLOBALS.all_relays.entry(relay_url.clone())
{
entry.insert(db_relay);
}

View File

@ -54,7 +54,7 @@ impl Minion {
.await?;
// set in globals
if let Some(mut dbrelay) =
GLOBALS.relay_tracker.all_relays.get_mut(&self.dbrelay.url)
GLOBALS.all_relays.get_mut(&self.dbrelay.url)
{
dbrelay.last_general_eose_at = Some(event.created_at.0 as u64);
}

View File

@ -80,7 +80,7 @@ impl Minion {
tracing::error!("{}: ERROR bumping relay failure count: {}", &self.url, e);
}
// Update in globals too
if let Some(mut dbrelay) = GLOBALS.relay_tracker.all_relays.get_mut(&self.dbrelay.url) {
if let Some(mut dbrelay) = GLOBALS.all_relays.get_mut(&self.dbrelay.url) {
dbrelay.failure_count += 1;
}
}
@ -144,7 +144,7 @@ impl Minion {
let req = http::request::Request::builder().method("GET");
let req = if GLOBALS.settings.read().await.set_user_agent {
let req = if GLOBALS.settings.read().set_user_agent {
let about = crate::about::about();
req.header("User-Agent", format!("gossip/{}", about.version))
} else {
@ -207,7 +207,7 @@ impl Minion {
tracing::error!("{}: ERROR bumping relay success count: {}", &self.url, e);
}
// set in globals
if let Some(mut dbrelay) = GLOBALS.relay_tracker.all_relays.get_mut(&self.dbrelay.url) {
if let Some(mut dbrelay) = GLOBALS.all_relays.get_mut(&self.dbrelay.url) {
dbrelay.last_connected_at = self.dbrelay.last_connected_at;
}
}
@ -364,7 +364,7 @@ impl Minion {
) -> Result<(), Error> {
let mut filters: Vec<Filter> = Vec::new();
let (overlap, feed_chunk) = {
let settings = GLOBALS.settings.read().await.clone();
let settings = GLOBALS.settings.read().clone();
(
Duration::from_secs(settings.overlap),
Duration::from_secs(settings.feed_chunk),
@ -406,8 +406,8 @@ impl Minion {
}
};
let enable_reactions = GLOBALS.settings.read().await.reactions;
let enable_reposts = GLOBALS.settings.read().await.reposts;
let enable_reactions = GLOBALS.settings.read().reactions;
let enable_reposts = GLOBALS.settings.read().reposts;
if let Some(pubkey) = GLOBALS.signer.public_key() {
let mut kinds = vec![EventKind::TextNote, EventKind::EventDeletion];
@ -508,7 +508,7 @@ impl Minion {
async fn subscribe_mentions(&mut self) -> Result<(), Error> {
let mut filters: Vec<Filter> = Vec::new();
let (overlap, replies_chunk) = {
let settings = GLOBALS.settings.read().await.clone();
let settings = GLOBALS.settings.read().clone();
(
Duration::from_secs(settings.overlap),
Duration::from_secs(settings.replies_chunk),
@ -537,8 +537,8 @@ impl Minion {
replies_since.max(one_replieschunk_ago)
};
let enable_reactions = GLOBALS.settings.read().await.reactions;
let enable_reposts = GLOBALS.settings.read().await.reposts;
let enable_reactions = GLOBALS.settings.read().reactions;
let enable_reposts = GLOBALS.settings.read().reposts;
if let Some(pubkey) = GLOBALS.signer.public_key() {
// Any mentions of me
@ -663,7 +663,7 @@ impl Minion {
let mut filters: Vec<Filter> = Vec::new();
let enable_reactions = GLOBALS.settings.read().await.reactions;
let enable_reactions = GLOBALS.settings.read().reactions;
if !vec_ids.is_empty() {
let idhp: Vec<IdHexPrefix> = vec_ids.iter().map(|id| id.to_owned().into()).collect();

View File

@ -1,16 +1,16 @@
mod minion;
use crate::comms::{ToMinionMessage, ToMinionPayload, ToOverlordMessage};
use crate::db::{DbEvent, DbEventSeen, DbPersonRelay, DbRelay, Direction};
use crate::db::{DbEvent, DbEventSeen, DbPersonRelay, DbRelay};
use crate::error::Error;
use crate::globals::GLOBALS;
use crate::people::People;
use crate::relays::RelayAssignment;
use crate::tags::{
add_event_to_tags, add_pubkey_hex_to_tags, add_pubkey_to_tags, add_subject_to_tags_if_missing,
keys_from_text, notes_from_text,
};
use dashmap::mapref::entry::Entry;
use gossip_relay_picker::{Direction, RelayAssignment};
use minion::Minion;
use nostr_types::{
EncryptedPrivateKey, Event, EventKind, Filter, Id, IdHex, IdHexPrefix, Metadata, PreEvent,
@ -97,7 +97,6 @@ impl Overlord {
let mut all_relays: Vec<DbRelay> = DbRelay::fetch(None).await?;
for dbrelay in all_relays.drain(..) {
GLOBALS
.relay_tracker
.all_relays
.insert(dbrelay.url.clone(), dbrelay);
}
@ -106,12 +105,15 @@ impl Overlord {
// Load followed people from the database
GLOBALS.people.load_all_followed().await?;
// Initialize the relay picker
GLOBALS.relay_picker.init().await?;
let now = Unixtime::now().unwrap();
// Load reply-related events from database and process
// (where you are tagged)
{
let replies_chunk = GLOBALS.settings.read().await.replies_chunk;
let replies_chunk = GLOBALS.settings.read().replies_chunk;
let then = now.0 - replies_chunk as i64;
let db_events = DbEvent::fetch_reply_related(then).await?;
@ -134,15 +136,15 @@ impl Overlord {
// Load feed-related events from database and process
{
let feed_chunk = GLOBALS.settings.read().await.feed_chunk;
let feed_chunk = GLOBALS.settings.read().feed_chunk;
let then = now.0 - feed_chunk as i64;
let reactions = if GLOBALS.settings.read().await.reactions {
let reactions = if GLOBALS.settings.read().reactions {
" OR kind=7"
} else {
""
};
let reposts = if GLOBALS.settings.read().await.reactions {
let reposts = if GLOBALS.settings.read().reactions {
" OR kind=6"
} else {
""
@ -185,11 +187,7 @@ impl Overlord {
}
// Pick Relays and start Minions
if !GLOBALS.settings.read().await.offline {
// Initialize the RelayPicker
GLOBALS.relay_tracker.init().await?;
// Pick relays
if !GLOBALS.settings.read().offline {
self.pick_relays().await;
}
@ -246,23 +244,23 @@ impl Overlord {
async fn pick_relays(&mut self) {
loop {
match GLOBALS.relay_tracker.pick().await {
match GLOBALS.relay_picker.pick().await {
Err(failure) => {
tracing::info!("Done picking relays: {}", failure);
break;
}
Ok(relay_url) => {
if let Some(elem) = GLOBALS.relay_tracker.relay_assignments.get(&relay_url) {
if let Some(ra) = GLOBALS.relay_picker.get_relay_assignment(&relay_url) {
tracing::debug!(
"Picked {} covering {} pubkeys",
&relay_url,
elem.value().pubkeys.len()
ra.pubkeys.len()
);
// Apply the relay assignment
if let Err(e) = self.apply_relay_assignment(elem.value().to_owned()).await {
if let Err(e) = self.apply_relay_assignment(ra.to_owned()).await {
tracing::error!("{}", e);
// On failure, return it
GLOBALS.relay_tracker.relay_disconnected(&relay_url);
GLOBALS.relay_picker.relay_disconnected(&relay_url);
}
} else {
tracing::warn!("Relay Picker just picked {} but it is already no longer part of it's relay assignments!", &relay_url);
@ -295,7 +293,7 @@ impl Overlord {
}
async fn start_minion(&mut self, url: RelayUrl) -> Result<(), Error> {
if GLOBALS.settings.read().await.offline {
if GLOBALS.settings.read().offline {
return Ok(());
}
@ -303,7 +301,7 @@ impl Overlord {
let abort_handle = self.minions.spawn(async move { minion.handle().await });
let id = abort_handle.id();
self.minions_task_url.insert(id, url.clone());
GLOBALS.relay_tracker.connected_relays.insert(url);
GLOBALS.connected_relays.insert(url);
Ok(())
}
@ -366,7 +364,7 @@ impl Overlord {
// Minion probably already logged failure in relay table
// Set to not connected
GLOBALS.relay_tracker.connected_relays.remove(&url);
GLOBALS.connected_relays.remove(&url);
// Remove from our hashmap
self.minions_task_url.remove(&id);
@ -388,7 +386,7 @@ impl Overlord {
tracing::info!("Relay Task {} completed", &url);
// Set to not connected
GLOBALS.relay_tracker.connected_relays.remove(&url);
GLOBALS.connected_relays.remove(&url);
// Remove from our hashmap
self.minions_task_url.remove(&id);
@ -405,10 +403,10 @@ impl Overlord {
}
async fn recover_from_minion_exit(&mut self, url: RelayUrl) {
GLOBALS.relay_tracker.relay_disconnected(&url);
GLOBALS.relay_picker.relay_disconnected(&url);
if let Err(e) = GLOBALS
.relay_tracker
.refresh_person_relay_scores(false)
.relay_picker
.refresh_person_relay_scores()
.await
{
tracing::error!("Error: {}", e);
@ -421,7 +419,7 @@ impl Overlord {
ToOverlordMessage::AddRelay(relay_str) => {
let dbrelay = DbRelay::new(relay_str.clone());
DbRelay::insert(dbrelay.clone()).await?;
GLOBALS.relay_tracker.all_relays.insert(relay_str, dbrelay);
GLOBALS.all_relays.insert(relay_str, dbrelay);
}
ToOverlordMessage::AdvertiseRelayList => {
self.advertise_relay_list().await?;
@ -510,8 +508,8 @@ impl Overlord {
// When manually doing this, we refresh person_relay scores first which
// often change if the user just added follows.
GLOBALS
.relay_tracker
.refresh_person_relay_scores(false)
.relay_picker
.refresh_person_relay_scores()
.await?;
// Then pick
@ -548,7 +546,7 @@ impl Overlord {
self.push_metadata(metadata).await?;
}
ToOverlordMessage::RankRelay(relay_url, rank) => {
if let Some(mut dbrelay) = GLOBALS.relay_tracker.all_relays.get_mut(&relay_url) {
if let Some(mut dbrelay) = GLOBALS.all_relays.get_mut(&relay_url) {
dbrelay.rank = rank as u64;
}
DbRelay::set_rank(relay_url, rank).await?;
@ -557,21 +555,21 @@ impl Overlord {
self.refresh_followed_metadata().await?;
}
ToOverlordMessage::SaveSettings => {
GLOBALS.settings.read().await.save().await?;
GLOBALS.settings.read().save().await?;
tracing::debug!("Settings saved.");
}
ToOverlordMessage::SetActivePerson(pubkey) => {
GLOBALS.people.set_active_person(pubkey).await?;
}
ToOverlordMessage::SetRelayReadWrite(relay_url, read, write) => {
if let Some(mut dbrelay) = GLOBALS.relay_tracker.all_relays.get_mut(&relay_url) {
if let Some(mut dbrelay) = GLOBALS.all_relays.get_mut(&relay_url) {
dbrelay.read = read;
dbrelay.write = write;
}
DbRelay::update_read_and_write(relay_url, read, write).await?;
}
ToOverlordMessage::SetRelayAdvertise(relay_url, advertise) => {
if let Some(mut dbrelay) = GLOBALS.relay_tracker.all_relays.get_mut(&relay_url) {
if let Some(mut dbrelay) = GLOBALS.all_relays.get_mut(&relay_url) {
dbrelay.advertise = advertise;
}
DbRelay::update_advertise(relay_url, advertise).await?;
@ -593,16 +591,13 @@ impl Overlord {
// Update public key from private key
let public_key = GLOBALS.signer.public_key().unwrap();
{
let mut settings = GLOBALS.settings.write().await;
settings.public_key = Some(public_key);
settings.save().await?;
}
GLOBALS.settings.write().public_key = Some(public_key);
GLOBALS.settings.read().clone().save().await?;
}
ToOverlordMessage::UpdateMetadata(pubkey) => {
let best_relays =
DbPersonRelay::get_best_relays(pubkey.clone(), Direction::Write).await?;
let num_relays_per_person = GLOBALS.settings.read().await.num_relays_per_person;
let num_relays_per_person = GLOBALS.settings.read().num_relays_per_person;
// we do 1 more than num_relays_per_person, which is really for main posts,
// since metadata is more important and I didn't want to bother with
@ -693,7 +688,7 @@ impl Overlord {
}
};
if GLOBALS.settings.read().await.set_client_tag {
if GLOBALS.settings.read().set_client_tag {
tags.push(Tag::Other {
tag: "client".to_owned(),
data: vec!["gossip".to_owned()],
@ -785,7 +780,7 @@ impl Overlord {
ots: None,
};
let powint = GLOBALS.settings.read().await.pow;
let powint = GLOBALS.settings.read().pow;
let pow = if powint > 0 { Some(powint) } else { None };
GLOBALS.signer.sign_preevent(pre_event, pow)?
};
@ -917,7 +912,7 @@ impl Overlord {
},
];
if GLOBALS.settings.read().await.set_client_tag {
if GLOBALS.settings.read().set_client_tag {
tags.push(Tag::Other {
tag: "client".to_owned(),
data: vec!["gossip".to_owned()],
@ -933,7 +928,7 @@ impl Overlord {
ots: None,
};
let powint = GLOBALS.settings.read().await.pow;
let powint = GLOBALS.settings.read().pow;
let pow = if powint > 0 { Some(powint) } else { None };
GLOBALS.signer.sign_preevent(pre_event, pow)?
};
@ -1062,7 +1057,7 @@ impl Overlord {
async fn refresh_followed_metadata(&mut self) -> Result<(), Error> {
let pubkeys = GLOBALS.people.get_followed_pubkeys();
let num_relays_per_person = GLOBALS.settings.read().await.num_relays_per_person;
let num_relays_per_person = GLOBALS.settings.read().num_relays_per_person;
let mut map: HashMap<RelayUrl, Vec<PublicKeyHex>> = HashMap::new();
@ -1159,7 +1154,7 @@ impl Overlord {
// instead build the filters, then both send them to the minion and
// also query them locally.
{
let enable_reactions = GLOBALS.settings.read().await.reactions;
let enable_reactions = GLOBALS.settings.read().reactions;
if !missing_ancestors_hex.is_empty() {
let idhp: Vec<IdHexPrefix> = missing_ancestors_hex
@ -1262,7 +1257,7 @@ impl Overlord {
DbRelay::insert(db_relay.clone()).await?;
if let Entry::Vacant(entry) =
GLOBALS.relay_tracker.all_relays.entry(relay_url.clone())
GLOBALS.all_relays.entry(relay_url.clone())
{
entry.insert(db_relay);
}

View File

@ -1,11 +1,12 @@
use crate::comms::ToOverlordMessage;
use crate::db::{DbEvent, DbPersonRelay, Direction};
use crate::db::{DbEvent, DbPersonRelay};
use crate::error::Error;
use crate::globals::GLOBALS;
use crate::AVATAR_SIZE;
use dashmap::{DashMap, DashSet};
use eframe::egui::ColorImage;
use egui_extras::image::FitTo;
use gossip_relay_picker::Direction;
use image::imageops::FilterType;
use nostr_types::{
Event, EventKind, Metadata, PreEvent, PublicKey, PublicKeyHex, RelayUrl, Tag, UncheckedUrl,
@ -246,7 +247,7 @@ impl People {
pub fn person_of_interest(&self, pubkeyhex: PublicKeyHex) {
if !GLOBALS
.settings
.blocking_read()
.read()
.automatically_fetch_metadata
{
return;
@ -545,7 +546,7 @@ impl People {
};
// Do not fetch if disabled
if !GLOBALS.settings.blocking_read().load_avatars {
if !GLOBALS.settings.read().load_avatars {
GLOBALS.people.avatars_failed.insert(pubkeyhex.clone());
return Err(());
}
@ -774,10 +775,10 @@ impl People {
}
if follow > 0 {
// Add the person to the relay_tracker for picking
GLOBALS.relay_tracker.add_someone(pubkeyhex.to_owned())?;
// Add the person to the relay_picker for picking
GLOBALS.relay_picker.add_someone(pubkeyhex.to_owned())?;
} else {
GLOBALS.relay_tracker.remove_someone(pubkeyhex.to_owned());
GLOBALS.relay_picker.remove_someone(pubkeyhex.to_owned());
}
Ok(())
@ -879,9 +880,9 @@ impl People {
}
}
// Add the people to the relay_tracker for picking
// Add the people to the relay_picker for picking
for pubkey in pubkeys.iter() {
GLOBALS.relay_tracker.add_someone(pubkey.to_owned())?;
GLOBALS.relay_picker.add_someone(pubkey.to_owned())?;
}
Ok(())

View File

@ -265,7 +265,7 @@ async fn process_relay_list(event: &Event) -> Result<(), Error> {
DbRelay::clear_read_and_write().await?;
// in memory
for mut elem in GLOBALS.relay_tracker.all_relays.iter_mut() {
for mut elem in GLOBALS.all_relays.iter_mut() {
elem.value_mut().read = false;
elem.value_mut().write = false;
}
@ -287,7 +287,7 @@ async fn process_relay_list(event: &Event) -> Result<(), Error> {
.await?;
// set in memory
if let Some(mut elem) =
GLOBALS.relay_tracker.all_relays.get_mut(&relay_url)
GLOBALS.all_relays.get_mut(&relay_url)
{
elem.read = true;
elem.write = false;
@ -302,7 +302,7 @@ async fn process_relay_list(event: &Event) -> Result<(), Error> {
.await?;
// set in memory
if let Some(mut elem) =
GLOBALS.relay_tracker.all_relays.get_mut(&relay_url)
GLOBALS.all_relays.get_mut(&relay_url)
{
elem.read = false;
elem.write = true;
@ -318,7 +318,7 @@ async fn process_relay_list(event: &Event) -> Result<(), Error> {
// set in database
DbRelay::update_read_and_write(relay_url.clone(), true, true).await?;
// set in memory
if let Some(mut elem) = GLOBALS.relay_tracker.all_relays.get_mut(&relay_url)
if let Some(mut elem) = GLOBALS.all_relays.get_mut(&relay_url)
{
elem.read = true;
elem.write = true;

61
src/relay_picker_hooks.rs Normal file
View File

@ -0,0 +1,61 @@
use crate::globals::GLOBALS;
use crate::db::DbPersonRelay;
use crate::error::Error;
use async_trait::async_trait;
use gossip_relay_picker::{Direction, RelayPickerHooks};
use nostr_types::{PublicKeyHex, RelayUrl};
#[derive(Default)]
pub struct Hooks { }
#[async_trait]
impl RelayPickerHooks for Hooks {
type Error = Error;
/// Returns all relays available to be connected to
fn get_all_relays(&self) -> Vec<RelayUrl> {
GLOBALS.all_relays.iter().map(|elem| elem.key().to_owned()).collect()
}
/// Returns all relays that this public key uses in the given Direction
async fn get_relays_for_pubkey(
&self,
pubkey: PublicKeyHex,
direction: Direction,
) -> Result<Vec<(RelayUrl, u64)>, Error> {
DbPersonRelay::get_best_relays(pubkey, direction).await
}
/// Is the relay currently connected?
fn is_relay_connected(&self, relay: &RelayUrl) -> bool {
GLOBALS.connected_relays.contains(relay)
}
/// Returns the maximum number of relays that should be connected to at one time
fn get_max_relays(&self) -> usize {
GLOBALS.settings.read().max_relays as usize
}
/// Returns the number of relays each followed person's events should be pulled from
/// Many people use 2 or 3 for redundancy.
fn get_num_relays_per_person(&self) -> usize {
GLOBALS.settings.read().num_relays_per_person as usize
}
/// Returns the public keys of all the people followed
fn get_followed_pubkeys(&self) -> Vec<PublicKeyHex> {
GLOBALS.people.get_followed_pubkeys()
}
/// Adjusts the score for a given relay, perhaps based on relay-specific metrics
fn adjust_score(&self, relay: RelayUrl, score: u64) -> u64 {
if let Some(relay) = GLOBALS.all_relays.get(&relay) {
let success_rate = relay.success_rate();
let rank = (relay.rank as f32 * (1.3 * success_rate)) as u64;
score * rank
} else {
score
}
}
}

View File

@ -1,356 +0,0 @@
use crate::db::{DbPersonRelay, DbRelay, Direction};
use crate::error::Error;
use crate::globals::GLOBALS;
use dashmap::{DashMap, DashSet};
use nostr_types::{PublicKeyHex, RelayUrl, Unixtime};
use std::fmt;
use std::sync::atomic::{AtomicU8, Ordering};
/// A RelayAssignment is a record of a relay which is serving (or will serve) the general
/// feed for a set of public keys.
#[derive(Debug, Clone)]
pub struct RelayAssignment {
pub relay_url: RelayUrl,
pub pubkeys: Vec<PublicKeyHex>,
}
impl RelayAssignment {
pub fn merge_in(&mut self, other: RelayAssignment) -> Result<(), Error> {
if self.relay_url != other.relay_url {
return Err(Error::General(
"Attempted to merge relay assignments on different relays".to_owned(),
));
}
self.pubkeys.extend(other.pubkeys);
Ok(())
}
}
/// Ways that pick() can fail
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(clippy::enum_variant_names)]
pub enum RelayPickFailure {
/// No relays to pick from
NoRelays,
/// No people left to assign. A good result.
NoPeopleLeft,
/// No progress was made. A stuck result.
NoProgress,
}
impl fmt::Display for RelayPickFailure {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
RelayPickFailure::NoRelays => write!(f, "No relays to pick from."),
RelayPickFailure::NoPeopleLeft => write!(f, "All people accounted for."),
RelayPickFailure::NoProgress => write!(f, "Unable to make further progress."),
}
}
}
/// The RelayTracker is a structure that helps assign people we follow to relays we watch.
/// It remembers which publickeys are assigned to which relays, which pubkeys need more
/// relays and how many, which relays need a time out, and person-relay scores for making
/// good assignments dynamically.
#[derive(Debug, Default)]
pub struct RelayTracker {
/// All of the relays we might use
pub all_relays: DashMap<RelayUrl, DbRelay>,
/// All of the relays actually connected
pub connected_relays: DashSet<RelayUrl>,
/// All of the relays currently connected, with optional assignments.
/// (Sometimes a relay is connected for a different kind of subscription.)
pub relay_assignments: DashMap<RelayUrl, RelayAssignment>,
/// Relays which recently failed and which require a timeout before
/// they can be chosen again. The value is the time when it can be removed
/// from this list.
pub excluded_relays: DashMap<RelayUrl, i64>,
/// For each followed pubkey that still needs assignments, the number of relay
/// assignments it is seeking. These start out at settings.num_relays_per_person
/// (if the person doesn't have that many relays, it will do the best it can)
pub pubkey_counts: DashMap<PublicKeyHex, u8>,
/// A ranking of relays per person.
pub person_relay_scores: DashMap<PublicKeyHex, Vec<(RelayUrl, u64)>>,
/// Number of relays per person. Taken from settings and fixed.
pub num_relays_per_person: AtomicU8,
}
impl RelayTracker {
/// This starts a new RelayTracker that has:
/// * All relays
/// * All followed public keys, with count starting at num_relays_per_person
/// * person relay scores for all person-relay pairings
pub async fn init(&self) -> Result<(), Error> {
// just in case it is re-initialized (not sure why it would be)
self.all_relays.clear();
self.connected_relays.clear();
self.relay_assignments.clear();
self.excluded_relays.clear();
self.pubkey_counts.clear();
self.person_relay_scores.clear();
// Load relays from the database
for (relay_url, dbrelay) in DbRelay::fetch(None)
.await?
.drain(..)
.map(|dbr| (dbr.url.clone(), dbr))
{
self.all_relays.insert(relay_url, dbrelay);
}
self.num_relays_per_person.store(
GLOBALS.settings.read().await.num_relays_per_person,
Ordering::Relaxed,
);
self.refresh_person_relay_scores(true).await?;
Ok(())
}
pub fn add_someone(&self, pubkey: PublicKeyHex) -> Result<(), Error> {
// Check if we already have them
if self.pubkey_counts.get(&pubkey).is_some() {
return Ok(());
}
for elem in self.relay_assignments.iter() {
let assignment = elem.value();
if assignment.pubkeys.contains(&pubkey) {
return Ok(());
}
}
self.pubkey_counts
.insert(pubkey, self.num_relays_per_person.load(Ordering::Relaxed));
Ok(())
}
pub fn remove_someone(&self, pubkey: PublicKeyHex) {
// Remove from pubkey counts
self.pubkey_counts.remove(&pubkey);
// Remove from relay assignments
for mut elem in self.relay_assignments.iter_mut() {
let assignment = elem.value_mut();
if let Some(pos) = assignment.pubkeys.iter().position(|x| x == &pubkey) {
assignment.pubkeys.remove(pos);
}
}
// This doesn't indicate that the assignment has changed, so the relay is
// still delivering their events. But the feed shouldn't be showing them.
}
pub async fn refresh_person_relay_scores(&self, initialize_counts: bool) -> Result<(), Error> {
self.person_relay_scores.clear();
if initialize_counts {
self.pubkey_counts.clear();
}
// Get all the people we follow
let pubkeys: Vec<PublicKeyHex> = GLOBALS
.people
.get_followed_pubkeys()
.iter()
.map(|p| p.to_owned())
.collect();
// Compute scores for each person_relay pairing
for pubkey in &pubkeys {
let best_relays: Vec<(RelayUrl, u64)> =
DbPersonRelay::get_best_relays(pubkey.to_owned(), Direction::Write).await?;
self.person_relay_scores.insert(pubkey.clone(), best_relays);
if initialize_counts {
self.pubkey_counts.insert(
pubkey.clone(),
self.num_relays_per_person.load(Ordering::Relaxed),
);
}
}
Ok(())
}
/// When a relay disconnects, call this so that whatever assignments it might have
/// had can be reassigned. Then call pick_relays() again.
pub fn relay_disconnected(&self, url: &RelayUrl) {
// Remove from connected relays list
if let Some((_key, assignment)) = self.relay_assignments.remove(url) {
// Exclude the relay for the next 30 seconds
let hence = Unixtime::now().unwrap().0 + 30;
self.excluded_relays.insert(url.to_owned(), hence);
tracing::debug!("{} goes into the penalty box until {}", url, hence,);
// Put the public keys back into pubkey_counts
for pubkey in assignment.pubkeys.iter() {
self.pubkey_counts
.entry(pubkey.to_owned())
.and_modify(|e| *e += 1)
.or_insert(1);
}
}
}
/// Create the next assignment, and return the RelayUrl that has it.
/// The caller is responsible for making that assignment actually happen.
pub async fn pick(&self) -> Result<RelayUrl, RelayPickFailure> {
// If we are at max relays, only consider relays we are already
// connected to
let max_relays = GLOBALS.settings.read().await.max_relays as usize;
let at_max_relays = self.relay_assignments.len() >= max_relays;
// Maybe include excluded relays
let now = Unixtime::now().unwrap().0;
self.excluded_relays.retain(|_, v| *v > now);
if self.pubkey_counts.is_empty() {
return Err(RelayPickFailure::NoPeopleLeft);
}
if self.all_relays.is_empty() {
return Err(RelayPickFailure::NoRelays);
}
// Keep score for each relay
let scoreboard: DashMap<RelayUrl, u64> = self
.all_relays
.iter()
.map(|x| (x.key().to_owned(), 0))
.collect();
// Assign scores to relays from each pubkey
for elem in self.person_relay_scores.iter() {
let pubkeyhex = elem.key();
let relay_scores = elem.value();
// Skip if this pubkey doesn't need any more assignments
if let Some(pkc) = self.pubkey_counts.get(pubkeyhex) {
if *pkc == 0 {
// person doesn't need anymore
continue;
}
} else {
continue; // person doesn't need any
}
// Add scores of their relays
for (relay, score) in relay_scores.iter() {
// Skip relays that are excluded
if self.excluded_relays.contains_key(relay) {
continue;
}
// If at max, skip relays not already connected
if at_max_relays && !self.connected_relays.contains(relay) {
continue;
}
// Skip if relay is already assigned this pubkey
if let Some(assignment) = self.relay_assignments.get(relay) {
if assignment.pubkeys.contains(pubkeyhex) {
continue;
}
}
// Add the score
if let Some(mut entry) = scoreboard.get_mut(relay) {
*entry += score;
}
}
}
// Adjust all scores based on relay rank and relay success rate
for mut score_entry in scoreboard.iter_mut() {
let url = score_entry.key().to_owned();
let score = score_entry.value_mut();
if let Some(relay) = self.all_relays.get(&url) {
let success_rate = relay.success_rate();
let rank = (relay.rank as f32 * (1.3 * success_rate)) as u64;
*score *= rank;
}
}
let winner = scoreboard
.iter()
.max_by(|x, y| x.value().cmp(y.value()))
.unwrap();
let winning_url: RelayUrl = winner.key().to_owned();
let winning_score: u64 = *winner.value();
if winning_score == 0 {
return Err(RelayPickFailure::NoProgress);
}
// Now sort out which public keys go with that relay (we did this already
// above when assigning scores, but in a way which would require a lot of
// storage to keep, so we just do it again)
let covered_public_keys = {
let pubkeys_seeking_relays: Vec<PublicKeyHex> = self
.pubkey_counts
.iter()
.filter(|e| *e.value() > 0)
.map(|e| e.key().to_owned())
.collect();
let mut covered_pubkeys: Vec<PublicKeyHex> = Vec::new();
for pubkey in pubkeys_seeking_relays.iter() {
// Skip if relay is already assigned this pubkey
if let Some(assignment) = self.relay_assignments.get(&winning_url) {
if assignment.pubkeys.contains(pubkey) {
continue;
}
}
if let Some(elem) = self.person_relay_scores.get(pubkey) {
let relay_scores = elem.value();
if relay_scores.iter().any(|e| e.0 == winning_url) {
covered_pubkeys.push(pubkey.to_owned());
if let Some(mut count) = self.pubkey_counts.get_mut(pubkey) {
if *count > 0 {
*count -= 1;
}
}
}
}
}
covered_pubkeys
};
if covered_public_keys.is_empty() {
return Err(RelayPickFailure::NoProgress);
}
// Only keep pubkey_counts that are still > 0
self.pubkey_counts.retain(|_, count| *count > 0);
let assignment = RelayAssignment {
relay_url: winning_url.clone(),
pubkeys: covered_public_keys,
};
// Put assignment into relay_assignments
if let Some(mut maybe_elem) = self.relay_assignments.get_mut(&winning_url) {
// FIXME this could cause a panic, but it would mean we have bad code.
maybe_elem.value_mut().merge_in(assignment).unwrap();
} else {
self.relay_assignments
.insert(winning_url.clone(), assignment);
}
Ok(winning_url)
}
}

View File

@ -15,9 +15,7 @@ pub struct Signer {
impl Signer {
pub async fn load_from_settings(&self) {
let settings = GLOBALS.settings.read().await;
*self.public.write() = settings.public_key;
*self.public.write() = GLOBALS.settings.read().public_key;
*self.private.write() = None;
let maybe_db = GLOBALS.db.lock().await;
@ -32,8 +30,8 @@ impl Signer {
}
pub async fn save_through_settings(&self) -> Result<(), Error> {
let mut settings = GLOBALS.settings.write().await;
settings.public_key = *self.public.read();
GLOBALS.settings.write().public_key = *self.public.read();
let settings = GLOBALS.settings.read().clone();
settings.save().await?;
let epk = self.encrypted.read().clone();

View File

@ -259,8 +259,8 @@ fn render_post_actual(
let top = ui.next_widget_position();
// Only render known relevent events
let enable_reposts = GLOBALS.settings.blocking_read().reposts;
let direct_messages = GLOBALS.settings.blocking_read().direct_messages;
let enable_reposts = GLOBALS.settings.read().reposts;
let direct_messages = GLOBALS.settings.read().direct_messages;
if event.kind != EventKind::TextNote
&& !(enable_reposts && (event.kind == EventKind::Repost))
&& !(direct_messages && (event.kind == EventKind::EncryptedDirectMessage))

View File

@ -24,7 +24,6 @@ pub(super) fn posting_area(
ui.label(" to post.");
});
} else if !GLOBALS
.relay_tracker
.all_relays
.iter()
.any(|r| r.value().write)

View File

@ -148,7 +148,7 @@ impl Drop for GossipUi {
impl GossipUi {
fn new(cctx: &eframe::CreationContext<'_>) -> Self {
let settings = GLOBALS.settings.blocking_read().clone();
let settings = GLOBALS.settings.read().clone();
if let Some(dpi) = settings.override_dpi {
let ppt: f32 = dpi as f32 / 72.0;
@ -329,7 +329,7 @@ impl GossipUi {
impl eframe::App for GossipUi {
fn update(&mut self, ctx: &Context, frame: &mut eframe::Frame) {
let max_fps = GLOBALS.settings.blocking_read().max_fps as f32;
let max_fps = GLOBALS.settings.read().max_fps as f32;
if self.future_scroll_offset != 0.0 {
ctx.request_repaint();

View File

@ -42,7 +42,6 @@ pub(super) fn update(app: &mut GossipUi, _ctx: &Context, _frame: &mut eframe::Fr
// TBD time how long this takes. We don't want expensive code in the UI
// FIXME keep more relay info and display it
let mut relays: Vec<DbRelay> = GLOBALS
.relay_tracker
.all_relays
.iter()
.map(|ri| ri.value().clone())

View File

@ -37,7 +37,6 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram
ui.add_space(18.0);
let connected_relays: Vec<RelayUrl> = GLOBALS
.relay_tracker
.connected_relays
.iter()
.map(|r| r.key().clone())
@ -73,7 +72,7 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram
});
row.col(|ui| {
if let Some(ref assignment) =
GLOBALS.relay_tracker.relay_assignments.get(relay_url)
GLOBALS.relay_picker.get_relay_assignment(relay_url)
{
ui.label(format!("{}", assignment.pubkeys.len()));
}
@ -100,8 +99,8 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram
ui.add_space(12.0);
ui.heading("Coverage");
if !GLOBALS.relay_tracker.pubkey_counts.is_empty() {
for elem in GLOBALS.relay_tracker.pubkey_counts.iter() {
if GLOBALS.relay_picker.pubkey_counts_iter().count() > 0 {
for elem in GLOBALS.relay_picker.pubkey_counts_iter() {
let pk = elem.key();
let count = elem.value();
let maybe_person = GLOBALS.people.get(pk);

View File

@ -13,7 +13,7 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, _frame: &mut eframe::Fra
ui.add_space(12.0);
if ui.button("SAVE CHANGES").clicked() {
// Copy local settings to global settings
*GLOBALS.settings.blocking_write() = app.settings.clone();
*GLOBALS.settings.write() = app.settings.clone();
// Tell the overlord to save them
let _ = GLOBALS.to_overlord.send(ToOverlordMessage::SaveSettings);

View File

@ -119,7 +119,6 @@ pub(super) fn update(app: &mut GossipUi, _ctx: &Context, _frame: &mut eframe::Fr
ui.label("to edit/save metadata.");
});
} else if !GLOBALS
.relay_tracker
.all_relays
.iter()
.any(|r| r.value().write)