From d269e1bcb550f335b4df4434c7f3e863ce92d592 Mon Sep 17 00:00:00 2001 From: Jonathan Staab Date: Tue, 12 Sep 2023 15:05:33 -0700 Subject: [PATCH] Use load/subscribe --- src/app/TopNav.svelte | 8 +-- src/app/shared/Compose.svelte | 8 +-- src/app/shared/NoteById.svelte | 7 +- src/app/shared/NoteContentQuote.svelte | 50 ++++++-------- src/app/shared/PersonList.svelte | 18 ++--- src/app/shared/PersonStats.svelte | 15 ++--- src/app/state.ts | 4 +- src/app/views/ChannelsDetail.svelte | 4 +- src/app/views/ChatDetail.svelte | 4 +- src/app/views/Explore.svelte | 4 +- src/app/views/MessagesDetail.svelte | 4 +- src/app/views/NoteDetail.svelte | 23 ++----- src/app/views/ZapModal.svelte | 20 ++++-- src/engine2/projections/index.ts | 4 +- .../projections/{deletes.ts => nip09.ts} | 0 .../projections/{lists.ts => nip51.ts} | 0 src/engine2/queries/alerts.ts | 1 + src/engine2/requests/context.ts | 66 ++++++------------- src/engine2/requests/cursor.ts | 55 ++++++++-------- src/engine2/requests/feed.ts | 28 ++++---- src/engine2/requests/nip04.ts | 6 +- src/engine2/requests/nip28.ts | 6 +- src/engine2/requests/nip57.ts | 8 +-- src/engine2/requests/nip59.ts | 6 +- src/engine2/requests/thread.ts | 15 ++--- 25 files changed, 151 insertions(+), 213 deletions(-) rename src/engine2/projections/{deletes.ts => nip09.ts} (100%) rename src/engine2/projections/{lists.ts => nip51.ts} (100%) diff --git a/src/app/TopNav.svelte b/src/app/TopNav.svelte index 14879877..d2f48aa2 100644 --- a/src/app/TopNav.svelte +++ b/src/app/TopNav.svelte @@ -18,6 +18,7 @@ import TopNavMenu from "src/app/TopNavMenu.svelte" import {menuIsOpen} from "src/app/state" import { + load, topics, people, peopleWithName, @@ -29,7 +30,6 @@ hasNewNotfications, getUserRelayUrls, searchableRelays, - Subscription, } from "src/engine2" const logoUrl = import.meta.env.VITE_LOGO_URL || "/images/logo.png" @@ -75,16 +75,14 @@ // If we have a query, search using nostr.band. If not, ask for random profiles. // This allows us to populate results even if search isn't supported by forced urls if (term.length > 2) { - new Subscription({ + load({ relays: $searchableRelays, filters: [{kinds: [0], search, limit: 10}], - timeout: 3000, }) } else if (people.get().length < 50) { - new Subscription({ + load({ relays: getUserRelayUrls("read"), filters: [{kinds: [0], limit: 50}], - timeout: 3000, }) } }) diff --git a/src/app/shared/Compose.svelte b/src/app/shared/Compose.svelte index cc7dc8cc..6049be34 100644 --- a/src/app/shared/Compose.svelte +++ b/src/app/shared/Compose.svelte @@ -7,9 +7,9 @@ import ContentEditable from "src/partials/ContentEditable.svelte" import Suggestions from "src/partials/Suggestions.svelte" import { + load, derivePerson, displayPerson, - Subscription, isFollowing, searchableRelays, getPubkeyHints, @@ -37,13 +37,11 @@ const loadPeople = debounce(500, search => { if (search.length > 2 && search.startsWith("@")) { - const sub = new Subscription({ - timeout: 3000, + load({ relays: $searchableRelays, filters: [{kinds: [0], search, limit: 10}], + onEvent: () => applySearch(getInfo().word), }) - - sub.on("event", () => applySearch(getInfo().word)) } }) diff --git a/src/app/shared/NoteById.svelte b/src/app/shared/NoteById.svelte index c26bbdbd..2e511c8b 100644 --- a/src/app/shared/NoteById.svelte +++ b/src/app/shared/NoteById.svelte @@ -21,7 +21,7 @@ }, }) - let sub, note + let note let loading = true onMount(async () => { @@ -32,18 +32,13 @@ context.addContext([e], {depth: 0}) note = first(context.applyContext([e])) - - sub.close() }, }) - await Promise.all(context.getAllSubs()) - loading = false }) onDestroy(() => { - sub?.close() context.stop() }) diff --git a/src/app/shared/NoteContentQuote.svelte b/src/app/shared/NoteContentQuote.svelte index 03c8e31c..2a61b806 100644 --- a/src/app/shared/NoteContentQuote.svelte +++ b/src/app/shared/NoteContentQuote.svelte @@ -1,6 +1,5 @@
diff --git a/src/app/shared/PersonList.svelte b/src/app/shared/PersonList.svelte index a8b87fc4..06b580c4 100644 --- a/src/app/shared/PersonList.svelte +++ b/src/app/shared/PersonList.svelte @@ -1,4 +1,4 @@ - diff --git a/src/app/shared/PersonStats.svelte b/src/app/shared/PersonStats.svelte index d58c3541..98bc12e0 100644 --- a/src/app/shared/PersonStats.svelte +++ b/src/app/shared/PersonStats.svelte @@ -6,7 +6,7 @@ import {numberFmt} from "src/util/misc" import {modal} from "src/partials/state" import type {Event} from "src/engine2" - import {session, people, count, Subscription, getPubkeyHints} from "src/engine2" + import {session, people, count, subscribe, getPubkeyHints} from "src/engine2" export let pubkey @@ -32,23 +32,18 @@ } else { const followers = new Set() - sub = new Subscription({ - timeout: 30_000, + sub = subscribe({ ephemeral: true, relays: getPubkeyHints(3, $session?.pubkey, "read"), filters: [{kinds: [3], "#p": [pubkey]}], - }) - - sub.on( - "event", - batch(300, (events: Event[]) => { + onEvent: batch(300, (events: Event[]) => { for (const e of events) { followers.add(e.pubkey) } followersCount.set(followers.size) - }) - ) + }), + }) } } diff --git a/src/app/state.ts b/src/app/state.ts index 007d77c8..b8836474 100644 --- a/src/app/state.ts +++ b/src/app/state.ts @@ -18,7 +18,7 @@ import { channels, follows, network, - Subscription, + subscribe, getUserRelayUrls, getSetting, dufflepud, @@ -129,7 +129,7 @@ export const listenForNotifications = async () => { // Only grab one event from each category/relay so we have enough to show // the notification badges, but load the details lazily listener?.close() - listener = new Subscription({ + listener = subscribe({ relays: getUserRelayUrls("read"), filters: [ // Messages diff --git a/src/app/views/ChannelsDetail.svelte b/src/app/views/ChannelsDetail.svelte index b8bc3ecd..53201384 100644 --- a/src/app/views/ChannelsDetail.svelte +++ b/src/app/views/ChannelsDetail.svelte @@ -14,7 +14,7 @@ displayPubkey, createNip24Message, nip24MarkChannelRead, - loadNip59Messages, + listenForNip59Messages, } from "src/engine2" export let entity @@ -29,7 +29,7 @@ const showPerson = pubkey => modal.push({type: "person/detail", pubkey}) onMount(() => { - const sub = loadNip59Messages() + const sub = listenForNip59Messages() return () => sub.close() }) diff --git a/src/app/views/ChatDetail.svelte b/src/app/views/ChatDetail.svelte index ff485e41..330008ae 100644 --- a/src/app/views/ChatDetail.svelte +++ b/src/app/views/ChatDetail.svelte @@ -17,7 +17,7 @@ publishNip28Message, joinNip28Channel, leaveNip28Channel, - loadNip28Messages, + listenForNip28Messages, publishNip28ChannelChecked, } from "src/engine2" @@ -37,7 +37,7 @@ const sendMessage = content => publishNip28Message(id, content).result onMount(() => { - const sub = loadNip28Messages(id) + const sub = listenForNip28Messages(id) return () => sub.close() }) diff --git a/src/app/views/Explore.svelte b/src/app/views/Explore.svelte index c756ee03..c0495817 100644 --- a/src/app/views/Explore.svelte +++ b/src/app/views/Explore.svelte @@ -11,7 +11,7 @@ import Content from "src/partials/Content.svelte" import NoteById from "src/app/shared/NoteById.svelte" import PersonBadgeSmall from "src/app/shared/PersonBadgeSmall.svelte" - import {session, labels, getUserRelayUrls, follows, Subscription} from "src/engine2" + import {session, labels, getUserRelayUrls, follows, subscribe} from "src/engine2" type LabelGroup = { label: string @@ -60,7 +60,7 @@ const showGroup = ({label, ids, hints}) => modal.push({type: "label/detail", label, ids, hints}) onMount(() => { - const sub = new Subscription({ + const sub = subscribe({ relays: getUserRelayUrls("read"), filters: [ { diff --git a/src/app/views/MessagesDetail.svelte b/src/app/views/MessagesDetail.svelte index b8aba79e..7632bff8 100644 --- a/src/app/views/MessagesDetail.svelte +++ b/src/app/views/MessagesDetail.svelte @@ -13,7 +13,7 @@ displayPerson, publishNip04Message, nip04MarkChannelRead, - loadNip04Messages, + listenForNip04Messages, } from "src/engine2" import {routes} from "src/app/state" import PersonCircle from "src/app/shared/PersonCircle.svelte" @@ -34,7 +34,7 @@ } onMount(() => { - const sub = loadNip04Messages(pubkey) + const sub = listenForNip04Messages(pubkey) return () => sub.close() }) diff --git a/src/app/views/NoteDetail.svelte b/src/app/views/NoteDetail.svelte index bc005fc0..06559059 100644 --- a/src/app/views/NoteDetail.svelte +++ b/src/app/views/NoteDetail.svelte @@ -11,7 +11,7 @@ import Modal from "src/partials/Modal.svelte" import Spinner from "src/partials/Spinner.svelte" import Note from "src/app/shared/Note.svelte" - import {Subscription, selectHints, getSetting} from "src/engine2" + import {load, selectHints, getSetting} from "src/engine2" export let note export let relays = [] @@ -31,7 +31,6 @@ }, }) - let sub let loading = true let feedRelay = null let displayNote = asDisplayEvent(note) @@ -40,30 +39,22 @@ // If our note came from a feed, we can preload context context.hydrate([displayNote], depth) - sub = new Subscription({ + await load({ filters: [{ids: [note.id]}], - timeout: 8000, relays: selectHints(getSetting("relay_limit"), relays), + onEvent: e => { + context.addContext([e], {depth}) + + displayNote = first(context.applyContext([e])) + }, }) - sub.on("event", e => { - context.addContext([e], {depth}) - - displayNote = first(context.applyContext([e])) - - sub.close() - }) - - await sub.result - await Promise.all(context.getAllSubs()) - info("NoteDetail", displayNote) loading = false }) onDestroy(() => { - sub?.close() context.stop() }) diff --git a/src/app/views/ZapModal.svelte b/src/app/views/ZapModal.svelte index 61af19a9..c14d8b8f 100644 --- a/src/app/views/ZapModal.svelte +++ b/src/app/views/ZapModal.svelte @@ -7,7 +7,13 @@ import Anchor from "src/partials/Anchor.svelte" import Input from "src/partials/Input.svelte" import Textarea from "src/partials/Textarea.svelte" - import {getSetting, displayPubkey, requestZap, collectInvoice, loadZapResponse} from "src/engine2" + import { + getSetting, + displayPubkey, + requestZap, + collectInvoice, + listenForZapResponse, + } from "src/engine2" export let pubkey export let note = null @@ -38,11 +44,13 @@ await collectInvoice(invoice) // Listen for the zap confirmation - sub = loadZapResponse({relays, pubkey}) - - sub.on("event", event => { - zap.confirmed = true - setTimeout(() => modal.pop(), 1000) + sub = listenForZapResponse(pubkey, { + relays, + onEvent: event => { + zap.confirmed = true + setTimeout(() => modal.pop(), 1000) + sub.close() + }, }) } diff --git a/src/engine2/projections/index.ts b/src/engine2/projections/index.ts index 4b50c874..a2203346 100644 --- a/src/engine2/projections/index.ts +++ b/src/engine2/projections/index.ts @@ -1,14 +1,14 @@ import "./nip01" import "./nip02" import "./nip05" +import "./nip09" import "./nip24" import "./nip28" +import "./nip51" import "./nip57" import "./nip65" import "./nip78" import "./alerts" -import "./deletes" -import "./lists" import "./topics" export * from "./core" diff --git a/src/engine2/projections/deletes.ts b/src/engine2/projections/nip09.ts similarity index 100% rename from src/engine2/projections/deletes.ts rename to src/engine2/projections/nip09.ts diff --git a/src/engine2/projections/lists.ts b/src/engine2/projections/nip51.ts similarity index 100% rename from src/engine2/projections/lists.ts rename to src/engine2/projections/nip51.ts diff --git a/src/engine2/queries/alerts.ts b/src/engine2/queries/alerts.ts index 01b7c070..bf1a8dce 100644 --- a/src/engine2/queries/alerts.ts +++ b/src/engine2/queries/alerts.ts @@ -3,6 +3,7 @@ import {derived} from "src/engine2/util" import {alerts, alertsLastChecked} from "src/engine2/state" export const latestNotification = alerts.derived(reduce((n, e) => Math.max(n, e.created_at), 0)) + export const hasNewNotfications = derived( [alertsLastChecked, latestNotification], ([c, n]) => n > c diff --git a/src/engine2/requests/context.ts b/src/engine2/requests/context.ts index 368480c6..6e5e9d4a 100644 --- a/src/engine2/requests/context.ts +++ b/src/engine2/requests/context.ts @@ -15,8 +15,9 @@ import { getRootHints, getParentHints, } from "src/engine2/queries" -import {Subscription} from "./subscription" +import {subscribe} from "./subscription" import {loadPubkeys} from "./pubkeys" +import {load} from "./load" const fromDisplayEvent = (e: DisplayEvent): Event => omit(["zaps", "likes", "replies", "matchesFilter"], e) @@ -32,34 +33,27 @@ export class ContextLoader { stopped: boolean data: Collection seen: Set - subs: Record void}>> + subs: {close: () => void}[] constructor(readonly opts: ContextLoaderOpts) { this.stopped = false this.data = collection("id") this.seen = new Set() - this.subs = { - context: [], - listeners: [], - } + this.subs = [] } // Utils - addSubs(key: string, subs: Array) { - for (const sub of ensurePlural(subs)) { - this.subs[key].push(sub) + addSubs(subs) { + for (const sub of subs) { + this.subs.push(sub) sub.on("close", () => { - this.subs[key] = without([sub], this.subs[key]) + this.subs = without([sub], this.subs) }) } } - getAllSubs() { - return flatten(Object.values(this.subs)) - } - getReplyKinds() { const {ENABLE_ZAPS} = env.get() @@ -197,18 +191,11 @@ export class ContextLoader { }) if (parentsInfo.length > 0) { - const sub = new Subscription({ - timeout: 5000, + load({ filters: [{ids: pluck("id", parentsInfo)}], relays: this.mergeHints(pluck("hints", parentsInfo)), + onEvent: batch(100, (context: Event[]) => this.addContext(context, {depth: 2})), }) - - sub.on( - "event", - batch(100, (context: Event[]) => this.addContext(context, {depth: 2})) - ) - - this.addSubs("context", [sub]) } } @@ -232,18 +219,11 @@ export class ContextLoader { ) as Event[] for (const c of chunk(256, events)) { - const sub = new Subscription({ - timeout: 5000, + load({ relays: this.mergeHints(c.map(e => getReplyHints(this.getRelayLimit(), e))), filters: [{kinds: this.getReplyKinds(), "#e": pluck("id", c as Event[])}], + onEvent: batch(100, (context: Event[]) => this.addContext(context, {depth: depth - 1})), }) - - sub.on( - "event", - batch(100, (context: Event[]) => this.addContext(context, {depth: depth - 1})) - ) - - this.addSubs("context", [sub]) } } }) @@ -253,7 +233,7 @@ export class ContextLoader { return } - this.subs.listeners.forEach(sub => sub.close()) + this.subs.forEach(sub => sub.close()) const contextByParentId = groupBy(findReplyId, this.data.get()) @@ -266,17 +246,13 @@ export class ContextLoader { ) for (const c of chunk(256, findNotes(this.data.get()))) { - const sub = new Subscription({ - relays: this.mergeHints(c.map(e => getReplyHints(this.getRelayLimit(), e))), - filters: [{kinds: this.getReplyKinds(), "#e": pluck("id", c), since: now()}], - }) - - sub.on( - "event", - batch(100, (context: Event[]) => this.addContext(context, {depth: 2})) - ) - - this.addSubs("listeners", [sub]) + this.addSubs([ + subscribe({ + relays: this.mergeHints(c.map(e => getReplyHints(this.getRelayLimit(), e))), + filters: [{kinds: this.getReplyKinds(), "#e": pluck("id", c), since: now()}], + onEvent: batch(100, (context: Event[]) => this.addContext(context, {depth: 2})), + }), + ]) } }) @@ -328,7 +304,7 @@ export class ContextLoader { stop() { this.stopped = true - for (const sub of this.getAllSubs()) { + for (const sub of this.subs) { sub.close() } } diff --git a/src/engine2/requests/cursor.ts b/src/engine2/requests/cursor.ts index a83ef05a..a93002d7 100644 --- a/src/engine2/requests/cursor.ts +++ b/src/engine2/requests/cursor.ts @@ -3,7 +3,8 @@ import {seconds, first} from "hurdak" import {now} from "src/util/misc" import {EPOCH} from "src/util/nostr" import type {Filter, Event} from "src/engine2/model" -import {Subscription} from "./subscription" +import type {Subscription} from "./subscription" +import {subscribe} from "./subscription" export type CursorOpts = { relay: string @@ -36,38 +37,34 @@ export class Cursor { let count = 0 - const sub = new Subscription({ + return subscribe({ timeout: 5000, relays: [relay], filters: filters.map(mergeRight({until, limit, since})), + onEvent: (event: Event) => { + this.until = Math.min(until, event.created_at) - 1 + this.buffer.push(event) + + count += 1 + + onEvent?.(event) + }, + onClose: () => { + this.loading = false + + // Relays can't be relied upon to return events in descending order, do exponential + // windowing to ensure we get the most recent stuff on first load, but eventually find it all + if (count === 0) { + this.delta *= 10 + } + + if (this.since <= EPOCH) { + this.done = true + } + + this.since -= this.delta + }, }) - - sub.on("event", (event: Event) => { - this.until = Math.min(until, event.created_at) - 1 - this.buffer.push(event) - - count += 1 - - onEvent?.(event) - }) - - sub.on("close", () => { - this.loading = false - - // Relays can't be relied upon to return events in descending order, do exponential - // windowing to ensure we get the most recent stuff on first load, but eventually find it all - if (count === 0) { - this.delta *= 10 - } - - if (this.since <= EPOCH) { - this.done = true - } - - this.since -= this.delta - }) - - return sub } take(n = Infinity) { diff --git a/src/engine2/requests/feed.ts b/src/engine2/requests/feed.ts index 650853ac..6265ea02 100644 --- a/src/engine2/requests/feed.ts +++ b/src/engine2/requests/feed.ts @@ -5,7 +5,7 @@ import {findReplyId} from "src/util/nostr" import type {Event, DisplayEvent, Filter} from "src/engine2/model" import {writable} from "src/engine2/util/store" import {getUrls} from "src/engine2/queries" -import {Subscription} from "./subscription" +import {subscribe} from "./subscription" import {Cursor, MultiCursor} from "./cursor" import {ContextLoader} from "./context" @@ -44,20 +44,16 @@ export class FeedLoader { // No point in subscribing if we have an end date if (!any(prop("until"), ensurePlural(opts.filters) as any[])) { - const sub = new Subscription({ - relays: urls, - filters: opts.filters.map(assoc("since", this.since)), - }) - - sub.on( - "event", - batch(1000, (context: Event[]) => { - this.context.addContext(context, {shouldLoadParents: true, depth: opts.depth}) - this.stream.update($stream => $stream.concat(context)) - }) - ) - - this.addSubs([sub]) + this.addSubs([ + subscribe({ + relays: urls, + filters: opts.filters.map(assoc("since", this.since)), + onEvent: batch(1000, (context: Event[]) => { + this.context.addContext(context, {shouldLoadParents: true, depth: opts.depth}) + this.stream.update($stream => $stream.concat(context)) + }), + }), + ]) } this.cursor = new MultiCursor( @@ -84,7 +80,7 @@ export class FeedLoader { // Control - addSubs(subs: Array) { + addSubs(subs) { for (const sub of ensurePlural(subs)) { this.subs.push(sub) diff --git a/src/engine2/requests/nip04.ts b/src/engine2/requests/nip04.ts index 1db999c6..6d449753 100644 --- a/src/engine2/requests/nip04.ts +++ b/src/engine2/requests/nip04.ts @@ -1,10 +1,10 @@ import {user, getInboxHints, getSetting} from "src/engine2/queries" -import {Subscription} from "./subscription" +import {subscribe} from "./subscription" -export function loadNip04Messages(contactPubkey: string) { +export function listenForNip04Messages(contactPubkey: string) { const {pubkey: userPubkey} = user.get() - return new Subscription({ + return subscribe({ relays: getInboxHints(getSetting("relay_limit"), [contactPubkey, userPubkey]), filters: [ {kinds: [4], authors: [userPubkey], "#p": [contactPubkey]}, diff --git a/src/engine2/requests/nip28.ts b/src/engine2/requests/nip28.ts index c6d00036..cc38a37e 100644 --- a/src/engine2/requests/nip28.ts +++ b/src/engine2/requests/nip28.ts @@ -1,12 +1,12 @@ import {channels} from "src/engine2/state" import {selectHints, getSetting} from "src/engine2/queries" -import {Subscription} from "./subscription" +import {subscribe} from "./subscription" -export const loadNip28Messages = channelId => { +export const listenForNip28Messages = channelId => { const channel = channels.key(channelId).get() const relays = selectHints(getSetting("relay_limit"), channel?.relays || []) - return new Subscription({ + return subscribe({ relays, filters: [ {kinds: [40], ids: [channelId]}, diff --git a/src/engine2/requests/nip57.ts b/src/engine2/requests/nip57.ts index d269f795..7544a3dd 100644 --- a/src/engine2/requests/nip57.ts +++ b/src/engine2/requests/nip57.ts @@ -1,12 +1,12 @@ import {now} from "src/util/misc" import {people} from "src/engine2/state" -import {Subscription} from "./subscription" +import {subscribe} from "./subscription" -export function loadZapResponse({pubkey, relays}) { +export const listenForZapResponse = (pubkey, opts) => { const {zapper} = people.key(pubkey).get() - return new Subscription({ - relays, + return subscribe({ + ...opts, filters: [ { kinds: [9735], diff --git a/src/engine2/requests/nip59.ts b/src/engine2/requests/nip59.ts index ec279bce..f56a2e5f 100644 --- a/src/engine2/requests/nip59.ts +++ b/src/engine2/requests/nip59.ts @@ -1,10 +1,10 @@ import {user, getPubkeyHints, getSetting} from "src/engine2/queries" -import {Subscription} from "./subscription" +import {subscribe} from "./subscription" -export function loadNip59Messages() { +export function listenForNip59Messages() { const {pubkey} = user.get() - return new Subscription({ + return subscribe({ relays: getPubkeyHints(getSetting("relay_limit"), pubkey, "read"), filters: [{kinds: [1059], "#p": [pubkey]}], }) diff --git a/src/engine2/requests/thread.ts b/src/engine2/requests/thread.ts index f235a399..082d53ad 100644 --- a/src/engine2/requests/thread.ts +++ b/src/engine2/requests/thread.ts @@ -4,7 +4,7 @@ import {findReplyId, findRootId} from "src/util/nostr" import type {Event, DisplayEvent} from "src/engine2/model" import {writable} from "src/engine2/util/store" import {ContextLoader} from "./context" -import {Subscription} from "./subscription" +import {load} from "./load" export type ThreadOpts = { anchorId: string @@ -36,20 +36,15 @@ export class ThreadLoader { const filteredIds = ids.filter(id => id && !seen.has(id)) if (filteredIds.length > 0) { - const sub = new Subscription({ - timeout: 4000, + load({ relays: this.opts.relays, filters: [{ids: filteredIds}], - }) - - sub.on( - "event", - batch(300, (events: Event[]) => { + onEvent: batch(300, (events: Event[]) => { this.context.addContext(events, {depth: 1}) this.addToThread(this.context.applyContext(events)) this.loadNotes(events.flatMap(e => [findReplyId(e), findRootId(e)])) - }) - ) + }), + }) } }