feat: insertBatchSize

This commit is contained in:
kieran 2024-04-12 18:28:50 +01:00
parent e62bb58362
commit e5c8634c59
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
6 changed files with 39 additions and 11 deletions

View File

@ -13,7 +13,10 @@ export const Relay = new WorkerRelayInterface(
); );
export async function initRelayWorker() { export async function initRelayWorker() {
try { try {
await Relay.init("relay.db"); await Relay.init({
databasePath: "relay.db",
insertBatchSize: 100
});
} catch (e) { } catch (e) {
console.error(e); console.error(e);
} }

View File

@ -22,7 +22,10 @@ const workerScript = import.meta.env.DEV
const workerRelay = new WorkerRelayInterface(workerScript); const workerRelay = new WorkerRelayInterface(workerScript);
// load sqlite database and run migrations // load sqlite database and run migrations
await workerRelay.init("my-relay.db"); await workerRelay.init({
databasePath: "relay.db",
insertBatchSize: 100
});
// Query worker relay with regular nostr REQ command // Query worker relay with regular nostr REQ command
const results = await workerRelay.query(["REQ", "1", { kinds: [1], limit: 10 }]); const results = await workerRelay.query(["REQ", "1", { kinds: [1], limit: 10 }]);

View File

@ -11,7 +11,10 @@ const workerScript = import.meta.env.DEV
const workerRelay = new WorkerRelayInterface(workerScript); const workerRelay = new WorkerRelayInterface(workerScript);
// load sqlite database and run migrations // load sqlite database and run migrations
await workerRelay.init("my-relay.db"); await workerRelay.init({
databasePath: "relay.db",
insertBatchSize: 100
});
// Query worker relay with regular nostr REQ command // Query worker relay with regular nostr REQ command
const results = await workerRelay.query(["REQ", "1", { kinds: [1], limit: 10 }]); const results = await workerRelay.query(["REQ", "1", { kinds: [1], limit: 10 }]);

View File

@ -1,6 +1,6 @@
{ {
"name": "@snort/worker-relay", "name": "@snort/worker-relay",
"version": "1.0.9", "version": "1.0.10",
"description": "A nostr relay in a service worker", "description": "A nostr relay in a service worker",
"main": "dist/index.js", "main": "dist/index.js",
"types": "dist/index.d.ts", "types": "dist/index.d.ts",

View File

@ -1,6 +1,18 @@
import { EventMetadata, NostrEvent, OkResponse, ReqCommand, WorkerMessage, WorkerMessageCommand } from "./types"; import { EventMetadata, NostrEvent, OkResponse, ReqCommand, WorkerMessage, WorkerMessageCommand } from "./types";
import { v4 as uuid } from "uuid"; import { v4 as uuid } from "uuid";
export interface InitAargs {
/**
* OPFS file path for the database
*/
databasePath: string,
/**
* How many events to insert per batch
*/
insertBatchSize?: number
}
export class WorkerRelayInterface { export class WorkerRelayInterface {
#worker: Worker; #worker: Worker;
#commandQueue: Map<string, (v: unknown, ports: ReadonlyArray<MessagePort>) => void> = new Map(); #commandQueue: Map<string, (v: unknown, ports: ReadonlyArray<MessagePort>) => void> = new Map();
@ -35,8 +47,8 @@ export class WorkerRelayInterface {
}; };
} }
async init(databasePath: string) { async init(args: InitAargs) {
return await this.#workerRpc<Array<string | undefined>, boolean>("init", [databasePath]); return await this.#workerRpc<InitAargs, boolean>("init", args);
} }
async event(ev: NostrEvent) { async event(ev: NostrEvent) {

View File

@ -2,12 +2,13 @@
import { SqliteRelay } from "./sqlite-relay"; import { SqliteRelay } from "./sqlite-relay";
import { InMemoryRelay } from "./memory-relay"; import { InMemoryRelay } from "./memory-relay";
import { debugLog, setLogging } from "./debug"; import { setLogging } from "./debug";
import { WorkQueueItem, barrierQueue, processWorkQueue } from "./queue"; import { WorkQueueItem, barrierQueue, processWorkQueue } from "./queue";
import { NostrEvent, RelayHandler, ReqCommand, ReqFilter, WorkerMessage, unixNowMs, EventMetadata } from "./types"; import { NostrEvent, RelayHandler, ReqCommand, ReqFilter, WorkerMessage, unixNowMs, EventMetadata } from "./types";
import { getForYouFeed } from "./forYouFeed"; import { getForYouFeed } from "./forYouFeed";
let relay: RelayHandler | undefined; let relay: RelayHandler | undefined;
let insertBatchSize = 10;
// Event inserter queue // Event inserter queue
let eventWriteQueue: Array<NostrEvent> = []; let eventWriteQueue: Array<NostrEvent> = [];
@ -24,7 +25,7 @@ async function insertBatch() {
//console.debug("Yield insert, queue length: ", eventWriteQueue.length, ", cmds: ", cmdQueue.length); //console.debug("Yield insert, queue length: ", eventWriteQueue.length, ", cmds: ", cmdQueue.length);
break; break;
} }
const batch = eventWriteQueue.splice(0, 10); const batch = eventWriteQueue.splice(0, insertBatchSize);
eventWriteQueue = eventWriteQueue.slice(batch.length); eventWriteQueue = eventWriteQueue.slice(batch.length);
relay.eventBatch(batch); relay.eventBatch(batch);
} }
@ -42,6 +43,11 @@ try {
console.error(e); console.error(e);
} }
interface InitAargs {
databasePath: string,
insertBatchSize?: number
}
const handleMsg = async (port: MessagePort | DedicatedWorkerGlobalScope, ev: MessageEvent) => { const handleMsg = async (port: MessagePort | DedicatedWorkerGlobalScope, ev: MessageEvent) => {
async function reply<T>(id: string, obj?: T) { async function reply<T>(id: string, obj?: T) {
port.postMessage({ port.postMessage({
@ -61,18 +67,19 @@ const handleMsg = async (port: MessagePort | DedicatedWorkerGlobalScope, ev: Mes
} }
case "init": { case "init": {
await barrierQueue(cmdQueue, async () => { await barrierQueue(cmdQueue, async () => {
const [dbPath] = msg.args as Array<string>; const args = msg.args as InitAargs;
insertBatchSize = args.insertBatchSize ?? 10;
try { try {
if ("WebAssembly" in self) { if ("WebAssembly" in self) {
relay = new SqliteRelay(); relay = new SqliteRelay();
} else { } else {
relay = new InMemoryRelay(); relay = new InMemoryRelay();
} }
await relay.init(dbPath); await relay.init(args.databasePath);
} catch (e) { } catch (e) {
console.error("Fallback to InMemoryRelay", e); console.error("Fallback to InMemoryRelay", e);
relay = new InMemoryRelay(); relay = new InMemoryRelay();
await relay.init(dbPath); await relay.init(args.databasePath);
} }
reply(msg.id, true); reply(msg.id, true);
}); });