Paginate per relay

Jonathan Staab 2023-04-21 09:16:05 -05:00
parent 612a0d18df
commit 1719da5562
10 changed files with 154 additions and 119 deletions

View File

@@ -2,10 +2,12 @@
# 0.2.25
- [x] Add purplepag.es to sign in flow
- [x] Add purplepag.es to sign in flow to speed things up
- [x] Include people with only a display_name in search
- [x] Fix AUTH over multiplextr
- [x] Remember whether messages/notifications have been read
- [x] Remember chat membership
- [x] Add new cursor implementation to paginate relays independently
## 0.2.24

View File

@@ -1,10 +1,12 @@
# Current
- [ ] Add way to turn off likes/zaps
- [ ] Remember joined rooms
- [ ] Fix notifications, separate into mentions/replies and other
- [ ] Render npubs properly http://localhost:5173/nevent1qqsqqr7r9w95lvj79zpsykup2d995jqhyxdntq98tu6tsvmjuh5ak9spz3mhxue69uhhyetvv9ujuerpd46hxtnfduyyc40u
- [ ] Links/topics/mentions open modals
- [ ] Render link previews inline rather than at bottom, check NostrReport for this
- [ ] Wait for an auth challenge based on relay document to avoid missing first few REQs
- [ ] Image classification
- https://github.com/bhky/opennsfw2
- [ ] Claim relays bounty
# Core

View File

@@ -1,7 +1,19 @@
import type {MyEvent} from "src/util/types"
import {without, sortBy, assoc, uniq, uniqBy, prop, propEq, groupBy, pluck} from "ramda"
import type {MyEvent, Relay} from "src/util/types"
import {
without,
mergeLeft,
fromPairs,
sortBy,
assoc,
uniq,
uniqBy,
prop,
propEq,
groupBy,
pluck,
} from "ramda"
import {personKinds, appDataKeys, findReplyId} from "src/util/nostr"
import {chunk} from "hurdak/lib/hurdak"
import {chunk, ensurePlural} from "hurdak/lib/hurdak"
import {batch, now, timedelta} from "src/util/misc"
import {
getRelaysForEventParent,
@@ -97,6 +109,116 @@ const load = ({relays, filter, onChunk = null, shouldProcess = true, timeout = 5
}) as Promise<MyEvent[]>
}
class Cursor {
relays: Array<Relay>
limit: number
delta?: number
until: Record<string, number>
buffer: Array<MyEvent>
seen: Set<string>
constructor({relays, limit = 20, delta = undefined}) {
this.relays = relays
this.limit = limit
this.delta = delta
this.until = fromPairs(relays.map(({url}) => [url, now()]))
this.buffer = []
this.seen = new Set()
}
async loadPage({filter, onChunk}) {
// Undo and redo batching so it works across multiple calls to load
const onEvent = batch(500, onChunk)
const untilCopy = {...this.until}
await Promise.all(
this.getGroupedRelays().map(([until, relays]) => {
const since = this.delta ? until - this.delta : 0
// If the relay gave us a bunch of stuff outside our window, hold on to
// it until it's needed, and don't request it again
this.buffer = this.buffer.filter(event => {
if (event.created_at > since) {
onEvent(event)
return false
}
return true
})
return load({
relays: relays,
filter: ensurePlural(filter).map(mergeLeft({until, since, limit: this.limit})),
onChunk: events => {
for (const event of events) {
if (event.created_at < this.until[event.seen_on]) {
this.until[event.seen_on] = event.created_at
}
if (this.seen.has(event.id)) {
continue
}
this.seen.add(event.id)
if (event.created_at < since) {
this.buffer.push(event)
} else {
onEvent(event)
}
}
},
})
})
)
// If a relay's until didn't move, it gave us zero results for this window,
// so back its until up to since for next time
if (this.delta) {
this.relays.forEach(r => {
if (untilCopy[r.url] === this.until[r.url]) {
this.until[r.url] -= this.delta
}
})
}
}
getGroupedRelays() {
// Group relays into clusters with similar untils so we still get some
// benefit out of multiplextr despite paginating per-relay
const threshold = timedelta(5, "minutes")
const untils = this.relays.map(({url}) => this.until[url])
for (let i = 0; i < untils.length; i++) {
for (let j = i + 1; j < untils.length; j++) {
if (Math.abs(untils[j] - untils[i]) > threshold) {
continue
}
// Take the later timestamp so we don't miss anything
if (untils[i] > untils[j]) {
untils[j] = untils[i]
} else {
untils[i] = untils[j]
}
}
}
const relaysByUntil = new Map()
for (let i = 0; i < untils.length; i++) {
const until = untils[i]
const relay = this.relays[i]
if (!relaysByUntil.has(until)) {
relaysByUntil.set(until, [])
}
relaysByUntil.get(until).push(relay)
}
return Array.from(relaysByUntil.entries())
}
}
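
For orientation, a minimal sketch of how a caller might drive the new cursor (the relay URLs, delta, and filter here are made up, not taken from this commit). Each loadPage call issues one request per cluster of relays, where untils within five minutes of each other are snapped to the later timestamp so multiplextr can still batch the REQs:

const cursor = new Cursor({
  relays: [{url: "wss://one.example.com"}, {url: "wss://two.example.com"}],
  limit: 20,
  delta: timedelta(30, "minutes"),
})

// Each page advances every relay's until independently, based on the oldest
// event that relay actually returned (tracked via event.seen_on)
await cursor.loadPage({
  filter: {kinds: [1]},
  onChunk: events => console.log(`got ${events.length} events`),
})
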
const loadPeople = async (pubkeys, {relays = null, kinds = personKinds, force = false} = {}) => {
pubkeys = uniq(pubkeys)
@@ -236,6 +358,7 @@ const applyContext = (notes, context) => {
export default {
load,
listen,
Cursor,
loadPeople,
loadParents,
streamContext,

View File

@@ -106,31 +106,13 @@ export default {
}
},
setLastChecked(k, v) {
profile.update($profile => {
const lastChecked = {...$profile.last_checked, [k]: v}
this.setAppData("last_checked/v1", lastChecked)
return {...$profile, last_checked: lastChecked}
})
this.setAppData("last_checked/v1", {...profileCopy.last_checked, [k]: v})
},
joinRoom(id) {
profile.update($profile => {
const roomsJoined = $profile.rooms_joined.concat(id)
this.setAppData("rooms_joined/v1", roomsJoined)
return {...$profile, rooms_joined: roomsJoined}
})
this.setAppData("rooms_joined/v1", profileCopy.rooms_joined.concat(id))
},
leaveRoom(id) {
profile.update($profile => {
const roomsJoined = without([id], $profile.rooms_joined)
this.setAppData("rooms_joined/v1", roomsJoined)
return {...$profile, rooms_joined: roomsJoined}
})
this.setAppData("rooms_joined/v1", without([id], profileCopy.rooms_joined))
},
// Petnames

View File

@@ -3,7 +3,7 @@
import {last, partition, always, propEq, uniqBy, sortBy, prop} from "ramda"
import {fly} from "svelte/transition"
import {quantify} from "hurdak/lib/hurdak"
import {createScroller, now, timedelta, Cursor} from "src/util/misc"
import {createScroller, now, timedelta} from "src/util/misc"
import {asDisplayEvent, mergeFilter} from "src/util/nostr"
import Spinner from "src/partials/Spinner.svelte"
import Modal from "src/partials/Modal.svelte"
@@ -29,8 +29,8 @@
// Add a short buffer so we get as many results as possible for recent notes
const since = now()
const maxNotes = 100
const cursor = new Cursor({delta})
const seen = new Set()
const cursor = new network.Cursor({relays, delta})
const getModal = () => last(document.querySelectorAll(".modal-content"))
const setFeedRelay = relay => {
@@ -54,12 +54,8 @@
// Deduplicate and filter out stuff we don't want, apply user preferences
const filtered = user.applyMutes(newNotes.filter(n => !seen.has(n.id) && shouldDisplay(n)))
// Drop the oldest 20% of notes. We sometimes get pretty old stuff since we don't
// use a since on our filter
const pruned = cursor.prune(filtered)
// Keep track of what we've seen
for (const note of pruned) {
for (const note of filtered) {
seen.add(note.id)
}
@@ -105,16 +101,13 @@
const loadMore = async () => {
// Wait for this page to load before trying again
await network.load({
relays: feedRelay ? [feedRelay] : relays,
filter: mergeFilter(filter, cursor.getFilter()),
await cursor.loadPage({
filter,
onChunk: chunk => {
// Stack promises to avoid too many concurrent subscriptions
p = p.then(() => onChunk(chunk))
},
})
// Update our cursor
cursor.update(notes)
}
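
The promise-stacking in loadMore's onChunk is a small serialization trick: each handler chains onto the previous promise, so chunks are processed one at a time even though the underlying subscriptions fire concurrently. A stripped-down sketch of the pattern, with illustrative names:

let queue: Promise<void> = Promise.resolve()

const enqueue = (work: () => Promise<void>) => {
  // Chain onto the tail so at most one handler runs at a time
  queue = queue.then(work)
}
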
onMount(() => {

View File

@@ -17,6 +17,7 @@
const id = toHex(entity)
const room = watch("rooms", t => t.get(id) || {id})
const getRelays = () => sampleRelays($room ? getRelaysForEventChildren($room) : [])
const cursor = new network.Cursor({relays: getRelays()})
user.setLastChecked(`chat/${id}`, now())
@@ -27,12 +28,8 @@
onChunk,
})
const loadMessages = (cursor, onChunk) =>
network.load({
relays: getRelays(),
filter: {kinds: [42], "#e": [id], ...cursor.getFilter()},
onChunk,
})
const loadMessages = onChunk =>
cursor.loadPage({filter: {kinds: [42], "#e": [id]}, onChunk})
const edit = () => {
modal.push({type: "room/edit", room: $room})

View File

@@ -46,9 +46,7 @@
relays = urls.length > 0 ? urls.map(objOf("url")) : sampleRelays(getUserReadRelays())
}
// Separate notes and reactions into two queries since otherwise reactions dominate,
// we never find their parents (or reactions are mostly to a few posts), and the feed sucks
filter = [1, 7].map(kind => ({...filter, kinds: [kind]}))
filter = [{...filter, kinds: [1]}]
}
const setActiveTab = tab => {

View File

@@ -26,6 +26,7 @@
user.setLastChecked(`dm/${pubkey}`, now())
const getRelays = () => sampleRelays(getAllPubkeyRelays([pubkey, user.getPubkey()]))
const cursor = new network.Cursor({relays: getRelays()})
const decryptMessages = async events => {
const results = []
@@ -40,7 +41,7 @@
return results
}
const getFilters = extra => [
const getFilters = (extra = {}) => [
{kinds: [4], authors: [user.getPubkey()], "#p": [pubkey], ...extra},
{kinds: [4], authors: [pubkey], "#p": [user.getPubkey()], ...extra},
]
@@ -52,10 +53,9 @@
onChunk: async events => onChunk(await decryptMessages(events)),
})
const loadMessages = (cursor, onChunk) =>
network.load({
relays: getRelays(),
filter: getFilters(cursor.getFilter()),
const loadMessages = onChunk =>
cursor.loadPage({
filter: getFilters(),
onChunk: async events => onChunk(await decryptMessages(events)),
})
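
Note that callers no longer splice pagination fields into their filters: loadPage merges its own until/since/limit into each filter via mergeLeft in the Cursor class above. For a DM filter like the one here, each request goes out roughly as follows (values illustrative):

// Caller passes:
//   {kinds: [4], authors: [pubkey], "#p": [me]}
// Cursor sends, per relay cluster:
//   {kinds: [4], authors: [pubkey], "#p": [me],
//    until: <cluster until>, since: <until - delta, or 0 with no delta, as here>,
//    limit: 20}
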

View File

@@ -3,7 +3,7 @@
import {fly} from "svelte/transition"
import {navigate} from "svelte-routing"
import {prop, max, path as getPath, reverse, pluck, uniqBy, sortBy, last} from "ramda"
import {sleep, createScroller, Cursor} from "src/util/misc"
import {sleep, createScroller} from "src/util/misc"
import Spinner from "src/partials/Spinner.svelte"
import user from "src/agent/user"
import {getPersonWithFallback} from "src/agent/db"
@@ -18,7 +18,6 @@
let loading = sleep(30_000)
let annotatedMessages = []
let showNewMessages = false
let cursor = new Cursor()
$: {
// Group messages so we're only showing the person once per chunk
@@ -68,12 +67,11 @@
const scroller = createScroller(
async () => {
await loadMessages(cursor, newMessages => {
await loadMessages(newMessages => {
stickToBottom(() => {
loading = sleep(30_000)
messages = sortBy(e => -e.created_at, uniqBy(prop("id"), newMessages.concat(messages)))
network.loadPeople(pluck("pubkey", newMessages))
cursor.update(messages)
})
})
},

View File

@@ -2,13 +2,10 @@ import type {Writable} from "svelte/store"
import {bech32, utf8} from "@scure/base"
import {debounce, throttle} from "throttle-debounce"
import {
gt,
without,
whereEq,
reject,
mergeDeepRight,
aperture,
filter,
isNil,
is,
pluck,
@@ -164,63 +161,6 @@ export const createScroller = (loadMore, {reverse = false, element = null} = {})
export const randomChoice = xs => xs[Math.floor(Math.random() * xs.length)]
export class Cursor {
delta?: number
since?: number
until: number
limit: number
count: number
constructor({limit = 20, delta = undefined} = {}) {
this.delta = delta
this.since = delta ? now() - delta : undefined
this.until = now()
this.limit = limit
this.count = 0
}
getFilter() {
return filter(identity, {
since: this.since,
until: this.until,
limit: this.limit,
})
}
// Remove events that are significantly older than the average
prune(events) {
const maxDiff = avg(events.map(e => this.until - e.created_at)) * 4
return events.filter(e => this.until - e.created_at < maxDiff)
}
// Calculate a reasonable amount to move our window: far enough to avoid
// re-fetching too much of what we already got, but not so far that we skip
// time periods where event density differs between relays
update(events) {
if (events.length > 2) {
// Keep track of how many requests we've made
this.count += 1
// Find the average gap between events to figure out how regularly people post to this
// feed. Multiply it by the number of events we have but scale down to avoid
// blowing past big gaps due to misbehaving relays skewing the results. Trim off
// outliers and scale based on results/requests to help with that
const timestamps = sortBy(identity, pluck("created_at", events))
const gaps = aperture(2, timestamps).map(([a, b]) => b - a)
const high = quantile(gaps, 0.5)
const gap = avg(gaps.filter(gt(high)))
// If we're just warming up, scale the window down even further to avoid
// blowing past the most relevant time period
const scale = Math.min(1, Math.log10(events.length)) * Math.min(1, Math.log10(this.count + 1))
// Only paginate part of the way so we can avoid missing stuff
this.until -= Math.round(gap * scale * this.limit)
}
if (this.since) {
this.since = Math.min(this.since, this.until) - this.delta
}
}
}
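
For comparison, a worked example of the removed heuristic with made-up numbers: five events whose below-median gaps average 60 seconds, a limit of 20, and a first request just counted. The single shared window moves about four minutes for every relay at once:

// Hypothetical inputs: below-median gaps average 60s, 5 events, count = 1
const gap = 60
const scale = Math.min(1, Math.log10(5)) * Math.min(1, Math.log10(2)) // ≈ 0.21
const shift = Math.round(gap * scale * 20) // ≈ 253 seconds
// One fast relay skewing the gaps changes the shift for every relay at once,
// which is the mismatch the new per-relay untils avoid
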
export const synced = (key, defaultValue = null) => {
// If it's an object, merge defaults
const store = writable(