Add sync helper and loadAll method to multicursor

This commit is contained in:
Jon Staab 2024-03-26 11:36:04 -07:00
parent 5aebc1a74c
commit e2fe579e6c
9 changed files with 57 additions and 93 deletions

View File

@ -15,6 +15,7 @@
import {
nip44,
pubkey,
readable,
canSign,
channels,
hasNewMessages,
@ -35,7 +36,8 @@
$: tabChannels = sortChannels(activeTab === "conversations" ? $accepted : $requests)
let cursor, unsub, interval, loading
let loader
let loading = readable(false)
let hideNip04Alert = Storage.getJson("hide_nip04_alert")
const hideAlert = () => {
@ -44,15 +46,9 @@
}
const loadMessages = ({reload = false} = {}) => {
if (!loading) {
unsub?.()
clearInterval(interval)
;[cursor, unsub] = loadAllMessages({reload})
setInterval(() => {
loading = !cursor.done()
}, 300)
}
loader?.stop()
loader = loadAllMessages({reload})
loading = loader.loading
}
onMount(() => {
@ -62,7 +58,7 @@
}
return () => {
unsub?.()
loader?.stop()
}
})
@ -112,11 +108,11 @@
<div slot="trigger">
<i
class="fa fa-arrows-rotate cursor-pointer text-neutral-600"
class:fa-spin={loading}
class:fa-spin={$loading}
on:click={() => loadMessages({reload: true})} />
</div>
<div slot="tooltip">
{#if loading}
{#if $loading}
Loading conversations...
{:else}
Reload conversations

View File

@ -1,5 +1,5 @@
import {assocPath, uniq} from "ramda"
import {seconds, sleep} from "hurdak"
import {seconds} from "hurdak"
import {now} from "@coracle.social/lib"
import {sessions} from "src/engine/session/state"
import {session} from "src/engine/session/derived"
@ -26,25 +26,9 @@ export const loadAllMessages = ({reload = false} = {}) => {
],
})
let done = false
setTimeout(async () => {
while (!done) {
cursor.take(250)
await sleep(2000)
done = cursor.done()
}
return cursor.loadAll({
onComplete: unsubscribePubkeys,
})
return [
cursor,
() => {
done = true
unsubscribePubkeys()
},
]
}
export const listenForMessages = (pubkeys: string[]) => {

View File

@ -1,5 +1,5 @@
import {prop} from "ramda"
import {Worker} from "src/engine/core/utils"
import {Worker} from "@coracle.social/lib"
import type {Event} from "src/engine/events/model"
export const projections = new Worker<Event>({

View File

@ -1,3 +1,2 @@
export * from "./store"
export * from "./worker"
export * from "./storage"

View File

@ -1,54 +0,0 @@
const ANY = "worker/ANY"
export type WorkerOpts<T> = {
getKey: (x: T) => any
}
export class Worker<T> {
buffer: T[] = []
handlers: Map<any, Array<(x: T) => void>> = new Map()
timeout: NodeJS.Timeout | undefined
constructor(readonly opts: WorkerOpts<T>) {}
#doWork = async () => {
for (const message of this.buffer.splice(0, 50)) {
const k = this.opts.getKey(message)
for (const handler of this.handlers.get(ANY) || []) {
await handler(message)
}
for (const handler of this.handlers.get(k) || []) {
await handler(message)
}
}
this.timeout = undefined
this.#enqueueWork()
}
#enqueueWork = () => {
if (!this.timeout && this.buffer.length > 0) {
this.timeout = setTimeout(this.#doWork, 50)
}
}
push = (message: T) => {
this.buffer.push(message)
this.#enqueueWork()
}
addHandler = (k, handler: (message: T) => void) => {
if (!this.handlers.has(k)) {
this.handlers.set(k, [])
}
this.handlers.get(k).push(handler)
}
addGlobalHandler = (handler: (message: T) => void) => {
this.addHandler(ANY, handler)
}
}

View File

@ -1,8 +1,10 @@
import {seconds} from "hurdak"
import type {Event} from "nostr-tools"
import {Worker} from "@coracle.social/lib"
import {giftWrapKinds} from "src/util/nostr"
import {session, nip44, nip04} from "src/engine/session/derived"
import {hints} from "src/engine/relays/utils"
import {load} from "src/engine/network/utils"
import {load, MultiCursor, Publisher} from "src/engine/network/utils"
export const loadDeletes = () => {
const {pubkey, deletes_last_synced = 0} = session.get()
@ -45,3 +47,17 @@ export const loadGiftWrap = () => {
})
}
}
export const sync = (fromUrl, toUrl, filters) => {
const worker = new Worker<Event>()
worker.addGlobalHandler(event => Publisher.publish({event, relays: [toUrl]}))
const cursor = new MultiCursor({
filters,
relays: [fromUrl],
onEvent: worker.push,
})
cursor.loadAll()
}

View File

@ -1,11 +1,12 @@
import {mergeLeft, pluck, min, max, identity, sortBy} from "ramda"
import {first} from "hurdak"
import {first, sleep} from "hurdak"
import {now} from "@coracle.social/lib"
import type {Filter} from "@coracle.social/util"
import {guessFilterDelta} from "@coracle.social/util"
import type {Subscription} from "@coracle.social/network"
import type {Event} from "src/engine/events/model"
import {sortEventsDesc} from "src/engine/events/utils"
import {writable} from "src/engine/core/utils"
import {getUrls} from "./executor"
import {subscribe} from "./subscribe"
import {Tracker} from "./tracker"
@ -184,4 +185,26 @@ export class MultiCursor {
return [subs, events]
}
loadAll = ({onComplete = null} = {}) => {
const loading = writable(true)
const stop = () => {
loading.set(false)
onComplete?.()
}
setTimeout(async () => {
while (loading.get()) {
this.take(250)
await sleep(2000)
if (this.done()) {
stop()
}
}
})
return {stop, loading}
}
}

View File

@ -16,7 +16,7 @@ export const getUrls = (relays: string[]) => {
error(`Attempted to connect to zero urls`)
}
const urls = uniq(relays.map(normalizeRelayUrl))
const urls = uniq(relays.map(url => normalizeRelayUrl(url, {allowInsecure: true})))
if (urls.length !== relays.length) {
warn(`Attempted to connect to non-unique relays`)

View File

@ -15,13 +15,13 @@ import {getSetting} from "src/engine/session/utils"
import type {Relay} from "./model"
import {relays} from "./state"
export const normalizeRelayUrl = (url: string) => {
export const normalizeRelayUrl = (url: string, opts = {}) => {
if (url === LOCAL_RELAY_URL) {
return url
}
try {
return normalize(url)
return normalize(url, opts)
} catch (e) {
return url
}