From 1939725ac6fdffd8cb978bf7021d62a9c24fa842 Mon Sep 17 00:00:00 2001 From: Jonathan Staab Date: Tue, 21 Mar 2023 10:00:34 -0500 Subject: [PATCH] Split up pool status and error to handle auth --- ROADMAP.md | 5 +- src/agent/pool.ts | 130 +++++++++++++++++++++++----------------------- 2 files changed, 67 insertions(+), 68 deletions(-) diff --git a/ROADMAP.md b/ROADMAP.md index 2bb4f3f8..20cf972b 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -1,10 +1,11 @@ # Current -- [ ] https://github.com/staab/coracle/issues/42 -- [ ] Fix iOS/safari/firefox +- [ ] Merge COUNT - [ ] Multiplex, charge past a certain usage level based on bandwidth - [ ] Fix compose, topics - [ ] Fix onboarding workflow w/forced relays +- [ ] https://github.com/staab/coracle/issues/42 +- [ ] Fix iOS/safari/firefox # Others diff --git a/src/agent/pool.ts b/src/agent/pool.ts index feec7b4a..eca970dc 100644 --- a/src/agent/pool.ts +++ b/src/agent/pool.ts @@ -1,7 +1,6 @@ import type {Relay, Filter} from "nostr-tools" import type {Deferred} from "src/util/misc" import type {MyEvent} from "src/util/types" -import {debounce} from "throttle-debounce" import {verifySignature} from "nostr-tools" import {pluck, objOf, identity, is} from "ramda" import {ensurePlural, noop} from "hurdak/lib/hurdak" @@ -20,16 +19,18 @@ const eventBus = new EventBus() const connections = {} -const CONNECTION_STATUS = { +const STATUS = { NEW: "new", PENDING: "pending", CLOSED: "closed", + ERROR: "error", READY: "ready", - AUTH: "auth", - ERROR: { - CONNECTION: "error/connection", - AUTH: "error/auth", - }, +} + +const ERROR = { + CONNECTION: "connection", + UNAUTHORIZED: "unauthorized", + FORBIDDEN: "forbidden", } class Connection { @@ -37,17 +38,23 @@ class Connection { url: string promise?: Deferred queue: string[] - status: {code: string; message: string; occurredAt: number} + error: {code: string; occurredAt: number} + status: {code: string; message: string} timeout?: number stats: Record bus: EventBus constructor(url) { + if (connections[url]) { + error(`Connection to ${url} already exists`) + } + this.ws = null this.url = url this.promise = null this.queue = [] this.timeout = null this.bus = new EventBus() + this.error = null this.stats = { timeouts: 0, subsCount: 0, @@ -57,28 +64,30 @@ class Connection { activeSubsCount: 0, } - this.setStatus(CONNECTION_STATUS.NEW, "Waiting to connect") - - if (connections[url]) { - error(`Connection to ${url} already exists`) - } + this.status = {code: STATUS.NEW, message: "Waiting to connect"} + this.listenForAuth() connections[url] = this } - setStatus(code, message, extra = {}) { - this.status = {code, message, ...extra, occurredAt: Date.now()} + setStatus(code, message) { + this.status = {code, message} + } + setError(code) { + this.error = {code, occurredAt: Date.now()} } connect() { if (this.ws) { - throw new Error("Attempted to connect when already connected") + error("Attempted to connect when already connected", this) } this.promise = defer() this.ws = new WebSocket(this.url) - this.setStatus(CONNECTION_STATUS.PENDING, "Trying to connect") + this.setStatus(STATUS.PENDING, "Trying to connect") this.ws.addEventListener("open", () => { - this.setStatus(CONNECTION_STATUS.READY, "Connected") + log(`Opened connection to ${this.url}`) + + this.setStatus(STATUS.READY, "Connected") this.promise.resolve() }) @@ -91,55 +100,38 @@ class Connection { }) this.ws.addEventListener("error", e => { - this.disconnect(CONNECTION_STATUS.ERROR.CONNECTION, "Failed to connect") + log(`Error on connection to ${this.url}`, e) + + this.disconnect() this.promise.reject() + this.setError(ERROR.CONNECTION) + this.setStatus(STATUS.CLOSED, "Closed") }) this.ws.addEventListener("close", () => { + log(`Closed connection to ${this.url}`) + this.disconnect() this.promise.reject() + this.setStatus(STATUS.CLOSED, "Closed") }) - - // Propagate auth to global handler - this.bus.on( - "AUTH", - debounce( - 10_000, - challenge => { - this.setStatus(CONNECTION_STATUS.AUTH, "Logging in") - - eventBus.handle("AUTH", challenge, this) - }, - { - atBegin: true, - } - ) - ) } - disconnect(code = null, message = null) { - if (this.ws?.readyState === WebSocket.OPEN) { + disconnect() { + if (this.ws) { + log(`Disconnecting from ${this.url}`) + this.ws.close() + this.ws = null } - - if (code) { - this.setStatus(code, message) - } else { - this.setStatus(CONNECTION_STATUS.CLOSED, "Closed") - } - - this.ws = null } async autoConnect() { - const {code, occurredAt} = this.status - const {NEW, CLOSED} = CONNECTION_STATUS - // If the connection has not been opened, or was closed, open 'er up - if ([NEW, CLOSED].includes(code)) { + if (!this.error && [STATUS.NEW, STATUS.CLOSED].includes(this.status.code)) { this.connect() } // If the connection failed, try to re-open after a while - if (code.startsWith("error") && Date.now() - 30_000 > occurredAt) { + if (this.error?.code === ERROR.CONNECTION && Date.now() - 30_000 > this.error.occurredAt) { this.disconnect() this.connect() } @@ -179,7 +171,7 @@ class Connection { return { conn: this, unsub: () => { - if (this.status.code === CONNECTION_STATUS.READY) { + if (this.status.code === STATUS.READY) { this.send("CLOSE", id, ...filters) } @@ -204,13 +196,25 @@ class Connection { this.send("EVENT", event) } + listenForAuth() { + // Propagate auth to global handler + this.bus.on("AUTH", challenge => { + if (!this.error) { + this.setStatus(STATUS.ERROR, "Logging in") + this.setError(ERROR.UNAUTHORIZED) + + eventBus.handle("AUTH", challenge, this) + } + }) + } checkAuth(eid) { const channel = this.bus.on("OK", (id, ok, message) => { if (id === eid) { if (ok) { - this.setStatus(CONNECTION_STATUS.READY, "Connected") + this.setStatus(STATUS.READY, "Connected") } else { - this.disconnect(CONNECTION_STATUS.ERROR.AUTH, message) + this.disconnect() + this.setError(ERROR.FORBIDDEN) } this.bus.off("OK", channel) @@ -218,10 +222,10 @@ class Connection { }) } hasRecentError() { - return this.status.code.startsWith("error") && Date.now() - this.status.occurredAt < 30_000 + return this.error && Date.now() - 30_000 < this.error.occurredAt } getQuality() { - if (this.status.code.startsWith("error")) { + if (this.status.code === STATUS.ERROR) { return [0, this.status.message] } @@ -241,15 +245,11 @@ class Connection { return [eoseQuality, "Connected"] } - const {NEW, PENDING, AUTH, CLOSED, READY} = CONNECTION_STATUS - - if ([NEW, PENDING, AUTH, CLOSED].includes(this.status.code)) { - return [0.5, this.status.message] - } - - if (this.status.code === READY) { + if (this.status.code === STATUS.READY) { return [1, "Connected"] } + + return [0.5, this.status.message] } } @@ -344,10 +344,8 @@ const publish = async ({relays, event, onProgress, timeout = 5000}) => { relays.map(async relay => { const conn = await connect(relay.url) - const {READY, AUTH} = CONNECTION_STATUS - const canPublish = [READY, AUTH].includes(conn.status.code) - if (canPublish) { + if (conn.status.code === STATUS.READY || conn.error.code === ERROR.UNAUTHORIZED) { conn.publish(event, { onOk: () => { succeeded.add(relay.url) @@ -406,7 +404,7 @@ const subscribe = async ({relays, filter, onEvent, onEose, onError}: SubscribeOp const promises = relays.map(async relay => { const conn = await connect(relay.url) - if (conn.status.code !== CONNECTION_STATUS.READY) { + if (conn.status.code !== STATUS.READY) { if (onError) { onError(relay.url) }