Upgrade paravel

This commit is contained in:
Jon Staab 2023-10-26 11:59:42 -07:00
parent 7d9ec3e9a5
commit 328ea31452
6 changed files with 49 additions and 141 deletions

View File

@ -58,7 +58,7 @@
"normalize-url": "^8.0.0",
"nostr-tools": "^1.12.1",
"npm-run-all": "^4.1.5",
"paravel": "^0.3.7",
"paravel": "^0.4.0",
"qr-scanner": "^1.4.2",
"qrcode": "^1.5.1",
"ramda": "^0.28.0",

View File

@ -1,12 +1,13 @@
import {mergeRight, pluck, max, identity, sortBy} from "ramda"
import {first} from "hurdak"
import type {Subscription} from "paravel"
import {now} from "src/util/misc"
import {info} from "src/util/logger"
import type {Event} from "src/engine/events/model"
import type {Filter} from "../model"
import {getUrls} from "./executor"
import {guessFilterDelta} from "./filters"
import {Subscription} from "./subscribe"
import {subscribe} from "./subscribe"
import {Tracker} from "./tracker"
export type CursorOpts = {
@ -50,36 +51,34 @@ export class Cursor {
let count = 0
const sub = new Subscription({
const sub = subscribe({
timeout: 3000,
relays: [relay],
filters: filters.map(mergeRight({until, limit, since})),
tracker: this.opts.tracker,
})
filters: filters.map(mergeRight({until, limit, since})),
onEvent: (event: Event) => {
this.until = Math.min(until, event.created_at) - 1
this.buffer.push(event)
sub.on("event", (event: Event) => {
this.until = Math.min(until, event.created_at) - 1
this.buffer.push(event)
count += 1
count += 1
onEvent?.(event)
},
onClose: () => {
this.loading = false
onEvent?.(event)
})
// Relays can't be relied upon to return events in descending order, do exponential
// windowing to ensure we get the most recent stuff on first load, but eventually find it all
if (count === 0) {
this.delta *= 10
}
sub.on("close", () => {
this.loading = false
// Relays can't be relied upon to return events in descending order, do exponential
// windowing to ensure we get the most recent stuff on first load, but eventually find it all
if (count === 0) {
this.delta *= 10
}
if (this.since === 0) {
this.done = true
} else {
this.since = Math.max(0, this.since - this.delta)
}
if (this.since === 0) {
this.done = true
} else {
this.since = Math.max(0, this.since - this.delta)
}
},
})
return sub
@ -140,7 +139,7 @@ export class MultiCursor {
return this.cursors.reduce((n, c) => n + c.buffer.length, 0)
}
take(n: number): [Subscription[], Event[]] {
take(n: number): [(typeof Subscription)[], Event[]] {
const events = []
while (events.length < n) {

View File

@ -86,7 +86,7 @@ export class FeedLoader {
// Wait until a good number of subscriptions have completed to reduce the chance of
// out of order notes
this.ready = race(0.2, pluck("result", subs))
this.ready = race(0.2, subs.map(s => new Promise(r => s.on('close', r))))
}
discardEvents(events) {

View File

@ -7,7 +7,7 @@ import type {Event} from "src/engine/events/model"
import {mergeHints} from "src/engine/relays/utils"
import type {Filter} from "../model"
import {matchFilters, combineFilters} from "./filters"
import {Subscription} from "./subscribe"
import {subscribe} from "./subscribe"
import {Tracker} from "./tracker"
export type LoadOneOpts = {
@ -30,7 +30,7 @@ export type LoadItem = {
const loadChunk = (chunk, relays, tracker) => {
const filters = combineFilters(chunk.flatMap(getPath(["request", "filters"])))
const sub = new Subscription({relays, filters, timeout: 15000})
const sub = subscribe({relays, filters, timeout: 15000})
const chunkResults = []
for (const item of chunk) {

View File

@ -1,129 +1,38 @@
import type {Executor} from "paravel"
import EventEmitter from "events"
import type {SubscriptionOpts} from "paravel"
import {Subscription} from "paravel"
import {assoc, map} from "ramda"
import {defer, updateIn} from "hurdak"
import {updateIn} from "hurdak"
import {now} from "src/util/misc"
import {warn, info} from "src/util/logger"
import type {Event} from "src/engine/events/model"
import {projections} from "src/engine/core/projections"
import {getUrls, getExecutor} from "./executor"
import type {Filter} from "../model"
import {matchFilters, hasValidSignature} from "./filters"
import {getUrls, getExecutor} from './executor'
import {Tracker} from "./tracker"
export type SubscriptionOpts = {
relays: string[]
filters: Filter[]
timeout?: number
export type SubscribeOpts = typeof SubscriptionOpts & {
tracker?: Tracker
shouldProject?: boolean
}
export class Subscription extends EventEmitter {
executor: typeof Executor
opened = Date.now()
closed: number = null
result = defer()
events = []
eose = new Set()
tracker: Tracker
sub: {unsubscribe: () => void} = null
id = Math.random().toString().slice(12, 16)
constructor(readonly opts: SubscriptionOpts) {
super()
this.tracker = opts.tracker || new Tracker()
this.start()
}
start = () => {
const {timeout, relays, filters} = this.opts
if (timeout) {
setTimeout(this.close, timeout)
}
this.executor = getExecutor(getUrls(relays))
// If one of our connections gets closed make sure to kill our sub
this.executor.target.connections.forEach(con => con.on("close", this.close))
this.sub = this.executor.subscribe(filters, {
onEvent: this.onEvent,
onEose: this.onEose,
})
}
onEvent = (url: string, event: Event) => {
const {filters} = this.opts
const seen = this.tracker.add(event, url)
if (seen) {
return
}
event.content = event.content || ""
if (!hasValidSignature(event)) {
warn("Signature verification failed", {event})
return
}
if (!matchFilters(filters, event)) {
warn("Event failed to match filter", {filters, event})
return
}
if (this.opts.shouldProject !== false) {
projections.push(event)
}
this.events.push(event)
this.emit("event", event)
}
onEose = (url: string) => {
const {timeout, relays} = this.opts
this.emit("eose", url)
this.eose.add(url)
if (timeout && this.eose.size === relays.length) {
this.close()
}
}
close = () => {
if (!this.closed) {
this.closed = Date.now()
this.sub.unsubscribe()
this.executor.target.connections.forEach(con => con.off("close", this.close))
this.executor.target.cleanup()
this.result.resolve(this.events)
this.emit("close", this.events)
this.removeAllListeners()
}
}
}
export type SubscribeOpts = SubscriptionOpts & {
onEvent?: (e: Event) => void
onEose?: (url: string) => void
onClose?: (events: Event[]) => void
}
export const subscribe = (opts: SubscribeOpts) => {
const sub = new Subscription(opts)
info(`Starting subscription with ${opts.relays.length} relays`, {
relays: opts.relays,
filters: opts.filters,
const tracker = opts.tracker || new Tracker()
const sub = new Subscription({
...opts,
hasSeen: tracker.add,
closeOnEose: Boolean(opts.timeout),
executor: getExecutor(getUrls(opts.relays)),
})
sub.on("event", e => {
opts.onEvent?.(e)
if (opts.shouldProject !== false) {
projections.push(e)
}
})
if (opts.onEvent) sub.on("event", opts.onEvent)
if (opts.onEose) sub.on("eose", opts.onEose)
if (opts.onClose) sub.on("close", opts.onClose)
@ -133,8 +42,8 @@ export const subscribe = (opts: SubscribeOpts) => {
export const subscribePersistent = async (opts: SubscribeOpts) => {
/* eslint no-constant-condition: 0 */
while (true) {
await subscribe(opts).result
const sub = subscribe(updateIn("filters", map(assoc("since", now())), opts))
opts = updateIn("filters", map(assoc("since", now())), opts)
await new Promise(resolve => sub.on("close", resolve))
}
}

BIN
yarn.lock

Binary file not shown.