feat: upgrade caches to worker
This commit is contained in:
@ -40,7 +40,15 @@ export class WorkerRelayInterface {
|
||||
return await this.#workerRpc<ReqCommand, Array<NostrEvent>>("req", req);
|
||||
}
|
||||
|
||||
#workerRpc<T, R>(cmd: string, args?: T, timeout = 5_000) {
|
||||
async count(req: ReqCommand) {
|
||||
return await this.#workerRpc<ReqCommand, number>("count", req);
|
||||
}
|
||||
|
||||
async summary() {
|
||||
return await this.#workerRpc<void, Record<string, number>>("summary");
|
||||
}
|
||||
|
||||
#workerRpc<T, R>(cmd: string, args?: T, timeout = 30_000) {
|
||||
const id = uuid();
|
||||
const msg = {
|
||||
id,
|
||||
|
@ -1,16 +1,17 @@
|
||||
import sqlite3InitModule, { Database, Sqlite3Static } from "@sqlite.org/sqlite-wasm";
|
||||
import debug from "debug";
|
||||
import { NostrEvent, ReqFilter } from "types";
|
||||
import { NostrEvent, ReqFilter, unixNowMs } from "./types";
|
||||
|
||||
export class WorkerRelay {
|
||||
#sqlite?: Sqlite3Static;
|
||||
#log = (msg: string) => console.debug(msg);
|
||||
#log = (...msg: Array<any>) => console.debug(...msg);
|
||||
#db?: Database;
|
||||
|
||||
/**
|
||||
* Initialize the SQLite driver
|
||||
*/
|
||||
async init() {
|
||||
if (this.#sqlite) return;
|
||||
this.#sqlite = await sqlite3InitModule();
|
||||
this.#log(`Got SQLite version: ${this.#sqlite.version.libVersion}`);
|
||||
}
|
||||
@ -20,15 +21,26 @@ export class WorkerRelay {
|
||||
*/
|
||||
async open(path: string) {
|
||||
if (!this.#sqlite) throw new Error("Must call init first");
|
||||
if (this.#db) return;
|
||||
|
||||
if ("opfs" in this.#sqlite) {
|
||||
this.#db = new this.#sqlite.oo1.OpfsDb(path, "cw");
|
||||
this.#log(`Opened ${this.#db.filename}`);
|
||||
try {
|
||||
this.#db = new this.#sqlite.oo1.OpfsDb(path, "cw");
|
||||
this.#log(`Opened ${this.#db.filename}`);
|
||||
} catch (e) {
|
||||
// wipe db
|
||||
console.error(e);
|
||||
}
|
||||
} else {
|
||||
throw new Error("OPFS not supported!");
|
||||
}
|
||||
}
|
||||
|
||||
close() {
|
||||
this.#db?.close();
|
||||
this.#db = undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Do database migration
|
||||
*/
|
||||
@ -56,34 +68,7 @@ export class WorkerRelay {
|
||||
event(ev: NostrEvent) {
|
||||
let eventInserted = false;
|
||||
this.#db?.transaction(db => {
|
||||
const legacyReplacable = [0, 3, 41];
|
||||
if (legacyReplacable.includes(ev.kind) || (ev.kind >= 10_000 && ev.kind < 20_000)) {
|
||||
db.exec("delete from events where kind = ? and pubkey = ?", {
|
||||
bind: [ev.kind, ev.pubkey],
|
||||
});
|
||||
this.#log(`Deleted old kind=${ev.kind}, author=${ev.pubkey} (rows=${db.changes()})`);
|
||||
}
|
||||
if (ev.kind >= 30_000 && ev.kind < 40_000) {
|
||||
const dTag = ev.tags.find(a => a[0] === "d")![1];
|
||||
db.exec(
|
||||
"delete from events where id in (select id from events, tags where events.id = tags.event_id and tags.key = ? and tags.value = ?)",
|
||||
{
|
||||
bind: ["d", dTag],
|
||||
},
|
||||
);
|
||||
this.#log(`Deleted old versions of: d=${dTag}, kind=${ev.kind}, author=${ev.pubkey} (rows=${db.changes()})`);
|
||||
}
|
||||
db.exec("insert or ignore into events(id, pubkey, created, kind, json) values(?,?,?,?,?)", {
|
||||
bind: [ev.id, ev.pubkey, ev.created_at, ev.kind, JSON.stringify(ev)],
|
||||
});
|
||||
eventInserted = (this.#db?.changes() as number) > 0;
|
||||
if (eventInserted) {
|
||||
for (const t of ev.tags.filter(a => a[0].length === 1)) {
|
||||
db.exec("insert into tags(event_id, key, value) values(?, ?, ?)", {
|
||||
bind: [ev.id, t[0], t[1]],
|
||||
});
|
||||
}
|
||||
}
|
||||
eventInserted = this.#insertEvent(db, ev);
|
||||
});
|
||||
if (eventInserted) {
|
||||
this.#log(`Inserted: kind=${ev.kind},authors=${ev.pubkey},id=${ev.id}`);
|
||||
@ -91,29 +76,125 @@ export class WorkerRelay {
|
||||
return eventInserted;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write multiple events
|
||||
*/
|
||||
eventBatch(evs: Array<NostrEvent>) {
|
||||
let eventsInserted: Array<NostrEvent> = [];
|
||||
this.#db?.transaction(db => {
|
||||
for (const ev of evs) {
|
||||
if (this.#insertEvent(db, ev)) {
|
||||
eventsInserted.push(ev);
|
||||
}
|
||||
}
|
||||
});
|
||||
if (eventsInserted.length > 0) {
|
||||
this.#log(`Inserted Batch: ${eventsInserted.length}/${evs.length}`);
|
||||
}
|
||||
return eventsInserted.length > 0;
|
||||
}
|
||||
|
||||
#insertEvent(db: Database, ev: NostrEvent) {
|
||||
const legacyReplacable = [0, 3, 41];
|
||||
if (legacyReplacable.includes(ev.kind) || (ev.kind >= 10_000 && ev.kind < 20_000)) {
|
||||
db.exec("delete from events where kind = ? and pubkey = ? and created < ?", {
|
||||
bind: [ev.kind, ev.pubkey, ev.created_at],
|
||||
});
|
||||
const oldDeleted = db.changes();
|
||||
if (oldDeleted === 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (ev.kind >= 30_000 && ev.kind < 40_000) {
|
||||
const dTag = ev.tags.find(a => a[0] === "d")![1];
|
||||
db.exec(
|
||||
"delete from events where id in (select id from events, tags where events.id = tags.event_id and tags.key = ? and tags.value = ?)",
|
||||
{
|
||||
bind: ["d", dTag],
|
||||
},
|
||||
);
|
||||
const oldDeleted = db.changes();
|
||||
if (oldDeleted === 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
db.exec("insert or ignore into events(id, pubkey, created, kind, json) values(?,?,?,?,?)", {
|
||||
bind: [ev.id, ev.pubkey, ev.created_at, ev.kind, JSON.stringify(ev)],
|
||||
});
|
||||
let eventInserted = (this.#db?.changes() as number) > 0;
|
||||
if (eventInserted) {
|
||||
for (const t of ev.tags.filter(a => a[0].length === 1)) {
|
||||
db.exec("insert into tags(event_id, key, value) values(?, ?, ?)", {
|
||||
bind: [ev.id, t[0], t[1]],
|
||||
});
|
||||
}
|
||||
}
|
||||
return eventInserted;
|
||||
}
|
||||
|
||||
/**
|
||||
* Query relay by nostr filter
|
||||
*/
|
||||
req(req: ReqFilter) {
|
||||
const start = unixNowMs();
|
||||
|
||||
const [sql, params] = this.#buildQuery(req);
|
||||
const rows = this.#db?.exec(sql, {
|
||||
bind: params,
|
||||
returnValue: "resultRows",
|
||||
});
|
||||
const results = rows?.map(a => JSON.parse(a[0] as string) as NostrEvent) ?? [];
|
||||
const time = unixNowMs() - start;
|
||||
this.#log(`Query results took ${time.toLocaleString()}ms`);
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Count results by nostr filter
|
||||
*/
|
||||
count(req: ReqFilter) {
|
||||
const start = unixNowMs();
|
||||
const [sql, params] = this.#buildQuery(req, true);
|
||||
const rows = this.#db?.exec(sql, {
|
||||
bind: params,
|
||||
returnValue: "resultRows",
|
||||
});
|
||||
const results = (rows?.at(0)?.at(0) as number | undefined) ?? 0;
|
||||
const time = unixNowMs() - start;
|
||||
this.#log(`Query count results took ${time.toLocaleString()}ms`);
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a summary about events table
|
||||
*/
|
||||
summary() {
|
||||
const res = this.#db?.exec("select kind, count(*) from events group by kind order by 2 desc", {
|
||||
returnValue: "resultRows",
|
||||
});
|
||||
return Object.fromEntries(res?.map(a => [String(a[0]), a[1] as number]) ?? []);
|
||||
}
|
||||
|
||||
#buildQuery(req: ReqFilter, count = false) {
|
||||
const conditions: Array<string> = [];
|
||||
const params: Array<any> = [];
|
||||
|
||||
const repeatParams = (n: number) => {
|
||||
const ret = [];
|
||||
const ret: Array<string> = [];
|
||||
for (let x = 0; x < n; x++) {
|
||||
ret.push("?");
|
||||
}
|
||||
return ret.join(", ");
|
||||
};
|
||||
|
||||
let sql = `select json from events`;
|
||||
let sql = `select ${count ? "count(json)" : "json"} from events`;
|
||||
const tags = Object.entries(req).filter(([k]) => k.startsWith("#"));
|
||||
for (const [key, values] of tags) {
|
||||
const vArray = values as Array<string>;
|
||||
sql += ` inner join tags on events.id = tags.event_id and tags.key = ? and tags.value in (${repeatParams(
|
||||
vArray.length,
|
||||
)})`;
|
||||
params.push(key);
|
||||
params.push(key.slice(1));
|
||||
params.push(...vArray);
|
||||
}
|
||||
if (req.ids) {
|
||||
@ -142,13 +223,7 @@ export class WorkerRelay {
|
||||
if (req.limit) {
|
||||
sql += ` order by created desc limit ${req.limit}`;
|
||||
}
|
||||
|
||||
this.#log(`Made query ${sql} from ${JSON.stringify(req)}`);
|
||||
const rows = this.#db?.exec(sql, {
|
||||
bind: params,
|
||||
returnValue: "resultRows",
|
||||
});
|
||||
return rows?.map(a => JSON.parse(a[0] as string) as NostrEvent) ?? [];
|
||||
return [sql, params];
|
||||
}
|
||||
|
||||
#migrate_v1() {
|
||||
|
@ -1,6 +1,6 @@
|
||||
export interface WorkerMessage<T> {
|
||||
id: string;
|
||||
cmd: "reply" | "init" | "open" | "migrate" | "event" | "req";
|
||||
cmd: "reply" | "init" | "open" | "migrate" | "event" | "req" | "count" | "summary";
|
||||
args: T;
|
||||
}
|
||||
|
||||
@ -27,3 +27,7 @@ export interface ReqFilter {
|
||||
not?: ReqFilter;
|
||||
[key: string]: Array<string> | Array<number> | string | number | undefined | ReqFilter;
|
||||
}
|
||||
|
||||
export function unixNowMs() {
|
||||
return new Date().getTime();
|
||||
}
|
@ -13,43 +13,77 @@ async function reply<T>(id: string, obj?: T) {
|
||||
} as WorkerMessage<T>);
|
||||
}
|
||||
|
||||
// Event inserter queue
|
||||
let eventWriteQueue: Array<NostrEvent> = [];
|
||||
async function insertBatch() {
|
||||
if (eventWriteQueue.length > 0) {
|
||||
relay.eventBatch(eventWriteQueue);
|
||||
eventWriteQueue = [];
|
||||
}
|
||||
setTimeout(() => insertBatch(), 100);
|
||||
}
|
||||
setTimeout(() => insertBatch(), 100);
|
||||
|
||||
globalThis.onclose = () => {
|
||||
relay.close();
|
||||
};
|
||||
|
||||
globalThis.onmessage = async ev => {
|
||||
//console.debug(ev);
|
||||
|
||||
const msg = ev.data as WorkerMessage<any>;
|
||||
switch (msg.cmd) {
|
||||
case "init": {
|
||||
await relay.init();
|
||||
reply(msg.id, true);
|
||||
break;
|
||||
}
|
||||
case "open": {
|
||||
await relay.open("/relay.db");
|
||||
reply(msg.id, true);
|
||||
break;
|
||||
}
|
||||
case "migrate": {
|
||||
await relay.migrate();
|
||||
reply(msg.id, true);
|
||||
break;
|
||||
}
|
||||
case "event": {
|
||||
await relay.event(msg.args as NostrEvent);
|
||||
reply(msg.id, true);
|
||||
break;
|
||||
}
|
||||
case "req": {
|
||||
const req = msg.args as ReqCommand;
|
||||
const results = [];
|
||||
for (const r of req.slice(2)) {
|
||||
results.push(...(await relay.req(r as ReqFilter)));
|
||||
try {
|
||||
switch (msg.cmd) {
|
||||
case "init": {
|
||||
await relay.init();
|
||||
reply(msg.id, true);
|
||||
break;
|
||||
}
|
||||
case "open": {
|
||||
await relay.open("/relay.db");
|
||||
reply(msg.id, true);
|
||||
break;
|
||||
}
|
||||
case "migrate": {
|
||||
relay.migrate();
|
||||
reply(msg.id, true);
|
||||
break;
|
||||
}
|
||||
case "event": {
|
||||
eventWriteQueue.push(msg.args as NostrEvent);
|
||||
reply(msg.id, true);
|
||||
break;
|
||||
}
|
||||
case "req": {
|
||||
const req = msg.args as ReqCommand;
|
||||
const results = [];
|
||||
for (const r of req.slice(2)) {
|
||||
results.push(...relay.req(r as ReqFilter));
|
||||
}
|
||||
reply(msg.id, results);
|
||||
break;
|
||||
}
|
||||
case "count": {
|
||||
const req = msg.args as ReqCommand;
|
||||
let results = 0;
|
||||
for (const r of req.slice(2)) {
|
||||
const c = relay.count(r as ReqFilter);
|
||||
results += c;
|
||||
}
|
||||
reply(msg.id, results);
|
||||
break;
|
||||
}
|
||||
case "summary": {
|
||||
const res = relay.summary();
|
||||
reply(msg.id, res);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
reply(msg.id, { error: "Unknown command" });
|
||||
break;
|
||||
}
|
||||
reply(msg.id, results);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
reply(msg.id, { error: "Unknown command" });
|
||||
break;
|
||||
}
|
||||
} catch (e) {
|
||||
reply(msg.id, { error: e });
|
||||
}
|
||||
};
|
||||
|
Reference in New Issue
Block a user