forked from Kieran/snort
1
0
Fork 0

Resend leaveOpen queries

fixes #594
This commit is contained in:
Kieran 2023-06-27 10:38:26 +01:00
parent 9f8095b0df
commit 9f114ffb44
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
3 changed files with 32 additions and 5 deletions

View File

@ -61,7 +61,7 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
IsClosed: boolean;
ReconnectTimer?: ReturnType<typeof setTimeout>;
EventsCallback: Map<u256, (msg: boolean[]) => void>;
OnConnected?: () => void;
OnConnected?: (wasReconnect: boolean) => void;
OnEvent?: (sub: string, e: TaggedRawEvent) => void;
OnEose?: (sub: string) => void;
OnDisconnect?: (code: number) => void;
@ -106,16 +106,18 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
// ignored
}
const wasReconnect = this.Socket !== null && !this.IsClosed;
if (this.Socket) {
this.Id = uuid();
this.Socket.onopen = null;
this.Socket.onmessage = null;
this.Socket.onerror = null;
this.Socket.onclose = null;
this.Socket = null;
}
this.IsClosed = false;
this.Socket = new WebSocket(this.Address);
this.Socket.onopen = () => this.OnOpen();
this.Socket.onopen = () => this.OnOpen(wasReconnect);
this.Socket.onmessage = e => this.OnMessage(e);
this.Socket.onerror = e => this.OnError(e);
this.Socket.onclose = e => this.OnClose(e);
@ -127,12 +129,12 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
this.notifyChange();
}
OnOpen() {
OnOpen(wasReconnect: boolean) {
this.ConnectTimeout = DefaultConnectTimeout;
this.#log(`[${this.Address}] Open!`);
this.Down = false;
this.#setupEphemeral();
this.OnConnected?.();
this.OnConnected?.(wasReconnect);
this.#sendPendingRaw();
}

View File

@ -105,6 +105,7 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
c.OnEvent = (s, e) => this.OnEvent(s, e);
c.OnEose = s => this.OnEndOfStoredEvents(c, s);
c.OnDisconnect = (code) => this.OnRelayDisconnect(c, code);
c.OnConnected = (r) => this.OnRelayConnected(c, r);
await c.Connect();
} else {
// update settings if already connected
@ -115,6 +116,14 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
}
}
OnRelayConnected(c: Connection, wasReconnect: boolean) {
if (wasReconnect) {
for (const [, q] of this.Queries) {
q.connectionRestored(c);
}
}
}
OnRelayDisconnect(c: Connection, code: number) {
this.#relayMetrics.onDisconnect(c, code);
for (const [, q] of this.Queries) {
@ -147,6 +156,7 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
c.OnEvent = (s, e) => this.OnEvent(s, e);
c.OnEose = s => this.OnEndOfStoredEvents(c, s);
c.OnDisconnect = code => this.OnRelayDisconnect(c, code);
c.OnConnected = (r) => this.OnRelayConnected(c, r);
await c.Connect();
return c;
}

View File

@ -152,6 +152,10 @@ export class Query implements QueryBase {
this.#checkTraces();
}
isOpen() {
return this.#cancelAt === undefined && this.#leaveOpen;
}
canRemove() {
return this.#cancelAt !== undefined && this.#cancelAt < unixNowMs();
}
@ -212,6 +216,17 @@ export class Query implements QueryBase {
this.#tracing.filter(a => a.connId == id).forEach(a => a.forceEose());
}
connectionRestored(c: Connection) {
if (this.isOpen()) {
for (const qt of this.#tracing) {
if (qt.relay === c.Address) {
debugger;
c.QueueReq(["REQ", qt.id, ...qt.filters], () => qt.sentToRelay());
}
}
}
}
sendClose() {
for (const qt of this.#tracing) {
qt.sendClose();
@ -287,7 +302,7 @@ export class Query implements QueryBase {
() => this.#onProgress()
);
this.#tracing.push(qt);
c.QueueReq(["REQ", qt.id, ...q.filters], () => qt.sentToRelay());
c.QueueReq(["REQ", qt.id, ...qt.filters], () => qt.sentToRelay());
return qt;
}
}