From 119d1c526e4fdfdd627ab0c5ac8fa25eedd1ce3c Mon Sep 17 00:00:00 2001 From: Kieran Date: Sat, 17 Jun 2023 18:17:39 +0100 Subject: [PATCH] tweak ephemeral connection timeout logic --- packages/system/src/Connection.ts | 77 ++++++++++++++++--------------- 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/packages/system/src/Connection.ts b/packages/system/src/Connection.ts index c0585c88..db109383 100644 --- a/packages/system/src/Connection.ts +++ b/packages/system/src/Connection.ts @@ -1,6 +1,6 @@ import { v4 as uuid } from "uuid"; import debug from "debug"; -import { unwrap, ExternalStore } from "@snort/shared"; +import { unwrap, ExternalStore, unixNowMs } from "@snort/shared"; import { DefaultConnectTimeout } from "./Const"; import { ConnectionStats } from "./ConnectionStats"; @@ -39,6 +39,9 @@ export interface ConnectionStateSnapshot { export class Connection extends ExternalStore { #log = debug("Connection"); + #ephemeralCheck?: ReturnType; + #activity: number = unixNowMs(); + Id: string; Address: string; Socket: WebSocket | null = null; @@ -66,7 +69,6 @@ export class Connection extends ExternalStore { AwaitingAuth: Map; Authed = false; Ephemeral: boolean; - EphemeralTimeout?: ReturnType; Down = true; constructor(addr: string, options: RelaySettings, auth?: AuthHandler, ephemeral: boolean = false) { @@ -81,17 +83,6 @@ export class Connection extends ExternalStore { this.Ephemeral = ephemeral; } - ResetEphemeralTimeout() { - if (this.EphemeralTimeout) { - clearTimeout(this.EphemeralTimeout); - } - if (this.Ephemeral) { - this.EphemeralTimeout = setTimeout(() => { - this.Close(); - }, 30_000); - } - } - async Connect() { try { if (this.Info === undefined) { @@ -111,8 +102,8 @@ export class Connection extends ExternalStore { this.Info = data; } } - } catch (e) { - this.#log("Could not load relay information %O", e); + } catch { + // ignored } if (this.Socket) { @@ -132,10 +123,6 @@ export class Connection extends ExternalStore { Close() { this.IsClosed = true; - if (this.ReconnectTimer !== null) { - clearTimeout(this.ReconnectTimer); - this.ReconnectTimer = undefined; - } this.Socket?.close(); this.notifyChange(); } @@ -144,18 +131,12 @@ export class Connection extends ExternalStore { this.ConnectTimeout = DefaultConnectTimeout; this.#log(`[${this.Address}] Open!`); this.Down = false; - if (this.Ephemeral) { - this.ResetEphemeralTimeout(); - } + this.#setupEphemeral(); this.OnConnected?.(); this.#sendPendingRaw(); } OnClose(e: CloseEvent) { - if (this.EphemeralTimeout) { - clearTimeout(this.EphemeralTimeout); - this.EphemeralTimeout = undefined; - } if (this.ReconnectTimer) { clearTimeout(this.ReconnectTimer); this.ReconnectTimer = undefined; @@ -168,7 +149,7 @@ export class Connection extends ExternalStore { } else if (!this.IsClosed) { this.ConnectTimeout = this.ConnectTimeout * 2; this.#log( - `[${this.Address}] Closed (${e.reason}), trying again in ${(this.ConnectTimeout / 1000) + `[${this.Address}] Closed (code=${e.code}), trying again in ${(this.ConnectTimeout / 1000) .toFixed(0) .toLocaleString()} sec` ); @@ -182,19 +163,20 @@ export class Connection extends ExternalStore { } this.OnDisconnect?.(this.Id); - this.#ResetQueues(); + this.#resetQueues(); // reset connection Id on disconnect, for query-tracking this.Id = uuid(); this.notifyChange(); } OnMessage(e: MessageEvent) { + this.#activity = unixNowMs(); if (e.data.length > 0) { const msg = JSON.parse(e.data); const tag = msg[0]; switch (tag) { case "AUTH": { - this._OnAuthAsync(msg[1]) + this.#onAuthAsync(msg[1]) .then(() => this.#sendPendingRaw()) .catch(this.#log); this.Stats.EventsReceived++; @@ -250,7 +232,7 @@ export class Connection extends ExternalStore { return; } const req = ["EVENT", e]; - this.#SendJson(req); + this.#sendJson(req); this.Stats.EventsSent++; this.notifyChange(); } @@ -273,7 +255,7 @@ export class Connection extends ExternalStore { }); const req = ["EVENT", e]; - this.#SendJson(req); + this.#sendJson(req); this.Stats.EventsSent++; this.notifyChange(); }); @@ -299,16 +281,15 @@ export class Connection extends ExternalStore { this.#log("Queuing: %s %O", this.Address, cmd); } else { this.ActiveRequests.add(cmd[1]); - this.#SendJson(cmd); + this.#sendJson(cmd); cbSent(); } - this.ResetEphemeralTimeout(); this.notifyChange(); } CloseReq(id: string) { if (this.ActiveRequests.delete(id)) { - this.#SendJson(["CLOSE", id]); + this.#sendJson(["CLOSE", id]); this.OnEose?.(id); this.#SendQueuedRequests(); } @@ -343,7 +324,7 @@ export class Connection extends ExternalStore { const p = this.PendingRequests.shift(); if (p) { this.ActiveRequests.add(p.cmd[1]); - this.#SendJson(p.cmd); + this.#sendJson(p.cmd); p.cb(); this.#log("Sent pending REQ %s %O", this.Address, p.cmd); } @@ -351,14 +332,14 @@ export class Connection extends ExternalStore { } } - #ResetQueues() { + #resetQueues() { this.ActiveRequests.clear(); this.PendingRequests = []; this.PendingRaw = []; this.notifyChange(); } - #SendJson(obj: object) { + #sendJson(obj: object) { const authPending = !this.Authed && (this.AwaitingAuth.size > 0 || this.Info?.limitation?.auth_required === true); if (this.Socket?.readyState !== WebSocket.OPEN || authPending) { this.PendingRaw.push(obj); @@ -386,11 +367,12 @@ export class Connection extends ExternalStore { throw new Error(`Socket is not open, state is ${this.Socket?.readyState}`); } const json = JSON.stringify(obj); + this.#activity = unixNowMs(); this.Socket.send(json); return true; } - async _OnAuthAsync(challenge: string): Promise { + async #onAuthAsync(challenge: string): Promise { const authCleanup = () => { this.AwaitingAuth.delete(challenge); }; @@ -426,4 +408,23 @@ export class Connection extends ExternalStore { get #maxSubscriptions() { return this.Info?.limitation?.max_subscriptions ?? 25; } + + #setupEphemeral() { + if (this.Ephemeral) { + if (this.#ephemeralCheck) { + clearInterval(this.#ephemeralCheck); + this.#ephemeralCheck = undefined; + } + this.#ephemeralCheck = setInterval(() => { + const lastActivity = unixNowMs() - this.#activity; + if (lastActivity > 30_000 && !this.IsClosed) { + if (this.ActiveRequests.size > 0) { + this.#log("%s Inactive connection has %d active requests! %O", this.Address, this.ActiveRequests.size, this.ActiveRequests); + } else { + this.Close(); + } + } + }, 5_000); + } + } }