Batch zapper/handle requests, fix some bugs including invalid id filters

This commit is contained in:
Jonathan Staab 2023-10-02 13:09:56 -07:00
parent 7d9618e32f
commit 73dafc1057
9 changed files with 113 additions and 50 deletions

View File

@ -7,10 +7,9 @@
import {onMount} from "svelte"
import {Router, links} from "svelte-routing"
import {globalHistory} from "svelte-routing/src/history"
import {isNil, last} from "ramda"
import {seconds, Fetch, shuffle} from "hurdak"
import {isNil, pluck, last} from "ramda"
import {seconds, Fetch} from "hurdak"
import {tryFetch, hexToBech32, bech32ToHex, now} from "src/util/misc"
import type {Relay} from "src/engine"
import {storage, session, stateKey, relays, getSetting, dufflepud} from "src/engine"
import * as engine from "src/engine"
import {loadAppData} from "src/app/state"
@ -116,24 +115,25 @@
}
// Find relays with old/missing metadata and refresh them. Only pick a
// few so we're not sending too many concurrent http requests
const staleRelays = shuffle(
relays.get().filter(r => (r.info?.last_checked || 0) < now() - seconds(7, "day"))
).slice(0, 10) as Relay[]
// few so we're not asking for too much data at once
const staleRelays = relays
.get()
.filter(r => (r.info?.last_checked || 0) < now() - seconds(7, "day"))
.slice(0, 50)
for (const relay of staleRelays) {
tryFetch(async () => {
const info = await Fetch.fetchJson(dufflepud("relay/info"), {
method: "POST",
body: JSON.stringify({url: relay.url}),
headers: {
"Content-Type": "application/json",
},
})
relays.key(relay.url).merge({...info, last_checked: now()})
tryFetch(async () => {
const result = await Fetch.fetchJson(dufflepud("relay/info"), {
method: "POST",
body: JSON.stringify({urls: pluck("url", staleRelays)}),
headers: {
"Content-Type": "application/json",
},
})
}
for (const {url, info} of result.data) {
relays.key(url).merge({...info, url, last_checked: now()})
}
})
}, 30_000)
return () => {

View File

@ -1,5 +1,4 @@
<script lang="ts">
import type {Filter} from "nostr-tools"
import {filterVals} from "hurdak"
import {modal} from "src/partials/state"
import Anchor from "src/partials/Anchor.svelte"
@ -13,6 +12,7 @@
isEventMuted,
getParentHints,
isShareableRelay,
getIdFilters,
selectHints,
} from "src/engine"
@ -33,15 +33,15 @@
load({
relays,
filters: [
id
? {ids: [id]}
: filterVals(xs => xs.length > 0, {
filters: id
? getIdFilters([id])
: [
filterVals(xs => xs.length > 0, {
"#d": [identifier],
kinds: [kind],
authors: [pubkey],
}),
] as Filter[],
],
onEvent: event => {
loading = false
muted = $isEventMuted(event)

View File

@ -2,11 +2,12 @@
import {onMount} from "svelte"
import {uniq, pluck} from "ramda"
import {batch} from "hurdak"
import {Tags} from "src/util/nostr"
import Content from "src/partials/Content.svelte"
import Spinner from "src/partials/Spinner.svelte"
import PersonSummary from "src/app/shared/PersonSummary.svelte"
import type {Event} from "src/engine"
import {subscribe, loadPubkeys, getPubkeyHints, follows} from "src/engine"
import {subscribe, loadPubkeys, getPubkeyHints} from "src/engine"
export let type
export let pubkey
@ -14,23 +15,30 @@
let pubkeys = []
onMount(() => {
if (type === "follows") {
pubkeys = Array.from($follows)
} else {
const sub = subscribe({
relays: getPubkeyHints(pubkey, "read"),
filters: [{kinds: [3], "#p": [pubkey]}],
onEvent: batch(500, (events: Event[]) => {
const newPubkeys = pluck("pubkey", events)
const sub =
type === "follows"
? subscribe({
relays: getPubkeyHints(pubkey, "read"),
filters: [{kinds: [3], authors: [pubkey]}],
onEvent: (e: Event) => {
pubkeys = Tags.from(e).type("p").values().all()
loadPubkeys(newPubkeys)
loadPubkeys(pubkeys)
},
})
: subscribe({
relays: getPubkeyHints(pubkey, "read"),
filters: [{kinds: [3], "#p": [pubkey]}],
onEvent: batch(500, (events: Event[]) => {
const newPubkeys = pluck("pubkey", events)
pubkeys = uniq(pubkeys.concat(newPubkeys))
}),
})
loadPubkeys(newPubkeys)
return () => sub.close()
}
pubkeys = uniq(pubkeys.concat(newPubkeys))
}),
})
return () => sub.close()
})
</script>

View File

@ -57,7 +57,7 @@ export class IndexedDB {
}
close() {
return this.db.close()
return this.db?.close()
}
delete() {

View File

@ -33,7 +33,7 @@ projections.addHandler(EventKind.Delete, e => {
projections.addHandler(EventKind.GiftWrap, e => {
const session = sessions.get()[Tags.from(e).getMeta("p")]
if (!session?.privkey) {
if (session?.method !== "privkey") {
return
}

View File

@ -4,6 +4,7 @@ import {findReplyId, findRootId} from "src/util/nostr"
import type {DisplayEvent} from "src/engine/notes/model"
import type {Event} from "src/engine/events/model"
import {writable} from "src/engine/core/utils"
import {getIdFilters} from "./filters"
import {load} from "./load"
export type ThreadOpts = {
@ -37,7 +38,7 @@ export class ThreadLoader {
if (filteredIds.length > 0) {
load({
relays: this.opts.relays,
filters: [{ids: filteredIds}],
filters: getIdFilters(filteredIds),
onEvent: batch(300, (events: Event[]) => {
this.addToThread(events)
this.loadNotes(events.flatMap(e => [findReplyId(e), findRootId(e)]))

View File

@ -9,7 +9,13 @@ import {env, sessions} from "src/engine/session/state"
import {_events} from "src/engine/events/state"
import {events, isEventMuted} from "src/engine/events/derived"
import {mergeHints, getPubkeyHints, getParentHints} from "src/engine/relays/utils"
import {loadPubkeys, load, subscribe, subscribePersistent} from "src/engine/network/utils"
import {
loadPubkeys,
load,
subscribe,
subscribePersistent,
getIdFilters,
} from "src/engine/network/utils"
const onNotificationEvent = batch(300, (chunk: Event[]) => {
const kinds = getNotificationKinds()
@ -21,7 +27,7 @@ const onNotificationEvent = batch(300, (chunk: Event[]) => {
load({
relays: mergeHints(eventsWithParent.map(getParentHints)),
filters: [{ids: eventsWithParent.map(findReplyId)}],
filters: getIdFilters(eventsWithParent.map(findReplyId)),
onEvent: e => _events.update($events => $events.concat(e)),
})

View File

@ -1,18 +1,39 @@
import {tryFunc, Fetch} from "hurdak"
import {uniq} from "ramda"
import {tryFunc, createMapOf, Fetch} from "hurdak"
import {Tags} from "src/util/nostr"
import {tryJson, hexToBech32} from "src/util/misc"
import {tryJson, hexToBech32, createBatcher} from "src/util/misc"
import {updateStore} from "src/engine/core/commands"
import {projections} from "src/engine/core/projections"
import {dufflepud} from "src/engine/session/utils"
import {getLnUrl} from "src/engine/zaps/utils"
import {people} from "./state"
// Batch individual NIP-05 handle lookups made within a 500ms window into a
// single POST to dufflepud, then fan the results back out to callers keyed
// by handle. Returns undefined for handles the server had no info for.
const fetchHandle = createBatcher(500, async (handles: string[]) => {
  // tryFunc swallows errors and yields undefined on failure; guard the
  // result so a failed request doesn't throw on destructuring and reject
  // the whole batch (which would strand every waiting caller).
  const result = await tryFunc(() =>
    Fetch.postJson(dufflepud("handle/info"), {handles: uniq(handles)})
  )

  const infoByHandle = createMapOf("handle", "info", result?.data || [])

  return handles.map(h => infoByHandle[h])
})
// Batch individual zapper (lnurl) lookups made within a 500ms window into a
// single POST to dufflepud, then fan the results back out to callers keyed
// by lnurl. Returns undefined for lnurls the server had no info for.
const fetchZapper = createBatcher(500, async (lnurls: string[]) => {
  // tryFunc swallows errors and yields undefined on failure; guard the
  // result so a failed request doesn't throw on destructuring and reject
  // the whole batch (which would strand every waiting caller).
  const result = await tryFunc(() =>
    Fetch.postJson(dufflepud("zapper/info"), {lnurls: uniq(lnurls)})
  )

  const infoByLnurl = createMapOf("lnurl", "info", result?.data || [])

  return lnurls.map(lnurl => infoByLnurl[lnurl])
})
const updateHandle = async (e, {nip05}) => {
if (!nip05) {
return
}
const profile = await tryFunc(() => Fetch.postJson(dufflepud("handle/info"), {handle: nip05}))
const profile = await fetchHandle(nip05)
if (profile?.pubkey === e.pubkey) {
updateStore(people.key(e.pubkey), e.created_at, {
@ -34,7 +55,7 @@ const updateZapper = async (e, {lud16, lud06}) => {
return
}
const result = await tryFunc(() => Fetch.postJson(dufflepud("zapper/info"), {lnurl}))
const result = await fetchZapper(lnurl)
if (!result?.allowsNostr || !result?.nostrPubkey) {
return

View File

@ -1,7 +1,7 @@
import {bech32, utf8} from "@scure/base"
import {debounce} from "throttle-debounce"
import {pluck, sum, is, equals} from "ramda"
import {Storage, isPojo, first, seconds, tryFunc, sleep, round} from "hurdak"
import {Storage, defer, isPojo, first, seconds, tryFunc, sleep, round} from "hurdak"
import Fuse from "fuse.js/dist/fuse.min.js"
import {writable} from "svelte/store"
import {warn} from "src/util/logger"
@ -340,3 +340,30 @@ export const sumBy = (f, xs) => sum(xs.map(f))
export const stripProto = url => url.replace(/.*:\/\//, "")
export const ensureProto = url => (url.includes("://") ? url : "https://" + url)
/**
 * Wrap a bulk operation so callers can make single requests.
 *
 * Returns a function that accepts one request and resolves with its
 * individual result. Requests arriving within `t` ms of the first one are
 * collected and passed to `execute` as a single array; `execute`'s results
 * are matched back to callers by position.
 *
 * @param t - debounce window in milliseconds before a batch is flushed
 * @param execute - bulk handler; must return one result per request, in order
 */
export const createBatcher = <T, U>(t: number, execute: (request: T[]) => U[] | Promise<U[]>) => {
  const queue: {request: T; resolve: (u: U) => void; reject: (e: unknown) => void}[] = []

  // Flush all pending requests through `execute` and settle each caller's
  // promise with its positional result.
  const _execute = async () => {
    const items = queue.splice(0)

    try {
      const results = await execute(items.map(item => item.request))

      if (results.length !== items.length) {
        console.warn("Execute must return a promise for each request", results, items)
      }

      // `await r` tolerates per-item promises in the result array
      results.forEach(async (r, i) => items[i].resolve(await r))
    } catch (e) {
      // Propagate a failed batch to every waiting caller instead of
      // leaving their promises pending forever.
      for (const item of items) {
        item.reject(e)
      }
    }
  }

  return (request: T): Promise<U> =>
    new Promise((resolve, reject) => {
      // The first request into an empty queue schedules the flush
      if (queue.length === 0) {
        setTimeout(_execute, t)
      }

      queue.push({request, resolve, reject})
    })
}