Start driving minions with messages

This commit is contained in:
Mike Dilger 2022-12-26 14:40:15 +13:00
parent eb8df21e2e
commit a5c5627749
3 changed files with 53 additions and 24 deletions

View File

@ -1,13 +1,27 @@
use super::Minion; use super::Minion;
use crate::{BusMessage, Error}; use crate::{BusMessage, Error};
use nostr_types::PublicKeyHex;
use tracing::warn; use tracing::warn;
impl Minion { impl Minion {
pub(super) async fn handle_bus_message(&self, bus_message: BusMessage) -> Result<(), Error> { pub(super) async fn handle_bus_message(
warn!( &mut self,
"Websocket task got message, unimplemented: {}", bus_message: BusMessage,
bus_message.kind ) -> Result<(), Error> {
); match &*bus_message.kind {
"set_followed_people" => {
let v: Vec<PublicKeyHex> = serde_json::from_str(&bus_message.json_payload)?;
self.upsert_following(v).await?;
}
"fetch_events" => {}
"follow_event_reactions" => {}
_ => {
warn!(
"Unrecognized bus message kind received by minion: {}",
bus_message.kind
);
}
}
Ok(()) Ok(())
} }
} }

View File

@ -9,7 +9,9 @@ use crate::globals::GLOBALS;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use futures_util::stream::{SplitSink, SplitStream}; use futures_util::stream::{SplitSink, SplitStream};
use http::Uri; use http::Uri;
use nostr_types::{EventKind, Filters, PublicKeyHex, RelayInformationDocument, Unixtime, Url}; use nostr_types::{
EventKind, Filters, IdHex, PublicKeyHex, RelayInformationDocument, Unixtime, Url,
};
use subscription::Subscriptions; use subscription::Subscriptions;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::select; use tokio::select;
@ -21,7 +23,6 @@ use tungstenite::protocol::{Message as WsMessage, WebSocketConfig};
pub struct Minion { pub struct Minion {
url: Url, url: Url,
pubkeys: Vec<PublicKeyHex>,
to_overlord: UnboundedSender<BusMessage>, to_overlord: UnboundedSender<BusMessage>,
from_overlord: Receiver<BusMessage>, from_overlord: Receiver<BusMessage>,
dbrelay: DbRelay, dbrelay: DbRelay,
@ -29,10 +30,12 @@ pub struct Minion {
stream: Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>, stream: Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
sink: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, WsMessage>>, sink: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, WsMessage>>,
subscriptions: Subscriptions, subscriptions: Subscriptions,
#[allow(dead_code)]
next_events_subscription_id: u32,
} }
impl Minion { impl Minion {
pub async fn new(url: Url, pubkeys: Vec<PublicKeyHex>) -> Result<Minion, Error> { pub async fn new(url: Url) -> Result<Minion, Error> {
let to_overlord = GLOBALS.to_overlord.clone(); let to_overlord = GLOBALS.to_overlord.clone();
let from_overlord = GLOBALS.to_minions.subscribe(); let from_overlord = GLOBALS.to_minions.subscribe();
let dbrelay = match DbRelay::fetch_one(&url).await? { let dbrelay = match DbRelay::fetch_one(&url).await? {
@ -46,7 +49,6 @@ impl Minion {
Ok(Minion { Ok(Minion {
url, url,
pubkeys,
to_overlord, to_overlord,
from_overlord, from_overlord,
dbrelay, dbrelay,
@ -54,6 +56,7 @@ impl Minion {
stream: None, stream: None,
sink: None, sink: None,
subscriptions: Subscriptions::new(), subscriptions: Subscriptions::new(),
next_events_subscription_id: 0,
}) })
} }
} }
@ -144,11 +147,6 @@ impl Minion {
DbRelay::update_success(self.dbrelay.url.clone(), now).await? DbRelay::update_success(self.dbrelay.url.clone(), now).await?
} }
// Subscribe to the people we follow
if !self.pubkeys.is_empty() {
self.update_following_subscription().await?;
}
// Tell the overlord we are ready to receive commands // Tell the overlord we are ready to receive commands
self.tell_overlord_we_are_ready().await?; self.tell_overlord_we_are_ready().await?;
@ -239,12 +237,11 @@ impl Minion {
Ok(()) Ok(())
} }
// This updates a subscription named "following" which watches for events // Create or replace the following subscription
// from the people we follow. async fn upsert_following(&mut self, pubkeys: Vec<PublicKeyHex>) -> Result<(), Error> {
async fn update_following_subscription(&mut self) -> Result<(), Error> {
let websocket_sink = self.sink.as_mut().unwrap(); let websocket_sink = self.sink.as_mut().unwrap();
if self.pubkeys.is_empty() { if pubkeys.is_empty() {
if let Some(sub) = self.subscriptions.get("following") { if let Some(sub) = self.subscriptions.get("following") {
// Close the subscription // Close the subscription
let wire = serde_json::to_string(&sub.close_message())?; let wire = serde_json::to_string(&sub.close_message())?;
@ -263,7 +260,7 @@ impl Minion {
// Find the oldest 'last_fetched' among the 'person_relay' table. // Find the oldest 'last_fetched' among the 'person_relay' table.
// Null values will come through as 0. // Null values will come through as 0.
let mut special_since: i64 = let mut special_since: i64 =
DbPersonRelay::fetch_oldest_last_fetched(&self.pubkeys, &self.url.0).await? as i64; DbPersonRelay::fetch_oldest_last_fetched(&pubkeys, &self.url.0).await? as i64;
let settings = GLOBALS.settings.lock().await.clone(); let settings = GLOBALS.settings.lock().await.clone();
@ -280,7 +277,7 @@ impl Minion {
// Create the author filter // Create the author filter
let mut feed_filter: Filters = Filters::new(); let mut feed_filter: Filters = Filters::new();
for pk in self.pubkeys.iter() { for pk in pubkeys.iter() {
feed_filter.add_author(pk, None); feed_filter.add_author(pk, None);
} }
feed_filter.add_event_kind(EventKind::TextNote); feed_filter.add_event_kind(EventKind::TextNote);
@ -295,7 +292,7 @@ impl Minion {
// Create the lookback filter // Create the lookback filter
let mut special_filter: Filters = Filters::new(); let mut special_filter: Filters = Filters::new();
for pk in self.pubkeys.iter() { for pk in pubkeys.iter() {
special_filter.add_author(pk, None); special_filter.add_author(pk, None);
} }
special_filter.add_event_kind(EventKind::Metadata); special_filter.add_event_kind(EventKind::Metadata);
@ -310,7 +307,6 @@ impl Minion {
); );
// Get the subscription // Get the subscription
let req_message = if self.subscriptions.has("following") { let req_message = if self.subscriptions.has("following") {
let sub = self.subscriptions.get_mut("following").unwrap(); let sub = self.subscriptions.get_mut("following").unwrap();
let vec: &mut Vec<Filters> = sub.get_mut(); let vec: &mut Vec<Filters> = sub.get_mut();
@ -332,4 +328,17 @@ impl Minion {
Ok(()) Ok(())
} }
#[allow(dead_code)]
async fn get_events(&mut self, _ids: Vec<IdHex>) -> Result<(), Error> {
// Create a new "events" subscription which closes on EOSE
// Use and bump next_events_subscription_id so they are independent subscriptions
unimplemented!()
}
#[allow(dead_code)]
async fn follow_event_reactions(&mut self, _ids: Vec<IdHex>) -> Result<(), Error> {
// Create or extend the "reactions" subscription
unimplemented!()
}
} }

View File

@ -224,11 +224,17 @@ impl Overlord {
async fn start_minion(&mut self, url: String, pubkeys: Vec<PublicKeyHex>) -> Result<(), Error> { async fn start_minion(&mut self, url: String, pubkeys: Vec<PublicKeyHex>) -> Result<(), Error> {
let moved_url = Url(url.clone()); let moved_url = Url(url.clone());
let mut minion = Minion::new(moved_url, pubkeys).await?; let mut minion = Minion::new(moved_url).await?;
let abort_handle = self.minions.spawn(async move { minion.handle().await }); let abort_handle = self.minions.spawn(async move { minion.handle().await });
let id = abort_handle.id(); let id = abort_handle.id();
self.minions_task_url.insert(id, Url(url.clone())); self.minions_task_url.insert(id, Url(url.clone()));
self.urls_watching.push(Url(url)); self.urls_watching.push(Url(url.clone()));
let _ = self.to_minions.send(BusMessage {
target: url.clone(),
kind: "set_followed_people".to_string(),
json_payload: serde_json::to_string(&pubkeys).unwrap(),
});
Ok(()) Ok(())
} }