refactor: outbox (inbox query) improvements

feat: sync account tool
This commit is contained in:
2024-04-05 14:11:40 +01:00
parent a88fda2a22
commit a938e466d7
16 changed files with 207 additions and 85 deletions

View File

@ -70,23 +70,6 @@ export class DefaultConnectionPool extends EventEmitter<NostrConnectionPoolEvent
}
this.emit("event", addr, s, e);
});
c.on("have", async (s, id) => {
this.#log("%s have: %s %o", c.Address, s, id);
if (this.#requestedIds.has(id)) {
this.#log("HAVE: Already requested from another relay %s", id);
// TODO if request to a relay fails, try another relay. otherwise malicious relays can block content.
return;
}
this.#requestedIds.add(id);
// is this performant? should it be batched?
const alreadyHave = await this.#system.cacheRelay?.query(["REQ", id, { ids: [id] }]);
if (alreadyHave?.length) {
this.#log("HAVE: Already have %s", id);
return;
}
this.#log("HAVE: GET requesting %s", id);
c.queueReq(["GET", id], () => {});
});
c.on("eose", s => this.emit("eose", addr, s));
c.on("disconnect", code => this.emit("disconnect", addr, code));
c.on("connected", r => this.emit("connected", addr, r));

View File

@ -28,7 +28,6 @@ interface ConnectionEvents {
disconnect: (code: number) => void;
auth: (challenge: string, relay: string, cb: (ev: NostrEvent) => void) => void;
notice: (msg: string) => void;
have: (sub: string, id: u256) => void; // NIP-114
unknownMessage: (obj: Array<any>) => void;
}
@ -159,7 +158,7 @@ export class Connection extends EventEmitter<ConnectionEvents> {
this.IsClosed = true;
this.#log(`Closed! (Remote)`);
} else if (!this.IsClosed) {
this.ConnectTimeout = this.ConnectTimeout * 2;
this.ConnectTimeout = this.ConnectTimeout * this.ConnectTimeout;
this.#log(
`Closed (code=${e.code}), trying again in ${(this.ConnectTimeout / 1000).toFixed(0).toLocaleString()} sec`,
);
@ -211,11 +210,6 @@ export class Connection extends EventEmitter<ConnectionEvents> {
// todo: stats events received
break;
}
// NIP-114: GetMatchingEventIds
case "HAVE": {
this.emit("have", msg[1] as string, msg[2] as u256);
break;
}
case "EOSE": {
this.emit("eose", msg[1] as string);
break;
@ -398,18 +392,14 @@ export class Connection extends EventEmitter<ConnectionEvents> {
}
};
if (this.Address.startsWith("wss://relay.snort.social")) {
const newFilters = filters.map(a => {
if (a.ids_only) {
const copy = { ...a };
delete copy.ids_only;
return copy;
}
return a;
});
const newFilters = filters;
const neg = new NegentropyFlow(id, this, eventSet, newFilters);
neg.once("finish", filters => {
if (filters.length > 0) {
this.queueReq(["REQ", cmd[1], ...filters], item.cb);
} else {
// no results to query, emulate closed
this.emit("closed", id, "Nothing to sync");
}
});
neg.once("error", () => {

View File

@ -38,6 +38,7 @@ export * from "./pow-util";
export * from "./query-optimizer";
export * from "./encrypted";
export * from "./outbox";
export * from "./range-sync";
export * from "./impl/nip4";
export * from "./impl/nip44";

View File

@ -257,14 +257,6 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
}
}
});
this.pool.on("eose", (id, sub) => {
const c = this.pool.getConnection(id);
if (c) {
for (const [, v] of this.#queryManager) {
v.eose(sub, c);
}
}
});
this.pool.on("auth", (_, c, r, cb) => this.emit("auth", c, r, cb));
this.pool.on("notice", (addr, msg) => {
this.#log("NOTICE: %s %s", addr, msg);

View File

@ -57,7 +57,6 @@ export interface ReqFilter {
since?: number;
until?: number;
limit?: number;
ids_only?: boolean;
relays?: string[];
[key: string]: Array<string> | Array<number> | string | number | undefined | boolean;
}

View File

@ -61,6 +61,9 @@ export class OutboxModel extends BaseRequestRouter {
// selection algo will just pick relays with the most users
const topRelays = [...relayUserMap.entries()].sort(([, v], [, v1]) => v1.size - v.size);
if (missing.length > 0) {
this.#log("No relay metadata found, outbox model will not work for %O", missing)
}
// <relay, key[]> - count keys per relay
// <key, relay[]> - pick n top relays
// <relay, key[]> - map keys per relay (for subscription filter)
@ -90,30 +93,35 @@ export class OutboxModel extends BaseRequestRouter {
* @returns
*/
forRequest(filter: ReqFilter, pickN?: number): Array<ReqFilter> {
const authors = filter.authors;
// when sending a request prioritize the #p filter over authors
const pattern = filter["#p"] !== undefined ? "inbox" : "outbox";
const key = filter["#p"] !== undefined ? "#p" : "authors";
const authors = filter[key];
if ((authors?.length ?? 0) === 0) {
return [filter];
}
const topRelays = this.pickTopRelays(unwrap(authors), pickN ?? DefaultPickNRelays, "write");
const pickedRelays = dedupe(topRelays.flatMap(a => a.relays));
const topWriteRelays = this.pickTopRelays(unwrap(authors),
pickN ?? DefaultPickNRelays,
pattern === "inbox" ? "read" : "write");
const pickedRelays = dedupe(topWriteRelays.flatMap(a => a.relays));
const picked = pickedRelays.map(a => {
const keysOnPickedRelay = dedupe(topRelays.filter(b => b.relays.includes(a)).map(b => b.key));
const keysOnPickedRelay = dedupe(topWriteRelays.filter(b => b.relays.includes(a)).map(b => b.key));
return {
...filter,
authors: keysOnPickedRelay,
relays: appendDedupe(filter.relays, [a]),
[key]: keysOnPickedRelay,
relays: appendDedupe(filter.relays, [a])
} as ReqFilter;
});
const noRelays = dedupe(topRelays.filter(a => a.relays.length === 0).map(a => a.key));
const noRelays = dedupe(topWriteRelays.filter(a => a.relays.length === 0).map(a => a.key));
if (noRelays.length > 0) {
picked.push({
...filter,
authors: noRelays,
[key]: noRelays,
} as ReqFilter);
}
this.#log("Picked %O => %O", filter, picked);
this.#log("Picked: pattern=%s, input=%O, output=%O", pattern, filter, picked);
return picked;
}
@ -151,7 +159,7 @@ export class OutboxModel extends BaseRequestRouter {
picked.push(...input.filter(v => !v.authors || noRelays.has(v.authors)));
}
this.#log("Picked %d relays from %d filters", picked.length, input.length);
this.#log("Picked: pattern=%s, input=%O, output=%O", "outbox", input, picked);
return picked;
}
@ -167,7 +175,8 @@ export class OutboxModel extends BaseRequestRouter {
await this.updateRelayLists(recipients);
const relays = this.pickTopRelays(recipients, pickN ?? DefaultPickNRelays, "read");
const ret = removeUndefined(dedupe(relays.map(a => a.relays).flat()));
this.#log("Picked %O from authors %O", ret, recipients);
this.#log("Picked: pattern=%s, input=%O, output=%O", "inbox", ev, ret);
return ret;
}

View File

@ -1,6 +1,6 @@
import { unixNowMs } from "@snort/shared";
import { EventKind, TaggedNostrEvent, RequestBuilder } from ".";
import { ProfileCacheExpire } from "./const";
import { MetadataRelays, ProfileCacheExpire } from "./const";
import { mapEventToProfile, CachedMetadata } from "./cache";
import { BackgroundLoader } from "./background-loader";
@ -19,7 +19,10 @@ export class ProfileLoaderService extends BackgroundLoader<CachedMetadata> {
override buildSub(missing: string[]): RequestBuilder {
const sub = new RequestBuilder(`profiles`);
sub.withFilter().kinds([EventKind.SetMetadata]).authors(missing).relay(["wss://purplepag.es/"]);
sub.withFilter()
.kinds([EventKind.SetMetadata])
.authors(missing)
.relay(MetadataRelays);
return sub;
}

View File

@ -43,8 +43,8 @@ export class QueryTrace extends EventEmitter<QueryTraceEvents> {
gotEose() {
this.eose = unixNowMs();
this.emit("change");
this.emit("eose", this.id, this.connId, false);
this.emit("change");
}
forceEose() {
@ -304,14 +304,6 @@ export class Query extends EventEmitter<QueryEvents> {
this.cleanup();
}
eose(sub: string, conn: Readonly<Connection>) {
const qt = this.#tracing.find(a => a.id === sub && a.connId === conn.Id);
qt?.gotEose();
if (!this.#leaveOpen) {
qt?.sendClose();
}
}
/**
* Get the progress to EOSE, can be used to determine when we should load more content
*/
@ -337,6 +329,16 @@ export class Query extends EventEmitter<QueryEvents> {
}
}
#eose(sub: string, conn: Readonly<Connection>) {
const qt = this.#tracing.find(a => a.id === sub && a.connId === conn.Id);
if (qt) {
qt.gotEose();
if (!this.#leaveOpen) {
qt.sendClose();
}
}
}
async #emitFilters() {
this.#log("Starting emit of %s", this.id);
const existing = this.filters;
@ -394,10 +396,6 @@ export class Query extends EventEmitter<QueryEvents> {
#sendQueryInternal(c: Connection, q: BuiltRawReqFilter) {
let filters = q.filters;
if (c.supportsNip(Nips.GetMatchingEventIds)) {
filters = filters.map(f => ({ ...f, ids_only: true }));
}
const qt = new QueryTrace(c.Address, filters, c.Id);
qt.on("close", x => c.closeReq(x));
qt.on("change", () => this.#onProgress());
@ -410,13 +408,22 @@ export class Query extends EventEmitter<QueryEvents> {
responseTime: qt.responseTime,
} as TraceReport),
);
const handler = (sub: string, ev: TaggedNostrEvent) => {
const eventHandler = (sub: string, ev: TaggedNostrEvent) => {
if (this.request.options?.fillStore ?? true) {
this.handleEvent(sub, ev);
}
};
c.on("event", handler);
this.on("end", () => c.off("event", handler));
const eoseHandler = (sub: string) => {
this.#eose(sub, c);
};
c.on("event", eventHandler);
c.on("eose", eoseHandler);
c.on("closed", eoseHandler);
this.on("end", () => {
c.off("event", eventHandler);
c.off("eose", eoseHandler);
c.off("closed", eoseHandler);
});
this.#tracing.push(qt);
if (q.syncFrom !== undefined) {

View File

@ -0,0 +1,70 @@
import { unixNow } from "@snort/shared";
import EventEmitter from "eventemitter3";
import { ReqFilter, RequestBuilder, SystemInterface, TaggedNostrEvent } from ".";
/**
* When nostr was created
*/
const NostrBirthday: number = new Date(2021, 1, 1).getTime() / 1000;
interface RangeSyncEvents {
event: (ev: Array<TaggedNostrEvent>) => void
scan: (from: number) => void
}
/**
* A simple time based sync for pulling lots of data from nostr
*/
export class RangeSync extends EventEmitter<RangeSyncEvents> {
#start: number = NostrBirthday;
#windowSize: number = 60 * 60 * 12;
constructor(readonly system: SystemInterface) {
super();
}
/**
* Set window size in seconds
*/
setWindowSize(n: number) {
if (n < 60) {
throw new Error("Window size too small");
}
this.#windowSize = n;
}
/**
* Set start time for range sync
* @param n Unix timestamp
*/
setStartPoint(n: number) {
if (n < NostrBirthday) {
throw new Error("Start point cannot be before nostr's birthday");
}
this.#start = n;
}
/**
* Request to sync with a given filter
*/
async sync(filter: ReqFilter) {
if (filter.since !== undefined || filter.until !== undefined || filter.limit !== undefined) {
throw new Error("Filter must not contain since/until/limit");
}
if (!this.system.requestRouter) {
throw new Error("RangeSync cannot work without request router!");
}
const now = unixNow();
for (let end = now; end > this.#start; end -= this.#windowSize) {
const rb = new RequestBuilder(`range-query:${end}`);
rb.withBareFilter(filter)
.since(end - this.#windowSize)
.until(end);
this.emit("scan", end);
const results = await this.system.Fetch(rb);
this.emit("event", results);
}
}
}

View File

@ -140,6 +140,7 @@ export class RequestBuilder {
#groupFlatByRelay(system: SystemInterface, filters: Array<FlatReqFilter>) {
const relayMerged = filters.reduce((acc, v) => {
const relay = v.relay ?? "";
// delete relay from filter
delete v.relay;
const existing = acc.get(relay);
if (existing) {
@ -167,7 +168,6 @@ export class RequestBuilder {
*/
export class RequestFilterBuilder {
#filter: ReqFilter;
#relays = new Set<string>();
constructor(f?: ReqFilter) {
this.#filter = f ?? {};
@ -176,7 +176,6 @@ export class RequestFilterBuilder {
get filter() {
return {
...this.#filter,
relays: this.#relays.size > 0 ? [...this.#relays] : undefined,
};
}
@ -185,12 +184,7 @@ export class RequestFilterBuilder {
*/
relay(u: string | Array<string>) {
const relays = Array.isArray(u) ? u : [u];
for (const r of relays) {
const uClean = sanitizeRelayUrl(r);
if (uClean) {
this.#relays.add(uClean);
}
}
this.#filter.relays = appendDedupe(this.#filter.relays, removeUndefined(relays.map(a => sanitizeRelayUrl(a))));
return this;
}