feat: use worker relay for events cache
continuous-integration/drone/push Build is running Details

This commit is contained in:
Kieran 2024-01-18 21:11:48 +00:00
parent c2f78dad1e
commit 32a6d56cf5
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
16 changed files with 172 additions and 330 deletions

View File

@ -0,0 +1,96 @@
import { CachedTable, CacheEvents } from "@snort/shared";
import { NostrEvent } from "@snort/system";
import { WorkerRelayInterface } from "@snort/worker-relay";
import EventEmitter from "eventemitter3";
export class EventCacheWorker extends EventEmitter<CacheEvents> implements CachedTable<NostrEvent> {
#relay: WorkerRelayInterface;
#keys = new Set<string>();
#cache = new Map<string, NostrEvent>();
constructor(relay: WorkerRelayInterface) {
super();
this.#relay = relay;
}
async preload() {
const ids = await this.#relay.sql("select id from events", []);
this.#keys = new Set<string>(ids.map(a => a[0] as string));
return Promise.resolve();
}
keysOnTable(): string[] {
return [...this.#keys];
}
getFromCache(key?: string | undefined): NostrEvent | undefined {
if (key) {
return this.#cache.get(key);
}
}
discover(ev: NostrEvent) {
this.#keys.add(this.key(ev));
}
async get(key?: string | undefined): Promise<NostrEvent | undefined> {
if (key) {
const res = await this.bulkGet([key]);
if (res.length > 0) {
return res[0];
}
}
}
async bulkGet(keys: string[]): Promise<NostrEvent[]> {
const results = await this.#relay.req({
id: "EventCacheWorker.bulkGet",
filters: [
{
ids: keys,
},
],
});
for (const ev of results.result) {
this.#cache.set(ev.id, ev);
}
return results.result;
}
async set(obj: NostrEvent): Promise<void> {
await this.#relay.event(obj);
this.#keys.add(obj.id);
}
async bulkSet(obj: NostrEvent[] | readonly NostrEvent[]): Promise<void> {
await Promise.all(
obj.map(async a => {
await this.#relay.event(a);
this.#keys.add(a.id);
}),
);
}
async update<TWithCreated extends NostrEvent & { created: number; loaded: number }>(
m: TWithCreated,
): Promise<"new" | "refresh" | "updated" | "no_change"> {
if (await this.#relay.event(m)) {
return "updated";
}
return "no_change";
}
async buffer(keys: string[]): Promise<string[]> {
const missing = keys.filter(a => !this.#keys.has(a));
const res = await this.bulkGet(missing);
return missing.filter(a => !res.some(b => this.key(b) === a));
}
key(of: NostrEvent): string {
return of.id;
}
snapshot(): NostrEvent[] {
return [...this.#cache.values()];
}
}

View File

@ -1,41 +0,0 @@
import { FeedCache } from "@snort/shared";
import { db, EventInteraction } from "@/Db";
import { LoginStore } from "@/Utils/Login";
export class EventInteractionCache extends FeedCache<EventInteraction> {
constructor() {
super("EventInteraction", db.eventInteraction);
}
key(of: EventInteraction): string {
return `${of.event}:${of.by}`;
}
override async preload(): Promise<void> {
await super.preload();
const data = window.localStorage.getItem("zap-cache");
if (data) {
const toImport = [...new Set<string>(JSON.parse(data) as Array<string>)].map(a => {
const ret = {
event: a,
by: LoginStore.takeSnapshot().publicKey,
zapped: true,
reacted: false,
reposted: false,
} as EventInteraction;
ret.id = this.key(ret);
return ret;
});
await this.bulkSet(toImport);
window.localStorage.removeItem("zap-cache");
}
await this.buffer([...this.onTable]);
}
takeSnapshot(): EventInteraction[] {
return [...this.cache.values()];
}
}

View File

@ -1,49 +0,0 @@
import { unixNowMs } from "@snort/shared";
import { EventKind, RequestBuilder, socialGraphInstance, TaggedNostrEvent } from "@snort/system";
import { db } from "@/Db";
import { LoginSession } from "@/Utils/Login";
import { RefreshFeedCache } from "./RefreshFeedCache";
export class FollowListCache extends RefreshFeedCache<TaggedNostrEvent> {
constructor() {
super("FollowListCache", db.followLists);
}
buildSub(session: LoginSession, rb: RequestBuilder): void {
const since = this.newest();
rb.withFilter()
.kinds([EventKind.ContactList])
.authors(session.follows.item)
.since(since === 0 ? undefined : since);
}
async onEvent(evs: readonly TaggedNostrEvent[]) {
await Promise.all(
evs.map(async e => {
const update = await super.update({
...e,
created: e.created_at,
loaded: unixNowMs(),
});
if (update !== "no_change") {
socialGraphInstance.handleEvent(e);
}
}),
);
}
key(of: TaggedNostrEvent): string {
return of.pubkey;
}
takeSnapshot() {
return [...this.cache.values()];
}
override async preload() {
await super.preload();
this.cache.forEach(e => socialGraphInstance.handleEvent(e));
}
}

View File

@ -1,136 +0,0 @@
import { unixNow, unixNowMs } from "@snort/shared";
import { EventKind, RequestBuilder, SystemInterface, TaggedNostrEvent } from "@snort/system";
import { db } from "@/Db";
import { Day, Hour } from "@/Utils/Const";
import { LoginSession } from "@/Utils/Login";
import { RefreshFeedCache, TWithCreated } from "./RefreshFeedCache";
const WindowSize = Hour * 6;
const MaxCacheWindow = Day * 7;
export class FollowsFeedCache extends RefreshFeedCache<TaggedNostrEvent> {
#kinds = [EventKind.TextNote, EventKind.Repost, EventKind.Polls];
#oldest?: number;
constructor() {
super("FollowsFeedCache", db.followsFeed);
}
key(of: TWithCreated<TaggedNostrEvent>): string {
return of.id;
}
takeSnapshot(): TWithCreated<TaggedNostrEvent>[] {
return [...this.cache.values()];
}
buildSub(session: LoginSession, rb: RequestBuilder): void {
const authors = [...session.follows.item];
if (session.publicKey) {
authors.push(session.publicKey);
}
const since = this.newest();
rb.withFilter()
.kinds(this.#kinds)
.authors(authors)
.since(since === 0 ? unixNow() - WindowSize : since);
}
async onEvent(evs: readonly TaggedNostrEvent[]): Promise<void> {
const filtered = evs.filter(a => this.#kinds.includes(a.kind));
if (filtered.length > 0) {
await this.bulkSet(filtered);
this.emit(
"change",
filtered.map(a => this.key(a)),
);
}
}
override async preload() {
const start = unixNowMs();
const keys = (await this.table?.toCollection().primaryKeys()) ?? [];
this.onTable = new Set<string>(keys.map(a => a as string));
// load only latest 50 posts, rest can be loaded on-demand
const latest = await this.table?.orderBy("created_at").reverse().limit(50).toArray();
latest?.forEach(v => this.cache.set(this.key(v), v));
// cleanup older than 7 days
await this.table
?.where("created_at")
.below(unixNow() - MaxCacheWindow)
.delete();
const oldest = await this.table?.orderBy("created_at").first();
this.#oldest = oldest?.created_at;
this.emit("change", latest?.map(a => this.key(a)) ?? []);
this.log(`Loaded %d/%d in %d ms`, latest?.length ?? 0, keys.length, (unixNowMs() - start).toLocaleString());
}
async loadMore(system: SystemInterface, session: LoginSession, before: number) {
if (this.#oldest && before <= this.#oldest) {
const rb = new RequestBuilder(`${this.name}-loadmore`);
const authors = [...session.follows.item];
if (session.publicKey) {
authors.push(session.publicKey);
}
rb.withFilter()
.kinds(this.#kinds)
.authors(authors)
.until(before)
.since(before - WindowSize);
await system.Fetch(rb, async evs => {
await this.bulkSet(evs);
});
} else {
const latest = await this.table
?.where("created_at")
.between(before - WindowSize, before)
.reverse()
.sortBy("created_at");
latest?.forEach(v => {
const k = this.key(v);
this.cache.set(k, v);
this.onTable.add(k);
});
this.emit("change", latest?.map(a => this.key(a)) ?? []);
}
}
/**
* Backfill cache with new follows
*/
async backFill(system: SystemInterface, keys: Array<string>) {
if (keys.length === 0) return;
const rb = new RequestBuilder(`${this.name}-backfill`);
rb.withFilter()
.kinds(this.#kinds)
.authors(keys)
.until(unixNow())
.since(this.#oldest ?? unixNow() - MaxCacheWindow);
await system.Fetch(rb, async evs => {
await this.bulkSet(evs);
});
}
/**
* Backfill cache based on follows list
*/
async backFillIfMissing(system: SystemInterface, keys: Array<string>) {
if (!this.#oldest) return;
const start = unixNowMs();
const everything = await this.table?.toArray();
if ((everything?.length ?? 0) > 0) {
const allKeys = new Set(everything?.map(a => a.pubkey));
const missingKeys = keys.filter(a => !allKeys.has(a));
await this.backFill(system, missingKeys);
this.log(`Backfilled %d keys in %d ms`, missingKeys.length, (unixNowMs() - start).toLocaleString());
}
}
}

View File

@ -1,50 +0,0 @@
import { unixNow } from "@snort/shared";
import { EventKind, NostrEvent, RequestBuilder, TaggedNostrEvent } from "@snort/system";
import { db, NostrEventForSession } from "@/Db";
import { Day } from "@/Utils/Const";
import { LoginSession } from "@/Utils/Login";
import { RefreshFeedCache, TWithCreated } from "./RefreshFeedCache";
export class NotificationsCache extends RefreshFeedCache<NostrEventForSession> {
#kinds = [EventKind.TextNote, EventKind.Reaction, EventKind.Repost, EventKind.ZapReceipt];
constructor() {
super("notifications", db.notifications);
}
buildSub(session: LoginSession, rb: RequestBuilder) {
if (session.publicKey) {
const newest = this.newest(v => v.tags.some(a => a[0] === "p" && a[1] === session.publicKey));
rb.withFilter()
.kinds(this.#kinds)
.tag("p", [session.publicKey])
.since(newest === 0 ? unixNow() - Day * 30 : newest);
}
}
async onEvent(evs: readonly TaggedNostrEvent[], pubKey: string) {
const filtered = evs.filter(a => this.#kinds.includes(a.kind) && a.tags.some(b => b[0] === "p"));
if (filtered.length > 0) {
await this.bulkSet(
filtered.map(v => ({
...v,
forSession: pubKey,
})),
);
this.emit(
"change",
filtered.map(v => this.key(v)),
);
}
}
key(of: TWithCreated<NostrEvent>): string {
return of.id;
}
takeSnapshot() {
return [...this.cache.values()];
}
}

View File

@ -1,13 +1,30 @@
import { RelayMetricCache, UserProfileCache, UserRelaysCache } from "@snort/system";
import { SnortSystemDb } from "@snort/system-web";
import { WorkerRelayInterface } from "@snort/worker-relay";
import WorkerRelayPath from "@snort/worker-relay/dist/worker?worker&url";
import { ChatCache } from "./ChatCache";
import { EventCacheWorker } from "./EventCacheWorker";
import { GiftWrapCache } from "./GiftWrapCache";
export const Relay = new WorkerRelayInterface(WorkerRelayPath);
export async function initRelayWorker() {
try {
if (await Relay.init()) {
if (await Relay.open()) {
await Relay.migrate();
}
}
} catch (e) {
console.error(e);
}
}
export const SystemDb = new SnortSystemDb();
export const UserCache = new UserProfileCache(SystemDb.users);
export const UserRelays = new UserRelaysCache(SystemDb.userRelays);
export const RelayMetrics = new RelayMetricCache(SystemDb.relayMetrics);
export const EventsCache = new EventCacheWorker(Relay);
export const Chats = new ChatCache();
export const GiftsCache = new GiftWrapCache();
@ -19,6 +36,7 @@ export async function preload(follows?: Array<string>) {
RelayMetrics.preload(),
GiftsCache.preload(),
UserRelays.preload(follows),
EventsCache.preload(),
];
await Promise.all(preloads);
}

View File

@ -3,7 +3,6 @@ import {
NostrLink,
parseRelayTags,
RequestBuilder,
socialGraphInstance,
TaggedNostrEvent,
} from "@snort/system";
import { useRequestBuilder } from "@snort/system-react";

View File

@ -3,8 +3,8 @@ import { EventKind, NostrEvent, NostrLink, ReqFilter, RequestBuilder, TaggedNost
import { SnortContext, useRequestBuilder } from "@snort/system-react";
import { useContext, useEffect, useMemo, useState } from "react";
import { Relay } from "@/Cache";
import useLogin from "@/Hooks/useLogin";
import { Relay } from "@/system";
import { Day } from "@/Utils/Const";
export function useWorkerRelayView(id: string, filters: Array<ReqFilter>, leaveOpen?: boolean, maxWindow?: number) {
@ -20,6 +20,7 @@ export function useWorkerRelayView(id: string, filters: Array<ReqFilter>, leaveO
}
}, [rb, system]);
useEffect(() => {
setRb(undefined);
Relay.req({
id: `${id}+latest`,
filters: filters.map(f => ({
@ -32,12 +33,15 @@ export function useWorkerRelayView(id: string, filters: Array<ReqFilter>, leaveO
const rb = new RequestBuilder(id);
rb.withOptions({ fillStore: false });
filters
.map((f, i) => ({
...f,
limit: undefined,
until: undefined,
since: latest.result?.at(i)?.created_at ?? (maxWindow ? unixNow() - maxWindow : undefined),
}))
.map((f, i) => {
const since = latest.result?.at(i)?.created_at;
return {
...f,
limit: undefined,
until: undefined,
since: since ? since + 1 : maxWindow ? unixNow() - maxWindow : f.since,
};
})
.forEach(f => rb.withBareFilter(f));
setRb(rb);
});

View File

@ -2,9 +2,8 @@ import { FeedCache } from "@snort/shared";
import { ReactNode, useEffect, useState, useSyncExternalStore } from "react";
import { FormattedMessage, FormattedNumber } from "react-intl";
import { Chats, GiftsCache, RelayMetrics, UserCache } from "@/Cache";
import { Chats, GiftsCache, Relay, RelayMetrics, UserCache } from "@/Cache";
import AsyncButton from "@/Components/Button/AsyncButton";
import { Relay } from "@/system";
export function CacheSettings() {
return (
@ -92,7 +91,7 @@ function RelayCacheStats() {
</table>
</div>
<div className="flex flex-col gap-2">
<AsyncButton onClick={() => {}}>
<AsyncButton onClick={() => { }}>
<FormattedMessage defaultMessage="Clear" id="/GCoTA" />
</AsyncButton>
<AsyncButton

View File

@ -8,7 +8,7 @@ import { StrictMode } from "react";
import * as ReactDOM from "react-dom/client";
import { createBrowserRouter, RouteObject, RouterProvider } from "react-router-dom";
import { preload } from "@/Cache";
import { initRelayWorker, preload } from "@/Cache";
import { ThreadRoute } from "@/Components/Event/Thread";
import { IntlProvider } from "@/Components/IntlProvider/IntlProvider";
import { db } from "@/Db";
@ -34,7 +34,7 @@ import SearchPage from "@/Pages/SearchPage";
import SettingsRoutes from "@/Pages/settings/Routes";
import { SubscribeRoutes } from "@/Pages/subscribe";
import ZapPoolPage from "@/Pages/ZapPool";
import { initRelayWorker, System } from "@/system";
import { System } from "@/system";
import { storeRefCode, unwrap } from "@/Utils";
import { LoginStore } from "@/Utils/Login";
import { hasWasm, wasmInit, WasmPath } from "@/Utils/wasm";

View File

@ -1,10 +1,7 @@
import { removeUndefined, throwIfOffline } from "@snort/shared";
import { mapEventToProfile, NostrEvent, NostrSystem, ProfileLoaderService, socialGraphInstance } from "@snort/system";
import { WorkerRelayInterface } from "@snort/worker-relay";
import WorkerRelayPath from "@snort/worker-relay/dist/worker?worker&url";
import { mapEventToProfile, NostrEvent, NostrSystem, ProfileLoaderService } from "@snort/system";
import { RelayMetrics, SystemDb, UserCache, UserRelays } from "@/Cache";
import { addCachedMetadataToFuzzySearch, addEventToFuzzySearch } from "@/Db/FuzzySearch";
import { EventsCache, Relay, RelayMetrics, SystemDb, UserCache, UserRelays } from "@/Cache";
import { LoginStore } from "@/Utils/Login";
import { hasWasm, WasmOptimizer } from "@/Utils/wasm";
@ -13,6 +10,7 @@ import { hasWasm, WasmOptimizer } from "@/Utils/wasm";
*/
export const System = new NostrSystem({
relayCache: UserRelays,
eventsCache: EventsCache,
profileCache: UserCache,
relayMetrics: RelayMetrics,
optimizer: hasWasm ? WasmOptimizer : undefined,
@ -28,13 +26,8 @@ System.on("auth", async (c, r, cb) => {
});
System.on("event", (_, ev) => {
addEventToFuzzySearch(ev);
socialGraphInstance.handleEvent(ev);
});
System.profileCache.on("change", keys => {
const changed = removeUndefined(keys.map(a => System.profileCache.getFromCache(a)));
changed.forEach(addCachedMetadataToFuzzySearch);
Relay.event(ev);
EventsCache.discover(ev);
});
/**
@ -61,22 +54,6 @@ export async function fetchProfile(key: string) {
}
}
export const Relay = new WorkerRelayInterface(WorkerRelayPath);
export async function initRelayWorker() {
try {
if (await Relay.init()) {
if (await Relay.open()) {
await Relay.migrate();
System.on("event", (_, ev) => {
Relay.event(ev);
});
}
}
} catch (e) {
console.error(e);
}
}
/**
* Singleton user profile loader
*/

View File

@ -22,9 +22,21 @@ export type CachedTable<T> = {
bulkGet(keys: Array<string>): Promise<Array<T>>;
set(obj: T): Promise<void>;
bulkSet(obj: Array<T> | Readonly<Array<T>>): Promise<void>;
/**
* Try to update an entry where created values exists
* @param m Profile metadata
* @returns
*/
update<TWithCreated extends T & { created: number; loaded: number }>(
m: TWithCreated,
): Promise<"new" | "refresh" | "updated" | "no_change">;
/**
* Loads a list of rows from disk cache
* @param keys List of ids to load
* @returns Keys that do not exist on disk cache
*/
buffer(keys: Array<string>): Promise<Array<string>>;
key(of: T): string;
snapshot(): Array<T>;
@ -151,11 +163,6 @@ export abstract class FeedCache<TCached> extends EventEmitter<CacheEvents> imple
);
}
/**
* Try to update an entry where created values exists
* @param m Profile metadata
* @returns
*/
async update<TCachedWithCreated extends TCached & { created: number; loaded: number }>(m: TCachedWithCreated) {
const k = this.key(m);
const existing = this.getFromCache(k) as TCachedWithCreated;
@ -182,11 +189,6 @@ export abstract class FeedCache<TCached> extends EventEmitter<CacheEvents> imple
return updateType;
}
/**
* Loads a list of rows from disk cache
* @param keys List of ids to load
* @returns Keys that do not exist on disk cache
*/
async buffer(keys: Array<string>): Promise<Array<string>> {
const needsBuffer = keys.filter(a => !this.cache.has(a));
if (this.table && needsBuffer.length > 0) {

View File

@ -56,6 +56,10 @@ export class WorkerRelayInterface {
return (await this.#workerRpc<void, Uint8Array>("dumpDb")).result;
}
async sql(sql: string, params: Array<string | number>) {
return (await this.#workerRpc<object, Array<Array<any>>>("sql", { sql, params })).result;
}
#workerRpc<T, R>(cmd: string, args?: T, timeout = 30_000) {
const id = uuid();
const msg = {

View File

@ -83,10 +83,18 @@ export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
return eventInserted;
}
/**
* Run any SQL command
*/
sql(sql: string, params: Array<any>) {
return this.#db?.selectArrays(sql, params);
}
/**
* Write multiple events
*/
eventBatch(evs: Array<NostrEvent>) {
const start = unixNowMs();
let eventsInserted: Array<NostrEvent> = [];
this.#db?.transaction(db => {
for (const ev of evs) {
@ -96,7 +104,7 @@ export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
}
});
if (eventsInserted.length > 0) {
this.#log(`Inserted Batch: ${eventsInserted.length}/${evs.length}`);
this.#log(`Inserted Batch: ${eventsInserted.length}/${evs.length}, ${(unixNowMs() - start).toLocaleString()}ms`);
this.emit("event", eventsInserted);
}
return eventsInserted.length > 0;
@ -169,7 +177,7 @@ export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
const res = this.#db?.selectArrays(sql, params);
const results = res?.map(a => JSON.parse(a[0] as string) as NostrEvent) ?? [];
const time = unixNowMs() - start;
//this.#log(`Query ${id} results took ${time.toLocaleString()}ms`);
this.#log(`Query ${id} results took ${time.toLocaleString()}ms`);
return results;
}

View File

@ -1,6 +1,6 @@
export interface WorkerMessage<T> {
id: string;
cmd: "reply" | "init" | "open" | "migrate" | "event" | "req" | "count" | "summary" | "close" | "dumpDb";
cmd: "reply" | "init" | "open" | "migrate" | "event" | "req" | "count" | "summary" | "close" | "dumpDb" | "sql";
args: T;
}

View File

@ -138,6 +138,17 @@ globalThis.onmessage = ev => {
});
break;
}
case "sql": {
barrierQueue(cmdQueue, async () => {
const req = msg.args as {
sql: string;
params: Array<any>;
};
const res = relay.sql(req.sql, req.params);
reply(msg.id, res);
});
break;
}
default: {
reply(msg.id, { error: "Unknown command" });
break;