Clean up pool so we're not waiting for slow connections

This commit is contained in:
Jonathan Staab 2023-02-04 14:11:34 -06:00
parent dcd8de7912
commit fc984cafa6
11 changed files with 115 additions and 158 deletions

View File

@ -1,26 +1,32 @@
module.exports = {
"env": {
"browser": true,
"es2021": true
root: true,
env: {
browser: true,
es2021: true
},
"plugins": [
'svelte3'
],
plugins: ['svelte3', '@typescript-eslint'],
overrides: [
{
files: ['*.svelte'],
processor: 'svelte3/svelte3'
}
],
"extends": "eslint:recommended",
"parserOptions": {
"ecmaVersion": "latest",
"sourceType": "module"
extends: ['eslint:recommended', 'plugin:@typescript-eslint/recommended'],
parser: '@typescript-eslint/parser',
parserOptions: {
ecmaVersion: "latest",
sourceType: "module",
tsconfigRootDir: __dirname,
project: ['./tsconfig.json'],
extraFileExtensions: ['.svelte']
},
"rules": {
settings: {
'svelte3/typescript': require('typescript'),
},
rules: {
"a11y-click-events-have-key-events": "off",
"no-unused-vars": ["error", {args: "none"}],
"no-async-promise-executor": "off",
},
"ignorePatterns": ["*.svg"]
ignorePatterns: ["*.svg"]
}

BIN
package-lock.json generated

Binary file not shown.

View File

@ -12,12 +12,15 @@
},
"devDependencies": {
"@sveltejs/vite-plugin-svelte": "^1.1.0",
"@typescript-eslint/eslint-plugin": "^5.50.0",
"@typescript-eslint/parser": "^5.50.0",
"autoprefixer": "^10.4.13",
"eslint": "^8.28.0",
"eslint": "^8.33.0",
"eslint-plugin-svelte3": "^4.0.0",
"postcss": "^8.4.19",
"svelte": "^3.52.0",
"tailwindcss": "^3.2.4",
"typescript": "^4.9.5",
"vite": "^3.2.3"
},
"dependencies": {

View File

@ -71,7 +71,7 @@ export const publish = async (relays, event) => {
return signedEvent
}
export const load = async (relays, filter, opts?) => {
export const load = async (relays, filter, opts?): Promise<Record<string, unknown>[]> => {
const events = await pool.request(relays, filter, opts)
await processEvents(events)
@ -79,18 +79,16 @@ export const load = async (relays, filter, opts?) => {
return events
}
export const listen = async (relays, filter, onEvent, {shouldProcess = true}: any = {}) => {
const sub = await pool.subscribe(relays, filter)
export const listen = (relays, filter, onEvent, {shouldProcess = true}: any = {}) => {
return pool.subscribe(relays, filter, {
onEvent: e => {
if (shouldProcess) {
processEvents(e)
}
sub.onEvent(e => {
if (shouldProcess) {
processEvents(e)
}
if (onEvent) {
onEvent(e)
}
if (onEvent) {
onEvent(e)
}
},
})
return sub
}

View File

@ -1,5 +1,6 @@
import type {Relay} from 'nostr-tools'
import {relayInit} from 'nostr-tools'
import {uniqBy, reject, prop, find, whereEq, is, filter, identity} from 'ramda'
import {uniqBy, reject, prop, find, whereEq, is} from 'ramda'
import {ensurePlural} from 'hurdak/lib/hurdak'
import {isRelay} from 'src/util/nostr'
import {sleep} from 'src/util/misc'
@ -8,6 +9,12 @@ import {db} from 'src/agent/data'
let connections = []
class Connection {
promise: Promise<void>
nostr: Relay
status: string
url: string
stats: Record<string, number>
lastRequest: number
constructor(url) {
this.promise = null
this.nostr = this.init(url)
@ -107,7 +114,7 @@ const describeFilter = ({kinds = [], ...filter}) => {
return '(' + parts.join(',') + ')'
}
const subscribe = async (relays, filters) => {
const subscribe = async (relays, filters, {onEvent, onEose}: Record<string, (e: any) => void>) => {
relays = uniqBy(prop('url'), relays.filter(r => isRelay(r.url)))
filters = ensurePlural(filters)
@ -117,151 +124,101 @@ const subscribe = async (relays, filters) => {
filters.map(describeFilter).join(':'),
].join('-')
// Deduplicate events
const seen = new Set()
const eose = []
const events = []
let onEvent
let onEose
const subs = filter(identity, await Promise.all(
relays.map(async relay => {
const conn = await connect(relay.url)
// Don't await before returning so we're not blocking on slow connects
const promises = relays.map(async relay => {
const conn = await connect(relay.url)
// If the relay failed to connect, give up
if (!conn) {
return null
}
// If the relay failed to connect, give up
if (!conn) {
return null
}
const sub = conn.nostr.sub(filters, {id})
const sub = conn.nostr.sub(filters, {id})
// Subscribe to events immediately so we don't miss any while we
// wait for slow relays to connect
if (onEvent) {
sub.on('event', e => {
if (!seen.has(e.id)) {
e.seen_on = sub.conn.url
seen.add(e.id)
if (onEvent) {
onEvent(e)
} else {
events.push(e)
}
onEvent(Object.assign(e, {seen_on: conn.url}))
}
})
}
// Same thing for eose
sub.on('eose', () => {
if (onEose) {
onEose(conn.url)
} else {
eose.push(conn.url)
}
})
if (onEose) {
sub.on('eose', () => onEose(conn.url))
}
sub.conn = conn
sub.conn.stats.activeCount += 1
conn.stats.activeCount += 1
if (sub.conn.stats.activeCount > 10) {
console.warn(`Relay ${sub.conn.url} has >10 active subscriptions`)
}
if (conn.stats.activeCount > 10) {
console.warn(`Relay ${conn.url} has >10 active subscriptions`)
}
return sub
})
))
return Object.assign(sub, {conn})
})
return {
subs,
unsub: () => {
subs.forEach(sub => {
if (sub.conn.status === 'ready') {
sub.unsub()
promises.forEach(async promise => {
const sub = await promise
if (sub) {
if (sub.conn.status === 'ready') {
sub.unsub()
}
sub.conn.stats.activeCount -= 1
}
sub.conn.stats.activeCount -= 1
})
},
onEvent: cb => {
// Report our buffered events
events.splice(0).forEach(e => cb(e))
// Add our listener for future ones
onEvent = cb
},
onEose: cb => {
// Report our buffered eoses
eose.splice(0).forEach(e => cb(e))
// Add our listener for future ones
onEose = cb
},
}
}
const request = (relays, filters, {threshold = 1} = {}) => {
relays = uniqBy(prop('url'), relays.filter(r => isRelay(r.url)))
const request = (relays, filters, {threshold = 1} = {}): Promise<Record<string, unknown>[]> => {
return new Promise(async resolve => {
const agg = await subscribe(relays, filters)
relays = uniqBy(prop('url'), relays.filter(r => isRelay(r.url)))
threshold = Math.min(relays.length, threshold)
const now = Date.now()
const relaysWithEvents = new Set()
const events = []
const eose = []
const attemptToComplete = () => {
// If we have all relays, more than `threshold` reporting events, most after
// a short timeout, or all after a long timeout, go ahead and unsubscribe.
const done = (
eose.length === agg.subs.length
|| eose.filter(url => relaysWithEvents.has(url)).length >= threshold
|| (
Date.now() - now >= 1000
&& eose.length > agg.subs.length - Math.round(agg.subs.length / 10)
)
|| Date.now() - now >= 5000
)
if (done) {
if (eose.length >= threshold || Date.now() - now >= 5000) {
agg.unsub()
resolve(events)
// Keep track of relay timeouts
agg.subs.forEach(async sub => {
if (!eose.includes(sub.conn.url)) {
const conn = findConnection(sub.conn.url)
if (conn) {
conn.stats.count += 1
conn.stats.timer += Date.now() - now
conn.stats.timeouts += 1
}
}
})
}
}
agg.onEvent(e => {
relaysWithEvents.add(e.seen_on)
events.push(e)
})
agg.onEose(async url => {
if (!eose.includes(url)) {
eose.push(url)
const conn = findConnection(url)
// Keep track of relay timing stats
if (conn) {
conn.stats.count += 1
conn.stats.timer += Date.now() - now
}
}
attemptToComplete()
})
// If a relay takes too long, give up
setTimeout(attemptToComplete, 5000)
const agg = await subscribe(relays, filters, {
onEvent: e => {
relaysWithEvents.add(e.seen_on)
events.push(e)
},
onEose: async url => {
if (!eose.includes(url)) {
eose.push(url)
const conn = findConnection(url)
// Keep track of relay timing stats
if (conn) {
conn.stats.count += 1
conn.stats.timer += Date.now() - now
}
}
attemptToComplete()
},
})
})
}

View File

@ -30,7 +30,7 @@ const loadPeople = (relays, pubkeys, {kinds = personKinds, force = false, ...opt
const loadNetwork = async (relays, pubkey) => {
// Get this user's profile to start with. This may update what relays
// are available, so don't assign relays to a variable here.
let events = pubkey ? await loadPeople(relays, [pubkey], {force: true}) : []
const events = pubkey ? await loadPeople(relays, [pubkey], {force: true}) : []
let petnames = Tags.from(events.filter(e => e.kind === 3)).type("p").all()
// Default to some cool guys we know
@ -58,7 +58,7 @@ const loadContext = async (relays, notes, {loadParents = false, depth = 0, ...op
const parentTags = uniq(chunk.map(findReply).filter(identity))
const parentIds = Tags.wrap(parentTags).values().all()
const combinedRelays = uniq(relays.concat(Tags.wrap(parentTags).relays()))
const filter = [{kinds: [1, 7], '#e': chunkIds} as {}]
const filter = [{kinds: [1, 7], '#e': chunkIds} as object]
if (authors.length > 0) {
filter.push({kinds: personKinds, authors})

View File

@ -16,6 +16,7 @@
switcher(type, {
anchor: "underline",
button: "py-2 px-4 rounded bg-white text-accent",
'button-circle': "w-10 h-10 flex justify-center items-center rounded-full bg-white text-accent",
'button-accent': "py-2 px-4 rounded bg-accent text-white",
}),
)

View File

@ -15,7 +15,6 @@
import Compose from "src/partials/Compose.svelte"
import Card from "src/partials/Card.svelte"
import {user, people, getPerson, getRelays, getEventRelays} from 'src/agent'
import {addRelay, removeRelay} from "src/app"
import cmd from 'src/app/cmd'
export let note

View File

@ -44,11 +44,6 @@
)
const loadMessages = async ({until, limit}) => {
const a = db.table('messages')
const b = a.where('pubkey')
const c = b.equals(pubkey)
const d = c.toArray()
const e = await d
const fromThem = await db.table('messages').where('pubkey').equals(pubkey).toArray()
const toThem = await db.table('messages').where('recipient').equals(pubkey).toArray()
const events = fromThem.concat(toThem).filter(e => e.created_at < until)

View File

@ -10,7 +10,6 @@
import Tabs from "src/partials/Tabs.svelte"
import Content from "src/partials/Content.svelte"
import Anchor from "src/partials/Anchor.svelte"
import Button from "src/partials/Button.svelte"
import Spinner from "src/partials/Spinner.svelte"
import Notes from "src/views/person/Notes.svelte"
import Likes from "src/views/person/Likes.svelte"
@ -92,7 +91,7 @@
<Content>
<div class="flex gap-4" in:fly={{y: 20}}>
<div
class="overflow-hidden w-32 h-32 rounded-full bg-cover bg-center shrink-0 border border-solid border-white"
class="overflow-hidden w-16 h-16 sm:w-32 sm:h-32 rounded-full bg-cover bg-center shrink-0 border border-solid border-white"
style="background-image: url({person.picture})" />
<div class="flex flex-col gap-4 flex-grow">
<div class="flex justify-between items-center gap-4">
@ -107,22 +106,24 @@
</div>
{/if}
</div>
<div class="whitespace-nowrap flex gap-4 items-center">
<div class="whitespace-nowrap flex gap-3 items-center flex-wrap">
{#if $user?.pubkey === pubkey && keys.canSign()}
<Anchor type="button" href="/profile">
<i class="fa-solid fa-edit" /> Edit profile
</Anchor>
<Anchor href="/profile"><i class="fa-solid fa-edit" /> Edit profile</Anchor>
{:else if $user && keys.canSign()}
<Anchor type="button" on:click={openAdvanced}>
<i class="fa-solid fa-sliders" />
<Anchor type="button-circle" on:click={openAdvanced}>
<i class="fa fa-sliders" />
</Anchor>
<Anchor type="button" href={`/messages/${npub}`}>
<i class="fa-solid fa-envelope" />
<Anchor type="button-circle" href={`/messages/${npub}`}>
<i class="fa fa-envelope" />
</Anchor>
{#if following}
<Button on:click={unfollow}>Unfollow</Button>
<Anchor type="button-circle" on:click={unfollow}>
<i class="fa fa-user-minus" />
</Anchor>
{:else}
<Button on:click={follow}>Follow</Button>
<Anchor type="button-circle" on:click={follow}>
<i class="fa fa-user-plus" />
</Anchor>
{/if}
{/if}
</div>

View File

@ -100,7 +100,6 @@ export const createScroller = (loadMore, {reverse = false} = {}) => {
// loadMore function is not properly awaiting all the work necessary.
// That is the problem, but see commit 8371fde for another strategy
let done = false
let timeout = null
const check = async () => {
// While we have empty space, fill it
const {scrollY, innerHeight} = window
@ -111,8 +110,6 @@ export const createScroller = (loadMore, {reverse = false} = {}) => {
// Only trigger loading the first time we reach the threshold
if (shouldLoad) {
clearTimeout(timeout)
await loadMore()
}