Re-work streamContext

Jonathan Staab 2023-04-05 10:40:23 -05:00
parent 6722131c8f
commit ab1c2cb210
3 changed files with 94 additions and 70 deletions
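Summary, inferred from the diff below: streamContext previously fired a one-shot batch of load requests, trampolining on a depth counter and returning a Promise.all the caller could not cancel. It is re-worked to track seen event ids, open live listen subscriptions for new replies, likes, and zaps (kinds 1, 7, 9735), backfill older context with load, recurse up to maxDepth, and return a handle the caller can unsubscribe. At the call sites the shape changes roughly like this (the parameter values are illustrative):

    // Before: fire-and-forget, bounded by `depth`
    network.streamContext({depth: 2, notes, onChunk})

    // After: bounded by `maxDepth`, returns a handle for teardown
    const sub = network.streamContext({maxDepth: 2, notes, onChunk})
    sub.unsub()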

View File

@@ -129,35 +129,74 @@ const loadParents = (notes, opts = {}) => {
   })
 }
 
-const streamContext = ({notes, onChunk, depth = 0}) =>
-  // Some relays reject very large filters, send multiple subscriptions
-  Promise.all(
-    chunk(256, notes).map(async events => {
-      // Instead of recurring to depth, trampoline so we can batch requests
-      while (events.length > 0 && depth > 0) {
-        const chunk = events.splice(0)
-        const authors = getStalePubkeys(pluck("pubkey", chunk))
-        const filter = [{kinds: [1, 7, 9735], "#e": pluck("id", chunk)}] as Array<object>
-        const relays = sampleRelays(aggregateScores(chunk.map(getRelaysForEventChildren)))
-
-        // Load authors and reactions in one subscription
-        if (authors.length > 0) {
-          filter.push({kinds: personKinds, authors})
-        }
-
-        depth -= 1
-
-        const promise = load({relays, filter, onChunk})
-
-        // Don't await the promise when we're on the last level, since we won't be
-        // displaying those replies, and we await `load` before showing children
-        // to reduce reflow
-        if (depth > 0) {
-          events = await promise
-        }
-      }
-    })
-  )
+const streamContext = ({notes, onChunk, maxDepth = 2}) => {
+  const subs = []
+  const seen = new Set()
+  const relays = sampleRelays(aggregateScores(notes.map(getRelaysForEventChildren)))
+
+  const loadChunk = (events, depth) => {
+    // Remove anything from the chunk we've already seen
+    events = events.filter(e => e.kind === 1 && !seen.has(e.id))
+
+    // If we have no new information, no need to re-subscribe
+    if (events.length === 0) {
+      return
+    }
+
+    // Add our new events to the list of stuff we've seen
+    events.forEach(e => seen.add(e.id))
+
+    // Unsubscribe our current listeners since we're about to replace them
+    subs.map(sub => sub.then(s => s.unsub()))
+
+    // Add a subscription for each chunk to listen for new likes/replies/zaps
+    chunk(256, Array.from(seen)).forEach(ids => {
+      subs.push(
+        listen({
+          relays,
+          filter: [{kinds: [1, 7, 9735], "#e": ids, since: now()}],
+          onChunk: newEvents => {
+            onChunk(newEvents)
+
+            if (depth < maxDepth) {
+              loadChunk(newEvents, depth + 1)
+            }
+          },
+        })
+      )
+    })
+
+    const newIds = pluck("id", events)
+    const pubkeys = pluck("pubkey", events)
+
+    // Load any people we should know about
+    loadPeople(pubkeys)
+
+    // Load data prior to now for our new ids
+    chunk(256, newIds).forEach(ids => {
+      load({
+        relays,
+        filter: [{kinds: [1, 7, 9735], "#e": ids}],
+        onChunk: newEvents => {
+          onChunk(newEvents)
+
+          if (depth < maxDepth) {
+            loadChunk(newEvents, depth + 1)
+          }
+        },
+      })
+    })
+  }
+
+  // Kick things off by loading our first chunk
+  loadChunk(notes, 1)
+
+  return {
+    unsub: () => {
+      subs.map(sub => sub.then(s => s.unsub()))
+    },
+  }
+}
 
 const applyContext = (notes, context) => {
   context = context.map(assoc("isContext", true))
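A minimal caller sketch against the new API, assuming a view that already holds an array of kind-1 events in `notes` (rendering is elided):

    import user from "src/agent/user"
    import network from "src/agent/network"

    // Stream in replies/likes/zaps up to two levels deep; each chunk of
    // context is filtered through mutes and merged into what we display
    const sub = network.streamContext({
      maxDepth: 2,
      notes,
      onChunk: context => {
        notes = network.applyContext(notes, user.applyMutes(context))
      },
    })

    // On teardown, close every relay subscription streamContext opened
    sub.unsub()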

View File

@@ -66,7 +66,7 @@
     // Stream in additional data and merge it in
     network.streamContext({
-      depth: 2,
+      maxDepth: 2,
       notes: combined.filter(propEq("kind", 1)),
      onChunk: context => {
        context = user.applyMutes(context)

View File

@@ -1,12 +1,14 @@
 <script>
-  import {onMount} from "svelte"
-  import {pluck, propEq} from "ramda"
+  import {onMount, onDestroy} from "svelte"
   import {nip19} from "nostr-tools"
   import {fly} from "svelte/transition"
   import {first} from "hurdak/lib/hurdak"
+  import {log} from "src/util/logger"
   import {asDisplayEvent} from "src/util/nostr"
   import Content from "src/partials/Content.svelte"
   import Spinner from "src/partials/Spinner.svelte"
   import Note from "src/views/notes/Note.svelte"
+  import user from "src/agent/user"
   import network from "src/agent/network"
   import {sampleRelays} from "src/agent/relays"
@@ -14,58 +16,41 @@
   export let relays = []
   export let invertColors = false
 
-  let found = false
+  let sub = null
   let loading = true
-  let seen = new Set()
 
-  onMount(() => {
-    const sub = network.listen({
-      relays: sampleRelays(relays),
-      filter: [
-        {kinds: [1], ids: [note.id]},
-        {kinds: [1, 7, 9735], "#e": [note.id]},
-      ],
-      onChunk: chunk => {
-        const children = chunk.filter(propEq("kind", 1))
-
-        // Recursively bring in context for children, since reactions etc don't
-        // contain the full chain of ancestors
-        network.streamContext({
-          depth: 5,
-          notes: children.filter(e => !seen.has(e.id)),
-          onChunk: childChunk => {
-            note = first(network.applyContext([note], childChunk))
-          },
-        })
-
-        // Keep track of the children we've seen, update latest version of our note
-        children.forEach(event => {
-          if (event.id === note.id) {
-            found = true
-            loading = false
-            note = {...note, ...event}
-          }
-
-          seen.add(event.id)
-        })
-
-        // Load authors
-        network.loadPeople(pluck("pubkey", children))
-
-        // Apply context
-        note = first(network.applyContext([note], chunk))
-      },
-    })
-
-    setTimeout(() => {
-      loading = false
-    }, 3000)
-
-    return () => sub.then(s => s.unsub())
-  })
+  onMount(async () => {
+    if (!note.pubkey) {
+      await network.load({
+        relays: sampleRelays(relays),
+        filter: {kinds: [1], ids: [note.id]},
+        onChunk: events => {
+          note = first(events)
+        },
+      })
+    }
+
+    if (note) {
+      log("NoteDetail", nip19.noteEncode(note.id), note)
+
+      sub = network.streamContext({
+        maxDepth: 6,
+        notes: [note],
+        onChunk: context => {
+          note = first(network.applyContext([note], user.applyMutes(context)))
+        },
+      })
+    }
+
+    loading = false
+  })
+
+  onDestroy(() => {
+    sub?.unsub()
+  })
 </script>
 
-{#if !loading && !found}
+{#if !loading && !note.content}
   <div in:fly={{y: 20}}>
     <Content size="lg" class="text-center">Sorry, we weren't able to find this note.</Content>
   </div>
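One subtlety in the component above: onMount is async, so onDestroy can run while sub is still null if the user navigates away during the initial load, which is why teardown uses optional chaining. The same pattern in isolation (setup and subscribe are hypothetical stand-ins):

    <script>
      import {onMount, onDestroy} from "svelte"

      let sub = null

      onMount(async () => {
        await setup()      // hypothetical async work before subscribing
        sub = subscribe()  // hypothetical call returning {unsub}
      })

      onDestroy(() => {
        // sub may never have been assigned if we unmounted mid-setup
        sub?.unsub()
      })
    </script>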