feat: worker-relay delete

fix: worker-relay insert replacable events duplicate
This commit is contained in:
2024-05-23 11:59:48 +01:00
parent bb5bf34fe9
commit b764cc1535
15 changed files with 271 additions and 138 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@snort/worker-relay",
"version": "1.0.10",
"version": "1.1.0",
"description": "A nostr relay in a service worker",
"main": "dist/index.js",
"types": "dist/index.d.ts",
@ -18,7 +18,7 @@
"dist"
],
"dependencies": {
"@sqlite.org/sqlite-wasm": "^3.45.1-build1",
"@sqlite.org/sqlite-wasm": "^3.45.3-build3",
"eventemitter3": "^5.0.1",
"uuid": "^9.0.1"
},

View File

@ -63,6 +63,11 @@ export class WorkerRelayInterface {
return await this.#workerRpc<ReqCommand, number>("count", req);
}
async delete(req: ReqCommand) {
console.debug("DELETE", req);
return await this.#workerRpc<ReqCommand, Array<string>>("delete", req);
}
async summary() {
return await this.#workerRpc<void, Record<string, number>>("summary");
}

View File

@ -80,6 +80,13 @@ export class InMemoryRelay extends EventEmitter<RelayHandlerEvents> implements R
return ret;
}
delete(filter: ReqFilter) {
const forDelete = this.req("ids-for-delete", { ...filter, ids_only: true }) as Array<string>;
forDelete.forEach(a => this.#events.delete(a));
return forDelete;
}
setEventMetadata(_id: string, _meta: EventMetadata) {
return;
}

View File

@ -99,6 +99,7 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
}
#deleteById(db: Database, ids: Array<string>) {
if (ids.length === 0) return;
db.exec(`delete from events where id in (${this.#repeatParams(ids.length)})`, {
bind: ids,
});
@ -126,6 +127,9 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
this.#deleteById(db, toDelete);
}
return false;
} else {
// delete older versions
this.#deleteById(db, oldEvents);
}
}
if (ev.kind >= 30_000 && ev.kind < 40_000) {
@ -142,6 +146,9 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
this.#deleteById(db, toDelete);
}
return false;
} else {
// delete older versions
this.#deleteById(db, oldEvents);
}
}
db.exec("insert or ignore into events(id, pubkey, created, kind, json) values(?,?,?,?,?)", {
@ -196,6 +203,32 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
return results;
}
/**
* Delete events by nostr filter
*/
delete(req: ReqFilter) {
this.#log(`Starting delete of ${JSON.stringify(req)}`);
const start = unixNowMs();
const for_delete = this.req("ids-for-delete", { ...req, ids_only: true }) as Array<string>;
const grouped = for_delete.reduce(
(acc, v, i) => {
const batch = (i / 1000).toFixed(0);
acc[batch] ??= [];
acc[batch].push(v);
return acc;
},
{} as Record<string, Array<string>>,
);
this.#log(`Starting delete of ${Object.keys(grouped).length} batches`);
Object.entries(grouped).forEach(([batch, ids]) => {
this.#deleteById(this.db!, ids);
});
const time = unixNowMs() - start;
this.#log(`Delete ${for_delete.length} events took ${time.toLocaleString()}ms`);
return for_delete;
}
/**
* Get a summary about events table
*/
@ -231,7 +264,7 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
return new Uint8Array();
}
#buildQuery(req: ReqFilter, count = false): [string, Array<any>] {
#buildQuery(req: ReqFilter, count = false, remove = false): [string, Array<any>] {
const conditions: Array<string> = [];
const params: Array<any> = [];
@ -241,7 +274,11 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
} else if (req.ids_only === true) {
resultType = "id";
}
let sql = `select ${resultType} from events`;
let operation = `select ${resultType}`;
if (remove) {
operation = "delete";
}
let sql = `${operation} from events`;
const tags = Object.entries(req).filter(([k]) => k.startsWith("#"));
let tx = 0;
for (const [key, values] of tags) {

View File

@ -12,7 +12,8 @@ export type WorkerMessageCommand =
| "emit-event"
| "forYouFeed"
| "setEventMetadata"
| "debug";
| "debug"
| "delete";
export interface WorkerMessage<T> {
id: string;
@ -70,6 +71,7 @@ export interface RelayHandler extends EventEmitter<RelayHandlerEvents> {
count(req: ReqFilter): number;
summary(): Record<string, number>;
dump(): Promise<Uint8Array>;
delete(req: ReqFilter): Array<string>;
setEventMetadata(id: string, meta: EventMetadata): void;
}

View File

@ -4,7 +4,16 @@ import { SqliteRelay } from "./sqlite-relay";
import { InMemoryRelay } from "./memory-relay";
import { setLogging } from "./debug";
import { WorkQueueItem, barrierQueue, processWorkQueue } from "./queue";
import { NostrEvent, RelayHandler, ReqCommand, ReqFilter, WorkerMessage, unixNowMs, EventMetadata } from "./types";
import {
NostrEvent,
RelayHandler,
ReqCommand,
ReqFilter,
WorkerMessage,
unixNowMs,
EventMetadata,
OkResponse,
} from "./types";
import { getForYouFeed } from "./forYouFeed";
let relay: RelayHandler | undefined;
@ -86,8 +95,13 @@ const handleMsg = async (port: MessagePort | DedicatedWorkerGlobalScope, ev: Mes
break;
}
case "event": {
eventWriteQueue.push(msg.args as NostrEvent);
reply(msg.id, true);
const ev = msg.args as NostrEvent;
eventWriteQueue.push(ev);
reply(msg.id, {
ok: true,
id: ev.id,
relay: "",
} as OkResponse);
break;
}
case "close": {
@ -130,6 +144,20 @@ const handleMsg = async (port: MessagePort | DedicatedWorkerGlobalScope, ev: Mes
});
break;
}
case "delete": {
console.debug("DELETE", msg.args);
await barrierQueue(cmdQueue, async () => {
const req = msg.args as ReqCommand;
let results = [];
const filters = req.slice(2) as Array<ReqFilter>;
for (const r of filters) {
const c = relay!.delete(r);
results.push(...c);
}
reply(msg.id, results);
});
break;
}
case "summary": {
await barrierQueue(cmdQueue, async () => {
const res = relay!.summary();