Ping relays every 55 seconds to keep the connection alive

This commit is contained in:
Mike Dilger 2022-12-22 18:50:40 +13:00
parent 08f6bd1224
commit 55c0ee4db2
3 changed files with 22 additions and 7 deletions

1
Cargo.lock generated
View File

@ -1678,6 +1678,7 @@ dependencies = [
"eframe",
"egui_extras",
"futures",
"futures-util",
"http",
"image",
"lazy_static",

View File

@ -17,6 +17,7 @@ dirs = "4.0"
eframe = { version = "0.20", features = [ "dark-light", "persistence" ] }
egui_extras = { version = "0.20", features = [ "svg", "tracing" ] }
futures = "0.3"
futures-util = "0.3"
http = "0.2"
image = "0.24"
lazy_static = "1.4"

View File

@ -8,6 +8,7 @@ use crate::error::Error;
use crate::globals::GLOBALS;
use crate::settings::Settings;
use futures::{SinkExt, StreamExt};
use futures_util::stream::{SplitStream, SplitSink};
use http::Uri;
use nostr_proto::{EventKind, Filters, PublicKeyHex, RelayInformationDocument, Unixtime, Url};
use subscription::Subscriptions;
@ -27,7 +28,8 @@ pub struct Minion {
settings: Settings,
dbrelay: DbRelay,
nip11: Option<RelayInformationDocument>,
stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
stream: Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
sink: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, WsMessage>>,
subscriptions: Subscriptions,
}
@ -54,6 +56,7 @@ impl Minion {
dbrelay,
nip11: None,
stream: None,
sink: None,
subscriptions: Subscriptions::new(),
})
}
@ -135,7 +138,9 @@ impl Minion {
websocket_stream
};
self.stream = Some(websocket_stream);
let (sink, stream) = websocket_stream.split();
self.stream = Some(stream);
self.sink = Some(sink);
// Bump the success count for the relay
{
@ -172,8 +177,16 @@ impl Minion {
let mut keepgoing: bool = true;
let ws_stream = self.stream.as_mut().unwrap();
let ws_sink = self.sink.as_mut().unwrap();
let mut timer = tokio::time::interval(std::time::Duration::new(55, 0));
timer.tick().await; // use up the first immediate tick.
select! {
_ = timer.tick() => {
debug!("Pinging {}", self.url.0);
ws_sink.send(WsMessage::Ping(vec![])).await?;
},
ws_message = ws_stream.next() => {
let ws_message = match ws_message {
Some(m) => m,
@ -188,8 +201,8 @@ impl Minion {
// For now, try to continue.
},
WsMessage::Binary(_) => warn!("Unexpected binary message"),
WsMessage::Ping(x) => ws_stream.send(WsMessage::Pong(x)).await?,
WsMessage::Pong(_) => warn!("Unexpected pong message"),
WsMessage::Ping(x) => ws_sink.send(WsMessage::Pong(x)).await?,
WsMessage::Pong(_) => { }, // we just ignore pongs
WsMessage::Close(_) => keepgoing = false,
WsMessage::Frame(_) => warn!("Unexpected frame message"),
}
@ -231,13 +244,13 @@ impl Minion {
// This updates a subscription named "following" which watches for events
// from the people we follow.
async fn update_following_subscription(&mut self) -> Result<(), Error> {
let websocket_stream = self.stream.as_mut().unwrap();
let websocket_sink = self.sink.as_mut().unwrap();
if self.pubkeys.is_empty() {
if let Some(sub) = self.subscriptions.get("following") {
// Close the subscription
let wire = serde_json::to_string(&sub.close_message())?;
websocket_stream.send(WsMessage::Text(wire.clone())).await?;
websocket_sink.send(WsMessage::Text(wire.clone())).await?;
// Remove the subscription from the map
self.subscriptions.remove("following");
@ -312,7 +325,7 @@ impl Minion {
// Subscribe (or resubscribe) to the subscription
let wire = serde_json::to_string(&req_message)?;
websocket_stream.send(WsMessage::Text(wire.clone())).await?;
websocket_sink.send(WsMessage::Text(wire.clone())).await?;
trace!("Sent {}", &wire);