refactor: query cache relay once per query emit
This commit is contained in:
parent
4a48f4f340
commit
64ad548d75
@ -1,8 +1,7 @@
|
|||||||
import debug from "debug";
|
import debug from "debug";
|
||||||
import { EventEmitter } from "eventemitter3";
|
import { EventEmitter } from "eventemitter3";
|
||||||
import { BuiltRawReqFilter, RequestBuilder, SystemInterface, TaggedNostrEvent } from ".";
|
import { BuiltRawReqFilter, FlatReqFilter, ReqFilter, RequestBuilder, SystemInterface, TaggedNostrEvent } from ".";
|
||||||
import { Query, TraceReport } from "./query";
|
import { Query, TraceReport } from "./query";
|
||||||
import { FilterCacheLayer } from "./filter-cache-layer";
|
|
||||||
import { trimFilters } from "./request-trim";
|
import { trimFilters } from "./request-trim";
|
||||||
import { eventMatchesFilter } from "./request-matcher";
|
import { eventMatchesFilter } from "./request-matcher";
|
||||||
|
|
||||||
@ -28,11 +27,6 @@ export class QueryManager extends EventEmitter<QueryManagerEvents> {
|
|||||||
*/
|
*/
|
||||||
#system: SystemInterface;
|
#system: SystemInterface;
|
||||||
|
|
||||||
/**
|
|
||||||
* Query cache processing layers which can take data from a cache
|
|
||||||
*/
|
|
||||||
#queryCacheLayers: Array<FilterCacheLayer> = [];
|
|
||||||
|
|
||||||
constructor(system: SystemInterface) {
|
constructor(system: SystemInterface) {
|
||||||
super();
|
super();
|
||||||
this.#system = system;
|
this.#system = system;
|
||||||
@ -55,11 +49,10 @@ export class QueryManager extends EventEmitter<QueryManagerEvents> {
|
|||||||
}
|
}
|
||||||
return existing;
|
return existing;
|
||||||
} else {
|
} else {
|
||||||
const q = new Query(this.#system, req);
|
const q = new Query(req);
|
||||||
q.on("trace", r => this.emit("trace", r));
|
q.on("trace", r => this.emit("trace", r));
|
||||||
q.on("request", (id, fx) => {
|
q.on("request", (id, fx) => {
|
||||||
this.#send(q, fx);
|
this.#send(q, fx);
|
||||||
this.emit("request", id, fx);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
this.#queries.set(req.id, q);
|
this.#queries.set(req.id, q);
|
||||||
@ -97,49 +90,79 @@ export class QueryManager extends EventEmitter<QueryManagerEvents> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async #send(q: Query, qSend: BuiltRawReqFilter) {
|
async #send(q: Query, filters: Array<ReqFilter>) {
|
||||||
for (const qfl of this.#queryCacheLayers) {
|
// check for empty filters
|
||||||
qSend = await qfl.processFilter(q, qSend);
|
filters = trimFilters(filters);
|
||||||
}
|
|
||||||
|
|
||||||
// automated outbox model, load relays for queried authors
|
// automated outbox model, load relays for queried authors
|
||||||
for (const f of qSend.filters) {
|
for (const f of filters) {
|
||||||
if (f.authors) {
|
if (f.authors) {
|
||||||
this.#system.relayLoader.TrackKeys(f.authors);
|
this.#system.relayLoader.TrackKeys(f.authors);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check for empty filters
|
let syncFrom: Array<TaggedNostrEvent> = [];
|
||||||
qSend.filters = trimFilters(qSend.filters);
|
|
||||||
|
|
||||||
// fetch results from cache first, flag qSend for sync
|
// fetch results from cache first, flag qSend for sync
|
||||||
if (this.#system.cacheRelay) {
|
if (this.#system.cacheRelay) {
|
||||||
const data = await this.#system.cacheRelay.query(["REQ", q.id, ...qSend.filters]);
|
const data = await this.#system.cacheRelay.query(["REQ", q.id, ...filters]);
|
||||||
if (data.length > 0) {
|
if (data.length > 0) {
|
||||||
qSend.syncFrom = data as Array<TaggedNostrEvent>;
|
syncFrom = data.map(a => ({ ...a, relays: [] }));
|
||||||
this.#log("Adding from cache: %O", data);
|
this.#log("Adding from cache: %O", data);
|
||||||
q.feed.add(data.map(a => ({ ...a, relays: [] })));
|
q.feed.add(syncFrom);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove satisfied filters
|
// remove satisfied filters
|
||||||
if (qSend.syncFrom && qSend.syncFrom.length > 0) {
|
if (syncFrom.length > 0) {
|
||||||
// only remove the "ids" filters
|
// only remove the "ids" filters
|
||||||
const newFilters = qSend.filters.filter(
|
const newFilters = filters.filter(a => !a.ids || (a.ids && !syncFrom.some(b => eventMatchesFilter(b, a))));
|
||||||
a => !a.ids || (a.ids && !qSend.syncFrom?.some(b => eventMatchesFilter(b, a))),
|
if (newFilters.length !== filters.length) {
|
||||||
);
|
this.#log("Removing satisfied filters %o %o", newFilters, filters);
|
||||||
if (newFilters.length !== qSend.filters.length) {
|
filters = newFilters;
|
||||||
this.#log("Removing satisfied filters %o %o", newFilters, qSend.filters);
|
|
||||||
qSend.filters = newFilters;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// nothing left to send
|
// nothing left to send
|
||||||
if (qSend.filters.length === 0) {
|
if (filters.length === 0) {
|
||||||
this.#log("Dropping %s %o", q.id, qSend);
|
this.#log("Dropping %s %o", q.id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.#system.requestRouter) {
|
||||||
|
filters = this.#system.requestRouter.forAllRequest(filters);
|
||||||
|
}
|
||||||
|
const expanded = filters.flatMap(a => this.#system.optimizer.expandFilter(a));
|
||||||
|
const qSend = this.#groupFlatByRelay(expanded);
|
||||||
|
qSend.forEach(a => (a.syncFrom = syncFrom));
|
||||||
|
await Promise.all(qSend.map(a => this.#sendToRelays(q, a)));
|
||||||
|
}
|
||||||
|
|
||||||
|
#groupFlatByRelay(filters: Array<FlatReqFilter>) {
|
||||||
|
const relayMerged = filters.reduce((acc, v) => {
|
||||||
|
const relay = v.relay ?? "";
|
||||||
|
// delete relay from filter
|
||||||
|
delete v.relay;
|
||||||
|
const existing = acc.get(relay);
|
||||||
|
if (existing) {
|
||||||
|
existing.push(v);
|
||||||
|
} else {
|
||||||
|
acc.set(relay, [v]);
|
||||||
|
}
|
||||||
|
return acc;
|
||||||
|
}, new Map<string, Array<FlatReqFilter>>());
|
||||||
|
|
||||||
|
const ret = [];
|
||||||
|
for (const [k, v] of relayMerged.entries()) {
|
||||||
|
const filters = this.#system.optimizer.flatMerge(v);
|
||||||
|
ret.push({
|
||||||
|
relay: k,
|
||||||
|
filters,
|
||||||
|
} as BuiltRawReqFilter);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
async #sendToRelays(q: Query, qSend: BuiltRawReqFilter) {
|
||||||
if (qSend.relay) {
|
if (qSend.relay) {
|
||||||
const nc = await this.#system.pool.connect(qSend.relay, { read: true, write: true }, true);
|
const nc = await this.#system.pool.connect(qSend.relay, { read: true, write: true }, true);
|
||||||
if (nc) {
|
if (nc) {
|
||||||
@ -168,6 +191,8 @@ export class QueryManager extends EventEmitter<QueryManagerEvents> {
|
|||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.emit("request", q.id, qSend);
|
||||||
}
|
}
|
||||||
|
|
||||||
#cleanup() {
|
#cleanup() {
|
||||||
|
@ -3,7 +3,7 @@ import debug from "debug";
|
|||||||
import { EventEmitter } from "eventemitter3";
|
import { EventEmitter } from "eventemitter3";
|
||||||
import { unixNowMs, unwrap } from "@snort/shared";
|
import { unixNowMs, unwrap } from "@snort/shared";
|
||||||
|
|
||||||
import { ReqFilter, Nips, TaggedNostrEvent, SystemInterface, ParsedFragment, FlatReqFilter } from ".";
|
import { ReqFilter, Nips, TaggedNostrEvent } from ".";
|
||||||
import { NoteCollection } from "./note-collection";
|
import { NoteCollection } from "./note-collection";
|
||||||
import { BuiltRawReqFilter, RequestBuilder } from "./request-builder";
|
import { BuiltRawReqFilter, RequestBuilder } from "./request-builder";
|
||||||
import { eventMatchesFilter } from "./request-matcher";
|
import { eventMatchesFilter } from "./request-matcher";
|
||||||
@ -96,7 +96,7 @@ export interface TraceReport {
|
|||||||
|
|
||||||
export interface QueryEvents {
|
export interface QueryEvents {
|
||||||
trace: (report: TraceReport) => void;
|
trace: (report: TraceReport) => void;
|
||||||
request: (subId: string, req: BuiltRawReqFilter) => void;
|
request: (subId: string, req: Array<ReqFilter>) => void;
|
||||||
event: (evs: Array<TaggedNostrEvent>) => void;
|
event: (evs: Array<TaggedNostrEvent>) => void;
|
||||||
end: () => void;
|
end: () => void;
|
||||||
done: () => void;
|
done: () => void;
|
||||||
@ -113,11 +113,6 @@ export class Query extends EventEmitter<QueryEvents> {
|
|||||||
*/
|
*/
|
||||||
requests: Array<ReqFilter> = [];
|
requests: Array<ReqFilter> = [];
|
||||||
|
|
||||||
/**
|
|
||||||
* Nostr system interface
|
|
||||||
*/
|
|
||||||
#system: SystemInterface;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Which relays this query has already been executed on
|
* Which relays this query has already been executed on
|
||||||
*/
|
*/
|
||||||
@ -160,15 +155,9 @@ export class Query extends EventEmitter<QueryEvents> {
|
|||||||
|
|
||||||
#log = debug("Query");
|
#log = debug("Query");
|
||||||
|
|
||||||
/**
|
constructor(req: RequestBuilder) {
|
||||||
* Compressed cached trace filters
|
|
||||||
*/
|
|
||||||
#cachedFilters?: Array<ReqFilter>;
|
|
||||||
|
|
||||||
constructor(system: SystemInterface, req: RequestBuilder) {
|
|
||||||
super();
|
super();
|
||||||
this.id = req.id;
|
this.id = req.id;
|
||||||
this.#system = system;
|
|
||||||
this.#feed = new NoteCollection();
|
this.#feed = new NoteCollection();
|
||||||
this.#leaveOpen = req.options?.leaveOpen ?? false;
|
this.#leaveOpen = req.options?.leaveOpen ?? false;
|
||||||
this.#timeout = req.options?.timeout ?? 5_000;
|
this.#timeout = req.options?.timeout ?? 5_000;
|
||||||
@ -202,10 +191,7 @@ export class Query extends EventEmitter<QueryEvents> {
|
|||||||
* Recompute the complete set of compressed filters from all query traces
|
* Recompute the complete set of compressed filters from all query traces
|
||||||
*/
|
*/
|
||||||
get filters() {
|
get filters() {
|
||||||
if (this.#system && !this.#cachedFilters) {
|
return this.#tracing.flatMap(a => a.filters);
|
||||||
this.#cachedFilters = this.#system.optimizer.compress(this.#tracing.flatMap(a => a.filters));
|
|
||||||
}
|
|
||||||
return this.#cachedFilters ?? this.#tracing.flatMap(a => a.filters);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
get feed() {
|
get feed() {
|
||||||
@ -329,37 +315,7 @@ export class Query extends EventEmitter<QueryEvents> {
|
|||||||
this.#log("Starting emit of %s", this.id);
|
this.#log("Starting emit of %s", this.id);
|
||||||
let rawFilters = [...this.requests];
|
let rawFilters = [...this.requests];
|
||||||
this.requests = [];
|
this.requests = [];
|
||||||
if (this.#system.requestRouter) {
|
this.emit("request", this.id, rawFilters);
|
||||||
rawFilters = this.#system.requestRouter.forAllRequest(rawFilters);
|
|
||||||
}
|
|
||||||
const expanded = rawFilters.flatMap(a => this.#system.optimizer.expandFilter(a));
|
|
||||||
const fx = this.#groupFlatByRelay(expanded);
|
|
||||||
fx.forEach(a => this.emit("request", this.id, a));
|
|
||||||
}
|
|
||||||
|
|
||||||
#groupFlatByRelay(filters: Array<FlatReqFilter>) {
|
|
||||||
const relayMerged = filters.reduce((acc, v) => {
|
|
||||||
const relay = v.relay ?? "";
|
|
||||||
// delete relay from filter
|
|
||||||
delete v.relay;
|
|
||||||
const existing = acc.get(relay);
|
|
||||||
if (existing) {
|
|
||||||
existing.push(v);
|
|
||||||
} else {
|
|
||||||
acc.set(relay, [v]);
|
|
||||||
}
|
|
||||||
return acc;
|
|
||||||
}, new Map<string, Array<FlatReqFilter>>());
|
|
||||||
|
|
||||||
const ret = [];
|
|
||||||
for (const [k, v] of relayMerged.entries()) {
|
|
||||||
const filters = this.#system.optimizer.flatMerge(v);
|
|
||||||
ret.push({
|
|
||||||
relay: k,
|
|
||||||
filters,
|
|
||||||
} as BuiltRawReqFilter);
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#stopCheckTraces() {
|
#stopCheckTraces() {
|
||||||
@ -444,7 +400,6 @@ export class Query extends EventEmitter<QueryEvents> {
|
|||||||
c.off("closed", eoseHandler);
|
c.off("closed", eoseHandler);
|
||||||
});
|
});
|
||||||
this.#tracing.push(qt);
|
this.#tracing.push(qt);
|
||||||
this.#cachedFilters = undefined;
|
|
||||||
|
|
||||||
if (q.syncFrom !== undefined) {
|
if (q.syncFrom !== undefined) {
|
||||||
c.request(["SYNC", qt.id, q.syncFrom, ...qt.filters], () => qt.sentToRelay());
|
c.request(["SYNC", qt.id, q.syncFrom, ...qt.filters], () => qt.sentToRelay());
|
||||||
|
Loading…
Reference in New Issue
Block a user