diff --git a/packages/worker-relay/src/migrations.ts b/packages/worker-relay/src/migrations.ts new file mode 100644 index 00000000..219d91de --- /dev/null +++ b/packages/worker-relay/src/migrations.ts @@ -0,0 +1,101 @@ +import {NostrEvent} from "./types"; +import {SqliteRelay} from "./sqlite-relay"; +import debug from "debug"; + +const log = debug('SqliteRelay:migrations'); + +/** + * Do database migration + */ +function migrate(relay: SqliteRelay) { + if (!relay.db) throw new Error("DB must be open"); + + relay.db.exec( + 'CREATE TABLE IF NOT EXISTS "__migration" (version INTEGER,migrated NUMERIC, CONSTRAINT "__migration_PK" PRIMARY KEY (version))', + ); + const res = relay.db.exec("select max(version) from __migration", { + returnValue: "resultRows", + }); + + const version = (res[0][0] as number | undefined) ?? 0; + log(`Starting migration from: v${version}`); + if (version < 1) { + migrate_v1(relay); + log("Migrated to v1"); + } + if (version < 2) { + migrate_v2(relay); + log("Migrated to v2"); + } + if (version < 3) { + migrate_v3(relay); + log("Migrated to v3"); + } + if (version < 4) { + migrate_v4(relay); + log("Migrated to v4"); + } +} + +function migrate_v1(relay: SqliteRelay) { + relay.db?.transaction(db => { + db.exec( + "CREATE TABLE events (\ + id TEXT(64) PRIMARY KEY, \ + pubkey TEXT(64), \ + created INTEGER, \ + kind INTEGER, \ + json TEXT \ + )", + ); + db.exec( + "CREATE TABLE tags (\ + event_id TEXT(64), \ + key TEXT, \ + value TEXT, \ + CONSTRAINT tags_FK FOREIGN KEY (event_id) REFERENCES events(id) ON DELETE CASCADE \ + )", + ); + db.exec("CREATE INDEX tags_key_IDX ON tags (key,value)"); + db.exec("insert into __migration values(1, ?)", { + bind: [new Date().getTime() / 1000], + }); + }); +} + +function migrate_v2(relay: SqliteRelay) { + relay.db?.transaction(db => { + db.exec("CREATE INDEX pubkey_kind_IDX ON events (pubkey,kind)"); + db.exec("CREATE INDEX pubkey_created_IDX ON events (pubkey,created)"); + db.exec("insert into __migration values(2, ?)", { + bind: [new Date().getTime() / 1000], + }); + }); +} + +function migrate_v3(relay: SqliteRelay) { + relay.db?.transaction(db => { + db.exec("CREATE VIRTUAL TABLE search_content using fts5(id UNINDEXED, content)"); + const events = db.selectArrays("select json from events where kind in (?,?)", [0, 1]); + for (const json of events) { + const ev = JSON.parse(json[0] as string) as NostrEvent; + if (ev) { + relay.insertIntoSearchIndex(db, ev); + } + } + db.exec("insert into __migration values(3, ?)", { + bind: [new Date().getTime() / 1000], + }); + }); +} + +async function migrate_v4(relay: SqliteRelay) { + relay.db?.transaction(db => { + db.exec("ALTER TABLE events ADD COLUMN seen_at INTEGER"); + db.exec("insert into __migration values(4, ?)", { + bind: [new Date().getTime() / 1000], + }); + }); +} + +export default migrate; \ No newline at end of file diff --git a/packages/worker-relay/src/sqlite-relay.ts b/packages/worker-relay/src/sqlite-relay.ts index 0e0da2a7..f1dfc39d 100644 --- a/packages/worker-relay/src/sqlite-relay.ts +++ b/packages/worker-relay/src/sqlite-relay.ts @@ -2,11 +2,12 @@ import sqlite3InitModule, { Database, Sqlite3Static } from "@sqlite.org/sqlite-w import { EventEmitter } from "eventemitter3"; import { NostrEvent, RelayHandler, RelayHandlerEvents, ReqFilter, unixNowMs } from "./types"; import debug from "debug"; +import migrate from "./migrations"; export class SqliteRelay extends EventEmitter implements RelayHandler { #sqlite?: Sqlite3Static; #log = debug("SqliteRelay"); - #db?: Database; + db?: Database; #seenInserts = new Set(); /** @@ -17,7 +18,7 @@ export class SqliteRelay extends EventEmitter implements Rel this.#sqlite = await sqlite3InitModule(); this.#log(`Got SQLite version: ${this.#sqlite.version.libVersion}`); await this.#open(path); - this.#migrate(); + this.db && migrate(this); } /** @@ -25,13 +26,13 @@ export class SqliteRelay extends EventEmitter implements Rel */ async #open(path: string) { if (!this.#sqlite) throw new Error("Must call init first"); - if (this.#db) return; + if (this.db) return; if ("opfs" in this.#sqlite) { try { - this.#db = new this.#sqlite.oo1.OpfsDb(path, "cw"); - this.#log(`Opened ${this.#db.filename}`); - this.#db.exec( + this.db = new this.#sqlite.oo1.OpfsDb(path, "cw"); + this.#log(`Opened ${this.db.filename}`); + this.db.exec( `PRAGMA cache_size=${ 32 * 1024 }; PRAGMA page_size=8192; PRAGMA journal_mode=MEMORY; PRAGMA temp_store=MEMORY;`, @@ -46,44 +47,15 @@ export class SqliteRelay extends EventEmitter implements Rel } close() { - this.#db?.close(); - this.#db = undefined; - } - - /** - * Do database migration - */ - #migrate() { - if (!this.#db) throw new Error("DB must be open"); - - this.#db.exec( - 'CREATE TABLE IF NOT EXISTS "__migration" (version INTEGER,migrated NUMERIC, CONSTRAINT "__migration_PK" PRIMARY KEY (version))', - ); - const res = this.#db.exec("select max(version) from __migration", { - returnValue: "resultRows", - }); - - const version = (res[0][0] as number | undefined) ?? 0; - this.#log(`Starting migration from: v${version}`); - if (version < 1) { - this.#migrate_v1(); - this.#log("Migrated to v1"); - } - if (version < 2) { - this.#migrate_v2(); - this.#log("Migrated to v2"); - } - if (version < 3) { - this.#migrate_v3(); - this.#log("Migrated to v3"); - } + this.db?.close(); + this.db = undefined; } /** * Insert an event to the database */ event(ev: NostrEvent) { - if (this.#insertEvent(this.#db!, ev)) { + if (this.#insertEvent(this.db!, ev)) { this.#log(`Inserted: kind=${ev.kind},authors=${ev.pubkey},id=${ev.id}`); this.emit("event", [ev]); return true; @@ -92,7 +64,7 @@ export class SqliteRelay extends EventEmitter implements Rel } sql(sql: string, params: Array) { - return this.#db?.selectArrays(sql, params) as Array>; + return this.db?.selectArrays(sql, params) as Array>; } /** @@ -101,7 +73,7 @@ export class SqliteRelay extends EventEmitter implements Rel eventBatch(evs: Array) { const start = unixNowMs(); let eventsInserted: Array = []; - this.#db?.transaction(db => { + this.db?.transaction(db => { for (const ev of evs) { if (this.#insertEvent(db, ev)) { eventsInserted.push(ev); @@ -164,14 +136,14 @@ export class SqliteRelay extends EventEmitter implements Rel db.exec("insert or ignore into events(id, pubkey, created, kind, json) values(?,?,?,?,?)", { bind: [ev.id, ev.pubkey, ev.created_at, ev.kind, JSON.stringify(ev)], }); - let eventInserted = (this.#db?.changes() as number) > 0; + let eventInserted = (this.db?.changes() as number) > 0; if (eventInserted) { for (const t of ev.tags.filter(a => a[0].length === 1)) { db.exec("insert into tags(event_id, key, value) values(?, ?, ?)", { bind: [ev.id, t[0], t[1]], }); } - this.#insertSearchIndex(db, ev); + this.insertIntoSearchIndex(db, ev); } this.#seenInserts.add(ev.id); return eventInserted; @@ -184,7 +156,7 @@ export class SqliteRelay extends EventEmitter implements Rel const start = unixNowMs(); const [sql, params] = this.#buildQuery(req); - const res = this.#db?.selectArrays(sql, params); + const res = this.db?.selectArrays(sql, params); const results = res?.map(a => { if (req.ids_only === true) { @@ -203,7 +175,7 @@ export class SqliteRelay extends EventEmitter implements Rel count(req: ReqFilter) { const start = unixNowMs(); const [sql, params] = this.#buildQuery(req, true); - const rows = this.#db?.exec(sql, { + const rows = this.db?.exec(sql, { bind: params, returnValue: "resultRows", }); @@ -217,7 +189,7 @@ export class SqliteRelay extends EventEmitter implements Rel * Get a summary about events table */ summary() { - const res = this.#db?.exec("select kind, count(*) from events group by kind", { + const res = this.db?.exec("select kind, count(*) from events group by kind", { returnValue: "resultRows", }); return Object.fromEntries(res?.map(a => [String(a[0]), a[1] as number]) ?? []); @@ -227,10 +199,10 @@ export class SqliteRelay extends EventEmitter implements Rel * Dump the database file */ async dump() { - const filePath = String(this.#db?.filename ?? ""); + const filePath = String(this.db?.filename ?? ""); try { - this.#db?.close(); - this.#db = undefined; + this.db?.close(); + this.db = undefined; const dir = await navigator.storage.getDirectory(); // @ts-expect-error for await (const [name, file] of dir) { @@ -328,43 +300,7 @@ export class SqliteRelay extends EventEmitter implements Rel return res; } - #migrate_v1() { - this.#db?.transaction(db => { - db.exec( - "CREATE TABLE events (\ - id TEXT(64) PRIMARY KEY, \ - pubkey TEXT(64), \ - created INTEGER, \ - kind INTEGER, \ - json TEXT \ - )", - ); - db.exec( - "CREATE TABLE tags (\ - event_id TEXT(64), \ - key TEXT, \ - value TEXT, \ - CONSTRAINT tags_FK FOREIGN KEY (event_id) REFERENCES events(id) ON DELETE CASCADE \ - )", - ); - db.exec("CREATE INDEX tags_key_IDX ON tags (key,value)"); - db.exec("insert into __migration values(1, ?)", { - bind: [new Date().getTime() / 1000], - }); - }); - } - - #migrate_v2() { - this.#db?.transaction(db => { - db.exec("CREATE INDEX pubkey_kind_IDX ON events (pubkey,kind)"); - db.exec("CREATE INDEX pubkey_created_IDX ON events (pubkey,created)"); - db.exec("insert into __migration values(2, ?)", { - bind: [new Date().getTime() / 1000], - }); - }); - } - - #insertSearchIndex(db: Database, ev: NostrEvent) { + insertIntoSearchIndex(db: Database, ev: NostrEvent) { if (ev.kind === 0) { const profile = JSON.parse(ev.content) as { name?: string; @@ -393,20 +329,4 @@ export class SqliteRelay extends EventEmitter implements Rel }); } } - - #migrate_v3() { - this.#db?.transaction(db => { - db.exec("CREATE VIRTUAL TABLE search_content using fts5(id UNINDEXED, content)"); - const events = db.selectArrays("select json from events where kind in (?,?)", [0, 1]); - for (const json of events) { - const ev = JSON.parse(json[0] as string) as NostrEvent; - if (ev) { - this.#insertSearchIndex(db, ev); - } - } - db.exec("insert into __migration values(3, ?)", { - bind: [new Date().getTime() / 1000], - }); - }); - } }