Use load/subscribe

This commit is contained in:
Jonathan Staab 2023-09-12 15:05:33 -07:00
parent 8b9e069f1d
commit d269e1bcb5
25 changed files with 151 additions and 213 deletions

View File

@ -18,6 +18,7 @@
import TopNavMenu from "src/app/TopNavMenu.svelte"
import {menuIsOpen} from "src/app/state"
import {
load,
topics,
people,
peopleWithName,
@ -29,7 +30,6 @@
hasNewNotfications,
getUserRelayUrls,
searchableRelays,
Subscription,
} from "src/engine2"
const logoUrl = import.meta.env.VITE_LOGO_URL || "/images/logo.png"
@ -75,16 +75,14 @@
// If we have a query, search using nostr.band. If not, ask for random profiles.
// This allows us to populate results even if search isn't supported by forced urls
if (term.length > 2) {
new Subscription({
load({
relays: $searchableRelays,
filters: [{kinds: [0], search, limit: 10}],
timeout: 3000,
})
} else if (people.get().length < 50) {
new Subscription({
load({
relays: getUserRelayUrls("read"),
filters: [{kinds: [0], limit: 50}],
timeout: 3000,
})
}
})

View File

@ -7,9 +7,9 @@
import ContentEditable from "src/partials/ContentEditable.svelte"
import Suggestions from "src/partials/Suggestions.svelte"
import {
load,
derivePerson,
displayPerson,
Subscription,
isFollowing,
searchableRelays,
getPubkeyHints,
@ -37,13 +37,11 @@
const loadPeople = debounce(500, search => {
if (search.length > 2 && search.startsWith("@")) {
const sub = new Subscription({
timeout: 3000,
load({
relays: $searchableRelays,
filters: [{kinds: [0], search, limit: 10}],
onEvent: () => applySearch(getInfo().word),
})
sub.on("event", () => applySearch(getInfo().word))
}
})

View File

@ -21,7 +21,7 @@
},
})
let sub, note
let note
let loading = true
onMount(async () => {
@ -32,18 +32,13 @@
context.addContext([e], {depth: 0})
note = first(context.applyContext([e]))
sub.close()
},
})
await Promise.all(context.getAllSubs())
loading = false
})
onDestroy(() => {
sub?.close()
context.stop()
})
</script>

View File

@ -1,6 +1,5 @@
<script lang="ts">
import type {Filter} from "nostr-tools"
import {onDestroy} from "svelte"
import {filterVals} from "hurdak"
import {isShareableRelay} from "src/util/nostr"
import {fly} from "src/util/transition"
@ -10,9 +9,9 @@
import Spinner from "src/partials/Spinner.svelte"
import PersonCircle from "src/app/shared/PersonCircle.svelte"
import {
load,
getSetting,
displayPubkey,
Subscription,
isEventMuted,
getEventHints,
mergeHints,
@ -29,29 +28,26 @@
const {id, identifier, kind, pubkey} = value
const relays = mergeHints(getSetting("relay_limit"), [
// Agora social has a bug
(value.relays || []).flatMap(r => r.split(",")).filter(isShareableRelay),
getEventHints(getSetting("relay_limit"), note),
])
const filters = [
id
? {ids: [id]}
: filterVals(xs => xs.length > 0, {
"#d": [identifier],
kinds: [kind],
authors: [pubkey],
}),
] as Filter[]
const sub = new Subscription({timeout: 30000, relays, filters})
sub.on("event", event => {
loading = false
muted = isEventMuted(event).get()
quote = event
sub.close()
load({
relays: mergeHints(getSetting("relay_limit"), [
// Agora social has a bug
(value.relays || []).flatMap(r => r.split(",")).filter(isShareableRelay),
getEventHints(getSetting("relay_limit"), note),
]),
filters: [
id
? {ids: [id]}
: filterVals(xs => xs.length > 0, {
"#d": [identifier],
kinds: [kind],
authors: [pubkey],
}),
] as Filter[],
onEvent: event => {
loading = false
muted = isEventMuted(event).get()
quote = event
},
})
const openQuote = e => {
@ -66,10 +62,6 @@
const unmute = e => {
muted = false
}
onDestroy(() => {
sub.close()
})
</script>
<div class="py-2" on:click|stopPropagation>

View File

@ -1,4 +1,4 @@
<script type="ts">
<script lang="ts">
import {onMount} from "svelte"
import {uniq, pluck} from "ramda"
import {batch} from "hurdak"
@ -6,7 +6,7 @@
import Spinner from "src/partials/Spinner.svelte"
import PersonSummary from "src/app/shared/PersonSummary.svelte"
import type {Event} from "src/engine2"
import {Subscription, getSetting, loadPubkeys, getPubkeyHints, follows} from "src/engine2"
import {subscribe, getSetting, loadPubkeys, getPubkeyHints, follows} from "src/engine2"
export let type
export let pubkey
@ -17,23 +17,19 @@
if (type === "follows") {
pubkeys = $follows
} else {
const sub = new Subscription({
const sub = subscribe({
relays: getPubkeyHints(getSetting("relay_limit"), pubkey, "read"),
filters: [{kinds: [3], "#p": [pubkey]}],
})
sub.on(
"event",
batch(500, (events: Event[]) => {
onEvent: batch(500, (events: Event[]) => {
const newPubkeys = pluck("pubkey", events)
loadPubkeys(newPubkeys)
pubkeys = uniq(pubkeys.concat(newPubkeys))
})
)
}),
})
return sub.close
return () => sub.close()
}
})
</script>

View File

@ -6,7 +6,7 @@
import {numberFmt} from "src/util/misc"
import {modal} from "src/partials/state"
import type {Event} from "src/engine2"
import {session, people, count, Subscription, getPubkeyHints} from "src/engine2"
import {session, people, count, subscribe, getPubkeyHints} from "src/engine2"
export let pubkey
@ -32,23 +32,18 @@
} else {
const followers = new Set()
sub = new Subscription({
timeout: 30_000,
sub = subscribe({
ephemeral: true,
relays: getPubkeyHints(3, $session?.pubkey, "read"),
filters: [{kinds: [3], "#p": [pubkey]}],
})
sub.on(
"event",
batch(300, (events: Event[]) => {
onEvent: batch(300, (events: Event[]) => {
for (const e of events) {
followers.add(e.pubkey)
}
followersCount.set(followers.size)
})
)
}),
})
}
}

