mirror of
https://github.com/mikedilger/gossip.git
synced 2024-09-19 19:46:50 +00:00
tracing logging changes
This commit is contained in:
parent
8d7f062a1a
commit
9c10c8a56c
@ -32,7 +32,6 @@ use crate::error::Error;
|
|||||||
use crate::globals::GLOBALS;
|
use crate::globals::GLOBALS;
|
||||||
use rusqlite::Connection;
|
use rusqlite::Connection;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
// This sets up the database
|
// This sets up the database
|
||||||
#[allow(clippy::or_fun_call)]
|
#[allow(clippy::or_fun_call)]
|
||||||
@ -88,7 +87,7 @@ fn check_and_upgrade() -> Result<(), Error> {
|
|||||||
macro_rules! apply_sql {
|
macro_rules! apply_sql {
|
||||||
($db:ident, $version:ident, $thisversion:expr, $file:expr) => {{
|
($db:ident, $version:ident, $thisversion:expr, $file:expr) => {{
|
||||||
if $version < $thisversion {
|
if $version < $thisversion {
|
||||||
info!("Upgrading database to version {}", $thisversion);
|
tracing::info!("Upgrading database to version {}", $thisversion);
|
||||||
$db.execute_batch(include_str!($file))?;
|
$db.execute_batch(include_str!($file))?;
|
||||||
$db.execute(
|
$db.execute(
|
||||||
&format!(
|
&format!(
|
||||||
@ -110,6 +109,6 @@ fn upgrade(db: &Connection, mut version: u16) -> Result<(), Error> {
|
|||||||
apply_sql!(db, version, 5, "schema5.sql");
|
apply_sql!(db, version, 5, "schema5.sql");
|
||||||
apply_sql!(db, version, 6, "schema6.sql");
|
apply_sql!(db, version, 6, "schema6.sql");
|
||||||
apply_sql!(db, version, 7, "schema7.sql");
|
apply_sql!(db, version, 7, "schema7.sql");
|
||||||
info!("Database is at version {}", version);
|
tracing::info!("Database is at version {}", version);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -95,7 +95,6 @@ impl Fetcher {
|
|||||||
let cache_file = self.cache_file(&url);
|
let cache_file = self.cache_file(&url);
|
||||||
match fs::read(cache_file) {
|
match fs::read(cache_file) {
|
||||||
Ok(contents) => {
|
Ok(contents) => {
|
||||||
tracing::debug!("cache hit");
|
|
||||||
return Ok(Some(contents));
|
return Ok(Some(contents));
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
@ -12,7 +12,6 @@ use rusqlite::Connection;
|
|||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::sync::atomic::AtomicBool;
|
use std::sync::atomic::AtomicBool;
|
||||||
use tokio::sync::{broadcast, mpsc, Mutex, RwLock};
|
use tokio::sync::{broadcast, mpsc, Mutex, RwLock};
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
/// Only one of these is ever created, via lazy_static!, and represents
|
/// Only one of these is ever created, via lazy_static!, and represents
|
||||||
/// global state for the rust application
|
/// global state for the rust application
|
||||||
@ -169,7 +168,7 @@ impl Globals {
|
|||||||
count += 1;
|
count += 1;
|
||||||
crate::process::process_new_event(event, false, None).await?;
|
crate::process::process_new_event(event, false, None).await?;
|
||||||
}
|
}
|
||||||
info!("Loaded {} desired events from the database", count);
|
tracing::info!("Loaded {} desired events from the database", count);
|
||||||
}
|
}
|
||||||
|
|
||||||
Self::trim_desired_events().await; // again
|
Self::trim_desired_events().await; // again
|
||||||
|
@ -2,7 +2,6 @@ use super::Minion;
|
|||||||
use crate::{BusMessage, Error};
|
use crate::{BusMessage, Error};
|
||||||
use futures::SinkExt;
|
use futures::SinkExt;
|
||||||
use nostr_types::{ClientMessage, Event, IdHex, PublicKeyHex};
|
use nostr_types::{ClientMessage, Event, IdHex, PublicKeyHex};
|
||||||
use tracing::{info, warn};
|
|
||||||
use tungstenite::protocol::Message as WsMessage;
|
use tungstenite::protocol::Message as WsMessage;
|
||||||
|
|
||||||
impl Minion {
|
impl Minion {
|
||||||
@ -25,16 +24,17 @@ impl Minion {
|
|||||||
let wire = serde_json::to_string(&msg)?;
|
let wire = serde_json::to_string(&msg)?;
|
||||||
let ws_sink = self.sink.as_mut().unwrap();
|
let ws_sink = self.sink.as_mut().unwrap();
|
||||||
ws_sink.send(WsMessage::Text(wire)).await?;
|
ws_sink.send(WsMessage::Text(wire)).await?;
|
||||||
info!("Posted event to {}", &self.url);
|
tracing::info!("Posted event to {}", &self.url);
|
||||||
}
|
}
|
||||||
"temp_subscribe_metadata" => {
|
"temp_subscribe_metadata" => {
|
||||||
let pubkeyhex: PublicKeyHex = serde_json::from_str(&bus_message.json_payload)?;
|
let pubkeyhex: PublicKeyHex = serde_json::from_str(&bus_message.json_payload)?;
|
||||||
self.temp_subscribe_metadata(pubkeyhex).await?;
|
self.temp_subscribe_metadata(pubkeyhex).await?;
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
warn!(
|
tracing::warn!(
|
||||||
"{} Unrecognized bus message kind received by minion: {}",
|
"{} Unrecognized bus message kind received by minion: {}",
|
||||||
&self.url, bus_message.kind
|
&self.url,
|
||||||
|
bus_message.kind
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@ use crate::globals::GLOBALS;
|
|||||||
use crate::Error;
|
use crate::Error;
|
||||||
use futures::SinkExt;
|
use futures::SinkExt;
|
||||||
use nostr_types::{EventKind, RelayMessage, Unixtime};
|
use nostr_types::{EventKind, RelayMessage, Unixtime};
|
||||||
use tracing::{debug, error, info, warn};
|
|
||||||
use tungstenite::protocol::Message as WsMessage;
|
use tungstenite::protocol::Message as WsMessage;
|
||||||
|
|
||||||
impl Minion {
|
impl Minion {
|
||||||
@ -19,7 +18,7 @@ impl Minion {
|
|||||||
match relay_message {
|
match relay_message {
|
||||||
RelayMessage::Event(subid, event) => {
|
RelayMessage::Event(subid, event) => {
|
||||||
if let Err(e) = event.verify(Some(maxtime)) {
|
if let Err(e) = event.verify(Some(maxtime)) {
|
||||||
error!(
|
tracing::error!(
|
||||||
"{}: VERIFY ERROR: {}, {}",
|
"{}: VERIFY ERROR: {}, {}",
|
||||||
&self.url,
|
&self.url,
|
||||||
e,
|
e,
|
||||||
@ -30,7 +29,7 @@ impl Minion {
|
|||||||
.subscriptions
|
.subscriptions
|
||||||
.get_handle_by_id(&subid.0)
|
.get_handle_by_id(&subid.0)
|
||||||
.unwrap_or_else(|| "_".to_owned());
|
.unwrap_or_else(|| "_".to_owned());
|
||||||
debug!("{}: {}: NEW EVENT", &self.url, handle);
|
tracing::trace!("{}: {}: NEW EVENT", &self.url, handle);
|
||||||
|
|
||||||
if event.kind == EventKind::TextNote {
|
if event.kind == EventKind::TextNote {
|
||||||
// Just store text notes in incoming
|
// Just store text notes in incoming
|
||||||
@ -47,7 +46,7 @@ impl Minion {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
RelayMessage::Notice(msg) => {
|
RelayMessage::Notice(msg) => {
|
||||||
info!("{}: NOTICE: {}", &self.url, msg);
|
tracing::info!("{}: NOTICE: {}", &self.url, msg);
|
||||||
}
|
}
|
||||||
RelayMessage::Eose(subid) => {
|
RelayMessage::Eose(subid) => {
|
||||||
let handle = self
|
let handle = self
|
||||||
@ -60,7 +59,7 @@ impl Minion {
|
|||||||
// Update the matching subscription
|
// Update the matching subscription
|
||||||
match self.subscriptions.get_mut_by_id(&subid.0) {
|
match self.subscriptions.get_mut_by_id(&subid.0) {
|
||||||
Some(sub) => {
|
Some(sub) => {
|
||||||
info!("{}: {}: EOSE: {:?}", &self.url, handle, subid);
|
tracing::trace!("{}: {}: EOSE: {:?}", &self.url, handle, subid);
|
||||||
if close {
|
if close {
|
||||||
let close_message = sub.close_message();
|
let close_message = sub.close_message();
|
||||||
let websocket_sink = self.sink.as_mut().unwrap();
|
let websocket_sink = self.sink.as_mut().unwrap();
|
||||||
@ -71,16 +70,18 @@ impl Minion {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
warn!(
|
tracing::debug!(
|
||||||
"{}: {} EOSE for unknown subscription {:?}",
|
"{}: {} EOSE for unknown subscription {:?}",
|
||||||
&self.url, handle, subid
|
&self.url,
|
||||||
|
handle,
|
||||||
|
subid
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
RelayMessage::Ok(id, ok, ok_message) => {
|
RelayMessage::Ok(id, ok, ok_message) => {
|
||||||
// These don't have to be processed.
|
// These don't have to be processed.
|
||||||
info!(
|
tracing::info!(
|
||||||
"{}: OK: id={} ok={} message=\"{}\"",
|
"{}: OK: id={} ok={} message=\"{}\"",
|
||||||
&self.url,
|
&self.url,
|
||||||
id.as_hex_string(),
|
id.as_hex_string(),
|
||||||
|
@ -18,7 +18,6 @@ use tokio::select;
|
|||||||
use tokio::sync::broadcast::Receiver;
|
use tokio::sync::broadcast::Receiver;
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
|
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
|
||||||
use tracing::{debug, error, info, trace, warn};
|
|
||||||
use tungstenite::protocol::{Message as WsMessage, WebSocketConfig};
|
use tungstenite::protocol::{Message as WsMessage, WebSocketConfig};
|
||||||
|
|
||||||
pub struct Minion {
|
pub struct Minion {
|
||||||
@ -68,20 +67,20 @@ impl Minion {
|
|||||||
pub async fn handle(&mut self) {
|
pub async fn handle(&mut self) {
|
||||||
// Catch errors, Return nothing.
|
// Catch errors, Return nothing.
|
||||||
if let Err(e) = self.handle_inner().await {
|
if let Err(e) = self.handle_inner().await {
|
||||||
error!("{}: ERROR: {}", &self.url, e);
|
tracing::error!("{}: ERROR: {}", &self.url, e);
|
||||||
|
|
||||||
// Bump the failure count for the relay.
|
// Bump the failure count for the relay.
|
||||||
self.dbrelay.failure_count += 1;
|
self.dbrelay.failure_count += 1;
|
||||||
if let Err(e) = DbRelay::update(self.dbrelay.clone()).await {
|
if let Err(e) = DbRelay::update(self.dbrelay.clone()).await {
|
||||||
error!("{}: ERROR bumping relay failure count: {}", &self.url, e);
|
tracing::error!("{}: ERROR bumping relay failure count: {}", &self.url, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("{}: minion exiting", self.url);
|
tracing::info!("{}: minion exiting", self.url);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_inner(&mut self) -> Result<(), Error> {
|
async fn handle_inner(&mut self) -> Result<(), Error> {
|
||||||
info!("{}: Minion handling started", &self.url);
|
tracing::trace!("{}: Minion handling started", &self.url); // minion will log when it connects
|
||||||
|
|
||||||
// Connect to the relay
|
// Connect to the relay
|
||||||
let websocket_stream = {
|
let websocket_stream = {
|
||||||
@ -107,11 +106,11 @@ impl Minion {
|
|||||||
{
|
{
|
||||||
match response.json::<RelayInformationDocument>().await {
|
match response.json::<RelayInformationDocument>().await {
|
||||||
Ok(nip11) => {
|
Ok(nip11) => {
|
||||||
info!("{}: {}", &self.url, nip11);
|
tracing::info!("{}: {}", &self.url, nip11);
|
||||||
self.nip11 = Some(nip11);
|
self.nip11 = Some(nip11);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("{}: Unable to parse response as NIP-11: {}", &self.url, e);
|
tracing::warn!("{}: Unable to parse response as NIP-11: {}", &self.url, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -140,7 +139,7 @@ impl Minion {
|
|||||||
tokio_tungstenite::connect_async_with_config(req, Some(config)),
|
tokio_tungstenite::connect_async_with_config(req, Some(config)),
|
||||||
)
|
)
|
||||||
.await??;
|
.await??;
|
||||||
info!("{}: Connected", &self.url);
|
tracing::info!("{}: Connected", &self.url);
|
||||||
|
|
||||||
websocket_stream
|
websocket_stream
|
||||||
};
|
};
|
||||||
@ -167,7 +166,7 @@ impl Minion {
|
|||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Log them and keep going
|
// Log them and keep going
|
||||||
error!("{}: {}", &self.url, e);
|
tracing::error!("{}: {}", &self.url, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -195,7 +194,7 @@ impl Minion {
|
|||||||
None => return Ok(false), // probably connection reset
|
None => return Ok(false), // probably connection reset
|
||||||
}?;
|
}?;
|
||||||
|
|
||||||
trace!("{}: Handling message", &self.url);
|
tracing::trace!("{}: Handling message", &self.url);
|
||||||
match ws_message {
|
match ws_message {
|
||||||
WsMessage::Text(t) => {
|
WsMessage::Text(t) => {
|
||||||
// MAYBE FIXME, spawn a separate task here so that
|
// MAYBE FIXME, spawn a separate task here so that
|
||||||
@ -204,11 +203,11 @@ impl Minion {
|
|||||||
// FIXME: some errors we should probably bail on.
|
// FIXME: some errors we should probably bail on.
|
||||||
// For now, try to continue.
|
// For now, try to continue.
|
||||||
},
|
},
|
||||||
WsMessage::Binary(_) => warn!("{}, Unexpected binary message", &self.url),
|
WsMessage::Binary(_) => tracing::warn!("{}, Unexpected binary message", &self.url),
|
||||||
WsMessage::Ping(x) => ws_sink.send(WsMessage::Pong(x)).await?,
|
WsMessage::Ping(x) => ws_sink.send(WsMessage::Pong(x)).await?,
|
||||||
WsMessage::Pong(_) => { }, // we just ignore pongs
|
WsMessage::Pong(_) => { }, // we just ignore pongs
|
||||||
WsMessage::Close(_) => keepgoing = false,
|
WsMessage::Close(_) => keepgoing = false,
|
||||||
WsMessage::Frame(_) => warn!("{}: Unexpected frame message", &self.url),
|
WsMessage::Frame(_) => tracing::warn!("{}: Unexpected frame message", &self.url),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
bus_message = self.from_overlord.recv() => {
|
bus_message = self.from_overlord.recv() => {
|
||||||
@ -224,7 +223,7 @@ impl Minion {
|
|||||||
self.handle_bus_message(bus_message).await?;
|
self.handle_bus_message(bus_message).await?;
|
||||||
} else if &*bus_message.target == "all" {
|
} else if &*bus_message.target == "all" {
|
||||||
if &*bus_message.kind == "shutdown" {
|
if &*bus_message.kind == "shutdown" {
|
||||||
info!("{}: Websocket listener shutting down", &self.url);
|
tracing::info!("{}: Websocket listener shutting down", &self.url);
|
||||||
keepgoing = false;
|
keepgoing = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -310,7 +309,7 @@ impl Minion {
|
|||||||
feed_filter.add_event_kind(EventKind::EventDeletion);
|
feed_filter.add_event_kind(EventKind::EventDeletion);
|
||||||
feed_filter.since = Some(feed_since);
|
feed_filter.since = Some(feed_since);
|
||||||
|
|
||||||
debug!(
|
tracing::trace!(
|
||||||
"{}: Feed Filter: {} authors",
|
"{}: Feed Filter: {} authors",
|
||||||
&self.url,
|
&self.url,
|
||||||
feed_filter.authors.len()
|
feed_filter.authors.len()
|
||||||
@ -326,7 +325,8 @@ impl Minion {
|
|||||||
special_filter.add_event_kind(EventKind::ContactList);
|
special_filter.add_event_kind(EventKind::ContactList);
|
||||||
special_filter.add_event_kind(EventKind::RelaysList);
|
special_filter.add_event_kind(EventKind::RelaysList);
|
||||||
special_filter.since = Some(special_since);
|
special_filter.since = Some(special_since);
|
||||||
debug!(
|
|
||||||
|
tracing::trace!(
|
||||||
"{}: Special Filter: {} authors",
|
"{}: Special Filter: {} authors",
|
||||||
&self.url,
|
&self.url,
|
||||||
special_filter.authors.len()
|
special_filter.authors.len()
|
||||||
@ -350,7 +350,7 @@ impl Minion {
|
|||||||
let wire = serde_json::to_string(&req_message)?;
|
let wire = serde_json::to_string(&req_message)?;
|
||||||
websocket_sink.send(WsMessage::Text(wire.clone())).await?;
|
websocket_sink.send(WsMessage::Text(wire.clone())).await?;
|
||||||
|
|
||||||
trace!("{}: Sent {}", &self.url, &wire);
|
tracing::trace!("{}: Sent {}", &self.url, &wire);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -364,7 +364,7 @@ impl Minion {
|
|||||||
let mut filter = Filter::new();
|
let mut filter = Filter::new();
|
||||||
filter.ids = ids;
|
filter.ids = ids;
|
||||||
|
|
||||||
debug!("{}: Event Filter: {} events", &self.url, filter.ids.len());
|
tracing::trace!("{}: Event Filter: {} events", &self.url, filter.ids.len());
|
||||||
|
|
||||||
// create a handle for ourselves
|
// create a handle for ourselves
|
||||||
let handle = format!("temp_events_{}", self.next_events_subscription_id);
|
let handle = format!("temp_events_{}", self.next_events_subscription_id);
|
||||||
@ -381,7 +381,7 @@ impl Minion {
|
|||||||
let wire = serde_json::to_string(&req_message)?;
|
let wire = serde_json::to_string(&req_message)?;
|
||||||
websocket_sink.send(WsMessage::Text(wire.clone())).await?;
|
websocket_sink.send(WsMessage::Text(wire.clone())).await?;
|
||||||
|
|
||||||
trace!("{}: Sent {}", &self.url, &wire);
|
tracing::trace!("{}: Sent {}", &self.url, &wire);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -411,7 +411,7 @@ impl Minion {
|
|||||||
let wire = serde_json::to_string(&req_message)?;
|
let wire = serde_json::to_string(&req_message)?;
|
||||||
let websocket_sink = self.sink.as_mut().unwrap();
|
let websocket_sink = self.sink.as_mut().unwrap();
|
||||||
websocket_sink.send(WsMessage::Text(wire.clone())).await?;
|
websocket_sink.send(WsMessage::Text(wire.clone())).await?;
|
||||||
trace!("{}: Sent {}", &self.url, &wire);
|
tracing::trace!("{}: Sent {}", &self.url, &wire);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -424,7 +424,7 @@ impl Minion {
|
|||||||
let wire = serde_json::to_string(&close_message)?;
|
let wire = serde_json::to_string(&close_message)?;
|
||||||
let websocket_sink = self.sink.as_mut().unwrap();
|
let websocket_sink = self.sink.as_mut().unwrap();
|
||||||
websocket_sink.send(WsMessage::Text(wire.clone())).await?;
|
websocket_sink.send(WsMessage::Text(wire.clone())).await?;
|
||||||
trace!("{}: Sent {}", &self.url, &wire);
|
tracing::trace!("{}: Sent {}", &self.url, &wire);
|
||||||
self.subscriptions.remove(handle);
|
self.subscriptions.remove(handle);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -15,7 +15,6 @@ use std::collections::HashMap;
|
|||||||
use tokio::sync::broadcast::Sender;
|
use tokio::sync::broadcast::Sender;
|
||||||
use tokio::sync::mpsc::UnboundedReceiver;
|
use tokio::sync::mpsc::UnboundedReceiver;
|
||||||
use tokio::{select, task};
|
use tokio::{select, task};
|
||||||
use tracing::{debug, error, info, warn};
|
|
||||||
use zeroize::Zeroize;
|
use zeroize::Zeroize;
|
||||||
|
|
||||||
pub struct Overlord {
|
pub struct Overlord {
|
||||||
@ -46,16 +45,16 @@ impl Overlord {
|
|||||||
|
|
||||||
pub async fn run(&mut self) {
|
pub async fn run(&mut self) {
|
||||||
if let Err(e) = self.run_inner().await {
|
if let Err(e) = self.run_inner().await {
|
||||||
error!("{}", e);
|
tracing::error!("{}", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Overlord signalling UI to shutdown");
|
tracing::info!("Overlord signalling UI to shutdown");
|
||||||
|
|
||||||
GLOBALS
|
GLOBALS
|
||||||
.shutting_down
|
.shutting_down
|
||||||
.store(true, std::sync::atomic::Ordering::Relaxed);
|
.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
|
||||||
info!("Overlord signalling minions to shutdown");
|
tracing::info!("Overlord signalling minions to shutdown");
|
||||||
|
|
||||||
// Send shutdown message to all minions (and ui)
|
// Send shutdown message to all minions (and ui)
|
||||||
// If this fails, it's probably because there are no more listeners
|
// If this fails, it's probably because there are no more listeners
|
||||||
@ -66,7 +65,7 @@ impl Overlord {
|
|||||||
json_payload: serde_json::to_string("shutdown").unwrap(),
|
json_payload: serde_json::to_string("shutdown").unwrap(),
|
||||||
});
|
});
|
||||||
|
|
||||||
info!("Overlord waiting for minions to all shutdown");
|
tracing::info!("Overlord waiting for minions to all shutdown");
|
||||||
|
|
||||||
// Listen on self.minions until it is empty
|
// Listen on self.minions until it is empty
|
||||||
while !self.minions.is_empty() {
|
while !self.minions.is_empty() {
|
||||||
@ -75,7 +74,7 @@ impl Overlord {
|
|||||||
self.handle_task_nextjoined(task_nextjoined).await;
|
self.handle_task_nextjoined(task_nextjoined).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Overlord confirms all minions have shutdown");
|
tracing::info!("Overlord confirms all minions have shutdown");
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run_inner(&mut self) -> Result<(), Error> {
|
pub async fn run_inner(&mut self) -> Result<(), Error> {
|
||||||
@ -125,7 +124,7 @@ impl Overlord {
|
|||||||
let e: Event = match serde_json::from_str(&dbevent.raw) {
|
let e: Event = match serde_json::from_str(&dbevent.raw) {
|
||||||
Ok(e) => e,
|
Ok(e) => e,
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
error!("Bad raw event: id={}, raw={}", dbevent.id, dbevent.raw);
|
tracing::error!("Bad raw event: id={}, raw={}", dbevent.id, dbevent.raw);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -159,7 +158,7 @@ impl Overlord {
|
|||||||
count += 1;
|
count += 1;
|
||||||
crate::process::process_new_event(event, false, None).await?;
|
crate::process::process_new_event(event, false, None).await?;
|
||||||
}
|
}
|
||||||
info!("Loaded {} events from the database", count);
|
tracing::info!("Loaded {} events from the database", count);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pick Relays and start Minions
|
// Pick Relays and start Minions
|
||||||
@ -192,7 +191,7 @@ impl Overlord {
|
|||||||
let mut relay_count = 0;
|
let mut relay_count = 0;
|
||||||
loop {
|
loop {
|
||||||
if relay_count >= max_relays {
|
if relay_count >= max_relays {
|
||||||
warn!(
|
tracing::info!(
|
||||||
"Safety catch: we have picked {} relays. That's enough.",
|
"Safety catch: we have picked {} relays. That's enough.",
|
||||||
max_relays
|
max_relays
|
||||||
);
|
);
|
||||||
@ -200,7 +199,7 @@ impl Overlord {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if relay_picker.is_degenerate() {
|
if relay_picker.is_degenerate() {
|
||||||
info!(
|
tracing::debug!(
|
||||||
"Relay picker is degenerate, relays={} pubkey_counts={}, person_relays={}",
|
"Relay picker is degenerate, relays={} pubkey_counts={}, person_relays={}",
|
||||||
relay_picker.relays.len(),
|
relay_picker.relays.len(),
|
||||||
relay_picker.pubkey_counts.len(),
|
relay_picker.pubkey_counts.len(),
|
||||||
@ -214,7 +213,7 @@ impl Overlord {
|
|||||||
relay_picker = rp;
|
relay_picker = rp;
|
||||||
|
|
||||||
if best_relay.is_degenerate() {
|
if best_relay.is_degenerate() {
|
||||||
info!("Best relay is now degenerate.");
|
tracing::debug!("Best relay is now degenerate.");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -228,7 +227,7 @@ impl Overlord {
|
|||||||
json_payload: serde_json::to_string(&best_relay.pubkeys).unwrap(),
|
json_payload: serde_json::to_string(&best_relay.pubkeys).unwrap(),
|
||||||
});
|
});
|
||||||
|
|
||||||
info!(
|
tracing::info!(
|
||||||
"Picked relay {} covering {} people.",
|
"Picked relay {} covering {} people.",
|
||||||
&best_relay.relay.url,
|
&best_relay.relay.url,
|
||||||
best_relay.pubkeys.len()
|
best_relay.pubkeys.len()
|
||||||
@ -237,7 +236,7 @@ impl Overlord {
|
|||||||
relay_count += 1;
|
relay_count += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Listening on {} relays", relay_count);
|
tracing::info!("Listening on {} relays", relay_count);
|
||||||
|
|
||||||
// Get desired events from relays
|
// Get desired events from relays
|
||||||
self.get_missing_events().await?;
|
self.get_missing_events().await?;
|
||||||
@ -252,7 +251,7 @@ impl Overlord {
|
|||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Log them and keep looping
|
// Log them and keep looping
|
||||||
error!("{}", e);
|
tracing::error!("{}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -281,7 +280,7 @@ impl Overlord {
|
|||||||
async fn loop_handler(&mut self) -> Result<bool, Error> {
|
async fn loop_handler(&mut self) -> Result<bool, Error> {
|
||||||
let mut keepgoing: bool = true;
|
let mut keepgoing: bool = true;
|
||||||
|
|
||||||
tracing::debug!("overlord looping");
|
tracing::trace!("overlord looping");
|
||||||
|
|
||||||
if self.minions.is_empty() {
|
if self.minions.is_empty() {
|
||||||
// Just listen on inbox
|
// Just listen on inbox
|
||||||
@ -331,7 +330,7 @@ impl Overlord {
|
|||||||
Some(url) => {
|
Some(url) => {
|
||||||
// JoinError also has is_cancelled, is_panic, into_panic, try_into_panic
|
// JoinError also has is_cancelled, is_panic, into_panic, try_into_panic
|
||||||
// Minion probably alreaedy logged, this may be redundant.
|
// Minion probably alreaedy logged, this may be redundant.
|
||||||
warn!("Minion {} completed with error: {}", &url, join_error);
|
tracing::error!("Minion {} completed with error: {}", &url, join_error);
|
||||||
|
|
||||||
// Minion probably already logged failure in relay table
|
// Minion probably already logged failure in relay table
|
||||||
|
|
||||||
@ -342,7 +341,7 @@ impl Overlord {
|
|||||||
self.minions_task_url.remove(&id);
|
self.minions_task_url.remove(&id);
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
warn!("Minion UNKNOWN completed with error: {}", join_error);
|
tracing::error!("Minion UNKNOWN completed with error: {}", join_error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -350,7 +349,7 @@ impl Overlord {
|
|||||||
let maybe_url = self.minions_task_url.get(&id);
|
let maybe_url = self.minions_task_url.get(&id);
|
||||||
match maybe_url {
|
match maybe_url {
|
||||||
Some(url) => {
|
Some(url) => {
|
||||||
info!("Relay Task {} completed", &url);
|
tracing::info!("Relay Task {} completed", &url);
|
||||||
|
|
||||||
// Remove from our urls_watching vec
|
// Remove from our urls_watching vec
|
||||||
self.urls_watching.retain(|value| value != url);
|
self.urls_watching.retain(|value| value != url);
|
||||||
@ -358,7 +357,7 @@ impl Overlord {
|
|||||||
// Remove from our hashmap
|
// Remove from our hashmap
|
||||||
self.minions_task_url.remove(&id);
|
self.minions_task_url.remove(&id);
|
||||||
}
|
}
|
||||||
None => warn!("Relay Task UNKNOWN completed"),
|
None => tracing::error!("Relay Task UNKNOWN completed"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -369,7 +368,7 @@ impl Overlord {
|
|||||||
match &*bus_message.target {
|
match &*bus_message.target {
|
||||||
"all" => match &*bus_message.kind {
|
"all" => match &*bus_message.kind {
|
||||||
"shutdown" => {
|
"shutdown" => {
|
||||||
info!("Overlord shutting down");
|
tracing::info!("Overlord shutting down");
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
@ -378,7 +377,7 @@ impl Overlord {
|
|||||||
"minion_is_ready" => {}
|
"minion_is_ready" => {}
|
||||||
"save_settings" => {
|
"save_settings" => {
|
||||||
GLOBALS.settings.read().await.save().await?;
|
GLOBALS.settings.read().await.save().await?;
|
||||||
debug!("Settings saved.");
|
tracing::debug!("Settings saved.");
|
||||||
}
|
}
|
||||||
"get_missing_events" => {
|
"get_missing_events" => {
|
||||||
self.get_missing_events().await?;
|
self.get_missing_events().await?;
|
||||||
@ -387,7 +386,7 @@ impl Overlord {
|
|||||||
let dns_id: String = serde_json::from_str(&bus_message.json_payload)?;
|
let dns_id: String = serde_json::from_str(&bus_message.json_payload)?;
|
||||||
let _ = tokio::spawn(async move {
|
let _ = tokio::spawn(async move {
|
||||||
if let Err(e) = Overlord::get_and_follow_nip35(dns_id).await {
|
if let Err(e) = Overlord::get_and_follow_nip35(dns_id).await {
|
||||||
error!("{}", e);
|
tracing::error!("{}", e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -486,7 +485,7 @@ impl Overlord {
|
|||||||
.iter()
|
.iter()
|
||||||
.filter_map(|(_, r)| if r.dirty { Some(r.to_owned()) } else { None })
|
.filter_map(|(_, r)| if r.dirty { Some(r.to_owned()) } else { None })
|
||||||
.collect();
|
.collect();
|
||||||
info!("Saving {} relays", dirty_relays.len());
|
tracing::info!("Saving {} relays", dirty_relays.len());
|
||||||
for relay in dirty_relays.iter() {
|
for relay in dirty_relays.iter() {
|
||||||
// Just update 'post' since that's all 'dirty' indicates currently
|
// Just update 'post' since that's all 'dirty' indicates currently
|
||||||
DbRelay::update_post(relay.url.to_owned(), relay.post).await?;
|
DbRelay::update_post(relay.url.to_owned(), relay.post).await?;
|
||||||
@ -559,7 +558,7 @@ impl Overlord {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Seeking {} events", desired_count);
|
tracing::info!("Seeking {} events", desired_count);
|
||||||
|
|
||||||
let urls: Vec<Url> = desired_events_map
|
let urls: Vec<Url> = desired_events_map
|
||||||
.keys()
|
.keys()
|
||||||
@ -584,7 +583,7 @@ impl Overlord {
|
|||||||
self.start_minion(url.inner().to_owned()).await?;
|
self.start_minion(url.inner().to_owned()).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("{}: Asking to fetch {} events", url.inner(), ids.len());
|
tracing::debug!("{}: Asking to fetch {} events", url.inner(), ids.len());
|
||||||
|
|
||||||
// Tell it to get these events
|
// Tell it to get these events
|
||||||
let _ = self.to_minions.send(BusMessage {
|
let _ = self.to_minions.send(BusMessage {
|
||||||
@ -647,7 +646,7 @@ impl Overlord {
|
|||||||
.async_follow(&(*pubkey).into(), true)
|
.async_follow(&(*pubkey).into(), true)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
info!("Followed {}", &dns_id);
|
tracing::info!("Followed {}", &dns_id);
|
||||||
|
|
||||||
let relays = match nip05.relays.get(pubkey) {
|
let relays = match nip05.relays.get(pubkey) {
|
||||||
Some(relays) => relays,
|
Some(relays) => relays,
|
||||||
@ -671,7 +670,7 @@ impl Overlord {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Setup {} relays for {}", relays.len(), &dns_id);
|
tracing::info!("Setup {} relays for {}", relays.len(), &dns_id);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -686,7 +685,7 @@ impl Overlord {
|
|||||||
.async_follow(&pkhex, true)
|
.async_follow(&pkhex, true)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
debug!("Followed {}", &pkhex);
|
tracing::debug!("Followed {}", &pkhex);
|
||||||
|
|
||||||
// Save relay
|
// Save relay
|
||||||
let relay_url = Url::new(&relay);
|
let relay_url = Url::new(&relay);
|
||||||
@ -704,7 +703,7 @@ impl Overlord {
|
|||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
info!("Setup 1 relay for {}", &pkhex);
|
tracing::info!("Setup 1 relay for {}", &pkhex);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -719,7 +718,7 @@ impl Overlord {
|
|||||||
.async_follow(&pkhex, true)
|
.async_follow(&pkhex, true)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
debug!("Followed {}", &pkhex);
|
tracing::debug!("Followed {}", &pkhex);
|
||||||
|
|
||||||
// Save relay
|
// Save relay
|
||||||
let relay_url = Url::new(&relay);
|
let relay_url = Url::new(&relay);
|
||||||
@ -737,7 +736,7 @@ impl Overlord {
|
|||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
info!("Setup 1 relay for {}", &pkhex);
|
tracing::info!("Setup 1 relay for {}", &pkhex);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -747,7 +746,7 @@ impl Overlord {
|
|||||||
let public_key = match GLOBALS.signer.read().await.public_key() {
|
let public_key = match GLOBALS.signer.read().await.public_key() {
|
||||||
Some(pk) => pk,
|
Some(pk) => pk,
|
||||||
None => {
|
None => {
|
||||||
warn!("No public key! Not posting");
|
tracing::warn!("No public key! Not posting");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -781,7 +780,7 @@ impl Overlord {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send it the event to post
|
// Send it the event to post
|
||||||
debug!("Asking {} to post", &relay.url);
|
tracing::debug!("Asking {} to post", &relay.url);
|
||||||
|
|
||||||
let _ = self.to_minions.send(BusMessage {
|
let _ = self.to_minions.send(BusMessage {
|
||||||
target: relay.url.clone(),
|
target: relay.url.clone(),
|
||||||
@ -801,7 +800,7 @@ impl Overlord {
|
|||||||
let public_key = match GLOBALS.signer.read().await.public_key() {
|
let public_key = match GLOBALS.signer.read().await.public_key() {
|
||||||
Some(pk) => pk,
|
Some(pk) => pk,
|
||||||
None => {
|
None => {
|
||||||
warn!("No public key! Not posting");
|
tracing::warn!("No public key! Not posting");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -839,7 +838,7 @@ impl Overlord {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send it the event to post
|
// Send it the event to post
|
||||||
debug!("Asking {} to post", &relay.url);
|
tracing::debug!("Asking {} to post", &relay.url);
|
||||||
|
|
||||||
let _ = self.to_minions.send(BusMessage {
|
let _ = self.to_minions.send(BusMessage {
|
||||||
target: relay.url.clone(),
|
target: relay.url.clone(),
|
||||||
|
@ -2,7 +2,6 @@ use crate::db::{DbPersonRelay, DbRelay};
|
|||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use nostr_types::PublicKeyHex;
|
use nostr_types::PublicKeyHex;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
/// See RelayPicker::best()
|
/// See RelayPicker::best()
|
||||||
pub struct RelayPicker {
|
pub struct RelayPicker {
|
||||||
@ -33,7 +32,7 @@ impl RelayPicker {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(
|
tracing::info!(
|
||||||
"Searching for the best relay among {} for {} people",
|
"Searching for the best relay among {} for {} people",
|
||||||
self.relays.len(),
|
self.relays.len(),
|
||||||
self.pubkey_counts.len()
|
self.pubkey_counts.len()
|
||||||
|
@ -2,7 +2,6 @@ use crate::error::Error;
|
|||||||
use crate::globals::GLOBALS;
|
use crate::globals::GLOBALS;
|
||||||
use nostr_types::{EncryptedPrivateKey, PublicKey};
|
use nostr_types::{EncryptedPrivateKey, PublicKey};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tracing::error;
|
|
||||||
|
|
||||||
pub const DEFAULT_FEED_CHUNK: u64 = 43200; // 12 hours
|
pub const DEFAULT_FEED_CHUNK: u64 = 43200; // 12 hours
|
||||||
pub const DEFAULT_OVERLAP: u64 = 600; // 10 minutes
|
pub const DEFAULT_OVERLAP: u64 = 600; // 10 minutes
|
||||||
@ -92,7 +91,7 @@ impl Settings {
|
|||||||
settings.public_key = match PublicKey::try_from_hex_string(&row.1) {
|
settings.public_key = match PublicKey::try_from_hex_string(&row.1) {
|
||||||
Ok(pk) => Some(pk),
|
Ok(pk) => Some(pk),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Public key in database is invalid or corrupt: {}", e);
|
tracing::error!("Public key in database is invalid or corrupt: {}", e);
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user