storage: migration 15: migrate to EventV2

This commit is contained in:
Mike Dilger 2023-11-18 08:47:44 +13:00
parent 17636851ed
commit 53a8bb4efe
5 changed files with 256 additions and 85 deletions

View File

@ -3,8 +3,8 @@ use std::sync::mpsc::Sender;
use crate::error::{Error, ErrorKind};
use crate::globals::GLOBALS;
use nostr_types::{
ContentEncryptionAlgorithm, EncryptedPrivateKey, Event, EventKind, Id, KeySecurity, PreEvent,
PrivateKey, PublicKey, Rumor,
ContentEncryptionAlgorithm, EncryptedPrivateKey, Event, EventV1, EventKind, Id, KeySecurity,
PreEvent, PrivateKey, PublicKey, Rumor, RumorV1
};
use parking_lot::RwLock;
use tokio::task;
@ -324,6 +324,14 @@ impl Signer {
}
}
/// Unwrap a giftwrap event V1
///
/// Returns the inner `RumorV1`, or `ErrorKind::NoPrivateKey` when no
/// private key is loaded into the signer.
pub fn unwrap_giftwrap1(&self, event: &EventV1) -> Result<RumorV1, Error> {
    if let Some(private) = &*self.private.read() {
        Ok(event.giftwrap_unwrap(private)?)
    } else {
        Err((ErrorKind::NoPrivateKey, file!(), line!()).into())
    }
}
/// Encrypt content
pub fn encrypt(
&self,

View File

@ -2,7 +2,7 @@ use crate::error::{Error, ErrorKind};
use crate::globals::GLOBALS;
use crate::storage::{RawDatabase, Storage};
use heed::{types::UnalignedSlice, DatabaseFlags, RwTxn};
use nostr_types::{Event, EventKind, PublicKeyHex};
use nostr_types::{Event, EventV1, EventKind, PublicKeyHex};
use std::sync::Mutex;
// NOTE: "innerp" is a fake tag. We store events that reference a person internally under it.
@ -122,4 +122,81 @@ impl Storage {
Ok(())
}
/// Index the tags of a single `EventV1` into the event-tag index.
///
/// Giftwrap events are unwrapped first so the inner rumor's tags are the
/// ones indexed (but under the outer giftwrap's id, so later lookups by
/// the stored id still work). If unwrapping fails because no private key
/// is available, the giftwrap id is stored in `unindexed_giftwraps` for
/// later indexing. Runs inside `rw_txn` if provided, otherwise opens and
/// commits its own write transaction.
pub fn write_event_tag_index1_event1<'a>(
&'a self,
event: &EventV1,
rw_txn: Option<&mut RwTxn<'a>>,
) -> Result<(), Error> {
let f = |txn: &mut RwTxn<'a>| -> Result<(), Error> {
// `event` may be re-pointed at the unwrapped rumor below;
// `rumor_event` must outlive that borrow, hence declared here.
let mut event = event;
let mut rumor_event: EventV1;
if event.kind == EventKind::GiftWrap {
match GLOBALS.signer.unwrap_giftwrap1(event) {
Ok(rumor) => {
rumor_event = rumor.into_event_with_bad_signature();
rumor_event.id = event.id; // lie, so it indexes it under the giftwrap
event = &rumor_event;
}
Err(e) => {
if matches!(e.kind, ErrorKind::NoPrivateKey) {
// Store as unindexed for later indexing
let bytes = vec![];
self.db_unindexed_giftwraps()?
.put(txn, event.id.as_slice(), &bytes)?;
}
// NOTE(review): on any unwrap error we fall through and index
// the outer giftwrap's own tags — confirm this is intended.
}
}
}
// our user's public key
let pk: Option<PublicKeyHex> = self.read_setting_public_key().map(|p| p.into());
for tag in &event.tags {
let tagname = tag.tagname();
let value = match tag.value(1) {
Ok(v) => v,
Err(_) => continue, // no tag value, not indexable.
};
// Only index tags we intend to lookup later by tag.
// If that set changes, (1) add to this code and (2) do a reindex migration
if !INDEXED_TAGS.contains(&&*tagname) {
continue;
}
// For 'p' tags, only index them if 'p' is our user
if tagname == "p" {
match &pk {
None => continue,
Some(pk) => {
if value != pk.as_str() {
continue;
}
}
}
}
// Index key layout: tagname + '"' + tag value (size-limited by key!)
let mut key: Vec<u8> = tagname.as_bytes().to_owned();
key.push(b'\"'); // double quote separator, unlikely to be inside of a tagname
key.extend(value.as_bytes());
let key = key!(&key); // limit the size
let bytes = event.id.as_slice();
self.db_event_tag_index()?.put(txn, key, bytes)?;
}
Ok(())
};
match rw_txn {
Some(txn) => f(txn)?,
None => {
// No caller transaction: run in our own and commit it.
let mut txn = self.env.write_txn()?;
f(&mut txn)?;
txn.commit()?;
}
};
Ok(())
}
}

