From fcf5faa9602c9a27f6ed39263a8dcef5f88dc183 Mon Sep 17 00:00:00 2001 From: Jonathan Staab Date: Thu, 31 Aug 2023 15:36:07 -0700 Subject: [PATCH] Move nip 65 over --- src/engine2/model/index.ts | 5 + src/engine2/projections/index.ts | 1 + src/engine2/projections/nip65.ts | 84 +++++++ src/engine2/queries/index.ts | 1 + src/engine2/queries/nip65.ts | 162 +++++++++++++ src/engine2/queries/session/index.ts | 4 +- src/engine2/requests/context.ts | 330 +++++++++++++++++++++++++++ src/engine2/requests/cursor.ts | 140 ++++++++++++ src/engine2/requests/executor.ts | 3 +- src/engine2/requests/feed.ts | 221 ++++++++++++++++++ src/engine2/requests/index.ts | 3 +- src/engine2/requests/pool.ts | 3 - src/engine2/requests/pubkeys.ts | 86 +++++++ src/engine2/requests/thread.ts | 95 ++++++++ src/engine2/state/index.ts | 5 + 15 files changed, 1136 insertions(+), 7 deletions(-) create mode 100644 src/engine2/projections/nip65.ts create mode 100644 src/engine2/queries/nip65.ts create mode 100644 src/engine2/requests/context.ts create mode 100644 src/engine2/requests/cursor.ts create mode 100644 src/engine2/requests/feed.ts delete mode 100644 src/engine2/requests/pool.ts create mode 100644 src/engine2/requests/pubkeys.ts create mode 100644 src/engine2/requests/thread.ts diff --git a/src/engine2/model/index.ts b/src/engine2/model/index.ts index 8fda4087..bccaefa2 100644 --- a/src/engine2/model/index.ts +++ b/src/engine2/model/index.ts @@ -86,6 +86,11 @@ export type Relay = { info?: RelayInfo } +export enum RelayMode { + Read = "read", + Write = "write", +} + export type RelayPolicyEntry = { url: string read: boolean diff --git a/src/engine2/projections/index.ts b/src/engine2/projections/index.ts index d614000f..fd0cd9ee 100644 --- a/src/engine2/projections/index.ts +++ b/src/engine2/projections/index.ts @@ -1,3 +1,4 @@ import "./nip02" +import "./nip65" export * from "./core" diff --git a/src/engine2/projections/nip65.ts b/src/engine2/projections/nip65.ts new file mode 100644 index 00000000..049a1f9c --- /dev/null +++ b/src/engine2/projections/nip65.ts @@ -0,0 +1,84 @@ +import {uniqBy, prop, inc} from "ramda" +import {tryJson, now} from "src/util/misc" +import {warn} from "src/util/logger" +import {normalizeRelayUrl, isShareableRelay, Tags} from "src/util/nostr" +import type {RelayPolicyEntry} from "src/engine2/model" +import {RelayMode} from "src/engine2/model" +import {relays, relayPolicies} from "src/engine2/state" +import {projections} from "src/engine2/projections/core" + +const addRelay = (url: string) => { + if (isShareableRelay(url)) { + const relay = relays.key(url).get() + + relays.key(url).merge({ + count: inc(relay?.count || 0), + first_seen: relay?.first_seen || now(), + info: { + last_checked: 0, + }, + }) + } +} + +const setPolicy = ( + {pubkey, created_at}: {pubkey: string; created_at: number}, + relays: RelayPolicyEntry[] +) => { + if (relays?.length > 0) { + if (created_at < relayPolicies.key(pubkey).get()?.created_at) { + return + } + + relayPolicies.key(pubkey).merge({ + created_at, + updated_at: now(), + relays: uniqBy(prop("url"), relays).map((relay: RelayPolicyEntry) => { + addRelay(relay.url) + + return {read: true, write: true, ...relay} + }), + }) + } +} + +projections.addHandler(2, e => { + addRelay(normalizeRelayUrl(e.content)) +}) + +projections.addHandler(3, e => { + setPolicy( + e, + tryJson(() => { + return Object.entries(JSON.parse(e.content || "")) + .filter(([url]) => isShareableRelay(url)) + .map(([url, conditions]) => { + // @ts-ignore + const write = ![false, "!"].includes(conditions.write) + // @ts-ignore + const read = ![false, "!"].includes(conditions.read) + + return {url: normalizeRelayUrl(url), write, read} + }) + }) as RelayPolicyEntry[] + ) +}) + +projections.addHandler(10002, e => { + setPolicy( + e, + Tags.from(e) + .type("r") + .all() + .map(([_, url, mode]) => { + const write = !mode || mode === RelayMode.Write + const read = !mode || mode === RelayMode.Read + + if (!write && !read) { + warn(`Encountered unknown relay mode: ${mode}`) + } + + return {url: normalizeRelayUrl(url), write, read} + }) + ) +}) diff --git a/src/engine2/queries/index.ts b/src/engine2/queries/index.ts index 4861f24b..4ee1d92b 100644 --- a/src/engine2/queries/index.ts +++ b/src/engine2/queries/index.ts @@ -1,2 +1,3 @@ export * from "./session" export * from "./nip02" +export * from "./nip65" diff --git a/src/engine2/queries/nip65.ts b/src/engine2/queries/nip65.ts new file mode 100644 index 00000000..c6f6703b --- /dev/null +++ b/src/engine2/queries/nip65.ts @@ -0,0 +1,162 @@ +import {sortBy, pluck, uniq, nth, prop, last} from "ramda" +import {chain} from "hurdak" +import {fuzzy} from "src/util/misc" +import {findReplyId, findRootId, isShareableRelay, Tags} from "src/util/nostr" +import {derived} from "src/engine2/util/store" +import type {Event, Relay, RelayInfo} from "src/engine2/model" +import {RelayMode} from "src/engine2/model" +import {env, pool, relays, relayPolicies} from "src/engine2/state" +import {stateKey} from "src/engine2/queries/session" + +export const relayIsLowQuality = (url: string) => + pool.get(url, {autoConnect: false})?.meta?.quality < 0.6 + +export const getRelay = (url: string): Relay => relays.key(url).get() || {url} + +export const getRelayInfo = (url: string): RelayInfo => getRelay(url)?.info || {} + +export const displayRelay = ({url}: Relay) => last(url.split("://")) + +export const searchRelays = derived(relays, $relays => fuzzy($relays.values(), {keys: ["url"]})) + +export const getSearchRelays = () => { + const searchableRelayUrls = relays + .get() + .filter(r => (r.info?.supported_nips || []).includes(50)) + .map(prop("url")) + + return uniq(env.get().SEARCH_RELAYS.concat(searchableRelayUrls)).slice(0, 8) +} + +export const getPubkeyRelays = (pubkey: string, mode: string = null) => { + const relays = relayPolicies.key(pubkey).get()?.relays || [] + + return mode ? relays.filter(prop(mode)) : relays +} + +export const getPubkeyRelayUrls = (pubkey: string, mode: string = null) => + pluck("url", getPubkeyRelays(pubkey, mode)) + +export const getUserRelays = (mode: string = null) => getPubkeyRelays(stateKey.get(), mode) + +export const getUserRelayUrls = (mode: string = null) => pluck("url", getUserRelays(mode)) + +// Smart relay selection +// +// From Mike Dilger: +// 1) Other people's write relays — pull events from people you follow, +// including their contact lists +// 2) Other people's read relays — push events that tag them (replies or just tagging). +// However, these may be authenticated, use with caution +// 3) Your write relays —- write events you post to your microblog feed for the +// world to see. ALSO write your contact list. ALSO read back your own contact list. +// 4) Your read relays —- read events that tag you. ALSO both write and read +// client-private data like client configuration events or anything that the world +// doesn't need to see. +// 5) Advertise relays — write and read back your own relay list + +export const selectHints = (limit: number | null, hints: Iterable) => { + const seen = new Set() + const ok = [] + const bad = [] + + for (const url of chain(hints, getUserRelayUrls(RelayMode.Read), env.get().DEFAULT_RELAYS)) { + if (seen.has(url)) { + continue + } + + seen.add(url) + + // Filter out relays that appear to be broken or slow + if (!isShareableRelay(url)) { + bad.push(url) + } else if (relayIsLowQuality(url)) { + bad.push(url) + } else { + ok.push(url) + } + + if (limit && ok.length > limit) { + break + } + } + + // If we don't have enough hints, use the broken ones + return ok.concat(bad).slice(0, limit || Infinity) +} + +export const hintSelector = + (generateHints: (...args: any[]) => Iterable) => + (limit: number, ...args: any[]) => + selectHints(limit, generateHints(...args)) + +export const getPubkeyHints = hintSelector(function* (pubkey: string, mode: RelayMode) { + yield* getPubkeyRelayUrls(pubkey, mode) +}) + +export const getEventHints = hintSelector(function* (event: Event) { + yield* getPubkeyRelayUrls(event.pubkey, RelayMode.Write) +}) + +// If we're looking for an event's children, the read relays the author has +// advertised would be the most reliable option, since well-behaved clients +// will write replies there. +export const getReplyHints = hintSelector(function* (event) { + yield* getPubkeyRelayUrls(event.pubkey, RelayMode.Read) +}) + +// If we're looking for an event's parent, tags are the most reliable hint, +// but we can also look at where the author of the note reads from +export const getParentHints = hintSelector(function* (event) { + const parentId = findReplyId(event) + + yield* Tags.from(event).equals(parentId).relays() + yield* getPubkeyRelayUrls(event.pubkey, RelayMode.Read) +}) + +export const getRootHints = hintSelector(function* (event) { + const rootId = findRootId(event) + + yield* Tags.from(event).equals(rootId).relays() + yield* getPubkeyRelayUrls(event.pubkey, RelayMode.Read) +}) + +// If we're replying or reacting to an event, we want the author to know, as well as +// anyone else who is tagged in the original event or the reply. Get everyone's read +// relays. Limit how many per pubkey we publish to though. We also want to advertise +// our content to our followers, so publish to our write relays as well. +export const getPublishHints = (limit: number, event: Event, extraRelays: string[] = []) => { + const pubkeys = Tags.from(event).type("p").values().all() + const hintGroups = pubkeys.map(pubkey => getPubkeyRelayUrls(pubkey, RelayMode.Read)) + const authorRelays = getPubkeyRelayUrls(event.pubkey, RelayMode.Write) + + return mergeHints(limit, hintGroups.concat([extraRelays, authorRelays])) +} + +export const mergeHints = (limit: number, groups: string[][]) => { + const scores = {} as Record + + for (const hints of groups) { + hints.forEach((hint, i) => { + const score = 1 / (i + 1) / hints.length + + if (!scores[hint]) { + scores[hint] = {score: 0, count: 0} + } + + scores[hint].score += score + scores[hint].count += 1 + }) + } + + // Use the log-sum-exp and a weighted sum + for (const score of Object.values(scores)) { + const weight = Math.log(groups.length / score.count) + + score.score = weight + Math.log1p(Math.exp(score.score - score.count)) + } + + return sortBy(([hint, {score}]) => score, Object.entries(scores)) + .map(nth(0)) + .slice(0, limit) +} diff --git a/src/engine2/queries/session/index.ts b/src/engine2/queries/session/index.ts index 2cef69fe..c42a625f 100644 --- a/src/engine2/queries/session/index.ts +++ b/src/engine2/queries/session/index.ts @@ -1,4 +1,4 @@ -import {find, whereEq} from "ramda" +import {find, defaultTo, whereEq} from "ramda" import {derived} from "src/engine2/util/store" import {pubkey, keys} from "src/engine2/state" import {prepareNdk, ndkInstances} from "./ndk" @@ -6,6 +6,8 @@ import {Signer} from "./signer" import {Crypto} from "./crypto" import {Wrapper} from "./wrapper" +export const stateKey = pubkey.derived(defaultTo("anonymous")) + export const user = derived([pubkey, keys], ([$pubkey, $keys]) => { return find(whereEq({pubkey: $pubkey}), $keys) }) diff --git a/src/engine2/requests/context.ts b/src/engine2/requests/context.ts new file mode 100644 index 00000000..1fb0ac8e --- /dev/null +++ b/src/engine2/requests/context.ts @@ -0,0 +1,330 @@ +import {matchFilters} from "nostr-tools" +import {throttle} from "throttle-debounce" +import {omit, find, pluck, flatten, without, groupBy, sortBy, prop, uniqBy, reject} from "ramda" +import {ensurePlural, batch, chunk} from "hurdak" +import {now, pushToKey} from "src/util/misc" +import {findReplyAndRootIds, findReplyId, findRootId, Tags, noteKinds} from "src/util/nostr" +import {collection} from "src/engine2/util/store" +import type {Collection} from "src/engine2/util/store" +import type {Event, DisplayEvent, Filter} from "src/engine2/model" +import {settings, env} from "src/engine2/state" +import {mergeHints, getReplyHints, getRootHints, getParentHints} from "src/engine2/queries" +import {Subscription} from "./subscription" +import {loadPubkeys} from "./pubkeys" + +const fromDisplayEvent = (e: DisplayEvent): Event => + omit(["zaps", "likes", "replies", "matchesFilter"], e) + +export type ContextLoaderOpts = { + isMuted: (e: Event) => boolean + relays?: string[] + filters?: Filter[] + onEvent?: (e: Event) => void + shouldLoadParents?: boolean +} + +export class ContextLoader { + stopped: boolean + data: Collection + seen: Set + subs: Record void}>> + + constructor(readonly opts: ContextLoaderOpts) { + this.stopped = false + this.data = collection("id") + this.seen = new Set() + this.subs = { + context: [], + listeners: [], + } + } + + // Utils + + addSubs(key: string, subs: Array) { + for (const sub of ensurePlural(subs)) { + this.subs[key].push(sub) + + sub.on("close", () => { + this.subs[key] = without([sub], this.subs[key]) + }) + } + } + + getAllSubs() { + return flatten(Object.values(this.subs)) + } + + getReplyKinds() { + const {ENABLE_ZAPS} = env.get() + + return ENABLE_ZAPS ? [1, 7, 9735] : [1, 7] + } + + matchFilters(e: Event) { + return !this.opts.filters || matchFilters(ensurePlural(this.opts.filters), e) + } + + isTextNote(e: Event) { + return noteKinds.includes(e.kind) + } + + isMissingParent = (e: Event) => { + const parentId = findReplyId(e) + + return parentId && this.matchFilters(e) && !this.data.key(parentId).exists() + } + + preprocessEvents = (events: Event[]) => { + events = reject((e: Event) => this.seen.has(e.id) || this.opts.isMuted(e), events) + + for (const event of events) { + this.seen.add(event.id) + } + + return events + } + + getRelayLimit() { + return settings.get().relay_limit + } + + mergeHints(groups: string[][]) { + if (this.opts.relays) { + return this.opts.relays + } + + return mergeHints(this.getRelayLimit(), groups) + } + + applyContext = (notes: Event[], {substituteParents = false, alreadySeen = new Set()} = {}) => { + const contextById = {} as Record + const zapsByParentId = {} as Record + const reactionsByParentId = {} as Record + const repliesByParentId = {} as Record + + for (const event of this.data.get().concat(notes)) { + if (contextById[event.id]) { + continue + } + + contextById[event.id] = event + + const parentId = findReplyId(event) + + if (event.kind === 9735) { + pushToKey(zapsByParentId, parentId, event) + } else if (event.kind === 7) { + pushToKey(reactionsByParentId, parentId, event) + } else { + pushToKey(repliesByParentId, parentId, event) + } + } + + const annotate = (note: Event): DisplayEvent => { + const {replies = [], reactions = [], zaps = []} = note as DisplayEvent + const combinedZaps = zaps.concat(zapsByParentId[note.id] || []) + const combinedReactions = reactions.concat(reactionsByParentId[note.id] || []) + const combinedReplies = (replies as Event[]) + .concat(repliesByParentId[note.id] || []) + .map(annotate) + + return { + ...note, + zaps: uniqBy(prop("id"), combinedZaps), + reactions: uniqBy(prop("id"), combinedReactions), + replies: sortBy((e: DisplayEvent) => -e.created_at, uniqBy(prop("id"), combinedReplies)), + matchesFilter: + !alreadySeen.has(note.id) && + (this.matchFilters(note) || Boolean(find(prop("matchesFilter"), combinedReplies))), + } + } + + if (substituteParents) { + // We may have loaded a reply from a follower to someone we muted + notes = reject( + this.opts.isMuted, + notes.map(note => { + for (let i = 0; i < 2; i++) { + const parent = contextById[findReplyId(note)] + + if (!parent) { + break + } + + note = parent + } + + return note + }) + ) + } + + return uniqBy(prop("id"), notes).map(annotate) + } + + // Context loaders + + loadPubkeys = (events: Event[]) => { + loadPubkeys( + events.filter(this.isTextNote).flatMap((e: Event) => Tags.from(e).pubkeys().concat(e.pubkey)) + ) + } + + loadParents = (events: Event[]) => { + if (this.stopped) { + return + } + + const parentsInfo = events.flatMap((e: Event) => { + const info = [] + const {root, reply} = findReplyAndRootIds(e) + + if (reply && !this.seen.has(reply)) { + info.push({id: reply, hints: getParentHints(this.getRelayLimit(), e)}) + } + + if (root && !this.seen.has(root)) { + info.push({id: findRootId(e), hints: getRootHints(this.getRelayLimit(), e)}) + } + + return info + }) + + if (parentsInfo.length > 0) { + const sub = new Subscription({ + timeout: 5000, + filters: [{ids: pluck("id", parentsInfo)}], + relays: this.mergeHints(pluck("hints", parentsInfo)), + }) + + sub.on( + "event", + batch(100, (context: Event[]) => this.addContext(context, {depth: 2})) + ) + + this.addSubs("context", [sub]) + } + } + + loadContext = batch(300, (eventGroups: any) => { + if (this.stopped) { + return + } + + const groupsByDepth = groupBy(prop("depth"), eventGroups) + + for (const [depthStr, groups] of Object.entries(groupsByDepth)) { + const depth = parseInt(depthStr) + + if (depth === 0) { + continue + } + + const events = uniqBy( + prop("id"), + flatten(pluck("events", groups as any[])).filter(this.isTextNote) + ) as Event[] + + for (const c of chunk(256, events)) { + const sub = new Subscription({ + timeout: 5000, + relays: this.mergeHints(c.map(e => getReplyHints(this.getRelayLimit(), e))), + filters: [{kinds: this.getReplyKinds(), "#e": pluck("id", c as Event[])}], + }) + + sub.on( + "event", + batch(100, (context: Event[]) => this.addContext(context, {depth: depth - 1})) + ) + + this.addSubs("context", [sub]) + } + } + }) + + listenForContext = throttle(5000, () => { + if (this.stopped) { + return + } + + this.subs.listeners.forEach(sub => sub.close()) + + const contextByParentId = groupBy(findReplyId, this.data.get()) + + const findNotes = (events: Event[]): Event[] => + uniqBy( + prop("id"), + events + .filter(this.isTextNote) + .flatMap(e => findNotes(contextByParentId[e.id] || []).concat(e)) + ) + + for (const c of chunk(256, findNotes(this.data.get()))) { + const sub = new Subscription({ + relays: this.mergeHints(c.map(e => getReplyHints(this.getRelayLimit(), e))), + filters: [{kinds: this.getReplyKinds(), "#e": pluck("id", c), since: now()}], + }) + + sub.on( + "event", + batch(100, (context: Event[]) => this.addContext(context, {depth: 2})) + ) + + this.addSubs("listeners", [sub]) + } + }) + + // Adders + + addContext = (newEvents: Event[], {shouldLoadParents = false, depth = 0}) => { + const events = this.preprocessEvents(newEvents) + + this.data.update($context => $context.concat(events)) + + this.loadPubkeys(events) + + if (shouldLoadParents) { + this.loadParents(events) + } + + this.loadContext({events, depth}) + + this.listenForContext() + + if (this.opts.onEvent) { + for (const event of events) { + this.opts.onEvent(event) + } + } + } + + hydrate(notes: Partial[], depth) { + const context: Event[] = [] + + const addContext = (note: Partial) => { + // Only add to context if it's a real event + if (note.sig) { + context.push(fromDisplayEvent(note as DisplayEvent)) + } + + note.zaps?.forEach(zap => context.push(zap)) + note.reactions?.forEach(reaction => context.push(reaction)) + note.replies?.forEach(reply => addContext(reply)) + } + + notes.forEach(addContext) + + this.addContext(context, {depth}) + } + + // Control + + stop() { + this.stopped = true + + for (const sub of this.getAllSubs()) { + sub.close() + } + } +} diff --git a/src/engine2/requests/cursor.ts b/src/engine2/requests/cursor.ts new file mode 100644 index 00000000..a83ef05a --- /dev/null +++ b/src/engine2/requests/cursor.ts @@ -0,0 +1,140 @@ +import {mergeRight, identity, sortBy} from "ramda" +import {seconds, first} from "hurdak" +import {now} from "src/util/misc" +import {EPOCH} from "src/util/nostr" +import type {Filter, Event} from "src/engine2/model" +import {Subscription} from "./subscription" + +export type CursorOpts = { + relay: string + filters: Filter[] + onEvent?: (e: Event) => void +} + +export class Cursor { + until = now() + delta = seconds(10, "minute") + since = now() - this.delta + buffer: Event[] = [] + loading = false + done = false + + constructor(readonly opts: CursorOpts) {} + + load(n: number) { + const limit = n - this.buffer.length + + // If we're already loading, or we have enough buffered, do nothing + if (this.done || this.loading || limit <= 0) { + return null + } + + const {since, until} = this + const {relay, filters, onEvent} = this.opts + + this.loading = true + + let count = 0 + + const sub = new Subscription({ + timeout: 5000, + relays: [relay], + filters: filters.map(mergeRight({until, limit, since})), + }) + + sub.on("event", (event: Event) => { + this.until = Math.min(until, event.created_at) - 1 + this.buffer.push(event) + + count += 1 + + onEvent?.(event) + }) + + sub.on("close", () => { + this.loading = false + + // Relays can't be relied upon to return events in descending order, do exponential + // windowing to ensure we get the most recent stuff on first load, but eventually find it all + if (count === 0) { + this.delta *= 10 + } + + if (this.since <= EPOCH) { + this.done = true + } + + this.since -= this.delta + }) + + return sub + } + + take(n = Infinity) { + return this.buffer.splice(0, n) + } + + count() { + return this.buffer.length + } + + peek() { + return this.buffer[0] + } + + pop() { + return first(this.take(1)) + } +} + +export class MultiCursor { + bufferFactor = 4 + seen_on: Map + cursors: Cursor[] + + constructor(cursors: Cursor[]) { + this.seen_on = new Map() + this.cursors = cursors + } + + load(limit: number) { + return this.cursors.map(c => c.load(limit)).filter(identity) + } + + count() { + return this.cursors.reduce((n, c) => n + c.buffer.length, 0) + } + + take(n: number): [Subscription[], Event[]] { + const events = [] + + while (events.length < n) { + // Find the most recent event available so that they're sorted + const [cursor] = sortBy( + c => -c.peek().created_at, + this.cursors.filter(c => c.peek()) + ) + + if (!cursor) { + break + } + + const event = cursor.pop() + + // Merge seen_on via mutation so it applies to future. If we've already + // seen the event, we're also done and we don't need to add it to our buffer + if (this.seen_on.has(event.id) && !this.seen_on.get(event.id).includes(event.seen_on[0])) { + this.seen_on.get(event.id).push(event.seen_on[0]) + } else { + this.seen_on.set(event.id, event.seen_on) + + events.push(event) + } + } + + // Preload the next page + const subs = this.load(n * this.bufferFactor) + + return [subs, events] + } +} diff --git a/src/engine2/requests/executor.ts b/src/engine2/requests/executor.ts index 00c5c963..e9794645 100644 --- a/src/engine2/requests/executor.ts +++ b/src/engine2/requests/executor.ts @@ -3,8 +3,7 @@ import {Plex, Relays, Executor} from "paravel" import {error, warn} from "src/util/logger" import {normalizeRelayUrl} from "src/util/nostr" import {writable} from "src/engine2/util" -import {env, settings} from "src/engine2/state" -import {pool} from "./pool" +import {env, pool, settings} from "src/engine2/state" export const authHandler = writable(null) diff --git a/src/engine2/requests/feed.ts b/src/engine2/requests/feed.ts new file mode 100644 index 00000000..cec9abe7 --- /dev/null +++ b/src/engine2/requests/feed.ts @@ -0,0 +1,221 @@ +import {partition, reject, identity, uniqBy, pluck, sortBy, without, any, prop, assoc} from "ramda" +import {ensurePlural, union, seconds, doPipe, throttle, batch} from "hurdak" +import {now, race} from "src/util/misc" +import {findReplyId} from "src/util/nostr" +import type {Event, DisplayEvent, Filter} from "src/engine2/model" +import {writable} from "src/engine2/util/store" +import {Subscription} from "./subscription" +import {getUrls} from "./executor" +import {Cursor, MultiCursor} from "./cursor" +import {ContextLoader} from "./context" + +export type FeedOpts = { + depth: number + relays: string[] + filters: Filter[] + isMuted: (e: Event) => boolean + onEvent?: (e: Event) => void + shouldLoadParents?: boolean + shouldUseNip65?: boolean +} + +export class FeedLoader { + since = now() + stopped = false + context: ContextLoader + subs: Array<{close: () => void}> = [] + feed = writable([]) + stream = writable([]) + deferred: Event[] = [] + cursor: MultiCursor + ready: Promise + + constructor(readonly opts: FeedOpts) { + const urls = getUrls(opts.relays) + + this.context = new ContextLoader({ + relays: opts.shouldUseNip65 ? null : urls, + filters: opts.filters, + isMuted: opts.isMuted, + onEvent: event => { + opts.onEvent?.(event) + + this.updateFeed() + }, + }) + + // No point in subscribing if we have an end date + if (!any(prop("until"), ensurePlural(opts.filters) as any[])) { + const sub = new Subscription({ + relays: urls, + filters: opts.filters.map(assoc("since", this.since)), + }) + + sub.on( + "event", + batch(1000, (context: Event[]) => { + this.context.addContext(context, {shouldLoadParents: true, depth: opts.depth}) + this.stream.update($stream => $stream.concat(context)) + }) + ) + + this.addSubs([sub]) + } + + this.cursor = new MultiCursor( + urls.map( + relay => + new Cursor({ + relay, + filters: opts.filters, + onEvent: batch(100, (context: Event[]) => + this.context.addContext(context, {shouldLoadParents: true, depth: opts.depth}) + ), + }) + ) + ) + + const subs = this.cursor.load(50) + + this.addSubs(subs) + + // Wait until a good number of subscriptions have completed to reduce the chance of + // out of order notes + this.ready = race(0.2, pluck("result", subs)) + } + + // Control + + addSubs(subs: Array) { + for (const sub of ensurePlural(subs)) { + this.subs.push(sub) + + sub.on("close", () => { + this.subs = without([sub], this.subs) + }) + } + } + + stop() { + this.stopped = true + this.context.stop() + + for (const sub of this.subs) { + sub.close() + } + } + + // Feed building + + addToFeed = (notes: Event[]) => { + const getChildIds = note => note.replies.flatMap(child => [child.id, getChildIds(child)]) + + this.feed.update($feed => { + // Avoid showing the same note twice, even if it's once as a parent and once as a child + const feedIds = new Set(pluck("id", $feed)) + const feedChildIds = new Set($feed.flatMap(getChildIds)) + const feedParentIds = new Set($feed.map(findReplyId).filter(identity)) + + return uniqBy( + prop("id"), + $feed.concat( + this.context.applyContext( + sortBy( + e => -e.created_at, + reject( + (e: Event) => + feedIds.has(findReplyId(e)) || feedChildIds.has(e.id) || feedParentIds.has(e.id), + notes + ) + ), + { + substituteParents: true, + alreadySeen: union(feedIds, feedChildIds), + } + ) + ) + ) + }) + } + + updateFeed = throttle(500, () => { + this.feed.update($feed => this.context.applyContext($feed)) + }) + + // Loading + + async load(n) { + await this.ready + + const [subs, notes] = this.cursor.take(n) + const deferred = this.deferred.splice(0) + + this.addSubs(subs) + + const ok = doPipe(notes.concat(deferred), [ + this.deferReactions, + this.deferOrphans, + this.deferAncient, + ]) + + this.addToFeed(ok) + } + + loadStream() { + this.stream.update($stream => { + this.feed.update($feed => { + return uniqBy( + prop("id"), + this.context + .applyContext($stream, { + substituteParents: true, + }) + .concat($feed) + ) + }) + + return [] + }) + } + + deferReactions = (notes: Event[]) => { + const [defer, ok] = partition( + e => !this.context.isTextNote(e) && this.context.isMissingParent(e), + notes + ) + + setTimeout(() => { + // Defer again if we still don't have a parent, it's pointless to show an orphaned reaction + const [orphans, ready] = partition(this.context.isMissingParent, defer) + + this.addToFeed(ready) + this.deferred = this.deferred.concat(orphans) + }, 1500) + + return ok + } + + deferOrphans = (notes: Event[]) => { + // If something has a parent id but we haven't found the parent yet, skip it until we have it. + const [defer, ok] = partition( + e => this.context.isTextNote(e) && this.context.isMissingParent(e), + notes + ) + + setTimeout(() => this.addToFeed(defer), 1500) + + return ok + } + + deferAncient = (notes: Event[]) => { + // Sometimes relays send very old data very quickly. Pop these off the queue and re-add + // them after we have more timely data. They still might be relevant, but order will still + // be maintained since everything before the cutoff will be deferred the same way. + const since = now() - seconds(6, "hour") + const [defer, ok] = partition(e => e.created_at < since, notes) + + setTimeout(() => this.addToFeed(defer), 4000) + + return ok + } +} diff --git a/src/engine2/requests/index.ts b/src/engine2/requests/index.ts index d86fc2f4..cf5f4afa 100644 --- a/src/engine2/requests/index.ts +++ b/src/engine2/requests/index.ts @@ -1,4 +1,5 @@ -export * from "./pool" export * from "./executor" export * from "./publisher" export * from "./subscription" +export * from "./cursor" +export * from "./context" diff --git a/src/engine2/requests/pool.ts b/src/engine2/requests/pool.ts deleted file mode 100644 index f4c155a5..00000000 --- a/src/engine2/requests/pool.ts +++ /dev/null @@ -1,3 +0,0 @@ -import {Pool} from "paravel" - -export const pool = new Pool() diff --git a/src/engine2/requests/pubkeys.ts b/src/engine2/requests/pubkeys.ts new file mode 100644 index 00000000..a2a403df --- /dev/null +++ b/src/engine2/requests/pubkeys.ts @@ -0,0 +1,86 @@ +import {without, pluck, uniq} from "ramda" +import {chunk, seconds, ensurePlural} from "hurdak" +import {personKinds, appDataKeys} from "src/util/nostr" +import {now} from "src/util/misc" +import type {Filter} from "src/engine2/model" +import {profiles, settings} from "src/engine2/state" +import {mergeHints, getPubkeyHints} from "src/engine2/queries" +import {Subscription} from "./subscription" + +export type LoadPeopleOpts = { + relays?: string[] + kinds?: number[] + force?: boolean +} + +export const attemptedPubkeys = new Set() + +export const getStalePubkeys = (pubkeys: string[]) => { + const stale = new Set() + const since = now() - seconds(3, "hour") + + for (const pubkey of pubkeys) { + if (stale.has(pubkey) || attemptedPubkeys.has(pubkey)) { + continue + } + + attemptedPubkeys.add(pubkey) + + if (profiles.key(pubkey).get()?.updated_at || 0 > since) { + continue + } + + stale.add(pubkey) + } + + return Array.from(stale) +} + +export const loadPubkeys = async ( + pubkeyGroups: string | string[], + {relays, force, kinds = personKinds}: LoadPeopleOpts = {} +) => { + const rawPubkeys = ensurePlural(pubkeyGroups).reduce((a, b) => a.concat(b), []) + const pubkeys = force ? uniq(rawPubkeys) : getStalePubkeys(rawPubkeys) + + const getChunkRelays = (chunk: string[]) => { + if (relays?.length > 0) { + return relays + } + + const {relay_limit: limit} = settings.get() + + return mergeHints( + limit, + chunk.map(pubkey => getPubkeyHints(limit, pubkey, "write")) + ) + } + + const getChunkFilters = (chunk: string[]) => { + const filter = [] as Filter[] + + filter.push({kinds: without([30078], kinds), authors: chunk}) + + // Add a separate filter for app data so we're not pulling down other people's stuff, + // or obsolete events of our own. + if (kinds.includes(30078)) { + filter.push({kinds: [30078], authors: chunk, "#d": Object.values(appDataKeys)}) + } + + return filter + } + + await Promise.all( + pluck( + "result", + chunk(256, pubkeys).map( + (chunk: string[]) => + new Subscription({ + relays: getChunkRelays(chunk), + filters: getChunkFilters(chunk), + timeout: 10_000, + }) + ) + ) + ) +} diff --git a/src/engine2/requests/thread.ts b/src/engine2/requests/thread.ts new file mode 100644 index 00000000..4d6dc904 --- /dev/null +++ b/src/engine2/requests/thread.ts @@ -0,0 +1,95 @@ +import {uniqBy, identity, prop, pluck, sortBy} from "ramda" +import {throttle, batch} from "hurdak" +import {findReplyId, findRootId} from "src/util/nostr" +import type {Event, DisplayEvent} from "src/engine2/model" +import {writable} from "src/engine2/util/store" +import {ContextLoader} from "./context" +import {Subscription} from "./subscription" + +export type ThreadOpts = { + anchorId: string + relays: string[] + isMuted: (e: Event) => boolean +} + +export class ThreadLoader { + stopped = false + context: ContextLoader + anchor = writable(null) + parent = writable(null) + ancestors = writable([]) + root = writable(null) + + constructor(readonly opts: ThreadOpts) { + this.context = new ContextLoader({ + isMuted: opts.isMuted, + onEvent: this.updateThread, + }) + + this.loadNotes([opts.anchorId], 2) + } + + stop() { + this.context.stop() + } + + loadNotes(ids, depth = 1) { + const seen = new Set(pluck("id", this.getThread())) + const filteredIds = ids.filter(id => id && !seen.has(id)) + + if (filteredIds.length > 0) { + const sub = new Subscription({ + timeout: 4000, + relays: this.opts.relays, + filters: [{ids: filteredIds}], + }) + + sub.on( + "event", + batch(300, (events: Event[]) => { + this.context.addContext(events, {depth: 1}) + this.addToThread(this.context.applyContext(events)) + this.loadNotes(events.flatMap(e => [findReplyId(e), findRootId(e)])) + }) + ) + } + } + + // Thread building + + getThread() { + return [this.root.get(), ...this.ancestors.get(), this.parent.get(), this.anchor.get()].filter( + identity + ) + } + + addToThread(events) { + const ancestors = [] + + for (const event of events) { + if (event.id === this.opts.anchorId) { + this.anchor.set(event) + } else { + const anchor = this.anchor.get() + + if (event.id === findReplyId(anchor)) { + this.parent.set(event) + } else if (event.id === findRootId(anchor)) { + this.root.set(event) + } else { + ancestors.push(event) + } + } + } + + if (ancestors.length > 0) { + this.ancestors.update($xs => + sortBy(prop("created_at"), uniqBy(prop("id"), ancestors.concat($xs))) + ) + } + } + + updateThread = throttle(500, () => { + this.addToThread(this.context.applyContext(this.getThread())) + }) +} diff --git a/src/engine2/state/index.ts b/src/engine2/state/index.ts index 33c4ba71..6bed7b36 100644 --- a/src/engine2/state/index.ts +++ b/src/engine2/state/index.ts @@ -1,3 +1,4 @@ +import {Pool} from "paravel" import {collection, writable} from "src/engine2/util/store" import type { Event, @@ -30,3 +31,7 @@ export const handles = collection("pubkey") export const zappers = collection("pubkey") export const relays = collection("url") export const relayPolicies = collection("pubkey") + +// Other stuff + +export const pool = new Pool()