From 8f1a4e037f6ee9db7347ba9b3251a2148a171043 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Wed, 11 Jan 2023 08:13:21 +1300 Subject: [PATCH] Distinguish (for relays) last_connected_at from last_general_eose_at --- src/db/mod.rs | 1 + src/db/relay.rs | 55 ++++++++++++++++++------- src/db/schema10.sql | 5 +++ src/overlord/minion/handle_websocket.rs | 21 +++++++++- src/overlord/minion/mod.rs | 18 ++++++-- src/overlord/minion/subscription.rs | 2 - 6 files changed, 80 insertions(+), 22 deletions(-) create mode 100644 src/db/schema10.sql diff --git a/src/db/mod.rs b/src/db/mod.rs index b7dd326a..f95f0680 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -111,6 +111,7 @@ fn upgrade(db: &Connection, mut version: u16) -> Result<(), Error> { apply_sql!(db, version, 7, "schema7.sql"); apply_sql!(db, version, 8, "schema8.sql"); apply_sql!(db, version, 9, "schema9.sql"); + apply_sql!(db, version, 10, "schema10.sql"); tracing::info!("Database is at version {}", version); Ok(()) } diff --git a/src/db/relay.rs b/src/db/relay.rs index 225a35cd..26d5e1c2 100644 --- a/src/db/relay.rs +++ b/src/db/relay.rs @@ -11,7 +11,8 @@ pub struct DbRelay { pub success_count: u64, pub failure_count: u64, pub rank: Option, - pub last_success_at: Option, + pub last_connected_at: Option, + pub last_general_eose_at: Option, pub post: bool, } @@ -28,15 +29,16 @@ impl DbRelay { success_count: 0, failure_count: 0, rank: Some(3), - last_success_at: None, + last_connected_at: None, + last_general_eose_at: None, post: false, }) } pub async fn fetch(criteria: Option<&str>) -> Result, Error> { - let sql = - "SELECT url, success_count, failure_count, rank, last_success_at, post FROM relay" - .to_owned(); + let sql = "SELECT url, success_count, failure_count, rank, last_connected_at, \ + last_general_eose_at, post FROM relay" + .to_owned(); let sql = match criteria { None => sql, Some(crit) => format!("{} WHERE {}", sql, crit), @@ -48,14 +50,15 @@ impl DbRelay { let mut stmt = db.prepare(&sql)?; let rows = stmt.query_map([], |row| { - let postint: u32 = row.get(5)?; + let postint: u32 = row.get(6)?; Ok(DbRelay { dirty: false, url: row.get(0)?, success_count: row.get(1)?, failure_count: row.get(2)?, rank: row.get(3)?, - last_success_at: row.get(4)?, + last_connected_at: row.get(4)?, + last_general_eose_at: row.get(5)?, post: (postint > 0), }) })?; @@ -87,8 +90,9 @@ impl DbRelay { return Err(Error::InvalidUrl(relay.url.clone())); } - let sql = "INSERT OR IGNORE INTO relay (url, success_count, failure_count, rank, last_success_at, post) \ - VALUES (?1, ?2, ?3, ?4, ?5, ?6)"; + let sql = "INSERT OR IGNORE INTO relay (url, success_count, failure_count, rank, \ + last_connected_at, last_general_eose_at, post) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"; spawn_blocking(move || { let maybe_db = GLOBALS.db.blocking_lock(); @@ -101,7 +105,8 @@ impl DbRelay { &relay.success_count, &relay.failure_count, &relay.rank, - &relay.last_success_at, + &relay.last_connected_at, + &relay.last_general_eose_at, &postint, ))?; Ok::<(), Error>(()) @@ -112,7 +117,8 @@ impl DbRelay { } pub async fn update(relay: DbRelay) -> Result<(), Error> { - let sql = "UPDATE relay SET success_count=?, failure_count=?, rank=?, last_success_at=?, post=? WHERE url=?"; + let sql = "UPDATE relay SET success_count=?, failure_count=?, rank=?, \ + last_connected_at=?, last_general_eose_at=?, post=? WHERE url=?"; spawn_blocking(move || { let maybe_db = GLOBALS.db.blocking_lock(); @@ -124,7 +130,8 @@ impl DbRelay { &relay.success_count, &relay.failure_count, &relay.rank, - &relay.last_success_at, + &relay.last_connected_at, + &relay.last_general_eose_at, &postint, &relay.url, ))?; @@ -136,8 +143,8 @@ impl DbRelay { } /// This also bumps success_count - pub async fn update_success(url: String, last_success_at: u64) -> Result<(), Error> { - let sql = "UPDATE relay SET success_count = success_count + 1, last_success_at = ? \ + pub async fn update_success(url: String, last_connected_at: u64) -> Result<(), Error> { + let sql = "UPDATE relay SET success_count = success_count + 1, last_connected_at = ? \ WHERE url = ?"; spawn_blocking(move || { @@ -145,7 +152,25 @@ impl DbRelay { let db = maybe_db.as_ref().unwrap(); let mut stmt = db.prepare(sql)?; - stmt.execute((&last_success_at, &url))?; + stmt.execute((&last_connected_at, &url))?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + /// This also bumps success_count + pub async fn update_general_eose(url: String, last_general_eose_at: u64) -> Result<(), Error> { + let sql = + "UPDATE relay SET last_general_eose_at = max(?, ifnull(last_general_eose_at,0)) WHERE url = ?"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(sql)?; + stmt.execute((&last_general_eose_at, &url))?; Ok::<(), Error>(()) }) .await??; diff --git a/src/db/schema10.sql b/src/db/schema10.sql new file mode 100644 index 00000000..eed3e007 --- /dev/null +++ b/src/db/schema10.sql @@ -0,0 +1,5 @@ +ALTER TABLE relay RENAME COLUMN last_success_at TO last_connected_at; +ALTER TABLE relay ADD COLUMN last_general_eose_at INTEGER DEFAULT NULL; + +-- Start at last_connected_at +UPDATE relay SET last_general_eose_at = last_connected_at; diff --git a/src/overlord/minion/handle_websocket.rs b/src/overlord/minion/handle_websocket.rs index 69ab1350..c869e661 100644 --- a/src/overlord/minion/handle_websocket.rs +++ b/src/overlord/minion/handle_websocket.rs @@ -1,5 +1,6 @@ use super::Minion; -use crate::Error; +use crate::db::DbRelay; +use crate::error::Error; use futures::SinkExt; use nostr_types::{RelayMessage, Unixtime}; use tungstenite::protocol::Message as WsMessage; @@ -39,6 +40,18 @@ impl Minion { .unwrap_or_else(|| "_".to_owned()); tracing::debug!("{}: {}: NEW EVENT", &self.url, handle); + // Events that come in after EOSE on the general feed bump the last_general_eose + // timestamp for that relay, so we don't query before them next time we run. + if let Some(sub) = self.subscriptions.get_mut_by_id(&subid.0) { + if handle == "general_feed" && sub.eose() { + DbRelay::update_general_eose( + self.dbrelay.url.clone(), + event.created_at.0 as u64, + ) + .await?; + } + } + // Try processing everything immediately crate::process::process_new_event( &event, @@ -78,7 +91,7 @@ impl Minion { // Update the matching subscription match self.subscriptions.get_mut_by_id(&subid.0) { Some(sub) => { - tracing::trace!("{}: {}: EOSE: {:?}", &self.url, handle, subid); + tracing::debug!("{}: {}: EOSE: {:?}", &self.url, handle, subid); if close { let close_message = sub.close_message(); let websocket_sink = self.sink.as_mut().unwrap(); @@ -87,6 +100,10 @@ impl Minion { } else { sub.set_eose(); } + if handle == "general_feed" { + let now = Unixtime::now().unwrap().0 as u64; + DbRelay::update_general_eose(self.dbrelay.url.clone(), now).await?; + } } None => { tracing::debug!( diff --git a/src/overlord/minion/mod.rs b/src/overlord/minion/mod.rs index d911245d..8db37554 100644 --- a/src/overlord/minion/mod.rs +++ b/src/overlord/minion/mod.rs @@ -152,7 +152,7 @@ impl Minion { // Bump the success count for the relay { let now = Unixtime::now().unwrap().0 as u64; - DbRelay::update_success(self.dbrelay.url.clone(), now).await? + DbRelay::update_success(self.dbrelay.url.clone(), now).await?; } // Tell the overlord we are ready to receive commands @@ -300,7 +300,7 @@ impl Minion { // Start with where we left off, the time we last got something from // this relay. - let mut special_since: Unixtime = match self.dbrelay.last_success_at { + let mut special_since: Unixtime = match self.dbrelay.last_general_eose_at { Some(u) => Unixtime(u as i64), None => Unixtime(0), }; @@ -390,6 +390,18 @@ impl Minion { self.unsubscribe("general_feed").await?; } else { self.subscribe(filters, "general_feed").await?; + + if let Some(sub) = self.subscriptions.get_mut("general_feed") { + if let Some(nip11) = &self.nip11 { + if !nip11.supports_nip(15) { + // Does not support EOSE. Set subscription to EOSE now. + sub.set_eose(); + } + } else { + // Does not support EOSE. Set subscription to EOSE now. + sub.set_eose(); + } + } } Ok(()) @@ -528,7 +540,7 @@ impl Minion { // Start with where we left off, the time we last got something from // this relay. - let mut special_since: i64 = match self.dbrelay.last_success_at { + let mut special_since: i64 = match self.dbrelay.last_general_eose_at { Some(u) => u as i64, None => 0, }; diff --git a/src/overlord/minion/subscription.rs b/src/overlord/minion/subscription.rs index fd623ab8..04e829dc 100644 --- a/src/overlord/minion/subscription.rs +++ b/src/overlord/minion/subscription.rs @@ -103,11 +103,9 @@ impl Subscription { self.eose = true; } - /* pub fn eose(&self) -> bool { self.eose } - */ pub fn req_message(&self) -> ClientMessage { ClientMessage::Req(SubscriptionId(self.get_id()), self.filters.clone())