Reprocess relay lists reworked to handle kind 3 too

This commit is contained in:
Mike Dilger 2024-06-19 13:03:24 +12:00
parent a4aba05ead
commit c8ae74d85a
4 changed files with 82 additions and 62 deletions

View File

@ -25,7 +25,7 @@ impl Command {
}
}
const COMMANDS: [Command; 34] = [
const COMMANDS: [Command; 35] = [
Command {
cmd: "oneshot",
usage_params: "{depends}",
@ -176,6 +176,11 @@ const COMMANDS: [Command; 34] = [
usage_params: "",
desc: "Reprocess events that came during the last 24 hours",
},
Command {
cmd: "reprocess_relay_lists",
usage_params: "",
desc: "Reprocess relay lists (including kind 3 contents)",
},
Command {
cmd: "ungiftwrap",
usage_params: "<idhex>",
@ -251,6 +256,7 @@ pub fn handle_command(mut args: env::Args, runtime: &Runtime) -> Result<bool, Er
"rebuild_indices" => rebuild_indices()?,
"rename_person_list" => rename_person_list(command, args)?,
"reprocess_recent" => reprocess_recent(command, runtime)?,
"reprocess_relay_lists" => reprocess_relay_lists()?,
"ungiftwrap" => ungiftwrap(command, args)?,
"verify" => verify(command, args)?,
"verify_json" => verify_json(command, args)?,
@ -932,6 +938,13 @@ pub fn reprocess_recent(_cmd: Command, runtime: &Runtime) -> Result<(), Error> {
Ok(runtime.block_on(job)?)
}
pub fn reprocess_relay_lists() -> Result<(), Error> {
let (c1, c2) = gossip_lib::process::reprocess_relay_lists()?;
println!("Reprocessed {} contact lists", c1);
println!("Reprocessed {} relay lists", c2);
Ok(())
}
pub fn verify(cmd: Command, mut args: env::Args) -> Result<(), Error> {
let idstr = match args.next() {
Some(id) => id,

View File

@ -169,7 +169,7 @@ impl Overlord {
// If we need to reapply relay lists, do so now
if GLOBALS.storage.get_flag_reprocess_relay_lists_needed() {
tracing::info!("Reprocessing relay lists...");
GLOBALS.storage.reprocess_relay_lists()?;
crate::process::reprocess_relay_lists()?;
}
// Data migrations complete

View File

@ -9,8 +9,8 @@ use crate::storage::{PersonTable, Table};
use async_recursion::async_recursion;
use heed::RwTxn;
use nostr_types::{
Event, EventAddr, EventKind, EventReference, Id, Metadata, NostrBech32, PublicKey, RelayList,
RelayUrl, RelayUsage, SimpleRelayList, Tag, Unixtime,
Event, EventAddr, EventKind, EventReference, Filter, Id, Metadata, NostrBech32, PublicKey,
RelayList, RelayUrl, RelayUsage, SimpleRelayList, Tag, Unixtime,
};
use std::sync::atomic::Ordering;
@ -264,10 +264,10 @@ pub async fn process_new_event(
let (_personlist, _metadata) =
update_or_allocate_person_list_from_event(event, pubkey)?;
} else {
process_somebody_elses_contact_list(event)?;
process_somebody_elses_contact_list(event, false)?;
}
} else {
process_somebody_elses_contact_list(event)?;
process_somebody_elses_contact_list(event, false)?;
}
} else if event.kind == EventKind::MuteList || event.kind == EventKind::FollowSets {
// Only our own
@ -279,7 +279,7 @@ pub async fn process_new_event(
}
}
} else if event.kind == EventKind::RelayList {
GLOBALS.storage.process_relay_list(event, None)?;
GLOBALS.storage.process_relay_list(event, false, None)?;
// Let the seeker know we now have relays for this author, in case the seeker
// wants to update it's state
@ -441,7 +441,7 @@ pub async fn process_new_event(
Ok(())
}
pub fn process_somebody_elses_contact_list(event: &Event) -> Result<(), Error> {
fn process_somebody_elses_contact_list(event: &Event, force: bool) -> Result<(), Error> {
// We don't keep their contacts or show to the user yet.
// We only process the contents for (non-standard) relay list information.
@ -453,7 +453,7 @@ pub fn process_somebody_elses_contact_list(event: &Event) -> Result<(), Error> {
.people
.update_relay_list_stamps(event.pubkey, event.created_at.0)?;
if !newer {
if !newer && !force {
return Ok(());
}
@ -473,15 +473,56 @@ pub fn process_somebody_elses_contact_list(event: &Event) -> Result<(), Error> {
.storage
.set_relay_list(event.pubkey, relay_list, None)?;
// the following also refreshes scores before it picks relays
let _ = GLOBALS
.to_overlord
.send(ToOverlordMessage::RefreshScoresAndPickRelays);
if !force {
// the following also refreshes scores before it picks relays
let _ = GLOBALS
.to_overlord
.send(ToOverlordMessage::RefreshScoresAndPickRelays);
}
} else if event.content.len() > 0 {
tracing::info!("Contact list content does not parse: {}", &event.content);
}
Ok(())
}
pub fn reprocess_relay_lists() -> Result<(usize, usize), Error> {
let mut counts: (usize, usize) = (0, 0);
// Reprocess all contact lists
let mut filter = Filter::new();
filter.add_event_kind(EventKind::ContactList);
let events = GLOBALS.storage.find_events_by_filter(&filter, |_e| true)?;
for event in &events {
process_somebody_elses_contact_list(event, true)?;
}
counts.0 = events.len();
// Reprocess all relay lists
let mut filter = Filter::new();
filter.add_event_kind(EventKind::RelayList);
let mut txn = GLOBALS.storage.get_write_txn()?;
let relay_lists = GLOBALS.storage.find_events_by_filter(&filter, |_| true)?;
// Process all RelayLists
for event in relay_lists.iter() {
GLOBALS
.storage
.process_relay_list(event, true, Some(&mut txn))?;
}
counts.1 = events.len();
// Turn off the flag
GLOBALS
.storage
.set_flag_reprocess_relay_lists_needed(false, Some(&mut txn))?;
txn.commit()?;
Ok(counts)
}
/// Process relationships of an event.
/// This returns IDs that should be UI invalidated (must be redrawn)
pub(crate) fn process_relationships_of_event<'a>(

View File

@ -1135,24 +1135,27 @@ impl Storage {
pub fn process_relay_list<'a>(
&'a self,
event: &Event,
force: bool,
rw_txn: Option<&mut RwTxn<'a>>,
) -> Result<(), Error> {
let f = |txn: &mut RwTxn<'a>| -> Result<(), Error> {
if let Some(mut person) = PersonTable::read_record(event.pubkey, Some(txn))? {
// Check if this relay list is newer than the stamp we have for its author
if let Some(previous_at) = person.relay_list_created_at {
if event.created_at.0 <= previous_at {
// This list is old.
return Ok(());
if !force {
// Check if this relay list is newer than the stamp we have for its author
if let Some(previous_at) = person.relay_list_created_at {
if event.created_at.0 <= previous_at {
// This list is old.
return Ok(());
}
}
// If we got here, the list is new
// Mark when it was created
person.relay_list_created_at = Some(event.created_at.0);
// And save those marks in the Person record
PersonTable::write_record(&mut person, Some(txn))?;
}
// If we got here, the list is new.
// Mark when it was created
person.relay_list_created_at = Some(event.created_at.0);
// And save those marks in the Person record
PersonTable::write_record(&mut person, Some(txn))?;
}
let mut ours = false;
@ -2449,43 +2452,6 @@ impl Storage {
write_transact!(self, rw_txn, f)
}
pub fn reprocess_relay_lists(&self) -> Result<(), Error> {
let mut txn = self.env.write_txn()?;
// Clear relay_list_created_at fields in person records so that
// it will rebuild
PersonTable::filter_modify(
|_| true,
|person| {
person.relay_list_created_at = None;
},
Some(&mut txn),
)?;
// Commit this change, otherwise read_person (which takes no transaction)
// will give stale data when it is called within process_relay_list()
txn.commit()?;
let mut txn = self.env.write_txn()?;
// Load all RelayLists
let mut filter = Filter::new();
filter.add_event_kind(EventKind::RelayList);
let relay_lists = self.find_events_by_filter(&filter, |_| true)?;
// Process all RelayLists
for event in relay_lists.iter() {
self.process_relay_list(event, Some(&mut txn))?;
}
// Turn off the flag
self.set_flag_reprocess_relay_lists_needed(false, Some(&mut txn))?;
txn.commit()?;
Ok(())
}
/// Read person lists
pub fn read_person_lists(
&self,