From 2c9ff7bac06fd14a37ea928d362873aba2407c13 Mon Sep 17 00:00:00 2001 From: Jonathan Staab Date: Fri, 10 Feb 2023 20:58:50 -0600 Subject: [PATCH] Re-write data fetching to support lazily streaming in event context --- README.md | 8 +- src/agent/helpers.ts | 5 +- src/agent/network.ts | 138 +++++++++++++++++++-------------- src/agent/pool.ts | 92 ++++++++++++++++------ src/app/alerts.js | 16 ++-- src/app/index.ts | 55 +++---------- src/app/messages.js | 6 +- src/partials/Note.svelte | 41 +++++++--- src/partials/Notes.svelte | 88 ++++++++++++++++----- src/routes/Alerts.svelte | 4 +- src/routes/ChatRoom.svelte | 6 +- src/routes/Messages.svelte | 6 +- src/util/misc.ts | 2 +- src/util/types.ts | 13 ++++ src/views/NoteDetail.svelte | 37 ++++----- src/views/notes/Global.svelte | 25 +----- src/views/notes/Network.svelte | 32 +------- src/views/person/Likes.svelte | 23 +----- src/views/person/Notes.svelte | 25 +----- 19 files changed, 329 insertions(+), 293 deletions(-) diff --git a/README.md b/README.md index 396abae2..a68386dc 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,9 @@ If you like Coracle and want to support its development, you can donate sats via - [ ] Support relay auth - [ ] Support invoices, tips, zaps https://twitter.com/jb55/status/1604131336247476224 - [ ] Separate settings for read, write, and broadcast relays based on NIP 65 -- [ ] Release to android with https://svelte-native.technology/docs +- [ ] Release to android + - https://svelte-native.technology/docs + - https://ionic.io/blog/capacitor-everything-youve-ever-wanted-to-know - [ ] Add no-relay gossip - Capture certain events in a local db - File import/export from db, NFC transfer @@ -82,6 +84,10 @@ If you like Coracle and want to support its development, you can donate sats via - [ ] Likes list - [ ] Fix anon/new user experience - [ ] Stream likes rather than load, they're probably what is slowing things down. Figure out how to multiplex them, or store them in the database by count + - Stream likes and replies, only load parent up front + - Show full thread on detail view - fetch parent, load all descendants, highlight anchor +- [ ] Likes are slow +- [ ] Show loading on replies/notes # Changelog diff --git a/src/agent/helpers.ts b/src/agent/helpers.ts index 7a51d0d9..d29dea4c 100644 --- a/src/agent/helpers.ts +++ b/src/agent/helpers.ts @@ -1,4 +1,4 @@ -import type {Person} from 'src/util/types' +import type {Person, MyEvent} from 'src/util/types' import type {Readable} from 'svelte/store' import {isEmpty, pick, identity, sortBy, uniq, reject, groupBy, last, propEq, uniqBy, prop} from 'ramda' import {first} from 'hurdak/lib/hurdak' @@ -70,6 +70,9 @@ export const getEventRelays = event => { ) } +export const getTopRelaysFromEvents = (events: Array) => + uniqBy(prop('url'), events.map(e => getBestRelay(e.pubkey) || {url: e.seen_on})) + export const getStalePubkeys = pubkeys => { // If we're not reloading, only get pubkeys we don't already know about return uniq(pubkeys).filter(pubkey => { diff --git a/src/agent/network.ts b/src/agent/network.ts index ab273cb2..35f60000 100644 --- a/src/agent/network.ts +++ b/src/agent/network.ts @@ -1,7 +1,9 @@ -import {uniqBy, prop, uniq, flatten, pluck, identity} from 'ramda' -import {ensurePlural, createMap, chunk} from 'hurdak/lib/hurdak' -import {findReply, personKinds, Tags} from 'src/util/nostr' -import {getFollows, getStalePubkeys} from 'src/agent/helpers' +import type {MyEvent} from 'src/util/types' +import {uniq, uniqBy, prop, map, propEq, indexBy, pluck} from 'ramda' +import {findReply, personKinds, findReplyId, Tags} from 'src/util/nostr' +import {chunk} from 'hurdak/lib/hurdak' +import {batch} from 'src/util/misc' +import {getFollows, getStalePubkeys, getTopRelaysFromEvents} from 'src/agent/helpers' import pool from 'src/agent/pool' import keys from 'src/agent/keys' import sync from 'src/agent/sync' @@ -25,20 +27,37 @@ const load = async (relays, filter, opts?): Promise[]> = return events } -const listen = (relays, filter, onEvent, {shouldProcess = true}: any = {}) => { +const listen = (relays, filter, onEvents, {shouldProcess = true}: any = {}) => { return pool.subscribe(relays, filter, { - onEvent: e => { + onEvent: batch(300, events => { if (shouldProcess) { - sync.processEvents(e) + sync.processEvents(events) } - if (onEvent) { - onEvent(e) + if (onEvents) { + onEvents(events) } - }, + }), }) } +const listenUntilEose = (relays, filter, onEvents, {shouldProcess = true}: any = {}) => { + return new Promise(resolve => { + pool.subscribeUntilEose(relays, filter, { + onClose: () => resolve(), + onEvent: batch(300, events => { + if (shouldProcess) { + sync.processEvents(events) + } + + if (onEvents) { + onEvents(events) + } + }), + }) + }) as Promise +} + const loadPeople = (relays, pubkeys, {kinds = personKinds, force = false, ...opts} = {}) => { pubkeys = uniq(pubkeys) @@ -59,57 +78,58 @@ const loadNetwork = async (relays, pubkey) => { await loadPeople(tags.relays(), tags.values().all()) } -const loadContext = async (relays, notes, {loadParents = false, depth = 0, ...opts}: any = {}) => { - notes = ensurePlural(notes) +const loadParents = (relays, notes) => { + const parentIds = new Set(Tags.wrap(notes.map(findReply)).values().all()) - if (notes.length === 0) { - return notes + if (parentIds.size === 0) { + return [] } - return flatten(await Promise.all( - chunk(256, notes).map(async chunk => { - const chunkIds = pluck('id', chunk) - const authors = getStalePubkeys(pluck('pubkey', chunk)) - const parentTags = uniq(chunk.map(findReply).filter(identity)) - const parentIds = Tags.wrap(parentTags).values().all() - const combinedRelays = uniq(relays.concat(Tags.wrap(parentTags).relays())) - const filter = [{kinds: [1, 7], '#e': chunkIds} as object] - - if (authors.length > 0) { - filter.push({kinds: personKinds, authors}) - } - - if (loadParents && parentTags.length > 0) { - filter.push({kinds: [1], ids: parentIds}) - } - - let events = await load(combinedRelays, filter, opts) - - // Find children, but only if we didn't already get them - const children = events.filter(e => e.kind === 1) - const childRelays = relays.concat(Tags.from(children).relays()) - - if (depth > 0 && children.length > 0) { - events = events.concat( - await loadContext(childRelays, children, {depth: depth - 1, ...opts}) - ) - } - - if (loadParents && parentIds.length > 0) { - const eventsById = createMap('id', events) - const parents = parentIds.map(id => eventsById[id]).filter(identity) - const parentRelays = relays.concat(Tags.from(parents).relays()) - - events = events.concat(await loadContext(parentRelays, parents, opts)) - } - - // Load missing people from replies etc - await loadPeople(relays, pluck('pubkey', events)) - - // We're recurring and so may end up with duplicates here - return uniqBy(prop('id'), events) - }) - )) + return load( + relays.concat(getTopRelaysFromEvents(notes)), + {kinds: [1], ids: Array.from(parentIds)} + ) +} + +const streamContext = ({relays, notes, updateNotes, depth = 0}) => { + // Some relays reject very large filters, send multiple + chunk(256, notes).forEach(chunk => { + const authors = getStalePubkeys(pluck('pubkey', chunk)) + const filter = [ + {kinds: [1, 7], '#e': pluck('id', chunk)}, + {kinds: personKinds, authors}, + ] + + // Load authors and reactions in one subscription + listenUntilEose(relays, filter, events => { + const repliesByParentId = indexBy(findReplyId, events.filter(propEq('kind', 1))) + const reactionsByParentId = indexBy(findReplyId, events.filter(propEq('kind', 7))) + + // Recur if we need to + if (depth > 0) { + streamContext({relays, notes: events, updateNotes, depth: depth - 1}) + } + + const annotate = ({replies = [], reactions = [], children = [], ...note}) => { + if (depth > 0) { + children = uniqBy(prop('id'), children.concat(replies)) + } + + return { + ...note, + replies: uniqBy(prop('id'), replies.concat(repliesByParentId[note.id] || [])), + reactions: uniqBy(prop('id'), reactions.concat(reactionsByParentId[note.id] || [])), + children: children.map(annotate), + } + } + + updateNotes(map(annotate)) + }) + }) +} + +export default { + publish, load, listen, listenUntilEose, loadNetwork, loadPeople, personKinds, + loadParents, streamContext, } -export default {publish, load, listen, loadNetwork, loadPeople, personKinds, loadContext} diff --git a/src/agent/pool.ts b/src/agent/pool.ts index 2fb32cac..fadb6943 100644 --- a/src/agent/pool.ts +++ b/src/agent/pool.ts @@ -1,4 +1,5 @@ import type {Relay} from 'nostr-tools' +import type {MyEvent} from 'src/util/types' import {relayInit} from 'nostr-tools' import {uniqBy, prop, find, is} from 'ramda' import {ensurePlural} from 'hurdak/lib/hurdak' @@ -108,8 +109,10 @@ const describeFilter = ({kinds = [], ...filter}) => { return '(' + parts.join(',') + ')' } +const normalizeRelays = relays => uniqBy(prop('url'), relays.filter(r => isRelay(r.url))) + const subscribe = async (relays, filters, {onEvent, onEose}: Record void>) => { - relays = uniqBy(prop('url'), relays.filter(r => isRelay(r.url))) + relays = normalizeRelays(relays) filters = ensurePlural(filters) // Create a human readable subscription id for debugging @@ -118,15 +121,17 @@ const subscribe = async (relays, filters, {onEvent, onEose}: Record { const conn = await connect(relay.url) // If the relay failed to connect, give up - if (!conn) { + if (!conn || conn.status === 'closed') { return null } @@ -137,13 +142,25 @@ const subscribe = async (relays, filters, {onEvent, onEose}: Record onEose(conn.nostr.url)) + sub.on('eose', () => { + onEose(conn.nostr.url) + + // Keep track of relay timing stats, but only for the first eose we get + if (!eose.has(conn.nostr.url)) { + eose.add(conn.nostr.url) + + conn.stats.count += 1 + conn.stats.timer += Date.now() - now + } + }) } conn.stats.activeCount += 1 @@ -172,23 +189,61 @@ const subscribe = async (relays, filters, {onEvent, onEose}: Record) => void, + onEose?: (url: string) => void, + onClose?: () => void, + timeout?: number + } +) => { + relays = normalizeRelays(relays) + + const now = Date.now() + const eose = new Set() + + const attemptToComplete = () => { + if (eose.size === relays.length || Date.now() - now >= timeout) { + onClose?.() + agg.unsub() + } + } + + // If a relay takes too long, give up + setTimeout(attemptToComplete, timeout) + + const agg = await subscribe(relays, filters, { + onEvent, + onEose: url => { + onEose?.(url) + attemptToComplete() + }, + }) + + return agg +} + const request = (relays, filters, {threshold = 0.5} = {}): Promise[]> => { return new Promise(async resolve => { - relays = uniqBy(prop('url'), relays.filter(r => isRelay(r.url))) + relays = normalizeRelays(relays) threshold = relays.length * threshold const now = Date.now() const relaysWithEvents = new Set() const events = [] - const eose = [] + const eose = new Set() const attemptToComplete = () => { - const allEose = eose.length === relays.length - const atThreshold = eose.filter(url => relaysWithEvents.has(url)).length >= threshold + const allEose = eose.size === relays.length + const atThreshold = Array.from(eose) + .filter(url => relaysWithEvents.has(url)).length >= threshold + const hardTimeout = Date.now() - now >= 5000 const softTimeout = ( Date.now() - now >= 1000 - && eose.length > relays.length - Math.round(relays.length / 10) + && eose.size > relays.length - Math.round(relays.length / 10) ) if (allEose || atThreshold || hardTimeout || softTimeout) { @@ -206,22 +261,13 @@ const request = (relays, filters, {threshold = 0.5} = {}): Promise { - if (!eose.includes(url)) { - eose.push(url) - - const conn = findConnection(url) - - // Keep track of relay timing stats - if (conn) { - conn.stats.count += 1 - conn.stats.timer += Date.now() - now - } - } - + eose.add(url) attemptToComplete() }, }) }) } -export default {getConnections, findConnection, connect, publish, subscribe, request} +export default { + getConnections, findConnection, connect, publish, subscribe, subscribeUntilEose, request, +} diff --git a/src/app/alerts.js b/src/app/alerts.js index c9db6005..5f186d7d 100644 --- a/src/app/alerts.js +++ b/src/app/alerts.js @@ -1,11 +1,11 @@ import {get} from 'svelte/store' import {groupBy, pluck, partition, propEq} from 'ramda' import {createMap} from 'hurdak/lib/hurdak' -import {synced, timedelta, batch, now} from 'src/util/misc' +import {synced, timedelta, now} from 'src/util/misc' import {isAlert, findReplyId} from 'src/util/nostr' import database from 'src/agent/database' import network from 'src/agent/network' -import {annotate} from 'src/app' +import {asDisplayEvent, mergeParents} from 'src/app' let listener @@ -16,13 +16,13 @@ const onChunk = async (relays, pubkey, events) => { events = events.filter(e => isAlert(e, pubkey)) if (events.length > 0) { - const context = await network.loadContext(relays, events) + const parents = await network.loadParents(relays, events) const [likes, notes] = partition(propEq('kind', 7), events) - const annotatedNotes = notes.map(n => annotate(n, context)) + const annotatedNotes = mergeParents(notes.concat(parents).map(asDisplayEvent)) const likesByParent = groupBy(findReplyId, likes) - const likedNotes = context + const likedNotes = parents .filter(e => likesByParent[e.id]) - .map(e => annotate({...e, likedBy: pluck('pubkey', likesByParent[e.id])}, context)) + .map(e => asDisplayEvent({...e, likedBy: pluck('pubkey', likesByParent[e.id])})) await database.alerts.bulkPut(createMap('id', annotatedNotes.concat(likedNotes))) @@ -52,9 +52,9 @@ const listen = async (relays, pubkey) => { listener = await network.listen( relays, {kinds: [1, 7], '#p': [pubkey], since: now()}, - batch(300, events => { + events => { onChunk(relays, pubkey, events) - }) + } ) } diff --git a/src/app/index.ts b/src/app/index.ts index e92668fe..b5fc5538 100644 --- a/src/app/index.ts +++ b/src/app/index.ts @@ -1,5 +1,5 @@ -import type {Person} from 'src/util/types' -import {pluck, whereEq, sortBy, identity, when, assoc, reject} from 'ramda' +import type {Person, DisplayEvent} from 'src/util/types' +import {groupBy, whereEq, identity, when, assoc, reject} from 'ramda' import {navigate} from 'svelte-routing' import {createMap, ellipsize} from 'hurdak/lib/hurdak' import {get} from 'svelte/store' @@ -124,46 +124,15 @@ export const renderNote = (note, {showEntire = false}) => { return content } -export const annotate = (note, context) => { - const reactions = context.filter(e => e.kind === 7 && findReplyId(e) === note.id) - const replies = context.filter(e => e.kind === 1 && findReplyId(e) === note.id) +export const asDisplayEvent = event => + ({children: [], replies: [], reactions: [], ...event}) - return { - ...note, reactions, - person: database.people.get(note.pubkey), - replies: sortBy(e => e.created_at, replies).map(r => annotate(r, context)), - } -} - -export const threadify = (events, context, {muffle = [], showReplies = true} = {}) => { - const contextById = createMap('id', events.concat(context)) - - // Show parents when possible. For reactions, if there's no parent, - // throw it away. Sort by created date descending - const notes = sortBy( - e => -e.created_at, - events - .map(e => contextById[findReplyId(e)] || (e.kind === 1 ? e : null)) - .filter(e => e && !muffle.includes(e.pubkey)) - ) - - if (!showReplies) { - return notes.filter(note => !findReplyId(note)).map(n => annotate(n, context)) - } - - // Don't show notes that will also show up as children - const noteIds = new Set(pluck('id', notes)) - - // Annotate our feed with parents, reactions, replies. - return notes - .filter(note => !noteIds.has(findReplyId(note))) - .map(note => { - let parent = contextById[findReplyId(note)] - - if (parent) { - parent = annotate(parent, context) - } - - return annotate({...note, parent}, context) - }) +export const mergeParents = (notes: Array) => { + const m = createMap('id', notes) + + return Object.entries(groupBy(findReplyId, notes)) + // Substiture parent and add notes as children + .flatMap(([p, children]) => m[p] ? [{...m[p], children}] : children) + // Remove replies where we failed to find a parent + .filter((note: DisplayEvent) => !findReplyId(note) || note.children.length > 0) } diff --git a/src/app/messages.js b/src/app/messages.js index e023cc8d..1553a4e7 100644 --- a/src/app/messages.js +++ b/src/app/messages.js @@ -1,6 +1,6 @@ import {pluck, reject} from 'ramda' import {get} from 'svelte/store' -import {synced, now, timedelta, batch} from 'src/util/misc' +import {synced, now, timedelta} from 'src/util/misc' import {user} from 'src/agent/helpers' import database from 'src/agent/database' import network from 'src/agent/network' @@ -20,7 +20,7 @@ const listen = async (relays, pubkey) => { relays, [{kinds: [4], authors: [pubkey], since}, {kinds: [4], '#p': [pubkey], since}], - batch(300, async events => { + async events => { const $user = get(user) // Reload annotated messages, don't alert about messages to self @@ -39,7 +39,7 @@ const listen = async (relays, pubkey) => { return o }) } - }) + } ) } diff --git a/src/partials/Note.svelte b/src/partials/Note.svelte index 38a5caff..fd3e73ad 100644 --- a/src/partials/Note.svelte +++ b/src/partials/Note.svelte @@ -2,10 +2,11 @@ import cx from 'classnames' import {nip19} from 'nostr-tools' import {whereEq, without, uniq, pluck, reject, propEq, find} from 'ramda' + import {tweened} from 'svelte/motion' import {slide} from 'svelte/transition' import {navigate} from 'svelte-routing' import {quantify} from 'hurdak/lib/hurdak' - import {Tags, findReply, findReplyId, displayPerson, isLike} from "src/util/nostr" + import {Tags, findReply, findRoot, findReplyId, displayPerson, isLike} from "src/util/nostr" import {extractUrls} from "src/util/html" import Preview from 'src/partials/Preview.svelte' import Anchor from 'src/partials/Anchor.svelte' @@ -19,7 +20,6 @@ import cmd from 'src/agent/cmd' export let note - export let depth = 0 export let anchorId = null export let showParent = true export let invertColors = false @@ -38,6 +38,11 @@ let likes, flags, like, flag + const interpolate = (a, b) => t => a + Math.round((b - a) * t) + const likesCount = tweened(0, {interpolate}) + const flagsCount = tweened(0, {interpolate}) + const repliesCount = tweened(0, {interpolate}) + $: { likes = note.reactions.filter(n => isLike(n.content)) flags = note.reactions.filter(whereEq({content: '-'})) @@ -46,6 +51,10 @@ $: like = find(whereEq({pubkey: $user?.pubkey}), likes) $: flag = find(whereEq({pubkey: $user?.pubkey}), flags) + $: $likesCount = likes.length + $: $flagsCount = flags.length + $: $repliesCount = note.replies.length + const onClick = e => { const target = e.target as HTMLElement @@ -61,6 +70,13 @@ modal.set({type: 'note/detail', note: {id}, relays}) } + const goToRoot = async () => { + const [id, url] = findRoot(note).slice(1) + const relays = getEventRelays(note).concat({url}) + + modal.set({type: 'note/detail', note: {id}, relays}) + } + const react = async content => { if (!$user) { return navigate('/login') @@ -168,6 +184,11 @@ Reply to {findReplyId(note).slice(0, 8)} {/if} + {#if findRoot(note) && findRoot(note) !== findReply(note) && showParent} + + Go to root + + {/if} {#if flag}

