Move nip 65 over

Jonathan Staab 2023-08-31 15:36:07 -07:00
parent 35ae7ab1f8
commit fcf5faa960
15 changed files with 1136 additions and 7 deletions

View File

@ -86,6 +86,11 @@ export type Relay = {
info?: RelayInfo
}
export enum RelayMode {
Read = "read",
Write = "write",
}
export type RelayPolicyEntry = {
url: string
read: boolean

View File

@ -1,3 +1,4 @@
import "./nip02" import "./nip02"
import "./nip65"
export * from "./core" export * from "./core"

View File

@ -0,0 +1,84 @@
import {uniqBy, prop, inc} from "ramda"
import {tryJson, now} from "src/util/misc"
import {warn} from "src/util/logger"
import {normalizeRelayUrl, isShareableRelay, Tags} from "src/util/nostr"
import type {RelayPolicyEntry} from "src/engine2/model"
import {RelayMode} from "src/engine2/model"
import {relays, relayPolicies} from "src/engine2/state"
import {projections} from "src/engine2/projections/core"
const addRelay = (url: string) => {
if (isShareableRelay(url)) {
const relay = relays.key(url).get()
relays.key(url).merge({
count: inc(relay?.count || 0),
first_seen: relay?.first_seen || now(),
info: {
last_checked: 0,
},
})
}
}
const setPolicy = (
{pubkey, created_at}: {pubkey: string; created_at: number},
relays: RelayPolicyEntry[]
) => {
if (relays?.length > 0) {
if (created_at < relayPolicies.key(pubkey).get()?.created_at) {
return
}
relayPolicies.key(pubkey).merge({
created_at,
updated_at: now(),
relays: uniqBy(prop("url"), relays).map((relay: RelayPolicyEntry) => {
addRelay(relay.url)
return {read: true, write: true, ...relay}
}),
})
}
}
projections.addHandler(2, e => {
addRelay(normalizeRelayUrl(e.content))
})
projections.addHandler(3, e => {
setPolicy(
e,
tryJson<RelayPolicyEntry[]>(() => {
return Object.entries(JSON.parse(e.content || ""))
.filter(([url]) => isShareableRelay(url))
.map(([url, conditions]) => {
// @ts-ignore
const write = ![false, "!"].includes(conditions.write)
// @ts-ignore
const read = ![false, "!"].includes(conditions.read)
return {url: normalizeRelayUrl(url), write, read}
})
}) as RelayPolicyEntry[]
)
})
projections.addHandler(10002, e => {
setPolicy(
e,
Tags.from(e)
.type("r")
.all()
.map(([_, url, mode]) => {
const write = !mode || mode === RelayMode.Write
const read = !mode || mode === RelayMode.Read
if (!write && !read) {
warn(`Encountered unknown relay mode: ${mode}`)
}
return {url: normalizeRelayUrl(url), write, read}
})
)
})
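
A standalone sketch of the tag-to-policy mapping the kind 10002 handler above performs; the sample tags and helper names here are illustrative, not part of this commit:

type PolicyEntry = {url: string; read: boolean; write: boolean}

// Map NIP-65 "r" tags to policy entries: no marker means both read and write,
// a "read" or "write" marker selects one mode
const policyFromRelayListTags = (tags: string[][]): PolicyEntry[] =>
  tags
    .filter(([name]) => name === "r")
    .map(([_, url, mode]) => ({
      url,
      read: !mode || mode === "read",
      write: !mode || mode === "write",
    }))

// Hypothetical kind 10002 tag set
policyFromRelayListTags([
  ["r", "wss://relay.example.com"],
  ["r", "wss://inbox.example.com", "read"],
  ["r", "wss://outbox.example.com", "write"],
])
// => three entries: the first readable and writable, the others read-only and write-only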

View File

@ -1,2 +1,3 @@
export * from "./session" export * from "./session"
export * from "./nip02" export * from "./nip02"
export * from "./nip65"

View File

@ -0,0 +1,162 @@
import {sortBy, pluck, uniq, nth, prop, last} from "ramda"
import {chain} from "hurdak"
import {fuzzy} from "src/util/misc"
import {findReplyId, findRootId, isShareableRelay, Tags} from "src/util/nostr"
import {derived} from "src/engine2/util/store"
import type {Event, Relay, RelayInfo} from "src/engine2/model"
import {RelayMode} from "src/engine2/model"
import {env, pool, relays, relayPolicies} from "src/engine2/state"
import {stateKey} from "src/engine2/queries/session"
export const relayIsLowQuality = (url: string) =>
pool.get(url, {autoConnect: false})?.meta?.quality < 0.6
export const getRelay = (url: string): Relay => relays.key(url).get() || {url}
export const getRelayInfo = (url: string): RelayInfo => getRelay(url)?.info || {}
export const displayRelay = ({url}: Relay) => last(url.split("://"))
export const searchRelays = derived(relays, $relays => fuzzy($relays.values(), {keys: ["url"]}))
export const getSearchRelays = () => {
const searchableRelayUrls = relays
.get()
.filter(r => (r.info?.supported_nips || []).includes(50))
.map(prop("url"))
return uniq(env.get().SEARCH_RELAYS.concat(searchableRelayUrls)).slice(0, 8)
}
export const getPubkeyRelays = (pubkey: string, mode: string = null) => {
const relays = relayPolicies.key(pubkey).get()?.relays || []
return mode ? relays.filter(prop(mode)) : relays
}
export const getPubkeyRelayUrls = (pubkey: string, mode: string = null) =>
pluck("url", getPubkeyRelays(pubkey, mode))
export const getUserRelays = (mode: string = null) => getPubkeyRelays(stateKey.get(), mode)
export const getUserRelayUrls = (mode: string = null) => pluck("url", getUserRelays(mode))
// Smart relay selection
//
// From Mike Dilger:
// 1) Other people's write relays — pull events from people you follow,
// including their contact lists
// 2) Other people's read relays — push events that tag them (replies or just tagging).
// However, these may be authenticated, use with caution
// 3) Your write relays — write events you post to your microblog feed for the
// world to see. ALSO write your contact list. ALSO read back your own contact list.
// 4) Your read relays — read events that tag you. ALSO both write and read
// client-private data like client configuration events or anything that the world
// doesn't need to see.
// 5) Advertise relays — write and read back your own relay list
export const selectHints = (limit: number | null, hints: Iterable<string>) => {
const seen = new Set()
const ok = []
const bad = []
for (const url of chain(hints, getUserRelayUrls(RelayMode.Read), env.get().DEFAULT_RELAYS)) {
if (seen.has(url)) {
continue
}
seen.add(url)
// Filter out relays that appear to be broken or slow
if (!isShareableRelay(url)) {
bad.push(url)
} else if (relayIsLowQuality(url)) {
bad.push(url)
} else {
ok.push(url)
}
if (limit && ok.length > limit) {
break
}
}
// If we don't have enough hints, use the broken ones
return ok.concat(bad).slice(0, limit || Infinity)
}
export const hintSelector =
(generateHints: (...args: any[]) => Iterable<string>) =>
(limit: number, ...args: any[]) =>
selectHints(limit, generateHints(...args))
export const getPubkeyHints = hintSelector(function* (pubkey: string, mode: RelayMode) {
yield* getPubkeyRelayUrls(pubkey, mode)
})
export const getEventHints = hintSelector(function* (event: Event) {
yield* getPubkeyRelayUrls(event.pubkey, RelayMode.Write)
})
// If we're looking for an event's children, the read relays the author has
// advertised would be the most reliable option, since well-behaved clients
// will write replies there.
export const getReplyHints = hintSelector(function* (event) {
yield* getPubkeyRelayUrls(event.pubkey, RelayMode.Read)
})
// If we're looking for an event's parent, tags are the most reliable hint,
// but we can also look at where the author of the note reads from
export const getParentHints = hintSelector(function* (event) {
const parentId = findReplyId(event)
yield* Tags.from(event).equals(parentId).relays()
yield* getPubkeyRelayUrls(event.pubkey, RelayMode.Read)
})
export const getRootHints = hintSelector(function* (event) {
const rootId = findRootId(event)
yield* Tags.from(event).equals(rootId).relays()
yield* getPubkeyRelayUrls(event.pubkey, RelayMode.Read)
})
// If we're replying or reacting to an event, we want the author to know, as well as
// anyone else who is tagged in the original event or the reply. Get everyone's read
// relays. Limit how many per pubkey we publish to though. We also want to advertise
// our content to our followers, so publish to our write relays as well.
export const getPublishHints = (limit: number, event: Event, extraRelays: string[] = []) => {
const pubkeys = Tags.from(event).type("p").values().all()
const hintGroups = pubkeys.map(pubkey => getPubkeyRelayUrls(pubkey, RelayMode.Read))
const authorRelays = getPubkeyRelayUrls(event.pubkey, RelayMode.Write)
return mergeHints(limit, hintGroups.concat([extraRelays, authorRelays]))
}
export const mergeHints = (limit: number, groups: string[][]) => {
const scores = {} as Record<string, any>
for (const hints of groups) {
hints.forEach((hint, i) => {
const score = 1 / (i + 1) / hints.length
if (!scores[hint]) {
scores[hint] = {score: 0, count: 0}
}
scores[hint].score += score
scores[hint].count += 1
})
}
// Use the log-sum-exp and a weighted sum
for (const score of Object.values(scores)) {
const weight = Math.log(groups.length / score.count)
score.score = weight + Math.log1p(Math.exp(score.score - score.count))
}
return sortBy(([hint, {score}]) => score, Object.entries(scores))
.map(nth(0))
.slice(0, limit)
}
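
The scoring in mergeHints above can be mirrored in a standalone sketch (illustrative only, without the ramda helpers): each hint earns a reciprocal-rank score within its group, scaled by group size, and a log weight that shrinks as the hint appears in more groups, so relays shared across groups sort first.

const mergeHintsSketch = (limit: number, groups: string[][]): string[] => {
  const scores: Record<string, {score: number; count: number}> = {}
  for (const hints of groups) {
    hints.forEach((hint, i) => {
      scores[hint] = scores[hint] || {score: 0, count: 0}
      scores[hint].score += 1 / (i + 1) / hints.length
      scores[hint].count += 1
    })
  }
  for (const s of Object.values(scores)) {
    const weight = Math.log(groups.length / s.count)
    s.score = weight + Math.log1p(Math.exp(s.score - s.count))
  }
  return Object.entries(scores)
    .sort((a, b) => a[1].score - b[1].score)
    .map(([hint]) => hint)
    .slice(0, limit)
}

// mergeHintsSketch(2, [["wss://a", "wss://b"], ["wss://a", "wss://c"]])
// => ["wss://a", "wss://b"]; the relay common to both groups ranks first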

View File

@ -1,4 +1,4 @@
import {find, whereEq} from "ramda"
import {find, defaultTo, whereEq} from "ramda"
import {derived} from "src/engine2/util/store"
import {pubkey, keys} from "src/engine2/state"
import {prepareNdk, ndkInstances} from "./ndk"
@ -6,6 +6,8 @@ import {Signer} from "./signer"
import {Crypto} from "./crypto"
import {Wrapper} from "./wrapper"
export const stateKey = pubkey.derived(defaultTo("anonymous"))
export const user = derived([pubkey, keys], ([$pubkey, $keys]) => {
return find(whereEq({pubkey: $pubkey}), $keys)
})

View File

@ -0,0 +1,330 @@
import {matchFilters} from "nostr-tools"
import {throttle} from "throttle-debounce"
import {omit, find, pluck, flatten, without, groupBy, sortBy, prop, uniqBy, reject} from "ramda"
import {ensurePlural, batch, chunk} from "hurdak"
import {now, pushToKey} from "src/util/misc"
import {findReplyAndRootIds, findReplyId, findRootId, Tags, noteKinds} from "src/util/nostr"
import {collection} from "src/engine2/util/store"
import type {Collection} from "src/engine2/util/store"
import type {Event, DisplayEvent, Filter} from "src/engine2/model"
import {settings, env} from "src/engine2/state"
import {mergeHints, getReplyHints, getRootHints, getParentHints} from "src/engine2/queries"
import {Subscription} from "./subscription"
import {loadPubkeys} from "./pubkeys"
const fromDisplayEvent = (e: DisplayEvent): Event =>
omit(["zaps", "likes", "replies", "matchesFilter"], e)
export type ContextLoaderOpts = {
isMuted: (e: Event) => boolean
relays?: string[]
filters?: Filter[]
onEvent?: (e: Event) => void
shouldLoadParents?: boolean
}
export class ContextLoader {
stopped: boolean
data: Collection<Event>
seen: Set<string>
subs: Record<string, Array<{close: () => void}>>
constructor(readonly opts: ContextLoaderOpts) {
this.stopped = false
this.data = collection<Event>("id")
this.seen = new Set()
this.subs = {
context: [],
listeners: [],
}
}
// Utils
addSubs(key: string, subs: Array<Subscription>) {
for (const sub of ensurePlural(subs)) {
this.subs[key].push(sub)
sub.on("close", () => {
this.subs[key] = without([sub], this.subs[key])
})
}
}
getAllSubs() {
return flatten(Object.values(this.subs))
}
getReplyKinds() {
const {ENABLE_ZAPS} = env.get()
return ENABLE_ZAPS ? [1, 7, 9735] : [1, 7]
}
matchFilters(e: Event) {
return !this.opts.filters || matchFilters(ensurePlural(this.opts.filters), e)
}
isTextNote(e: Event) {
return noteKinds.includes(e.kind)
}
isMissingParent = (e: Event) => {
const parentId = findReplyId(e)
return parentId && this.matchFilters(e) && !this.data.key(parentId).exists()
}
preprocessEvents = (events: Event[]) => {
events = reject((e: Event) => this.seen.has(e.id) || this.opts.isMuted(e), events)
for (const event of events) {
this.seen.add(event.id)
}
return events
}
getRelayLimit() {
return settings.get().relay_limit
}
mergeHints(groups: string[][]) {
if (this.opts.relays) {
return this.opts.relays
}
return mergeHints(this.getRelayLimit(), groups)
}
applyContext = (notes: Event[], {substituteParents = false, alreadySeen = new Set()} = {}) => {
const contextById = {} as Record<string, Event>
const zapsByParentId = {} as Record<string, Event[]>
const reactionsByParentId = {} as Record<string, Event[]>
const repliesByParentId = {} as Record<string, Event[]>
for (const event of this.data.get().concat(notes)) {
if (contextById[event.id]) {
continue
}
contextById[event.id] = event
const parentId = findReplyId(event)
if (event.kind === 9735) {
pushToKey(zapsByParentId, parentId, event)
} else if (event.kind === 7) {
pushToKey(reactionsByParentId, parentId, event)
} else {
pushToKey(repliesByParentId, parentId, event)
}
}
const annotate = (note: Event): DisplayEvent => {
const {replies = [], reactions = [], zaps = []} = note as DisplayEvent
const combinedZaps = zaps.concat(zapsByParentId[note.id] || [])
const combinedReactions = reactions.concat(reactionsByParentId[note.id] || [])
const combinedReplies = (replies as Event[])
.concat(repliesByParentId[note.id] || [])
.map(annotate)
return {
...note,
zaps: uniqBy(prop("id"), combinedZaps),
reactions: uniqBy(prop("id"), combinedReactions),
replies: sortBy((e: DisplayEvent) => -e.created_at, uniqBy(prop("id"), combinedReplies)),
matchesFilter:
!alreadySeen.has(note.id) &&
(this.matchFilters(note) || Boolean(find(prop("matchesFilter"), combinedReplies))),
}
}
if (substituteParents) {
// We may have loaded a reply from a follower to someone we muted
notes = reject(
this.opts.isMuted,
notes.map(note => {
for (let i = 0; i < 2; i++) {
const parent = contextById[findReplyId(note)]
if (!parent) {
break
}
note = parent
}
return note
})
)
}
return uniqBy(prop("id"), notes).map(annotate)
}
// Context loaders
loadPubkeys = (events: Event[]) => {
loadPubkeys(
events.filter(this.isTextNote).flatMap((e: Event) => Tags.from(e).pubkeys().concat(e.pubkey))
)
}
loadParents = (events: Event[]) => {
if (this.stopped) {
return
}
const parentsInfo = events.flatMap((e: Event) => {
const info = []
const {root, reply} = findReplyAndRootIds(e)
if (reply && !this.seen.has(reply)) {
info.push({id: reply, hints: getParentHints(this.getRelayLimit(), e)})
}
if (root && !this.seen.has(root)) {
info.push({id: findRootId(e), hints: getRootHints(this.getRelayLimit(), e)})
}
return info
})
if (parentsInfo.length > 0) {
const sub = new Subscription({
timeout: 5000,
filters: [{ids: pluck("id", parentsInfo)}],
relays: this.mergeHints(pluck("hints", parentsInfo)),
})
sub.on(
"event",
batch(100, (context: Event[]) => this.addContext(context, {depth: 2}))
)
this.addSubs("context", [sub])
}
}
loadContext = batch(300, (eventGroups: any) => {
if (this.stopped) {
return
}
const groupsByDepth = groupBy(prop("depth"), eventGroups)
for (const [depthStr, groups] of Object.entries(groupsByDepth)) {
const depth = parseInt(depthStr)
if (depth === 0) {
continue
}
const events = uniqBy(
prop("id"),
flatten(pluck("events", groups as any[])).filter(this.isTextNote)
) as Event[]
for (const c of chunk(256, events)) {
const sub = new Subscription({
timeout: 5000,
relays: this.mergeHints(c.map(e => getReplyHints(this.getRelayLimit(), e))),
filters: [{kinds: this.getReplyKinds(), "#e": pluck("id", c as Event[])}],
})
sub.on(
"event",
batch(100, (context: Event[]) => this.addContext(context, {depth: depth - 1}))
)
this.addSubs("context", [sub])
}
}
})
listenForContext = throttle(5000, () => {
if (this.stopped) {
return
}
this.subs.listeners.forEach(sub => sub.close())
const contextByParentId = groupBy(findReplyId, this.data.get())
const findNotes = (events: Event[]): Event[] =>
uniqBy(
prop("id"),
events
.filter(this.isTextNote)
.flatMap(e => findNotes(contextByParentId[e.id] || []).concat(e))
)
for (const c of chunk(256, findNotes(this.data.get()))) {
const sub = new Subscription({
relays: this.mergeHints(c.map(e => getReplyHints(this.getRelayLimit(), e))),
filters: [{kinds: this.getReplyKinds(), "#e": pluck("id", c), since: now()}],
})
sub.on(
"event",
batch(100, (context: Event[]) => this.addContext(context, {depth: 2}))
)
this.addSubs("listeners", [sub])
}
})
// Adders
addContext = (newEvents: Event[], {shouldLoadParents = false, depth = 0}) => {
const events = this.preprocessEvents(newEvents)
this.data.update($context => $context.concat(events))
this.loadPubkeys(events)
if (shouldLoadParents) {
this.loadParents(events)
}
this.loadContext({events, depth})
this.listenForContext()
if (this.opts.onEvent) {
for (const event of events) {
this.opts.onEvent(event)
}
}
}
hydrate(notes: Partial<DisplayEvent>[], depth) {
const context: Event[] = []
const addContext = (note: Partial<DisplayEvent>) => {
// Only add to context if it's a real event
if (note.sig) {
context.push(fromDisplayEvent(note as DisplayEvent))
}
note.zaps?.forEach(zap => context.push(zap))
note.reactions?.forEach(reaction => context.push(reaction))
note.replies?.forEach(reply => addContext(reply))
}
notes.forEach(addContext)
this.addContext(context, {depth})
}
// Control
stop() {
this.stopped = true
for (const sub of this.getAllSubs()) {
sub.close()
}
}
}
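
A reduced sketch of the grouping idea behind applyContext: context events are bucketed by parent id, then folded onto each note as zaps, reactions, or replies. The types and the parent-id helper below are simplified stand-ins, not this commit's implementations.

type Ev = {id: string; kind: number; tags: string[][]; created_at: number}

// Simplified stand-in for findReplyId: take the last "e" tag's event id
const parentIdOf = (e: Ev) => e.tags.filter(t => t[0] === "e").map(t => t[1]).pop()

const annotateSketch = (notes: Ev[], context: Ev[]) => {
  const byParent: Record<string, Ev[]> = {}
  for (const e of context) {
    const parent = parentIdOf(e)
    if (parent) (byParent[parent] = byParent[parent] || []).push(e)
  }
  return notes.map(note => ({
    ...note,
    zaps: (byParent[note.id] || []).filter(e => e.kind === 9735),
    reactions: (byParent[note.id] || []).filter(e => e.kind === 7),
    replies: (byParent[note.id] || []).filter(e => e.kind !== 7 && e.kind !== 9735),
  }))
}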

View File

@ -0,0 +1,140 @@
import {mergeRight, identity, sortBy} from "ramda"
import {seconds, first} from "hurdak"
import {now} from "src/util/misc"
import {EPOCH} from "src/util/nostr"
import type {Filter, Event} from "src/engine2/model"
import {Subscription} from "./subscription"
export type CursorOpts = {
relay: string
filters: Filter[]
onEvent?: (e: Event) => void
}
export class Cursor {
until = now()
delta = seconds(10, "minute")
since = now() - this.delta
buffer: Event[] = []
loading = false
done = false
constructor(readonly opts: CursorOpts) {}
load(n: number) {
const limit = n - this.buffer.length
// If we're done, already loading, or have enough buffered, do nothing
if (this.done || this.loading || limit <= 0) {
return null
}
const {since, until} = this
const {relay, filters, onEvent} = this.opts
this.loading = true
let count = 0
const sub = new Subscription({
timeout: 5000,
relays: [relay],
filters: filters.map(mergeRight({until, limit, since})),
})
sub.on("event", (event: Event) => {
this.until = Math.min(until, event.created_at) - 1
this.buffer.push(event)
count += 1
onEvent?.(event)
})
sub.on("close", () => {
this.loading = false
// Relays can't be relied upon to return events in descending order, so do exponential
// windowing to ensure we get the most recent stuff on first load but eventually find it all
if (count === 0) {
this.delta *= 10
}
if (this.since <= EPOCH) {
this.done = true
}
this.since -= this.delta
})
return sub
}
take(n = Infinity) {
return this.buffer.splice(0, n)
}
count() {
return this.buffer.length
}
peek() {
return this.buffer[0]
}
pop() {
return first(this.take(1))
}
}
export class MultiCursor {
bufferFactor = 4
seen_on: Map<string, string[]>
cursors: Cursor[]
constructor(cursors: Cursor[]) {
this.seen_on = new Map()
this.cursors = cursors
}
load(limit: number) {
return this.cursors.map(c => c.load(limit)).filter(identity)
}
count() {
return this.cursors.reduce((n, c) => n + c.buffer.length, 0)
}
take(n: number): [Subscription[], Event[]] {
const events = []
while (events.length < n) {
// Find the most recent event available so that they're sorted
const [cursor] = sortBy(
c => -c.peek().created_at,
this.cursors.filter(c => c.peek())
)
if (!cursor) {
break
}
const event = cursor.pop()
// Merge seen_on via mutation so it applies to future. If we've already
// seen the event, we're also done and we don't need to add it to our buffer
if (this.seen_on.has(event.id) && !this.seen_on.get(event.id).includes(event.seen_on[0])) {
this.seen_on.get(event.id).push(event.seen_on[0])
} else {
this.seen_on.set(event.id, event.seen_on)
events.push(event)
}
}
// Preload the next page
const subs = this.load(n * this.bufferFactor)
return [subs, events]
}
}
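
To make the windowing in Cursor.load concrete, here is a minimal standalone sketch of how the (since, until) pair moves between pages: until tracks the oldest event seen so far, and delta grows tenfold whenever a page comes back empty. The names and the epoch constant are illustrative, not this commit's API.

const MINUTE = 60
const EPOCH_SKETCH = 1609459200 // assumed lower bound, playing the role of EPOCH

class WindowSketch {
  until = Math.floor(Date.now() / 1000)
  delta = 10 * MINUTE
  since = this.until - this.delta
  done = false

  // Called after each page; oldestSeen is the oldest created_at returned, or null if empty
  advance(count: number, oldestSeen: number | null) {
    if (oldestSeen !== null) this.until = Math.min(this.until, oldestSeen) - 1
    if (count === 0) this.delta *= 10 // widen the window when nothing came back
    if (this.since <= EPOCH_SKETCH) this.done = true
    this.since -= this.delta // slide the lower bound further into the past
  }
}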

View File

@ -3,8 +3,7 @@ import {Plex, Relays, Executor} from "paravel"
import {error, warn} from "src/util/logger"
import {normalizeRelayUrl} from "src/util/nostr"
import {writable} from "src/engine2/util"
import {env, settings} from "src/engine2/state"
import {env, pool, settings} from "src/engine2/state"
import {pool} from "./pool"
export const authHandler = writable(null)

View File

@ -0,0 +1,221 @@
import {partition, reject, identity, uniqBy, pluck, sortBy, without, any, prop, assoc} from "ramda"
import {ensurePlural, union, seconds, doPipe, throttle, batch} from "hurdak"
import {now, race} from "src/util/misc"
import {findReplyId} from "src/util/nostr"
import type {Event, DisplayEvent, Filter} from "src/engine2/model"
import {writable} from "src/engine2/util/store"
import {Subscription} from "./subscription"
import {getUrls} from "./executor"
import {Cursor, MultiCursor} from "./cursor"
import {ContextLoader} from "./context"
export type FeedOpts = {
depth: number
relays: string[]
filters: Filter[]
isMuted: (e: Event) => boolean
onEvent?: (e: Event) => void
shouldLoadParents?: boolean
shouldUseNip65?: boolean
}
export class FeedLoader {
since = now()
stopped = false
context: ContextLoader
subs: Array<{close: () => void}> = []
feed = writable<DisplayEvent[]>([])
stream = writable<Event[]>([])
deferred: Event[] = []
cursor: MultiCursor
ready: Promise<void>
constructor(readonly opts: FeedOpts) {
const urls = getUrls(opts.relays)
this.context = new ContextLoader({
relays: opts.shouldUseNip65 ? null : urls,
filters: opts.filters,
isMuted: opts.isMuted,
onEvent: event => {
opts.onEvent?.(event)
this.updateFeed()
},
})
// No point in subscribing if we have an end date
if (!any(prop("until"), ensurePlural(opts.filters) as any[])) {
const sub = new Subscription({
relays: urls,
filters: opts.filters.map(assoc("since", this.since)),
})
sub.on(
"event",
batch(1000, (context: Event[]) => {
this.context.addContext(context, {shouldLoadParents: true, depth: opts.depth})
this.stream.update($stream => $stream.concat(context))
})
)
this.addSubs([sub])
}
this.cursor = new MultiCursor(
urls.map(
relay =>
new Cursor({
relay,
filters: opts.filters,
onEvent: batch(100, (context: Event[]) =>
this.context.addContext(context, {shouldLoadParents: true, depth: opts.depth})
),
})
)
)
const subs = this.cursor.load(50)
this.addSubs(subs)
// Wait until a good number of subscriptions have completed to reduce the chance of
// out of order notes
this.ready = race(0.2, pluck("result", subs))
}
// Control
addSubs(subs: Array<Subscription>) {
for (const sub of ensurePlural(subs)) {
this.subs.push(sub)
sub.on("close", () => {
this.subs = without([sub], this.subs)
})
}
}
stop() {
this.stopped = true
this.context.stop()
for (const sub of this.subs) {
sub.close()
}
}
// Feed building
addToFeed = (notes: Event[]) => {
const getChildIds = note => note.replies.flatMap(child => [child.id, getChildIds(child)])
this.feed.update($feed => {
// Avoid showing the same note twice, even if it's once as a parent and once as a child
const feedIds = new Set(pluck("id", $feed))
const feedChildIds = new Set($feed.flatMap(getChildIds))
const feedParentIds = new Set($feed.map(findReplyId).filter(identity))
return uniqBy(
prop("id"),
$feed.concat(
this.context.applyContext(
sortBy(
e => -e.created_at,
reject(
(e: Event) =>
feedIds.has(findReplyId(e)) || feedChildIds.has(e.id) || feedParentIds.has(e.id),
notes
)
),
{
substituteParents: true,
alreadySeen: union(feedIds, feedChildIds),
}
)
)
)
})
}
updateFeed = throttle(500, () => {
this.feed.update($feed => this.context.applyContext($feed))
})
// Loading
async load(n) {
await this.ready
const [subs, notes] = this.cursor.take(n)
const deferred = this.deferred.splice(0)
this.addSubs(subs)
const ok = doPipe(notes.concat(deferred), [
this.deferReactions,
this.deferOrphans,
this.deferAncient,
])
this.addToFeed(ok)
}
loadStream() {
this.stream.update($stream => {
this.feed.update($feed => {
return uniqBy(
prop("id"),
this.context
.applyContext($stream, {
substituteParents: true,
})
.concat($feed)
)
})
return []
})
}
deferReactions = (notes: Event[]) => {
const [defer, ok] = partition(
e => !this.context.isTextNote(e) && this.context.isMissingParent(e),
notes
)
setTimeout(() => {
// Defer again if we still don't have a parent; it's pointless to show an orphaned reaction
const [orphans, ready] = partition(this.context.isMissingParent, defer)
this.addToFeed(ready)
this.deferred = this.deferred.concat(orphans)
}, 1500)
return ok
}
deferOrphans = (notes: Event[]) => {
// If something has a parent id but we haven't found the parent yet, skip it until we have it.
const [defer, ok] = partition(
e => this.context.isTextNote(e) && this.context.isMissingParent(e),
notes
)
setTimeout(() => this.addToFeed(defer), 1500)
return ok
}
deferAncient = (notes: Event[]) => {
// Sometimes relays send very old data very quickly. Pop these off the queue and re-add
// them after we have more timely data. They still might be relevant, but order will still
// be maintained since everything before the cutoff will be deferred the same way.
const since = now() - seconds(6, "hour")
const [defer, ok] = partition(e => e.created_at < since, notes)
setTimeout(() => this.addToFeed(defer), 4000)
return ok
}
}
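
A hypothetical usage sketch of the loader above, assuming the caller already has relay URLs and a mute check; the import path is omitted since file names are not shown in this view, and the relay URL is a placeholder.

const loadFeedSketch = async () => {
  const loader = new FeedLoader({
    depth: 2,
    relays: ["wss://relay.example.com"], // placeholder relay
    filters: [{kinds: [1]}],
    isMuted: () => false, // no muting in this sketch
    shouldLoadParents: true,
  })
  await loader.load(20) // first page, once the initial subscriptions settle
  const notes = loader.feed.get() // annotated DisplayEvents
  loader.loadStream() // fold streamed events into the feed
  loader.stop() // close all subscriptions
  return notes
}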

View File

@ -1,4 +1,5 @@
export * from "./pool"
export * from "./executor" export * from "./executor"
export * from "./publisher" export * from "./publisher"
export * from "./subscription" export * from "./subscription"
export * from "./cursor"
export * from "./context"

View File

@ -1,3 +0,0 @@
import {Pool} from "paravel"
export const pool = new Pool()

View File

@ -0,0 +1,86 @@
import {without, pluck, uniq} from "ramda"
import {chunk, seconds, ensurePlural} from "hurdak"
import {personKinds, appDataKeys} from "src/util/nostr"
import {now} from "src/util/misc"
import type {Filter} from "src/engine2/model"
import {profiles, settings} from "src/engine2/state"
import {mergeHints, getPubkeyHints} from "src/engine2/queries"
import {Subscription} from "./subscription"
export type LoadPeopleOpts = {
relays?: string[]
kinds?: number[]
force?: boolean
}
export const attemptedPubkeys = new Set()
export const getStalePubkeys = (pubkeys: string[]) => {
const stale = new Set()
const since = now() - seconds(3, "hour")
for (const pubkey of pubkeys) {
if (stale.has(pubkey) || attemptedPubkeys.has(pubkey)) {
continue
}
attemptedPubkeys.add(pubkey)
if ((profiles.key(pubkey).get()?.updated_at || 0) > since) {
continue
}
stale.add(pubkey)
}
return Array.from(stale)
}
export const loadPubkeys = async (
pubkeyGroups: string | string[],
{relays, force, kinds = personKinds}: LoadPeopleOpts = {}
) => {
const rawPubkeys = ensurePlural(pubkeyGroups).reduce((a, b) => a.concat(b), [])
const pubkeys = force ? uniq(rawPubkeys) : getStalePubkeys(rawPubkeys)
const getChunkRelays = (chunk: string[]) => {
if (relays?.length > 0) {
return relays
}
const {relay_limit: limit} = settings.get()
return mergeHints(
limit,
chunk.map(pubkey => getPubkeyHints(limit, pubkey, "write"))
)
}
const getChunkFilters = (chunk: string[]) => {
const filter = [] as Filter[]
filter.push({kinds: without([30078], kinds), authors: chunk})
// Add a separate filter for app data so we're not pulling down other people's stuff,
// or obsolete events of our own.
if (kinds.includes(30078)) {
filter.push({kinds: [30078], authors: chunk, "#d": Object.values(appDataKeys)})
}
return filter
}
await Promise.all(
pluck(
"result",
chunk(256, pubkeys).map(
(chunk: string[]) =>
new Subscription({
relays: getChunkRelays(chunk),
filters: getChunkFilters(chunk),
timeout: 10_000,
})
)
)
)
}
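
The filter split above (keeping kind 30078 app data separate from profile kinds) reads as follows in a standalone sketch; the kinds list and appDataKeys are stand-ins here.

type FilterSketch = {kinds: number[]; authors: string[]; "#d"?: string[]}

// Query profile kinds broadly, but only pull app data addressed by the "#d" keys we use
const chunkFiltersSketch = (
  authors: string[],
  kinds: number[],
  appDataKeys: string[]
): FilterSketch[] => {
  const filters: FilterSketch[] = [{kinds: kinds.filter(k => k !== 30078), authors}]
  if (kinds.includes(30078)) {
    filters.push({kinds: [30078], authors, "#d": appDataKeys})
  }
  return filters
}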

View File

@ -0,0 +1,95 @@
import {uniqBy, identity, prop, pluck, sortBy} from "ramda"
import {throttle, batch} from "hurdak"
import {findReplyId, findRootId} from "src/util/nostr"
import type {Event, DisplayEvent} from "src/engine2/model"
import {writable} from "src/engine2/util/store"
import {ContextLoader} from "./context"
import {Subscription} from "./subscription"
export type ThreadOpts = {
anchorId: string
relays: string[]
isMuted: (e: Event) => boolean
}
export class ThreadLoader {
stopped = false
context: ContextLoader
anchor = writable<DisplayEvent>(null)
parent = writable<DisplayEvent>(null)
ancestors = writable<DisplayEvent[]>([])
root = writable<DisplayEvent>(null)
constructor(readonly opts: ThreadOpts) {
this.context = new ContextLoader({
isMuted: opts.isMuted,
onEvent: this.updateThread,
})
this.loadNotes([opts.anchorId], 2)
}
stop() {
this.context.stop()
}
loadNotes(ids, depth = 1) {
const seen = new Set(pluck("id", this.getThread()))
const filteredIds = ids.filter(id => id && !seen.has(id))
if (filteredIds.length > 0) {
const sub = new Subscription({
timeout: 4000,
relays: this.opts.relays,
filters: [{ids: filteredIds}],
})
sub.on(
"event",
batch(300, (events: Event[]) => {
this.context.addContext(events, {depth: 1})
this.addToThread(this.context.applyContext(events))
this.loadNotes(events.flatMap(e => [findReplyId(e), findRootId(e)]))
})
)
}
}
// Thread building
getThread() {
return [this.root.get(), ...this.ancestors.get(), this.parent.get(), this.anchor.get()].filter(
identity
)
}
addToThread(events) {
const ancestors = []
for (const event of events) {
if (event.id === this.opts.anchorId) {
this.anchor.set(event)
} else {
const anchor = this.anchor.get()
if (event.id === findReplyId(anchor)) {
this.parent.set(event)
} else if (event.id === findRootId(anchor)) {
this.root.set(event)
} else {
ancestors.push(event)
}
}
}
if (ancestors.length > 0) {
this.ancestors.update($xs =>
sortBy(prop("created_at"), uniqBy(prop("id"), ancestors.concat($xs)))
)
}
}
updateThread = throttle(500, () => {
this.addToThread(this.context.applyContext(this.getThread()))
})
}
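
The classification done in addToThread can be summarized in a standalone sketch; the root and parent id lookups below are simplified assumptions standing in for findRootId and findReplyId, which are not part of this commit.

type Note = {id: string; tags: string[][]; created_at: number}

const classifySketch = (anchor: Note, event: Note) => {
  const eTagIds = anchor.tags.filter(t => t[0] === "e").map(t => t[1])
  const rootId = eTagIds[0] // assumption: first "e" tag points at the root
  const replyId = eTagIds[eTagIds.length - 1] // assumption: last "e" tag is the direct parent
  if (event.id === anchor.id) return "anchor"
  if (event.id === replyId) return "parent"
  if (event.id === rootId) return "root"
  return "ancestor"
}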

View File

@ -1,3 +1,4 @@
import {Pool} from "paravel"
import {collection, writable} from "src/engine2/util/store"
import type {
Event,
@ -30,3 +31,7 @@ export const handles = collection<Handle>("pubkey")
export const zappers = collection<Zapper>("pubkey")
export const relays = collection<Relay>("url")
export const relayPolicies = collection<RelayPolicy>("pubkey")
// Other stuff
export const pool = new Pool()