move feed loader to events/requests

This commit is contained in:
Jon Staab 2024-04-15 12:44:21 -07:00
parent 011b476e0f
commit 0c0bbe7af2
18 changed files with 144 additions and 330 deletions

View File

@ -18,11 +18,11 @@
canSign,
isDeleted,
subscribe,
feedLoader,
getFilterSelections,
forcePlatformRelaySelections,
} from "src/engine"
import {router} from "src/app/router"
import {feedLoader} from "src/app/util"
export let feed
export let group = null

View File

@ -1,5 +1,5 @@
<script lang="ts">
import {writable} from '@coracle.social/lib'
import {writable} from "@coracle.social/lib"
import Chip from "src/partials/Chip.svelte"
import Toggle from "src/partials/Toggle.svelte"
import Modal from "src/partials/Modal.svelte"
@ -8,11 +8,6 @@
const isOpen = writable(false)
const displayPeople = pubkeys =>
pubkeys.length === 1 ? displayPubkey(pubkeys[0]) : `${pubkeys.length} people`
const displayTopics = topics => (topics.length === 1 ? topics[0] : `${topics.length} topics`)
const toggleReplies = () => {
value = {...value, shouldHideReplies: !value.shouldHideReplies}
}
@ -35,7 +30,9 @@
</div>
<div class="mb-2 mr-2 inline-block py-1">Showing notes:</div>
<Chip class="mb-2 mr-2 inline-block">From TBD</Chip>
<div class="inline-block rounded-full border border-neutral-100" on:click={() => isOpen.set(true)}>
<div
class="inline-block rounded-full border border-neutral-100"
on:click={() => isOpen.set(true)}>
<div class="flex h-7 w-7 items-center justify-center">
<i class="fa fa-plus cursor-pointer" />
</div>
@ -43,6 +40,5 @@
</div>
{#if $isOpen}
<Modal onEscape={() => isOpen.set(false)}>
</Modal>
<Modal onEscape={() => isOpen.set(false)}></Modal>
{/if}

View File

@ -1,8 +1,8 @@
<script lang="ts">
import {filter} from "@coracle.social/feeds"
import {filterFeed} from "@coracle.social/feeds"
import Calendar from "src/app/shared/Calendar.svelte"
export let address
</script>
<Calendar group={address} feed={filter({kinds: [31923], "#a": [address]})} />
<Calendar group={address} feed={filterFeed({kinds: [31923], "#a": [address]})} />

View File

@ -1,5 +1,5 @@
<script lang="ts">
import {filter} from "@coracle.social/feeds"
import {filterFeed} from "@coracle.social/feeds"
import Card from "src/partials/Card.svelte"
import Anchor from "src/partials/Anchor.svelte"
import Feed from "src/app/shared/Feed.svelte"
@ -15,4 +15,4 @@
<Anchor button accent on:click={createListing}>Create a listing</Anchor>
</Card>
<Feed hideControls feed={filter({kinds: [30402], "#a": [address]})} />
<Feed hideControls feed={filterFeed({kinds: [30402], "#a": [address]})} />

View File

@ -1,7 +1,7 @@
<script lang="ts">
import {without, last} from "ramda"
import {Tag, Tags, decodeAddress, isGroupAddress, getIdFilters} from "@coracle.social/util"
import {filter} from "@coracle.social/feeds"
import {filterFeed} from "@coracle.social/feeds"
import {noteKinds, generatePrivateKey} from "src/util/nostr"
import {fly} from "src/util/transition"
import FlexColumn from "src/partials/FlexColumn.svelte"
@ -100,7 +100,7 @@
shouldListen
hideControls
skipNetwork={isGroupAddress(decodeAddress(address))}
feed={filter({kinds: without([30402], noteKinds), "#a": [address]})} />
feed={filterFeed({kinds: without([30402], noteKinds), "#a": [address]})} />
{:else}
{#each feedEvents as event}
<div in:fly={{y: 20}}>

View File

@ -1,6 +1,7 @@
import {partition, prop, uniqBy} from "ramda"
import {batch} from "hurdak"
import {writable} from "@coracle.social/lib"
import type {Rumor} from "@coracle.social/util"
import {
Tags,
getIdOrAddress,
@ -15,6 +16,7 @@ import {generatePrivateKey} from "src/util/nostr"
import {LOCAL_RELAY_URL, reactionKinds, repostKinds} from "src/util/nostr"
import type {DisplayEvent, Event} from "src/engine"
import {
feedLoader as baseFeedLoader,
sortEventsDesc,
unwrapRepost,
isEventMuted,
@ -36,68 +38,6 @@ import {
user,
} from "src/engine"
const requestDvm = async ({kind, tags = [], onEvent}) => {
const sk = generatePrivateKey()
const event = await dvmRequest({kind, tags, sk, timeout: 3000})
if (event) {
onEvent(event)
}
}
const request = async ({relays, filters, onEvent}) => {
if (relays.length > 0) {
await load({filters, relays, onEvent})
} else {
await Promise.all(
getFilterSelections(filters).map(({relay, filters}) =>
load({filters, relays: [relay], onEvent}),
),
)
}
}
const getPubkeysForScope = (scope: string) => {
const $user = user.get()
switch (scope) {
case Scope.Self:
return $user ? [$user.pubkey] : []
case Scope.Follows:
return getFollowedPubkeys($user)
case Scope.Followers:
return Array.from(getFollowers($user.pubkey).map(p => p.pubkey))
default:
throw new Error(`Invalid scope ${scope}`)
}
}
const getPubkeysForWotRange = (min, max) => {
const pubkeys = []
const $user = user.get()
const thresholdMin = maxWot.get() * min
const thresholdMax = maxWot.get() * max
primeWotCaches($user.pubkey)
for (const person of people.get()) {
const score = getWotScore($user.pubkey, person.pubkey)
if (score >= thresholdMin && score <= thresholdMax) {
pubkeys.push(person.pubkey)
}
}
return pubkeys
}
export const feedLoader = new CoreFeedLoader({
request,
requestDvm,
getPubkeysForScope,
getPubkeysForWotRange,
})
export type FeedOpts = {
feed?: Feed
anchor?: string
@ -116,6 +56,7 @@ export type FeedOpts = {
export class FeedLoader {
done = false
loader: Promise<Loader>
feedLoader: CoreFeedLoader<Rumor>
notes = writable<DisplayEvent[]>([])
buffer = writable<DisplayEvent[]>([])
parents = new Map<string, DisplayEvent>()
@ -132,10 +73,8 @@ export class FeedLoader {
Object.assign(this.opts, opts)
// Use a custom feed loader so we can intercept the filters
const feedLoader = new CoreFeedLoader({
requestDvm,
getPubkeysForScope,
getPubkeysForWotRange,
this.feedLoader = new CoreFeedLoader({
...baseFeedLoader.options,
request: async ({relays, filters, onEvent}) => {
if (this.opts.includeReposts && !filters.some(f => f.authors?.length > 0)) {
filters = addRepostFilters(filters)
@ -167,7 +106,7 @@ export class FeedLoader {
},
})
this.loader = feedLoader.getLoader(this.opts.feed, {
this.loader = this.feedLoader.getLoader(this.opts.feed, {
onEvent: batch(300, async events => {
const keep = await this.discardEvents(events)
@ -198,7 +137,7 @@ export class FeedLoader {
let strict = false
// Be more tolerant when looking at communities
feedLoader.compiler.walk(this.opts.feed, ([type, ...feed]) => {
this.feedLoader.compiler.walk(this.opts.feed, ([type, ...feed]) => {
if (type === FeedType.Filter) {
strict = feed.some(f => f["#a"]?.find(a => isContextAddress(decodeAddress(a))))
}

View File

@ -1,11 +1,11 @@
<script lang="ts">
import {Scope, filter} from "@coracle.social/feeds"
import {Scope, filterFeed} from "@coracle.social/feeds"
import Calendar from "src/app/shared/Calendar.svelte"
import {env, loadGroupMessages} from "src/engine"
const feed = $env.FORCE_GROUP
? filter({kinds: [31923], "#a": [$env.FORCE_GROUP]})
: filter({kinds: [31923], scopes: [Scope.Self, Scope.Follows]})
? filterFeed({kinds: [31923], "#a": [$env.FORCE_GROUP]})
: filterFeed({kinds: [31923], scopes: [Scope.Self, Scope.Follows]})
if ($env.FORCE_GROUP) {
loadGroupMessages([$env.FORCE_GROUP])

View File

@ -1,7 +1,7 @@
<script lang="ts">
import {onMount} from "svelte"
import {getIdOrAddress, decodeAddress} from "@coracle.social/util"
import {filter} from "@coracle.social/feeds"
import {filterFeed} from "@coracle.social/feeds"
import {noteKinds} from "src/util/nostr"
import {fly} from "src/util/transition"
import FlexColumn from "src/partials/FlexColumn.svelte"
@ -39,7 +39,7 @@
hideControls
shouldListen
anchor={getIdOrAddress(event)}
feed={filter({kinds: noteKinds, "#a": [address]})} />
feed={filterFeed({kinds: noteKinds, "#a": [address]})} />
</FlexColumn>
</div>
{:else}

View File

@ -1,7 +1,7 @@
<script lang="ts">
import cx from "classnames"
import {Tags} from "@coracle.social/util"
import {Scope, filter, usingRelays} from "@coracle.social/feeds"
import {Scope, filterFeed, relayFeed} from "@coracle.social/feeds"
import {noteKinds} from "src/util/nostr"
import {theme} from "src/partials/state"
import Anchor from "src/partials/Anchor.svelte"
@ -11,10 +11,10 @@
import {session, canSign, lists, userLists} from "src/engine"
export let relays = []
export let feed = filter({kinds: noteKinds, scopes: [Scope.Follows]})
export let feed = filterFeed({kinds: noteKinds, scopes: [Scope.Follows]})
if (relays.length > 0) {
feed = usingRelays(relays, feed)
feed = relayFeed(relays, feed)
}
let key = Math.random()
@ -31,15 +31,15 @@
const urls = tags.values("r").valueOf()
if (authors.length > 0) {
feed = filter({kinds: noteKinds, authors})
feed = filterFeed({kinds: noteKinds, authors})
} else if (topics.length > 0) {
feed = filter({kinds: noteKinds, "#t": topics})
feed = filterFeed({kinds: noteKinds, "#t": topics})
} else {
feed = filter({kinds: noteKinds, scopes: [Scope.Follows]})
feed = filterFeed({kinds: noteKinds, scopes: [Scope.Follows]})
}
if (urls.length > 0) {
feed = usingRelays(urls, feed)
feed = relayFeed(urls, feed)
}
key = Math.random()

View File

@ -1,5 +1,5 @@
<script lang="ts">
import {Scope, filter} from "@coracle.social/feeds"
import {Scope, filterFeed} from "@coracle.social/feeds"
import Card from "src/partials/Card.svelte"
import Anchor from "src/partials/Anchor.svelte"
import Feed from "src/app/shared/Feed.svelte"
@ -7,8 +7,8 @@
import {env, canSign, loadGroupMessages} from "src/engine"
const feed = $env.FORCE_GROUP
? filter({kinds: [30402], "#a": [$env.FORCE_GROUP]})
: filter({kinds: [30402], scopes: [Scope.Self, Scope.Follows]})
? filterFeed({kinds: [30402], "#a": [$env.FORCE_GROUP]})
: filterFeed({kinds: [30402], scopes: [Scope.Self, Scope.Follows]})
const createListing = () => router.at("notes/create").qp({type: "listing"}).open()

View File

@ -1,7 +1,7 @@
<script lang="ts">
import {identity} from "ramda"
import {stripProtocol} from "@coracle.social/lib"
import {filter} from "@coracle.social/feeds"
import {filterFeed} from "@coracle.social/feeds"
import {info} from "src/util/logger"
import {ensureProto} from "src/util/misc"
import {noteKinds} from "src/util/nostr"
@ -31,7 +31,7 @@
export let npub
export let pubkey
export let relays = []
export let feed = filter({kinds: noteKinds, authors: [pubkey]})
export let feed = filterFeed({kinds: noteKinds, authors: [pubkey]})
const tabs = ["notes", "likes", "collections", "relays"].filter(identity)
const person = derivePerson(pubkey)
@ -99,7 +99,7 @@
{:else if activeTab === "notes"}
<Feed showGroup skipPlatform {feed} />
{:else if activeTab === "likes"}
<Feed showGroup hideControls feed={filter({kinds: [7], authors: [pubkey]})} />
<Feed showGroup hideControls feed={filterFeed({kinds: [7], authors: [pubkey]})} />
{:else if activeTab === "collections"}
<PersonCollections {pubkey} />
{:else if activeTab === "relays"}

View File

@ -1,6 +1,6 @@
<script lang="ts">
import {batch} from "hurdak"
import {filter, usingRelays} from "@coracle.social/feeds"
import {filterFeed, relayFeed} from "@coracle.social/feeds"
import {getAvgRating, noteKinds} from "src/util/nostr"
import Feed from "src/app/shared/Feed.svelte"
import Tabs from "src/partials/Tabs.svelte"
@ -10,13 +10,13 @@
import {deriveRelay, normalizeRelayUrl, displayRelay, getMinWot} from "src/engine"
export let url
export let feed = filter({kinds: noteKinds, min_wot: getMinWot()})
export let feed = filterFeed({kinds: noteKinds, min_wot: getMinWot()})
let reviews = []
let activeTab = "notes"
$: url = normalizeRelayUrl(url)
$: feed = usingRelays([url], feed)
$: feed = relayFeed([url], feed)
$: rating = getAvgRating(reviews)
const relay = deriveRelay(url)
@ -48,7 +48,7 @@
{#if activeTab === "reviews"}
<Feed
onEvent={onReview}
feed={filter({
feed={filterFeed({
kinds: [1986],
"#l": ["review/relay"],
"#r": [$relay.url],

View File

@ -1,5 +1,5 @@
<script lang="ts">
import {filter} from "@coracle.social/feeds"
import {filterFeed} from "@coracle.social/feeds"
import Feed from "src/app/shared/Feed.svelte"
import Heading from "src/partials/Heading.svelte"
import TopicActions from "src/app/shared/TopicActions.svelte"
@ -13,4 +13,4 @@
<TopicActions {topic} />
</div>
</div>
<Feed feed={filter({kinds: [1], "#t": [topic]})} />
<Feed feed={filterFeed({kinds: [1], "#t": [topic]})} />

View File

@ -1,13 +1,15 @@
import {assocPath, uniq} from "ramda"
import {seconds} from "hurdak"
import {now} from "@coracle.social/lib"
import {relayFeed, filterFeed} from "@coracle.social/feeds"
import {sessions} from "src/engine/session/state"
import {session} from "src/engine/session/derived"
import {loadPubkeys, subscribe, MultiCursor} from "src/engine/network/utils"
import {loadPubkeys, subscribe} from "src/engine/network/utils"
import {feedLoader} from "src/engine/events/requests"
import {hints} from "src/engine/relays/utils"
import {channels} from "./state"
export const loadAllMessages = ({reload = false} = {}) => {
export const loadAllMessages = async ({reload = false} = {}) => {
const {pubkey, nip24_messages_last_synced = 0} = session.get()
const since = reload ? 0 : Math.max(0, nip24_messages_last_synced - seconds(6, "hour"))
@ -18,20 +20,24 @@ export const loadAllMessages = ({reload = false} = {}) => {
loadPubkeys($channels.flatMap(c => c.members || []))
})
const relays = hints.User().getUrls()
const feed = relayFeed(
hints.User().getUrls(),
filterFeed({kinds: [4], authors: [pubkey], since}, {kinds: [4, 1059], "#p": [pubkey], since}),
)
const filters = [
{kinds: [4], authors: [pubkey], since},
{kinds: [4, 1059], "#p": [pubkey], since},
]
let exhausted = false
const cursor = new MultiCursor({
relaySelections: relays.map(relay => ({relay, filters})),
const load = await feedLoader.getLoader(feed, {
onExhausted: () => {
exhausted = true
},
})
return cursor.loadAll({
onComplete: unsubscribePubkeys,
})
while (!exhausted) {
await load(100)
}
unsubscribePubkeys()
}
export const listenForMessages = (pubkeys: string[]) => {

View File

@ -1,10 +1,19 @@
import {seconds} from "hurdak"
import type {Event} from "nostr-tools"
import {Worker} from "@coracle.social/lib"
import {giftWrapKinds} from "src/util/nostr"
import {session, nip44, nip04} from "src/engine/session/derived"
import type {Event} from "@coracle.social/util"
import {FeedLoader, Scope, relayFeed, filterFeed} from "@coracle.social/feeds"
import {giftWrapKinds, generatePrivateKey} from "src/util/nostr"
import {user, session, nip44, nip04} from "src/engine/session/derived"
import {people} from "src/engine/people/state"
import {
maxWot,
getWotScore,
primeWotCaches,
getFollowers,
getFollowedPubkeys,
} from "src/engine/people/utils"
import {hints} from "src/engine/relays/utils"
import {load, MultiCursor, publish} from "src/engine/network/utils"
import {load, publish, dvmRequest, getFilterSelections} from "src/engine/network/utils"
export const loadDeletes = () => {
const {pubkey, deletes_last_synced = 0} = session.get()
@ -49,15 +58,77 @@ export const loadGiftWrap = () => {
}
}
export const sync = (fromUrl, toUrl, filters) => {
export const feedLoader = new FeedLoader({
request: async ({relays, filters, onEvent}) => {
if (relays.length > 0) {
await load({filters, relays, onEvent})
} else {
await Promise.all(
getFilterSelections(filters).map(({relay, filters}) =>
load({filters, relays: [relay], onEvent}),
),
)
}
},
requestDvm: async ({kind, tags = [], onEvent}) => {
const sk = generatePrivateKey()
const event = await dvmRequest({kind, tags, sk, timeout: 3000})
if (event) {
onEvent(event)
}
},
getPubkeysForScope: (scope: string) => {
const $user = user.get()
switch (scope) {
case Scope.Self:
return $user ? [$user.pubkey] : []
case Scope.Follows:
return getFollowedPubkeys($user)
case Scope.Followers:
return Array.from(getFollowers($user.pubkey).map(p => p.pubkey))
default:
throw new Error(`Invalid scope ${scope}`)
}
},
getPubkeysForWotRange: (min, max) => {
const pubkeys = []
const $user = user.get()
const thresholdMin = maxWot.get() * min
const thresholdMax = maxWot.get() * max
primeWotCaches($user.pubkey)
for (const person of people.get()) {
const score = getWotScore($user.pubkey, person.pubkey)
if (score >= thresholdMin && score <= thresholdMax) {
pubkeys.push(person.pubkey)
}
}
return pubkeys
},
})
export const sync = async (fromUrl, toUrl, filters) => {
const worker = new Worker<Event>()
worker.addGlobalHandler(event => publish({event, relays: [toUrl]}))
const cursor = new MultiCursor({
relaySelections: [{relay: fromUrl, filters}],
const feed = relayFeed([fromUrl], filterFeed(...filters))
let exhausted = false
const load = await feedLoader.getLoader(feed, {
onEvent: worker.push,
onExhausted: () => {
exhausted = true
},
})
cursor.loadAll()
while (!exhausted) {
await load(100)
}
}

View File

@ -1,11 +1,12 @@
import type {Event} from "nostr-tools"
import {Tags, hasValidSignature} from "@coracle.social/util"
import {NetworkContext} from "@coracle.social/network"
import {FeedLoader, Scope} from "@coracle.social/feeds"
import {prop, filter, identity, uniq, sortBy} from "ramda"
import {LOCAL_RELAY_URL} from "src/util/nostr"
import {LOCAL_RELAY_URL, generatePrivateKey} from "src/util/nostr"
import {Storage, LocalStorageAdapter, IndexedDBAdapter, sortByPubkeyWhitelist} from "./core"
import {_lists} from "./lists"
import {people} from "./people"
import {people, getWotScore, primeWotCaches, maxWot, getFollowedPubkeys} from "./people"
import {relays} from "./relays"
import {groups, groupSharedKeys, groupAdminKeys, groupRequests, groupAlerts} from "./groups"
import {_labels} from "./labels"
@ -13,7 +14,7 @@ import {topics} from "./topics"
import {deletes, seen, _events, isDeleted, publishes} from "./events"
import {pubkey, sessions} from "./session"
import {channels} from "./channels"
import {onAuth, getExecutor, tracker} from "./network"
import {onAuth, getExecutor, tracker, dvmRequest, load, getFilterSelections} from "./network"
export * from "./core"
export * from "./auth"

View File

@ -1,198 +0,0 @@
import {mergeLeft, pluck, min, max, identity, sortBy} from "ramda"
import {first, sleep} from "hurdak"
import {now, writable} from "@coracle.social/lib"
import type {Filter} from "@coracle.social/util"
import {guessFilterDelta} from "@coracle.social/util"
import type {Subscription} from "@coracle.social/network"
import type {Event} from "src/engine/events/model"
import {sortEventsDesc} from "src/engine/events/utils"
import type {RelayFilters} from "src/engine/network/utils"
import {subscribe, LOAD_OPTS} from "./executor"
export type CursorOpts = {
relay: string
filters: Filter[]
onEvent?: (e: Event) => void
}
export class Cursor {
delta: number
since: number
until: number
minSince: number
buffer: Event[] = []
loading = false
done = false
constructor(readonly opts: CursorOpts) {
// If we're looking for something old, don't pointlessly ask for new stuff
const untils = pluck("until", opts.filters).filter(identity)
const maxUntil = untils.length === opts.filters.length ? untils.reduce(max, 0) : now()
const sinces = pluck("since", opts.filters).filter(identity)
const minSince = sinces.length === opts.filters.length ? sinces.reduce(min, now()) : 0
this.delta = guessFilterDelta(opts.filters)
this.since = maxUntil - this.delta
this.until = maxUntil
this.minSince = minSince
}
load(n: number) {
const limit = n - this.buffer.length
// If we're already loading, or we have enough buffered, do nothing
if (this.done || this.loading || limit <= 0) {
return null
}
const {since, until} = this
const {relay, filters, onEvent} = this.opts
this.loading = true
let count = 0
const onPageComplete = () => {
this.loading = false
// Relays can't be relied upon to return events in descending order, do exponential
// windowing to ensure we get the most recent stuff on first load, but eventually find it all
if (count === 0) {
this.delta *= 10
}
if (this.since === this.minSince) {
this.done = true
} else {
this.since = Math.max(this.minSince, this.since - this.delta)
}
}
const pageFilters = filters
// Remove filters that don't fit our window
.filter(f => {
const filterSince = f.since || 0
const filterUntil = f.until || now()
return filterSince < until && filterUntil > since
})
// Modify the filters to define our window
.map(mergeLeft({until, limit, since}))
if (pageFilters.length === 0) {
onPageComplete()
return null
}
const sub = subscribe({
...LOAD_OPTS,
relays: [relay],
immediate: true,
skipCache: true,
filters: pageFilters,
onComplete: onPageComplete,
onEvent: (event: Event) => {
this.until = Math.min(until, event.created_at) - 1
this.buffer = sortEventsDesc([...this.buffer, event])
count += 1
onEvent?.(event)
},
})
return sub
}
take(n = Infinity) {
return this.buffer.splice(0, n)
}
count() {
return this.buffer.length
}
peek() {
return this.buffer[0]
}
pop() {
return first(this.take(1))
}
}
export type MultiCursorOpts = {
relaySelections: RelayFilters[]
onEvent?: (e: Event) => void
}
export class MultiCursor {
bufferFactor = 4
cursors: Cursor[]
constructor(readonly opts: MultiCursorOpts) {
this.cursors = opts.relaySelections.map(
({relay, filters}) => new Cursor({relay, filters, onEvent: opts.onEvent}),
)
}
load(limit: number) {
return this.cursors.map(c => c.load(limit)).filter(identity)
}
done() {
return this.cursors.every(c => c.done)
}
count() {
return this.cursors.reduce((n, c) => n + c.buffer.length, 0)
}
take(n: number): [Subscription[], Event[]] {
const events = []
while (events.length < n) {
// Find the most recent event available so that they're sorted
const [cursor] = sortBy(
c => -c.peek().created_at,
this.cursors.filter(c => c.peek()),
)
if (!cursor) {
break
}
events.push(cursor.pop())
}
// Preload the next page
const subs = this.load(n * this.bufferFactor)
return [subs, events]
}
loadAll = ({onComplete = null} = {}) => {
const loading = writable(true)
const stop = () => {
loading.set(false)
onComplete?.()
}
setTimeout(async () => {
while (loading.get()) {
this.take(250)
await sleep(2000)
if (this.done()) {
stop()
}
}
})
return {stop, loading}
}
}

View File

@ -1,4 +1,3 @@
export * from "./cursor"
export * from "./dvms"
export * from "./executor"
export * from "./filters"