Minion unsubscribe-related improvements

This commit is contained in:
Mike Dilger 2023-02-05 11:24:41 +13:00
parent 5e39ce260a
commit 1f4b3c8c88
5 changed files with 63 additions and 37 deletions

View File

@ -52,5 +52,6 @@ pub enum ToMinionPayload {
SubscribePersonFeed(PublicKeyHex),
SubscribeThreadFeed(IdHex, Vec<IdHex>),
TempSubscribeMetadata(Vec<PublicKeyHex>),
UnsubscribePersonFeed,
UnsubscribeThreadFeed,
}

View File

@ -52,27 +52,43 @@ impl Feed {
// all those events again.
*self.current_feed_kind.write() = FeedKind::General;
*self.thread_parent.write() = None;
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribeThreadFeed,
});
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribePersonFeed,
});
}
pub fn set_feed_to_replies(&self) {
*self.current_feed_kind.write() = FeedKind::Replies;
*self.thread_parent.write() = None;
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribeThreadFeed,
});
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribePersonFeed,
});
}
pub fn set_feed_to_thread(&self, id: Id, referenced_by: Id) {
*self.current_feed_kind.write() = FeedKind::Thread { id, referenced_by };
// Parent starts with the post itself
// Overlord will climb it, and recompute will climb it
let previous_thread_parent = *self.thread_parent.read();
*self.thread_parent.write() = Some(id);
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribePersonFeed,
});
let _ = GLOBALS.to_overlord.send(ToOverlordMessage::SetThreadFeed(
id,
referenced_by,

View File

@ -95,20 +95,19 @@ impl Minion {
.get_handle_by_id(&subid.0)
.unwrap_or_else(|| "_".to_owned());
// If we opened a temporary subscription, and it is the only subscription on this
// relay, we will close it.
let close: bool = handle.starts_with("temp_") && self.subscriptions.len() <= 1;
// If this is a temporary subscription, we should close it after an EOSE
let close: bool = handle.starts_with("temp_");
// Update the matching subscription
match self.subscriptions.get_mut_by_id(&subid.0) {
Some(sub) => {
tracing::debug!("{}: {}: EOSE: {:?}", &self.url, handle, subid);
if close {
tracing::debug!("{}: {}: Closing websocket", &self.url, handle);
let close_message = sub.close_message();
let websocket_sink = self.sink.as_mut().unwrap();
let wire = serde_json::to_string(&close_message)?;
websocket_sink.send(WsMessage::Text(wire.clone())).await?;
self.unsubscribe(&handle).await?;
// If that was the last (temp_) subscription, set minion to exit
if self.subscriptions.is_empty() {
self.keepgoing = false;
}
} else {
sub.set_eose();
}

View File

@ -32,6 +32,7 @@ pub struct Minion {
sink: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, WsMessage>>,
subscriptions: Subscriptions,
next_events_subscription_id: u32,
keepgoing: bool,
}
impl Minion {
@ -57,6 +58,7 @@ impl Minion {
sink: None,
subscriptions: Subscriptions::new(),
next_events_subscription_id: 0,
keepgoing: true,
})
}
}
@ -194,8 +196,8 @@ impl Minion {
'relayloop: loop {
match self.loop_handler().await {
Ok(keepgoing) => {
if !keepgoing {
Ok(_) => {
if !self.keepgoing {
break 'relayloop;
}
}
@ -206,12 +208,16 @@ impl Minion {
}
}
// Close the connection
let ws_sink = self.sink.as_mut().unwrap();
if let Err(e) = ws_sink.send(WsMessage::Close(None)).await {
tracing::error!("websocket close error: {}", e);
}
Ok(())
}
async fn loop_handler(&mut self) -> Result<bool, Error> {
let mut keepgoing: bool = true;
async fn loop_handler(&mut self) -> Result<(), Error> {
let ws_stream = self.stream.as_mut().unwrap();
let ws_sink = self.sink.as_mut().unwrap();
@ -226,7 +232,11 @@ impl Minion {
ws_message = ws_stream.next() => {
let ws_message = match ws_message {
Some(m) => m,
None => return Ok(false), // probably connection reset
None => {
// possibly connection reset
self.keepgoing = false;
return Ok(());
}
}?;
tracing::trace!("{}: Handling message", &self.url);
@ -241,7 +251,7 @@ impl Minion {
WsMessage::Binary(_) => tracing::warn!("{}, Unexpected binary message", &self.url),
WsMessage::Ping(x) => ws_sink.send(WsMessage::Pong(x)).await?,
WsMessage::Pong(_) => { }, // we just ignore pongs
WsMessage::Close(_) => keepgoing = false,
WsMessage::Close(_) => self.keepgoing = false,
WsMessage::Frame(_) => tracing::warn!("{}: Unexpected frame message", &self.url),
}
},
@ -249,29 +259,22 @@ impl Minion {
let to_minion_message = match to_minion_message {
Ok(m) => m,
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
return Ok(false);
self.keepgoing = false;
return Ok(());
},
Err(e) => return Err(e.into())
};
#[allow(clippy::collapsible_if)]
if to_minion_message.target == self.url.0 || to_minion_message.target == "all" {
keepgoing = self.handle_overlord_message(to_minion_message).await?;
self.handle_overlord_message(to_minion_message).await?;
}
},
}
// Close down if we aren't handling any more subscriptions
if self.subscriptions.len() == 0 {
keepgoing = false;
Ok(())
}
Ok(keepgoing)
}
pub async fn handle_overlord_message(
&mut self,
message: ToMinionMessage,
) -> Result<bool, Error> {
pub async fn handle_overlord_message(&mut self, message: ToMinionMessage) -> Result<(), Error> {
match message.payload {
ToMinionPayload::FetchEvents(vec) => {
self.get_events(vec).await?;
@ -288,7 +291,7 @@ impl Minion {
}
ToMinionPayload::Shutdown => {
tracing::info!("{}: Websocket listener shutting down", &self.url);
return Ok(false);
self.keepgoing = false;
}
ToMinionPayload::SubscribeGeneralFeed(pubkeys) => {
self.subscribe_general_feed(pubkeys).await?;
@ -302,11 +305,23 @@ impl Minion {
ToMinionPayload::TempSubscribeMetadata(pubkeyhexs) => {
self.temp_subscribe_metadata(pubkeyhexs).await?;
}
ToMinionPayload::UnsubscribePersonFeed => {
self.unsubscribe("person_feed").await?;
// Close down if we aren't handling any more subscriptions
if self.subscriptions.is_empty() {
self.keepgoing = false;
}
}
ToMinionPayload::UnsubscribeThreadFeed => {
self.unsubscribe_thread_feed().await?;
self.unsubscribe("thread_feed").await?;
// Close down if we aren't handling any more subscriptions
if self.subscriptions.is_empty() {
self.keepgoing = false;
}
}
Ok(true)
}
Ok(())
}
async fn tell_overlord_we_are_ready(&self) -> Result<(), Error> {
@ -593,11 +608,6 @@ impl Minion {
Ok(())
}
async fn unsubscribe_thread_feed(&mut self) -> Result<(), Error> {
self.unsubscribe("thread_feed").await?;
Ok(())
}
// Create or replace the following subscription
/*
async fn upsert_following(&mut self, pubkeys: Vec<PublicKeyHex>) -> Result<(), Error> {

View File

@ -68,8 +68,8 @@ impl Subscriptions {
}
}
pub fn len(&self) -> usize {
self.by_id.len()
pub fn is_empty(&self) -> bool {
self.by_id.is_empty()
}
/*