use InMemoryDB instead of seenEvents LRUSet
This commit is contained in:
214
packages/app/src/Cache/InMemoryDB.ts
Normal file
214
packages/app/src/Cache/InMemoryDB.ts
Normal file
@ -0,0 +1,214 @@
|
|||||||
|
import { ID, ReqFilter as Filter, STR, TaggedNostrEvent, UID } from "@snort/system";
|
||||||
|
import loki from "lokijs";
|
||||||
|
|
||||||
|
type PackedNostrEvent = {
|
||||||
|
id: UID;
|
||||||
|
pubkey: number;
|
||||||
|
kind: number;
|
||||||
|
tags: Array<string | UID>[];
|
||||||
|
flatTags: string[];
|
||||||
|
sig: string;
|
||||||
|
created_at: number;
|
||||||
|
content?: string;
|
||||||
|
relays: string[];
|
||||||
|
saved_at: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
const DEFAULT_MAX_SIZE = 10000;
|
||||||
|
|
||||||
|
class InMemoryDB {
|
||||||
|
private loki = new loki("EventDB");
|
||||||
|
private eventsCollection: Collection<PackedNostrEvent>;
|
||||||
|
private maxSize: number;
|
||||||
|
|
||||||
|
constructor(maxSize = DEFAULT_MAX_SIZE) {
|
||||||
|
this.maxSize = maxSize;
|
||||||
|
this.eventsCollection = this.loki.addCollection("events", {
|
||||||
|
unique: ["id"],
|
||||||
|
indices: ["pubkey", "kind", "flatTags", "created_at", "saved_at"],
|
||||||
|
});
|
||||||
|
this.startRemoveOldestInterval();
|
||||||
|
}
|
||||||
|
|
||||||
|
private startRemoveOldestInterval() {
|
||||||
|
const removeOldest = () => {
|
||||||
|
this.removeOldest();
|
||||||
|
setTimeout(() => removeOldest(), 3000);
|
||||||
|
};
|
||||||
|
setTimeout(() => removeOldest(), 3000);
|
||||||
|
}
|
||||||
|
|
||||||
|
get(id: string): TaggedNostrEvent | undefined {
|
||||||
|
const event = this.eventsCollection.by("id", ID(id)); // throw if db not ready yet?
|
||||||
|
if (event) {
|
||||||
|
return this.unpack(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
has(id: string): boolean {
|
||||||
|
return !!this.eventsCollection.by("id", ID(id));
|
||||||
|
}
|
||||||
|
|
||||||
|
// map to internal UIDs to save memory
|
||||||
|
private pack(event: TaggedNostrEvent): PackedNostrEvent {
|
||||||
|
return {
|
||||||
|
id: ID(event.id),
|
||||||
|
pubkey: ID(event.pubkey),
|
||||||
|
sig: event.sig,
|
||||||
|
kind: event.kind,
|
||||||
|
tags: event.tags.map(tag => {
|
||||||
|
if (["e", "p"].includes(tag[0]) && typeof tag[1] === "string") {
|
||||||
|
return [tag[0], ID(tag[1] as string), ...tag.slice(2)];
|
||||||
|
} else {
|
||||||
|
return tag;
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
flatTags: event.tags.filter(tag => ["e", "p", "d"].includes(tag[0])).map(tag => `${tag[0]}_${ID(tag[1])}`),
|
||||||
|
created_at: event.created_at,
|
||||||
|
content: event.content,
|
||||||
|
relays: event.relays,
|
||||||
|
saved_at: Date.now(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private unpack(packedEvent: PackedNostrEvent): TaggedNostrEvent {
|
||||||
|
return <TaggedNostrEvent>{
|
||||||
|
id: STR(packedEvent.id),
|
||||||
|
pubkey: STR(packedEvent.pubkey),
|
||||||
|
sig: packedEvent.sig,
|
||||||
|
kind: packedEvent.kind,
|
||||||
|
tags: packedEvent.tags.map(tag => {
|
||||||
|
if (["e", "p"].includes(tag[0] as string) && typeof tag[1] === "number") {
|
||||||
|
return [tag[0], STR(tag[1] as number), ...tag.slice(2)];
|
||||||
|
} else {
|
||||||
|
return tag;
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
created_at: packedEvent.created_at,
|
||||||
|
content: packedEvent.content,
|
||||||
|
relays: packedEvent.relays,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
handleEvent(event: TaggedNostrEvent): boolean {
|
||||||
|
if (!event || !event.id || !event.created_at) {
|
||||||
|
throw new Error("Invalid event");
|
||||||
|
}
|
||||||
|
|
||||||
|
const id = ID(event.id);
|
||||||
|
if (this.eventsCollection.by("id", id)) {
|
||||||
|
return false; // this prevents updating event.relays?
|
||||||
|
}
|
||||||
|
|
||||||
|
const packed = this.pack(event);
|
||||||
|
|
||||||
|
// we might want to limit the kinds of events we save, e.g. no kind 0, 3 or only 1, 6
|
||||||
|
|
||||||
|
try {
|
||||||
|
this.eventsCollection.insert(packed);
|
||||||
|
} catch (e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
remove(eventId: string): void {
|
||||||
|
const id = ID(eventId);
|
||||||
|
this.eventsCollection.findAndRemove({ id });
|
||||||
|
if (this.idb) {
|
||||||
|
try {
|
||||||
|
this.idb.events.where({ id: eventId }).delete();
|
||||||
|
} catch (e) {
|
||||||
|
console.error(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
removeOldest(): void {
|
||||||
|
const count = this.eventsCollection.count();
|
||||||
|
console.log("InMemoryDB: count", count, this.maxSize);
|
||||||
|
if (count > this.maxSize) {
|
||||||
|
console.log("InMemoryDB: removing oldest events", count - this.maxSize);
|
||||||
|
this.eventsCollection
|
||||||
|
.chain()
|
||||||
|
.simplesort("saved_at")
|
||||||
|
.limit(count - this.maxSize)
|
||||||
|
.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
find(filter: Filter, callback: (event: TaggedNostrEvent) => void): void {
|
||||||
|
this.findArray(filter).forEach(event => {
|
||||||
|
callback(event);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
findArray(filter: Filter): TaggedNostrEvent[] {
|
||||||
|
const query = this.constructQuery(filter);
|
||||||
|
|
||||||
|
const searchRegex = filter.search ? new RegExp(filter.search, "i") : undefined;
|
||||||
|
let chain = this.eventsCollection
|
||||||
|
.chain()
|
||||||
|
.find(query)
|
||||||
|
.where((e: PackedNostrEvent) => {
|
||||||
|
if (searchRegex && !e.content?.match(searchRegex)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
})
|
||||||
|
.simplesort("created_at", true);
|
||||||
|
|
||||||
|
if (filter.limit) {
|
||||||
|
chain = chain.limit(filter.limit);
|
||||||
|
}
|
||||||
|
|
||||||
|
return chain.data().map(e => this.unpack(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
findAndRemove(filter: Filter) {
|
||||||
|
const query = this.constructQuery(filter);
|
||||||
|
this.eventsCollection.findAndRemove(query);
|
||||||
|
}
|
||||||
|
|
||||||
|
private constructQuery(filter: Filter): LokiQuery<PackedNostrEvent> {
|
||||||
|
const query: LokiQuery<PackedNostrEvent> = {};
|
||||||
|
|
||||||
|
if (filter.ids) {
|
||||||
|
query.id = { $in: filter.ids.map(ID) };
|
||||||
|
} else {
|
||||||
|
if (filter.authors) {
|
||||||
|
query.pubkey = { $in: filter.authors.map(ID) };
|
||||||
|
}
|
||||||
|
if (filter.kinds) {
|
||||||
|
query.kind = { $in: filter.kinds };
|
||||||
|
}
|
||||||
|
if (filter["#e"]) {
|
||||||
|
query.flatTags = { $contains: "e_" + filter["#e"]!.map(ID) };
|
||||||
|
} else if (filter["#p"]) {
|
||||||
|
query.flatTags = { $contains: "p_" + filter["#p"]!.map(ID) };
|
||||||
|
} else if (filter["#d"]) {
|
||||||
|
query.flatTags = { $contains: "d_" + filter["#d"]!.map(ID) };
|
||||||
|
}
|
||||||
|
if (filter.since && filter.until) {
|
||||||
|
query.created_at = { $between: [filter.since, filter.until] };
|
||||||
|
}
|
||||||
|
if (filter.since) {
|
||||||
|
query.created_at = { $gte: filter.since };
|
||||||
|
}
|
||||||
|
if (filter.until) {
|
||||||
|
query.created_at = { $lte: filter.until };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return query;
|
||||||
|
}
|
||||||
|
|
||||||
|
findOne(filter: Filter): TaggedNostrEvent | undefined {
|
||||||
|
return this.findArray(filter)[0];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export { InMemoryDB };
|
||||||
|
|
||||||
|
export default new InMemoryDB();
|
@ -1,5 +1,5 @@
|
|||||||
|
import LRUSet from "@snort/shared/src/LRUSet";
|
||||||
import { ReqFilter as Filter, TaggedNostrEvent } from "@snort/system";
|
import { ReqFilter as Filter, TaggedNostrEvent } from "@snort/system";
|
||||||
import { seenEvents } from "@snort/system";
|
|
||||||
import * as Comlink from "comlink";
|
import * as Comlink from "comlink";
|
||||||
import Dexie, { Table } from "dexie";
|
import Dexie, { Table } from "dexie";
|
||||||
|
|
||||||
@ -23,6 +23,7 @@ class IndexedDB extends Dexie {
|
|||||||
private subscribedAuthorsAndKinds = new Set<string>();
|
private subscribedAuthorsAndKinds = new Set<string>();
|
||||||
private readQueue: Map<string, Task> = new Map();
|
private readQueue: Map<string, Task> = new Map();
|
||||||
private isProcessingQueue = false;
|
private isProcessingQueue = false;
|
||||||
|
private seenEvents = new LRUSet(2000);
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
super("EventDB");
|
super("EventDB");
|
||||||
@ -61,10 +62,10 @@ class IndexedDB extends Dexie {
|
|||||||
}
|
}
|
||||||
|
|
||||||
handleEvent(event: TaggedNostrEvent) {
|
handleEvent(event: TaggedNostrEvent) {
|
||||||
if (seenEvents.has(event.id)) {
|
if (this.seenEvents.has(event.id)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
seenEvents.add(event.id);
|
this.seenEvents.add(event.id);
|
||||||
|
|
||||||
// maybe we don't want event.kind 3 tags
|
// maybe we don't want event.kind 3 tags
|
||||||
const tags =
|
const tags =
|
||||||
@ -166,7 +167,7 @@ class IndexedDB extends Dexie {
|
|||||||
|
|
||||||
// make sure only 1 argument is passed
|
// make sure only 1 argument is passed
|
||||||
const cb = e => {
|
const cb = e => {
|
||||||
seenEvents.add(e.id);
|
this.seenEvents.add(e.id);
|
||||||
callback(e);
|
callback(e);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@ import { addEventToFuzzySearch } from "@/Db/FuzzySearch";
|
|||||||
import IndexedDBWorker from "@/Db/IndexedDB?worker";
|
import IndexedDBWorker from "@/Db/IndexedDB?worker";
|
||||||
import { LoginStore } from "@/Utils/Login";
|
import { LoginStore } from "@/Utils/Login";
|
||||||
import { hasWasm, WasmOptimizer } from "@/Utils/wasm";
|
import { hasWasm, WasmOptimizer } from "@/Utils/wasm";
|
||||||
|
import inMemoryDB from "@/Cache/InMemoryDB"; // move to system or pass alreadyHave fn to system?
|
||||||
|
|
||||||
export const indexedDB = Comlink.wrap(new IndexedDBWorker());
|
export const indexedDB = Comlink.wrap(new IndexedDBWorker());
|
||||||
/**
|
/**
|
||||||
@ -37,6 +38,7 @@ System.on("auth", async (c, r, cb) => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
System.on("event", (_, ev) => {
|
System.on("event", (_, ev) => {
|
||||||
|
inMemoryDB.handleEvent(ev);
|
||||||
addEventToFuzzySearch(ev);
|
addEventToFuzzySearch(ev);
|
||||||
socialGraphInstance.handleEvent(ev);
|
socialGraphInstance.handleEvent(ev);
|
||||||
if (CONFIG.useIndexedDBEvents && socialGraphInstance.getFollowDistance(ev.pubkey) <= 2) {
|
if (CONFIG.useIndexedDBEvents && socialGraphInstance.getFollowDistance(ev.pubkey) <= 2) {
|
||||||
@ -44,21 +46,24 @@ System.on("event", (_, ev) => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
System.on("request", (filter: ReqFilter) => {
|
||||||
|
inMemoryDB.find(filter, e => System.HandleEvent(e));
|
||||||
|
if (CONFIG.useIndexedDBEvents) {
|
||||||
|
indexedDB.find(
|
||||||
|
filter,
|
||||||
|
Comlink.proxy((e: TaggedNostrEvent) => {
|
||||||
|
System.HandleEvent(e);
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
if (CONFIG.useIndexedDBEvents) {
|
if (CONFIG.useIndexedDBEvents) {
|
||||||
// load all profiles
|
// load all profiles
|
||||||
indexedDB.find(
|
indexedDB.find(
|
||||||
{ kinds: [0] },
|
{ kinds: [0] },
|
||||||
Comlink.proxy((e: TaggedNostrEvent) => System.HandleEvent(e)),
|
Comlink.proxy((e: TaggedNostrEvent) => System.HandleEvent(e)),
|
||||||
);
|
);
|
||||||
|
|
||||||
System.on("request", (filter: ReqFilter) => {
|
|
||||||
indexedDB.find(
|
|
||||||
filter,
|
|
||||||
Comlink.proxy((e: TaggedNostrEvent) => {
|
|
||||||
System.HandleEvent(e);
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -9,8 +9,8 @@ import { ConnectionStats } from "./connection-stats";
|
|||||||
import { NostrEvent, ReqCommand, ReqFilter, TaggedNostrEvent, u256 } from "./nostr";
|
import { NostrEvent, ReqCommand, ReqFilter, TaggedNostrEvent, u256 } from "./nostr";
|
||||||
import { RelayInfo } from "./relay-info";
|
import { RelayInfo } from "./relay-info";
|
||||||
import EventKind from "./event-kind";
|
import EventKind from "./event-kind";
|
||||||
import { seenEvents } from "./seen-events";
|
|
||||||
import { getHex64 } from "./utils";
|
import { getHex64 } from "./utils";
|
||||||
|
import inMemoryDB from "@snort/app/src/Cache/InMemoryDB";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Relay settings
|
* Relay settings
|
||||||
@ -203,12 +203,10 @@ export class Connection extends EventEmitter<ConnectionEvents> {
|
|||||||
if ((e.data as string).length > 0) {
|
if ((e.data as string).length > 0) {
|
||||||
// skip message processing if we've already seen it
|
// skip message processing if we've already seen it
|
||||||
const msgId = getHex64(e.data as string, "id");
|
const msgId = getHex64(e.data as string, "id");
|
||||||
/* Disabled in absence of local db
|
if (inMemoryDB.has(msgId)) {
|
||||||
if (seenEvents.has(msgId)) {
|
console.log('already have');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
seenEvents.add(msgId); // TODO only do after msg validation
|
|
||||||
|
|
||||||
const msg = JSON.parse(e.data as string) as Array<string | NostrEvent | boolean>;
|
const msg = JSON.parse(e.data as string) as Array<string | NostrEvent | boolean>;
|
||||||
const tag = msg[0] as string;
|
const tag = msg[0] as string;
|
||||||
|
@ -33,7 +33,6 @@ export * from "./query-optimizer";
|
|||||||
export * from "./encrypted";
|
export * from "./encrypted";
|
||||||
export * from "./outbox-model";
|
export * from "./outbox-model";
|
||||||
export { parseIMeta } from "./utils";
|
export { parseIMeta } from "./utils";
|
||||||
export * from "./seen-events";
|
|
||||||
|
|
||||||
export * from "./impl/nip4";
|
export * from "./impl/nip4";
|
||||||
export * from "./impl/nip44";
|
export * from "./impl/nip44";
|
||||||
|
@ -1,3 +0,0 @@
|
|||||||
import LRUSet from "@snort/shared/src/LRUSet";
|
|
||||||
|
|
||||||
export const seenEvents = new LRUSet<string>(2000);
|
|
Reference in New Issue
Block a user