From 6d8c0325e45b561f914fb2dd75b03a72d8c6fcda Mon Sep 17 00:00:00 2001 From: Kieran Date: Thu, 18 Jan 2024 11:17:57 +0000 Subject: [PATCH] feat: process worker messages in queue --- packages/app/src/Cache/PaymentsCache.ts | 17 ------- packages/app/src/Cache/index.ts | 2 - packages/app/src/Feed/LoginFeed.ts | 3 -- packages/app/src/Pages/settings/Cache.tsx | 40 ++++++++------- packages/app/src/index.tsx | 1 - packages/worker-relay/src/queue.ts | 30 ++++++++++++ packages/worker-relay/src/relay.ts | 4 +- packages/worker-relay/src/worker.ts | 60 ++++++++++++++--------- 8 files changed, 92 insertions(+), 65 deletions(-) delete mode 100644 packages/app/src/Cache/PaymentsCache.ts create mode 100644 packages/worker-relay/src/queue.ts diff --git a/packages/app/src/Cache/PaymentsCache.ts b/packages/app/src/Cache/PaymentsCache.ts deleted file mode 100644 index aa6e0bd6..00000000 --- a/packages/app/src/Cache/PaymentsCache.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { FeedCache } from "@snort/shared"; - -import { db, Payment } from "@/Db"; - -export class Payments extends FeedCache { - constructor() { - super("PaymentsCache", db.payments); - } - - key(of: Payment): string { - return of.url; - } - - takeSnapshot(): Array { - return [...this.cache.values()]; - } -} diff --git a/packages/app/src/Cache/index.ts b/packages/app/src/Cache/index.ts index cbd67fc4..84a899ec 100644 --- a/packages/app/src/Cache/index.ts +++ b/packages/app/src/Cache/index.ts @@ -3,7 +3,6 @@ import { SnortSystemDb } from "@snort/system-web"; import { ChatCache } from "./ChatCache"; import { GiftWrapCache } from "./GiftWrapCache"; -import { Payments } from "./PaymentsCache"; export const SystemDb = new SnortSystemDb(); export const UserCache = new UserProfileCache(SystemDb.users); @@ -11,7 +10,6 @@ export const UserRelays = new UserRelaysCache(SystemDb.userRelays); export const RelayMetrics = new RelayMetricCache(SystemDb.relayMetrics); export const Chats = new ChatCache(); -export const PaymentsCache = new Payments(); export const GiftsCache = new GiftWrapCache(); export async function preload(follows?: Array) { diff --git a/packages/app/src/Feed/LoginFeed.ts b/packages/app/src/Feed/LoginFeed.ts index b5e8356c..5a798342 100644 --- a/packages/app/src/Feed/LoginFeed.ts +++ b/packages/app/src/Feed/LoginFeed.ts @@ -112,9 +112,6 @@ export default function useLoginFeed() { if (contactList) { const pTags = contactList.tags.filter(a => a[0] === "p").map(a => a[1]); setFollows(login.id, pTags, contactList.created_at * 1000); - - // TODO: fixup - // FollowsFeed.backFillIfMissing(system, pTags); } const relays = getNewest(loginFeed.filter(a => a.kind === EventKind.Relays)); diff --git a/packages/app/src/Pages/settings/Cache.tsx b/packages/app/src/Pages/settings/Cache.tsx index e430b701..812d17b7 100644 --- a/packages/app/src/Pages/settings/Cache.tsx +++ b/packages/app/src/Pages/settings/Cache.tsx @@ -2,7 +2,7 @@ import { FeedCache } from "@snort/shared"; import { ReactNode, useEffect, useState, useSyncExternalStore } from "react"; import { FormattedMessage, FormattedNumber } from "react-intl"; -import { Chats, GiftsCache, PaymentsCache, RelayMetrics, UserCache } from "@/Cache"; +import { Chats, GiftsCache, RelayMetrics, UserCache } from "@/Cache"; import AsyncButton from "@/Components/Button/AsyncButton"; import { Relay } from "@/system"; @@ -16,7 +16,6 @@ export function CacheSettings() { } /> } /> } /> - } /> } /> ); @@ -61,25 +60,30 @@ function RelayCacheStats() { return (
-
+
- {Object.entries(counts).map(([k, v]) => { - return ( - - , - k: k, - }} - /> - - ); - })} + + + + + + + + + {Object.entries(counts).sort(([, a], [, b]) => a > b ? -1 : 1).map(([k, v]) => { + return ( + + + + + ); + })} + +
{k}
+
- {}}> + { }}>
diff --git a/packages/app/src/index.tsx b/packages/app/src/index.tsx index 385eeb4f..027cd8b2 100644 --- a/packages/app/src/index.tsx +++ b/packages/app/src/index.tsx @@ -1,7 +1,6 @@ import "./index.css"; import "@szhsin/react-menu/dist/index.css"; import "@/assets/fonts/inter.css"; -import "./wdyr"; import { encodeTLVEntries } from "@snort/system"; import { SnortContext } from "@snort/system-react"; diff --git a/packages/worker-relay/src/queue.ts b/packages/worker-relay/src/queue.ts new file mode 100644 index 00000000..37df22e0 --- /dev/null +++ b/packages/worker-relay/src/queue.ts @@ -0,0 +1,30 @@ +export interface WorkQueueItem { + next: () => Promise; + resolve(v: unknown): void; + reject(e: unknown): void; +} + +export async function processWorkQueue(queue?: Array, queueDelay = 200) { + while (queue && queue.length > 0) { + const v = queue.shift(); + if (v) { + try { + const ret = await v.next(); + v.resolve(ret); + } catch (e) { + v.reject(e); + } + } + } + setTimeout(() => processWorkQueue(queue, queueDelay), queueDelay); +} + +export const barrierQueue = async (queue: Array, then: () => Promise): Promise => { + return new Promise((resolve, reject) => { + queue.push({ + next: then, + resolve, + reject, + }); + }); +}; diff --git a/packages/worker-relay/src/relay.ts b/packages/worker-relay/src/relay.ts index 389adc82..6ab25140 100644 --- a/packages/worker-relay/src/relay.ts +++ b/packages/worker-relay/src/relay.ts @@ -4,7 +4,7 @@ import { NostrEvent, ReqFilter, unixNowMs } from "./types"; export class WorkerRelay { #sqlite?: Sqlite3Static; - #log = (...msg: Array) => console.debug(...msg); + #log = debug("WorkerRelay"); #db?: Database; /** @@ -169,7 +169,7 @@ export class WorkerRelay { * Get a summary about events table */ summary() { - const res = this.#db?.exec("select kind, count(*) from events group by kind order by 2 desc", { + const res = this.#db?.exec("select kind, count(*) from events group by kind", { returnValue: "resultRows", }); return Object.fromEntries(res?.map(a => [String(a[0]), a[1] as number]) ?? []); diff --git a/packages/worker-relay/src/worker.ts b/packages/worker-relay/src/worker.ts index 62b818c7..0c529430 100644 --- a/packages/worker-relay/src/worker.ts +++ b/packages/worker-relay/src/worker.ts @@ -1,5 +1,6 @@ /// +import { WorkQueueItem, barrierQueue, processWorkQueue } from "./queue"; import { WorkerRelay } from "./relay"; import { NostrEvent, ReqCommand, ReqFilter, WorkerMessage } from "./types"; @@ -24,29 +25,38 @@ async function insertBatch() { } setTimeout(() => insertBatch(), 100); +let cmdQueue: Array = []; +processWorkQueue(cmdQueue, 50); + globalThis.onclose = () => { relay.close(); }; -globalThis.onmessage = async ev => { +globalThis.onmessage = ev => { //console.debug(ev); const msg = ev.data as WorkerMessage; try { switch (msg.cmd) { case "init": { - await relay.init(); - reply(msg.id, true); + barrierQueue(cmdQueue, async () => { + await relay.init(); + reply(msg.id, true); + }); break; } case "open": { - await relay.open("/relay.db"); - reply(msg.id, true); + barrierQueue(cmdQueue, async () => { + await relay.open("/relay.db"); + reply(msg.id, true); + }); break; } case "migrate": { - relay.migrate(); - reply(msg.id, true); + barrierQueue(cmdQueue, async () => { + relay.migrate(); + reply(msg.id, true); + }); break; } case "event": { @@ -55,27 +65,33 @@ globalThis.onmessage = async ev => { break; } case "req": { - const req = msg.args as ReqCommand; - const results = []; - for (const r of req.slice(2)) { - results.push(...relay.req(r as ReqFilter)); - } - reply(msg.id, results); + barrierQueue(cmdQueue, async () => { + const req = msg.args as ReqCommand; + const results = []; + for (const r of req.slice(2)) { + results.push(...relay.req(r as ReqFilter)); + } + reply(msg.id, results); + }); break; } case "count": { - const req = msg.args as ReqCommand; - let results = 0; - for (const r of req.slice(2)) { - const c = relay.count(r as ReqFilter); - results += c; - } - reply(msg.id, results); + barrierQueue(cmdQueue, async () => { + const req = msg.args as ReqCommand; + let results = 0; + for (const r of req.slice(2)) { + const c = relay.count(r as ReqFilter); + results += c; + } + reply(msg.id, results); + }); break; } case "summary": { - const res = relay.summary(); - reply(msg.id, res); + barrierQueue(cmdQueue, async () => { + const res = relay.summary(); + reply(msg.id, res); + }); break; } default: {