diff --git a/src/comms.rs b/src/comms.rs index 178bdce2..78bbd255 100644 --- a/src/comms.rs +++ b/src/comms.rs @@ -17,6 +17,9 @@ pub enum ToOverlordMessage { ProcessIncomingEvents, PostReply(String, Vec, Id), PostTextNote(String, Vec), + PullFollowMerge, + PullFollowOverwrite, + PushFollow, SaveRelays, SaveSettings, Shutdown, @@ -36,11 +39,12 @@ pub struct ToMinionMessage { #[derive(Debug, Clone)] pub enum ToMinionPayload { + FetchEvents(Vec), + PostEvent(Box), + PullFollowing, Shutdown, SubscribeGeneralFeed, SubscribePersonFeed(PublicKeyHex), SubscribeThreadFeed(Id), TempSubscribeMetadata(PublicKeyHex), - FetchEvents(Vec), - PostEvent(Box), } diff --git a/src/db/mod.rs b/src/db/mod.rs index 99de0b2e..b7dd326a 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -110,6 +110,7 @@ fn upgrade(db: &Connection, mut version: u16) -> Result<(), Error> { apply_sql!(db, version, 6, "schema6.sql"); apply_sql!(db, version, 7, "schema7.sql"); apply_sql!(db, version, 8, "schema8.sql"); + apply_sql!(db, version, 9, "schema9.sql"); tracing::info!("Database is at version {}", version); Ok(()) } diff --git a/src/db/person.rs b/src/db/person.rs index cbbd7180..81a41fc6 100644 --- a/src/db/person.rs +++ b/src/db/person.rs @@ -12,4 +12,5 @@ pub struct DbPerson { pub dns_id_last_checked: Option, pub metadata_at: Option, pub followed: u8, + pub followed_last_updated: i64, } diff --git a/src/db/schema9.sql b/src/db/schema9.sql new file mode 100644 index 00000000..e1000687 --- /dev/null +++ b/src/db/schema9.sql @@ -0,0 +1 @@ +ALTER TABLE person ADD COLUMN followed_last_updated INTEGER NOT NULL DEFAULT 0; diff --git a/src/globals.rs b/src/globals.rs index 99ab8483..b6fa7ee0 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -79,6 +79,8 @@ pub struct Globals { /// UI status message pub status_message: RwLock, + + pub pull_following_merge: AtomicBool, } lazy_static! { @@ -110,6 +112,7 @@ lazy_static! { fetcher: Fetcher::new(), failed_avatars: RwLock::new(HashSet::new()), status_message: RwLock::new("Welcome to Gossip. Status messages will appear here. Click them to dismiss them.".to_owned()), + pull_following_merge: AtomicBool::new(true), } }; } diff --git a/src/overlord/minion/mod.rs b/src/overlord/minion/mod.rs index b8e7a8fb..fa851cfb 100644 --- a/src/overlord/minion/mod.rs +++ b/src/overlord/minion/mod.rs @@ -231,6 +231,19 @@ impl Minion { pub async fn handle_message(&mut self, message: ToMinionMessage) -> Result { match message.payload { + ToMinionPayload::FetchEvents(vec) => { + self.get_events(vec).await?; + } + ToMinionPayload::PostEvent(event) => { + let msg = ClientMessage::Event(event); + let wire = serde_json::to_string(&msg)?; + let ws_sink = self.sink.as_mut().unwrap(); + ws_sink.send(WsMessage::Text(wire)).await?; + tracing::info!("Posted event to {}", &self.url); + } + ToMinionPayload::PullFollowing => { + self.pull_following().await?; + } ToMinionPayload::Shutdown => { tracing::info!("{}: Websocket listener shutting down", &self.url); return Ok(false); @@ -244,16 +257,6 @@ impl Minion { ToMinionPayload::SubscribeThreadFeed(id) => { self.subscribe_thread_feed(id).await?; } - ToMinionPayload::FetchEvents(vec) => { - self.get_events(vec).await?; - } - ToMinionPayload::PostEvent(event) => { - let msg = ClientMessage::Event(event); - let wire = serde_json::to_string(&msg)?; - let ws_sink = self.sink.as_mut().unwrap(); - ws_sink.send(WsMessage::Text(wire)).await?; - tracing::info!("Posted event to {}", &self.url); - } ToMinionPayload::TempSubscribeMetadata(pubkeyhex) => { self.temp_subscribe_metadata(pubkeyhex).await?; } @@ -641,6 +644,18 @@ impl Minion { self.subscribe(vec![filter], &handle).await } + async fn pull_following(&mut self) -> Result<(), Error> { + if let Some(pubkey) = GLOBALS.signer.read().await.public_key() { + let filter = Filter { + authors: vec![pubkey.into()], + kinds: vec![EventKind::ContactList], + ..Default::default() + }; + self.subscribe(vec![filter], "following").await?; + } + Ok(()) + } + #[allow(dead_code)] async fn subscribe(&mut self, filters: Vec, handle: &str) -> Result<(), Error> { let req_message = if self.subscriptions.has(handle) { diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs index 5df39275..6ce8843c 100644 --- a/src/overlord/mod.rs +++ b/src/overlord/mod.rs @@ -12,6 +12,7 @@ use nostr_types::{ }; use relay_picker::{BestRelay, RelayPicker}; use std::collections::HashMap; +use std::sync::atomic::Ordering; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::UnboundedReceiver; use tokio::{select, task}; @@ -50,9 +51,7 @@ impl Overlord { tracing::info!("Overlord signalling UI to shutdown"); - GLOBALS - .shutting_down - .store(true, std::sync::atomic::Ordering::Relaxed); + GLOBALS.shutting_down.store(true, Ordering::Relaxed); tracing::info!("Overlord signalling minions to shutdown"); @@ -442,6 +441,15 @@ impl Overlord { ToOverlordMessage::PostTextNote(content, tags) => { self.post_textnote(content, tags).await?; } + ToOverlordMessage::PullFollowMerge => { + self.pull_following(true).await?; + } + ToOverlordMessage::PullFollowOverwrite => { + self.pull_following(false).await?; + } + ToOverlordMessage::PushFollow => { + tracing::error!("Push Follow Unimplemented"); + } ToOverlordMessage::SaveRelays => { let dirty_relays: Vec = GLOBALS .relays @@ -859,4 +867,39 @@ impl Overlord { Ok(()) } + + async fn pull_following(&mut self, merge: bool) -> Result<(), Error> { + // Set globally whether we are merging or not when newer following lists + // come in. + GLOBALS.pull_following_merge.store(merge, Ordering::Relaxed); + + // Pull our list from all of the relays we post to + let relays: Vec = GLOBALS + .relays + .read() + .await + .iter() + .filter_map(|(_, r)| if r.post { Some(r.to_owned()) } else { None }) + .collect(); + + for relay in relays { + // Start a minion for it, if there is none + if !self.urls_watching.contains(&Url::new(&relay.url)) { + self.start_minion(relay.url.clone()).await?; + } + + // Send it the event to pull our followers + tracing::debug!("Asking {} to pull our followers", &relay.url); + + let _ = self.to_minions.send(ToMinionMessage { + target: relay.url.clone(), + payload: ToMinionPayload::PullFollowing, + }); + } + + // When the event comes in, process will handle it with our global + // merge preference. + + Ok(()) + } } diff --git a/src/people.rs b/src/people.rs index 29d78e29..55520ce0 100644 --- a/src/people.rs +++ b/src/people.rs @@ -42,21 +42,43 @@ impl People { output } - pub async fn create_if_missing(&mut self, pubkeyhex: &PublicKeyHex) -> Result<(), Error> { - if self.people.contains_key(pubkeyhex) { + pub async fn create_all_if_missing(&mut self, pubkeys: &[PublicKeyHex]) -> Result<(), Error> { + // Collect the public keys that we don't have already (by checking in memory). + // Anything in memory surely already is on disk so we don't have to check disk. + let pubkeys: Vec<&PublicKeyHex> = pubkeys + .iter() + .filter(|pk| !self.people.contains_key(pk)) + .collect(); + + if pubkeys.is_empty() { return Ok(()); } - // Try loading from the database - let maybe_dbperson = Self::fetch_one(pubkeyhex).await?; + // Make sure all these people exist in the database + let mut sql: String = "INSERT OR IGNORE INTO person (pubkey) VALUES ".to_owned(); + sql.push_str(&"(?),".repeat(pubkeys.len())); + sql.pop(); // remove trailing comma - if let Some(dbperson) = maybe_dbperson { - // Insert into the map - self.people.insert(pubkeyhex.to_owned(), dbperson); - } else { - // Create new + let pubkey_strings: Vec = pubkeys.iter().map(|p| p.0.clone()).collect(); + + task::spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + let mut stmt = db.prepare(&sql)?; + let mut pos = 1; + for pk in pubkey_strings.iter() { + stmt.raw_bind_parameter(pos, pk)?; + pos += 1; + } + stmt.raw_execute()?; + Ok::<(), Error>(()) + }) + .await??; + + // Make matching records for them in memory + for pk in pubkeys { let dbperson = DbPerson { - pubkey: pubkeyhex.to_owned(), + pubkey: pk.to_owned(), name: None, about: None, picture: None, @@ -65,11 +87,9 @@ impl People { dns_id_last_checked: None, metadata_at: None, followed: 0, + followed_last_updated: 0, }; - // Insert into the map - self.people.insert(pubkeyhex.to_owned(), dbperson.clone()); - // Insert into the database - Self::insert(dbperson).await?; + self.people.insert(pk.to_owned(), dbperson); } Ok(()) @@ -82,7 +102,7 @@ impl People { asof: Unixtime, ) -> Result<(), Error> { // Sync in from database first - self.create_if_missing(pubkeyhex).await?; + self.create_all_if_missing(&[pubkeyhex.to_owned()]).await?; // Update the map let person = self.people.get_mut(pubkeyhex).unwrap(); @@ -178,7 +198,7 @@ impl People { let sql = "SELECT pubkey, name, about, picture, dns_id, dns_id_valid, dns_id_last_checked, \ - metadata_at, followed FROM person WHERE followed=1" + metadata_at, followed, followed_last_updated FROM person WHERE followed=1" .to_owned(); let output: Result, Error> = task::spawn_blocking(move || { @@ -197,6 +217,7 @@ impl People { dns_id_last_checked: row.get(6)?, metadata_at: row.get(7)?, followed: row.get(8)?, + followed_last_updated: row.get(9)?, }) })?; let mut output: Vec = Vec::new(); @@ -414,6 +435,85 @@ impl People { Ok(()) } + pub async fn follow_all( + &mut self, + pubkeys: &[PublicKeyHex], + merge: bool, + asof: Unixtime, + ) -> Result<(), Error> { + tracing::debug!( + "Updating following list, {} people long, merge={}", + pubkeys.len(), + merge + ); + + // Make sure they are all in the database (and memory) first. + self.create_all_if_missing(pubkeys).await?; + + // Follow in database + let sql = format!( + "UPDATE person SET followed=1, followed_last_updated=? WHERE pubkey IN ({}) and followed_last_updated = pubkeys.iter().map(|p| p.0.clone()).collect(); + + task::spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + let mut stmt = db.prepare(&sql)?; + stmt.raw_bind_parameter(1, asof.0)?; + let mut pos = 2; + for pk in pubkey_strings.iter() { + stmt.raw_bind_parameter(pos, pk)?; + pos += 1; + } + stmt.raw_bind_parameter(pos, asof.0)?; + stmt.raw_execute()?; + Ok::<(), Error>(()) + }) + .await??; + + if !merge { + // Unfollow in database + let sql = format!( + "UPDATE person SET followed=0, followed_last_updated=? WHERE pubkey NOT IN ({}) and followed_last_updated = pubkeys.iter().map(|p| p.0.clone()).collect(); + + task::spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + let mut stmt = db.prepare(&sql)?; + stmt.raw_bind_parameter(1, asof.0)?; + let mut pos = 2; + for pk in pubkey_strings.iter() { + stmt.raw_bind_parameter(pos, pk)?; + pos += 1; + } + stmt.raw_bind_parameter(pos, asof.0)?; + stmt.raw_execute()?; + Ok::<(), Error>(()) + }) + .await??; + } + + // Make sure memory matches + for (pkh, person) in self.people.iter_mut() { + if person.followed_last_updated < asof.0 { + if pubkeys.contains(pkh) { + person.followed = 1; + } else if !merge { + person.followed = 0; + } + } + } + + Ok(()) + } + pub async fn update_dns_id_last_checked( &mut self, pubkeyhex: PublicKeyHex, @@ -473,7 +573,9 @@ impl People { async fn fetch(criteria: Option<&str>) -> Result, Error> { let sql = - "SELECT pubkey, name, about, picture, dns_id, dns_id_valid, dns_id_last_checked, metadata_at, followed FROM person".to_owned(); + "SELECT pubkey, name, about, picture, dns_id, dns_id_valid, dns_id_last_checked, \ + metadata_at, followed, followed_last_updated FROM person" + .to_owned(); let sql = match criteria { None => sql, Some(crit) => format!("{} WHERE {}", sql, crit), @@ -495,6 +597,7 @@ impl People { dns_id_last_checked: row.get(6)?, metadata_at: row.get(7)?, followed: row.get(8)?, + followed_last_updated: row.get(9)?, }) })?; @@ -519,10 +622,12 @@ impl People { } } + #[allow(dead_code)] async fn insert(person: DbPerson) -> Result<(), Error> { let sql = - "INSERT OR IGNORE INTO person (pubkey, name, about, picture, dns_id, dns_id_valid, dns_id_last_checked, metadata_at, followed) \ - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)"; + "INSERT OR IGNORE INTO person (pubkey, name, about, picture, dns_id, dns_id_valid, \ + dns_id_last_checked, metadata_at, followed, followed_last_updated) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)"; task::spawn_blocking(move || { let maybe_db = GLOBALS.db.blocking_lock(); @@ -539,6 +644,7 @@ impl People { &person.dns_id_last_checked, &person.metadata_at, &person.followed, + &person.followed_last_updated, ))?; Ok::<(), Error>(()) }) @@ -563,3 +669,11 @@ impl People { } */ } + +fn repeat_vars(count: usize) -> String { + assert_ne!(count, 0); + let mut s = "?,".repeat(count); + // Remove trailing comma + s.pop(); + s +} diff --git a/src/process.rs b/src/process.rs index e0c6f7d9..d3dbe09c 100644 --- a/src/process.rs +++ b/src/process.rs @@ -4,7 +4,8 @@ use crate::db::{ use crate::error::Error; use crate::globals::{Globals, GLOBALS}; use crate::relationship::Relationship; -use nostr_types::{Event, EventKind, Metadata, Tag, Unixtime, Url}; +use nostr_types::{Event, EventKind, Metadata, PublicKeyHex, Tag, Unixtime, Url}; +use std::sync::atomic::Ordering; // This processes a new event, saving the results into the database // and also populating the GLOBALS maps. @@ -48,7 +49,7 @@ pub async fn process_new_event( .people .write() .await - .create_if_missing(&event.pubkey.into()) + .create_all_if_missing(&[event.pubkey.into()]) .await?; // Update person_relay.last_fetched @@ -217,11 +218,36 @@ pub async fn process_new_event( .await?; } + if event.kind == EventKind::ContactList { + // We only handle the user's own contact list currently + if let Some(pubkey) = GLOBALS.signer.read().await.public_key() { + if event.pubkey == pubkey { + let merge: bool = GLOBALS.pull_following_merge.load(Ordering::Relaxed); + let mut pubkeys: Vec = Vec::new(); + + // 'p' tags represent the author's contacts + for tag in &event.tags { + if let Tag::Pubkey { pubkey, .. } = tag { + pubkeys.push((*pubkey).into()); + // FIXME do something with recommended_relay_url and petname + } + } + + // Follow all those pubkeys, and unfollow everbody else if merge=false + // (and the date is used to ignore if the data is outdated) + GLOBALS + .people + .write() + .await + .follow_all(&pubkeys, merge, event.created_at) + .await?; + } + } + } + // FIXME: Handle EventKind::RecommendedRelay - // FIXME: Handle EventKind::ContactList - - // Save in event_is_new + // Save in event_is_new (to highlight it in the feed, if feed related) GLOBALS.event_is_new.write().await.push(event.id); Ok(()) diff --git a/src/ui/people/mod.rs b/src/ui/people/mod.rs index 861412a2..df5fcd74 100644 --- a/src/ui/people/mod.rs +++ b/src/ui/people/mod.rs @@ -1,4 +1,5 @@ use super::{GossipUi, Page}; +use crate::comms::ToOverlordMessage; use crate::db::DbPerson; use crate::globals::GLOBALS; use eframe::egui; @@ -29,7 +30,21 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, _frame: &mut eframe::Fra if app.page == Page::PeopleList { ui.add_space(24.0); - ui.heading("NOTICE: Gossip is not synchronizing with data on the nostr relays. This is a separate list and it won't overwrite anything."); + ui.horizontal(|ui| { + if ui.button("↓ PULL ↓\nOverwrite").clicked() { + let _ = GLOBALS + .to_overlord + .send(ToOverlordMessage::PullFollowOverwrite); + } + if ui.button("↓ PULL ↓\nMerge (Add)").clicked() { + let _ = GLOBALS.to_overlord.send(ToOverlordMessage::PullFollowMerge); + } + /* not yet implemented + if ui.button("↑ PUSH ↑\n").clicked() { + let _ = GLOBALS.to_overlord.send(ToOverlordMessage::PushFollow); + } + */ + }); ui.add_space(10.0); ui.separator();