Bring in minions

This commit is contained in:
Mike Dilger 2022-12-21 08:23:30 +13:00
parent 25b8ceee2e
commit f9b54572e3
7 changed files with 509 additions and 9 deletions

8
Cargo.lock generated
View File

@ -1705,9 +1705,9 @@ dependencies = [
[[package]]
name = "hermit-abi"
version = "0.1.19"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
dependencies = [
"libc",
]
@ -2309,9 +2309,9 @@ dependencies = [
[[package]]
name = "num_cpus"
version = "1.14.0"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6058e64324c71e02bc2b150e4f3bc8286db6c83092132ffa3f6b1eab0f9def5"
checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b"
dependencies = [
"hermit-abi",
"libc",

View File

@ -268,8 +268,7 @@ async fn save_person(pubkey: PublicKey) -> Result<(), Error> {
Ok(())
}
#[allow(dead_code)]
async fn followed_pubkeys() -> Vec<PublicKeyHex> {
pub async fn followed_pubkeys() -> Vec<PublicKeyHex> {
let people = GLOBALS.people.lock().await;
people
.iter()

View File

@ -0,0 +1,13 @@
use super::Minion;
use crate::{BusMessage, Error};
use tracing::warn;
impl Minion {
pub(super) async fn handle_bus_message(&self, bus_message: BusMessage) -> Result<(), Error> {
warn!(
"Websocket task got message, unimplemented: {}",
bus_message.kind
);
Ok(())
}
}

View File

@ -0,0 +1,62 @@
use super::Minion;
use crate::db::{DbEvent, DbPersonRelay};
use crate::{BusMessage, Error};
use nostr_proto::{Event, RelayMessage, Unixtime};
use tracing::{error, info, warn};
impl Minion {
pub(super) async fn handle_nostr_message(&mut self, ws_message: String) -> Result<(), Error> {
// TODO: pull out the raw event without any deserialization to be sure we don't mangle
// it.
let relay_message: RelayMessage = serde_json::from_str(&ws_message)?;
let mut maxtime = Unixtime::now()?;
maxtime.0 += 60 * 15; // 15 minutes into the future
match relay_message {
RelayMessage::Event(_subid, event) => {
if let Err(e) = event.verify(Some(maxtime)) {
error!("VERIFY ERROR: {}, {}", e, serde_json::to_string(&event)?)
} else {
DbEvent::save_nostr_event(&event, Some(self.url.clone())).await?;
self.send_overlord_newevent(*event).await?;
}
}
RelayMessage::Notice(msg) => {
info!("NOTICE: {} {}", &self.url, msg);
}
RelayMessage::Eose(subid) => {
// We should update last_fetched
let now = Unixtime::now().unwrap().0 as u64;
DbPersonRelay::update_last_fetched(self.url.0.clone(), now).await?;
// Update the matching subscription
match self.subscriptions.get_mut(&subid.0) {
Some(sub) => {
sub.set_eose();
info!("EOSE: {} {:?}", &self.url, subid);
}
None => {
warn!("EOSE for unknown subsription: {} {:?}", &self.url, subid);
}
}
}
RelayMessage::Ok(id, ok, ok_message) => {
// These don't have to be processed.
info!("OK: {} {:?} {} {}", &self.url, id, ok, ok_message);
}
}
Ok(())
}
async fn send_overlord_newevent(&self, event: Event) -> Result<(), Error> {
self.to_overlord.send(BusMessage {
target: "overlord".to_string(),
kind: "new_event".to_string(),
json_payload: serde_json::to_string(&event)?,
})?;
Ok(())
}
}

321
src/overlord/minion/mod.rs Normal file
View File

@ -0,0 +1,321 @@
use crate::comms::BusMessage;
use crate::db::{DbPersonRelay, DbRelay};
use crate::error::Error;
use crate::globals::GLOBALS;
use crate::settings::Settings;
use futures::{SinkExt, StreamExt};
use http::Uri;
use nostr_proto::{EventKind, Filters, PublicKeyHex, RelayInformationDocument, Unixtime, Url};
use std::collections::HashMap;
use tokio::net::TcpStream;
use tokio::select;
use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::UnboundedSender;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tracing::{debug, error, info, trace, warn};
use tungstenite::protocol::{Message as WsMessage, WebSocketConfig};
mod handle_bus;
mod handle_websocket;
mod subscription;
use subscription::Subscription;
pub struct Minion {
url: Url,
pubkeys: Vec<PublicKeyHex>,
to_overlord: UnboundedSender<BusMessage>,
from_overlord: Receiver<BusMessage>,
settings: Settings,
dbrelay: DbRelay,
nip11: Option<RelayInformationDocument>,
stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
subscriptions: HashMap<String, Subscription>,
}
impl Minion {
pub async fn new(url: Url, pubkeys: Vec<PublicKeyHex>) -> Result<Minion, Error> {
let to_overlord = GLOBALS.to_overlord.clone();
let from_overlord = GLOBALS.to_minions.subscribe();
let settings = Settings::load().await?;
let dbrelay = match DbRelay::fetch_one(&url).await? {
Some(dbrelay) => dbrelay,
None => {
let dbrelay = DbRelay::new(url.0.clone());
DbRelay::insert(dbrelay.clone()).await?;
dbrelay
}
};
Ok(Minion {
url,
pubkeys,
to_overlord,
from_overlord,
settings,
dbrelay,
nip11: None,
stream: None,
subscriptions: HashMap::new(),
})
}
}
impl Minion {
pub async fn handle(&mut self) {
// Catch errors, Return nothing.
if let Err(e) = self.handle_inner().await {
error!("ERROR handling {}: {}", &self.url, e);
}
// Bump the failure count for the relay.
self.dbrelay.failure_count += 1;
if let Err(e) = DbRelay::update(self.dbrelay.clone()).await {
error!("ERROR bumping relay failure count {}: {}", &self.url, e);
}
debug!("Minion exiting: {}", self.url);
}
async fn handle_inner(&mut self) -> Result<(), Error> {
info!("Task started to handle relay at {}", &self.url);
// Connect to the relay
let websocket_stream = {
let uri: http::Uri = self.url.0.parse::<Uri>()?;
let authority = uri.authority().ok_or(Error::UrlHasNoHostname)?.as_str();
let host = authority
.find('@')
.map(|idx| authority.split_at(idx + 1).1)
.unwrap_or_else(|| authority);
if host.is_empty() {
return Err(Error::UrlHasEmptyHostname);
}
// Read NIP-11 information
if let Ok(response) = reqwest::Client::new()
.get(&format!("https://{}", host))
.header("Host", host)
.header("Accept", "application/nostr+json")
.send()
.await
{
match response.json::<RelayInformationDocument>().await {
Ok(nip11) => {
info!("{:?}", &nip11);
self.nip11 = Some(nip11);
}
Err(e) => {
error!("Unable to parse response as NIP-11 {}", e);
}
}
}
let key: [u8; 16] = rand::random();
let req = http::request::Request::builder()
.method("GET")
.header("Host", host)
.header("Connection", "Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Key", base64::encode(key))
.uri(uri)
.body(())?;
let config: WebSocketConfig = WebSocketConfig {
max_send_queue: None,
max_message_size: Some(1024 * 1024 * 16), // their default is 64 MiB, I choose 16 MiB
max_frame_size: Some(1024 * 1024 * 16), // their default is 16 MiB.
accept_unmasked_frames: true, // default is false which is the standard
};
let (websocket_stream, _response) =
tokio_tungstenite::connect_async_with_config(req, Some(config)).await?;
info!("Connected to {}", &self.url);
websocket_stream
};
self.stream = Some(websocket_stream);
// Bump the success count for the relay
{
self.dbrelay.success_count += 1;
DbRelay::update(self.dbrelay.clone()).await?;
}
// Subscribe to the people we follow
if !self.pubkeys.is_empty() {
self.update_following_subscription().await?;
}
// Tell the overlord we are ready to receive commands
self.tell_overlord_we_are_ready().await?;
'relayloop: loop {
match self.loop_handler().await {
Ok(keepgoing) => {
if !keepgoing {
break 'relayloop;
}
}
Err(e) => {
// Log them and keep going
error!("{}", e);
}
}
}
Ok(())
}
async fn loop_handler(&mut self) -> Result<bool, Error> {
let mut keepgoing: bool = true;
let ws_stream = self.stream.as_mut().unwrap();
select! {
ws_message = ws_stream.next() => {
let ws_message = match ws_message {
Some(m) => m,
None => return Ok(false), // probably connection reset
}?;
trace!("Handling message from {}", &self.url);
match ws_message {
WsMessage::Text(t) => {
self.handle_nostr_message(t).await?;
// FIXME: some errors we should probably bail on.
// For now, try to continue.
},
WsMessage::Binary(_) => warn!("Unexpected binary message"),
WsMessage::Ping(x) => ws_stream.send(WsMessage::Pong(x)).await?,
WsMessage::Pong(_) => warn!("Unexpected pong message"),
WsMessage::Close(_) => keepgoing = false,
WsMessage::Frame(_) => warn!("Unexpected frame message"),
}
},
bus_message = self.from_overlord.recv() => {
let bus_message = match bus_message {
Ok(bm) => bm,
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
return Ok(false);
},
Err(e) => return Err(e.into())
};
if bus_message.target == self.url.0 {
self.handle_bus_message(bus_message).await?;
} else if &*bus_message.target == "all" {
if &*bus_message.kind == "shutdown" {
info!("Websocket listener {} shutting down", &self.url);
keepgoing = false;
} else if &*bus_message.kind == "settings_changed" {
self.settings = serde_json::from_str(&bus_message.json_payload)?;
}
}
},
}
Ok(keepgoing)
}
async fn tell_overlord_we_are_ready(&self) -> Result<(), Error> {
self.to_overlord.send(BusMessage {
target: "overlord".to_string(),
kind: "minion_is_ready".to_string(),
json_payload: "".to_owned(),
})?;
Ok(())
}
// This updates a subscription named "following" which watches for events
// from the people we follow.
async fn update_following_subscription(&mut self) -> Result<(), Error> {
let websocket_stream = self.stream.as_mut().unwrap();
if self.pubkeys.is_empty() {
if let Some(sub) = self.subscriptions.get("following") {
// Close the subscription
let wire = serde_json::to_string(&sub.close_message())?;
websocket_stream.send(WsMessage::Text(wire.clone())).await?;
// Remove the subscription from the map
self.subscriptions.remove("following");
}
// Since pubkeys is empty, nothing to subscribe to.
return Ok(());
}
// Compute how far to look back
let (feed_since, special_since) = {
// Find the oldest 'last_fetched' among the 'person_relay' table.
// Null values will come through as 0.
let mut special_since: i64 =
DbPersonRelay::fetch_oldest_last_fetched(&self.pubkeys, &self.url.0).await? as i64;
// Subtract overlap to avoid gaps due to clock sync and event
// propogation delay
special_since -= self.settings.overlap as i64;
// For feed related events, don't look back more than one feed_chunk ago
let one_feedchunk_ago = Unixtime::now().unwrap().0 - self.settings.feed_chunk as i64;
let feed_since = special_since.max(one_feedchunk_ago);
(Unixtime(feed_since), Unixtime(special_since))
};
// Create the author filter
let mut feed_filter: Filters = Filters::new();
for pk in self.pubkeys.iter() {
feed_filter.add_author(pk, None);
}
feed_filter.add_event_kind(EventKind::TextNote);
feed_filter.add_event_kind(EventKind::Reaction);
feed_filter.add_event_kind(EventKind::EventDeletion);
feed_filter.since = Some(feed_since);
debug!(
"Feed Filter {}: {}",
&self.url,
serde_json::to_string(&feed_filter)?
);
// Create the lookback filter
let mut special_filter: Filters = Filters::new();
for pk in self.pubkeys.iter() {
special_filter.add_author(pk, None);
}
special_filter.add_event_kind(EventKind::Metadata);
//special_filter.add_event_kind(EventKind::RecommendRelay);
//special_filter.add_event_kind(EventKind::ContactList);
special_filter.since = Some(special_since);
debug!(
"Special Filter {}: {}",
&self.url,
serde_json::to_string(&special_filter)?
);
// Get the subscription
let sub = self
.subscriptions
.entry("following".to_string())
.or_insert_with(|| Subscription::new("following".to_string()));
// Write our filters into it
{
let vec: &mut Vec<Filters> = sub.get_mut();
vec.clear();
vec.push(feed_filter);
vec.push(special_filter);
}
// Subscribe (or resubscribe) to the subscription
let wire = serde_json::to_string(&sub.req_message())?;
websocket_stream.send(WsMessage::Text(wire.clone())).await?;
trace!("Sent {}", &wire);
Ok(())
}
}

View File

@ -0,0 +1,43 @@
use nostr_proto::{ClientMessage, Filters, SubscriptionId};
#[derive(Debug)]
pub struct Subscription {
id: String,
filters: Vec<Filters>,
eose: bool,
}
impl Subscription {
pub fn new(id: String) -> Subscription {
Subscription {
id,
filters: vec![],
eose: false,
}
}
pub fn get_id(&self) -> String {
self.id.clone()
}
pub fn get_mut(&mut self) -> &mut Vec<Filters> {
&mut self.filters
}
pub fn set_eose(&mut self) {
self.eose = true;
}
#[allow(dead_code)]
pub fn eose(&self) -> bool {
self.eose
}
pub fn req_message(&self) -> ClientMessage {
ClientMessage::Req(SubscriptionId(self.get_id()), self.filters.clone())
}
pub fn close_message(&self) -> ClientMessage {
ClientMessage::Close(SubscriptionId(self.get_id()))
}
}

View File

@ -1,14 +1,18 @@
mod minion;
mod relay_picker;
use crate::comms::BusMessage;
use crate::db::{DbEvent, DbPerson, DbRelay};
use crate::db::{DbEvent, DbPerson, DbPersonRelay, DbRelay};
use crate::error::Error;
use crate::globals::GLOBALS;
use crate::settings::Settings;
use nostr_proto::{Event, Unixtime};
use tokio::select;
use minion::Minion;
use nostr_proto::{Event, PublicKey, PublicKeyHex, Unixtime, Url};
use relay_picker::{BestRelay, RelayPicker};
use std::collections::HashMap;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::{select, task};
use tracing::{error, info};
pub struct Overlord {
@ -16,6 +20,8 @@ pub struct Overlord {
to_minions: Sender<BusMessage>,
#[allow(dead_code)]
from_minions: UnboundedReceiver<BusMessage>,
minions: task::JoinSet<()>,
minions_task_url: HashMap<task::Id, Url>,
}
impl Overlord {
@ -25,6 +31,8 @@ impl Overlord {
settings: Settings::default(),
to_minions,
from_minions,
minions: task::JoinSet::new(),
minions_task_url: HashMap::new(),
}
}
@ -67,6 +75,15 @@ impl Overlord {
// updated from events without necessarily updating our relays list)
DbRelay::populate_new_relays().await?;
// Load people from the database
{
let mut dbpeople = DbPerson::fetch(None).await?;
for dbperson in dbpeople.drain(..) {
let pubkey = PublicKey::try_from(dbperson.pubkey.clone())?;
GLOBALS.people.lock().await.insert(pubkey, dbperson);
}
}
// Load feed-related events from database and process (TextNote, EventDeletion, Reaction)
{
let now = Unixtime::now().unwrap();
@ -93,6 +110,41 @@ impl Overlord {
info!("Loaded {} events from the database", count);
}
// Pick Relays and start Minions
{
let pubkeys: Vec<PublicKeyHex> = crate::globals::followed_pubkeys().await;
let mut relay_picker = RelayPicker {
relays: DbRelay::fetch(None).await?,
pubkeys: pubkeys.clone(),
person_relays: DbPersonRelay::fetch_for_pubkeys(&pubkeys).await?,
};
let mut best_relay: BestRelay;
loop {
if relay_picker.is_degenerate() {
break;
}
let (rd, rp) = relay_picker.best()?;
best_relay = rd;
relay_picker = rp;
if best_relay.is_degenerate() {
break;
}
// Fire off a minion to handle this relay
self.start_minion(best_relay.relay.url.clone(), best_relay.pubkeys.clone())
.await?;
info!(
"Picked relay {}, {} people left",
best_relay.relay.url,
relay_picker.pubkeys.len()
);
}
}
'mainloop: loop {
match self.loop_handler().await {
Ok(keepgoing) => {
@ -110,6 +162,16 @@ impl Overlord {
Ok(())
}
async fn start_minion(&mut self, url: String, pubkeys: Vec<PublicKeyHex>) -> Result<(), Error> {
let moved_url = Url(url.clone());
let mut minion = Minion::new(moved_url, pubkeys).await?;
let abort_handle = self.minions.spawn(async move { minion.handle().await });
let id = abort_handle.id();
self.minions_task_url.insert(id, Url(url));
Ok(())
}
#[allow(unused_assignments)]
async fn loop_handler(&mut self) -> Result<bool, Error> {
let mut keepgoing: bool = true;