feat: track event seen on relays

fix: dump/clear commands
This commit is contained in:
2024-09-16 10:55:15 +01:00
parent b49144399c
commit 21e88b06cb
13 changed files with 141 additions and 54 deletions

View File

@ -79,6 +79,10 @@ export class WorkerRelayInterface {
return await this.#workerRpc<void, Uint8Array>("dumpDb");
}
async wipe() {
return await this.#workerRpc<void, boolean>("wipe");
}
async forYouFeed(pubkey: string) {
return await this.#workerRpc<string, Array<NostrEvent>>("forYouFeed", pubkey);
}

View File

@ -34,13 +34,19 @@ export class InMemoryRelay extends EventEmitter<RelayHandlerEvents> implements R
}
dump(): Promise<Uint8Array> {
return Promise.resolve(new Uint8Array());
const enc = new TextEncoder();
return Promise.resolve(enc.encode(JSON.stringify(this.#events.values())));
}
close(): void {
// nothing
}
wipe() {
this.#events = new Map();
return Promise.resolve();
}
event(ev: NostrEvent) {
if (this.#events.has(ev.id)) return false;
this.#events.set(ev.id, ev);

View File

@ -10,6 +10,7 @@ const migrations = [
{ version: 3, script: migrate_v3 },
{ version: 4, script: migrate_v4 },
{ version: 5, script: migrate_v5 },
{ version: 6, script: migrate_v6 },
];
async function migrate(relay: SqliteRelay) {
@ -103,4 +104,13 @@ async function migrate_v5(relay: SqliteRelay) {
});
}
async function migrate_v6(relay: SqliteRelay) {
relay.db?.transaction(db => {
db.exec("ALTER TABLE events ADD COLUMN relays TEXT");
db.exec("insert into __migration values(6, ?)", {
bind: [new Date().getTime() / 1000],
});
});
}
export default migrate;

View File

@ -1,4 +1,4 @@
import sqlite3InitModule, { Database, Sqlite3Static } from "@sqlite.org/sqlite-wasm";
import sqlite3InitModule, { Database, SAHPoolUtil, Sqlite3Static } from "@sqlite.org/sqlite-wasm";
import { EventEmitter } from "eventemitter3";
import { EventMetadata, NostrEvent, RelayHandler, RelayHandlerEvents, ReqFilter, unixNowMs } from "../types";
import migrate from "./migrations";
@ -12,6 +12,7 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
#sqlite?: Sqlite3Static;
#log = (msg: string, ...args: Array<any>) => debugLog("SqliteRelay", msg, ...args);
db?: Database;
#pool?: SAHPoolUtil;
#seenInserts = new Set<string>();
/**
@ -45,8 +46,8 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
if (!this.#sqlite) throw new Error("Must call init first");
if (this.db) return;
const pool = await this.#sqlite.installOpfsSAHPoolVfs({});
this.db = new pool.OpfsSAHPoolDb(path);
this.#pool = await this.#sqlite.installOpfsSAHPoolVfs({});
this.db = new this.#pool.OpfsSAHPoolDb(path);
this.#log(`Opened ${this.db.filename}`);
/*this.db.exec(
`PRAGMA cache_size=${32 * 1024
@ -54,6 +55,19 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
);*/
}
/**
* Delete all data
*/
async wipe() {
if (this.#pool && this.db) {
const dbName = this.db.filename;
this.close();
await this.#pool.wipeFiles();
await this.#open(dbName);
await migrate(this);
}
}
close() {
this.db?.close();
this.db = undefined;
@ -157,8 +171,15 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
this.#deleteById(db, oldEvents);
}
}
db.exec("insert or ignore into events(id, pubkey, created, kind, json) values(?,?,?,?,?)", {
bind: [ev.id, ev.pubkey, ev.created_at, ev.kind, JSON.stringify(ev)],
// remove relays from event json
const evInsert = {
...ev,
} as NostrEvent;
delete evInsert["relays"];
db.exec("insert or ignore into events(id, pubkey, created, kind, json, relays) values(?,?,?,?,?,?)", {
bind: [ev.id, ev.pubkey, ev.created_at, ev.kind, JSON.stringify(evInsert), (ev.relays ?? []).join(",")],
});
const insertedEvents = db.changes();
if (insertedEvents > 0) {
@ -169,12 +190,33 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
}
this.insertIntoSearchIndex(db, ev);
} else {
this.#updateRelays(db, ev);
return 0;
}
this.#seenInserts.add(ev.id);
return insertedEvents;
}
/**
* Append relays
*/
#updateRelays(db: Database, ev: NostrEvent) {
const relays = db.selectArrays("select relays from events where id = ?", [ev.id]);
const oldRelays = new Set((relays?.at(0)?.at(0) as string | null)?.split(",") ?? []);
let hasNew = false;
for (const r of ev.relays ?? []) {
if (!oldRelays.has(r)) {
oldRelays.add(r);
hasNew = true;
}
}
if (hasNew) {
db.exec("update events set relays = ? where id = ?", {
bind: [[...oldRelays].join(","), ev.id],
});
}
}
/**
* Query relay by nostr filter
*/
@ -188,7 +230,11 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
if (req.ids_only === true) {
return a[0] as string;
}
return JSON.parse(a[0] as string) as NostrEvent;
const ev = JSON.parse(a[0] as string) as NostrEvent;
return {
...ev,
relays: (a[1] as string | null)?.split(","),
};
}) ?? [];
const time = unixNowMs() - start;
this.#log(`Query ${id} results took ${time.toLocaleString()}ms`);
@ -252,22 +298,14 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
*/
async dump() {
const filePath = String(this.db?.filename ?? "");
try {
this.db?.close();
this.db = undefined;
const dir = await navigator.storage.getDirectory();
// @ts-expect-error
for await (const [name, file] of dir) {
if (`/${name}` === filePath) {
const fh = await (file as FileSystemFileHandle).getFile();
const ret = new Uint8Array(await fh.arrayBuffer());
return ret;
}
if (this.db && this.#pool) {
try {
return await this.#pool.exportFile(`/${filePath}`);
} catch (e) {
console.error(e);
} finally {
await this.#open(filePath);
}
} catch (e) {
console.error(e);
} finally {
await this.#open(filePath);
}
return new Uint8Array();
}
@ -276,7 +314,7 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
const conditions: Array<string> = [];
const params: Array<any> = [];
let resultType = "json";
let resultType = "json,relays";
if (count) {
resultType = "count(json)";
} else if (req.ids_only === true) {

View File

@ -13,7 +13,8 @@ export type WorkerMessageCommand =
| "forYouFeed"
| "setEventMetadata"
| "debug"
| "delete";
| "delete"
| "wipe";
export interface WorkerMessage<T> {
id: string;
@ -73,6 +74,7 @@ export interface RelayHandler extends EventEmitter<RelayHandlerEvents> {
dump(): Promise<Uint8Array>;
delete(req: ReqFilter): Array<string>;
setEventMetadata(id: string, meta: EventMetadata): void;
wipe(): Promise<void>;
}
export interface RelayHandlerEvents {

View File

@ -154,6 +154,11 @@ const handleMsg = async (port: MessagePort | DedicatedWorkerGlobalScope, ev: Mes
reply(msg.id, res);
break;
}
case "wipe": {
await relay!.wipe();
reply(msg.id, true);
break;
}
case "forYouFeed": {
const res = await getForYouFeed(relay!, msg.args as string);
reply(msg.id, res);