From a938e466d7b8283a02de2a5329ded225a000f9f2 Mon Sep 17 00:00:00 2001 From: kieran Date: Fri, 5 Apr 2024 14:11:40 +0100 Subject: [PATCH] refactor: outbox (inbox query) improvements feat: sync account tool --- packages/app/src/Feed/LoginFeed.ts | 1 + packages/app/src/Pages/settings/Cache.tsx | 12 +++- .../app/src/Pages/settings/tools/index.tsx | 19 ++++- .../src/Pages/settings/tools/sync-account.tsx | 48 +++++++++++++ packages/app/src/Utils/Const.ts | 7 +- packages/system/src/connection-pool.ts | 17 ----- packages/system/src/connection.ts | 20 ++---- packages/system/src/index.ts | 1 + packages/system/src/nostr-system.ts | 8 --- packages/system/src/nostr.ts | 1 - packages/system/src/outbox/outbox-model.ts | 31 +++++--- packages/system/src/profile-cache.ts | 7 +- packages/system/src/query.ts | 39 ++++++----- packages/system/src/range-sync.ts | 70 +++++++++++++++++++ packages/system/src/request-builder.ts | 10 +-- packages/worker-relay/src/types.ts | 1 - 16 files changed, 207 insertions(+), 85 deletions(-) create mode 100644 packages/app/src/Pages/settings/tools/sync-account.tsx create mode 100644 packages/system/src/range-sync.ts diff --git a/packages/app/src/Feed/LoginFeed.ts b/packages/app/src/Feed/LoginFeed.ts index b8904be9..6f46ad2e 100644 --- a/packages/app/src/Feed/LoginFeed.ts +++ b/packages/app/src/Feed/LoginFeed.ts @@ -65,6 +65,7 @@ export default function useLoginFeed() { EventKind.BookmarksList, EventKind.InterestsList, EventKind.PublicChatsList, + EventKind.DirectMessage, ]); if (CONFIG.features.subscriptions && !login.readonly) { b.withFilter().authors([pubKey]).kinds([EventKind.AppData]).tag("d", ["snort"]); diff --git a/packages/app/src/Pages/settings/Cache.tsx b/packages/app/src/Pages/settings/Cache.tsx index 2060ec34..032c16f3 100644 --- a/packages/app/src/Pages/settings/Cache.tsx +++ b/packages/app/src/Pages/settings/Cache.tsx @@ -4,6 +4,7 @@ import { FormattedMessage, FormattedNumber } from "react-intl"; import { GiftsCache, Relay, RelayMetrics } from "@/Cache"; import AsyncButton from "@/Components/Button/AsyncButton"; +import useLogin from "@/Hooks/useLogin"; export function CacheSettings() { return ( @@ -50,15 +51,24 @@ function CacheDetails({ cache, name }: { cache: FeedCache; name: ReactNode function RelayCacheStats() { const [counts, setCounts] = useState>({}); + const [myEvents, setMyEvents] = useState(0); + const login = useLogin(); useEffect(() => { Relay.summary().then(setCounts); + if (login.publicKey) { + Relay.count(["REQ", "my", { authors: [login.publicKey] }]).then(setMyEvents); + } }, []); return (
+ {myEvents &&

+ + }} />

} @@ -89,7 +99,7 @@ function RelayCacheStats() {
- {}}> + { }}> , path: "follows-relay-health", - }, + } ], }, + { + + title: , + items: [ + { + icon: "repost", + iconBg: "bg-blue-800", + message: , + path: "sync-account" + } + ] + } ] as SettingsMenuItems; export const ToolsPages = [ @@ -47,6 +60,10 @@ export const ToolsPages = [ path: "follows-relay-health", element: , }, + { + path: "sync-account", + element: + } ] as Array; export function ToolsPage() { diff --git a/packages/app/src/Pages/settings/tools/sync-account.tsx b/packages/app/src/Pages/settings/tools/sync-account.tsx new file mode 100644 index 00000000..33e274c7 --- /dev/null +++ b/packages/app/src/Pages/settings/tools/sync-account.tsx @@ -0,0 +1,48 @@ +import { unwrap } from "@snort/shared"; +import { RangeSync, TaggedNostrEvent } from "@snort/system" +import { SnortContext } from "@snort/system-react"; +import { useContext, useState } from "react"; +import { FormattedMessage, FormattedNumber } from "react-intl"; + +import AsyncButton from "@/Components/Button/AsyncButton"; +import useLogin from "@/Hooks/useLogin"; +import { SearchRelays } from "@/Utils/Const"; + +export default function SyncAccountTool() { + const system = useContext(SnortContext); + const login = useLogin(); + const [scan, setScan] = useState(); + const [results, setResults] = useState>([]); + + async function start() { + const relays = Object.entries(login.relays.item).filter(([, v]) => v.write).map(([k,]) => k); + const sync = new RangeSync(system); + sync.on("event", evs => { + setResults(r => [...r, ...evs]); + }); + sync.on("scan", t => setScan(t)); + await sync.sync({ + authors: [unwrap(login.publicKey)], + relays: [...relays, ...Object.keys(CONFIG.defaultRelays), ...SearchRelays] + }) + } + return <> +

+ +

+ + {results.length > 0 &&

+ + }} /> +

} + {scan !== undefined &&

+ +

} + + + + +} \ No newline at end of file diff --git a/packages/app/src/Utils/Const.ts b/packages/app/src/Utils/Const.ts index 21917f86..e92097d7 100644 --- a/packages/app/src/Utils/Const.ts +++ b/packages/app/src/Utils/Const.ts @@ -32,10 +32,9 @@ export const SnortPubKey = "npub1sn0rtcjcf543gj4wsg7fa59s700d5ztys5ctj0g69g2x680 * Default search relays */ export const SearchRelays = [ - "wss://relay.nostr.band", - "wss://search.nos.today", - "wss://relay.noswhere.com", - "wss://saltivka.org", + "wss://relay.nostr.band/", + "wss://search.nos.today/", + "wss://relay.noswhere.com/", ]; export const DeveloperAccounts = [ diff --git a/packages/system/src/connection-pool.ts b/packages/system/src/connection-pool.ts index d1a34318..d23e7dc7 100644 --- a/packages/system/src/connection-pool.ts +++ b/packages/system/src/connection-pool.ts @@ -70,23 +70,6 @@ export class DefaultConnectionPool extends EventEmitter { - this.#log("%s have: %s %o", c.Address, s, id); - if (this.#requestedIds.has(id)) { - this.#log("HAVE: Already requested from another relay %s", id); - // TODO if request to a relay fails, try another relay. otherwise malicious relays can block content. - return; - } - this.#requestedIds.add(id); - // is this performant? should it be batched? - const alreadyHave = await this.#system.cacheRelay?.query(["REQ", id, { ids: [id] }]); - if (alreadyHave?.length) { - this.#log("HAVE: Already have %s", id); - return; - } - this.#log("HAVE: GET requesting %s", id); - c.queueReq(["GET", id], () => {}); - }); c.on("eose", s => this.emit("eose", addr, s)); c.on("disconnect", code => this.emit("disconnect", addr, code)); c.on("connected", r => this.emit("connected", addr, r)); diff --git a/packages/system/src/connection.ts b/packages/system/src/connection.ts index 994b9765..4ecb8507 100644 --- a/packages/system/src/connection.ts +++ b/packages/system/src/connection.ts @@ -28,7 +28,6 @@ interface ConnectionEvents { disconnect: (code: number) => void; auth: (challenge: string, relay: string, cb: (ev: NostrEvent) => void) => void; notice: (msg: string) => void; - have: (sub: string, id: u256) => void; // NIP-114 unknownMessage: (obj: Array) => void; } @@ -159,7 +158,7 @@ export class Connection extends EventEmitter { this.IsClosed = true; this.#log(`Closed! (Remote)`); } else if (!this.IsClosed) { - this.ConnectTimeout = this.ConnectTimeout * 2; + this.ConnectTimeout = this.ConnectTimeout * this.ConnectTimeout; this.#log( `Closed (code=${e.code}), trying again in ${(this.ConnectTimeout / 1000).toFixed(0).toLocaleString()} sec`, ); @@ -211,11 +210,6 @@ export class Connection extends EventEmitter { // todo: stats events received break; } - // NIP-114: GetMatchingEventIds - case "HAVE": { - this.emit("have", msg[1] as string, msg[2] as u256); - break; - } case "EOSE": { this.emit("eose", msg[1] as string); break; @@ -398,18 +392,14 @@ export class Connection extends EventEmitter { } }; if (this.Address.startsWith("wss://relay.snort.social")) { - const newFilters = filters.map(a => { - if (a.ids_only) { - const copy = { ...a }; - delete copy.ids_only; - return copy; - } - return a; - }); + const newFilters = filters; const neg = new NegentropyFlow(id, this, eventSet, newFilters); neg.once("finish", filters => { if (filters.length > 0) { this.queueReq(["REQ", cmd[1], ...filters], item.cb); + } else { + // no results to query, emulate closed + this.emit("closed", id, "Nothing to sync"); } }); neg.once("error", () => { diff --git a/packages/system/src/index.ts b/packages/system/src/index.ts index d3d58b77..8fc76bf9 100644 --- a/packages/system/src/index.ts +++ b/packages/system/src/index.ts @@ -38,6 +38,7 @@ export * from "./pow-util"; export * from "./query-optimizer"; export * from "./encrypted"; export * from "./outbox"; +export * from "./range-sync"; export * from "./impl/nip4"; export * from "./impl/nip44"; diff --git a/packages/system/src/nostr-system.ts b/packages/system/src/nostr-system.ts index 20708516..7e2777a5 100644 --- a/packages/system/src/nostr-system.ts +++ b/packages/system/src/nostr-system.ts @@ -257,14 +257,6 @@ export class NostrSystem extends EventEmitter implements Syst } } }); - this.pool.on("eose", (id, sub) => { - const c = this.pool.getConnection(id); - if (c) { - for (const [, v] of this.#queryManager) { - v.eose(sub, c); - } - } - }); this.pool.on("auth", (_, c, r, cb) => this.emit("auth", c, r, cb)); this.pool.on("notice", (addr, msg) => { this.#log("NOTICE: %s %s", addr, msg); diff --git a/packages/system/src/nostr.ts b/packages/system/src/nostr.ts index 846398a5..20f0dc39 100644 --- a/packages/system/src/nostr.ts +++ b/packages/system/src/nostr.ts @@ -57,7 +57,6 @@ export interface ReqFilter { since?: number; until?: number; limit?: number; - ids_only?: boolean; relays?: string[]; [key: string]: Array | Array | string | number | undefined | boolean; } diff --git a/packages/system/src/outbox/outbox-model.ts b/packages/system/src/outbox/outbox-model.ts index 61c1f3d0..5e961f91 100644 --- a/packages/system/src/outbox/outbox-model.ts +++ b/packages/system/src/outbox/outbox-model.ts @@ -61,6 +61,9 @@ export class OutboxModel extends BaseRequestRouter { // selection algo will just pick relays with the most users const topRelays = [...relayUserMap.entries()].sort(([, v], [, v1]) => v1.size - v.size); + if (missing.length > 0) { + this.#log("No relay metadata found, outbox model will not work for %O", missing) + } // - count keys per relay // - pick n top relays // - map keys per relay (for subscription filter) @@ -90,30 +93,35 @@ export class OutboxModel extends BaseRequestRouter { * @returns */ forRequest(filter: ReqFilter, pickN?: number): Array { - const authors = filter.authors; + // when sending a request prioritize the #p filter over authors + const pattern = filter["#p"] !== undefined ? "inbox" : "outbox"; + const key = filter["#p"] !== undefined ? "#p" : "authors"; + const authors = filter[key]; if ((authors?.length ?? 0) === 0) { return [filter]; } - const topRelays = this.pickTopRelays(unwrap(authors), pickN ?? DefaultPickNRelays, "write"); - const pickedRelays = dedupe(topRelays.flatMap(a => a.relays)); + const topWriteRelays = this.pickTopRelays(unwrap(authors), + pickN ?? DefaultPickNRelays, + pattern === "inbox" ? "read" : "write"); + const pickedRelays = dedupe(topWriteRelays.flatMap(a => a.relays)); const picked = pickedRelays.map(a => { - const keysOnPickedRelay = dedupe(topRelays.filter(b => b.relays.includes(a)).map(b => b.key)); + const keysOnPickedRelay = dedupe(topWriteRelays.filter(b => b.relays.includes(a)).map(b => b.key)); return { ...filter, - authors: keysOnPickedRelay, - relays: appendDedupe(filter.relays, [a]), + [key]: keysOnPickedRelay, + relays: appendDedupe(filter.relays, [a]) } as ReqFilter; }); - const noRelays = dedupe(topRelays.filter(a => a.relays.length === 0).map(a => a.key)); + const noRelays = dedupe(topWriteRelays.filter(a => a.relays.length === 0).map(a => a.key)); if (noRelays.length > 0) { picked.push({ ...filter, - authors: noRelays, + [key]: noRelays, } as ReqFilter); } - this.#log("Picked %O => %O", filter, picked); + this.#log("Picked: pattern=%s, input=%O, output=%O", pattern, filter, picked); return picked; } @@ -151,7 +159,7 @@ export class OutboxModel extends BaseRequestRouter { picked.push(...input.filter(v => !v.authors || noRelays.has(v.authors))); } - this.#log("Picked %d relays from %d filters", picked.length, input.length); + this.#log("Picked: pattern=%s, input=%O, output=%O", "outbox", input, picked); return picked; } @@ -167,7 +175,8 @@ export class OutboxModel extends BaseRequestRouter { await this.updateRelayLists(recipients); const relays = this.pickTopRelays(recipients, pickN ?? DefaultPickNRelays, "read"); const ret = removeUndefined(dedupe(relays.map(a => a.relays).flat())); - this.#log("Picked %O from authors %O", ret, recipients); + + this.#log("Picked: pattern=%s, input=%O, output=%O", "inbox", ev, ret); return ret; } diff --git a/packages/system/src/profile-cache.ts b/packages/system/src/profile-cache.ts index c76e629b..fc13bb3e 100644 --- a/packages/system/src/profile-cache.ts +++ b/packages/system/src/profile-cache.ts @@ -1,6 +1,6 @@ import { unixNowMs } from "@snort/shared"; import { EventKind, TaggedNostrEvent, RequestBuilder } from "."; -import { ProfileCacheExpire } from "./const"; +import { MetadataRelays, ProfileCacheExpire } from "./const"; import { mapEventToProfile, CachedMetadata } from "./cache"; import { BackgroundLoader } from "./background-loader"; @@ -19,7 +19,10 @@ export class ProfileLoaderService extends BackgroundLoader { override buildSub(missing: string[]): RequestBuilder { const sub = new RequestBuilder(`profiles`); - sub.withFilter().kinds([EventKind.SetMetadata]).authors(missing).relay(["wss://purplepag.es/"]); + sub.withFilter() + .kinds([EventKind.SetMetadata]) + .authors(missing) + .relay(MetadataRelays); return sub; } diff --git a/packages/system/src/query.ts b/packages/system/src/query.ts index f8c278a5..21575e35 100644 --- a/packages/system/src/query.ts +++ b/packages/system/src/query.ts @@ -43,8 +43,8 @@ export class QueryTrace extends EventEmitter { gotEose() { this.eose = unixNowMs(); - this.emit("change"); this.emit("eose", this.id, this.connId, false); + this.emit("change"); } forceEose() { @@ -304,14 +304,6 @@ export class Query extends EventEmitter { this.cleanup(); } - eose(sub: string, conn: Readonly) { - const qt = this.#tracing.find(a => a.id === sub && a.connId === conn.Id); - qt?.gotEose(); - if (!this.#leaveOpen) { - qt?.sendClose(); - } - } - /** * Get the progress to EOSE, can be used to determine when we should load more content */ @@ -337,6 +329,16 @@ export class Query extends EventEmitter { } } + #eose(sub: string, conn: Readonly) { + const qt = this.#tracing.find(a => a.id === sub && a.connId === conn.Id); + if (qt) { + qt.gotEose(); + if (!this.#leaveOpen) { + qt.sendClose(); + } + } + } + async #emitFilters() { this.#log("Starting emit of %s", this.id); const existing = this.filters; @@ -394,10 +396,6 @@ export class Query extends EventEmitter { #sendQueryInternal(c: Connection, q: BuiltRawReqFilter) { let filters = q.filters; - if (c.supportsNip(Nips.GetMatchingEventIds)) { - filters = filters.map(f => ({ ...f, ids_only: true })); - } - const qt = new QueryTrace(c.Address, filters, c.Id); qt.on("close", x => c.closeReq(x)); qt.on("change", () => this.#onProgress()); @@ -410,13 +408,22 @@ export class Query extends EventEmitter { responseTime: qt.responseTime, } as TraceReport), ); - const handler = (sub: string, ev: TaggedNostrEvent) => { + const eventHandler = (sub: string, ev: TaggedNostrEvent) => { if (this.request.options?.fillStore ?? true) { this.handleEvent(sub, ev); } }; - c.on("event", handler); - this.on("end", () => c.off("event", handler)); + const eoseHandler = (sub: string) => { + this.#eose(sub, c); + }; + c.on("event", eventHandler); + c.on("eose", eoseHandler); + c.on("closed", eoseHandler); + this.on("end", () => { + c.off("event", eventHandler); + c.off("eose", eoseHandler); + c.off("closed", eoseHandler); + }); this.#tracing.push(qt); if (q.syncFrom !== undefined) { diff --git a/packages/system/src/range-sync.ts b/packages/system/src/range-sync.ts new file mode 100644 index 00000000..d29a2c68 --- /dev/null +++ b/packages/system/src/range-sync.ts @@ -0,0 +1,70 @@ +import { unixNow } from "@snort/shared"; +import EventEmitter from "eventemitter3"; +import { ReqFilter, RequestBuilder, SystemInterface, TaggedNostrEvent } from "."; + +/** + * When nostr was created + */ +const NostrBirthday: number = new Date(2021, 1, 1).getTime() / 1000; + +interface RangeSyncEvents { + event: (ev: Array) => void + scan: (from: number) => void +} + +/** + * A simple time based sync for pulling lots of data from nostr + */ +export class RangeSync extends EventEmitter { + #start: number = NostrBirthday; + #windowSize: number = 60 * 60 * 12; + + constructor(readonly system: SystemInterface) { + super(); + } + + /** + * Set window size in seconds + */ + setWindowSize(n: number) { + if (n < 60) { + throw new Error("Window size too small"); + } + this.#windowSize = n; + } + + /** + * Set start time for range sync + * @param n Unix timestamp + */ + setStartPoint(n: number) { + if (n < NostrBirthday) { + throw new Error("Start point cannot be before nostr's birthday"); + } + this.#start = n; + } + + /** + * Request to sync with a given filter + */ + async sync(filter: ReqFilter) { + if (filter.since !== undefined || filter.until !== undefined || filter.limit !== undefined) { + throw new Error("Filter must not contain since/until/limit"); + } + + if (!this.system.requestRouter) { + throw new Error("RangeSync cannot work without request router!"); + } + + const now = unixNow(); + for (let end = now; end > this.#start; end -= this.#windowSize) { + const rb = new RequestBuilder(`range-query:${end}`); + rb.withBareFilter(filter) + .since(end - this.#windowSize) + .until(end); + this.emit("scan", end); + const results = await this.system.Fetch(rb); + this.emit("event", results); + } + } +} \ No newline at end of file diff --git a/packages/system/src/request-builder.ts b/packages/system/src/request-builder.ts index c55b4af0..2620690c 100644 --- a/packages/system/src/request-builder.ts +++ b/packages/system/src/request-builder.ts @@ -140,6 +140,7 @@ export class RequestBuilder { #groupFlatByRelay(system: SystemInterface, filters: Array) { const relayMerged = filters.reduce((acc, v) => { const relay = v.relay ?? ""; + // delete relay from filter delete v.relay; const existing = acc.get(relay); if (existing) { @@ -167,7 +168,6 @@ export class RequestBuilder { */ export class RequestFilterBuilder { #filter: ReqFilter; - #relays = new Set(); constructor(f?: ReqFilter) { this.#filter = f ?? {}; @@ -176,7 +176,6 @@ export class RequestFilterBuilder { get filter() { return { ...this.#filter, - relays: this.#relays.size > 0 ? [...this.#relays] : undefined, }; } @@ -185,12 +184,7 @@ export class RequestFilterBuilder { */ relay(u: string | Array) { const relays = Array.isArray(u) ? u : [u]; - for (const r of relays) { - const uClean = sanitizeRelayUrl(r); - if (uClean) { - this.#relays.add(uClean); - } - } + this.#filter.relays = appendDedupe(this.#filter.relays, removeUndefined(relays.map(a => sanitizeRelayUrl(a)))); return this; } diff --git a/packages/worker-relay/src/types.ts b/packages/worker-relay/src/types.ts index d53526ab..f983b2ee 100644 --- a/packages/worker-relay/src/types.ts +++ b/packages/worker-relay/src/types.ts @@ -45,7 +45,6 @@ export interface ReqFilter { since?: number; until?: number; limit?: number; - ids_only?: boolean; [key: string]: Array | Array | string | number | undefined | boolean; }