Overlord, and shutdown handling

This commit is contained in:
Mike Dilger 2022-12-20 20:07:22 +13:00
parent 170270d83e
commit 36da2c831d
3 changed files with 314 additions and 3 deletions

View File

@ -8,18 +8,60 @@ mod db;
mod error; mod error;
mod event_related; mod event_related;
mod globals; mod globals;
mod overlord;
mod settings; mod settings;
mod ui; mod ui;
use crate::comms::BusMessage;
use crate::error::Error;
use crate::globals::GLOBALS;
use std::ops::DerefMut;
use std::{mem, thread};
fn main() { fn main() {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
// TBD: start async code // Start async code
// We do this on a separate thread because egui is most portable by
// being on the main thread.
let async_thread = thread::spawn(|| {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(tokio_main());
});
if let Err(e) = ui::run() { if let Err(e) = ui::run() {
tracing::error!("{}", e); tracing::error!("{}", e);
} }
// TBD: Tell the async parties to close down // Tell the async parties to close down
// TBD: wait for the async parties to close down if let Err(e) = initiate_shutdown() {
tracing::error!("{}", e);
}
// Wait for the async thread to complete
async_thread.join().unwrap();
}
async fn tokio_main() {
// Steal `from_minions` from the GLOBALS, and give it to a new Overlord
let from_minions = {
let mut mutex_option = GLOBALS.from_minions.lock().await;
mem::replace(mutex_option.deref_mut(), None)
}
.unwrap();
// Run the overlord
let mut overlord = crate::overlord::Overlord::new(from_minions);
overlord.run().await;
}
// Any task can call this to shutdown
pub fn initiate_shutdown() -> Result<(), Error> {
let to_overlord = GLOBALS.to_overlord.clone();
to_overlord.send(BusMessage {
target: "all".to_string(),
kind: "shutdown".to_string(),
json_payload: serde_json::to_string("").unwrap(),
})?;
Ok(())
} }

159
src/overlord/mod.rs Normal file
View File

@ -0,0 +1,159 @@
mod relay_picker;
use crate::comms::BusMessage;
use crate::db::{DbEvent, DbPerson, DbRelay};
use crate::error::Error;
use crate::globals::GLOBALS;
use crate::settings::Settings;
use nostr_proto::{Event, Unixtime};
use tokio::select;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::UnboundedReceiver;
use tracing::{error, info};
pub struct Overlord {
settings: Settings,
to_minions: Sender<BusMessage>,
#[allow(dead_code)]
from_minions: UnboundedReceiver<BusMessage>,
}
impl Overlord {
pub fn new(from_minions: UnboundedReceiver<BusMessage>) -> Overlord {
let to_minions = GLOBALS.to_minions.clone();
Overlord {
settings: Settings::default(),
to_minions,
from_minions,
}
}
pub async fn run(&mut self) {
if let Err(e) = self.run_inner().await {
error!("{}", e);
}
// Send shutdown message to all minions (and ui)
// If this fails, it's probably because there are no more listeners
// so just ignore it and keep shutting down.
let _ = self.to_minions.send(BusMessage {
target: "all".to_string(),
kind: "shutdown".to_string(),
json_payload: serde_json::to_string("shutdown").unwrap(),
});
// Wait on all minions to finish. When there are no more senders
// sending to `from_minions` then they are all completed.
// In that case this call will return an error, but we don't care we
// just finish.
let _ = self.from_minions.recv();
}
pub async fn run_inner(&mut self) -> Result<(), Error> {
// Setup the database (possibly create, possibly upgrade)
crate::db::setup_database().await?;
// Load settings
self.settings = Settings::load().await?;
// FIXME - if this needs doing, it should be done dynamically as
// new people are encountered, not batch-style on startup.
// Create a person record for every person seen, possibly autofollow
DbPerson::populate_new_people(self.settings.autofollow != 0).await?;
// FIXME - if this needs doing, it should be done dynamically as
// new people are encountered, not batch-style on startup.
// Create a relay record for every relay in person_relay map (these get
// updated from events without necessarily updating our relays list)
DbRelay::populate_new_relays().await?;
// Load feed-related events from database and process (TextNote, EventDeletion, Reaction)
{
let now = Unixtime::now().unwrap();
let then = now.0 - self.settings.feed_chunk as i64;
let db_events = DbEvent::fetch(Some(&format!(
" (kind=1 OR kind=5 OR kind=7) AND created_at > {} ORDER BY created_at ASC",
then
)))
.await?;
// Map db events into Events
let mut events: Vec<Event> = Vec::with_capacity(db_events.len());
for dbevent in db_events.iter() {
let e = serde_json::from_str(&dbevent.raw)?;
events.push(e);
}
// Process these events
let mut count = 0;
for event in events.iter() {
count += 1;
crate::globals::add_event(event).await?;
}
info!("Loaded {} events from the database", count);
}
'mainloop: loop {
match self.loop_handler().await {
Ok(keepgoing) => {
if !keepgoing {
break 'mainloop;
}
}
Err(e) => {
// Log them and keep looping
error!("{}", e);
}
}
}
Ok(())
}
#[allow(unused_assignments)]
async fn loop_handler(&mut self) -> Result<bool, Error> {
let mut keepgoing: bool = true;
select! {
bus_message = self.from_minions.recv() => {
let bus_message = match bus_message {
Some(bm) => bm,
None => {
// All senders dropped, or one of them closed.
return Ok(false);
}
};
keepgoing = self.handle_bus_message(bus_message).await?;
},
}
Ok(keepgoing)
}
async fn handle_bus_message(&mut self, bus_message: BusMessage) -> Result<bool, Error> {
#[allow(clippy::single_match)] // because temporarily so
match &*bus_message.target {
"all" => match &*bus_message.kind {
"shutdown" => {
info!("Overlord shutting down");
return Ok(false);
}
"settings_changed" => {
self.settings = serde_json::from_str(&bus_message.json_payload)?;
// We need to inform the minions
self.to_minions.send(BusMessage {
target: "all".to_string(),
kind: "settings_changed".to_string(),
json_payload: bus_message.json_payload.clone(),
})?;
}
_ => {}
},
//"overlord" => match &*bus_message.kind {
//}
_ => {}
}
Ok(true)
}
}

