From 88924941a58f01f1f3067b76d59fc52e859eb7b0 Mon Sep 17 00:00:00 2001 From: Kieran Date: Mon, 8 Jan 2024 13:55:59 +0000 Subject: [PATCH] feat: NostrQueryManager --- packages/system/src/connection.ts | 1 + packages/system/src/filter-cache-layer.ts | 39 ++++++ packages/system/src/nostr-query-manager.ts | 125 +++++++++++++++++ packages/system/src/nostr-system.ts | 153 +++++---------------- packages/system/src/query.ts | 1 + 5 files changed, 201 insertions(+), 118 deletions(-) create mode 100644 packages/system/src/filter-cache-layer.ts create mode 100644 packages/system/src/nostr-query-manager.ts diff --git a/packages/system/src/connection.ts b/packages/system/src/connection.ts index 2dfb1636..e9f2c1d4 100644 --- a/packages/system/src/connection.ts +++ b/packages/system/src/connection.ts @@ -256,6 +256,7 @@ export class Connection extends EventEmitter { case "CLOSED": { this.emit("closed", msg[1] as string, msg[2] as string); this.#log(`CLOSED: ${msg.slice(1)}`); + break; } default: { this.#log(`Unknown tag: ${tag}`); diff --git a/packages/system/src/filter-cache-layer.ts b/packages/system/src/filter-cache-layer.ts new file mode 100644 index 00000000..32a8e96b --- /dev/null +++ b/packages/system/src/filter-cache-layer.ts @@ -0,0 +1,39 @@ +import { BuiltRawReqFilter, RequestStrategy } from "./request-builder"; +import { NostrEvent, TaggedNostrEvent } from "./nostr"; +import { Query } from "./query"; + +export interface EventCache { + bulkGet: (ids: Array) => Promise>; +} + +export interface FilterCacheLayer { + processFilter(q: Query, req: BuiltRawReqFilter): Promise; +} + +export class IdsFilterCacheLayer implements FilterCacheLayer { + constructor(readonly cache: EventCache) {} + + async processFilter(q: Query, req: BuiltRawReqFilter) { + for (const f of req.filters) { + if (f.ids) { + const cacheResults = await this.cache.bulkGet(f.ids); + if (cacheResults.length > 0) { + const resultIds = new Set(cacheResults.map(a => a.id)); + f.ids = f.ids.filter(a => !resultIds.has(a)); + + // this step is important for buildDiff, if a filter doesnt exist with the ids which are from cache + // we will create an infinite loop where every render we insert a new query for the ids which are missing + q.insertCompletedTrace( + { + filters: [{ ...f, ids: [...resultIds] }], + strategy: RequestStrategy.ExplicitRelays, + relay: req.relay, + }, + cacheResults as Array, + ); + } + } + } + return req; + } +} diff --git a/packages/system/src/nostr-query-manager.ts b/packages/system/src/nostr-query-manager.ts new file mode 100644 index 00000000..86f0cda3 --- /dev/null +++ b/packages/system/src/nostr-query-manager.ts @@ -0,0 +1,125 @@ +import debug from "debug"; +import EventEmitter from "eventemitter3"; +import { BuiltRawReqFilter, NoteCollection, NoteStore, RequestBuilder, SystemInterface, TaggedNostrEvent } from "."; +import { Query, TraceReport } from "./query"; +import { unwrap } from "@snort/shared"; + +interface NostrQueryManagerEvents { + change: () => void; + trace: (report: TraceReport) => void; + sendQuery: (q: Query, filter: BuiltRawReqFilter) => void; +} + +export class NostrQueryManager extends EventEmitter { + #log = debug("NostrQueryManager"); + + /** + * All active queries + */ + #queries: Map = new Map(); + + /** + * System interface handle + */ + #system: SystemInterface; + + constructor(system: SystemInterface) { + super(); + this.#system = system; + + setInterval(() => this.#cleanup(), 1_000); + } + + get(id: string) { + return this.#queries.get(id); + } + + /** + * Compute query to send to relays + */ + query(type: { new (): T }, req: RequestBuilder): Query { + const existing = this.#queries.get(req.id); + if (existing) { + // if same instance, just return query + if (existing.fromInstance === req.instance) { + return existing; + } + const filters = !req.options?.skipDiff ? req.buildDiff(this.#system, existing.filters) : req.build(this.#system); + if (filters.length === 0 && !!req.options?.skipDiff) { + return existing; + } else { + for (const subQ of filters) { + this.emit("sendQuery", existing, subQ); + } + this.emit("change"); + return existing; + } + } else { + const store = new type(); + + const filters = req.build(this.#system); + const q = new Query(req.id, req.instance, store, req.options?.leaveOpen, req.options?.timeout); + q.on("trace", r => this.emit("trace", r)); + + this.#queries.set(req.id, q); + for (const subQ of filters) { + this.emit("sendQuery", q, subQ); + } + this.emit("change"); + return q; + } + } + + /** + * Async fetch results + */ + fetch(req: RequestBuilder, cb?: (evs: Array) => void) { + const q = this.query(NoteCollection, req); + return new Promise>(resolve => { + let t: ReturnType | undefined; + let tBuf: Array = []; + const releaseOnEvent = cb + ? q.feed.onEvent(evs => { + if (!t) { + tBuf = [...evs]; + t = setTimeout(() => { + t = undefined; + cb(tBuf); + }, 100); + } else { + tBuf.push(...evs); + } + }) + : undefined; + const releaseFeedHook = q.feed.hook(() => { + if (q.progress === 1) { + releaseOnEvent?.(); + releaseFeedHook(); + q.cancel(); + resolve(unwrap((q.feed as NoteCollection).snapshot.data)); + } + }); + }); + } + + *[Symbol.iterator]() { + for (const kv of this.#queries) { + yield kv; + } + } + + #cleanup() { + let changed = false; + for (const [k, v] of this.#queries) { + if (v.canRemove()) { + v.sendClose(); + this.#queries.delete(k); + this.#log("Deleted query %s", k); + changed = true; + } + } + if (changed) { + this.emit("change"); + } + } +} diff --git a/packages/system/src/nostr-system.ts b/packages/system/src/nostr-system.ts index add130a6..7e5272ef 100644 --- a/packages/system/src/nostr-system.ts +++ b/packages/system/src/nostr-system.ts @@ -1,12 +1,12 @@ import debug from "debug"; import EventEmitter from "eventemitter3"; -import { unwrap, FeedCache } from "@snort/shared"; +import { FeedCache } from "@snort/shared"; import { NostrEvent, ReqFilter, TaggedNostrEvent } from "./nostr"; import { RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection"; import { Query } from "./query"; -import { NoteCollection, NoteStore } from "./note-collection"; -import { BuiltRawReqFilter, RequestBuilder, RequestStrategy } from "./request-builder"; +import { NoteStore } from "./note-collection"; +import { BuiltRawReqFilter, RequestBuilder } from "./request-builder"; import { RelayMetricHandler } from "./relay-metric-handler"; import { MetadataCache, @@ -26,6 +26,8 @@ import { RelayCache, RelayMetadataLoader } from "./outbox-model"; import { Optimizer, DefaultOptimizer } from "./query-optimizer"; import { trimFilters } from "./request-trim"; import { NostrConnectionPool } from "./nostr-connection-pool"; +import { NostrQueryManager } from "./nostr-query-manager"; +import { FilterCacheLayer, IdsFilterCacheLayer } from "./filter-cache-layer"; export interface NostrSystemEvents { change: (state: SystemSnapshot) => void; @@ -50,11 +52,7 @@ export interface NostrsystemProps { export class NostrSystem extends EventEmitter implements SystemInterface { #log = debug("System"); #pool = new NostrConnectionPool(); - - /** - * All active queries - */ - Queries: Map = new Map(); + #queryManager: NostrQueryManager; /** * Storage class for user relay lists @@ -98,6 +96,11 @@ export class NostrSystem extends EventEmitter implements Syst #relayLoader: RelayMetadataLoader; + /** + * Query cache processing layers which can take data from a cache + */ + #queryCacheLayers: Array = []; + constructor(props: NostrsystemProps) { super(); this.#relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays); @@ -110,7 +113,9 @@ export class NostrSystem extends EventEmitter implements Syst this.#relayMetrics = new RelayMetricHandler(this.#relayMetricsCache); this.#relayLoader = new RelayMetadataLoader(this, this.#relayCache); this.checkSigs = props.checkSigs ?? true; - this.#cleanup(); + + this.#queryManager = new NostrQueryManager(this); + this.#queryCacheLayers.push(new IdsFilterCacheLayer(this.#eventsCache)); // hook connection pool this.#pool.on("connected", (id, wasReconnect) => { @@ -118,7 +123,7 @@ export class NostrSystem extends EventEmitter implements Syst if (c) { this.#relayMetrics.onConnect(c.Address); if (wasReconnect) { - for (const [, q] of this.Queries) { + for (const [, q] of this.#queryManager) { q.connectionRestored(c); } } @@ -147,7 +152,7 @@ export class NostrSystem extends EventEmitter implements Syst const c = this.#pool.getConnection(id); if (c) { this.#relayMetrics.onDisconnect(c.Address, code); - for (const [, q] of this.Queries) { + for (const [, q] of this.#queryManager) { q.connectionLost(c.Id); } } @@ -155,7 +160,7 @@ export class NostrSystem extends EventEmitter implements Syst this.#pool.on("eose", (id, sub) => { const c = this.#pool.getConnection(id); if (c) { - for (const [, v] of this.Queries) { + for (const [, v] of this.#queryManager) { v.eose(sub, c); } } @@ -164,11 +169,19 @@ export class NostrSystem extends EventEmitter implements Syst this.#pool.on("notice", (addr, msg) => { this.#log("NOTICE: %s %s", addr, msg); }); + this.#queryManager.on("change", () => this.emit("change", this.takeSnapshot())); + this.#queryManager.on("sendQuery", (q, f) => this.#sendQuery(q, f)); + this.#queryManager.on("trace", t => { + this.#relayMetrics.onTraceReport(t); + }); // internal handler for on-event this.on("event", (sub, ev) => { - for (const [, v] of this.Queries) { - v.handleEvent(sub, ev); + for (const [, v] of this.#queryManager) { + const trace = v.handleEvent(sub, ev); + if (trace && trace.filters.some(a => a.ids)) { + this.#eventsCache.set(ev); + } } }); } @@ -212,98 +225,22 @@ export class NostrSystem extends EventEmitter implements Syst } GetQuery(id: string): Query | undefined { - return this.Queries.get(id); + return this.#queryManager.get(id); } Fetch(req: RequestBuilder, cb?: (evs: Array) => void) { - const q = this.Query(NoteCollection, req); - return new Promise>(resolve => { - let t: ReturnType | undefined; - let tBuf: Array = []; - const releaseOnEvent = cb - ? q.feed.onEvent(evs => { - if (!t) { - tBuf = [...evs]; - t = setTimeout(() => { - t = undefined; - cb(tBuf); - }, 100); - } else { - tBuf.push(...evs); - } - }) - : undefined; - const releaseFeedHook = q.feed.hook(() => { - if (q.progress === 1) { - releaseOnEvent?.(); - releaseFeedHook(); - q.cancel(); - resolve(unwrap((q.feed as NoteCollection).snapshot.data)); - } - }); - }); + return this.#queryManager.fetch(req, cb); } Query(type: { new (): T }, req: RequestBuilder): Query { - const existing = this.Queries.get(req.id); - if (existing) { - // if same instance, just return query - if (existing.fromInstance === req.instance) { - return existing; - } - const filters = !req.options?.skipDiff ? req.buildDiff(this, existing.filters) : req.build(this); - if (filters.length === 0 && !!req.options?.skipDiff) { - return existing; - } else { - for (const subQ of filters) { - this.SendQuery(existing, subQ); - } - this.notifyChange(); - return existing; - } - } else { - const store = new type(); - - const filters = req.build(this); - const q = new Query(req.id, req.instance, store, req.options?.leaveOpen, req.options?.timeout); - q.on("trace", r => this.#relayMetrics.onTraceReport(r)); - - if (filters.some(a => a.filters.some(b => b.ids))) { - const expectIds = new Set(filters.flatMap(a => a.filters).flatMap(a => a.ids ?? [])); - q.feed.onEvent(async evs => { - const toSet = evs.filter(a => expectIds.has(a.id) && this.#eventsCache.getFromCache(a.id) === undefined); - if (toSet.length > 0) { - await this.#eventsCache.bulkSet(toSet); - } - }); - } - this.Queries.set(req.id, q); - for (const subQ of filters) { - this.SendQuery(q, subQ); - } - this.notifyChange(); - return q; - } + return this.#queryManager.query(type, req); } - async SendQuery(q: Query, qSend: BuiltRawReqFilter) { - // trim query of cached ids + async #sendQuery(q: Query, qSend: BuiltRawReqFilter) { + for (const qfl of this.#queryCacheLayers) { + qSend = await qfl.processFilter(q, qSend); + } for (const f of qSend.filters) { - if (f.ids) { - const cacheResults = await this.#eventsCache.bulkGet(f.ids); - if (cacheResults.length > 0) { - const resultIds = new Set(cacheResults.map(a => a.id)); - f.ids = f.ids.filter(a => !resultIds.has(a)); - q.insertCompletedTrace( - { - filters: [{ ...f, ids: [...resultIds] }], - strategy: RequestStrategy.ExplicitRelays, - relay: qSend.relay, - }, - cacheResults as Array, - ); - } - } if (f.authors) { this.#relayLoader.TrackKeys(f.authors); } @@ -366,7 +303,7 @@ export class NostrSystem extends EventEmitter implements Syst takeSnapshot(): SystemSnapshot { return { - queries: [...this.Queries.values()].map(a => { + queries: [...this.#queryManager].map(([, a]) => { return { id: a.id, filters: a.filters, @@ -375,24 +312,4 @@ export class NostrSystem extends EventEmitter implements Syst }), }; } - - notifyChange() { - this.emit("change", this.takeSnapshot()); - } - - #cleanup() { - let changed = false; - for (const [k, v] of this.Queries) { - if (v.canRemove()) { - v.sendClose(); - this.Queries.delete(k); - this.#log("Deleted query %s", k); - changed = true; - } - } - if (changed) { - this.notifyChange(); - } - setTimeout(() => this.#cleanup(), 1_000); - } } diff --git a/packages/system/src/query.ts b/packages/system/src/query.ts index 86bee211..fee646fa 100644 --- a/packages/system/src/query.ts +++ b/packages/system/src/query.ts @@ -198,6 +198,7 @@ export class Query extends EventEmitter implements QueryBase { if (t.id === sub || sub === "*") { if (t.filters.some(v => eventMatchesFilter(e, v))) { this.feed.add(e); + return t; } else { this.#log("Event did not match filter, rejecting %O %O", e, t); }