You have flagged this content as offensive. @@ -185,15 +206,15 @@

e.stopPropagation()}>
{/if} @@ -230,17 +251,15 @@ {/if} -{#if depth > 0}
- {#if !showEntire && note.replies.length > 3} + {#if !showEntire && note.children.length > 3} {/if} - {#each note.replies.slice(showEntire ? 0 : -3) as r (r.id)} - + {#each note.children.slice(showEntire ? 0 : -3) as r (r.id)} + {/each}
{/if} -{/if} diff --git a/src/partials/Notes.svelte b/src/partials/Notes.svelte index 33340fba..c1099927 100644 --- a/src/partials/Notes.svelte +++ b/src/partials/Notes.svelte @@ -1,40 +1,86 @@ - - {#if newNotes.length > 0} + {#if notesBuffer.length > 0} {/if}
{#each notes as note (note.id)} - + {/each}
diff --git a/src/routes/Alerts.svelte b/src/routes/Alerts.svelte index fd9e6afb..f533e236 100644 --- a/src/routes/Alerts.svelte +++ b/src/routes/Alerts.svelte @@ -8,7 +8,7 @@ import Content from 'src/partials/Content.svelte' import Like from 'src/partials/Like.svelte' import database from 'src/agent/database' - import {alerts} from 'src/app' + import {alerts, asDisplayEvent} from 'src/app' let limit = 0 let notes = null @@ -21,7 +21,7 @@ const events = await database.alerts.all() - notes = sortBy(e => -e.created_at, events).slice(0, limit) + notes = sortBy(e => -e.created_at, events).slice(0, limit).map(asDisplayEvent) }) }) diff --git a/src/routes/ChatRoom.svelte b/src/routes/ChatRoom.svelte index 2e080654..189801d9 100644 --- a/src/routes/ChatRoom.svelte +++ b/src/routes/ChatRoom.svelte @@ -1,7 +1,7 @@ - + diff --git a/src/views/notes/Network.svelte b/src/views/notes/Network.svelte index e9dace61..bc894dad 100644 --- a/src/views/notes/Network.svelte +++ b/src/views/notes/Network.svelte @@ -1,40 +1,16 @@ - - + diff --git a/src/views/person/Likes.svelte b/src/views/person/Likes.svelte index d28f4cd6..ef883b0b 100644 --- a/src/views/person/Likes.svelte +++ b/src/views/person/Likes.svelte @@ -1,31 +1,12 @@ - + diff --git a/src/views/person/Notes.svelte b/src/views/person/Notes.svelte index c7e2a828..49c60623 100644 --- a/src/views/person/Notes.svelte +++ b/src/views/person/Notes.svelte @@ -1,31 +1,12 @@ - - +