diff --git a/packages/app/src/Components/Event/Note/NoteFooter/NoteFooter.tsx b/packages/app/src/Components/Event/Note/NoteFooter/NoteFooter.tsx index 8b43b1bd..d8b7c20e 100644 --- a/packages/app/src/Components/Event/Note/NoteFooter/NoteFooter.tsx +++ b/packages/app/src/Components/Event/Note/NoteFooter/NoteFooter.tsx @@ -21,7 +21,7 @@ export default function NoteFooter(props: NoteFooterProps) { const link = useMemo(() => NostrLink.fromEvent(ev), [ev.id]); const [showReactions, setShowReactions] = useState(false); - const related = useReactions(`reactions:${link.tagKey}`, link); + const related = useReactions("reactions", link); const { replies, reactions, zaps, reposts } = useEventReactions(link, related); const { positive } = reactions; diff --git a/packages/app/src/Components/Event/Note/ReactionsModal.tsx b/packages/app/src/Components/Event/Note/ReactionsModal.tsx index d5caf94e..a7936ae5 100644 --- a/packages/app/src/Components/Event/Note/ReactionsModal.tsx +++ b/packages/app/src/Components/Event/Note/ReactionsModal.tsx @@ -25,7 +25,7 @@ const ReactionsModal = ({ onClose, event, initialTab = 0 }: ReactionsModalProps) const link = NostrLink.fromEvent(event); - const related = useReactions(`reactions:${link.tagKey}`, link, undefined, false); + const related = useReactions("reactions", link, undefined, false); const { reactions, zaps, reposts } = useEventReactions(link, related); const { positive, negative } = reactions; diff --git a/packages/system/src/outbox/relay-loader.ts b/packages/system/src/outbox/relay-loader.ts index 5711a717..0bb3c330 100644 --- a/packages/system/src/outbox/relay-loader.ts +++ b/packages/system/src/outbox/relay-loader.ts @@ -27,7 +27,6 @@ export class RelayMetadataLoader extends BackgroundLoader { protected override buildSub(missing: string[]): RequestBuilder { const rb = new RequestBuilder("relay-loader"); rb.withOptions({ - skipDiff: true, timeout: 10000, outboxPickN: 4, }); diff --git a/packages/system/src/query-manager.ts b/packages/system/src/query-manager.ts index 3e1e29bf..df7720c2 100644 --- a/packages/system/src/query-manager.ts +++ b/packages/system/src/query-manager.ts @@ -76,7 +76,7 @@ export class QueryManager extends EventEmitter { * Async fetch results */ async fetch(req: RequestBuilder, cb?: (evs: Array) => void) { - const filters = req.buildRaw(this.#system); + const filters = req.buildRaw(); const q = this.query(req); if (cb) { q.on("event", cb); @@ -101,15 +101,6 @@ export class QueryManager extends EventEmitter { for (const qfl of this.#queryCacheLayers) { qSend = await qfl.processFilter(q, qSend); } - if (this.#system.cacheRelay) { - // fetch results from cache first, flag qSend for sync - const data = await this.#system.cacheRelay.query(["REQ", q.id, ...qSend.filters]); - if (data.length > 0) { - qSend.syncFrom = data as Array; - this.#log("Adding from cache: %O", data); - q.feed.add(data.map(a => ({ ...a, relays: [] }))); - } - } // automated outbox model, load relays for queried authors for (const f of qSend.filters) { @@ -119,19 +110,42 @@ export class QueryManager extends EventEmitter { } // check for empty filters - const fNew = trimFilters(qSend.filters); - if (fNew.length === 0) { + qSend.filters = trimFilters(qSend.filters); + + // fetch results from cache first, flag qSend for sync + if (this.#system.cacheRelay) { + const data = await this.#system.cacheRelay.query(["REQ", q.id, ...qSend.filters]); + if (data.length > 0) { + qSend.syncFrom = data as Array; + this.#log("Adding from cache: %O", data); + q.feed.add(data.map(a => ({ ...a, relays: [] }))); + } + } + + // remove satisfied filters + if (qSend.syncFrom && qSend.syncFrom.length > 0) { + // only remove the "ids" filters + const newFilters = qSend.filters.filter( + a => !a.ids || (a.ids && !qSend.syncFrom?.some(b => eventMatchesFilter(b, a))), + ); + if (newFilters.length !== qSend.filters.length) { + this.#log("Removing satisfied filters %o %o", newFilters, qSend.filters); + qSend.filters = newFilters; + } + } + + // nothing left to send + if (qSend.filters.length === 0) { this.#log("Dropping %s %o", q.id, qSend); return; } - qSend.filters = fNew; if (qSend.relay) { - this.#log("Sending query to %s %s %O", qSend.relay, q.id, qSend); const nc = await this.#system.pool.connect(qSend.relay, { read: true, write: true }, true); if (nc) { const qt = q.sendToRelay(nc, qSend); if (qt) { + this.#log("Sent query %s to %s %s %O", qt.id, qSend.relay, q.id, qSend); return [qt]; } else { this.#log("Query not sent to %s: %O", qSend.relay, qSend); @@ -143,9 +157,9 @@ export class QueryManager extends EventEmitter { const ret = []; for (const [a, s] of this.#system.pool) { if (!s.ephemeral) { - this.#log("Sending query to %s %s %O", a, q.id, qSend); const qt = q.sendToRelay(s, qSend); if (qt) { + this.#log("Sent query %s to %s %s %O", qt.id, qSend.relay, q.id, qSend); ret.push(qt); } else { this.#log("Query not sent to %s: %O", a, qSend); diff --git a/packages/system/src/query.ts b/packages/system/src/query.ts index 946a1b8e..273a303a 100644 --- a/packages/system/src/query.ts +++ b/packages/system/src/query.ts @@ -3,11 +3,10 @@ import debug from "debug"; import { EventEmitter } from "eventemitter3"; import { unixNowMs, unwrap } from "@snort/shared"; -import { ReqFilter, Nips, TaggedNostrEvent, SystemInterface, ParsedFragment } from "."; +import { ReqFilter, Nips, TaggedNostrEvent, SystemInterface, ParsedFragment, FlatReqFilter } from "."; import { NoteCollection } from "./note-collection"; import { BuiltRawReqFilter, RequestBuilder } from "./request-builder"; import { eventMatchesFilter } from "./request-matcher"; -import { LRUCache } from "lru-cache"; import { ConnectionType } from "./connection-pool"; interface QueryTraceEvents { @@ -103,23 +102,16 @@ export interface QueryEvents { done: () => void; } -const QueryCache = new LRUCache>({ - ttl: 60_000 * 3, - ttlAutopurge: true, -}); - /** * Active or queued query on the system */ export class Query extends EventEmitter { - get id() { - return this.request.id; - } + id: string; /** * RequestBuilder instance */ - request: RequestBuilder; + requests: Array = []; /** * Nostr system interface @@ -175,7 +167,7 @@ export class Query extends EventEmitter { constructor(system: SystemInterface, req: RequestBuilder) { super(); - this.request = req; + this.id = req.id; this.#system = system; this.#feed = new NoteCollection(); this.#leaveOpen = req.options?.leaveOpen ?? false; @@ -183,11 +175,7 @@ export class Query extends EventEmitter { this.#groupingDelay = req.options?.groupingDelay ?? 100; this.#checkTraces(); - const cached = QueryCache.get(this.request.id); - if (cached) { - this.#log("Restored %o for %s", cached, this.request.id); - this.feed.add(cached); - } + this.requests.push(...req.buildRaw()); this.feed.on("event", evs => this.emit("event", evs)); this.#start(); } @@ -196,12 +184,8 @@ export class Query extends EventEmitter { * Adds another request to this one */ addRequest(req: RequestBuilder) { - if (req.instance === this.request.instance) { - // same requst, do nothing - return; - } this.#log("Add query %O to %s", req, this.id); - this.request.add(req); + this.requests.push(...req.buildRaw()); this.#start(); return true; } @@ -263,8 +247,6 @@ export class Query extends EventEmitter { } this.#stopCheckTraces(); this.emit("end"); - QueryCache.set(this.request.id, this.feed.snapshot); - this.#log("Saved %O for %s", this.feed.snapshot, this.request.id); } /** @@ -345,16 +327,39 @@ export class Query extends EventEmitter { async #emitFilters() { this.#log("Starting emit of %s", this.id); - const existing = this.filters; - if (!(this.request.options?.skipDiff ?? false) && existing.length > 0) { - const filters = this.request.buildDiff(this.#system, existing); - this.#log("Build %s %O", this.id, filters); - filters.forEach(f => this.emit("request", this.id, f)); - } else { - const filters = this.request.build(this.#system); - this.#log("Build %s %O", this.id, filters); - filters.forEach(f => this.emit("request", this.id, f)); + let rawFilters = [...this.requests]; + this.requests = []; + if (this.#system.requestRouter) { + rawFilters = this.#system.requestRouter.forAllRequest(rawFilters); } + const expanded = rawFilters.flatMap(a => this.#system.optimizer.expandFilter(a)); + const fx = this.#groupFlatByRelay(expanded); + fx.forEach(a => this.emit("request", this.id, a)); + } + + #groupFlatByRelay(filters: Array) { + const relayMerged = filters.reduce((acc, v) => { + const relay = v.relay ?? ""; + // delete relay from filter + delete v.relay; + const existing = acc.get(relay); + if (existing) { + existing.push(v); + } else { + acc.set(relay, [v]); + } + return acc; + }, new Map>()); + + const ret = []; + for (const [k, v] of relayMerged.entries()) { + const filters = this.#system.optimizer.flatMerge(v); + ret.push({ + relay: k, + filters, + } as BuiltRawReqFilter); + } + return ret; } #stopCheckTraces() { @@ -419,7 +424,7 @@ export class Query extends EventEmitter { } }); const eventHandler = (sub: string, ev: TaggedNostrEvent) => { - if ((this.request.options?.fillStore ?? true) && qt.id === sub) { + if (qt.id === sub) { if (qt.filters.some(v => eventMatchesFilter(ev, v))) { this.feed.add(ev); } else { diff --git a/packages/system/src/request-builder.ts b/packages/system/src/request-builder.ts index 7f0342fd..f7921bfb 100644 --- a/packages/system/src/request-builder.ts +++ b/packages/system/src/request-builder.ts @@ -1,9 +1,8 @@ -import debug from "debug"; import { v4 as uuid } from "uuid"; -import { appendDedupe, dedupe, removeUndefined, sanitizeRelayUrl, unixNowMs, unwrap } from "@snort/shared"; +import { appendDedupe, dedupe, removeUndefined, sanitizeRelayUrl, unwrap } from "@snort/shared"; import EventKind from "./event-kind"; -import { FlatReqFilter, NostrLink, NostrPrefix, SystemInterface, ToNostrEventTag } from "."; +import { NostrLink, NostrPrefix, ToNostrEventTag } from "."; import { ReqFilter, u256, HexKey, TaggedNostrEvent } from "./nostr"; import { RequestRouter } from "./request-router"; @@ -23,11 +22,6 @@ export interface RequestBuilderOptions { */ leaveOpen?: boolean; - /** - * Do not apply diff logic and always use full filters for query - */ - skipDiff?: boolean; - /** * Pick N relays per pubkey when using outbox strategy */ @@ -42,12 +36,6 @@ export interface RequestBuilderOptions { * How many milli-seconds to wait to allow grouping */ groupingDelay?: number; - - /** - * If events should be added automatically to the internal NoteCollection - * default=true - */ - fillStore?: boolean; } /** @@ -58,8 +46,6 @@ export class RequestBuilder { instance: string; #builders: Array; #options?: RequestBuilderOptions; - #log = debug("RequestBuilder"); - #rawCached?: Array; constructor(id: string) { this.instance = uuid(); @@ -84,20 +70,17 @@ export class RequestBuilder { */ add(other: RequestBuilder) { this.#builders.push(...other.#builders); - this.#rawCached = undefined; } withFilter() { const ret = new RequestFilterBuilder(); this.#builders.push(ret); - this.#rawCached = undefined; return ret; } withBareFilter(f: ReqFilter) { const ret = new RequestFilterBuilder(f); this.#builders.push(ret); - this.#rawCached = undefined; return ret; } @@ -109,67 +92,8 @@ export class RequestBuilder { return this; } - buildRaw(system?: SystemInterface): Array { - if (!this.#rawCached && system) { - this.#rawCached = system.optimizer.compress(this.#builders.map(f => f.filter)); - } - return this.#rawCached ?? this.#builders.map(f => f.filter); - } - - build(system: SystemInterface): Array { - let rawFilters = this.buildRaw(system); - if (system.requestRouter) { - rawFilters = system.requestRouter.forAllRequest(rawFilters); - } - const expanded = rawFilters.flatMap(a => system.optimizer.expandFilter(a)); - return this.#groupFlatByRelay(system, expanded); - } - - /** - * Detects a change in request from a previous set of filters - */ - buildDiff(system: SystemInterface, prev: Array): Array { - const start = unixNowMs(); - - let rawFilters = this.buildRaw(system); - if (system.requestRouter) { - rawFilters = system.requestRouter.forAllRequest(rawFilters); - } - const diff = system.optimizer.getDiff(prev, rawFilters); - if (diff.length > 0) { - const ret = this.#groupFlatByRelay(system, diff); - const ts = unixNowMs() - start; - if (ts >= 100) { - this.#log("slow diff %s %d ms, consider separate query ids, or use skipDiff: %O", this.id, ts, prev); - } - return ret; - } - return []; - } - - #groupFlatByRelay(system: SystemInterface, filters: Array) { - const relayMerged = filters.reduce((acc, v) => { - const relay = v.relay ?? ""; - // delete relay from filter - delete v.relay; - const existing = acc.get(relay); - if (existing) { - existing.push(v); - } else { - acc.set(relay, [v]); - } - return acc; - }, new Map>()); - - const ret = []; - for (const [k, v] of relayMerged.entries()) { - const filters = system.optimizer.flatMerge(v); - ret.push({ - relay: k, - filters, - } as BuiltRawReqFilter); - } - return ret; + buildRaw(): Array { + return this.#builders.map(f => f.filter); } }