Improve channel management and chunked retrieval of events

This commit is contained in:
Jonathan Staab 2022-11-29 10:06:51 -08:00
parent 74addf6676
commit f36ee418eb
11 changed files with 322 additions and 121 deletions

2
.ackrc
View File

@ -1 +1,3 @@
--ignore-dir=node_modules
--ignore-dir=dist
--ignore-file=match:package-lock.json

View File

@ -1,8 +1,6 @@
Bugs
- [ ] User detail notes are backwards
- [ ] Improve data loading. Ditch nostr-tools, or use eose. Maybe load one event at a time using timestamps? Relies on ordering.
- [ ] Pagination
- [ ] Don't lose scroll position when opening modals
- [ ] Optimistically load events the user publishes (e.g. to reduce reflow for reactions/replies). Essentially, we can pretend to be our own in-memory relay.
Features

BIN
package-lock.json generated

Binary file not shown.

View File

@ -28,7 +28,7 @@
"dexie": "^3.2.2",
"fuse.js": "^6.6.2",
"hurdak": "github:ConsignCloud/hurdak",
"nostr-tools": "^0.24.1",
"nostr-tools": "github:fiatjaf/nostr-tools#1b798b2",
"ramda": "^0.28.0",
"svelte-routing": "^1.6.0",
"throttle-debounce": "^5.0.0",

View File

