From 5cea096067fac1b2c320213b975e1b66c648113e Mon Sep 17 00:00:00 2001 From: Kieran Date: Tue, 23 Jan 2024 15:35:28 +0000 Subject: [PATCH] feat: @snort/system CacheRelay --- packages/app/src/Cache/EventCacheWorker.ts | 30 +-- packages/app/src/Cache/ProfileWorkeCache.ts | 36 +-- .../Event/Note/NoteFooter/NoteFooter.tsx | 5 +- .../app/src/Components/Feed/LocalSearch.tsx | 21 +- packages/app/src/Db/FuzzySearch.ts | 22 +- packages/app/src/Feed/LoginFeed.ts | 4 - packages/app/src/Feed/TimelineFeed.ts | 7 +- packages/app/src/Feed/WorkerRelayView.ts | 205 ++---------------- packages/app/src/Hooks/useLists.tsx | 4 +- .../app/src/Pages/Profile/ProfilePage.tsx | 2 +- .../Pages/Profile/ProfileTabComponents.tsx | 6 +- packages/app/src/index.tsx | 31 ++- packages/app/src/system.ts | 10 +- packages/system/src/cache-relay.ts | 17 ++ packages/system/src/connection-pool.ts | 4 +- packages/system/src/connection.ts | 10 +- packages/system/src/index.ts | 14 +- packages/system/src/nostr-system.ts | 8 +- packages/system/src/nostr.ts | 12 +- packages/system/src/outbox-model.ts | 10 +- packages/system/src/query-manager.ts | 17 +- packages/system/src/query.ts | 6 +- packages/system/src/request-builder.ts | 73 +++++-- packages/system/src/worker/system-worker.ts | 2 + packages/worker-relay/src/interface.ts | 30 +-- packages/worker-relay/src/memory-relay.ts | 6 +- packages/worker-relay/src/sqlite-relay.ts | 16 +- packages/worker-relay/src/types.ts | 22 +- packages/worker-relay/src/worker.ts | 46 ++-- 29 files changed, 296 insertions(+), 380 deletions(-) create mode 100644 packages/system/src/cache-relay.ts diff --git a/packages/app/src/Cache/EventCacheWorker.ts b/packages/app/src/Cache/EventCacheWorker.ts index e78c65dc..dce3cc3e 100644 --- a/packages/app/src/Cache/EventCacheWorker.ts +++ b/packages/app/src/Cache/EventCacheWorker.ts @@ -14,9 +14,14 @@ export class EventCacheWorker extends EventEmitter implements Cache } async preload() { - const ids = await this.#relay.sql("select id from events", []); - this.#keys = new Set(ids.map(a => a[0] as string)); - return Promise.resolve(); + const ids = await this.#relay.query([ + "REQ", + "preload-event-cache", + { + ids_only: true, + }, + ]); + this.#keys = new Set(ids as unknown as Array); } keysOnTable(): string[] { @@ -43,18 +48,17 @@ export class EventCacheWorker extends EventEmitter implements Cache } async bulkGet(keys: string[]): Promise { - const results = await this.#relay.req({ - id: "EventCacheWorker.bulkGet", - filters: [ - { - ids: keys, - }, - ], - }); - for (const ev of results.result) { + const results = await this.#relay.query([ + "REQ", + "EventCacheWorker.bulkGet", + { + ids: keys, + }, + ]); + for (const ev of results) { this.#cache.set(ev.id, ev); } - return results.result; + return results; } async set(obj: NostrEvent): Promise { diff --git a/packages/app/src/Cache/ProfileWorkeCache.ts b/packages/app/src/Cache/ProfileWorkeCache.ts index 7d4ec5c3..aa1774ef 100644 --- a/packages/app/src/Cache/ProfileWorkeCache.ts +++ b/packages/app/src/Cache/ProfileWorkeCache.ts @@ -1,12 +1,14 @@ -import { CachedTable, CacheEvents, removeUndefined } from "@snort/shared"; +import { CachedTable, CacheEvents, removeUndefined, unixNowMs, unwrap } from "@snort/shared"; import { CachedMetadata, mapEventToProfile, NostrEvent } from "@snort/system"; import { WorkerRelayInterface } from "@snort/worker-relay"; +import debug from "debug"; import EventEmitter from "eventemitter3"; export class ProfileCacheRelayWorker extends EventEmitter implements CachedTable { #relay: WorkerRelayInterface; #keys = new Set(); #cache = new Map(); + #log = debug("ProfileCacheRelayWorker"); constructor(relay: WorkerRelayInterface) { super(); @@ -14,8 +16,17 @@ export class ProfileCacheRelayWorker extends EventEmitter implement } async preload() { - const ids = await this.#relay.sql("select distinct(pubkey) from events where kind = ?", [0]); - this.#keys = new Set(ids.map(a => a[0] as string)); + const start = unixNowMs(); + const profiles = await this.#relay.query([ + "REQ", + "profiles-preload", + { + kinds: [0], + }, + ]); + this.#cache = new Map(profiles.map(a => [a.pubkey, unwrap(mapEventToProfile(a))])); + this.#keys = new Set(this.#cache.keys()); + this.#log(`Loaded %d/%d in %d ms`, this.#cache.size, this.#keys.size, (unixNowMs() - start).toLocaleString()); } keysOnTable(): string[] { @@ -50,16 +61,15 @@ export class ProfileCacheRelayWorker extends EventEmitter implement async bulkGet(keys: string[]) { if (keys.length === 0) return []; - const results = await this.#relay.req({ - id: "ProfileCacheRelayWorker.bulkGet", - filters: [ - { - authors: keys, - kinds: [0], - }, - ], - }); - const mapped = removeUndefined(results.result.map(a => mapEventToProfile(a))); + const results = await this.#relay.query([ + "REQ", + "ProfileCacheRelayWorker.bulkGet", + { + authors: keys, + kinds: [0], + }, + ]); + const mapped = removeUndefined(results.map(a => mapEventToProfile(a))); for (const pf of mapped) { this.#cache.set(this.key(pf), pf); } diff --git a/packages/app/src/Components/Event/Note/NoteFooter/NoteFooter.tsx b/packages/app/src/Components/Event/Note/NoteFooter/NoteFooter.tsx index cf392bea..092349bf 100644 --- a/packages/app/src/Components/Event/Note/NoteFooter/NoteFooter.tsx +++ b/packages/app/src/Components/Event/Note/NoteFooter/NoteFooter.tsx @@ -1,5 +1,5 @@ import { NostrLink, TaggedNostrEvent } from "@snort/system"; -import { useEventReactions } from "@snort/system-react"; +import { useEventReactions, useReactions } from "@snort/system-react"; import React, { useMemo, useState } from "react"; import { FooterZapButton } from "@/Components/Event/Note/NoteFooter/FooterZapButton"; @@ -8,7 +8,6 @@ import { PowIcon } from "@/Components/Event/Note/NoteFooter/PowIcon"; import { ReplyButton } from "@/Components/Event/Note/NoteFooter/ReplyButton"; import { RepostButton } from "@/Components/Event/Note/NoteFooter/RepostButton"; import ReactionsModal from "@/Components/Event/Note/ReactionsModal"; -import { useReactionsView } from "@/Feed/WorkerRelayView"; import useLogin from "@/Hooks/useLogin"; export interface NoteFooterProps { @@ -22,7 +21,7 @@ export default function NoteFooter(props: NoteFooterProps) { const ids = useMemo(() => [link], [link]); const [showReactions, setShowReactions] = useState(false); - const related = useReactionsView(ids, false); + const related = useReactions("reactions", ids, undefined, false); const { reactions, zaps, reposts } = useEventReactions(link, related); const { positive } = reactions; diff --git a/packages/app/src/Components/Feed/LocalSearch.tsx b/packages/app/src/Components/Feed/LocalSearch.tsx index 561c71b0..9eb5206c 100644 --- a/packages/app/src/Components/Feed/LocalSearch.tsx +++ b/packages/app/src/Components/Feed/LocalSearch.tsx @@ -21,19 +21,18 @@ export function LocalSearch({ term, kind }: { term: string; kind: EventKind }) { useEffect(() => { setFrag(undefined); if (term) { - Relay.req({ - id: "local-search", - filters: [ - { - kinds: [kind], - limit: 100, - search: term, - }, - ], - }).then(res => { + Relay.query([ + "REQ", + "local-search", + { + kinds: [kind], + limit: 100, + search: term, + }, + ]).then(res => { setFrag({ refTime: 0, - events: res.result as Array, + events: res as Array, }); }); } diff --git a/packages/app/src/Db/FuzzySearch.ts b/packages/app/src/Db/FuzzySearch.ts index 5c2abce9..6fb0ed02 100644 --- a/packages/app/src/Db/FuzzySearch.ts +++ b/packages/app/src/Db/FuzzySearch.ts @@ -43,19 +43,17 @@ export const addEventToFuzzySearch = (ev: NostrEvent) => { }; export const addCachedMetadataToFuzzySearch = (profile: CachedMetadata) => { - queueMicrotask(() => { - const existing = profileTimestamps.get(profile.pubkey); - if (existing) { - if (existing > profile.created) { - return; - } - fuzzySearch.remove(doc => doc.pubkey === profile.pubkey); + const existing = profileTimestamps.get(profile.pubkey); + if (existing) { + if (existing > profile.created) { + return; } - profileTimestamps.set(profile.pubkey, profile.created); - if (profile.pubkey && (profile.name || profile.display_name || profile.nip05)) { - fuzzySearch.add(profile); - } - }); + fuzzySearch.remove(doc => doc.pubkey === profile.pubkey); + } + profileTimestamps.set(profile.pubkey, profile.created); + if (profile.pubkey && (profile.name || profile.display_name || profile.nip05)) { + fuzzySearch.add(profile); + } }; export default fuzzySearch; diff --git a/packages/app/src/Feed/LoginFeed.ts b/packages/app/src/Feed/LoginFeed.ts index e4610e6d..05d80b22 100644 --- a/packages/app/src/Feed/LoginFeed.ts +++ b/packages/app/src/Feed/LoginFeed.ts @@ -23,9 +23,6 @@ import { SnortAppData, } from "@/Utils/Login"; import { SubscriptionEvent } from "@/Utils/Subscription"; - -import { useFollowsContactListView } from "./WorkerRelayView"; - /** * Managed loading data for the current logged in user */ @@ -34,7 +31,6 @@ export default function useLoginFeed() { const { publicKey: pubKey, follows } = login; const { publisher, system } = useEventPublisher(); - useFollowsContactListView(); useEffect(() => { system.checkSigs = login.appData.item.preferences.checkSigs; }, [login]); diff --git a/packages/app/src/Feed/TimelineFeed.ts b/packages/app/src/Feed/TimelineFeed.ts index 59096c0c..d8ff37fc 100644 --- a/packages/app/src/Feed/TimelineFeed.ts +++ b/packages/app/src/Feed/TimelineFeed.ts @@ -86,9 +86,10 @@ export default function useTimelineFeed(subject: TimelineSubject, options: Timel const sub = useMemo(() => { const rb = createBuilder(); + console.debug(rb?.builder.id, options); if (rb) { if (options.method === "LIMIT_UNTIL") { - rb.filter.until(until).limit(100); + rb.filter.until(until).limit(50); } else { rb.filter.since(since).until(until); if (since === undefined) { @@ -112,8 +113,8 @@ export default function useTimelineFeed(subject: TimelineSubject, options: Timel .limit(1) .since(now); } + return rb.builder; } - return rb?.builder ?? null; }, [until, since, options.method, pref, createBuilder]); const mainQuery = useRequestBuilderAdvanced(sub); @@ -135,8 +136,8 @@ export default function useTimelineFeed(subject: TimelineSubject, options: Timel }); rb.builder.id = `${rb.builder.id}:latest`; rb.filter.limit(1).since(now); + return rb.builder; } - return rb?.builder ?? null; }, [pref.autoShowLatest, createBuilder]); const latestQuery = useRequestBuilderAdvanced(subRealtime); diff --git a/packages/app/src/Feed/WorkerRelayView.ts b/packages/app/src/Feed/WorkerRelayView.ts index 298dddbc..3d6ecd18 100644 --- a/packages/app/src/Feed/WorkerRelayView.ts +++ b/packages/app/src/Feed/WorkerRelayView.ts @@ -1,200 +1,39 @@ -import { unixNow } from "@snort/shared"; -import { EventKind, NostrEvent, NostrLink, ReqFilter, RequestBuilder, TaggedNostrEvent } from "@snort/system"; -import { SnortContext, useRequestBuilder } from "@snort/system-react"; -import { useCallback, useContext, useEffect, useMemo, useState } from "react"; -import { LRUCache } from "typescript-lru-cache"; +import { EventKind, RequestBuilder } from "@snort/system"; +import { useRequestBuilder } from "@snort/system-react"; +import { useMemo } from "react"; -import { Relay } from "@/Cache"; +//import { LRUCache } from "typescript-lru-cache"; import useLogin from "@/Hooks/useLogin"; -import { Day } from "@/Utils/Const"; -const cache = new LRUCache({ maxSize: 100 }); - -export function useWorkerRelayView(id: string, filters: Array, leaveOpen?: boolean, maxWindow?: number) { - const cacheKey = useMemo(() => JSON.stringify(filters), [filters]); - const [events, setEvents] = useState>(cache.get(cacheKey) ?? []); - const [rb, setRb] = useState(); - const system = useContext(SnortContext); - - const cacheAndSetEvents = useCallback( - (evs: Array) => { - cache.set(cacheKey, evs); - setEvents(evs); - }, - [cacheKey], - ); - - useEffect(() => { - if (rb) { - const q = system.Query(rb); - q.uncancel(); - return () => q.cancel(); - } - }, [rb, system]); - useEffect(() => { - setRb(undefined); - Relay.req({ - id: `${id}+latest`, - filters: filters.map(f => ({ - ...f, - until: undefined, - since: undefined, - limit: 1, - })), - }).then(latest => { - const rb = new RequestBuilder(id); - rb.withOptions({ fillStore: false }); - filters - .map((f, i) => { - const since = latest.result?.at(i)?.created_at; - return { - ...f, - limit: undefined, - until: undefined, - since: since ? since + 1 : maxWindow ? unixNow() - maxWindow : f.since, - }; - }) - .forEach(f => rb.withBareFilter(f)); - setRb(rb); - }); - Relay.req({ id, filters, leaveOpen }).then(res => { - cacheAndSetEvents(res.result); - if (res.port) { - res.port.addEventListener("message", ev => { - const evs = ev.data as Array; - if (evs.length > 0) { - cacheAndSetEvents([...events, ...evs]); - } - }); - res.port.start(); - } - }); - return () => { - Relay.close(id); - }; - }, [id, filters, maxWindow]); - - return events as Array; -} - -export function useWorkerRelayViewCount(id: string, filters: Array, maxWindow?: number) { - const [count, setCount] = useState(0); - const [rb, setRb] = useState(); - useRequestBuilder(rb); - - useEffect(() => { - Relay.req({ - id: `${id}+latest`, - filters: filters.map(f => ({ - ...f, - until: undefined, - since: undefined, - limit: 1, - })), - }).then(latest => { - const rb = new RequestBuilder(id); - filters - .map((f, i) => ({ - ...f, - limit: undefined, - until: undefined, - since: latest.result?.at(i)?.created_at ?? (maxWindow ? unixNow() - maxWindow : undefined), - })) - .forEach(f => rb.withBareFilter(f)); - setRb(rb); - }); - Relay.count({ id, filters }).then(setCount); - }, [id, filters, maxWindow]); - - return count; -} +//const cache = new LRUCache({ maxSize: 100 }); export function useFollowsTimelineView(limit = 20) { const follows = useLogin(s => s.follows.item); const kinds = [EventKind.TextNote, EventKind.Repost, EventKind.Polls]; - const filter = useMemo(() => { - return [ - { - authors: follows, - kinds, - limit, - }, - ]; + const req = useMemo(() => { + const rb = new RequestBuilder("follows-timeline"); + rb.withOptions({ + leaveOpen: true, + }); + rb.withFilter().kinds(kinds).authors(follows).limit(limit); + return rb; }, [follows, limit]); - return useWorkerRelayView("follows-timeline", filter, true, Day * 7); + return useRequestBuilder(req); } export function useNotificationsView() { const publicKey = useLogin(s => s.publicKey); const kinds = [EventKind.TextNote, EventKind.Reaction, EventKind.Repost, EventKind.ZapReceipt]; const req = useMemo(() => { - return [ - { - "#p": [publicKey ?? ""], - kinds, - since: unixNow() - Day * 7, - }, - ]; + if (publicKey) { + const rb = new RequestBuilder("notifications"); + rb.withOptions({ + leaveOpen: true, + }); + rb.withFilter().kinds(kinds).tag("p", [publicKey]).limit(1000); + return rb; + } }, [publicKey]); - return useWorkerRelayView("notifications", req, true, Day * 30); -} - -export function useReactionsView(ids: Array, leaveOpen = true) { - const req = useMemo(() => { - const rb = new RequestBuilder("reactions"); - rb.withOptions({ leaveOpen }); - const grouped = ids.reduce( - (acc, v) => { - acc[v.type] ??= []; - acc[v.type].push(v); - return acc; - }, - {} as Record>, - ); - - for (const [, v] of Object.entries(grouped)) { - rb.withFilter().kinds([EventKind.Reaction, EventKind.Repost, EventKind.ZapReceipt]).replyToLink(v); - } - return rb.buildRaw(); - }, [ids]); - - return useWorkerRelayView("reactions", req, leaveOpen, undefined); -} - -export function useReactionsViewCount(ids: Array, leaveOpen = true) { - const req = useMemo(() => { - const rb = new RequestBuilder("reactions"); - rb.withOptions({ leaveOpen }); - const grouped = ids.reduce( - (acc, v) => { - acc[v.type] ??= []; - acc[v.type].push(v); - return acc; - }, - {} as Record>, - ); - - for (const [, v] of Object.entries(grouped)) { - rb.withFilter().kinds([EventKind.Reaction, EventKind.Repost, EventKind.ZapReceipt]).replyToLink(v); - } - return rb.buildRaw(); - }, [ids]); - - return useWorkerRelayViewCount("reactions", req, undefined); -} - -export function useFollowsContactListView() { - const follows = useLogin(s => s.follows.item); - const kinds = [EventKind.ContactList, EventKind.Relays]; - - const filter = useMemo(() => { - return [ - { - authors: follows, - kinds, - }, - ]; - }, [follows]); - return useWorkerRelayView("follows-contacts-relays", filter, undefined, undefined); + return useRequestBuilder(req); } diff --git a/packages/app/src/Hooks/useLists.tsx b/packages/app/src/Hooks/useLists.tsx index 64f97547..d3815ea9 100644 --- a/packages/app/src/Hooks/useLists.tsx +++ b/packages/app/src/Hooks/useLists.tsx @@ -9,7 +9,9 @@ export function useLinkList(id: string, fn: (rb: RequestBuilder) => void) { const sub = useMemo(() => { const rb = new RequestBuilder(id); fn(rb); - return rb; + if (rb.numFilters > 0) { + return rb; + } }, [id, fn]); const listStore = useRequestBuilder(sub); diff --git a/packages/app/src/Pages/Profile/ProfilePage.tsx b/packages/app/src/Pages/Profile/ProfilePage.tsx index 86a86227..4f62fed8 100644 --- a/packages/app/src/Pages/Profile/ProfilePage.tsx +++ b/packages/app/src/Pages/Profile/ProfilePage.tsx @@ -163,7 +163,7 @@ export default function ProfilePage({ id: propId, state }: ProfilePageProps) { )}
- +
diff --git a/packages/app/src/Pages/Profile/ProfileTabComponents.tsx b/packages/app/src/Pages/Profile/ProfileTabComponents.tsx index 29fe345a..b70ddb5c 100644 --- a/packages/app/src/Pages/Profile/ProfileTabComponents.tsx +++ b/packages/app/src/Pages/Profile/ProfileTabComponents.tsx @@ -11,6 +11,7 @@ import FollowsList from "@/Components/User/FollowListBase"; import useFollowersFeed from "@/Feed/FollowersFeed"; import useFollowsFeed from "@/Feed/FollowsFeed"; import useRelaysFeed from "@/Feed/RelaysFeed"; +import { TimelineSubject } from "@/Feed/TimelineFeed"; import useZapsFeed from "@/Feed/ZapsFeed"; import { useBookmarkList, usePinList } from "@/Hooks/useLists"; import messages from "@/Pages/messages"; @@ -52,7 +53,6 @@ export function BookMarksTab({ id }: { id: HexKey }) { } export function ProfileNotesTab({ id, relays, isMe }: { id: HexKey; relays?: Array; isMe: boolean }) { - console.count("ProfileNotesTab"); const pinned = usePinList(id); const options = useMemo(() => ({ showTime: false, showPinned: true, canUnpin: isMe }), [isMe]); const subject = useMemo( @@ -61,7 +61,7 @@ export function ProfileNotesTab({ id, relays, isMe }: { id: HexKey; relays?: Arr items: [id], discriminator: id.slice(0, 12), relay: relays, - }), + } as TimelineSubject), [id, relays], ); return ( @@ -76,7 +76,7 @@ export function ProfileNotesTab({ id, relays, isMe }: { id: HexKey; relays?: Arr subject={subject} postsOnly={false} method={"LIMIT_UNTIL"} - loadMore={false} + loadMore={true} ignoreModeration={true} window={60 * 60 * 6} /> diff --git a/packages/app/src/index.tsx b/packages/app/src/index.tsx index 731d6cf4..225b6872 100644 --- a/packages/app/src/index.tsx +++ b/packages/app/src/index.tsx @@ -8,11 +8,11 @@ import { StrictMode } from "react"; import * as ReactDOM from "react-dom/client"; import { createBrowserRouter, RouteObject, RouterProvider } from "react-router-dom"; -import { initRelayWorker, preload, Relay } from "@/Cache"; +import { initRelayWorker, preload, Relay, UserCache } from "@/Cache"; import { ThreadRoute } from "@/Components/Event/Thread"; import { IntlProvider } from "@/Components/IntlProvider/IntlProvider"; import { db } from "@/Db"; -import { addEventToFuzzySearch } from "@/Db/FuzzySearch"; +import { addCachedMetadataToFuzzySearch } from "@/Db/FuzzySearch"; import { updateRelayConnections } from "@/Hooks/useLoginRelays"; import { AboutPage } from "@/Pages/About"; import { SnortDeckLayout } from "@/Pages/DeckLayout"; @@ -54,39 +54,34 @@ async function initSite() { const login = LoginStore.takeSnapshot(); db.ready = await db.isAvailable(); if (db.ready) { - await preload(login.follows.item); + preload(login.follows.item); } updateRelayConnections(System, login.relays.item).catch(console.error); - try { - if ("registerProtocolHandler" in window.navigator) { - window.navigator.registerProtocolHandler("web+nostr", `${window.location.protocol}//${window.location.host}/%s`); - console.info("Registered protocol handler for 'web+nostr'"); - } - } catch (e) { - console.error("Failed to register protocol handler", e); - } - setupWebLNWalletConfig(Wallets); - Relay.sql("select json from events where kind = ?", [3]).then(res => { - for (const [json] of res) { + Relay.query(["REQ", "preload-social-graph", { + kinds: [3] + }]).then(res => { + for (const ev of res) { try { - socialGraphInstance.handleEvent(JSON.parse(json as string)); + socialGraphInstance.handleEvent(ev); } catch (e) { console.error("Failed to handle contact list event from sql db", e); } } }); - Relay.sql("select json from events where kind = ?", [0]).then(res => { - for (const [json] of res) { + + queueMicrotask(() => { + for (const ev of UserCache.snapshot()) { try { - addEventToFuzzySearch(JSON.parse(json as string)); + addCachedMetadataToFuzzySearch(ev); } catch (e) { console.error("Failed to handle metadata event from sql db", e); } } }); + return null; } diff --git a/packages/app/src/system.ts b/packages/app/src/system.ts index 83eca49b..fc990072 100644 --- a/packages/app/src/system.ts +++ b/packages/app/src/system.ts @@ -1,5 +1,5 @@ import { removeUndefined, throwIfOffline } from "@snort/shared"; -import { mapEventToProfile, NostrEvent, NostrSystem, ProfileLoaderService, socialGraphInstance } from "@snort/system"; +import { mapEventToProfile, NostrEvent, NostrSystem, socialGraphInstance } from "@snort/system"; import inMemoryDB from "@snort/system/src/InMemoryDB"; import { EventsCache, Relay, RelayMetrics, SystemDb, UserCache, UserRelays } from "@/Cache"; @@ -15,6 +15,7 @@ export const System = new NostrSystem({ eventsCache: EventsCache, profileCache: UserCache, relayMetrics: RelayMetrics, + cacheRelay: Relay, optimizer: hasWasm ? WasmOptimizer : undefined, db: SystemDb, }); @@ -58,9 +59,4 @@ export async function fetchProfile(key: string) { } catch (e) { console.error(e); } -} - -/** - * Singleton user profile loader - */ -export const ProfileLoader = new ProfileLoaderService(System, UserCache); +} \ No newline at end of file diff --git a/packages/system/src/cache-relay.ts b/packages/system/src/cache-relay.ts new file mode 100644 index 00000000..15b7b41d --- /dev/null +++ b/packages/system/src/cache-relay.ts @@ -0,0 +1,17 @@ +import { NostrEvent, OkResponse, ReqCommand } from "./nostr"; + +/** + * A cache relay is an always available local (local network / browser worker) relay + * Which should contain all of the content we're looking for and respond quickly. + */ +export interface CacheRelay { + /** + * Write event to cache relay + */ + event(ev: NostrEvent): Promise; + + /** + * Read event from cache relay + */ + query(req: ReqCommand): Promise>; +} diff --git a/packages/system/src/connection-pool.ts b/packages/system/src/connection-pool.ts index 0b20bc62..bf99ea7c 100644 --- a/packages/system/src/connection-pool.ts +++ b/packages/system/src/connection-pool.ts @@ -2,8 +2,8 @@ import { removeUndefined, sanitizeRelayUrl, unwrap } from "@snort/shared"; import debug from "debug"; import EventEmitter from "eventemitter3"; -import { Connection, ConnectionStateSnapshot, OkResponse, RelaySettings } from "./connection"; -import { NostrEvent, TaggedNostrEvent } from "./nostr"; +import { Connection, ConnectionStateSnapshot, RelaySettings } from "./connection"; +import { NostrEvent, OkResponse, TaggedNostrEvent } from "./nostr"; import { pickRelaysForReply } from "./outbox-model"; import { SystemInterface } from "."; diff --git a/packages/system/src/connection.ts b/packages/system/src/connection.ts index 63d3b04f..120ff048 100644 --- a/packages/system/src/connection.ts +++ b/packages/system/src/connection.ts @@ -6,7 +6,7 @@ import EventEmitter from "eventemitter3"; import { DefaultConnectTimeout } from "./const"; import { ConnectionStats } from "./connection-stats"; -import { NostrEvent, ReqCommand, ReqFilter, TaggedNostrEvent, u256 } from "./nostr"; +import { NostrEvent, OkResponse, ReqCommand, ReqFilter, TaggedNostrEvent, u256 } from "./nostr"; import { RelayInfo } from "./relay-info"; import EventKind from "./event-kind"; import { EventExt } from "./event-ext"; @@ -19,14 +19,6 @@ export interface RelaySettings { write: boolean; } -export interface OkResponse { - ok: boolean; - id: string; - relay: string; - message?: string; - event: NostrEvent; -} - /** * Snapshot of connection stats */ diff --git a/packages/system/src/index.ts b/packages/system/src/index.ts index 3fee758d..dde02769 100644 --- a/packages/system/src/index.ts +++ b/packages/system/src/index.ts @@ -1,14 +1,15 @@ -import { RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection"; +import { RelaySettings, ConnectionStateSnapshot } from "./connection"; import { RequestBuilder } from "./request-builder"; -import { NostrEvent, ReqFilter, TaggedNostrEvent } from "./nostr"; +import { NostrEvent, OkResponse, ReqFilter, TaggedNostrEvent } from "./nostr"; import { ProfileLoaderService } from "./profile-cache"; -import { RelayCache, RelayMetadataLoader } from "./outbox-model"; +import { AuthorsRelaysCache, RelayMetadataLoader } from "./outbox-model"; import { Optimizer } from "./query-optimizer"; import { base64 } from "@scure/base"; import { CachedTable } from "@snort/shared"; import { ConnectionPool } from "./connection-pool"; import EventEmitter from "eventemitter3"; import { QueryEvents } from "./query"; +import { CacheRelay } from "./cache-relay"; export { NostrSystem } from "./nostr-system"; export { default as EventKind } from "./event-kind"; @@ -133,7 +134,7 @@ export interface SystemInterface { /** * Relay cache for "Gossip" model */ - get relayCache(): RelayCache; + get relayCache(): AuthorsRelaysCache; /** * Query optimizer @@ -154,6 +155,11 @@ export interface SystemInterface { * Main connection pool */ get pool(): ConnectionPool; + + /** + * Local relay cache service + */ + get cacheRelay(): CacheRelay | undefined; } export interface SystemSnapshot { diff --git a/packages/system/src/nostr-system.ts b/packages/system/src/nostr-system.ts index ab271fd2..b6eaedd4 100644 --- a/packages/system/src/nostr-system.ts +++ b/packages/system/src/nostr-system.ts @@ -2,8 +2,8 @@ import debug from "debug"; import EventEmitter from "eventemitter3"; import { CachedTable } from "@snort/shared"; -import { NostrEvent, TaggedNostrEvent } from "./nostr"; -import { RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection"; +import { NostrEvent, TaggedNostrEvent, OkResponse } from "./nostr"; +import { RelaySettings, ConnectionStateSnapshot } from "./connection"; import { BuiltRawReqFilter, RequestBuilder } from "./request-builder"; import { RelayMetricHandler } from "./relay-metric-handler"; import { @@ -24,6 +24,7 @@ import { RelayMetadataLoader } from "./outbox-model"; import { Optimizer, DefaultOptimizer } from "./query-optimizer"; import { ConnectionPool, DefaultConnectionPool } from "./connection-pool"; import { QueryManager } from "./query-manager"; +import { CacheRelay } from "./cache-relay"; export interface NostrSystemEvents { change: (state: SystemSnapshot) => void; @@ -37,6 +38,7 @@ export interface NostrsystemProps { profileCache?: CachedTable; relayMetrics?: CachedTable; eventsCache?: CachedTable; + cacheRelay?: CacheRelay; optimizer?: Optimizer; db?: SnortSystemDb; checkSigs?: boolean; @@ -82,6 +84,7 @@ export class NostrSystem extends EventEmitter implements Syst readonly pool: ConnectionPool; readonly eventsCache: CachedTable; readonly relayLoader: RelayMetadataLoader; + readonly cacheRelay: CacheRelay | undefined; /** * Check event signatures (reccomended) @@ -95,6 +98,7 @@ export class NostrSystem extends EventEmitter implements Syst this.relayMetricsCache = props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics); this.eventsCache = props.eventsCache ?? new EventsCache(props.db?.events); this.optimizer = props.optimizer ?? DefaultOptimizer; + this.cacheRelay = props.cacheRelay; this.profileLoader = new ProfileLoaderService(this, this.profileCache); this.relayMetricsHandler = new RelayMetricHandler(this.relayMetricsCache); diff --git a/packages/system/src/nostr.ts b/packages/system/src/nostr.ts index 61b63bd8..66f2f9e2 100644 --- a/packages/system/src/nostr.ts +++ b/packages/system/src/nostr.ts @@ -57,8 +57,8 @@ export interface ReqFilter { since?: number; until?: number; limit?: number; - not?: ReqFilter; - [key: string]: Array | Array | string | number | undefined | ReqFilter; + ids_only?: boolean; + [key: string]: Array | Array | string | number | undefined | boolean; } /** @@ -92,3 +92,11 @@ export interface IMeta { alt?: string; fallback?: Array; } + +export interface OkResponse { + ok: boolean; + id: string; + relay: string; + message?: string; + event: NostrEvent; +} diff --git a/packages/system/src/outbox-model.ts b/packages/system/src/outbox-model.ts index b4fc883c..b79c2995 100644 --- a/packages/system/src/outbox-model.ts +++ b/packages/system/src/outbox-model.ts @@ -33,14 +33,14 @@ export interface RelayTaggedFilters { const logger = debug("OutboxModel"); -export interface RelayCache { +export interface AuthorsRelaysCache { getFromCache(pubkey?: string): UsersRelays | undefined; update(obj: UsersRelays): Promise<"new" | "updated" | "refresh" | "no_change">; buffer(keys: Array): Promise>; bulkSet(objs: Array): Promise; } -export function splitAllByWriteRelays(cache: RelayCache, filters: Array) { +export function splitAllByWriteRelays(cache: AuthorsRelaysCache, filters: Array) { const allSplit = filters .map(a => splitByWriteRelays(cache, a)) .reduce((acc, v) => { @@ -66,7 +66,7 @@ export function splitAllByWriteRelays(cache: RelayCache, filters: Array { +export function splitByWriteRelays(cache: AuthorsRelaysCache, filter: ReqFilter, pickN?: number): Array { const authors = filter.authors; if ((authors?.length ?? 0) === 0) { return [ @@ -108,7 +108,7 @@ export function splitByWriteRelays(cache: RelayCache, filter: ReqFilter, pickN?: * Split filters by author */ export function splitFlatByWriteRelays( - cache: RelayCache, + cache: AuthorsRelaysCache, input: Array, pickN?: number, ): Array { @@ -146,7 +146,7 @@ export function splitFlatByWriteRelays( /** * Pick most popular relays for each authors */ -export function pickTopRelays(cache: RelayCache, authors: Array, n: number, type: "write" | "read") { +export function pickTopRelays(cache: AuthorsRelaysCache, authors: Array, n: number, type: "write" | "read") { // map of pubkey -> [write relays] const allRelays = authors.map(a => { return { diff --git a/packages/system/src/query-manager.ts b/packages/system/src/query-manager.ts index dc26b785..cb0db971 100644 --- a/packages/system/src/query-manager.ts +++ b/packages/system/src/query-manager.ts @@ -1,6 +1,6 @@ import debug from "debug"; import EventEmitter from "eventemitter3"; -import { BuiltRawReqFilter, RequestBuilder, SystemInterface, TaggedNostrEvent } from "."; +import { BuiltRawReqFilter, RequestBuilder, RequestStrategy, SystemInterface, TaggedNostrEvent } from "."; import { Query, TraceReport } from "./query"; import { FilterCacheLayer, IdsFilterCacheLayer } from "./filter-cache-layer"; import { trimFilters } from "./request-trim"; @@ -105,9 +105,17 @@ export class QueryManager extends EventEmitter { } async #send(q: Query, qSend: BuiltRawReqFilter) { + if (qSend.strategy === RequestStrategy.CacheRelay && this.#system.cacheRelay) { + const qt = q.insertCompletedTrace(qSend, []); + const res = await this.#system.cacheRelay.query(["REQ", qt.id, ...qSend.filters]); + q.feed.add(res?.map(a => ({ ...a, relays: [] }) as TaggedNostrEvent)); + return; + } for (const qfl of this.#queryCacheLayers) { qSend = await qfl.processFilter(q, qSend); } + + // automated outbox model, load relays for queried authors for (const f of qSend.filters) { if (f.authors) { this.#system.relayLoader.TrackKeys(f.authors); @@ -156,6 +164,13 @@ export class QueryManager extends EventEmitter { } } + /** + * Split request into 2 branches. + * 1. Request cache for results + * 2. Send query to relays + */ + #splitSyncRequest(req: BuiltRawReqFilter) {} + #cleanup() { let changed = false; for (const [k, v] of this.#queries) { diff --git a/packages/system/src/query.ts b/packages/system/src/query.ts index 0d523abf..6ccbf234 100644 --- a/packages/system/src/query.ts +++ b/packages/system/src/query.ts @@ -323,15 +323,15 @@ export class Query extends EventEmitter { } } - #emitFilters() { + async #emitFilters() { this.#log("Starting emit of %s", this.id); const existing = this.filters; if (!(this.request.options?.skipDiff ?? false) && existing.length > 0) { - const filters = this.request.buildDiff(this.#system, existing); + const filters = await this.request.buildDiff(this.#system, existing); this.#log("Build %s %O", this.id, filters); filters.forEach(f => this.emit("request", this.id, f)); } else { - const filters = this.request.build(this.#system); + const filters = await this.request.build(this.#system); this.#log("Build %s %O", this.id, filters); filters.forEach(f => this.emit("request", this.id, f)); } diff --git a/packages/system/src/request-builder.ts b/packages/system/src/request-builder.ts index 4c40caa5..6cf65147 100644 --- a/packages/system/src/request-builder.ts +++ b/packages/system/src/request-builder.ts @@ -5,27 +5,33 @@ import { appendDedupe, dedupe, sanitizeRelayUrl, unixNowMs, unwrap } from "@snor import EventKind from "./event-kind"; import { NostrLink, NostrPrefix, SystemInterface } from "."; import { ReqFilter, u256, HexKey } from "./nostr"; -import { RelayCache, splitByWriteRelays, splitFlatByWriteRelays } from "./outbox-model"; +import { AuthorsRelaysCache, splitByWriteRelays, splitFlatByWriteRelays } from "./outbox-model"; +import { CacheRelay } from "cache-relay"; /** * Which strategy is used when building REQ filters */ -export enum RequestStrategy { +export const enum RequestStrategy { /** * Use the users default relays to fetch events, * this is the fallback option when there is no better way to query a given filter set */ - DefaultRelays = 1, + DefaultRelays = "default", /** * Using a cached copy of the authors relay lists NIP-65, split a given set of request filters by pubkey */ - AuthorsRelays = 2, + AuthorsRelays = "authors-relays", /** * Use pre-determined relays for query */ - ExplicitRelays = 3, + ExplicitRelays = "explicit-relays", + + /** + * Query the cache relay + */ + CacheRelay = "cache-relay", } /** @@ -121,21 +127,24 @@ export class RequestBuilder { return this.#builders.map(f => f.filter); } - build(system: SystemInterface): Array { - const expanded = this.#builders.flatMap(a => a.build(system.relayCache, this.#options)); + async build(system: SystemInterface): Promise> { + const expanded = ( + await Promise.all(this.#builders.map(a => a.build(system.relayCache, system.cacheRelay, this.#options))) + ).flat(); return this.#groupByRelay(system, expanded); } /** * Detects a change in request from a previous set of filters */ - buildDiff(system: SystemInterface, prev: Array): Array { + async buildDiff(system: SystemInterface, prev: Array): Promise> { const start = unixNowMs(); const diff = system.optimizer.getDiff(prev, this.buildRaw()); const ts = unixNowMs() - start; this.#log("buildDiff %s %d ms +%d", this.id, ts, diff.length); if (diff.length > 0) { + // todo: fix return splitFlatByWriteRelays(system.relayCache, diff).map(a => { return { strategy: RequestStrategy.AuthorsRelays, @@ -143,8 +152,6 @@ export class RequestBuilder { relay: a.relay, }; }); - } else { - this.#log(`Wasted ${ts} ms detecting no changes!`); } return []; } @@ -284,12 +291,48 @@ export class RequestFilterBuilder { /** * Build/expand this filter into a set of relay specific queries */ - build(relays: RelayCache, options?: RequestBuilderOptions): Array { + async build( + relays: AuthorsRelaysCache, + cacheRelay?: CacheRelay, + options?: RequestBuilderOptions, + ): Promise> { + // if since/until are set ignore sync split, cache relay wont be used + if (cacheRelay && this.#filter.since === undefined && this.#filter.until === undefined) { + const latest = await cacheRelay.query([ + "REQ", + uuid(), + { + ...this.#filter, + since: undefined, + until: undefined, + limit: 1, + }, + ]); + if (latest.length === 1) { + return [ + ...this.#buildFromFilter(relays, { + ...this.#filter, + since: latest[0].created_at, + until: undefined, + limit: undefined, + }), + { + filters: [this.#filter], + relay: "==CACHE==", + strategy: RequestStrategy.CacheRelay, + }, + ]; + } + } + return this.#buildFromFilter(relays, this.#filter, options); + } + + #buildFromFilter(relays: AuthorsRelaysCache, f: ReqFilter, options?: RequestBuilderOptions) { // use the explicit relay list first if (this.#relays.size > 0) { return [...this.#relays].map(r => { return { - filters: [this.#filter], + filters: [f], relay: r, strategy: RequestStrategy.ExplicitRelays, }; @@ -297,8 +340,8 @@ export class RequestFilterBuilder { } // If any authors are set use the gossip model to fetch data for each author - if (this.#filter.authors) { - const split = splitByWriteRelays(relays, this.#filter, options?.outboxPickN); + if (f.authors) { + const split = splitByWriteRelays(relays, f, options?.outboxPickN); return split.map(a => { return { filters: [a.filter], @@ -310,7 +353,7 @@ export class RequestFilterBuilder { return [ { - filters: [this.#filter], + filters: [f], relay: "", strategy: RequestStrategy.DefaultRelays, }, diff --git a/packages/system/src/worker/system-worker.ts b/packages/system/src/worker/system-worker.ts index 8c0274ae..f8e39d26 100644 --- a/packages/system/src/worker/system-worker.ts +++ b/packages/system/src/worker/system-worker.ts @@ -26,6 +26,7 @@ import { EventsCache } from "../cache/events"; import { RelayMetricHandler } from "../relay-metric-handler"; import debug from "debug"; import { ConnectionPool } from "connection-pool"; +import { CacheRelay } from "cache-relay"; export class SystemWorker extends EventEmitter implements SystemInterface { #log = debug("SystemWorker"); @@ -38,6 +39,7 @@ export class SystemWorker extends EventEmitter implements Sys readonly relayMetricsHandler: RelayMetricHandler; readonly eventsCache: CachedTable; readonly relayLoader: RelayMetadataLoader; + readonly cacheRelay: CacheRelay | undefined; get checkSigs() { return true; diff --git a/packages/worker-relay/src/interface.ts b/packages/worker-relay/src/interface.ts index c41e3991..175e9c65 100644 --- a/packages/worker-relay/src/interface.ts +++ b/packages/worker-relay/src/interface.ts @@ -1,4 +1,4 @@ -import { NostrEvent, ReqCommand, WorkerMessage, WorkerMessageCommand } from "./types"; +import { NostrEvent, OkResponse, ReqCommand, ReqFilter, WorkerMessage, WorkerMessageCommand } from "./types"; import { v4 as uuid } from "uuid"; export class WorkerRelayInterface { @@ -18,35 +18,31 @@ export class WorkerRelayInterface { } async init(path: string) { - return (await this.#workerRpc("init", path)).result; + return await this.#workerRpc("init", path); } async event(ev: NostrEvent) { - return (await this.#workerRpc("event", ev)).result; + return await this.#workerRpc("event", ev); } - async req(req: ReqCommand) { + async query(req: ReqCommand) { return await this.#workerRpc>("req", req); } async count(req: ReqCommand) { - return (await this.#workerRpc("count", req)).result; + return await this.#workerRpc("count", req); } async summary() { - return (await this.#workerRpc>("summary")).result; + return await this.#workerRpc>("summary"); } async close(id: string) { - return (await this.#workerRpc("close", id)).result; + return await this.#workerRpc("close", id); } async dump() { - return (await this.#workerRpc("dumpDb")).result; - } - - async sql(sql: string, params: Array) { - return (await this.#workerRpc>>("sql", { sql, params })).result; + return await this.#workerRpc("dumpDb"); } #workerRpc(cmd: WorkerMessageCommand, args?: T) { @@ -57,16 +53,10 @@ export class WorkerRelayInterface { args, } as WorkerMessage; this.#worker.postMessage(msg); - return new Promise<{ - result: R; - port: MessagePort | undefined; - }>(resolve => { + return new Promise(resolve => { this.#commandQueue.set(id, (v, port) => { const cmdReply = v as WorkerMessage; - resolve({ - result: cmdReply.args, - port: port.length > 0 ? port[0] : undefined, - }); + resolve(cmdReply.args); }); }); } diff --git a/packages/worker-relay/src/memory-relay.ts b/packages/worker-relay/src/memory-relay.ts index 088dcb10..98dc7b27 100644 --- a/packages/worker-relay/src/memory-relay.ts +++ b/packages/worker-relay/src/memory-relay.ts @@ -69,7 +69,11 @@ export class InMemoryRelay extends EventEmitter implements R const ret = []; for (const [, e] of this.#events) { if (eventMatchesFilter(e, filter)) { - ret.push(e); + if (filter.ids_only === true) { + ret.push(e.id); + } else { + ret.push(e); + } } } return ret; diff --git a/packages/worker-relay/src/sqlite-relay.ts b/packages/worker-relay/src/sqlite-relay.ts index 9955a141..5d21e179 100644 --- a/packages/worker-relay/src/sqlite-relay.ts +++ b/packages/worker-relay/src/sqlite-relay.ts @@ -184,7 +184,13 @@ export class SqliteRelay extends EventEmitter implements Rel const [sql, params] = this.#buildQuery(req); const res = this.#db?.selectArrays(sql, params); - const results = res?.map(a => JSON.parse(a[0] as string) as NostrEvent) ?? []; + const results = + res?.map(a => { + if (req.ids_only === true) { + return a[0] as string; + } + return JSON.parse(a[0] as string) as NostrEvent; + }) ?? []; const time = unixNowMs() - start; this.#log(`Query ${id} results took ${time.toLocaleString()}ms`); return results; @@ -245,7 +251,13 @@ export class SqliteRelay extends EventEmitter implements Rel const conditions: Array = []; const params: Array = []; - let sql = `select ${count ? "count(json)" : "json"} from events`; + let resultType = "json"; + if (count) { + resultType = "count(json)"; + } else if (req.ids_only === true) { + resultType = "id"; + } + let sql = `select ${resultType} from events`; const tags = Object.entries(req).filter(([k]) => k.startsWith("#")); for (const [key, values] of tags) { const vArray = values as Array; diff --git a/packages/worker-relay/src/types.ts b/packages/worker-relay/src/types.ts index 975a5323..ae8bccc6 100644 --- a/packages/worker-relay/src/types.ts +++ b/packages/worker-relay/src/types.ts @@ -9,7 +9,7 @@ export type WorkerMessageCommand = | "summary" | "close" | "dumpDb" - | "sql"; + | "emit-event"; export interface WorkerMessage { id: string; @@ -27,11 +27,7 @@ export interface NostrEvent { sig: string; } -export interface ReqCommand { - id: string; - filters: Array; - leaveOpen?: boolean; -} +export type ReqCommand = ["REQ", id: string, ...filters: Array]; export interface ReqFilter { ids?: string[]; @@ -41,8 +37,16 @@ export interface ReqFilter { since?: number; until?: number; limit?: number; - not?: ReqFilter; - [key: string]: Array | Array | string | number | undefined | ReqFilter; + ids_only?: boolean; + [key: string]: Array | Array | string | number | undefined | boolean; +} + +export interface OkResponse { + ok: boolean; + id: string; + relay: string; + message?: string; + event: NostrEvent; } export interface RelayHandler extends EventEmitter { @@ -55,7 +59,7 @@ export interface RelayHandler extends EventEmitter { * Run any SQL command */ sql(sql: string, params: Array): Array>; - req(id: string, req: ReqFilter): Array; + req(id: string, req: ReqFilter): Array; count(req: ReqFilter): number; summary(): Record; dump(): Promise; diff --git a/packages/worker-relay/src/worker.ts b/packages/worker-relay/src/worker.ts index 4019c226..8d427804 100644 --- a/packages/worker-relay/src/worker.ts +++ b/packages/worker-relay/src/worker.ts @@ -15,15 +15,12 @@ const ActiveSubscriptions = new Map(); let relay: RelayHandler | undefined; -async function reply(id: string, obj?: T, transferables?: Transferable[]) { - globalThis.postMessage( - { - id, - cmd: "reply", - args: obj, - } as WorkerMessage, - transferables ?? [], - ); +async function reply(id: string, obj?: T) { + globalThis.postMessage({ + id, + cmd: "reply", + args: obj, + } as WorkerMessage); } // Event inserter queue @@ -108,25 +105,18 @@ globalThis.onmessage = async ev => { break; } case "close": { - ActiveSubscriptions.delete(msg.args as string); reply(msg.id, true); break; } case "req": { await barrierQueue(cmdQueue, async () => { const req = msg.args as ReqCommand; - const chan = new MessageChannel(); - if (req.leaveOpen) { - ActiveSubscriptions.set(req.id, { - filters: req.filters, - port: chan.port1, - }); - } + const filters = req.slice(2) as Array; const results = []; - for (const r of req.filters) { - results.push(...relay!.req(req.id, r as ReqFilter)); + for (const r of filters) { + results.push(...relay!.req(req[1], r)); } - reply(msg.id, results, req.leaveOpen ? [chan.port2] : undefined); + reply(msg.id, results); }); break; } @@ -134,8 +124,9 @@ globalThis.onmessage = async ev => { await barrierQueue(cmdQueue, async () => { const req = msg.args as ReqCommand; let results = 0; - for (const r of req.filters) { - const c = relay!.count(r as ReqFilter); + const filters = req.slice(2) as Array; + for (const r of filters) { + const c = relay!.count(r); results += c; } reply(msg.id, results); @@ -156,17 +147,6 @@ globalThis.onmessage = async ev => { }); break; } - case "sql": { - await barrierQueue(cmdQueue, async () => { - const req = msg.args as { - sql: string; - params: Array; - }; - const res = relay!.sql(req.sql, req.params); - reply(msg.id, res); - }); - break; - } default: { reply(msg.id, { error: "Unknown command" }); break;