mirror of
https://github.com/coracle-social/coracle.git
synced 2024-09-30 00:41:12 +00:00
Split up pool status and error to handle auth
This commit is contained in:
parent
6695ee7b7b
commit
1939725ac6
@ -1,10 +1,11 @@
|
||||
# Current
|
||||
|
||||
- [ ] https://github.com/staab/coracle/issues/42
|
||||
- [ ] Fix iOS/safari/firefox
|
||||
- [ ] Merge COUNT
|
||||
- [ ] Multiplex, charge past a certain usage level based on bandwidth
|
||||
- [ ] Fix compose, topics
|
||||
- [ ] Fix onboarding workflow w/forced relays
|
||||
- [ ] https://github.com/staab/coracle/issues/42
|
||||
- [ ] Fix iOS/safari/firefox
|
||||
|
||||
# Others
|
||||
|
||||
|
@ -1,7 +1,6 @@
|
||||
import type {Relay, Filter} from "nostr-tools"
|
||||
import type {Deferred} from "src/util/misc"
|
||||
import type {MyEvent} from "src/util/types"
|
||||
import {debounce} from "throttle-debounce"
|
||||
import {verifySignature} from "nostr-tools"
|
||||
import {pluck, objOf, identity, is} from "ramda"
|
||||
import {ensurePlural, noop} from "hurdak/lib/hurdak"
|
||||
@ -20,16 +19,18 @@ const eventBus = new EventBus()
|
||||
|
||||
const connections = {}
|
||||
|
||||
const CONNECTION_STATUS = {
|
||||
const STATUS = {
|
||||
NEW: "new",
|
||||
PENDING: "pending",
|
||||
CLOSED: "closed",
|
||||
ERROR: "error",
|
||||
READY: "ready",
|
||||
AUTH: "auth",
|
||||
ERROR: {
|
||||
CONNECTION: "error/connection",
|
||||
AUTH: "error/auth",
|
||||
},
|
||||
}
|
||||
|
||||
const ERROR = {
|
||||
CONNECTION: "connection",
|
||||
UNAUTHORIZED: "unauthorized",
|
||||
FORBIDDEN: "forbidden",
|
||||
}
|
||||
|
||||
class Connection {
|
||||
@ -37,17 +38,23 @@ class Connection {
|
||||
url: string
|
||||
promise?: Deferred<void>
|
||||
queue: string[]
|
||||
status: {code: string; message: string; occurredAt: number}
|
||||
error: {code: string; occurredAt: number}
|
||||
status: {code: string; message: string}
|
||||
timeout?: number
|
||||
stats: Record<string, number>
|
||||
bus: EventBus
|
||||
constructor(url) {
|
||||
if (connections[url]) {
|
||||
error(`Connection to ${url} already exists`)
|
||||
}
|
||||
|
||||
this.ws = null
|
||||
this.url = url
|
||||
this.promise = null
|
||||
this.queue = []
|
||||
this.timeout = null
|
||||
this.bus = new EventBus()
|
||||
this.error = null
|
||||
this.stats = {
|
||||
timeouts: 0,
|
||||
subsCount: 0,
|
||||
@ -57,28 +64,30 @@ class Connection {
|
||||
activeSubsCount: 0,
|
||||
}
|
||||
|
||||
this.setStatus(CONNECTION_STATUS.NEW, "Waiting to connect")
|
||||
|
||||
if (connections[url]) {
|
||||
error(`Connection to ${url} already exists`)
|
||||
}
|
||||
this.status = {code: STATUS.NEW, message: "Waiting to connect"}
|
||||
this.listenForAuth()
|
||||
|
||||
connections[url] = this
|
||||
}
|
||||
setStatus(code, message, extra = {}) {
|
||||
this.status = {code, message, ...extra, occurredAt: Date.now()}
|
||||
setStatus(code, message) {
|
||||
this.status = {code, message}
|
||||
}
|
||||
setError(code) {
|
||||
this.error = {code, occurredAt: Date.now()}
|
||||
}
|
||||
connect() {
|
||||
if (this.ws) {
|
||||
throw new Error("Attempted to connect when already connected")
|
||||
error("Attempted to connect when already connected", this)
|
||||
}
|
||||
|
||||
this.promise = defer()
|
||||
this.ws = new WebSocket(this.url)
|
||||
this.setStatus(CONNECTION_STATUS.PENDING, "Trying to connect")
|
||||
this.setStatus(STATUS.PENDING, "Trying to connect")
|
||||
|
||||
this.ws.addEventListener("open", () => {
|
||||
this.setStatus(CONNECTION_STATUS.READY, "Connected")
|
||||
log(`Opened connection to ${this.url}`)
|
||||
|
||||
this.setStatus(STATUS.READY, "Connected")
|
||||
this.promise.resolve()
|
||||
})
|
||||
|
||||
@ -91,55 +100,38 @@ class Connection {
|
||||
})
|
||||
|
||||
this.ws.addEventListener("error", e => {
|
||||
this.disconnect(CONNECTION_STATUS.ERROR.CONNECTION, "Failed to connect")
|
||||
log(`Error on connection to ${this.url}`, e)
|
||||
|
||||
this.disconnect()
|
||||
this.promise.reject()
|
||||
this.setError(ERROR.CONNECTION)
|
||||
this.setStatus(STATUS.CLOSED, "Closed")
|
||||
})
|
||||
|
||||
this.ws.addEventListener("close", () => {
|
||||
log(`Closed connection to ${this.url}`)
|
||||
|
||||
this.disconnect()
|
||||
this.promise.reject()
|
||||
this.setStatus(STATUS.CLOSED, "Closed")
|
||||
})
|
||||
|
||||
// Propagate auth to global handler
|
||||
this.bus.on(
|
||||
"AUTH",
|
||||
debounce(
|
||||
10_000,
|
||||
challenge => {
|
||||
this.setStatus(CONNECTION_STATUS.AUTH, "Logging in")
|
||||
|
||||
eventBus.handle("AUTH", challenge, this)
|
||||
},
|
||||
{
|
||||
atBegin: true,
|
||||
}
|
||||
)
|
||||
)
|
||||
}
|
||||
disconnect(code = null, message = null) {
|
||||
if (this.ws?.readyState === WebSocket.OPEN) {
|
||||
disconnect() {
|
||||
if (this.ws) {
|
||||
log(`Disconnecting from ${this.url}`)
|
||||
|
||||
this.ws.close()
|
||||
this.ws = null
|
||||
}
|
||||
|
||||
if (code) {
|
||||
this.setStatus(code, message)
|
||||
} else {
|
||||
this.setStatus(CONNECTION_STATUS.CLOSED, "Closed")
|
||||
}
|
||||
|
||||
this.ws = null
|
||||
}
|
||||
async autoConnect() {
|
||||
const {code, occurredAt} = this.status
|
||||
const {NEW, CLOSED} = CONNECTION_STATUS
|
||||
|
||||
// If the connection has not been opened, or was closed, open 'er up
|
||||
if ([NEW, CLOSED].includes(code)) {
|
||||
if (!this.error && [STATUS.NEW, STATUS.CLOSED].includes(this.status.code)) {
|
||||
this.connect()
|
||||
}
|
||||
|
||||
// If the connection failed, try to re-open after a while
|
||||
if (code.startsWith("error") && Date.now() - 30_000 > occurredAt) {
|
||||
if (this.error?.code === ERROR.CONNECTION && Date.now() - 30_000 > this.error.occurredAt) {
|
||||
this.disconnect()
|
||||
this.connect()
|
||||
}
|
||||
@ -179,7 +171,7 @@ class Connection {
|
||||
return {
|
||||
conn: this,
|
||||
unsub: () => {
|
||||
if (this.status.code === CONNECTION_STATUS.READY) {
|
||||
if (this.status.code === STATUS.READY) {
|
||||
this.send("CLOSE", id, ...filters)
|
||||
}
|
||||
|
||||
@ -204,13 +196,25 @@ class Connection {
|
||||
|
||||
this.send("EVENT", event)
|
||||
}
|
||||
listenForAuth() {
|
||||
// Propagate auth to global handler
|
||||
this.bus.on("AUTH", challenge => {
|
||||
if (!this.error) {
|
||||
this.setStatus(STATUS.ERROR, "Logging in")
|
||||
this.setError(ERROR.UNAUTHORIZED)
|
||||
|
||||
eventBus.handle("AUTH", challenge, this)
|
||||
}
|
||||
})
|
||||
}
|
||||
checkAuth(eid) {
|
||||
const channel = this.bus.on("OK", (id, ok, message) => {
|
||||
if (id === eid) {
|
||||
if (ok) {
|
||||
this.setStatus(CONNECTION_STATUS.READY, "Connected")
|
||||
this.setStatus(STATUS.READY, "Connected")
|
||||
} else {
|
||||
this.disconnect(CONNECTION_STATUS.ERROR.AUTH, message)
|
||||
this.disconnect()
|
||||
this.setError(ERROR.FORBIDDEN)
|
||||
}
|
||||
|
||||
this.bus.off("OK", channel)
|
||||
@ -218,10 +222,10 @@ class Connection {
|
||||
})
|
||||
}
|
||||
hasRecentError() {
|
||||
return this.status.code.startsWith("error") && Date.now() - this.status.occurredAt < 30_000
|
||||
return this.error && Date.now() - 30_000 < this.error.occurredAt
|
||||
}
|
||||
getQuality() {
|
||||
if (this.status.code.startsWith("error")) {
|
||||
if (this.status.code === STATUS.ERROR) {
|
||||
return [0, this.status.message]
|
||||
}
|
||||
|
||||
@ -241,15 +245,11 @@ class Connection {
|
||||
return [eoseQuality, "Connected"]
|
||||
}
|
||||
|
||||
const {NEW, PENDING, AUTH, CLOSED, READY} = CONNECTION_STATUS
|
||||
|
||||
if ([NEW, PENDING, AUTH, CLOSED].includes(this.status.code)) {
|
||||
return [0.5, this.status.message]
|
||||
}
|
||||
|
||||
if (this.status.code === READY) {
|
||||
if (this.status.code === STATUS.READY) {
|
||||
return [1, "Connected"]
|
||||
}
|
||||
|
||||
return [0.5, this.status.message]
|
||||
}
|
||||
}
|
||||
|
||||
@ -344,10 +344,8 @@ const publish = async ({relays, event, onProgress, timeout = 5000}) => {
|
||||
|
||||
relays.map(async relay => {
|
||||
const conn = await connect(relay.url)
|
||||
const {READY, AUTH} = CONNECTION_STATUS
|
||||
const canPublish = [READY, AUTH].includes(conn.status.code)
|
||||
|
||||
if (canPublish) {
|
||||
if (conn.status.code === STATUS.READY || conn.error.code === ERROR.UNAUTHORIZED) {
|
||||
conn.publish(event, {
|
||||
onOk: () => {
|
||||
succeeded.add(relay.url)
|
||||
@ -406,7 +404,7 @@ const subscribe = async ({relays, filter, onEvent, onEose, onError}: SubscribeOp
|
||||
const promises = relays.map(async relay => {
|
||||
const conn = await connect(relay.url)
|
||||
|
||||
if (conn.status.code !== CONNECTION_STATUS.READY) {
|
||||
if (conn.status.code !== STATUS.READY) {
|
||||
if (onError) {
|
||||
onError(relay.url)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user