diff --git a/packages/system-react/README.md b/packages/system-react/README.md index 2855a667..23a5e4d7 100644 --- a/packages/system-react/README.md +++ b/packages/system-react/README.md @@ -2,7 +2,32 @@ React hooks for @snort/system -Sample: +### Available hooks + +#### `useRequestBuilder(NoteStore, RequestBuilder)` + +The main hook which allows you to subscribe to nostr relays and returns a reactive store. + +#### `useUserProfile(pubkey: string | undefined)` + +Profile hook, profile loading is automated, this hook will return the profile from cache and also refresh the cache in the background (`stale-while-revalidate`) + +#### `useEventFeed(NostrLink)` / `useEventsFeed(Array)` +A simple hook which can load events using the `NostrLink` class, this class contains one NIP-19 entity `nevent/naddr` etc. + +#### `useReactions(id, Array)` +Loads reactions for a set of events, this can be a set of posts on a profile or an arbitary list of events. + +#### `useEventReactions(NostrLink, Array)` +Process a set of related events (usually results from `useReactions`) and return likes/dislikes/reposts/zaps + +#### `useUserSearch()` +Search for profiles in the profile cache, this also returns exact links if they match + +#### `useSystemState(System)` +Hook state of the nostr system + +## Example: ```js import { useMemo } from "react"; diff --git a/packages/system/src/index.ts b/packages/system/src/index.ts index a417e479..57160455 100644 --- a/packages/system/src/index.ts +++ b/packages/system/src/index.ts @@ -44,6 +44,8 @@ export * from "./cache/user-relays"; export * from "./cache/user-metadata"; export * from "./cache/relay-metric"; +export * from "./worker/system-worker"; + export interface SystemInterface { /** * Check event signatures (reccomended) @@ -55,6 +57,11 @@ export interface SystemInterface { */ get Sockets(): Array; + /** + * Do some initialization + */ + Init(): Promise; + /** * Get an active query by ID * @param id Query ID @@ -88,6 +95,9 @@ export interface SystemInterface { */ DisconnectRelay(address: string): void; + /** + * Push an event into the system from external source + */ HandleEvent(ev: TaggedNostrEvent): void; /** diff --git a/packages/system/src/nostr-connection-pool.ts b/packages/system/src/nostr-connection-pool.ts new file mode 100644 index 00000000..d44ba516 --- /dev/null +++ b/packages/system/src/nostr-connection-pool.ts @@ -0,0 +1,147 @@ +import { removeUndefined, sanitizeRelayUrl, unwrap } from "@snort/shared"; +import debug from "debug"; +import EventEmitter from "eventemitter3"; + +import { Connection, ConnectionStateSnapshot, OkResponse, RelaySettings } from "./connection"; +import { NostrEvent, TaggedNostrEvent } from "./nostr"; +import { pickRelaysForReply } from "./outbox-model"; +import { SystemInterface } from "."; + +export interface NostrConnectionPoolEvents { + connected: (address: string, wasReconnect: boolean) => void; + connectFailed: (address: string) => void; + event: (address: string, sub: string, e: TaggedNostrEvent) => void; + eose: (address: string, sub: string) => void; + disconnect: (address: string, code: number) => void; + auth: (address: string, challenge: string, relay: string, cb: (ev: NostrEvent) => void) => void; + notice: (address: string, msg: string) => void; +} + +/** + * Simple connection pool containing connections to multiple nostr relays + */ +export class NostrConnectionPool extends EventEmitter { + #log = debug("NostrConnectionPool"); + + /** + * All currently connected websockets + */ + #sockets = new Map(); + + /** + * Get basic state information from the pool + */ + getState(): ConnectionStateSnapshot[] { + return [...this.#sockets.values()].map(a => a.takeSnapshot()); + } + + /** + * Get a connection object from the pool + */ + getConnection(id: string) { + return this.#sockets.get(id); + } + + /** + * Add a new relay to the pool + */ + async connect(address: string, options: RelaySettings, ephemeral: boolean) { + const addr = unwrap(sanitizeRelayUrl(address)); + try { + const existing = this.#sockets.get(addr); + if (!existing) { + const c = new Connection(addr, options, ephemeral); + this.#sockets.set(addr, c); + + c.on("event", (s, e) => this.emit("event", addr, s, e)); + c.on("eose", s => this.emit("eose", addr, s)); + c.on("disconnect", code => this.emit("disconnect", addr, code)); + c.on("connected", r => this.emit("connected", addr, r)); + c.on("auth", (cx, r, cb) => this.emit("auth", addr, cx, r, cb)); + await c.Connect(); + return c; + } else { + // update settings if already connected + existing.Settings = options; + // upgrade to non-ephemeral, never downgrade + if (existing.Ephemeral && !ephemeral) { + existing.Ephemeral = ephemeral; + } + return existing; + } + } catch (e) { + this.#log("%O", e); + this.emit("connectFailed", addr); + this.#sockets.delete(addr); + } + } + + /** + * Remove relay from pool + */ + disconnect(address: string) { + const addr = unwrap(sanitizeRelayUrl(address)); + const c = this.#sockets.get(addr); + if (c) { + this.#sockets.delete(addr); + c.Close(); + } + } + + /** + * Broadcast event to all write relays. + * @remarks Also write event to read relays of those who are `p` tagged in the event (Inbox model) + */ + async broadcast(system: SystemInterface, ev: NostrEvent, cb?: (rsp: OkResponse) => void) { + const writeRelays = [...this.#sockets.values()].filter(a => !a.Ephemeral && a.Settings.write); + const replyRelays = await pickRelaysForReply(ev, system); + const oks = await Promise.all([ + ...writeRelays.map(async s => { + try { + const rsp = await s.SendAsync(ev); + cb?.(rsp); + return rsp; + } catch (e) { + console.error(e); + } + return; + }), + ...replyRelays.filter(a => !this.#sockets.has(unwrap(sanitizeRelayUrl(a)))).map(a => this.broadcastTo(a, ev)), + ]); + return removeUndefined(oks); + } + + /** + * Send event to specific relay + */ + async broadcastTo(address: string, ev: NostrEvent): Promise { + const addrClean = sanitizeRelayUrl(address); + if (!addrClean) { + throw new Error("Invalid relay address"); + } + + const existing = this.#sockets.get(addrClean); + if (existing) { + return await existing.SendAsync(ev); + } else { + return await new Promise((resolve, reject) => { + const c = new Connection(address, { write: true, read: true }, true); + + const t = setTimeout(reject, 10_000); + c.once("connected", async () => { + clearTimeout(t); + const rsp = await c.SendAsync(ev); + c.Close(); + resolve(rsp); + }); + c.Connect(); + }); + } + } + + *[Symbol.iterator]() { + for (const kv of this.#sockets) { + yield kv; + } + } +} diff --git a/packages/system/src/nostr-system.ts b/packages/system/src/nostr-system.ts index 0c49e028..20570372 100644 --- a/packages/system/src/nostr-system.ts +++ b/packages/system/src/nostr-system.ts @@ -1,9 +1,9 @@ import debug from "debug"; import EventEmitter from "eventemitter3"; -import { unwrap, sanitizeRelayUrl, FeedCache, removeUndefined } from "@snort/shared"; +import { unwrap, FeedCache } from "@snort/shared"; import { NostrEvent, TaggedNostrEvent } from "./nostr"; -import { Connection, RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection"; +import { RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection"; import { Query } from "./query"; import { NoteCollection, NoteStore } from "./note-collection"; import { BuiltRawReqFilter, RequestBuilder, RequestStrategy } from "./request-builder"; @@ -22,14 +22,25 @@ import { EventExt, } from "."; import { EventsCache } from "./cache/events"; -import { RelayCache, RelayMetadataLoader, pickRelaysForReply } from "./outbox-model"; +import { RelayCache, RelayMetadataLoader } from "./outbox-model"; import { QueryOptimizer, DefaultQueryOptimizer } from "./query-optimizer"; import { trimFilters } from "./request-trim"; +import { NostrConnectionPool } from "./nostr-connection-pool"; -interface NostrSystemEvents { +export interface NostrSystemEvents { change: (state: SystemSnapshot) => void; auth: (challenge: string, relay: string, cb: (ev: NostrEvent) => void) => void; - event: (ev: TaggedNostrEvent) => void; + event: (id: string, ev: TaggedNostrEvent) => void; +} + +export interface NostrsystemProps { + relayCache?: FeedCache; + profileCache?: FeedCache; + relayMetrics?: FeedCache; + eventsCache?: FeedCache; + queryOptimizer?: QueryOptimizer; + db?: SnortSystemDb; + checkSigs?: boolean; } /** @@ -37,11 +48,7 @@ interface NostrSystemEvents { */ export class NostrSystem extends EventEmitter implements SystemInterface { #log = debug("System"); - - /** - * All currently connected websockets - */ - #sockets = new Map(); + #pool = new NostrConnectionPool(); /** * All active queries @@ -90,15 +97,7 @@ export class NostrSystem extends EventEmitter implements Syst #relayLoader: RelayMetadataLoader; - constructor(props: { - relayCache?: FeedCache; - profileCache?: FeedCache; - relayMetrics?: FeedCache; - eventsCache?: FeedCache; - queryOptimizer?: QueryOptimizer; - db?: SnortSystemDb; - checkSigs?: boolean; - }) { + constructor(props: NostrsystemProps) { super(); this.#relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays); this.#profileCache = props.profileCache ?? new UserProfileCache(props.db?.users); @@ -111,6 +110,67 @@ export class NostrSystem extends EventEmitter implements Syst this.#relayLoader = new RelayMetadataLoader(this, this.#relayCache); this.checkSigs = props.checkSigs ?? true; this.#cleanup(); + + // hook connection pool + this.#pool.on("connected", (id, wasReconnect) => { + const c = this.#pool.getConnection(id); + if (c) { + this.#relayMetrics.onConnect(c.Address); + if (wasReconnect) { + for (const [, q] of this.Queries) { + q.connectionRestored(c); + } + } + } + }); + this.#pool.on("connectFailed", address => { + this.#relayMetrics.onDisconnect(address, 0); + }); + this.#pool.on("event", (_, sub, ev) => { + ev.relays?.length && this.#relayMetrics.onEvent(ev.relays[0]); + + if (!EventExt.isValid(ev)) { + this.#log("Rejecting invalid event %O", ev); + return; + } + if (this.checkSigs) { + const id = EventExt.createId(ev); + if (!this.#queryOptimizer.schnorrVerify(id, ev.sig, ev.pubkey)) { + this.#log("Invalid sig %O", ev); + return; + } + } + + this.emit("event", sub, ev); + }); + this.#pool.on("disconnect", (id, code) => { + const c = this.#pool.getConnection(id); + if (c) { + this.#relayMetrics.onDisconnect(c.Address, code); + for (const [, q] of this.Queries) { + q.connectionLost(c.Id); + } + } + }); + this.#pool.on("eose", (id, sub) => { + const c = this.#pool.getConnection(id); + if (c) { + for (const [, v] of this.Queries) { + v.eose(sub, c); + } + } + }); + this.#pool.on("auth", (_, c, r, cb) => this.emit("auth", c, r, cb)); + this.#pool.on("notice", (addr, msg) => { + this.#log("NOTICE: %s %s", addr, msg); + }); + + // internal handler for on-event + this.on("event", (sub, ev) => { + for (const [, v] of this.Queries) { + v.handleEvent(sub, ev); + } + }); } get ProfileLoader() { @@ -118,7 +178,7 @@ export class NostrSystem extends EventEmitter implements Syst } get Sockets(): ConnectionStateSnapshot[] { - return [...this.#sockets.values()].map(a => a.takeSnapshot()); + return this.#pool.getState(); } get RelayCache(): RelayCache { @@ -129,9 +189,6 @@ export class NostrSystem extends EventEmitter implements Syst return this.#queryOptimizer; } - /** - * Setup caches - */ async Init() { const t = [ this.#relayCache.preload(), @@ -142,109 +199,16 @@ export class NostrSystem extends EventEmitter implements Syst await Promise.all(t); } - /** - * Connect to a NOSTR relay if not already connected - */ async ConnectToRelay(address: string, options: RelaySettings) { - const addr = unwrap(sanitizeRelayUrl(address)); - try { - const existing = this.#sockets.get(addr); - if (!existing) { - const c = new Connection(addr, options); - this.#sockets.set(addr, c); - c.on("event", (s, e) => this.#onEvent(s, e)); - c.on("eose", s => this.#onEndOfStoredEvents(c, s)); - c.on("disconnect", code => this.#onRelayDisconnect(c, code)); - c.on("connected", r => this.#onRelayConnected(c, r)); - c.on("auth", (c, r, cb) => this.emit("auth", c, r, cb)); - await c.Connect(); - } else { - // update settings if already connected - existing.Settings = options; - existing.Ephemeral = false; - } - } catch (e) { - console.error(e); - this.#relayMetrics.onDisconnect(addr, 0); - } + await this.#pool.connect(address, options, false); } - #onRelayConnected(c: Connection, wasReconnect: boolean) { - this.#relayMetrics.onConnect(c.Address); - if (wasReconnect) { - for (const [, q] of this.Queries) { - q.connectionRestored(c); - } - } + ConnectEphemeralRelay(address: string) { + return this.#pool.connect(address, { read: true, write: true }, true); } - #onRelayDisconnect(c: Connection, code: number) { - this.#relayMetrics.onDisconnect(c.Address, code); - for (const [, q] of this.Queries) { - q.connectionLost(c.Id); - } - } - - #onEndOfStoredEvents(c: Readonly, sub: string) { - for (const [, v] of this.Queries) { - v.eose(sub, c); - } - } - - #onEvent(sub: string, ev: TaggedNostrEvent) { - ev.relays?.length && this.#relayMetrics.onEvent(ev.relays[0]); - - if (!EventExt.isValid(ev)) { - this.#log("Rejecting invalid event %O", ev); - return; - } - if (this.checkSigs) { - const id = EventExt.createId(ev); - if (!this.#queryOptimizer.schnorrVerify(id, ev.sig, ev.pubkey)) { - this.#log("Invalid sig %O", ev); - return; - } - } - - this.emit("event", ev); - - for (const [, v] of this.Queries) { - v.handleEvent(sub, ev); - } - } - - /** - * - * @param address Relay address URL - */ - async ConnectEphemeralRelay(address: string): Promise { - try { - const addr = unwrap(sanitizeRelayUrl(address)); - if (!this.#sockets.has(addr)) { - const c = new Connection(addr, { read: true, write: true }, true); - this.#sockets.set(addr, c); - c.on("event", (s, e) => this.#onEvent(s, e)); - c.on("eose", s => this.#onEndOfStoredEvents(c, s)); - c.on("disconnect", code => this.#onRelayDisconnect(c, code)); - c.on("connected", r => this.#onRelayConnected(c, r)); - c.on("auth", (c, r, cb) => this.emit("auth", c, r, cb)); - await c.Connect(); - return c; - } - } catch (e) { - console.error(e); - } - } - - /** - * Disconnect from a relay - */ DisconnectRelay(address: string) { - const c = this.#sockets.get(address); - if (c) { - this.#sockets.delete(address); - c.Close(); - } + this.#pool.disconnect(address); } GetQuery(id: string): Query | undefined { @@ -354,7 +318,7 @@ export class NostrSystem extends EventEmitter implements Syst if (qSend.relay) { this.#log("Sending query to %s %O", qSend.relay, qSend); - const s = this.#sockets.get(qSend.relay); + const s = this.#pool.getConnection(qSend.relay); if (s) { const qt = q.sendToRelay(s, qSend); if (qt) { @@ -373,7 +337,7 @@ export class NostrSystem extends EventEmitter implements Syst } } else { const ret = []; - for (const [a, s] of this.#sockets) { + for (const [a, s] of this.#pool) { if (!s.Ephemeral) { this.#log("Sending query to %s %O", a, qSend); const qt = q.sendToRelay(s, qSend); @@ -388,58 +352,16 @@ export class NostrSystem extends EventEmitter implements Syst } HandleEvent(ev: TaggedNostrEvent) { - this.#onEvent("*", ev); + this.emit("event", "*", ev); } - /** - * Send events to writable relays - */ - async BroadcastEvent(ev: NostrEvent, cb?: (rsp: OkResponse) => void) { + async BroadcastEvent(ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise { this.HandleEvent({ ...ev, relays: [] }); - const socks = [...this.#sockets.values()].filter(a => !a.Ephemeral && a.Settings.write); - const replyRelays = await pickRelaysForReply(ev, this); - const oks = await Promise.all([ - ...socks.map(async s => { - try { - const rsp = await s.SendAsync(ev); - cb?.(rsp); - return rsp; - } catch (e) { - console.error(e); - } - return; - }), - ...replyRelays.filter(a => !this.#sockets.has(a)).map(a => this.WriteOnceToRelay(a, ev)), - ]); - return removeUndefined(oks); + return await this.#pool.broadcast(this, ev, cb); } - /** - * Write an event to a relay then disconnect - */ async WriteOnceToRelay(address: string, ev: NostrEvent): Promise { - const addrClean = sanitizeRelayUrl(address); - if (!addrClean) { - throw new Error("Invalid relay address"); - } - - const existing = this.#sockets.get(addrClean); - if (existing) { - return await existing.SendAsync(ev); - } else { - return await new Promise((resolve, reject) => { - const c = new Connection(address, { write: true, read: true }, true); - - const t = setTimeout(reject, 10_000); - c.once("connected", async () => { - clearTimeout(t); - const rsp = await c.SendAsync(ev); - c.Close(); - resolve(rsp); - }); - c.Connect(); - }); - } + return await this.#pool.broadcastTo(address, ev); } takeSnapshot(): SystemSnapshot { diff --git a/packages/system/src/worker/index.ts b/packages/system/src/worker/index.ts new file mode 100644 index 00000000..8d2372d9 --- /dev/null +++ b/packages/system/src/worker/index.ts @@ -0,0 +1,14 @@ + +export const enum NostrSystemCommand { + OkResponse, + ErrorResponse, + Init, + ConnectRelay, + DisconnectRelay +} + +export interface NostrSystemMessage { + id: string; + type: NostrSystemCommand; + data: T; +} diff --git a/packages/system/src/worker/system-worker-script.ts b/packages/system/src/worker/system-worker-script.ts new file mode 100644 index 00000000..abe2bcce --- /dev/null +++ b/packages/system/src/worker/system-worker-script.ts @@ -0,0 +1,59 @@ +/// + +import { NostrSystem, NostrsystemProps } from "../nostr-system"; +import { NostrSystemMessage, NostrSystemCommand } from "."; + +let system: NostrSystem | undefined; + +function reply(id: string, type: NostrSystemCommand, data: T) { + globalThis.postMessage({ + id, + type, + data, + } as NostrSystemMessage); +} +function okReply(id: string, message?: string) { + reply(id, NostrSystemCommand.OkResponse, message); +} +function errorReply(id: string, message: string) { + reply(id, NostrSystemCommand.ErrorResponse, message); +} +function checkInitialized() { + if (system === undefined) { + throw new Error("System not initialized"); + } +} + +globalThis.onmessage = async ev => { + const data = ev.data as { id: string; type: NostrSystemCommand }; + try { + switch (data.type) { + case NostrSystemCommand.Init: { + const cmd = ev.data as NostrSystemMessage; + if (system === undefined) { + system = new NostrSystem(cmd.data); + await system.Init(); + okReply(data.id); + } else { + errorReply(data.id, "System is already initialized"); + } + break; + } + case NostrSystemCommand.ConnectRelay: { + checkInitialized(); + const cmd = ev.data as NostrSystemMessage<[string, {read: boolean, write: boolean}]>; + await system?.ConnectToRelay(cmd.data[0], cmd.data[1]); + okReply(data.id, "Connected"); + break; + } + default: { + errorReply(data.id, "Unknown command"); + break; + } + } + } catch (e) { + if (e instanceof Error) { + errorReply(data.id, e.message); + } + } +}; diff --git a/packages/system/src/worker/system-worker.ts b/packages/system/src/worker/system-worker.ts new file mode 100644 index 00000000..3ebd4130 --- /dev/null +++ b/packages/system/src/worker/system-worker.ts @@ -0,0 +1,109 @@ +import { v4 as uuid } from "uuid"; +import EventEmitter from "eventemitter3"; +import { + ConnectionStateSnapshot, + NostrEvent, + NoteStore, + OkResponse, + ProfileLoaderService, + QueryOptimizer, + RelayCache, + RelaySettings, + RequestBuilder, + SystemInterface, + TaggedNostrEvent, +} from ".."; +import { NostrSystemEvents, NostrsystemProps } from "../nostr-system"; +import { Query } from "../query"; +import { NostrSystemCommand, NostrSystemMessage } from "."; + +export class SystemWorker extends EventEmitter implements SystemInterface { + #worker: Worker; + #commandQueue: Map void> = new Map(); + checkSigs: boolean; + + constructor(scriptPath: string, props: NostrsystemProps) { + super(); + this.checkSigs = props.checkSigs ?? false; + + this.#worker = new Worker(scriptPath, { + name: "SystemWorker", + }); + } + + get Sockets(): ConnectionStateSnapshot[] { + throw new Error("Method not implemented."); + } + + async Init() { + await this.#workerRpc(NostrSystemCommand.Init, undefined); + } + + GetQuery(id: string): Query | undefined { + return undefined; + } + + Query(type: new () => T, req: RequestBuilder): Query { + throw new Error("Method not implemented."); + } + + Fetch(req: RequestBuilder, cb?: ((evs: TaggedNostrEvent[]) => void) | undefined): Promise { + throw new Error("Method not implemented."); + } + + ConnectToRelay(address: string, options: RelaySettings): Promise { + throw new Error("Method not implemented."); + } + + DisconnectRelay(address: string): void { + throw new Error("Method not implemented."); + } + + HandleEvent(ev: TaggedNostrEvent): void { + throw new Error("Method not implemented."); + } + + BroadcastEvent(ev: NostrEvent, cb?: ((rsp: OkResponse) => void) | undefined): Promise { + throw new Error("Method not implemented."); + } + + WriteOnceToRelay(relay: string, ev: NostrEvent): Promise { + throw new Error("Method not implemented."); + } + + get ProfileLoader(): ProfileLoaderService { + throw new Error("Method not implemented."); + } + + get RelayCache(): RelayCache { + throw new Error("Method not implemented."); + } + + get QueryOptimizer(): QueryOptimizer { + throw new Error("Method not implemented."); + } + + #workerRpc(type: NostrSystemCommand, data: T, timeout = 5_000) { + const id = uuid(); + this.#worker.postMessage({ + id, + type, + data, + } as NostrSystemMessage); + return new Promise((resolve, reject) => { + let t: ReturnType; + this.#commandQueue.set(id, v => { + clearTimeout(t); + const cmdReply = v as NostrSystemMessage; + if (cmdReply.type === NostrSystemCommand.OkResponse) { + resolve(cmdReply.data); + } else { + reject(cmdReply.data); + } + }); + t = setTimeout(() => { + reject("timeout"); + }, timeout); + }); + } +}