View File

@ -1,9 +1,8 @@
use crate::error::Error;
use crate::storage::{RawDatabase, Storage};
use heed::types::UnalignedSlice;
use heed::RwTxn;
use nostr_types::{Event, Id};
use speedy::{Readable, Writable};
use speedy::Readable;
use std::sync::Mutex;
// Id -> Event
@ -44,42 +43,6 @@ impl Storage {
}
}
pub(crate) fn write_event1<'a>(
&'a self,
event: &Event,
rw_txn: Option<&mut RwTxn<'a>>,
) -> Result<(), Error> {
// write to lmdb 'events'
let bytes = event.write_to_vec()?;
let f = |txn: &mut RwTxn<'a>| -> Result<(), Error> {
self.db_events1()?.put(txn, event.id.as_slice(), &bytes)?;
// also index the event
self.write_event_ek_pk_index(event, Some(txn))?;
self.write_event_ek_c_index(event, Some(txn))?;
self.write_event_tag_index(event, Some(txn))?;
for hashtag in event.hashtags() {
if hashtag.is_empty() {
continue;
} // upstream bug
self.add_hashtag(&hashtag, event.id, Some(txn))?;
}
Ok(())
};
match rw_txn {
Some(txn) => f(txn)?,
None => {
let mut txn = self.env.write_txn()?;
f(&mut txn)?;
txn.commit()?;
}
};
Ok(())
}
pub(crate) fn read_event1(&self, id: Id) -> Result<Option<Event>, Error> {
let txn = self.env.read_txn()?;
match self.db_events1()?.get(&txn, id.as_slice())? {
@ -87,34 +50,4 @@ impl Storage {
Some(bytes) => Ok(Some(Event::read_from_buffer(bytes)?)),
}
}
/// Whether an event with the given id exists in the `events1` database.
pub(crate) fn has_event1(&self, id: Id) -> Result<bool, Error> {
    let txn = self.env.read_txn()?;
    let found = self.db_events1()?.get(&txn, id.as_slice())?.is_some();
    Ok(found)
}
pub(crate) fn delete_event1<'a>(
&'a self,
id: Id,
rw_txn: Option<&mut RwTxn<'a>>,
) -> Result<(), Error> {
let f = |txn: &mut RwTxn<'a>| -> Result<(), Error> {
let _ = self.db_events1()?.delete(txn, id.as_slice());
Ok(())
};
match rw_txn {
Some(txn) => f(txn)?,
None => {
let mut txn = self.env.write_txn()?;
f(&mut txn)?;
txn.commit()?;
}
};
Ok(())
}
}

View File

