diff --git a/packages/system/src/query-manager.ts b/packages/system/src/query-manager.ts index df7720c2..b1bad16a 100644 --- a/packages/system/src/query-manager.ts +++ b/packages/system/src/query-manager.ts @@ -1,8 +1,7 @@ import debug from "debug"; import { EventEmitter } from "eventemitter3"; -import { BuiltRawReqFilter, RequestBuilder, SystemInterface, TaggedNostrEvent } from "."; +import { BuiltRawReqFilter, FlatReqFilter, ReqFilter, RequestBuilder, SystemInterface, TaggedNostrEvent } from "."; import { Query, TraceReport } from "./query"; -import { FilterCacheLayer } from "./filter-cache-layer"; import { trimFilters } from "./request-trim"; import { eventMatchesFilter } from "./request-matcher"; @@ -28,11 +27,6 @@ export class QueryManager extends EventEmitter { */ #system: SystemInterface; - /** - * Query cache processing layers which can take data from a cache - */ - #queryCacheLayers: Array = []; - constructor(system: SystemInterface) { super(); this.#system = system; @@ -55,11 +49,10 @@ export class QueryManager extends EventEmitter { } return existing; } else { - const q = new Query(this.#system, req); + const q = new Query(req); q.on("trace", r => this.emit("trace", r)); q.on("request", (id, fx) => { this.#send(q, fx); - this.emit("request", id, fx); }); this.#queries.set(req.id, q); @@ -97,49 +90,79 @@ export class QueryManager extends EventEmitter { } } - async #send(q: Query, qSend: BuiltRawReqFilter) { - for (const qfl of this.#queryCacheLayers) { - qSend = await qfl.processFilter(q, qSend); - } + async #send(q: Query, filters: Array) { + // check for empty filters + filters = trimFilters(filters); // automated outbox model, load relays for queried authors - for (const f of qSend.filters) { + for (const f of filters) { if (f.authors) { this.#system.relayLoader.TrackKeys(f.authors); } } - // check for empty filters - qSend.filters = trimFilters(qSend.filters); - + let syncFrom: Array = []; // fetch results from cache first, flag qSend for sync if (this.#system.cacheRelay) { - const data = await this.#system.cacheRelay.query(["REQ", q.id, ...qSend.filters]); + const data = await this.#system.cacheRelay.query(["REQ", q.id, ...filters]); if (data.length > 0) { - qSend.syncFrom = data as Array; + syncFrom = data.map(a => ({ ...a, relays: [] })); this.#log("Adding from cache: %O", data); - q.feed.add(data.map(a => ({ ...a, relays: [] }))); + q.feed.add(syncFrom); } } // remove satisfied filters - if (qSend.syncFrom && qSend.syncFrom.length > 0) { + if (syncFrom.length > 0) { // only remove the "ids" filters - const newFilters = qSend.filters.filter( - a => !a.ids || (a.ids && !qSend.syncFrom?.some(b => eventMatchesFilter(b, a))), - ); - if (newFilters.length !== qSend.filters.length) { - this.#log("Removing satisfied filters %o %o", newFilters, qSend.filters); - qSend.filters = newFilters; + const newFilters = filters.filter(a => !a.ids || (a.ids && !syncFrom.some(b => eventMatchesFilter(b, a)))); + if (newFilters.length !== filters.length) { + this.#log("Removing satisfied filters %o %o", newFilters, filters); + filters = newFilters; } } // nothing left to send - if (qSend.filters.length === 0) { - this.#log("Dropping %s %o", q.id, qSend); + if (filters.length === 0) { + this.#log("Dropping %s %o", q.id); return; } + if (this.#system.requestRouter) { + filters = this.#system.requestRouter.forAllRequest(filters); + } + const expanded = filters.flatMap(a => this.#system.optimizer.expandFilter(a)); + const qSend = this.#groupFlatByRelay(expanded); + qSend.forEach(a => (a.syncFrom = syncFrom)); + await Promise.all(qSend.map(a => this.#sendToRelays(q, a))); + } + + #groupFlatByRelay(filters: Array) { + const relayMerged = filters.reduce((acc, v) => { + const relay = v.relay ?? ""; + // delete relay from filter + delete v.relay; + const existing = acc.get(relay); + if (existing) { + existing.push(v); + } else { + acc.set(relay, [v]); + } + return acc; + }, new Map>()); + + const ret = []; + for (const [k, v] of relayMerged.entries()) { + const filters = this.#system.optimizer.flatMerge(v); + ret.push({ + relay: k, + filters, + } as BuiltRawReqFilter); + } + return ret; + } + + async #sendToRelays(q: Query, qSend: BuiltRawReqFilter) { if (qSend.relay) { const nc = await this.#system.pool.connect(qSend.relay, { read: true, write: true }, true); if (nc) { @@ -168,6 +191,8 @@ export class QueryManager extends EventEmitter { } return ret; } + + this.emit("request", q.id, qSend); } #cleanup() { diff --git a/packages/system/src/query.ts b/packages/system/src/query.ts index 273a303a..e9ef7012 100644 --- a/packages/system/src/query.ts +++ b/packages/system/src/query.ts @@ -3,7 +3,7 @@ import debug from "debug"; import { EventEmitter } from "eventemitter3"; import { unixNowMs, unwrap } from "@snort/shared"; -import { ReqFilter, Nips, TaggedNostrEvent, SystemInterface, ParsedFragment, FlatReqFilter } from "."; +import { ReqFilter, Nips, TaggedNostrEvent } from "."; import { NoteCollection } from "./note-collection"; import { BuiltRawReqFilter, RequestBuilder } from "./request-builder"; import { eventMatchesFilter } from "./request-matcher"; @@ -96,7 +96,7 @@ export interface TraceReport { export interface QueryEvents { trace: (report: TraceReport) => void; - request: (subId: string, req: BuiltRawReqFilter) => void; + request: (subId: string, req: Array) => void; event: (evs: Array) => void; end: () => void; done: () => void; @@ -113,11 +113,6 @@ export class Query extends EventEmitter { */ requests: Array = []; - /** - * Nostr system interface - */ - #system: SystemInterface; - /** * Which relays this query has already been executed on */ @@ -160,15 +155,9 @@ export class Query extends EventEmitter { #log = debug("Query"); - /** - * Compressed cached trace filters - */ - #cachedFilters?: Array; - - constructor(system: SystemInterface, req: RequestBuilder) { + constructor(req: RequestBuilder) { super(); this.id = req.id; - this.#system = system; this.#feed = new NoteCollection(); this.#leaveOpen = req.options?.leaveOpen ?? false; this.#timeout = req.options?.timeout ?? 5_000; @@ -202,10 +191,7 @@ export class Query extends EventEmitter { * Recompute the complete set of compressed filters from all query traces */ get filters() { - if (this.#system && !this.#cachedFilters) { - this.#cachedFilters = this.#system.optimizer.compress(this.#tracing.flatMap(a => a.filters)); - } - return this.#cachedFilters ?? this.#tracing.flatMap(a => a.filters); + return this.#tracing.flatMap(a => a.filters); } get feed() { @@ -329,37 +315,7 @@ export class Query extends EventEmitter { this.#log("Starting emit of %s", this.id); let rawFilters = [...this.requests]; this.requests = []; - if (this.#system.requestRouter) { - rawFilters = this.#system.requestRouter.forAllRequest(rawFilters); - } - const expanded = rawFilters.flatMap(a => this.#system.optimizer.expandFilter(a)); - const fx = this.#groupFlatByRelay(expanded); - fx.forEach(a => this.emit("request", this.id, a)); - } - - #groupFlatByRelay(filters: Array) { - const relayMerged = filters.reduce((acc, v) => { - const relay = v.relay ?? ""; - // delete relay from filter - delete v.relay; - const existing = acc.get(relay); - if (existing) { - existing.push(v); - } else { - acc.set(relay, [v]); - } - return acc; - }, new Map>()); - - const ret = []; - for (const [k, v] of relayMerged.entries()) { - const filters = this.#system.optimizer.flatMerge(v); - ret.push({ - relay: k, - filters, - } as BuiltRawReqFilter); - } - return ret; + this.emit("request", this.id, rawFilters); } #stopCheckTraces() { @@ -444,7 +400,6 @@ export class Query extends EventEmitter { c.off("closed", eoseHandler); }); this.#tracing.push(qt); - this.#cachedFilters = undefined; if (q.syncFrom !== undefined) { c.request(["SYNC", qt.id, q.syncFrom, ...qt.filters], () => qt.sentToRelay());