From dd46586e4343423c3969aad2d372f5e69031a4c7 Mon Sep 17 00:00:00 2001 From: Kieran Date: Thu, 6 Apr 2023 22:37:40 +0100 Subject: [PATCH] tmp: query tracing --- packages/app/src/Hooks/useRequestBuilder.tsx | 6 +- packages/app/src/System/Query.test.ts | 41 ++--- packages/app/src/System/Query.ts | 163 ++++++++++++++++--- packages/app/src/System/index.ts | 67 +++----- packages/nostr/src/legacy/Connection.ts | 40 +++-- 5 files changed, 206 insertions(+), 111 deletions(-) diff --git a/packages/app/src/Hooks/useRequestBuilder.tsx b/packages/app/src/Hooks/useRequestBuilder.tsx index 00d4e493..a1bba65a 100644 --- a/packages/app/src/Hooks/useRequestBuilder.tsx +++ b/packages/app/src/Hooks/useRequestBuilder.tsx @@ -30,9 +30,9 @@ const useRequestBuilder = => { if (rb?.id) { - const feed = System.GetFeed(rb.id); - if (feed) { - return unwrap(feed).snapshot as StoreSnapshot; + const q = System.GetQuery(rb.id); + if (q) { + return unwrap(q).feed?.snapshot as StoreSnapshot; } } return EmptySnapshot as StoreSnapshot; diff --git a/packages/app/src/System/Query.test.ts b/packages/app/src/System/Query.test.ts index 61a0369c..3771e2d6 100644 --- a/packages/app/src/System/Query.test.ts +++ b/packages/app/src/System/Query.test.ts @@ -1,5 +1,4 @@ import { Connection } from "@snort/nostr"; -import { unixNow } from "Util"; import { Query } from "./Query"; import { getRandomValues } from "crypto"; @@ -8,15 +7,12 @@ window.crypto.getRandomValues = getRandomValues as any; describe("query", () => { test("progress", () => { - const q = new Query("test", { - filters: [ - { - kinds: [1], - authors: ["test"], - }, - ], - started: unixNow(), - }); + const q = new Query("test", [ + { + kinds: [1], + authors: ["test"], + }, + ]); const opt = { read: true, write: true, @@ -33,29 +29,26 @@ describe("query", () => { q.sendToRelay(c3); expect(q.progress).toBe(0); - q.eose(q.id, c1.Address); + q.eose(q.id, c1); expect(q.progress).toBe(1 / 3); - q.eose(q.id, c1.Address); + q.eose(q.id, c1); expect(q.progress).toBe(1 / 3); - q.eose(q.id, c2.Address); + q.eose(q.id, c2); expect(q.progress).toBe(2 / 3); - q.eose(q.id, c3.Address); + q.eose(q.id, c3); expect(q.progress).toBe(1); - const qs = new Query("test-1", { - filters: [ - { - kinds: [1], - authors: ["test-sub"], - }, - ], - started: unixNow(), - }); + const qs = new Query("test-1", [ + { + kinds: [1], + authors: ["test-sub"], + }, + ]); q.subQueries.push(qs); qs.sendToRelay(c1); expect(q.progress).toBe(0.5); - q.eose(qs.id, c1.Address); + q.eose(qs.id, c1); expect(q.progress).toBe(1); qs.sendToRelay(c2); // 1 + 0.5 (1/2 sent sub query) diff --git a/packages/app/src/System/Query.ts b/packages/app/src/System/Query.ts index 0bdfe30b..088681a1 100644 --- a/packages/app/src/System/Query.ts +++ b/packages/app/src/System/Query.ts @@ -1,10 +1,77 @@ +import { v4 as uuid } from "uuid"; import { Connection, RawReqFilter, Nips } from "@snort/nostr"; import { unixNowMs } from "Util"; +import { NoteStore } from "./NoteCollection"; +/** + * Tracing for relay query status + */ +class QueryTrace { + readonly id: string; + readonly subId: string; + readonly relay: string; + readonly connId: string; + readonly start: number; + sent?: number; + eose?: number; + close?: number; + #wasForceClosed = false; + readonly #fnClose: (id: string) => void; -export interface QueryRequest { - filters: Array; - started: number; - finished?: number; + constructor(sub: string, relay: string, connId: string, fnClose: (id: string) => void) { + this.id = uuid(); + this.subId = sub; + this.relay = relay; + this.connId = connId; + this.start = unixNowMs(); + this.#fnClose = fnClose; + } + + sentToRelay() { + this.sent = unixNowMs(); + } + + gotEose() { + this.eose = unixNowMs(); + } + + forceEose() { + this.eose = unixNowMs(); + this.#wasForceClosed = true; + } + + sendClose() { + this.close = unixNowMs(); + this.#fnClose(this.subId); + } + + log() { + console.debug( + `QT:${this.id}, ${this.relay}, ${this.subId}, finished=${ + this.finished + }, queued=${this.queued.toLocaleString()}ms, runtime=${this.runtime?.toLocaleString()}ms` + ); + } + + /** + * Time spent in queue + */ + get queued() { + return (this.sent === undefined ? unixNowMs() : this.sent) - this.start; + } + + /** + * Total query runtime + */ + get runtime() { + return (this.eose === undefined ? unixNowMs() : this.eose) - this.start; + } + + /** + * If tracing is finished, we got EOSE or timeout + */ + get finished() { + return this.eose !== undefined; + } } /** @@ -19,7 +86,7 @@ export class Query { /** * The query payload (REQ filters) */ - request: QueryRequest; + filters: Array; /** * Sub-Queries which are connected to this subscription @@ -29,12 +96,7 @@ export class Query { /** * Which relays this query has already been executed on */ - #sentToRelays: Array> = []; - - /** - * When each relay returned EOSE - */ - #eoseRelays: Map = new Map(); + #tracing: Array = []; /** * Leave the query open until its removed @@ -51,9 +113,21 @@ export class Query { */ #cancelTimeout?: number; - constructor(id: string, request: QueryRequest) { + /** + * Timer used to track tracing status + */ + #checkTrace?: ReturnType; + + /** + * Feed object which collects events + */ + #feed?: NoteStore; + + constructor(id: string, filters: Array, feed?: NoteStore) { this.id = id; - this.request = request; + this.filters = filters; + this.#feed = feed; + this.#checkTraces(); } get closing() { @@ -64,6 +138,10 @@ export class Query { return this.#cancelTimeout; } + get feed() { + return this.#feed; + } + cancel() { this.#cancelTimeout = unixNowMs() + 5_000; } @@ -72,6 +150,11 @@ export class Query { this.#cancelTimeout = undefined; } + cleanup() { + console.debug("Cleanup", this.id); + this.#stopCheckTraces(); + } + sendToRelay(c: Connection) { if (this.relays.length > 0 && !this.relays.includes(c.Address)) { return; @@ -80,31 +163,47 @@ export class Query { console.debug("Cant send non-specific REQ to ephemeral connection"); return; } - if (this.request.filters.some(a => a.search) && !c.SupportsNip(Nips.Search)) { + if (this.filters.some(a => a.search) && !c.SupportsNip(Nips.Search)) { console.debug("Cant send REQ to non-search relay", c.Address); return; } - c.QueueReq(["REQ", this.id, ...this.request.filters]); - this.#sentToRelays.push(c); + const qt = new QueryTrace(this.id, c.Address, c.Id, x => c.CloseReq(x)); + this.#tracing.push(qt); + c.QueueReq(["REQ", this.id, ...this.filters], () => qt.sentToRelay()); + } + + connectionLost(c: Connection, active: Array, pending: Array) { + const allQueriesLost = [...active, ...pending].filter(a => this.id === a || this.subQueries.some(b => b.id === a)); + if (allQueriesLost.length > 0) { + console.debug("Lost", allQueriesLost, c.Address, c.Id); + } } sendClose() { - for (const c of this.#sentToRelays) { - c.CloseReq(this.id); + for (const qt of this.#tracing) { + qt.sendClose(); } for (const sq of this.subQueries) { sq.sendClose(); } + this.cleanup(); } - eose(sub: string, relay: string) { + eose(sub: string, conn: Readonly) { + const qt = this.#tracing.filter(a => a.subId === sub && a.connId === conn.Id); if (sub === this.id) { - console.debug(`[EOSE][${sub}] ${relay}`); - this.#eoseRelays.set(relay, unixNowMs()); + console.debug(`[EOSE][${sub}] ${conn.Address}`); + qt.forEach(a => a.gotEose()); + if (this.#feed) { + this.#feed.loading = this.progress < 1; + } + if (!this.leaveOpen) { + this.sendClose(); + } } else { const subQ = this.subQueries.find(a => a.id === sub); if (subQ) { - subQ.eose(sub, relay); + subQ.eose(sub, conn); } else { throw new Error("No query found"); } @@ -115,7 +214,7 @@ export class Query { * Get the progress to EOSE, can be used to determine when we should load more content */ get progress() { - let thisProgress = this.#eoseRelays.size / this.#sentToRelays.reduce((acc, v) => (acc += v.Down ? 0 : 1), 0); + let thisProgress = this.#tracing.reduce((acc, v) => (acc += v.finished ? 1 : 0), 0) / this.#tracing.length; if (isNaN(thisProgress)) { thisProgress = 0; } @@ -129,4 +228,22 @@ export class Query { } return totalProgress / (this.subQueries.length + 1); } + + #stopCheckTraces() { + if (this.#checkTrace) { + clearInterval(this.#checkTrace); + } + } + + #checkTraces() { + this.#stopCheckTraces(); + this.#checkTrace = setInterval(() => { + for (const v of this.#tracing) { + //v.log(); + if (v.runtime > 5_000 && !v.finished) { + v.forceEose(); + } + } + }, 2_000); + } } diff --git a/packages/app/src/System/index.ts b/packages/app/src/System/index.ts index 73474f51..e65defe1 100644 --- a/packages/app/src/System/index.ts +++ b/packages/app/src/System/index.ts @@ -50,11 +50,6 @@ export class NostrSystem { */ Queries: Map = new Map(); - /** - * Collection of all feeds which are keyed by subscription id - */ - Feeds: Map = new Map(); - /** * Handler function for NIP-42 */ @@ -98,6 +93,7 @@ export class NostrSystem { this.Sockets.set(addr, c); c.OnEvent = (s, e) => this.OnEvent(s, e); c.OnEose = s => this.OnEndOfStoredEvents(c, s); + c.OnDisconnect = (a, p) => this.OnRelayDisconnect(c, a, p); c.OnConnected = () => { for (const [, q] of this.Queries) { q.sendToRelay(c); @@ -113,37 +109,26 @@ export class NostrSystem { } } - OnEndOfStoredEvents(c: Connection, sub: string) { + OnRelayDisconnect(c: Connection, active: Array, pending: Array) { + for (const [, q] of this.Queries) { + q.connectionLost(c, active, pending); + } + } + + OnEndOfStoredEvents(c: Readonly, sub: string) { const q = this.GetQuery(sub); if (q) { - q.eose(sub, c.Address); - const f = this.Feeds.get(q.id); - if (f) { - f.loading = q.progress <= 0.5; - console.debug(`${sub} loading=${f.loading}, progress=${q.progress}`); - } - if (!q.leaveOpen) { - c.CloseReq(sub); - } + q.eose(sub, c); } } OnEvent(sub: string, ev: TaggedRawEvent) { - const feed = this.GetFeed(sub); - if (feed) { - feed.add(ev); + const q = this.GetQuery(sub); + if (q?.feed) { + q.feed.add(ev); } } - GetFeed(sub: string) { - const subFilterId = /-\d+$/i; - if (sub.match(subFilterId)) { - // feed events back into parent query - sub = sub.split(subFilterId)[0]; - } - return this.Feeds.get(sub); - } - GetQuery(sub: string) { const subFilterId = /-\d+$/i; if (sub.match(subFilterId)) { @@ -165,6 +150,7 @@ export class NostrSystem { this.Sockets.set(addr, c); c.OnEvent = (s, e) => this.OnEvent(s, e); c.OnEose = s => this.OnEndOfStoredEvents(c, s); + c.OnDisconnect = (a, p) => this.OnRelayDisconnect(c, a, p); c.OnConnected = () => { for (const [, q] of this.Queries) { q.sendToRelay(c); @@ -221,18 +207,15 @@ export class NostrSystem { const q = unwrap(this.Queries.get(req.id)); q.unCancel(); - const diff = diffFilters(q.request.filters, filters); + const diff = diffFilters(q.filters, filters); if (!diff.changed && !req.options?.skipDiff) { this.#changed(); - return unwrap(this.Feeds.get(req.id)) as Readonly; + return unwrap(q.feed) as Readonly; } else { - const subQ = new Query(`${q.id}-${q.subQueries.length + 1}`, { - filters: diff.filters, - started: unixNowMs(), - }); + const subQ = new Query(`${q.id}-${q.subQueries.length + 1}`, filters); q.subQueries.push(subQ); - q.request.filters = filters; - const f = unwrap(this.Feeds.get(req.id)); + q.filters = filters; + const f = unwrap(q.feed); f.loading = true; this.SendQuery(subQ); this.#changed(); @@ -244,11 +227,8 @@ export class NostrSystem { } AddQuery(type: { new (): T }, rb: RequestBuilder): T { - const q = new Query(rb.id, { - filters: rb.build(), - started: unixNowMs(), - finished: 0, - }); + const store = new type(); + const q = new Query(rb.id, rb.build(), store); if (rb.options?.leaveOpen) { q.leaveOpen = rb.options.leaveOpen; } @@ -257,8 +237,6 @@ export class NostrSystem { } this.Queries.set(rb.id, q); - const store = new type(); - this.Feeds.set(rb.id, store); this.SendQuery(q); this.#changed(); return store; @@ -301,9 +279,9 @@ export class NostrSystem { queries: [...this.Queries.values()].map(a => { return { id: a.id, - filters: a.request.filters, + filters: a.filters, closing: a.closing, - subFilters: a.subQueries.map(a => a.request.filters).flat(), + subFilters: a.subQueries.map(a => a.filters).flat(), }; }), }); @@ -319,7 +297,6 @@ export class NostrSystem { if (v.closingAt && v.closingAt < now) { v.sendClose(); this.Queries.delete(k); - this.Feeds.delete(k); console.debug("Removed:", k); changed = true; } diff --git a/packages/nostr/src/legacy/Connection.ts b/packages/nostr/src/legacy/Connection.ts index 5e0ddd3c..8a1b5723 100644 --- a/packages/nostr/src/legacy/Connection.ts +++ b/packages/nostr/src/legacy/Connection.ts @@ -43,7 +43,10 @@ export class Connection { Socket: WebSocket | null = null; PendingRaw: Array = []; - PendingRequests: Array = []; + PendingRequests: Array<{ + cmd: ReqCommand, + cb: () => void + }> = []; ActiveRequests: Set = new Set(); Settings: RelaySettings; @@ -60,6 +63,7 @@ export class Connection { OnConnected?: () => void; OnEvent?: (sub: string, e: TaggedRawEvent) => void; OnEose?: (sub: string) => void; + OnDisconnect?: (active: Array, pending: Array) => void; Auth?: AuthHandler; AwaitingAuth: Map; Authed = false; @@ -162,7 +166,12 @@ export class Connection { OnClose(e: CloseEvent) { if (!this.IsClosed) { + this.OnDisconnect?.([...this.ActiveRequests], this.PendingRequests.map(a => a.cmd[1])) this.#ResetQueues(); + + // reset connection Id on disconnect, for query-tracking + this.Id = uuid(); + this.ConnectTimeout = this.ConnectTimeout * 2; console.log( `[${this.Address}] Closed (${e.reason}), trying again in ${( @@ -303,13 +312,16 @@ export class Connection { * Queue or send command to the relay * @param cmd The REQ to send to the server */ - QueueReq(cmd: ReqCommand) { + QueueReq(cmd: ReqCommand, cbSent: () => void) { if (this.ActiveRequests.size >= this.#maxSubscriptions) { - this.PendingRequests.push(cmd); + this.PendingRequests.push({ + cmd, cb: cbSent + }); console.debug("Queuing:", this.Address, cmd); } else { this.ActiveRequests.add(cmd[1]); this.#SendJson(cmd); + cbSent(); } this.#UpdateState(); } @@ -327,21 +339,18 @@ export class Connection { const canSend = this.#maxSubscriptions - this.ActiveRequests.size; if (canSend > 0) { for (let x = 0; x < canSend; x++) { - const cmd = this.PendingRequests.shift(); - if (cmd) { - this.ActiveRequests.add(cmd[1]); - this.#SendJson(cmd); - console.debug("Sent pending REQ", this.Address, cmd); + const p = this.PendingRequests.shift(); + if (p) { + this.ActiveRequests.add(p.cmd[1]); + this.#SendJson(p.cmd); + p.cb(); + console.debug("Sent pending REQ", this.Address, p.cmd); } } } } #ResetQueues() { - //send EOSE on disconnect for active subs - this.ActiveRequests.forEach((v) => this.OnEose?.(v)); - this.PendingRequests.forEach((v) => this.OnEose?.(v[1])); - this.ActiveRequests.clear(); this.PendingRequests = []; this.PendingRaw = []; @@ -360,9 +369,7 @@ export class Connection { this.CurrentState.disconnects = this.Stats.Disconnects; this.CurrentState.info = this.Info; this.CurrentState.id = this.Id; - this.CurrentState.pendingRequests = [ - ...this.PendingRequests.map((a) => a[1]), - ]; + this.CurrentState.pendingRequests = [...this.PendingRequests.map(a => a.cmd[1])]; this.CurrentState.activeRequests = [...this.ActiveRequests]; this.Stats.Latency = this.Stats.Latency.slice(-20); // trim this.HasStateChange = true; @@ -380,7 +387,7 @@ export class Connection { const authPending = !this.Authed && (this.AwaitingAuth.size > 0 || this.Info?.limitation?.auth_required === true); if (this.Socket?.readyState !== WebSocket.OPEN || authPending) { this.PendingRaw.push(obj); - return; + return false; } this.#sendPendingRaw(); @@ -402,6 +409,7 @@ export class Connection { } const json = JSON.stringify(obj); this.Socket.send(json); + return true; } async _OnAuthAsync(challenge: string): Promise {