Re-work feeds using lib

This commit is contained in:
Jon Staab 2024-04-12 17:05:28 -07:00
parent 103e8ca1ab
commit e3e79cea04
10 changed files with 191 additions and 287 deletions

View File

@ -22,7 +22,7 @@
forcePlatformRelaySelections,
} from "src/engine"
import {router} from "src/app/router"
import {feedCompiler} from "src/app/util"
import {feedLoader} from "src/app/util"
export let feed
export let group = null
@ -71,9 +71,10 @@
let subs = []
onMount(async () => {
const {filters} = await feedCompiler.compile(feed)
const {filters} = await feedLoader.compiler.compile(feed)
const selections = getFilterSelections(filters)
const subs = forcePlatformRelaySelections(selections).map(({relay, filters}) =>
subs = forcePlatformRelaySelections(selections).map(({relay, filters}) =>
subscribe({relays: [relay], filters, onEvent}),
)
})

View File

@ -11,7 +11,6 @@
import {FeedLoader} from "src/app/util"
export let feed: Feed
export let relays = []
export let anchor = null
export let eager = false
export let skipCache = false
@ -25,18 +24,22 @@
export let onEvent = null
let loader, element
let limit = 0
let notes = readable([])
const hideReplies = writable(Storage.getJson("hideReplies"))
const loadMore = () => loader.load(20)
const loadMore = () => {
limit += 5
if ($notes.length < limit) {
loader.load(20)
}
}
const start = () => {
loader?.stop()
loader = new FeedLoader({
feed,
relays,
anchor,
skipCache,
skipNetwork,
@ -52,12 +55,6 @@
notes = loader.notes
}
const updateFilter = newFilter => {
filter = newFilter
start()
}
const unsubHideReplies = hideReplies.subscribe($hideReplies => {
start()
Storage.setJson("hideReplies", $hideReplies)
@ -69,27 +66,16 @@
return () => {
unsubHideReplies()
scroller?.stop()
loader?.stop()
}
})
</script>
<FlexColumn xl bind:element>
{#await loader.config}
<!-- pass -->
{:then { filters }}
{#each $notes as note, i (note.id)}
<div in:fly={{y: 20}}>
<Note
depth={$hideReplies ? 0 : 2}
context={note.replies || []}
{filters}
{showGroup}
{anchor}
{note} />
</div>
{/each}
{/await}
{#each $notes.slice(0, limit) as note, i (note.id)}
<div in:fly={{y: 20}}>
<Note depth={$hideReplies ? 0 : 2} context={note.replies || []} {showGroup} {anchor} {note} />
</div>
{/each}
</FlexColumn>
{#if !hideSpinner}

View File

@ -1,19 +1,17 @@
import {partition, prop, uniqBy, without, assoc} from "ramda"
import {partition, prop, uniqBy} from "ramda"
import {batch} from "hurdak"
import {now, writable} from "@coracle.social/lib"
import type {Filter} from "@coracle.social/util"
import {writable} from "@coracle.social/lib"
import {
Tags,
getIdOrAddress,
getIdAndAddress,
getIdFilters,
guessFilterDelta,
isContextAddress,
decodeAddress,
} from "@coracle.social/util"
import type {Feed} from "@coracle.social/feeds"
import {FeedCompiler, Scope} from "@coracle.social/feeds"
import {race} from "src/util/misc"
import type {Feed, Loader} from "@coracle.social/feeds"
import {FeedLoader as CoreFeedLoader, FeedType, Scope} from "@coracle.social/feeds"
import {generatePrivateKey} from "src/util/nostr"
import {info} from "src/util/logger"
import {LOCAL_RELAY_URL, reactionKinds, repostKinds} from "src/util/nostr"
import type {DisplayEvent, Event} from "src/engine"
import {
@ -21,15 +19,14 @@ import {
unwrapRepost,
isEventMuted,
isDeleted,
primeWotCaches,
hints,
forcePlatformRelays,
forcePlatformRelaySelections,
forceRelaySelections,
addRepostFilters,
getFilterSelections,
tracker,
load,
subscribe,
MultiCursor,
dvmRequest,
getFollowedPubkeys,
getFollowers,
@ -39,65 +36,70 @@ import {
user,
} from "src/engine"
export const feedCompiler = new FeedCompiler({
requestDvm: async ({request, onEvent}) => {
const event = await dvmRequest({
...request,
timeout: 3000,
sk: generatePrivateKey(),
})
const requestDvm = async ({kind, tags = [], onEvent}) => {
const sk = generatePrivateKey()
const event = await dvmRequest({kind, tags, sk, timeout: 3000})
if (event) {
onEvent(event)
if (event) {
onEvent(event)
}
}
const request = async ({relays, filters, onEvent}) => {
if (relays.length > 0) {
await load({filters, relays, onEvent})
} else {
await Promise.all(
getFilterSelections(filters).map(({relay, filters}) =>
load({filters, relays: [relay], onEvent}),
),
)
}
}
const getPubkeysForScope = (scope: string) => {
const $user = user.get()
switch (scope) {
case Scope.Self:
return $user ? [$user.pubkey] : []
case Scope.Follows:
return getFollowedPubkeys($user)
case Scope.Followers:
return Array.from(getFollowers($user.pubkey).map(p => p.pubkey))
default:
throw new Error(`Invalid scope ${scope}`)
}
}
const getPubkeysForWotRange = (min, max) => {
const pubkeys = []
const $user = user.get()
const thresholdMin = maxWot.get() * min
const thresholdMax = maxWot.get() * max
primeWotCaches($user.pubkey)
for (const person of people.get()) {
const score = getWotScore($user.pubkey, person.pubkey)
if (score >= thresholdMin && score <= thresholdMax) {
pubkeys.push(person.pubkey)
}
},
request: async ({relays, filters, onEvent}) => {
if (relays.length > 0) {
await load({filters, relays, onEvent})
} else {
await Promise.all(
getFilterSelections(filters).map(({relay, filters}) =>
load({filters, relays: [relay], onEvent}),
),
)
}
},
getPubkeysForScope: (scope: string) => {
const $user = user.get()
}
switch (scope) {
case Scope.Self:
return $user ? [$user.pubkey] : []
case Scope.Follows:
return getFollowedPubkeys($user)
case Scope.Followers:
return Array.from(getFollowers($user.pubkey).map(p => p.pubkey))
default:
throw new Error(`Invalid scope ${scope}`)
}
},
getPubkeysForWotRange: (min, max) => {
const pubkeys = []
const $user = user.get()
const thresholdMin = maxWot.get() * min
const thresholdMax = maxWot.get() * max
return pubkeys
}
for (const person of people.get()) {
const score = getWotScore($user.pubkey, person.pubkey)
if (score >= thresholdMin && score <= thresholdMax) {
pubkeys.push(person.pubkey)
}
}
return pubkeys
},
export const feedLoader = new CoreFeedLoader({
request,
requestDvm,
getPubkeysForScope,
getPubkeysForWotRange,
})
export type FeedOpts = {
feed: Feed
relays: string[]
onEvent?: (e: Event) => void
anchor?: string
skipCache?: boolean
skipNetwork?: boolean
@ -108,108 +110,91 @@ export type FeedOpts = {
shouldHideReplies?: boolean
shouldLoadParents?: boolean
includeReposts?: boolean
onEvent?: (e: Event) => void
}
export class FeedLoader {
stopped = false
config: Promise<{filters: Filter[]}>
subs: Array<{close: () => void}> = []
buffer = writable<Event[]>([])
done = false
loader: Promise<Loader>
notes = writable<DisplayEvent[]>([])
parents = new Map<string, DisplayEvent>()
reposts = new Map<string, Event[]>()
replies = new Map<string, Event[]>()
cursor: MultiCursor
isEventMuted = isEventMuted.get()
isDeleted = isDeleted.get()
constructor(readonly opts: FeedOpts) {
this.config = this.start()
}
async start() {
const requestItem = await feedCompiler.compile(this.opts.feed)
const filters =
this.opts.includeReposts && !requestItem.filters.some(f => f.authors?.length > 0)
? addRepostFilters(requestItem.filters)
: requestItem.filters
let relaySelections = []
if (requestItem.relays.length > 0) {
relaySelections = requestItem.relays.map(relay => ({relay, filters}))
} else if (!this.opts.skipNetwork) {
relaySelections = getFilterSelections(filters)
relaySelections = forceRelaySelections(relaySelections, this.opts.relays)
if (!this.opts.skipPlatform) {
relaySelections = forcePlatformRelaySelections(relaySelections)
}
}
if (!this.opts.skipCache && requestItem.relays.length === 0) {
relaySelections.push({relay: LOCAL_RELAY_URL, filters})
}
// No point in subscribing if we have an end date
if (this.opts.shouldListen && !filters.every(prop("until"))) {
this.addSubs(
relaySelections.map(({relay, filters}) =>
subscribe({
relays: [relay],
skipCache: true,
filters: filters.map(assoc("since", now())),
onEvent: batch(300, async (events: Event[]) => {
events = await this.discardEvents(events)
if (this.opts.shouldLoadParents) {
this.loadParents(events)
}
if (this.opts.shouldBuffer) {
this.buffer.update($buffer => $buffer.concat(events))
} else {
this.addToFeed(events, {prepend: true})
}
}),
}),
),
)
}
this.cursor = new MultiCursor({
relaySelections,
onEvent: batch(300, async events => {
if (this.opts.shouldLoadParents) {
this.loadParents(await this.discardEvents(events))
// Use a custom feed loader so we can intercept the filters
const feedLoader = new CoreFeedLoader({
requestDvm,
getPubkeysForScope,
getPubkeysForWotRange,
request: async ({relays, filters, onEvent}) => {
if (this.opts.includeReposts && !filters.some(f => f.authors?.length > 0)) {
filters = addRepostFilters(filters)
}
}),
const promises = []
if (relays.length > 0) {
promises.push(load({filters, relays, onEvent}))
} else {
if (!this.opts.skipCache) {
promises.push(load({filters, relays: [LOCAL_RELAY_URL], onEvent}))
}
if (!this.opts.skipNetwork) {
let selections = getFilterSelections(filters)
if (!this.opts.skipPlatform) {
selections = forcePlatformRelaySelections(selections)
}
for (const {relay, filters} of selections) {
promises.push(load({filters, relays: [relay], onEvent}))
}
}
}
await Promise.all(promises)
},
})
const subs = this.addSubs(this.cursor.load(50))
this.loader = feedLoader.getLoader(opts.feed, {
onEvent: batch(300, async events => {
const keep = await this.discardEvents(events)
// Wait until at least one subscription has completed to reduce the chance of
// out of order notes
if (subs.length > 1) {
await race(
Math.min(2, subs.length),
subs.map(
s =>
new Promise(r => {
s.emitter.on("event", r)
s.emitter.on("complete", r)
}),
),
)
}
if (this.opts.shouldLoadParents) {
this.loadParents(keep)
}
return {filters}
const ok = this.deferOrphans(keep)
this.addToFeed(ok)
}),
onExhausted: () => {
this.done = true
},
})
}
async discardEvents(events) {
// Public api
subscribe = f => this.notes.subscribe(f)
load = (limit: number) => this.loader.then(loader => loader(limit))
// Event selection, deferral, and parent loading
discardEvents = async events => {
let strict = false
// Be more tolerant when looking at communities
const {filters} = await this.config
const strict = filters.some(f => f["#a"])
feedLoader.compiler.walk(this.opts.feed, ([type, ...feed]) => {
if (type === FeedType.Filter) {
strict = feed.some(f => f["#a"]?.find(a => isContextAddress(decodeAddress(a))))
}
})
return events.filter(e => {
if (this.isDeleted(e)) {
@ -263,15 +248,12 @@ export class FeedLoader {
return
}
const scenario =
this.opts.relays.length > 0
? hints.product(Array.from(parentIds), this.opts.relays)
: hints.merge(notesWithParent.map(hints.EventParents))
const selections = hints.merge(notesWithParent.map(hints.EventParents)).getSelections()
for (const {relay, values} of scenario.getSelections()) {
for (const {relay, values} of selections) {
load({
relays: [relay],
filters: getIdFilters(values),
relays: this.opts.skipPlatform ? [relay] : forcePlatformRelays([relay]),
onEvent: batch(100, async events => {
for (const e of await this.discardEvents(events)) {
this.parents.set(e.id, e)
@ -281,30 +263,34 @@ export class FeedLoader {
}
}
// Control
addSubs(subs) {
for (const sub of subs) {
this.subs.push(sub)
sub.emitter.on("complete", () => {
this.subs = without([sub], this.subs)
})
deferOrphans = (notes: Event[]) => {
if (!this.opts.shouldLoadParents || this.opts.shouldDefer === false) {
return notes
}
return subs
}
// If something has a parent id but we haven't found the parent yet, skip it until we have it.
const [ok, defer] = partition(e => {
const parent = Tags.fromEvent(e).parent()
stop() {
this.stopped = true
return !parent || this.parents.has(parent.value())
}, notes)
for (const sub of this.subs) {
sub.close()
}
setTimeout(() => this.addToFeed(defer), 3000)
return ok
}
// Feed building
addToFeed = (notes: Event[], {prepend = false} = {}) => {
this.notes.update($notes => {
const chunk = this.buildFeedChunk(notes)
const combined = prepend ? [...chunk, ...$notes] : [...$notes, ...chunk]
return uniqBy(prop("id"), combined)
})
}
buildFeedChunk = (notes: Event[]) => {
const seen = new Set(this.notes.get().map(getIdOrAddress))
const parents = []
@ -331,8 +317,8 @@ export class FeedLoader {
}
}
// Only replace parents for kind 1 replies
if (e.kind !== 1) {
// Only replace parents for kind 1 replies or reactions
if (!reactionKinds.concat(1).includes(e.kind)) {
return e
}
@ -381,73 +367,4 @@ export class FeedLoader {
),
)
}
addToFeed = (notes: Event[], {prepend = false} = {}) => {
this.notes.update($notes => {
const chunk = this.buildFeedChunk(notes)
const combined = prepend ? [...chunk, ...$notes] : [...$notes, ...chunk]
return uniqBy(prop("id"), combined)
})
}
subscribe = f => this.notes.subscribe(f)
// Loading
async load(n) {
await this.config
if (this.cursor.done()) {
return
}
const [subs, events] = this.cursor.take(n)
this.addSubs(subs)
this.addToFeed(this.deferOrphans(await this.discardEvents(events)))
}
loadBuffer() {
this.buffer.update($buffer => {
this.addToFeed($buffer)
return []
})
}
deferOrphans = (notes: Event[]) => {
if (!this.opts.shouldLoadParents || this.opts.shouldDefer === false) {
return notes
}
// If something has a parent id but we haven't found the parent yet, skip it until we have it.
const [ok, defer] = partition(e => {
const parent = Tags.fromEvent(e).parent()
return !parent || this.parents.has(parent.value())
}, notes)
setTimeout(() => this.addToFeed(defer), 3000)
return ok
}
deferAncient = async (notes: Event[]) => {
const {filters} = await this.config
if (this.opts.shouldDefer === false) {
return notes
}
// Sometimes relays send very old data very quickly. Pop these off the queue and re-add
// them after we have more timely data. They still might be relevant, but order will still
// be maintained since everything before the cutoff will be deferred the same way.
const since = now() - guessFilterDelta(filters)
const [ok, defer] = partition(e => e.created_at > since, notes)
setTimeout(() => this.addToFeed(defer), 5000)
return ok
}
}

View File

@ -1,7 +1,7 @@
<script lang="ts">
import cx from "classnames"
import {Tags} from "@coracle.social/util"
import {Scope, filter} from "@coracle.social/feeds"
import {Scope, filter, usingRelays} from "@coracle.social/feeds"
import {noteKinds} from "src/util/nostr"
import {theme} from "src/partials/state"
import Anchor from "src/partials/Anchor.svelte"
@ -13,6 +13,10 @@
export let relays = []
export let feed = filter({kinds: noteKinds, scopes: [Scope.Follows]})
if (relays.length > 0) {
feed = usingRelays(relays, feed)
}
let key = Math.random()
const showLists = () => router.at("lists").open()
@ -26,10 +30,6 @@
const topics = tags.topics().valueOf()
const urls = tags.values("r").valueOf()
if (urls.length > 0) {
relays = urls
}
if (authors.length > 0) {
feed = filter({kinds: noteKinds, authors})
} else if (topics.length > 0) {
@ -38,6 +38,10 @@
feed = filter({kinds: noteKinds, scopes: [Scope.Follows]})
}
if (urls.length > 0) {
feed = usingRelays(urls, feed)
}
key = Math.random()
}
@ -54,7 +58,7 @@
{/if}
{#key key}
<Feed skipCache includeReposts showGroup {feed} {relays}>
<Feed skipCache includeReposts showGroup {feed}>
<div slot="controls">
{#if $canSign}
{#if $userLists.length > 0}

View File

@ -34,7 +34,7 @@
const setActiveTab = tab => router.at("notifications").at(tab).push()
const loadMore = () => {
const loadMore = async () => {
limit += 4
}

View File

@ -1,6 +1,6 @@
<script lang="ts">
import {batch} from "hurdak"
import {Scope, filter} from "@coracle.social/feeds"
import {filter, usingRelays} from "@coracle.social/feeds"
import {getAvgRating, noteKinds} from "src/util/nostr"
import Feed from "src/app/shared/Feed.svelte"
import Tabs from "src/partials/Tabs.svelte"
@ -15,8 +15,8 @@
let reviews = []
let activeTab = "notes"
url = normalizeRelayUrl(url)
$: url = normalizeRelayUrl(url)
$: feed = usingRelays([url], feed)
$: rating = getAvgRating(reviews)
const relay = deriveRelay(url)
@ -54,5 +54,5 @@
"#r": [$relay.url],
})} />
{:else}
<Feed skipCache relays={[$relay.url]} {feed} />
<Feed skipCache {feed} />
{/if}

View File

@ -12,7 +12,7 @@
const sortedEvents = events.derived(sortEventsDesc)
const loadMore = () => {
const loadMore = async () => {
limit += 50
}

View File

@ -1,12 +1,8 @@
import {shuffle, splitAt} from "@coracle.social/lib"
import {splitAt} from "@coracle.social/lib"
import type {Filter, RouterScenario, RouterScenarioOptions} from "@coracle.social/util"
import {isContextAddress, mergeFilters, getFilterId, decodeAddress} from "@coracle.social/util"
import {without, sortBy, prop} from "ramda"
import {switcherFn} from "hurdak"
import {env} from "src/engine/session/state"
import {user} from "src/engine/session/derived"
import {getSetting} from "src/engine/session/utils"
import {getFollowedPubkeys, getNetwork} from "src/engine/people/utils"
import {hints} from "src/engine/relays/utils"
export const addRepostFilters = (filters: Filter[]) =>

View File

@ -25,7 +25,7 @@
scroller = createScroller(loadMore, {element, reverse: true})
}
const loadMore = () => {
const loadMore = async () => {
limit += 10
}

View File

@ -85,7 +85,7 @@ type ScrollerOpts = {
}
export const createScroller = (
loadMore: () => any,
loadMore: () => Promise<void>,
{delay = 1000, threshold = 2000, reverse = false, element}: ScrollerOpts = {},
) => {
let done = false