Move more logic into subscription object

This commit is contained in:
Jonathan Staab 2023-08-15 12:46:20 -07:00
parent c8965d5585
commit 6a7a995979
8 changed files with 99 additions and 107 deletions

View File

@ -11,21 +11,9 @@
import {isNil, find, last} from "ramda"
import {Storage, seconds, Fetch, shuffle} from "hurdak"
import {tryFetch, hexToBech32, bech32ToHex, now} from "src/util/misc"
import {userKinds} from "src/util/nostr"
import {default as engine} from "src/app/engine"
import {
Keys,
Nip65,
pubkeyLoader,
user,
Env,
Network,
Builder,
Outbox,
Settings,
storage,
} from "src/app/engine"
import {listenForNotifications} from "src/app/state"
import {Keys, Nip65, user, Env, Network, Builder, Outbox, Settings, storage} from "src/app/engine"
import {loadAppData} from "src/app/state"
import {theme, getThemeVariables, appName, modal} from "src/partials/state"
import {logUsage} from "src/app/state"
import SideNav from "src/app/SideNav.svelte"
@ -168,14 +156,11 @@
}
})
storage.ready.then(() => {
const pubkey = Keys.pubkey.get()
const {pubkey} = Keys
// Make sure the user's stuff is loaded, but don't call loadAppData
// since that reloads messages and stuff
if (pubkey) {
pubkeyLoader.load(pubkey, {force: true, kinds: userKinds})
listenForNotifications()
storage.ready.then(() => {
if ($pubkey) {
loadAppData()
}
const interval = setInterval(async () => {
@ -208,8 +193,6 @@
clearInterval(interval)
}
})
const {pubkey} = Keys
</script>
<TypedRouter url={pathname}>

View File

@ -157,17 +157,6 @@ export const loadAppData = async () => {
// Load their network
pubkeyLoader.load(user.getFollows())
// Load their messages and notifications
Network.subscribe({
timeout: 10_000,
relays: user.getRelayUrls("read"),
filter: [
{kinds: [4], authors: [pubkey]},
{kinds: [4, 1059], "#p": [pubkey]},
{kinds: noteKinds, "#p": [pubkey]},
],
})
// Start our listener
listenForNotifications()
}

View File

@ -61,7 +61,7 @@
<Anchor
type="unstyled"
class="fa fa-arrow-left cursor-pointer text-2xl"
on:click={() => history.back()} />
href="/conversations" />
<PersonCircle {pubkey} size={10} />
</div>
<div class="flex h-12 w-full flex-col overflow-hidden pt-px">

View File

@ -1,4 +1,3 @@
import {verifySignature, matchFilters} from "nostr-tools"
import {Pool, Plex, Relays, Executor} from "paravel"
import {noop, ensurePlural, union, difference} from "hurdak"
import {warn, error, info} from "src/util/logger"
@ -74,7 +73,7 @@ export class Network {
shouldReconnect: connection.meta.lastClose < Date.now() - 30_000,
})
if (connection.isHealthy()) {
if (connection.socket.isHealthy()) {
target = new Plex(urls, connection)
}
}
@ -167,72 +166,24 @@ export class Network {
timeout,
shouldProcess = true,
}: SubscribeOpts) => {
const urls = getUrls(relays)
const executor = this.getExecutor(urls)
const filters = ensurePlural(filter)
const subscription = new Subscription()
const seen = new Map()
const eose = new Set()
info(`Starting subscription with ${relays.length} relays`, {filters, relays})
subscription.on("close", () => {
sub.unsubscribe()
executor.target.cleanup()
onClose?.()
const subscription = new Subscription({
executor: this.getExecutor(getUrls(relays)),
filters: ensurePlural(filter),
timeout,
relays,
})
if (timeout) {
setTimeout(subscription.close, timeout)
}
info(`Starting subscription with ${relays.length} relays`, {filter, relays})
const sub = executor.subscribe(filters, {
onEvent: (url: string, event: Event) => {
const seen_on = seen.get(event.id)
if (onEose) subscription.on("eose", onEose)
if (onClose) subscription.on("close", onClose)
if (seen_on) {
if (!seen_on.includes(url)) {
seen_on.push(url)
}
subscription.on("event", event => {
if (shouldProcess) {
this.engine.Events.queue.push(event)
}
return
}
Object.assign(event, {
seen_on: [url],
content: event.content || "",
})
seen.set(event.id, event.seen_on)
try {
if (!verifySignature(event)) {
return
}
} catch (e) {
console.error(e)
return
}
if (!matchFilters(filters, event)) {
return
}
if (shouldProcess) {
this.engine.Events.queue.push(event)
}
onEvent?.(event)
},
onEose: (url: string) => {
onEose?.(url)
eose.add(url)
if (timeout && eose.size === relays.length) {
subscription.close()
}
},
onEvent?.(event)
})
return subscription

View File

@ -80,7 +80,7 @@ export class FeedLoader {
// Wait until a good number of subscriptions have completed to reduce the chance of
// out of order notes
this.ready = race(0.2, pluck("complete", subs))
this.ready = race(0.2, pluck("result", subs))
}
// Control

View File

@ -76,7 +76,7 @@ export class PubkeyLoader {
await Promise.all(
pluck(
"complete",
"result",
chunk(256, pubkeys).map((chunk: string[]) =>
this.engine.Network.subscribe({
relays: getChunkRelays(chunk),

View File

@ -1,16 +1,86 @@
import {verifySignature, matchFilters} from "nostr-tools"
import EventEmitter from "events"
import {defer} from "hurdak"
import {defer, tryFunc} from "hurdak"
import type {Executor} from "paravel"
import type {Event, Filter} from "src/engine/types"
type SubscriptionOpts = {
executor: typeof Executor
relays: string[]
filters: Filter[]
timeout?: number
}
export class Subscription extends EventEmitter {
opened = Date.now()
closed: number = null
complete = defer()
result = defer()
events = []
seen = new Map()
eose = new Set()
sub: {unsubscribe: () => void} = null
id = Math.random().toString().slice(12, 16)
constructor(readonly opts: SubscriptionOpts) {
super()
if (opts.timeout) {
setTimeout(this.close, opts.timeout)
}
this.sub = opts.executor.subscribe(opts.filters, {
onEvent: this.onEvent,
onEose: this.onEose,
})
}
onEvent = (url: string, event: Event) => {
const {filters} = this.opts
const seen_on = this.seen.get(event.id)
if (seen_on) {
if (!seen_on.includes(url)) {
seen_on.push(url)
}
return
}
event.seen_on = [url]
event.content = event.content || ""
this.seen.set(event.id, event.seen_on)
if (!tryFunc(() => verifySignature(event))) {
return
}
if (!matchFilters(filters, event)) {
return
}
this.emit("event", event)
}
onEose = (url: string) => {
const {timeout, relays} = this.opts
this.emit("eose", url)
this.eose.add(url)
if (timeout && this.eose.size === relays.length) {
this.close()
}
}
close = () => {
if (!this.closed) {
this.closed = Date.now()
this.complete.resolve()
this.emit("close")
this.result.resolve(this.events)
this.sub.unsubscribe()
this.opts.executor.target.cleanup()
this.emit("close", this.events)
this.removeAllListeners()
}
}

View File

@ -16,7 +16,6 @@
const dispatch = createEventDispatcher()
$: _href = external ? href : null
$: target = external ? "_blank" : null
let className
@ -49,7 +48,7 @@
e.stopPropagation()
}
if (href && !external) {
if (href && tag !== "a") {
navigate(href)
}
@ -58,7 +57,7 @@
</script>
{#if tag === "a"}
<a class={className} on:click={onClick} href={_href} {target}>
<a class={className} on:click={onClick} {href} {target}>
<slot />
</a>
{:else if tag === "button"}