diff --git a/src/comms.rs b/src/comms.rs index 178bdce2..78bbd255 100644 --- a/src/comms.rs +++ b/src/comms.rs @@ -17,6 +17,9 @@ pub enum ToOverlordMessage { ProcessIncomingEvents, PostReply(String, Vec, Id), PostTextNote(String, Vec), + PullFollowMerge, + PullFollowOverwrite, + PushFollow, SaveRelays, SaveSettings, Shutdown, @@ -36,11 +39,12 @@ pub struct ToMinionMessage { #[derive(Debug, Clone)] pub enum ToMinionPayload { + FetchEvents(Vec), + PostEvent(Box), + PullFollowing, Shutdown, SubscribeGeneralFeed, SubscribePersonFeed(PublicKeyHex), SubscribeThreadFeed(Id), TempSubscribeMetadata(PublicKeyHex), - FetchEvents(Vec), - PostEvent(Box), } diff --git a/src/overlord/minion/mod.rs b/src/overlord/minion/mod.rs index b8e7a8fb..fa851cfb 100644 --- a/src/overlord/minion/mod.rs +++ b/src/overlord/minion/mod.rs @@ -231,6 +231,19 @@ impl Minion { pub async fn handle_message(&mut self, message: ToMinionMessage) -> Result { match message.payload { + ToMinionPayload::FetchEvents(vec) => { + self.get_events(vec).await?; + } + ToMinionPayload::PostEvent(event) => { + let msg = ClientMessage::Event(event); + let wire = serde_json::to_string(&msg)?; + let ws_sink = self.sink.as_mut().unwrap(); + ws_sink.send(WsMessage::Text(wire)).await?; + tracing::info!("Posted event to {}", &self.url); + } + ToMinionPayload::PullFollowing => { + self.pull_following().await?; + } ToMinionPayload::Shutdown => { tracing::info!("{}: Websocket listener shutting down", &self.url); return Ok(false); @@ -244,16 +257,6 @@ impl Minion { ToMinionPayload::SubscribeThreadFeed(id) => { self.subscribe_thread_feed(id).await?; } - ToMinionPayload::FetchEvents(vec) => { - self.get_events(vec).await?; - } - ToMinionPayload::PostEvent(event) => { - let msg = ClientMessage::Event(event); - let wire = serde_json::to_string(&msg)?; - let ws_sink = self.sink.as_mut().unwrap(); - ws_sink.send(WsMessage::Text(wire)).await?; - tracing::info!("Posted event to {}", &self.url); - } ToMinionPayload::TempSubscribeMetadata(pubkeyhex) => { self.temp_subscribe_metadata(pubkeyhex).await?; } @@ -641,6 +644,18 @@ impl Minion { self.subscribe(vec![filter], &handle).await } + async fn pull_following(&mut self) -> Result<(), Error> { + if let Some(pubkey) = GLOBALS.signer.read().await.public_key() { + let filter = Filter { + authors: vec![pubkey.into()], + kinds: vec![EventKind::ContactList], + ..Default::default() + }; + self.subscribe(vec![filter], "following").await?; + } + Ok(()) + } + #[allow(dead_code)] async fn subscribe(&mut self, filters: Vec, handle: &str) -> Result<(), Error> { let req_message = if self.subscriptions.has(handle) { diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs index 5df39275..6ce8843c 100644 --- a/src/overlord/mod.rs +++ b/src/overlord/mod.rs @@ -12,6 +12,7 @@ use nostr_types::{ }; use relay_picker::{BestRelay, RelayPicker}; use std::collections::HashMap; +use std::sync::atomic::Ordering; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::UnboundedReceiver; use tokio::{select, task}; @@ -50,9 +51,7 @@ impl Overlord { tracing::info!("Overlord signalling UI to shutdown"); - GLOBALS - .shutting_down - .store(true, std::sync::atomic::Ordering::Relaxed); + GLOBALS.shutting_down.store(true, Ordering::Relaxed); tracing::info!("Overlord signalling minions to shutdown"); @@ -442,6 +441,15 @@ impl Overlord { ToOverlordMessage::PostTextNote(content, tags) => { self.post_textnote(content, tags).await?; } + ToOverlordMessage::PullFollowMerge => { + self.pull_following(true).await?; + } + ToOverlordMessage::PullFollowOverwrite => { + self.pull_following(false).await?; + } + ToOverlordMessage::PushFollow => { + tracing::error!("Push Follow Unimplemented"); + } ToOverlordMessage::SaveRelays => { let dirty_relays: Vec = GLOBALS .relays @@ -859,4 +867,39 @@ impl Overlord { Ok(()) } + + async fn pull_following(&mut self, merge: bool) -> Result<(), Error> { + // Set globally whether we are merging or not when newer following lists + // come in. + GLOBALS.pull_following_merge.store(merge, Ordering::Relaxed); + + // Pull our list from all of the relays we post to + let relays: Vec = GLOBALS + .relays + .read() + .await + .iter() + .filter_map(|(_, r)| if r.post { Some(r.to_owned()) } else { None }) + .collect(); + + for relay in relays { + // Start a minion for it, if there is none + if !self.urls_watching.contains(&Url::new(&relay.url)) { + self.start_minion(relay.url.clone()).await?; + } + + // Send it the event to pull our followers + tracing::debug!("Asking {} to pull our followers", &relay.url); + + let _ = self.to_minions.send(ToMinionMessage { + target: relay.url.clone(), + payload: ToMinionPayload::PullFollowing, + }); + } + + // When the event comes in, process will handle it with our global + // merge preference. + + Ok(()) + } } diff --git a/src/ui/people/mod.rs b/src/ui/people/mod.rs index 861412a2..df5fcd74 100644 --- a/src/ui/people/mod.rs +++ b/src/ui/people/mod.rs @@ -1,4 +1,5 @@ use super::{GossipUi, Page}; +use crate::comms::ToOverlordMessage; use crate::db::DbPerson; use crate::globals::GLOBALS; use eframe::egui; @@ -29,7 +30,21 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, _frame: &mut eframe::Fra if app.page == Page::PeopleList { ui.add_space(24.0); - ui.heading("NOTICE: Gossip is not synchronizing with data on the nostr relays. This is a separate list and it won't overwrite anything."); + ui.horizontal(|ui| { + if ui.button("↓ PULL ↓\nOverwrite").clicked() { + let _ = GLOBALS + .to_overlord + .send(ToOverlordMessage::PullFollowOverwrite); + } + if ui.button("↓ PULL ↓\nMerge (Add)").clicked() { + let _ = GLOBALS.to_overlord.send(ToOverlordMessage::PullFollowMerge); + } + /* not yet implemented + if ui.button("↑ PUSH ↑\n").clicked() { + let _ = GLOBALS.to_overlord.send(ToOverlordMessage::PushFollow); + } + */ + }); ui.add_space(10.0); ui.separator();