@ -5,14 +5,15 @@ use super::types::{
};
use super::Storage;
use crate::error::{Error, ErrorKind};
use crate::relationship::Relationship;
use heed::types::UnalignedSlice;
use heed::{DatabaseFlags, RwTxn};
use nostr_types::{Event, Id, PublicKey, RelayUrl, Signature};
use nostr_types::{EventV1, EventV2, EventReference, Id, PublicKey, RelayUrl, Signature, TagV2};
use speedy::{Readable, Writable};
use std::collections::HashMap;
impl Storage {
const MAX_MIGRATION_LEVEL: u32 = 15;
const MAX_MIGRATION_LEVEL: u32 = 16;
/// Initialize the database from empty
pub(super) fn init_from_empty(&self) -> Result<(), Error> {
@ -84,6 +85,10 @@ impl Storage {
let _ = self.db_person_lists1()?;
let _ = self.db_person_lists2()?;
}
15 => {
let _ = self.db_events1()?;
let _ = self.db_events2()?;
}
_ => {}
};
Ok(())
@ -155,6 +160,10 @@ impl Storage {
tracing::info!("{prefix}: moving person list last edit times...");
self.move_person_list_last_edit_times(txn)?;
}
15 => {
tracing::info!("{prefix}: migrating events...");
self.migrate_to_events2(txn)?;
}
_ => panic!("Unreachable migration level"),
};
@ -176,8 +185,8 @@ impl Storage {
let event_txn = self.env.read_txn()?;
for result in self.db_events1()?.iter(&event_txn)? {
let pair = result?;
let event = Event::read_from_buffer(pair.1)?;
let _ = self.process_relationships_of_event(&event, Some(txn))?;
let event = EventV1::read_from_buffer(pair.1)?;
let _ = self.process_relationships_of_eventv1(&event, Some(txn))?;
// track progress
count += 1;
@ -419,7 +428,7 @@ impl Storage {
let iter = self.db_events1()?.iter(txn)?;
for result in iter {
let (_key, val) = result?;
let event = Event::read_from_buffer(val)?;
let event = EventV1::read_from_buffer(val)?;
if event.sig == Signature::zeroes() {
ids.push(event.id);
}
@ -565,8 +574,8 @@ impl Storage {
let loop_txn = self.env.read_txn()?;
for result in self.db_events1()?.iter(&loop_txn)? {
let (_key, val) = result?;
let event = Event::read_from_buffer(val)?;
self.write_event_tag_index(&event, Some(txn))?;
let event = EventV1::read_from_buffer(val)?;
self.write_event_tag_index1_event1(&event, Some(txn))?;
}
Ok(())
@ -632,4 +641,148 @@ impl Storage {
self.write_person_lists_last_edit_times(edit_times, Some(txn))?;
Ok(())
}
/// Migration 15 -> 16: re-encode every stored `EventV1` as an `EventV2`
/// in the new events database, then clear the old one.
fn migrate_to_events2<'a>(
    &'a self,
    txn: &mut RwTxn<'a>,
) -> Result<(), Error> {
    // Read from a separate read txn while writing through `txn`.
    let loop_txn = self.env.read_txn()?;
    let mut count: usize = 0;
    for entry in self.db_events1()?.iter(&loop_txn)? {
        let (_key, val) = entry?;
        let old = EventV1::read_from_buffer(val)?;

        // The two tag types share a JSON representation, so convert
        // via a serde_json round-trip rather than field-by-field.
        let new_tags: Vec<TagV2> =
            serde_json::from_value(serde_json::to_value(old.tags)?)?;

        let new_event = EventV2 {
            id: old.id,
            pubkey: old.pubkey,
            created_at: old.created_at,
            kind: old.kind,
            sig: old.sig,
            content: old.content,
            tags: new_tags,
        };
        self.write_event2(&new_event, Some(txn))?;
        count += 1;
    }
    tracing::info!("Migrated {} events", count);

    // clear events1 database (we don't have an interface to delete it)
    self.db_events1()?.clear(txn)?;
    Ok(())
}
/// Process relationships of an eventv1.
///
/// Scans the event for replies, reactions, deletions and zap receipts and
/// writes a `Relationship` record for each referenced event. Returns the
/// ids of referenced events whose cached state should be invalidated.
/// Runs inside `rw_txn` if provided, otherwise opens and commits its own
/// write transaction.
pub fn process_relationships_of_eventv1<'a>(
&'a self,
event: &EventV1,
rw_txn: Option<&mut RwTxn<'a>>,
) -> Result<Vec<Id>, Error> {
let mut invalidate: Vec<Id> = Vec::new();
let mut f = |txn: &mut RwTxn<'a>| -> Result<(), Error> {
// replies to
match event.replies_to() {
Some(EventReference::Id(id, _, _)) => {
self.write_relationship(id, event.id, Relationship::Reply, Some(txn))?;
}
Some(EventReference::Addr(_ea)) => {
// will only work if we already have it... yuck.
// We need a new relationships database for EventAddrs
// FIXME
}
None => (),
}
// reacts to
if let Some((reacted_to_id, reaction, _maybe_url)) = event.reacts_to() {
if let Some(reacted_to_event) = self.read_event1(reacted_to_id)? {
// Only if they are different people (no liking your own posts)
if reacted_to_event.pubkey != event.pubkey {
self.write_relationship(
reacted_to_id, // event reacted to
event.id, // the reaction event id
Relationship::Reaction(event.pubkey, reaction),
Some(txn),
)?;
}
invalidate.push(reacted_to_id);
} else {
// Store the reaction to the event we don't have yet.
// We filter bad ones when reading them back too, so even if this
// turns out to be a reaction by the author, they can't like
// their own post
self.write_relationship(
reacted_to_id, // event reacted to
event.id, // the reaction event id
Relationship::Reaction(event.pubkey, reaction),
Some(txn),
)?;
invalidate.push(reacted_to_id);
}
}
// deletes
if let Some((deleted_event_ids, reason)) = event.deletes() {
// All deletion targets need their cached state refreshed.
invalidate.extend(&deleted_event_ids);
for deleted_event_id in deleted_event_ids {
// since it is a delete, we don't actually desire the event.
if let Some(deleted_event) = self.read_event1(deleted_event_id)? {
// Only if it is the same author
if deleted_event.pubkey == event.pubkey {
self.write_relationship(
deleted_event_id,
event.id,
Relationship::Deletion(reason.clone()),
Some(txn),
)?;
}
} else {
// We don't have the deleted event. Presume it is okay. We check again
// when we read these back
self.write_relationship(
deleted_event_id,
event.id,
Relationship::Deletion(reason.clone()),
Some(txn),
)?;
}
}
}
// zaps
match event.zaps() {
Ok(Some(zapdata)) => {
self.write_relationship(
zapdata.id,
event.id,
Relationship::ZapReceipt(event.pubkey, zapdata.amount),
Some(txn),
)?;
invalidate.push(zapdata.id);
}
// A malformed zap receipt is logged but does not abort processing.
Err(e) => tracing::error!("Invalid zap receipt: {}", e),
_ => {}
}
Ok(())
};
match rw_txn {
Some(txn) => f(txn)?,
None => {
// No caller transaction: run in our own and commit it.
let mut txn = self.env.write_txn()?;
f(&mut txn)?;
txn.commit()?;
}
};
Ok(invalidate)
}
}

View File

@ -45,8 +45,8 @@ use gossip_relay_picker::Direction;
use heed::types::UnalignedSlice;
use heed::{Database, Env, EnvFlags, EnvOpenOptions, RwTxn};
use nostr_types::{
EncryptedPrivateKey, Event, EventKind, EventReference, Id, MilliSatoshi, PublicKey, RelayUrl,
Tag, Unixtime,
EncryptedPrivateKey, Event, EventKind, EventReference, Id, MilliSatoshi, PublicKey,
RelayUrl, Tag, Unixtime,
};
use paste::paste;
use speedy::{Readable, Writable};
@ -234,7 +234,7 @@ impl Storage {
#[inline]
pub(crate) fn db_events(&self) -> Result<RawDatabase, Error> {
self.db_events1()
self.db_events2()
}
#[inline]
@ -1184,25 +1184,25 @@ impl Storage {
event: &Event,
rw_txn: Option<&mut RwTxn<'a>>,
) -> Result<(), Error> {
self.write_event1(event, rw_txn)
self.write_event2(event, rw_txn)
}
/// Read an event
#[inline]
pub fn read_event(&self, id: Id) -> Result<Option<Event>, Error> {
self.read_event1(id)
self.read_event2(id)
}
/// If we have the event
#[inline]
pub fn has_event(&self, id: Id) -> Result<bool, Error> {
self.has_event1(id)
self.has_event2(id)
}
/// Delete the event
#[inline]
pub fn delete_event<'a>(&'a self, id: Id, rw_txn: Option<&mut RwTxn<'a>>) -> Result<(), Error> {
self.delete_event1(id, rw_txn)
self.delete_event2(id, rw_txn)
}
/// Replace any existing event with the passed in event, if it is of a replaceable kind