lmdb: event_relay: Switch to LMDB

This commit is contained in:
Mike Dilger 2023-07-18 13:45:46 +12:00
parent ff8db47f7c
commit 0663477f3a
5 changed files with 33 additions and 117 deletions

View File

@ -1,115 +1,28 @@
use crate::error::Error;
use crate::globals::GLOBALS;
use nostr_types::{Id, RelayUrl};
use rusqlite::DatabaseName;
use nostr_types::{Id, RelayUrl, Unixtime};
use serde::{Deserialize, Serialize};
use tokio::task::spawn_blocking;
#[derive(Debug, Serialize, Deserialize)]
pub struct DbEventRelay {
pub event: String,
pub relay: String,
pub when_seen: u64,
pub id: Id,
pub relay: RelayUrl,
pub when_seen: Unixtime,
}
impl DbEventRelay {
/*
pub async fn fetch(criteria: Option<&str>) -> Result<Vec<DbEventRelay>, Error> {
let sql = "SELECT event, relay, when_seen FROM event_relay".to_owned();
let sql = match criteria {
None => sql,
Some(crit) => format!("{} WHERE {}", sql, crit),
};
let output: Result<Vec<DbEventRelay>, Error> = spawn_blocking(move || {
let db = GLOBALS.db.blocking_lock();
let mut stmt = db.prepare(&sql)?;
let rows = stmt.query_map([], |row| {
Ok(DbEventRelay {
event: row.get(0)?,
relay: row.get(1)?,
when_seen: row.get(2)?,
})
})?;
let mut output: Vec<DbEventRelay> = Vec::new();
for row in rows {
output.push(row?);
}
Ok(output)
})
.await?;
output
}
*/
pub async fn get_relays_for_event(id: Id) -> Result<Vec<RelayUrl>, Error> {
let sql = "SELECT relay FROM event_relay WHERE event=?";
let relays: Result<Vec<RelayUrl>, Error> = spawn_blocking(move || {
let db = GLOBALS.db.blocking_lock();
let mut stmt = db.prepare(sql)?;
stmt.raw_bind_parameter(1, id.as_hex_string())?;
let mut rows = stmt.raw_query();
let mut relays: Vec<RelayUrl> = Vec::new();
while let Some(row) = rows.next()? {
let s: String = row.get(0)?;
// Just skip over bad relay URLs
if let Ok(url) = RelayUrl::try_from_str(&s) {
relays.push(url);
}
}
Ok(relays)
})
.await?;
relays
pub fn get_relays_for_event(id: Id) -> Result<Vec<RelayUrl>, Error> {
Ok(GLOBALS
.storage
.get_event_seen_on_relay(id)?
.drain(..)
.map(|(url, _time)| url)
.collect())
}
// Sometimes we insert an event and an event_relay so fast that this happens first
// and we get a 'FOREIGN KEY constraint failed' error.
pub async fn insert(event_relay: DbEventRelay, ignore_constraint: bool) -> Result<(), Error> {
let sql = "INSERT OR IGNORE INTO event_relay (event, relay, when_seen) \
VALUES (?1, ?2, ?3)";
spawn_blocking(move || {
let db = GLOBALS.db.blocking_lock();
if ignore_constraint {
db.pragma_update(Some(DatabaseName::Main), "foreign_keys", false)?;
}
let mut stmt = db.prepare(sql)?;
rtry!(stmt.execute((
&event_relay.event,
&event_relay.relay,
&event_relay.when_seen,
)));
if ignore_constraint {
db.pragma_update(Some(DatabaseName::Main), "foreign_keys", true)?;
}
Ok::<(), Error>(())
})
.await??;
Ok(())
pub fn save(&self) -> Result<(), Error> {
GLOBALS
.storage
.add_event_seen_on_relay(self.id, &self.relay, self.when_seen)
}
/*
pub async fn delete(criteria: &str) -> Result<(), Error> {
let sql = format!("DELETE FROM event_relay WHERE {}", criteria);
spawn_blocking(move || {
let db = GLOBALS.db.blocking_lock();
db.execute(&sql, [])?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
*/
}

View File

@ -224,7 +224,7 @@ impl Events {
tracing::info!("Loading event seen-on data...");
for dashref in self.events.iter() {
let id = dashref.key();
let vecurl = DbEventRelay::get_relays_for_event(*id).await?;
let vecurl = DbEventRelay::get_relays_for_event(*id)?;
for url in vecurl.iter() {
self.add_seen_on(*id, url);
}

View File

@ -123,11 +123,11 @@ impl Minion {
// save seen_on data in database
let event_relay = DbEventRelay {
event: idhex,
relay: url.0.to_owned(),
when_seen: Unixtime::now()?.0 as u64,
id,
relay: url.clone(),
when_seen: Unixtime::now()?,
};
DbEventRelay::insert(event_relay, true).await?;
event_relay.save()?;
} else {
// demerit the relay
self.bump_failure_count().await;

View File

@ -1635,8 +1635,8 @@ impl Overlord {
let mut missing_ancestors: Vec<Id> = Vec::new();
// Include the relays where the referenced_by event was seen
relays.extend(DbEventRelay::get_relays_for_event(referenced_by).await?);
relays.extend(DbEventRelay::get_relays_for_event(id).await?);
relays.extend(DbEventRelay::get_relays_for_event(referenced_by)?);
relays.extend(DbEventRelay::get_relays_for_event(id)?);
// If we have less than 2 relays, include the write relays of the author
if relays.len() < 2 {

View File

@ -16,7 +16,7 @@ pub async fn process_new_event(
seen_on: Option<RelayUrl>,
subscription: Option<String>,
) -> Result<(), Error> {
let now = Unixtime::now()?.0 as u64;
let now = Unixtime::now()?;
// If it was from a relay,
// Insert into database; bail if event is an already-replaced replaceable event.
@ -83,11 +83,11 @@ pub async fn process_new_event(
// Insert into event_relay "seen" relationship (database)
if from_relay {
let db_event_relay = DbEventRelay {
event: event.id.as_hex_string(),
relay: url.0.to_owned(),
id: event.id,
relay: url.to_owned(),
when_seen: now,
};
if let Err(e) = DbEventRelay::insert(db_event_relay, true).await {
if let Err(e) = db_event_relay.save() {
tracing::error!(
"Error saving relay of old-event {} {}: {}",
event.id.as_hex_string(),
@ -103,8 +103,12 @@ pub async fn process_new_event(
.await?;
// Update person_relay.last_fetched
DbPersonRelay::upsert_last_fetched(event.pubkey.as_hex_string(), url.to_owned(), now)
.await?;
DbPersonRelay::upsert_last_fetched(
event.pubkey.as_hex_string(),
url.to_owned(),
now.0 as u64,
)
.await?;
}
}
@ -176,11 +180,10 @@ pub async fn process_new_event(
.await?;
// upsert person_relay.last_suggested_bytag
let now = Unixtime::now()?.0 as u64;
DbPersonRelay::upsert_last_suggested_bytag(
pubkey.to_string(),
url.clone(),
now,
now.0 as u64,
)
.await?;
}