From 7558e91d28896d81ad3f9f54486e57e301f31471 Mon Sep 17 00:00:00 2001 From: Kieran Date: Thu, 22 Feb 2024 11:12:26 +0000 Subject: [PATCH] feat: automate social graph --- packages/app/src/Cache/UserFollowsWorker.ts | 117 ++++++++++ packages/app/src/Cache/index.ts | 3 + packages/app/src/index.tsx | 20 +- packages/app/src/system.ts | 15 +- packages/system-web/package.json | 6 +- packages/system-web/src/index.ts | 6 +- packages/system/package.json | 2 +- .../system/src/SocialGraph/SocialGraph.ts | 57 ++--- packages/system/src/cache/index.ts | 10 +- .../system/src/cache/user-follows-lists.ts | 29 +++ packages/system/src/index.ts | 8 +- packages/system/src/nostr-system.ts | 70 +++++- packages/system/src/note-collection.ts | 6 +- packages/system/src/worker/index.ts | 15 -- .../system/src/worker/system-worker-script.ts | 49 ----- packages/system/src/worker/system-worker.ts | 208 ------------------ 16 files changed, 285 insertions(+), 336 deletions(-) create mode 100644 packages/app/src/Cache/UserFollowsWorker.ts create mode 100644 packages/system/src/cache/user-follows-lists.ts delete mode 100644 packages/system/src/worker/index.ts delete mode 100644 packages/system/src/worker/system-worker-script.ts delete mode 100644 packages/system/src/worker/system-worker.ts diff --git a/packages/app/src/Cache/UserFollowsWorker.ts b/packages/app/src/Cache/UserFollowsWorker.ts new file mode 100644 index 00000000..4b8483d9 --- /dev/null +++ b/packages/app/src/Cache/UserFollowsWorker.ts @@ -0,0 +1,117 @@ +import { CachedTable, CacheEvents, removeUndefined, unixNowMs, unwrap } from "@snort/shared"; +import { EventKind, NostrEvent, UsersFollows } from "@snort/system"; +import { WorkerRelayInterface } from "@snort/worker-relay"; +import debug from "debug"; +import EventEmitter from "eventemitter3"; + +export class UserFollowsWorker extends EventEmitter implements CachedTable { + #relay: WorkerRelayInterface; + #keys = new Set(); + #cache = new Map(); + #log = debug("UserFollowsWorker"); + + constructor(relay: WorkerRelayInterface) { + super(); + this.#relay = relay; + } + + async preload() { + const start = unixNowMs(); + const profiles = await this.#relay.query([ + "REQ", + "profiles-preload", + { + kinds: [3], + }, + ]); + this.#cache = new Map(profiles.map(a => [a.pubkey, unwrap(mapEventToUserFollows(a))])); + this.#keys = new Set(this.#cache.keys()); + this.#log(`Loaded %d/%d in %d ms`, this.#cache.size, this.#keys.size, (unixNowMs() - start).toLocaleString()); + } + + keysOnTable(): string[] { + return [...this.#keys]; + } + + getFromCache(key?: string | undefined): UsersFollows | undefined { + if (key) { + return this.#cache.get(key); + } + } + + discover(ev: NostrEvent) { + this.#keys.add(ev.pubkey); + } + + async get(key?: string | undefined): Promise { + if (key) { + const res = await this.bulkGet([key]); + if (res.length > 0) { + return res[0]; + } + } + } + + async bulkGet(keys: string[]) { + if (keys.length === 0) return []; + + const results = await this.#relay.query([ + "REQ", + "UserFollowsWorker.bulkGet", + { + authors: keys, + kinds: [3], + }, + ]); + const mapped = removeUndefined(results.map(a => mapEventToUserFollows(a))); + for (const pf of mapped) { + this.#cache.set(this.key(pf), pf); + } + this.emit( + "change", + mapped.map(a => this.key(a)), + ); + return mapped; + } + + async set(obj: UsersFollows) { + this.#keys.add(this.key(obj)); + } + + async bulkSet(obj: UsersFollows[] | readonly UsersFollows[]) { + const mapped = obj.map(a => this.key(a)); + mapped.forEach(a => this.#keys.add(a)); + // todo: store in cache + this.emit("change", mapped); + } + + async update(): Promise<"new" | "refresh" | "updated" | "no_change"> { + // do nothing + return "refresh"; + } + + async buffer(keys: string[]): Promise { + const missing = keys.filter(a => !this.#keys.has(a)); + const res = await this.bulkGet(missing); + return missing.filter(a => !res.some(b => this.key(b) === a)); + } + + key(of: UsersFollows): string { + return of.pubkey; + } + + snapshot(): UsersFollows[] { + return [...this.#cache.values()]; + } +} + +export function mapEventToUserFollows(ev: NostrEvent): UsersFollows | undefined { + if (ev.kind !== EventKind.ContactList) return; + + return { + pubkey: ev.pubkey, + loaded: unixNowMs(), + created: ev.created_at, + follows: ev.tags, + }; +} diff --git a/packages/app/src/Cache/index.ts b/packages/app/src/Cache/index.ts index 5bafffe3..1a671eb9 100644 --- a/packages/app/src/Cache/index.ts +++ b/packages/app/src/Cache/index.ts @@ -6,6 +6,7 @@ import WorkerRelayPath from "@snort/worker-relay/dist/worker?worker&url"; import { EventCacheWorker } from "./EventCacheWorker"; import { GiftWrapCache } from "./GiftWrapCache"; import { ProfileCacheRelayWorker } from "./ProfileWorkerCache"; +import { UserFollowsWorker } from "./UserFollowsWorker"; export const Relay = new WorkerRelayInterface(WorkerRelayPath); export async function initRelayWorker() { @@ -20,6 +21,7 @@ export const SystemDb = new SnortSystemDb(); export const UserRelays = new UserRelaysCache(SystemDb.userRelays); export const RelayMetrics = new RelayMetricCache(SystemDb.relayMetrics); +export const UserFollows = new UserFollowsWorker(Relay); export const UserCache = new ProfileCacheRelayWorker(Relay); export const EventsCache = new EventCacheWorker(Relay); @@ -32,6 +34,7 @@ export async function preload(follows?: Array) { GiftsCache.preload(), UserRelays.preload(follows), EventsCache.preload(), + UserFollows.preload(), ]; await Promise.all(preloads); } diff --git a/packages/app/src/index.tsx b/packages/app/src/index.tsx index 183244b7..91adc74e 100644 --- a/packages/app/src/index.tsx +++ b/packages/app/src/index.tsx @@ -2,13 +2,12 @@ import "./index.css"; import "@szhsin/react-menu/dist/index.css"; import "@/assets/fonts/inter.css"; -import { socialGraphInstance } from "@snort/system"; import { SnortContext } from "@snort/system-react"; import { StrictMode } from "react"; import * as ReactDOM from "react-dom/client"; import { createBrowserRouter, RouteObject, RouterProvider } from "react-router-dom"; -import { initRelayWorker, preload, Relay, UserCache } from "@/Cache"; +import { initRelayWorker, preload, UserCache } from "@/Cache"; import { ThreadRoute } from "@/Components/Event/Thread"; import { IntlProvider } from "@/Components/IntlProvider/IntlProvider"; import { db } from "@/Db"; @@ -55,25 +54,10 @@ async function initSite() { updateRelayConnections(System, login.relays.item).catch(console.error); setupWebLNWalletConfig(Wallets); - Relay.query([ - "REQ", - "preload-social-graph", - { - kinds: [3], - }, - ]).then(res => { - for (const ev of res) { - try { - socialGraphInstance.handleEvent(ev); - } catch (e) { - console.error("Failed to handle contact list event from sql db", e); - } - } - }); - db.ready = await db.isAvailable(); if (db.ready) { await preload(login.follows.item); + await System.PreloadSocialGraph(); } queueMicrotask(() => { diff --git a/packages/app/src/system.ts b/packages/app/src/system.ts index 9a033e0a..05e7f8fe 100644 --- a/packages/app/src/system.ts +++ b/packages/app/src/system.ts @@ -1,7 +1,7 @@ import { removeUndefined, throwIfOffline } from "@snort/shared"; -import { mapEventToProfile, NostrEvent, NostrSystem, socialGraphInstance } from "@snort/system"; +import { mapEventToProfile, NostrEvent, NostrSystem } from "@snort/system"; -import { EventsCache, Relay, RelayMetrics, SystemDb, UserCache, UserRelays } from "@/Cache"; +import { EventsCache, Relay, RelayMetrics, SystemDb, UserCache, UserFollows, UserRelays } from "@/Cache"; import { addEventToFuzzySearch } from "@/Db/FuzzySearch"; import { LoginStore } from "@/Utils/Login"; import { hasWasm, WasmOptimizer } from "@/Utils/wasm"; @@ -10,13 +10,15 @@ import { hasWasm, WasmOptimizer } from "@/Utils/wasm"; * Singleton nostr system */ export const System = new NostrSystem({ - relayCache: UserRelays, - eventsCache: EventsCache, - profileCache: UserCache, + relays: UserRelays, + events: EventsCache, + profiles: UserCache, relayMetrics: RelayMetrics, - cacheRelay: Relay, + cachingRelay: Relay, + contactLists: UserFollows, optimizer: hasWasm ? WasmOptimizer : undefined, db: SystemDb, + buildFollowGraph: true, }); System.on("auth", async (c, r, cb) => { @@ -31,7 +33,6 @@ System.on("event", (_, ev) => { Relay.event(ev); EventsCache.discover(ev); UserCache.discover(ev); - socialGraphInstance.handleEvent(ev); addEventToFuzzySearch(ev); }); diff --git a/packages/system-web/package.json b/packages/system-web/package.json index 3bec58e0..69703f8d 100644 --- a/packages/system-web/package.json +++ b/packages/system-web/package.json @@ -1,6 +1,6 @@ { "name": "@snort/system-web", - "version": "1.0.4", + "version": "1.2.10", "description": "Web based components @snort/system", "type": "module", "main": "dist/index.js", @@ -16,8 +16,8 @@ "dist" ], "dependencies": { - "@snort/shared": "^1.0.11", - "@snort/system": "^1.2.0", + "@snort/shared": "^1.0.13", + "@snort/system": "^1.2.10", "dexie": "^3.2.4" }, "devDependencies": { diff --git a/packages/system-web/src/index.ts b/packages/system-web/src/index.ts index 9e53455a..70632c22 100644 --- a/packages/system-web/src/index.ts +++ b/packages/system-web/src/index.ts @@ -1,13 +1,14 @@ -import { NostrEvent, CachedMetadata, RelayMetrics, UsersRelays } from "@snort/system"; +import { NostrEvent, CachedMetadata, RelayMetrics, UsersRelays, UsersFollows } from "@snort/system"; import Dexie, { Table } from "dexie"; const NAME = "snort-system"; -const VERSION = 2; +const VERSION = 3; const STORES = { users: "++pubkey, name, display_name, picture, nip05, npub", relayMetrics: "++addr", userRelays: "++pubkey", + contacts: "++pubkey", events: "++id, pubkey, created_at", }; @@ -17,6 +18,7 @@ export class SnortSystemDb extends Dexie { relayMetrics!: Table; userRelays!: Table; events!: Table; + contacts!: Table; constructor() { super(NAME); diff --git a/packages/system/package.json b/packages/system/package.json index 9e475c65..53dc4f1e 100644 --- a/packages/system/package.json +++ b/packages/system/package.json @@ -1,6 +1,6 @@ { "name": "@snort/system", - "version": "1.2.9", + "version": "1.2.10", "description": "Snort nostr system package", "type": "module", "main": "dist/index.js", diff --git a/packages/system/src/SocialGraph/SocialGraph.ts b/packages/system/src/SocialGraph/SocialGraph.ts index 767ad2be..cd505fd0 100644 --- a/packages/system/src/SocialGraph/SocialGraph.ts +++ b/packages/system/src/SocialGraph/SocialGraph.ts @@ -47,44 +47,47 @@ export default class SocialGraph { } } - handleEvent(event: NostrEvent) { - if (event.kind !== 3) { + handleEvent(evs: NostrEvent | Array) { + const filtered = (Array.isArray(evs) ? evs : [evs]).filter(a => a.kind === 3); + if (filtered.length === 0) { return; } queueMicrotask(() => { try { - const author = ID(event.pubkey); - const timestamp = event.created_at; - const existingTimestamp = this.latestFollowEventTimestamps.get(author); - if (existingTimestamp && timestamp <= existingTimestamp) { - return; - } - this.latestFollowEventTimestamps.set(author, timestamp); + for (const event of filtered) { + const author = ID(event.pubkey); + const timestamp = event.created_at; + const existingTimestamp = this.latestFollowEventTimestamps.get(author); + if (existingTimestamp && timestamp <= existingTimestamp) { + return; + } + this.latestFollowEventTimestamps.set(author, timestamp); - // Collect all users followed in the new event. - const followedInEvent = new Set(); - for (const tag of event.tags) { - if (tag[0] === "p") { - const followedUser = ID(tag[1]); - if (followedUser !== author) { - followedInEvent.add(followedUser); + // Collect all users followed in the new event. + const followedInEvent = new Set(); + for (const tag of event.tags) { + if (tag[0] === "p") { + const followedUser = ID(tag[1]); + if (followedUser !== author) { + followedInEvent.add(followedUser); + } } } - } - // Get the set of users currently followed by the author. - const currentlyFollowed = this.followedByUser.get(author) || new Set(); + // Get the set of users currently followed by the author. + const currentlyFollowed = this.followedByUser.get(author) || new Set(); - // Find users that need to be removed. - for (const user of currentlyFollowed) { - if (!followedInEvent.has(user)) { - this.removeFollower(user, author); + // Find users that need to be removed. + for (const user of currentlyFollowed) { + if (!followedInEvent.has(user)) { + this.removeFollower(user, author); + } } - } - // Add or update the followers based on the new event. - for (const user of followedInEvent) { - this.addFollower(user, author); + // Add or update the followers based on the new event. + for (const user of followedInEvent) { + this.addFollower(user, author); + } } } catch (e) { // might not be logged in or sth diff --git a/packages/system/src/cache/index.ts b/packages/system/src/cache/index.ts index bee1e923..3fbd3c1b 100644 --- a/packages/system/src/cache/index.ts +++ b/packages/system/src/cache/index.ts @@ -44,9 +44,16 @@ export interface RelayMetrics { export interface UsersRelays { pubkey: string; - relays: FullRelaySettings[]; created: number; loaded: number; + relays: FullRelaySettings[]; +} + +export interface UsersFollows { + pubkey: string; + created: number; + loaded: number; + follows: Array>; } export function mapEventToProfile(ev: NostrEvent) { @@ -78,6 +85,7 @@ export interface SnortSystemDb { relayMetrics: DexieTableLike; userRelays: DexieTableLike; events: DexieTableLike; + contacts: DexieTableLike; isAvailable(): Promise; } diff --git a/packages/system/src/cache/user-follows-lists.ts b/packages/system/src/cache/user-follows-lists.ts new file mode 100644 index 00000000..7f61e352 --- /dev/null +++ b/packages/system/src/cache/user-follows-lists.ts @@ -0,0 +1,29 @@ +import { UsersFollows } from "."; +import { DexieTableLike, FeedCache } from "@snort/shared"; + +export class UserFollowsCache extends FeedCache { + constructor(table?: DexieTableLike) { + super("UserFollowsCache", table); + } + + key(of: UsersFollows): string { + return of.pubkey; + } + + override async preload(follows?: Array): Promise { + await super.preload(); + if (follows) { + await this.buffer(follows); + } + } + + newest(): number { + let ret = 0; + this.cache.forEach(v => (ret = v.created > ret ? v.created : ret)); + return ret; + } + + takeSnapshot(): Array { + return [...this.cache.values()]; + } +} diff --git a/packages/system/src/index.ts b/packages/system/src/index.ts index 91dae407..7e49ffe8 100644 --- a/packages/system/src/index.ts +++ b/packages/system/src/index.ts @@ -12,6 +12,7 @@ import EventEmitter from "eventemitter3"; import { QueryEvents } from "./query"; import { CacheRelay } from "./cache-relay"; import { RequestRouter } from "./request-router"; +import { UsersFollows } from "./cache/index"; export { NostrSystem } from "./nostr-system"; export { default as EventKind } from "./event-kind"; @@ -48,8 +49,6 @@ export * from "./cache/user-relays"; export * from "./cache/user-metadata"; export * from "./cache/relay-metric"; -export * from "./worker/system-worker"; - export type QueryLike = { get progress(): number; feed: { @@ -143,6 +142,11 @@ export interface SystemInterface { */ get eventsCache(): CachedTable; + /** + * ContactList cache + */ + get userFollowsCache(): CachedTable; + /** * Relay loader loads relay metadata for a set of profiles */ diff --git a/packages/system/src/nostr-system.ts b/packages/system/src/nostr-system.ts index 3d0fc803..2c8ded85 100644 --- a/packages/system/src/nostr-system.ts +++ b/packages/system/src/nostr-system.ts @@ -1,7 +1,7 @@ import debug from "debug"; import EventEmitter from "eventemitter3"; -import { CachedTable } from "@snort/shared"; +import { CachedTable, isHex, unixNowMs } from "@snort/shared"; import { NostrEvent, TaggedNostrEvent, OkResponse } from "./nostr"; import { Connection, RelaySettings } from "./connection"; import { BuiltRawReqFilter, RequestBuilder } from "./request-builder"; @@ -19,6 +19,10 @@ import { SnortSystemDb, QueryLike, OutboxModel, + socialGraphInstance, + EventKind, + UsersFollows, + ID, } from "."; import { EventsCache } from "./cache/events"; import { RelayMetadataLoader } from "./outbox"; @@ -26,7 +30,8 @@ import { Optimizer, DefaultOptimizer } from "./query-optimizer"; import { ConnectionPool, DefaultConnectionPool } from "./connection-pool"; import { QueryManager } from "./query-manager"; import { CacheRelay } from "./cache-relay"; -import { RequestRouter } from "request-router"; +import { RequestRouter } from "./request-router"; +import { UserFollowsCache } from "./cache/user-follows-lists"; export interface NostrSystemEvents { change: (state: SystemSnapshot) => void; @@ -56,6 +61,11 @@ export interface SystemConfig { */ events: CachedTable; + /** + * Cache of user ContactLists (kind 3) + */ + contactLists: CachedTable; + /** * Optimized cache relay, usually `@snort/worker-relay` */ @@ -83,6 +93,14 @@ export interface SystemConfig { * 2. Write to inbox for all `p` tagged users in broadcasting events */ automaticOutboxModel: boolean; + + /** + * Automatically populate SocialGraph from kind 3 events fetched. + * + * This is basically free because we always load relays (which includes kind 3 contact lists) + * for users when fetching by author. + */ + buildFollowGraph: boolean; } /** @@ -125,6 +143,10 @@ export class NostrSystem extends EventEmitter implements Syst return this.#config.events; } + get userFollowsCache(): CachedTable { + return this.#config.contactLists; + } + get cacheRelay(): CacheRelay | undefined { return this.#config.cachingRelay; } @@ -153,11 +175,13 @@ export class NostrSystem extends EventEmitter implements Syst profiles: props.profiles ?? new UserProfileCache(props.db?.users), relayMetrics: props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics), events: props.events ?? new EventsCache(props.db?.events), + contactLists: props.contactLists ?? new UserFollowsCache(props.db?.contacts), optimizer: props.optimizer ?? DefaultOptimizer, checkSigs: props.checkSigs ?? false, cachingRelay: props.cachingRelay, db: props.db, automaticOutboxModel: props.automaticOutboxModel ?? true, + buildFollowGraph: props.buildFollowGraph ?? false, }; this.profileLoader = new ProfileLoaderService(this, this.profileCache); @@ -169,6 +193,32 @@ export class NostrSystem extends EventEmitter implements Syst this.requestRouter = OutboxModel.fromSystem(this); } + // Hook on-event when building follow graph + if (this.#config.buildFollowGraph) { + let evBuf: Array = []; + let t: ReturnType | undefined; + this.on("event", (_, ev) => { + if (ev.kind === EventKind.ContactList) { + // fire&forget update + this.userFollowsCache.update({ + loaded: unixNowMs(), + created: ev.created_at, + pubkey: ev.pubkey, + follows: ev.tags, + }); + + // buffer social graph updates into 500ms window + evBuf.push(ev); + if (!t) { + t = setTimeout(() => { + socialGraphInstance.handleEvent(evBuf); + evBuf = []; + }, 500); + } + } + }); + } + this.pool = new DefaultConnectionPool(this); this.#queryManager = new QueryManager(this); @@ -225,8 +275,24 @@ export class NostrSystem extends EventEmitter implements Syst this.profileCache.preload(), this.relayMetricsCache.preload(), this.eventsCache.preload(), + this.userFollowsCache.preload(), ]; await Promise.all(t); + await this.PreloadSocialGraph(); + } + + async PreloadSocialGraph() { + // Insert data to socialGraph from cache + if (this.#config.buildFollowGraph) { + for (const list of this.userFollowsCache.snapshot()) { + const user = ID(list.pubkey); + for (const fx of list.follows) { + if (fx[0] === "p" && fx[1].length === 64) { + socialGraphInstance.addFollower(ID(fx[1]), user); + } + } + } + } } async ConnectToRelay(address: string, options: RelaySettings) { diff --git a/packages/system/src/note-collection.ts b/packages/system/src/note-collection.ts index 5c31e2e9..00c32347 100644 --- a/packages/system/src/note-collection.ts +++ b/packages/system/src/note-collection.ts @@ -69,8 +69,12 @@ export class KeyedReplaceableNoteStore extends HookedNoteStore { const changes: Array = []; ev.forEach(a => { const keyOnEvent = this.#keyFn(a); - const existingCreated = this.#events.get(keyOnEvent)?.created_at ?? 0; + const existing = this.#events.get(keyOnEvent); + const existingCreated = existing?.created_at ?? 0; if (a.created_at > existingCreated) { + if (existing) { + a.relays.push(...existing.relays); + } this.#events.set(keyOnEvent, a); changes.push(a); } diff --git a/packages/system/src/worker/index.ts b/packages/system/src/worker/index.ts deleted file mode 100644 index 075aecb9..00000000 --- a/packages/system/src/worker/index.ts +++ /dev/null @@ -1,15 +0,0 @@ -export const enum WorkerCommand { - OkResponse, - ErrorResponse, - Init, - ConnectRelay, - DisconnectRelay, - Query, - QueryResult, -} - -export interface WorkerMessage { - id: string; - type: WorkerCommand; - data: T; -} diff --git a/packages/system/src/worker/system-worker-script.ts b/packages/system/src/worker/system-worker-script.ts deleted file mode 100644 index 6ed6ce87..00000000 --- a/packages/system/src/worker/system-worker-script.ts +++ /dev/null @@ -1,49 +0,0 @@ -/// - -import { NostrSystem } from "../nostr-system"; -import { WorkerMessage, WorkerCommand } from "."; - -const system = new NostrSystem({ - checkSigs: true, -}); - -function reply(id: string, type: WorkerCommand, data: T) { - globalThis.postMessage({ - id, - type, - data, - } as WorkerMessage); -} -function okReply(id: string, message?: string) { - reply(id, WorkerCommand.OkResponse, message); -} -function errorReply(id: string, message: string) { - reply(id, WorkerCommand.ErrorResponse, message); -} -globalThis.onmessage = async ev => { - console.debug(ev); - const data = ev.data as { id: string; type: WorkerCommand }; - try { - switch (data.type) { - case WorkerCommand.Init: { - await system.Init(); - okReply(data.id); - break; - } - case WorkerCommand.ConnectRelay: { - const cmd = ev.data as WorkerMessage<[string, { read: boolean; write: boolean }]>; - await system.ConnectToRelay(cmd.data[0], cmd.data[1]); - okReply(data.id, "Connected"); - break; - } - default: { - errorReply(data.id, "Unknown command"); - break; - } - } - } catch (e) { - if (e instanceof Error) { - errorReply(data.id, e.message); - } - } -}; diff --git a/packages/system/src/worker/system-worker.ts b/packages/system/src/worker/system-worker.ts deleted file mode 100644 index 8fb0dc8d..00000000 --- a/packages/system/src/worker/system-worker.ts +++ /dev/null @@ -1,208 +0,0 @@ -import { v4 as uuid } from "uuid"; -import EventEmitter from "eventemitter3"; -import { - NostrEvent, - OkResponse, - ProfileLoaderService, - RelaySettings, - RequestBuilder, - SystemInterface, - TaggedNostrEvent, - CachedMetadata, - RelayMetadataLoader, - RelayMetricCache, - RelayMetrics, - UserProfileCache, - UserRelaysCache, - UsersRelays, - QueryLike, - Optimizer, - DefaultOptimizer, -} from ".."; -import { NostrSystemEvents, SystemConfig } from "../nostr-system"; -import { WorkerCommand, WorkerMessage } from "."; -import { CachedTable } from "@snort/shared"; -import { EventsCache } from "../cache/events"; -import { RelayMetricHandler } from "../relay-metric-handler"; -import debug from "debug"; -import { ConnectionPool } from "../connection-pool"; -import { CacheRelay } from "../cache-relay"; - -export class SystemWorker extends EventEmitter implements SystemInterface { - #log = debug("SystemWorker"); - #worker: Worker; - #commandQueue: Map void> = new Map(); - #config: SystemConfig; - - /** - * Storage class for user relay lists - */ - get relayCache(): CachedTable { - return this.#config.relays; - } - - /** - * Storage class for user profiles - */ - get profileCache(): CachedTable { - return this.#config.profiles; - } - - /** - * Storage class for relay metrics (connects/disconnects) - */ - get relayMetricsCache(): CachedTable { - return this.#config.relayMetrics; - } - - /** - * Optimizer instance, contains optimized functions for processing data - */ - get optimizer(): Optimizer { - return this.#config.optimizer; - } - - get eventsCache(): CachedTable { - return this.#config.events; - } - - /** - * Check event signatures (recommended) - */ - get checkSigs(): boolean { - return this.#config.checkSigs; - } - - set checkSigs(v: boolean) { - this.#config.checkSigs = v; - } - - get requestRouter() { - return undefined; - } - - get cacheRelay(): CacheRelay | undefined { - return this.#config.cachingRelay; - } - - get pool() { - return {} as ConnectionPool; - } - - readonly relayLoader: RelayMetadataLoader; - readonly profileLoader: ProfileLoaderService; - readonly relayMetricsHandler: RelayMetricHandler; - - constructor(scriptPath: string, props: Partial) { - super(); - this.#config = { - relays: props.relays ?? new UserRelaysCache(props.db?.userRelays), - profiles: props.profiles ?? new UserProfileCache(props.db?.users), - relayMetrics: props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics), - events: props.events ?? new EventsCache(props.db?.events), - optimizer: props.optimizer ?? DefaultOptimizer, - checkSigs: props.checkSigs ?? false, - cachingRelay: props.cachingRelay, - db: props.db, - automaticOutboxModel: props.automaticOutboxModel ?? true, - }; - - this.profileLoader = new ProfileLoaderService(this, this.profileCache); - this.relayMetricsHandler = new RelayMetricHandler(this.relayMetricsCache); - this.relayLoader = new RelayMetadataLoader(this, this.relayCache); - this.#worker = new Worker(scriptPath, { - name: "SystemWorker", - type: "module", - }); - this.#worker.onmessage = async e => { - const cmd = e.data as { id: string; type: WorkerCommand; data?: unknown }; - if (cmd.type === WorkerCommand.OkResponse) { - const q = this.#commandQueue.get(cmd.id); - q?.(cmd.data); - this.#commandQueue.delete(cmd.id); - } - }; - } - - get Sockets(): never[] { - return []; - } - - async Init() { - await this.#workerRpc(WorkerCommand.Init); - } - - GetQuery(id: string): QueryLike | undefined { - return undefined; - } - - Query(req: RequestBuilder): QueryLike { - const chan = this.#workerRpc<[RequestBuilder], { id: string; port: MessagePort }>(WorkerCommand.Query, [req]); - return { - on: (_: "event", cb) => { - chan.then(c => { - c.port.onmessage = e => { - //cb(e.data as Array); - }; - }); - }, - off: (_: "event", cb) => { - chan.then(c => { - c.port.close(); - }); - }, - cancel: () => {}, - uncancel: () => {}, - } as QueryLike; - } - - Fetch(req: RequestBuilder, cb?: ((evs: TaggedNostrEvent[]) => void) | undefined): Promise { - throw new Error("Method not implemented."); - } - - async ConnectToRelay(address: string, options: RelaySettings) { - await this.#workerRpc(WorkerCommand.ConnectRelay, [address, options, false]); - } - - DisconnectRelay(address: string): void { - this.#workerRpc(WorkerCommand.DisconnectRelay, address); - } - - HandleEvent(subId: string, ev: TaggedNostrEvent): void { - throw new Error("Method not implemented."); - } - - BroadcastEvent(ev: NostrEvent, cb?: ((rsp: OkResponse) => void) | undefined): Promise { - throw new Error("Method not implemented."); - } - - WriteOnceToRelay(relay: string, ev: NostrEvent): Promise { - throw new Error("Method not implemented."); - } - - #workerRpc(type: WorkerCommand, data?: T, timeout = 5_000) { - const id = uuid(); - const msg = { - id, - type, - data, - } as WorkerMessage; - this.#log(msg); - this.#worker.postMessage(msg); - return new Promise((resolve, reject) => { - let t: ReturnType; - this.#commandQueue.set(id, v => { - clearTimeout(t); - const cmdReply = v as WorkerMessage; - if (cmdReply.type === WorkerCommand.OkResponse) { - resolve(cmdReply.data); - } else { - reject(cmdReply.data); - } - }); - t = setTimeout(() => { - reject("timeout"); - }, timeout); - }); - } -}