From 3df5282f81219a76419996db7dbe5b1dd4c6a953 Mon Sep 17 00:00:00 2001 From: Jonathan Staab Date: Mon, 25 Sep 2023 16:15:49 -0700 Subject: [PATCH] Use multiplexer properly when loading --- src/engine/network/utils/load.ts | 102 +++++++++++++++++-------------- 1 file changed, 56 insertions(+), 46 deletions(-) diff --git a/src/engine/network/utils/load.ts b/src/engine/network/utils/load.ts index 5d3dd671..4b3b05b6 100644 --- a/src/engine/network/utils/load.ts +++ b/src/engine/network/utils/load.ts @@ -1,10 +1,13 @@ +import {always, path as getPath} from "ramda" import {defer} from "hurdak" import {pushToKey} from "src/util/misc" import {info} from "src/util/logger" +import {getSetting} from "src/engine/session/utils" import type {Event} from "src/engine/events/model" +import {mergeHints} from "src/engine/relays/utils" import type {Filter} from "../model" import {matchFilters, combineFilters} from "./filters" -import {Subscription} from "./subscribe" +import {subscribe} from "./subscribe" import {Tracker} from "./tracker" export type LoadOpts = { @@ -22,6 +25,41 @@ export type LoadItem = { const queue = [] +const loadChunk = (chunk, relays, eventWasSeen = always(false)) => { + const sub = subscribe({ + relays, + timeout: 15000, + filters: combineFilters(chunk.flatMap(getPath(["request", "filters"]))), + onEvent: e => { + if (eventWasSeen(e)) { + return + } + + for (const {request} of chunk) { + if (request.onEvent && matchFilters(request.filters, e)) { + request.onEvent(e) + } + } + }, + onEose: url => { + for (const {request} of chunk) { + request.onEose?.(url) + } + }, + onClose: events => { + for (const {request} of chunk) { + request.onClose?.(events) + } + }, + }) + + sub.result.then(events => { + for (const item of chunk) { + item.result.resolve(events) + } + }) +} + export const execute = () => { const filters = combineFilters(queue.flatMap(item => item.request.filters)) @@ -29,55 +67,27 @@ export const execute = () => { return } - const nRequests = queue.length + const items = queue.splice(0) - const itemsByRelay = {} - for (const item of queue.splice(0)) { - for (const url of item.request.relays) { - pushToKey(itemsByRelay, url, item) + info(`Loading ${items.length} grouped requests`, filters) + + // If we're using multiplexer, let it do its thing + if (getSetting("multiplextr_url")) { + loadChunk(items, mergeHints(items.map(getPath(["request", "relays"])))) + } else { + const itemsByRelay = {} + for (const item of items) { + for (const url of item.request.relays) { + pushToKey(itemsByRelay, url, item) + } } - } - info(`Loading ${nRequests} grouped requests`, filters) + // Group by relay, then by filter + for (const [url, chunk] of Object.entries(itemsByRelay) as [string, LoadItem[]][]) { + const tracker = new Tracker() - const tracker = new Tracker() - - // Group by relay, then by filter - for (const [url, items] of Object.entries(itemsByRelay) as [string, LoadItem[]][]) { - const filters = combineFilters(items.flatMap(item => item.request.filters)) - const sub = new Subscription({filters, relays: [url], timeout: 15000}) - - sub.on("event", e => { - const seen = tracker.add(e, url) - - if (seen) { - return - } - - for (const {request} of items) { - if (request.onEvent && matchFilters(request.filters, e)) { - request.onEvent(e) - } - } - }) - - sub.on("eose", url => { - for (const {request} of items) { - request.onEose?.(url) - } - }) - - sub.on("close", events => { - for (const {request} of items) { - request.onClose?.(events) - } - }) - - sub.result.then(events => { - for (const item of items) { - item.result.resolve(events) - } - }) + loadChunk(chunk, [url], e => tracker.add(e, url)) + } } }