More subscription related logging

This commit is contained in:
Mike Dilger 2023-02-12 18:24:08 +13:00
parent 7554430337
commit 16d25f06bc
2 changed files with 43 additions and 11 deletions

View File

@ -814,7 +814,13 @@ impl Minion {
self.next_events_subscription_id += 1; self.next_events_subscription_id += 1;
// save the subscription // save the subscription
self.subscriptions.add(&handle, vec![filter]); let id = self.subscriptions.add(&handle, vec![filter]);
tracing::debug!(
"NEW SUBSCRIPTION on {} handle={}, id={}",
&self.url,
handle,
&id
);
// get the request message // get the request message
let req_message = self.subscriptions.get(&handle).unwrap().req_message(); let req_message = self.subscriptions.get(&handle).unwrap().req_message();
@ -871,7 +877,13 @@ impl Minion {
let now = Unixtime::now().unwrap(); let now = Unixtime::now().unwrap();
DbRelay::update_general_eose(self.dbrelay.url.clone(), now.0 as u64).await?; DbRelay::update_general_eose(self.dbrelay.url.clone(), now.0 as u64).await?;
} }
self.subscriptions.add(handle, filters); let id = self.subscriptions.add(handle, filters);
tracing::debug!(
"NEW SUBSCRIPTION on {} handle={}, id={}",
&self.url,
handle,
&id
);
let req_message = self.subscriptions.get(handle).unwrap().req_message(); let req_message = self.subscriptions.get(handle).unwrap().req_message();
let wire = serde_json::to_string(&req_message)?; let wire = serde_json::to_string(&req_message)?;
let websocket_sink = self.sink.as_mut().unwrap(); let websocket_sink = self.sink.as_mut().unwrap();
@ -889,7 +901,21 @@ impl Minion {
let websocket_sink = self.sink.as_mut().unwrap(); let websocket_sink = self.sink.as_mut().unwrap();
tracing::trace!("{}: Sending {}", &self.url, &wire); tracing::trace!("{}: Sending {}", &self.url, &wire);
websocket_sink.send(WsMessage::Text(wire.clone())).await?; websocket_sink.send(WsMessage::Text(wire.clone())).await?;
self.subscriptions.remove(handle); let id = self.subscriptions.remove(handle);
if let Some(id) = id {
tracing::debug!(
"END SUBSCRIPTION on {} handle={}, id={}",
&self.url,
handle,
&id
);
} else {
tracing::debug!(
"END SUBSCRIPTION on {} handle={} NOT FOUND",
&self.url,
handle
);
}
Ok(()) Ok(())
} }
} }

View File

@ -16,12 +16,14 @@ impl Subscriptions {
} }
} }
pub fn add(&mut self, handle: &str, filters: Vec<Filter>) { pub fn add(&mut self, handle: &str, filters: Vec<Filter>) -> String {
let mut sub = Subscription::new(self.count); let id = format!("{}", self.count);
let mut sub = Subscription::new(&id);
self.count += 1; self.count += 1;
sub.filters = filters; sub.filters = filters;
self.handle_to_id.insert(handle.to_owned(), sub.get_id()); self.handle_to_id.insert(handle.to_owned(), id.clone());
self.by_id.insert(sub.get_id(), sub); self.by_id.insert(id.clone(), sub);
id
} }
pub fn has(&self, handle: &str) -> bool { pub fn has(&self, handle: &str) -> bool {
@ -64,10 +66,14 @@ impl Subscriptions {
self.by_id.get_mut(id) self.by_id.get_mut(id)
} }
pub fn remove(&mut self, handle: &str) { pub fn remove(&mut self, handle: &str) -> Option<String> {
if let Some(id) = self.handle_to_id.get(handle) { if let Some(id) = self.handle_to_id.get(handle) {
self.by_id.remove(id); let id = id.to_owned();
self.by_id.remove(&id);
self.handle_to_id.remove(handle); self.handle_to_id.remove(handle);
Some(id)
} else {
None
} }
} }
@ -90,9 +96,9 @@ pub struct Subscription {
} }
impl Subscription { impl Subscription {
pub fn new(count: usize) -> Subscription { pub fn new(id: &str) -> Subscription {
Subscription { Subscription {
id: format!("{}", count), id: id.to_owned(),
filters: vec![], filters: vec![],
eose: false, eose: false,
} }