Process posting success/failure: save in event_seen, or demerit the relay

This commit is contained in:
Mike Dilger 2023-05-09 09:53:49 +12:00
parent 23db997112
commit b4e886fbd2
5 changed files with 79 additions and 38 deletions

View File

@ -8,9 +8,9 @@ fn main() {
println!("cargo:rustc-env=GIT_HASH={git_hash}");
// link to bundled libraries
#[cfg(target_os="macos")]
#[cfg(target_os = "macos")]
println!("cargo:rustc-link-arg=-Wl,-rpath,@loader_path");
#[cfg(target_os="linux")]
#[cfg(target_os = "linux")]
println!("cargo:rustc-link-arg=-Wl,-rpath,$ORIGIN");
}

View File

@ -66,8 +66,8 @@ impl DbEventRelay {
relays
}
pub async fn replace(event_relay: DbEventRelay) -> Result<(), Error> {
let sql = "REPLACE INTO event_relay (event, relay, when_seen) \
pub async fn insert(event_relay: DbEventRelay) -> Result<(), Error> {
let sql = "INSERT OR IGNORE INTO event_relay (event, relay, when_seen) \
VALUES (?1, ?2, ?3)";
spawn_blocking(move || {

View File

@ -1,5 +1,5 @@
use super::Minion;
use crate::db::DbRelay;
use crate::db::{DbEventRelay, DbRelay};
use crate::error::Error;
use crate::globals::GLOBALS;
use futures::SinkExt;
@ -109,17 +109,34 @@ impl Minion {
}
}
RelayMessage::Ok(id, ok, ok_message) => {
// These don't have to be processed.
let relay_response = format!(
"{}: OK: id={} ok={} message=\"{}\"",
&self.url,
id.as_hex_string(),
ok,
ok_message
);
let url = &self.url;
let idhex = id.as_hex_string();
let relay_response = if !ok_message.is_empty() {
format!("{url}: OK={ok} id={idhex} message=\"{ok_message}\"")
} else {
format!("{url}: OK={ok} id={idhex}")
};
// If we are waiting for a response for this id, process
if self.postings.contains(&id) {
if ok {
// save in event_relay
let event_relay = DbEventRelay {
event: idhex,
relay: url.0.to_owned(),
when_seen: Unixtime::now()?.0 as u64,
};
DbEventRelay::insert(event_relay).await?;
} else {
// demerit the relay
self.bump_failure_count().await;
}
self.postings.remove(&id);
}
match ok {
true => tracing::info!(relay_response),
false => tracing::warn!(relay_response),
true => tracing::info!("{relay_response}"),
false => tracing::warn!("{relay_response}"),
}
}
RelayMessage::Auth(challenge) => {

View File

@ -14,11 +14,12 @@ use http::uri::{Parts, Scheme};
use http::Uri;
use mime::Mime;
use nostr_types::{
ClientMessage, EventKind, Filter, IdHex, IdHexPrefix, PublicKeyHex, PublicKeyHexPrefix,
ClientMessage, EventKind, Filter, Id, IdHex, IdHexPrefix, PublicKeyHex, PublicKeyHexPrefix,
RelayInformationDocument, RelayUrl, Unixtime,
};
use reqwest::Response;
use std::borrow::Cow;
use std::collections::HashSet;
use std::sync::atomic::Ordering;
use std::time::Duration;
use subscription::Subscriptions;
@ -40,6 +41,7 @@ pub struct Minion {
subscriptions: Subscriptions,
next_events_subscription_id: u32,
keepgoing: bool,
postings: HashSet<Id>,
}
impl Minion {
@ -66,6 +68,7 @@ impl Minion {
subscriptions: Subscriptions::new(),
next_events_subscription_id: 0,
keepgoing: true,
postings: HashSet::new(),
})
}
}
@ -75,16 +78,7 @@ impl Minion {
// Catch errors, Return nothing.
if let Err(e) = self.handle_inner().await {
tracing::error!("{}: ERROR: {}", &self.url, e);
// Bump the failure count for the relay.
self.dbrelay.failure_count += 1;
if let Err(e) = DbRelay::update(self.dbrelay.clone()).await {
tracing::error!("{}: ERROR bumping relay failure count: {}", &self.url, e);
}
// Update in globals too
if let Some(mut dbrelay) = GLOBALS.all_relays.get_mut(&self.dbrelay.url) {
dbrelay.failure_count += 1;
}
self.bump_failure_count().await;
}
tracing::info!("{}: minion exiting", self.url);
@ -210,17 +204,7 @@ impl Minion {
self.sink = Some(sink);
// Bump the success count for the relay
{
self.dbrelay.success_count += 1;
self.dbrelay.last_connected_at = Some(Unixtime::now().unwrap().0 as u64);
if let Err(e) = DbRelay::update(self.dbrelay.clone()).await {
tracing::error!("{}: ERROR bumping relay success count: {}", &self.url, e);
}
// set in globals
if let Some(mut dbrelay) = GLOBALS.all_relays.get_mut(&self.dbrelay.url) {
dbrelay.last_connected_at = self.dbrelay.last_connected_at;
}
}
self.bump_success_count(true).await;
// Tell the overlord we are ready to receive commands
self.tell_overlord_we_are_ready().await?;
@ -312,6 +296,8 @@ impl Minion {
self.get_event(id).await?;
}
ToMinionPayload::PostEvent(event) => {
let id = event.id;
self.postings.insert(id);
let msg = ClientMessage::Event(event);
let wire = serde_json::to_string(&msg)?;
let ws_sink = self.sink.as_mut().unwrap();
@ -957,4 +943,42 @@ impl Minion {
Ok(String::from_utf8_unchecked(full.to_vec()))
}
}
async fn bump_failure_count(&mut self) {
// Update in self
self.dbrelay.failure_count += 1;
// Save to database
if let Err(e) = DbRelay::update(self.dbrelay.clone()).await {
tracing::error!("{}: ERROR bumping relay failure count: {}", &self.url, e);
}
// Update in globals
if let Some(mut dbrelay) = GLOBALS.all_relays.get_mut(&self.dbrelay.url) {
dbrelay.failure_count += 1;
}
}
async fn bump_success_count(&mut self, also_bump_last_connected: bool) {
let now = Unixtime::now().unwrap().0 as u64;
// Update in self
self.dbrelay.success_count += 1;
if also_bump_last_connected {
self.dbrelay.last_connected_at = Some(now);
}
// Save to database
if let Err(e) = DbRelay::update(self.dbrelay.clone()).await {
tracing::error!("{}: ERROR bumping relay success count: {}", &self.url, e);
}
// Update in globals
if let Some(mut dbrelay) = GLOBALS.all_relays.get_mut(&self.dbrelay.url) {
dbrelay.success_count += 1;
if also_bump_last_connected {
dbrelay.last_connected_at = Some(now);
}
}
}
}

View File

@ -86,7 +86,7 @@ pub async fn process_new_event(
relay: url.0.to_owned(),
when_seen: now,
};
if let Err(e) = DbEventRelay::replace(db_event_relay).await {
if let Err(e) = DbEventRelay::insert(db_event_relay).await {
tracing::error!(
"Error saving relay of old-event {} {}: {}",
event.id.as_hex_string(),