Add new pool based on paravel

This commit is contained in:
Jonathan Staab 2023-03-28 15:08:41 -05:00
parent 56a5d17116
commit e9da4e899b
19 changed files with 361 additions and 518 deletions

View File

@ -1,7 +1,7 @@
# Current
- [ ] Fix reactions and replies
- [ ] Multiplexer
- [ ] Explore the idea of separating everything into different components and wiring it all up into a system in a single file.
- [ ] Write NIP to support proxies. Update COUNT NIP to mention how proxies are a good use case for COUNT
- [ ] Fix iOS/safari/firefox
- [ ] https://github.com/staab/coracle/issues/42
@ -136,3 +136,4 @@
- Graph view? Query db with COUNT? Hardware specs on relay info endpoint?
- "adoptarelay.com"
- Add suggested relays based on follows or topics
- [ ] Integrate plephy https://plebhy.com/

77
package-lock.json generated
View File

@ -23,6 +23,7 @@
"lru-cache": "^7.18.3",
"nostr-tools": "^1.7.4",
"npm-run-all": "^4.1.5",
"paravel": "^0.1.7",
"qr-scanner": "^1.4.2",
"qrcode": "^1.5.1",
"ramda": "^0.28.0",
@ -5333,6 +5334,14 @@
"node": ">=10"
}
},
"node_modules/isomorphic-ws": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/isomorphic-ws/-/isomorphic-ws-5.0.0.tgz",
"integrity": "sha512-muId7Zzn9ywDsyXgTIafTry2sV3nySZeUDe6YedVd1Hvuuep5AsIlqK+XefWpYTyJG5e503F2xIuT2lcU6rCSw==",
"peerDependencies": {
"ws": "*"
}
},
"node_modules/jake": {
"version": "10.8.5",
"resolved": "https://registry.npmjs.org/jake/-/jake-10.8.5.tgz",
@ -5884,9 +5893,9 @@
}
},
"node_modules/nostr-tools": {
"version": "1.7.4",
"resolved": "https://registry.npmjs.org/nostr-tools/-/nostr-tools-1.7.4.tgz",
"integrity": "sha512-YowDJ+S3UW9KCYPDZfZXXMITrJSMjiCmFOK5HohyKkg+w6EipFUTkFRBPRA2BTLXO/qw8gukKXfL0B7Dv3jtcQ==",
"version": "1.8.1",
"resolved": "https://registry.npmjs.org/nostr-tools/-/nostr-tools-1.8.1.tgz",
"integrity": "sha512-/2IUe5xINUYT5hYBoEz51dfRaodbRHnyF8n+ZbKWCoh0ZRX6AL88OoDNrWaWWo7tP5j5OyzSL9g/z4TP7bshEA==",
"dependencies": {
"@noble/hashes": "1.0.0",
"@noble/secp256k1": "^1.7.1",
@ -6173,6 +6182,17 @@
"resolved": "https://registry.npmjs.org/pako/-/pako-1.0.11.tgz",
"integrity": "sha512-4hLB8Py4zZce5s4yd9XzopqwVv/yGNhV1Bl8NTmCq1763HeK2+EwVTv+leGeL13Dnh2wfbqowVPXCIO0z4taYw=="
},
"node_modules/paravel": {
"version": "0.1.7",
"resolved": "https://registry.npmjs.org/paravel/-/paravel-0.1.7.tgz",
"integrity": "sha512-Ji35sAX94MbckQ2fvT2kOPkVhwZhB7tF/39pjEI9ZCJGQ+lWcqQ/gQR/mSUZbFDYn9FxfVilGt2pfgL27QG2Wg==",
"dependencies": {
"husky": "^8.0.3",
"isomorphic-ws": "^5.0.0",
"nostr-tools": "^1.7.5",
"npm-run-all": "^4.1.5"
}
},
"node_modules/parent-module": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz",
@ -8534,6 +8554,27 @@
"resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz",
"integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ=="
},
"node_modules/ws": {
"version": "8.13.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.13.0.tgz",
"integrity": "sha512-x9vcZYTrFPC7aSIbj7sRCYo7L/Xb8Iy+pW0ng0wt2vCJv7M9HOMy0UoN3rr+IFC7hb7vXoqS+P9ktyLLLhO+LA==",
"peer": true,
"engines": {
"node": ">=10.0.0"
},
"peerDependencies": {
"bufferutil": "^4.0.1",
"utf-8-validate": ">=5.0.2"
},
"peerDependenciesMeta": {
"bufferutil": {
"optional": true
},
"utf-8-validate": {
"optional": true
}
}
},
"node_modules/xtend": {
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz",
@ -12328,6 +12369,12 @@
"resolved": "https://registry.npmjs.org/isomorphic-timers-promises/-/isomorphic-timers-promises-1.0.1.tgz",
"integrity": "sha512-u4sej9B1LPSxTGKB/HiuzvEQnXH0ECYkSVQU39koSwmFAxhlEAFl9RdTvLv4TOTQUgBS5O3O5fwUxk6byBZ+IQ=="
},
"isomorphic-ws": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/isomorphic-ws/-/isomorphic-ws-5.0.0.tgz",
"integrity": "sha512-muId7Zzn9ywDsyXgTIafTry2sV3nySZeUDe6YedVd1Hvuuep5AsIlqK+XefWpYTyJG5e503F2xIuT2lcU6rCSw==",
"requires": {}
},
"jake": {
"version": "10.8.5",
"resolved": "https://registry.npmjs.org/jake/-/jake-10.8.5.tgz",
@ -12762,9 +12809,9 @@
"dev": true
},
"nostr-tools": {
"version": "1.7.4",
"resolved": "https://registry.npmjs.org/nostr-tools/-/nostr-tools-1.7.4.tgz",
"integrity": "sha512-YowDJ+S3UW9KCYPDZfZXXMITrJSMjiCmFOK5HohyKkg+w6EipFUTkFRBPRA2BTLXO/qw8gukKXfL0B7Dv3jtcQ==",
"version": "1.8.1",
"resolved": "https://registry.npmjs.org/nostr-tools/-/nostr-tools-1.8.1.tgz",
"integrity": "sha512-/2IUe5xINUYT5hYBoEz51dfRaodbRHnyF8n+ZbKWCoh0ZRX6AL88OoDNrWaWWo7tP5j5OyzSL9g/z4TP7bshEA==",
"requires": {
"@noble/hashes": "1.0.0",
"@noble/secp256k1": "^1.7.1",
@ -12973,6 +13020,17 @@
"resolved": "https://registry.npmjs.org/pako/-/pako-1.0.11.tgz",
"integrity": "sha512-4hLB8Py4zZce5s4yd9XzopqwVv/yGNhV1Bl8NTmCq1763HeK2+EwVTv+leGeL13Dnh2wfbqowVPXCIO0z4taYw=="
},
"paravel": {
"version": "0.1.7",
"resolved": "https://registry.npmjs.org/paravel/-/paravel-0.1.7.tgz",
"integrity": "sha512-Ji35sAX94MbckQ2fvT2kOPkVhwZhB7tF/39pjEI9ZCJGQ+lWcqQ/gQR/mSUZbFDYn9FxfVilGt2pfgL27QG2Wg==",
"requires": {
"husky": "^8.0.3",
"isomorphic-ws": "^5.0.0",
"nostr-tools": "^1.7.5",
"npm-run-all": "^4.1.5"
}
},
"parent-module": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz",
@ -14626,6 +14684,13 @@
"resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz",
"integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ=="
},
"ws": {
"version": "8.13.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.13.0.tgz",
"integrity": "sha512-x9vcZYTrFPC7aSIbj7sRCYo7L/Xb8Iy+pW0ng0wt2vCJv7M9HOMy0UoN3rr+IFC7hb7vXoqS+P9ktyLLLhO+LA==",
"peer": true,
"requires": {}
},
"xtend": {
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz",

View File

@ -46,6 +46,7 @@
"lru-cache": "^7.18.3",
"nostr-tools": "^1.7.4",
"npm-run-all": "^4.1.5",
"paravel": "^0.1.7",
"qr-scanner": "^1.4.2",
"qrcode": "^1.5.1",
"ramda": "^0.28.0",

View File

@ -83,13 +83,16 @@
$: style.textContent = `:root { ${getThemeVariables($theme)}; background: var(--gray-8); }`
// When we get an AUTH challenge from our pool, attempt to authenticate
pool.eventBus.on("AUTH", async (challenge, connection) => {
const publishable = cmd.authenticate(challenge, url)
const [event] = await publishable.publish([{url: connection.url}])
const seenChallenges = new Set()
connection.checkAuth(event.id)
})
// When we get an AUTH challenge from our pool, attempt to authenticate
pool.Config.authHandler = async (url, challenge) => {
if (!seenChallenges.has(challenge)) {
seenChallenges.add(challenge)
return first(await cmd.authenticate(url, challenge).publish([{url}]))
}
}
onMount(() => {
// Keep scroll position on body, but don't allow scrolling

View File

@ -7,11 +7,11 @@ import pool from "src/agent/pool"
import sync from "src/agent/sync"
import keys from "src/agent/keys"
const authenticate = (challenge, relay) =>
const authenticate = (url, challenge) =>
new PublishableEvent(22242, {
tags: [
["challenge", challenge],
["relay", relay],
["relay", url],
],
})
@ -85,7 +85,7 @@ const processMentions = map(pubkey => {
const name = displayPerson(getPersonWithFallback(pubkey))
const relay = getRelayForPersonHint(pubkey)
return ["p", pubkey, relay?.url || '', name]
return ["p", pubkey, relay?.url || "", name]
})
const getReplyTags = n => {
@ -132,8 +132,11 @@ class PublishableEvent {
this.event = {kind, content, tags, pubkey, created_at: createdAt}
}
getSignedEvent() {
return keys.sign(this.event)
}
async publish(relays, onProgress = null) {
const event = await keys.sign(this.event)
const event = await this.getSignedEvent()
const promise = pool.publish({relays, event, onProgress})
sync.processEvents(event)

View File

@ -1,7 +1,6 @@
import type {MyEvent} from "src/util/types"
import {sortBy, assoc, uniq, uniqBy, prop, propEq, reject, groupBy, pluck} from "ramda"
import {sortBy, assoc, uniq, uniqBy, prop, propEq, groupBy, pluck} from "ramda"
import {personKinds, findReplyId} from "src/util/nostr"
import {log} from "src/util/logger"
import {chunk} from "hurdak/lib/hurdak"
import {batch, now, timedelta} from "src/util/misc"
import {
@ -42,6 +41,7 @@ const listen = ({relays, filter, onChunk = null, shouldProcess = true, delay = 5
const load = ({relays, filter, onChunk = null, shouldProcess = true, timeout = 5000}) => {
return new Promise(resolve => {
let completed = false
const done = new Set()
const allEvents = []
@ -49,25 +49,16 @@ const load = ({relays, filter, onChunk = null, shouldProcess = true, timeout = 5
const sub = await subPromise
// If we've already unsubscribed we're good
if (!sub.isActive()) {
if (completed) {
return
}
const isDone = done.size === relays.length
if (force) {
const timedOutRelays = reject(r => done.has(r.url), relays)
log(
`Timing out ${timedOutRelays.length}/${relays.length} relays after ${timeout}ms`,
timedOutRelays
)
timedOutRelays.forEach(relay => {
const conn = pool.getConnection(relay.url)
if (conn) {
conn.stats.timeouts += 1
relays.forEach(relay => {
if (!done.has(relay.url)) {
pool.Meta.onTimeout(relay.url)
}
})
}
@ -75,6 +66,7 @@ const load = ({relays, filter, onChunk = null, shouldProcess = true, timeout = 5
if (isDone || force) {
sub.unsub()
resolve(allEvents)
completed = true
}
}
@ -101,10 +93,6 @@ const load = ({relays, filter, onChunk = null, shouldProcess = true, timeout = 5
done.add(url)
attemptToComplete(false)
},
onError: url => {
done.add(url)
attemptToComplete(false)
},
})
}) as Promise<MyEvent[]>
}

View File

@ -1,318 +1,193 @@
import type {Relay, Filter} from "nostr-tools"
import type {Deferred} from "src/util/misc"
import type {MyEvent} from "src/util/types"
import {Socket, Pool, Plex, Relays, Executor} from "paravel"
import {verifySignature} from "nostr-tools"
import {pluck, objOf, identity, is} from "ramda"
import {ensurePlural, noop} from "hurdak/lib/hurdak"
import {pluck, identity} from "ramda"
import {ensurePlural, switcher} from "hurdak/lib/hurdak"
import {warn, log, error} from "src/util/logger"
import {union, EventBus, defer, tryJson, difference} from "src/util/misc"
import {isRelay, normalizeRelayUrl} from "src/util/nostr"
import {union, difference} from "src/util/misc"
import {normalizeRelayUrl} from "src/util/nostr"
const forceRelays = (import.meta.env.VITE_FORCE_RELAYS || "")
.split(",")
.filter(identity)
.map(objOf("url"))
// Connection management
const eventBus = new EventBus()
const connections = {}
const STATUS = {
NEW: "new",
PENDING: "pending",
CLOSED: "closed",
ERROR: "error",
READY: "ready",
const Config = {
multiplextrUrl: null,
authHandler: null,
}
const ERROR = {
CONNECTION: "connection",
UNAUTHORIZED: "unauthorized",
FORBIDDEN: "forbidden",
const Meta = {
stats: {},
errors: {},
getStats(url) {
if (!this.stats[url]) {
this.stats[url] = {
error: null,
timeouts: 0,
subsCount: 0,
eoseCount: 0,
eoseTimer: 0,
eventsCount: 0,
activeSubsCount: 0,
lastRequest: 0,
openedAt: 0,
closedAt: 0,
}
}
return this.stats[url]
},
onPublish(urls) {
urls.forEach(url => {
const stats = this.getStats(url)
stats.lastRequest = Date.now()
})
},
onSubscriptionStart(urls) {
urls.forEach(url => {
const stats = this.getStats(url)
stats.subsCount += 1
stats.activeSubsCount += 1
stats.lastRequest = Date.now()
if (stats.activeSubsCount > 10) {
warn(`Relay ${url} has >10 active subscriptions`)
}
})
},
onSubscriptionEnd(urls) {
urls.forEach(url => {
const stats = this.getStats(url)
stats.activeSubsCount -= 1
})
},
onEvent(url) {
const stats = this.getStats(url)
stats.eventsCount += 1
},
onEose(url, ms) {
const stats = this.getStats(url)
stats.eoseCount += 1
stats.eoseTimer += ms
},
onTimeout(url) {
const stats = this.getStats(url)
stats.timeouts += 1
},
}
class Connection {
ws?: WebSocket
url: string
promise?: Deferred<void>
queue: string[]
error: {code: string; message: string, occurredAt: number}
status: {code: string; message: string}
timeout?: number
stats: Record<string, number>
bus: EventBus
constructor(url) {
if (connections[url]) {
error(`Connection to ${url} already exists`)
}
this.ws = null
this.url = url
this.promise = null
this.queue = []
this.timeout = null
this.bus = new EventBus()
this.error = null
this.stats = {
timeouts: 0,
subsCount: 0,
eoseCount: 0,
eoseTimer: 0,
eventsCount: 0,
activeSubsCount: 0,
}
this.status = {code: STATUS.NEW, message: "Waiting to connect"}
this.listenForAuth()
connections[url] = this
}
setStatus(code, message) {
this.status = {code, message}
}
setError(code, message) {
this.error = {code, message, occurredAt: Date.now()}
}
connect() {
if (this.ws) {
error("Attempted to connect when already connected", this)
}
this.promise = defer()
this.ws = new WebSocket(this.url)
this.setStatus(STATUS.PENDING, "Trying to connect")
this.ws.addEventListener("open", () => {
log(`Opened connection to ${this.url}`)
this.setStatus(STATUS.READY, "Connected")
this.promise.resolve()
})
this.ws.addEventListener("message", e => {
this.queue.push(e.data)
if (!this.timeout) {
this.timeout = window.setTimeout(() => this.handleMessages(), 10)
}
})
this.ws.addEventListener("error", e => {
log(`Error on connection to ${this.url}`)
this.disconnect()
this.promise.reject()
this.setError(ERROR.CONNECTION, "Disconnected")
this.setStatus(STATUS.CLOSED, "Closed")
})
this.ws.addEventListener("close", () => {
log(`Closed connection to ${this.url}`)
this.disconnect()
this.promise.reject()
this.setStatus(STATUS.CLOSED, "Closed")
})
}
disconnect() {
if (this.ws) {
log(`Disconnecting from ${this.url}`)
this.ws.close()
this.ws = null
}
}
async autoConnect() {
// If the connection has not been opened, or was closed, open 'er up
if (!this.error && [STATUS.NEW, STATUS.CLOSED].includes(this.status.code)) {
this.connect()
}
// If the connection failed, try to re-open after a while
if (this.error?.code === ERROR.CONNECTION && Date.now() - 30_000 > this.error.occurredAt) {
this.disconnect()
this.connect()
}
await this.promise.catch(noop)
return this
}
handleMessages() {
for (const json of this.queue.splice(0, 10)) {
const message = tryJson(() => JSON.parse(json))
if (message) {
const [k, ...payload] = message
this.bus.handle(k, ...payload)
}
}
this.timeout = this.queue.length > 0 ? window.setTimeout(() => this.handleMessages(), 10) : null
}
send(...payload) {
if (this.ws?.readyState !== 1) {
warn("Send attempted before socket was ready", this)
}
this.ws.send(JSON.stringify(payload))
}
subscribe(filters, id, {onEvent, onEose}) {
const [eventChannel, eoseChannel] = [
this.bus.on("EVENT", (subid, e) => subid === id && onEvent(e)),
this.bus.on("EOSE", subid => subid === id && onEose()),
]
this.send("REQ", id, ...filters)
return {
conn: this,
unsub: () => {
if (this.status.code === STATUS.READY) {
this.send("CLOSE", id)
}
this.bus.off("EVENT", eventChannel)
this.bus.off("EOSE", eoseChannel)
},
}
}
publish(event, {onOk, onError}) {
const withCleanup = cb => k => {
if (k === event.id) {
cb()
this.bus.off("OK", okChannel)
this.bus.off("ERROR", errorChannel)
}
}
const [okChannel, errorChannel] = [
this.bus.on("OK", withCleanup(onOk)),
this.bus.on("ERROR", withCleanup(onError)),
]
this.send("EVENT", event)
}
count(filter, id, {onCount}) {
const channel = this.bus.on("COUNT", (subid, ...payload) => {
if (subid === id) {
onCount(...payload)
this.bus.off("COUNT", channel)
}
})
this.send("COUNT", id, ...filter)
}
listenForAuth() {
// Propagate auth to global handler
this.bus.on("AUTH", challenge => {
if (!this.error) {
this.setStatus(STATUS.ERROR, "Logging in")
this.setError(ERROR.UNAUTHORIZED, "Logging in")
eventBus.handle("AUTH", challenge, this)
}
})
}
checkAuth(eid) {
const channel = this.bus.on("OK", (id, ok, message) => {
if (id === eid) {
if (ok) {
this.setStatus(STATUS.READY, "Connected")
} else {
this.disconnect()
this.setError(ERROR.FORBIDDEN, "Access restricted")
}
this.bus.off("OK", channel)
}
})
}
hasRecentError() {
return this.error && Date.now() - 30_000 < this.error.occurredAt
}
getQuality() {
if (this.error) {
return [0, this.error.message]
}
const {timeouts, subsCount, eoseTimer, eoseCount} = this.stats
const timeoutRate = timeouts > 0 ? timeouts / subsCount : null
const eoseQuality = eoseCount > 0 ? Math.max(1, 500 / (eoseTimer / eoseCount)) : null
if (timeoutRate && timeoutRate > 0.5) {
return [1 - timeoutRate, "Slow connection"]
}
if (eoseQuality && eoseQuality < 0.7) {
return [eoseQuality, "Slow connection"]
}
if (eoseQuality) {
return [eoseQuality, "Connected"]
}
if (this.status.code === STATUS.READY) {
return [1, "Connected"]
}
return [0.5, this.status.message]
}
}
const getConnections = () => connections
const getConnection = url => connections[url]
const connect = url => {
if (!isRelay(url)) {
warn(`Invalid relay url ${url}`)
}
if (url !== normalizeRelayUrl(url)) {
warn(`Received non-normalized relay url ${url}`)
}
if (!connections[url]) {
connections[url] = new Connection(url)
}
return connections[url].autoConnect()
}
const disconnect = url => {
if (connections[url]) {
connections[url].disconnect()
delete connections[url]
}
}
// Public api - publish/subscribe
const publish = async ({relays, event, onProgress, timeout = 5000}) => {
if (forceRelays.length > 0) {
relays = forceRelays
}
const forceUrls = (import.meta.env.VITE_FORCE_RELAYS || "").split(",").filter(identity)
const getUrls = relays => {
if (relays.length === 0) {
error(`Attempted to publish to zero relays`, event)
} else {
log(`Publishing to ${relays.length} relays`, event, relays)
error(`Attempted to connect to zero urls`)
}
const urls = new Set(pluck("url", relays))
const urls = new Set(pluck("url", relays).map(normalizeRelayUrl))
if (urls.size !== relays.length) {
warn(`Attempted to publish to non-unique relays`)
warn(`Attempted to connect to non-unique relays`)
}
return Array.from(urls)
}
const pool = new Pool()
pool.bus.addListeners({
open: ({url}) => {
const stats = Meta.getStats(url)
stats.openedAt = Date.now()
},
close: ({url}) => {
const stats = Meta.getStats(url)
stats.closedAt = Date.now()
},
})
function disconnect(url) {
pool.remove(url)
delete Meta.stats[url]
delete Meta.errors[url]
}
function getQuality(url) {
if (Meta.errors[url]) {
return [
0,
switcher(Meta.errors[url], {
disconnected: "Disconnected",
unauthorized: "Logging in",
forbidden: "Failed to log in",
}),
]
}
const stats = Meta.getStats(url)
const {timeouts, subsCount, eoseTimer, eoseCount} = stats
const timeoutRate = timeouts > 0 ? timeouts / subsCount : null
const eoseQuality = eoseCount > 0 ? Math.max(1, 500 / (eoseTimer / eoseCount)) : null
if (timeoutRate && timeoutRate > 0.5) {
return [1 - timeoutRate, "Slow connection"]
}
if (eoseQuality && eoseQuality < 0.7) {
return [eoseQuality, "Slow connection"]
}
if (eoseQuality) {
return [eoseQuality, "Connected"]
}
if (pool.get(url).status === Socket.STATUS.READY) {
return [1, "Connected"]
}
return [0.5, "Not Connected"]
}
function getExecutor(urls) {
if (forceUrls.length > 0) {
urls = forceUrls
}
const executor = new Executor(
Config.multiplextrUrl
? new Plex(urls, pool.get(Config.multiplextrUrl))
: new Relays(urls.map(url => pool.get(url)))
)
executor.handleAuth({
onAuth(url, challenge) {
Meta.errors[url] = "unauthorized"
return Config.authHandler(url, challenge)
},
onOk(url, id, ok, message) {
Meta.errors[url] = ok ? null : "forbidden"
},
})
return executor
}
async function publish({relays, event, onProgress, timeout = 3000}) {
const urls = getUrls(relays)
const executor = getExecutor(urls)
Meta.onPublish(urls)
log(`Publishing to ${urls.length} relays`, event, urls)
return new Promise(resolve => {
let resolved = false
const timeouts = new Set()
const succeeded = new Set()
const failed = new Set()
@ -325,26 +200,20 @@ const publish = async ({relays, event, onProgress, timeout = 5000}) => {
}
const attemptToResolve = () => {
// Don't report progress once we're done, even if more errors/ok come through
if (resolved) {
return
}
const progress = getProgress()
if (onProgress) {
onProgress(progress)
}
if (progress.pending.size === 0) {
log(`Finished publishing to ${urls.size} relays`, event, progress)
log(`Finished publishing to ${urls.length} relays`, event, progress)
resolve(progress)
resolved = true
sub.unsubscribe()
executor.target.cleanup()
} else if (onProgress) {
onProgress(progress)
}
}
setTimeout(() => {
for (const {url} of relays) {
for (const url of urls) {
if (!succeeded.has(url) && !failed.has(url)) {
timeouts.add(url)
}
@ -353,29 +222,21 @@ const publish = async ({relays, event, onProgress, timeout = 5000}) => {
attemptToResolve()
}, timeout)
relays.map(async relay => {
const conn = await connect(relay.url)
if (conn.status.code === STATUS.READY || conn.error.code === ERROR.UNAUTHORIZED) {
conn.publish(event, {
onOk: () => {
succeeded.add(relay.url)
timeouts.delete(relay.url)
failed.delete(relay.url)
attemptToResolve()
},
onError: () => {
failed.add(relay.url)
timeouts.delete(relay.url)
attemptToResolve()
},
})
} else {
failed.add(relay.url)
const sub = executor.publish(event, {
onOk: url => {
succeeded.add(url)
timeouts.delete(url)
failed.delete(url)
attemptToResolve()
}
},
onError: url => {
failed.add(url)
timeouts.delete(url)
attemptToResolve()
},
})
// Report progress to start
attemptToResolve()
})
}
@ -384,151 +245,88 @@ type SubscribeOpts = {
relays: Relay[]
filter: Filter[] | Filter
onEvent: (event: MyEvent) => void
onError?: (url: string) => void
onEose?: (url: string) => void
}
const subscribe = async ({relays, filter, onEvent, onEose, onError}: SubscribeOpts) => {
filter = ensurePlural(filter)
if (forceRelays.length > 0) {
relays = forceRelays
}
const id = createFilterId(filter)
async function subscribe({relays, filter, onEvent, onEose}: SubscribeOpts) {
const urls = getUrls(relays)
const executor = getExecutor(urls)
const filters = ensurePlural(filter)
const now = Date.now()
const seen = new Set()
const eose = new Set()
let active = true
if (relays.length === 0) {
error(`Attempted to start subscription ${id} with zero relays`, filter)
} else {
log(`Starting subscription ${id} with ${relays.length} relays`, filter, relays)
}
log(`Starting subscription with ${relays.length} relays`, filters, relays)
if (relays.length !== new Set(pluck("url", relays)).size) {
error(`Subscribed to non-unique relays`, relays)
}
const promises = relays.map(async relay => {
const conn = await connect(relay.url)
Meta.onSubscriptionStart(urls)
if (conn.status.code !== STATUS.READY) {
if (onError) {
onError(relay.url)
}
return
}
conn.stats.subsCount += 1
conn.stats.activeSubsCount += 1
if (conn.stats.activeSubsCount > 10) {
warn(`Relay ${conn.url} has >10 active subscriptions`)
}
return conn.subscribe(filter, id, {
onEvent: e => {
if (seen.has(e.id)) {
return
}
seen.add(e.id)
if (!verifySignature(e)) {
return
}
conn.stats.eventsCount += 1
onEvent({...e, seen_on: relay.url, content: e.content || ""})
},
onEose: () => {
if (onEose) {
onEose(conn.url)
}
// Keep track of relay timing stats, but only for the first eose we get
if (!eose.has(conn.url)) {
eose.add(conn.url)
conn.stats.eoseCount += 1
conn.stats.eoseTimer += Date.now() - now
}
},
})
})
return {
isActive: () => active,
unsub: () => {
if (!active) {
const sub = executor.subscribe(filters, {
onEvent: (url, e) => {
if (seen.has(e.id)) {
return
}
active = false
seen.add(e.id)
log(`Closing subscription ${id}`)
if (!verifySignature(e)) {
return
}
promises.forEach(async promise => {
const sub = await promise
Meta.onEvent(url)
if (sub) {
sub.unsub()
sub.conn.stats.activeSubsCount -= 1
}
})
onEvent({...e, seen_on: url, content: e.content || ""})
},
onEose: url => {
onEose?.(url)
// Keep track of relay timing stats, but only for the first eose we get
if (!eose.has(url)) {
Meta.onEose(url, Date.now() - now)
}
eose.add(url)
},
})
return {
unsub: () => {
log(`Closing subscription`, filters)
sub.unsubscribe()
executor.target.cleanup()
Meta.onSubscriptionEnd(urls)
},
}
}
const count = async filter => {
const conn = await connect("wss://rbr.bio")
if (!conn || conn.status.code !== STATUS.READY) {
return null
}
filter = ensurePlural(filter)
async function count(filter) {
const filters = ensurePlural(filter)
const executor = getExecutor(["wss://rbr.bio"])
return new Promise(resolve => {
conn.count(filter, createFilterId(filter), {
const sub = executor.count(filters, {
onCount: res => resolve(res?.count),
})
setTimeout(() => {
resolve(0)
sub.unsubscribe()
executor.target.cleanup()
}, 3000)
})
}
// Utils
const createFilterId = filters =>
[Math.random().toString().slice(2, 6), filters.map(describeFilter).join(":")].join("-")
const describeFilter = ({kinds = [], ...filter}) => {
const parts = []
parts.push(kinds.join(","))
for (const [key, value] of Object.entries(filter)) {
if (is(Array, value)) {
parts.push(`${key}[${value.length}]`)
} else {
parts.push(key)
}
}
return "(" + parts.join(",") + ")"
}
export default {
eventBus,
forceRelays,
getConnections,
getConnection,
connect,
Config,
Meta,
forceUrls,
disconnect,
getQuality,
publish,
subscribe,
count,

View File

@ -160,8 +160,11 @@ export const sampleRelays = (relays, scale = 1) => {
limit *= scale
}
// Remove relays that are currently in an error state
relays = relays.filter(r => !pool.getConnection(r.url)?.hasRecentError())
// Remove relays that are currently in an error state or were recently closed
relays = relays.filter(r => {
if (pool.Meta.errors[r.url]) return false
if (pool.Meta.getStats(r.url).closed > Date.now() - 30_000) return false
})
// Limit target relays
relays = relays.slice(0, limit)

View File

@ -18,6 +18,7 @@ import {findReplyId, findRootId} from "src/util/nostr"
import {synced} from "src/util/misc"
import {derived} from "svelte/store"
import keys from "src/agent/keys"
import pool from "src/agent/pool"
import cmd from "src/agent/cmd"
const profile = synced("agent/user/profile", {
@ -54,6 +55,7 @@ let profileCopy = null
profile.subscribe($profile => {
profileCopy = $profile
pool.Config.multiplextrUrl = $profile.settings.multiplextrUrl
})
// Watch pubkey and add to profile

View File

@ -11,14 +11,14 @@ setInterval(() => {
const relayUrls = new Set(pluck("url", getUserRelays()))
// Prune connections we haven't used in a while
Object.values(pool.getConnections())
.filter(conn => conn.lastRequest < Date.now() - 60_000)
.forEach(conn => conn.nostr.close())
Object.entries(pool.Meta.stats)
.filter(([url, stats]) => stats.lastRequest < Date.now() - 60_000)
.forEach(([url, stats]) => pool.disconnect(url))
// Alert the user to any heinously slow connections
slowConnections.set(
Object.values(pool.getConnections()).filter(
c => relayUrls.has(c.url) && first(c.getQuality()) < 0.3
Object.keys(pool.Meta.stats).filter(
url => relayUrls.has(url) && first(pool.getQuality(url)) < 0.3
)
)
}, 30_000)

View File

@ -19,14 +19,7 @@
onMount(() => {
return poll(10_000, async () => {
const conn = await pool.getConnection(relay.url)
if (conn) {
;[quality, message] = conn.getQuality()
} else {
quality = null
message = "Not connected"
}
;[quality, message] = pool.getQuality(relay.url)
})
})
</script>

View File

@ -31,7 +31,7 @@
const interpolate = (a, b) => t => a + Math.round((b - a) * t)
const {petnamePubkeys, canPublish, mutes} = user
const getRelays = () => sampleRelays(relays.concat(getPubkeyWriteRelays(pubkey)))
const tabs = ["notes", "likes", pool.forceRelays.length === 0 && "relays"].filter(identity)
const tabs = ["notes", "likes", pool.forceUrls.length === 0 && "relays"].filter(identity)
let pubkey = toHex(npub)
let following = false
@ -79,7 +79,7 @@
})
}
if (pool.forceRelays.length === 0) {
if (pool.forceUrls.length === 0) {
actions.push({onClick: openProfileInfo, label: "Profile", icon: "info"})
}

View File

@ -26,14 +26,7 @@
onMount(() => {
return poll(10_000, async () => {
const conn = await pool.getConnection(relay.url)
if (conn) {
;[quality, message] = conn.getQuality()
} else {
quality = null
message = "Not connected"
}
;[quality, message] = pool.getQuality(relay.url)
})
})

View File

@ -89,7 +89,7 @@
</a>
</li>
<li class="mx-3 my-4 h-px bg-gray-6" />
{#if pool.forceRelays.length === 0}
{#if pool.forceUrls.length === 0}
<li class="relative cursor-pointer">
<a class="block px-4 py-2 transition-all hover:bg-accent hover:text-white" href="/relays">
<i class="fa fa-server mr-2" /> Relays

View File

@ -115,7 +115,7 @@
}}>here</Anchor
>.
</p>
{#if pool.forceRelays.length > 0}
{#if pool.forceUrls.length > 0}
<Spinner />
{:else if Object.values(currentRelays).length > 0}
<p>Currently searching:</p>

View File

@ -450,7 +450,7 @@
let:instance
class="flex flex-col gap-2"
on:click={() => instance.hide()}>
{#if pool.forceRelays.length === 0}
{#if pool.forceUrls.length === 0}
<Anchor
type="button-circle"
on:click={() => {

View File

@ -1,5 +1,5 @@
<script lang="ts">
import {uniq} from "ramda"
import {uniq, objOf} from "ramda"
import {onMount} from "svelte"
import {generatePrivateKey} from "nostr-tools"
import {fly} from "svelte/transition"
@ -28,8 +28,8 @@
let relays = []
if ($userRelays.length > 0) {
relays = $userRelays
} else if (pool.forceRelays.length > 0) {
relays = pool.forceRelays
} else if (pool.forceUrls.length > 0) {
relays = pool.forceUrls.map(objOf("url"))
} else {
relays = [
{url: "wss://nostr-pub.wellorder.net", write: true},

View File

@ -11,7 +11,7 @@
export let privkey
const nsec = nip19.nsecEncode(privkey)
const nextStage = pool.forceRelays.length > 0 ? "follows" : "relays"
const nextStage = pool.forceUrls.length > 0 ? "follows" : "relays"
const copyKey = () => {
copyToClipboard(nsec)

View File

@ -35,14 +35,7 @@
onMount(() => {
return poll(10_000, async () => {
const conn = await pool.getConnection(relay.url)
if (conn) {
;[quality, message] = conn.getQuality()
} else {
quality = null
message = "Not connected"
}
;[quality, message] = await pool.getQuality(relay.url)
})
})
</script>