lmdb: Switch to lmdb relationships map [plus code adjustments to make migration work]

This commit is contained in:
Mike Dilger 2023-07-23 19:48:41 +12:00
parent 4209062517
commit edc2ef490f
8 changed files with 165 additions and 189 deletions

View File

@ -5,7 +5,6 @@ use crate::feed::Feed;
use crate::fetcher::Fetcher;
use crate::media::Media;
use crate::people::{DbPerson, People};
use crate::relationship::Relationship;
use crate::relay_picker_hooks::Hooks;
use crate::settings::Settings;
use crate::signer::Signer;
@ -14,13 +13,12 @@ use crate::storage::Storage;
use dashmap::DashMap;
use gossip_relay_picker::RelayPicker;
use nostr_types::{
Event, Id, MilliSatoshi, PayRequestData, Profile, PublicKey, PublicKeyHex, RelayUrl,
UncheckedUrl,
Event, Id, PayRequestData, Profile, PublicKey, PublicKeyHex, RelayUrl, UncheckedUrl,
};
use parking_lot::RwLock as PRwLock;
use regex::Regex;
use rusqlite::Connection;
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize};
use tokio::sync::{broadcast, mpsc, Mutex, RwLock};
@ -54,9 +52,6 @@ pub struct Globals {
/// and stolen away when the Overlord is created.
pub tmp_overlord_receiver: Mutex<Option<mpsc::UnboundedReceiver<ToOverlordMessage>>>,
/// All relationships between events
pub relationships: RwLock<HashMap<Id, Vec<(Id, Relationship)>>>,
/// All nostr people records currently loaded into memory, keyed by pubkey
pub people: People,
@ -145,7 +140,6 @@ lazy_static! {
to_minions,
to_overlord,
tmp_overlord_receiver: Mutex::new(Some(tmp_overlord_receiver)),
relationships: RwLock::new(HashMap::new()),
people: People::new(),
connected_relays: DashMap::new(),
relay_picker: Default::default(),
@ -175,99 +169,6 @@ lazy_static! {
}
impl Globals {
pub async fn add_relationship(id: Id, related: Id, relationship: Relationship) {
let r = (related, relationship);
let mut relationships = GLOBALS.relationships.write().await;
relationships
.entry(id)
.and_modify(|vec| {
if !vec.contains(&r) {
vec.push(r.clone());
}
})
.or_insert_with(|| vec![r]);
}
pub fn get_replies_sync(id: Id) -> Vec<Id> {
let mut output: Vec<Id> = Vec::new();
if let Some(vec) = GLOBALS.relationships.blocking_read().get(&id) {
for (id, relationship) in vec.iter() {
if *relationship == Relationship::Reply {
output.push(*id);
}
}
}
output
}
// FIXME - this allows people to react many times to the same event, and
// it counts them all!
/// Returns the list of reactions and whether or not this account has already reacted to this event
pub fn get_reactions_sync(id: Id) -> (Vec<(char, usize)>, bool) {
let mut output: HashMap<char, HashSet<PublicKeyHex>> = HashMap::new();
// Whether or not the Gossip user already reacted to this event
let mut self_already_reacted = false;
if let Some(relationships) = GLOBALS.relationships.blocking_read().get(&id) {
for (other_id, relationship) in relationships.iter() {
// get the reacting event to make sure publickeys are unique
if let Ok(Some(e)) = GLOBALS.storage.read_event(*other_id) {
if let Relationship::Reaction(reaction) = relationship {
if Some(e.pubkey) == GLOBALS.signer.public_key() {
self_already_reacted = true;
}
let symbol: char = if let Some(ch) = reaction.chars().next() {
ch
} else {
'+'
};
output
.entry(symbol)
.and_modify(|pubkeys| {
let _ = pubkeys.insert(e.pubkey.into());
})
.or_insert_with(|| {
let mut set = HashSet::new();
set.insert(e.pubkey.into());
set
});
}
}
}
}
let mut v: Vec<(char, usize)> = output.iter().map(|(c, u)| (*c, u.len())).collect();
v.sort();
(v, self_already_reacted)
}
pub fn get_zap_total_sync(id: Id) -> MilliSatoshi {
let mut total = MilliSatoshi(0);
if let Some(relationships) = GLOBALS.relationships.blocking_read().get(&id) {
for (_other_id, relationship) in relationships.iter() {
if let Relationship::ZapReceipt(millisats) = relationship {
total = total + *millisats;
}
}
}
total
}
pub fn get_deletion_sync(id: Id) -> Option<String> {
if let Some(relationships) = GLOBALS.relationships.blocking_read().get(&id) {
for (_id, relationship) in relationships.iter() {
if let Relationship::Deletion(deletion) = relationship {
return Some(deletion.clone());
}
}
}
None
}
pub fn get_your_nprofile() -> Option<Profile> {
let public_key = match GLOBALS.signer.public_key() {
Some(pk) => pk,

View File

@ -1,8 +1,7 @@
use crate::comms::ToOverlordMessage;
use crate::db::{DbPersonRelay, DbRelay};
use crate::error::Error;
use crate::globals::{Globals, GLOBALS};
use crate::relationship::Relationship;
use crate::globals::GLOBALS;
use nostr_types::{
Event, EventKind, Metadata, NostrBech32, RelayUrl, SimpleRelayList, Tag, Unixtime,
};
@ -138,69 +137,11 @@ pub async fn process_new_event(
}
}
// Save event relationships (whether from relay or not)
{
// replies to
if let Some((id, _)) = event.replies_to() {
// Insert into relationships
Globals::add_relationship(id, event.id, Relationship::Reply).await;
}
// Save event relationships (whether from a relay or not)
let invalid_ids = GLOBALS.storage.process_relationships_of_event(&event)?;
/*
// replies to root
if let Some((id, _)) = event.replies_to_root() {
// Insert into relationships
Globals::add_relationship(id, event.id, Relationship::Root).await;
}
// mentions
for (id, _) in event.mentions() {
// Insert into relationships
Globals::add_relationship(id, event.id, Relationship::Mention).await;
}
*/
// reacts to
if let Some((id, reaction, _maybe_url)) = event.reacts_to() {
// Insert into relationships
Globals::add_relationship(id, event.id, Relationship::Reaction(reaction)).await;
// UI cache invalidation (so the note get rerendered)
GLOBALS.ui_notes_to_invalidate.write().push(id);
}
// deletes
if let Some((ids, reason)) = event.deletes() {
// UI cache invalidation (so the notes get rerendered)
GLOBALS.ui_notes_to_invalidate.write().extend(&ids);
for id in ids {
// since it is a delete, we don't actually desire the event.
// Insert into relationships
Globals::add_relationship(id, event.id, Relationship::Deletion(reason.clone()))
.await;
}
}
// zaps
match event.zaps() {
Ok(Some(zapdata)) => {
// Insert into relationships
Globals::add_relationship(
zapdata.id,
event.id,
Relationship::ZapReceipt(zapdata.amount),
)
.await;
// UI cache invalidation (so the note gets rerendered)
GLOBALS.ui_notes_to_invalidate.write().push(zapdata.id);
}
Err(e) => tracing::error!("Invalid zap receipt: {}", e),
_ => {}
}
}
// Invalidate UI events indicated by those relationships
GLOBALS.ui_notes_to_invalidate.write().extend(&invalid_ids);
// Save event_hashtags
if from_relay {

View File

@ -1,13 +1,11 @@
use nostr_types::MilliSatoshi;
use nostr_types::{MilliSatoshi, PublicKey};
use speedy::{Readable, Writable};
/// A relationship between events
#[derive(Clone, Debug, PartialEq, Eq, Readable, Writable)]
pub enum Relationship {
//Root,
Reply,
//Mention,
Reaction(String),
Reaction(PublicKey, String),
Deletion(String),
ZapReceipt(MilliSatoshi),
ZapReceipt(PublicKey, MilliSatoshi),
}

View File

@ -37,7 +37,9 @@ impl Storage {
// Load and process every event in order to generate the relationships data
fn compute_relationships(&self) -> Result<(), Error> {
panic!("Not yet properly implemented");
// track progress
let total = self.get_event_stats()?.entries();
let mut count = 0;
let txn = self.env.begin_ro_txn()?;
let mut cursor = txn.open_ro_cursor(self.events)?;
@ -47,10 +49,15 @@ impl Storage {
Err(e) => return Err(e.into()),
Ok((_key, val)) => {
let event = Event::read_from_buffer(val)?;
// FIXME we can't do this async
// crate::process::process_new_event(&event, false, None, None).await?;
let _ = self.process_relationships_of_event(&event)?;
}
}
// track progress
count += 1;
if count % 1000 == 0 {
tracing::info!("{}/{}", count, total);
}
}
Ok(())

View File

@ -11,9 +11,11 @@ use lmdb::{
Cursor, Database, DatabaseFlags, Environment, EnvironmentFlags, Stat, Transaction, WriteFlags,
};
use nostr_types::{
EncryptedPrivateKey, Event, EventKind, Id, PublicKey, PublicKeyHex, RelayUrl, Tag, Unixtime,
EncryptedPrivateKey, Event, EventKind, Id, MilliSatoshi, PublicKey, PublicKeyHex, RelayUrl,
Tag, Unixtime,
};
use speedy::{Readable, Writable};
use std::collections::HashMap;
const MAX_LMDB_KEY: usize = 511;
macro_rules! key {
@ -71,7 +73,7 @@ impl Storage {
// This has to be big enough for all the data.
// Note that it is the size of the map in VIRTUAL address space,
// and that it doesn't all have to be paged in at the same time.
builder.set_map_size(1048576 * 1024 * 2); // 2 GB (probably too small)
builder.set_map_size(1048576 * 1024 * 128); // 128 GB
let env = builder.open(&Profile::current()?.lmdb_dir)?;
@ -115,6 +117,7 @@ impl Storage {
None => {
// Import from sqlite
storage.import()?;
storage.migrate(0)?;
}
Some(level) => {
storage.migrate(level)?;
@ -692,8 +695,6 @@ impl Storage {
// This is temporary to feed src/events.rs which will be going away in a future
// code pass
pub fn fetch_relay_lists(&self) -> Result<Vec<Event>, Error> {
use std::collections::HashMap;
let mut relay_lists =
self.find_events(&[], &[EventKind::RelayList], None, |_| true, false)?;
@ -759,4 +760,126 @@ impl Storage {
}
Ok(output)
}
pub fn get_replies(&self, id: Id) -> Result<Vec<Id>, Error> {
Ok(self
.find_relationships(id)?
.iter()
.filter_map(|(id, rel)| {
if *rel == Relationship::Reply {
Some(*id)
} else {
None
}
})
.collect())
}
/// Returns the list of reactions and whether or not this account has already reacted to this event
pub fn get_reactions(&self, id: Id) -> Result<(Vec<(char, usize)>, bool), Error> {
// Whether or not the Gossip user already reacted to this event
let mut self_already_reacted = false;
// Collect up to one reaction per pubkey
let mut phase1: HashMap<PublicKey, char> = HashMap::new();
for (_, rel) in self.find_relationships(id)? {
if let Relationship::Reaction(pubkey, reaction) = rel {
let symbol: char = if let Some(ch) = reaction.chars().next() {
ch
} else {
'+'
};
phase1.insert(pubkey, symbol);
if Some(pubkey) == GLOBALS.signer.public_key() {
self_already_reacted = true;
}
}
}
// Collate by char
let mut output: HashMap<char, usize> = HashMap::new();
for (_, symbol) in phase1 {
output
.entry(symbol)
.and_modify(|count| *count += 1)
.or_insert_with(|| 1);
}
let mut v: Vec<(char, usize)> = output.drain().collect();
v.sort();
Ok((v, self_already_reacted))
}
pub fn get_zap_total(&self, id: Id) -> Result<MilliSatoshi, Error> {
let mut total = MilliSatoshi(0);
for (_, rel) in self.find_relationships(id)? {
if let Relationship::ZapReceipt(_pk, millisats) = rel {
total = total + millisats;
}
}
Ok(total)
}
pub fn get_deletion(&self, id: Id) -> Result<Option<String>, Error> {
for (_, rel) in self.find_relationships(id)? {
if let Relationship::Deletion(deletion) = rel {
return Ok(Some(deletion.clone()));
}
}
Ok(None)
}
// This returns IDs that should be UI invalidated
pub fn process_relationships_of_event(&self, event: &Event) -> Result<Vec<Id>, Error> {
let mut invalidate: Vec<Id> = Vec::new();
// replies to
if let Some((id, _)) = event.replies_to() {
self.write_relationship(id, event.id, Relationship::Reply)?;
}
// reacts to
if let Some((id, reaction, _maybe_url)) = event.reacts_to() {
self.write_relationship(
id,
event.id,
Relationship::Reaction(event.pubkey, reaction),
)?;
invalidate.push(id);
}
// deletes
if let Some((ids, reason)) = event.deletes() {
invalidate.extend(&ids);
for id in ids {
// since it is a delete, we don't actually desire the event.
self.write_relationship(
id,
event.id,
Relationship::Deletion(reason.clone()),
)?;
}
}
// zaps
match event.zaps() {
Ok(Some(zapdata)) => {
self.write_relationship(
zapdata.id,
event.id,
Relationship::ZapReceipt(event.pubkey, zapdata.amount),
)?;
invalidate.push(zapdata.id);
}
Err(e) => tracing::error!("Invalid zap receipt: {}", e),
_ => {}
}
Ok(invalidate)
}
}

View File

@ -1,7 +1,7 @@
use super::theme::FeedProperties;
use super::{GossipUi, Page};
use crate::feed::FeedKind;
use crate::globals::{Globals, GLOBALS};
use crate::globals::GLOBALS;
use eframe::egui;
use egui::{Context, Frame, RichText, ScrollArea, Ui, Vec2};
use nostr_types::Id;
@ -266,7 +266,7 @@ fn render_note_maybe_fake(
// Yes, and we need to fake render threads to get their approx height too.
if threaded && !as_reply_to {
let replies = Globals::get_replies_sync(event.id);
let replies = GLOBALS.storage.get_replies(event.id).unwrap_or(vec![]);
let iter = replies.iter();
let first = replies.first();
let last = replies.last();

View File

@ -9,7 +9,7 @@ use super::notedata::{NoteData, RepostType};
use super::FeedNoteParams;
use crate::comms::ToOverlordMessage;
use crate::feed::FeedKind;
use crate::globals::{Globals, ZapState, GLOBALS};
use crate::globals::{ZapState, GLOBALS};
use crate::ui::widgets::CopyButton;
use crate::ui::{GossipUi, Page};
use crate::AVATAR_SIZE_F32;
@ -159,7 +159,7 @@ pub(super) fn render_note(
// even if muted, continue rendering thread children
if threaded && !as_reply_to {
let replies = Globals::get_replies_sync(id);
let replies = GLOBALS.storage.get_replies(id).unwrap_or(vec![]);
let iter = replies.iter();
let first = replies.first();
let last = replies.last();

View File

@ -1,7 +1,4 @@
use crate::{
globals::{Globals, GLOBALS},
people::DbPerson,
};
use crate::{globals::GLOBALS, people::DbPerson};
use nostr_types::{
ContentSegment, Event, EventDelegation, EventKind, Id, MilliSatoshi, NostrBech32, PublicKeyHex,
ShatteredContent, Tag,
@ -52,11 +49,17 @@ impl NoteData {
let delegation = event.delegation();
let deletion = Globals::get_deletion_sync(event.id);
let deletion = GLOBALS.storage.get_deletion(event.id).unwrap_or(None);
let (reactions, self_already_reacted) = Globals::get_reactions_sync(event.id);
let (reactions, self_already_reacted) = GLOBALS
.storage
.get_reactions(event.id)
.unwrap_or((vec![], false));
let zaptotal = Globals::get_zap_total_sync(event.id);
let zaptotal = GLOBALS
.storage
.get_zap_total(event.id)
.unwrap_or(MilliSatoshi(0));
// build a list of all cached mentions and their index
// only notes that are in the cache will be rendered as reposts
@ -178,7 +181,10 @@ impl NoteData {
}
pub(super) fn update_reactions(&mut self) {
let (mut reactions, self_already_reacted) = Globals::get_reactions_sync(self.event.id);
let (mut reactions, self_already_reacted) = GLOBALS
.storage
.get_reactions(self.event.id)
.unwrap_or((vec![], false));
self.reactions.clear();
self.reactions.append(&mut reactions);