diff --git a/packages/app/src/Cache/EventCacheWorker.ts b/packages/app/src/Cache/EventCacheWorker.ts new file mode 100644 index 00000000..e78c65dc --- /dev/null +++ b/packages/app/src/Cache/EventCacheWorker.ts @@ -0,0 +1,96 @@ +import { CachedTable, CacheEvents } from "@snort/shared"; +import { NostrEvent } from "@snort/system"; +import { WorkerRelayInterface } from "@snort/worker-relay"; +import EventEmitter from "eventemitter3"; + +export class EventCacheWorker extends EventEmitter implements CachedTable { + #relay: WorkerRelayInterface; + #keys = new Set(); + #cache = new Map(); + + constructor(relay: WorkerRelayInterface) { + super(); + this.#relay = relay; + } + + async preload() { + const ids = await this.#relay.sql("select id from events", []); + this.#keys = new Set(ids.map(a => a[0] as string)); + return Promise.resolve(); + } + + keysOnTable(): string[] { + return [...this.#keys]; + } + + getFromCache(key?: string | undefined): NostrEvent | undefined { + if (key) { + return this.#cache.get(key); + } + } + + discover(ev: NostrEvent) { + this.#keys.add(this.key(ev)); + } + + async get(key?: string | undefined): Promise { + if (key) { + const res = await this.bulkGet([key]); + if (res.length > 0) { + return res[0]; + } + } + } + + async bulkGet(keys: string[]): Promise { + const results = await this.#relay.req({ + id: "EventCacheWorker.bulkGet", + filters: [ + { + ids: keys, + }, + ], + }); + for (const ev of results.result) { + this.#cache.set(ev.id, ev); + } + return results.result; + } + + async set(obj: NostrEvent): Promise { + await this.#relay.event(obj); + this.#keys.add(obj.id); + } + + async bulkSet(obj: NostrEvent[] | readonly NostrEvent[]): Promise { + await Promise.all( + obj.map(async a => { + await this.#relay.event(a); + this.#keys.add(a.id); + }), + ); + } + + async update( + m: TWithCreated, + ): Promise<"new" | "refresh" | "updated" | "no_change"> { + if (await this.#relay.event(m)) { + return "updated"; + } + return "no_change"; + } + + async buffer(keys: string[]): Promise { + const missing = keys.filter(a => !this.#keys.has(a)); + const res = await this.bulkGet(missing); + return missing.filter(a => !res.some(b => this.key(b) === a)); + } + + key(of: NostrEvent): string { + return of.id; + } + + snapshot(): NostrEvent[] { + return [...this.#cache.values()]; + } +} diff --git a/packages/app/src/Cache/EventInteractionCache.ts b/packages/app/src/Cache/EventInteractionCache.ts deleted file mode 100644 index 9fe2c507..00000000 --- a/packages/app/src/Cache/EventInteractionCache.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { FeedCache } from "@snort/shared"; - -import { db, EventInteraction } from "@/Db"; -import { LoginStore } from "@/Utils/Login"; - -export class EventInteractionCache extends FeedCache { - constructor() { - super("EventInteraction", db.eventInteraction); - } - - key(of: EventInteraction): string { - return `${of.event}:${of.by}`; - } - - override async preload(): Promise { - await super.preload(); - - const data = window.localStorage.getItem("zap-cache"); - if (data) { - const toImport = [...new Set(JSON.parse(data) as Array)].map(a => { - const ret = { - event: a, - by: LoginStore.takeSnapshot().publicKey, - zapped: true, - reacted: false, - reposted: false, - } as EventInteraction; - ret.id = this.key(ret); - return ret; - }); - await this.bulkSet(toImport); - - window.localStorage.removeItem("zap-cache"); - } - await this.buffer([...this.onTable]); - } - - takeSnapshot(): EventInteraction[] { - return [...this.cache.values()]; - } -} diff --git a/packages/app/src/Cache/FollowListCache.ts b/packages/app/src/Cache/FollowListCache.ts deleted file mode 100644 index 34845ee5..00000000 --- a/packages/app/src/Cache/FollowListCache.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { unixNowMs } from "@snort/shared"; -import { EventKind, RequestBuilder, socialGraphInstance, TaggedNostrEvent } from "@snort/system"; - -import { db } from "@/Db"; -import { LoginSession } from "@/Utils/Login"; - -import { RefreshFeedCache } from "./RefreshFeedCache"; - -export class FollowListCache extends RefreshFeedCache { - constructor() { - super("FollowListCache", db.followLists); - } - - buildSub(session: LoginSession, rb: RequestBuilder): void { - const since = this.newest(); - rb.withFilter() - .kinds([EventKind.ContactList]) - .authors(session.follows.item) - .since(since === 0 ? undefined : since); - } - - async onEvent(evs: readonly TaggedNostrEvent[]) { - await Promise.all( - evs.map(async e => { - const update = await super.update({ - ...e, - created: e.created_at, - loaded: unixNowMs(), - }); - if (update !== "no_change") { - socialGraphInstance.handleEvent(e); - } - }), - ); - } - - key(of: TaggedNostrEvent): string { - return of.pubkey; - } - - takeSnapshot() { - return [...this.cache.values()]; - } - - override async preload() { - await super.preload(); - this.cache.forEach(e => socialGraphInstance.handleEvent(e)); - } -} diff --git a/packages/app/src/Cache/FollowsFeed.ts b/packages/app/src/Cache/FollowsFeed.ts deleted file mode 100644 index 1894b4bb..00000000 --- a/packages/app/src/Cache/FollowsFeed.ts +++ /dev/null @@ -1,136 +0,0 @@ -import { unixNow, unixNowMs } from "@snort/shared"; -import { EventKind, RequestBuilder, SystemInterface, TaggedNostrEvent } from "@snort/system"; - -import { db } from "@/Db"; -import { Day, Hour } from "@/Utils/Const"; -import { LoginSession } from "@/Utils/Login"; - -import { RefreshFeedCache, TWithCreated } from "./RefreshFeedCache"; - -const WindowSize = Hour * 6; -const MaxCacheWindow = Day * 7; - -export class FollowsFeedCache extends RefreshFeedCache { - #kinds = [EventKind.TextNote, EventKind.Repost, EventKind.Polls]; - #oldest?: number; - - constructor() { - super("FollowsFeedCache", db.followsFeed); - } - - key(of: TWithCreated): string { - return of.id; - } - - takeSnapshot(): TWithCreated[] { - return [...this.cache.values()]; - } - - buildSub(session: LoginSession, rb: RequestBuilder): void { - const authors = [...session.follows.item]; - if (session.publicKey) { - authors.push(session.publicKey); - } - const since = this.newest(); - rb.withFilter() - .kinds(this.#kinds) - .authors(authors) - .since(since === 0 ? unixNow() - WindowSize : since); - } - - async onEvent(evs: readonly TaggedNostrEvent[]): Promise { - const filtered = evs.filter(a => this.#kinds.includes(a.kind)); - if (filtered.length > 0) { - await this.bulkSet(filtered); - this.emit( - "change", - filtered.map(a => this.key(a)), - ); - } - } - - override async preload() { - const start = unixNowMs(); - const keys = (await this.table?.toCollection().primaryKeys()) ?? []; - this.onTable = new Set(keys.map(a => a as string)); - - // load only latest 50 posts, rest can be loaded on-demand - const latest = await this.table?.orderBy("created_at").reverse().limit(50).toArray(); - latest?.forEach(v => this.cache.set(this.key(v), v)); - - // cleanup older than 7 days - await this.table - ?.where("created_at") - .below(unixNow() - MaxCacheWindow) - .delete(); - - const oldest = await this.table?.orderBy("created_at").first(); - this.#oldest = oldest?.created_at; - this.emit("change", latest?.map(a => this.key(a)) ?? []); - this.log(`Loaded %d/%d in %d ms`, latest?.length ?? 0, keys.length, (unixNowMs() - start).toLocaleString()); - } - - async loadMore(system: SystemInterface, session: LoginSession, before: number) { - if (this.#oldest && before <= this.#oldest) { - const rb = new RequestBuilder(`${this.name}-loadmore`); - const authors = [...session.follows.item]; - if (session.publicKey) { - authors.push(session.publicKey); - } - rb.withFilter() - .kinds(this.#kinds) - .authors(authors) - .until(before) - .since(before - WindowSize); - await system.Fetch(rb, async evs => { - await this.bulkSet(evs); - }); - } else { - const latest = await this.table - ?.where("created_at") - .between(before - WindowSize, before) - .reverse() - .sortBy("created_at"); - latest?.forEach(v => { - const k = this.key(v); - this.cache.set(k, v); - this.onTable.add(k); - }); - - this.emit("change", latest?.map(a => this.key(a)) ?? []); - } - } - - /** - * Backfill cache with new follows - */ - async backFill(system: SystemInterface, keys: Array) { - if (keys.length === 0) return; - - const rb = new RequestBuilder(`${this.name}-backfill`); - rb.withFilter() - .kinds(this.#kinds) - .authors(keys) - .until(unixNow()) - .since(this.#oldest ?? unixNow() - MaxCacheWindow); - await system.Fetch(rb, async evs => { - await this.bulkSet(evs); - }); - } - - /** - * Backfill cache based on follows list - */ - async backFillIfMissing(system: SystemInterface, keys: Array) { - if (!this.#oldest) return; - - const start = unixNowMs(); - const everything = await this.table?.toArray(); - if ((everything?.length ?? 0) > 0) { - const allKeys = new Set(everything?.map(a => a.pubkey)); - const missingKeys = keys.filter(a => !allKeys.has(a)); - await this.backFill(system, missingKeys); - this.log(`Backfilled %d keys in %d ms`, missingKeys.length, (unixNowMs() - start).toLocaleString()); - } - } -} diff --git a/packages/app/src/Cache/Notifications.ts b/packages/app/src/Cache/Notifications.ts deleted file mode 100644 index 48b82dca..00000000 --- a/packages/app/src/Cache/Notifications.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { unixNow } from "@snort/shared"; -import { EventKind, NostrEvent, RequestBuilder, TaggedNostrEvent } from "@snort/system"; - -import { db, NostrEventForSession } from "@/Db"; -import { Day } from "@/Utils/Const"; -import { LoginSession } from "@/Utils/Login"; - -import { RefreshFeedCache, TWithCreated } from "./RefreshFeedCache"; - -export class NotificationsCache extends RefreshFeedCache { - #kinds = [EventKind.TextNote, EventKind.Reaction, EventKind.Repost, EventKind.ZapReceipt]; - - constructor() { - super("notifications", db.notifications); - } - - buildSub(session: LoginSession, rb: RequestBuilder) { - if (session.publicKey) { - const newest = this.newest(v => v.tags.some(a => a[0] === "p" && a[1] === session.publicKey)); - rb.withFilter() - .kinds(this.#kinds) - .tag("p", [session.publicKey]) - .since(newest === 0 ? unixNow() - Day * 30 : newest); - } - } - - async onEvent(evs: readonly TaggedNostrEvent[], pubKey: string) { - const filtered = evs.filter(a => this.#kinds.includes(a.kind) && a.tags.some(b => b[0] === "p")); - if (filtered.length > 0) { - await this.bulkSet( - filtered.map(v => ({ - ...v, - forSession: pubKey, - })), - ); - this.emit( - "change", - filtered.map(v => this.key(v)), - ); - } - } - - key(of: TWithCreated): string { - return of.id; - } - - takeSnapshot() { - return [...this.cache.values()]; - } -} diff --git a/packages/app/src/Cache/index.ts b/packages/app/src/Cache/index.ts index 84a899ec..faebdea5 100644 --- a/packages/app/src/Cache/index.ts +++ b/packages/app/src/Cache/index.ts @@ -1,13 +1,30 @@ import { RelayMetricCache, UserProfileCache, UserRelaysCache } from "@snort/system"; import { SnortSystemDb } from "@snort/system-web"; +import { WorkerRelayInterface } from "@snort/worker-relay"; +import WorkerRelayPath from "@snort/worker-relay/dist/worker?worker&url"; import { ChatCache } from "./ChatCache"; +import { EventCacheWorker } from "./EventCacheWorker"; import { GiftWrapCache } from "./GiftWrapCache"; +export const Relay = new WorkerRelayInterface(WorkerRelayPath); +export async function initRelayWorker() { + try { + if (await Relay.init()) { + if (await Relay.open()) { + await Relay.migrate(); + } + } + } catch (e) { + console.error(e); + } +} + export const SystemDb = new SnortSystemDb(); export const UserCache = new UserProfileCache(SystemDb.users); export const UserRelays = new UserRelaysCache(SystemDb.userRelays); export const RelayMetrics = new RelayMetricCache(SystemDb.relayMetrics); +export const EventsCache = new EventCacheWorker(Relay); export const Chats = new ChatCache(); export const GiftsCache = new GiftWrapCache(); @@ -19,6 +36,7 @@ export async function preload(follows?: Array) { RelayMetrics.preload(), GiftsCache.preload(), UserRelays.preload(follows), + EventsCache.preload(), ]; await Promise.all(preloads); } diff --git a/packages/app/src/Feed/LoginFeed.ts b/packages/app/src/Feed/LoginFeed.ts index 385fd967..dfbd2e98 100644 --- a/packages/app/src/Feed/LoginFeed.ts +++ b/packages/app/src/Feed/LoginFeed.ts @@ -3,7 +3,6 @@ import { NostrLink, parseRelayTags, RequestBuilder, - socialGraphInstance, TaggedNostrEvent, } from "@snort/system"; import { useRequestBuilder } from "@snort/system-react"; diff --git a/packages/app/src/Feed/WorkerRelayView.ts b/packages/app/src/Feed/WorkerRelayView.ts index fa8208d0..48685052 100644 --- a/packages/app/src/Feed/WorkerRelayView.ts +++ b/packages/app/src/Feed/WorkerRelayView.ts @@ -3,8 +3,8 @@ import { EventKind, NostrEvent, NostrLink, ReqFilter, RequestBuilder, TaggedNost import { SnortContext, useRequestBuilder } from "@snort/system-react"; import { useContext, useEffect, useMemo, useState } from "react"; +import { Relay } from "@/Cache"; import useLogin from "@/Hooks/useLogin"; -import { Relay } from "@/system"; import { Day } from "@/Utils/Const"; export function useWorkerRelayView(id: string, filters: Array, leaveOpen?: boolean, maxWindow?: number) { @@ -20,6 +20,7 @@ export function useWorkerRelayView(id: string, filters: Array, leaveO } }, [rb, system]); useEffect(() => { + setRb(undefined); Relay.req({ id: `${id}+latest`, filters: filters.map(f => ({ @@ -32,12 +33,15 @@ export function useWorkerRelayView(id: string, filters: Array, leaveO const rb = new RequestBuilder(id); rb.withOptions({ fillStore: false }); filters - .map((f, i) => ({ - ...f, - limit: undefined, - until: undefined, - since: latest.result?.at(i)?.created_at ?? (maxWindow ? unixNow() - maxWindow : undefined), - })) + .map((f, i) => { + const since = latest.result?.at(i)?.created_at; + return { + ...f, + limit: undefined, + until: undefined, + since: since ? since + 1 : maxWindow ? unixNow() - maxWindow : f.since, + }; + }) .forEach(f => rb.withBareFilter(f)); setRb(rb); }); diff --git a/packages/app/src/Pages/settings/Cache.tsx b/packages/app/src/Pages/settings/Cache.tsx index 799d9813..fee4af67 100644 --- a/packages/app/src/Pages/settings/Cache.tsx +++ b/packages/app/src/Pages/settings/Cache.tsx @@ -2,9 +2,8 @@ import { FeedCache } from "@snort/shared"; import { ReactNode, useEffect, useState, useSyncExternalStore } from "react"; import { FormattedMessage, FormattedNumber } from "react-intl"; -import { Chats, GiftsCache, RelayMetrics, UserCache } from "@/Cache"; +import { Chats, GiftsCache, Relay, RelayMetrics, UserCache } from "@/Cache"; import AsyncButton from "@/Components/Button/AsyncButton"; -import { Relay } from "@/system"; export function CacheSettings() { return ( @@ -92,7 +91,7 @@ function RelayCacheStats() {
- {}}> + { }}> { }); System.on("event", (_, ev) => { - addEventToFuzzySearch(ev); - socialGraphInstance.handleEvent(ev); -}); - -System.profileCache.on("change", keys => { - const changed = removeUndefined(keys.map(a => System.profileCache.getFromCache(a))); - changed.forEach(addCachedMetadataToFuzzySearch); + Relay.event(ev); + EventsCache.discover(ev); }); /** @@ -61,22 +54,6 @@ export async function fetchProfile(key: string) { } } -export const Relay = new WorkerRelayInterface(WorkerRelayPath); -export async function initRelayWorker() { - try { - if (await Relay.init()) { - if (await Relay.open()) { - await Relay.migrate(); - System.on("event", (_, ev) => { - Relay.event(ev); - }); - } - } - } catch (e) { - console.error(e); - } -} - /** * Singleton user profile loader */ diff --git a/packages/shared/src/feed-cache.ts b/packages/shared/src/feed-cache.ts index ffb0a356..69b595dc 100644 --- a/packages/shared/src/feed-cache.ts +++ b/packages/shared/src/feed-cache.ts @@ -22,9 +22,21 @@ export type CachedTable = { bulkGet(keys: Array): Promise>; set(obj: T): Promise; bulkSet(obj: Array | Readonly>): Promise; + + /** + * Try to update an entry where created values exists + * @param m Profile metadata + * @returns + */ update( m: TWithCreated, ): Promise<"new" | "refresh" | "updated" | "no_change">; + + /** + * Loads a list of rows from disk cache + * @param keys List of ids to load + * @returns Keys that do not exist on disk cache + */ buffer(keys: Array): Promise>; key(of: T): string; snapshot(): Array; @@ -151,11 +163,6 @@ export abstract class FeedCache extends EventEmitter imple ); } - /** - * Try to update an entry where created values exists - * @param m Profile metadata - * @returns - */ async update(m: TCachedWithCreated) { const k = this.key(m); const existing = this.getFromCache(k) as TCachedWithCreated; @@ -182,11 +189,6 @@ export abstract class FeedCache extends EventEmitter imple return updateType; } - /** - * Loads a list of rows from disk cache - * @param keys List of ids to load - * @returns Keys that do not exist on disk cache - */ async buffer(keys: Array): Promise> { const needsBuffer = keys.filter(a => !this.cache.has(a)); if (this.table && needsBuffer.length > 0) { diff --git a/packages/worker-relay/src/interface.ts b/packages/worker-relay/src/interface.ts index 3a8b4df1..63fe93d4 100644 --- a/packages/worker-relay/src/interface.ts +++ b/packages/worker-relay/src/interface.ts @@ -56,6 +56,10 @@ export class WorkerRelayInterface { return (await this.#workerRpc("dumpDb")).result; } + async sql(sql: string, params: Array) { + return (await this.#workerRpc>>("sql", { sql, params })).result; + } + #workerRpc(cmd: string, args?: T, timeout = 30_000) { const id = uuid(); const msg = { diff --git a/packages/worker-relay/src/relay.ts b/packages/worker-relay/src/relay.ts index b5381d51..0152967f 100644 --- a/packages/worker-relay/src/relay.ts +++ b/packages/worker-relay/src/relay.ts @@ -83,10 +83,18 @@ export class WorkerRelay extends EventEmitter { return eventInserted; } + /** + * Run any SQL command + */ + sql(sql: string, params: Array) { + return this.#db?.selectArrays(sql, params); + } + /** * Write multiple events */ eventBatch(evs: Array) { + const start = unixNowMs(); let eventsInserted: Array = []; this.#db?.transaction(db => { for (const ev of evs) { @@ -96,7 +104,7 @@ export class WorkerRelay extends EventEmitter { } }); if (eventsInserted.length > 0) { - this.#log(`Inserted Batch: ${eventsInserted.length}/${evs.length}`); + this.#log(`Inserted Batch: ${eventsInserted.length}/${evs.length}, ${(unixNowMs() - start).toLocaleString()}ms`); this.emit("event", eventsInserted); } return eventsInserted.length > 0; @@ -169,7 +177,7 @@ export class WorkerRelay extends EventEmitter { const res = this.#db?.selectArrays(sql, params); const results = res?.map(a => JSON.parse(a[0] as string) as NostrEvent) ?? []; const time = unixNowMs() - start; - //this.#log(`Query ${id} results took ${time.toLocaleString()}ms`); + this.#log(`Query ${id} results took ${time.toLocaleString()}ms`); return results; } diff --git a/packages/worker-relay/src/types.ts b/packages/worker-relay/src/types.ts index d33b5949..856f3566 100644 --- a/packages/worker-relay/src/types.ts +++ b/packages/worker-relay/src/types.ts @@ -1,6 +1,6 @@ export interface WorkerMessage { id: string; - cmd: "reply" | "init" | "open" | "migrate" | "event" | "req" | "count" | "summary" | "close" | "dumpDb"; + cmd: "reply" | "init" | "open" | "migrate" | "event" | "req" | "count" | "summary" | "close" | "dumpDb" | "sql"; args: T; } diff --git a/packages/worker-relay/src/worker.ts b/packages/worker-relay/src/worker.ts index 475528c0..1d6ab3cc 100644 --- a/packages/worker-relay/src/worker.ts +++ b/packages/worker-relay/src/worker.ts @@ -138,6 +138,17 @@ globalThis.onmessage = ev => { }); break; } + case "sql": { + barrierQueue(cmdQueue, async () => { + const req = msg.args as { + sql: string; + params: Array; + }; + const res = relay.sql(req.sql, req.params); + reply(msg.id, res); + }); + break; + } default: { reply(msg.id, { error: "Unknown command" }); break;