From 55c0ee4db2bf4bfb3d57b94133bb02ab358bffc5 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Thu, 22 Dec 2022 18:50:40 +1300 Subject: [PATCH] Ping relays every 55 seconds to keep the connection alive --- Cargo.lock | 1 + Cargo.toml | 1 + src/overlord/minion/mod.rs | 27 ++++++++++++++++++++------- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 436728af..6691f53b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1678,6 +1678,7 @@ dependencies = [ "eframe", "egui_extras", "futures", + "futures-util", "http", "image", "lazy_static", diff --git a/Cargo.toml b/Cargo.toml index c47319d3..78ab77fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ dirs = "4.0" eframe = { version = "0.20", features = [ "dark-light", "persistence" ] } egui_extras = { version = "0.20", features = [ "svg", "tracing" ] } futures = "0.3" +futures-util = "0.3" http = "0.2" image = "0.24" lazy_static = "1.4" diff --git a/src/overlord/minion/mod.rs b/src/overlord/minion/mod.rs index 6720d8ab..5f854442 100644 --- a/src/overlord/minion/mod.rs +++ b/src/overlord/minion/mod.rs @@ -8,6 +8,7 @@ use crate::error::Error; use crate::globals::GLOBALS; use crate::settings::Settings; use futures::{SinkExt, StreamExt}; +use futures_util::stream::{SplitStream, SplitSink}; use http::Uri; use nostr_proto::{EventKind, Filters, PublicKeyHex, RelayInformationDocument, Unixtime, Url}; use subscription::Subscriptions; @@ -27,7 +28,8 @@ pub struct Minion { settings: Settings, dbrelay: DbRelay, nip11: Option, - stream: Option>>, + stream: Option>>>, + sink: Option>, WsMessage>>, subscriptions: Subscriptions, } @@ -54,6 +56,7 @@ impl Minion { dbrelay, nip11: None, stream: None, + sink: None, subscriptions: Subscriptions::new(), }) } @@ -135,7 +138,9 @@ impl Minion { websocket_stream }; - self.stream = Some(websocket_stream); + let (sink, stream) = websocket_stream.split(); + self.stream = Some(stream); + self.sink = Some(sink); // Bump the success count for the relay { @@ -172,8 +177,16 @@ impl Minion { let mut keepgoing: bool = true; let ws_stream = self.stream.as_mut().unwrap(); + let ws_sink = self.sink.as_mut().unwrap(); + + let mut timer = tokio::time::interval(std::time::Duration::new(55, 0)); + timer.tick().await; // use up the first immediate tick. select! { + _ = timer.tick() => { + debug!("Pinging {}", self.url.0); + ws_sink.send(WsMessage::Ping(vec![])).await?; + }, ws_message = ws_stream.next() => { let ws_message = match ws_message { Some(m) => m, @@ -188,8 +201,8 @@ impl Minion { // For now, try to continue. }, WsMessage::Binary(_) => warn!("Unexpected binary message"), - WsMessage::Ping(x) => ws_stream.send(WsMessage::Pong(x)).await?, - WsMessage::Pong(_) => warn!("Unexpected pong message"), + WsMessage::Ping(x) => ws_sink.send(WsMessage::Pong(x)).await?, + WsMessage::Pong(_) => { }, // we just ignore pongs WsMessage::Close(_) => keepgoing = false, WsMessage::Frame(_) => warn!("Unexpected frame message"), } @@ -231,13 +244,13 @@ impl Minion { // This updates a subscription named "following" which watches for events // from the people we follow. async fn update_following_subscription(&mut self) -> Result<(), Error> { - let websocket_stream = self.stream.as_mut().unwrap(); + let websocket_sink = self.sink.as_mut().unwrap(); if self.pubkeys.is_empty() { if let Some(sub) = self.subscriptions.get("following") { // Close the subscription let wire = serde_json::to_string(&sub.close_message())?; - websocket_stream.send(WsMessage::Text(wire.clone())).await?; + websocket_sink.send(WsMessage::Text(wire.clone())).await?; // Remove the subscription from the map self.subscriptions.remove("following"); @@ -312,7 +325,7 @@ impl Minion { // Subscribe (or resubscribe) to the subscription let wire = serde_json::to_string(&req_message)?; - websocket_stream.send(WsMessage::Text(wire.clone())).await?; + websocket_sink.send(WsMessage::Text(wire.clone())).await?; trace!("Sent {}", &wire);