forked from Kieran/snort
1
0
Fork 0

feat: abstract OutboxModel into RequestRouter

This commit is contained in:
Kieran 2024-02-20 11:28:02 +00:00
parent e5f8bebb53
commit 8b9acd3109
13 changed files with 603 additions and 403 deletions

View File

@ -15,7 +15,8 @@
"printWidth": 120,
"bracketSameLine": true,
"arrowParens": "avoid",
"trailingComma": "all"
"trailingComma": "all",
"endOfLine": "lf"
},
"packageManager": "yarn@3.6.3",
"dependencies": {

View File

@ -1,5 +1,5 @@
import { dedupe } from "@snort/shared";
import { pickTopRelays } from "@snort/system";
import { OutboxModel } from "@snort/system";
import { SnortContext } from "@snort/system-react";
import { ReactNode, useContext, useMemo } from "react";
import { FormattedMessage, FormattedNumber } from "react-intl";
@ -31,7 +31,8 @@ export function FollowsRelayHealth({
}, [hasRelays]);
const topWriteRelays = useMemo(() => {
return pickTopRelays(system.relayCache, uniqueFollows, 1e31, "write");
const outbox = OutboxModel.fromSystem(system);
return outbox.pickTopRelays(uniqueFollows, 1e31, "write");
}, [uniqueFollows]);
return (

View File

@ -1,6 +1,6 @@
{
"name": "@snort/system",
"version": "1.2.8",
"version": "1.2.9",
"description": "Snort nostr system package",
"type": "module",
"main": "dist/index.js",

View File

@ -4,7 +4,6 @@ import EventEmitter from "eventemitter3";
import { Connection, RelaySettings } from "./connection";
import { NostrEvent, OkResponse, TaggedNostrEvent } from "./nostr";
import { pickRelaysForReply } from "./outbox-model";
import { SystemInterface } from ".";
import LRUSet from "@snort/shared/src/LRUSet";
@ -22,7 +21,7 @@ export type ConnectionPool = {
getConnection(id: string): Connection | undefined;
connect(address: string, options: RelaySettings, ephemeral: boolean): Promise<Connection | undefined>;
disconnect(address: string): void;
broadcast(system: SystemInterface, ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise<OkResponse[]>;
broadcast(ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise<OkResponse[]>;
broadcastTo(address: string, ev: NostrEvent): Promise<OkResponse>;
} & EventEmitter<NostrConnectionPoolEvents> &
Iterable<[string, Connection]>;
@ -126,9 +125,9 @@ export class DefaultConnectionPool extends EventEmitter<NostrConnectionPoolEvent
* Broadcast event to all write relays.
* @remarks Also write event to read relays of those who are `p` tagged in the event (Inbox model)
*/
async broadcast(system: SystemInterface, ev: NostrEvent, cb?: (rsp: OkResponse) => void) {
async broadcast(ev: NostrEvent, cb?: (rsp: OkResponse) => void) {
const writeRelays = [...this.#sockets.values()].filter(a => !a.Ephemeral && a.Settings.write);
const replyRelays = await pickRelaysForReply(ev, system);
const replyRelays = (await this.#system.requestRouter?.forReply(ev)) ?? [];
const oks = await Promise.all([
...writeRelays.map(async s => {
try {
@ -140,7 +139,7 @@ export class DefaultConnectionPool extends EventEmitter<NostrConnectionPoolEvent
}
return;
}),
...replyRelays.filter(a => !this.#sockets.has(unwrap(sanitizeRelayUrl(a)))).map(a => this.broadcastTo(a, ev)),
...replyRelays?.filter(a => !this.#sockets.has(unwrap(sanitizeRelayUrl(a)))).map(a => this.broadcastTo(a, ev)),
]);
return removeUndefined(oks);
}

View File

@ -2,7 +2,8 @@ import { RelaySettings } from "./connection";
import { RequestBuilder } from "./request-builder";
import { NostrEvent, OkResponse, ReqFilter, TaggedNostrEvent } from "./nostr";
import { ProfileLoaderService } from "./profile-cache";
import { AuthorsRelaysCache, RelayMetadataLoader } from "./outbox-model";
import { AuthorsRelaysCache } from "./outbox";
import { RelayMetadataLoader } from "outbox/relay-loader";
import { Optimizer } from "./query-optimizer";
import { base64 } from "@scure/base";
import { CachedTable } from "@snort/shared";
@ -10,6 +11,7 @@ import { ConnectionPool } from "./connection-pool";
import EventEmitter from "eventemitter3";
import { QueryEvents } from "./query";
import { CacheRelay } from "./cache-relay";
import { RequestRouter } from "./request-router";
export { NostrSystem } from "./nostr-system";
export { default as EventKind } from "./event-kind";
@ -34,7 +36,7 @@ export * from "./pow";
export * from "./pow-util";
export * from "./query-optimizer";
export * from "./encrypted";
export * from "./outbox-model";
export * from "./outbox";
export * from "./impl/nip4";
export * from "./impl/nip44";
@ -155,6 +157,11 @@ export interface SystemInterface {
* Local relay cache service
*/
get cacheRelay(): CacheRelay | undefined;
/**
* Request router instance
*/
get requestRouter(): RequestRouter | undefined;
}
export interface SystemSnapshot {

View File

@ -18,13 +18,15 @@ import {
UsersRelays,
SnortSystemDb,
QueryLike,
OutboxModel,
} from ".";
import { EventsCache } from "./cache/events";
import { RelayMetadataLoader } from "./outbox-model";
import { RelayMetadataLoader } from "./outbox";
import { Optimizer, DefaultOptimizer } from "./query-optimizer";
import { ConnectionPool, DefaultConnectionPool } from "./connection-pool";
import { QueryManager } from "./query-manager";
import { CacheRelay } from "./cache-relay";
import { RequestRouter } from "request-router";
export interface NostrSystemEvents {
change: (state: SystemSnapshot) => void;
@ -33,15 +35,54 @@ export interface NostrSystemEvents {
request: (subId: string, filter: BuiltRawReqFilter) => void;
}
export interface NostrsystemProps {
relayCache?: CachedTable<UsersRelays>;
profileCache?: CachedTable<CachedMetadata>;
relayMetrics?: CachedTable<RelayMetrics>;
eventsCache?: CachedTable<NostrEvent>;
cacheRelay?: CacheRelay;
optimizer?: Optimizer;
export interface SystemConfig {
/**
* Users configured relays (via kind 3 or kind 10_002)
*/
relays: CachedTable<UsersRelays>;
/**
* Cache of user profiles, (kind 0)
*/
profiles: CachedTable<CachedMetadata>;
/**
* Cache of relay connection stats
*/
relayMetrics: CachedTable<RelayMetrics>;
/**
* Direct reference events cache
*/
events: CachedTable<NostrEvent>;
/**
* Optimized cache relay, usually `@snort/worker-relay`
*/
cachingRelay?: CacheRelay;
/**
* Optimized functions, usually `@snort/system-wasm`
*/
optimizer: Optimizer;
/**
* Dexie database storage, usually `@snort/system-web`
*/
db?: SnortSystemDb;
checkSigs?: boolean;
/**
* Check event sigs on receive from relays
*/
checkSigs: boolean;
/**
* Automatically handle outbox model
*
* 1. Fetch relay lists automatically for queried authors
* 2. Write to inbox for all `p` tagged users in broadcasting events
*/
automaticOutboxModel: boolean;
}
/**
@ -50,60 +91,83 @@ export interface NostrsystemProps {
export class NostrSystem extends EventEmitter<NostrSystemEvents> implements SystemInterface {
#log = debug("System");
#queryManager: QueryManager;
#config: SystemConfig;
/**
* Storage class for user relay lists
*/
readonly relayCache: CachedTable<UsersRelays>;
get relayCache(): CachedTable<UsersRelays> {
return this.#config.relays;
}
/**
* Storage class for user profiles
*/
readonly profileCache: CachedTable<CachedMetadata>;
get profileCache(): CachedTable<CachedMetadata> {
return this.#config.profiles;
}
/**
* Storage class for relay metrics (connects/disconnects)
*/
readonly relayMetricsCache: CachedTable<RelayMetrics>;
/**
* Profile loading service
*/
readonly profileLoader: ProfileLoaderService;
/**
* Relay metrics handler cache
*/
readonly relayMetricsHandler: RelayMetricHandler;
get relayMetricsCache(): CachedTable<RelayMetrics> {
return this.#config.relayMetrics;
}
/**
* Optimizer instance, contains optimized functions for processing data
*/
readonly optimizer: Optimizer;
get optimizer(): Optimizer {
return this.#config.optimizer;
}
readonly pool: ConnectionPool;
readonly eventsCache: CachedTable<NostrEvent>;
readonly relayLoader: RelayMetadataLoader;
readonly cacheRelay: CacheRelay | undefined;
get eventsCache(): CachedTable<NostrEvent> {
return this.#config.events;
}
get cacheRelay(): CacheRelay | undefined {
return this.#config.cachingRelay;
}
/**
* Check event signatures (reccomended)
* Check event signatures (recommended)
*/
checkSigs: boolean;
get checkSigs(): boolean {
return this.#config.checkSigs;
}
constructor(props: NostrsystemProps) {
set checkSigs(v: boolean) {
this.#config.checkSigs = v;
}
readonly profileLoader: ProfileLoaderService;
readonly relayMetricsHandler: RelayMetricHandler;
readonly pool: ConnectionPool;
readonly relayLoader: RelayMetadataLoader;
readonly requestRouter: RequestRouter | undefined;
constructor(props: Partial<SystemConfig>) {
super();
this.relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays);
this.profileCache = props.profileCache ?? new UserProfileCache(props.db?.users);
this.relayMetricsCache = props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics);
this.eventsCache = props.eventsCache ?? new EventsCache(props.db?.events);
this.optimizer = props.optimizer ?? DefaultOptimizer;
this.cacheRelay = props.cacheRelay;
this.#config = {
relays: props.relays ?? new UserRelaysCache(props.db?.userRelays),
profiles: props.profiles ?? new UserProfileCache(props.db?.users),
relayMetrics: props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics),
events: props.events ?? new EventsCache(props.db?.events),
optimizer: props.optimizer ?? DefaultOptimizer,
checkSigs: props.checkSigs ?? false,
cachingRelay: props.cachingRelay,
db: props.db,
automaticOutboxModel: props.automaticOutboxModel ?? true,
};
this.profileLoader = new ProfileLoaderService(this, this.profileCache);
this.relayMetricsHandler = new RelayMetricHandler(this.relayMetricsCache);
this.relayLoader = new RelayMetadataLoader(this, this.relayCache);
this.checkSigs = props.checkSigs ?? true;
// if automatic outbox model, setup request router as OutboxModel
if (this.#config.automaticOutboxModel) {
this.requestRouter = OutboxModel.fromSystem(this);
}
this.pool = new DefaultConnectionPool(this);
this.#queryManager = new QueryManager(this);
@ -196,7 +260,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
async BroadcastEvent(ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise<OkResponse[]> {
this.HandleEvent("*", { ...ev, relays: [] });
return await this.pool.broadcast(this, ev, cb);
return await this.pool.broadcast(ev, cb);
}
async WriteOnceToRelay(address: string, ev: NostrEvent): Promise<OkResponse> {

View File

@ -1,318 +0,0 @@
import {
EventKind,
FullRelaySettings,
NostrEvent,
ReqFilter,
RequestBuilder,
SystemInterface,
TaggedNostrEvent,
UsersRelays,
} from ".";
import { dedupe, removeUndefined, sanitizeRelayUrl, unixNowMs, unwrap } from "@snort/shared";
import debug from "debug";
import { FlatReqFilter } from "./query-optimizer";
import { RelayListCacheExpire } from "./const";
import { BackgroundLoader } from "./background-loader";
const DefaultPickNRelays = 2;
export interface RelayTaggedFilter {
relay: string;
filter: ReqFilter;
}
export interface RelayTaggedFlatFilters {
relay: string;
filters: Array<FlatReqFilter>;
}
export interface RelayTaggedFilters {
relay: string;
filters: Array<ReqFilter>;
}
const logger = debug("OutboxModel");
export interface AuthorsRelaysCache {
getFromCache(pubkey?: string): UsersRelays | undefined;
update(obj: UsersRelays): Promise<"new" | "updated" | "refresh" | "no_change">;
buffer(keys: Array<string>): Promise<Array<string>>;
bulkSet(objs: Array<UsersRelays>): Promise<void>;
}
export function splitAllByWriteRelays(cache: AuthorsRelaysCache, filters: Array<ReqFilter>) {
const allSplit = filters
.map(a => splitByWriteRelays(cache, a))
.reduce((acc, v) => {
for (const vn of v) {
const existing = acc.get(vn.relay);
if (existing) {
existing.push(vn.filter);
} else {
acc.set(vn.relay, [vn.filter]);
}
}
return acc;
}, new Map<string, Array<ReqFilter>>());
return [...allSplit.entries()].map(([k, v]) => {
return {
relay: k,
filters: v,
} as RelayTaggedFilters;
});
}
/**
* Split filters by authors
*/
export function splitByWriteRelays(
cache: AuthorsRelaysCache,
filter: ReqFilter,
pickN?: number,
): Array<RelayTaggedFilter> {
const authors = filter.authors;
if ((authors?.length ?? 0) === 0) {
return [
{
relay: "",
filter,
},
];
}
const topRelays = pickTopRelays(cache, unwrap(authors), pickN ?? DefaultPickNRelays, "write");
const pickedRelays = dedupe(topRelays.flatMap(a => a.relays));
const picked = pickedRelays.map(a => {
const keysOnPickedRelay = dedupe(topRelays.filter(b => b.relays.includes(a)).map(b => b.key));
return {
relay: a,
filter: {
...filter,
authors: keysOnPickedRelay,
},
} as RelayTaggedFilter;
});
const noRelays = dedupe(topRelays.filter(a => a.relays.length === 0).map(a => a.key));
if (noRelays.length > 0) {
picked.push({
relay: "",
filter: {
...filter,
authors: noRelays,
},
});
}
logger("Picked %O => %O", filter, picked);
return picked;
}
/**
* Split filters by author
*/
export function splitFlatByWriteRelays(
cache: AuthorsRelaysCache,
input: Array<FlatReqFilter>,
pickN?: number,
): Array<RelayTaggedFlatFilters> {
const authors = input.filter(a => a.authors).map(a => unwrap(a.authors));
if (authors.length === 0) {
return [
{
relay: "",
filters: input,
},
];
}
const topRelays = pickTopRelays(cache, authors, pickN ?? DefaultPickNRelays, "write");
const pickedRelays = dedupe(topRelays.flatMap(a => a.relays));
const picked = pickedRelays.map(a => {
const authorsOnRelay = new Set(topRelays.filter(v => v.relays.includes(a)).map(v => v.key));
return {
relay: a,
filters: input.filter(v => v.authors && authorsOnRelay.has(v.authors)),
} as RelayTaggedFlatFilters;
});
const noRelays = new Set(topRelays.filter(v => v.relays.length === 0).map(v => v.key));
if (noRelays.size > 0) {
picked.push({
relay: "",
filters: input.filter(v => !v.authors || noRelays.has(v.authors)),
} as RelayTaggedFlatFilters);
}
logger("Picked %d relays from %d filters", picked.length, input.length);
return picked;
}
/**
* Pick most popular relays for each authors
*/
export function pickTopRelays(cache: AuthorsRelaysCache, authors: Array<string>, n: number, type: "write" | "read") {
// map of pubkey -> [write relays]
const allRelays = authors.map(a => {
return {
key: a,
relays: cache
.getFromCache(a)
?.relays?.filter(a => (type === "write" ? a.settings.write : a.settings.read))
.sort(() => (Math.random() < 0.5 ? 1 : -1)),
};
});
const missing = allRelays.filter(a => a.relays === undefined || a.relays.length === 0);
const hasRelays = allRelays.filter(a => a.relays !== undefined && a.relays.length > 0);
// map of relay -> [pubkeys]
const relayUserMap = hasRelays.reduce((acc, v) => {
for (const r of unwrap(v.relays)) {
if (!acc.has(r.url)) {
acc.set(r.url, new Set([v.key]));
} else {
unwrap(acc.get(r.url)).add(v.key);
}
}
return acc;
}, new Map<string, Set<string>>());
// selection algo will just pick relays with the most users
const topRelays = [...relayUserMap.entries()].sort(([, v], [, v1]) => v1.size - v.size);
// <relay, key[]> - count keys per relay
// <key, relay[]> - pick n top relays
// <relay, key[]> - map keys per relay (for subscription filter)
return hasRelays
.map(k => {
// pick top N relays for this key
const relaysForKey = topRelays
.filter(([, v]) => v.has(k.key))
.slice(0, n)
.map(([k]) => k);
return { key: k.key, relays: relaysForKey };
})
.concat(
missing.map(a => {
return {
key: a.key,
relays: [],
};
}),
);
}
/**
* Pick read relays for sending reply events
*/
export async function pickRelaysForReply(ev: NostrEvent, system: SystemInterface, pickN?: number) {
const recipients = dedupe(ev.tags.filter(a => a[0] === "p").map(a => a[1]));
await updateRelayLists(recipients, system);
const relays = pickTopRelays(system.relayCache, recipients, pickN ?? DefaultPickNRelays, "read");
const ret = removeUndefined(dedupe(relays.map(a => a.relays).flat()));
logger("Picked %O from authors %O", ret, recipients);
return ret;
}
export function parseRelayTag(tag: Array<string>) {
return {
url: sanitizeRelayUrl(tag[1]),
settings: {
read: tag[2] === "read" || tag[2] === undefined,
write: tag[2] === "write" || tag[2] === undefined,
},
} as FullRelaySettings;
}
export function parseRelayTags(tag: Array<Array<string>>) {
return tag.map(parseRelayTag).filter(a => a !== null);
}
export function parseRelaysFromKind(ev: NostrEvent) {
if (ev.kind === EventKind.ContactList) {
const relaysInContent =
ev.content.length > 0 ? (JSON.parse(ev.content) as Record<string, { read: boolean; write: boolean }>) : undefined;
if (relaysInContent) {
return Object.entries(relaysInContent).map(
([k, v]) =>
({
url: sanitizeRelayUrl(k),
settings: {
read: v.read,
write: v.write,
},
}) as FullRelaySettings,
);
}
} else if (ev.kind === EventKind.Relays) {
return parseRelayTags(ev.tags);
}
}
export async function updateRelayLists(authors: Array<string>, system: SystemInterface) {
await system.relayCache.buffer(authors);
const expire = unixNowMs() - RelayListCacheExpire;
const expired = authors.filter(a => (system.relayCache.getFromCache(a)?.loaded ?? 0) < expire);
if (expired.length > 0) {
logger("Updating relays for authors: %O", expired);
const rb = new RequestBuilder("system-update-relays-for-outbox");
rb.withFilter().authors(expired).kinds([EventKind.Relays, EventKind.ContactList]);
const relayLists = await system.Fetch(rb);
await system.relayCache.bulkSet(
removeUndefined(
relayLists.map(a => {
const relays = parseRelaysFromKind(a);
if (!relays) return;
return {
relays: relays,
pubkey: a.pubkey,
created: a.created_at,
loaded: unixNowMs(),
};
}),
),
);
}
}
export class RelayMetadataLoader extends BackgroundLoader<UsersRelays> {
override name(): string {
return "RelayMetadataLoader";
}
override onEvent(e: Readonly<TaggedNostrEvent>): UsersRelays | undefined {
const relays = parseRelaysFromKind(e);
if (!relays) return;
return {
relays: relays,
pubkey: e.pubkey,
created: e.created_at,
loaded: unixNowMs(),
};
}
override getExpireCutoff(): number {
return unixNowMs() - RelayListCacheExpire;
}
protected override buildSub(missing: string[]): RequestBuilder {
const rb = new RequestBuilder("relay-loader");
rb.withOptions({
skipDiff: true,
timeout: 10_000,
outboxPickN: 4,
});
rb.withFilter().authors(missing).kinds([EventKind.Relays, EventKind.ContactList]);
return rb;
}
protected override makePlaceholder(key: string): UsersRelays | undefined {
return {
relays: [],
pubkey: key,
created: 0,
loaded: this.getExpireCutoff() + 300_000,
};
}
}

View File

@ -0,0 +1,58 @@
import { EventKind, FullRelaySettings, NostrEvent, SystemInterface, UsersRelays } from "..";
import { sanitizeRelayUrl } from "@snort/shared";
export const DefaultPickNRelays = 2;
export interface AuthorsRelaysCache {
getFromCache(pubkey?: string): UsersRelays | undefined;
update(obj: UsersRelays): Promise<"new" | "updated" | "refresh" | "no_change">;
buffer(keys: Array<string>): Promise<Array<string>>;
bulkSet(objs: Array<UsersRelays>): Promise<void>;
}
export interface PickedRelays {
key: string;
relays: Array<string>;
}
export type EventFetcher = {
Fetch: SystemInterface["Fetch"];
};
export function parseRelayTag(tag: Array<string>) {
return {
url: sanitizeRelayUrl(tag[1]),
settings: {
read: tag[2] === "read" || tag[2] === undefined,
write: tag[2] === "write" || tag[2] === undefined,
},
} as FullRelaySettings;
}
export function parseRelayTags(tag: Array<Array<string>>) {
return tag.map(parseRelayTag).filter(a => a !== null);
}
export function parseRelaysFromKind(ev: NostrEvent) {
if (ev.kind === EventKind.ContactList) {
const relaysInContent =
ev.content.length > 0 ? (JSON.parse(ev.content) as Record<string, { read: boolean; write: boolean }>) : undefined;
if (relaysInContent) {
return Object.entries(relaysInContent).map(
([k, v]) =>
({
url: sanitizeRelayUrl(k),
settings: {
read: v.read,
write: v.write,
},
}) as FullRelaySettings,
);
}
} else if (ev.kind === EventKind.Relays) {
return parseRelayTags(ev.tags);
}
}
export * from "./outbox-model";
export * from "./relay-loader";

View File

@ -0,0 +1,213 @@
import { EventKind, NostrEvent, ReqFilter, RequestBuilder, SystemInterface } from "..";
import { dedupe, removeUndefined, unixNowMs, unwrap } from "@snort/shared";
import { FlatReqFilter } from "../query-optimizer";
import { RelayListCacheExpire } from "../const";
import { AuthorsRelaysCache, EventFetcher, PickedRelays, DefaultPickNRelays, parseRelaysFromKind } from ".";
import debug from "debug";
import { BaseRequestRouter, RelayTaggedFilter, RelayTaggedFlatFilters } from "../request-router";
/**
* Simple outbox model using most popular relays
*/
export class OutboxModel extends BaseRequestRouter {
#log = debug("OutboxModel");
#relays: AuthorsRelaysCache;
#fetcher: EventFetcher;
constructor(relays: AuthorsRelaysCache, fetcher: EventFetcher) {
super();
this.#relays = relays;
this.#fetcher = fetcher;
}
static fromSystem(system: SystemInterface) {
return new OutboxModel(system.relayCache, system);
}
/**
* Pick top relays for each user
* @param authors The authors whos relays will be picked
* @param pickN Number of relays to pick per pubkey
* @param type Read/Write relays
* @returns
*/
pickTopRelays(authors: Array<string>, pickN: number, type: "write" | "read"): Array<PickedRelays> {
// map of pubkey -> [write relays]
const allRelays = authors.map(a => {
return {
key: a,
relays: this.#relays
.getFromCache(a)
?.relays?.filter(a => (type === "write" ? a.settings.write : a.settings.read))
.sort(() => (Math.random() < 0.5 ? 1 : -1)),
};
});
const missing = allRelays.filter(a => a.relays === undefined || a.relays.length === 0);
const hasRelays = allRelays.filter(a => a.relays !== undefined && a.relays.length > 0);
// map of relay -> [pubkeys]
const relayUserMap = hasRelays.reduce((acc, v) => {
for (const r of unwrap(v.relays)) {
if (!acc.has(r.url)) {
acc.set(r.url, new Set([v.key]));
} else {
unwrap(acc.get(r.url)).add(v.key);
}
}
return acc;
}, new Map<string, Set<string>>());
// selection algo will just pick relays with the most users
const topRelays = [...relayUserMap.entries()].sort(([, v], [, v1]) => v1.size - v.size);
// <relay, key[]> - count keys per relay
// <key, relay[]> - pick n top relays
// <relay, key[]> - map keys per relay (for subscription filter)
return hasRelays
.map(k => {
// pick top N relays for this key
const relaysForKey = topRelays
.filter(([, v]) => v.has(k.key))
.slice(0, pickN)
.map(([k]) => k);
return { key: k.key, relays: relaysForKey };
})
.concat(
missing.map(a => {
return {
key: a.key,
relays: [],
};
}),
);
}
/**
* Split a request filter by authors
* @param filter Filter to split
* @param pickN Number of relays to pick per author
* @returns
*/
forRequest(filter: ReqFilter, pickN?: number): Array<RelayTaggedFilter> {
const authors = filter.authors;
if ((authors?.length ?? 0) === 0) {
return [
{
relay: "",
filter,
},
];
}
const topRelays = this.pickTopRelays(unwrap(authors), pickN ?? DefaultPickNRelays, "write");
const pickedRelays = dedupe(topRelays.flatMap(a => a.relays));
const picked = pickedRelays.map(a => {
const keysOnPickedRelay = dedupe(topRelays.filter(b => b.relays.includes(a)).map(b => b.key));
return {
relay: a,
filter: {
...filter,
authors: keysOnPickedRelay,
},
} as RelayTaggedFilter;
});
const noRelays = dedupe(topRelays.filter(a => a.relays.length === 0).map(a => a.key));
if (noRelays.length > 0) {
picked.push({
relay: "",
filter: {
...filter,
authors: noRelays,
},
});
}
this.#log("Picked %O => %O", filter, picked);
return picked;
}
/**
* Split a flat request filter by authors
* @param filter Filter to split
* @param pickN Number of relays to pick per author
* @returns
*/
forFlatRequest(input: Array<FlatReqFilter>, pickN?: number): Array<RelayTaggedFlatFilters> {
const authors = input.filter(a => a.authors).map(a => unwrap(a.authors));
if (authors.length === 0) {
return [
{
relay: "",
filters: input,
},
];
}
const topRelays = this.pickTopRelays(authors, pickN ?? DefaultPickNRelays, "write");
const pickedRelays = dedupe(topRelays.flatMap(a => a.relays));
const picked = pickedRelays.map(a => {
const authorsOnRelay = new Set(topRelays.filter(v => v.relays.includes(a)).map(v => v.key));
return {
relay: a,
filters: input.filter(v => v.authors && authorsOnRelay.has(v.authors)),
} as RelayTaggedFlatFilters;
});
const noRelays = new Set(topRelays.filter(v => v.relays.length === 0).map(v => v.key));
if (noRelays.size > 0) {
picked.push({
relay: "",
filters: input.filter(v => !v.authors || noRelays.has(v.authors)),
} as RelayTaggedFlatFilters);
}
this.#log("Picked %d relays from %d filters", picked.length, input.length);
return picked;
}
/**
* Pick relay inboxs for replies
* @param ev The reply event to send
* @param system Nostr system interface
* @param pickN Number of relays to pick per recipient
* @returns
*/
async forReply(ev: NostrEvent, pickN?: number) {
const recipients = dedupe([ev.pubkey, ...ev.tags.filter(a => a[0] === "p").map(a => a[1])]);
await this.updateRelayLists(recipients);
const relays = this.pickTopRelays(recipients, pickN ?? DefaultPickNRelays, "read");
const ret = removeUndefined(dedupe(relays.map(a => a.relays).flat()));
this.#log("Picked %O from authors %O", ret, recipients);
return ret;
}
/**
* Update relay cache with latest relay lists
* @param authors The authors to update relay lists for
*/
async updateRelayLists(authors: Array<string>) {
await this.#relays.buffer(authors);
const expire = unixNowMs() - RelayListCacheExpire;
const expired = authors.filter(a => (this.#relays.getFromCache(a)?.loaded ?? 0) < expire);
if (expired.length > 0) {
this.#log("Updating relays for authors: %O", expired);
const rb = new RequestBuilder("system-update-relays-for-outbox");
rb.withFilter().authors(expired).kinds([EventKind.Relays, EventKind.ContactList]);
const relayLists = await this.#fetcher.Fetch(rb);
await this.#relays.bulkSet(
removeUndefined(
relayLists.map(a => {
const relays = parseRelaysFromKind(a);
if (!relays) return;
return {
relays: relays,
pubkey: a.pubkey,
created: a.created_at,
loaded: unixNowMs(),
};
}),
),
);
}
}
}

View File

@ -0,0 +1,46 @@
import { EventKind, RequestBuilder, TaggedNostrEvent, UsersRelays } from "..";
import { unixNowMs } from "@snort/shared";
import { RelayListCacheExpire } from "../const";
import { BackgroundLoader } from "../background-loader";
import { parseRelaysFromKind } from ".";
export class RelayMetadataLoader extends BackgroundLoader<UsersRelays> {
override name(): string {
return "RelayMetadataLoader";
}
override onEvent(e: Readonly<TaggedNostrEvent>): UsersRelays | undefined {
const relays = parseRelaysFromKind(e);
if (!relays) return;
return {
relays: relays,
pubkey: e.pubkey,
created: e.created_at,
loaded: unixNowMs(),
};
}
override getExpireCutoff(): number {
return unixNowMs() - RelayListCacheExpire;
}
protected override buildSub(missing: string[]): RequestBuilder {
const rb = new RequestBuilder("relay-loader");
rb.withOptions({
skipDiff: true,
timeout: 10000,
outboxPickN: 4,
});
rb.withFilter().authors(missing).kinds([EventKind.Relays, EventKind.ContactList]);
return rb;
}
protected override makePlaceholder(key: string): UsersRelays | undefined {
return {
relays: [],
pubkey: key,
created: 0,
loaded: this.getExpireCutoff() + 300000,
};
}
}

View File

@ -5,7 +5,7 @@ import { appendDedupe, dedupe, sanitizeRelayUrl, unixNowMs, unwrap } from "@snor
import EventKind from "./event-kind";
import { NostrLink, NostrPrefix, SystemInterface } from ".";
import { ReqFilter, u256, HexKey, TaggedNostrEvent } from "./nostr";
import { AuthorsRelaysCache, splitByWriteRelays, splitFlatByWriteRelays } from "./outbox-model";
import { RequestRouter } from "./request-router";
/**
* Which strategy is used when building REQ filters
@ -133,7 +133,7 @@ export class RequestBuilder {
}
build(system: SystemInterface): Array<BuiltRawReqFilter> {
const expanded = this.#builders.flatMap(a => a.build(system.relayCache, this.#options));
const expanded = this.#builders.flatMap(a => a.build(system.requestRouter, this.#options));
return this.#groupByRelay(system, expanded);
}
@ -147,14 +147,24 @@ export class RequestBuilder {
const ts = unixNowMs() - start;
this.#log("buildDiff %s %d ms +%d", this.id, ts, diff.length);
if (diff.length > 0) {
// todo: fix for explicit relays
return splitFlatByWriteRelays(system.relayCache, diff).map(a => {
return {
strategy: RequestStrategy.AuthorsRelays,
filters: system.optimizer.flatMerge(a.filters),
relay: a.relay,
};
});
if (system.requestRouter) {
// todo: fix for explicit relays
return system.requestRouter.forFlatRequest(diff).map(a => {
return {
strategy: RequestStrategy.AuthorsRelays,
filters: system.optimizer.flatMerge(a.filters),
relay: a.relay,
};
});
} else {
return [
{
strategy: RequestStrategy.DefaultRelays,
filters: system.optimizer.flatMerge(diff),
relay: "",
},
];
}
}
return [];
}
@ -294,11 +304,11 @@ export class RequestFilterBuilder {
/**
* Build/expand this filter into a set of relay specific queries
*/
build(relays: AuthorsRelaysCache, options?: RequestBuilderOptions): Array<BuiltRawReqFilter> {
return this.#buildFromFilter(relays, this.#filter, options);
build(model?: RequestRouter, options?: RequestBuilderOptions): Array<BuiltRawReqFilter> {
return this.#buildFromFilter(this.#filter, model, options);
}
#buildFromFilter(relays: AuthorsRelaysCache, f: ReqFilter, options?: RequestBuilderOptions) {
#buildFromFilter(f: ReqFilter, model?: RequestRouter, options?: RequestBuilderOptions) {
// use the explicit relay list first
if (this.#relays.size > 0) {
return [...this.#relays].map(r => {
@ -311,8 +321,8 @@ export class RequestFilterBuilder {
}
// If any authors are set use the gossip model to fetch data for each author
if (f.authors) {
const split = splitByWriteRelays(relays, f, options?.outboxPickN);
if (f.authors && model) {
const split = model.forRequest(f, options?.outboxPickN);
return split.map(a => {
return {
filters: [a.filter],

View File

@ -0,0 +1,76 @@
import { NostrEvent, ReqFilter } from "./nostr";
import { FlatReqFilter } from "./query-optimizer";
export interface RelayTaggedFilter {
relay: string;
filter: ReqFilter;
}
export interface RelayTaggedFlatFilters {
relay: string;
filters: Array<FlatReqFilter>;
}
export interface RelayTaggedFilters {
relay: string;
filters: Array<ReqFilter>;
}
/**
* Request router managed splitting of requests to one or more relays, and which relay to send events to.
*/
export interface RequestRouter {
/**
* Pick relays to send an event to
* @param ev The reply event to send
* @param system Nostr system interface
* @param pickN Number of relays to pick per recipient
* @returns
*/
forReply(ev: NostrEvent, pickN?: number): Promise<Array<string>>;
/**
* Split a request filter to one or more relays.
* @param filter Filter to split
* @param pickN Number of relays to pick
* @returns
*/
forRequest(filter: ReqFilter, pickN?: number): Array<RelayTaggedFilter>;
/**
* Split a request filter to one or more relays.
* @param filter Filters to split
* @param pickN Number of relays to pick
* @returns
*/
forFlatRequest(filter: Array<FlatReqFilter>, pickN?: number): Array<RelayTaggedFlatFilters>;
}
export abstract class BaseRequestRouter implements RequestRouter {
abstract forReply(ev: NostrEvent, pickN?: number): Promise<Array<string>>;
abstract forRequest(filter: ReqFilter, pickN?: number): Array<RelayTaggedFilter>;
abstract forFlatRequest(filter: FlatReqFilter[], pickN?: number): Array<RelayTaggedFlatFilters>;
forAllRequest(filters: Array<ReqFilter>) {
const allSplit = filters
.map(a => this.forRequest(a))
.reduce((acc, v) => {
for (const vn of v) {
const existing = acc.get(vn.relay);
if (existing) {
existing.push(vn.filter);
} else {
acc.set(vn.relay, [vn.filter]);
}
}
return acc;
}, new Map<string, Array<ReqFilter>>());
return [...allSplit.entries()].map(([k, v]) => {
return {
relay: k,
filters: v,
} as RelayTaggedFilters;
});
}
}

View File

@ -9,7 +9,6 @@ import {
SystemInterface,
TaggedNostrEvent,
CachedMetadata,
DefaultOptimizer,
RelayMetadataLoader,
RelayMetricCache,
RelayMetrics,
@ -17,8 +16,10 @@ import {
UserRelaysCache,
UsersRelays,
QueryLike,
Optimizer,
DefaultOptimizer,
} from "..";
import { NostrSystemEvents, NostrsystemProps } from "../nostr-system";
import { NostrSystemEvents, SystemConfig } from "../nostr-system";
import { WorkerCommand, WorkerMessage } from ".";
import { CachedTable } from "@snort/shared";
import { EventsCache } from "../cache/events";
@ -31,38 +32,80 @@ export class SystemWorker extends EventEmitter<NostrSystemEvents> implements Sys
#log = debug("SystemWorker");
#worker: Worker;
#commandQueue: Map<string, (v: unknown) => void> = new Map();
readonly relayCache: CachedTable<UsersRelays>;
readonly profileCache: CachedTable<CachedMetadata>;
readonly relayMetricsCache: CachedTable<RelayMetrics>;
readonly profileLoader: ProfileLoaderService;
readonly relayMetricsHandler: RelayMetricHandler;
readonly eventsCache: CachedTable<NostrEvent>;
readonly relayLoader: RelayMetadataLoader;
readonly cacheRelay: CacheRelay | undefined;
#config: SystemConfig;
get checkSigs() {
return true;
/**
* Storage class for user relay lists
*/
get relayCache(): CachedTable<UsersRelays> {
return this.#config.relays;
}
/**
* Storage class for user profiles
*/
get profileCache(): CachedTable<CachedMetadata> {
return this.#config.profiles;
}
/**
* Storage class for relay metrics (connects/disconnects)
*/
get relayMetricsCache(): CachedTable<RelayMetrics> {
return this.#config.relayMetrics;
}
/**
* Optimizer instance, contains optimized functions for processing data
*/
get optimizer(): Optimizer {
return this.#config.optimizer;
}
get eventsCache(): CachedTable<NostrEvent> {
return this.#config.events;
}
/**
* Check event signatures (recommended)
*/
get checkSigs(): boolean {
return this.#config.checkSigs;
}
set checkSigs(v: boolean) {
// not used
this.#config.checkSigs = v;
}
get optimizer() {
return DefaultOptimizer;
get requestRouter() {
return undefined;
}
get cacheRelay(): CacheRelay | undefined {
return this.#config.cachingRelay;
}
get pool() {
return {} as ConnectionPool;
}
constructor(scriptPath: string, props: NostrsystemProps) {
super();
readonly relayLoader: RelayMetadataLoader;
readonly profileLoader: ProfileLoaderService;
readonly relayMetricsHandler: RelayMetricHandler;
this.relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays);
this.profileCache = props.profileCache ?? new UserProfileCache(props.db?.users);
this.relayMetricsCache = props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics);
this.eventsCache = props.eventsCache ?? new EventsCache(props.db?.events);
constructor(scriptPath: string, props: Partial<SystemConfig>) {
super();
this.#config = {
relays: props.relays ?? new UserRelaysCache(props.db?.userRelays),
profiles: props.profiles ?? new UserProfileCache(props.db?.users),
relayMetrics: props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics),
events: props.events ?? new EventsCache(props.db?.events),
optimizer: props.optimizer ?? DefaultOptimizer,
checkSigs: props.checkSigs ?? false,
cachingRelay: props.cachingRelay,
db: props.db,
automaticOutboxModel: props.automaticOutboxModel ?? true,
};
this.profileLoader = new ProfileLoaderService(this, this.profileCache);
this.relayMetricsHandler = new RelayMetricHandler(this.relayMetricsCache);