From 8a7e35ac9b11269cc6e65d4e5acbd25fa6fb0672 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Sat, 31 Dec 2022 07:37:20 +1300 Subject: [PATCH] GLOBALS.people replaced with a People Manager object that keeps memory and database in sync. [plus a lot of dead code commented out] --- src/db/contact.rs | 32 ++--- src/db/event.rs | 32 +++-- src/db/event_hashtag.rs | 43 +++--- src/db/event_relationship.rs | 50 +++---- src/db/event_seen.rs | 26 ++-- src/db/event_tag.rs | 26 ++-- src/db/person.rs | 109 ++------------ src/db/person_relay.rs | 29 ++-- src/db/relay.rs | 3 +- src/error.rs | 3 + src/globals.rs | 78 ++-------- src/main.rs | 1 + src/overlord/minion/mod.rs | 11 +- src/overlord/minion/subscription.rs | 13 +- src/overlord/mod.rs | 22 +-- src/people/mod.rs | 215 ++++++++++++++++++++++++++++ src/process.rs | 72 ++-------- src/signer.rs | 7 - src/syncer.rs | 6 + src/ui/feed.rs | 4 +- src/ui/people.rs | 4 +- 21 files changed, 412 insertions(+), 374 deletions(-) create mode 100644 src/people/mod.rs diff --git a/src/db/contact.rs b/src/db/contact.rs index 18d77f39..12fb007c 100644 --- a/src/db/contact.rs +++ b/src/db/contact.rs @@ -1,7 +1,4 @@ -use crate::error::Error; -use crate::globals::GLOBALS; use serde::{Deserialize, Serialize}; -use tokio::task::spawn_blocking; #[derive(Debug, Serialize, Deserialize)] pub struct DbContact { @@ -12,7 +9,7 @@ pub struct DbContact { } impl DbContact { - #[allow(dead_code)] + /* pub async fn fetch(criteria: Option<&str>) -> Result, Error> { let sql = "SELECT source, contact, relay, petname FROM contact".to_owned(); let sql = match criteria { @@ -44,8 +41,9 @@ impl DbContact { output } + */ - #[allow(dead_code)] + /* pub async fn insert(contact: DbContact) -> Result<(), Error> { let sql = "INSERT OR IGNORE INTO contact (source, contact, relay, petname) \ VALUES (?1, ?2, ?3, ?4)"; @@ -67,19 +65,21 @@ impl DbContact { Ok(()) } + */ - #[allow(dead_code)] - pub async fn delete(criteria: &str) -> Result<(), Error> { - let sql = format!("DELETE FROM contact WHERE {}", criteria); + /* + pub async fn delete(criteria: &str) -> Result<(), Error> { + let sql = format!("DELETE FROM contact WHERE {}", criteria); - spawn_blocking(move || { - let maybe_db = GLOBALS.db.blocking_lock(); - let db = maybe_db.as_ref().unwrap(); - db.execute(&sql, [])?; - Ok::<(), Error>(()) - }) - .await??; + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(&sql, [])?; + Ok::<(), Error>(()) + }) + .await??; - Ok(()) + Ok(()) } + */ } diff --git a/src/db/event.rs b/src/db/event.rs index e03c87be..66cd5836 100644 --- a/src/db/event.rs +++ b/src/db/event.rs @@ -148,7 +148,7 @@ impl DbEvent { Ok(()) } - #[allow(dead_code)] + /* pub async fn delete(criteria: &str) -> Result<(), Error> { let sql = format!("DELETE FROM event WHERE {}", criteria); @@ -162,23 +162,25 @@ impl DbEvent { Ok(()) } + */ - #[allow(dead_code)] - pub async fn get_author(id: IdHex) -> Result, Error> { - let sql = "SELECT pubkey FROM event WHERE id=?"; + /* + pub async fn get_author(id: IdHex) -> Result, Error> { + let sql = "SELECT pubkey FROM event WHERE id=?"; - spawn_blocking(move || { - let maybe_db = GLOBALS.db.blocking_lock(); - let db = maybe_db.as_ref().unwrap(); - let mut stmt = db.prepare(sql)?; - let mut rows = stmt.query_map([id.0], |row| row.get(0))?; - if let Some(row) = rows.next() { - return Ok(Some(PublicKeyHex(row?))); - } - Ok(None) - }) - .await? + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + let mut stmt = db.prepare(sql)?; + let mut rows = stmt.query_map([id.0], |row| row.get(0))?; + if let Some(row) = rows.next() { + return Ok(Some(PublicKeyHex(row?))); + } + Ok(None) + }) + .await? } + */ } fn repeat_vars(count: usize) -> String { diff --git a/src/db/event_hashtag.rs b/src/db/event_hashtag.rs index 88938b0a..efdabf12 100644 --- a/src/db/event_hashtag.rs +++ b/src/db/event_hashtag.rs @@ -25,26 +25,27 @@ impl DbEventHashtag { Ok(()) } - #[allow(dead_code)] - pub async fn get_events_with_hashtag(hashtag: String) -> Result, Error> { - let sql = "SELECT event FROM event_hashtag WHERE hashtag=?"; - let output: Result, Error> = spawn_blocking(move || { - let maybe_db = GLOBALS.db.blocking_lock(); - let db = maybe_db.as_ref().unwrap(); - let mut stmt = db.prepare(sql)?; - let rows = stmt.query_map([hashtag.clone()], |row| { - Ok(DbEventHashtag { - event: row.get(0)?, - hashtag: hashtag.clone(), - }) - })?; - let mut output: Vec = Vec::new(); - for row in rows { - output.push(row?); - } - Ok(output) - }) - .await?; - output + /* + pub async fn get_events_with_hashtag(hashtag: String) -> Result, Error> { + let sql = "SELECT event FROM event_hashtag WHERE hashtag=?"; + let output: Result, Error> = spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + let mut stmt = db.prepare(sql)?; + let rows = stmt.query_map([hashtag.clone()], |row| { + Ok(DbEventHashtag { + event: row.get(0)?, + hashtag: hashtag.clone(), + }) + })?; + let mut output: Vec = Vec::new(); + for row in rows { + output.push(row?); + } + Ok(output) + }) + .await?; + output } + */ } diff --git a/src/db/event_relationship.rs b/src/db/event_relationship.rs index b9bb519b..5cbc7a95 100644 --- a/src/db/event_relationship.rs +++ b/src/db/event_relationship.rs @@ -1,6 +1,5 @@ use crate::error::Error; use crate::globals::GLOBALS; -use nostr_types::Id; use serde::{Deserialize, Serialize}; use tokio::task::spawn_blocking; @@ -30,29 +29,30 @@ impl DbEventRelationship { Ok(()) } - #[allow(dead_code)] - pub async fn get_events_referring_to(id: Id) -> Result, Error> { - let sql = - "SELECT referring, relationship, content FROM event_relationship WHERE original=?"; - let output: Result, Error> = spawn_blocking(move || { - let maybe_db = GLOBALS.db.blocking_lock(); - let db = maybe_db.as_ref().unwrap(); - let mut stmt = db.prepare(sql)?; - let rows = stmt.query_map([id.as_hex_string()], |row| { - Ok(DbEventRelationship { - original: id.as_hex_string(), - referring: row.get(0)?, - relationship: row.get(1)?, - content: row.get(2)?, - }) - })?; - let mut output: Vec = Vec::new(); - for row in rows { - output.push(row?); - } - Ok(output) - }) - .await?; - output + /* + pub async fn get_events_referring_to(id: Id) -> Result, Error> { + let sql = + "SELECT referring, relationship, content FROM event_relationship WHERE original=?"; + let output: Result, Error> = spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + let mut stmt = db.prepare(sql)?; + let rows = stmt.query_map([id.as_hex_string()], |row| { + Ok(DbEventRelationship { + original: id.as_hex_string(), + referring: row.get(0)?, + relationship: row.get(1)?, + content: row.get(2)?, + }) + })?; + let mut output: Vec = Vec::new(); + for row in rows { + output.push(row?); + } + Ok(output) + }) + .await?; + output } + */ } diff --git a/src/db/event_seen.rs b/src/db/event_seen.rs index 87272166..c7fc3e98 100644 --- a/src/db/event_seen.rs +++ b/src/db/event_seen.rs @@ -11,7 +11,7 @@ pub struct DbEventSeen { } impl DbEventSeen { - #[allow(dead_code)] + /* pub async fn fetch(criteria: Option<&str>) -> Result, Error> { let sql = "SELECT event, relay, when_seen FROM event_seen".to_owned(); let sql = match criteria { @@ -42,6 +42,7 @@ impl DbEventSeen { output } + */ pub async fn replace(event_seen: DbEventSeen) -> Result<(), Error> { let sql = "REPLACE INTO event_seen (event, relay, when_seen) \ @@ -60,18 +61,19 @@ impl DbEventSeen { Ok(()) } - #[allow(dead_code)] - pub async fn delete(criteria: &str) -> Result<(), Error> { - let sql = format!("DELETE FROM event_seen WHERE {}", criteria); + /* + pub async fn delete(criteria: &str) -> Result<(), Error> { + let sql = format!("DELETE FROM event_seen WHERE {}", criteria); - spawn_blocking(move || { - let maybe_db = GLOBALS.db.blocking_lock(); - let db = maybe_db.as_ref().unwrap(); - db.execute(&sql, [])?; - Ok::<(), Error>(()) - }) - .await??; + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(&sql, [])?; + Ok::<(), Error>(()) + }) + .await??; - Ok(()) + Ok(()) } + */ } diff --git a/src/db/event_tag.rs b/src/db/event_tag.rs index 4dd48b08..59decbbf 100644 --- a/src/db/event_tag.rs +++ b/src/db/event_tag.rs @@ -15,7 +15,7 @@ pub struct DbEventTag { } impl DbEventTag { - #[allow(dead_code)] + /* pub async fn fetch(criteria: Option<&str>) -> Result, Error> { let sql = "SELECT event, seq, label, field0, field1, field2, field3 FROM event_tag".to_owned(); @@ -51,6 +51,7 @@ impl DbEventTag { output } + */ pub async fn insert(event_tag: DbEventTag) -> Result<(), Error> { let sql = @@ -78,18 +79,19 @@ impl DbEventTag { Ok(()) } - #[allow(dead_code)] - pub async fn delete(criteria: &str) -> Result<(), Error> { - let sql = format!("DELETE FROM event_tag WHERE {}", criteria); + /* + pub async fn delete(criteria: &str) -> Result<(), Error> { + let sql = format!("DELETE FROM event_tag WHERE {}", criteria); - spawn_blocking(move || { - let maybe_db = GLOBALS.db.blocking_lock(); - let db = maybe_db.as_ref().unwrap(); - db.execute(&sql, [])?; - Ok::<(), Error>(()) - }) - .await??; + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(&sql, [])?; + Ok::<(), Error>(()) + }) + .await??; - Ok(()) + Ok(()) } + */ } diff --git a/src/db/person.rs b/src/db/person.rs index cfc608d1..8e520d26 100644 --- a/src/db/person.rs +++ b/src/db/person.rs @@ -1,6 +1,6 @@ use crate::error::Error; use crate::globals::GLOBALS; -use nostr_types::{Metadata, PublicKeyHex, Unixtime}; +use nostr_types::PublicKeyHex; use serde::{Deserialize, Serialize}; use tokio::task::spawn_blocking; @@ -18,20 +18,6 @@ pub struct DbPerson { } impl DbPerson { - pub fn new(pubkey: PublicKeyHex) -> DbPerson { - DbPerson { - pubkey, - name: None, - about: None, - picture: None, - dns_id: None, - dns_id_valid: 0, - dns_id_last_checked: None, - metadata_at: None, - followed: 0, - } - } - pub async fn fetch(criteria: Option<&str>) -> Result, Error> { let sql = "SELECT pubkey, name, about, picture, dns_id, dns_id_valid, dns_id_last_checked, metadata_at, followed FROM person".to_owned(); @@ -108,62 +94,6 @@ impl DbPerson { Ok(()) } - pub async fn update(person: DbPerson) -> Result<(), Error> { - let sql = - "UPDATE person SET name=?, about=?, picture=?, dns_id=?, dns_id_valid=?, dns_id_last_checked=?, metadata_at=?, followed=? WHERE pubkey=?"; - - spawn_blocking(move || { - let maybe_db = GLOBALS.db.blocking_lock(); - let db = maybe_db.as_ref().unwrap(); - - let mut stmt = db.prepare(sql)?; - stmt.execute(( - &person.name, - &person.about, - &person.picture, - &person.dns_id, - &person.dns_id_valid, - &person.dns_id_last_checked, - &person.metadata_at, - &person.followed, - &person.pubkey.0, - ))?; - Ok::<(), Error>(()) - }) - .await??; - - Ok(()) - } - - // Update metadata without clobbering anything else - pub async fn update_metadata( - pubkey: PublicKeyHex, - metadata: Metadata, - created_at: Unixtime, - ) -> Result<(), Error> { - let sql = - "UPDATE person SET name=?, about=?, picture=?, dns_id=?, metadata_at=? WHERE pubkey=?"; - - spawn_blocking(move || { - let maybe_db = GLOBALS.db.blocking_lock(); - let db = maybe_db.as_ref().unwrap(); - - let mut stmt = db.prepare(sql)?; - stmt.execute(( - &metadata.name, - &metadata.about, - &metadata.picture, - &metadata.nip05, - &created_at.0, - &pubkey.0, - ))?; - Ok::<(), Error>(()) - }) - .await??; - - Ok(()) - } - pub async fn upsert_valid_nip05( pubkey: PublicKeyHex, dns_id: String, @@ -209,32 +139,19 @@ impl DbPerson { Ok(()) } - #[allow(dead_code)] - pub async fn delete(criteria: &str) -> Result<(), Error> { - let sql = format!("DELETE FROM person WHERE {}", criteria); + /* + pub async fn delete(criteria: &str) -> Result<(), Error> { + let sql = format!("DELETE FROM person WHERE {}", criteria); - spawn_blocking(move || { - let maybe_db = GLOBALS.db.blocking_lock(); - let db = maybe_db.as_ref().unwrap(); - db.execute(&sql, [])?; - Ok::<(), Error>(()) - }) - .await??; + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(&sql, [])?; + Ok::<(), Error>(()) + }) + .await??; - Ok(()) - } - - pub async fn populate_new_people() -> Result<(), Error> { - let sql = "INSERT or IGNORE INTO person (pubkey) SELECT DISTINCT pubkey FROM EVENT"; - - spawn_blocking(move || { - let maybe_db = GLOBALS.db.blocking_lock(); - let db = maybe_db.as_ref().unwrap(); - db.execute(sql, [])?; - Ok::<(), Error>(()) - }) - .await??; - - Ok(()) + Ok(()) } + */ } diff --git a/src/db/person_relay.rs b/src/db/person_relay.rs index 9846385a..1db30d40 100644 --- a/src/db/person_relay.rs +++ b/src/db/person_relay.rs @@ -17,7 +17,7 @@ pub struct DbPersonRelay { } impl DbPersonRelay { - #[allow(dead_code)] + /* pub async fn fetch(criteria: Option<&str>) -> Result, Error> { let sql = "SELECT person, relay, last_fetched, last_suggested_kind2, last_suggested_kind3, last_suggested_nip23, last_suggested_nip35, last_suggested_bytag FROM person_relay".to_owned(); let sql = match criteria { @@ -53,6 +53,7 @@ impl DbPersonRelay { output } + */ /// Fetch records matching the given public keys, ordered from highest to lowest rank pub async fn fetch_for_pubkeys(pubkeys: &[PublicKeyHex]) -> Result, Error> { @@ -101,8 +102,8 @@ impl DbPersonRelay { output } + /* /// Fetch oldest last_fetched among a set of public keys for a relay - #[allow(dead_code)] pub async fn fetch_oldest_last_fetched( pubkeys: &[PublicKeyHex], relay: &str, @@ -136,6 +137,7 @@ impl DbPersonRelay { output } + */ pub async fn insert(person_relay: DbPersonRelay) -> Result<(), Error> { let sql = "INSERT OR IGNORE INTO person_relay (person, relay, last_fetched, \ @@ -241,20 +243,21 @@ impl DbPersonRelay { Ok(()) } - #[allow(dead_code)] - pub async fn delete(criteria: &str) -> Result<(), Error> { - let sql = format!("DELETE FROM person_relay WHERE {}", criteria); + /* + pub async fn delete(criteria: &str) -> Result<(), Error> { + let sql = format!("DELETE FROM person_relay WHERE {}", criteria); - spawn_blocking(move || { - let maybe_db = GLOBALS.db.blocking_lock(); - let db = maybe_db.as_ref().unwrap(); - db.execute(&sql, [])?; - Ok::<(), Error>(()) - }) - .await??; + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(&sql, [])?; + Ok::<(), Error>(()) + }) + .await??; - Ok(()) + Ok(()) } + */ } fn repeat_vars(count: usize) -> String { diff --git a/src/db/relay.rs b/src/db/relay.rs index ea594511..c156001a 100644 --- a/src/db/relay.rs +++ b/src/db/relay.rs @@ -167,7 +167,7 @@ impl DbRelay { Ok(()) } - #[allow(dead_code)] + /* pub async fn delete(criteria: &str) -> Result<(), Error> { let sql = format!("DELETE FROM relay WHERE {}", criteria); @@ -181,6 +181,7 @@ impl DbRelay { Ok(()) } + */ pub async fn populate_new_relays() -> Result<(), Error> { // Get relays from person_relay list diff --git a/src/error.rs b/src/error.rs index b1073430..2726877e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -39,6 +39,9 @@ pub enum Error { #[error("I/O Error: {0}")] Io(#[from] std::io::Error), + #[error("INTERNAL: {0}")] + Internal(String), + #[error("Invalid DNS ID (nip-05 / nip-35), should be user@domain")] InvalidDnsId, diff --git a/src/globals.rs b/src/globals.rs index 2ad40649..67c026cd 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -1,11 +1,12 @@ use crate::comms::BusMessage; -use crate::db::{DbEvent, DbPerson, DbPersonRelay, DbRelay}; +use crate::db::{DbEvent, DbRelay}; use crate::error::Error; use crate::feed::Feed; +use crate::people::People; use crate::relationship::Relationship; use crate::settings::Settings; use crate::signer::Signer; -use nostr_types::{Event, Id, IdHex, PublicKey, PublicKeyHex, Unixtime, Url}; +use nostr_types::{Event, Id, IdHex, Unixtime, Url}; use rusqlite::Connection; use std::collections::HashMap; use std::sync::atomic::AtomicBool; @@ -56,7 +57,8 @@ pub struct Globals { pub desired_events: RwLock>>, /// All nostr people records currently loaded into memory, keyed by pubkey - pub people: RwLock>, + //pub people: RwLock>, + pub people: RwLock, /// All nostr relay records we have pub relays: RwLock>, @@ -105,7 +107,7 @@ lazy_static! { relationships: RwLock::new(HashMap::new()), last_reply: RwLock::new(HashMap::new()), desired_events: RwLock::new(HashMap::new()), - people: RwLock::new(HashMap::new()), + people: RwLock::new(People::new()), relays: RwLock::new(HashMap::new()), shutting_down: AtomicBool::new(false), settings: RwLock::new(Settings::default()), @@ -202,7 +204,7 @@ impl Globals { Ok((output, orphans)) } - #[allow(dead_code)] + /* pub async fn get_desired_events_for_url(url: Url) -> Result, Error> { Globals::get_desired_events_prelude().await?; @@ -216,6 +218,7 @@ impl Globals { Ok(output) } + */ pub async fn add_relationship(id: Id, related: Id, relationship: Relationship) { let r = (related, relationship); @@ -283,68 +286,3 @@ impl Globals { v } } - -pub async fn followed_pubkeys() -> Vec { - let people = GLOBALS.people.read().await; - people - .iter() - .map(|(_, p)| p) - .filter(|p| p.followed == 1) - .map(|p| p.pubkey.clone()) - .collect() -} - -#[allow(dead_code)] -pub async fn follow_key_and_relay(pubkey: String, relay: String) -> Result { - let pubkeyhex = PublicKeyHex(pubkey.clone()); - - // Validate the url - let u = Url::new(&relay); - if !u.is_valid() { - return Err(format!("Invalid url: {}", relay)); - } - - // Create or update them - let person = match DbPerson::fetch_one(pubkeyhex.clone()) - .await - .map_err(|e| format!("{}", e))? - { - Some(mut person) => { - person.followed = 1; - DbPerson::update(person.clone()) - .await - .map_err(|e| format!("{}", e))?; - person - } - None => { - let mut person = DbPerson::new(pubkeyhex.clone()); - person.followed = 1; - DbPerson::insert(person.clone()) - .await - .map_err(|e| format!("{}", e))?; - person - } - }; - - // Insert (or ignore) this relay - let dbrelay = DbRelay::new(relay.clone()).map_err(|e| format!("{}", e))?; - DbRelay::insert(dbrelay) - .await - .map_err(|e| format!("{}", e))?; - - // Insert (or ignore) this person's relay - DbPersonRelay::insert(DbPersonRelay { - person: pubkey, - relay, - ..Default::default() - }) - .await - .map_err(|e| format!("{}", e))?; - - // Tell the overlord to update the minion to watch for their events - // possibly starting a new minion if necessary. - // FIXME TODO - - // Reply to javascript with the person which will be set in the store - Ok(person) -} diff --git a/src/main.rs b/src/main.rs index 143e0157..f837b976 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,6 +11,7 @@ mod error; mod feed; mod globals; mod overlord; +mod people; mod process; mod relationship; mod settings; diff --git a/src/overlord/minion/mod.rs b/src/overlord/minion/mod.rs index 950d17e4..8c2b09eb 100644 --- a/src/overlord/minion/mod.rs +++ b/src/overlord/minion/mod.rs @@ -30,7 +30,6 @@ pub struct Minion { stream: Option>>>, sink: Option>, WsMessage>>, subscriptions: Subscriptions, - #[allow(dead_code)] next_events_subscription_id: u32, } @@ -356,7 +355,6 @@ impl Minion { Ok(()) } - #[allow(dead_code)] async fn get_events(&mut self, ids: Vec) -> Result<(), Error> { if ids.is_empty() { return Ok(()); @@ -388,9 +386,10 @@ impl Minion { Ok(()) } - #[allow(dead_code)] - async fn follow_event_reactions(&mut self, _ids: Vec) -> Result<(), Error> { - // Create or extend the "reactions" subscription - unimplemented!() + /* + async fn follow_event_reactions(&mut self, _ids: Vec) -> Result<(), Error> { + // Create or extend the "reactions" subscription + unimplemented!() } + */ } diff --git a/src/overlord/minion/subscription.rs b/src/overlord/minion/subscription.rs index bfc9f7ff..e54851e1 100644 --- a/src/overlord/minion/subscription.rs +++ b/src/overlord/minion/subscription.rs @@ -35,10 +35,11 @@ impl Subscriptions { } } - #[allow(dead_code)] + /* pub fn get_by_id(&self, id: &str) -> Option { self.by_id.get(id).cloned() } + */ pub fn get_handle_by_id(&self, id: &str) -> Option { for (handle, xid) in self.handle_to_id.iter() { @@ -67,10 +68,11 @@ impl Subscriptions { } } - #[allow(dead_code)] - pub fn remove_by_id(&mut self, id: &str) { - self.by_id.remove(id); + /* + pub fn remove_by_id(&mut self, id: &str) { + self.by_id.remove(id); } + */ } #[derive(Clone, Debug)] @@ -101,10 +103,11 @@ impl Subscription { self.eose = true; } - #[allow(dead_code)] + /* pub fn eose(&self) -> bool { self.eose } + */ pub fn req_message(&self) -> ClientMessage { ClientMessage::Req(SubscriptionId(self.get_id()), self.filters.clone()) diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs index f4448e73..269e7c39 100644 --- a/src/overlord/mod.rs +++ b/src/overlord/mod.rs @@ -5,6 +5,7 @@ use crate::comms::BusMessage; use crate::db::{DbEvent, DbPerson, DbPersonRelay, DbRelay}; use crate::error::Error; use crate::globals::{Globals, GLOBALS}; +use crate::people::People; use minion::Minion; use nostr_types::{ Event, EventKind, Id, Nip05, PreEvent, PrivateKey, PublicKey, PublicKeyHex, Tag, Unixtime, Url, @@ -94,7 +95,7 @@ impl Overlord { // new people are encountered, not batch-style on startup. // Create a person record for every person seen - DbPerson::populate_new_people().await?; + People::populate_new_people().await?; // FIXME - if this needs doing, it should be done dynamically as // new people are encountered, not batch-style on startup. @@ -118,13 +119,7 @@ impl Overlord { let _ = GLOBALS.to_syncer.send("test".to_owned()); // Load people from the database - { - let mut dbpeople = DbPerson::fetch(None).await?; - for dbperson in dbpeople.drain(..) { - let pubkey = PublicKey::try_from(dbperson.pubkey.clone())?; - GLOBALS.people.write().await.insert(pubkey, dbperson); - } - } + GLOBALS.people.write().await.load_all_followed().await?; // Load latest metadata per person and update their metadata { @@ -172,7 +167,16 @@ impl Overlord { // Pick Relays and start Minions { - let pubkeys: Vec = crate::globals::followed_pubkeys().await; + let pubkeys: Vec = GLOBALS + .people + .read() + .await + .get_followed_pubkeys() + .await + .iter() + .map(|p| PublicKeyHex::from(*p)) + .collect(); + let (num_relays_per_person, max_relays) = { let settings = GLOBALS.settings.read().await; (settings.num_relays_per_person, settings.max_relays) diff --git a/src/people/mod.rs b/src/people/mod.rs new file mode 100644 index 00000000..c79fef74 --- /dev/null +++ b/src/people/mod.rs @@ -0,0 +1,215 @@ +use crate::db::DbPerson; +use crate::error::Error; +use crate::globals::GLOBALS; +use nostr_types::{Metadata, PublicKey, PublicKeyHex, Unixtime}; +use std::cmp::Ordering; +use std::collections::{HashMap, HashSet}; +use tokio::task; + +pub struct People { + people: HashMap, + deferred_load: HashSet, +} + +impl People { + pub fn new() -> People { + People { + people: HashMap::new(), + deferred_load: HashSet::new(), + } + } + + pub async fn get_followed_pubkeys(&self) -> Vec { + let mut output: Vec = Vec::new(); + for (_, person) in self.people.iter() { + if let Ok(pubkey) = PublicKey::try_from_hex_string(&person.pubkey.0) { + output.push(pubkey); + } + } + output + } + + pub async fn create_if_missing(&mut self, pubkey: PublicKey) -> Result<(), Error> { + if self.people.contains_key(&pubkey) { + return Ok(()); + } + + // Try loading from the database + let maybe_dbperson = DbPerson::fetch_one(pubkey.into()).await?; + if let Some(dbperson) = maybe_dbperson { + // Insert into the map + self.people.insert(pubkey, dbperson); + } else { + // Create new + let dbperson = DbPerson { + pubkey: pubkey.into(), + name: None, + about: None, + picture: None, + dns_id: None, + dns_id_valid: 0, + dns_id_last_checked: None, + metadata_at: None, + followed: 0, + }; + // Insert into the map + self.people.insert(pubkey, dbperson.clone()); + // Insert into the database + DbPerson::insert(dbperson).await?; + } + + Ok(()) + } + + pub async fn update_metadata( + &mut self, + pubkey: PublicKey, + metadata: Metadata, + asof: Unixtime, + ) -> Result<(), Error> { + // Sync in from database first + self.create_if_missing(pubkey).await?; + + // Update the map + let person = self.people.get_mut(&pubkey).unwrap(); + if let Some(metadata_at) = person.metadata_at { + if asof.0 <= metadata_at { + // Old metadata. Ignore it + return Ok(()); + } + } + person.name = metadata.name; + person.about = metadata.about; + person.picture = metadata.picture; + if person.dns_id != metadata.nip05 { + person.dns_id = metadata.nip05; + person.dns_id_valid = 0; // changed, so reset to invalid + person.dns_id_last_checked = None; // we haven't checked this one yet + } + person.metadata_at = Some(asof.0); + + // Update the database + let person = person.clone(); + let pubkeyhex: PublicKeyHex = person.pubkey.clone(); + task::spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare( + "UPDATE person SET name=?, about=?, picture=?, dns_id=?, metadata_at=? WHERE pubkey=?" + )?; + stmt.execute(( + &person.name, + &person.about, + &person.picture, + &person.dns_id, + &person.metadata_at, + &pubkeyhex.0, + ))?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + pub async fn load_all_followed(&mut self) -> Result<(), Error> { + if !self.people.is_empty() { + return Err(Error::Internal( + "load_all_followed should only be called before people is otherwise used." + .to_owned(), + )); + } + + let sql = + "SELECT pubkey, name, about, picture, dns_id, dns_id_valid, dns_id_last_checked, \ + metadata_at, followed FROM person WHERE followed=1" + .to_owned(); + + let output: Result, Error> = task::spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(&sql)?; + let rows = stmt.query_map([], |row| { + Ok(DbPerson { + pubkey: PublicKeyHex(row.get(0)?), + name: row.get(1)?, + about: row.get(2)?, + picture: row.get(3)?, + dns_id: row.get(4)?, + dns_id_valid: row.get(5)?, + dns_id_last_checked: row.get(6)?, + metadata_at: row.get(7)?, + followed: row.get(8)?, + }) + })?; + let mut output: Vec = Vec::new(); + for row in rows { + output.push(row?); + } + Ok(output) + }) + .await?; + + for person in output? { + if let Ok(pubkey) = PublicKey::try_from_hex_string(&person.pubkey.0) { + self.people.insert(pubkey, person); + } + } + + Ok(()) + } + + pub fn get(&mut self, pubkey: PublicKey) -> Option { + if self.people.contains_key(&pubkey) { + self.people.get(&pubkey).cloned() + } else { + // Not there. Maybe it's in the database. Defer and let syncer + // try to load + self.deferred_load.insert(pubkey); + let _ = GLOBALS.to_syncer.send("sync_people".to_owned()); + None + } + } + + pub fn get_all(&mut self) -> Vec { + let mut v: Vec = self.people.values().map(|p| p.to_owned()).collect(); + v.sort_by(|a, b| { + let c = a.name.cmp(&b.name); + if c == Ordering::Equal { + a.pubkey.cmp(&b.pubkey) + } else { + c + } + }); + v + } + + pub async fn sync(&mut self) -> Result<(), Error> { + for pubkey in self.deferred_load.iter() { + if !self.people.contains_key(pubkey) { + if let Some(person) = DbPerson::fetch_one((*pubkey).into()).await? { + let _ = self.people.insert(*pubkey, person); + } + } + } + self.deferred_load.clear(); + Ok(()) + } + + /// This is a 'just in case' the main code isn't keeping them in sync. + pub async fn populate_new_people() -> Result<(), Error> { + let sql = "INSERT or IGNORE INTO person (pubkey) SELECT DISTINCT pubkey FROM EVENT"; + + task::spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(sql, [])?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } +} diff --git a/src/process.rs b/src/process.rs index 3e98a3ff..d289937b 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,6 +1,5 @@ use crate::db::{ - DbEvent, DbEventHashtag, DbEventRelationship, DbEventSeen, DbEventTag, DbPerson, DbPersonRelay, - DbRelay, + DbEvent, DbEventHashtag, DbEventRelationship, DbEventSeen, DbEventTag, DbPersonRelay, DbRelay, }; use crate::error::Error; use crate::globals::{Globals, GLOBALS}; @@ -44,28 +43,12 @@ pub async fn process_new_event( DbEventSeen::replace(db_event_seen).await?; // Create the person if missing in the database - DbPerson::populate_new_people().await?; - - // Create the person if missing in GLOBALS.people - // FIXME - if the database has better data we should get it. - // we should fix that by making GLOBALS.people an - // object that persists on it's backend. - let _ = GLOBALS + GLOBALS .people .write() .await - .entry(event.pubkey) - .or_insert_with(|| DbPerson { - pubkey: event.pubkey.into(), - name: None, - about: None, - picture: None, - dns_id: None, - dns_id_valid: 0, - dns_id_last_checked: None, - metadata_at: None, - followed: 0, - }); + .create_if_missing(event.pubkey) + .await?; // Update person_relay.last_fetched DbPersonRelay::upsert_last_fetched( @@ -239,48 +222,13 @@ pub async fn process_new_event( // If metadata, update person if event.kind == EventKind::Metadata { let metadata: Metadata = serde_json::from_str(&event.content)?; - let metadata2 = metadata.clone(); - if from_relay { - DbPerson::update_metadata(event.pubkey.into(), metadata.clone(), event.created_at) - .await?; - } - - { - let mut people = GLOBALS.people.write().await; - people - .entry(event.pubkey) - .and_modify(|person| { - if let Some(metadata_at) = person.metadata_at { - if event.created_at.0 <= metadata_at { - // Old metadata. Ignore it - return; - } - } - - // Update the metadata - person.name = metadata.name; - person.about = metadata.about; - person.picture = metadata.picture; - if person.dns_id != metadata.nip05 { - person.dns_id = metadata.nip05; - person.dns_id_valid = 0; // changed, so reset to invalid - person.dns_id_last_checked = None; // we haven't checked this one yet - } - person.metadata_at = Some(event.created_at.0); - }) - .or_insert_with(|| { - let mut person = DbPerson::new(event.pubkey.into()); - person.name = metadata2.name; - person.about = metadata2.about; - person.picture = metadata2.picture; - person.dns_id = metadata2.nip05; - person.dns_id_valid = 0; - person.dns_id_last_checked = None; // we haven't checked this one yet - person.metadata_at = Some(event.created_at.0); - person - }); - } + GLOBALS + .people + .write() + .await + .update_metadata(event.pubkey, metadata, event.created_at) + .await?; } // FIXME: Handle EventKind::RecommendedRelay diff --git a/src/signer.rs b/src/signer.rs index c047850d..09600fd8 100644 --- a/src/signer.rs +++ b/src/signer.rs @@ -14,12 +14,10 @@ impl Default for Signer { } impl Signer { - #[allow(dead_code)] pub fn load_encrypted_private_key(&mut self, epk: EncryptedPrivateKey) { *self = Signer::Encrypted(epk); } - #[allow(dead_code)] pub fn unlock_encrypted_private_key(&mut self, pass: &str) -> Result<(), Error> { if let Signer::Encrypted(epk) = self { *self = Signer::Ready(epk.decrypt(pass)?); @@ -38,17 +36,14 @@ impl Signer { } } - #[allow(dead_code)] pub fn is_loaded(&self) -> bool { matches!(self, Signer::Encrypted(_)) || matches!(self, Signer::Ready(_)) } - #[allow(dead_code)] pub fn is_ready(&self) -> bool { matches!(self, Signer::Ready(_)) } - #[allow(dead_code)] pub fn public_key(&self) -> Option { if let Signer::Ready(pk) = self { Some(pk.public_key()) @@ -57,7 +52,6 @@ impl Signer { } } - #[allow(dead_code)] pub fn key_security(&self) -> Option { if let Signer::Ready(pk) = self { Some(pk.key_security()) @@ -66,7 +60,6 @@ impl Signer { } } - #[allow(dead_code)] pub fn sign_preevent(&self, preevent: PreEvent, pow: Option) -> Result { match self { Signer::Ready(pk) => match pow { diff --git a/src/syncer.rs b/src/syncer.rs index d67fa9d1..27ff4147 100644 --- a/src/syncer.rs +++ b/src/syncer.rs @@ -1,3 +1,4 @@ +use crate::globals::GLOBALS; use tokio::sync::mpsc; pub struct Syncer { @@ -23,6 +24,11 @@ impl Syncer { "test" => { tracing::debug!("Syncer received test message."); } + "sync_people" => { + if let Err(e) = GLOBALS.people.write().await.sync().await { + tracing::error!("Problem syncing people: {}", e); + } + } _ => { tracing::debug!("Syncer received unknown message: {}", message); } diff --git a/src/ui/feed.rs b/src/ui/feed.rs index 144c149d..0f1a2e5c 100644 --- a/src/ui/feed.rs +++ b/src/ui/feed.rs @@ -166,7 +166,7 @@ fn render_post( return; } - let maybe_person = GLOBALS.people.blocking_read().get(&event.pubkey).cloned(); + let maybe_person = GLOBALS.people.blocking_write().get(event.pubkey); let reactions = Globals::get_reactions_sync(event.id); let replies = Globals::get_replies_sync(event.id); @@ -339,7 +339,7 @@ fn render_post( } fn set_person_view(app: &mut GossipUi, pubkey: PublicKey) { - if let Some(dbperson) = GLOBALS.people.blocking_read().get(&pubkey).cloned() { + if let Some(dbperson) = GLOBALS.people.blocking_write().get(pubkey) { app.person_view_name = if let Some(name) = &dbperson.name { Some(name.to_string()) } else { diff --git a/src/ui/people.rs b/src/ui/people.rs index 766311a1..39554410 100644 --- a/src/ui/people.rs +++ b/src/ui/people.rs @@ -120,10 +120,10 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, _frame: &mut eframe::Fra ui.heading("People Followed"); ui.add_space(18.0); - let people = GLOBALS.people.blocking_read().clone(); + let people = GLOBALS.people.blocking_write().get_all(); ScrollArea::vertical().show(ui, |ui| { - for (_, person) in people.iter() { + for person in people.iter() { if person.followed != 1 { continue; }