Merge branch 'sync-followed'

This commit is contained in:
Mike Dilger 2023-01-10 18:04:30 +13:00
commit 61105e5e4e
10 changed files with 263 additions and 40 deletions

View File

@ -17,6 +17,9 @@ pub enum ToOverlordMessage {
ProcessIncomingEvents,
PostReply(String, Vec<Tag>, Id),
PostTextNote(String, Vec<Tag>),
PullFollowMerge,
PullFollowOverwrite,
PushFollow,
SaveRelays,
SaveSettings,
Shutdown,
@ -36,11 +39,12 @@ pub struct ToMinionMessage {
#[derive(Debug, Clone)]
pub enum ToMinionPayload {
FetchEvents(Vec<IdHex>),
PostEvent(Box<Event>),
PullFollowing,
Shutdown,
SubscribeGeneralFeed,
SubscribePersonFeed(PublicKeyHex),
SubscribeThreadFeed(Id),
TempSubscribeMetadata(PublicKeyHex),
FetchEvents(Vec<IdHex>),
PostEvent(Box<Event>),
}

View File

@ -110,6 +110,7 @@ fn upgrade(db: &Connection, mut version: u16) -> Result<(), Error> {
apply_sql!(db, version, 6, "schema6.sql");
apply_sql!(db, version, 7, "schema7.sql");
apply_sql!(db, version, 8, "schema8.sql");
apply_sql!(db, version, 9, "schema9.sql");
tracing::info!("Database is at version {}", version);
Ok(())
}

View File

@ -12,4 +12,5 @@ pub struct DbPerson {
pub dns_id_last_checked: Option<u64>,
pub metadata_at: Option<i64>,
pub followed: u8,
pub followed_last_updated: i64,
}

1
src/db/schema9.sql Normal file
View File

@ -0,0 +1 @@
ALTER TABLE person ADD COLUMN followed_last_updated INTEGER NOT NULL DEFAULT 0;

View File

@ -79,6 +79,8 @@ pub struct Globals {
/// UI status message
pub status_message: RwLock<String>,
pub pull_following_merge: AtomicBool,
}
lazy_static! {
@ -110,6 +112,7 @@ lazy_static! {
fetcher: Fetcher::new(),
failed_avatars: RwLock::new(HashSet::new()),
status_message: RwLock::new("Welcome to Gossip. Status messages will appear here. Click them to dismiss them.".to_owned()),
pull_following_merge: AtomicBool::new(true),
}
};
}

View File

@ -231,6 +231,19 @@ impl Minion {
pub async fn handle_message(&mut self, message: ToMinionMessage) -> Result<bool, Error> {
match message.payload {
ToMinionPayload::FetchEvents(vec) => {
self.get_events(vec).await?;
}
ToMinionPayload::PostEvent(event) => {
let msg = ClientMessage::Event(event);
let wire = serde_json::to_string(&msg)?;
let ws_sink = self.sink.as_mut().unwrap();
ws_sink.send(WsMessage::Text(wire)).await?;
tracing::info!("Posted event to {}", &self.url);
}
ToMinionPayload::PullFollowing => {
self.pull_following().await?;
}
ToMinionPayload::Shutdown => {
tracing::info!("{}: Websocket listener shutting down", &self.url);
return Ok(false);
@ -244,16 +257,6 @@ impl Minion {
ToMinionPayload::SubscribeThreadFeed(id) => {
self.subscribe_thread_feed(id).await?;
}
ToMinionPayload::FetchEvents(vec) => {
self.get_events(vec).await?;
}
ToMinionPayload::PostEvent(event) => {
let msg = ClientMessage::Event(event);
let wire = serde_json::to_string(&msg)?;
let ws_sink = self.sink.as_mut().unwrap();
ws_sink.send(WsMessage::Text(wire)).await?;
tracing::info!("Posted event to {}", &self.url);
}
ToMinionPayload::TempSubscribeMetadata(pubkeyhex) => {
self.temp_subscribe_metadata(pubkeyhex).await?;
}
@ -641,6 +644,18 @@ impl Minion {
self.subscribe(vec![filter], &handle).await
}
async fn pull_following(&mut self) -> Result<(), Error> {
if let Some(pubkey) = GLOBALS.signer.read().await.public_key() {
let filter = Filter {
authors: vec![pubkey.into()],
kinds: vec![EventKind::ContactList],
..Default::default()
};
self.subscribe(vec![filter], "following").await?;
}
Ok(())
}
#[allow(dead_code)]
async fn subscribe(&mut self, filters: Vec<Filter>, handle: &str) -> Result<(), Error> {
let req_message = if self.subscriptions.has(handle) {

View File

@ -12,6 +12,7 @@ use nostr_types::{
};
use relay_picker::{BestRelay, RelayPicker};
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::{select, task};
@ -50,9 +51,7 @@ impl Overlord {
tracing::info!("Overlord signalling UI to shutdown");
GLOBALS
.shutting_down
.store(true, std::sync::atomic::Ordering::Relaxed);
GLOBALS.shutting_down.store(true, Ordering::Relaxed);
tracing::info!("Overlord signalling minions to shutdown");
@ -442,6 +441,15 @@ impl Overlord {
ToOverlordMessage::PostTextNote(content, tags) => {
self.post_textnote(content, tags).await?;
}
ToOverlordMessage::PullFollowMerge => {
self.pull_following(true).await?;
}
ToOverlordMessage::PullFollowOverwrite => {
self.pull_following(false).await?;
}
ToOverlordMessage::PushFollow => {
tracing::error!("Push Follow Unimplemented");
}
ToOverlordMessage::SaveRelays => {
let dirty_relays: Vec<DbRelay> = GLOBALS
.relays
@ -859,4 +867,39 @@ impl Overlord {
Ok(())
}
async fn pull_following(&mut self, merge: bool) -> Result<(), Error> {
// Set globally whether we are merging or not when newer following lists
// come in.
GLOBALS.pull_following_merge.store(merge, Ordering::Relaxed);
// Pull our list from all of the relays we post to
let relays: Vec<DbRelay> = GLOBALS
.relays
.read()
.await
.iter()
.filter_map(|(_, r)| if r.post { Some(r.to_owned()) } else { None })
.collect();
for relay in relays {
// Start a minion for it, if there is none
if !self.urls_watching.contains(&Url::new(&relay.url)) {
self.start_minion(relay.url.clone()).await?;
}
// Send it the event to pull our followers
tracing::debug!("Asking {} to pull our followers", &relay.url);
let _ = self.to_minions.send(ToMinionMessage {
target: relay.url.clone(),
payload: ToMinionPayload::PullFollowing,
});
}
// When the event comes in, process will handle it with our global
// merge preference.
Ok(())
}
}

View File

@ -42,21 +42,43 @@ impl People {
output
}
pub async fn create_if_missing(&mut self, pubkeyhex: &PublicKeyHex) -> Result<(), Error> {
if self.people.contains_key(pubkeyhex) {
pub async fn create_all_if_missing(&mut self, pubkeys: &[PublicKeyHex]) -> Result<(), Error> {
// Collect the public keys that we don't have already (by checking in memory).
// Anything in memory surely already is on disk so we don't have to check disk.
let pubkeys: Vec<&PublicKeyHex> = pubkeys
.iter()
.filter(|pk| !self.people.contains_key(pk))
.collect();
if pubkeys.is_empty() {
return Ok(());
}
// Try loading from the database
let maybe_dbperson = Self::fetch_one(pubkeyhex).await?;
// Make sure all these people exist in the database
let mut sql: String = "INSERT OR IGNORE INTO person (pubkey) VALUES ".to_owned();
sql.push_str(&"(?),".repeat(pubkeys.len()));
sql.pop(); // remove trailing comma
if let Some(dbperson) = maybe_dbperson {
// Insert into the map
self.people.insert(pubkeyhex.to_owned(), dbperson);
} else {
// Create new
let pubkey_strings: Vec<String> = pubkeys.iter().map(|p| p.0.clone()).collect();
task::spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(&sql)?;
let mut pos = 1;
for pk in pubkey_strings.iter() {
stmt.raw_bind_parameter(pos, pk)?;
pos += 1;
}
stmt.raw_execute()?;
Ok::<(), Error>(())
})
.await??;
// Make matching records for them in memory
for pk in pubkeys {
let dbperson = DbPerson {
pubkey: pubkeyhex.to_owned(),
pubkey: pk.to_owned(),
name: None,
about: None,
picture: None,
@ -65,11 +87,9 @@ impl People {
dns_id_last_checked: None,
metadata_at: None,
followed: 0,
followed_last_updated: 0,
};
// Insert into the map
self.people.insert(pubkeyhex.to_owned(), dbperson.clone());
// Insert into the database
Self::insert(dbperson).await?;
self.people.insert(pk.to_owned(), dbperson);
}
Ok(())
@ -82,7 +102,7 @@ impl People {
asof: Unixtime,
) -> Result<(), Error> {
// Sync in from database first
self.create_if_missing(pubkeyhex).await?;
self.create_all_if_missing(&[pubkeyhex.to_owned()]).await?;
// Update the map
let person = self.people.get_mut(pubkeyhex).unwrap();
@ -178,7 +198,7 @@ impl People {
let sql =
"SELECT pubkey, name, about, picture, dns_id, dns_id_valid, dns_id_last_checked, \
metadata_at, followed FROM person WHERE followed=1"
metadata_at, followed, followed_last_updated FROM person WHERE followed=1"
.to_owned();
let output: Result<Vec<DbPerson>, Error> = task::spawn_blocking(move || {
@ -197,6 +217,7 @@ impl People {
dns_id_last_checked: row.get(6)?,
metadata_at: row.get(7)?,
followed: row.get(8)?,
followed_last_updated: row.get(9)?,
})
})?;
let mut output: Vec<DbPerson> = Vec::new();
@ -414,6 +435,85 @@ impl People {
Ok(())
}
pub async fn follow_all(
&mut self,
pubkeys: &[PublicKeyHex],
merge: bool,
asof: Unixtime,
) -> Result<(), Error> {
tracing::debug!(
"Updating following list, {} people long, merge={}",
pubkeys.len(),
merge
);
// Make sure they are all in the database (and memory) first.
self.create_all_if_missing(pubkeys).await?;
// Follow in database
let sql = format!(
"UPDATE person SET followed=1, followed_last_updated=? WHERE pubkey IN ({}) and followed_last_updated<?",
repeat_vars(pubkeys.len())
);
let pubkey_strings: Vec<String> = pubkeys.iter().map(|p| p.0.clone()).collect();
task::spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(&sql)?;
stmt.raw_bind_parameter(1, asof.0)?;
let mut pos = 2;
for pk in pubkey_strings.iter() {
stmt.raw_bind_parameter(pos, pk)?;
pos += 1;
}
stmt.raw_bind_parameter(pos, asof.0)?;
stmt.raw_execute()?;
Ok::<(), Error>(())
})
.await??;
if !merge {
// Unfollow in database
let sql = format!(
"UPDATE person SET followed=0, followed_last_updated=? WHERE pubkey NOT IN ({}) and followed_last_updated<?",
repeat_vars(pubkeys.len())
);
let pubkey_strings: Vec<String> = pubkeys.iter().map(|p| p.0.clone()).collect();
task::spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(&sql)?;
stmt.raw_bind_parameter(1, asof.0)?;
let mut pos = 2;
for pk in pubkey_strings.iter() {
stmt.raw_bind_parameter(pos, pk)?;
pos += 1;
}
stmt.raw_bind_parameter(pos, asof.0)?;
stmt.raw_execute()?;
Ok::<(), Error>(())
})
.await??;
}
// Make sure memory matches
for (pkh, person) in self.people.iter_mut() {
if person.followed_last_updated < asof.0 {
if pubkeys.contains(pkh) {
person.followed = 1;
} else if !merge {
person.followed = 0;
}
}
}
Ok(())
}
pub async fn update_dns_id_last_checked(
&mut self,
pubkeyhex: PublicKeyHex,
@ -473,7 +573,9 @@ impl People {
async fn fetch(criteria: Option<&str>) -> Result<Vec<DbPerson>, Error> {
let sql =
"SELECT pubkey, name, about, picture, dns_id, dns_id_valid, dns_id_last_checked, metadata_at, followed FROM person".to_owned();
"SELECT pubkey, name, about, picture, dns_id, dns_id_valid, dns_id_last_checked, \
metadata_at, followed, followed_last_updated FROM person"
.to_owned();
let sql = match criteria {
None => sql,
Some(crit) => format!("{} WHERE {}", sql, crit),
@ -495,6 +597,7 @@ impl People {
dns_id_last_checked: row.get(6)?,
metadata_at: row.get(7)?,
followed: row.get(8)?,
followed_last_updated: row.get(9)?,
})
})?;
@ -519,10 +622,12 @@ impl People {
}
}
#[allow(dead_code)]
async fn insert(person: DbPerson) -> Result<(), Error> {
let sql =
"INSERT OR IGNORE INTO person (pubkey, name, about, picture, dns_id, dns_id_valid, dns_id_last_checked, metadata_at, followed) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)";
"INSERT OR IGNORE INTO person (pubkey, name, about, picture, dns_id, dns_id_valid, \
dns_id_last_checked, metadata_at, followed, followed_last_updated) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)";
task::spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
@ -539,6 +644,7 @@ impl People {
&person.dns_id_last_checked,
&person.metadata_at,
&person.followed,
&person.followed_last_updated,
))?;
Ok::<(), Error>(())
})
@ -563,3 +669,11 @@ impl People {
}
*/
}
fn repeat_vars(count: usize) -> String {
assert_ne!(count, 0);
let mut s = "?,".repeat(count);
// Remove trailing comma
s.pop();
s
}

View File

@ -4,7 +4,8 @@ use crate::db::{
use crate::error::Error;
use crate::globals::{Globals, GLOBALS};
use crate::relationship::Relationship;
use nostr_types::{Event, EventKind, Metadata, Tag, Unixtime, Url};
use nostr_types::{Event, EventKind, Metadata, PublicKeyHex, Tag, Unixtime, Url};
use std::sync::atomic::Ordering;
// This processes a new event, saving the results into the database
// and also populating the GLOBALS maps.
@ -48,7 +49,7 @@ pub async fn process_new_event(
.people
.write()
.await
.create_if_missing(&event.pubkey.into())
.create_all_if_missing(&[event.pubkey.into()])
.await?;
// Update person_relay.last_fetched
@ -217,11 +218,36 @@ pub async fn process_new_event(
.await?;
}
if event.kind == EventKind::ContactList {
// We only handle the user's own contact list currently
if let Some(pubkey) = GLOBALS.signer.read().await.public_key() {
if event.pubkey == pubkey {
let merge: bool = GLOBALS.pull_following_merge.load(Ordering::Relaxed);
let mut pubkeys: Vec<PublicKeyHex> = Vec::new();
// 'p' tags represent the author's contacts
for tag in &event.tags {
if let Tag::Pubkey { pubkey, .. } = tag {
pubkeys.push((*pubkey).into());
// FIXME do something with recommended_relay_url and petname
}
}
// Follow all those pubkeys, and unfollow everbody else if merge=false
// (and the date is used to ignore if the data is outdated)
GLOBALS
.people
.write()
.await
.follow_all(&pubkeys, merge, event.created_at)
.await?;
}
}
}
// FIXME: Handle EventKind::RecommendedRelay
// FIXME: Handle EventKind::ContactList
// Save in event_is_new
// Save in event_is_new (to highlight it in the feed, if feed related)
GLOBALS.event_is_new.write().await.push(event.id);
Ok(())

View File

@ -1,4 +1,5 @@
use super::{GossipUi, Page};
use crate::comms::ToOverlordMessage;
use crate::db::DbPerson;
use crate::globals::GLOBALS;
use eframe::egui;
@ -29,7 +30,21 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, _frame: &mut eframe::Fra
if app.page == Page::PeopleList {
ui.add_space(24.0);
ui.heading("NOTICE: Gossip is not synchronizing with data on the nostr relays. This is a separate list and it won't overwrite anything.");
ui.horizontal(|ui| {
if ui.button("↓ PULL ↓\nOverwrite").clicked() {
let _ = GLOBALS
.to_overlord
.send(ToOverlordMessage::PullFollowOverwrite);
}
if ui.button("↓ PULL ↓\nMerge (Add)").clicked() {
let _ = GLOBALS.to_overlord.send(ToOverlordMessage::PullFollowMerge);
}
/* not yet implemented
if ui.button("↑ PUSH ↑\n").clicked() {
let _ = GLOBALS.to_overlord.send(ToOverlordMessage::PushFollow);
}
*/
});
ui.add_space(10.0);
ui.separator();