diff --git a/packages/app/src/Components/IrisAccount/IrisAccount.tsx b/packages/app/src/Components/IrisAccount/IrisAccount.tsx index 20183bda..2aebb307 100644 --- a/packages/app/src/Components/IrisAccount/IrisAccount.tsx +++ b/packages/app/src/Components/IrisAccount/IrisAccount.tsx @@ -4,7 +4,7 @@ import { FormattedMessage } from "react-intl"; import { injectIntl } from "react-intl"; import messages from "@/Components/messages"; -import { ProfileLoader } from "@/system"; +import { System } from "@/system"; import { LoginStore } from "@/Utils/Login"; import AccountName from "./AccountName"; @@ -285,8 +285,8 @@ class IrisAccount extends Component { componentDidMount() { const session = LoginStore.snapshot(); const myPub = session.publicKey; - ProfileLoader.Cache.hook(() => { - const profile = ProfileLoader.Cache.getFromCache(myPub); + System.ProfileLoader.Cache.hook(() => { + const profile = System.ProfileLoader.Cache.getFromCache(myPub); const irisToActive = profile && profile.nip05 && profile.nip05.endsWith("@iris.to"); this.setState({ profile, irisToActive }); if (profile && !irisToActive) { diff --git a/packages/app/src/Hooks/useRefreshFeedcache.tsx b/packages/app/src/Hooks/useRefreshFeedcache.tsx index 85c9ad47..3f86a78c 100644 --- a/packages/app/src/Hooks/useRefreshFeedcache.tsx +++ b/packages/app/src/Hooks/useRefreshFeedcache.tsx @@ -28,7 +28,7 @@ export function useRefreshFeedCache(c: RefreshFeedCache, leaveOpen = false const q = system.Query(NoopStore, sub); let t: ReturnType | undefined; let tBuf: Array = []; - q.feed.on("event", evs => { + q.on("event", evs => { if (!t) { tBuf = [...evs]; t = setTimeout(() => { @@ -41,9 +41,8 @@ export function useRefreshFeedCache(c: RefreshFeedCache, leaveOpen = false }); q.uncancel(); return () => { - q.feed.off("event"); + q.off("event"); q.cancel(); - q.sendClose(); }; } }, [sub]); diff --git a/packages/app/vite.config.ts b/packages/app/vite.config.ts index 97e3667b..7e0c1692 100644 --- a/packages/app/vite.config.ts +++ b/packages/app/vite.config.ts @@ -34,6 +34,7 @@ export default defineConfig({ assetsInclude: ["**/*.md", "**/*.wasm"], build: { outDir: "build", + commonjsOptions: { transformMixedEsModules: true }, }, clearScreen: false, publicDir: appConfig.get("publicDir"), @@ -51,4 +52,7 @@ export default defineConfig({ globals: true, environment: "jsdom", }, + worker: { + format: "es", + }, }); diff --git a/packages/system-react/src/useRequestBuilder.tsx b/packages/system-react/src/useRequestBuilder.tsx index 11901928..0d685708 100644 --- a/packages/system-react/src/useRequestBuilder.tsx +++ b/packages/system-react/src/useRequestBuilder.tsx @@ -14,10 +14,10 @@ const useRequestBuilder = void) => { if (rb) { const q = system.Query(type, rb); - q.feed.on("event", onChanged); + q.on("event", onChanged); q.uncancel(); return () => { - q.feed.off("event", onChanged); + q.off("event", onChanged); q.cancel(); }; } @@ -28,7 +28,7 @@ const useRequestBuilder = => { const q = system.GetQuery(rb?.id ?? ""); if (q) { - return unwrap(q).feed?.snapshot as StoreSnapshot; + return q.snapshot as StoreSnapshot; } return EmptySnapshot as StoreSnapshot; }; diff --git a/packages/system/src/index.ts b/packages/system/src/index.ts index 8f827507..794f40d9 100644 --- a/packages/system/src/index.ts +++ b/packages/system/src/index.ts @@ -1,7 +1,6 @@ import { RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection"; import { RequestBuilder } from "./request-builder"; -import { NoteStore, NoteStoreSnapshotData } from "./note-collection"; -import { Query } from "./query"; +import { NoteStore, NoteStoreSnapshotData, StoreSnapshot } from "./note-collection"; import { NostrEvent, ReqFilter, TaggedNostrEvent } from "./nostr"; import { ProfileLoaderService } from "./profile-cache"; import { RelayCache } from "./outbox-model"; @@ -43,7 +42,15 @@ export * from "./cache/user-relays"; export * from "./cache/user-metadata"; export * from "./cache/relay-metric"; -export * from "./worker"; +export * from "./worker/system-worker"; + +export interface QueryLike { + on: (event: "event", fn?: (evs: Array) => void) => void; + off: (event: "event", fn?: (evs: Array) => void) => void; + cancel: () => void; + uncancel: () => void; + get snapshot(): StoreSnapshot; +} export interface SystemInterface { /** @@ -65,14 +72,14 @@ export interface SystemInterface { * Get an active query by ID * @param id Query ID */ - GetQuery(id: string): Query | undefined; + GetQuery(id: string): QueryLike | undefined; /** * Open a new query to relays * @param type Store type * @param req Request to send to relays */ - Query(type: { new (): T }, req: RequestBuilder): Query; + Query(type: { new (): T }, req: RequestBuilder): QueryLike; /** * Fetch data from nostr relays asynchronously diff --git a/packages/system/src/nostr-system.ts b/packages/system/src/nostr-system.ts index 7ebc1e72..9a1b6908 100644 --- a/packages/system/src/nostr-system.ts +++ b/packages/system/src/nostr-system.ts @@ -20,6 +20,7 @@ import { UsersRelays, SnortSystemDb, EventExt, + QueryLike, } from "."; import { EventsCache } from "./cache/events"; import { RelayCache, RelayMetadataLoader } from "./outbox-model"; @@ -224,16 +225,16 @@ export class NostrSystem extends EventEmitter implements Syst this.#pool.disconnect(address); } - GetQuery(id: string): Query | undefined { - return this.#queryManager.get(id); + GetQuery(id: string): QueryLike | undefined { + return this.#queryManager.get(id) as QueryLike; } Fetch(req: RequestBuilder, cb?: (evs: ReadonlyArray) => void) { return this.#queryManager.fetch(req, cb); } - Query(type: { new (): T }, req: RequestBuilder): Query { - return this.#queryManager.query(type, req); + Query(type: { new (): T }, req: RequestBuilder): QueryLike { + return this.#queryManager.query(type, req) as QueryLike; } async #sendQuery(q: Query, qSend: BuiltRawReqFilter) { diff --git a/packages/system/src/query.ts b/packages/system/src/query.ts index fee646fa..b19ac528 100644 --- a/packages/system/src/query.ts +++ b/packages/system/src/query.ts @@ -116,6 +116,7 @@ export interface TraceReport { interface QueryEvents { trace: (report: TraceReport) => void; + event: (evs: ReadonlyArray) => void; } /** @@ -172,6 +173,8 @@ export class Query extends EventEmitter implements QueryBase { this.#leaveOpen = leaveOpen ?? false; this.#timeout = timeout ?? 5_000; this.#checkTraces(); + + this.feed.on("event", evs => this.emit("event", evs)); } isOpen() { @@ -193,6 +196,10 @@ export class Query extends EventEmitter implements QueryBase { return this.#feed; } + get snapshot() { + return this.#feed.snapshot; + } + handleEvent(sub: string, e: TaggedNostrEvent) { for (const t of this.#tracing) { if (t.id === sub || sub === "*") { diff --git a/packages/system/src/worker/index.ts b/packages/system/src/worker/index.ts index a73ee825..075aecb9 100644 --- a/packages/system/src/worker/index.ts +++ b/packages/system/src/worker/index.ts @@ -4,6 +4,8 @@ export const enum WorkerCommand { Init, ConnectRelay, DisconnectRelay, + Query, + QueryResult, } export interface WorkerMessage { diff --git a/packages/system/src/worker/system-worker-script.ts b/packages/system/src/worker/system-worker-script.ts index 94786e94..6ed6ce87 100644 --- a/packages/system/src/worker/system-worker-script.ts +++ b/packages/system/src/worker/system-worker-script.ts @@ -1,9 +1,11 @@ /// -import { NostrSystem, NostrsystemProps } from "../nostr-system"; +import { NostrSystem } from "../nostr-system"; import { WorkerMessage, WorkerCommand } from "."; -let system: NostrSystem | undefined; +const system = new NostrSystem({ + checkSigs: true, +}); function reply(id: string, type: WorkerCommand, data: T) { globalThis.postMessage({ @@ -18,31 +20,19 @@ function okReply(id: string, message?: string) { function errorReply(id: string, message: string) { reply(id, WorkerCommand.ErrorResponse, message); } -function checkInitialized() { - if (system === undefined) { - throw new Error("System not initialized"); - } -} - globalThis.onmessage = async ev => { + console.debug(ev); const data = ev.data as { id: string; type: WorkerCommand }; try { switch (data.type) { case WorkerCommand.Init: { - const cmd = ev.data as WorkerMessage; - if (system === undefined) { - system = new NostrSystem(cmd.data); - await system.Init(); - okReply(data.id); - } else { - errorReply(data.id, "System is already initialized"); - } + await system.Init(); + okReply(data.id); break; } case WorkerCommand.ConnectRelay: { - checkInitialized(); const cmd = ev.data as WorkerMessage<[string, { read: boolean; write: boolean }]>; - await system?.ConnectToRelay(cmd.data[0], cmd.data[1]); + await system.ConnectToRelay(cmd.data[0], cmd.data[1]); okReply(data.id, "Connected"); break; } diff --git a/packages/system/src/worker/system-worker.ts b/packages/system/src/worker/system-worker.ts index 0e57cb28..8ffaed2d 100644 --- a/packages/system/src/worker/system-worker.ts +++ b/packages/system/src/worker/system-worker.ts @@ -12,51 +12,110 @@ import { RequestBuilder, SystemInterface, TaggedNostrEvent, + CachedMetadata, + DefaultOptimizer, + RelayMetadataLoader, + RelayMetricCache, + RelayMetrics, + UserProfileCache, + UserRelaysCache, + UsersRelays, + QueryLike, } from ".."; import { NostrSystemEvents, NostrsystemProps } from "../nostr-system"; -import { Query } from "../query"; import { WorkerCommand, WorkerMessage } from "."; +import { FeedCache } from "@snort/shared"; +import { EventsCache } from "../cache/events"; +import { RelayMetricHandler } from "../relay-metric-handler"; +import debug from "debug"; export class SystemWorker extends EventEmitter implements SystemInterface { + #log = debug("SystemWorker"); #worker: Worker; #commandQueue: Map void> = new Map(); - checkSigs: boolean; + #relayCache: FeedCache; + #profileCache: FeedCache; + #relayMetricsCache: FeedCache; + #profileLoader: ProfileLoaderService; + #relayMetrics: RelayMetricHandler; + #eventsCache: FeedCache; + #relayLoader: RelayMetadataLoader; + + get checkSigs() { + return true; + } + + set checkSigs(v: boolean) { + // not used + } constructor(scriptPath: string, props: NostrsystemProps) { super(); - this.checkSigs = props.checkSigs ?? false; + this.#relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays); + this.#profileCache = props.profileCache ?? new UserProfileCache(props.db?.users); + this.#relayMetricsCache = props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics); + this.#eventsCache = props.eventsCache ?? new EventsCache(props.db?.events); + + this.#profileLoader = new ProfileLoaderService(this, this.#profileCache); + this.#relayMetrics = new RelayMetricHandler(this.#relayMetricsCache); + this.#relayLoader = new RelayMetadataLoader(this, this.#relayCache); this.#worker = new Worker(scriptPath, { name: "SystemWorker", + type: "module", }); + this.#worker.onmessage = async e => { + const cmd = e.data as { id: string; type: WorkerCommand; data?: unknown }; + if (cmd.type === WorkerCommand.OkResponse) { + const q = this.#commandQueue.get(cmd.id); + q?.(cmd.data); + this.#commandQueue.delete(cmd.id); + } + }; } get Sockets(): ConnectionStateSnapshot[] { - throw new Error("Method not implemented."); + return []; } async Init() { - await this.#workerRpc(WorkerCommand.Init, undefined); + await this.#workerRpc(WorkerCommand.Init); } - GetQuery(id: string): Query | undefined { + GetQuery(id: string): QueryLike | undefined { return undefined; } - Query(type: new () => T, req: RequestBuilder): Query { - throw new Error("Method not implemented."); + Query(type: new () => T, req: RequestBuilder): QueryLike { + const chan = this.#workerRpc<[RequestBuilder], { id: string; port: MessagePort }>(WorkerCommand.Query, [req]); + return { + on: (_: "event", cb) => { + chan.then(c => { + c.port.onmessage = e => { + cb?.(e.data as Array); + }; + }); + }, + off: (_: "event", cb) => { + chan.then(c => { + c.port.close(); + }); + }, + cancel: () => {}, + uncancel: () => {}, + } as QueryLike; } Fetch(req: RequestBuilder, cb?: ((evs: TaggedNostrEvent[]) => void) | undefined): Promise { throw new Error("Method not implemented."); } - ConnectToRelay(address: string, options: RelaySettings): Promise { - throw new Error("Method not implemented."); + async ConnectToRelay(address: string, options: RelaySettings) { + await this.#workerRpc(WorkerCommand.ConnectRelay, [address, options, false]); } DisconnectRelay(address: string): void { - throw new Error("Method not implemented."); + this.#workerRpc(WorkerCommand.DisconnectRelay, address); } HandleEvent(ev: TaggedNostrEvent): void { @@ -72,24 +131,26 @@ export class SystemWorker extends EventEmitter implements Sys } get ProfileLoader(): ProfileLoaderService { - throw new Error("Method not implemented."); + return this.#profileLoader; } get RelayCache(): RelayCache { - throw new Error("Method not implemented."); + return this.#relayCache; } get Optimizer(): Optimizer { - throw new Error("Method not implemented."); + return DefaultOptimizer; } - #workerRpc(type: WorkerCommand, data: T, timeout = 5_000) { + #workerRpc(type: WorkerCommand, data?: T, timeout = 5_000) { const id = uuid(); - this.#worker.postMessage({ + const msg = { id, type, data, - } as WorkerMessage); + } as WorkerMessage; + this.#log(msg); + this.#worker.postMessage(msg); return new Promise((resolve, reject) => { let t: ReturnType; this.#commandQueue.set(id, v => {