feat: process worker messages in queue

This commit is contained in:
Kieran 2024-01-18 11:17:57 +00:00
parent 2ea516e636
commit 6d8c0325e4
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
8 changed files with 92 additions and 65 deletions

View File

@ -1,17 +0,0 @@
import { FeedCache } from "@snort/shared";
import { db, Payment } from "@/Db";
export class Payments extends FeedCache<Payment> {
constructor() {
super("PaymentsCache", db.payments);
}
key(of: Payment): string {
return of.url;
}
takeSnapshot(): Array<Payment> {
return [...this.cache.values()];
}
}

View File

@ -3,7 +3,6 @@ import { SnortSystemDb } from "@snort/system-web";
import { ChatCache } from "./ChatCache";
import { GiftWrapCache } from "./GiftWrapCache";
import { Payments } from "./PaymentsCache";
export const SystemDb = new SnortSystemDb();
export const UserCache = new UserProfileCache(SystemDb.users);
@ -11,7 +10,6 @@ export const UserRelays = new UserRelaysCache(SystemDb.userRelays);
export const RelayMetrics = new RelayMetricCache(SystemDb.relayMetrics);
export const Chats = new ChatCache();
export const PaymentsCache = new Payments();
export const GiftsCache = new GiftWrapCache();
export async function preload(follows?: Array<string>) {

View File

@ -112,9 +112,6 @@ export default function useLoginFeed() {
if (contactList) {
const pTags = contactList.tags.filter(a => a[0] === "p").map(a => a[1]);
setFollows(login.id, pTags, contactList.created_at * 1000);
// TODO: fixup
// FollowsFeed.backFillIfMissing(system, pTags);
}
const relays = getNewest(loginFeed.filter(a => a.kind === EventKind.Relays));

View File

@ -2,7 +2,7 @@ import { FeedCache } from "@snort/shared";
import { ReactNode, useEffect, useState, useSyncExternalStore } from "react";
import { FormattedMessage, FormattedNumber } from "react-intl";
import { Chats, GiftsCache, PaymentsCache, RelayMetrics, UserCache } from "@/Cache";
import { Chats, GiftsCache, RelayMetrics, UserCache } from "@/Cache";
import AsyncButton from "@/Components/Button/AsyncButton";
import { Relay } from "@/system";
@ -16,7 +16,6 @@ export function CacheSettings() {
<CacheDetails cache={UserCache} name={<FormattedMessage defaultMessage="Profiles" id="2zJXeA" />} />
<CacheDetails cache={Chats} name={<FormattedMessage defaultMessage="Chats" id="ABAQyo" />} />
<CacheDetails cache={RelayMetrics} name={<FormattedMessage defaultMessage="Relay Metrics" id="tjpYlr" />} />
<CacheDetails cache={PaymentsCache} name={<FormattedMessage defaultMessage="Payments" id="iYc3Ld" />} />
<CacheDetails cache={GiftsCache} name={<FormattedMessage defaultMessage="Gift Wraps" id="fjAcWo" />} />
</div>
);
@ -61,25 +60,30 @@ function RelayCacheStats() {
return (
<div className="flex justify-between br p bg-superdark">
<div className="flex flex-col g4">
<div className="flex flex-col g4 w-64">
<FormattedMessage defaultMessage="Worker Relay" id="xSoIUU" />
{Object.entries(counts).map(([k, v]) => {
return (
<small key={k}>
<FormattedMessage
defaultMessage="{n} kind {k} events"
id="I97cCX"
values={{
n: <FormattedNumber value={v} />,
k: k,
}}
/>
</small>
);
})}
<table className="text-secondary">
<thead>
<tr>
<th className="text-left"><FormattedMessage defaultMessage="Kind" id="e5x8FT" /></th>
<th className="text-left"><FormattedMessage defaultMessage="Count" id="Aujn2T" /></th>
</tr>
</thead>
<tbody>
{Object.entries(counts).sort(([, a], [, b]) => a > b ? -1 : 1).map(([k, v]) => {
return (
<tr key={k}>
<td>{k}</td>
<td><FormattedNumber value={v} /></td>
</tr>
);
})}
</tbody>
</table>
</div>
<div>
<AsyncButton onClick={() => {}}>
<AsyncButton onClick={() => { }}>
<FormattedMessage defaultMessage="Clear" id="/GCoTA" />
</AsyncButton>
</div>

View File

@ -1,7 +1,6 @@
import "./index.css";
import "@szhsin/react-menu/dist/index.css";
import "@/assets/fonts/inter.css";
import "./wdyr";
import { encodeTLVEntries } from "@snort/system";
import { SnortContext } from "@snort/system-react";

View File

@ -0,0 +1,30 @@
export interface WorkQueueItem {
next: () => Promise<unknown>;
resolve(v: unknown): void;
reject(e: unknown): void;
}
export async function processWorkQueue(queue?: Array<WorkQueueItem>, queueDelay = 200) {
while (queue && queue.length > 0) {
const v = queue.shift();
if (v) {
try {
const ret = await v.next();
v.resolve(ret);
} catch (e) {
v.reject(e);
}
}
}
setTimeout(() => processWorkQueue(queue, queueDelay), queueDelay);
}
export const barrierQueue = async <T>(queue: Array<WorkQueueItem>, then: () => Promise<T>): Promise<T> => {
return new Promise<T>((resolve, reject) => {
queue.push({
next: then,
resolve,
reject,
});
});
};

View File

@ -4,7 +4,7 @@ import { NostrEvent, ReqFilter, unixNowMs } from "./types";
export class WorkerRelay {
#sqlite?: Sqlite3Static;
#log = (...msg: Array<any>) => console.debug(...msg);
#log = debug("WorkerRelay");
#db?: Database;
/**
@ -169,7 +169,7 @@ export class WorkerRelay {
* Get a summary about events table
*/
summary() {
const res = this.#db?.exec("select kind, count(*) from events group by kind order by 2 desc", {
const res = this.#db?.exec("select kind, count(*) from events group by kind", {
returnValue: "resultRows",
});
return Object.fromEntries(res?.map(a => [String(a[0]), a[1] as number]) ?? []);

View File

@ -1,5 +1,6 @@
/// <reference lib="webworker" />
import { WorkQueueItem, barrierQueue, processWorkQueue } from "./queue";
import { WorkerRelay } from "./relay";
import { NostrEvent, ReqCommand, ReqFilter, WorkerMessage } from "./types";
@ -24,29 +25,38 @@ async function insertBatch() {
}
setTimeout(() => insertBatch(), 100);
let cmdQueue: Array<WorkQueueItem> = [];
processWorkQueue(cmdQueue, 50);
globalThis.onclose = () => {
relay.close();
};
globalThis.onmessage = async ev => {
globalThis.onmessage = ev => {
//console.debug(ev);
const msg = ev.data as WorkerMessage<any>;
try {
switch (msg.cmd) {
case "init": {
await relay.init();
reply(msg.id, true);
barrierQueue(cmdQueue, async () => {
await relay.init();
reply(msg.id, true);
});
break;
}
case "open": {
await relay.open("/relay.db");
reply(msg.id, true);
barrierQueue(cmdQueue, async () => {
await relay.open("/relay.db");
reply(msg.id, true);
});
break;
}
case "migrate": {
relay.migrate();
reply(msg.id, true);
barrierQueue(cmdQueue, async () => {
relay.migrate();
reply(msg.id, true);
});
break;
}
case "event": {
@ -55,27 +65,33 @@ globalThis.onmessage = async ev => {
break;
}
case "req": {
const req = msg.args as ReqCommand;
const results = [];
for (const r of req.slice(2)) {
results.push(...relay.req(r as ReqFilter));
}
reply(msg.id, results);
barrierQueue(cmdQueue, async () => {
const req = msg.args as ReqCommand;
const results = [];
for (const r of req.slice(2)) {
results.push(...relay.req(r as ReqFilter));
}
reply(msg.id, results);
});
break;
}
case "count": {
const req = msg.args as ReqCommand;
let results = 0;
for (const r of req.slice(2)) {
const c = relay.count(r as ReqFilter);
results += c;
}
reply(msg.id, results);
barrierQueue(cmdQueue, async () => {
const req = msg.args as ReqCommand;
let results = 0;
for (const r of req.slice(2)) {
const c = relay.count(r as ReqFilter);
results += c;
}
reply(msg.id, results);
});
break;
}
case "summary": {
const res = relay.summary();
reply(msg.id, res);
barrierQueue(cmdQueue, async () => {
const res = relay.summary();
reply(msg.id, res);
});
break;
}
default: {