refactor: thread loading improvements
This commit is contained in:
@ -397,7 +397,7 @@ export class Connection extends EventEmitter<ConnectionEvents> {
|
||||
this.queueReq(["REQ", id, ...newFilters], item.cb);
|
||||
}
|
||||
};
|
||||
if (this.Info?.software?.includes("strfry")) {
|
||||
if (this.Address.startsWith("wss://relay.snort.social")) {
|
||||
const newFilters = filters.map(a => {
|
||||
if (a.ids_only) {
|
||||
const copy = { ...a };
|
||||
|
@ -49,3 +49,12 @@ export const MentionNostrEntityRegex = /@n(pub|profile|event|ote|addr|)1[acdefgh
|
||||
* Regex to match markdown code content
|
||||
*/
|
||||
export const MarkdownCodeRegex = /(```.+?```)/gms;
|
||||
|
||||
/**
|
||||
* Public metadata relays
|
||||
*/
|
||||
export const MetadataRelays = [
|
||||
"wss://purplepag.es/",
|
||||
"wss://relay.nostr.band/",
|
||||
"wss://relay.snort.social/"
|
||||
]
|
@ -1,5 +1,5 @@
|
||||
import { BuiltRawReqFilter, RequestStrategy } from "./request-builder";
|
||||
import { NostrEvent, TaggedNostrEvent } from "./nostr";
|
||||
import { BuiltRawReqFilter } from "./request-builder";
|
||||
import { NostrEvent } from "./nostr";
|
||||
import { Query } from "./query";
|
||||
|
||||
export interface EventCache {
|
||||
@ -9,31 +9,3 @@ export interface EventCache {
|
||||
export interface FilterCacheLayer {
|
||||
processFilter(q: Query, req: BuiltRawReqFilter): Promise<BuiltRawReqFilter>;
|
||||
}
|
||||
|
||||
export class IdsFilterCacheLayer implements FilterCacheLayer {
|
||||
constructor(readonly cache: EventCache) {}
|
||||
|
||||
async processFilter(q: Query, req: BuiltRawReqFilter) {
|
||||
for (const f of req.filters) {
|
||||
if (f.ids) {
|
||||
const cacheResults = await this.cache.bulkGet(f.ids);
|
||||
if (cacheResults.length > 0) {
|
||||
const resultIds = new Set(cacheResults.map(a => a.id));
|
||||
f.ids = f.ids.filter(a => !resultIds.has(a));
|
||||
|
||||
// this step is important for buildDiff, if a filter doesnt exist with the ids which are from cache
|
||||
// we will create an infinite loop where every render we insert a new query for the ids which are missing
|
||||
q.insertCompletedTrace(
|
||||
{
|
||||
filters: [{ ...f, ids: [...resultIds] }],
|
||||
strategy: RequestStrategy.ExplicitRelays,
|
||||
relay: req.relay,
|
||||
},
|
||||
cacheResults as Array<TaggedNostrEvent>,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
return req;
|
||||
}
|
||||
}
|
||||
|
@ -177,14 +177,14 @@ export class NostrLink implements ToNostrEventTag {
|
||||
throw new Error(`Unknown tag kind ${tag.key}`);
|
||||
}
|
||||
|
||||
static fromTag(tag: Array<string>) {
|
||||
static fromTag(tag: Array<string>, author?: string, kind?: number) {
|
||||
const relays = tag.length > 2 ? [tag[2]] : undefined;
|
||||
switch (tag[0]) {
|
||||
case "e": {
|
||||
return new NostrLink(NostrPrefix.Event, tag[1], undefined, undefined, relays);
|
||||
return new NostrLink(NostrPrefix.Event, tag[1], kind, author, relays);
|
||||
}
|
||||
case "p": {
|
||||
return new NostrLink(NostrPrefix.Profile, tag[1], undefined, undefined, relays);
|
||||
return new NostrLink(NostrPrefix.Profile, tag[1], kind, author, relays);
|
||||
}
|
||||
case "a": {
|
||||
const [kind, author, dTag] = tag[1].split(":");
|
||||
|
@ -58,6 +58,7 @@ export interface ReqFilter {
|
||||
until?: number;
|
||||
limit?: number;
|
||||
ids_only?: boolean;
|
||||
relays?: string[];
|
||||
[key: string]: Array<string> | Array<number> | string | number | undefined | boolean;
|
||||
}
|
||||
|
||||
|
@ -1,10 +1,10 @@
|
||||
import { EventKind, NostrEvent, ReqFilter, RequestBuilder, SystemInterface } from "..";
|
||||
import { dedupe, removeUndefined, unixNowMs, unwrap } from "@snort/shared";
|
||||
import { appendDedupe, dedupe, removeUndefined, unixNowMs, unwrap } from "@snort/shared";
|
||||
import { FlatReqFilter } from "../query-optimizer";
|
||||
import { RelayListCacheExpire } from "../const";
|
||||
import { AuthorsRelaysCache, EventFetcher, PickedRelays, DefaultPickNRelays, parseRelaysFromKind } from ".";
|
||||
import debug from "debug";
|
||||
import { BaseRequestRouter, RelayTaggedFilter, RelayTaggedFlatFilters } from "../request-router";
|
||||
import { BaseRequestRouter } from "../request-router";
|
||||
|
||||
/**
|
||||
* Simple outbox model using most popular relays
|
||||
@ -89,15 +89,10 @@ export class OutboxModel extends BaseRequestRouter {
|
||||
* @param pickN Number of relays to pick per author
|
||||
* @returns
|
||||
*/
|
||||
forRequest(filter: ReqFilter, pickN?: number): Array<RelayTaggedFilter> {
|
||||
forRequest(filter: ReqFilter, pickN?: number): Array<ReqFilter> {
|
||||
const authors = filter.authors;
|
||||
if ((authors?.length ?? 0) === 0) {
|
||||
return [
|
||||
{
|
||||
relay: "",
|
||||
filter,
|
||||
},
|
||||
];
|
||||
return [filter];
|
||||
}
|
||||
|
||||
const topRelays = this.pickTopRelays(unwrap(authors), pickN ?? DefaultPickNRelays, "write");
|
||||
@ -106,22 +101,17 @@ export class OutboxModel extends BaseRequestRouter {
|
||||
const picked = pickedRelays.map(a => {
|
||||
const keysOnPickedRelay = dedupe(topRelays.filter(b => b.relays.includes(a)).map(b => b.key));
|
||||
return {
|
||||
relay: a,
|
||||
filter: {
|
||||
...filter,
|
||||
authors: keysOnPickedRelay,
|
||||
},
|
||||
} as RelayTaggedFilter;
|
||||
...filter,
|
||||
authors: keysOnPickedRelay,
|
||||
relays: appendDedupe(filter.relays, [a])
|
||||
} as ReqFilter;
|
||||
});
|
||||
const noRelays = dedupe(topRelays.filter(a => a.relays.length === 0).map(a => a.key));
|
||||
if (noRelays.length > 0) {
|
||||
picked.push({
|
||||
relay: "",
|
||||
filter: {
|
||||
...filter,
|
||||
authors: noRelays,
|
||||
},
|
||||
});
|
||||
...filter,
|
||||
authors: noRelays,
|
||||
} as ReqFilter);
|
||||
}
|
||||
this.#log("Picked %O => %O", filter, picked);
|
||||
return picked;
|
||||
@ -133,32 +123,32 @@ export class OutboxModel extends BaseRequestRouter {
|
||||
* @param pickN Number of relays to pick per author
|
||||
* @returns
|
||||
*/
|
||||
forFlatRequest(input: Array<FlatReqFilter>, pickN?: number): Array<RelayTaggedFlatFilters> {
|
||||
const authors = input.filter(a => a.authors).map(a => unwrap(a.authors));
|
||||
forFlatRequest(input: Array<FlatReqFilter>, pickN?: number): Array<FlatReqFilter> {
|
||||
const authors = removeUndefined(input.flatMap(a => a.authors));
|
||||
if (authors.length === 0) {
|
||||
return [
|
||||
{
|
||||
relay: "",
|
||||
filters: input,
|
||||
},
|
||||
];
|
||||
return input;
|
||||
}
|
||||
const topRelays = this.pickTopRelays(authors, pickN ?? DefaultPickNRelays, "write");
|
||||
const pickedRelays = dedupe(topRelays.flatMap(a => a.relays));
|
||||
|
||||
const picked = pickedRelays.map(a => {
|
||||
const picked = pickedRelays.flatMap(a => {
|
||||
const authorsOnRelay = new Set(topRelays.filter(v => v.relays.includes(a)).map(v => v.key));
|
||||
return {
|
||||
relay: a,
|
||||
filters: input.filter(v => v.authors && authorsOnRelay.has(v.authors)),
|
||||
} as RelayTaggedFlatFilters;
|
||||
return input
|
||||
.filter(v => v.authors && authorsOnRelay.has(v.authors))
|
||||
.flatMap(b => {
|
||||
// if flat filter isnt already relay tagged, set relay tag or
|
||||
// create a duplicate filter with the authors picked relay
|
||||
if (!b.relay) {
|
||||
b.relay = a;
|
||||
return [b];
|
||||
} else {
|
||||
return [b, { ...b, relay: a }];
|
||||
}
|
||||
});
|
||||
});
|
||||
const noRelays = new Set(topRelays.filter(v => v.relays.length === 0).map(v => v.key));
|
||||
if (noRelays.size > 0) {
|
||||
picked.push({
|
||||
relay: "",
|
||||
filters: input.filter(v => !v.authors || noRelays.has(v.authors)),
|
||||
} as RelayTaggedFlatFilters);
|
||||
picked.push(...input.filter(v => !v.authors || noRelays.has(v.authors)));
|
||||
}
|
||||
|
||||
this.#log("Picked %d relays from %d filters", picked.length, input.length);
|
||||
|
@ -19,7 +19,10 @@ export class ProfileLoaderService extends BackgroundLoader<CachedMetadata> {
|
||||
|
||||
override buildSub(missing: string[]): RequestBuilder {
|
||||
const sub = new RequestBuilder(`profiles`);
|
||||
sub.withFilter().kinds([EventKind.SetMetadata]).authors(missing);
|
||||
sub.withFilter()
|
||||
.kinds([EventKind.SetMetadata])
|
||||
.authors(missing)
|
||||
.relay(["wss://purplepag.es/"]);
|
||||
return sub;
|
||||
}
|
||||
|
||||
|
@ -1,8 +1,8 @@
|
||||
import debug from "debug";
|
||||
import { EventEmitter } from "eventemitter3";
|
||||
import { BuiltRawReqFilter, RequestBuilder, RequestStrategy, SystemInterface, TaggedNostrEvent } from ".";
|
||||
import { BuiltRawReqFilter, RequestBuilder, SystemInterface, TaggedNostrEvent } from ".";
|
||||
import { Query, TraceReport } from "./query";
|
||||
import { FilterCacheLayer, IdsFilterCacheLayer } from "./filter-cache-layer";
|
||||
import { FilterCacheLayer } from "./filter-cache-layer";
|
||||
import { trimFilters } from "./request-trim";
|
||||
|
||||
interface QueryManagerEvents {
|
||||
@ -35,7 +35,6 @@ export class QueryManager extends EventEmitter<QueryManagerEvents> {
|
||||
constructor(system: SystemInterface) {
|
||||
super();
|
||||
this.#system = system;
|
||||
this.#queryCacheLayers.push(new IdsFilterCacheLayer(system.eventsCache));
|
||||
|
||||
setInterval(() => this.#cleanup(), 1_000);
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ export interface FlatReqFilter {
|
||||
since?: number;
|
||||
until?: number;
|
||||
limit?: number;
|
||||
relay?: string;
|
||||
resultSetId: string;
|
||||
}
|
||||
|
||||
|
@ -1,41 +1,18 @@
|
||||
import debug from "debug";
|
||||
import { v4 as uuid } from "uuid";
|
||||
import { appendDedupe, dedupe, sanitizeRelayUrl, unixNowMs, unwrap } from "@snort/shared";
|
||||
import { appendDedupe, dedupe, removeUndefined, sanitizeRelayUrl, unixNowMs, unwrap } from "@snort/shared";
|
||||
|
||||
import EventKind from "./event-kind";
|
||||
import { NostrLink, NostrPrefix, SystemInterface } from ".";
|
||||
import { FlatReqFilter, NostrLink, NostrPrefix, SystemInterface } from ".";
|
||||
import { ReqFilter, u256, HexKey, TaggedNostrEvent } from "./nostr";
|
||||
import { RequestRouter } from "./request-router";
|
||||
|
||||
/**
|
||||
* Which strategy is used when building REQ filters
|
||||
*/
|
||||
export const enum RequestStrategy {
|
||||
/**
|
||||
* Use the users default relays to fetch events,
|
||||
* this is the fallback option when there is no better way to query a given filter set
|
||||
*/
|
||||
DefaultRelays = "default",
|
||||
|
||||
/**
|
||||
* Using a cached copy of the authors relay lists NIP-65, split a given set of request filters by pubkey
|
||||
*/
|
||||
AuthorsRelays = "authors-relays",
|
||||
|
||||
/**
|
||||
* Use pre-determined relays for query
|
||||
*/
|
||||
ExplicitRelays = "explicit-relays",
|
||||
}
|
||||
|
||||
/**
|
||||
* A built REQ filter ready for sending to System
|
||||
*/
|
||||
export interface BuiltRawReqFilter {
|
||||
filters: Array<ReqFilter>;
|
||||
relay: string;
|
||||
strategy: RequestStrategy;
|
||||
|
||||
// Use set sync from an existing set of events
|
||||
syncFrom?: Array<TaggedNostrEvent>;
|
||||
}
|
||||
@ -133,8 +110,12 @@ export class RequestBuilder {
|
||||
}
|
||||
|
||||
build(system: SystemInterface): Array<BuiltRawReqFilter> {
|
||||
const expanded = this.#builders.flatMap(a => a.build(system.requestRouter, this.#options));
|
||||
return this.#groupByRelay(system, expanded);
|
||||
let rawFilters = this.buildRaw();
|
||||
if (system.requestRouter) {
|
||||
rawFilters = system.requestRouter.forAllRequest(rawFilters);
|
||||
}
|
||||
const expanded = rawFilters.flatMap(a => system.optimizer.expandFilter(a));
|
||||
return this.#groupFlatByRelay(system, expanded);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -143,55 +124,41 @@ export class RequestBuilder {
|
||||
async buildDiff(system: SystemInterface, prev: Array<ReqFilter>): Promise<Array<BuiltRawReqFilter>> {
|
||||
const start = unixNowMs();
|
||||
|
||||
const diff = system.optimizer.getDiff(prev, this.buildRaw());
|
||||
let rawFilters = this.buildRaw();
|
||||
if (system.requestRouter) {
|
||||
rawFilters = system.requestRouter.forAllRequest(rawFilters);
|
||||
}
|
||||
const diff = system.optimizer.getDiff(prev, rawFilters);
|
||||
const ts = unixNowMs() - start;
|
||||
this.#log("buildDiff %s %d ms +%d", this.id, ts, diff.length);
|
||||
if (diff.length > 0) {
|
||||
if (system.requestRouter) {
|
||||
// todo: fix for explicit relays
|
||||
return system.requestRouter.forFlatRequest(diff).map(a => {
|
||||
return {
|
||||
strategy: RequestStrategy.AuthorsRelays,
|
||||
filters: system.optimizer.flatMerge(a.filters),
|
||||
relay: a.relay,
|
||||
};
|
||||
});
|
||||
} else {
|
||||
return [
|
||||
{
|
||||
strategy: RequestStrategy.DefaultRelays,
|
||||
filters: system.optimizer.flatMerge(diff),
|
||||
relay: "",
|
||||
},
|
||||
];
|
||||
}
|
||||
return this.#groupFlatByRelay(system, diff);
|
||||
}
|
||||
return [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge a set of expanded filters into the smallest number of subscriptions by merging similar requests
|
||||
*/
|
||||
#groupByRelay(system: SystemInterface, filters: Array<BuiltRawReqFilter>) {
|
||||
#groupFlatByRelay(system: SystemInterface, filters: Array<FlatReqFilter>) {
|
||||
const relayMerged = filters.reduce((acc, v) => {
|
||||
const existing = acc.get(v.relay);
|
||||
const relay = v.relay ?? "";
|
||||
delete v.relay;
|
||||
const existing = acc.get(relay);
|
||||
if (existing) {
|
||||
existing.push(v);
|
||||
} else {
|
||||
acc.set(v.relay, [v]);
|
||||
acc.set(relay, [v]);
|
||||
}
|
||||
return acc;
|
||||
}, new Map<string, Array<BuiltRawReqFilter>>());
|
||||
}, new Map<string, Array<FlatReqFilter>>());
|
||||
|
||||
const filtersSquashed = [...relayMerged.values()].map(a => {
|
||||
return {
|
||||
filters: system.optimizer.flatMerge(a.flatMap(b => b.filters.flatMap(c => system.optimizer.expandFilter(c)))),
|
||||
relay: a[0].relay,
|
||||
strategy: a[0].strategy,
|
||||
} as BuiltRawReqFilter;
|
||||
});
|
||||
|
||||
return filtersSquashed;
|
||||
const ret = [];
|
||||
for (const [k, v] of relayMerged.entries()) {
|
||||
const filters = system.optimizer.flatMerge(v);
|
||||
ret.push({
|
||||
relay: k,
|
||||
filters,
|
||||
} as BuiltRawReqFilter);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
@ -207,7 +174,10 @@ export class RequestFilterBuilder {
|
||||
}
|
||||
|
||||
get filter() {
|
||||
return { ...this.#filter };
|
||||
return {
|
||||
...this.#filter,
|
||||
relays: this.#relays.size > 0 ? [...this.#relays] : undefined
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
@ -232,6 +202,7 @@ export class RequestFilterBuilder {
|
||||
authors(authors?: Array<HexKey>) {
|
||||
if (!authors) return this;
|
||||
this.#filter.authors = appendDedupe(this.#filter.authors, authors);
|
||||
this.#filter.authors = this.#filter.authors.filter(a => a.length === 64);
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -281,6 +252,9 @@ export class RequestFilterBuilder {
|
||||
.authors([unwrap(link.author)]);
|
||||
} else {
|
||||
this.ids([link.id]);
|
||||
if (link.author) {
|
||||
this.authors([link.author]);
|
||||
}
|
||||
}
|
||||
link.relays?.forEach(v => this.relay(v));
|
||||
return this;
|
||||
@ -298,46 +272,18 @@ export class RequestFilterBuilder {
|
||||
tags[0][0],
|
||||
tags.map(v => v[1]),
|
||||
);
|
||||
this.relay(removeUndefined(links.map(a => a.relays).flat()));
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build/expand this filter into a set of relay specific queries
|
||||
*/
|
||||
build(model?: RequestRouter, options?: RequestBuilderOptions): Array<BuiltRawReqFilter> {
|
||||
return this.#buildFromFilter(this.#filter, model, options);
|
||||
}
|
||||
|
||||
#buildFromFilter(f: ReqFilter, model?: RequestRouter, options?: RequestBuilderOptions) {
|
||||
// use the explicit relay list first
|
||||
if (this.#relays.size > 0) {
|
||||
return [...this.#relays].map(r => {
|
||||
return {
|
||||
filters: [f],
|
||||
relay: r,
|
||||
strategy: RequestStrategy.ExplicitRelays,
|
||||
};
|
||||
});
|
||||
build(model?: RequestRouter, options?: RequestBuilderOptions): Array<ReqFilter> {
|
||||
if (model) {
|
||||
return model.forRequest(this.filter, options?.outboxPickN);
|
||||
}
|
||||
|
||||
// If any authors are set use the gossip model to fetch data for each author
|
||||
if (f.authors && model) {
|
||||
const split = model.forRequest(f, options?.outboxPickN);
|
||||
return split.map(a => {
|
||||
return {
|
||||
filters: [a.filter],
|
||||
relay: a.relay,
|
||||
strategy: RequestStrategy.AuthorsRelays,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
return [
|
||||
{
|
||||
filters: [f],
|
||||
relay: "",
|
||||
strategy: RequestStrategy.DefaultRelays,
|
||||
},
|
||||
];
|
||||
return [this.filter];
|
||||
}
|
||||
}
|
||||
|
@ -1,21 +1,7 @@
|
||||
import { unwrap } from "@snort/shared";
|
||||
import { NostrEvent, ReqFilter } from "./nostr";
|
||||
import { FlatReqFilter } from "./query-optimizer";
|
||||
|
||||
export interface RelayTaggedFilter {
|
||||
relay: string;
|
||||
filter: ReqFilter;
|
||||
}
|
||||
|
||||
export interface RelayTaggedFlatFilters {
|
||||
relay: string;
|
||||
filters: Array<FlatReqFilter>;
|
||||
}
|
||||
|
||||
export interface RelayTaggedFilters {
|
||||
relay: string;
|
||||
filters: Array<ReqFilter>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Request router managed splitting of requests to one or more relays, and which relay to send events to.
|
||||
*/
|
||||
@ -35,7 +21,7 @@ export interface RequestRouter {
|
||||
* @param pickN Number of relays to pick
|
||||
* @returns
|
||||
*/
|
||||
forRequest(filter: ReqFilter, pickN?: number): Array<RelayTaggedFilter>;
|
||||
forRequest(filter: ReqFilter, pickN?: number): Array<ReqFilter>;
|
||||
|
||||
/**
|
||||
* Split a request filter to one or more relays.
|
||||
@ -43,34 +29,37 @@ export interface RequestRouter {
|
||||
* @param pickN Number of relays to pick
|
||||
* @returns
|
||||
*/
|
||||
forFlatRequest(filter: Array<FlatReqFilter>, pickN?: number): Array<RelayTaggedFlatFilters>;
|
||||
forFlatRequest(filter: Array<FlatReqFilter>, pickN?: number): Array<FlatReqFilter>;
|
||||
|
||||
/**
|
||||
* Same as forRequest, but merges the results
|
||||
* @param filters
|
||||
*/
|
||||
forAllRequest(filters: Array<ReqFilter>): Array<ReqFilter>;
|
||||
}
|
||||
|
||||
export abstract class BaseRequestRouter implements RequestRouter {
|
||||
abstract forReply(ev: NostrEvent, pickN?: number): Promise<Array<string>>;
|
||||
abstract forRequest(filter: ReqFilter, pickN?: number): Array<RelayTaggedFilter>;
|
||||
abstract forFlatRequest(filter: FlatReqFilter[], pickN?: number): Array<RelayTaggedFlatFilters>;
|
||||
abstract forRequest(filter: ReqFilter, pickN?: number): Array<ReqFilter>;
|
||||
abstract forFlatRequest(filter: FlatReqFilter[], pickN?: number): Array<FlatReqFilter>;
|
||||
|
||||
forAllRequest(filters: Array<ReqFilter>) {
|
||||
const allSplit = filters
|
||||
.map(a => this.forRequest(a))
|
||||
.reduce((acc, v) => {
|
||||
for (const vn of v) {
|
||||
const existing = acc.get(vn.relay);
|
||||
if (existing) {
|
||||
existing.push(vn.filter);
|
||||
} else {
|
||||
acc.set(vn.relay, [vn.filter]);
|
||||
for (const r of (vn.relays?.length ?? 0) > 0 ? unwrap(vn.relays) : [""]) {
|
||||
const existing = acc.get(r);
|
||||
if (existing) {
|
||||
existing.push(vn);
|
||||
} else {
|
||||
acc.set(r, [vn]);
|
||||
}
|
||||
}
|
||||
}
|
||||
return acc;
|
||||
}, new Map<string, Array<ReqFilter>>());
|
||||
|
||||
return [...allSplit.entries()].map(([k, v]) => {
|
||||
return {
|
||||
relay: k,
|
||||
filters: v,
|
||||
} as RelayTaggedFilters;
|
||||
});
|
||||
return [...allSplit.values()].flat()
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user