Remove dependency on nostr-tools for pool

This commit is contained in:
Jonathan Staab 2023-03-20 10:51:35 -05:00
parent 03fea8154f
commit e326df9a22
5 changed files with 197 additions and 102 deletions

View File

@ -1,10 +1,11 @@
import type {Relay, Filter} from "nostr-tools"
import type {Deferred} from "src/util/misc"
import type {MyEvent} from "src/util/types"
import {relayInit} from "nostr-tools"
import {verifySignature} from "nostr-tools"
import {pluck, objOf, identity, is} from "ramda"
import {ensurePlural} from "hurdak/lib/hurdak"
import {ensurePlural, noop} from "hurdak/lib/hurdak"
import {warn, log, error} from "src/util/logger"
import {union, now, difference} from "src/util/misc"
import {union, defer, tryJson, now, difference} from "src/util/misc"
import {isRelay, normalizeRelayUrl} from "src/util/nostr"
const forceRelays = (import.meta.env.VITE_FORCE_RELAYS || "")
@ -25,15 +26,31 @@ const CONNECTION_STATUS = {
}
class Connection {
promise: Promise<void>
nostr: Relay
ws?: WebSocket
url: string
promise?: Deferred<void>
queue: string[]
status: string
closed?: number
timeout?: number
listeners: Record<string, Record<string, (...args: any[]) => void>>
stats: Record<string, number>
lastConnectionAttempt: number
constructor(url) {
this.ws = null
this.url = url
this.promise = null
this.nostr = relayInit(url)
this.status = "new"
this.queue = []
this.status = CONNECTION_STATUS.NEW
this.closed = null
this.timeout = null
this.listeners = {
OK: {},
ERROR: {},
EVENT: {},
EOSE: {},
}
this.stats = {
timeouts: 0,
subsCount: 0,
@ -45,45 +62,117 @@ class Connection {
connections[url] = this
}
hasRecentError() {
return this.status === CONNECTION_STATUS.ERROR && now() - this.lastConnectionAttempt < 10
connect() {
if (this.ws) {
throw new Error("Attempted to connect when already connected")
}
this.status = CONNECTION_STATUS.PENDING
this.ws = new WebSocket(this.url)
this.promise = defer()
this.closed = null
this.ws.addEventListener("open", () => {
this.status = CONNECTION_STATUS.READY
this.promise.resolve()
})
this.ws.addEventListener("message", e => {
this.queue.push(e.data)
if (!this.timeout) {
this.timeout = window.setTimeout(() => this.handleMessages(), 10)
}
})
this.ws.addEventListener("error", () => {
this.status = CONNECTION_STATUS.ERROR
this.promise.reject()
this.closed = now()
})
this.ws.addEventListener("close", () => {
this.status = CONNECTION_STATUS.CLOSED
this.promise.reject()
this.closed = now()
})
}
async connect() {
const shouldConnect =
this.status === CONNECTION_STATUS.NEW ||
(this.status === CONNECTION_STATUS.ERROR && now() - this.lastConnectionAttempt > 10)
if (shouldConnect) {
this.status = CONNECTION_STATUS.PENDING
this.promise = this.nostr.connect()
this.nostr.on("connect", () => {
this.status = CONNECTION_STATUS.READY
})
this.nostr.on("error", () => {
this.status = CONNECTION_STATUS.ERROR
})
this.nostr.on("disconnect", () => {
this.status = CONNECTION_STATUS.CLOSED
})
disconnect() {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.close()
}
this.lastConnectionAttempt = now()
try {
await this.promise
} catch (e) {
// This is already handled in the on error handler above
this.ws = null
}
async autoConnect() {
if (this.status === CONNECTION_STATUS.NEW) {
this.connect()
} else if (this.closed && now() - 10 > this.closed) {
// If the connection was closed, try to re-open, but throttle it
this.disconnect()
this.connect()
}
await this.promise.catch(noop)
return this
}
disconnect() {
this.nostr.close()
on(name, id, cb) {
this.listeners[name][id] = cb
}
off(name, id) {
delete this.listeners[name][id]
}
handleMessages() {
for (const json of this.queue.splice(0, 10)) {
const message = tryJson(() => JSON.parse(json))
delete connections[this.nostr.url]
if (message) {
const [verb, ...payload] = message
for (const listener of Object.values(this.listeners[verb] || {})) {
listener(...payload)
}
}
}
this.timeout = this.queue.length > 0 ? window.setTimeout(() => this.handleMessages(), 10) : null
}
send(...payload) {
this.ws.send(JSON.stringify(payload))
}
subscribe(filters, id, {onEvent, onEose}) {
this.on("EVENT", id, (subid, e) => subid === id && onEvent(e))
this.on("EOSE", id, subid => subid === id && onEose())
this.send("REQ", id, ...filters)
return {
conn: this,
unsub: () => {
this.send("CLOSE", id, ...filters)
this.off("EVENT", id)
this.off("EOSE", id)
},
}
}
publish(event, {onOk, onError}) {
const withCleanup = f => eid => {
if (eid === event.id) {
f()
this.off("OK", event.id)
this.off("ERROR", event.id)
}
}
this.on("OK", event.id, withCleanup(onOk))
this.on("ERROR", event.id, withCleanup(onError))
this.send("EVENT", event)
}
hasRecentError() {
return this.status === CONNECTION_STATUS.ERROR && now() - this.closed < 10
}
getQuality() {
if (this.status === CONNECTION_STATUS.ERROR) {
@ -137,7 +226,15 @@ const connect = url => {
connections[url] = new Connection(url)
}
return connections[url].connect()
return connections[url].autoConnect()
}
const disconnect = url => {
if (connections[url]) {
connections[url].disconnect()
delete connections[url]
}
}
// Public api - publish/subscribe
@ -205,19 +302,18 @@ const publish = async ({relays, event, onProgress, timeout = 5000}) => {
const conn = await connect(relay.url)
if (conn.status === CONNECTION_STATUS.READY) {
const pub = conn.nostr.publish(event)
pub.on("ok", () => {
succeeded.add(relay.url)
timeouts.delete(relay.url)
failed.delete(relay.url)
attemptToResolve()
})
pub.on("failed", reason => {
failed.add(relay.url)
timeouts.delete(relay.url)
attemptToResolve()
conn.publish(event, {
onOk: () => {
succeeded.add(relay.url)
timeouts.delete(relay.url)
failed.delete(relay.url)
attemptToResolve()
},
onError: () => {
failed.add(relay.url)
timeouts.delete(relay.url)
attemptToResolve()
},
})
} else {
failed.add(relay.url)
@ -272,53 +368,43 @@ const subscribe = async ({relays, filter, onEvent, onEose, onError}: SubscribeOp
return
}
const sub = conn.nostr.sub(filter, {
id,
// This isn't currently working for some reason
// alreadyHaveEvent: (id, url) => {
// conn.stats.eventsCount += 1
// if (seen.has(id)) {
// return true
// }
// seen.add(id)
// return false
// },
})
sub.on("event", e => {
if (!seen.has(e.id)) {
seen.add(e.id)
// Normalize events here, annotate with relay url
onEvent({...e, seen_on: relay.url, content: e.content || ""})
}
})
sub.on("eose", () => {
if (onEose) {
onEose(conn.nostr.url)
}
// Keep track of relay timing stats, but only for the first eose we get
if (!eose.has(conn.nostr.url)) {
eose.add(conn.nostr.url)
conn.stats.eoseCount += 1
conn.stats.eoseTimer += Date.now() - now
}
})
conn.stats.subsCount += 1
conn.stats.activeSubsCount += 1
if (conn.stats.activeSubsCount > 10) {
warn(`Relay ${conn.nostr.url} has >10 active subscriptions`)
warn(`Relay ${conn.url} has >10 active subscriptions`)
}
return Object.assign(sub, {conn})
return conn.subscribe(filter, id, {
onEvent: e => {
if (seen.has(e.id)) {
return
}
seen.add(e.id)
if (!verifySignature(e)) {
return
}
conn.stats.eventsCount += 1
onEvent({...e, seen_on: relay.url, content: e.content || ""})
},
onEose: () => {
if (onEose) {
onEose(conn.url)
}
// Keep track of relay timing stats, but only for the first eose we get
if (!eose.has(conn.url)) {
eose.add(conn.url)
conn.stats.eoseCount += 1
conn.stats.eoseTimer += Date.now() - now
}
},
})
})
return {
@ -370,6 +456,7 @@ export default {
getConnections,
getConnection,
connect,
disconnect,
publish,
subscribe,
}

View File

@ -209,12 +209,15 @@ addHandler(
addHandler(
10002,
profileHandler("relays", (e, p) => {
return e.tags.map(([_, url, mode]) => {
const read = (mode || "read") === "read"
const write = (mode || "write") === "write"
return Tags.from(e)
.type("r")
.all()
.map(([_, url, mode]) => {
const read = (mode || "read") === "read"
const write = (mode || "write") === "write"
return {url, read, write}
})
return {url, read, write}
})
})
)

View File

@ -18,7 +18,7 @@ setInterval(() => {
// Alert the user to any heinously slow connections
slowConnections.set(
Object.values(pool.getConnections()).filter(
c => relayUrls.has(c.nostr.url) && first(c.getQuality()) < 0.3
c => relayUrls.has(c.url) && first(c.getQuality()) < 0.3
)
)
}, 30_000)

View File

@ -234,7 +234,12 @@ export const batch = (t, f) => {
}
}
export const defer = () => {
export type Deferred<T> = Promise<T> & {
resolve: (arg: T) => void
reject: (arg: T) => void
}
export const defer = (): Deferred<any> => {
let resolve, reject
const p = new Promise((resolve_, reject_) => {
resolve = resolve_

View File

@ -67,7 +67,7 @@
navigate("/notes/follows")
} else {
pool.getConnection(relay.url).disconnect()
pool.disconnect(relay.url)
}
})
}