Merge branch 'rapid' into unstable

This commit is contained in:
Mike Dilger 2024-07-26 10:40:02 +12:00
commit a518f29d6c
46 changed files with 261 additions and 179 deletions

View File

@ -25,7 +25,7 @@ impl Command {
}
}
const COMMANDS: [Command; 36] = [
const COMMANDS: [Command; 37] = [
Command {
cmd: "oneshot",
usage_params: "{depends}",
@ -166,6 +166,11 @@ const COMMANDS: [Command; 36] = [
usage_params: "<idhex>",
desc: "print the relays the event was seen on",
},
Command {
cmd: "rapid",
usage_params: "",
desc: "Use much faster disk access. A crash can corrupt your local data, unless your filesystem preserves write ordering",
},
Command {
cmd: "rebuild_indices",
usage_params: "",
@ -209,7 +214,6 @@ const COMMANDS: [Command; 36] = [
];
pub fn handle_command(mut args: env::Args, runtime: &Runtime) -> Result<bool, Error> {
let _ = args.next(); // program name
let command_string = args.next().unwrap(); // must be there or we would not have been called
let mut command: Option<Command> = None;
@ -259,6 +263,7 @@ pub fn handle_command(mut args: env::Args, runtime: &Runtime) -> Result<bool, Er
"print_relay" => print_relay(command, args)?,
"print_relays" => print_relays(command)?,
"print_seen_on" => print_seen_on(command, args)?,
"rapid" => {} // is handled early in main.rs
"rebuild_indices" => rebuild_indices()?,
"rename_person_list" => rename_person_list(command, args)?,
"reprocess_recent" => reprocess_recent(command, runtime)?,

View File

@ -43,8 +43,27 @@ fn main() -> Result<(), Error> {
let about = about::About::new();
println!("Gossip {}", about.version);
// Handle rapid command before initializing the lib
let mut rapid: bool = false;
{
let mut args = env::args();
let _ = args.next(); // program name
if let Some(cmd) = args.next() {
if &*cmd == "rapid" {
rapid = true;
}
}
}
// restart args
let mut args = env::args();
let _ = args.next(); // program name
if rapid {
let _ = args.next(); // rapid param
}
// Initialize the lib
gossip_lib::init()?;
gossip_lib::init(rapid)?;
// Setup async
// We create and enter the runtime on the main thread so that
@ -54,8 +73,7 @@ fn main() -> Result<(), Error> {
let _main_rt = rt.enter(); // <-- this allows it.
// If we were handed a command, execute the command and return
let args = env::args();
if args.len() > 1 {
if args.len() > 0 {
match commands::handle_command(args, &rt) {
Err(e) => {
println!("{}", e);

View File

@ -181,10 +181,7 @@ lazy_static! {
// We start in the Offline state
let (write_runstate, read_runstate) = tokio::sync::watch::channel(RunState::Initializing);
let storage = match Storage::new() {
Ok(s) => s,
Err(e) => panic!("{e}")
};
let storage = Storage::new();
let filter_engine = Engine::new();
let filter = crate::filter::load_script(&filter_engine);

View File

@ -198,11 +198,11 @@ impl std::convert::TryFrom<u8> for RunState {
}
/// Initialize gossip-lib
pub fn init() -> Result<(), Error> {
pub fn init(rapid: bool) -> Result<(), Error> {
use std::sync::atomic::Ordering;
// Initialize storage
GLOBALS.storage.init()?;
GLOBALS.storage.init(rapid)?;
// Load signer from settings
GLOBALS.identity.load()?;

View File

@ -25,9 +25,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Unit>()
.name("event_akci_index")

View File

@ -26,9 +26,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
.flags(DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED)

View File

@ -27,9 +27,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
.flags(DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED)

View File

@ -34,9 +34,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Unit>()
.name("event_kci_index")

View File

@ -28,9 +28,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed
@ -44,7 +44,7 @@ impl Storage {
}
pub(crate) fn get_event_seen_on_relay1_len(&self) -> Result<u64, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self.db_event_seen_on_relay1()?.len(&txn)?)
}
@ -73,7 +73,7 @@ impl Storage {
id: Id,
) -> Result<Vec<(RelayUrl, Unixtime)>, Error> {
let start_key: Vec<u8> = id.as_slice().to_owned();
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let mut output: Vec<(RelayUrl, Unixtime)> = Vec::new();
for result in self
.db_event_seen_on_relay1()?

View File

@ -32,9 +32,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
.flags(DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED)

View File

@ -28,9 +28,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed
@ -44,7 +44,7 @@ impl Storage {
}
pub(crate) fn get_event_viewed1_len(&self) -> Result<u64, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self.db_event_viewed1()?.len(&txn)?)
}
@ -64,7 +64,7 @@ impl Storage {
}
pub(crate) fn is_event_viewed1(&self, id: Id) -> Result<bool, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self.db_event_viewed1()?.get(&txn, id.as_slice())?.is_some())
}
}

View File

@ -26,9 +26,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed

View File

@ -29,9 +29,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed
@ -92,7 +92,7 @@ impl Storage {
}
pub(crate) fn read_event3(&self, id: Id) -> Result<Option<EventV3>, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
match self.db_events3()?.get(&txn, id.as_slice())? {
None => Ok(None),
Some(bytes) => Ok(Some(EventV3::read_from_buffer(bytes)?)),
@ -100,7 +100,7 @@ impl Storage {
}
pub(crate) fn has_event3(&self, id: Id) -> Result<bool, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
match self.db_events3()?.get(&txn, id.as_slice())? {
None => Ok(false),
Some(_) => Ok(true),

View File

@ -0,0 +1,39 @@
use crate::error::Error;
use crate::storage::{RawDatabase, Storage};
use heed::types::Bytes;
use std::sync::Mutex;
static GENERAL_DB_CREATE_LOCK: Mutex<()> = Mutex::new(());
static mut GENERAL_DB: Option<RawDatabase> = None;
impl Storage {
pub(super) fn db_general(&self) -> Result<RawDatabase, Error> {
unsafe {
if let Some(db) = GENERAL_DB {
Ok(db)
} else {
// Lock. This drops when anything returns.
let _lock = GENERAL_DB_CREATE_LOCK.lock();
// In case of a race, check again
if let Some(db) = GENERAL_DB {
return Ok(db);
}
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env().write_txn()?;
let db = self
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed
// unnamed!
.create(&mut txn)?;
txn.commit()?;
GENERAL_DB = Some(db);
Ok(db)
}
}
}
}

View File

@ -28,9 +28,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
.flags(DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED)
@ -69,7 +69,7 @@ impl Storage {
if key.is_empty() {
return Err(ErrorKind::Empty("hashtag".to_owned()).into());
}
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let mut output: Vec<Id> = Vec::new();
let iter = match self.db_hashtags1()?.get_duplicates(&txn, key)? {
Some(i) => i,

View File

@ -13,7 +13,7 @@ macro_rules! write_transact {
match $opttxn {
Some(txn) => $f(txn),
None => {
let mut txn = $storage.env.write_txn()?;
let mut txn = $storage.env().write_txn()?;
let result = $f(&mut txn);
txn.commit()?;
result
@ -27,7 +27,7 @@ macro_rules! read_transact {
match $opttxn {
Some(txn) => $f(txn),
None => {
let txn = $storage.env.read_txn()?;
let txn = $storage.env().read_txn()?;
$f(&txn)
}
}
@ -48,7 +48,7 @@ macro_rules! def_setting {
let bytes = $field.write_to_vec()?;
let f = |txn: &mut RwTxn<'a>| -> Result<(), Error> {
Ok(self.general.put(txn, $string, &bytes)?)
Ok(self.db_general()?.put(txn, $string, &bytes)?)
};
write_transact!(self, rw_txn, f)
@ -56,12 +56,12 @@ macro_rules! def_setting {
#[allow(dead_code)]
pub fn [<read_setting_ $field>](&self) -> $type {
let txn = match self.env.read_txn() {
let txn = match self.env().read_txn() {
Ok(txn) => txn,
Err(_) => return $default,
};
match self.general.get(&txn, $string) {
match self.db_general().unwrap().get(&txn, $string) {
Err(_) => $default,
Ok(None) => $default,
Ok(Some(bytes)) => match <$type>::read_from_buffer(bytes) {
@ -98,19 +98,19 @@ macro_rules! def_flag {
let bytes = $field.write_to_vec()?;
let f = |txn: &mut RwTxn<'a>| -> Result<(), Error> {
Ok(self.general.put(txn, $string, &bytes)?)
Ok(self.db_general()?.put(txn, $string, &bytes)?)
};
write_transact!(self, rw_txn, f)
}
pub fn [<get_flag_ $field>](&self) -> bool {
let txn = match self.env.read_txn() {
let txn = match self.env().read_txn() {
Ok(txn) => txn,
Err(_) => return $default,
};
match self.general.get(&txn, $string) {
match self.db_general().unwrap().get(&txn, $string) {
Err(_) => $default,
Ok(None) => $default,
Ok(Some(bytes)) => bool::read_from_buffer(bytes).unwrap_or($default),

View File

@ -28,7 +28,7 @@ impl Storage {
}
fn m13_migrate_lists<'a>(&'a self, txn: &mut RwTxn<'a>) -> Result<(), Error> {
let loop_txn = self.env.read_txn()?;
let loop_txn = self.env().read_txn()?;
for result in self.db_person_lists1()?.iter(&loop_txn)? {
let (key, val) = result?;
let pubkey = PublicKey::from_bytes(key, true)?;

View File

@ -29,7 +29,7 @@ impl Storage {
fn m19_populate_person_list_metadata<'a>(&'a self, txn: &mut RwTxn<'a>) -> Result<(), Error> {
// read custom_person_list_map setting
let name_map: BTreeMap<u8, String> = {
let maybe_map = match self.general.get(txn, b"custom_person_list_map") {
let maybe_map = match self.db_general()?.get(txn, b"custom_person_list_map") {
Err(_) => None,
Ok(None) => None,
Ok(Some(bytes)) => match <BTreeMap<u8, String>>::read_from_buffer(bytes) {
@ -80,8 +80,9 @@ impl Storage {
}
// Now remove the two maps
self.general.delete(txn, b"custom_person_list_map")?;
self.general.delete(txn, b"person_lists_last_edit_times")?;
self.db_general()?.delete(txn, b"custom_person_list_map")?;
self.db_general()?
.delete(txn, b"person_lists_last_edit_times")?;
Ok(())
}
@ -90,9 +91,12 @@ impl Storage {
pub fn m19_read_person_lists_last_edit_times(
&self,
) -> Result<HashMap<PersonList1, i64>, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
match self.general.get(&txn, b"person_lists_last_edit_times")? {
match self
.db_general()?
.get(&txn, b"person_lists_last_edit_times")?
{
None => Ok(HashMap::new()),
Some(bytes) => Ok(HashMap::<PersonList1, i64>::read_from_buffer(bytes)?),
}

View File

@ -73,7 +73,7 @@ impl Storage {
let ids = self.m20_find_event_ids(kinds, pubkeys, since)?;
// Now that we have that Ids, fetch the events
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let mut events: Vec<EventV2> = Vec::new();
for id in ids {
// this is like self.read_event(), but we supply our existing transaction
@ -173,7 +173,7 @@ impl Storage {
}
let mut ids: HashSet<Id> = HashSet::new();
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
for kind in kinds {
let ek: u32 = (*kind).into();
@ -223,7 +223,7 @@ impl Storage {
let now = Unixtime::now();
let mut ids: HashSet<Id> = HashSet::new();
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
for kind in kinds {
let ek: u32 = (*kind).into();

View File

@ -26,7 +26,7 @@ impl Storage {
}
fn m25_migrate_to_events3<'a>(&'a self, txn: &mut RwTxn<'a>) -> Result<(), Error> {
let loop_txn = self.env.read_txn()?;
let loop_txn = self.env().read_txn()?;
let mut count: usize = 0;
for result in self.db_events2()?.iter(&loop_txn)? {
let (_key, val) = result?;

View File

@ -24,7 +24,7 @@ impl Storage {
}
fn m28_fix_empty_petnames<'a>(&'a self, txn: &mut RwTxn<'a>) -> Result<(), Error> {
let loop_txn = self.env.read_txn()?;
let loop_txn = self.env().read_txn()?;
for result in self.db_people2()?.iter(&loop_txn)? {
let (key, val) = result?;
let mut person: Person2 = serde_json::from_slice(val)?;

View File

@ -24,7 +24,7 @@ impl Storage {
}
fn m29_build_new_event_indexes<'a>(&'a self, txn: &mut RwTxn<'a>) -> Result<(), Error> {
let loop_txn = self.env.read_txn()?;
let loop_txn = self.env().read_txn()?;
for result in self.db_events()?.iter(&loop_txn)? {
let (_, bytes) = result?;
let event = Event::read_from_buffer(bytes)?;

View File

@ -31,7 +31,7 @@ impl Storage {
&'a self,
txn: &mut RwTxn<'a>,
) -> Result<(), Error> {
let loop_txn = self.env.read_txn()?;
let loop_txn = self.env().read_txn()?;
for result in self.db_person_lists2()?.iter(&loop_txn)? {
let (key, val) = result?;
let pubkey = PublicKey::from_bytes(key, true)?;

View File

@ -26,7 +26,7 @@ impl Storage {
}
fn m34_migrate_person_relay_records<'a>(&'a self, txn: &mut RwTxn<'a>) -> Result<(), Error> {
let loop_txn = self.env.read_txn()?;
let loop_txn = self.env().read_txn()?;
let iter = self.db_person_relays1()?.iter(&loop_txn)?;
for result in iter {
let (key, val) = result?;

View File

@ -22,7 +22,7 @@ impl Storage {
}
fn m35_migrate_person_records(&self, txn: &mut RwTxn<'_>) -> Result<(), Error> {
let loop_txn = self.env.read_txn()?;
let loop_txn = self.env().read_txn()?;
let iter = self.db_people2()?.iter(&loop_txn)?;
for result in iter {
let (_key, val) = result?;

View File

@ -47,13 +47,13 @@ impl Storage {
for level in necessary.iter() {
self.trigger(*level)?;
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
self.migrate_inner(*level, &mut txn)?;
self.write_migration_level(*level, Some(&mut txn))?;
txn.commit()?;
}
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
self.write_migration_level(Self::MAX_MIGRATION_LEVEL, Some(&mut txn))?;
txn.commit()?;
@ -92,7 +92,7 @@ impl Storage {
while level < Self::MAX_MIGRATION_LEVEL {
level += 1;
self.trigger(level)?;
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
self.migrate_inner(level, &mut txn)?;
self.write_migration_level(level, Some(&mut txn))?;
txn.commit()?;

View File

@ -29,6 +29,7 @@ mod event_tag_index1;
mod event_viewed1;
mod events2;
mod events3;
mod general;
mod hashtags1;
mod nip46servers1;
mod nip46servers2;
@ -69,6 +70,7 @@ use paste::paste;
use speedy::{Readable, Writable};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::ops::Bound;
use std::sync::OnceLock;
use self::event_kci_index::INDEXED_KINDS;
use self::event_tag_index1::INDEXED_TAGS;
@ -80,20 +82,38 @@ type EmptyDatabase = Database<Bytes, Unit>;
///
/// All calls are synchronous but fast so callers can just wait on them.
pub struct Storage {
env: Env,
// General database (settings, local_settings)
general: RawDatabase,
env: OnceLock<Env>,
}
impl Storage {
pub(crate) fn new() -> Result<Storage, Error> {
pub(crate) fn new() -> Storage {
Storage {
env: OnceLock::new(),
}
}
/// Run this after GLOBALS lazy static initialisation, so functions within storage can
/// access GLOBALS without hanging.
pub fn init(&self, rapid: bool) -> Result<(), Error> {
let mut builder = EnvOpenOptions::new();
let flags = if rapid {
tracing::warn!("Storage using rapid config - data corruption is possible on crash");
EnvFlags::NO_TLS
| EnvFlags::NO_META_SYNC
| EnvFlags::WRITE_MAP
| EnvFlags::NO_SYNC
| EnvFlags::MAP_ASYNC
} else {
EnvFlags::NO_TLS | EnvFlags::NO_META_SYNC
};
unsafe {
builder.flags(EnvFlags::NO_TLS | EnvFlags::NO_META_SYNC);
builder.flags(flags);
// See flats at http://www.lmdb.tech/doc/group__mdb__env.html
// See flags at http://www.lmdb.tech/doc/group__mdb.html (more detail)
}
// builder.max_readers(126); // this is the default
builder.max_dbs(32);
@ -117,21 +137,10 @@ impl Storage {
}
};
let mut txn = env.write_txn()?;
self.env
.set(env)
.expect("Unable to setup storage environment");
let general = env
.database_options()
.types::<Bytes, Bytes>()
.create(&mut txn)?;
txn.commit()?;
Ok(Storage { env, general })
}
/// Run this after GLOBALS lazy static initialisation, so functions within storage can
/// access GLOBALS without hanging.
pub fn init(&self) -> Result<(), Error> {
// We have to trigger all of the current-version databases into existence
// because otherwise there will be MVCC visibility problems later having
// different transactions in parallel
@ -164,21 +173,28 @@ impl Storage {
Ok(())
}
pub fn env(&self) -> &Env {
match self.env.get() {
Some(e) => e,
None => panic!("Storage call before initialization"),
}
}
/// Get a write transaction. With it, you can do multiple writes before you commit it.
/// Bundling multiple writes together is more efficient.
pub fn get_write_txn(&self) -> Result<RwTxn<'_>, Error> {
Ok(self.env.write_txn()?)
Ok(self.env().write_txn()?)
}
/// Get a read transaction.
pub fn get_read_txn(&self) -> Result<RoTxn<'_>, Error> {
Ok(self.env.read_txn()?)
Ok(self.env().read_txn()?)
}
/// Sync the data to disk. This happens periodically, but sometimes it's useful to force
/// it.
pub fn sync(&self) -> Result<(), Error> {
self.env.force_sync()?;
self.env().force_sync()?;
Ok(())
}
@ -253,8 +269,8 @@ impl Storage {
/// The number of records in the general table
pub fn get_general_len(&self) -> Result<u64, Error> {
let txn = self.env.read_txn()?;
Ok(self.general.len(&txn)?)
let txn = self.env().read_txn()?;
Ok(self.db_general()?.len(&txn)?)
}
/// The number of records in the event_seen_on table
@ -271,13 +287,13 @@ impl Storage {
/// The number of records in the hashtags table
pub fn get_hashtags_len(&self) -> Result<u64, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self.db_hashtags()?.len(&txn)?)
}
/// The number of records in the nip46servers table
pub fn get_nip46servers_len(&self) -> Result<u64, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self.db_nip46servers()?.len(&txn)?)
}
@ -289,39 +305,39 @@ impl Storage {
/// The number of records in the event table
pub fn get_event_len(&self) -> Result<u64, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self.db_events()?.len(&txn)?)
}
/// The number of records in the event_akci_index table
pub fn get_event_akci_index_len(&self) -> Result<u64, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self.db_event_akci_index()?.len(&txn)?)
}
/// The number of records in the event_kci_index table
pub fn get_event_kci_index_len(&self) -> Result<u64, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self.db_event_kci_index()?.len(&txn)?)
}
/// The number of records in the event_tag index table
pub fn get_event_tag_index_len(&self) -> Result<u64, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self.db_event_tag_index()?.len(&txn)?)
}
/// The number of records in the relationships_by_addr table
#[inline]
pub fn get_relationships_by_addr_len(&self) -> Result<u64, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self.db_relationships_by_addr()?.len(&txn)?)
}
/// The number of records in the relationships_by_id table
#[inline]
pub fn get_relationships_by_id_len(&self) -> Result<u64, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self.db_relationships_by_id()?.len(&txn)?)
}
@ -333,7 +349,7 @@ impl Storage {
/// The number of records in the person_lists table
pub fn get_person_lists_len(&self) -> Result<u64, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self.db_person_lists()?.len(&txn)?)
}
@ -343,7 +359,7 @@ impl Storage {
/// and all related indexes.
pub fn prune(&self, from: Unixtime) -> Result<usize, Error> {
// Extract the Ids to delete.
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let mut ids: HashSet<Id> = HashSet::new();
for result in self.db_events()?.iter(&txn)? {
let (_key, val) = result?;
@ -361,7 +377,7 @@ impl Storage {
}
drop(txn);
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
// Delete from event_seen_on_relay
let mut deletions: Vec<Vec<u8>> = Vec::new();
@ -446,17 +462,17 @@ impl Storage {
let bytes = migration_level.to_be_bytes();
let f = |txn: &mut RwTxn<'a>| -> Result<(), Error> {
Ok(self.general.put(txn, b"migration_level", &bytes)?)
Ok(self.db_general()?.put(txn, b"migration_level", &bytes)?)
};
write_transact!(self, rw_txn, f)
}
pub(crate) fn read_migration_level(&self) -> Result<Option<u32>, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self
.general
.db_general()?
.get(&txn, b"migration_level")?
.map(|bytes| u32::from_be_bytes(bytes[..4].try_into().unwrap())))
}
@ -470,7 +486,8 @@ impl Storage {
let bytes = epk.map(|e| &e.0).write_to_vec()?;
let f = |txn: &mut RwTxn<'a>| -> Result<(), Error> {
self.general.put(txn, b"encrypted_private_key", &bytes)?;
self.db_general()?
.put(txn, b"encrypted_private_key", &bytes)?;
Ok(())
};
@ -479,9 +496,9 @@ impl Storage {
/// Read the user's encrypted private key
pub fn read_encrypted_private_key(&self) -> Result<Option<EncryptedPrivateKey>, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
match self.general.get(&txn, b"encrypted_private_key")? {
match self.db_general()?.get(&txn, b"encrypted_private_key")? {
None => Ok(None),
Some(bytes) => {
let os = Option::<String>::read_from_buffer(bytes)?;
@ -500,7 +517,8 @@ impl Storage {
let bytes = server.write_to_vec()?;
let f = |txn: &mut RwTxn<'a>| -> Result<(), Error> {
self.general.put(txn, b"nip46_unconnected_server", &bytes)?;
self.db_general()?
.put(txn, b"nip46_unconnected_server", &bytes)?;
Ok(())
};
@ -510,8 +528,8 @@ impl Storage {
/// Read NIP-46 unconnected server
#[allow(dead_code)]
pub fn read_nip46_unconnected_server(&self) -> Result<Option<Nip46UnconnectedServer>, Error> {
let txn = self.env.read_txn()?;
match self.general.get(&txn, b"nip46_unconnected_server")? {
let txn = self.env().read_txn()?;
match self.db_general()?.get(&txn, b"nip46_unconnected_server")? {
None => Ok(None),
Some(bytes) => {
let server = Nip46UnconnectedServer::read_from_buffer(bytes)?;
@ -527,7 +545,8 @@ impl Storage {
rw_txn: Option<&mut RwTxn<'a>>,
) -> Result<(), Error> {
let f = |txn: &mut RwTxn<'a>| -> Result<(), Error> {
self.general.delete(txn, b"nip46_unconnected_server")?;
self.db_general()?
.delete(txn, b"nip46_unconnected_server")?;
Ok(())
};
@ -1109,7 +1128,7 @@ impl Storage {
f(txn)?;
}
None => {
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
f(&mut txn)?;
txn.commit()?;
}
@ -1198,7 +1217,7 @@ impl Storage {
f(txn)?;
}
None => {
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
f(&mut txn)?;
txn.commit()?;
}
@ -1413,7 +1432,7 @@ impl Storage {
where
F: Fn(&Event) -> bool,
{
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
// We insert into a BTreeSet to keep them time-ordered
let mut output: BTreeSet<Event> = BTreeSet::new();
@ -1632,7 +1651,7 @@ impl Storage {
.case_insensitive(true)
.build()?;
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let iter = self.db_events()?.iter(&txn)?;
let mut events: Vec<Event> = Vec::new();
for result in iter {
@ -2236,7 +2255,7 @@ impl Storage {
self.db_event_tag_index()?.clear(txn)?;
self.db_hashtags()?.clear(txn)?;
let loop_txn = self.env.read_txn()?;
let loop_txn = self.env().read_txn()?;
for result in self.db_events()?.iter(&loop_txn)? {
let (_key, val) = result?;
let event = Event::read_from_buffer(val)?;
@ -2285,7 +2304,7 @@ impl Storage {
// Erase the index first
self.db_event_tag_index()?.clear(txn)?;
let loop_txn = self.env.read_txn()?;
let loop_txn = self.env().read_txn()?;
for result in self.db_events()?.iter(&loop_txn)? {
let (_key, val) = result?;
let event = Event::read_from_buffer(val)?;
@ -2323,7 +2342,7 @@ impl Storage {
if let Some(mut metadata) = self.get_person_list_metadata(list)? {
if metadata.len != people.len() {
metadata.len = people.len();
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
self.set_person_list_metadata(list, &metadata, Some(&mut txn))?;
txn.commit()?;
}
@ -2452,7 +2471,7 @@ impl Storage {
) -> Result<(), Error> {
let f = |txn: &mut RwTxn<'a>| -> Result<(), Error> {
// Iterate through all events
let loop_txn = self.env.read_txn()?;
let loop_txn = self.env().read_txn()?;
for result in self.db_events()?.iter(&loop_txn)? {
let (_key, val) = result?;
let event = Event::read_from_buffer(val)?;

View File

@ -29,9 +29,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed

View File

@ -30,9 +30,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed
@ -67,7 +67,7 @@ impl Storage {
pubkey: PublicKey,
) -> Result<Option<Nip46Server>, Error> {
let key = pubkey.as_bytes();
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(match self.db_nip46servers2()?.get(&txn, key)? {
Some(bytes) => Some(Nip46Server::read_from_buffer(bytes)?),
None => None,
@ -75,7 +75,7 @@ impl Storage {
}
pub(crate) fn read_all_nip46servers2(&self) -> Result<Vec<Nip46Server>, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let mut output: Vec<Nip46Server> = Vec::new();
for result in self.db_nip46servers2()?.iter(&txn)? {
let (_key, val) = result?;

View File

@ -28,9 +28,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed

View File

@ -33,10 +33,10 @@ impl Table for Person3Table {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = GLOBALS.storage.env.write_txn()?;
let mut txn = GLOBALS.storage.env().write_txn()?;
let db = GLOBALS
.storage
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
.name(Self::lmdb_name())

View File

@ -32,9 +32,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed
@ -52,7 +52,7 @@ impl Storage {
pubkey: &PublicKey,
) -> Result<HashMap<PersonList1, Private>, Error> {
let key: Vec<u8> = pubkey.to_bytes();
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(match self.db_person_lists2()?.get(&txn, &key)? {
None => HashMap::new(),
Some(bytes) => HashMap::<PersonList1, Private>::read_from_buffer(bytes)?,
@ -77,7 +77,7 @@ impl Storage {
}
pub(crate) fn get_people_in_all_followed_lists2(&self) -> Result<Vec<PublicKey>, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let mut pubkeys: Vec<PublicKey> = Vec::new();
for result in self.db_person_lists2()?.iter(&txn)? {
let (key, val) = result?;
@ -94,7 +94,7 @@ impl Storage {
&self,
list: PersonList1,
) -> Result<Vec<(PublicKey, Private)>, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let mut output: Vec<(PublicKey, Private)> = Vec::new();
for result in self.db_person_lists2()?.iter(&txn)? {
let (key, val) = result?;

View File

@ -27,9 +27,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed
@ -76,7 +76,7 @@ impl Storage {
pub(crate) fn get_all_person_list_metadata1(
&self,
) -> Result<Vec<(PersonList1, PersonListMetadata1)>, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let mut output: Vec<(PersonList1, PersonListMetadata1)> = Vec::new();
for result in self.db_person_lists_metadata1()?.iter(&txn)? {
let (key, val) = result?;

View File

@ -27,9 +27,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed
@ -76,7 +76,7 @@ impl Storage {
pub(crate) fn get_all_person_list_metadata2(
&self,
) -> Result<Vec<(PersonList1, PersonListMetadata2)>, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let mut output: Vec<(PersonList1, PersonListMetadata2)> = Vec::new();
for result in self.db_person_lists_metadata2()?.iter(&txn)? {
let (key, val) = result?;

View File

@ -28,9 +28,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed
@ -48,7 +48,7 @@ impl Storage {
list: PersonList1,
) -> Result<Option<PersonListMetadata3>, Error> {
let key: Vec<u8> = list.write_to_vec()?;
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(match self.db_person_lists_metadata3()?.get(&txn, &key)? {
None => None,
Some(bytes) => {
@ -99,7 +99,7 @@ impl Storage {
pub(crate) fn get_all_person_list_metadata3(
&self,
) -> Result<Vec<(PersonList1, PersonListMetadata3)>, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let mut output: Vec<(PersonList1, PersonListMetadata3)> = Vec::new();
for result in self.db_person_lists_metadata3()?.iter(&txn)? {
let (key, val) = result?;
@ -120,7 +120,7 @@ impl Storage {
&self,
dtag: &str,
) -> Result<Option<(PersonList1, PersonListMetadata3)>, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
for result in self.db_person_lists_metadata3()?.iter(&txn)? {
let (key, val) = result?;
let list = PersonList1::read_from_buffer(key)?;

View File

@ -29,9 +29,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed

View File

@ -30,9 +30,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed
@ -46,7 +46,7 @@ impl Storage {
}
pub(crate) fn get_person_relays2_len(&self) -> Result<u64, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self.db_person_relays2()?.len(&txn)?)
}
@ -77,7 +77,7 @@ impl Storage {
let mut key = pubkey.to_bytes();
key.extend(url.as_str().as_bytes());
key.truncate(MAX_LMDB_KEY);
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(match self.db_person_relays2()?.get(&txn, &key)? {
Some(bytes) => Some(PersonRelay2::read_from_buffer(bytes)?),
None => None,
@ -86,7 +86,7 @@ impl Storage {
pub(crate) fn get_person_relays2(&self, pubkey: PublicKey) -> Result<Vec<PersonRelay2>, Error> {
let start_key = pubkey.to_bytes();
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let iter = self.db_person_relays2()?.prefix_iter(&txn, &start_key)?;
let mut output: Vec<PersonRelay2> = Vec::new();
for result in iter {
@ -99,7 +99,7 @@ impl Storage {
pub(crate) fn have_persons_relays2(&self, pubkey: PublicKey) -> Result<bool, Error> {
let start_key = pubkey.to_bytes();
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let iter = self.db_person_relays2()?.prefix_iter(&txn, &start_key)?;
for result in iter {
let (_key, val) = result?;

View File

@ -25,9 +25,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
.flags(DatabaseFlags::DUP_SORT) // NOT FIXED, RelationshipByAddr1 serialized isn't.

View File

@ -29,9 +29,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
.flags(DatabaseFlags::DUP_SORT) // NOT FIXED, RelationshipByAddr2 serialized isn't.
@ -66,7 +66,7 @@ impl Storage {
addr: &NAddr,
) -> Result<Vec<(Id, RelationshipByAddr2)>, Error> {
let key = relationships_by_addr2_into_key(addr);
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let iter = match self
.db_relationships_by_addr2()?
.get_duplicates(&txn, &key)?

View File

@ -32,9 +32,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed?

View File

@ -36,9 +36,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed?
@ -75,7 +75,7 @@ impl Storage {
id: Id,
) -> Result<Vec<(Id, RelationshipById2)>, Error> {
let start_key = id.as_slice();
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let iter = self
.db_relationships_by_id2()?
.prefix_iter(&txn, start_key)?;

View File

@ -30,9 +30,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed
@ -47,7 +47,7 @@ impl Storage {
#[allow(dead_code)]
pub(crate) fn get_relays1_len(&self) -> Result<u64, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self.db_relays1()?.len(&txn)?)
}
@ -167,7 +167,7 @@ impl Storage {
if key.is_empty() {
return Err(ErrorKind::Empty("relay url".to_owned()).into());
}
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
match self.db_relays1()?.get(&txn, key)? {
Some(bytes) => Ok(Some(serde_json::from_slice(bytes)?)),
None => Ok(None),
@ -178,7 +178,7 @@ impl Storage {
where
F: Fn(&Relay1) -> bool,
{
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let mut output: Vec<Relay1> = Vec::new();
let iter = self.db_relays1()?.iter(&txn)?;
for result in iter {

View File

@ -28,9 +28,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed
@ -70,7 +70,7 @@ impl Storage {
where
F: Fn(&Relay2) -> bool,
{
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let mut output: Vec<Relay2> = Vec::new();
let iter = self.db_relays2()?.iter(&txn)?;
for result in iter {

View File

@ -29,9 +29,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed
@ -45,7 +45,7 @@ impl Storage {
}
pub(crate) fn get_relays3_len(&self) -> Result<u64, Error> {
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
Ok(self.db_relays3()?.len(&txn)?)
}
@ -179,7 +179,7 @@ impl Storage {
where
F: Fn(&Relay3) -> bool,
{
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let mut output: Vec<Relay3> = Vec::new();
let iter = self.db_relays3()?.iter(&txn)?;
for result in iter {

View File

@ -20,7 +20,7 @@ pub trait Table {
/// Number of records
#[allow(dead_code)]
fn num_records() -> Result<u64, Error> {
let txn = GLOBALS.storage.env.read_txn()?;
let txn = GLOBALS.storage.env().read_txn()?;
Ok(Self::db()?.len(&txn)?)
}

View File

@ -28,9 +28,9 @@ impl Storage {
// Create it. We know that nobody else is doing this and that
// it cannot happen twice.
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
let db = self
.env
.env()
.database_options()
.types::<Bytes, Bytes>()
// no .flags needed
@ -49,7 +49,7 @@ impl Storage {
}
let mut ids: Vec<Id> = Vec::new();
let txn = self.env.read_txn()?;
let txn = self.env().read_txn()?;
let iter = self.db_unindexed_giftwraps1()?.iter(&txn)?;
for result in iter {
let (key, _val) = result?;
@ -58,7 +58,7 @@ impl Storage {
ids.push(id);
}
let mut txn = self.env.write_txn()?;
let mut txn = self.env().write_txn()?;
for id in ids {
if let Some(event) = self.read_event(id)? {
self.write_event_akci_index(