diff --git a/src/db/event_relay.rs b/src/db/event_relay.rs index 18735ff2..1a6ed03c 100644 --- a/src/db/event_relay.rs +++ b/src/db/event_relay.rs @@ -1,115 +1,28 @@ use crate::error::Error; use crate::globals::GLOBALS; -use nostr_types::{Id, RelayUrl}; -use rusqlite::DatabaseName; +use nostr_types::{Id, RelayUrl, Unixtime}; use serde::{Deserialize, Serialize}; -use tokio::task::spawn_blocking; #[derive(Debug, Serialize, Deserialize)] pub struct DbEventRelay { - pub event: String, - pub relay: String, - pub when_seen: u64, + pub id: Id, + pub relay: RelayUrl, + pub when_seen: Unixtime, } impl DbEventRelay { - /* - pub async fn fetch(criteria: Option<&str>) -> Result, Error> { - let sql = "SELECT event, relay, when_seen FROM event_relay".to_owned(); - let sql = match criteria { - None => sql, - Some(crit) => format!("{} WHERE {}", sql, crit), - }; - - let output: Result, Error> = spawn_blocking(move || { - let db = GLOBALS.db.blocking_lock(); - let mut stmt = db.prepare(&sql)?; - let rows = stmt.query_map([], |row| { - Ok(DbEventRelay { - event: row.get(0)?, - relay: row.get(1)?, - when_seen: row.get(2)?, - }) - })?; - - let mut output: Vec = Vec::new(); - for row in rows { - output.push(row?); - } - Ok(output) - }) - .await?; - - output - } - */ - - pub async fn get_relays_for_event(id: Id) -> Result, Error> { - let sql = "SELECT relay FROM event_relay WHERE event=?"; - - let relays: Result, Error> = spawn_blocking(move || { - let db = GLOBALS.db.blocking_lock(); - let mut stmt = db.prepare(sql)?; - stmt.raw_bind_parameter(1, id.as_hex_string())?; - let mut rows = stmt.raw_query(); - let mut relays: Vec = Vec::new(); - while let Some(row) = rows.next()? { - let s: String = row.get(0)?; - // Just skip over bad relay URLs - if let Ok(url) = RelayUrl::try_from_str(&s) { - relays.push(url); - } - } - Ok(relays) - }) - .await?; - - relays + pub fn get_relays_for_event(id: Id) -> Result, Error> { + Ok(GLOBALS + .storage + .get_event_seen_on_relay(id)? + .drain(..) + .map(|(url, _time)| url) + .collect()) } - // Sometimes we insert an event and an event_relay so fast that this happens first - // and we get a 'FOREIGN KEY constraint failed' error. - pub async fn insert(event_relay: DbEventRelay, ignore_constraint: bool) -> Result<(), Error> { - let sql = "INSERT OR IGNORE INTO event_relay (event, relay, when_seen) \ - VALUES (?1, ?2, ?3)"; - - spawn_blocking(move || { - let db = GLOBALS.db.blocking_lock(); - - if ignore_constraint { - db.pragma_update(Some(DatabaseName::Main), "foreign_keys", false)?; - } - - let mut stmt = db.prepare(sql)?; - rtry!(stmt.execute(( - &event_relay.event, - &event_relay.relay, - &event_relay.when_seen, - ))); - - if ignore_constraint { - db.pragma_update(Some(DatabaseName::Main), "foreign_keys", true)?; - } - - Ok::<(), Error>(()) - }) - .await??; - - Ok(()) + pub fn save(&self) -> Result<(), Error> { + GLOBALS + .storage + .add_event_seen_on_relay(self.id, &self.relay, self.when_seen) } - - /* - pub async fn delete(criteria: &str) -> Result<(), Error> { - let sql = format!("DELETE FROM event_relay WHERE {}", criteria); - - spawn_blocking(move || { - let db = GLOBALS.db.blocking_lock(); - db.execute(&sql, [])?; - Ok::<(), Error>(()) - }) - .await??; - - Ok(()) - } - */ } diff --git a/src/events.rs b/src/events.rs index 2d510719..5156e58c 100644 --- a/src/events.rs +++ b/src/events.rs @@ -224,7 +224,7 @@ impl Events { tracing::info!("Loading event seen-on data..."); for dashref in self.events.iter() { let id = dashref.key(); - let vecurl = DbEventRelay::get_relays_for_event(*id).await?; + let vecurl = DbEventRelay::get_relays_for_event(*id)?; for url in vecurl.iter() { self.add_seen_on(*id, url); } diff --git a/src/overlord/minion/handle_websocket.rs b/src/overlord/minion/handle_websocket.rs index 40419494..ca4fad3e 100644 --- a/src/overlord/minion/handle_websocket.rs +++ b/src/overlord/minion/handle_websocket.rs @@ -123,11 +123,11 @@ impl Minion { // save seen_on data in database let event_relay = DbEventRelay { - event: idhex, - relay: url.0.to_owned(), - when_seen: Unixtime::now()?.0 as u64, + id, + relay: url.clone(), + when_seen: Unixtime::now()?, }; - DbEventRelay::insert(event_relay, true).await?; + event_relay.save()?; } else { // demerit the relay self.bump_failure_count().await; diff --git a/src/overlord/mod.rs b/src/overlord/mod.rs index 2852b84c..144f324c 100644 --- a/src/overlord/mod.rs +++ b/src/overlord/mod.rs @@ -1635,8 +1635,8 @@ impl Overlord { let mut missing_ancestors: Vec = Vec::new(); // Include the relays where the referenced_by event was seen - relays.extend(DbEventRelay::get_relays_for_event(referenced_by).await?); - relays.extend(DbEventRelay::get_relays_for_event(id).await?); + relays.extend(DbEventRelay::get_relays_for_event(referenced_by)?); + relays.extend(DbEventRelay::get_relays_for_event(id)?); // If we have less than 2 relays, include the write relays of the author if relays.len() < 2 { diff --git a/src/process.rs b/src/process.rs index aac1a602..f4390954 100644 --- a/src/process.rs +++ b/src/process.rs @@ -16,7 +16,7 @@ pub async fn process_new_event( seen_on: Option, subscription: Option, ) -> Result<(), Error> { - let now = Unixtime::now()?.0 as u64; + let now = Unixtime::now()?; // If it was from a relay, // Insert into database; bail if event is an already-replaced replaceable event. @@ -83,11 +83,11 @@ pub async fn process_new_event( // Insert into event_relay "seen" relationship (database) if from_relay { let db_event_relay = DbEventRelay { - event: event.id.as_hex_string(), - relay: url.0.to_owned(), + id: event.id, + relay: url.to_owned(), when_seen: now, }; - if let Err(e) = DbEventRelay::insert(db_event_relay, true).await { + if let Err(e) = db_event_relay.save() { tracing::error!( "Error saving relay of old-event {} {}: {}", event.id.as_hex_string(), @@ -103,8 +103,12 @@ pub async fn process_new_event( .await?; // Update person_relay.last_fetched - DbPersonRelay::upsert_last_fetched(event.pubkey.as_hex_string(), url.to_owned(), now) - .await?; + DbPersonRelay::upsert_last_fetched( + event.pubkey.as_hex_string(), + url.to_owned(), + now.0 as u64, + ) + .await?; } } @@ -176,11 +180,10 @@ pub async fn process_new_event( .await?; // upsert person_relay.last_suggested_bytag - let now = Unixtime::now()?.0 as u64; DbPersonRelay::upsert_last_suggested_bytag( pubkey.to_string(), url.clone(), - now, + now.0 as u64, ) .await?; }