From e9da4e899b3cd089de1a749bc4363148f559f925 Mon Sep 17 00:00:00 2001 From: Jonathan Staab Date: Tue, 28 Mar 2023 15:08:41 -0500 Subject: [PATCH] Add new pool based on paravel --- ROADMAP.md | 3 +- package-lock.json | Bin 601593 -> 603912 bytes package.json | 1 + src/App.svelte | 15 +- src/agent/cmd.ts | 11 +- src/agent/network.ts | 26 +- src/agent/pool.ts | 682 ++++++++-------------- src/agent/relays.ts | 7 +- src/agent/user.ts | 2 + src/app/connection.js | 10 +- src/partials/RelayCard.svelte | 9 +- src/routes/PersonDetail.svelte | 4 +- src/routes/RelayDetail.svelte | 9 +- src/views/SideNav.svelte | 2 +- src/views/login/ConnectUser.svelte | 2 +- src/views/notes/Note.svelte | 2 +- src/views/onboarding/Onboarding.svelte | 6 +- src/views/onboarding/OnboardingKey.svelte | 2 +- src/views/relays/RelayCard.svelte | 9 +- 19 files changed, 290 insertions(+), 512 deletions(-) diff --git a/ROADMAP.md b/ROADMAP.md index aa9e3c08..36b93581 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -1,7 +1,7 @@ # Current +- [ ] Fix reactions and replies - [ ] Multiplexer - - [ ] Explore the idea of separating everything into different components and wiring it all up into a system in a single file. - [ ] Write NIP to support proxies. Update COUNT NIP to mention how proxies are a good use case for COUNT - [ ] Fix iOS/safari/firefox - [ ] https://github.com/staab/coracle/issues/42 @@ -136,3 +136,4 @@ - Graph view? Query db with COUNT? Hardware specs on relay info endpoint? - "adoptarelay.com" - Add suggested relays based on follows or topics +- [ ] Integrate plephy https://plebhy.com/ diff --git a/package-lock.json b/package-lock.json index 9932a932128b423b15fd03367f55f5c3d81fdb41..b023b026820cb7fdfd645033550b59e94b543e60 100644 GIT binary patch delta 1402 zcmex)TczWj%7!;gf(401iDjudN>&O=aRz#Zdgjd!nA#sOF#<8u_6JPNM>VEfYB6&& zmQO!;fl+GugvD$o(-!~*gvyKcb@elg^KZEiiQ~_RJ~t){aO`3onQa zsq}I;O*J(zb~CE*EDbTrNe(qDat1L6q9hKxA?wp@t z5ET?*=ICQ#;h*o~R}`KY9-ePr5@4ET>R(wE>|>d(Uu6;!V4hT*;W}Ayp2oD->{llz zu=7lJn8wV#eOnXre+{O@vgv|WERxK?K${$BXDkX0NMN8qS-L=(=?{xoRYJWojZKRk zBP>mPlajLojnc|OjI#X$vcodUqcWV#OWgF0Eeo<-JuRc0z1#z}bHbAg1NGAbgYWl-HpO0JAP50F8_*&Q!dUz&p^*uNry`T3Y3te zWqRUUX7S0Zm_!&&C+}xcnjYNADmuMkB{N_9=GDyGH?L+n5(A9fiNTB#j26=k&DhPj z!7*E2tZM;u|HKk@)wBxBvgD}9kRrDLXY<5h&!jB#;vnb9d~+ZDh$IWoO6`JhgS>Qu z@)DymXRk7IUrP^v-%5kfd_Uu&B5hAMXY-6C^RkHi!eH$H%j}X$A0M9#e{CN}V3fj~ ztW=PiS~UIRCPw|~>yI;=nI@H{rKJ{?mSjTGr3uh~hH%9S3QDCVX}T7=Wr;bNDTyVi zptNgeYdT%w1Ebhvfqeqg>v|c5xDaMcZ+yUP!i%D3dcs*|pZ0PNRv>2EUe3XOd>54t4t=@->lRkpomk5n4)ls(-rpG}$Eq}?9I b%dtI-morvqkfiE~T5K|q$lNT?>Ba{D_8`aw delta 309 zcmeBpr}Fc*%7!;g&792boXm_s%(R`8nR%zic5Yo}<4{KP=?fj1jHhq=z%Bz~e`8V! zjm$51@zM@94h^^Tc8&~iiAsx#i176cDe?;T&B}Dnb@TT&^~let^v+J#E;n<{EN}}A z$#x5J3J7vEatiT@@YgRaw@5F|_KryNF>o?>DKpM0NuC^Nrm@|infbM5`|fqj+jp;H zITX`=hL;tH*|wkIWj{V+`?}@qS3KzEpy`77tSZ|JKC(wDwMU3@Y>yD-j1t;@NQ2Xj F4*-Z^YBvA? diff --git a/package.json b/package.json index c8da582d..f2e690c4 100644 --- a/package.json +++ b/package.json @@ -46,6 +46,7 @@ "lru-cache": "^7.18.3", "nostr-tools": "^1.7.4", "npm-run-all": "^4.1.5", + "paravel": "^0.1.7", "qr-scanner": "^1.4.2", "qrcode": "^1.5.1", "ramda": "^0.28.0", diff --git a/src/App.svelte b/src/App.svelte index 273ff22b..b4e627d1 100644 --- a/src/App.svelte +++ b/src/App.svelte @@ -83,13 +83,16 @@ $: style.textContent = `:root { ${getThemeVariables($theme)}; background: var(--gray-8); }` - // When we get an AUTH challenge from our pool, attempt to authenticate - pool.eventBus.on("AUTH", async (challenge, connection) => { - const publishable = cmd.authenticate(challenge, url) - const [event] = await publishable.publish([{url: connection.url}]) + const seenChallenges = new Set() - connection.checkAuth(event.id) - }) + // When we get an AUTH challenge from our pool, attempt to authenticate + pool.Config.authHandler = async (url, challenge) => { + if (!seenChallenges.has(challenge)) { + seenChallenges.add(challenge) + + return first(await cmd.authenticate(url, challenge).publish([{url}])) + } + } onMount(() => { // Keep scroll position on body, but don't allow scrolling diff --git a/src/agent/cmd.ts b/src/agent/cmd.ts index 3788969f..e6f0ecda 100644 --- a/src/agent/cmd.ts +++ b/src/agent/cmd.ts @@ -7,11 +7,11 @@ import pool from "src/agent/pool" import sync from "src/agent/sync" import keys from "src/agent/keys" -const authenticate = (challenge, relay) => +const authenticate = (url, challenge) => new PublishableEvent(22242, { tags: [ ["challenge", challenge], - ["relay", relay], + ["relay", url], ], }) @@ -85,7 +85,7 @@ const processMentions = map(pubkey => { const name = displayPerson(getPersonWithFallback(pubkey)) const relay = getRelayForPersonHint(pubkey) - return ["p", pubkey, relay?.url || '', name] + return ["p", pubkey, relay?.url || "", name] }) const getReplyTags = n => { @@ -132,8 +132,11 @@ class PublishableEvent { this.event = {kind, content, tags, pubkey, created_at: createdAt} } + getSignedEvent() { + return keys.sign(this.event) + } async publish(relays, onProgress = null) { - const event = await keys.sign(this.event) + const event = await this.getSignedEvent() const promise = pool.publish({relays, event, onProgress}) sync.processEvents(event) diff --git a/src/agent/network.ts b/src/agent/network.ts index 0079cf47..562673be 100644 --- a/src/agent/network.ts +++ b/src/agent/network.ts @@ -1,7 +1,6 @@ import type {MyEvent} from "src/util/types" -import {sortBy, assoc, uniq, uniqBy, prop, propEq, reject, groupBy, pluck} from "ramda" +import {sortBy, assoc, uniq, uniqBy, prop, propEq, groupBy, pluck} from "ramda" import {personKinds, findReplyId} from "src/util/nostr" -import {log} from "src/util/logger" import {chunk} from "hurdak/lib/hurdak" import {batch, now, timedelta} from "src/util/misc" import { @@ -42,6 +41,7 @@ const listen = ({relays, filter, onChunk = null, shouldProcess = true, delay = 5 const load = ({relays, filter, onChunk = null, shouldProcess = true, timeout = 5000}) => { return new Promise(resolve => { + let completed = false const done = new Set() const allEvents = [] @@ -49,25 +49,16 @@ const load = ({relays, filter, onChunk = null, shouldProcess = true, timeout = 5 const sub = await subPromise // If we've already unsubscribed we're good - if (!sub.isActive()) { + if (completed) { return } const isDone = done.size === relays.length if (force) { - const timedOutRelays = reject(r => done.has(r.url), relays) - - log( - `Timing out ${timedOutRelays.length}/${relays.length} relays after ${timeout}ms`, - timedOutRelays - ) - - timedOutRelays.forEach(relay => { - const conn = pool.getConnection(relay.url) - - if (conn) { - conn.stats.timeouts += 1 + relays.forEach(relay => { + if (!done.has(relay.url)) { + pool.Meta.onTimeout(relay.url) } }) } @@ -75,6 +66,7 @@ const load = ({relays, filter, onChunk = null, shouldProcess = true, timeout = 5 if (isDone || force) { sub.unsub() resolve(allEvents) + completed = true } } @@ -101,10 +93,6 @@ const load = ({relays, filter, onChunk = null, shouldProcess = true, timeout = 5 done.add(url) attemptToComplete(false) }, - onError: url => { - done.add(url) - attemptToComplete(false) - }, }) }) as Promise } diff --git a/src/agent/pool.ts b/src/agent/pool.ts index 18607217..ac578b9d 100644 --- a/src/agent/pool.ts +++ b/src/agent/pool.ts @@ -1,318 +1,193 @@ import type {Relay, Filter} from "nostr-tools" -import type {Deferred} from "src/util/misc" import type {MyEvent} from "src/util/types" +import {Socket, Pool, Plex, Relays, Executor} from "paravel" import {verifySignature} from "nostr-tools" -import {pluck, objOf, identity, is} from "ramda" -import {ensurePlural, noop} from "hurdak/lib/hurdak" +import {pluck, identity} from "ramda" +import {ensurePlural, switcher} from "hurdak/lib/hurdak" import {warn, log, error} from "src/util/logger" -import {union, EventBus, defer, tryJson, difference} from "src/util/misc" -import {isRelay, normalizeRelayUrl} from "src/util/nostr" +import {union, difference} from "src/util/misc" +import {normalizeRelayUrl} from "src/util/nostr" -const forceRelays = (import.meta.env.VITE_FORCE_RELAYS || "") - .split(",") - .filter(identity) - .map(objOf("url")) - -// Connection management - -const eventBus = new EventBus() - -const connections = {} - -const STATUS = { - NEW: "new", - PENDING: "pending", - CLOSED: "closed", - ERROR: "error", - READY: "ready", +const Config = { + multiplextrUrl: null, + authHandler: null, } -const ERROR = { - CONNECTION: "connection", - UNAUTHORIZED: "unauthorized", - FORBIDDEN: "forbidden", +const Meta = { + stats: {}, + errors: {}, + getStats(url) { + if (!this.stats[url]) { + this.stats[url] = { + error: null, + timeouts: 0, + subsCount: 0, + eoseCount: 0, + eoseTimer: 0, + eventsCount: 0, + activeSubsCount: 0, + lastRequest: 0, + openedAt: 0, + closedAt: 0, + } + } + + return this.stats[url] + }, + onPublish(urls) { + urls.forEach(url => { + const stats = this.getStats(url) + + stats.lastRequest = Date.now() + }) + }, + onSubscriptionStart(urls) { + urls.forEach(url => { + const stats = this.getStats(url) + + stats.subsCount += 1 + stats.activeSubsCount += 1 + stats.lastRequest = Date.now() + + if (stats.activeSubsCount > 10) { + warn(`Relay ${url} has >10 active subscriptions`) + } + }) + }, + onSubscriptionEnd(urls) { + urls.forEach(url => { + const stats = this.getStats(url) + + stats.activeSubsCount -= 1 + }) + }, + onEvent(url) { + const stats = this.getStats(url) + + stats.eventsCount += 1 + }, + onEose(url, ms) { + const stats = this.getStats(url) + + stats.eoseCount += 1 + stats.eoseTimer += ms + }, + onTimeout(url) { + const stats = this.getStats(url) + + stats.timeouts += 1 + }, } -class Connection { - ws?: WebSocket - url: string - promise?: Deferred - queue: string[] - error: {code: string; message: string, occurredAt: number} - status: {code: string; message: string} - timeout?: number - stats: Record - bus: EventBus - constructor(url) { - if (connections[url]) { - error(`Connection to ${url} already exists`) - } - - this.ws = null - this.url = url - this.promise = null - this.queue = [] - this.timeout = null - this.bus = new EventBus() - this.error = null - this.stats = { - timeouts: 0, - subsCount: 0, - eoseCount: 0, - eoseTimer: 0, - eventsCount: 0, - activeSubsCount: 0, - } - - this.status = {code: STATUS.NEW, message: "Waiting to connect"} - this.listenForAuth() - - connections[url] = this - } - setStatus(code, message) { - this.status = {code, message} - } - setError(code, message) { - this.error = {code, message, occurredAt: Date.now()} - } - connect() { - if (this.ws) { - error("Attempted to connect when already connected", this) - } - - this.promise = defer() - this.ws = new WebSocket(this.url) - this.setStatus(STATUS.PENDING, "Trying to connect") - - this.ws.addEventListener("open", () => { - log(`Opened connection to ${this.url}`) - - this.setStatus(STATUS.READY, "Connected") - this.promise.resolve() - }) - - this.ws.addEventListener("message", e => { - this.queue.push(e.data) - - if (!this.timeout) { - this.timeout = window.setTimeout(() => this.handleMessages(), 10) - } - }) - - this.ws.addEventListener("error", e => { - log(`Error on connection to ${this.url}`) - - this.disconnect() - this.promise.reject() - this.setError(ERROR.CONNECTION, "Disconnected") - this.setStatus(STATUS.CLOSED, "Closed") - }) - - this.ws.addEventListener("close", () => { - log(`Closed connection to ${this.url}`) - - this.disconnect() - this.promise.reject() - this.setStatus(STATUS.CLOSED, "Closed") - }) - } - disconnect() { - if (this.ws) { - log(`Disconnecting from ${this.url}`) - - this.ws.close() - this.ws = null - } - } - async autoConnect() { - // If the connection has not been opened, or was closed, open 'er up - if (!this.error && [STATUS.NEW, STATUS.CLOSED].includes(this.status.code)) { - this.connect() - } - - // If the connection failed, try to re-open after a while - if (this.error?.code === ERROR.CONNECTION && Date.now() - 30_000 > this.error.occurredAt) { - this.disconnect() - this.connect() - } - - await this.promise.catch(noop) - - return this - } - handleMessages() { - for (const json of this.queue.splice(0, 10)) { - const message = tryJson(() => JSON.parse(json)) - - if (message) { - const [k, ...payload] = message - - this.bus.handle(k, ...payload) - } - } - - this.timeout = this.queue.length > 0 ? window.setTimeout(() => this.handleMessages(), 10) : null - } - send(...payload) { - if (this.ws?.readyState !== 1) { - warn("Send attempted before socket was ready", this) - } - - this.ws.send(JSON.stringify(payload)) - } - subscribe(filters, id, {onEvent, onEose}) { - const [eventChannel, eoseChannel] = [ - this.bus.on("EVENT", (subid, e) => subid === id && onEvent(e)), - this.bus.on("EOSE", subid => subid === id && onEose()), - ] - - this.send("REQ", id, ...filters) - - return { - conn: this, - unsub: () => { - if (this.status.code === STATUS.READY) { - this.send("CLOSE", id) - } - - this.bus.off("EVENT", eventChannel) - this.bus.off("EOSE", eoseChannel) - }, - } - } - publish(event, {onOk, onError}) { - const withCleanup = cb => k => { - if (k === event.id) { - cb() - this.bus.off("OK", okChannel) - this.bus.off("ERROR", errorChannel) - } - } - - const [okChannel, errorChannel] = [ - this.bus.on("OK", withCleanup(onOk)), - this.bus.on("ERROR", withCleanup(onError)), - ] - - this.send("EVENT", event) - } - count(filter, id, {onCount}) { - const channel = this.bus.on("COUNT", (subid, ...payload) => { - if (subid === id) { - onCount(...payload) - - this.bus.off("COUNT", channel) - } - }) - - this.send("COUNT", id, ...filter) - } - listenForAuth() { - // Propagate auth to global handler - this.bus.on("AUTH", challenge => { - if (!this.error) { - this.setStatus(STATUS.ERROR, "Logging in") - this.setError(ERROR.UNAUTHORIZED, "Logging in") - - eventBus.handle("AUTH", challenge, this) - } - }) - } - checkAuth(eid) { - const channel = this.bus.on("OK", (id, ok, message) => { - if (id === eid) { - if (ok) { - this.setStatus(STATUS.READY, "Connected") - } else { - this.disconnect() - this.setError(ERROR.FORBIDDEN, "Access restricted") - } - - this.bus.off("OK", channel) - } - }) - } - hasRecentError() { - return this.error && Date.now() - 30_000 < this.error.occurredAt - } - getQuality() { - if (this.error) { - return [0, this.error.message] - } - - const {timeouts, subsCount, eoseTimer, eoseCount} = this.stats - const timeoutRate = timeouts > 0 ? timeouts / subsCount : null - const eoseQuality = eoseCount > 0 ? Math.max(1, 500 / (eoseTimer / eoseCount)) : null - - if (timeoutRate && timeoutRate > 0.5) { - return [1 - timeoutRate, "Slow connection"] - } - - if (eoseQuality && eoseQuality < 0.7) { - return [eoseQuality, "Slow connection"] - } - - if (eoseQuality) { - return [eoseQuality, "Connected"] - } - - if (this.status.code === STATUS.READY) { - return [1, "Connected"] - } - - return [0.5, this.status.message] - } -} - -const getConnections = () => connections - -const getConnection = url => connections[url] - -const connect = url => { - if (!isRelay(url)) { - warn(`Invalid relay url ${url}`) - } - - if (url !== normalizeRelayUrl(url)) { - warn(`Received non-normalized relay url ${url}`) - } - - if (!connections[url]) { - connections[url] = new Connection(url) - } - - return connections[url].autoConnect() -} - -const disconnect = url => { - if (connections[url]) { - connections[url].disconnect() - - delete connections[url] - } -} - -// Public api - publish/subscribe - -const publish = async ({relays, event, onProgress, timeout = 5000}) => { - if (forceRelays.length > 0) { - relays = forceRelays - } +const forceUrls = (import.meta.env.VITE_FORCE_RELAYS || "").split(",").filter(identity) +const getUrls = relays => { if (relays.length === 0) { - error(`Attempted to publish to zero relays`, event) - } else { - log(`Publishing to ${relays.length} relays`, event, relays) + error(`Attempted to connect to zero urls`) } - const urls = new Set(pluck("url", relays)) + const urls = new Set(pluck("url", relays).map(normalizeRelayUrl)) if (urls.size !== relays.length) { - warn(`Attempted to publish to non-unique relays`) + warn(`Attempted to connect to non-unique relays`) } + return Array.from(urls) +} + +const pool = new Pool() + +pool.bus.addListeners({ + open: ({url}) => { + const stats = Meta.getStats(url) + + stats.openedAt = Date.now() + }, + close: ({url}) => { + const stats = Meta.getStats(url) + + stats.closedAt = Date.now() + }, +}) + +function disconnect(url) { + pool.remove(url) + + delete Meta.stats[url] + delete Meta.errors[url] +} + +function getQuality(url) { + if (Meta.errors[url]) { + return [ + 0, + switcher(Meta.errors[url], { + disconnected: "Disconnected", + unauthorized: "Logging in", + forbidden: "Failed to log in", + }), + ] + } + + const stats = Meta.getStats(url) + + const {timeouts, subsCount, eoseTimer, eoseCount} = stats + const timeoutRate = timeouts > 0 ? timeouts / subsCount : null + const eoseQuality = eoseCount > 0 ? Math.max(1, 500 / (eoseTimer / eoseCount)) : null + + if (timeoutRate && timeoutRate > 0.5) { + return [1 - timeoutRate, "Slow connection"] + } + + if (eoseQuality && eoseQuality < 0.7) { + return [eoseQuality, "Slow connection"] + } + + if (eoseQuality) { + return [eoseQuality, "Connected"] + } + + if (pool.get(url).status === Socket.STATUS.READY) { + return [1, "Connected"] + } + + return [0.5, "Not Connected"] +} + +function getExecutor(urls) { + if (forceUrls.length > 0) { + urls = forceUrls + } + + const executor = new Executor( + Config.multiplextrUrl + ? new Plex(urls, pool.get(Config.multiplextrUrl)) + : new Relays(urls.map(url => pool.get(url))) + ) + + executor.handleAuth({ + onAuth(url, challenge) { + Meta.errors[url] = "unauthorized" + + return Config.authHandler(url, challenge) + }, + onOk(url, id, ok, message) { + Meta.errors[url] = ok ? null : "forbidden" + }, + }) + + return executor +} + +async function publish({relays, event, onProgress, timeout = 3000}) { + const urls = getUrls(relays) + const executor = getExecutor(urls) + + Meta.onPublish(urls) + + log(`Publishing to ${urls.length} relays`, event, urls) + return new Promise(resolve => { - let resolved = false const timeouts = new Set() const succeeded = new Set() const failed = new Set() @@ -325,26 +200,20 @@ const publish = async ({relays, event, onProgress, timeout = 5000}) => { } const attemptToResolve = () => { - // Don't report progress once we're done, even if more errors/ok come through - if (resolved) { - return - } - const progress = getProgress() - if (onProgress) { - onProgress(progress) - } - if (progress.pending.size === 0) { - log(`Finished publishing to ${urls.size} relays`, event, progress) + log(`Finished publishing to ${urls.length} relays`, event, progress) resolve(progress) - resolved = true + sub.unsubscribe() + executor.target.cleanup() + } else if (onProgress) { + onProgress(progress) } } setTimeout(() => { - for (const {url} of relays) { + for (const url of urls) { if (!succeeded.has(url) && !failed.has(url)) { timeouts.add(url) } @@ -353,29 +222,21 @@ const publish = async ({relays, event, onProgress, timeout = 5000}) => { attemptToResolve() }, timeout) - relays.map(async relay => { - const conn = await connect(relay.url) - - if (conn.status.code === STATUS.READY || conn.error.code === ERROR.UNAUTHORIZED) { - conn.publish(event, { - onOk: () => { - succeeded.add(relay.url) - timeouts.delete(relay.url) - failed.delete(relay.url) - attemptToResolve() - }, - onError: () => { - failed.add(relay.url) - timeouts.delete(relay.url) - attemptToResolve() - }, - }) - } else { - failed.add(relay.url) + const sub = executor.publish(event, { + onOk: url => { + succeeded.add(url) + timeouts.delete(url) + failed.delete(url) attemptToResolve() - } + }, + onError: url => { + failed.add(url) + timeouts.delete(url) + attemptToResolve() + }, }) + // Report progress to start attemptToResolve() }) } @@ -384,151 +245,88 @@ type SubscribeOpts = { relays: Relay[] filter: Filter[] | Filter onEvent: (event: MyEvent) => void - onError?: (url: string) => void onEose?: (url: string) => void } -const subscribe = async ({relays, filter, onEvent, onEose, onError}: SubscribeOpts) => { - filter = ensurePlural(filter) - - if (forceRelays.length > 0) { - relays = forceRelays - } - - const id = createFilterId(filter) +async function subscribe({relays, filter, onEvent, onEose}: SubscribeOpts) { + const urls = getUrls(relays) + const executor = getExecutor(urls) + const filters = ensurePlural(filter) const now = Date.now() const seen = new Set() const eose = new Set() - let active = true - - if (relays.length === 0) { - error(`Attempted to start subscription ${id} with zero relays`, filter) - } else { - log(`Starting subscription ${id} with ${relays.length} relays`, filter, relays) - } + log(`Starting subscription with ${relays.length} relays`, filters, relays) if (relays.length !== new Set(pluck("url", relays)).size) { error(`Subscribed to non-unique relays`, relays) } - const promises = relays.map(async relay => { - const conn = await connect(relay.url) + Meta.onSubscriptionStart(urls) - if (conn.status.code !== STATUS.READY) { - if (onError) { - onError(relay.url) - } - - return - } - - conn.stats.subsCount += 1 - conn.stats.activeSubsCount += 1 - - if (conn.stats.activeSubsCount > 10) { - warn(`Relay ${conn.url} has >10 active subscriptions`) - } - - return conn.subscribe(filter, id, { - onEvent: e => { - if (seen.has(e.id)) { - return - } - - seen.add(e.id) - - if (!verifySignature(e)) { - return - } - - conn.stats.eventsCount += 1 - - onEvent({...e, seen_on: relay.url, content: e.content || ""}) - }, - onEose: () => { - if (onEose) { - onEose(conn.url) - } - - // Keep track of relay timing stats, but only for the first eose we get - if (!eose.has(conn.url)) { - eose.add(conn.url) - - conn.stats.eoseCount += 1 - conn.stats.eoseTimer += Date.now() - now - } - }, - }) - }) - - return { - isActive: () => active, - unsub: () => { - if (!active) { + const sub = executor.subscribe(filters, { + onEvent: (url, e) => { + if (seen.has(e.id)) { return } - active = false + seen.add(e.id) - log(`Closing subscription ${id}`) + if (!verifySignature(e)) { + return + } - promises.forEach(async promise => { - const sub = await promise + Meta.onEvent(url) - if (sub) { - sub.unsub() - sub.conn.stats.activeSubsCount -= 1 - } - }) + onEvent({...e, seen_on: url, content: e.content || ""}) + }, + onEose: url => { + onEose?.(url) + + // Keep track of relay timing stats, but only for the first eose we get + if (!eose.has(url)) { + Meta.onEose(url, Date.now() - now) + } + + eose.add(url) + }, + }) + + return { + unsub: () => { + log(`Closing subscription`, filters) + + sub.unsubscribe() + executor.target.cleanup() + + Meta.onSubscriptionEnd(urls) }, } } -const count = async filter => { - const conn = await connect("wss://rbr.bio") - - if (!conn || conn.status.code !== STATUS.READY) { - return null - } - - filter = ensurePlural(filter) +async function count(filter) { + const filters = ensurePlural(filter) + const executor = getExecutor(["wss://rbr.bio"]) return new Promise(resolve => { - conn.count(filter, createFilterId(filter), { + const sub = executor.count(filters, { onCount: res => resolve(res?.count), }) + + setTimeout(() => { + resolve(0) + sub.unsubscribe() + executor.target.cleanup() + }, 3000) }) } -// Utils - -const createFilterId = filters => - [Math.random().toString().slice(2, 6), filters.map(describeFilter).join(":")].join("-") - -const describeFilter = ({kinds = [], ...filter}) => { - const parts = [] - - parts.push(kinds.join(",")) - - for (const [key, value] of Object.entries(filter)) { - if (is(Array, value)) { - parts.push(`${key}[${value.length}]`) - } else { - parts.push(key) - } - } - - return "(" + parts.join(",") + ")" -} - export default { - eventBus, - forceRelays, - getConnections, - getConnection, - connect, + Config, + Meta, + forceUrls, disconnect, + getQuality, publish, subscribe, count, diff --git a/src/agent/relays.ts b/src/agent/relays.ts index 67e0a51f..78d252b0 100644 --- a/src/agent/relays.ts +++ b/src/agent/relays.ts @@ -160,8 +160,11 @@ export const sampleRelays = (relays, scale = 1) => { limit *= scale } - // Remove relays that are currently in an error state - relays = relays.filter(r => !pool.getConnection(r.url)?.hasRecentError()) + // Remove relays that are currently in an error state or were recently closed + relays = relays.filter(r => { + if (pool.Meta.errors[r.url]) return false + if (pool.Meta.getStats(r.url).closed > Date.now() - 30_000) return false + }) // Limit target relays relays = relays.slice(0, limit) diff --git a/src/agent/user.ts b/src/agent/user.ts index 6fcd8df0..48919ba6 100644 --- a/src/agent/user.ts +++ b/src/agent/user.ts @@ -18,6 +18,7 @@ import {findReplyId, findRootId} from "src/util/nostr" import {synced} from "src/util/misc" import {derived} from "svelte/store" import keys from "src/agent/keys" +import pool from "src/agent/pool" import cmd from "src/agent/cmd" const profile = synced("agent/user/profile", { @@ -54,6 +55,7 @@ let profileCopy = null profile.subscribe($profile => { profileCopy = $profile + pool.Config.multiplextrUrl = $profile.settings.multiplextrUrl }) // Watch pubkey and add to profile diff --git a/src/app/connection.js b/src/app/connection.js index d759f1bd..0cbc8b73 100644 --- a/src/app/connection.js +++ b/src/app/connection.js @@ -11,14 +11,14 @@ setInterval(() => { const relayUrls = new Set(pluck("url", getUserRelays())) // Prune connections we haven't used in a while - Object.values(pool.getConnections()) - .filter(conn => conn.lastRequest < Date.now() - 60_000) - .forEach(conn => conn.nostr.close()) + Object.entries(pool.Meta.stats) + .filter(([url, stats]) => stats.lastRequest < Date.now() - 60_000) + .forEach(([url, stats]) => pool.disconnect(url)) // Alert the user to any heinously slow connections slowConnections.set( - Object.values(pool.getConnections()).filter( - c => relayUrls.has(c.url) && first(c.getQuality()) < 0.3 + Object.keys(pool.Meta.stats).filter( + url => relayUrls.has(url) && first(pool.getQuality(url)) < 0.3 ) ) }, 30_000) diff --git a/src/partials/RelayCard.svelte b/src/partials/RelayCard.svelte index e70312b3..fecdedb6 100644 --- a/src/partials/RelayCard.svelte +++ b/src/partials/RelayCard.svelte @@ -19,14 +19,7 @@ onMount(() => { return poll(10_000, async () => { - const conn = await pool.getConnection(relay.url) - - if (conn) { - ;[quality, message] = conn.getQuality() - } else { - quality = null - message = "Not connected" - } + ;[quality, message] = pool.getQuality(relay.url) }) }) diff --git a/src/routes/PersonDetail.svelte b/src/routes/PersonDetail.svelte index 3a718280..ed11f4a8 100644 --- a/src/routes/PersonDetail.svelte +++ b/src/routes/PersonDetail.svelte @@ -31,7 +31,7 @@ const interpolate = (a, b) => t => a + Math.round((b - a) * t) const {petnamePubkeys, canPublish, mutes} = user const getRelays = () => sampleRelays(relays.concat(getPubkeyWriteRelays(pubkey))) - const tabs = ["notes", "likes", pool.forceRelays.length === 0 && "relays"].filter(identity) + const tabs = ["notes", "likes", pool.forceUrls.length === 0 && "relays"].filter(identity) let pubkey = toHex(npub) let following = false @@ -79,7 +79,7 @@ }) } - if (pool.forceRelays.length === 0) { + if (pool.forceUrls.length === 0) { actions.push({onClick: openProfileInfo, label: "Profile", icon: "info"}) } diff --git a/src/routes/RelayDetail.svelte b/src/routes/RelayDetail.svelte index c669937b..1329c508 100644 --- a/src/routes/RelayDetail.svelte +++ b/src/routes/RelayDetail.svelte @@ -26,14 +26,7 @@ onMount(() => { return poll(10_000, async () => { - const conn = await pool.getConnection(relay.url) - - if (conn) { - ;[quality, message] = conn.getQuality() - } else { - quality = null - message = "Not connected" - } + ;[quality, message] = pool.getQuality(relay.url) }) }) diff --git a/src/views/SideNav.svelte b/src/views/SideNav.svelte index b30b0917..a71c37de 100644 --- a/src/views/SideNav.svelte +++ b/src/views/SideNav.svelte @@ -89,7 +89,7 @@
  • - {#if pool.forceRelays.length === 0} + {#if pool.forceUrls.length === 0}
  • Relays diff --git a/src/views/login/ConnectUser.svelte b/src/views/login/ConnectUser.svelte index fdf903ff..c3f2da91 100644 --- a/src/views/login/ConnectUser.svelte +++ b/src/views/login/ConnectUser.svelte @@ -115,7 +115,7 @@ }}>here.

    - {#if pool.forceRelays.length > 0} + {#if pool.forceUrls.length > 0} {:else if Object.values(currentRelays).length > 0}

    Currently searching:

    diff --git a/src/views/notes/Note.svelte b/src/views/notes/Note.svelte index 2ea45913..0baf4636 100644 --- a/src/views/notes/Note.svelte +++ b/src/views/notes/Note.svelte @@ -450,7 +450,7 @@ let:instance class="flex flex-col gap-2" on:click={() => instance.hide()}> - {#if pool.forceRelays.length === 0} + {#if pool.forceUrls.length === 0} { diff --git a/src/views/onboarding/Onboarding.svelte b/src/views/onboarding/Onboarding.svelte index c9cb7a6b..3a19bc05 100644 --- a/src/views/onboarding/Onboarding.svelte +++ b/src/views/onboarding/Onboarding.svelte @@ -1,5 +1,5 @@