feat: tools pages
Various other fixes: - Better handeling of limit/since/before merging - Expose timeout through request builder - Expose PickN through request builder - Fix tests
This commit is contained in:
@ -300,7 +300,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
|
||||
const store = new type();
|
||||
|
||||
const filters = req.build(this);
|
||||
const q = new Query(req.id, req.instance, store, req.options?.leaveOpen);
|
||||
const q = new Query(req.id, req.instance, store, req.options?.leaveOpen, req.options?.timeout);
|
||||
q.on("trace", r => this.#relayMetrics.onTraceReport(r));
|
||||
|
||||
if (filters.some(a => a.filters.some(b => b.ids))) {
|
||||
|
@ -14,7 +14,7 @@ import { FlatReqFilter } from "./query-optimizer";
|
||||
import { RelayListCacheExpire } from "./const";
|
||||
import { BackgroundLoader } from "./background-loader";
|
||||
|
||||
const PickNRelays = 2;
|
||||
const DefaultPickNRelays = 2;
|
||||
|
||||
export interface RelayTaggedFilter {
|
||||
relay: string;
|
||||
@ -66,7 +66,7 @@ export function splitAllByWriteRelays(cache: RelayCache, filters: Array<ReqFilte
|
||||
/**
|
||||
* Split filters by authors
|
||||
*/
|
||||
export function splitByWriteRelays(cache: RelayCache, filter: ReqFilter): Array<RelayTaggedFilter> {
|
||||
export function splitByWriteRelays(cache: RelayCache, filter: ReqFilter, pickN?: number): Array<RelayTaggedFilter> {
|
||||
const authors = filter.authors;
|
||||
if ((authors?.length ?? 0) === 0) {
|
||||
return [
|
||||
@ -77,7 +77,7 @@ export function splitByWriteRelays(cache: RelayCache, filter: ReqFilter): Array<
|
||||
];
|
||||
}
|
||||
|
||||
const topRelays = pickTopRelays(cache, unwrap(authors), PickNRelays, "write");
|
||||
const topRelays = pickTopRelays(cache, unwrap(authors), pickN ?? DefaultPickNRelays, "write");
|
||||
const pickedRelays = dedupe(topRelays.flatMap(a => a.relays));
|
||||
|
||||
const picked = pickedRelays.map(a => {
|
||||
@ -107,7 +107,11 @@ export function splitByWriteRelays(cache: RelayCache, filter: ReqFilter): Array<
|
||||
/**
|
||||
* Split filters by author
|
||||
*/
|
||||
export function splitFlatByWriteRelays(cache: RelayCache, input: Array<FlatReqFilter>): Array<RelayTaggedFlatFilters> {
|
||||
export function splitFlatByWriteRelays(
|
||||
cache: RelayCache,
|
||||
input: Array<FlatReqFilter>,
|
||||
pickN?: number,
|
||||
): Array<RelayTaggedFlatFilters> {
|
||||
const authors = input.filter(a => a.authors).map(a => unwrap(a.authors));
|
||||
if (authors.length === 0) {
|
||||
return [
|
||||
@ -117,7 +121,7 @@ export function splitFlatByWriteRelays(cache: RelayCache, input: Array<FlatReqFi
|
||||
},
|
||||
];
|
||||
}
|
||||
const topRelays = pickTopRelays(cache, authors, PickNRelays, "write");
|
||||
const topRelays = pickTopRelays(cache, authors, pickN ?? DefaultPickNRelays, "write");
|
||||
const pickedRelays = dedupe(topRelays.flatMap(a => a.relays));
|
||||
|
||||
const picked = pickedRelays.map(a => {
|
||||
@ -142,7 +146,7 @@ export function splitFlatByWriteRelays(cache: RelayCache, input: Array<FlatReqFi
|
||||
/**
|
||||
* Pick most popular relays for each authors
|
||||
*/
|
||||
function pickTopRelays(cache: RelayCache, authors: Array<string>, n: number, type: "write" | "read") {
|
||||
export function pickTopRelays(cache: RelayCache, authors: Array<string>, n: number, type: "write" | "read") {
|
||||
// map of pubkey -> [write relays]
|
||||
const allRelays = authors.map(a => {
|
||||
return {
|
||||
@ -198,10 +202,10 @@ function pickTopRelays(cache: RelayCache, authors: Array<string>, n: number, typ
|
||||
/**
|
||||
* Pick read relays for sending reply events
|
||||
*/
|
||||
export async function pickRelaysForReply(ev: NostrEvent, system: SystemInterface) {
|
||||
export async function pickRelaysForReply(ev: NostrEvent, system: SystemInterface, pickN?: number) {
|
||||
const recipients = dedupe(ev.tags.filter(a => a[0] === "p").map(a => a[1]));
|
||||
await updateRelayLists(recipients, system);
|
||||
const relays = pickTopRelays(system.RelayCache, recipients, 2, "read");
|
||||
const relays = pickTopRelays(system.RelayCache, recipients, pickN ?? DefaultPickNRelays, "read");
|
||||
const ret = removeUndefined(dedupe(relays.map(a => a.relays).flat()));
|
||||
logger("Picked %O from authors %O", ret, recipients);
|
||||
return ret;
|
||||
@ -221,6 +225,27 @@ export function parseRelayTags(tag: Array<Array<string>>) {
|
||||
return tag.map(parseRelayTag).filter(a => a !== null);
|
||||
}
|
||||
|
||||
export function parseRelaysFromKind(ev: NostrEvent) {
|
||||
if (ev.kind === EventKind.ContactList) {
|
||||
const relaysInContent =
|
||||
ev.content.length > 0 ? (JSON.parse(ev.content) as Record<string, { read: boolean; write: boolean }>) : undefined;
|
||||
if (relaysInContent) {
|
||||
return Object.entries(relaysInContent).map(
|
||||
([k, v]) =>
|
||||
({
|
||||
url: sanitizeRelayUrl(k),
|
||||
settings: {
|
||||
read: v.read,
|
||||
write: v.write,
|
||||
},
|
||||
}) as FullRelaySettings,
|
||||
);
|
||||
}
|
||||
} else if (ev.kind === EventKind.Relays) {
|
||||
return parseRelayTags(ev.tags);
|
||||
}
|
||||
}
|
||||
|
||||
export async function updateRelayLists(authors: Array<string>, system: SystemInterface) {
|
||||
await system.RelayCache.buffer(authors);
|
||||
const expire = unixNowMs() - RelayListCacheExpire;
|
||||
@ -228,15 +253,21 @@ export async function updateRelayLists(authors: Array<string>, system: SystemInt
|
||||
if (expired.length > 0) {
|
||||
logger("Updating relays for authors: %O", expired);
|
||||
const rb = new RequestBuilder("system-update-relays-for-outbox");
|
||||
rb.withFilter().authors(expired).kinds([EventKind.Relays]);
|
||||
rb.withFilter().authors(expired).kinds([EventKind.Relays, EventKind.ContactList]);
|
||||
const relayLists = await system.Fetch(rb);
|
||||
await system.RelayCache.bulkSet(
|
||||
relayLists.map(a => ({
|
||||
relays: parseRelayTags(a.tags),
|
||||
pubkey: a.pubkey,
|
||||
created: a.created_at,
|
||||
loaded: unixNowMs(),
|
||||
})),
|
||||
removeUndefined(
|
||||
relayLists.map(a => {
|
||||
const relays = parseRelaysFromKind(a);
|
||||
if (!relays) return;
|
||||
return {
|
||||
relays: relays,
|
||||
pubkey: a.pubkey,
|
||||
created: a.created_at,
|
||||
loaded: unixNowMs(),
|
||||
};
|
||||
}),
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -247,8 +278,10 @@ export class RelayMetadataLoader extends BackgroundLoader<UsersRelays> {
|
||||
}
|
||||
|
||||
override onEvent(e: Readonly<TaggedNostrEvent>): UsersRelays | undefined {
|
||||
const relays = parseRelaysFromKind(e);
|
||||
if (!relays) return;
|
||||
return {
|
||||
relays: parseRelayTags(e.tags),
|
||||
relays: relays,
|
||||
pubkey: e.pubkey,
|
||||
created: e.created_at,
|
||||
loaded: unixNowMs(),
|
||||
@ -261,8 +294,12 @@ export class RelayMetadataLoader extends BackgroundLoader<UsersRelays> {
|
||||
|
||||
protected override buildSub(missing: string[]): RequestBuilder {
|
||||
const rb = new RequestBuilder("relay-loader");
|
||||
rb.withOptions({ skipDiff: true });
|
||||
rb.withFilter().authors(missing).kinds([EventKind.Relays]);
|
||||
rb.withOptions({
|
||||
skipDiff: true,
|
||||
timeout: 10_000,
|
||||
outboxPickN: 4,
|
||||
});
|
||||
rb.withFilter().authors(missing).kinds([EventKind.Relays, EventKind.ContactList]);
|
||||
return rb;
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@ export interface FlatReqFilter {
|
||||
since?: number;
|
||||
until?: number;
|
||||
limit?: number;
|
||||
resultSetId: string;
|
||||
}
|
||||
|
||||
export interface QueryOptimizer {
|
||||
|
@ -1,3 +1,4 @@
|
||||
import { sha256 } from "@snort/shared";
|
||||
import { FlatReqFilter } from ".";
|
||||
import { ReqFilter } from "../nostr";
|
||||
|
||||
@ -7,29 +8,52 @@ import { ReqFilter } from "../nostr";
|
||||
export function expandFilter(f: ReqFilter): Array<FlatReqFilter> {
|
||||
const ret: Array<FlatReqFilter> = [];
|
||||
const src = Object.entries(f);
|
||||
const keys = src.filter(([, v]) => Array.isArray(v)).map(a => a[0]);
|
||||
const props = src.filter(([, v]) => !Array.isArray(v));
|
||||
|
||||
function generateCombinations(index: number, currentCombination: FlatReqFilter) {
|
||||
if (index === keys.length) {
|
||||
ret.push(currentCombination);
|
||||
const id = resultSetId(f);
|
||||
|
||||
// Filter entries that are arrays and keep the rest as is
|
||||
const arrays: [string, Array<string> | Array<number>][] = src.filter(([, value]) => Array.isArray(value)) as [
|
||||
string,
|
||||
Array<string> | Array<number>,
|
||||
][];
|
||||
const constants = Object.fromEntries(src.filter(([, value]) => !Array.isArray(value))) as {
|
||||
[key: string]: string | number | undefined;
|
||||
};
|
||||
|
||||
// Recursive function to compute cartesian product
|
||||
function cartesianProduct(arr: [string, Array<string> | Array<number>][], temp: [string, any][] = []) {
|
||||
if (arr.length === 0) {
|
||||
ret.push(createFilterObject(temp, constants, id));
|
||||
return;
|
||||
}
|
||||
|
||||
const key = keys[index];
|
||||
const values = (f as Record<string, Array<string | number>>)[key];
|
||||
|
||||
for (let i = 0; i < values.length; i++) {
|
||||
const value = values[i];
|
||||
const updatedCombination = { ...currentCombination, [key]: value };
|
||||
generateCombinations(index + 1, updatedCombination);
|
||||
for (let i = 0; i < arr[0][1].length; i++) {
|
||||
cartesianProduct(arr.slice(1), temp.concat([[arr[0][0], arr[0][1][i]]]));
|
||||
}
|
||||
}
|
||||
|
||||
generateCombinations(0, {
|
||||
keys: keys.length,
|
||||
...Object.fromEntries(props),
|
||||
});
|
||||
// Create filter object from the combination
|
||||
function createFilterObject(
|
||||
combination: [string, any][],
|
||||
constants: { [key: string]: string | number | undefined },
|
||||
resultId: string,
|
||||
) {
|
||||
let filterObject = { ...Object.fromEntries(combination), ...constants } as FlatReqFilter;
|
||||
filterObject.resultSetId = resultId;
|
||||
return filterObject;
|
||||
}
|
||||
|
||||
cartesianProduct(arrays);
|
||||
return ret;
|
||||
}
|
||||
|
||||
function resultSetId(f: ReqFilter) {
|
||||
if (f.limit !== undefined || f.since !== undefined || f.until !== undefined) {
|
||||
const arrays = Object.entries(f)
|
||||
.filter(([, a]) => Array.isArray(a))
|
||||
.map(a => a as [string, Array<string | number>])
|
||||
.sort();
|
||||
const input = arrays.map(([, a]) => a.join(",")).join(",");
|
||||
return sha256(input);
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
@ -2,18 +2,9 @@ import { distance } from "@snort/shared";
|
||||
import { ReqFilter } from "..";
|
||||
import { FlatReqFilter } from ".";
|
||||
|
||||
/**
|
||||
* Keys which can change the entire meaning of the filter outside the array types
|
||||
*/
|
||||
const DiscriminatorKeys = ["since", "until", "limit", "search"];
|
||||
|
||||
export function canMergeFilters(a: FlatReqFilter | ReqFilter, b: FlatReqFilter | ReqFilter): boolean {
|
||||
const aObj = a as Record<string, string | number | undefined>;
|
||||
const bObj = b as Record<string, string | number | undefined>;
|
||||
for (const key of DiscriminatorKeys) {
|
||||
if (aObj[key] !== bObj[key]) {
|
||||
return false;
|
||||
}
|
||||
if (a.resultSetId !== b.resultSetId) {
|
||||
return false;
|
||||
}
|
||||
return distance(a, b) <= 1;
|
||||
}
|
||||
@ -101,12 +92,11 @@ export function flatMerge(all: Array<FlatReqFilter>): Array<ReqFilter> {
|
||||
|
||||
// to compute filters which can be merged we need to calucate the distance change between each filter
|
||||
// then we can merge filters which are exactly 1 change diff from each other
|
||||
|
||||
function mergeFiltersInSet(filters: Array<FlatReqFilter>) {
|
||||
return filters.reduce((acc, a) => {
|
||||
Object.entries(a).forEach(([k, v]) => {
|
||||
if (k === "keys" || v === undefined) return;
|
||||
if (DiscriminatorKeys.includes(k)) {
|
||||
if (v === undefined) return;
|
||||
if (k === "since" || k === "until" || k === "limit" || k === "search" || k === "resultSetId") {
|
||||
acc[k] = v;
|
||||
} else {
|
||||
acc[k] ??= [];
|
||||
@ -142,5 +132,6 @@ export function flatMerge(all: Array<FlatReqFilter>): Array<ReqFilter> {
|
||||
}
|
||||
ret = n;
|
||||
}
|
||||
ret.forEach(a => delete a["resultSetId"]);
|
||||
return ret;
|
||||
}
|
||||
|
@ -157,14 +157,20 @@ export class Query extends EventEmitter<QueryEvents> implements QueryBase {
|
||||
*/
|
||||
#feed: NoteStore;
|
||||
|
||||
/**
|
||||
* Maximum waiting time for this query
|
||||
*/
|
||||
#timeout: number;
|
||||
|
||||
#log = debug("Query");
|
||||
|
||||
constructor(id: string, instance: string, feed: NoteStore, leaveOpen?: boolean) {
|
||||
constructor(id: string, instance: string, feed: NoteStore, leaveOpen?: boolean, timeout?: number) {
|
||||
super();
|
||||
this.id = id;
|
||||
this.#feed = feed;
|
||||
this.fromInstance = instance;
|
||||
this.#leaveOpen = leaveOpen ?? false;
|
||||
this.#timeout = timeout ?? 5_000;
|
||||
this.#checkTraces();
|
||||
}
|
||||
|
||||
@ -292,7 +298,7 @@ export class Query extends EventEmitter<QueryEvents> implements QueryBase {
|
||||
this.#stopCheckTraces();
|
||||
this.#checkTrace = setInterval(() => {
|
||||
for (const v of this.#tracing) {
|
||||
if (v.runtime > 5_000 && !v.finished) {
|
||||
if (v.runtime > this.#timeout && !v.finished) {
|
||||
v.forceEose();
|
||||
}
|
||||
}
|
||||
|
@ -43,6 +43,16 @@ export interface RequestBuilderOptions {
|
||||
* Do not apply diff logic and always use full filters for query
|
||||
*/
|
||||
skipDiff?: boolean;
|
||||
|
||||
/**
|
||||
* Pick N relays per pubkey when using outbox strategy
|
||||
*/
|
||||
outboxPickN?: number;
|
||||
|
||||
/**
|
||||
* Max wait time for this request
|
||||
*/
|
||||
timeout?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -101,7 +111,7 @@ export class RequestBuilder {
|
||||
}
|
||||
|
||||
build(system: SystemInterface): Array<BuiltRawReqFilter> {
|
||||
const expanded = this.#builders.flatMap(a => a.build(system.RelayCache, this.id));
|
||||
const expanded = this.#builders.flatMap(a => a.build(system.RelayCache, this.#options));
|
||||
return this.#groupByRelay(system, expanded);
|
||||
}
|
||||
|
||||
@ -130,11 +140,9 @@ export class RequestBuilder {
|
||||
|
||||
/**
|
||||
* Merge a set of expanded filters into the smallest number of subscriptions by merging similar requests
|
||||
* @param expanded
|
||||
* @returns
|
||||
*/
|
||||
#groupByRelay(system: SystemInterface, expanded: Array<BuiltRawReqFilter>) {
|
||||
const relayMerged = expanded.reduce((acc, v) => {
|
||||
#groupByRelay(system: SystemInterface, filters: Array<BuiltRawReqFilter>) {
|
||||
const relayMerged = filters.reduce((acc, v) => {
|
||||
const existing = acc.get(v.relay);
|
||||
if (existing) {
|
||||
existing.push(v);
|
||||
@ -267,7 +275,7 @@ export class RequestFilterBuilder {
|
||||
/**
|
||||
* Build/expand this filter into a set of relay specific queries
|
||||
*/
|
||||
build(relays: RelayCache, id: string): Array<BuiltRawReqFilter> {
|
||||
build(relays: RelayCache, options?: RequestBuilderOptions): Array<BuiltRawReqFilter> {
|
||||
// use the explicit relay list first
|
||||
if (this.#relays.size > 0) {
|
||||
return [...this.#relays].map(r => {
|
||||
@ -281,7 +289,7 @@ export class RequestFilterBuilder {
|
||||
|
||||
// If any authors are set use the gossip model to fetch data for each author
|
||||
if (this.#filter.authors) {
|
||||
const split = splitByWriteRelays(relays, this.#filter);
|
||||
const split = splitByWriteRelays(relays, this.#filter, options?.outboxPickN);
|
||||
return split.map(a => {
|
||||
return {
|
||||
filters: [a.filter],
|
||||
|
Reference in New Issue
Block a user