From 9f114ffb44b21cb0fcdcc896e0f6ed4cf98da660 Mon Sep 17 00:00:00 2001 From: Kieran Date: Tue, 27 Jun 2023 10:38:26 +0100 Subject: [PATCH] Resend leaveOpen queries fixes #594 --- packages/system/src/connection.ts | 10 ++++++---- packages/system/src/nostr-system.ts | 10 ++++++++++ packages/system/src/query.ts | 17 ++++++++++++++++- 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/packages/system/src/connection.ts b/packages/system/src/connection.ts index ef8e118a..d99a2e3c 100644 --- a/packages/system/src/connection.ts +++ b/packages/system/src/connection.ts @@ -61,7 +61,7 @@ export class Connection extends ExternalStore { IsClosed: boolean; ReconnectTimer?: ReturnType; EventsCallback: Map void>; - OnConnected?: () => void; + OnConnected?: (wasReconnect: boolean) => void; OnEvent?: (sub: string, e: TaggedRawEvent) => void; OnEose?: (sub: string) => void; OnDisconnect?: (code: number) => void; @@ -106,16 +106,18 @@ export class Connection extends ExternalStore { // ignored } + const wasReconnect = this.Socket !== null && !this.IsClosed; if (this.Socket) { this.Id = uuid(); this.Socket.onopen = null; this.Socket.onmessage = null; this.Socket.onerror = null; this.Socket.onclose = null; + this.Socket = null; } this.IsClosed = false; this.Socket = new WebSocket(this.Address); - this.Socket.onopen = () => this.OnOpen(); + this.Socket.onopen = () => this.OnOpen(wasReconnect); this.Socket.onmessage = e => this.OnMessage(e); this.Socket.onerror = e => this.OnError(e); this.Socket.onclose = e => this.OnClose(e); @@ -127,12 +129,12 @@ export class Connection extends ExternalStore { this.notifyChange(); } - OnOpen() { + OnOpen(wasReconnect: boolean) { this.ConnectTimeout = DefaultConnectTimeout; this.#log(`[${this.Address}] Open!`); this.Down = false; this.#setupEphemeral(); - this.OnConnected?.(); + this.OnConnected?.(wasReconnect); this.#sendPendingRaw(); } diff --git a/packages/system/src/nostr-system.ts b/packages/system/src/nostr-system.ts index 38a3e7bb..61486cae 100644 --- a/packages/system/src/nostr-system.ts +++ b/packages/system/src/nostr-system.ts @@ -105,6 +105,7 @@ export class NostrSystem extends ExternalStore implements System c.OnEvent = (s, e) => this.OnEvent(s, e); c.OnEose = s => this.OnEndOfStoredEvents(c, s); c.OnDisconnect = (code) => this.OnRelayDisconnect(c, code); + c.OnConnected = (r) => this.OnRelayConnected(c, r); await c.Connect(); } else { // update settings if already connected @@ -115,6 +116,14 @@ export class NostrSystem extends ExternalStore implements System } } + OnRelayConnected(c: Connection, wasReconnect: boolean) { + if (wasReconnect) { + for (const [, q] of this.Queries) { + q.connectionRestored(c); + } + } + } + OnRelayDisconnect(c: Connection, code: number) { this.#relayMetrics.onDisconnect(c, code); for (const [, q] of this.Queries) { @@ -147,6 +156,7 @@ export class NostrSystem extends ExternalStore implements System c.OnEvent = (s, e) => this.OnEvent(s, e); c.OnEose = s => this.OnEndOfStoredEvents(c, s); c.OnDisconnect = code => this.OnRelayDisconnect(c, code); + c.OnConnected = (r) => this.OnRelayConnected(c, r); await c.Connect(); return c; } diff --git a/packages/system/src/query.ts b/packages/system/src/query.ts index c5cf2a79..570135d3 100644 --- a/packages/system/src/query.ts +++ b/packages/system/src/query.ts @@ -152,6 +152,10 @@ export class Query implements QueryBase { this.#checkTraces(); } + isOpen() { + return this.#cancelAt === undefined && this.#leaveOpen; + } + canRemove() { return this.#cancelAt !== undefined && this.#cancelAt < unixNowMs(); } @@ -212,6 +216,17 @@ export class Query implements QueryBase { this.#tracing.filter(a => a.connId == id).forEach(a => a.forceEose()); } + connectionRestored(c: Connection) { + if (this.isOpen()) { + for (const qt of this.#tracing) { + if (qt.relay === c.Address) { + debugger; + c.QueueReq(["REQ", qt.id, ...qt.filters], () => qt.sentToRelay()); + } + } + } + } + sendClose() { for (const qt of this.#tracing) { qt.sendClose(); @@ -287,7 +302,7 @@ export class Query implements QueryBase { () => this.#onProgress() ); this.#tracing.push(qt); - c.QueueReq(["REQ", qt.id, ...q.filters], () => qt.sentToRelay()); + c.QueueReq(["REQ", qt.id, ...qt.filters], () => qt.sentToRelay()); return qt; } }