lmdb: index: event_references_person

This commit is contained in:
Mike Dilger 2023-08-02 17:16:40 +12:00
parent d76325c267
commit c185d6e531

View File

@ -70,6 +70,7 @@ pub struct Storage {
events: Database,
// EventKind:PublicKey -> Id
// (pubkey is event author)
// (dup keys, so multiple Ids per key)
// val: id.as_slice() | Id(val[0..32].try_into()?)
event_ek_pk_index: Database,
@ -79,6 +80,14 @@ pub struct Storage {
// val: id.as_slice() | Id(val[0..32].try_into()?)
event_ek_c_index: Database,
// PublicKey:ReverseUnixtime -> Id
// (pubkey is referenced by the event somehow)
// (only feed-displayable events are included)
// (dup keys, so multiple Ids per key)
// NOTE: this may be far too much data. Maybe we should only build this for the
// user's pubkey as their inbox.
event_references_person: Database,
// Id:Id -> Relationship
// key: id.as_slice(), id.as_slice() | Id(val[32..64].try_into()?)
// val: relationship.write_to_vec() | Relationship::read_from_buffer(val)
@ -144,6 +153,10 @@ impl Storage {
DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED,
)?;
let event_references_person = env.create_db(
Some("event_references_person"),
DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED,
)?;
let relationships = env.create_db(Some("relationships"), DatabaseFlags::empty())?;
let people = env.create_db(Some("people"), DatabaseFlags::empty())?;
@ -161,6 +174,7 @@ impl Storage {
events,
event_ek_pk_index,
event_ek_c_index,
event_references_person,
relationships,
people,
person_relays,
@ -659,6 +673,7 @@ impl Storage {
// also index the event
self.write_event_ek_pk_index(event)?;
self.write_event_ek_c_index(event)?;
self.write_event_references_person(event)?;
Ok(())
}
@ -805,7 +820,7 @@ impl Storage {
// Break if we moved to a different public key
let this_pubkey = match PublicKey::from_bytes(&key[4..4 + 32]) {
Err(_) => continue 'pubkeyloop,
Err(_) => continue,
Ok(pk) => pk,
};
if this_pubkey != *pubkey {
@ -891,24 +906,7 @@ impl Storage {
where
F: Fn(&Event) -> bool,
{
if kinds.is_empty() {
return Err(ErrorKind::General(
"find_events() requires some event kinds to be specified.".to_string(),
)
.into());
}
// Get the Ids
let ids = match (pubkeys.is_empty(), since) {
(true, None) => self.find_ek_pk_events(kinds, pubkeys)?,
(true, Some(when)) => self.find_ek_c_events(kinds, when)?,
(false, None) => self.find_ek_pk_events(kinds, pubkeys)?,
(false, Some(when)) => {
let group1 = self.find_ek_pk_events(kinds, pubkeys)?;
let group2 = self.find_ek_c_events(kinds, when)?;
group1.intersection(&group2).copied().collect()
}
};
let ids = self.find_event_ids(kinds, pubkeys, since)?;
// Now that we have that Ids, fetch the events
let txn = self.env.begin_ro_txn()?;
@ -934,6 +932,44 @@ impl Storage {
Ok(events)
}
// Find events of interest. This is just like find_events() but it just gives the Ids,
// unsorted.
//
// You must specify some event kinds.
// If pubkeys is empty, they won't matter.
// If since is None, it won't matter.
//
// The function f is run after the matching-so-far events have been deserialized
// to finish filtering, and optionally they are sorted in reverse chronological
// order.
pub fn find_event_ids(
&self,
kinds: &[EventKind],
pubkeys: &[PublicKey],
since: Option<Unixtime>,
) -> Result<HashSet<Id>, Error> {
if kinds.is_empty() {
return Err(ErrorKind::General(
"find_events() requires some event kinds to be specified.".to_string(),
)
.into());
}
// Get the Ids
let ids = match (pubkeys.is_empty(), since) {
(true, None) => self.find_ek_pk_events(kinds, pubkeys)?,
(true, Some(when)) => self.find_ek_c_events(kinds, when)?,
(false, None) => self.find_ek_pk_events(kinds, pubkeys)?,
(false, Some(when)) => {
let group1 = self.find_ek_pk_events(kinds, pubkeys)?;
let group2 = self.find_ek_c_events(kinds, when)?;
group1.intersection(&group2).copied().collect()
}
};
Ok(ids)
}
pub fn search_events(&self, text: &str) -> Result<Vec<Event>, Error> {
let event_kinds = GLOBALS.settings.read().feed_displayable_event_kinds();
@ -1008,6 +1044,97 @@ impl Storage {
Ok(())
}
// We don't call this externally. Whenever we write an event, we do this.
fn write_event_references_person(&self, event: &Event) -> Result<(), Error> {
if !event.kind.is_feed_displayable() {
return Ok(());
}
let bytes = event.id.as_slice();
let mut pubkeys: HashSet<PublicKey> = HashSet::new();
for (pubkeyhex, _, _) in event.people() {
let pubkey = match PublicKey::try_from_hex_string(pubkeyhex.as_str()) {
Ok(pk) => pk,
Err(_) => continue,
};
pubkeys.insert(pubkey);
}
for pubkey in event.people_referenced_in_content() {
pubkeys.insert(pubkey);
}
if !pubkeys.is_empty() {
let mut txn = self.env.begin_rw_txn()?;
for pubkey in pubkeys.drain() {
let mut key: Vec<u8> = pubkey.as_bytes();
key.extend((i64::MAX - event.created_at.0).to_be_bytes().as_slice()); // reverse created_at
txn.put(
self.event_references_person,
&key,
&bytes,
WriteFlags::empty(),
)?;
}
txn.commit()?;
}
Ok(())
}
// Read all events referencing a given person in reverse time order
pub fn read_events_referencing_person<F>(
&self,
pubkey: &PublicKey,
since: Unixtime,
f: F,
) -> Result<Vec<Event>, Error>
where
F: Fn(&Event) -> bool,
{
let txn = self.env.begin_ro_txn()?;
let mut cursor = txn.open_ro_cursor(self.event_references_person)?;
let now = Unixtime::now().unwrap();
let mut start_key: Vec<u8> = pubkey.as_bytes();
start_key.extend((i64::MAX - now.0).to_be_bytes().as_slice()); // work back from now
let iter = cursor.iter_from(start_key);
let mut events: Vec<Event> = Vec::new();
for result in iter {
match result {
Err(e) => return Err(e.into()),
Ok((key, val)) => {
// Break if we moved to a different pubkey
let this_pubkey = match PublicKey::from_bytes(&key[..32]) {
Err(_) => continue,
Ok(pk) => pk,
};
if this_pubkey != *pubkey {
break;
}
// Break if these events are getting to old
let this_time = i64::from_be_bytes(key[32..32 + 8].try_into().unwrap());
if this_time > (i64::MAX - since.0) {
break;
}
// Take the event
let id = Id(val[0..32].try_into()?);
// (like read_event, but we supply our on transaction)
match txn.get(self.events, &id.as_slice()) {
Ok(bytes) => {
let event = Event::read_from_buffer(bytes)?;
if f(&event) {
events.push(event);
}
}
Err(lmdb::Error::NotFound) => continue,
Err(e) => return Err(e.into()),
}
}
}
}
Ok(events)
}
// TBD: optimize this by storing better event indexes
// currently we stupidly scan every event (just to get LMDB up and running first)
pub fn fetch_contact_list(&self, pubkey: &PublicKey) -> Result<Option<Event>, Error> {