feat: NostrQueryManager
continuous-integration/drone/push Build is failing Details

This commit is contained in:
Kieran 2024-01-08 13:55:59 +00:00
parent cae865a3e7
commit 88924941a5
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
5 changed files with 201 additions and 118 deletions

View File

@ -256,6 +256,7 @@ export class Connection extends EventEmitter<ConnectionEvents> {
case "CLOSED": {
this.emit("closed", msg[1] as string, msg[2] as string);
this.#log(`CLOSED: ${msg.slice(1)}`);
break;
}
default: {
this.#log(`Unknown tag: ${tag}`);

View File

@ -0,0 +1,39 @@
import { BuiltRawReqFilter, RequestStrategy } from "./request-builder";
import { NostrEvent, TaggedNostrEvent } from "./nostr";
import { Query } from "./query";
export interface EventCache {
bulkGet: (ids: Array<string>) => Promise<Array<NostrEvent>>;
}
export interface FilterCacheLayer {
processFilter(q: Query, req: BuiltRawReqFilter): Promise<BuiltRawReqFilter>;
}
export class IdsFilterCacheLayer implements FilterCacheLayer {
constructor(readonly cache: EventCache) {}
async processFilter(q: Query, req: BuiltRawReqFilter) {
for (const f of req.filters) {
if (f.ids) {
const cacheResults = await this.cache.bulkGet(f.ids);
if (cacheResults.length > 0) {
const resultIds = new Set(cacheResults.map(a => a.id));
f.ids = f.ids.filter(a => !resultIds.has(a));
// this step is important for buildDiff, if a filter doesnt exist with the ids which are from cache
// we will create an infinite loop where every render we insert a new query for the ids which are missing
q.insertCompletedTrace(
{
filters: [{ ...f, ids: [...resultIds] }],
strategy: RequestStrategy.ExplicitRelays,
relay: req.relay,
},
cacheResults as Array<TaggedNostrEvent>,
);
}
}
}
return req;
}
}

View File

@ -0,0 +1,125 @@
import debug from "debug";
import EventEmitter from "eventemitter3";
import { BuiltRawReqFilter, NoteCollection, NoteStore, RequestBuilder, SystemInterface, TaggedNostrEvent } from ".";
import { Query, TraceReport } from "./query";
import { unwrap } from "@snort/shared";
interface NostrQueryManagerEvents {
change: () => void;
trace: (report: TraceReport) => void;
sendQuery: (q: Query, filter: BuiltRawReqFilter) => void;
}
export class NostrQueryManager extends EventEmitter<NostrQueryManagerEvents> {
#log = debug("NostrQueryManager");
/**
* All active queries
*/
#queries: Map<string, Query> = new Map();
/**
* System interface handle
*/
#system: SystemInterface;
constructor(system: SystemInterface) {
super();
this.#system = system;
setInterval(() => this.#cleanup(), 1_000);
}
get(id: string) {
return this.#queries.get(id);
}
/**
* Compute query to send to relays
*/
query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder): Query {
const existing = this.#queries.get(req.id);
if (existing) {
// if same instance, just return query
if (existing.fromInstance === req.instance) {
return existing;
}
const filters = !req.options?.skipDiff ? req.buildDiff(this.#system, existing.filters) : req.build(this.#system);
if (filters.length === 0 && !!req.options?.skipDiff) {
return existing;
} else {
for (const subQ of filters) {
this.emit("sendQuery", existing, subQ);
}
this.emit("change");
return existing;
}
} else {
const store = new type();
const filters = req.build(this.#system);
const q = new Query(req.id, req.instance, store, req.options?.leaveOpen, req.options?.timeout);
q.on("trace", r => this.emit("trace", r));
this.#queries.set(req.id, q);
for (const subQ of filters) {
this.emit("sendQuery", q, subQ);
}
this.emit("change");
return q;
}
}
/**
* Async fetch results
*/
fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void) {
const q = this.query(NoteCollection, req);
return new Promise<Array<TaggedNostrEvent>>(resolve => {
let t: ReturnType<typeof setTimeout> | undefined;
let tBuf: Array<TaggedNostrEvent> = [];
const releaseOnEvent = cb
? q.feed.onEvent(evs => {
if (!t) {
tBuf = [...evs];
t = setTimeout(() => {
t = undefined;
cb(tBuf);
}, 100);
} else {
tBuf.push(...evs);
}
})
: undefined;
const releaseFeedHook = q.feed.hook(() => {
if (q.progress === 1) {
releaseOnEvent?.();
releaseFeedHook();
q.cancel();
resolve(unwrap((q.feed as NoteCollection).snapshot.data));
}
});
});
}
*[Symbol.iterator]() {
for (const kv of this.#queries) {
yield kv;
}
}
#cleanup() {
let changed = false;
for (const [k, v] of this.#queries) {
if (v.canRemove()) {
v.sendClose();
this.#queries.delete(k);
this.#log("Deleted query %s", k);
changed = true;
}
}
if (changed) {
this.emit("change");
}
}
}

View File

@ -1,12 +1,12 @@
import debug from "debug";
import EventEmitter from "eventemitter3";
import { unwrap, FeedCache } from "@snort/shared";
import { FeedCache } from "@snort/shared";
import { NostrEvent, ReqFilter, TaggedNostrEvent } from "./nostr";
import { RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection";
import { Query } from "./query";
import { NoteCollection, NoteStore } from "./note-collection";
import { BuiltRawReqFilter, RequestBuilder, RequestStrategy } from "./request-builder";
import { NoteStore } from "./note-collection";
import { BuiltRawReqFilter, RequestBuilder } from "./request-builder";
import { RelayMetricHandler } from "./relay-metric-handler";
import {
MetadataCache,
@ -26,6 +26,8 @@ import { RelayCache, RelayMetadataLoader } from "./outbox-model";
import { Optimizer, DefaultOptimizer } from "./query-optimizer";
import { trimFilters } from "./request-trim";
import { NostrConnectionPool } from "./nostr-connection-pool";
import { NostrQueryManager } from "./nostr-query-manager";
import { FilterCacheLayer, IdsFilterCacheLayer } from "./filter-cache-layer";
export interface NostrSystemEvents {
change: (state: SystemSnapshot) => void;
@ -50,11 +52,7 @@ export interface NostrsystemProps {
export class NostrSystem extends EventEmitter<NostrSystemEvents> implements SystemInterface {
#log = debug("System");
#pool = new NostrConnectionPool();
/**
* All active queries
*/
Queries: Map<string, Query> = new Map();
#queryManager: NostrQueryManager;
/**
* Storage class for user relay lists
@ -98,6 +96,11 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
#relayLoader: RelayMetadataLoader;
/**
* Query cache processing layers which can take data from a cache
*/
#queryCacheLayers: Array<FilterCacheLayer> = [];
constructor(props: NostrsystemProps) {
super();
this.#relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays);
@ -110,7 +113,9 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
this.#relayMetrics = new RelayMetricHandler(this.#relayMetricsCache);
this.#relayLoader = new RelayMetadataLoader(this, this.#relayCache);
this.checkSigs = props.checkSigs ?? true;
this.#cleanup();
this.#queryManager = new NostrQueryManager(this);
this.#queryCacheLayers.push(new IdsFilterCacheLayer(this.#eventsCache));
// hook connection pool
this.#pool.on("connected", (id, wasReconnect) => {
@ -118,7 +123,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
if (c) {
this.#relayMetrics.onConnect(c.Address);
if (wasReconnect) {
for (const [, q] of this.Queries) {
for (const [, q] of this.#queryManager) {
q.connectionRestored(c);
}
}
@ -147,7 +152,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
const c = this.#pool.getConnection(id);
if (c) {
this.#relayMetrics.onDisconnect(c.Address, code);
for (const [, q] of this.Queries) {
for (const [, q] of this.#queryManager) {
q.connectionLost(c.Id);
}
}
@ -155,7 +160,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
this.#pool.on("eose", (id, sub) => {
const c = this.#pool.getConnection(id);
if (c) {
for (const [, v] of this.Queries) {
for (const [, v] of this.#queryManager) {
v.eose(sub, c);
}
}
@ -164,11 +169,19 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
this.#pool.on("notice", (addr, msg) => {
this.#log("NOTICE: %s %s", addr, msg);
});
this.#queryManager.on("change", () => this.emit("change", this.takeSnapshot()));
this.#queryManager.on("sendQuery", (q, f) => this.#sendQuery(q, f));
this.#queryManager.on("trace", t => {
this.#relayMetrics.onTraceReport(t);
});
// internal handler for on-event
this.on("event", (sub, ev) => {
for (const [, v] of this.Queries) {
v.handleEvent(sub, ev);
for (const [, v] of this.#queryManager) {
const trace = v.handleEvent(sub, ev);
if (trace && trace.filters.some(a => a.ids)) {
this.#eventsCache.set(ev);
}
}
});
}
@ -212,98 +225,22 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
}
GetQuery(id: string): Query | undefined {
return this.Queries.get(id);
return this.#queryManager.get(id);
}
Fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void) {
const q = this.Query(NoteCollection, req);
return new Promise<Array<TaggedNostrEvent>>(resolve => {
let t: ReturnType<typeof setTimeout> | undefined;
let tBuf: Array<TaggedNostrEvent> = [];
const releaseOnEvent = cb
? q.feed.onEvent(evs => {
if (!t) {
tBuf = [...evs];
t = setTimeout(() => {
t = undefined;
cb(tBuf);
}, 100);
} else {
tBuf.push(...evs);
}
})
: undefined;
const releaseFeedHook = q.feed.hook(() => {
if (q.progress === 1) {
releaseOnEvent?.();
releaseFeedHook();
q.cancel();
resolve(unwrap((q.feed as NoteCollection).snapshot.data));
}
});
});
return this.#queryManager.fetch(req, cb);
}
Query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder): Query {
const existing = this.Queries.get(req.id);
if (existing) {
// if same instance, just return query
if (existing.fromInstance === req.instance) {
return existing;
}
const filters = !req.options?.skipDiff ? req.buildDiff(this, existing.filters) : req.build(this);
if (filters.length === 0 && !!req.options?.skipDiff) {
return existing;
} else {
for (const subQ of filters) {
this.SendQuery(existing, subQ);
}
this.notifyChange();
return existing;
}
} else {
const store = new type();
const filters = req.build(this);
const q = new Query(req.id, req.instance, store, req.options?.leaveOpen, req.options?.timeout);
q.on("trace", r => this.#relayMetrics.onTraceReport(r));
if (filters.some(a => a.filters.some(b => b.ids))) {
const expectIds = new Set(filters.flatMap(a => a.filters).flatMap(a => a.ids ?? []));
q.feed.onEvent(async evs => {
const toSet = evs.filter(a => expectIds.has(a.id) && this.#eventsCache.getFromCache(a.id) === undefined);
if (toSet.length > 0) {
await this.#eventsCache.bulkSet(toSet);
}
});
}
this.Queries.set(req.id, q);
for (const subQ of filters) {
this.SendQuery(q, subQ);
}
this.notifyChange();
return q;
}
return this.#queryManager.query(type, req);
}
async SendQuery(q: Query, qSend: BuiltRawReqFilter) {
// trim query of cached ids
async #sendQuery(q: Query, qSend: BuiltRawReqFilter) {
for (const qfl of this.#queryCacheLayers) {
qSend = await qfl.processFilter(q, qSend);
}
for (const f of qSend.filters) {
if (f.ids) {
const cacheResults = await this.#eventsCache.bulkGet(f.ids);
if (cacheResults.length > 0) {
const resultIds = new Set(cacheResults.map(a => a.id));
f.ids = f.ids.filter(a => !resultIds.has(a));
q.insertCompletedTrace(
{
filters: [{ ...f, ids: [...resultIds] }],
strategy: RequestStrategy.ExplicitRelays,
relay: qSend.relay,
},
cacheResults as Array<TaggedNostrEvent>,
);
}
}
if (f.authors) {
this.#relayLoader.TrackKeys(f.authors);
}
@ -366,7 +303,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
takeSnapshot(): SystemSnapshot {
return {
queries: [...this.Queries.values()].map(a => {
queries: [...this.#queryManager].map(([, a]) => {
return {
id: a.id,
filters: a.filters,
@ -375,24 +312,4 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
}),
};
}
notifyChange() {
this.emit("change", this.takeSnapshot());
}
#cleanup() {
let changed = false;
for (const [k, v] of this.Queries) {
if (v.canRemove()) {
v.sendClose();
this.Queries.delete(k);
this.#log("Deleted query %s", k);
changed = true;
}
}
if (changed) {
this.notifyChange();
}
setTimeout(() => this.#cleanup(), 1_000);
}
}

View File

@ -198,6 +198,7 @@ export class Query extends EventEmitter<QueryEvents> implements QueryBase {
if (t.id === sub || sub === "*") {
if (t.filters.some(v => eventMatchesFilter(e, v))) {
this.feed.add(e);
return t;
} else {
this.#log("Event did not match filter, rejecting %O %O", e, t);
}