From e326df9a2298b26aca66655b8311a1ac7e1f0acc Mon Sep 17 00:00:00 2001 From: Jonathan Staab Date: Mon, 20 Mar 2023 10:51:35 -0500 Subject: [PATCH] Remove dependency on nostr-tools for pool --- src/agent/pool.ts | 275 +++++++++++++++++++---------- src/agent/sync.ts | 13 +- src/app/connection.js | 2 +- src/util/misc.ts | 7 +- src/views/login/ConnectUser.svelte | 2 +- 5 files changed, 197 insertions(+), 102 deletions(-) diff --git a/src/agent/pool.ts b/src/agent/pool.ts index 1ba50d51..438094da 100644 --- a/src/agent/pool.ts +++ b/src/agent/pool.ts @@ -1,10 +1,11 @@ import type {Relay, Filter} from "nostr-tools" +import type {Deferred} from "src/util/misc" import type {MyEvent} from "src/util/types" -import {relayInit} from "nostr-tools" +import {verifySignature} from "nostr-tools" import {pluck, objOf, identity, is} from "ramda" -import {ensurePlural} from "hurdak/lib/hurdak" +import {ensurePlural, noop} from "hurdak/lib/hurdak" import {warn, log, error} from "src/util/logger" -import {union, now, difference} from "src/util/misc" +import {union, defer, tryJson, now, difference} from "src/util/misc" import {isRelay, normalizeRelayUrl} from "src/util/nostr" const forceRelays = (import.meta.env.VITE_FORCE_RELAYS || "") @@ -25,15 +26,31 @@ const CONNECTION_STATUS = { } class Connection { - promise: Promise - nostr: Relay + ws?: WebSocket + url: string + promise?: Deferred + queue: string[] status: string + closed?: number + timeout?: number + listeners: Record void>> stats: Record - lastConnectionAttempt: number constructor(url) { + this.ws = null + this.url = url this.promise = null - this.nostr = relayInit(url) - this.status = "new" + this.queue = [] + this.status = CONNECTION_STATUS.NEW + this.closed = null + this.timeout = null + + this.listeners = { + OK: {}, + ERROR: {}, + EVENT: {}, + EOSE: {}, + } + this.stats = { timeouts: 0, subsCount: 0, @@ -45,45 +62,117 @@ class Connection { connections[url] = this } - hasRecentError() { - return this.status === CONNECTION_STATUS.ERROR && now() - this.lastConnectionAttempt < 10 + connect() { + if (this.ws) { + throw new Error("Attempted to connect when already connected") + } + + this.status = CONNECTION_STATUS.PENDING + this.ws = new WebSocket(this.url) + this.promise = defer() + this.closed = null + + this.ws.addEventListener("open", () => { + this.status = CONNECTION_STATUS.READY + this.promise.resolve() + }) + + this.ws.addEventListener("message", e => { + this.queue.push(e.data) + + if (!this.timeout) { + this.timeout = window.setTimeout(() => this.handleMessages(), 10) + } + }) + + this.ws.addEventListener("error", () => { + this.status = CONNECTION_STATUS.ERROR + this.promise.reject() + this.closed = now() + }) + + this.ws.addEventListener("close", () => { + this.status = CONNECTION_STATUS.CLOSED + this.promise.reject() + this.closed = now() + }) } - async connect() { - const shouldConnect = - this.status === CONNECTION_STATUS.NEW || - (this.status === CONNECTION_STATUS.ERROR && now() - this.lastConnectionAttempt > 10) - - if (shouldConnect) { - this.status = CONNECTION_STATUS.PENDING - this.promise = this.nostr.connect() - - this.nostr.on("connect", () => { - this.status = CONNECTION_STATUS.READY - }) - - this.nostr.on("error", () => { - this.status = CONNECTION_STATUS.ERROR - }) - - this.nostr.on("disconnect", () => { - this.status = CONNECTION_STATUS.CLOSED - }) + disconnect() { + if (this.ws?.readyState === WebSocket.OPEN) { + this.ws.close() } - this.lastConnectionAttempt = now() - - try { - await this.promise - } catch (e) { - // This is already handled in the on error handler above + this.ws = null + } + async autoConnect() { + if (this.status === CONNECTION_STATUS.NEW) { + this.connect() + } else if (this.closed && now() - 10 > this.closed) { + // If the connection was closed, try to re-open, but throttle it + this.disconnect() + this.connect() } + await this.promise.catch(noop) + return this } - disconnect() { - this.nostr.close() + on(name, id, cb) { + this.listeners[name][id] = cb + } + off(name, id) { + delete this.listeners[name][id] + } + handleMessages() { + for (const json of this.queue.splice(0, 10)) { + const message = tryJson(() => JSON.parse(json)) - delete connections[this.nostr.url] + if (message) { + const [verb, ...payload] = message + + for (const listener of Object.values(this.listeners[verb] || {})) { + listener(...payload) + } + } + } + + this.timeout = this.queue.length > 0 ? window.setTimeout(() => this.handleMessages(), 10) : null + } + send(...payload) { + this.ws.send(JSON.stringify(payload)) + } + subscribe(filters, id, {onEvent, onEose}) { + this.on("EVENT", id, (subid, e) => subid === id && onEvent(e)) + this.on("EOSE", id, subid => subid === id && onEose()) + + this.send("REQ", id, ...filters) + + return { + conn: this, + unsub: () => { + this.send("CLOSE", id, ...filters) + + this.off("EVENT", id) + this.off("EOSE", id) + }, + } + } + publish(event, {onOk, onError}) { + const withCleanup = f => eid => { + if (eid === event.id) { + f() + this.off("OK", event.id) + this.off("ERROR", event.id) + } + } + + this.on("OK", event.id, withCleanup(onOk)) + this.on("ERROR", event.id, withCleanup(onError)) + + this.send("EVENT", event) + } + hasRecentError() { + return this.status === CONNECTION_STATUS.ERROR && now() - this.closed < 10 } getQuality() { if (this.status === CONNECTION_STATUS.ERROR) { @@ -137,7 +226,15 @@ const connect = url => { connections[url] = new Connection(url) } - return connections[url].connect() + return connections[url].autoConnect() +} + +const disconnect = url => { + if (connections[url]) { + connections[url].disconnect() + + delete connections[url] + } } // Public api - publish/subscribe @@ -205,19 +302,18 @@ const publish = async ({relays, event, onProgress, timeout = 5000}) => { const conn = await connect(relay.url) if (conn.status === CONNECTION_STATUS.READY) { - const pub = conn.nostr.publish(event) - - pub.on("ok", () => { - succeeded.add(relay.url) - timeouts.delete(relay.url) - failed.delete(relay.url) - attemptToResolve() - }) - - pub.on("failed", reason => { - failed.add(relay.url) - timeouts.delete(relay.url) - attemptToResolve() + conn.publish(event, { + onOk: () => { + succeeded.add(relay.url) + timeouts.delete(relay.url) + failed.delete(relay.url) + attemptToResolve() + }, + onError: () => { + failed.add(relay.url) + timeouts.delete(relay.url) + attemptToResolve() + }, }) } else { failed.add(relay.url) @@ -272,53 +368,43 @@ const subscribe = async ({relays, filter, onEvent, onEose, onError}: SubscribeOp return } - const sub = conn.nostr.sub(filter, { - id, - // This isn't currently working for some reason - // alreadyHaveEvent: (id, url) => { - // conn.stats.eventsCount += 1 - - // if (seen.has(id)) { - // return true - // } - - // seen.add(id) - - // return false - // }, - }) - - sub.on("event", e => { - if (!seen.has(e.id)) { - seen.add(e.id) - - // Normalize events here, annotate with relay url - onEvent({...e, seen_on: relay.url, content: e.content || ""}) - } - }) - - sub.on("eose", () => { - if (onEose) { - onEose(conn.nostr.url) - } - - // Keep track of relay timing stats, but only for the first eose we get - if (!eose.has(conn.nostr.url)) { - eose.add(conn.nostr.url) - - conn.stats.eoseCount += 1 - conn.stats.eoseTimer += Date.now() - now - } - }) - conn.stats.subsCount += 1 conn.stats.activeSubsCount += 1 if (conn.stats.activeSubsCount > 10) { - warn(`Relay ${conn.nostr.url} has >10 active subscriptions`) + warn(`Relay ${conn.url} has >10 active subscriptions`) } - return Object.assign(sub, {conn}) + return conn.subscribe(filter, id, { + onEvent: e => { + if (seen.has(e.id)) { + return + } + + seen.add(e.id) + + if (!verifySignature(e)) { + return + } + + conn.stats.eventsCount += 1 + + onEvent({...e, seen_on: relay.url, content: e.content || ""}) + }, + onEose: () => { + if (onEose) { + onEose(conn.url) + } + + // Keep track of relay timing stats, but only for the first eose we get + if (!eose.has(conn.url)) { + eose.add(conn.url) + + conn.stats.eoseCount += 1 + conn.stats.eoseTimer += Date.now() - now + } + }, + }) }) return { @@ -370,6 +456,7 @@ export default { getConnections, getConnection, connect, + disconnect, publish, subscribe, } diff --git a/src/agent/sync.ts b/src/agent/sync.ts index 9a036935..c62bd70e 100644 --- a/src/agent/sync.ts +++ b/src/agent/sync.ts @@ -209,12 +209,15 @@ addHandler( addHandler( 10002, profileHandler("relays", (e, p) => { - return e.tags.map(([_, url, mode]) => { - const read = (mode || "read") === "read" - const write = (mode || "write") === "write" + return Tags.from(e) + .type("r") + .all() + .map(([_, url, mode]) => { + const read = (mode || "read") === "read" + const write = (mode || "write") === "write" - return {url, read, write} - }) + return {url, read, write} + }) }) ) diff --git a/src/app/connection.js b/src/app/connection.js index 8753ed69..d759f1bd 100644 --- a/src/app/connection.js +++ b/src/app/connection.js @@ -18,7 +18,7 @@ setInterval(() => { // Alert the user to any heinously slow connections slowConnections.set( Object.values(pool.getConnections()).filter( - c => relayUrls.has(c.nostr.url) && first(c.getQuality()) < 0.3 + c => relayUrls.has(c.url) && first(c.getQuality()) < 0.3 ) ) }, 30_000) diff --git a/src/util/misc.ts b/src/util/misc.ts index c5fbdc28..15dc0a20 100644 --- a/src/util/misc.ts +++ b/src/util/misc.ts @@ -234,7 +234,12 @@ export const batch = (t, f) => { } } -export const defer = () => { +export type Deferred = Promise & { + resolve: (arg: T) => void + reject: (arg: T) => void +} + +export const defer = (): Deferred => { let resolve, reject const p = new Promise((resolve_, reject_) => { resolve = resolve_ diff --git a/src/views/login/ConnectUser.svelte b/src/views/login/ConnectUser.svelte index 9446866f..fdf903ff 100644 --- a/src/views/login/ConnectUser.svelte +++ b/src/views/login/ConnectUser.svelte @@ -67,7 +67,7 @@ navigate("/notes/follows") } else { - pool.getConnection(relay.url).disconnect() + pool.disconnect(relay.url) } }) }