Fix feeds

Jonathan Staab 2023-07-24 09:43:52 -07:00
parent 4f68406f30
commit 16bed6403c
11 changed files with 303 additions and 242 deletions

View File

@@ -2,7 +2,7 @@
import type {DynamicFilter, DisplayEvent} from "src/engine/types"
import {onMount, onDestroy} from "svelte"
import {readable, derived, writable} from "svelte/store"
import {Feed} from "src/engine"
import {FeedLoader} from "src/engine"
import {last, equals} from "ramda"
import {fly} from "src/util/transition"
import {quantify} from "hurdak"
@@ -66,7 +66,7 @@
return Nip65.mergeHints(limit, hints)
}
const loadMore = () => feed.load()
const loadMore = () => feed.load(5)
export const stop = () => {
feed?.stop()
@@ -82,7 +82,7 @@
filter = newFilter
}
feed = new Feed(engine, {
feed = new FeedLoader(engine, {
depth: 2,
relays: getRelays(),
filter: compileFilter(filter),

View File

@@ -1,7 +1,7 @@
<script>
import {propEq, find} from "ramda"
import {onMount, onDestroy} from "svelte"
import {Feed} from "src/engine"
import {FeedLoader} from "src/engine"
import {fly} from "src/util/transition"
import {isMobile} from "src/util/html"
import {asDisplayEvent} from "src/util/nostr"
@@ -16,7 +16,7 @@
export let relays = []
export let invertColors = false
const feed = new Feed(engine, {
const feed = new FeedLoader(engine, {
limit: 1,
depth: 6,
relays: Nip65.selectHints(10, relays),

View File

@@ -58,7 +58,7 @@
tryFunc(() => {
nip19.decode(entity)
navigate("/" + entity)
}, "TypeError")
})
}
})

View File

@@ -201,7 +201,7 @@ export class Nip65 {
.slice(0, limit)
}
initialize(engine: Engine) {
async initialize(engine: Engine) {
this.engine = engine
engine.Events.addHandler(2, e => {
@@ -246,22 +246,21 @@ export class Nip65 {
})
)
})
;(async () => {
const {DEFAULT_RELAYS, FORCE_RELAYS, DUFFLEPUD_URL} = engine.Env
// Throw some hardcoded defaults in there
DEFAULT_RELAYS.forEach(this.addRelay)
const {DEFAULT_RELAYS, FORCE_RELAYS, DUFFLEPUD_URL} = engine.Env
// Load relays from nostr.watch via dufflepud
if (FORCE_RELAYS.length === 0 && DUFFLEPUD_URL) {
try {
const json = await Fetch.fetchJson(DUFFLEPUD_URL + "/relay")
// Throw some hardcoded defaults in there
DEFAULT_RELAYS.forEach(this.addRelay)
json.relays.filter(isShareableRelay).forEach(this.addRelay)
} catch (e) {
warn("Failed to fetch relays list", e)
}
// Load relays from nostr.watch via dufflepud
if (FORCE_RELAYS.length === 0 && DUFFLEPUD_URL) {
try {
const json = await Fetch.fetchJson(DUFFLEPUD_URL + "/relay")
json.relays.filter(isShareableRelay).forEach(this.addRelay)
} catch (e) {
warn("Failed to fetch relays list", e)
}
})()
}
}
}

View File

@@ -19,7 +19,8 @@ export {Nip65} from "./components/Nip65"
export {Outbox} from "./components/Outbox"
export {User} from "./components/User"
export {Feed} from "./util/Feed"
export {FeedLoader} from "./util/FeedLoader"
export {ContextLoader} from "./util/ContextLoader"
export {Cursor, MultiCursor} from "./util/Cursor"
export {StorageAdapter} from "./util/StorageAdapter"
export {PubkeyLoader} from "./util/PubkeyLoader"

View File

@@ -2,72 +2,48 @@ import {matchFilters} from "nostr-tools"
import {throttle} from "throttle-debounce"
import {
map,
omit,
pick,
pluck,
partition,
identity,
flatten,
without,
groupBy,
any,
sortBy,
prop,
uniqBy,
assoc,
reject,
} from "ramda"
import {ensurePlural, seconds, sleep, batch, union, chunk, doPipe} from "hurdak"
import {ensurePlural, batch, union, chunk} from "hurdak"
import {now, pushToKey} from "src/util/misc"
import {findReplyId, Tags, noteKinds} from "src/util/nostr"
import {collection} from "./store"
import {Cursor, MultiCursor} from "src/engine/util/Cursor"
import {PubkeyLoader} from "src/engine/util/PubkeyLoader"
import type {Collection} from "src/engine/util/store"
import type {Subscription} from "src/engine/util/Subscription"
import type {Event, DisplayEvent, Filter} from "src/engine/types"
import type {Engine} from "src/engine/Engine"
const fromDisplayEvent = (e: DisplayEvent): Event =>
omit(["zaps", "likes", "replies", "matchesFilter"], e)
export type FeedOpts = {
limit?: number
depth: number
relays: string[]
export type ContextLoaderOpts = {
filter: Filter | Filter[]
onEvent?: (e: Event) => void
shouldLoadParents?: boolean
}
export class Feed {
export class ContextLoader {
engine: Engine
pubkeyLoader: PubkeyLoader
limit: number
since: number
started: boolean
stopped: boolean
deferred: Event[]
context: Collection<Event>
feed: Collection<DisplayEvent>
data: Collection<Event>
seen: Set<string>
subs: Record<string, Array<{close: () => void}>>
cursor: MultiCursor
constructor(engine: Engine, readonly opts: FeedOpts) {
constructor(engine: Engine, readonly opts: ContextLoaderOpts) {
this.engine = engine
this.pubkeyLoader = new PubkeyLoader(engine)
this.limit = opts.limit || 20
this.since = now()
this.started = false
this.stopped = false
this.deferred = []
this.context = collection<Event>("id")
this.feed = collection<DisplayEvent>("id")
this.data = collection<Event>("id")
this.seen = new Set()
this.subs = {
main: [],
notes: [],
context: [],
listeners: [],
}
@@ -104,7 +80,7 @@ export class Feed {
isMissingParent = (e: Event) => {
const parentId = findReplyId(e)
return parentId && this.matchFilters(e) && !this.context.key(parentId).exists()
return parentId && this.matchFilters(e) && !this.data.key(parentId).exists()
}
preprocessEvents = (events: Event[]) => {
@@ -125,7 +101,7 @@ export class Feed {
return Nip65.mergeHints(User.getSetting("relay_limit"), groups)
}
applyContext(notes: Event[], context: Event[], substituteParents = false) {
applyContext = (notes: Event[], substituteParents = false) => {
const parentIds = new Set(notes.map(findReplyId).filter(identity))
const forceShow = union(new Set(pluck("id", notes)), parentIds)
const contextById = {} as Record<string, Event>
@@ -133,7 +109,7 @@ export class Feed {
const reactionsByParentId = {} as Record<string, Event[]>
const repliesByParentId = {} as Record<string, Event[]>
for (const event of context.concat(notes)) {
for (const event of this.data.get()) {
const parentId = findReplyId(event)
if (contextById[event.id]) {
@@ -166,9 +142,9 @@ export class Feed {
}
}
return notes.map(note => {
if (substituteParents) {
for (let i = 0; i < 3; i++) {
if (substituteParents) {
notes = notes.map(note => {
for (let i = 0; i < 2; i++) {
const parent = contextById[findReplyId(note)]
if (!parent) {
@@ -177,10 +153,12 @@ export class Feed {
note = parent
}
}
return annotate(note)
})
return note
})
}
return uniqBy(prop("id"), notes).map(annotate)
}
// Context loaders
@@ -192,6 +170,10 @@ export class Feed {
}
loadParents = (events: Event[]) => {
if (this.stopped) {
return
}
const {Network, Nip65} = this.engine
const parentsInfo = events
.map((e: Event) => ({id: findReplyId(e), hints: Nip65.getParentHints(10, e)}))
@@ -210,6 +192,10 @@ export class Feed {
}
loadContext = batch(300, (eventGroups: any) => {
if (this.stopped) {
return
}
const {Network, Nip65} = this.engine
const groupsByDepth = groupBy(prop("depth"), eventGroups)
@@ -235,22 +221,22 @@ export class Feed {
})
listenForContext = throttle(5000, () => {
const {Network, Nip65} = this.engine
if (this.stopped) {
return
}
const {Network, Nip65} = this.engine
this.subs.listeners.forEach(sub => sub.close())
const contextByParentId = groupBy(findReplyId, this.context.get())
const contextByParentId = groupBy(findReplyId, this.data.get())
const findNotes = (events: Event[]): Event[] =>
events
.filter(this.isTextNote)
.flatMap(e => findNotes(contextByParentId[e.id] || []).concat(e))
for (const c of chunk(256, findNotes(this.feed.get()))) {
for (const c of chunk(256, findNotes(this.data.get()))) {
this.addSubs("listeners", [
Network.subscribe({
relays: this.mergeHints(c.map(e => Nip65.getReplyHints(10, e))),
@@ -272,9 +258,7 @@ export class Feed {
}
}
this.feed.update($feed => this.applyContext($feed, events, false))
this.context.update($context => $context.concat(events))
this.data.update($context => $context.concat(events))
this.loadPubkeys(events)
@@ -289,39 +273,6 @@ export class Feed {
// Control
start() {
const {since} = this
const {relays, filter, depth} = this.opts
// No point in subscribing if we have an end date
if (!any(prop("until"), ensurePlural(filter) as any[])) {
this.addSubs("main", [
this.engine.Network.subscribe({
relays,
filter: ensurePlural(filter).map(assoc("since", since)),
onEvent: batch(1000, (context: Event[]) =>
this.addContext(context, {shouldLoadParents: true, depth})
),
}),
])
}
this.cursor = new MultiCursor(
relays.map(
relay =>
new Cursor(this.engine, {
relay,
filter,
onEvent: batch(100, (context: Event[]) =>
this.addContext(context, {shouldLoadParents: true, depth})
),
})
)
)
this.started = true
}
stop() {
this.stopped = true
@@ -329,121 +280,4 @@ export class Feed {
sub.close()
}
}
hydrate(feed: DisplayEvent[]) {
const {depth} = this.opts
const notes: DisplayEvent[] = []
const context: Event[] = []
const addContext = (note: DisplayEvent) => {
context.push(fromDisplayEvent(note))
note.zaps.forEach(zap => context.push(zap))
note.reactions.forEach(reaction => context.push(reaction))
note.replies.forEach(reply => addContext(reply))
}
feed.forEach(note => {
addContext(note)
notes.push(note)
})
this.feed.set(notes)
this.addContext(context, {depth})
}
// Loading
async load() {
if (!this.started) {
this.start()
}
// If we don't have a decent number of notes yet, try to get enough
// to avoid out of order notes
if (this.cursor.count() < this.limit) {
this.addSubs("notes", this.cursor.load(this.limit))
await sleep(500)
}
const [subs, notes] = this.cursor.take(5)
const deferred = this.deferred.splice(0)
this.addSubs("notes", subs)
const ok = doPipe(notes.concat(deferred), [
this.deferReactions,
this.deferOrphans,
this.deferAncient,
])
this.addToFeed(ok)
}
async loadAll() {
if (!this.started) {
this.start()
}
this.addSubs("notes", this.cursor.load(this.limit))
// Wait for our requested notes
while (!this.cursor.done()) {
const [subs, notes] = this.cursor.take(this.limit)
this.addSubs("notes", subs)
this.addToFeed(notes)
await sleep(300)
}
// Wait for our requested context
while (this.getAllSubs(["notes", "context"]).length > 0) {
await sleep(300)
}
}
deferReactions = (notes: Event[]) => {
const [defer, ok] = partition(e => !this.isTextNote(e) && this.isMissingParent(e), notes)
setTimeout(() => {
// Defer again if we still don't have a parent, it's pointless to show an orphaned reaction
const [orphans, ready] = partition(this.isMissingParent, defer)
this.addToFeed(ready)
this.deferred = this.deferred.concat(orphans)
}, 1500)
return ok
}
deferOrphans = (notes: Event[]) => {
// If something has a parent id but we haven't found the parent yet, skip it until we have it.
const [defer, ok] = partition(e => this.isTextNote(e) && this.isMissingParent(e), notes)
setTimeout(() => this.addToFeed(defer), 1500)
return ok
}
deferAncient = (notes: Event[]) => {
// Sometimes relays send very old data very quickly. Pop these off the queue and re-add
// them after we have more timely data. They still might be relevant, but order will still
// be maintained since everything before the cutoff will be deferred the same way.
const since = now() - seconds(6, "hour")
const [defer, ok] = partition(e => e.created_at < since, notes)
setTimeout(() => this.addToFeed(defer), 1500)
return ok
}
addToFeed(notes: Event[]) {
const context = this.context.get()
const applied = this.applyContext(notes, context, true)
const sorted = sortBy(e => -e.created_at, applied)
this.feed.update($feed => $feed.concat(sorted))
}
}

View File

@@ -13,17 +13,13 @@ export type CursorOpts = {
export class Cursor {
engine: Engine
done: boolean
until: number
buffer: Event[]
loading: boolean
done = false
until = now()
buffer: Event[] = []
loading = false
constructor(engine: Engine, readonly opts: CursorOpts) {
this.engine = engine
this.done = false
this.until = now()
this.buffer = []
this.loading = false
}
load(n: number) {
@@ -59,7 +55,6 @@
},
onClose: () => {
this.loading = false
this.done = true
},
})
}
@@ -82,24 +77,25 @@
}
export class MultiCursor {
#seen_on: Map<string, string[]>
#cursors: Cursor[]
bufferFactor = 4
seen_on: Map<string, string[]>
cursors: Cursor[]
constructor(cursors: Cursor[]) {
this.#seen_on = new Map()
this.#cursors = cursors
this.seen_on = new Map()
this.cursors = cursors
}
load(limit: number) {
return this.#cursors.map(c => c.load(limit)).filter(identity)
return this.cursors.map(c => c.load(limit)).filter(identity)
}
done() {
return all(prop("done"), this.#cursors)
return all(prop("done"), this.cursors)
}
count() {
return this.#cursors.reduce((n, c) => n + c.buffer.length, 0)
return this.cursors.reduce((n, c) => n + c.buffer.length, 0)
}
take(n: number): [Subscription[], Event[]] {
@@ -109,7 +105,7 @@ export class MultiCursor {
// Find the most recent event available so that they're sorted
const [cursor] = sortBy(
c => -c.peek().created_at,
this.#cursors.filter(c => c.peek())
this.cursors.filter(c => c.peek())
)
if (!cursor) {
@@ -120,17 +116,17 @@
// Merge seen_on via mutation so it applies to future. If we've already
// seen the event, we're also done and we don't need to add it to our buffer
if (this.#seen_on.has(event.id)) {
this.#seen_on.get(event.id).push(event.seen_on[0])
if (this.seen_on.has(event.id)) {
this.seen_on.get(event.id).push(event.seen_on[0])
} else {
this.#seen_on.set(event.id, event.seen_on)
this.seen_on.set(event.id, event.seen_on)
events.push(event)
}
}
// Preload the next page
const subs = this.load(n)
const subs = this.load(n * this.bufferFactor)
return [subs, events]
}
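
The take(n) logic above is easiest to picture with plain data: it repeatedly pulls from whichever cursor currently has the newest buffered event, and uses the seen_on map to merge relay hints for duplicates instead of emitting the same event twice. A minimal sketch of that selection loop, using hypothetical stand-in buffers rather than real Cursor instances:

// Illustrative only: plain arrays stand in for Cursor buffers.
type StubEvent = {id: string; created_at: number; seen_on: string[]}

const buffers: StubEvent[][] = [
  [{id: "a", created_at: 300, seen_on: ["wss://relay.one"]}],
  [
    {id: "b", created_at: 500, seen_on: ["wss://relay.two"]},
    {id: "a", created_at: 300, seen_on: ["wss://relay.two"]},
  ],
]

const seenOn = new Map<string, string[]>()
const events: StubEvent[] = []

while (buffers.some(b => b.length > 0)) {
  // Take from the buffer whose head is the most recent event, as take() does
  const head = buffers.filter(b => b.length > 0).sort((x, y) => y[0].created_at - x[0].created_at)[0]
  const event = head.shift()!

  if (seenOn.has(event.id)) {
    seenOn.get(event.id)!.push(event.seen_on[0]) // duplicate: just merge relay hints
  } else {
    seenOn.set(event.id, event.seen_on)
    events.push(event)
  }
}
// events is now [b (500), a (300)]; the second copy of "a" only extended seenOn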

View File

@@ -0,0 +1,211 @@
import {partition, pluck, sortBy, omit, without, any, prop, assoc} from "ramda"
import {ensurePlural, seconds, doPipe, sleep, throttle, batch} from "hurdak"
import {now, race} from "src/util/misc"
import {Cursor, MultiCursor} from "src/engine/util/Cursor"
import {PubkeyLoader} from "src/engine/util/PubkeyLoader"
import {ContextLoader} from "src/engine/util/ContextLoader"
import {writable} from "src/engine/util/store"
import type {Subscription} from "src/engine/util/Subscription"
import type {Event, DisplayEvent, Filter} from "src/engine/types"
import type {Engine} from "src/engine/Engine"
const fromDisplayEvent = (e: DisplayEvent): Event =>
omit(["zaps", "likes", "replies", "matchesFilter"], e)
export type FeedOpts = {
depth: number
relays: string[]
filter: Filter | Filter[]
onEvent?: (e: Event) => void
shouldLoadParents?: boolean
}
export class FeedLoader {
engine: Engine
pubkeyLoader: PubkeyLoader
since = now()
stopped = false
context: ContextLoader
subs: Array<{close: () => void}> = []
feed = writable<DisplayEvent[]>([])
deferred: Event[] = []
cursor: MultiCursor
ready: Promise<void>
constructor(engine: Engine, readonly opts: FeedOpts) {
this.engine = engine
this.pubkeyLoader = new PubkeyLoader(engine)
this.context = new ContextLoader(engine, {
filter: opts.filter,
onEvent: event => {
opts.onEvent?.(event)
this.updateFeed()
},
shouldLoadParents: opts.shouldLoadParents,
})
// No point in subscribing if we have an end date
if (!any(prop("until"), ensurePlural(opts.filter) as any[])) {
this.addSubs([
this.engine.Network.subscribe({
relays: opts.relays,
filter: ensurePlural(opts.filter).map(assoc("since", this.since)),
onEvent: batch(1000, (context: Event[]) =>
this.context.addContext(context, {shouldLoadParents: true, depth: opts.depth})
),
}),
])
}
this.cursor = new MultiCursor(
opts.relays.map(
relay =>
new Cursor(this.engine, {
relay,
filter: opts.filter,
onEvent: batch(100, (context: Event[]) =>
this.context.addContext(context, {shouldLoadParents: true, depth: opts.depth})
),
})
)
)
const subs = this.cursor.load(50)
this.addSubs(subs)
// Wait until a good number of subscriptions have completed to reduce the chance of
// out of order notes
this.ready = race(0.5, pluck("complete", subs))
}
// Control
addSubs(subs: Array<Subscription>) {
for (const sub of ensurePlural(subs)) {
this.subs.push(sub)
sub.on("close", () => {
this.subs = without([sub], this.subs)
})
}
}
stop() {
this.stopped = true
this.context.stop()
for (const sub of this.subs) {
sub.close()
}
}
// Feed building
hydrate(feed: DisplayEvent[]) {
const {depth} = this.opts
const notes: DisplayEvent[] = []
const context: Event[] = []
const addContext = (note: DisplayEvent) => {
context.push(fromDisplayEvent(note))
note.zaps.forEach(zap => context.push(zap))
note.reactions.forEach(reaction => context.push(reaction))
note.replies.forEach(reply => addContext(reply))
}
feed.forEach(note => {
addContext(note)
notes.push(note)
})
this.context.addContext(context, {depth})
}
addToFeed = (notes: Event[]) => {
this.feed.update($feed => {
const newFeed = ($feed as Event[]).concat(sortBy(e => -e.created_at, notes))
return this.context.applyContext(newFeed, true)
})
}
updateFeed = throttle(500, () => {
this.feed.update($feed => this.context.applyContext($feed, true))
})
// Loading
async load(n) {
await this.ready
const [subs, notes] = this.cursor.take(n)
const deferred = this.deferred.splice(0)
this.addSubs(subs)
const ok = doPipe(notes.concat(deferred), [
this.deferReactions,
this.deferOrphans,
this.deferAncient,
])
this.addToFeed(ok)
}
async loadAll() {
this.addSubs(this.cursor.load(Infinity))
// Wait for our requested notes
while (!this.cursor.done()) {
this.load(20)
await sleep(300)
}
}
deferReactions = (notes: Event[]) => {
const [defer, ok] = partition(
e => !this.context.isTextNote(e) && this.context.isMissingParent(e),
notes
)
setTimeout(() => {
// Defer again if we still don't have a parent, it's pointless to show an orphaned reaction
const [orphans, ready] = partition(this.context.isMissingParent, defer)
this.addToFeed(ready)
this.deferred = this.deferred.concat(orphans)
}, 1500)
return ok
}
deferOrphans = (notes: Event[]) => {
// If something has a parent id but we haven't found the parent yet, skip it until we have it.
const [defer, ok] = partition(
e => this.context.isTextNote(e) && this.context.isMissingParent(e),
notes
)
setTimeout(() => this.addToFeed(defer), 1500)
return ok
}
deferAncient = (notes: Event[]) => {
// Sometimes relays send very old data very quickly. Pop these off the queue and re-add
// them after we have more timely data. They still might be relevant, but order will still
// be maintained since everything before the cutoff will be deferred the same way.
const since = now() - seconds(6, "hour")
const [defer, ok] = partition(e => e.created_at < since, notes)
setTimeout(() => this.addToFeed(defer), 4000)
return ok
}
}
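
Read alongside the component diffs at the top of this commit, the intended call pattern for FeedLoader looks roughly like the sketch below. The engine, relays, and filter are assumed to be supplied by the surrounding app, as in the Svelte components above, and the subscribe call assumes the engine's writable store follows the Svelte store contract:

import {FeedLoader} from "src/engine"

// Assumed to come from application code, as in the components above.
declare const engine: any
declare const relays: string[]
declare const filter: any

const feed = new FeedLoader(engine, {
  depth: 2,
  relays,
  filter,
  shouldLoadParents: true,
})

// feed.feed holds the DisplayEvents that components render.
const unsub = feed.feed.subscribe($notes => {
  console.log($notes.length, "notes loaded")
})

// Pull the next page of five notes, as the components' loadMore now does.
feed.load(5)

// On teardown, close all subscriptions (FeedLoader's and its ContextLoader's).
unsub()
feed.stop()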

View File

@@ -2,12 +2,13 @@ import EventEmitter from "events"
import {defer} from "hurdak"
export class Subscription extends EventEmitter {
closed = false
opened = Date.now()
closed: number = null
complete = defer()
close = () => {
if (!this.closed) {
this.closed = true
this.closed = Date.now()
this.complete.resolve()
this.emit("close")
this.removeAllListeners()

View File

@@ -7,10 +7,12 @@ import {writable} from "svelte/store"
import {warn} from "src/util/logger"
export const fuzzy = <T>(data: T[], opts = {}) => {
const {search} = new Fuse(data, opts) as {search: (q: string) => {item: T}[]}
const fuse = new Fuse(data, opts) as any
// Slice pattern because the docs warn that it'll crash if too long
return (q: string) => (q ? pluck("item", search(q.slice(0, 32))) : data)
return (q: string) => {
return q ? pluck("item", fuse.search(q.slice(0, 32)) as any[]) : data
}
}
export const now = () => Math.round(new Date().valueOf() / 1000)
@@ -229,3 +231,20 @@ export const pushToKey = <T>(m: Record<string, T[]>, k: string, v: T) => {
m[k] = m[k] || []
m[k].push(v)
}
export const race = (p, promises) => {
const threshold = Math.ceil(promises.length * p)
let count = 0
return new Promise<void>((resolve, reject) => {
promises.forEach(p => {
p.then(() => {
count++
if (count >= threshold) {
resolve()
}
}).catch(reject)
})
})
}
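
As used in FeedLoader above (race(0.5, pluck("complete", subs))), race resolves once a given fraction of the promises has settled successfully. A quick illustration of that threshold behavior, with made-up timers standing in for subscription completions:

const delay = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms))

// Five stand-in "complete" promises; threshold = Math.ceil(5 * 0.5) = 3
const fakeSubs = [delay(100), delay(200), delay(300), delay(5000), delay(9000)]

race(0.5, fakeSubs).then(() => {
  // Resolves after roughly 300ms, once three of the five have settled,
  // without waiting for the two slow stragglers.
  console.log("enough subscriptions have completed")
})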

BIN
yarn.lock

Binary file not shown.