diff --git a/packages/app/custom.d.ts b/packages/app/custom.d.ts index 965ca5a8..85f08691 100644 --- a/packages/app/custom.d.ts +++ b/packages/app/custom.d.ts @@ -106,11 +106,6 @@ declare const CONFIG: { }>; }; -/** - * Single relay (Debug) - */ -declare const SINGLE_RELAY: string | undefined; - /** * Build git hash */ diff --git a/packages/app/src/Components/Relay/Relay.tsx b/packages/app/src/Components/Relay/Relay.tsx index 56479d85..50eb44ae 100644 --- a/packages/app/src/Components/Relay/Relay.tsx +++ b/packages/app/src/Components/Relay/Relay.tsx @@ -24,9 +24,7 @@ export default function Relay(props: RelayProps) { const system = useContext(SnortContext); const login = useLogin(); - const relaySettings = unwrap( - login.relays.item[props.addr] ?? system.Sockets.find(a => a.address === props.addr)?.settings ?? {}, - ); + const relaySettings = unwrap(login.relays.item[props.addr] ?? system.pool.getConnection(props.addr)?.Settings ?? {}); const state = useRelayState(props.addr); const name = useMemo(() => getRelayName(props.addr), [props.addr]); @@ -44,14 +42,14 @@ export default function Relay(props: RelayProps) { return ( <>
-
+
{name}
- {!state?.ephemeral && ( + {!state?.Ephemeral && (
navigate(state?.id ?? "")} + onClick={() => navigate(state?.Id ?? "")} />
)} diff --git a/packages/app/src/Feed/RelayState.ts b/packages/app/src/Feed/RelayState.ts index f6b257ad..49357a76 100644 --- a/packages/app/src/Feed/RelayState.ts +++ b/packages/app/src/Feed/RelayState.ts @@ -3,6 +3,6 @@ import { useContext } from "react"; export default function useRelayState(addr: string) { const system = useContext(SnortContext); - const c = system.Sockets.find(a => a.address === addr); + const c = system.pool.getConnection(addr); return c; } diff --git a/packages/app/src/Hooks/useLoginRelays.tsx b/packages/app/src/Hooks/useLoginRelays.tsx index 0438fa49..389b5f2a 100644 --- a/packages/app/src/Hooks/useLoginRelays.tsx +++ b/packages/app/src/Hooks/useLoginRelays.tsx @@ -5,27 +5,27 @@ import useEventPublisher from "./useEventPublisher"; import useLogin from "./useLogin"; export function useLoginRelays() { - const { relays } = useLogin(); + const relays = useLogin(s => s.relays.item); const { system } = useEventPublisher(); useEffect(() => { if (relays) { - updateRelayConnections(system, relays.item).catch(console.error); + updateRelayConnections(system, relays).catch(console.error); } }, [relays]); } export async function updateRelayConnections(system: SystemInterface, relays: Record) { - if (SINGLE_RELAY) { - system.ConnectToRelay(SINGLE_RELAY, { read: true, write: true }); + if (import.meta.env.VITE_SINGLE_RELAY) { + system.ConnectToRelay(import.meta.env.VITE_SINGLE_RELAY, { read: true, write: true }); } else { for (const [k, v] of Object.entries(relays)) { // note: don't awit this, causes race condition with sending requests to relays system.ConnectToRelay(k, v); } - for (const v of system.Sockets) { - if (!relays[v.address] && !v.ephemeral) { - system.DisconnectRelay(v.address); + for (const [k, v] of system.pool) { + if (!relays[k] && !v.Ephemeral) { + system.DisconnectRelay(k); } } } diff --git a/packages/app/src/Pages/Root/GlobalTab.tsx b/packages/app/src/Pages/Root/GlobalTab.tsx index 721bd7b4..f73be6f4 100644 --- a/packages/app/src/Pages/Root/GlobalTab.tsx +++ b/packages/app/src/Pages/Root/GlobalTab.tsx @@ -4,6 +4,7 @@ import { useContext, useEffect, useMemo, useState } from "react"; import { FormattedMessage } from "react-intl"; import Timeline from "@/Components/Feed/Timeline"; +import { TimelineSubject } from "@/Feed/TimelineFeed"; import useHistoryState from "@/Hooks/useHistoryState"; import useLogin from "@/Hooks/useLogin"; import { debounce, getRelayName, sha256 } from "@/Utils"; @@ -15,8 +16,8 @@ interface RelayOption { export const GlobalTab = () => { const { relays } = useLogin(); - const [relay, setRelay] = useHistoryState(undefined, "global-relay"); - const [allRelays, setAllRelays] = useHistoryState(undefined, "global-relay-options"); + const [relay, setRelay] = useHistoryState(undefined, "global-relay"); + const [allRelays, setAllRelays] = useHistoryState(undefined, "global-relay-options"); const [now] = useState(unixNow()); const system = useContext(SnortContext); @@ -62,11 +63,11 @@ export const GlobalTab = () => { useEffect(() => { return debounce(500, () => { const ret: RelayOption[] = []; - system.Sockets.forEach(v => { - if (v.connected) { + [...system.pool].forEach(([, v]) => { + if (!v.IsClosed) { ret.push({ - url: v.address, - paid: v.info?.limitation?.payment_required ?? false, + url: v.Address, + paid: v.Info?.limitation?.payment_required ?? false, }); } }); @@ -80,12 +81,13 @@ export const GlobalTab = () => { }, [relays, relay]); const subject = useMemo( - () => ({ - type: "global", - items: [], - relay: [relay?.url], - discriminator: `all-${sha256(relay?.url ?? "")}`, - }), + () => + ({ + type: "global", + items: [], + relay: [relay?.url], + discriminator: `all-${sha256(relay?.url ?? "")}`, + }) as TimelineSubject, [relay?.url], ); diff --git a/packages/app/src/Pages/ZapPool.tsx b/packages/app/src/Pages/ZapPool.tsx index a63720e1..62f07261 100644 --- a/packages/app/src/Pages/ZapPool.tsx +++ b/packages/app/src/Pages/ZapPool.tsx @@ -75,14 +75,15 @@ function ZapPoolPageInner() { const { wallet } = useWallet(); const relayConnections = useMemo(() => { - return system.Sockets.map(a => { - if (a.info?.pubkey && !a.ephemeral) { - return { - address: a.address, - pubkey: a.info.pubkey, - }; - } - }) + return [...system.pool] + .map(([, a]) => { + if (a.Info?.pubkey && !a.Ephemeral) { + return { + address: a.Address, + pubkey: a.Info.pubkey, + }; + } + }) .filter(a => a !== undefined) .map(unwrap); }, [login.relays]); @@ -131,7 +132,7 @@ function ZapPoolPageInner() { nOut: ( ), diff --git a/packages/app/src/Pages/settings/RelayInfo.tsx b/packages/app/src/Pages/settings/RelayInfo.tsx index 5bb69aec..3b9bf620 100644 --- a/packages/app/src/Pages/settings/RelayInfo.tsx +++ b/packages/app/src/Pages/settings/RelayInfo.tsx @@ -16,66 +16,66 @@ const RelayInfo = () => { const login = useLogin(); const { system } = useEventPublisher(); - const conn = system.Sockets.find(a => a.id === params.id); - const stats = useRelayState(conn?.address ?? ""); + const conn = [...system.pool].find(([, a]) => a.Id === params.id)?.[1]; + const stats = useRelayState(conn?.Address ?? ""); return ( <>

navigate("/settings/relays")}>

-

{stats?.info?.name}

-

{stats?.info?.description}

+

{stats?.Info?.name}

+

{stats?.Info?.description}

- {stats?.info?.pubkey && ( + {stats?.Info?.pubkey && ( <>

- + )} - {stats?.info?.software && ( + {stats?.Info?.software && (

- {stats.info.software.startsWith("http") ? ( - - {stats.info.software} + {stats.Info.software.startsWith("http") ? ( + + {stats.Info.software} ) : ( - <>{stats.info.software} + <>{stats.Info.software} )} - {!stats.info.version?.startsWith("v") && "v"} - {stats.info.version} + {!stats.Info.version?.startsWith("v") && "v"} + {stats.Info.version}
)} - {stats?.info?.contact && ( + {stats?.Info?.contact && ( )} - {stats?.info?.supported_nips && ( + {stats?.Info?.supported_nips && ( <>

- {stats.info.supported_nips.map(a => ( + {stats.Info?.supported_nips?.map(a => ( NIP-{a.toString().padStart(2, "0")} @@ -87,7 +87,7 @@ const RelayInfo = () => {
- {stats?.activeRequests.map(a => ( + {[...(stats?.ActiveRequests ?? [])].map(a => ( {a} @@ -97,9 +97,9 @@ const RelayInfo = () => {
- {stats?.pendingRequests.map(a => ( - - {a} + {stats?.PendingRequests?.map(a => ( + + {a.obj[1]} ))}
@@ -107,7 +107,7 @@ const RelayInfo = () => {
{ - removeRelay(login, unwrap(conn).address); + removeRelay(login, unwrap(conn).Address); navigate("/settings/relays"); }}> diff --git a/packages/app/src/Pages/settings/Relays.tsx b/packages/app/src/Pages/settings/Relays.tsx index d6a095a6..43ea43bf 100644 --- a/packages/app/src/Pages/settings/Relays.tsx +++ b/packages/app/src/Pages/settings/Relays.tsx @@ -21,7 +21,7 @@ const RelaySettingsPage = () => { const [newRelay, setNewRelay] = useState(); const otherConnections = useMemo(() => { - return system.Sockets.filter(a => relays.item[a.address] === undefined); + return [...system.pool].filter(([k]) => relays.item[k] === undefined).map(([, v]) => v); }, [relays]); const handleNewRelayChange = (event: React.ChangeEvent) => { @@ -83,7 +83,7 @@ const RelaySettingsPage = () => {
{otherConnections.map(a => ( - + ))}
diff --git a/packages/app/src/Utils/Login/Functions.ts b/packages/app/src/Utils/Login/Functions.ts index f92bbd30..078cac50 100644 --- a/packages/app/src/Utils/Login/Functions.ts +++ b/packages/app/src/Utils/Login/Functions.ts @@ -23,9 +23,9 @@ import { SubscriptionEvent } from "@/Utils/Subscription"; import { Nip7OsSigner } from "./Nip7OsSigner"; export function setRelays(state: LoginSession, relays: Record, createdAt: number) { - if (SINGLE_RELAY) { + if (import.meta.env.VITE_SINGLE_RELAY) { state.relays.item = { - [SINGLE_RELAY]: { read: true, write: true }, + [import.meta.env.VITE_SINGLE_RELAY]: { read: true, write: true }, }; state.relays.timestamp = 100; LoginStore.updateSession(state); diff --git a/packages/app/src/Utils/Login/MultiAccountStore.ts b/packages/app/src/Utils/Login/MultiAccountStore.ts index c6f32742..dacd3976 100644 --- a/packages/app/src/Utils/Login/MultiAccountStore.ts +++ b/packages/app/src/Utils/Login/MultiAccountStore.ts @@ -183,7 +183,7 @@ export class MultiAccountStore extends ExternalStore { } decideInitRelays(relays: Record | undefined): Record { - if (SINGLE_RELAY) return { [SINGLE_RELAY]: { read: true, write: true } }; + if (import.meta.env.VITE_SINGLE_RELAY) return { [import.meta.env.VITE_SINGLE_RELAY]: { read: true, write: true } }; if (relays && Object.keys(relays).length > 0) { return relays; } diff --git a/packages/app/src/Utils/Zapper.ts b/packages/app/src/Utils/Zapper.ts index 0005f51b..57fc423a 100644 --- a/packages/app/src/Utils/Zapper.ts +++ b/packages/app/src/Utils/Zapper.ts @@ -104,7 +104,7 @@ export class Zapper { if (!svc) { throw new Error(`Failed to get invoice from ${t.value}`); } - const relays = this.system.Sockets.filter(a => !a.ephemeral).map(v => v.address); + const relays = [...this.system.pool].filter(([, v]) => !v.Ephemeral).map(([k]) => k); const pub = t.zap?.anon ?? false ? EventPublisher.privateKey(generateRandomKey().privateKey) : this.publisher; const zap = t.zap && svc.canZap @@ -199,7 +199,7 @@ export class Zapper { await svc.load(); return svc; } else if (t.type === "pubkey") { - const profile = await this.system.ProfileLoader.fetchProfile(t.value); + const profile = await this.system.profileLoader.fetch(t.value); if (profile) { const svc = new LNURL(profile.lud16 ?? profile.lud06 ?? ""); await svc.load(); diff --git a/packages/app/vite.config.ts b/packages/app/vite.config.ts index 3747bc0f..3b808780 100644 --- a/packages/app/vite.config.ts +++ b/packages/app/vite.config.ts @@ -60,7 +60,6 @@ export default defineConfig({ define: { CONFIG: JSON.stringify(appConfig), global: {}, // needed for custom-event lib - SINGLE_RELAY: JSON.stringify(process.env.SINGLE_RELAY), }, test: { globals: true, diff --git a/packages/system/package.json b/packages/system/package.json index ee6764ab..8f3606b9 100644 --- a/packages/system/package.json +++ b/packages/system/package.json @@ -20,7 +20,7 @@ "@jest/globals": "^29.5.0", "@peculiar/webcrypto": "^1.4.3", "@types/debug": "^4.1.8", - "@types/jest": "^29.5.1", + "@types/jest": "^29.5.11", "@types/lokijs": "^1.5.14", "@types/node": "^20.5.9", "@types/uuid": "^9.0.2", diff --git a/packages/system/src/connection-pool.ts b/packages/system/src/connection-pool.ts index bf99ea7c..e7a6e9f8 100644 --- a/packages/system/src/connection-pool.ts +++ b/packages/system/src/connection-pool.ts @@ -2,7 +2,7 @@ import { removeUndefined, sanitizeRelayUrl, unwrap } from "@snort/shared"; import debug from "debug"; import EventEmitter from "eventemitter3"; -import { Connection, ConnectionStateSnapshot, RelaySettings } from "./connection"; +import { Connection, RelaySettings } from "./connection"; import { NostrEvent, OkResponse, TaggedNostrEvent } from "./nostr"; import { pickRelaysForReply } from "./outbox-model"; import { SystemInterface } from "."; @@ -18,7 +18,6 @@ export interface NostrConnectionPoolEvents { } export type ConnectionPool = { - getState(): ConnectionStateSnapshot[]; getConnection(id: string): Connection | undefined; connect(address: string, options: RelaySettings, ephemeral: boolean): Promise; disconnect(address: string): void; @@ -45,13 +44,6 @@ export class DefaultConnectionPool extends EventEmitter a.takeSnapshot()); - } - /** * Get a connection object from the pool */ @@ -81,7 +73,7 @@ export class DefaultConnectionPool extends EventEmitter this.emit("disconnect", addr, code)); c.on("connected", r => this.emit("connected", addr, r)); c.on("auth", (cx, r, cb) => this.emit("auth", addr, cx, r, cb)); - await c.Connect(); + await c.connect(); return c; } else { // update settings if already connected @@ -107,7 +99,7 @@ export class DefaultConnectionPool extends EventEmitter { try { - const rsp = await s.SendAsync(ev); + const rsp = await s.sendEventAsync(ev); cb?.(rsp); return rsp; } catch (e) { @@ -145,7 +137,7 @@ export class DefaultConnectionPool extends EventEmitter((resolve, reject) => { const c = new Connection(address, { write: true, read: true }, true); @@ -153,11 +145,11 @@ export class DefaultConnectionPool extends EventEmitter { clearTimeout(t); - const rsp = await c.SendAsync(ev); - c.Close(); + const rsp = await c.sendEventAsync(ev); + c.close(); resolve(rsp); }); - c.Connect(); + c.connect(); }); } } diff --git a/packages/system/src/connection.ts b/packages/system/src/connection.ts index 120ff048..96a21e93 100644 --- a/packages/system/src/connection.ts +++ b/packages/system/src/connection.ts @@ -5,11 +5,11 @@ import { unixNowMs, dedupe } from "@snort/shared"; import EventEmitter from "eventemitter3"; import { DefaultConnectTimeout } from "./const"; -import { ConnectionStats } from "./connection-stats"; import { NostrEvent, OkResponse, ReqCommand, ReqFilter, TaggedNostrEvent, u256 } from "./nostr"; import { RelayInfo } from "./relay-info"; import EventKind from "./event-kind"; import { EventExt } from "./event-ext"; +import { NegentropyFlow } from "./negentropy/negentropy-flow"; /** * Relay settings @@ -19,28 +19,8 @@ export interface RelaySettings { write: boolean; } -/** - * Snapshot of connection stats - */ -export interface ConnectionStateSnapshot { - connected: boolean; - disconnects: number; - avgLatency: number; - events: { - received: number; - send: number; - }; - settings?: RelaySettings; - info?: RelayInfo; - pendingRequests: Array; - activeRequests: Array; - id: string; - ephemeral: boolean; - address: string; -} - interface ConnectionEvents { - change: (snapshot: ConnectionStateSnapshot) => void; + change: () => void; connected: (wasReconnect: boolean) => void; event: (sub: string, e: TaggedNostrEvent) => void; eose: (sub: string) => void; @@ -48,6 +28,13 @@ interface ConnectionEvents { disconnect: (code: number) => void; auth: (challenge: string, relay: string, cb: (ev: NostrEvent) => void) => void; notice: (msg: string) => void; + unknownMessage: (obj: Array) => void; +} + +export type SyncCommand = ["SYNC", id: string, fromSet: Array, ...filters: Array]; +interface ConnectionQueueItem { + obj: ReqCommand | SyncCommand; + cb: () => void; } export class Connection extends EventEmitter { @@ -58,20 +45,16 @@ export class Connection extends EventEmitter { #ephemeral: boolean; Id: string; - Address: string; + readonly Address: string; Socket: WebSocket | null = null; PendingRaw: Array = []; - PendingRequests: Array<{ - cmd: ReqCommand; - cb: () => void; - }> = []; + PendingRequests: Array = []; ActiveRequests = new Set(); Settings: RelaySettings; Info?: RelayInfo; ConnectTimeout: number = DefaultConnectTimeout; - Stats: ConnectionStats = new ConnectionStats(); HasStateChange: boolean = true; IsClosed: boolean; ReconnectTimer?: ReturnType; @@ -102,7 +85,7 @@ export class Connection extends EventEmitter { this.#setupEphemeral(); } - async Connect() { + async connect() { try { if (this.Info === undefined) { const u = new URL(this.Address); @@ -136,19 +119,18 @@ export class Connection extends EventEmitter { } this.IsClosed = false; this.Socket = new WebSocket(this.Address); - this.Socket.onopen = () => this.OnOpen(wasReconnect); - this.Socket.onmessage = e => this.OnMessage(e); - this.Socket.onerror = e => this.OnError(e); - this.Socket.onclose = e => this.OnClose(e); + this.Socket.onopen = () => this.#onOpen(wasReconnect); + this.Socket.onmessage = e => this.#onMessage(e); + this.Socket.onerror = e => this.#onError(e); + this.Socket.onclose = e => this.#onClose(e); } - Close() { + close() { this.IsClosed = true; this.Socket?.close(); - this.notifyChange(); } - OnOpen(wasReconnect: boolean) { + #onOpen(wasReconnect: boolean) { this.ConnectTimeout = DefaultConnectTimeout; this.#log(`Open!`); this.Down = false; @@ -157,7 +139,7 @@ export class Connection extends EventEmitter { this.#sendPendingRaw(); } - OnClose(e: WebSocket.CloseEvent) { + #onClose(e: WebSocket.CloseEvent) { if (this.ReconnectTimer) { clearTimeout(this.ReconnectTimer); this.ReconnectTimer = undefined; @@ -174,12 +156,12 @@ export class Connection extends EventEmitter { ); this.ReconnectTimer = setTimeout(() => { try { - this.Connect(); + this.connect(); } catch { this.emit("disconnect", -1); } }, this.ConnectTimeout); - this.Stats.Disconnects++; + // todo: stats disconnect } else { this.#log(`Closed!`); this.ReconnectTimer = undefined; @@ -187,10 +169,9 @@ export class Connection extends EventEmitter { this.emit("disconnect", e.code); this.#reset(); - this.notifyChange(); } - OnMessage(e: WebSocket.MessageEvent) { + #onMessage(e: WebSocket.MessageEvent) { this.#activity = unixNowMs(); if ((e.data as string).length > 0) { const msg = JSON.parse(e.data as string) as Array; @@ -201,8 +182,7 @@ export class Connection extends EventEmitter { this.#onAuthAsync(msg[1] as string) .then(() => this.#sendPendingRaw()) .catch(this.#log); - this.Stats.EventsReceived++; - this.notifyChange(); + // todo: stats events received } else { this.#log("Ignoring unexpected AUTH request"); } @@ -219,8 +199,7 @@ export class Connection extends EventEmitter { return; } this.emit("event", msg[1] as string, ev); - this.Stats.EventsReceived++; - this.notifyChange(); + // todo: stats events received break; } case "EOSE": { @@ -250,34 +229,34 @@ export class Connection extends EventEmitter { } default: { this.#log(`Unknown tag: ${tag}`); + this.emit("unknownMessage", msg); break; } } } } - OnError(e: WebSocket.Event) { + #onError(e: WebSocket.Event) { this.#log("Error: %O", e); - this.notifyChange(); + this.emit("change"); } /** * Send event on this connection */ - SendEvent(e: NostrEvent) { + sendEvent(e: NostrEvent) { if (!this.Settings.write) { return; } - const req = ["EVENT", e]; - this.#sendJson(req); - this.Stats.EventsSent++; - this.notifyChange(); + this.send(["EVENT", e]); + // todo: stats events send + this.emit("change"); } /** * Send event on this connection and wait for OK response */ - async SendAsync(e: NostrEvent, timeout = 5000) { + async sendEventAsync(e: NostrEvent, timeout = 5000) { return await new Promise((resolve, reject) => { if (!this.Settings.write) { reject(new Error("Not a write relay")); @@ -317,17 +296,16 @@ export class Connection extends EventEmitter { }); }); - const req = ["EVENT", e]; - this.#sendJson(req); - this.Stats.EventsSent++; - this.notifyChange(); + this.send(["EVENT", e]); + // todo: stats events send + this.emit("change"); }); } /** * Using relay document to determine if this relay supports a feature */ - SupportsNip(n: number) { + supportsNip(n: number) { return this.Info?.supported_nips?.some(a => a === n) ?? false; } @@ -335,7 +313,7 @@ export class Connection extends EventEmitter { * Queue or send command to the relay * @param cmd The REQ to send to the server */ - QueueReq(cmd: ReqCommand, cbSent: () => void) { + queueReq(cmd: ReqCommand | SyncCommand, cbSent: () => void) { const requestKinds = dedupe( cmd .slice(2) @@ -349,63 +327,64 @@ export class Connection extends EventEmitter { } if (this.ActiveRequests.size >= this.#maxSubscriptions) { this.PendingRequests.push({ - cmd, + obj: cmd, cb: cbSent, }); this.#log("Queuing: %O", cmd); } else { this.ActiveRequests.add(cmd[1]); - this.#sendJson(cmd); + this.#sendRequestCommand(cmd); cbSent(); } - this.notifyChange(); + this.emit("change"); } - CloseReq(id: string) { + closeReq(id: string) { if (this.ActiveRequests.delete(id)) { - this.#sendJson(["CLOSE", id]); + this.send(["CLOSE", id]); this.emit("eose", id); - this.#SendQueuedRequests(); + this.#sendQueuedRequests(); + this.emit("change"); } - this.notifyChange(); } - takeSnapshot(): ConnectionStateSnapshot { - return { - connected: this.Socket?.readyState === WebSocket.OPEN, - events: { - received: this.Stats.EventsReceived, - send: this.Stats.EventsSent, - }, - avgLatency: - this.Stats.Latency.length > 0 - ? this.Stats.Latency.reduce((acc, v) => acc + v, 0) / this.Stats.Latency.length - : 0, - disconnects: this.Stats.Disconnects, - info: this.Info, - id: this.Id, - pendingRequests: [...this.PendingRequests.map(a => a.cmd[1])], - activeRequests: [...this.ActiveRequests], - ephemeral: this.Ephemeral, - address: this.Address, - }; - } - - #SendQueuedRequests() { + #sendQueuedRequests() { const canSend = this.#maxSubscriptions - this.ActiveRequests.size; if (canSend > 0) { for (let x = 0; x < canSend; x++) { const p = this.PendingRequests.shift(); if (p) { - this.ActiveRequests.add(p.cmd[1]); - this.#sendJson(p.cmd); + this.#sendRequestCommand(p.obj); p.cb(); - this.#log("Sent pending REQ %O", p.cmd); + this.#log("Sent pending REQ %O", p.obj); } } } } + #sendRequestCommand(cmd: ReqCommand | SyncCommand) { + try { + if (cmd[0] === "REQ") { + this.ActiveRequests.add(cmd[1]); + this.send(cmd); + } else if (cmd[0] === "SYNC") { + if (this.Info?.software?.includes("strfry")) { + const neg = new NegentropyFlow(cmd[1], this, cmd[2], cmd.slice(3) as Array); + neg.once("finish", filters => { + if (filters.length > 0) { + this.queueReq(["REQ", cmd[1], ...filters], () => {}); + } + }); + neg.start(); + } else { + throw new Error("SYNC not supported"); + } + } + } catch (e) { + console.error(e); + } + } + #reset() { // reset connection Id on disconnect, for query-tracking this.Id = uuid(); @@ -413,15 +392,15 @@ export class Connection extends EventEmitter { this.ActiveRequests.clear(); this.PendingRequests = []; this.PendingRaw = []; - this.notifyChange(); + this.emit("change"); } - #sendJson(obj: object) { + send(obj: object) { const authPending = !this.Authed && (this.AwaitingAuth.size > 0 || this.Info?.limitation?.auth_required === true); if (!this.Socket || this.Socket?.readyState !== WebSocket.OPEN || authPending) { this.PendingRaw.push(obj); if (this.Socket?.readyState === WebSocket.CLOSED && this.Ephemeral && this.IsClosed) { - this.Connect(); + this.connect(); } return false; } @@ -498,14 +477,10 @@ export class Connection extends EventEmitter { if (this.ActiveRequests.size > 0) { this.#log("Inactive connection has %d active requests! %O", this.ActiveRequests.size, this.ActiveRequests); } else { - this.Close(); + this.close(); } } }, 5_000); } } - - notifyChange() { - this.emit("change", this.takeSnapshot()); - } } diff --git a/packages/system/src/impl/nip46.ts b/packages/system/src/impl/nip46.ts index e9dd3e65..1134c357 100644 --- a/packages/system/src/impl/nip46.ts +++ b/packages/system/src/impl/nip46.ts @@ -89,7 +89,7 @@ export class Nip46Signer implements EventSigner { await this.#onReply(e); }); this.#conn.on("connected", async () => { - this.#conn!.QueueReq( + this.#conn!.queueReq( [ "REQ", "reply", @@ -111,7 +111,7 @@ export class Nip46Signer implements EventSigner { }); } }); - this.#conn.Connect(); + this.#conn.connect(); this.#didInit = true; }); } @@ -119,8 +119,8 @@ export class Nip46Signer implements EventSigner { async close() { if (this.#conn) { await this.#disconnect(); - this.#conn.CloseReq("reply"); - this.#conn.Close(); + this.#conn.closeReq("reply"); + this.#conn.close(); this.#conn = undefined; this.#didInit = false; } @@ -236,6 +236,6 @@ export class Nip46Signer implements EventSigner { this.#log("Send: %O", payload); const evCommand = await eb.buildAndSign(this.#insideSigner); - await this.#conn.SendAsync(evCommand); + await this.#conn.sendEventAsync(evCommand); } } diff --git a/packages/system/src/index.ts b/packages/system/src/index.ts index dde02769..67a2388a 100644 --- a/packages/system/src/index.ts +++ b/packages/system/src/index.ts @@ -1,4 +1,4 @@ -import { RelaySettings, ConnectionStateSnapshot } from "./connection"; +import { RelaySettings } from "./connection"; import { RequestBuilder } from "./request-builder"; import { NostrEvent, OkResponse, ReqFilter, TaggedNostrEvent } from "./nostr"; import { ProfileLoaderService } from "./profile-cache"; @@ -65,11 +65,6 @@ export interface SystemInterface { */ checkSigs: boolean; - /** - * Get a snapshot of the relay connections - */ - get Sockets(): Array; - /** * Do some initialization */ diff --git a/packages/system/src/negentropy/accumulator.ts b/packages/system/src/negentropy/accumulator.ts new file mode 100644 index 00000000..2143c068 --- /dev/null +++ b/packages/system/src/negentropy/accumulator.ts @@ -0,0 +1,60 @@ +import { sha256 } from "@noble/hashes/sha256"; +import { encodeVarInt, FINGERPRINT_SIZE } from "./utils"; + +export class Accumulator { + #buf!: Uint8Array; + + constructor() { + this.setToZero(); + } + + setToZero() { + this.#buf = new Uint8Array(32); + } + + add(otherBuf: Uint8Array) { + let currCarry = 0, + nextCarry = 0; + const p = new DataView(this.#buf.buffer); + const po = new DataView(otherBuf.buffer); + + for (let i = 0; i < 8; i++) { + const offset = i * 4; + const orig = p.getUint32(offset, true); + const otherV = po.getUint32(offset, true); + + let next = orig; + + next += currCarry; + next += otherV; + if (next > 4294967295) nextCarry = 1; + + p.setUint32(offset, next & 4294967295, true); + currCarry = nextCarry; + nextCarry = 0; + } + } + + negate() { + const p = new DataView(this.#buf.buffer); + + for (let i = 0; i < 8; i++) { + let offset = i * 4; + p.setUint32(offset, ~p.getUint32(offset, true)); + } + + const one = new Uint8Array(32); + one[0] = 1; + this.add(one); + } + + getFingerprint(n: number) { + const varInt = encodeVarInt(n); + const copy = new Uint8Array(this.#buf.length + varInt.length); + copy.set(this.#buf); + copy.set(varInt, this.#buf.length); + + const hash = sha256(copy); + return hash.subarray(0, FINGERPRINT_SIZE); + } +} diff --git a/packages/system/src/negentropy/negentropy-flow.ts b/packages/system/src/negentropy/negentropy-flow.ts new file mode 100644 index 00000000..7d0f34fc --- /dev/null +++ b/packages/system/src/negentropy/negentropy-flow.ts @@ -0,0 +1,94 @@ +import { bytesToHex, hexToBytes } from "@noble/hashes/utils"; +import { Connection } from "../connection"; +import { ReqFilter, TaggedNostrEvent } from "../nostr"; +import { Negentropy } from "./negentropy"; +import { NegentropyStorageVector } from "./vector-storage"; +import debug from "debug"; +import EventEmitter from "eventemitter3"; + +export interface NegentropyFlowEvents { + /** + * When sync is finished emit a set of filters which can resolve sync + */ + finish: (req: Array) => void; +} + +/** + * Negentropy sync flow on connection + */ +export class NegentropyFlow extends EventEmitter { + readonly idSize: number = 16; + #log = debug("NegentropyFlow"); + #id: string; + #connection: Connection; + #filters: Array; + #negentropy: Negentropy; + #need: Array = []; + + constructor(id: string, conn: Connection, set: Array, filters: Array) { + super(); + this.#id = id; + this.#connection = conn; + this.#filters = filters; + + this.#connection.on("unknownMessage", this.#handleMessage.bind(this)); + this.#connection.on("notice", n => this.#handleMessage.bind(this)); + + const storage = new NegentropyStorageVector(); + set.forEach(a => storage.insert(a.created_at, a.id)); + storage.seal(); + this.#negentropy = new Negentropy(storage, 50_000); + } + + /** + * Start sync + */ + start() { + const init = this.#negentropy.initiate(); + this.#connection.send(["NEG-OPEN", this.#id, this.#filters, bytesToHex(init)]); + } + + #handleMessage(msg: Array) { + try { + switch (msg[0] as string) { + case "NOTICE": { + if ((msg[1] as string).includes("negentropy disabled")) { + this.#log("SYNC ERROR: %s", msg[1]); + this.#cleanup(); + } + break; + } + case "NEG-ERROR": { + if (msg[1] !== this.#id) break; + this.#log("SYNC ERROR %s", msg[2]); + this.#cleanup(); + break; + } + case "NEG-MSG": { + if (msg[1] !== this.#id) break; + const query = hexToBytes(msg[2] as string); + const [nextMsg, _, need] = this.#negentropy.reconcile(query); + if (need.length > 0) { + this.#need.push(...need.map(bytesToHex)); + } + if (nextMsg) { + this.#connection.send(["NEG-MSG", this.#id, bytesToHex(nextMsg)]); + } else { + this.#connection.send(["NEG-CLOSE", this.#id]); + this.#cleanup(); + } + break; + } + } + } catch (e) { + debugger; + console.error(e); + } + } + + #cleanup() { + this.#connection.off("unknownMessage", this.#handleMessage.bind(this)); + this.#connection.off("notice", n => this.#handleMessage.bind(this)); + this.emit("finish", this.#need.length > 0 ? [{ ids: this.#need }] : []); + } +} diff --git a/packages/system/src/negentropy/negentropy.ts b/packages/system/src/negentropy/negentropy.ts new file mode 100644 index 00000000..144ecd22 --- /dev/null +++ b/packages/system/src/negentropy/negentropy.ts @@ -0,0 +1,303 @@ +import { bytesToHex } from "@noble/hashes/utils"; +import { WrappedBuffer } from "./wrapped-buffer"; +import { NegentropyStorageVector, VectorStorageItem } from "./vector-storage"; +import { + PROTOCOL_VERSION, + getByte, + encodeVarInt, + Mode, + decodeVarInt, + getBytes, + FINGERPRINT_SIZE, + compareUint8Array, +} from "./utils"; + +export class Negentropy { + readonly #storage: NegentropyStorageVector; + readonly #frameSizeLimit: number; + #lastTimestampIn: number; + #lastTimestampOut: number; + #isInitiator: boolean = false; + + constructor(storage: NegentropyStorageVector, frameSizeLimit = 0) { + if (frameSizeLimit !== 0 && frameSizeLimit < 4096) throw Error("frameSizeLimit too small"); + + this.#storage = storage; + this.#frameSizeLimit = frameSizeLimit; + + this.#lastTimestampIn = 0; + this.#lastTimestampOut = 0; + } + + #bound(timestamp: number, id?: Uint8Array) { + return { timestamp, id: id ? id : new Uint8Array(0) }; + } + + initiate() { + if (this.#isInitiator) throw Error("already initiated"); + this.#isInitiator = true; + + const output = new WrappedBuffer(); + output.set([PROTOCOL_VERSION]); + + this.splitRange(0, this.#storage.size(), this.#bound(Number.MAX_VALUE), output); + + return this.#renderOutput(output); + } + + setInitiator() { + this.#isInitiator = true; + } + + reconcile(query: WrappedBuffer | Uint8Array): [Uint8Array | undefined, Array, Array] { + let haveIds: Array = [], + needIds: Array = []; + query = query instanceof WrappedBuffer ? query : new WrappedBuffer(query); + + this.#lastTimestampIn = this.#lastTimestampOut = 0; // reset for each message + + const fullOutput = new WrappedBuffer(); + fullOutput.set([PROTOCOL_VERSION]); + + const protocolVersion = getByte(query); + if (protocolVersion < 96 || protocolVersion > 111) throw Error("invalid negentropy protocol version byte"); + if (protocolVersion !== PROTOCOL_VERSION) { + if (this.#isInitiator) + throw Error("unsupported negentropy protocol version requested: " + (protocolVersion - 96)); + else return [this.#renderOutput(fullOutput), haveIds, needIds]; + } + + const storageSize = this.#storage.size(); + let prevBound = this.#bound(0); + let prevIndex = 0; + let skip = false; + + while (query.length !== 0) { + let o = new WrappedBuffer(); + + let doSkip = () => { + if (skip) { + skip = false; + o.append(this.encodeBound(prevBound)); + o.append(encodeVarInt(Mode.Skip)); + } + }; + + let currBound = this.decodeBound(query); + let mode = query.length === 0 ? 0 : decodeVarInt(query); + + let lower = prevIndex; + let upper = this.#storage.findLowerBound(prevIndex, storageSize, currBound); + + if (mode === Mode.Skip) { + skip = true; + } else if (mode === Mode.Fingerprint) { + let theirFingerprint = getBytes(query, FINGERPRINT_SIZE); + let ourFingerprint = this.#storage.fingerprint(lower, upper); + + if (compareUint8Array(theirFingerprint, ourFingerprint) !== 0) { + doSkip(); + this.splitRange(lower, upper, currBound, o); + } else { + skip = true; + } + } else if (mode === Mode.IdList) { + let numIds = decodeVarInt(query); + + let theirElems = {} as Record; // stringified Uint8Array -> original Uint8Array (or hex) + for (let i = 0; i < numIds; i++) { + let e = getBytes(query, this.#storage.idSize); + theirElems[bytesToHex(e)] = e; + } + + this.#storage.iterate(lower, upper, item => { + let k = bytesToHex(item.id); + if (!theirElems[k]) { + // ID exists on our side, but not their side + if (this.#isInitiator) haveIds.push(item.id); + } else { + // ID exists on both sides + delete theirElems[k]; + } + + return true; + }); + + if (this.#isInitiator) { + skip = true; + + for (let v of Object.values(theirElems)) { + // ID exists on their side, but not our side + needIds.push(v); + } + } else { + doSkip(); + + let responseIds = new WrappedBuffer(); + let numResponseIds = 0; + let endBound = currBound; + + this.#storage.iterate(lower, upper, (item, index) => { + if (this.exceededFrameSizeLimit(fullOutput.length + responseIds.length)) { + endBound = item; + upper = index; // shrink upper so that remaining range gets correct fingerprint + return false; + } + + responseIds.append(item.id); + numResponseIds++; + return true; + }); + + o.append(this.encodeBound(endBound)); + o.append(encodeVarInt(Mode.IdList)); + o.append(encodeVarInt(numResponseIds)); + o.append(responseIds.unwrap()); + + fullOutput.append(o.unwrap()); + o.clear(); + } + } else { + throw Error("unexpected mode"); + } + + if (this.exceededFrameSizeLimit(fullOutput.length + o.length)) { + // frameSizeLimit exceeded: Stop range processing and return a fingerprint for the remaining range + let remainingFingerprint = this.#storage.fingerprint(upper, storageSize); + + fullOutput.append(this.encodeBound(this.#bound(Number.MAX_VALUE))); + fullOutput.append(encodeVarInt(Mode.Fingerprint)); + fullOutput.append(remainingFingerprint); + break; + } else { + fullOutput.append(o.unwrap()); + } + + prevIndex = upper; + prevBound = currBound; + } + + return [ + fullOutput.length === 1 && this.#isInitiator ? undefined : this.#renderOutput(fullOutput), + haveIds, + needIds, + ]; + } + + async splitRange(lower: number, upper: number, upperBound: VectorStorageItem, o: WrappedBuffer) { + const numElems = upper - lower; + const buckets = 16; + + if (numElems < buckets * 2) { + o.append(this.encodeBound(upperBound)); + o.append(encodeVarInt(Mode.IdList)); + + o.append(encodeVarInt(numElems)); + this.#storage.iterate(lower, upper, item => { + o.append(item.id); + return true; + }); + } else { + const itemsPerBucket = Math.floor(numElems / buckets); + const bucketsWithExtra = numElems % buckets; + let curr = lower; + + for (let i = 0; i < buckets; i++) { + let bucketSize = itemsPerBucket + (i < bucketsWithExtra ? 1 : 0); + let ourFingerprint = this.#storage.fingerprint(curr, curr + bucketSize); + curr += bucketSize; + + let nextBound; + + if (curr === upper) { + nextBound = upperBound; + } else { + let prevItem: VectorStorageItem, currItem: VectorStorageItem; + + this.#storage.iterate(curr - 1, curr + 1, (item, index) => { + if (index === curr - 1) prevItem = item; + else currItem = item; + return true; + }); + + nextBound = this.getMinimalBound(prevItem!, currItem!); + } + + o.append(this.encodeBound(nextBound)); + o.append(encodeVarInt(Mode.Fingerprint)); + o.append(ourFingerprint); + } + } + } + + #renderOutput(o: WrappedBuffer) { + return o.unwrap(); + } + + exceededFrameSizeLimit(n: number) { + return this.#frameSizeLimit && n > this.#frameSizeLimit - 200; + } + + // Decoding + decodeTimestampIn(encoded: Uint8Array | WrappedBuffer) { + let timestamp = decodeVarInt(encoded); + timestamp = timestamp === 0 ? Number.MAX_VALUE : timestamp - 1; + if (this.#lastTimestampIn === Number.MAX_VALUE || timestamp === Number.MAX_VALUE) { + this.#lastTimestampIn = Number.MAX_VALUE; + return Number.MAX_VALUE; + } + timestamp += this.#lastTimestampIn; + this.#lastTimestampIn = timestamp; + return timestamp; + } + + decodeBound(encoded: Uint8Array | WrappedBuffer) { + const timestamp = this.decodeTimestampIn(encoded); + const len = decodeVarInt(encoded); + if (len > this.#storage.idSize) throw Error("bound key too long"); + const id = new Uint8Array(this.#storage.idSize); + const encodedId = getBytes(encoded, Math.min(len, encoded.length)); + id.set(encodedId); + return { timestamp, id }; + } + + // Encoding + encodeTimestampOut(timestamp: number) { + if (timestamp === Number.MAX_VALUE) { + this.#lastTimestampOut = Number.MAX_VALUE; + return encodeVarInt(0); + } + + let temp = timestamp; + timestamp -= this.#lastTimestampOut; + this.#lastTimestampOut = temp; + return encodeVarInt(timestamp + 1); + } + + encodeBound(key: VectorStorageItem) { + const tsBytes = this.encodeTimestampOut(key.timestamp); + const idLenBytes = encodeVarInt(key.id.length); + const output = new Uint8Array(tsBytes.length + idLenBytes.length + key.id.length); + output.set(tsBytes); + output.set(idLenBytes, tsBytes.length); + output.set(key.id, tsBytes.length + idLenBytes.length); + return output; + } + + getMinimalBound(prev: VectorStorageItem, curr: VectorStorageItem) { + if (curr.timestamp !== prev.timestamp) { + return this.#bound(curr.timestamp); + } else { + let sharedPrefixBytes = 0; + let currKey = curr.id; + let prevKey = prev.id; + + for (let i = 0; i < this.#storage.idSize; i++) { + if (currKey[i] !== prevKey[i]) break; + sharedPrefixBytes++; + } + + return this.#bound(curr.timestamp, curr.id.subarray(0, sharedPrefixBytes + 1)); + } + } +} diff --git a/packages/system/src/negentropy/utils.ts b/packages/system/src/negentropy/utils.ts new file mode 100644 index 00000000..caa9d73c --- /dev/null +++ b/packages/system/src/negentropy/utils.ts @@ -0,0 +1,83 @@ +import { VectorStorageItem } from "./vector-storage"; +import { WrappedBuffer } from "./wrapped-buffer"; + +export const PROTOCOL_VERSION = 0x61; // Version 1 +export const FINGERPRINT_SIZE = 16; + +export const enum Mode { + Skip = 0, + Fingerprint = 1, + IdList = 2, +} + +/** + * Decode variable int, also consumes the bytes from buf + */ +export function decodeVarInt(buf: Uint8Array | WrappedBuffer) { + let res = 0; + + while (1) { + if (buf.length === 0) throw Error("parse ends prematurely"); + let byte = 0; + if (buf instanceof WrappedBuffer) { + byte = buf.shift(); + } else { + byte = buf[0]; + buf = buf.subarray(1); + } + res = (res << 7) | (byte & 127); + if ((byte & 128) === 0) break; + } + + return res; +} + +export function encodeVarInt(n: number) { + if (n === 0) return new Uint8Array([0]); + + let o = []; + while (n !== 0) { + o.push(n & 127); + n >>>= 7; + } + o.reverse(); + + for (let i = 0; i < o.length - 1; i++) o[i] |= 128; + + return new Uint8Array(o); +} + +export function getByte(buf: WrappedBuffer) { + return getBytes(buf, 1)[0]; +} + +export function getBytes(buf: WrappedBuffer | Uint8Array, n: number) { + if (buf.length < n) throw Error("parse ends prematurely"); + if (buf instanceof WrappedBuffer) { + return buf.shiftN(n); + } else { + const ret = buf.subarray(0, n); + buf = buf.subarray(n); + return ret; + } +} + +export function compareUint8Array(a: Uint8Array, b: Uint8Array) { + for (let i = 0; i < a.byteLength; i++) { + if (a[i] < b[i]) return -1; + if (a[i] > b[i]) return 1; + } + + if (a.byteLength > b.byteLength) return 1; + if (a.byteLength < b.byteLength) return -1; + + return 0; +} + +export function itemCompare(a: VectorStorageItem, b: VectorStorageItem) { + if (a.timestamp === b.timestamp) { + return compareUint8Array(a.id, b.id); + } + + return a.timestamp - b.timestamp; +} diff --git a/packages/system/src/negentropy/vector-storage.ts b/packages/system/src/negentropy/vector-storage.ts new file mode 100644 index 00000000..5d3c89b2 --- /dev/null +++ b/packages/system/src/negentropy/vector-storage.ts @@ -0,0 +1,116 @@ +import { hexToBytes } from "@noble/hashes/utils"; +import { Accumulator } from "./accumulator"; +import { itemCompare } from "./utils"; + +export interface VectorStorageItem { + timestamp: number; + id: Uint8Array; +} + +const IdSize = 32; + +export class NegentropyStorageVector { + #items: Array = []; + #sealed = false; + + constructor(other?: Array) { + if (other) { + this.#items = other; + this.#sealed = true; + } + } + + get idSize() { + return IdSize; + } + + insert(timestamp: number, id: string) { + if (this.#sealed) throw Error("already sealed"); + const idData = hexToBytes(id); + if (idData.byteLength !== IdSize) throw Error("bad id size for added item"); + this.#items.push({ timestamp, id: idData }); + } + + seal() { + if (this.#sealed) throw Error("already sealed"); + this.#sealed = true; + + this.#items.sort(itemCompare); + + for (let i = 1; i < this.#items.length; i++) { + if (itemCompare(this.#items[i - 1], this.#items[i]) === 0) { + debugger; + throw Error("duplicate item inserted"); + } + } + } + + unseal() { + this.#sealed = false; + } + + size() { + this.#checkSealed(); + return this.#items.length; + } + + getItem(i: number) { + this.#checkSealed(); + if (i >= this.#items.length) throw Error("out of range"); + return this.#items[i]; + } + + iterate(begin: number, end: number, cb: (item: VectorStorageItem, index: number) => boolean) { + this.#checkSealed(); + this.#checkBounds(begin, end); + + for (let i = begin; i < end; ++i) { + if (!cb(this.#items[i], i)) break; + } + } + + findLowerBound(begin: number, end: number, bound: VectorStorageItem) { + this.#checkSealed(); + this.#checkBounds(begin, end); + + return this.#binarySearch(this.#items, begin, end, a => itemCompare(a, bound) < 0); + } + + fingerprint(begin: number, end: number) { + const out = new Accumulator(); + + this.iterate(begin, end, item => { + out.add(item.id); + return true; + }); + + return out.getFingerprint(end - begin); + } + + #checkSealed() { + if (!this.#sealed) throw Error("not sealed"); + } + + #checkBounds(begin: number, end: number) { + if (begin > end || end > this.#items.length) throw Error("bad range"); + } + + #binarySearch(arr: Array, first: number, last: number, cmp: (item: VectorStorageItem) => boolean) { + let count = last - first; + + while (count > 0) { + let it = first; + let step = Math.floor(count / 2); + it += step; + + if (cmp(arr[it])) { + first = ++it; + count -= step + 1; + } else { + count = step; + } + } + + return first; + } +} diff --git a/packages/system/src/negentropy/wrapped-buffer.ts b/packages/system/src/negentropy/wrapped-buffer.ts new file mode 100644 index 00000000..dbb97d0f --- /dev/null +++ b/packages/system/src/negentropy/wrapped-buffer.ts @@ -0,0 +1,62 @@ +export class WrappedBuffer { + #raw: Uint8Array; + #length: number; + + constructor(buffer?: Uint8Array) { + this.#raw = buffer ? new Uint8Array(buffer) : new Uint8Array(512); + this.#length = buffer ? buffer.length : 0; + } + + unwrap() { + return this.#raw.subarray(0, this.#length); + } + + get capacity() { + return this.#raw.byteLength; + } + + get length() { + return this.#length; + } + + set(val: ArrayLike, offset?: number) { + this.#raw.set(val, offset); + this.#length = (offset ?? 0) + val.length; + } + + append(val: ArrayLike) { + const targetSize = val.length + this.#length; + this.resize(targetSize); + + this.#raw.set(val, this.#length); + this.#length += val.length; + } + + clear() { + this.#length = 0; + this.#raw.fill(0); + } + + resize(newSize: number) { + if (this.capacity < newSize) { + const newCapacity = Math.max(this.capacity * 2, newSize); + const newArr = new Uint8Array(newCapacity); + newArr.set(this.#raw); + this.#raw = newArr; + } + } + + shift() { + const first = this.#raw[0]; + this.#raw = this.#raw.subarray(1); + this.#length--; + return first; + } + + shiftN(n = 1) { + const firstSubarray = this.#raw.subarray(0, n); + this.#raw = this.#raw.subarray(n); + this.#length -= n; + return firstSubarray; + } +} diff --git a/packages/system/src/nostr-system.ts b/packages/system/src/nostr-system.ts index b6eaedd4..21c77ff4 100644 --- a/packages/system/src/nostr-system.ts +++ b/packages/system/src/nostr-system.ts @@ -3,7 +3,7 @@ import EventEmitter from "eventemitter3"; import { CachedTable } from "@snort/shared"; import { NostrEvent, TaggedNostrEvent, OkResponse } from "./nostr"; -import { RelaySettings, ConnectionStateSnapshot } from "./connection"; +import { Connection, RelaySettings } from "./connection"; import { BuiltRawReqFilter, RequestBuilder } from "./request-builder"; import { RelayMetricHandler } from "./relay-metric-handler"; import { @@ -155,10 +155,6 @@ export class NostrSystem extends EventEmitter implements Syst this.#queryManager.on("request", (subId: string, f: BuiltRawReqFilter) => this.emit("request", subId, f)); } - get Sockets(): ConnectionStateSnapshot[] { - return this.pool.getState(); - } - async Init() { const t = [ this.relayCache.preload(), diff --git a/packages/system/src/query-manager.ts b/packages/system/src/query-manager.ts index cb0db971..fbed78fb 100644 --- a/packages/system/src/query-manager.ts +++ b/packages/system/src/query-manager.ts @@ -105,15 +105,17 @@ export class QueryManager extends EventEmitter { } async #send(q: Query, qSend: BuiltRawReqFilter) { - if (qSend.strategy === RequestStrategy.CacheRelay && this.#system.cacheRelay) { - const qt = q.insertCompletedTrace(qSend, []); - const res = await this.#system.cacheRelay.query(["REQ", qt.id, ...qSend.filters]); - q.feed.add(res?.map(a => ({ ...a, relays: [] }) as TaggedNostrEvent)); - return; - } for (const qfl of this.#queryCacheLayers) { qSend = await qfl.processFilter(q, qSend); } + if (this.#system.cacheRelay) { + // fetch results from cache first, flag qSend for sync + const data = await this.#system.cacheRelay.query(["REQ", q.id, ...qSend.filters]); + if (data.length > 0) { + qSend.syncFrom = data as Array; + q.feed.add(data as Array); + } + } // automated outbox model, load relays for queried authors for (const f of qSend.filters) { diff --git a/packages/system/src/query.ts b/packages/system/src/query.ts index 6ccbf234..a9c817d5 100644 --- a/packages/system/src/query.ts +++ b/packages/system/src/query.ts @@ -277,7 +277,8 @@ export class Query extends EventEmitter { if (this.isOpen()) { for (const qt of this.#tracing) { if (qt.relay === c.Address) { - c.QueueReq(["REQ", qt.id, ...qt.filters], () => qt.sentToRelay()); + // todo: queue sync? + c.queueReq(["REQ", qt.id, ...qt.filters], () => qt.sentToRelay()); } } } @@ -371,7 +372,7 @@ export class Query extends EventEmitter { this.#log("Cant send non-specific REQ to ephemeral connection %O %O %O", q, q.relay, c); return false; } - if (q.filters.some(a => a.search) && !c.SupportsNip(Nips.Search)) { + if (q.filters.some(a => a.search) && !c.supportsNip(Nips.Search)) { this.#log("Cant send REQ to non-search relay", c.Address); return false; } @@ -382,7 +383,7 @@ export class Query extends EventEmitter { let filters = q.filters; const qt = new QueryTrace(c.Address, filters, c.Id); - qt.on("close", x => c.CloseReq(x)); + qt.on("close", x => c.closeReq(x)); qt.on("change", () => this.#onProgress()); qt.on("eose", (id, connId, forced) => this.emit("trace", { @@ -401,7 +402,12 @@ export class Query extends EventEmitter { c.on("event", handler); this.on("end", () => c.off("event", handler)); this.#tracing.push(qt); - c.QueueReq(["REQ", qt.id, ...qt.filters], () => qt.sentToRelay()); + + if (q.syncFrom !== undefined) { + c.queueReq(["SYNC", qt.id, q.syncFrom, ...qt.filters], () => qt.sentToRelay()); + } else { + c.queueReq(["REQ", qt.id, ...qt.filters], () => qt.sentToRelay()); + } return qt; } } diff --git a/packages/system/src/request-builder.ts b/packages/system/src/request-builder.ts index 82c2d733..1e4a7251 100644 --- a/packages/system/src/request-builder.ts +++ b/packages/system/src/request-builder.ts @@ -4,9 +4,8 @@ import { appendDedupe, dedupe, sanitizeRelayUrl, unixNowMs, unwrap } from "@snor import EventKind from "./event-kind"; import { NostrLink, NostrPrefix, SystemInterface } from "."; -import { ReqFilter, u256, HexKey } from "./nostr"; +import { ReqFilter, u256, HexKey, TaggedNostrEvent } from "./nostr"; import { AuthorsRelaysCache, splitByWriteRelays, splitFlatByWriteRelays } from "./outbox-model"; -import { CacheRelay } from "cache-relay"; /** * Which strategy is used when building REQ filters @@ -27,11 +26,6 @@ export const enum RequestStrategy { * Use pre-determined relays for query */ ExplicitRelays = "explicit-relays", - - /** - * Query the cache relay - */ - CacheRelay = "cache-relay", } /** @@ -41,10 +35,17 @@ export interface BuiltRawReqFilter { filters: Array; relay: string; strategy: RequestStrategy; + + // Use set sync from an existing set of events + syncFrom?: Array; } export interface RequestBuilderOptions { + /** + * Dont send CLOSE directly after EOSE and allow events to stream in + */ leaveOpen?: boolean; + /** * Do not apply diff logic and always use full filters for query */ @@ -131,10 +132,8 @@ export class RequestBuilder { return this.#builders.map(f => f.filter); } - async build(system: SystemInterface): Promise> { - const expanded = ( - await Promise.all(this.#builders.map(a => a.build(system.relayCache, system.cacheRelay, this.#options))) - ).flat(); + build(system: SystemInterface): Array { + const expanded = this.#builders.flatMap(a => a.build(system.relayCache, this.#options)); return this.#groupByRelay(system, expanded); } @@ -295,39 +294,7 @@ export class RequestFilterBuilder { /** * Build/expand this filter into a set of relay specific queries */ - async build( - relays: AuthorsRelaysCache, - cacheRelay?: CacheRelay, - options?: RequestBuilderOptions, - ): Promise> { - // if since/until are set ignore sync split, cache relay wont be used - if (cacheRelay && this.#filter.since === undefined && this.#filter.until === undefined) { - const latest = await cacheRelay.query([ - "REQ", - uuid(), - { - ...this.#filter, - since: undefined, - until: undefined, - limit: 1, - }, - ]); - if (latest.length === 1) { - return [ - ...this.#buildFromFilter(relays, { - ...this.#filter, - since: latest[0].created_at, - until: undefined, - limit: undefined, - }), - { - filters: [this.#filter], - relay: "==CACHE==", - strategy: RequestStrategy.CacheRelay, - }, - ]; - } - } + build(relays: AuthorsRelaysCache, options?: RequestBuilderOptions): Array { return this.#buildFromFilter(relays, this.#filter, options); } diff --git a/packages/system/src/worker/system-worker.ts b/packages/system/src/worker/system-worker.ts index f8e39d26..51370627 100644 --- a/packages/system/src/worker/system-worker.ts +++ b/packages/system/src/worker/system-worker.ts @@ -1,7 +1,6 @@ import { v4 as uuid } from "uuid"; import EventEmitter from "eventemitter3"; import { - ConnectionStateSnapshot, NostrEvent, OkResponse, ProfileLoaderService, @@ -82,7 +81,7 @@ export class SystemWorker extends EventEmitter implements Sys }; } - get Sockets(): ConnectionStateSnapshot[] { + get Sockets(): never[] { return []; } diff --git a/packages/system/tests/negentropy.test.ts b/packages/system/tests/negentropy.test.ts new file mode 100644 index 00000000..75725a7c --- /dev/null +++ b/packages/system/tests/negentropy.test.ts @@ -0,0 +1,5 @@ +import { NegentropyStorageVector, VectorStorageItem } from "../src/negentropy/vector-storage"; + +describe("negentropy", () => { + it("should decodeBound", () => {}); +}); diff --git a/packages/worker-relay/src/worker.ts b/packages/worker-relay/src/worker.ts index 8d427804..669f8c8a 100644 --- a/packages/worker-relay/src/worker.ts +++ b/packages/worker-relay/src/worker.ts @@ -112,9 +112,17 @@ globalThis.onmessage = async ev => { await barrierQueue(cmdQueue, async () => { const req = msg.args as ReqCommand; const filters = req.slice(2) as Array; - const results = []; + const results: Array = []; + const ids = new Set(); for (const r of filters) { - results.push(...relay!.req(req[1], r)); + const rx = relay!.req(req[1], r); + for (const x of rx) { + if ((typeof x === "string" && ids.has(x)) || ids.has((x as NostrEvent).id)) { + continue; + } + ids.add(typeof x === "string" ? x : (x as NostrEvent).id); + results.push(x); + } } reply(msg.id, results); }); diff --git a/yarn.lock b/yarn.lock index 9007a9b5..7292e89e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3080,7 +3080,7 @@ __metadata: "@snort/shared": ^1.0.11 "@stablelib/xchacha20": ^1.0.1 "@types/debug": ^4.1.8 - "@types/jest": ^29.5.1 + "@types/jest": ^29.5.11 "@types/lokijs": ^1.5.14 "@types/node": ^20.5.9 "@types/uuid": ^9.0.2 @@ -3586,13 +3586,13 @@ __metadata: languageName: node linkType: hard -"@types/jest@npm:^29.5.1": - version: 29.5.8 - resolution: "@types/jest@npm:29.5.8" +"@types/jest@npm:^29.5.11": + version: 29.5.11 + resolution: "@types/jest@npm:29.5.11" dependencies: expect: ^29.0.0 pretty-format: ^29.0.0 - checksum: ca8438a5b4c098c8c023e9d5b279ea306494a1d0b5291cfb498100fa780377145f068b2a021d545b0398bbe0328dcc37044dd3aaf3c6c0fe9b0bef7b46a63453 + checksum: f892a06ec9f0afa9a61cd7fa316ec614e21d4df1ad301b5a837787e046fcb40dfdf7f264a55e813ac6b9b633cb9d366bd5b8d1cea725e84102477b366df23fdd languageName: node linkType: hard