mirror of
https://github.com/coracle-social/coracle.git
synced 2024-09-29 08:21:20 +00:00
Use multiplexer properly when loading
This commit is contained in:
parent
b8707359ca
commit
3df5282f81
@ -1,10 +1,13 @@
|
||||
import {always, path as getPath} from "ramda"
|
||||
import {defer} from "hurdak"
|
||||
import {pushToKey} from "src/util/misc"
|
||||
import {info} from "src/util/logger"
|
||||
import {getSetting} from "src/engine/session/utils"
|
||||
import type {Event} from "src/engine/events/model"
|
||||
import {mergeHints} from "src/engine/relays/utils"
|
||||
import type {Filter} from "../model"
|
||||
import {matchFilters, combineFilters} from "./filters"
|
||||
import {Subscription} from "./subscribe"
|
||||
import {subscribe} from "./subscribe"
|
||||
import {Tracker} from "./tracker"
|
||||
|
||||
export type LoadOpts = {
|
||||
@ -22,6 +25,41 @@ export type LoadItem = {
|
||||
|
||||
const queue = []
|
||||
|
||||
const loadChunk = (chunk, relays, eventWasSeen = always(false)) => {
|
||||
const sub = subscribe({
|
||||
relays,
|
||||
timeout: 15000,
|
||||
filters: combineFilters(chunk.flatMap(getPath(["request", "filters"]))),
|
||||
onEvent: e => {
|
||||
if (eventWasSeen(e)) {
|
||||
return
|
||||
}
|
||||
|
||||
for (const {request} of chunk) {
|
||||
if (request.onEvent && matchFilters(request.filters, e)) {
|
||||
request.onEvent(e)
|
||||
}
|
||||
}
|
||||
},
|
||||
onEose: url => {
|
||||
for (const {request} of chunk) {
|
||||
request.onEose?.(url)
|
||||
}
|
||||
},
|
||||
onClose: events => {
|
||||
for (const {request} of chunk) {
|
||||
request.onClose?.(events)
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
sub.result.then(events => {
|
||||
for (const item of chunk) {
|
||||
item.result.resolve(events)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
export const execute = () => {
|
||||
const filters = combineFilters(queue.flatMap(item => item.request.filters))
|
||||
|
||||
@ -29,55 +67,27 @@ export const execute = () => {
|
||||
return
|
||||
}
|
||||
|
||||
const nRequests = queue.length
|
||||
const items = queue.splice(0)
|
||||
|
||||
info(`Loading ${items.length} grouped requests`, filters)
|
||||
|
||||
// If we're using multiplexer, let it do its thing
|
||||
if (getSetting("multiplextr_url")) {
|
||||
loadChunk(items, mergeHints(items.map(getPath(["request", "relays"]))))
|
||||
} else {
|
||||
const itemsByRelay = {}
|
||||
for (const item of queue.splice(0)) {
|
||||
for (const item of items) {
|
||||
for (const url of item.request.relays) {
|
||||
pushToKey(itemsByRelay, url, item)
|
||||
}
|
||||
}
|
||||
|
||||
info(`Loading ${nRequests} grouped requests`, filters)
|
||||
|
||||
// Group by relay, then by filter
|
||||
for (const [url, chunk] of Object.entries(itemsByRelay) as [string, LoadItem[]][]) {
|
||||
const tracker = new Tracker()
|
||||
|
||||
// Group by relay, then by filter
|
||||
for (const [url, items] of Object.entries(itemsByRelay) as [string, LoadItem[]][]) {
|
||||
const filters = combineFilters(items.flatMap(item => item.request.filters))
|
||||
const sub = new Subscription({filters, relays: [url], timeout: 15000})
|
||||
|
||||
sub.on("event", e => {
|
||||
const seen = tracker.add(e, url)
|
||||
|
||||
if (seen) {
|
||||
return
|
||||
loadChunk(chunk, [url], e => tracker.add(e, url))
|
||||
}
|
||||
|
||||
for (const {request} of items) {
|
||||
if (request.onEvent && matchFilters(request.filters, e)) {
|
||||
request.onEvent(e)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
sub.on("eose", url => {
|
||||
for (const {request} of items) {
|
||||
request.onEose?.(url)
|
||||
}
|
||||
})
|
||||
|
||||
sub.on("close", events => {
|
||||
for (const {request} of items) {
|
||||
request.onClose?.(events)
|
||||
}
|
||||
})
|
||||
|
||||
sub.result.then(events => {
|
||||
for (const item of items) {
|
||||
item.result.resolve(events)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user