@ -1,25 +1,31 @@
<script>
import {onMount} from 'svelte'
import {find, propEq} from 'ramda'
import {findNotes} from "src/state/app"
import {notesCursor} from "src/state/app"
import {user} from "src/state/user"
import Note from 'src/partials/Note.svelte'
export let note
onMount(() => {
const start = findNotes(
let onScroll
onMount(async () => {
const cursor = await notesCursor(
[{ids: [note.id]},
{'#e': [note.id]},
// We can't target reaction deletes by e tag, so get them
// all so we can support toggling like/flags for our user
{kinds: [5], authors: $user ? [$user.pubkey] : []}],
$notes => {
note = find(propEq('id', note.id), $notes) || note
}
{isInModal: true}
)
return start()
cursor.notes.subscribe($notes => {
note = find(propEq('id', note.id), $notes) || note
})
onScroll = cursor.onScroll
return cursor.unsub
})
</script>

View File

@ -4,27 +4,29 @@
import Anchor from "src/partials/Anchor.svelte"
import Note from "src/partials/Note.svelte"
import {relays} from "src/state/nostr"
import {findNotesAndWatchModal} from "src/state/app"
import {notesCursor} from "src/state/app"
let notes
let onScroll
const createNote = () => {
navigate("/notes/new")
}
onMount(() => {
return findNotesAndWatchModal({
limit: 100,
}, $notes => {
if ($notes.length) {
notes = $notes
}
})
onMount(async () => {
const cursor = await notesCursor({kinds: [1]}, {showParents: true})
notes = cursor.notes
onScroll = cursor.onScroll
return cursor.unsub
})
</script>
<svelte:window on:scroll={onScroll} />
<ul class="py-8 flex flex-col gap-2 max-w-xl m-auto">
{#each (notes || []) as n (n.id)}
{#each (notes ? $notes : []) as n (n.id)}
<li class="border-l border-solid border-medium">
<Note interactive note={n} />
{#each n.replies as r (r.id)}

View File

@ -2,7 +2,6 @@
import {fly} from 'svelte/transition'
import {fuzzy} from "src/util/misc"
import Input from "src/partials/Input.svelte"
import Anchor from "src/partials/Anchor.svelte"
import {dispatch} from "src/state/dispatch"
import {relays, knownRelays} from "src/state/nostr"
import {modal} from "src/state/app"

View File

@ -1,27 +1,25 @@
<script>
import {onMount} from 'svelte'
import {reverse} from 'ramda'
import {fly} from 'svelte/transition'
import Note from "src/partials/Note.svelte"
import {user as currentUser} from 'src/state/user'
import {accounts, findNotesAndWatchModal} from "src/state/app"
import {accounts, notesCursor} from "src/state/app"
export let pubkey
let user
let notes
let onScroll
$: user = $accounts[pubkey]
onMount(() => {
return findNotesAndWatchModal({
authors: [pubkey],
limit: 100,
}, $notes => {
if ($notes.length) {
notes = $notes
}
})
onMount(async () => {
const cursor = await notesCursor({authors: [pubkey]})
notes = cursor.notes
onScroll = cursor.onScroll
return cursor.unsub
})
</script>
@ -47,7 +45,7 @@
</div>
<div class="h-px bg-medium" in:fly={{y: 20, delay: 200}} />
<ul class="flex flex-col -mt-4" in:fly={{y: 20, delay: 400}}>
{#each reverse(notes || []) as n (n.id)}
{#each (notes ? $notes : []) as n (n.id)}
<li class="border-l border-solid border-medium pb-2">
<Note interactive note={n} />
{#each n.replies as r (r.id)}
@ -56,8 +54,6 @@
</div>
{/each}
</li>
{:else}
<li class="py-4">This user hasn't posted any notes.</li>
{/each}
</ul>
</div>

View File

@ -1,11 +1,11 @@
import {prop, uniq, sortBy, uniqBy, find, last, groupBy} from 'ramda'
import {prop, identity, whereEq, reverse, uniq, sortBy, uniqBy, find, last, pluck, groupBy} from 'ramda'
import {debounce} from 'throttle-debounce'
import {writable, derived, get} from 'svelte/store'
import {writable, get} from 'svelte/store'
import {navigate} from "svelte-routing"
import {switcherFn, ensurePlural} from 'hurdak/lib/hurdak'
import {getLocalJson, setLocalJson, now, timedelta} from "src/util/misc"
import {switcherFn} from 'hurdak/lib/hurdak'
import {getLocalJson, setLocalJson, now, timedelta, sleep} from "src/util/misc"
import {user} from 'src/state/user'
import {channels, relays} from 'src/state/nostr'
import {_channels, filterMatches, Cursor, channels, relays, findReply} from 'src/state/nostr'
export const modal = writable(null)
@ -30,7 +30,7 @@ export const logout = () => {
}, 200)
}
// Utils
// Accounts
export const ensureAccounts = async (pubkeys, {force = false} = {}) => {
const $accounts = get(accounts)
@ -61,105 +61,168 @@ export const ensureAccounts = async (pubkeys, {force = false} = {}) => {
user.update($user => ({...$user, ...get(accounts)[$user.pubkey]}))
}
export const findNotes = (filters, cb) => {
const start = () => {
const notes = writable([])
const reactions = writable([])
// Notes
let pubkeys = []
const refreshAccounts = debounce(300, () => {
ensureAccounts(uniq(pubkeys))
pubkeys = []
export const annotateNotesChunk = async (chunk, {showParents = false} = {}) => {
if (showParents) {
// Find parents of replies to provide context
const parents = await _channels.getter.all({
kinds: [1],
ids: chunk.map(findReply).filter(identity),
})
const closeRequest = channels.watcher.sub({
filter: ensurePlural(filters).map(q => ({kinds: [1, 5, 7], ...q})),
cb: e => {
// Chunk requests to load accounts
pubkeys.push(e.pubkey)
refreshAccounts()
// Remove replies, show parents instead
chunk = parents
.concat(chunk.filter(e => !find(whereEq({id: findReply(e)}), parents)))
}
switcherFn(e.kind, {
1: () => {
notes.update($xs => uniqBy(prop('id'), $xs.concat(e)))
},
5: () => {
const ids = e.tags.map(t => t[1])
if (chunk.length === 0) {
return chunk
}
notes.update($xs => $xs.filter(({id}) => !id.includes(ids)))
reactions.update($xs => $xs.filter(({id}) => !id.includes(ids)))
},
7: () => {
reactions.update($xs => $xs.concat(e))
},
})
const replies = await _channels.getter.all({
kinds: [1],
'#e': pluck('id', chunk),
})
const reactions = await _channels.getter.all({
kinds: [7],
'#e': pluck('id', chunk.concat(replies)),
})
const repliesById = groupBy(
n => find(t => last(t) === 'reply', n.tags)[1],
replies.filter(n => n.tags.map(last).includes('reply'))
)
const reactionsById = groupBy(
n => find(t => last(t) === 'reply', n.tags)[1],
reactions.filter(n => n.tags.map(last).includes('reply'))
)
await ensureAccounts(uniq(pluck('pubkey', chunk.concat(replies).concat(reactions))))
const $accounts = get(accounts)
const annotate = e => ({
...e,
user: $accounts[e.pubkey],
replies: (repliesById[e.id] || []).map(reply => annotate(reply)),
reactions: (reactionsById[e.id] || []).map(reaction => annotate(reaction)),
})
return reverse(sortBy(prop('created'), chunk.map(annotate)))
}
export const notesCursor = async (
filter,
{
showParents = false,
delta = timedelta(1, 'hours'),
isInModal = false,
} = {}
) => {
const cursor = new Cursor(filter, delta)
const notes = writable([])
const addChunk = chunk => {
notes.update($notes => uniqBy(prop('id'), $notes.concat(chunk)))
}
const unsub = await _channels.listener.sub(
{kinds: [1, 5, 7], since: now()},
e => switcherFn(e.kind, {
1: async () => {
if (filterMatches(filter, e)) {
addChunk(await annotateNotesChunk([e], {showParents}))
}
},
})
5: () => {
const ids = e.tags.map(t => t[1])
const annotatedNotes = derived(
[notes, reactions, accounts],
([$notes, $reactions, $accounts]) => {
const repliesById = groupBy(
n => find(t => last(t) === 'reply', n.tags)[1],
$notes.filter(n => n.tags.map(last).includes('reply'))
notes.update($notes =>
$notes
.filter(e => !ids.includes(e.id))
.map(n => ({
...n,
replies: n.replies.filter(e => !ids.includes(e.id)),
reactions: n.reactions.filter(e => !ids.includes(e.id)),
}))
)
},
7: () => {
const id = findReply(e)
const reactionsById = groupBy(
n => find(t => last(t) === 'reply', n.tags)[1],
$reactions.filter(n => n.tags.map(last).includes('reply'))
notes.update($notes =>
$notes
.map(n => {
if (n.id === id) {
return {...n, reactions: [...n.reactions, e]}
}
return {
...n,
replies: n.replies.map(r => {
if (r.id === id) {
return {...r, reactions: [...r.reactions, e]}
}
return r
}),
}
})
)
const annotate = n => ({
...n,
user: $accounts[n.pubkey],
replies: (repliesById[n.id] || []).map(reply => annotate(reply)),
reactions: (reactionsById[n.id] || []).map(reaction => annotate(reaction)),
})
return sortBy(prop('created'), $notes.map(annotate))
}
)
})
)
const unsubscribe = annotatedNotes.subscribe(debounce(100, cb))
const loadChunk = async () => {
const chunk = await annotateNotesChunk(await cursor.chunk(), {showParents})
return () => {
unsubscribe()
addChunk(chunk)
closeRequest()
// If we have an empty chunk, increase our step size so we can get back to where
// we might have old events. Once we get a chunk, knock it down to the default again
if (chunk.length === 0) {
cursor.delta = Math.min(timedelta(30, 'days'), cursor.delta * 2)
} else {
cursor.delta = delta
}
}
// Allow caller to suspend/restart the subscription
return start
}
const onScroll = debounce(1000, async () => {
/* eslint no-constant-condition: 0 */
while (true) {
// If a modal opened up, wait for them to close it
if (!isInModal && get(modal)) {
await sleep(1000)
export const findNotesAndWatchModal = (filters, cb) => {
const start = findNotes(filters, cb)
continue
}
let stop = start()
// While we have empty space, fill it
if (window.scrollY + window.innerHeight * 3 < document.body.scrollHeight) {
break
}
// Suspend our subscription while we have note detail open
// so we can avoid exceeding our concurrent subscription limit
const unsub = modal.subscribe($modal => {
if ($modal) {
stop && stop()
stop = null
} else if (!stop) {
// Wait for animations to complete
setTimeout(
() => {
stop = start()
},
600
)
// If we've gone back to the network's inception we're done
if (cursor.since <= 1633046400) {
break
}
await loadChunk()
}
})
return () => {
stop && stop()
unsub()
onScroll()
return {
notes,
onScroll,
unsub: () => {
cursor.stop()
unsub()
},
}
}

View File

@ -1,12 +1,145 @@
import {writable} from 'svelte/store'
import {debounce} from 'throttle-debounce'
import {relayPool, getPublicKey} from 'nostr-tools'
import {last, uniqBy, prop} from 'ramda'
import {first} from 'hurdak/lib/hurdak'
import {getLocalJson, setLocalJson} from "src/util/misc"
import {last, intersection, uniqBy, prop} from 'ramda'
import {first, noop, ensurePlural} from 'hurdak/lib/hurdak'
import {getLocalJson, setLocalJson, now, timedelta} from "src/util/misc"
export const nostr = relayPool()
export const filterTags = (where, events) =>
ensurePlural(events)
.flatMap(
e => e.tags.filter(t => {
if (where.tag && where.tag !== t[0]) {
return false
}
if (where.type && where.type !== last(t)) {
return false
}
return true
}).map(t => t[1])
)
export const findTag = (where, events) => first(filterTags(where, events))
// Support the deprecated version where tags are marked as replies
export const findReply = e =>
findTag({tag: "e", type: "reply"}, e) || findTag({tag: "e"}, e)
export const filterMatches = (filter, e) => {
return Boolean(find(
f => {
return (
(!f.ids || f.ids.includes(e.id))
&& (!f.authors || f.authors.includes(e.pubkey))
&& (!f.kinds || f.kinds.includes(e.kind))
&& (!f['#e'] || intersection(f['#e'], e.tags.filter(t => t[0] === 'e').map(t => t[1])))
&& (!f['#p'] || intersection(f['#p'], e.tags.filter(t => t[0] === 'p').map(t => t[1])))
&& (!f.since || f.since >= e.created_at)
&& (!f.until || f.until <= e.created_at)
)
},
ensurePlural(filter)
))
}
export class Channel {
constructor(name) {
this.name = name
this.p = Promise.resolve()
}
sub(filter, cb, onEose = noop) {
this.p = this.p.then(() => {
const sub = nostr.sub({filter, cb}, this.name, onEose)
return () => sub.unsub()
})
return this.p
}
all(filter) {
/* eslint no-async-promise-executor: 0 */
return new Promise(async resolve => {
const result = []
const unsub = await this.sub(
filter,
e => result.push(e),
r => {
unsub()
resolve(result)
},
)
})
}
}
export const _channels = {
listener: new Channel('listener'),
getter: new Channel('getter'),
}
// We want to get old events, then listen for new events, then potentially retrieve
// older events again for pagination. Since we have to limit channels to 3 per nip 01,
// this requires us to unsubscribe and re-subscribe frequently
export class Cursor {
constructor(filter, delta = timedelta(1, 'hours')) {
this.filter = ensurePlural(filter)
this.delta = delta
this.since = now() - delta
this.until = now()
this.unsub = null
this.q = []
this.p = Promise.resolve()
}
async start() {
if (!this.unsub) {
this.unsub = await _channels.getter.sub(
this.filter.map(f => ({...f, since: this.since, until: this.until})),
e => this.onEvent(e),
r => this.onEose(r)
)
}
}
stop() {
if (this.unsub) {
this.unsub()
this.unsub = null
}
}
restart() {
this.stop()
this.start()
}
step() {
this.since -= this.delta
this.restart()
}
onEvent(e) {
this.until = e.created_at - 1
this.q.push(e)
}
onEose() {
this.stop()
}
async chunk() {
this.step()
/* eslint no-constant-condition: 0 */
while (true) {
await new Promise(requestAnimationFrame)
if (!this.unsub) {
return this.q.splice(0)
}
}
}
}
// Track who is subscribing, so we don't go over our limit
const channel = name => {

View File

@ -47,3 +47,5 @@ export const formatTimestamp = ts => {
return formatter.format(new Date(ts * 1000))
}
export const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))