diff --git a/packages/app/src/Feed/LoginFeed.ts b/packages/app/src/Feed/LoginFeed.ts index 5a798342..385fd967 100644 --- a/packages/app/src/Feed/LoginFeed.ts +++ b/packages/app/src/Feed/LoginFeed.ts @@ -41,11 +41,7 @@ export default function useLoginFeed() { const { publicKey: pubKey, follows } = login; const { publisher, system } = useEventPublisher(); - const followLists = useFollowsContactListView(); - useEffect(() => { - followLists.forEach(e => socialGraphInstance.handleEvent(e)); - }, followLists); - + useFollowsContactListView(); useEffect(() => { system.checkSigs = login.appData.item.preferences.checkSigs; }, [login]); diff --git a/packages/app/src/Feed/WorkerRelayView.ts b/packages/app/src/Feed/WorkerRelayView.ts index cbdf1483..fa8208d0 100644 --- a/packages/app/src/Feed/WorkerRelayView.ts +++ b/packages/app/src/Feed/WorkerRelayView.ts @@ -36,13 +36,13 @@ export function useWorkerRelayView(id: string, filters: Array, leaveO ...f, limit: undefined, until: undefined, - since: latest.results?.at(i)?.created_at ?? (maxWindow ? unixNow() - maxWindow : undefined), + since: latest.result?.at(i)?.created_at ?? (maxWindow ? unixNow() - maxWindow : undefined), })) .forEach(f => rb.withBareFilter(f)); setRb(rb); }); Relay.req({ id, filters, leaveOpen }).then(res => { - setEvents(res.results); + setEvents(res.result); if (res.port) { res.port.addEventListener("message", ev => { const evs = ev.data as Array; @@ -82,7 +82,7 @@ export function useWorkerRelayViewCount(id: string, filters: Array, m ...f, limit: undefined, until: undefined, - since: latest.results?.at(i)?.created_at ?? (maxWindow ? unixNow() - maxWindow : undefined), + since: latest.result?.at(i)?.created_at ?? (maxWindow ? unixNow() - maxWindow : undefined), })) .forEach(f => rb.withBareFilter(f)); setRb(rb); diff --git a/packages/app/src/Pages/settings/Cache.tsx b/packages/app/src/Pages/settings/Cache.tsx index 8c9fd72f..5e335100 100644 --- a/packages/app/src/Pages/settings/Cache.tsx +++ b/packages/app/src/Pages/settings/Cache.tsx @@ -74,26 +74,34 @@ function RelayCacheStats() { - {Object.entries(counts) - .sort(([, a], [, b]) => (a > b ? -1 : 1)) - .map(([k, v]) => { - return ( - - {k} - - - - - ); - })} + {Object.entries(counts).sort(([, a], [, b]) => a > b ? -1 : 1).map(([k, v]) => { + return ( + + + + + ); + })} -
- {}}> +
+ { }}> + { + const data = await Relay.dump(); + const url = URL.createObjectURL(new File([data], "snort.db", { + type: "application/octet-stream" + })); + const a = document.createElement("a"); + a.href = url; + a.download = "snort.db"; + a.click(); + }}> + +
-
+ ); } diff --git a/packages/app/src/system.ts b/packages/app/src/system.ts index 642fe5e3..245b88b3 100644 --- a/packages/app/src/system.ts +++ b/packages/app/src/system.ts @@ -1,5 +1,4 @@ import { removeUndefined, throwIfOffline } from "@snort/shared"; -import LRUSet from "@snort/shared/src/LRUSet"; import { mapEventToProfile, NostrEvent, NostrSystem, ProfileLoaderService, socialGraphInstance } from "@snort/system"; import { WorkerRelayInterface } from "@snort/worker-relay"; import WorkerRelayPath from "@snort/worker-relay/dist/worker?worker&url"; @@ -63,28 +62,13 @@ export async function fetchProfile(key: string) { } export const Relay = new WorkerRelayInterface(WorkerRelayPath); -let relayInitStarted = false; export async function initRelayWorker() { - if (relayInitStarted) return; - relayInitStarted = true; try { if (await Relay.init()) { if (await Relay.open()) { await Relay.migrate(); - const seen = new LRUSet(100); - System.on("event", async (_, ev) => { - if (seen.has(ev.id)) return; - seen.add(ev.id); - await Relay.event(ev); - }); - System.on("request", async (subId, f) => { - const evs = await Relay.req(["REQ", "", ...f.filters]); - evs.forEach(ev => { - seen.add(ev.id); - queueMicrotask(() => { - System.HandleEvent(subId, { ...ev, relays: [] }); - }); - }); + System.on("event", (_, ev) => { + Relay.event(ev); }); } } diff --git a/packages/worker-relay/src/interface.ts b/packages/worker-relay/src/interface.ts index 63692cd9..3a8b4df1 100644 --- a/packages/worker-relay/src/interface.ts +++ b/packages/worker-relay/src/interface.ts @@ -21,35 +21,39 @@ export class WorkerRelayInterface { } async init() { - return await this.#workerRpc("init"); + return (await this.#workerRpc("init")).result; } async open() { - return await this.#workerRpc("open"); + return (await this.#workerRpc("open")).result; } async migrate() { - return await this.#workerRpc("migrate"); + return (await this.#workerRpc("migrate")).result; } async event(ev: NostrEvent) { - return await this.#workerRpc("event", ev); + return (await this.#workerRpc("event", ev)).result; } async req(req: ReqCommand) { - return await this.#workerRpc; port?: Readonly }>("req", req); + return await this.#workerRpc>("req", req); } async count(req: ReqCommand) { - return await this.#workerRpc("count", req); + return (await this.#workerRpc("count", req)).result; } async summary() { - return await this.#workerRpc>("summary"); + return (await this.#workerRpc>("summary")).result; } async close(id: string) { - return await this.#workerRpc("close", id); + return (await this.#workerRpc("close", id)).result; + } + + async dump() { + return (await this.#workerRpc("dumpDb")).result; } #workerRpc(cmd: string, args?: T, timeout = 30_000) { @@ -60,12 +64,18 @@ export class WorkerRelayInterface { args, } as WorkerMessage; this.#worker.postMessage(msg); - return new Promise((resolve, reject) => { + return new Promise<{ + result: R; + port: MessagePort | undefined; + }>((resolve, reject) => { let t: ReturnType; - this.#commandQueue.set(id, (v, ports) => { + this.#commandQueue.set(id, (v, port) => { clearTimeout(t); const cmdReply = v as WorkerMessage; - resolve({ ...cmdReply.args, port: ports.length > 0 ? ports[0] : undefined }); + resolve({ + result: cmdReply.args, + port: port.length > 0 ? port[0] : undefined, + }); }); t = setTimeout(() => { reject("timeout"); diff --git a/packages/worker-relay/src/relay.ts b/packages/worker-relay/src/relay.ts index 99e35f6b..b5381d51 100644 --- a/packages/worker-relay/src/relay.ts +++ b/packages/worker-relay/src/relay.ts @@ -9,8 +9,9 @@ export interface WorkerRelayEvents { export class WorkerRelay extends EventEmitter { #sqlite?: Sqlite3Static; - #log = debug("WorkerRelay"); + #log = (...args: any[]) => console.debug(...args); #db?: Database; + #seenInserts = new Set(); /** * Initialize the SQLite driver @@ -101,27 +102,45 @@ export class WorkerRelay extends EventEmitter { return eventsInserted.length > 0; } + #deleteById(db: Database, ids: Array) { + db.exec(`delete from events where id in (${this.#repeatParams(ids.length)})`, { + bind: ids, + }); + } + #insertEvent(db: Database, ev: NostrEvent) { + if (this.#seenInserts.has(ev.id)) return false; + const legacyReplacable = [0, 3, 41]; if (legacyReplacable.includes(ev.kind) || (ev.kind >= 10_000 && ev.kind < 20_000)) { - db.exec("delete from events where kind = ? and pubkey = ? and created < ?", { - bind: [ev.kind, ev.pubkey, ev.created_at], - }); - const oldDeleted = db.changes(); - if (oldDeleted === 0) { + const oldEvents = db.selectValues("select id from events where kind = ? and pubkey = ? and created <= ?", [ + ev.kind, + ev.pubkey, + ev.created_at, + ]) as Array; + if (oldEvents.includes(ev.id)) { + // we already have this event, return + this.#seenInserts.add(ev.id); + if (oldEvents.length > 1) { + const toDelete = oldEvents.filter(a => a !== ev.id); + this.#deleteById(db, toDelete); + } return false; } } if (ev.kind >= 30_000 && ev.kind < 40_000) { const dTag = ev.tags.find(a => a[0] === "d")![1]; - db.exec( - "delete from events where id in (select id from events, tags where events.id = tags.event_id and tags.key = ? and tags.value = ?)", - { - bind: ["d", dTag], - }, - ); - const oldDeleted = db.changes(); - if (oldDeleted === 0) { + const oldEvents = db.selectValues( + "select id from events where id in (select id from events, tags where events.id = tags.event_id and tags.key = ? and tags.value = ?)", + ["d", dTag], + ) as Array; + if (oldEvents.includes(ev.id)) { + // we have this version + this.#seenInserts.add(ev.id); + if (oldEvents.length > 1) { + const toDelete = oldEvents.filter(a => a !== ev.id); + this.#deleteById(db, toDelete); + } return false; } } @@ -136,23 +155,21 @@ export class WorkerRelay extends EventEmitter { }); } } + this.#seenInserts.add(ev.id); return eventInserted; } /** * Query relay by nostr filter */ - req(req: ReqFilter) { + req(id: string, req: ReqFilter) { const start = unixNowMs(); const [sql, params] = this.#buildQuery(req); - const rows = this.#db?.exec(sql, { - bind: params, - returnValue: "resultRows", - }); - const results = rows?.map(a => JSON.parse(a[0] as string) as NostrEvent) ?? []; + const res = this.#db?.selectArrays(sql, params); + const results = res?.map(a => JSON.parse(a[0] as string) as NostrEvent) ?? []; const time = unixNowMs() - start; - this.#log(`Query results took ${time.toLocaleString()}ms`); + //this.#log(`Query ${id} results took ${time.toLocaleString()}ms`); return results; } @@ -182,38 +199,55 @@ export class WorkerRelay extends EventEmitter { return Object.fromEntries(res?.map(a => [String(a[0]), a[1] as number]) ?? []); } - #buildQuery(req: ReqFilter, count = false) { + /** + * Dump the database file + */ + async dump() { + const filePath = String(this.#db?.filename ?? ""); + try { + this.#db?.close(); + this.#db = undefined; + const dir = await navigator.storage.getDirectory(); + // @ts-expect-error + for await (const [name, file] of dir) { + if (`/${name}` === filePath) { + const fh = await (file as FileSystemFileHandle).getFile(); + const ret = new Uint8Array(await fh.arrayBuffer()); + return ret; + } + } + } catch (e) { + console.error(e); + } finally { + this.open(filePath); + } + return new Uint8Array(); + } + + #buildQuery(req: ReqFilter, count = false): [string, Array] { const conditions: Array = []; const params: Array = []; - const repeatParams = (n: number) => { - const ret: Array = []; - for (let x = 0; x < n; x++) { - ret.push("?"); - } - return ret.join(", "); - }; - let sql = `select ${count ? "count(json)" : "json"} from events`; const tags = Object.entries(req).filter(([k]) => k.startsWith("#")); for (const [key, values] of tags) { const vArray = values as Array; - sql += ` inner join tags on events.id = tags.event_id and tags.key = ? and tags.value in (${repeatParams( + sql += ` inner join tags on events.id = tags.event_id and tags.key = ? and tags.value in (${this.#repeatParams( vArray.length, )})`; params.push(key.slice(1)); params.push(...vArray); } if (req.ids) { - conditions.push(`id in (${repeatParams(req.ids.length)})`); + conditions.push(`id in (${this.#repeatParams(req.ids.length)})`); params.push(...req.ids); } if (req.authors) { - conditions.push(`pubkey in (${repeatParams(req.authors.length)})`); + conditions.push(`pubkey in (${this.#repeatParams(req.authors.length)})`); params.push(...req.authors); } if (req.kinds) { - conditions.push(`kind in (${repeatParams(req.kinds.length)})`); + conditions.push(`kind in (${this.#repeatParams(req.kinds.length)})`); params.push(...req.kinds); } if (req.since) { @@ -233,6 +267,32 @@ export class WorkerRelay extends EventEmitter { return [sql, params]; } + #repeatParams(n: number) { + const ret: Array = []; + for (let x = 0; x < n; x++) { + ret.push("?"); + } + return ret.join(", "); + } + + #replaceParamsDebug(sql: string, params: Array) { + let res = ""; + let cIdx = 0; + for (const chr of sql) { + if (chr === "?") { + const px = params[cIdx++]; + if (typeof px === "number") { + res += px.toString(); + } else if (typeof px === "string") { + res += `'${px}'`; + } + } else { + res += chr; + } + } + return res; + } + #migrate_v1() { this.#db?.transaction(db => { db.exec( diff --git a/packages/worker-relay/src/types.ts b/packages/worker-relay/src/types.ts index b08185da..d33b5949 100644 --- a/packages/worker-relay/src/types.ts +++ b/packages/worker-relay/src/types.ts @@ -1,6 +1,6 @@ export interface WorkerMessage { id: string; - cmd: "reply" | "init" | "open" | "migrate" | "event" | "req" | "count" | "summary" | "close"; + cmd: "reply" | "init" | "open" | "migrate" | "event" | "req" | "count" | "summary" | "close" | "dumpDb"; args: T; } diff --git a/packages/worker-relay/src/worker.ts b/packages/worker-relay/src/worker.ts index 70e60ca6..475528c0 100644 --- a/packages/worker-relay/src/worker.ts +++ b/packages/worker-relay/src/worker.ts @@ -60,8 +60,6 @@ globalThis.onclose = () => { }; globalThis.onmessage = ev => { - //console.debug(ev); - const msg = ev.data as WorkerMessage; try { switch (msg.cmd) { @@ -108,9 +106,9 @@ globalThis.onmessage = ev => { } const results = []; for (const r of req.filters) { - results.push(...relay.req(r as ReqFilter)); + results.push(...relay.req(req.id, r as ReqFilter)); } - reply(msg.id, { results }, req.leaveOpen ? [chan.port2] : undefined); + reply(msg.id, results, req.leaveOpen ? [chan.port2] : undefined); }); break; } @@ -133,12 +131,20 @@ globalThis.onmessage = ev => { }); break; } + case "dumpDb": { + barrierQueue(cmdQueue, async () => { + const res = await relay.dump(); + reply(msg.id, res); + }); + break; + } default: { reply(msg.id, { error: "Unknown command" }); break; } } } catch (e) { + console.error(e); reply(msg.id, { error: e }); } };