Sync following list (pull part only, push tbd)

This commit is contained in:
Mike Dilger 2023-01-07 12:45:47 +13:00
parent 367df9373e
commit d96733642d
4 changed files with 93 additions and 16 deletions

View File

@ -17,6 +17,9 @@ pub enum ToOverlordMessage {
ProcessIncomingEvents,
PostReply(String, Vec<Tag>, Id),
PostTextNote(String, Vec<Tag>),
PullFollowMerge,
PullFollowOverwrite,
PushFollow,
SaveRelays,
SaveSettings,
Shutdown,
@ -36,11 +39,12 @@ pub struct ToMinionMessage {
#[derive(Debug, Clone)]
pub enum ToMinionPayload {
FetchEvents(Vec<IdHex>),
PostEvent(Box<Event>),
PullFollowing,
Shutdown,
SubscribeGeneralFeed,
SubscribePersonFeed(PublicKeyHex),
SubscribeThreadFeed(Id),
TempSubscribeMetadata(PublicKeyHex),
FetchEvents(Vec<IdHex>),
PostEvent(Box<Event>),
}

View File

@ -231,6 +231,19 @@ impl Minion {
pub async fn handle_message(&mut self, message: ToMinionMessage) -> Result<bool, Error> {
match message.payload {
ToMinionPayload::FetchEvents(vec) => {
self.get_events(vec).await?;
}
ToMinionPayload::PostEvent(event) => {
let msg = ClientMessage::Event(event);
let wire = serde_json::to_string(&msg)?;
let ws_sink = self.sink.as_mut().unwrap();
ws_sink.send(WsMessage::Text(wire)).await?;
tracing::info!("Posted event to {}", &self.url);
}
ToMinionPayload::PullFollowing => {
self.pull_following().await?;
}
ToMinionPayload::Shutdown => {
tracing::info!("{}: Websocket listener shutting down", &self.url);
return Ok(false);
@ -244,16 +257,6 @@ impl Minion {
ToMinionPayload::SubscribeThreadFeed(id) => {
self.subscribe_thread_feed(id).await?;
}
ToMinionPayload::FetchEvents(vec) => {
self.get_events(vec).await?;
}
ToMinionPayload::PostEvent(event) => {
let msg = ClientMessage::Event(event);
let wire = serde_json::to_string(&msg)?;
let ws_sink = self.sink.as_mut().unwrap();
ws_sink.send(WsMessage::Text(wire)).await?;
tracing::info!("Posted event to {}", &self.url);
}
ToMinionPayload::TempSubscribeMetadata(pubkeyhex) => {
self.temp_subscribe_metadata(pubkeyhex).await?;
}
@ -641,6 +644,18 @@ impl Minion {
self.subscribe(vec![filter], &handle).await
}
async fn pull_following(&mut self) -> Result<(), Error> {
if let Some(pubkey) = GLOBALS.signer.read().await.public_key() {
let filter = Filter {
authors: vec![pubkey.into()],
kinds: vec![EventKind::ContactList],
..Default::default()
};
self.subscribe(vec![filter], "following").await?;
}
Ok(())
}
#[allow(dead_code)]
async fn subscribe(&mut self, filters: Vec<Filter>, handle: &str) -> Result<(), Error> {
let req_message = if self.subscriptions.has(handle) {

View File

@ -12,6 +12,7 @@ use nostr_types::{
};
use relay_picker::{BestRelay, RelayPicker};
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::{select, task};
@ -50,9 +51,7 @@ impl Overlord {
tracing::info!("Overlord signalling UI to shutdown");
GLOBALS
.shutting_down
.store(true, std::sync::atomic::Ordering::Relaxed);
GLOBALS.shutting_down.store(true, Ordering::Relaxed);
tracing::info!("Overlord signalling minions to shutdown");
@ -442,6 +441,15 @@ impl Overlord {
ToOverlordMessage::PostTextNote(content, tags) => {
self.post_textnote(content, tags).await?;
}
ToOverlordMessage::PullFollowMerge => {
self.pull_following(true).await?;
}
ToOverlordMessage::PullFollowOverwrite => {
self.pull_following(false).await?;
}
ToOverlordMessage::PushFollow => {
tracing::error!("Push Follow Unimplemented");
}
ToOverlordMessage::SaveRelays => {
let dirty_relays: Vec<DbRelay> = GLOBALS
.relays
@ -859,4 +867,39 @@ impl Overlord {
Ok(())
}
async fn pull_following(&mut self, merge: bool) -> Result<(), Error> {
// Set globally whether we are merging or not when newer following lists
// come in.
GLOBALS.pull_following_merge.store(merge, Ordering::Relaxed);
// Pull our list from all of the relays we post to
let relays: Vec<DbRelay> = GLOBALS
.relays
.read()
.await
.iter()
.filter_map(|(_, r)| if r.post { Some(r.to_owned()) } else { None })
.collect();
for relay in relays {
// Start a minion for it, if there is none
if !self.urls_watching.contains(&Url::new(&relay.url)) {
self.start_minion(relay.url.clone()).await?;
}
// Send it the event to pull our followers
tracing::debug!("Asking {} to pull our followers", &relay.url);
let _ = self.to_minions.send(ToMinionMessage {
target: relay.url.clone(),
payload: ToMinionPayload::PullFollowing,
});
}
// When the event comes in, process will handle it with our global
// merge preference.
Ok(())
}
}

View File

@ -1,4 +1,5 @@
use super::{GossipUi, Page};
use crate::comms::ToOverlordMessage;
use crate::db::DbPerson;
use crate::globals::GLOBALS;
use eframe::egui;
@ -29,7 +30,21 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, _frame: &mut eframe::Fra
if app.page == Page::PeopleList {
ui.add_space(24.0);
ui.heading("NOTICE: Gossip is not synchronizing with data on the nostr relays. This is a separate list and it won't overwrite anything.");
ui.horizontal(|ui| {
if ui.button("↓ PULL ↓\nOverwrite").clicked() {
let _ = GLOBALS
.to_overlord
.send(ToOverlordMessage::PullFollowOverwrite);
}
if ui.button("↓ PULL ↓\nMerge (Add)").clicked() {
let _ = GLOBALS.to_overlord.send(ToOverlordMessage::PullFollowMerge);
}
/* not yet implemented
if ui.button("↑ PUSH ↑\n").clicked() {
let _ = GLOBALS.to_overlord.send(ToOverlordMessage::PushFollow);
}
*/
});
ui.add_space(10.0);
ui.separator();