Port pool and meta to system

This commit is contained in:
Jonathan Staab 2023-07-08 10:35:58 -07:00
parent 9fb8a6c4de
commit 1a8962258a
19 changed files with 456 additions and 481 deletions

View File

@ -17,8 +17,7 @@ import {
import {personKinds, appDataKeys, findReplyId} from "src/util/nostr"
import {chunk, ensurePlural} from "hurdak/lib/hurdak"
import {batch, now, timedelta} from "src/util/misc"
import {ENABLE_ZAPS, routing, settings, directory} from "src/system"
import pool from "src/agent/pool"
import {ENABLE_ZAPS, routing, settings, directory, network} from "src/system"
const ext = {sync: null}
@ -53,7 +52,7 @@ const listen = ({
shouldProcess?: boolean
delay?: number
}) => {
return pool.subscribe({
return network.subscribe({
filter,
relays,
onEvent: batch(delay, chunk => {
@ -97,7 +96,7 @@ const load = ({
if (force) {
relays.forEach(url => {
if (!done.has(url)) {
pool.Meta.onTimeout(url)
network.pool.emit("timeout", url)
}
})
}
@ -112,7 +111,7 @@ const load = ({
// If a relay takes too long, give up
setTimeout(() => attemptToComplete(true), timeout)
const subPromise = pool.subscribe({
const subPromise = network.subscribe({
relays,
filter,
onEvent: batch(500, chunk => {

View File

@ -1,412 +0,0 @@
import type {Filter} from "nostr-tools"
import type {MyEvent} from "src/util/types"
import {Socket, Pool, Plex, Relays, Executor} from "paravel"
import {verifySignature} from "nostr-tools"
import {objOf, identity} from "ramda"
import {ensurePlural, switcher} from "hurdak/lib/hurdak"
import {warn, log, error} from "src/util/logger"
import {union, sleep, difference} from "src/util/misc"
import {normalizeRelayUrl} from "src/util/nostr"
const ext = {
routing: null,
}
const Config = {
multiplextrUrl: null,
authHandler: null,
}
type StatsItem = {
error: string | null
timeouts: number
subsCount: number
eoseCount: number
eoseTimer: number
eventsCount: number
activeSubsCount: number
lastRequest: number
openedAt: number
closedAt: number
}
const Meta = {
stats: {} as Record<string, StatsItem>,
errors: {},
getStats(url) {
if (!this.stats[url]) {
this.stats[url] = {
error: null,
timeouts: 0,
subsCount: 0,
eoseCount: 0,
eoseTimer: 0,
eventsCount: 0,
activeSubsCount: 0,
lastRequest: 0,
openedAt: 0,
closedAt: 0,
}
}
return this.stats[url]
},
onPublish(urls) {
urls.forEach(url => {
const stats = this.getStats(url)
stats.lastRequest = Date.now()
})
},
onSubscriptionStart(urls) {
urls.forEach(url => {
const stats = this.getStats(url)
stats.subsCount += 1
stats.activeSubsCount += 1
stats.lastRequest = Date.now()
})
},
onSubscriptionEnd(urls) {
urls.forEach(url => {
const stats = this.getStats(url)
stats.activeSubsCount -= 1
})
},
onEvent(url) {
const stats = this.getStats(url)
stats.eventsCount += 1
},
onEose(url, ms) {
const stats = this.getStats(url)
stats.eoseCount += 1
stats.eoseTimer += ms
},
onTimeout(url) {
const stats = this.getStats(url)
stats.timeouts += 1
},
}
const forceUrls = (import.meta.env.VITE_FORCE_RELAYS || "").split(",").filter(identity)
const forceRelays = forceUrls.map(url => ({url, write: true}))
const countRelay = forceUrls[0] || "wss://rbr.bio"
const defaultUrls =
forceUrls.length > 0
? forceUrls
: [
"wss://purplepag.es",
"wss://relay.damus.io",
"wss://relay.nostr.band",
"wss://nostr-pub.wellorder.net",
]
const defaultRelays = defaultUrls.map(objOf("url"))
const getUrls = relays => {
if (relays.length === 0) {
error(`Attempted to connect to zero urls`)
}
const urls = new Set(relays.map(normalizeRelayUrl))
if (urls.size !== relays.length) {
warn(`Attempted to connect to non-unique relays`)
}
return Array.from(urls)
}
const pool = new Pool()
pool.bus.addListeners({
open: ({url}) => {
const stats = Meta.getStats(url)
stats.openedAt = Date.now()
},
close: ({url}) => {
const stats = Meta.getStats(url)
stats.closedAt = Date.now()
},
})
function disconnect(url) {
pool.remove(url)
delete Meta.stats[url]
delete Meta.errors[url]
}
function getQuality(url) {
if (Meta.errors[url]) {
return [
0,
switcher(Meta.errors[url], {
disconnected: "Disconnected",
unauthorized: "Logging in",
forbidden: "Failed to log in",
}),
]
}
const stats = Meta.getStats(url)
const {timeouts, subsCount, eoseTimer, eoseCount} = stats
const timeoutRate = timeouts > 0 ? timeouts / subsCount : null
const eoseQuality = eoseCount > 0 ? Math.max(1, 500 / (eoseTimer / eoseCount)) : null
if (timeoutRate && timeoutRate > 0.5) {
return [1 - timeoutRate, "Slow connection"]
}
if (eoseQuality && eoseQuality < 0.7) {
return [eoseQuality, "Slow connection"]
}
if (eoseQuality) {
return [eoseQuality, "Connected"]
}
if (pool.get(url).status === Socket.STATUS.READY) {
return [1, "Connected"]
}
return [0.5, "Not Connected"]
}
async function getExecutor(urls, {bypassBoot = false} = {}) {
if (forceUrls.length > 0) {
urls = forceUrls
}
let target
// Try to use our multiplexer, but if it fails to connect fall back to relays. If
// we're only connecting to a single relay, just do it directly, unless we already
// have a connection to the multiplexer open, in which case we're probably doing
// AUTH with a single relay.
if (Config.multiplextrUrl && (urls.length > 1 || pool.has(Config.multiplextrUrl))) {
const socket = pool.get(Config.multiplextrUrl)
if (!socket.error) {
target = new Plex(urls, pool.get(Config.multiplextrUrl))
}
}
if (!target) {
target = new Relays(urls.map(url => pool.get(url)))
}
const executor = new Executor(target)
executor.handleAuth({
onAuth(url, challenge) {
Meta.errors[url] = "unauthorized"
return Config.authHandler?.(url, challenge)
},
onOk(url, id, ok, message) {
Meta.errors[url] = ok ? null : "forbidden"
},
})
// Eagerly connect and handle AUTH
await Promise.all(
ensurePlural(executor.target.sockets).map(async socket => {
const {limitation} = ext.routing.getRelayMeta(socket.url)
const waitForBoot = limitation?.payment_required || limitation?.auth_required
if (socket.status === Socket.STATUS.NEW) {
socket.booted = sleep(2000)
await socket.connect()
}
// Delay REQ/EVENT until AUTH flow happens
if (!bypassBoot && waitForBoot) {
await socket.booted
}
})
)
return executor
}
async function publish({relays, event, onProgress, timeout = 3000, verb = "EVENT"}) {
const urls = getUrls(relays)
const executor = await getExecutor(urls, {bypassBoot: verb === "AUTH"})
Meta.onPublish(urls)
log(`Publishing to ${urls.length} relays`, event, urls)
return new Promise(resolve => {
const timeouts = new Set()
const succeeded = new Set()
const failed = new Set()
const getProgress = () => {
const completed = union(timeouts, succeeded, failed)
const pending = difference(urls, completed)
return {succeeded, failed, timeouts, completed, pending}
}
const attemptToResolve = () => {
const progress = getProgress()
if (progress.pending.size === 0) {
log(`Finished publishing to ${urls.length} relays`, event, progress)
resolve(progress)
sub.unsubscribe()
executor.target.cleanup()
} else if (onProgress) {
onProgress(progress)
}
}
setTimeout(() => {
for (const url of urls) {
if (!succeeded.has(url) && !failed.has(url)) {
timeouts.add(url)
}
}
attemptToResolve()
}, timeout)
const sub = executor.publish(event, {
verb,
onOk: url => {
succeeded.add(url)
timeouts.delete(url)
failed.delete(url)
attemptToResolve()
},
onError: url => {
failed.add(url)
timeouts.delete(url)
attemptToResolve()
},
})
// Report progress to start
attemptToResolve()
})
}
type SubscribeOpts = {
relays: string[]
filter: Filter[] | Filter
onEvent: (event: MyEvent) => void
onEose?: (url: string) => void
}
async function subscribe({relays, filter, onEvent, onEose}: SubscribeOpts) {
const urls = getUrls(relays)
const executor = await getExecutor(urls)
const filters = ensurePlural(filter)
const now = Date.now()
const seen = new Map()
const eose = new Set()
log(`Starting subscription with ${relays.length} relays`, {filters, relays})
Meta.onSubscriptionStart(urls)
const sub = executor.subscribe(filters, {
onEvent: (url, event) => {
const seen_on = seen.get(event.id)
if (seen_on) {
if (!seen_on.includes(url)) {
seen_on.push(url)
}
return
}
Object.assign(event, {
seen_on: [url],
content: event.content || "",
})
seen.set(event.id, event.seen_on)
try {
if (!verifySignature(event)) {
return
}
} catch (e) {
console.error(e)
return
}
Meta.onEvent(url)
onEvent(event)
},
onEose: url => {
onEose?.(url)
// Keep track of relay timing stats, but only for the first eose we get
if (!eose.has(url)) {
Meta.onEose(url, Date.now() - now)
}
eose.add(url)
},
})
return {
unsub: () => {
log(`Closing subscription`, filters)
sub.unsubscribe()
executor.target.cleanup()
Meta.onSubscriptionEnd(urls)
},
}
}
async function count(filter) {
const filters = ensurePlural(filter)
const executor = await getExecutor([countRelay])
return new Promise(resolve => {
const sub = executor.count(filters, {
onCount: (url, {count}) => resolve(count),
})
setTimeout(() => {
resolve(0)
sub.unsubscribe()
executor.target.cleanup()
}, 3000)
})
}
export default {
ext,
Config,
Meta,
forceUrls,
forceRelays,
defaultUrls,
defaultRelays,
disconnect,
getQuality,
publish,
subscribe,
count,
}

View File

@ -21,7 +21,6 @@
} from "src/util/misc"
import {onReady} from "src/util/loki"
import * as system from "src/system"
import pool from "src/agent/pool"
import {loadAppData} from "src/app/state"
import {theme, getThemeVariables, appName, modal} from "src/partials/state"
import {logUsage} from "src/app/state"
@ -54,7 +53,7 @@
const seenChallenges = new Set()
// When we get an AUTH challenge from our pool, attempt to authenticate
pool.Config.authHandler = async (url, challenge) => {
system.network.authHandler = async (url, challenge) => {
if (get(system.keys.canSign) && !seenChallenges.has(challenge)) {
seenChallenges.add(challenge)

View File

@ -2,9 +2,8 @@
import cx from "classnames"
import {theme, installPrompt} from "src/partials/state"
import PersonCircle from "src/app/shared/PersonCircle.svelte"
import {keys, directory, alerts} from "src/system"
import {FORCE_RELAYS, keys, directory, alerts} from "src/system"
import {watch} from "src/util/loki"
import pool from "src/agent/pool"
import {routes, slowConnections, menuIsOpen} from "src/app/state"
const {canSign, pubkey} = keys
@ -86,7 +85,7 @@
</a>
</li>
<li class="mx-3 my-4 h-px bg-gray-6" />
{#if pool.forceUrls.length === 0}
{#if FORCE_RELAYS.length === 0}
<li class="relative cursor-pointer">
<a class="block px-4 py-2 transition-all hover:bg-accent hover:text-white" href="/relays">
<i class="fa fa-server mr-2" /> Relays

View File

@ -14,8 +14,7 @@
import CopyValue from "src/partials/CopyValue.svelte"
import PersonBadge from "src/app/shared/PersonBadge.svelte"
import RelayCard from "src/app/shared/RelayCard.svelte"
import {ENABLE_ZAPS, keys, nip57, cmd, routing, social} from "src/system"
import pool from "src/agent/pool"
import {ENABLE_ZAPS, FORCE_RELAYS, keys, nip57, cmd, routing, social} from "src/system"
export let note
export let reply
@ -89,7 +88,7 @@
actions.push({label: "Mute", icon: "microphone-slash", onClick: mute})
}
if (pool.forceUrls.length === 0) {
if (FORCE_RELAYS.length === 0) {
actions.push({
label: "Details",
icon: "info",
@ -136,7 +135,7 @@
{/if}
</div>
<div class="flex items-center">
{#if pool.forceUrls.length === 0}
{#if FORCE_RELAYS.length === 0}
<!-- Mobile version -->
<div
style="transform: scale(-1, 1)"

View File

@ -4,9 +4,8 @@
import {modal} from "src/partials/state"
import Popover from "src/partials/Popover.svelte"
import OverflowMenu from "src/partials/OverflowMenu.svelte"
import {keys, routing, social, directory} from "src/system"
import {FORCE_RELAYS, keys, routing, social, directory} from "src/system"
import {watch} from "src/util/loki"
import pool from "src/agent/pool"
import {addToList} from "src/app/state"
export let pubkey
@ -45,7 +44,7 @@
}
}
if (pool.forceUrls.length === 0) {
if (FORCE_RELAYS.length === 0) {
actions.push({onClick: openProfileInfo, label: "Details", icon: "info"})
}

View File

@ -4,10 +4,9 @@
import {tweened} from "svelte/motion"
import {numberFmt} from "src/util/misc"
import {modal} from "src/partials/state"
import {social, routing} from "src/system"
import {watch} from "src/util/loki"
import network from "src/agent/network"
import pool from "src/agent/pool"
import {social, routing, network} from "src/system"
import legacyNetwork from "src/agent/network"
export let pubkey
@ -26,14 +25,14 @@
onMount(async () => {
// Get our followers count
const count = await pool.count({kinds: [3], "#p": [pubkey]})
const count = await network.count({kinds: [3], "#p": [pubkey]})
if (count) {
followersCount.set(count)
} else {
const followers = new Set()
await network.load({
await legacyNetwork.load({
relays: routing.getUserHints(3, "read"),
shouldProcess: false,
filter: [{kinds: [3], "#p": [pubkey]}],

View File

@ -8,9 +8,8 @@
import Toggle from "src/partials/Toggle.svelte"
import Rating from "src/partials/Rating.svelte"
import Anchor from "src/partials/Anchor.svelte"
import {keys, routing, directory} from "src/system"
import {keys, routing, directory, meta} from "src/system"
import {watch} from "src/util/loki"
import pool from "src/agent/pool"
import {loadAppData} from "src/app/state"
export let relay
@ -48,7 +47,7 @@
onMount(() => {
return poll(10_000, () => {
;[quality, message] = pool.getQuality(relay.url)
;[quality, message] = meta.getRelayQuality(relay.url)
})
})
</script>

View File

@ -5,8 +5,7 @@
import {poll, stringToHue, hsl} from "src/util/misc"
import Rating from "src/partials/Rating.svelte"
import Anchor from "src/partials/Anchor.svelte"
import {routing} from "src/system"
import pool from "src/agent/pool"
import {routing, meta} from "src/system"
export let relay
export let rating = null
@ -16,8 +15,8 @@
let showStatus = false
onMount(() => {
return poll(10_000, async () => {
;[quality, message] = pool.getQuality(relay.url)
return poll(10_000, () => {
;[quality, message] = meta.getRelayQuality(relay.url)
})
})
</script>

View File

@ -13,6 +13,7 @@ import {userKinds, noteKinds} from "src/util/nostr"
import {findReplyId} from "src/util/nostr"
import {modal, toast} from "src/partials/state"
import {
FORCE_RELAYS,
DEFAULT_FOLLOWS,
ENABLE_ZAPS,
keys,
@ -22,9 +23,10 @@ import {
settings,
cache,
chat,
meta,
network,
} from "src/system"
import network from "src/agent/network"
import pool from "src/agent/pool"
import legacyNetwork from "src/agent/network"
// Routing
@ -113,7 +115,7 @@ export const listen = async () => {
])
;(listen as any)._listener?.unsub()
;(listen as any)._listener = await network.listen({
;(listen as any)._listener = await legacyNetwork.listen({
relays: routing.getUserRelayUrls("read"),
filter: [
{kinds: noteKinds.concat(4), authors: [pubkey], since},
@ -122,7 +124,7 @@ export const listen = async () => {
{kinds: [42], "#e": channelIds, since},
],
onChunk: async events => {
await network.loadPeople(pluck("pubkey", events))
await legacyNetwork.loadPeople(pluck("pubkey", events))
},
})
}
@ -132,16 +134,21 @@ export const slowConnections = writable([])
setInterval(() => {
// Only notify about relays the user is actually subscribed to
const relays = new Set(routing.getUserRelayUrls())
const $slowConnections = []
// Prune connections we haven't used in a while
Object.entries(pool.Meta.stats)
.filter(([url, stats]) => stats.lastRequest < Date.now() - 60_000)
.forEach(([url, stats]) => pool.disconnect(url))
for (const url of network.pool.data.keys()) {
const stats = meta.relayStats.get(url)
if (stats.active_subs === 0 && stats.last_activity < Date.now() - 60_000) {
network.pool.remove(url)
} else if (relays.has(url) && first(meta.getRelayQuality(url)) < 0.3) {
$slowConnections.push(url)
}
}
// Alert the user to any heinously slow connections
slowConnections.set(
Object.keys(pool.Meta.stats).filter(url => relays.has(url) && first(pool.getQuality(url)) < 0.3)
)
slowConnections.set($slowConnections)
}, 30_000)
export const loadAppData = async pubkey => {
@ -150,15 +157,15 @@ export const loadAppData = async pubkey => {
listen()
// Make sure the user and their network is loaded
await network.loadPeople([pubkey], {force: true, kinds: userKinds})
await network.loadPeople(social.getUserFollows())
await legacyNetwork.loadPeople([pubkey], {force: true, kinds: userKinds})
await legacyNetwork.loadPeople(social.getUserFollows())
}
}
export const login = async (method, key) => {
keys.login(method, key)
if (pool.forceUrls.length > 0) {
if (FORCE_RELAYS.length > 0) {
modal.replace({
type: "message",
message: "Logging you in...",
@ -168,7 +175,7 @@ export const login = async (method, key) => {
await Promise.all([
sleep(1500),
network.loadPeople([keys.getPubkey()], {force: true, kinds: userKinds}),
legacyNetwork.loadPeople([keys.getPubkey()], {force: true, kinds: userKinds}),
])
navigate("/notes")

View File

@ -12,10 +12,9 @@
import Anchor from "src/partials/Anchor.svelte"
import Modal from "src/partials/Modal.svelte"
import RelayCard from "src/app/shared/RelayCard.svelte"
import {DEFAULT_RELAYS, routing, keys} from "src/system"
import {DEFAULT_RELAYS, FORCE_RELAYS, routing, keys, network} from "src/system"
import {watch} from "src/util/loki"
import network from "src/agent/network"
import pool from "src/agent/pool"
import legacyNetwork from "src/agent/network"
import {loadAppData} from "src/app/state"
let modal = null
@ -54,7 +53,7 @@
attemptedRelays.add(relay.url)
currentRelays[i] = relay
network
legacyNetwork
.loadPeople([keys.getPubkey()], {relays: [relay.url], force: true, kinds: userKinds})
.then(async () => {
// Wait a bit before removing the relay to smooth out the ui
@ -72,7 +71,7 @@
navigate("/notes")
} else {
pool.disconnect(relay.url)
network.pool.remove(relay.url)
}
})
}
@ -120,7 +119,7 @@
}}>here</Anchor
>.
</p>
{#if pool.forceUrls.length > 0}
{#if FORCE_RELAYS.length > 0}
<Spinner />
{:else if Object.values(currentRelays).length > 0}
<p>Currently searching:</p>

View File

@ -6,12 +6,12 @@
import Anchor from "src/partials/Anchor.svelte"
import Heading from "src/partials/Heading.svelte"
import Content from "src/partials/Content.svelte"
import pool from "src/agent/pool"
import {FORCE_RELAYS} from "src/system"
export let privkey
const nsec = nip19.nsecEncode(privkey)
const nextStage = pool.forceUrls.length > 0 ? "follows" : "relays"
const nextStage = FORCE_RELAYS.length > 0 ? "follows" : "relays"
const copyKey = () => {
copyToClipboard(nsec)

View File

@ -12,8 +12,7 @@
import PersonNotes from "src/app/shared/PersonNotes.svelte"
import PersonLikes from "src/app/shared/PersonLikes.svelte"
import PersonRelays from "src/app/shared/PersonRelays.svelte"
import {nip05, directory, routing} from "src/system"
import pool from "src/agent/pool"
import {FORCE_RELAYS, nip05, directory, routing} from "src/system"
import {watch} from "src/util/loki"
import {routes} from "src/app/state"
import PersonCircle from "src/app/shared/PersonCircle.svelte"
@ -24,7 +23,7 @@
export let activeTab
export let relays = []
const tabs = ["notes", "likes", pool.forceUrls.length === 0 && "relays"].filter(identity)
const tabs = ["notes", "likes", FORCE_RELAYS.length === 0 && "relays"].filter(identity)
const pubkey = toHex(npub)
const profile = watch(directory.profiles, () => directory.getProfile(pubkey))
const handle = watch(nip05.handles, () => nip05.getHandle(pubkey))

View File

@ -6,8 +6,7 @@
import Anchor from "src/partials/Anchor.svelte"
import Content from "src/partials/Content.svelte"
import Heading from "src/partials/Heading.svelte"
import {settings} from "src/system"
import pool from "src/agent/pool"
import {FORCE_RELAYS, settings} from "src/system"
let values = {...settings.getSettings()}
@ -68,7 +67,7 @@
>.
</p>
</div>
{#if pool.forceUrls.length === 0}
{#if FORCE_RELAYS.length === 0}
<div class="flex flex-col gap-1">
<strong>Multiplextr URL</strong>
<Input bind:value={values.multiplextrUrl}>

View File

@ -4,7 +4,7 @@ import {doPipe} from "hurdak/lib/hurdak"
import {Tags, channelAttrs, findRoot, findReply} from "src/util/nostr"
import {parseContent} from "src/util/notes"
export default ({keys, sync, pool, routing, displayPubkey}) => {
export default ({keys, sync, network, routing, displayPubkey}) => {
const authenticate = (url, challenge) =>
new PublishableEvent(22242, {
tags: [
@ -163,7 +163,7 @@ export default ({keys, sync, pool, routing, displayPubkey}) => {
async publish(relays: string[], onProgress = null, verb = "EVENT") {
const event = await this.getSignedEvent()
// return console.log(event)
const promise = pool.publish({relays, event, onProgress, verb})
const promise = network.publish({relays, event, onProgress, verb})
// Copy the event since loki mutates it to add metadata
sync.processEvents({...event, seen_on: []})

View File

@ -12,8 +12,9 @@ import initCache from "src/system/cache"
import initChat from "src/system/chat"
import initAlerts from "src/system/alerts"
import initCmd from "src/system/cmd"
import network from "src/agent/network"
import pool from "src/agent/pool"
import Network from "src/system/network"
import initMeta from "src/system/meta"
import legacyNetwork from "src/agent/network"
// Hacks for circular deps
@ -35,17 +36,13 @@ const cache = initCache({keys, sync, social})
const chat = initChat({keys, sync, getCmd, getUserWriteRelays})
const alerts = initAlerts({keys, sync, chat, social, isUserEvent})
const content = initContent({keys, sync, getCmd, getUserWriteRelays})
const cmd = initCmd({keys, sync, pool, routing, displayPubkey: directory.displayPubkey})
const network = new Network({settings, routing})
const cmd = initCmd({keys, sync, network, routing, displayPubkey: directory.displayPubkey})
const meta = initMeta({network})
// Glue stuff together
network.ext.sync = sync
settings.store.subscribe($settings => {
pool.Config.multiplextrUrl = $settings.multiplextrUrl
})
pool.ext.routing = routing
legacyNetwork.ext.sync = sync
// ===========================================================
// Initialization
@ -72,4 +69,6 @@ export {
content,
cmd,
initialize,
network,
meta,
}

132
src/system/meta.ts Normal file
View File

@ -0,0 +1,132 @@
import {Socket} from "paravel"
import {Table} from "src/util/loki"
import {switcher} from "hurdak/lib/hurdak"
export default ({network}) => {
const relayStats = new Table("meta/relayStats", "url")
network.pool.on("open", ({url}) => {
relayStats.patch({url, last_opened: Date.now(), last_activity: Date.now()})
})
network.pool.on("close", ({url}) => {
relayStats.patch({url, last_closed: Date.now(), last_activity: Date.now()})
})
network.pool.on("error:set", (url, error) => {
relayStats.patch({url, error})
})
network.pool.on("error:clear", url => {
relayStats.patch({url, error: null})
})
network.on("publish", urls => {
relayStats.patch(
urls.map(url => ({
url,
last_publish: Date.now(),
last_activity: Date.now(),
}))
)
})
network.on("sub:open", urls => {
for (const url of urls) {
const stats = relayStats.get(url)
relayStats.patch({
url,
last_sub: Date.now(),
last_activity: Date.now(),
total_subs: (stats?.total_subs || 0) + 1,
active_subs: (stats?.active_subs || 0) + 1,
})
}
})
network.on("sub:close", urls => {
for (const url of urls) {
const stats = relayStats.get(url)
relayStats.patch({
url,
last_activity: Date.now(),
active_subs: stats.active_subs - 1,
})
}
})
network.on("event", url => {
const stats = relayStats.get(url)
relayStats.patch({
url,
last_activity: Date.now(),
events_count: (stats.events_count || 0) + 1,
})
})
network.on("eose", (url, ms) => {
const stats = relayStats.get(url)
relayStats.patch({
url,
last_activity: Date.now(),
eose_count: (stats.eose_count || 0) + 1,
eose_timer: (stats.eose_timer || 0) + ms,
})
})
network.on("timeout", (url, ms) => {
const stats = relayStats.get(url)
relayStats.patch({
url,
last_activity: Date.now(),
timeouts: (stats.timeouts || 0) + 1,
})
})
const getRelayQuality = url => {
const stats = relayStats.get(url)
if (stats.error) {
return [
0,
switcher(stats.error, {
disconnected: "Disconnected",
unauthorized: "Logging in",
forbidden: "Failed to log in",
}),
]
}
const {timeouts, subs_count: subsCount, eose_timer: eoseTimer, eose_count: eoseCount} = stats
const timeoutRate = timeouts > 0 ? timeouts / subsCount : null
const eoseQuality = eoseCount > 0 ? Math.max(1, 500 / (eoseTimer / eoseCount)) : null
if (timeoutRate && timeoutRate > 0.5) {
return [1 - timeoutRate, "Slow connection"]
}
if (eoseQuality && eoseQuality < 0.7) {
return [eoseQuality, "Slow connection"]
}
if (eoseQuality) {
return [eoseQuality, "Connected"]
}
if (network.pool.get(url).status === Socket.STATUS.READY) {
return [1, "Connected"]
}
return [0.5, "Not Connected"]
}
return {
relayStats,
getRelayQuality,
}
}

260
src/system/network.ts Normal file
View File

@ -0,0 +1,260 @@
import type {Filter} from "nostr-tools"
import type {MyEvent} from "src/util/types"
import {EventEmitter} from "events"
import {verifySignature} from "nostr-tools"
import {Pool, Plex, Relays, Executor, Socket} from "paravel"
import {ensurePlural} from "hurdak/lib/hurdak"
import {union, sleep, difference} from "src/util/misc"
import {warn, error, log} from "src/util/logger"
import {normalizeRelayUrl} from "src/util/nostr"
import {FORCE_RELAYS, COUNT_RELAYS} from "src/system/env"
type SubscribeOpts = {
relays: string[]
filter: Filter[] | Filter
onEvent: (event: MyEvent) => void
onEose?: (url: string) => void
}
const getUrls = relays => {
if (relays.length === 0) {
error(`Attempted to connect to zero urls`)
}
const urls = new Set(relays.map(normalizeRelayUrl))
if (urls.size !== relays.length) {
warn(`Attempted to connect to non-unique relays`)
}
return Array.from(urls)
}
export default class Network extends EventEmitter {
authHandler?: (url: string, challenge: string) => void
settings: {
getSetting: (name: string) => string
}
routing: {
getRelayMeta: (url: string) => {
limitation?: {
auth_required?: boolean
payment_required?: boolean
}
}
}
pool: Pool
constructor({settings, routing}) {
super()
this.authHandler = null
this.settings = settings
this.routing = routing
this.pool = new Pool()
}
getExecutor = async (urls, {bypassBoot = false} = {}) => {
if (FORCE_RELAYS.length > 0) {
urls = FORCE_RELAYS
}
let target
const multiplextrUrl = this.settings.getSetting("multiplextrUrl")
// Try to use our multiplexer, but if it fails to connect fall back to relays. If
// we're only connecting to a single relay, just do it directly, unless we already
// have a connection to the multiplexer open, in which case we're probably doing
// AUTH with a single relay.
if (multiplextrUrl && (urls.length > 1 || this.pool.has(multiplextrUrl))) {
const socket = this.pool.get(multiplextrUrl)
if (!socket.error) {
target = new Plex(urls, this.pool.get(multiplextrUrl))
}
}
if (!target) {
target = new Relays(urls.map(url => this.pool.get(url)))
}
const executor = new Executor(target)
executor.handleAuth({
onAuth(url, challenge) {
this.emit("error:set", url, "unauthorized")
return this.authHandler?.(url, challenge)
},
onOk(url, id, ok, message) {
this.emit("error:clear", url, ok ? null : "forbidden")
},
})
// Eagerly connect and handle AUTH
await Promise.all(
executor.target.sockets.map(async socket => {
const {limitation} = this.routing.getRelayMeta(socket.url)
const waitForBoot = limitation?.payment_required || limitation?.auth_required
if (socket.status === Socket.STATUS.NEW) {
socket.booted = sleep(2000)
await socket.connect()
}
// Delay REQ/EVENT until AUTH flow happens
if (!bypassBoot && waitForBoot) {
await socket.booted
}
})
)
return executor
}
publish = async ({relays, event, onProgress, timeout = 3000, verb = "EVENT"}) => {
const urls = getUrls(relays)
const executor = await this.getExecutor(urls, {bypassBoot: verb === "AUTH"})
this.emit("publish", urls)
log(`Publishing to ${urls.length} relays`, event, urls)
return new Promise(resolve => {
const timeouts = new Set()
const succeeded = new Set()
const failed = new Set()
const getProgress = () => {
const completed = union(timeouts, succeeded, failed)
const pending = difference(urls, completed)
return {succeeded, failed, timeouts, completed, pending}
}
const attemptToResolve = () => {
const progress = getProgress()
if (progress.pending.size === 0) {
log(`Finished publishing to ${urls.length} relays`, event, progress)
resolve(progress)
sub.unsubscribe()
executor.target.cleanup()
} else if (onProgress) {
onProgress(progress)
}
}
setTimeout(() => {
for (const url of urls) {
if (!succeeded.has(url) && !failed.has(url)) {
timeouts.add(url)
}
}
attemptToResolve()
}, timeout)
const sub = executor.publish(event, {
verb,
onOk: url => {
succeeded.add(url)
timeouts.delete(url)
failed.delete(url)
attemptToResolve()
},
onError: url => {
failed.add(url)
timeouts.delete(url)
attemptToResolve()
},
})
// Report progress to start
attemptToResolve()
})
}
subscribe = async ({relays, filter, onEvent, onEose}: SubscribeOpts) => {
const urls = getUrls(relays)
const executor = await this.getExecutor(urls)
const filters = ensurePlural(filter)
const now = Date.now()
const seen = new Map()
const eose = new Set()
log(`Starting subscription with ${relays.length} relays`, {filters, relays})
this.emit("sub:open", urls)
const sub = executor.subscribe(filters, {
onEvent: (url, event) => {
const seen_on = seen.get(event.id)
if (seen_on) {
if (!seen_on.includes(url)) {
seen_on.push(url)
}
return
}
Object.assign(event, {
seen_on: [url],
content: event.content || "",
})
seen.set(event.id, event.seen_on)
try {
if (!verifySignature(event)) {
return
}
} catch (e) {
console.error(e)
return
}
this.emit("event", url)
onEvent(event)
},
onEose: url => {
onEose?.(url)
// Keep track of relay timing stats, but only for the first eose we get
if (!eose.has(url)) {
this.emit("eose", url, Date.now() - now)
}
eose.add(url)
},
})
return {
unsub: () => {
log(`Closing subscription`, filters)
sub.unsubscribe()
executor.target.cleanup()
this.emit("sub:close", urls)
},
}
}
count = async filter => {
const filters = ensurePlural(filter)
const executor = await this.getExecutor(COUNT_RELAYS)
return new Promise(resolve => {
const sub = executor.count(filters, {
onCount: (url, {count}) => resolve(count),
})
setTimeout(() => {
resolve(0)
sub.unsubscribe()
executor.target.cleanup()
}, 3000)
})
}
}

BIN
yarn.lock

Binary file not shown.