Use dashmap in People, no longer need RwLock around it

This commit is contained in:
Mike Dilger 2023-01-13 07:16:15 +13:00
parent bb25c671d4
commit 0782b21c06
11 changed files with 54 additions and 107 deletions

View File

@ -159,7 +159,7 @@ impl Feed {
.filter(|e| e.kind == EventKind::TextNote)
.collect();
let mut pubkeys = GLOBALS.people.blocking_read().get_followed_pubkeys();
let mut pubkeys = GLOBALS.people.get_followed_pubkeys();
if let Some(pubkey) = GLOBALS.signer.blocking_read().public_key() {
pubkeys.push(pubkey.into()); // add the user
}

View File

@ -47,7 +47,7 @@ pub struct Globals {
pub desired_events: RwLock<HashMap<Id, Vec<Url>>>,
/// All nostr people records currently loaded into memory, keyed by pubkey
pub people: RwLock<People>,
pub people: People,
/// All nostr relay records we have
pub relays: RwLock<HashMap<Url, DbRelay>>,
@ -104,7 +104,7 @@ lazy_static! {
incoming_events: RwLock::new(Vec::new()),
relationships: RwLock::new(HashMap::new()),
desired_events: RwLock::new(HashMap::new()),
people: RwLock::new(People::new()),
people: People::new(),
relays: RwLock::new(HashMap::new()),
relays_watching: RwLock::new(Vec::new()),
shutting_down: AtomicBool::new(false),

View File

@ -12,8 +12,6 @@ pub async fn validate_nip05(person: DbPerson) -> Result<(), Error> {
if person.dns_id.is_none() {
GLOBALS
.people
.write()
.await
.upsert_nip05_validity(&person.pubkey, person.dns_id, false, now.0 as u64)
.await?;
return Ok(());
@ -26,8 +24,6 @@ pub async fn validate_nip05(person: DbPerson) -> Result<(), Error> {
Err(_) => {
GLOBALS
.people
.write()
.await
.upsert_nip05_validity(&person.pubkey, person.dns_id, false, now.0 as u64)
.await?;
return Ok(());
@ -50,8 +46,6 @@ pub async fn validate_nip05(person: DbPerson) -> Result<(), Error> {
// Validated
GLOBALS
.people
.write()
.await
.upsert_nip05_validity(&person.pubkey, person.dns_id, true, now.0 as u64)
.await?;
}
@ -60,8 +54,6 @@ pub async fn validate_nip05(person: DbPerson) -> Result<(), Error> {
// Failed
GLOBALS
.people
.write()
.await
.upsert_nip05_validity(&person.pubkey, person.dns_id, false, now.0 as u64)
.await?;
}
@ -86,8 +78,6 @@ pub async fn get_and_follow_nip05(dns_id: String) -> Result<(), Error> {
// Save person
GLOBALS
.people
.write()
.await
.upsert_nip05_validity(
pubkey,
Some(dns_id.clone()),
@ -97,12 +87,7 @@ pub async fn get_and_follow_nip05(dns_id: String) -> Result<(), Error> {
.await?;
// Mark as followed
GLOBALS
.people
.write()
.await
.async_follow(pubkey, true)
.await?;
GLOBALS.people.async_follow(pubkey, true).await?;
tracing::info!("Followed {}", &dns_id);

View File

@ -282,7 +282,7 @@ impl Minion {
)
};
let followed_pubkeys = GLOBALS.people.read().await.get_followed_pubkeys();
let followed_pubkeys = GLOBALS.people.get_followed_pubkeys();
tracing::debug!(
"Following {} people at {}",
followed_pubkeys.len(),

View File

@ -100,7 +100,7 @@ impl Overlord {
}
// Load people from the database
GLOBALS.people.write().await.load_all_followed().await?;
GLOBALS.people.load_all_followed().await?;
// Load latest metadata per person and update their metadata
// This can happen in the background
@ -158,8 +158,6 @@ impl Overlord {
if !GLOBALS.settings.read().await.offline {
let pubkeys: Vec<PublicKeyHex> = GLOBALS
.people
.read()
.await
.get_followed_pubkeys()
.iter()
.map(|p| p.to_owned())
@ -578,12 +576,7 @@ impl Overlord {
async fn follow_bech32(bech32: String, relay: String) -> Result<(), Error> {
let pk = PublicKey::try_from_bech32_string(&bech32)?;
let pkhex: PublicKeyHex = pk.into();
GLOBALS
.people
.write()
.await
.async_follow(&pkhex, true)
.await?;
GLOBALS.people.async_follow(&pkhex, true).await?;
tracing::debug!("Followed {}", &pkhex);
@ -611,12 +604,7 @@ impl Overlord {
async fn follow_hexkey(hexkey: String, relay: String) -> Result<(), Error> {
let pk = PublicKey::try_from_hex_string(&hexkey)?;
let pkhex: PublicKeyHex = pk.into();
GLOBALS
.people
.write()
.await
.async_follow(&pkhex, true)
.await?;
GLOBALS.people.async_follow(&pkhex, true).await?;
tracing::debug!("Followed {}", &pkhex);

View File

@ -1,32 +1,32 @@
use crate::db::DbPerson;
use crate::error::Error;
use crate::globals::GLOBALS;
use dashmap::{DashMap, DashSet};
use image::RgbaImage;
use nostr_types::{Metadata, PublicKey, PublicKeyHex, Unixtime, Url};
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use tokio::task;
pub struct People {
people: HashMap<PublicKeyHex, DbPerson>,
people: DashMap<PublicKeyHex, DbPerson>,
// We fetch (with Fetcher), process, and temporarily hold avatars
// until the UI next asks for them, at which point we remove them
// and hand them over. This way we can do the work that takes
// longer and the UI can do as little work as possible.
avatars_temp: HashMap<PublicKeyHex, RgbaImage>,
avatars_pending_processing: HashSet<PublicKeyHex>,
avatars_failed: HashSet<PublicKeyHex>,
avatars_temp: DashMap<PublicKeyHex, RgbaImage>,
avatars_pending_processing: DashSet<PublicKeyHex>,
avatars_failed: DashSet<PublicKeyHex>,
}
impl People {
pub fn new() -> People {
People {
people: HashMap::new(),
avatars_temp: HashMap::new(),
avatars_pending_processing: HashSet::new(),
avatars_failed: HashSet::new(),
people: DashMap::new(),
avatars_temp: DashMap::new(),
avatars_pending_processing: DashSet::new(),
avatars_failed: DashSet::new(),
}
}
@ -35,14 +35,14 @@ impl People {
for person in self
.people
.iter()
.filter_map(|(_, p)| if p.followed == 1 { Some(p) } else { None })
.filter_map(|p| if p.followed == 1 { Some(p) } else { None })
{
output.push(person.pubkey.clone());
}
output
}
pub async fn create_all_if_missing(&mut self, pubkeys: &[PublicKeyHex]) -> Result<(), Error> {
pub async fn create_all_if_missing(&self, pubkeys: &[PublicKeyHex]) -> Result<(), Error> {
// Collect the public keys that we don't have already (by checking in memory).
let pubkeys: Vec<&PublicKeyHex> = pubkeys
.iter()
@ -86,7 +86,7 @@ impl People {
}
pub async fn update_metadata(
&mut self,
&self,
pubkeyhex: &PublicKeyHex,
metadata: Metadata,
asof: Unixtime,
@ -95,7 +95,7 @@ impl People {
self.create_all_if_missing(&[pubkeyhex.to_owned()]).await?;
// Update the map
let person = self.people.get_mut(pubkeyhex).unwrap();
let mut person = self.people.get_mut(pubkeyhex).unwrap();
// Determine whether to update it
let mut doit = person.metadata_at.is_none();
@ -181,7 +181,7 @@ impl People {
Ok(())
}
pub async fn load_all_followed(&mut self) -> Result<(), Error> {
pub async fn load_all_followed(&self) -> Result<(), Error> {
if !self.people.is_empty() {
return Err(Error::Internal(
"load_all_followed should only be called before people is otherwise used."
@ -228,19 +228,18 @@ impl People {
Ok(())
}
pub fn get(&mut self, pubkeyhex: &PublicKeyHex) -> Option<DbPerson> {
pub fn get(&self, pubkeyhex: &PublicKeyHex) -> Option<DbPerson> {
if self.people.contains_key(pubkeyhex) {
self.people.get(pubkeyhex).cloned()
self.people.get(pubkeyhex).map(|o| o.value().to_owned())
} else {
// We can't get it now, but we can setup a task to do it soon
let pubkeyhex = pubkeyhex.to_owned();
tokio::spawn(async move {
let mut people = GLOBALS.people.write().await;
#[allow(clippy::map_entry)]
if !people.people.contains_key(&pubkeyhex) {
if !GLOBALS.people.people.contains_key(&pubkeyhex) {
match People::fetch_one(&pubkeyhex).await {
Ok(Some(person)) => {
let _ = people.people.insert(pubkeyhex, person);
let _ = GLOBALS.people.people.insert(pubkeyhex, person);
}
Err(e) => tracing::error!("{}", e),
_ => {}
@ -252,7 +251,7 @@ impl People {
}
pub fn get_all(&self) -> Vec<DbPerson> {
let mut v: Vec<DbPerson> = self.people.values().map(|p| p.to_owned()).collect();
let mut v: Vec<DbPerson> = self.people.iter().map(|e| e.value().to_owned()).collect();
v.sort_by(|a, b| {
let c = a.name.cmp(&b.name);
if c == Ordering::Equal {
@ -265,10 +264,10 @@ impl People {
}
// If returns Err, means you're never going to get it so stop trying.
pub fn get_avatar(&mut self, pubkeyhex: &PublicKeyHex) -> Result<Option<image::RgbaImage>, ()> {
pub fn get_avatar(&self, pubkeyhex: &PublicKeyHex) -> Result<Option<image::RgbaImage>, ()> {
// If we have it, hand it over (we won't need a copy anymore)
if let Some(th) = self.avatars_temp.remove(pubkeyhex) {
return Ok(Some(th));
return Ok(Some(th.1));
}
// If it failed before, error out now
@ -313,12 +312,7 @@ impl People {
// DynamicImage
Ok(di) => di,
Err(_) => {
let _ = GLOBALS
.people
.write()
.await
.avatars_failed
.insert(apubkeyhex.clone());
let _ = GLOBALS.people.avatars_failed.insert(apubkeyhex.clone());
return;
}
};
@ -329,12 +323,7 @@ impl People {
); // DynamicImage
let image_buffer = image.into_rgba8(); // RgbaImage (ImageBuffer)
GLOBALS
.people
.write()
.await
.avatars_temp
.insert(apubkeyhex, image_buffer);
GLOBALS.people.avatars_temp.insert(apubkeyhex, image_buffer);
});
self.avatars_pending_processing.insert(pubkeyhex.to_owned());
Ok(None)
@ -356,7 +345,7 @@ impl People {
}
self.people
.iter()
.filter_map(|(_, person)| {
.filter_map(|person| {
if let Some(name) = &person.name {
if name.starts_with(prefix) {
let pubkey = PublicKey::try_from_hex_string(&person.pubkey).unwrap(); // FIXME
@ -384,22 +373,17 @@ impl People {
Ok(())
}
pub fn follow(&mut self, pubkeyhex: &PublicKeyHex, follow: bool) {
pub fn follow(&self, pubkeyhex: &PublicKeyHex, follow: bool) {
// We can't do it now, but we spawn a task to do it soon
let pubkeyhex = pubkeyhex.to_owned();
tokio::spawn(async move {
let mut people = GLOBALS.people.write().await;
if let Err(e) = people.async_follow(&pubkeyhex, follow).await {
if let Err(e) = GLOBALS.people.async_follow(&pubkeyhex, follow).await {
tracing::error!("{}", e);
}
});
}
pub async fn async_follow(
&mut self,
pubkeyhex: &PublicKeyHex,
follow: bool,
) -> Result<(), Error> {
pub async fn async_follow(&self, pubkeyhex: &PublicKeyHex, follow: bool) -> Result<(), Error> {
let f: u8 = u8::from(follow);
// Follow in database
@ -416,7 +400,7 @@ impl People {
.await??;
// Make sure memory matches
if let Some(dbperson) = self.people.get_mut(pubkeyhex) {
if let Some(mut dbperson) = self.people.get_mut(pubkeyhex) {
dbperson.followed = f;
} else {
// load
@ -429,7 +413,7 @@ impl People {
}
pub async fn follow_all(
&mut self,
&self,
pubkeys: &[PublicKeyHex],
merge: bool,
asof: Unixtime,
@ -494,9 +478,11 @@ impl People {
}
// Make sure memory matches
for (pkh, person) in self.people.iter_mut() {
for mut elem in self.people.iter_mut() {
let pkh = elem.key().clone();
let mut person = elem.value_mut();
if person.followed_last_updated < asof.0 {
if pubkeys.contains(pkh) {
if pubkeys.contains(&pkh) {
person.followed = 1;
} else if !merge {
person.followed = 0;
@ -507,10 +493,7 @@ impl People {
Ok(())
}
pub async fn update_dns_id_last_checked(
&mut self,
pubkeyhex: PublicKeyHex,
) -> Result<(), Error> {
pub async fn update_dns_id_last_checked(&self, pubkeyhex: PublicKeyHex) -> Result<(), Error> {
let maybe_db = GLOBALS.db.lock().await;
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare("UPDATE person SET dns_id_last_checked=? WHERE pubkey=?")?;
@ -520,14 +503,14 @@ impl People {
}
pub async fn upsert_nip05_validity(
&mut self,
&self,
pubkeyhex: &PublicKeyHex,
dns_id: Option<String>,
dns_id_valid: bool,
dns_id_last_checked: u64,
) -> Result<(), Error> {
// Update memory
if let Some(dbperson) = self.people.get_mut(pubkeyhex) {
if let Some(mut dbperson) = self.people.get_mut(pubkeyhex) {
dbperson.dns_id = dns_id.clone();
dbperson.dns_id_valid = u8::from(dns_id_valid);
dbperson.dns_id_last_checked = Some(dns_id_last_checked);

View File

@ -47,8 +47,6 @@ pub async fn process_new_event(
// Create the person if missing in the database
GLOBALS
.people
.write()
.await
.create_all_if_missing(&[event.pubkey.into()])
.await?;
@ -212,8 +210,6 @@ pub async fn process_new_event(
GLOBALS
.people
.write()
.await
.update_metadata(&event.pubkey.into(), metadata, event.created_at)
.await?;
}
@ -237,8 +233,6 @@ pub async fn process_new_event(
// (and the date is used to ignore if the data is outdated)
GLOBALS
.people
.write()
.await
.follow_all(&pubkeys, merge, event.created_at)
.await?;
}

View File

@ -223,10 +223,7 @@ fn real_posting_area(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram
.hint_text("@username"),
);
if !app.tag_someone.is_empty() {
let pairs = GLOBALS
.people
.blocking_read()
.get_ids_from_prefix(&app.tag_someone);
let pairs = GLOBALS.people.get_ids_from_prefix(&app.tag_someone);
if !pairs.is_empty() {
ui.menu_button("@", |ui| {
for pair in pairs {
@ -259,7 +256,7 @@ fn real_posting_area(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram
for (i, tag) in app.draft_tags.iter().enumerate() {
let rendered = match tag {
Tag::Pubkey { pubkey, .. } => {
if let Some(person) = GLOBALS.people.blocking_write().get(&(*pubkey).into()) {
if let Some(person) = GLOBALS.people.get(&(*pubkey).into()) {
match person.name {
Some(name) => name,
None => GossipUi::pubkey_long(pubkey),
@ -405,7 +402,7 @@ fn render_post_actual(
return;
}
let maybe_person = GLOBALS.people.blocking_write().get(&event.pubkey.into());
let maybe_person = GLOBALS.people.get(&event.pubkey.into());
let reactions = Globals::get_reactions_sync(event.id);
@ -649,7 +646,7 @@ fn render_content(app: &mut GossipUi, ui: &mut Ui, tag_re: &regex::Regex, event:
match tag {
Tag::Pubkey { pubkey, .. } => {
let pkhex: PublicKeyHex = (*pubkey).into();
let nam = match GLOBALS.people.blocking_write().get(&pkhex) {
let nam = match GLOBALS.people.get(&pkhex) {
Some(p) => match p.name {
Some(n) => format!("@{}", n),
None => format!("@{}", GossipUi::hex_pubkey_short(&pkhex)),

View File

@ -354,7 +354,7 @@ impl GossipUi {
return Some(th.to_owned());
}
match GLOBALS.people.blocking_write().get_avatar(pubkeyhex) {
match GLOBALS.people.get_avatar(pubkeyhex) {
Err(_) => {
GLOBALS
.failed_avatars

View File

@ -10,7 +10,7 @@ mod person;
pub(super) fn update(app: &mut GossipUi, ctx: &Context, _frame: &mut eframe::Frame, ui: &mut Ui) {
let maybe_person = if let Some(pubkeyhex) = &app.person_view_pubkey {
GLOBALS.people.blocking_write().get(pubkeyhex)
GLOBALS.people.get(pubkeyhex)
} else {
None
};
@ -53,7 +53,7 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, _frame: &mut eframe::Fra
ui.heading("People Followed");
ui.add_space(18.0);
let people = GLOBALS.people.blocking_write().get_all();
let people = GLOBALS.people.get_all();
ScrollArea::vertical().show(ui, |ui| {
for person in people.iter() {

View File

@ -7,7 +7,7 @@ use egui::{Context, RichText, Ui, Vec2};
pub(super) fn update(app: &mut GossipUi, ctx: &Context, _frame: &mut eframe::Frame, ui: &mut Ui) {
let maybe_person = if let Some(pubkeyhex) = &app.person_view_pubkey {
GLOBALS.people.blocking_write().get(pubkeyhex)
GLOBALS.people.get(pubkeyhex)
} else {
None
};
@ -48,11 +48,11 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, _frame: &mut eframe::Fra
#[allow(clippy::collapsible_else_if)]
if person.followed == 0 {
if ui.button("FOLLOW").clicked() {
GLOBALS.people.blocking_write().follow(&pubkeyhex, true);
GLOBALS.people.follow(&pubkeyhex, true);
}
} else {
if ui.button("UNFOLLOW").clicked() {
GLOBALS.people.blocking_write().follow(&pubkeyhex, false);
GLOBALS.people.follow(&pubkeyhex, false);
}
}