View File

@ -18,7 +18,7 @@ import {
channels,
follows,
network,
Subscription,
subscribe,
getUserRelayUrls,
getSetting,
dufflepud,
@ -129,7 +129,7 @@ export const listenForNotifications = async () => {
// Only grab one event from each category/relay so we have enough to show
// the notification badges, but load the details lazily
listener?.close()
listener = new Subscription({
listener = subscribe({
relays: getUserRelayUrls("read"),
filters: [
// Messages

View File

@ -14,7 +14,7 @@
displayPubkey,
createNip24Message,
nip24MarkChannelRead,
loadNip59Messages,
listenForNip59Messages,
} from "src/engine2"
export let entity
@ -29,7 +29,7 @@
const showPerson = pubkey => modal.push({type: "person/detail", pubkey})
onMount(() => {
const sub = loadNip59Messages()
const sub = listenForNip59Messages()
return () => sub.close()
})

View File

@ -17,7 +17,7 @@
publishNip28Message,
joinNip28Channel,
leaveNip28Channel,
loadNip28Messages,
listenForNip28Messages,
publishNip28ChannelChecked,
} from "src/engine2"
@ -37,7 +37,7 @@
const sendMessage = content => publishNip28Message(id, content).result
onMount(() => {
const sub = loadNip28Messages(id)
const sub = listenForNip28Messages(id)
return () => sub.close()
})

View File

@ -11,7 +11,7 @@
import Content from "src/partials/Content.svelte"
import NoteById from "src/app/shared/NoteById.svelte"
import PersonBadgeSmall from "src/app/shared/PersonBadgeSmall.svelte"
import {session, labels, getUserRelayUrls, follows, Subscription} from "src/engine2"
import {session, labels, getUserRelayUrls, follows, subscribe} from "src/engine2"
type LabelGroup = {
label: string
@ -60,7 +60,7 @@
const showGroup = ({label, ids, hints}) => modal.push({type: "label/detail", label, ids, hints})
onMount(() => {
const sub = new Subscription({
const sub = subscribe({
relays: getUserRelayUrls("read"),
filters: [
{

View File

@ -13,7 +13,7 @@
displayPerson,
publishNip04Message,
nip04MarkChannelRead,
loadNip04Messages,
listenForNip04Messages,
} from "src/engine2"
import {routes} from "src/app/state"
import PersonCircle from "src/app/shared/PersonCircle.svelte"
@ -34,7 +34,7 @@
}
onMount(() => {
const sub = loadNip04Messages(pubkey)
const sub = listenForNip04Messages(pubkey)
return () => sub.close()
})

View File

@ -11,7 +11,7 @@
import Modal from "src/partials/Modal.svelte"
import Spinner from "src/partials/Spinner.svelte"
import Note from "src/app/shared/Note.svelte"
import {Subscription, selectHints, getSetting} from "src/engine2"
import {load, selectHints, getSetting} from "src/engine2"
export let note
export let relays = []
@ -31,7 +31,6 @@
},
})
let sub
let loading = true
let feedRelay = null
let displayNote = asDisplayEvent(note)
@ -40,30 +39,22 @@
// If our note came from a feed, we can preload context
context.hydrate([displayNote], depth)
sub = new Subscription({
await load({
filters: [{ids: [note.id]}],
timeout: 8000,
relays: selectHints(getSetting("relay_limit"), relays),
onEvent: e => {
context.addContext([e], {depth})
displayNote = first(context.applyContext([e]))
},
})
sub.on("event", e => {
context.addContext([e], {depth})
displayNote = first(context.applyContext([e]))
sub.close()
})
await sub.result
await Promise.all(context.getAllSubs())
info("NoteDetail", displayNote)
loading = false
})
onDestroy(() => {
sub?.close()
context.stop()
})
</script>

View File

@ -7,7 +7,13 @@
import Anchor from "src/partials/Anchor.svelte"
import Input from "src/partials/Input.svelte"
import Textarea from "src/partials/Textarea.svelte"
import {getSetting, displayPubkey, requestZap, collectInvoice, loadZapResponse} from "src/engine2"
import {
getSetting,
displayPubkey,
requestZap,
collectInvoice,
listenForZapResponse,
} from "src/engine2"
export let pubkey
export let note = null
@ -38,11 +44,13 @@
await collectInvoice(invoice)
// Listen for the zap confirmation
sub = loadZapResponse({relays, pubkey})
sub.on("event", event => {
zap.confirmed = true
setTimeout(() => modal.pop(), 1000)
sub = listenForZapResponse(pubkey, {
relays,
onEvent: event => {
zap.confirmed = true
setTimeout(() => modal.pop(), 1000)
sub.close()
},
})
}

View File

@ -1,14 +1,14 @@
import "./nip01"
import "./nip02"
import "./nip05"
import "./nip09"
import "./nip24"
import "./nip28"
import "./nip51"
import "./nip57"
import "./nip65"
import "./nip78"
import "./alerts"
import "./deletes"
import "./lists"
import "./topics"
export * from "./core"

View File

@ -3,6 +3,7 @@ import {derived} from "src/engine2/util"
import {alerts, alertsLastChecked} from "src/engine2/state"
export const latestNotification = alerts.derived(reduce((n, e) => Math.max(n, e.created_at), 0))
export const hasNewNotfications = derived(
[alertsLastChecked, latestNotification],
([c, n]) => n > c

View File

@ -15,8 +15,9 @@ import {
getRootHints,
getParentHints,
} from "src/engine2/queries"
import {Subscription} from "./subscription"
import {subscribe} from "./subscription"
import {loadPubkeys} from "./pubkeys"
import {load} from "./load"
const fromDisplayEvent = (e: DisplayEvent): Event =>
omit(["zaps", "likes", "replies", "matchesFilter"], e)
@ -32,34 +33,27 @@ export class ContextLoader {
stopped: boolean
data: Collection<Event>
seen: Set<string>
subs: Record<string, Array<{close: () => void}>>
subs: {close: () => void}[]
constructor(readonly opts: ContextLoaderOpts) {
this.stopped = false
this.data = collection<Event>("id")
this.seen = new Set()
this.subs = {
context: [],
listeners: [],
}
this.subs = []
}
// Utils
addSubs(key: string, subs: Array<Subscription>) {
for (const sub of ensurePlural(subs)) {
this.subs[key].push(sub)
addSubs(subs) {
for (const sub of subs) {
this.subs.push(sub)
sub.on("close", () => {
this.subs[key] = without([sub], this.subs[key])
this.subs = without([sub], this.subs)
})
}
}
getAllSubs() {
return flatten(Object.values(this.subs))
}
getReplyKinds() {
const {ENABLE_ZAPS} = env.get()
@ -197,18 +191,11 @@ export class ContextLoader {
})
if (parentsInfo.length > 0) {
const sub = new Subscription({
timeout: 5000,
load({
filters: [{ids: pluck("id", parentsInfo)}],
relays: this.mergeHints(pluck("hints", parentsInfo)),
onEvent: batch(100, (context: Event[]) => this.addContext(context, {depth: 2})),
})
sub.on(
"event",
batch(100, (context: Event[]) => this.addContext(context, {depth: 2}))
)
this.addSubs("context", [sub])
}
}
@ -232,18 +219,11 @@ export class ContextLoader {
) as Event[]
for (const c of chunk(256, events)) {
const sub = new Subscription({
timeout: 5000,
load({
relays: this.mergeHints(c.map(e => getReplyHints(this.getRelayLimit(), e))),
filters: [{kinds: this.getReplyKinds(), "#e": pluck("id", c as Event[])}],
onEvent: batch(100, (context: Event[]) => this.addContext(context, {depth: depth - 1})),
})
sub.on(
"event",
batch(100, (context: Event[]) => this.addContext(context, {depth: depth - 1}))
)
this.addSubs("context", [sub])
}
}
})
@ -253,7 +233,7 @@ export class ContextLoader {
return
}
this.subs.listeners.forEach(sub => sub.close())
this.subs.forEach(sub => sub.close())
const contextByParentId = groupBy(findReplyId, this.data.get())
@ -266,17 +246,13 @@ export class ContextLoader {
)
for (const c of chunk(256, findNotes(this.data.get()))) {
const sub = new Subscription({
relays: this.mergeHints(c.map(e => getReplyHints(this.getRelayLimit(), e))),
filters: [{kinds: this.getReplyKinds(), "#e": pluck("id", c), since: now()}],
})
sub.on(
"event",
batch(100, (context: Event[]) => this.addContext(context, {depth: 2}))
)
this.addSubs("listeners", [sub])
this.addSubs([
subscribe({
relays: this.mergeHints(c.map(e => getReplyHints(this.getRelayLimit(), e))),
filters: [{kinds: this.getReplyKinds(), "#e": pluck("id", c), since: now()}],
onEvent: batch(100, (context: Event[]) => this.addContext(context, {depth: 2})),
}),
])
}
})
@ -328,7 +304,7 @@ export class ContextLoader {
stop() {
this.stopped = true
for (const sub of this.getAllSubs()) {
for (const sub of this.subs) {
sub.close()
}
}

View File

@ -3,7 +3,8 @@ import {seconds, first} from "hurdak"
import {now} from "src/util/misc"
import {EPOCH} from "src/util/nostr"
import type {Filter, Event} from "src/engine2/model"
import {Subscription} from "./subscription"
import type {Subscription} from "./subscription"
import {subscribe} from "./subscription"
export type CursorOpts = {
relay: string
@ -36,38 +37,34 @@ export class Cursor {
let count = 0
const sub = new Subscription({
return subscribe({
timeout: 5000,
relays: [relay],
filters: filters.map(mergeRight({until, limit, since})),
onEvent: (event: Event) => {
this.until = Math.min(until, event.created_at) - 1
this.buffer.push(event)
count += 1
onEvent?.(event)
},
onClose: () => {
this.loading = false
// Relays can't be relied upon to return events in descending order, do exponential
// windowing to ensure we get the most recent stuff on first load, but eventually find it all
if (count === 0) {
this.delta *= 10
}
if (this.since <= EPOCH) {
this.done = true
}
this.since -= this.delta
},
})
sub.on("event", (event: Event) => {
this.until = Math.min(until, event.created_at) - 1
this.buffer.push(event)
count += 1
onEvent?.(event)
})
sub.on("close", () => {
this.loading = false
// Relays can't be relied upon to return events in descending order, do exponential
// windowing to ensure we get the most recent stuff on first load, but eventually find it all
if (count === 0) {
this.delta *= 10
}
if (this.since <= EPOCH) {
this.done = true
}
this.since -= this.delta
})
return sub
}
take(n = Infinity) {

View File

@ -5,7 +5,7 @@ import {findReplyId} from "src/util/nostr"
import type {Event, DisplayEvent, Filter} from "src/engine2/model"
import {writable} from "src/engine2/util/store"
import {getUrls} from "src/engine2/queries"
import {Subscription} from "./subscription"
import {subscribe} from "./subscription"
import {Cursor, MultiCursor} from "./cursor"
import {ContextLoader} from "./context"
@ -44,20 +44,16 @@ export class FeedLoader {
// No point in subscribing if we have an end date
if (!any(prop("until"), ensurePlural(opts.filters) as any[])) {
const sub = new Subscription({
relays: urls,
filters: opts.filters.map(assoc("since", this.since)),
})
sub.on(
"event",
batch(1000, (context: Event[]) => {
this.context.addContext(context, {shouldLoadParents: true, depth: opts.depth})
this.stream.update($stream => $stream.concat(context))
})
)
this.addSubs([sub])
this.addSubs([
subscribe({
relays: urls,
filters: opts.filters.map(assoc("since", this.since)),
onEvent: batch(1000, (context: Event[]) => {
this.context.addContext(context, {shouldLoadParents: true, depth: opts.depth})
this.stream.update($stream => $stream.concat(context))
}),
}),
])
}
this.cursor = new MultiCursor(
@ -84,7 +80,7 @@ export class FeedLoader {
// Control
addSubs(subs: Array<Subscription>) {
addSubs(subs) {
for (const sub of ensurePlural(subs)) {
this.subs.push(sub)

View File

@ -1,10 +1,10 @@
import {user, getInboxHints, getSetting} from "src/engine2/queries"
import {Subscription} from "./subscription"
import {subscribe} from "./subscription"
export function loadNip04Messages(contactPubkey: string) {
export function listenForNip04Messages(contactPubkey: string) {
const {pubkey: userPubkey} = user.get()
return new Subscription({
return subscribe({
relays: getInboxHints(getSetting("relay_limit"), [contactPubkey, userPubkey]),
filters: [
{kinds: [4], authors: [userPubkey], "#p": [contactPubkey]},

View File

@ -1,12 +1,12 @@
import {channels} from "src/engine2/state"
import {selectHints, getSetting} from "src/engine2/queries"
import {Subscription} from "./subscription"
import {subscribe} from "./subscription"
export const loadNip28Messages = channelId => {
export const listenForNip28Messages = channelId => {
const channel = channels.key(channelId).get()
const relays = selectHints(getSetting("relay_limit"), channel?.relays || [])
return new Subscription({
return subscribe({
relays,
filters: [
{kinds: [40], ids: [channelId]},

View File

@ -1,12 +1,12 @@
import {now} from "src/util/misc"
import {people} from "src/engine2/state"
import {Subscription} from "./subscription"
import {subscribe} from "./subscription"
export function loadZapResponse({pubkey, relays}) {
export const listenForZapResponse = (pubkey, opts) => {
const {zapper} = people.key(pubkey).get()
return new Subscription({
relays,
return subscribe({
...opts,
filters: [
{
kinds: [9735],

View File

@ -1,10 +1,10 @@
import {user, getPubkeyHints, getSetting} from "src/engine2/queries"
import {Subscription} from "./subscription"
import {subscribe} from "./subscription"
export function loadNip59Messages() {
export function listenForNip59Messages() {
const {pubkey} = user.get()
return new Subscription({
return subscribe({
relays: getPubkeyHints(getSetting("relay_limit"), pubkey, "read"),
filters: [{kinds: [1059], "#p": [pubkey]}],
})

View File

@ -4,7 +4,7 @@ import {findReplyId, findRootId} from "src/util/nostr"
import type {Event, DisplayEvent} from "src/engine2/model"
import {writable} from "src/engine2/util/store"
import {ContextLoader} from "./context"
import {Subscription} from "./subscription"
import {load} from "./load"
export type ThreadOpts = {
anchorId: string
@ -36,20 +36,15 @@ export class ThreadLoader {
const filteredIds = ids.filter(id => id && !seen.has(id))
if (filteredIds.length > 0) {
const sub = new Subscription({
timeout: 4000,
load({
relays: this.opts.relays,
filters: [{ids: filteredIds}],
})
sub.on(
"event",
batch(300, (events: Event[]) => {
onEvent: batch(300, (events: Event[]) => {
this.context.addContext(events, {depth: 1})
this.addToThread(this.context.applyContext(events))
this.loadNotes(events.flatMap(e => [findReplyId(e), findRootId(e)]))
})
)
}),
})
}
}