From 7b151e1b17b22da823b1aad7e3bd78a3e043f2d9 Mon Sep 17 00:00:00 2001 From: Kieran Date: Wed, 24 May 2023 17:17:17 +0100 Subject: [PATCH] sub-query via query trace --- packages/app/src/Element/Timeline.tsx | 2 +- packages/app/src/Hooks/useSystemState.tsx | 2 +- packages/app/src/Pages/Debug.tsx | 9 ++ packages/app/src/System/Query.test.ts | 21 ++-- packages/app/src/System/Query.ts | 122 ++++++++++++++-------- packages/app/src/System/index.ts | 94 +++++++++-------- packages/app/src/index.tsx | 5 + 7 files changed, 151 insertions(+), 104 deletions(-) create mode 100644 packages/app/src/Pages/Debug.tsx diff --git a/packages/app/src/Element/Timeline.tsx b/packages/app/src/Element/Timeline.tsx index 039fed06..af28e335 100644 --- a/packages/app/src/Element/Timeline.tsx +++ b/packages/app/src/Element/Timeline.tsx @@ -143,7 +143,7 @@ const Timeline = (props: TimelineProps) => { )} {mainFeed.map(eventElement)} {(props.loadMore === undefined || props.loadMore === true) && ( - + feed.loadMore()} shouldLoadMore={!feed.loading}> diff --git a/packages/app/src/Hooks/useSystemState.tsx b/packages/app/src/Hooks/useSystemState.tsx index c128a242..a9aa8b94 100644 --- a/packages/app/src/Hooks/useSystemState.tsx +++ b/packages/app/src/Hooks/useSystemState.tsx @@ -4,6 +4,6 @@ import { System, SystemSnapshot } from "System"; export default function useSystemState() { return useSyncExternalStore( cb => System.hook(cb), - () => System.getSnapshot() + () => System.snapshot() ); } diff --git a/packages/app/src/Pages/Debug.tsx b/packages/app/src/Pages/Debug.tsx new file mode 100644 index 00000000..37c650d6 --- /dev/null +++ b/packages/app/src/Pages/Debug.tsx @@ -0,0 +1,9 @@ +import SubDebug from "Element/SubDebug"; + +export default function DebugPage() { + return ( + <> + + + ); +} diff --git a/packages/app/src/System/Query.test.ts b/packages/app/src/System/Query.test.ts index 2d6bf824..3c8d6735 100644 --- a/packages/app/src/System/Query.test.ts +++ b/packages/app/src/System/Query.test.ts @@ -1,6 +1,6 @@ import { Connection } from "@snort/nostr"; import { describe, expect } from "@jest/globals"; -import { Query } from "./Query"; +import { Query, QueryBase } from "./Query"; import { getRandomValues } from "crypto"; import { FlatNoteStore } from "./NoteCollection"; @@ -44,24 +44,21 @@ describe("query", () => { q.eose(q.id, c3); expect(q.progress).toBe(1); - const qs = new Query( - "test-1", - [ + const qs = { + id: "test-1", + filters: [ { kinds: [1], authors: ["test-sub"], }, ], - new FlatNoteStore() - ); - q.subQueries.push(qs); - qs.sendToRelay(c1); + } as QueryBase; + q.sendSubQueryToRelay(c1, qs); - expect(q.progress).toBe(0.5); + expect(q.progress).toBe(3 / 4); q.eose(qs.id, c1); expect(q.progress).toBe(1); - qs.sendToRelay(c2); - // 1 + 0.5 (1/2 sent sub query) - expect(q.progress).toBe(1.5 / 2); + q.sendSubQueryToRelay(c2, qs); + expect(q.progress).toBe(4 / 5); }); }); diff --git a/packages/app/src/System/Query.ts b/packages/app/src/System/Query.ts index 36178492..01b45c5d 100644 --- a/packages/app/src/System/Query.ts +++ b/packages/app/src/System/Query.ts @@ -16,18 +16,21 @@ class QueryTrace { close?: number; #wasForceClosed = false; readonly #fnClose: (id: string) => void; + readonly #fnProgress: () => void; - constructor(sub: string, relay: string, connId: string, fnClose: (id: string) => void) { + constructor(sub: string, relay: string, connId: string, fnClose: (id: string) => void, fnProgress: () => void) { this.id = uuid(); this.subId = sub; this.relay = relay; this.connId = connId; this.start = unixNowMs(); this.#fnClose = fnClose; + this.#fnProgress = fnProgress; } sentToRelay() { this.sent = unixNowMs(); + this.#fnProgress(); } gotEose() { @@ -35,23 +38,28 @@ class QueryTrace { if (this.responseTime > 5_000) { console.debug(`Slow query ${this.subId} on ${this.relay} took ${this.responseTime.toLocaleString()}ms`); } + this.#fnProgress(); + console.debug(`[EOSE][${this.subId}] ${this.relay}`); } forceEose() { this.eose = unixNowMs(); this.#wasForceClosed = true; + this.#fnProgress(); + console.debug(`[F-EOSE][${this.subId}] ${this.relay}`); } sendClose() { this.close = unixNowMs(); this.#fnClose(this.subId); + this.#fnProgress(); } log() { console.debug( - `QT:${this.id}, ${this.relay}, ${this.subId}, finished=${ + `QT:${this.id}, ${this.subId}, finished=${ this.finished - }, queued=${this.queued.toLocaleString()}ms, runtime=${this.runtime?.toLocaleString()}ms` + }, queued=${this.queued.toLocaleString()}ms, runtime=${this.runtime?.toLocaleString()}ms, ${this.relay}` ); } @@ -59,7 +67,7 @@ class QueryTrace { * Time spent in queue */ get queued() { - return (this.sent === undefined ? unixNowMs() : this.sent) - this.start; + return (this.sent === undefined ? unixNowMs() : this.#wasForceClosed ? unwrap(this.eose) : this.sent) - this.start; } /** @@ -84,10 +92,7 @@ class QueryTrace { } } -/** - * Active or queued query on the system - */ -export class Query { +export interface QueryBase { /** * Uniquie ID of this query */ @@ -99,9 +104,18 @@ export class Query { filters: Array; /** - * Sub-Queries which are connected to this subscription + * List of relays to send this query to */ - subQueries: Array = []; + relays?: Array; +} + +/** + * Active or queued query on the system + */ +export class Query implements QueryBase { + id: string; + filters: Array; + relays?: Array; /** * Which relays this query has already been executed on @@ -113,11 +127,6 @@ export class Query { */ leaveOpen = false; - /** - * List of relays to send this query to - */ - relays: Array = []; - /** * Time when this query can be removed */ @@ -133,6 +142,8 @@ export class Query { */ #feed: NoteStore; + subQueryCounter = 0; + constructor(id: string, filters: Array, feed: NoteStore) { this.id = id; this.filters = filters; @@ -165,35 +176,36 @@ export class Query { } sendToRelay(c: Connection) { - if (this.relays.length > 0 && !this.relays.includes(c.Address)) { + if (!this.#canSendQuery(c, this)) { return; } - if (this.relays.length === 0 && c.Ephemeral) { - console.debug("Cant send non-specific REQ to ephemeral connection"); + this.#sendQueryInternal(c, this); + } + + sendSubQueryToRelay(c: Connection, subq: QueryBase) { + if (!this.#canSendQuery(c, subq)) { return; } - if (this.filters.some(a => a.search) && !c.SupportsNip(Nips.Search)) { - console.debug("Cant send REQ to non-search relay", c.Address); - return; - } - const qt = new QueryTrace(this.id, c.Address, c.Id, x => c.CloseReq(x)); - this.#tracing.push(qt); - c.QueueReq(["REQ", this.id, ...this.filters], () => qt.sentToRelay()); + this.#sendQueryInternal(c, subq); } connectionLost(c: Connection, active: Array, pending: Array) { - const allQueriesLost = [...active, ...pending].filter(a => this.id === a || this.subQueries.some(b => b.id === a)); + const allQueriesLost = [...active, ...pending].filter(a => this.id === a || this.#tracing.some(b => b.subId === a)); if (allQueriesLost.length > 0) { console.debug("Lost", allQueriesLost, c.Address, c.Id); } + for (const qLost of allQueriesLost) { + const qt = this.#tracing.find(a => a.subId === qLost && a.connId == c.Id); + qt?.forceEose(); + } } sendClose() { for (const qt of this.#tracing) { qt.sendClose(); } - for (const sq of this.subQueries) { - sq.sendClose(); + for (const qt of this.#tracing) { + qt.sendClose(); } this.cleanup(); } @@ -202,15 +214,9 @@ export class Query { const qt = this.#tracing.find(a => a.subId === sub && a.connId === conn.Id); qt?.gotEose(); if (sub === this.id) { - console.debug(`[EOSE][${sub}] ${conn.Address}`); if (!this.leaveOpen) { qt?.sendClose(); } - } else { - const subQ = this.subQueries.find(a => a.id === sub); - if (subQ) { - subQ.eose(sub, conn); - } } } @@ -218,19 +224,19 @@ export class Query { * Get the progress to EOSE, can be used to determine when we should load more content */ get progress() { - let thisProgress = this.#tracing.reduce((acc, v) => (acc += v.finished ? 1 : 0), 0) / this.#tracing.length; + const thisProgress = this.#tracing.reduce((acc, v) => (acc += v.finished ? 1 : 0), 0) / this.#tracing.length; if (isNaN(thisProgress)) { - thisProgress = 0; - } - if (this.subQueries.length === 0) { - return thisProgress; + return 0; } + return thisProgress; + } - let totalProgress = thisProgress; - for (const sq of this.subQueries) { - totalProgress += sq.progress; + #onProgress() { + const isFinished = this.progress === 1; + if (this.feed.loading !== isFinished) { + console.debug(`[QT] ${this.id}, loading=${this.feed.loading}, progress=${this.progress}`); + this.feed.loading = isFinished; } - return totalProgress / (this.subQueries.length + 1); } #stopCheckTraces() { @@ -243,11 +249,37 @@ export class Query { this.#stopCheckTraces(); this.#checkTrace = setInterval(() => { for (const v of this.#tracing) { - //v.log(); if (v.runtime > 5_000 && !v.finished) { v.forceEose(); } } - }, 2_000); + }, 500); + } + + #canSendQuery(c: Connection, q: QueryBase) { + if (q.relays && !q.relays.includes(c.Address)) { + return false; + } + if ((q.relays?.length ?? 0) === 0 && c.Ephemeral) { + console.debug("Cant send non-specific REQ to ephemeral connection"); + return false; + } + if (q.filters.some(a => a.search) && !c.SupportsNip(Nips.Search)) { + console.debug("Cant send REQ to non-search relay", c.Address); + return false; + } + return true; + } + + #sendQueryInternal(c: Connection, q: QueryBase) { + const qt = new QueryTrace( + q.id, + c.Address, + c.Id, + x => c.CloseReq(x), + () => this.#onProgress() + ); + this.#tracing.push(qt); + c.QueueReq(["REQ", q.id, ...q.filters], () => qt.sentToRelay()); } } diff --git a/packages/app/src/System/index.ts b/packages/app/src/System/index.ts index 02112fb1..13304c26 100644 --- a/packages/app/src/System/index.ts +++ b/packages/app/src/System/index.ts @@ -11,8 +11,9 @@ import { ReplaceableNoteStore, } from "./NoteCollection"; import { diffFilters } from "./RequestSplitter"; -import { Query } from "./Query"; +import { Query, QueryBase } from "./Query"; import { splitAllByWriteRelays } from "./GossipModel"; +import ExternalStore from "ExternalStore"; export { NoteStore, @@ -40,7 +41,7 @@ export type HookSystemSnapshot = () => void; /** * Manages nostr content retrieval system */ -export class NostrSystem { +export class NostrSystem extends ExternalStore { /** * All currently connected websockets */ @@ -56,33 +57,12 @@ export class NostrSystem { */ HandleAuth?: AuthHandler; - /** - * State change hooks - */ - #stateHooks: Array = []; - - /** - * Current snapshot of the system - */ - #snapshot: Readonly = { queries: [] }; - constructor() { + super(); this.Sockets = new Map(); this.#cleanup(); } - hook(cb: HookSystemSnapshot): HookSystemSnapshotRelease { - this.#stateHooks.push(cb); - return () => { - const idx = this.#stateHooks.findIndex(a => a === cb); - this.#stateHooks.splice(idx, 1); - }; - } - - getSnapshot(): Readonly { - return this.#snapshot; - } - /** * Connect to a NOSTR relay if not already connected */ @@ -210,19 +190,20 @@ export class NostrSystem { const diff = diffFilters(q.filters, filters); if (!diff.changed && !req.options?.skipDiff) { - this.#changed(); + this.notifyChange(); return unwrap(q.feed) as Readonly; } else { const splitFilters = splitAllByWriteRelays(filters); for (const sf of splitFilters) { - const subQ = new Query(`${q.id}-${q.subQueries.length + 1}`, sf.filters, q.feed); - subQ.relays = sf.relay ? [sf.relay] : []; - q.subQueries.push(subQ); - this.SendQuery(subQ); + const subQ = { + id: `${q.id}-${q.subQueryCounter++}`, + filters: sf.filters, + relays: sf.relay ? [sf.relay] : [], + } as QueryBase; + this.SendSubQuery(q, subQ); } q.filters = filters; - q.feed.loading = true; - this.#changed(); + this.notifyChange(); return q.feed as Readonly; } } else { @@ -246,15 +227,17 @@ export class NostrSystem { const splitFilters = splitAllByWriteRelays(filters); if (splitFilters.length > 1) { for (const sf of splitFilters) { - const subQ = new Query(`${q.id}-${q.subQueries.length + 1}`, sf.filters, q.feed); - subQ.relays = sf.relay ? [sf.relay] : []; - q.subQueries.push(subQ); - this.SendQuery(subQ); + const subQ = { + id: `${q.id}-${q.subQueryCounter++}`, + filters: sf.filters, + relays: sf.relay ? [sf.relay] : [], + } as QueryBase; + this.SendSubQuery(q, subQ); } } else { this.SendQuery(q); } - this.#changed(); + this.notifyChange(); return store; } @@ -266,7 +249,7 @@ export class NostrSystem { } async SendQuery(q: Query) { - if (q.relays.length > 0) { + if (q.relays && q.relays.length > 0) { for (const r of q.relays) { const s = this.Sockets.get(r); if (s) { @@ -289,6 +272,30 @@ export class NostrSystem { } } + async SendSubQuery(q: Query, subQ: QueryBase) { + if (subQ.relays && subQ.relays.length > 0) { + for (const r of subQ.relays) { + const s = this.Sockets.get(r); + if (s) { + q.sendSubQueryToRelay(s, subQ); + } else { + const nc = await this.ConnectEphemeralRelay(r); + if (nc) { + q.sendSubQueryToRelay(nc, subQ); + } else { + console.warn("Failed to connect to new relay for:", r, subQ); + } + } + } + } else { + for (const [, s] of this.Sockets) { + if (!s.Ephemeral) { + q.sendSubQueryToRelay(s, subQ); + } + } + } + } + /** * Send events to writable relays */ @@ -316,20 +323,17 @@ export class NostrSystem { }); } - #changed() { - this.#snapshot = Object.freeze({ + takeSnapshot(): SystemSnapshot { + return { queries: [...this.Queries.values()].map(a => { return { id: a.id, filters: a.filters, closing: a.closing, - subFilters: a.subQueries.map(a => a.filters).flat(), + subFilters: [], }; }), - }); - for (const h of this.#stateHooks) { - h(); - } + }; } #cleanup() { @@ -343,7 +347,7 @@ export class NostrSystem { } } if (changed) { - this.#changed(); + this.notifyChange(); } setTimeout(() => this.#cleanup(), 1_000); } diff --git a/packages/app/src/index.tsx b/packages/app/src/index.tsx index 82fa17a3..863e2bf6 100644 --- a/packages/app/src/index.tsx +++ b/packages/app/src/index.tsx @@ -31,6 +31,7 @@ import NostrLinkHandler from "Pages/NostrLinkHandler"; import Thread from "Element/Thread"; import { SubscribeRoutes } from "Pages/subscribe"; import ZapPoolPage from "Pages/ZapPool"; +import DebugPage from "Pages/Debug"; // @ts-ignore window.__webpack_nonce__ = "ZmlhdGphZiBzYWlkIHNub3J0LnNvY2lhbCBpcyBwcmV0dHkgZ29vZCwgd2UgbWFkZSBpdCE="; @@ -99,6 +100,10 @@ export const router = createBrowserRouter([ ...NewUserRoutes, ...WalletRoutes, ...SubscribeRoutes, + { + path: "/debug", + element: , + }, { path: "/*", element: ,