From f991813f271681a34d9a36575ad891472916ad92 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Mon, 2 Jan 2023 23:22:57 +1300 Subject: [PATCH] Minion internal subscribe/unsubscribe generic --- src/overlord/minion/mod.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/overlord/minion/mod.rs b/src/overlord/minion/mod.rs index 317670ad..08983731 100644 --- a/src/overlord/minion/mod.rs +++ b/src/overlord/minion/mod.rs @@ -385,4 +385,35 @@ impl Minion { Ok(()) } + + #[allow(dead_code)] + async fn subscribe(&mut self, filters: Vec, handle: &str) -> Result<(), Error> { + let req_message = if self.subscriptions.has(handle) { + let sub = self.subscriptions.get_mut(handle).unwrap(); + *sub.get_mut() = filters; + sub.req_message() + } else { + self.subscriptions.add(handle, filters); + self.subscriptions.get(handle).unwrap().req_message() + }; + let wire = serde_json::to_string(&req_message)?; + let websocket_sink = self.sink.as_mut().unwrap(); + websocket_sink.send(WsMessage::Text(wire.clone())).await?; + trace!("{}: Sent {}", &self.url, &wire); + Ok(()) + } + + #[allow(dead_code)] + async fn unsubscribe(&mut self, handle: &str) -> Result<(), Error> { + if !self.subscriptions.has(handle) { + return Ok(()); + } + let close_message = self.subscriptions.get(handle).unwrap().close_message(); + let wire = serde_json::to_string(&close_message)?; + let websocket_sink = self.sink.as_mut().unwrap(); + websocket_sink.send(WsMessage::Text(wire.clone())).await?; + trace!("{}: Sent {}", &self.url, &wire); + self.subscriptions.remove(handle); + Ok(()) + } }