From 57bf51c41c6f91fed89e7d01b66b3215c16f1b52 Mon Sep 17 00:00:00 2001 From: kieran Date: Wed, 5 Jun 2024 13:08:55 +0100 Subject: [PATCH] refactor: move connection sync module --- .../src/Pages/Notifications/Notifications.tsx | 13 +- packages/app/src/lang.json | 3 + packages/app/src/translations/en.json | 1 + packages/system/src/connection-pool.ts | 16 ++- packages/system/src/connection.ts | 66 ++--------- packages/system/src/nostr-system.ts | 95 ++------------- packages/system/src/query.ts | 8 +- packages/system/src/sync/connection.ts | 111 ++++++++++++++++++ packages/system/src/system-base.ts | 8 +- packages/system/src/system.ts | 12 ++ 10 files changed, 181 insertions(+), 152 deletions(-) create mode 100644 packages/system/src/sync/connection.ts diff --git a/packages/app/src/Pages/Notifications/Notifications.tsx b/packages/app/src/Pages/Notifications/Notifications.tsx index fe6cf4f0..75c596bc 100644 --- a/packages/app/src/Pages/Notifications/Notifications.tsx +++ b/packages/app/src/Pages/Notifications/Notifications.tsx @@ -34,15 +34,16 @@ export default function NotificationsPage({ onClick }: { onClick?: (link: NostrL const myNotifications = useMemo(() => { return notifications - .sort((a, b) => a.created_at > b.created_at ? -1 : 1) + .sort((a, b) => (a.created_at > b.created_at ? -1 : 1)) .slice(0, limit) .filter(a => !isMuted(a.pubkey) && a.tags.some(b => b[0] === "p" && b[1] === login.publicKey)); }, [notifications, login.publicKey, limit]); const timeGrouped = useMemo(() => { return myNotifications.reduce((acc, v) => { - const key = `${timeKey(v)}:${getNotificationContext(v as TaggedNostrEvent)?.encode(CONFIG.eventLinkPrefix)}:${v.kind - }`; + const key = `${timeKey(v)}:${getNotificationContext(v as TaggedNostrEvent)?.encode(CONFIG.eventLinkPrefix)}:${ + v.kind + }`; if (acc.has(key)) { unwrap(acc.get(key)).push(v as TaggedNostrEvent); } else { @@ -63,7 +64,11 @@ export default function NotificationsPage({ onClick }: { onClick?: (link: NostrL {login.publicKey && [...timeGrouped.entries()].map(([k, g]) => )} - { setLimit(l => l + 100) }} /> + { + setLimit(l => l + 100); + }} + /> ); diff --git a/packages/app/src/lang.json b/packages/app/src/lang.json index cf36ef3b..2ab3d24b 100644 --- a/packages/app/src/lang.json +++ b/packages/app/src/lang.json @@ -1778,6 +1778,9 @@ "yCLnBC": { "defaultMessage": "LNURL or Lightning Address" }, + "z3UjXR": { + "defaultMessage": "Debug" + }, "zCb8fX": { "defaultMessage": "Weight" }, diff --git a/packages/app/src/translations/en.json b/packages/app/src/translations/en.json index e78935c6..24b6395f 100644 --- a/packages/app/src/translations/en.json +++ b/packages/app/src/translations/en.json @@ -590,6 +590,7 @@ "y1Z3or": "Language", "yAztTU": "{n} eSats", "yCLnBC": "LNURL or Lightning Address", + "z3UjXR": "Debug", "zCb8fX": "Weight", "zFegDD": "Contact", "zINlao": "Owner", diff --git a/packages/system/src/connection-pool.ts b/packages/system/src/connection-pool.ts index 2922d885..dad5eace 100644 --- a/packages/system/src/connection-pool.ts +++ b/packages/system/src/connection-pool.ts @@ -5,6 +5,7 @@ import { EventEmitter } from "eventemitter3"; import { Connection, RelaySettings, SyncCommand } from "./connection"; import { NostrEvent, OkResponse, ReqCommand, TaggedNostrEvent } from "./nostr"; import { RelayInfo, SystemInterface } from "."; +import { ConnectionSyncModule, DefaultSyncModule } from "./sync/connection"; /** * Events which the ConnectionType must emit @@ -93,6 +94,7 @@ export type ConnectionBuilder = ( address: string, options: RelaySettings, ephemeral: boolean, + syncModule?: ConnectionSyncModule, ) => Promise | T; /** @@ -105,6 +107,11 @@ export class DefaultConnectionPool #system: SystemInterface; #log = debug("ConnectionPool"); + /** + * Track if a connection request has started + */ + #connectStarted = new Set(); + /** * All currently connected websockets */ @@ -122,7 +129,8 @@ export class DefaultConnectionPool this.#connectionBuilder = builder; } else { this.#connectionBuilder = (addr, options, ephemeral) => { - return new Connection(addr, options, ephemeral) as unknown as T; + const sync = new DefaultSyncModule(this.#system.config.fallbackSync); + return new Connection(addr, options, ephemeral, sync) as unknown as T; }; } } @@ -140,12 +148,14 @@ export class DefaultConnectionPool */ async connect(address: string, options: RelaySettings, ephemeral: boolean) { const addr = unwrap(sanitizeRelayUrl(address)); + if (this.#connectStarted.has(addr)) return; + this.#connectStarted.add(addr); + try { const existing = this.#sockets.get(addr); if (!existing) { const c = await this.#connectionBuilder(addr, options, ephemeral); this.#sockets.set(addr, c); - c.on("event", (s, e) => { if (this.#system.checkSigs && !this.#system.optimizer.schnorrVerify(e)) { this.#log("Reject invalid event %o", e); @@ -177,6 +187,8 @@ export class DefaultConnectionPool this.#log("%O", e); this.emit("connectFailed", addr); this.#sockets.delete(addr); + } finally { + this.#connectStarted.delete(addr); } } diff --git a/packages/system/src/connection.ts b/packages/system/src/connection.ts index 2abf8d73..18cc7df7 100644 --- a/packages/system/src/connection.ts +++ b/packages/system/src/connection.ts @@ -1,18 +1,16 @@ import { v4 as uuid } from "uuid"; import debug from "debug"; import WebSocket from "isomorphic-ws"; -import { unixNowMs, dedupe } from "@snort/shared"; +import { unixNowMs } from "@snort/shared"; import { EventEmitter } from "eventemitter3"; import { DefaultConnectTimeout } from "./const"; import { NostrEvent, OkResponse, ReqCommand, ReqFilter, TaggedNostrEvent, u256 } from "./nostr"; import { RelayInfo } from "./relay-info"; import EventKind from "./event-kind"; -import { EventExt, EventType } from "./event-ext"; -import { NegentropyFlow } from "./negentropy/negentropy-flow"; +import { EventExt } from "./event-ext"; import { ConnectionType, ConnectionTypeEvents } from "./connection-pool"; -import { RangeSync } from "./sync"; -import { NoteCollection } from "./note-collection"; +import { ConnectionSyncModule } from "./sync/connection"; /** * Relay settings @@ -46,6 +44,7 @@ export class Connection extends EventEmitter implements Co #downCount = 0; #activeRequests = new Set(); #connectStarted = false; + #syncModule?: ConnectionSyncModule; id: string; readonly address: string; @@ -64,7 +63,7 @@ export class Connection extends EventEmitter implements Co AwaitingAuth: Map; Authed = false; - constructor(addr: string, options: RelaySettings, ephemeral: boolean = false) { + constructor(addr: string, options: RelaySettings, ephemeral: boolean = false, syncModule?: ConnectionSyncModule) { super(); this.id = uuid(); this.address = addr; @@ -72,6 +71,7 @@ export class Connection extends EventEmitter implements Co this.EventsCallback = new Map(); this.AwaitingAuth = new Map(); this.#ephemeral = ephemeral; + this.#syncModule = syncModule; this.#log = debug("Connection").extend(addr); } @@ -395,58 +395,10 @@ export class Connection extends EventEmitter implements Co this.#activeRequests.add(cmd[1]); this.#send(cmd); } else if (cmd[0] === "SYNC") { - const [_, id, eventSet, ...filters] = cmd; - const lastResortSync = () => { - const isReplacableSync = filters.every(a => a.kinds?.every(b => EventExt.getType(b) === EventType.Replaceable || EventExt.getType(b) === EventType.ParameterizedReplaceable) ?? false); - if (filters.some(a => a.since || a.until || a.ids || a.limit) || isReplacableSync) { - this.request(["REQ", id, ...filters], item.cb); - } else { - const rs = RangeSync.forFetcher(async (rb, cb) => { - return await new Promise((resolve, reject) => { - const results = new NoteCollection(); - const f = rb.buildRaw(); - this.on("event", (c, e) => { - if (rb.id === c) { - cb?.([e]); - results.add(e); - } - }); - this.on("eose", s => { - if (s === rb.id) { - resolve(results.takeSnapshot()); - } - }); - this.request(["REQ", rb.id, ...f], undefined); - }); - }); - const latest = eventSet.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0); - rs.setStartPoint(latest + 1); - rs.on("event", ev => { - ev.forEach(e => this.emit("event", id, e)); - }); - for (const f of filters) { - rs.sync(f); - } - } - }; - if (this.info?.negentropy === "v1") { - const newFilters = filters; - const neg = new NegentropyFlow(id, this, eventSet, newFilters); - neg.once("finish", filters => { - if (filters.length > 0) { - this.request(["REQ", cmd[1], ...filters], item.cb); - } else { - // no results to query, emulate closed - this.emit("closed", id, "Nothing to sync"); - } - }); - neg.once("error", () => { - lastResortSync(); - }); - neg.start(); - } else { - lastResortSync(); + if (!this.#syncModule) { + throw new Error("no sync module"); } + this.#syncModule.sync(this, cmd, item.cb); } } catch (e) { console.error(e); diff --git a/packages/system/src/nostr-system.ts b/packages/system/src/nostr-system.ts index 83f15a4e..f38aaba1 100644 --- a/packages/system/src/nostr-system.ts +++ b/packages/system/src/nostr-system.ts @@ -1,97 +1,33 @@ import debug from "debug"; -import { EventEmitter } from "eventemitter3"; -import { CachedTable, unixNowMs } from "@snort/shared"; +import { unixNowMs } from "@snort/shared"; import { NostrEvent, TaggedNostrEvent, OkResponse } from "./nostr"; import { RelaySettings } from "./connection"; import { BuiltRawReqFilter, RequestBuilder } from "./request-builder"; import { RelayMetricHandler } from "./relay-metric-handler"; import { - CachedMetadata, ProfileLoaderService, - RelayMetrics, SystemInterface, SystemSnapshot, - UserProfileCache, - UserRelaysCache, - RelayMetricCache, - UsersRelays, QueryLike, OutboxModel, socialGraphInstance, EventKind, - UsersFollows, ID, - NostrSystemEvents, SystemConfig, } from "."; -import { EventsCache } from "./cache/events"; import { RelayMetadataLoader } from "./outbox"; -import { Optimizer, DefaultOptimizer } from "./query-optimizer"; import { ConnectionPool, DefaultConnectionPool } from "./connection-pool"; import { QueryManager } from "./query-manager"; -import { CacheRelay } from "./cache-relay"; import { RequestRouter } from "./request-router"; -import { UserFollowsCache } from "./cache/user-follows-lists"; +import { SystemBase } from "./system-base"; /** * Manages nostr content retrieval system */ -export class NostrSystem extends EventEmitter implements SystemInterface { +export class NostrSystem extends SystemBase implements SystemInterface { #log = debug("System"); #queryManager: QueryManager; - #config: SystemConfig; - - /** - * Storage class for user relay lists - */ - get relayCache(): CachedTable { - return this.#config.relays; - } - - /** - * Storage class for user profiles - */ - get profileCache(): CachedTable { - return this.#config.profiles; - } - - /** - * Storage class for relay metrics (connects/disconnects) - */ - get relayMetricsCache(): CachedTable { - return this.#config.relayMetrics; - } - - /** - * Optimizer instance, contains optimized functions for processing data - */ - get optimizer(): Optimizer { - return this.#config.optimizer; - } - - get eventsCache(): CachedTable { - return this.#config.events; - } - - get userFollowsCache(): CachedTable { - return this.#config.contactLists; - } - - get cacheRelay(): CacheRelay | undefined { - return this.#config.cachingRelay; - } - - /** - * Check event signatures (recommended) - */ - get checkSigs(): boolean { - return this.#config.checkSigs; - } - - set checkSigs(v: boolean) { - this.#config.checkSigs = v; - } readonly profileLoader: ProfileLoaderService; readonly relayMetricsHandler: RelayMetricHandler; @@ -100,39 +36,26 @@ export class NostrSystem extends EventEmitter implements Syst readonly requestRouter: RequestRouter | undefined; constructor(props: Partial) { - super(); - this.#config = { - relays: props.relays ?? new UserRelaysCache(props.db?.userRelays), - profiles: props.profiles ?? new UserProfileCache(props.db?.users), - relayMetrics: props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics), - events: props.events ?? new EventsCache(props.db?.events), - contactLists: props.contactLists ?? new UserFollowsCache(props.db?.contacts), - optimizer: props.optimizer ?? DefaultOptimizer, - checkSigs: props.checkSigs ?? false, - cachingRelay: props.cachingRelay, - db: props.db, - automaticOutboxModel: props.automaticOutboxModel ?? true, - buildFollowGraph: props.buildFollowGraph ?? false, - }; + super(props); this.profileLoader = new ProfileLoaderService(this, this.profileCache); this.relayMetricsHandler = new RelayMetricHandler(this.relayMetricsCache); this.relayLoader = new RelayMetadataLoader(this, this.relayCache); // if automatic outbox model, setup request router as OutboxModel - if (this.#config.automaticOutboxModel) { + if (this.config.automaticOutboxModel) { this.requestRouter = OutboxModel.fromSystem(this); } // Cache everything - if (this.#config.cachingRelay) { + if (this.config.cachingRelay) { this.on("event", async (_, ev) => { - await this.#config.cachingRelay?.event(ev); + await this.config.cachingRelay?.event(ev); }); } // Hook on-event when building follow graph - if (this.#config.buildFollowGraph) { + if (this.config.buildFollowGraph) { let evBuf: Array = []; let t: ReturnType | undefined; this.on("event", (_, ev) => { @@ -213,7 +136,7 @@ export class NostrSystem extends EventEmitter implements Syst async PreloadSocialGraph() { // Insert data to socialGraph from cache - if (this.#config.buildFollowGraph) { + if (this.config.buildFollowGraph) { for (const list of this.userFollowsCache.snapshot()) { const user = ID(list.pubkey); for (const fx of list.follows) { diff --git a/packages/system/src/query.ts b/packages/system/src/query.ts index 5fa4d33f..7e3058d5 100644 --- a/packages/system/src/query.ts +++ b/packages/system/src/query.ts @@ -412,8 +412,12 @@ export class Query extends EventEmitter { } }); const eventHandler = (sub: string, ev: TaggedNostrEvent) => { - if (this.request.options?.fillStore ?? true) { - this.handleEvent(sub, ev); + if ((this.request.options?.fillStore ?? true) && qt.id === sub) { + if (qt.filters.some(v => eventMatchesFilter(ev, v))) { + this.feed.add(ev); + } else { + this.#log("Event did not match filter, rejecting %O %O", ev, qt); + } } }; const eoseHandler = (sub: string) => { diff --git a/packages/system/src/sync/connection.ts b/packages/system/src/sync/connection.ts new file mode 100644 index 00000000..35e97710 --- /dev/null +++ b/packages/system/src/sync/connection.ts @@ -0,0 +1,111 @@ +import { Connection, SyncCommand } from "../connection"; +import { FallbackSyncMethod } from "../system"; +import { EventExt, EventType } from "../event-ext"; +import { NoteCollection } from "../note-collection"; +import { RangeSync } from "./range-sync"; +import { NegentropyFlow } from "../negentropy/negentropy-flow"; + +export interface ConnectionSyncModule { + sync: (c: Connection, item: SyncCommand, cb?: () => void) => void; +} + +export class DefaultSyncModule implements ConnectionSyncModule { + constructor(readonly method: FallbackSyncMethod) {} + + sync(c: Connection, item: SyncCommand, cb?: () => void) { + const [_, id, eventSet, ...filters] = item; + if (c.info?.negentropy === "v1") { + const newFilters = filters; + const neg = new NegentropyFlow(id, c, eventSet, newFilters); + neg.once("finish", filters => { + if (filters.length > 0) { + c.request(["REQ", id, ...filters], cb); + } else { + // no results to query, emulate closed + c.emit("closed", id, "Nothing to sync"); + } + }); + neg.once("error", () => { + this.#fallbackSync(c, item, cb); + }); + neg.start(); + } else { + this.#fallbackSync(c, item, cb); + } + } + + #fallbackSync(c: Connection, item: SyncCommand, cb?: () => void) { + const [type, id, eventSet, ...filters] = item; + if (type !== "SYNC") throw new Error("Must be a SYNC command"); + + // if the event is replaceable there is no need to use any special sync query, + // just send the filters directly + const isReplaceableSync = filters.every( + a => + a.kinds?.every( + b => + EventExt.getType(b) === EventType.Replaceable || EventExt.getType(b) === EventType.ParameterizedReplaceable, + ) ?? false, + ); + if (filters.some(a => a.since || a.until || a.ids || a.limit) || isReplaceableSync) { + c.request(["REQ", id, ...filters], cb); + } else if (this.method === FallbackSyncMethod.Since) { + this.#syncSince(c, item, cb); + } else if (this.method === FallbackSyncMethod.RangeSync) { + this.#syncRangeSync(c, item, cb); + } else { + throw new Error("No fallback sync method"); + } + } + + /** + * Using the latest data, fetch only newer items + * + * The downfall of this method is when the dataset is truncated by the relay (ie. limit results to 1000 items) + */ + #syncSince(c: Connection, item: SyncCommand, cb?: () => void) { + const [type, id, eventSet, ...filters] = item; + if (type !== "SYNC") throw new Error("Must be a SYNC command"); + const latest = eventSet.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0); + const newFilters = filters.map(a => ({ + ...a, + since: latest + 1, + })); + c.request(["REQ", id, ...newFilters], cb); + } + + /** + * Using the RangeSync class, sync data using fixed window size + */ + #syncRangeSync(c: Connection, item: SyncCommand, cb?: () => void) { + const [type, id, eventSet, ...filters] = item; + if (type !== "SYNC") throw new Error("Must be a SYNC command"); + + const rs = RangeSync.forFetcher(async (rb, cb) => { + return await new Promise((resolve, reject) => { + const results = new NoteCollection(); + const f = rb.buildRaw(); + c.on("event", (c, e) => { + if (rb.id === c) { + cb?.([e]); + results.add(e); + } + }); + c.on("eose", s => { + if (s === rb.id) { + resolve(results.takeSnapshot()); + } + }); + c.request(["REQ", rb.id, ...f], undefined); + }); + }); + const latest = eventSet.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0); + rs.setStartPoint(latest + 1); + rs.on("event", ev => { + ev.forEach(e => c.emit("event", id, e)); + }); + for (const f of filters) { + rs.sync(f); + } + } +} diff --git a/packages/system/src/system-base.ts b/packages/system/src/system-base.ts index a150c1e5..2cc1e908 100644 --- a/packages/system/src/system-base.ts +++ b/packages/system/src/system-base.ts @@ -5,14 +5,19 @@ import { EventsCache } from "./cache/events"; import { UserFollowsCache } from "./cache/user-follows-lists"; import { UserRelaysCache, UserProfileCache, RelayMetricCache, NostrEvent } from "./index"; import { DefaultOptimizer, Optimizer } from "./query-optimizer"; -import { NostrSystemEvents, SystemConfig } from "./system"; +import { FallbackSyncMethod, NostrSystemEvents, SystemConfig } from "./system"; import { EventEmitter } from "eventemitter3"; export abstract class SystemBase extends EventEmitter { #config: SystemConfig; + get config() { + return this.#config; + } + constructor(props: Partial) { super(); + this.#config = { relays: props.relays ?? new UserRelaysCache(props.db?.userRelays), profiles: props.profiles ?? new UserProfileCache(props.db?.users), @@ -25,6 +30,7 @@ export abstract class SystemBase extends EventEmitter { db: props.db, automaticOutboxModel: props.automaticOutboxModel ?? true, buildFollowGraph: props.buildFollowGraph ?? false, + fallbackSync: props.fallbackSync ?? FallbackSyncMethod.Since, }; } diff --git a/packages/system/src/system.ts b/packages/system/src/system.ts index 427e8685..760fe480 100644 --- a/packages/system/src/system.ts +++ b/packages/system/src/system.ts @@ -91,6 +91,16 @@ export interface SystemConfig { * for users when fetching by author. */ buildFollowGraph: boolean; + + /** + * Pick a fallback sync method when negentropy is not available + */ + fallbackSync: FallbackSyncMethod; +} + +export enum FallbackSyncMethod { + Since = "since", + RangeSync = "range-sync", } export interface SystemInterface { @@ -200,6 +210,8 @@ export interface SystemInterface { * Request router instance */ get requestRouter(): RequestRouter | undefined; + + get config(): SystemConfig; } export interface SystemSnapshot {