Distinguish (for relays) last_connected_at from last_general_eose_at

This commit is contained in:
Mike Dilger 2023-01-11 08:13:21 +13:00 committed by Mike Dilger
parent cc917122a8
commit 8f1a4e037f
6 changed files with 80 additions and 22 deletions

View File

@ -111,6 +111,7 @@ fn upgrade(db: &Connection, mut version: u16) -> Result<(), Error> {
apply_sql!(db, version, 7, "schema7.sql"); apply_sql!(db, version, 7, "schema7.sql");
apply_sql!(db, version, 8, "schema8.sql"); apply_sql!(db, version, 8, "schema8.sql");
apply_sql!(db, version, 9, "schema9.sql"); apply_sql!(db, version, 9, "schema9.sql");
apply_sql!(db, version, 10, "schema10.sql");
tracing::info!("Database is at version {}", version); tracing::info!("Database is at version {}", version);
Ok(()) Ok(())
} }

View File

@ -11,7 +11,8 @@ pub struct DbRelay {
pub success_count: u64, pub success_count: u64,
pub failure_count: u64, pub failure_count: u64,
pub rank: Option<u64>, pub rank: Option<u64>,
pub last_success_at: Option<u64>, pub last_connected_at: Option<u64>,
pub last_general_eose_at: Option<u64>,
pub post: bool, pub post: bool,
} }
@ -28,15 +29,16 @@ impl DbRelay {
success_count: 0, success_count: 0,
failure_count: 0, failure_count: 0,
rank: Some(3), rank: Some(3),
last_success_at: None, last_connected_at: None,
last_general_eose_at: None,
post: false, post: false,
}) })
} }
pub async fn fetch(criteria: Option<&str>) -> Result<Vec<DbRelay>, Error> { pub async fn fetch(criteria: Option<&str>) -> Result<Vec<DbRelay>, Error> {
let sql = let sql = "SELECT url, success_count, failure_count, rank, last_connected_at, \
"SELECT url, success_count, failure_count, rank, last_success_at, post FROM relay" last_general_eose_at, post FROM relay"
.to_owned(); .to_owned();
let sql = match criteria { let sql = match criteria {
None => sql, None => sql,
Some(crit) => format!("{} WHERE {}", sql, crit), Some(crit) => format!("{} WHERE {}", sql, crit),
@ -48,14 +50,15 @@ impl DbRelay {
let mut stmt = db.prepare(&sql)?; let mut stmt = db.prepare(&sql)?;
let rows = stmt.query_map([], |row| { let rows = stmt.query_map([], |row| {
let postint: u32 = row.get(5)?; let postint: u32 = row.get(6)?;
Ok(DbRelay { Ok(DbRelay {
dirty: false, dirty: false,
url: row.get(0)?, url: row.get(0)?,
success_count: row.get(1)?, success_count: row.get(1)?,
failure_count: row.get(2)?, failure_count: row.get(2)?,
rank: row.get(3)?, rank: row.get(3)?,
last_success_at: row.get(4)?, last_connected_at: row.get(4)?,
last_general_eose_at: row.get(5)?,
post: (postint > 0), post: (postint > 0),
}) })
})?; })?;
@ -87,8 +90,9 @@ impl DbRelay {
return Err(Error::InvalidUrl(relay.url.clone())); return Err(Error::InvalidUrl(relay.url.clone()));
} }
let sql = "INSERT OR IGNORE INTO relay (url, success_count, failure_count, rank, last_success_at, post) \ let sql = "INSERT OR IGNORE INTO relay (url, success_count, failure_count, rank, \
VALUES (?1, ?2, ?3, ?4, ?5, ?6)"; last_connected_at, last_general_eose_at, post) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)";
spawn_blocking(move || { spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock(); let maybe_db = GLOBALS.db.blocking_lock();
@ -101,7 +105,8 @@ impl DbRelay {
&relay.success_count, &relay.success_count,
&relay.failure_count, &relay.failure_count,
&relay.rank, &relay.rank,
&relay.last_success_at, &relay.last_connected_at,
&relay.last_general_eose_at,
&postint, &postint,
))?; ))?;
Ok::<(), Error>(()) Ok::<(), Error>(())
@ -112,7 +117,8 @@ impl DbRelay {
} }
pub async fn update(relay: DbRelay) -> Result<(), Error> { pub async fn update(relay: DbRelay) -> Result<(), Error> {
let sql = "UPDATE relay SET success_count=?, failure_count=?, rank=?, last_success_at=?, post=? WHERE url=?"; let sql = "UPDATE relay SET success_count=?, failure_count=?, rank=?, \
last_connected_at=?, last_general_eose_at=?, post=? WHERE url=?";
spawn_blocking(move || { spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock(); let maybe_db = GLOBALS.db.blocking_lock();
@ -124,7 +130,8 @@ impl DbRelay {
&relay.success_count, &relay.success_count,
&relay.failure_count, &relay.failure_count,
&relay.rank, &relay.rank,
&relay.last_success_at, &relay.last_connected_at,
&relay.last_general_eose_at,
&postint, &postint,
&relay.url, &relay.url,
))?; ))?;
@ -136,8 +143,8 @@ impl DbRelay {
} }
/// This also bumps success_count /// This also bumps success_count
pub async fn update_success(url: String, last_success_at: u64) -> Result<(), Error> { pub async fn update_success(url: String, last_connected_at: u64) -> Result<(), Error> {
let sql = "UPDATE relay SET success_count = success_count + 1, last_success_at = ? \ let sql = "UPDATE relay SET success_count = success_count + 1, last_connected_at = ? \
WHERE url = ?"; WHERE url = ?";
spawn_blocking(move || { spawn_blocking(move || {
@ -145,7 +152,25 @@ impl DbRelay {
let db = maybe_db.as_ref().unwrap(); let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?; let mut stmt = db.prepare(sql)?;
stmt.execute((&last_success_at, &url))?; stmt.execute((&last_connected_at, &url))?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
/// This also bumps success_count
pub async fn update_general_eose(url: String, last_general_eose_at: u64) -> Result<(), Error> {
let sql =
"UPDATE relay SET last_general_eose_at = max(?, ifnull(last_general_eose_at,0)) WHERE url = ?";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((&last_general_eose_at, &url))?;
Ok::<(), Error>(()) Ok::<(), Error>(())
}) })
.await??; .await??;

5
src/db/schema10.sql Normal file
View File

@ -0,0 +1,5 @@
ALTER TABLE relay RENAME COLUMN last_success_at TO last_connected_at;
ALTER TABLE relay ADD COLUMN last_general_eose_at INTEGER DEFAULT NULL;
-- Start at last_connected_at
UPDATE relay SET last_general_eose_at = last_connected_at;

View File

@ -1,5 +1,6 @@
use super::Minion; use super::Minion;
use crate::Error; use crate::db::DbRelay;
use crate::error::Error;
use futures::SinkExt; use futures::SinkExt;
use nostr_types::{RelayMessage, Unixtime}; use nostr_types::{RelayMessage, Unixtime};
use tungstenite::protocol::Message as WsMessage; use tungstenite::protocol::Message as WsMessage;
@ -39,6 +40,18 @@ impl Minion {
.unwrap_or_else(|| "_".to_owned()); .unwrap_or_else(|| "_".to_owned());
tracing::debug!("{}: {}: NEW EVENT", &self.url, handle); tracing::debug!("{}: {}: NEW EVENT", &self.url, handle);
// Events that come in after EOSE on the general feed bump the last_general_eose
// timestamp for that relay, so we don't query before them next time we run.
if let Some(sub) = self.subscriptions.get_mut_by_id(&subid.0) {
if handle == "general_feed" && sub.eose() {
DbRelay::update_general_eose(
self.dbrelay.url.clone(),
event.created_at.0 as u64,
)
.await?;
}
}
// Try processing everything immediately // Try processing everything immediately
crate::process::process_new_event( crate::process::process_new_event(
&event, &event,
@ -78,7 +91,7 @@ impl Minion {
// Update the matching subscription // Update the matching subscription
match self.subscriptions.get_mut_by_id(&subid.0) { match self.subscriptions.get_mut_by_id(&subid.0) {
Some(sub) => { Some(sub) => {
tracing::trace!("{}: {}: EOSE: {:?}", &self.url, handle, subid); tracing::debug!("{}: {}: EOSE: {:?}", &self.url, handle, subid);
if close { if close {
let close_message = sub.close_message(); let close_message = sub.close_message();
let websocket_sink = self.sink.as_mut().unwrap(); let websocket_sink = self.sink.as_mut().unwrap();
@ -87,6 +100,10 @@ impl Minion {
} else { } else {
sub.set_eose(); sub.set_eose();
} }
if handle == "general_feed" {
let now = Unixtime::now().unwrap().0 as u64;
DbRelay::update_general_eose(self.dbrelay.url.clone(), now).await?;
}
} }
None => { None => {
tracing::debug!( tracing::debug!(

View File

@ -152,7 +152,7 @@ impl Minion {
// Bump the success count for the relay // Bump the success count for the relay
{ {
let now = Unixtime::now().unwrap().0 as u64; let now = Unixtime::now().unwrap().0 as u64;
DbRelay::update_success(self.dbrelay.url.clone(), now).await? DbRelay::update_success(self.dbrelay.url.clone(), now).await?;
} }
// Tell the overlord we are ready to receive commands // Tell the overlord we are ready to receive commands
@ -300,7 +300,7 @@ impl Minion {
// Start with where we left off, the time we last got something from // Start with where we left off, the time we last got something from
// this relay. // this relay.
let mut special_since: Unixtime = match self.dbrelay.last_success_at { let mut special_since: Unixtime = match self.dbrelay.last_general_eose_at {
Some(u) => Unixtime(u as i64), Some(u) => Unixtime(u as i64),
None => Unixtime(0), None => Unixtime(0),
}; };
@ -390,6 +390,18 @@ impl Minion {
self.unsubscribe("general_feed").await?; self.unsubscribe("general_feed").await?;
} else { } else {
self.subscribe(filters, "general_feed").await?; self.subscribe(filters, "general_feed").await?;
if let Some(sub) = self.subscriptions.get_mut("general_feed") {
if let Some(nip11) = &self.nip11 {
if !nip11.supports_nip(15) {
// Does not support EOSE. Set subscription to EOSE now.
sub.set_eose();
}
} else {
// Does not support EOSE. Set subscription to EOSE now.
sub.set_eose();
}
}
} }
Ok(()) Ok(())
@ -528,7 +540,7 @@ impl Minion {
// Start with where we left off, the time we last got something from // Start with where we left off, the time we last got something from
// this relay. // this relay.
let mut special_since: i64 = match self.dbrelay.last_success_at { let mut special_since: i64 = match self.dbrelay.last_general_eose_at {
Some(u) => u as i64, Some(u) => u as i64,
None => 0, None => 0,
}; };

View File

@ -103,11 +103,9 @@ impl Subscription {
self.eose = true; self.eose = true;
} }
/*
pub fn eose(&self) -> bool { pub fn eose(&self) -> bool {
self.eose self.eose
} }
*/
pub fn req_message(&self) -> ClientMessage { pub fn req_message(&self) -> ClientMessage {
ClientMessage::Req(SubscriptionId(self.get_id()), self.filters.clone()) ClientMessage::Req(SubscriptionId(self.get_id()), self.filters.clone())