diff --git a/package.json b/package.json index 54a9d996..dddd46fc 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,8 @@ "printWidth": 120, "bracketSameLine": true, "arrowParens": "avoid", - "trailingComma": "all" + "trailingComma": "all", + "endOfLine": "lf" }, "packageManager": "yarn@3.6.3", "dependencies": { diff --git a/packages/app/src/Pages/settings/tools/follows-relay-health.tsx b/packages/app/src/Pages/settings/tools/follows-relay-health.tsx index 0ecf8df0..ec676cb6 100644 --- a/packages/app/src/Pages/settings/tools/follows-relay-health.tsx +++ b/packages/app/src/Pages/settings/tools/follows-relay-health.tsx @@ -1,5 +1,5 @@ import { dedupe } from "@snort/shared"; -import { pickTopRelays } from "@snort/system"; +import { OutboxModel } from "@snort/system"; import { SnortContext } from "@snort/system-react"; import { ReactNode, useContext, useMemo } from "react"; import { FormattedMessage, FormattedNumber } from "react-intl"; @@ -31,7 +31,8 @@ export function FollowsRelayHealth({ }, [hasRelays]); const topWriteRelays = useMemo(() => { - return pickTopRelays(system.relayCache, uniqueFollows, 1e31, "write"); + const outbox = OutboxModel.fromSystem(system); + return outbox.pickTopRelays(uniqueFollows, 1e31, "write"); }, [uniqueFollows]); return ( diff --git a/packages/system/package.json b/packages/system/package.json index 9654c87c..9e475c65 100644 --- a/packages/system/package.json +++ b/packages/system/package.json @@ -1,6 +1,6 @@ { "name": "@snort/system", - "version": "1.2.8", + "version": "1.2.9", "description": "Snort nostr system package", "type": "module", "main": "dist/index.js", diff --git a/packages/system/src/connection-pool.ts b/packages/system/src/connection-pool.ts index bc7dbcd5..b4ccbf39 100644 --- a/packages/system/src/connection-pool.ts +++ b/packages/system/src/connection-pool.ts @@ -4,7 +4,6 @@ import EventEmitter from "eventemitter3"; import { Connection, RelaySettings } from "./connection"; import { NostrEvent, OkResponse, TaggedNostrEvent } from "./nostr"; -import { pickRelaysForReply } from "./outbox-model"; import { SystemInterface } from "."; import LRUSet from "@snort/shared/src/LRUSet"; @@ -22,7 +21,7 @@ export type ConnectionPool = { getConnection(id: string): Connection | undefined; connect(address: string, options: RelaySettings, ephemeral: boolean): Promise; disconnect(address: string): void; - broadcast(system: SystemInterface, ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise; + broadcast(ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise; broadcastTo(address: string, ev: NostrEvent): Promise; } & EventEmitter & Iterable<[string, Connection]>; @@ -126,9 +125,9 @@ export class DefaultConnectionPool extends EventEmitter void) { + async broadcast(ev: NostrEvent, cb?: (rsp: OkResponse) => void) { const writeRelays = [...this.#sockets.values()].filter(a => !a.Ephemeral && a.Settings.write); - const replyRelays = await pickRelaysForReply(ev, system); + const replyRelays = (await this.#system.requestRouter?.forReply(ev)) ?? []; const oks = await Promise.all([ ...writeRelays.map(async s => { try { @@ -140,7 +139,7 @@ export class DefaultConnectionPool extends EventEmitter !this.#sockets.has(unwrap(sanitizeRelayUrl(a)))).map(a => this.broadcastTo(a, ev)), + ...replyRelays?.filter(a => !this.#sockets.has(unwrap(sanitizeRelayUrl(a)))).map(a => this.broadcastTo(a, ev)), ]); return removeUndefined(oks); } diff --git a/packages/system/src/index.ts b/packages/system/src/index.ts index 67a2388a..91dae407 100644 --- a/packages/system/src/index.ts +++ b/packages/system/src/index.ts @@ -2,7 +2,8 @@ import { RelaySettings } from "./connection"; import { RequestBuilder } from "./request-builder"; import { NostrEvent, OkResponse, ReqFilter, TaggedNostrEvent } from "./nostr"; import { ProfileLoaderService } from "./profile-cache"; -import { AuthorsRelaysCache, RelayMetadataLoader } from "./outbox-model"; +import { AuthorsRelaysCache } from "./outbox"; +import { RelayMetadataLoader } from "outbox/relay-loader"; import { Optimizer } from "./query-optimizer"; import { base64 } from "@scure/base"; import { CachedTable } from "@snort/shared"; @@ -10,6 +11,7 @@ import { ConnectionPool } from "./connection-pool"; import EventEmitter from "eventemitter3"; import { QueryEvents } from "./query"; import { CacheRelay } from "./cache-relay"; +import { RequestRouter } from "./request-router"; export { NostrSystem } from "./nostr-system"; export { default as EventKind } from "./event-kind"; @@ -34,7 +36,7 @@ export * from "./pow"; export * from "./pow-util"; export * from "./query-optimizer"; export * from "./encrypted"; -export * from "./outbox-model"; +export * from "./outbox"; export * from "./impl/nip4"; export * from "./impl/nip44"; @@ -155,6 +157,11 @@ export interface SystemInterface { * Local relay cache service */ get cacheRelay(): CacheRelay | undefined; + + /** + * Request router instance + */ + get requestRouter(): RequestRouter | undefined; } export interface SystemSnapshot { diff --git a/packages/system/src/nostr-system.ts b/packages/system/src/nostr-system.ts index 21c77ff4..3d0fc803 100644 --- a/packages/system/src/nostr-system.ts +++ b/packages/system/src/nostr-system.ts @@ -18,13 +18,15 @@ import { UsersRelays, SnortSystemDb, QueryLike, + OutboxModel, } from "."; import { EventsCache } from "./cache/events"; -import { RelayMetadataLoader } from "./outbox-model"; +import { RelayMetadataLoader } from "./outbox"; import { Optimizer, DefaultOptimizer } from "./query-optimizer"; import { ConnectionPool, DefaultConnectionPool } from "./connection-pool"; import { QueryManager } from "./query-manager"; import { CacheRelay } from "./cache-relay"; +import { RequestRouter } from "request-router"; export interface NostrSystemEvents { change: (state: SystemSnapshot) => void; @@ -33,15 +35,54 @@ export interface NostrSystemEvents { request: (subId: string, filter: BuiltRawReqFilter) => void; } -export interface NostrsystemProps { - relayCache?: CachedTable; - profileCache?: CachedTable; - relayMetrics?: CachedTable; - eventsCache?: CachedTable; - cacheRelay?: CacheRelay; - optimizer?: Optimizer; +export interface SystemConfig { + /** + * Users configured relays (via kind 3 or kind 10_002) + */ + relays: CachedTable; + + /** + * Cache of user profiles, (kind 0) + */ + profiles: CachedTable; + + /** + * Cache of relay connection stats + */ + relayMetrics: CachedTable; + + /** + * Direct reference events cache + */ + events: CachedTable; + + /** + * Optimized cache relay, usually `@snort/worker-relay` + */ + cachingRelay?: CacheRelay; + + /** + * Optimized functions, usually `@snort/system-wasm` + */ + optimizer: Optimizer; + + /** + * Dexie database storage, usually `@snort/system-web` + */ db?: SnortSystemDb; - checkSigs?: boolean; + + /** + * Check event sigs on receive from relays + */ + checkSigs: boolean; + + /** + * Automatically handle outbox model + * + * 1. Fetch relay lists automatically for queried authors + * 2. Write to inbox for all `p` tagged users in broadcasting events + */ + automaticOutboxModel: boolean; } /** @@ -50,60 +91,83 @@ export interface NostrsystemProps { export class NostrSystem extends EventEmitter implements SystemInterface { #log = debug("System"); #queryManager: QueryManager; + #config: SystemConfig; /** * Storage class for user relay lists */ - readonly relayCache: CachedTable; + get relayCache(): CachedTable { + return this.#config.relays; + } /** * Storage class for user profiles */ - readonly profileCache: CachedTable; + get profileCache(): CachedTable { + return this.#config.profiles; + } /** * Storage class for relay metrics (connects/disconnects) */ - readonly relayMetricsCache: CachedTable; - - /** - * Profile loading service - */ - readonly profileLoader: ProfileLoaderService; - - /** - * Relay metrics handler cache - */ - readonly relayMetricsHandler: RelayMetricHandler; + get relayMetricsCache(): CachedTable { + return this.#config.relayMetrics; + } /** * Optimizer instance, contains optimized functions for processing data */ - readonly optimizer: Optimizer; + get optimizer(): Optimizer { + return this.#config.optimizer; + } - readonly pool: ConnectionPool; - readonly eventsCache: CachedTable; - readonly relayLoader: RelayMetadataLoader; - readonly cacheRelay: CacheRelay | undefined; + get eventsCache(): CachedTable { + return this.#config.events; + } + + get cacheRelay(): CacheRelay | undefined { + return this.#config.cachingRelay; + } /** - * Check event signatures (reccomended) + * Check event signatures (recommended) */ - checkSigs: boolean; + get checkSigs(): boolean { + return this.#config.checkSigs; + } - constructor(props: NostrsystemProps) { + set checkSigs(v: boolean) { + this.#config.checkSigs = v; + } + + readonly profileLoader: ProfileLoaderService; + readonly relayMetricsHandler: RelayMetricHandler; + readonly pool: ConnectionPool; + readonly relayLoader: RelayMetadataLoader; + readonly requestRouter: RequestRouter | undefined; + + constructor(props: Partial) { super(); - this.relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays); - this.profileCache = props.profileCache ?? new UserProfileCache(props.db?.users); - this.relayMetricsCache = props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics); - this.eventsCache = props.eventsCache ?? new EventsCache(props.db?.events); - this.optimizer = props.optimizer ?? DefaultOptimizer; - this.cacheRelay = props.cacheRelay; + this.#config = { + relays: props.relays ?? new UserRelaysCache(props.db?.userRelays), + profiles: props.profiles ?? new UserProfileCache(props.db?.users), + relayMetrics: props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics), + events: props.events ?? new EventsCache(props.db?.events), + optimizer: props.optimizer ?? DefaultOptimizer, + checkSigs: props.checkSigs ?? false, + cachingRelay: props.cachingRelay, + db: props.db, + automaticOutboxModel: props.automaticOutboxModel ?? true, + }; this.profileLoader = new ProfileLoaderService(this, this.profileCache); this.relayMetricsHandler = new RelayMetricHandler(this.relayMetricsCache); this.relayLoader = new RelayMetadataLoader(this, this.relayCache); - this.checkSigs = props.checkSigs ?? true; + + // if automatic outbox model, setup request router as OutboxModel + if (this.#config.automaticOutboxModel) { + this.requestRouter = OutboxModel.fromSystem(this); + } this.pool = new DefaultConnectionPool(this); this.#queryManager = new QueryManager(this); @@ -196,7 +260,7 @@ export class NostrSystem extends EventEmitter implements Syst async BroadcastEvent(ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise { this.HandleEvent("*", { ...ev, relays: [] }); - return await this.pool.broadcast(this, ev, cb); + return await this.pool.broadcast(ev, cb); } async WriteOnceToRelay(address: string, ev: NostrEvent): Promise { diff --git a/packages/system/src/outbox-model.ts b/packages/system/src/outbox-model.ts deleted file mode 100644 index 52331615..00000000 --- a/packages/system/src/outbox-model.ts +++ /dev/null @@ -1,318 +0,0 @@ -import { - EventKind, - FullRelaySettings, - NostrEvent, - ReqFilter, - RequestBuilder, - SystemInterface, - TaggedNostrEvent, - UsersRelays, -} from "."; -import { dedupe, removeUndefined, sanitizeRelayUrl, unixNowMs, unwrap } from "@snort/shared"; -import debug from "debug"; -import { FlatReqFilter } from "./query-optimizer"; -import { RelayListCacheExpire } from "./const"; -import { BackgroundLoader } from "./background-loader"; - -const DefaultPickNRelays = 2; - -export interface RelayTaggedFilter { - relay: string; - filter: ReqFilter; -} - -export interface RelayTaggedFlatFilters { - relay: string; - filters: Array; -} - -export interface RelayTaggedFilters { - relay: string; - filters: Array; -} - -const logger = debug("OutboxModel"); - -export interface AuthorsRelaysCache { - getFromCache(pubkey?: string): UsersRelays | undefined; - update(obj: UsersRelays): Promise<"new" | "updated" | "refresh" | "no_change">; - buffer(keys: Array): Promise>; - bulkSet(objs: Array): Promise; -} - -export function splitAllByWriteRelays(cache: AuthorsRelaysCache, filters: Array) { - const allSplit = filters - .map(a => splitByWriteRelays(cache, a)) - .reduce((acc, v) => { - for (const vn of v) { - const existing = acc.get(vn.relay); - if (existing) { - existing.push(vn.filter); - } else { - acc.set(vn.relay, [vn.filter]); - } - } - return acc; - }, new Map>()); - - return [...allSplit.entries()].map(([k, v]) => { - return { - relay: k, - filters: v, - } as RelayTaggedFilters; - }); -} - -/** - * Split filters by authors - */ -export function splitByWriteRelays( - cache: AuthorsRelaysCache, - filter: ReqFilter, - pickN?: number, -): Array { - const authors = filter.authors; - if ((authors?.length ?? 0) === 0) { - return [ - { - relay: "", - filter, - }, - ]; - } - - const topRelays = pickTopRelays(cache, unwrap(authors), pickN ?? DefaultPickNRelays, "write"); - const pickedRelays = dedupe(topRelays.flatMap(a => a.relays)); - - const picked = pickedRelays.map(a => { - const keysOnPickedRelay = dedupe(topRelays.filter(b => b.relays.includes(a)).map(b => b.key)); - return { - relay: a, - filter: { - ...filter, - authors: keysOnPickedRelay, - }, - } as RelayTaggedFilter; - }); - const noRelays = dedupe(topRelays.filter(a => a.relays.length === 0).map(a => a.key)); - if (noRelays.length > 0) { - picked.push({ - relay: "", - filter: { - ...filter, - authors: noRelays, - }, - }); - } - logger("Picked %O => %O", filter, picked); - return picked; -} - -/** - * Split filters by author - */ -export function splitFlatByWriteRelays( - cache: AuthorsRelaysCache, - input: Array, - pickN?: number, -): Array { - const authors = input.filter(a => a.authors).map(a => unwrap(a.authors)); - if (authors.length === 0) { - return [ - { - relay: "", - filters: input, - }, - ]; - } - const topRelays = pickTopRelays(cache, authors, pickN ?? DefaultPickNRelays, "write"); - const pickedRelays = dedupe(topRelays.flatMap(a => a.relays)); - - const picked = pickedRelays.map(a => { - const authorsOnRelay = new Set(topRelays.filter(v => v.relays.includes(a)).map(v => v.key)); - return { - relay: a, - filters: input.filter(v => v.authors && authorsOnRelay.has(v.authors)), - } as RelayTaggedFlatFilters; - }); - const noRelays = new Set(topRelays.filter(v => v.relays.length === 0).map(v => v.key)); - if (noRelays.size > 0) { - picked.push({ - relay: "", - filters: input.filter(v => !v.authors || noRelays.has(v.authors)), - } as RelayTaggedFlatFilters); - } - - logger("Picked %d relays from %d filters", picked.length, input.length); - return picked; -} - -/** - * Pick most popular relays for each authors - */ -export function pickTopRelays(cache: AuthorsRelaysCache, authors: Array, n: number, type: "write" | "read") { - // map of pubkey -> [write relays] - const allRelays = authors.map(a => { - return { - key: a, - relays: cache - .getFromCache(a) - ?.relays?.filter(a => (type === "write" ? a.settings.write : a.settings.read)) - .sort(() => (Math.random() < 0.5 ? 1 : -1)), - }; - }); - - const missing = allRelays.filter(a => a.relays === undefined || a.relays.length === 0); - const hasRelays = allRelays.filter(a => a.relays !== undefined && a.relays.length > 0); - - // map of relay -> [pubkeys] - const relayUserMap = hasRelays.reduce((acc, v) => { - for (const r of unwrap(v.relays)) { - if (!acc.has(r.url)) { - acc.set(r.url, new Set([v.key])); - } else { - unwrap(acc.get(r.url)).add(v.key); - } - } - return acc; - }, new Map>()); - - // selection algo will just pick relays with the most users - const topRelays = [...relayUserMap.entries()].sort(([, v], [, v1]) => v1.size - v.size); - - // - count keys per relay - // - pick n top relays - // - map keys per relay (for subscription filter) - - return hasRelays - .map(k => { - // pick top N relays for this key - const relaysForKey = topRelays - .filter(([, v]) => v.has(k.key)) - .slice(0, n) - .map(([k]) => k); - return { key: k.key, relays: relaysForKey }; - }) - .concat( - missing.map(a => { - return { - key: a.key, - relays: [], - }; - }), - ); -} - -/** - * Pick read relays for sending reply events - */ -export async function pickRelaysForReply(ev: NostrEvent, system: SystemInterface, pickN?: number) { - const recipients = dedupe(ev.tags.filter(a => a[0] === "p").map(a => a[1])); - await updateRelayLists(recipients, system); - const relays = pickTopRelays(system.relayCache, recipients, pickN ?? DefaultPickNRelays, "read"); - const ret = removeUndefined(dedupe(relays.map(a => a.relays).flat())); - logger("Picked %O from authors %O", ret, recipients); - return ret; -} - -export function parseRelayTag(tag: Array) { - return { - url: sanitizeRelayUrl(tag[1]), - settings: { - read: tag[2] === "read" || tag[2] === undefined, - write: tag[2] === "write" || tag[2] === undefined, - }, - } as FullRelaySettings; -} - -export function parseRelayTags(tag: Array>) { - return tag.map(parseRelayTag).filter(a => a !== null); -} - -export function parseRelaysFromKind(ev: NostrEvent) { - if (ev.kind === EventKind.ContactList) { - const relaysInContent = - ev.content.length > 0 ? (JSON.parse(ev.content) as Record) : undefined; - if (relaysInContent) { - return Object.entries(relaysInContent).map( - ([k, v]) => - ({ - url: sanitizeRelayUrl(k), - settings: { - read: v.read, - write: v.write, - }, - }) as FullRelaySettings, - ); - } - } else if (ev.kind === EventKind.Relays) { - return parseRelayTags(ev.tags); - } -} - -export async function updateRelayLists(authors: Array, system: SystemInterface) { - await system.relayCache.buffer(authors); - const expire = unixNowMs() - RelayListCacheExpire; - const expired = authors.filter(a => (system.relayCache.getFromCache(a)?.loaded ?? 0) < expire); - if (expired.length > 0) { - logger("Updating relays for authors: %O", expired); - const rb = new RequestBuilder("system-update-relays-for-outbox"); - rb.withFilter().authors(expired).kinds([EventKind.Relays, EventKind.ContactList]); - const relayLists = await system.Fetch(rb); - await system.relayCache.bulkSet( - removeUndefined( - relayLists.map(a => { - const relays = parseRelaysFromKind(a); - if (!relays) return; - return { - relays: relays, - pubkey: a.pubkey, - created: a.created_at, - loaded: unixNowMs(), - }; - }), - ), - ); - } -} - -export class RelayMetadataLoader extends BackgroundLoader { - override name(): string { - return "RelayMetadataLoader"; - } - - override onEvent(e: Readonly): UsersRelays | undefined { - const relays = parseRelaysFromKind(e); - if (!relays) return; - return { - relays: relays, - pubkey: e.pubkey, - created: e.created_at, - loaded: unixNowMs(), - }; - } - - override getExpireCutoff(): number { - return unixNowMs() - RelayListCacheExpire; - } - - protected override buildSub(missing: string[]): RequestBuilder { - const rb = new RequestBuilder("relay-loader"); - rb.withOptions({ - skipDiff: true, - timeout: 10_000, - outboxPickN: 4, - }); - rb.withFilter().authors(missing).kinds([EventKind.Relays, EventKind.ContactList]); - return rb; - } - - protected override makePlaceholder(key: string): UsersRelays | undefined { - return { - relays: [], - pubkey: key, - created: 0, - loaded: this.getExpireCutoff() + 300_000, - }; - } -} diff --git a/packages/system/src/outbox/index.ts b/packages/system/src/outbox/index.ts new file mode 100644 index 00000000..47500feb --- /dev/null +++ b/packages/system/src/outbox/index.ts @@ -0,0 +1,58 @@ +import { EventKind, FullRelaySettings, NostrEvent, SystemInterface, UsersRelays } from ".."; +import { sanitizeRelayUrl } from "@snort/shared"; + +export const DefaultPickNRelays = 2; + +export interface AuthorsRelaysCache { + getFromCache(pubkey?: string): UsersRelays | undefined; + update(obj: UsersRelays): Promise<"new" | "updated" | "refresh" | "no_change">; + buffer(keys: Array): Promise>; + bulkSet(objs: Array): Promise; +} + +export interface PickedRelays { + key: string; + relays: Array; +} + +export type EventFetcher = { + Fetch: SystemInterface["Fetch"]; +}; + +export function parseRelayTag(tag: Array) { + return { + url: sanitizeRelayUrl(tag[1]), + settings: { + read: tag[2] === "read" || tag[2] === undefined, + write: tag[2] === "write" || tag[2] === undefined, + }, + } as FullRelaySettings; +} + +export function parseRelayTags(tag: Array>) { + return tag.map(parseRelayTag).filter(a => a !== null); +} + +export function parseRelaysFromKind(ev: NostrEvent) { + if (ev.kind === EventKind.ContactList) { + const relaysInContent = + ev.content.length > 0 ? (JSON.parse(ev.content) as Record) : undefined; + if (relaysInContent) { + return Object.entries(relaysInContent).map( + ([k, v]) => + ({ + url: sanitizeRelayUrl(k), + settings: { + read: v.read, + write: v.write, + }, + }) as FullRelaySettings, + ); + } + } else if (ev.kind === EventKind.Relays) { + return parseRelayTags(ev.tags); + } +} + +export * from "./outbox-model"; +export * from "./relay-loader"; diff --git a/packages/system/src/outbox/outbox-model.ts b/packages/system/src/outbox/outbox-model.ts new file mode 100644 index 00000000..60d1df57 --- /dev/null +++ b/packages/system/src/outbox/outbox-model.ts @@ -0,0 +1,213 @@ +import { EventKind, NostrEvent, ReqFilter, RequestBuilder, SystemInterface } from ".."; +import { dedupe, removeUndefined, unixNowMs, unwrap } from "@snort/shared"; +import { FlatReqFilter } from "../query-optimizer"; +import { RelayListCacheExpire } from "../const"; +import { AuthorsRelaysCache, EventFetcher, PickedRelays, DefaultPickNRelays, parseRelaysFromKind } from "."; +import debug from "debug"; +import { BaseRequestRouter, RelayTaggedFilter, RelayTaggedFlatFilters } from "../request-router"; + +/** + * Simple outbox model using most popular relays + */ +export class OutboxModel extends BaseRequestRouter { + #log = debug("OutboxModel"); + #relays: AuthorsRelaysCache; + #fetcher: EventFetcher; + + constructor(relays: AuthorsRelaysCache, fetcher: EventFetcher) { + super(); + this.#relays = relays; + this.#fetcher = fetcher; + } + + static fromSystem(system: SystemInterface) { + return new OutboxModel(system.relayCache, system); + } + + /** + * Pick top relays for each user + * @param authors The authors whos relays will be picked + * @param pickN Number of relays to pick per pubkey + * @param type Read/Write relays + * @returns + */ + pickTopRelays(authors: Array, pickN: number, type: "write" | "read"): Array { + // map of pubkey -> [write relays] + const allRelays = authors.map(a => { + return { + key: a, + relays: this.#relays + .getFromCache(a) + ?.relays?.filter(a => (type === "write" ? a.settings.write : a.settings.read)) + .sort(() => (Math.random() < 0.5 ? 1 : -1)), + }; + }); + + const missing = allRelays.filter(a => a.relays === undefined || a.relays.length === 0); + const hasRelays = allRelays.filter(a => a.relays !== undefined && a.relays.length > 0); + + // map of relay -> [pubkeys] + const relayUserMap = hasRelays.reduce((acc, v) => { + for (const r of unwrap(v.relays)) { + if (!acc.has(r.url)) { + acc.set(r.url, new Set([v.key])); + } else { + unwrap(acc.get(r.url)).add(v.key); + } + } + return acc; + }, new Map>()); + + // selection algo will just pick relays with the most users + const topRelays = [...relayUserMap.entries()].sort(([, v], [, v1]) => v1.size - v.size); + + // - count keys per relay + // - pick n top relays + // - map keys per relay (for subscription filter) + return hasRelays + .map(k => { + // pick top N relays for this key + const relaysForKey = topRelays + .filter(([, v]) => v.has(k.key)) + .slice(0, pickN) + .map(([k]) => k); + return { key: k.key, relays: relaysForKey }; + }) + .concat( + missing.map(a => { + return { + key: a.key, + relays: [], + }; + }), + ); + } + + /** + * Split a request filter by authors + * @param filter Filter to split + * @param pickN Number of relays to pick per author + * @returns + */ + forRequest(filter: ReqFilter, pickN?: number): Array { + const authors = filter.authors; + if ((authors?.length ?? 0) === 0) { + return [ + { + relay: "", + filter, + }, + ]; + } + + const topRelays = this.pickTopRelays(unwrap(authors), pickN ?? DefaultPickNRelays, "write"); + const pickedRelays = dedupe(topRelays.flatMap(a => a.relays)); + + const picked = pickedRelays.map(a => { + const keysOnPickedRelay = dedupe(topRelays.filter(b => b.relays.includes(a)).map(b => b.key)); + return { + relay: a, + filter: { + ...filter, + authors: keysOnPickedRelay, + }, + } as RelayTaggedFilter; + }); + const noRelays = dedupe(topRelays.filter(a => a.relays.length === 0).map(a => a.key)); + if (noRelays.length > 0) { + picked.push({ + relay: "", + filter: { + ...filter, + authors: noRelays, + }, + }); + } + this.#log("Picked %O => %O", filter, picked); + return picked; + } + + /** + * Split a flat request filter by authors + * @param filter Filter to split + * @param pickN Number of relays to pick per author + * @returns + */ + forFlatRequest(input: Array, pickN?: number): Array { + const authors = input.filter(a => a.authors).map(a => unwrap(a.authors)); + if (authors.length === 0) { + return [ + { + relay: "", + filters: input, + }, + ]; + } + const topRelays = this.pickTopRelays(authors, pickN ?? DefaultPickNRelays, "write"); + const pickedRelays = dedupe(topRelays.flatMap(a => a.relays)); + + const picked = pickedRelays.map(a => { + const authorsOnRelay = new Set(topRelays.filter(v => v.relays.includes(a)).map(v => v.key)); + return { + relay: a, + filters: input.filter(v => v.authors && authorsOnRelay.has(v.authors)), + } as RelayTaggedFlatFilters; + }); + const noRelays = new Set(topRelays.filter(v => v.relays.length === 0).map(v => v.key)); + if (noRelays.size > 0) { + picked.push({ + relay: "", + filters: input.filter(v => !v.authors || noRelays.has(v.authors)), + } as RelayTaggedFlatFilters); + } + + this.#log("Picked %d relays from %d filters", picked.length, input.length); + return picked; + } + + /** + * Pick relay inboxs for replies + * @param ev The reply event to send + * @param system Nostr system interface + * @param pickN Number of relays to pick per recipient + * @returns + */ + async forReply(ev: NostrEvent, pickN?: number) { + const recipients = dedupe([ev.pubkey, ...ev.tags.filter(a => a[0] === "p").map(a => a[1])]); + await this.updateRelayLists(recipients); + const relays = this.pickTopRelays(recipients, pickN ?? DefaultPickNRelays, "read"); + const ret = removeUndefined(dedupe(relays.map(a => a.relays).flat())); + this.#log("Picked %O from authors %O", ret, recipients); + return ret; + } + + /** + * Update relay cache with latest relay lists + * @param authors The authors to update relay lists for + */ + async updateRelayLists(authors: Array) { + await this.#relays.buffer(authors); + const expire = unixNowMs() - RelayListCacheExpire; + const expired = authors.filter(a => (this.#relays.getFromCache(a)?.loaded ?? 0) < expire); + if (expired.length > 0) { + this.#log("Updating relays for authors: %O", expired); + const rb = new RequestBuilder("system-update-relays-for-outbox"); + rb.withFilter().authors(expired).kinds([EventKind.Relays, EventKind.ContactList]); + const relayLists = await this.#fetcher.Fetch(rb); + await this.#relays.bulkSet( + removeUndefined( + relayLists.map(a => { + const relays = parseRelaysFromKind(a); + if (!relays) return; + return { + relays: relays, + pubkey: a.pubkey, + created: a.created_at, + loaded: unixNowMs(), + }; + }), + ), + ); + } + } +} diff --git a/packages/system/src/outbox/relay-loader.ts b/packages/system/src/outbox/relay-loader.ts new file mode 100644 index 00000000..1ac0d073 --- /dev/null +++ b/packages/system/src/outbox/relay-loader.ts @@ -0,0 +1,46 @@ +import { EventKind, RequestBuilder, TaggedNostrEvent, UsersRelays } from ".."; +import { unixNowMs } from "@snort/shared"; +import { RelayListCacheExpire } from "../const"; +import { BackgroundLoader } from "../background-loader"; +import { parseRelaysFromKind } from "."; + +export class RelayMetadataLoader extends BackgroundLoader { + override name(): string { + return "RelayMetadataLoader"; + } + + override onEvent(e: Readonly): UsersRelays | undefined { + const relays = parseRelaysFromKind(e); + if (!relays) return; + return { + relays: relays, + pubkey: e.pubkey, + created: e.created_at, + loaded: unixNowMs(), + }; + } + + override getExpireCutoff(): number { + return unixNowMs() - RelayListCacheExpire; + } + + protected override buildSub(missing: string[]): RequestBuilder { + const rb = new RequestBuilder("relay-loader"); + rb.withOptions({ + skipDiff: true, + timeout: 10000, + outboxPickN: 4, + }); + rb.withFilter().authors(missing).kinds([EventKind.Relays, EventKind.ContactList]); + return rb; + } + + protected override makePlaceholder(key: string): UsersRelays | undefined { + return { + relays: [], + pubkey: key, + created: 0, + loaded: this.getExpireCutoff() + 300000, + }; + } +} diff --git a/packages/system/src/request-builder.ts b/packages/system/src/request-builder.ts index b71b8ed4..37b1a548 100644 --- a/packages/system/src/request-builder.ts +++ b/packages/system/src/request-builder.ts @@ -5,7 +5,7 @@ import { appendDedupe, dedupe, sanitizeRelayUrl, unixNowMs, unwrap } from "@snor import EventKind from "./event-kind"; import { NostrLink, NostrPrefix, SystemInterface } from "."; import { ReqFilter, u256, HexKey, TaggedNostrEvent } from "./nostr"; -import { AuthorsRelaysCache, splitByWriteRelays, splitFlatByWriteRelays } from "./outbox-model"; +import { RequestRouter } from "./request-router"; /** * Which strategy is used when building REQ filters @@ -133,7 +133,7 @@ export class RequestBuilder { } build(system: SystemInterface): Array { - const expanded = this.#builders.flatMap(a => a.build(system.relayCache, this.#options)); + const expanded = this.#builders.flatMap(a => a.build(system.requestRouter, this.#options)); return this.#groupByRelay(system, expanded); } @@ -147,14 +147,24 @@ export class RequestBuilder { const ts = unixNowMs() - start; this.#log("buildDiff %s %d ms +%d", this.id, ts, diff.length); if (diff.length > 0) { - // todo: fix for explicit relays - return splitFlatByWriteRelays(system.relayCache, diff).map(a => { - return { - strategy: RequestStrategy.AuthorsRelays, - filters: system.optimizer.flatMerge(a.filters), - relay: a.relay, - }; - }); + if (system.requestRouter) { + // todo: fix for explicit relays + return system.requestRouter.forFlatRequest(diff).map(a => { + return { + strategy: RequestStrategy.AuthorsRelays, + filters: system.optimizer.flatMerge(a.filters), + relay: a.relay, + }; + }); + } else { + return [ + { + strategy: RequestStrategy.DefaultRelays, + filters: system.optimizer.flatMerge(diff), + relay: "", + }, + ]; + } } return []; } @@ -294,11 +304,11 @@ export class RequestFilterBuilder { /** * Build/expand this filter into a set of relay specific queries */ - build(relays: AuthorsRelaysCache, options?: RequestBuilderOptions): Array { - return this.#buildFromFilter(relays, this.#filter, options); + build(model?: RequestRouter, options?: RequestBuilderOptions): Array { + return this.#buildFromFilter(this.#filter, model, options); } - #buildFromFilter(relays: AuthorsRelaysCache, f: ReqFilter, options?: RequestBuilderOptions) { + #buildFromFilter(f: ReqFilter, model?: RequestRouter, options?: RequestBuilderOptions) { // use the explicit relay list first if (this.#relays.size > 0) { return [...this.#relays].map(r => { @@ -311,8 +321,8 @@ export class RequestFilterBuilder { } // If any authors are set use the gossip model to fetch data for each author - if (f.authors) { - const split = splitByWriteRelays(relays, f, options?.outboxPickN); + if (f.authors && model) { + const split = model.forRequest(f, options?.outboxPickN); return split.map(a => { return { filters: [a.filter], diff --git a/packages/system/src/request-router.ts b/packages/system/src/request-router.ts new file mode 100644 index 00000000..008b7f58 --- /dev/null +++ b/packages/system/src/request-router.ts @@ -0,0 +1,76 @@ +import { NostrEvent, ReqFilter } from "./nostr"; +import { FlatReqFilter } from "./query-optimizer"; + +export interface RelayTaggedFilter { + relay: string; + filter: ReqFilter; +} + +export interface RelayTaggedFlatFilters { + relay: string; + filters: Array; +} + +export interface RelayTaggedFilters { + relay: string; + filters: Array; +} + +/** + * Request router managed splitting of requests to one or more relays, and which relay to send events to. + */ +export interface RequestRouter { + /** + * Pick relays to send an event to + * @param ev The reply event to send + * @param system Nostr system interface + * @param pickN Number of relays to pick per recipient + * @returns + */ + forReply(ev: NostrEvent, pickN?: number): Promise>; + + /** + * Split a request filter to one or more relays. + * @param filter Filter to split + * @param pickN Number of relays to pick + * @returns + */ + forRequest(filter: ReqFilter, pickN?: number): Array; + + /** + * Split a request filter to one or more relays. + * @param filter Filters to split + * @param pickN Number of relays to pick + * @returns + */ + forFlatRequest(filter: Array, pickN?: number): Array; +} + +export abstract class BaseRequestRouter implements RequestRouter { + abstract forReply(ev: NostrEvent, pickN?: number): Promise>; + abstract forRequest(filter: ReqFilter, pickN?: number): Array; + abstract forFlatRequest(filter: FlatReqFilter[], pickN?: number): Array; + + forAllRequest(filters: Array) { + const allSplit = filters + .map(a => this.forRequest(a)) + .reduce((acc, v) => { + for (const vn of v) { + const existing = acc.get(vn.relay); + if (existing) { + existing.push(vn.filter); + } else { + acc.set(vn.relay, [vn.filter]); + } + } + return acc; + }, new Map>()); + + return [...allSplit.entries()].map(([k, v]) => { + return { + relay: k, + filters: v, + } as RelayTaggedFilters; + }); + } +} diff --git a/packages/system/src/worker/system-worker.ts b/packages/system/src/worker/system-worker.ts index 9a40e768..8fb0dc8d 100644 --- a/packages/system/src/worker/system-worker.ts +++ b/packages/system/src/worker/system-worker.ts @@ -9,7 +9,6 @@ import { SystemInterface, TaggedNostrEvent, CachedMetadata, - DefaultOptimizer, RelayMetadataLoader, RelayMetricCache, RelayMetrics, @@ -17,8 +16,10 @@ import { UserRelaysCache, UsersRelays, QueryLike, + Optimizer, + DefaultOptimizer, } from ".."; -import { NostrSystemEvents, NostrsystemProps } from "../nostr-system"; +import { NostrSystemEvents, SystemConfig } from "../nostr-system"; import { WorkerCommand, WorkerMessage } from "."; import { CachedTable } from "@snort/shared"; import { EventsCache } from "../cache/events"; @@ -31,38 +32,80 @@ export class SystemWorker extends EventEmitter implements Sys #log = debug("SystemWorker"); #worker: Worker; #commandQueue: Map void> = new Map(); - readonly relayCache: CachedTable; - readonly profileCache: CachedTable; - readonly relayMetricsCache: CachedTable; - readonly profileLoader: ProfileLoaderService; - readonly relayMetricsHandler: RelayMetricHandler; - readonly eventsCache: CachedTable; - readonly relayLoader: RelayMetadataLoader; - readonly cacheRelay: CacheRelay | undefined; + #config: SystemConfig; - get checkSigs() { - return true; + /** + * Storage class for user relay lists + */ + get relayCache(): CachedTable { + return this.#config.relays; + } + + /** + * Storage class for user profiles + */ + get profileCache(): CachedTable { + return this.#config.profiles; + } + + /** + * Storage class for relay metrics (connects/disconnects) + */ + get relayMetricsCache(): CachedTable { + return this.#config.relayMetrics; + } + + /** + * Optimizer instance, contains optimized functions for processing data + */ + get optimizer(): Optimizer { + return this.#config.optimizer; + } + + get eventsCache(): CachedTable { + return this.#config.events; + } + + /** + * Check event signatures (recommended) + */ + get checkSigs(): boolean { + return this.#config.checkSigs; } set checkSigs(v: boolean) { - // not used + this.#config.checkSigs = v; } - get optimizer() { - return DefaultOptimizer; + get requestRouter() { + return undefined; + } + + get cacheRelay(): CacheRelay | undefined { + return this.#config.cachingRelay; } get pool() { return {} as ConnectionPool; } - constructor(scriptPath: string, props: NostrsystemProps) { - super(); + readonly relayLoader: RelayMetadataLoader; + readonly profileLoader: ProfileLoaderService; + readonly relayMetricsHandler: RelayMetricHandler; - this.relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays); - this.profileCache = props.profileCache ?? new UserProfileCache(props.db?.users); - this.relayMetricsCache = props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics); - this.eventsCache = props.eventsCache ?? new EventsCache(props.db?.events); + constructor(scriptPath: string, props: Partial) { + super(); + this.#config = { + relays: props.relays ?? new UserRelaysCache(props.db?.userRelays), + profiles: props.profiles ?? new UserProfileCache(props.db?.users), + relayMetrics: props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics), + events: props.events ?? new EventsCache(props.db?.events), + optimizer: props.optimizer ?? DefaultOptimizer, + checkSigs: props.checkSigs ?? false, + cachingRelay: props.cachingRelay, + db: props.db, + automaticOutboxModel: props.automaticOutboxModel ?? true, + }; this.profileLoader = new ProfileLoaderService(this, this.profileCache); this.relayMetricsHandler = new RelayMetricHandler(this.relayMetricsCache);