storage: Migrate 7->8: Populate missing last_fetched data

This commit is contained in:
Mike Dilger 2023-09-24 14:45:31 +13:00
parent f2dded420e
commit 72c828f10a

View File

@ -1,4 +1,4 @@
use super::types::{Person2, Settings1, Settings2};
use super::types::{Person2, PersonRelay1, Settings1, Settings2};
use super::Storage;
use crate::error::{Error, ErrorKind};
use crate::people::PersonList;
@ -7,7 +7,7 @@ use nostr_types::{Event, Id, RelayUrl, Signature};
use speedy::{Readable, Writable};
impl Storage {
const MAX_MIGRATION_LEVEL: u32 = 7;
const MAX_MIGRATION_LEVEL: u32 = 8;
pub(super) fn migrate(&self, mut level: u32) -> Result<(), Error> {
if level > Self::MAX_MIGRATION_LEVEL {
@ -92,6 +92,10 @@ impl Storage {
tracing::info!("{prefix}: migrating person records...");
self.migrate_people(txn)?;
}
7 => {
tracing::info!("{prefix}: populating missing last_fetched data...");
self.populate_last_fetched(txn)?;
}
_ => panic!("Unreachable migration level"),
};
@ -423,4 +427,55 @@ impl Storage {
Ok(())
}
pub fn populate_last_fetched<'a>(&'a self, txn: &mut RwTxn<'a>) -> Result<(), Error> {
let total = self.get_event_seen_on_relay_len()?;
let mut count = 0;
// Since we failed to properly collect person_relay.last_fetched, we will
// use seen_on data to reconstruct it
let loop_txn = self.env.read_txn()?;
for result in self.db_event_seen_on_relay1()?.iter(&loop_txn)? {
let (key, val) = result?;
// Extract out the data
let id = Id(key[..32].try_into().unwrap());
let url = match RelayUrl::try_from_str(std::str::from_utf8(&key[32..])?) {
Ok(url) => url,
Err(_) => continue, // skip if relay url is bad. We will prune these in the future.
};
let time = u64::from_be_bytes(val[..8].try_into()?);
// Read event to get the person
if let Some(event) = self.read_event(id)? {
// Read (or create) person_relay
let (mut pr, update) = match self.read_person_relay(event.pubkey, &url)? {
Some(pr) => match pr.last_fetched {
Some(lf) => (pr, lf < time),
None => (pr, true),
},
None => {
let pr = PersonRelay1::new(event.pubkey, url.clone());
(pr, true)
}
};
if update {
pr.last_fetched = Some(time);
self.write_person_relay(&pr, Some(txn))?;
}
}
count += 1;
for checkpoint in &[10, 20, 30, 40, 50, 60, 70, 80, 90] {
if count == checkpoint * total / 100 {
tracing::info!("{}% done", checkpoint);
}
}
}
Ok(())
}
}