Minion internal subscribe/unsubscribe generic

This commit is contained in:
Mike Dilger 2023-01-02 23:22:57 +13:00
parent 30aea31818
commit f991813f27

View File

@ -385,4 +385,35 @@ impl Minion {
Ok(()) Ok(())
} }
#[allow(dead_code)]
async fn subscribe(&mut self, filters: Vec<Filter>, handle: &str) -> Result<(), Error> {
let req_message = if self.subscriptions.has(handle) {
let sub = self.subscriptions.get_mut(handle).unwrap();
*sub.get_mut() = filters;
sub.req_message()
} else {
self.subscriptions.add(handle, filters);
self.subscriptions.get(handle).unwrap().req_message()
};
let wire = serde_json::to_string(&req_message)?;
let websocket_sink = self.sink.as_mut().unwrap();
websocket_sink.send(WsMessage::Text(wire.clone())).await?;
trace!("{}: Sent {}", &self.url, &wire);
Ok(())
}
#[allow(dead_code)]
async fn unsubscribe(&mut self, handle: &str) -> Result<(), Error> {
if !self.subscriptions.has(handle) {
return Ok(());
}
let close_message = self.subscriptions.get(handle).unwrap().close_message();
let wire = serde_json::to_string(&close_message)?;
let websocket_sink = self.sink.as_mut().unwrap();
websocket_sink.send(WsMessage::Text(wire.clone())).await?;
trace!("{}: Sent {}", &self.url, &wire);
self.subscriptions.remove(handle);
Ok(())
}
} }