Switch from Mutex locks to to RwLocks where it makes sense

This commit is contained in:
Mike Dilger 2022-12-27 20:44:48 +13:00
parent b1995ead86
commit 83c603a5b6
9 changed files with 54 additions and 52 deletions

View File

@ -8,7 +8,7 @@ use nostr_types::{Event, EventKind, Id, IdHex, PublicKey, PublicKeyHex, Unixtime
use rusqlite::Connection; use rusqlite::Connection;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use tokio::sync::{broadcast, mpsc, Mutex}; use tokio::sync::{broadcast, mpsc, Mutex, RwLock};
use tracing::info; use tracing::info;
/// Only one of these is ever created, via lazy_static!, and represents /// Only one of these is ever created, via lazy_static!, and represents
@ -30,31 +30,31 @@ pub struct Globals {
pub from_minions: Mutex<Option<mpsc::UnboundedReceiver<BusMessage>>>, pub from_minions: Mutex<Option<mpsc::UnboundedReceiver<BusMessage>>>,
/// All nostr events, keyed by the event Id /// All nostr events, keyed by the event Id
pub events: Mutex<HashMap<Id, Event>>, pub events: RwLock<HashMap<Id, Event>>,
/// All relationships between events /// All relationships between events
pub relationships: Mutex<HashMap<Id, Vec<(Id, Relationship)>>>, pub relationships: RwLock<HashMap<Id, Vec<(Id, Relationship)>>>,
/// The date of the latest reply. Only reply relationships count, not reactions, /// The date of the latest reply. Only reply relationships count, not reactions,
/// deletions, or quotes /// deletions, or quotes
pub last_reply: Mutex<HashMap<Id, Unixtime>>, pub last_reply: RwLock<HashMap<Id, Unixtime>>,
/// Desired events, referred to by others, with possible URLs where we can /// Desired events, referred to by others, with possible URLs where we can
/// get them. We may already have these, but if not we should ask for them. /// get them. We may already have these, but if not we should ask for them.
pub desired_events: Mutex<HashMap<Id, Vec<Url>>>, pub desired_events: RwLock<HashMap<Id, Vec<Url>>>,
/// All nostr people records currently loaded into memory, keyed by pubkey /// All nostr people records currently loaded into memory, keyed by pubkey
pub people: Mutex<HashMap<PublicKey, DbPerson>>, pub people: RwLock<HashMap<PublicKey, DbPerson>>,
/// All nostr relay records we have /// All nostr relay records we have
pub relays: Mutex<HashMap<Url, DbRelay>>, pub relays: RwLock<HashMap<Url, DbRelay>>,
/// Whether or not we are shutting down. For the UI (minions will be signaled and /// Whether or not we are shutting down. For the UI (minions will be signaled and
/// waited for by the overlord) /// waited for by the overlord)
pub shutting_down: AtomicBool, pub shutting_down: AtomicBool,
/// Settings /// Settings
pub settings: Mutex<Settings>, pub settings: RwLock<Settings>,
} }
lazy_static! { lazy_static! {
@ -71,14 +71,14 @@ lazy_static! {
to_minions, to_minions,
to_overlord, to_overlord,
from_minions: Mutex::new(Some(from_minions)), from_minions: Mutex::new(Some(from_minions)),
events: Mutex::new(HashMap::new()), events: RwLock::new(HashMap::new()),
relationships: Mutex::new(HashMap::new()), relationships: RwLock::new(HashMap::new()),
last_reply: Mutex::new(HashMap::new()), last_reply: RwLock::new(HashMap::new()),
desired_events: Mutex::new(HashMap::new()), desired_events: RwLock::new(HashMap::new()),
people: Mutex::new(HashMap::new()), people: RwLock::new(HashMap::new()),
relays: Mutex::new(HashMap::new()), relays: RwLock::new(HashMap::new()),
shutting_down: AtomicBool::new(false), shutting_down: AtomicBool::new(false),
settings: Mutex::new(Settings::default()), settings: RwLock::new(Settings::default()),
} }
}; };
} }
@ -87,7 +87,7 @@ impl Globals {
pub fn blocking_get_feed(threaded: bool) -> Vec<Id> { pub fn blocking_get_feed(threaded: bool) -> Vec<Id> {
let feed: Vec<Event> = GLOBALS let feed: Vec<Event> = GLOBALS
.events .events
.blocking_lock() .blocking_read()
.iter() .iter()
.map(|(_, e)| e) .map(|(_, e)| e)
.filter(|e| e.kind == EventKind::TextNote) .filter(|e| e.kind == EventKind::TextNote)
@ -107,8 +107,8 @@ impl Globals {
fn sort_feed(mut feed: Vec<Event>, threaded: bool) -> Vec<Id> { fn sort_feed(mut feed: Vec<Event>, threaded: bool) -> Vec<Id> {
if threaded { if threaded {
feed.sort_unstable_by(|a, b| { feed.sort_unstable_by(|a, b| {
let a_last = GLOBALS.last_reply.blocking_lock().get(&a.id).cloned(); let a_last = GLOBALS.last_reply.blocking_read().get(&a.id).cloned();
let b_last = GLOBALS.last_reply.blocking_lock().get(&b.id).cloned(); let b_last = GLOBALS.last_reply.blocking_read().get(&b.id).cloned();
let a_time = a_last.unwrap_or(a.created_at); let a_time = a_last.unwrap_or(a.created_at);
let b_time = b_last.unwrap_or(b.created_at); let b_time = b_last.unwrap_or(b.created_at);
b_time.cmp(&a_time) b_time.cmp(&a_time)
@ -121,7 +121,7 @@ impl Globals {
} }
pub async fn store_desired_event(id: Id, url: Option<Url>) { pub async fn store_desired_event(id: Id, url: Option<Url>) {
let mut desired_events = GLOBALS.desired_events.lock().await; let mut desired_events = GLOBALS.desired_events.write().await;
desired_events desired_events
.entry(id) .entry(id)
.and_modify(|urls| { .and_modify(|urls| {
@ -137,16 +137,16 @@ impl Globals {
pub fn trim_desired_events_sync() { pub fn trim_desired_events_sync() {
// danger - two locks could lead to deadlock, check other code locking these // danger - two locks could lead to deadlock, check other code locking these
// don't change the order, or else change it everywhere // don't change the order, or else change it everywhere
let mut desired_events = GLOBALS.desired_events.blocking_lock(); let mut desired_events = GLOBALS.desired_events.blocking_write();
let events = GLOBALS.events.blocking_lock(); let events = GLOBALS.events.blocking_read();
desired_events.retain(|&id, _| !events.contains_key(&id)); desired_events.retain(|&id, _| !events.contains_key(&id));
} }
pub async fn trim_desired_events() { pub async fn trim_desired_events() {
// danger - two locks could lead to deadlock, check other code locking these // danger - two locks could lead to deadlock, check other code locking these
// don't change the order, or else change it everywhere // don't change the order, or else change it everywhere
let mut desired_events = GLOBALS.desired_events.lock().await; let mut desired_events = GLOBALS.desired_events.write().await;
let events = GLOBALS.events.lock().await; let events = GLOBALS.events.read().await;
desired_events.retain(|&id, _| !events.contains_key(&id)); desired_events.retain(|&id, _| !events.contains_key(&id));
} }
@ -157,7 +157,7 @@ impl Globals {
{ {
let ids: Vec<IdHex> = GLOBALS let ids: Vec<IdHex> = GLOBALS
.desired_events .desired_events
.lock() .read()
.await .await
.iter() .iter()
.map(|(id, _)| Into::<IdHex>::into(*id)) .map(|(id, _)| Into::<IdHex>::into(*id))
@ -184,7 +184,7 @@ impl Globals {
pub async fn get_desired_events() -> Result<(HashMap<Url, Vec<Id>>, Vec<Id>), Error> { pub async fn get_desired_events() -> Result<(HashMap<Url, Vec<Id>>, Vec<Id>), Error> {
Globals::get_desired_events_prelude().await?; Globals::get_desired_events_prelude().await?;
let desired_events = GLOBALS.desired_events.lock().await; let desired_events = GLOBALS.desired_events.read().await;
let mut output: HashMap<Url, Vec<Id>> = HashMap::new(); let mut output: HashMap<Url, Vec<Id>> = HashMap::new();
let mut orphans: Vec<Id> = Vec::new(); let mut orphans: Vec<Id> = Vec::new();
for (id, vec_url) in desired_events.iter() { for (id, vec_url) in desired_events.iter() {
@ -207,7 +207,7 @@ impl Globals {
pub async fn get_desired_events_for_url(url: Url) -> Result<Vec<Id>, Error> { pub async fn get_desired_events_for_url(url: Url) -> Result<Vec<Id>, Error> {
Globals::get_desired_events_prelude().await?; Globals::get_desired_events_prelude().await?;
let desired_events = GLOBALS.desired_events.lock().await; let desired_events = GLOBALS.desired_events.read().await;
let mut output: Vec<Id> = Vec::new(); let mut output: Vec<Id> = Vec::new();
for (id, vec_url) in desired_events.iter() { for (id, vec_url) in desired_events.iter() {
if vec_url.is_empty() || vec_url.contains(&url) { if vec_url.is_empty() || vec_url.contains(&url) {
@ -220,7 +220,7 @@ impl Globals {
pub async fn add_relationship(id: Id, related: Id, relationship: Relationship) { pub async fn add_relationship(id: Id, related: Id, relationship: Relationship) {
let r = (related, relationship); let r = (related, relationship);
let mut relationships = GLOBALS.relationships.lock().await; let mut relationships = GLOBALS.relationships.write().await;
relationships relationships
.entry(id) .entry(id)
.and_modify(|vec| { .and_modify(|vec| {
@ -234,7 +234,7 @@ impl Globals {
#[async_recursion] #[async_recursion]
pub async fn update_last_reply(id: Id, time: Unixtime) { pub async fn update_last_reply(id: Id, time: Unixtime) {
{ {
let mut last_reply = GLOBALS.last_reply.lock().await; let mut last_reply = GLOBALS.last_reply.write().await;
last_reply last_reply
.entry(id) .entry(id)
.and_modify(|lasttime| { .and_modify(|lasttime| {
@ -246,7 +246,7 @@ impl Globals {
} // drops lock } // drops lock
// Recurse upwards // Recurse upwards
if let Some(event) = GLOBALS.events.lock().await.get(&id).cloned() { if let Some(event) = GLOBALS.events.write().await.get(&id).cloned() {
if let Some((id, _maybe_url)) = event.replies_to() { if let Some((id, _maybe_url)) = event.replies_to() {
Self::update_last_reply(id, event.created_at).await; Self::update_last_reply(id, event.created_at).await;
} }
@ -255,7 +255,7 @@ impl Globals {
pub fn get_replies_sync(id: Id) -> Vec<Id> { pub fn get_replies_sync(id: Id) -> Vec<Id> {
let mut output: Vec<Id> = Vec::new(); let mut output: Vec<Id> = Vec::new();
if let Some(vec) = GLOBALS.relationships.blocking_lock().get(&id) { if let Some(vec) = GLOBALS.relationships.blocking_read().get(&id) {
for (id, relationship) in vec.iter() { for (id, relationship) in vec.iter() {
if *relationship == Relationship::Reply { if *relationship == Relationship::Reply {
output.push(*id); output.push(*id);
@ -271,7 +271,7 @@ impl Globals {
pub fn get_reactions_sync(id: Id) -> HashMap<char, usize> { pub fn get_reactions_sync(id: Id) -> HashMap<char, usize> {
let mut output: HashMap<char, usize> = HashMap::new(); let mut output: HashMap<char, usize> = HashMap::new();
if let Some(relationships) = GLOBALS.relationships.blocking_lock().get(&id).cloned() { if let Some(relationships) = GLOBALS.relationships.blocking_read().get(&id).cloned() {
for (_id, relationship) in relationships.iter() { for (_id, relationship) in relationships.iter() {
if let Relationship::Reaction(reaction) = relationship { if let Relationship::Reaction(reaction) = relationship {
if let Some(ch) = reaction.chars().next() { if let Some(ch) = reaction.chars().next() {
@ -294,7 +294,7 @@ impl Globals {
} }
pub async fn followed_pubkeys() -> Vec<PublicKeyHex> { pub async fn followed_pubkeys() -> Vec<PublicKeyHex> {
let people = GLOBALS.people.lock().await; let people = GLOBALS.people.read().await;
people people
.iter() .iter()
.map(|(_, p)| p) .map(|(_, p)| p)

View File

@ -35,7 +35,7 @@ fn main() -> Result<(), Error> {
// Load settings // Load settings
let settings = crate::settings::Settings::blocking_load()?; let settings = crate::settings::Settings::blocking_load()?;
*GLOBALS.settings.blocking_lock() = settings; *GLOBALS.settings.blocking_write() = settings;
// Start async code // Start async code
// We do this on a separate thread because egui is most portable by // We do this on a separate thread because egui is most portable by

View File

@ -269,7 +269,7 @@ impl Minion {
DbPersonRelay::fetch_oldest_last_fetched(&pubkeys, &self.url.0).await? as i64; DbPersonRelay::fetch_oldest_last_fetched(&pubkeys, &self.url.0).await? as i64;
let (overlap, feed_chunk) = { let (overlap, feed_chunk) = {
let settings = GLOBALS.settings.lock().await.clone(); let settings = GLOBALS.settings.read().await.clone();
(settings.overlap, settings.feed_chunk) (settings.overlap, settings.feed_chunk)
}; };

View File

@ -84,7 +84,7 @@ impl Overlord {
// new people are encountered, not batch-style on startup. // new people are encountered, not batch-style on startup.
// Create a person record for every person seen, possibly autofollow // Create a person record for every person seen, possibly autofollow
let autofollow = GLOBALS.settings.lock().await.autofollow; let autofollow = GLOBALS.settings.read().await.autofollow;
DbPerson::populate_new_people(autofollow).await?; DbPerson::populate_new_people(autofollow).await?;
// FIXME - if this needs doing, it should be done dynamically as // FIXME - if this needs doing, it should be done dynamically as
@ -100,7 +100,7 @@ impl Overlord {
for relay in all_relays.iter() { for relay in all_relays.iter() {
GLOBALS GLOBALS
.relays .relays
.lock() .write()
.await .await
.insert(Url(relay.url.clone()), relay.clone()); .insert(Url(relay.url.clone()), relay.clone());
} }
@ -110,7 +110,7 @@ impl Overlord {
let mut dbpeople = DbPerson::fetch(None).await?; let mut dbpeople = DbPerson::fetch(None).await?;
for dbperson in dbpeople.drain(..) { for dbperson in dbpeople.drain(..) {
let pubkey = PublicKey::try_from(dbperson.pubkey.clone())?; let pubkey = PublicKey::try_from(dbperson.pubkey.clone())?;
GLOBALS.people.lock().await.insert(pubkey, dbperson); GLOBALS.people.write().await.insert(pubkey, dbperson);
} }
} }
@ -134,7 +134,7 @@ impl Overlord {
// Load feed-related events from database and process (TextNote, EventDeletion, Reaction) // Load feed-related events from database and process (TextNote, EventDeletion, Reaction)
{ {
let now = Unixtime::now().unwrap(); let now = Unixtime::now().unwrap();
let feed_chunk = GLOBALS.settings.lock().await.feed_chunk; let feed_chunk = GLOBALS.settings.read().await.feed_chunk;
let then = now.0 - feed_chunk as i64; let then = now.0 - feed_chunk as i64;
let db_events = DbEvent::fetch(Some(&format!( let db_events = DbEvent::fetch(Some(&format!(
" (kind=1 OR kind=5 OR kind=7) AND created_at > {} ORDER BY created_at ASC", " (kind=1 OR kind=5 OR kind=7) AND created_at > {} ORDER BY created_at ASC",
@ -161,8 +161,10 @@ impl Overlord {
// Pick Relays and start Minions // Pick Relays and start Minions
{ {
let pubkeys: Vec<PublicKeyHex> = crate::globals::followed_pubkeys().await; let pubkeys: Vec<PublicKeyHex> = crate::globals::followed_pubkeys().await;
let num_relays_per_person = GLOBALS.settings.lock().await.num_relays_per_person; let (num_relays_per_person, max_relays) = {
let max_relays = GLOBALS.settings.lock().await.max_relays; let settings = GLOBALS.settings.read().await;
(settings.num_relays_per_person, settings.max_relays)
};
let mut pubkey_counts: HashMap<PublicKeyHex, u8> = HashMap::new(); let mut pubkey_counts: HashMap<PublicKeyHex, u8> = HashMap::new();
for pk in pubkeys.iter() { for pk in pubkeys.iter() {
pubkey_counts.insert(pk.clone(), num_relays_per_person); pubkey_counts.insert(pk.clone(), num_relays_per_person);
@ -347,7 +349,7 @@ impl Overlord {
settings.save().await?; // to database settings.save().await?; // to database
// Update in globals // Update in globals
*GLOBALS.settings.lock().await = settings; *GLOBALS.settings.write().await = settings;
debug!("Settings saved."); debug!("Settings saved.");
} }
@ -381,7 +383,7 @@ impl Overlord {
async fn get_missing_events(&mut self) -> Result<(), Error> { async fn get_missing_events(&mut self) -> Result<(), Error> {
let (desired_events_map, desired_events_vec) = Globals::get_desired_events().await?; let (desired_events_map, desired_events_vec) = Globals::get_desired_events().await?;
let desired_count = GLOBALS.desired_events.lock().await.len(); let desired_count = GLOBALS.desired_events.read().await.len();
if desired_count == 0 { if desired_count == 0 {
return Ok(()); return Ok(());

View File

@ -50,7 +50,7 @@ pub async fn process_new_event(
// Insert the event into globals map // Insert the event into globals map
{ {
let mut events = GLOBALS.events.lock().await; let mut events = GLOBALS.events.write().await;
let _ = events.insert(event.id, event.clone()); let _ = events.insert(event.id, event.clone());
} }
@ -135,7 +135,7 @@ pub async fn process_new_event(
// We desire all ancestors // We desire all ancestors
for (id, maybe_url) in event.replies_to_ancestors() { for (id, maybe_url) in event.replies_to_ancestors() {
// Insert desired event if relevant // Insert desired event if relevant
if !GLOBALS.events.lock().await.contains_key(&id) { if !GLOBALS.events.read().await.contains_key(&id) {
Globals::store_desired_event(id, maybe_url).await; Globals::store_desired_event(id, maybe_url).await;
} }
} }
@ -153,7 +153,7 @@ pub async fn process_new_event(
} }
// Insert desired event if relevant // Insert desired event if relevant
if !GLOBALS.events.lock().await.contains_key(&id) { if !GLOBALS.events.read().await.contains_key(&id) {
Globals::store_desired_event(id, maybe_url).await; Globals::store_desired_event(id, maybe_url).await;
} }
@ -207,7 +207,7 @@ pub async fn process_new_event(
} }
{ {
let mut people = GLOBALS.people.lock().await; let mut people = GLOBALS.people.write().await;
people people
.entry(event.pubkey) .entry(event.pubkey)
.and_modify(|person| { .and_modify(|person| {

View File

@ -13,7 +13,7 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram
let desired_count = { let desired_count = {
Globals::trim_desired_events_sync(); Globals::trim_desired_events_sync();
GLOBALS.desired_events.blocking_lock().len() GLOBALS.desired_events.blocking_read().len()
}; };
ui.horizontal(|ui| { ui.horizontal(|ui| {
@ -67,7 +67,7 @@ fn render_post(
id: Id, id: Id,
indent: usize, indent: usize,
) { ) {
let maybe_event = GLOBALS.events.blocking_lock().get(&id).cloned(); let maybe_event = GLOBALS.events.blocking_read().get(&id).cloned();
if maybe_event.is_none() { if maybe_event.is_none() {
return; return;
} }
@ -78,7 +78,7 @@ fn render_post(
return; return;
} }
let maybe_person = GLOBALS.people.blocking_lock().get(&event.pubkey).cloned(); let maybe_person = GLOBALS.people.blocking_read().get(&event.pubkey).cloned();
let reactions = Globals::get_reactions_sync(event.id); let reactions = Globals::get_reactions_sync(event.id);
let replies = Globals::get_replies_sync(event.id); let replies = Globals::get_replies_sync(event.id);

View File

@ -109,7 +109,7 @@ impl GossipUi {
) )
}; };
let settings = GLOBALS.settings.blocking_lock().clone(); let settings = GLOBALS.settings.blocking_read().clone();
GossipUi { GossipUi {
page: Page::Feed, page: Page::Feed,

View File

@ -105,7 +105,7 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, _frame: &mut eframe::Fra
ui.heading("People Followed"); ui.heading("People Followed");
ui.add_space(18.0); ui.add_space(18.0);
let people = GLOBALS.people.blocking_lock().clone(); let people = GLOBALS.people.blocking_read().clone();
ScrollArea::vertical().show(ui, |ui| { ScrollArea::vertical().show(ui, |ui| {
for (_, person) in people.iter() { for (_, person) in people.iter() {

View File

@ -9,7 +9,7 @@ pub(super) fn update(_app: &mut GossipUi, _ctx: &Context, _frame: &mut eframe::F
ui.heading("Relays known"); ui.heading("Relays known");
ui.add_space(18.0); ui.add_space(18.0);
let mut relays = GLOBALS.relays.blocking_lock().clone(); let mut relays = GLOBALS.relays.blocking_read().clone();
let mut relays: Vec<DbRelay> = relays.drain().map(|(_, relay)| relay).collect(); let mut relays: Vec<DbRelay> = relays.drain().map(|(_, relay)| relay).collect();
relays.sort_by(|a, b| a.url.cmp(&b.url)); relays.sort_by(|a, b| a.url.cmp(&b.url));