feat: query save/restore
This commit is contained in:
@ -41,6 +41,7 @@
|
|||||||
"eventemitter3": "^5.0.1",
|
"eventemitter3": "^5.0.1",
|
||||||
"isomorphic-ws": "^5.0.0",
|
"isomorphic-ws": "^5.0.0",
|
||||||
"lokijs": "^1.5.12",
|
"lokijs": "^1.5.12",
|
||||||
|
"lru-cache": "^10.2.0",
|
||||||
"uuid": "^9.0.0",
|
"uuid": "^9.0.0",
|
||||||
"ws": "^8.14.0"
|
"ws": "^8.14.0"
|
||||||
}
|
}
|
||||||
|
@ -386,7 +386,7 @@ export class Connection extends EventEmitter<ConnectionEvents> {
|
|||||||
const latest = eventSet.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0);
|
const latest = eventSet.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0);
|
||||||
const newFilters = filters.map(a => ({
|
const newFilters = filters.map(a => ({
|
||||||
...a,
|
...a,
|
||||||
since: latest,
|
since: latest + 1,
|
||||||
}));
|
}));
|
||||||
this.queueReq(["REQ", id, ...newFilters], item.cb);
|
this.queueReq(["REQ", id, ...newFilters], item.cb);
|
||||||
}
|
}
|
||||||
|
@ -166,13 +166,6 @@ export class QueryManager extends EventEmitter<QueryManagerEvents> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Split request into 2 branches.
|
|
||||||
* 1. Request cache for results
|
|
||||||
* 2. Send query to relays
|
|
||||||
*/
|
|
||||||
#splitSyncRequest(req: BuiltRawReqFilter) {}
|
|
||||||
|
|
||||||
#cleanup() {
|
#cleanup() {
|
||||||
let changed = false;
|
let changed = false;
|
||||||
for (const [k, v] of this.#queries) {
|
for (const [k, v] of this.#queries) {
|
||||||
|
@ -3,10 +3,11 @@ import debug from "debug";
|
|||||||
import EventEmitter from "eventemitter3";
|
import EventEmitter from "eventemitter3";
|
||||||
import { unixNowMs, unwrap } from "@snort/shared";
|
import { unixNowMs, unwrap } from "@snort/shared";
|
||||||
|
|
||||||
import { Connection, ReqFilter, Nips, TaggedNostrEvent, SystemInterface } from ".";
|
import { Connection, ReqFilter, Nips, TaggedNostrEvent, SystemInterface, ParsedFragment } from ".";
|
||||||
import { NoteCollection } from "./note-collection";
|
import { NoteCollection } from "./note-collection";
|
||||||
import { BuiltRawReqFilter, RequestBuilder } from "./request-builder";
|
import { BuiltRawReqFilter, RequestBuilder } from "./request-builder";
|
||||||
import { eventMatchesFilter } from "./request-matcher";
|
import { eventMatchesFilter } from "./request-matcher";
|
||||||
|
import { LRUCache } from "lru-cache";
|
||||||
|
|
||||||
interface QueryTraceEvents {
|
interface QueryTraceEvents {
|
||||||
change: () => void;
|
change: () => void;
|
||||||
@ -105,6 +106,11 @@ export interface QueryEvents {
|
|||||||
end: () => void;
|
end: () => void;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const QueryCache = new LRUCache<string, Array<TaggedNostrEvent>>({
|
||||||
|
ttl: 60_000 * 3,
|
||||||
|
ttlAutopurge: true,
|
||||||
|
});
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Active or queued query on the system
|
* Active or queued query on the system
|
||||||
*/
|
*/
|
||||||
@ -175,6 +181,11 @@ export class Query extends EventEmitter<QueryEvents> {
|
|||||||
this.#groupingDelay = req.options?.groupingDelay ?? 100;
|
this.#groupingDelay = req.options?.groupingDelay ?? 100;
|
||||||
this.#checkTraces();
|
this.#checkTraces();
|
||||||
|
|
||||||
|
const cached = QueryCache.get(this.request.id);
|
||||||
|
if (cached) {
|
||||||
|
this.#log("Restored %o for %s", cached, this.request.id);
|
||||||
|
this.feed.add(cached);
|
||||||
|
}
|
||||||
this.feed.on("event", evs => this.emit("event", evs));
|
this.feed.on("event", evs => this.emit("event", evs));
|
||||||
this.#start();
|
this.#start();
|
||||||
}
|
}
|
||||||
@ -234,7 +245,7 @@ export class Query extends EventEmitter<QueryEvents> {
|
|||||||
* This function should be called when this Query object and FeedStore is no longer needed
|
* This function should be called when this Query object and FeedStore is no longer needed
|
||||||
*/
|
*/
|
||||||
cancel() {
|
cancel() {
|
||||||
this.#cancelAt = unixNowMs() + 5_000;
|
this.#cancelAt = unixNowMs() + 1_000;
|
||||||
}
|
}
|
||||||
|
|
||||||
uncancel() {
|
uncancel() {
|
||||||
@ -248,6 +259,8 @@ export class Query extends EventEmitter<QueryEvents> {
|
|||||||
}
|
}
|
||||||
this.#stopCheckTraces();
|
this.#stopCheckTraces();
|
||||||
this.emit("end");
|
this.emit("end");
|
||||||
|
QueryCache.set(this.request.id, this.feed.snapshot);
|
||||||
|
this.#log("Saved %O for %s", this.feed.snapshot, this.request.id);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -3091,6 +3091,7 @@ __metadata:
|
|||||||
jest: ^29.5.0
|
jest: ^29.5.0
|
||||||
jest-environment-jsdom: ^29.5.0
|
jest-environment-jsdom: ^29.5.0
|
||||||
lokijs: ^1.5.12
|
lokijs: ^1.5.12
|
||||||
|
lru-cache: ^10.2.0
|
||||||
ts-jest: ^29.1.0
|
ts-jest: ^29.1.0
|
||||||
ts-node: ^10.9.1
|
ts-node: ^10.9.1
|
||||||
typescript: ^5.2.2
|
typescript: ^5.2.2
|
||||||
@ -8280,6 +8281,13 @@ __metadata:
|
|||||||
languageName: node
|
languageName: node
|
||||||
linkType: hard
|
linkType: hard
|
||||||
|
|
||||||
|
"lru-cache@npm:^10.2.0":
|
||||||
|
version: 10.2.0
|
||||||
|
resolution: "lru-cache@npm:10.2.0"
|
||||||
|
checksum: eee7ddda4a7475deac51ac81d7dd78709095c6fa46e8350dc2d22462559a1faa3b81ed931d5464b13d48cbd7e08b46100b6f768c76833912bc444b99c37e25db
|
||||||
|
languageName: node
|
||||||
|
linkType: hard
|
||||||
|
|
||||||
"lru-cache@npm:^5.1.1":
|
"lru-cache@npm:^5.1.1":
|
||||||
version: 5.1.1
|
version: 5.1.1
|
||||||
resolution: "lru-cache@npm:5.1.1"
|
resolution: "lru-cache@npm:5.1.1"
|
||||||
|
Reference in New Issue
Block a user