tweak ephemeral connection timeout logic

This commit is contained in:
Kieran 2023-06-17 18:17:39 +01:00
parent c02cd9c300
commit 119d1c526e
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941

View File

@ -1,6 +1,6 @@
import { v4 as uuid } from "uuid";
import debug from "debug";
import { unwrap, ExternalStore } from "@snort/shared";
import { unwrap, ExternalStore, unixNowMs } from "@snort/shared";
import { DefaultConnectTimeout } from "./Const";
import { ConnectionStats } from "./ConnectionStats";
@ -39,6 +39,9 @@ export interface ConnectionStateSnapshot {
export class Connection extends ExternalStore<ConnectionStateSnapshot> {
#log = debug("Connection");
#ephemeralCheck?: ReturnType<typeof setInterval>;
#activity: number = unixNowMs();
Id: string;
Address: string;
Socket: WebSocket | null = null;
@ -66,7 +69,6 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
AwaitingAuth: Map<string, boolean>;
Authed = false;
Ephemeral: boolean;
EphemeralTimeout?: ReturnType<typeof setTimeout>;
Down = true;
constructor(addr: string, options: RelaySettings, auth?: AuthHandler, ephemeral: boolean = false) {
@ -81,17 +83,6 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
this.Ephemeral = ephemeral;
}
ResetEphemeralTimeout() {
if (this.EphemeralTimeout) {
clearTimeout(this.EphemeralTimeout);
}
if (this.Ephemeral) {
this.EphemeralTimeout = setTimeout(() => {
this.Close();
}, 30_000);
}
}
async Connect() {
try {
if (this.Info === undefined) {
@ -111,8 +102,8 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
this.Info = data;
}
}
} catch (e) {
this.#log("Could not load relay information %O", e);
} catch {
// ignored
}
if (this.Socket) {
@ -132,10 +123,6 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
Close() {
this.IsClosed = true;
if (this.ReconnectTimer !== null) {
clearTimeout(this.ReconnectTimer);
this.ReconnectTimer = undefined;
}
this.Socket?.close();
this.notifyChange();
}
@ -144,18 +131,12 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
this.ConnectTimeout = DefaultConnectTimeout;
this.#log(`[${this.Address}] Open!`);
this.Down = false;
if (this.Ephemeral) {
this.ResetEphemeralTimeout();
}
this.#setupEphemeral();
this.OnConnected?.();
this.#sendPendingRaw();
}
OnClose(e: CloseEvent) {
if (this.EphemeralTimeout) {
clearTimeout(this.EphemeralTimeout);
this.EphemeralTimeout = undefined;
}
if (this.ReconnectTimer) {
clearTimeout(this.ReconnectTimer);
this.ReconnectTimer = undefined;
@ -168,7 +149,7 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
} else if (!this.IsClosed) {
this.ConnectTimeout = this.ConnectTimeout * 2;
this.#log(
`[${this.Address}] Closed (${e.reason}), trying again in ${(this.ConnectTimeout / 1000)
`[${this.Address}] Closed (code=${e.code}), trying again in ${(this.ConnectTimeout / 1000)
.toFixed(0)
.toLocaleString()} sec`
);
@ -182,19 +163,20 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
}
this.OnDisconnect?.(this.Id);
this.#ResetQueues();
this.#resetQueues();
// reset connection Id on disconnect, for query-tracking
this.Id = uuid();
this.notifyChange();
}
OnMessage(e: MessageEvent) {
this.#activity = unixNowMs();
if (e.data.length > 0) {
const msg = JSON.parse(e.data);
const tag = msg[0];
switch (tag) {
case "AUTH": {
this._OnAuthAsync(msg[1])
this.#onAuthAsync(msg[1])
.then(() => this.#sendPendingRaw())
.catch(this.#log);
this.Stats.EventsReceived++;
@ -250,7 +232,7 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
return;
}
const req = ["EVENT", e];
this.#SendJson(req);
this.#sendJson(req);
this.Stats.EventsSent++;
this.notifyChange();
}
@ -273,7 +255,7 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
});
const req = ["EVENT", e];
this.#SendJson(req);
this.#sendJson(req);
this.Stats.EventsSent++;
this.notifyChange();
});
@ -299,16 +281,15 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
this.#log("Queuing: %s %O", this.Address, cmd);
} else {
this.ActiveRequests.add(cmd[1]);
this.#SendJson(cmd);
this.#sendJson(cmd);
cbSent();
}
this.ResetEphemeralTimeout();
this.notifyChange();
}
CloseReq(id: string) {
if (this.ActiveRequests.delete(id)) {
this.#SendJson(["CLOSE", id]);
this.#sendJson(["CLOSE", id]);
this.OnEose?.(id);
this.#SendQueuedRequests();
}
@ -343,7 +324,7 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
const p = this.PendingRequests.shift();
if (p) {
this.ActiveRequests.add(p.cmd[1]);
this.#SendJson(p.cmd);
this.#sendJson(p.cmd);
p.cb();
this.#log("Sent pending REQ %s %O", this.Address, p.cmd);
}
@ -351,14 +332,14 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
}
}
#ResetQueues() {
#resetQueues() {
this.ActiveRequests.clear();
this.PendingRequests = [];
this.PendingRaw = [];
this.notifyChange();
}
#SendJson(obj: object) {
#sendJson(obj: object) {
const authPending = !this.Authed && (this.AwaitingAuth.size > 0 || this.Info?.limitation?.auth_required === true);
if (this.Socket?.readyState !== WebSocket.OPEN || authPending) {
this.PendingRaw.push(obj);
@ -386,11 +367,12 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
throw new Error(`Socket is not open, state is ${this.Socket?.readyState}`);
}
const json = JSON.stringify(obj);
this.#activity = unixNowMs();
this.Socket.send(json);
return true;
}
async _OnAuthAsync(challenge: string): Promise<void> {
async #onAuthAsync(challenge: string): Promise<void> {
const authCleanup = () => {
this.AwaitingAuth.delete(challenge);
};
@ -426,4 +408,23 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
get #maxSubscriptions() {
return this.Info?.limitation?.max_subscriptions ?? 25;
}
#setupEphemeral() {
if (this.Ephemeral) {
if (this.#ephemeralCheck) {
clearInterval(this.#ephemeralCheck);
this.#ephemeralCheck = undefined;
}
this.#ephemeralCheck = setInterval(() => {
const lastActivity = unixNowMs() - this.#activity;
if (lastActivity > 30_000 && !this.IsClosed) {
if (this.ActiveRequests.size > 0) {
this.#log("%s Inactive connection has %d active requests! %O", this.Address, this.ActiveRequests.size, this.ActiveRequests);
} else {
this.Close();
}
}
}, 5_000);
}
}
}