View File

@ -0,0 +1,110 @@
use crate::db::{DbPersonRelay, DbRelay};
use crate::error::Error;
use nostr_proto::PublicKeyHex;
use tracing::info;
/// See RelayPicker::best()
#[allow(dead_code)]
pub struct RelayPicker {
pub relays: Vec<DbRelay>,
pub pubkeys: Vec<PublicKeyHex>,
pub person_relays: Vec<DbPersonRelay>,
}
impl RelayPicker {
#[allow(dead_code)]
pub fn is_degenerate(&self) -> bool {
self.relays.is_empty() || self.pubkeys.is_empty() || self.person_relays.is_empty()
}
/// This function takes a RelayPicker which consists of a list of relays,
/// a list of public keys, and a mapping between them. It outputs a
/// BestRelay structure which includes the best relay to listen to and
/// the public keys such a relay will cover. It also outpus a new RelayPicker
/// that contains only the remaining relays and public keys.
#[allow(dead_code)]
pub fn best(mut self) -> Result<(BestRelay, RelayPicker), Error> {
if self.pubkeys.is_empty() {
return Err(Error::General(
"best_relay called for zero people".to_owned(),
));
}
if self.relays.is_empty() {
return Err(Error::General(
"best_relay called for zero relays".to_owned(),
));
}
info!(
"Searching for the best relay among {} for {} people",
self.relays.len(),
self.pubkeys.len()
);
// Keep score
let mut score: Vec<u64> = [0].repeat(self.relays.len());
// Count how many keys a relay covers, to use as part of it's score
for person_relay in self.person_relays.iter() {
let i = match self
.relays
.iter()
.position(|relay| relay.url == person_relay.relay)
{
Some(index) => index,
None => continue, // we don't have that relay?
};
score[i] += 1;
}
// Multiply scores by relay rank
#[allow(clippy::needless_range_loop)]
for i in 0..self.relays.len() {
score[i] *= self.relays[i].rank.unwrap_or(1);
}
let winner_index = score
.iter()
.enumerate()
.max_by(|x: &(usize, &u64), y: &(usize, &u64)| x.1.cmp(y.1))
.unwrap()
.0;
let winner = self.relays.swap_remove(winner_index);
let covered_public_keys: Vec<PublicKeyHex> = self
.person_relays
.iter()
.filter(|x| x.relay == winner.url)
.map(|x| PublicKeyHex(x.person.clone()))
.collect();
self.pubkeys.retain(|x| !covered_public_keys.contains(x));
self.person_relays.retain(|pr| {
!covered_public_keys.contains(&PublicKeyHex(pr.person.clone()))
&& pr.relay != winner.url
});
Ok((
BestRelay {
relay: winner,
pubkeys: covered_public_keys,
},
self,
))
}
}
/// See RelayPicker::best()
pub struct BestRelay {
pub relay: DbRelay,
pub pubkeys: Vec<PublicKeyHex>,
}
impl BestRelay {
#[allow(dead_code)]
pub fn is_degenerate(&self) -> bool {
self.pubkeys.is_empty() || self.relay.rank == Some(0)
}
}