From 1a9e571b0f378794ce07cb94e99d8e1ba33a2b1e Mon Sep 17 00:00:00 2001 From: Martti Malmi Date: Fri, 5 Jan 2024 12:04:52 +0200 Subject: [PATCH] use InMemoryDB instead of seenEvents LRUSet --- packages/app/src/Cache/InMemoryDB.ts | 214 +++++++++++++++++++++++++++ packages/app/src/Db/IndexedDB.ts | 9 +- packages/app/src/system.ts | 23 +-- packages/system/src/connection.ts | 8 +- packages/system/src/index.ts | 1 - packages/system/src/seen-events.ts | 3 - 6 files changed, 236 insertions(+), 22 deletions(-) create mode 100644 packages/app/src/Cache/InMemoryDB.ts delete mode 100644 packages/system/src/seen-events.ts diff --git a/packages/app/src/Cache/InMemoryDB.ts b/packages/app/src/Cache/InMemoryDB.ts new file mode 100644 index 00000000..cbfbb242 --- /dev/null +++ b/packages/app/src/Cache/InMemoryDB.ts @@ -0,0 +1,214 @@ +import { ID, ReqFilter as Filter, STR, TaggedNostrEvent, UID } from "@snort/system"; +import loki from "lokijs"; + +type PackedNostrEvent = { + id: UID; + pubkey: number; + kind: number; + tags: Array[]; + flatTags: string[]; + sig: string; + created_at: number; + content?: string; + relays: string[]; + saved_at: number; +}; + +const DEFAULT_MAX_SIZE = 10000; + +class InMemoryDB { + private loki = new loki("EventDB"); + private eventsCollection: Collection; + private maxSize: number; + + constructor(maxSize = DEFAULT_MAX_SIZE) { + this.maxSize = maxSize; + this.eventsCollection = this.loki.addCollection("events", { + unique: ["id"], + indices: ["pubkey", "kind", "flatTags", "created_at", "saved_at"], + }); + this.startRemoveOldestInterval(); + } + + private startRemoveOldestInterval() { + const removeOldest = () => { + this.removeOldest(); + setTimeout(() => removeOldest(), 3000); + }; + setTimeout(() => removeOldest(), 3000); + } + + get(id: string): TaggedNostrEvent | undefined { + const event = this.eventsCollection.by("id", ID(id)); // throw if db not ready yet? + if (event) { + return this.unpack(event); + } + } + + has(id: string): boolean { + return !!this.eventsCollection.by("id", ID(id)); + } + + // map to internal UIDs to save memory + private pack(event: TaggedNostrEvent): PackedNostrEvent { + return { + id: ID(event.id), + pubkey: ID(event.pubkey), + sig: event.sig, + kind: event.kind, + tags: event.tags.map(tag => { + if (["e", "p"].includes(tag[0]) && typeof tag[1] === "string") { + return [tag[0], ID(tag[1] as string), ...tag.slice(2)]; + } else { + return tag; + } + }), + flatTags: event.tags.filter(tag => ["e", "p", "d"].includes(tag[0])).map(tag => `${tag[0]}_${ID(tag[1])}`), + created_at: event.created_at, + content: event.content, + relays: event.relays, + saved_at: Date.now(), + }; + } + + private unpack(packedEvent: PackedNostrEvent): TaggedNostrEvent { + return { + id: STR(packedEvent.id), + pubkey: STR(packedEvent.pubkey), + sig: packedEvent.sig, + kind: packedEvent.kind, + tags: packedEvent.tags.map(tag => { + if (["e", "p"].includes(tag[0] as string) && typeof tag[1] === "number") { + return [tag[0], STR(tag[1] as number), ...tag.slice(2)]; + } else { + return tag; + } + }), + created_at: packedEvent.created_at, + content: packedEvent.content, + relays: packedEvent.relays, + }; + } + + handleEvent(event: TaggedNostrEvent): boolean { + if (!event || !event.id || !event.created_at) { + throw new Error("Invalid event"); + } + + const id = ID(event.id); + if (this.eventsCollection.by("id", id)) { + return false; // this prevents updating event.relays? + } + + const packed = this.pack(event); + + // we might want to limit the kinds of events we save, e.g. no kind 0, 3 or only 1, 6 + + try { + this.eventsCollection.insert(packed); + } catch (e) { + return false; + } + + return true; + } + + remove(eventId: string): void { + const id = ID(eventId); + this.eventsCollection.findAndRemove({ id }); + if (this.idb) { + try { + this.idb.events.where({ id: eventId }).delete(); + } catch (e) { + console.error(e); + } + } + } + + removeOldest(): void { + const count = this.eventsCollection.count(); + console.log("InMemoryDB: count", count, this.maxSize); + if (count > this.maxSize) { + console.log("InMemoryDB: removing oldest events", count - this.maxSize); + this.eventsCollection + .chain() + .simplesort("saved_at") + .limit(count - this.maxSize) + .remove(); + } + } + + find(filter: Filter, callback: (event: TaggedNostrEvent) => void): void { + this.findArray(filter).forEach(event => { + callback(event); + }); + } + + findArray(filter: Filter): TaggedNostrEvent[] { + const query = this.constructQuery(filter); + + const searchRegex = filter.search ? new RegExp(filter.search, "i") : undefined; + let chain = this.eventsCollection + .chain() + .find(query) + .where((e: PackedNostrEvent) => { + if (searchRegex && !e.content?.match(searchRegex)) { + return false; + } + return true; + }) + .simplesort("created_at", true); + + if (filter.limit) { + chain = chain.limit(filter.limit); + } + + return chain.data().map(e => this.unpack(e)); + } + + findAndRemove(filter: Filter) { + const query = this.constructQuery(filter); + this.eventsCollection.findAndRemove(query); + } + + private constructQuery(filter: Filter): LokiQuery { + const query: LokiQuery = {}; + + if (filter.ids) { + query.id = { $in: filter.ids.map(ID) }; + } else { + if (filter.authors) { + query.pubkey = { $in: filter.authors.map(ID) }; + } + if (filter.kinds) { + query.kind = { $in: filter.kinds }; + } + if (filter["#e"]) { + query.flatTags = { $contains: "e_" + filter["#e"]!.map(ID) }; + } else if (filter["#p"]) { + query.flatTags = { $contains: "p_" + filter["#p"]!.map(ID) }; + } else if (filter["#d"]) { + query.flatTags = { $contains: "d_" + filter["#d"]!.map(ID) }; + } + if (filter.since && filter.until) { + query.created_at = { $between: [filter.since, filter.until] }; + } + if (filter.since) { + query.created_at = { $gte: filter.since }; + } + if (filter.until) { + query.created_at = { $lte: filter.until }; + } + } + + return query; + } + + findOne(filter: Filter): TaggedNostrEvent | undefined { + return this.findArray(filter)[0]; + } +} + +export { InMemoryDB }; + +export default new InMemoryDB(); diff --git a/packages/app/src/Db/IndexedDB.ts b/packages/app/src/Db/IndexedDB.ts index d2591673..83509d8e 100644 --- a/packages/app/src/Db/IndexedDB.ts +++ b/packages/app/src/Db/IndexedDB.ts @@ -1,5 +1,5 @@ +import LRUSet from "@snort/shared/src/LRUSet"; import { ReqFilter as Filter, TaggedNostrEvent } from "@snort/system"; -import { seenEvents } from "@snort/system"; import * as Comlink from "comlink"; import Dexie, { Table } from "dexie"; @@ -23,6 +23,7 @@ class IndexedDB extends Dexie { private subscribedAuthorsAndKinds = new Set(); private readQueue: Map = new Map(); private isProcessingQueue = false; + private seenEvents = new LRUSet(2000); constructor() { super("EventDB"); @@ -61,10 +62,10 @@ class IndexedDB extends Dexie { } handleEvent(event: TaggedNostrEvent) { - if (seenEvents.has(event.id)) { + if (this.seenEvents.has(event.id)) { return; } - seenEvents.add(event.id); + this.seenEvents.add(event.id); // maybe we don't want event.kind 3 tags const tags = @@ -166,7 +167,7 @@ class IndexedDB extends Dexie { // make sure only 1 argument is passed const cb = e => { - seenEvents.add(e.id); + this.seenEvents.add(e.id); callback(e); }; diff --git a/packages/app/src/system.ts b/packages/app/src/system.ts index 98c77421..69b42a63 100644 --- a/packages/app/src/system.ts +++ b/packages/app/src/system.ts @@ -15,6 +15,7 @@ import { addEventToFuzzySearch } from "@/Db/FuzzySearch"; import IndexedDBWorker from "@/Db/IndexedDB?worker"; import { LoginStore } from "@/Utils/Login"; import { hasWasm, WasmOptimizer } from "@/Utils/wasm"; +import inMemoryDB from "@/Cache/InMemoryDB"; // move to system or pass alreadyHave fn to system? export const indexedDB = Comlink.wrap(new IndexedDBWorker()); /** @@ -37,6 +38,7 @@ System.on("auth", async (c, r, cb) => { }); System.on("event", (_, ev) => { + inMemoryDB.handleEvent(ev); addEventToFuzzySearch(ev); socialGraphInstance.handleEvent(ev); if (CONFIG.useIndexedDBEvents && socialGraphInstance.getFollowDistance(ev.pubkey) <= 2) { @@ -44,21 +46,24 @@ System.on("event", (_, ev) => { } }); +System.on("request", (filter: ReqFilter) => { + inMemoryDB.find(filter, e => System.HandleEvent(e)); + if (CONFIG.useIndexedDBEvents) { + indexedDB.find( + filter, + Comlink.proxy((e: TaggedNostrEvent) => { + System.HandleEvent(e); + }), + ); + } +}); + if (CONFIG.useIndexedDBEvents) { // load all profiles indexedDB.find( { kinds: [0] }, Comlink.proxy((e: TaggedNostrEvent) => System.HandleEvent(e)), ); - - System.on("request", (filter: ReqFilter) => { - indexedDB.find( - filter, - Comlink.proxy((e: TaggedNostrEvent) => { - System.HandleEvent(e); - }), - ); - }); } /** diff --git a/packages/system/src/connection.ts b/packages/system/src/connection.ts index 90d71690..7d7fa66e 100644 --- a/packages/system/src/connection.ts +++ b/packages/system/src/connection.ts @@ -9,8 +9,8 @@ import { ConnectionStats } from "./connection-stats"; import { NostrEvent, ReqCommand, ReqFilter, TaggedNostrEvent, u256 } from "./nostr"; import { RelayInfo } from "./relay-info"; import EventKind from "./event-kind"; -import { seenEvents } from "./seen-events"; import { getHex64 } from "./utils"; +import inMemoryDB from "@snort/app/src/Cache/InMemoryDB"; /** * Relay settings @@ -203,12 +203,10 @@ export class Connection extends EventEmitter { if ((e.data as string).length > 0) { // skip message processing if we've already seen it const msgId = getHex64(e.data as string, "id"); - /* Disabled in absence of local db - if (seenEvents.has(msgId)) { + if (inMemoryDB.has(msgId)) { + console.log('already have'); return; } - */ - seenEvents.add(msgId); // TODO only do after msg validation const msg = JSON.parse(e.data as string) as Array; const tag = msg[0] as string; diff --git a/packages/system/src/index.ts b/packages/system/src/index.ts index 9e6dd510..530842cd 100644 --- a/packages/system/src/index.ts +++ b/packages/system/src/index.ts @@ -33,7 +33,6 @@ export * from "./query-optimizer"; export * from "./encrypted"; export * from "./outbox-model"; export { parseIMeta } from "./utils"; -export * from "./seen-events"; export * from "./impl/nip4"; export * from "./impl/nip44"; diff --git a/packages/system/src/seen-events.ts b/packages/system/src/seen-events.ts deleted file mode 100644 index 817f3b39..00000000 --- a/packages/system/src/seen-events.ts +++ /dev/null @@ -1,3 +0,0 @@ -import LRUSet from "@snort/shared/src/LRUSet"; - -export const seenEvents = new LRUSet(2000);