migrations file, add seen_at column to events
This commit is contained in:
parent
c5e534a730
commit
351a249a32
101
packages/worker-relay/src/migrations.ts
Normal file
101
packages/worker-relay/src/migrations.ts
Normal file
@ -0,0 +1,101 @@
|
||||
import {NostrEvent} from "./types";
|
||||
import {SqliteRelay} from "./sqlite-relay";
|
||||
import debug from "debug";
|
||||
|
||||
const log = debug('SqliteRelay:migrations');
|
||||
|
||||
/**
|
||||
* Do database migration
|
||||
*/
|
||||
function migrate(relay: SqliteRelay) {
|
||||
if (!relay.db) throw new Error("DB must be open");
|
||||
|
||||
relay.db.exec(
|
||||
'CREATE TABLE IF NOT EXISTS "__migration" (version INTEGER,migrated NUMERIC, CONSTRAINT "__migration_PK" PRIMARY KEY (version))',
|
||||
);
|
||||
const res = relay.db.exec("select max(version) from __migration", {
|
||||
returnValue: "resultRows",
|
||||
});
|
||||
|
||||
const version = (res[0][0] as number | undefined) ?? 0;
|
||||
log(`Starting migration from: v${version}`);
|
||||
if (version < 1) {
|
||||
migrate_v1(relay);
|
||||
log("Migrated to v1");
|
||||
}
|
||||
if (version < 2) {
|
||||
migrate_v2(relay);
|
||||
log("Migrated to v2");
|
||||
}
|
||||
if (version < 3) {
|
||||
migrate_v3(relay);
|
||||
log("Migrated to v3");
|
||||
}
|
||||
if (version < 4) {
|
||||
migrate_v4(relay);
|
||||
log("Migrated to v4");
|
||||
}
|
||||
}
|
||||
|
||||
function migrate_v1(relay: SqliteRelay) {
|
||||
relay.db?.transaction(db => {
|
||||
db.exec(
|
||||
"CREATE TABLE events (\
|
||||
id TEXT(64) PRIMARY KEY, \
|
||||
pubkey TEXT(64), \
|
||||
created INTEGER, \
|
||||
kind INTEGER, \
|
||||
json TEXT \
|
||||
)",
|
||||
);
|
||||
db.exec(
|
||||
"CREATE TABLE tags (\
|
||||
event_id TEXT(64), \
|
||||
key TEXT, \
|
||||
value TEXT, \
|
||||
CONSTRAINT tags_FK FOREIGN KEY (event_id) REFERENCES events(id) ON DELETE CASCADE \
|
||||
)",
|
||||
);
|
||||
db.exec("CREATE INDEX tags_key_IDX ON tags (key,value)");
|
||||
db.exec("insert into __migration values(1, ?)", {
|
||||
bind: [new Date().getTime() / 1000],
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function migrate_v2(relay: SqliteRelay) {
|
||||
relay.db?.transaction(db => {
|
||||
db.exec("CREATE INDEX pubkey_kind_IDX ON events (pubkey,kind)");
|
||||
db.exec("CREATE INDEX pubkey_created_IDX ON events (pubkey,created)");
|
||||
db.exec("insert into __migration values(2, ?)", {
|
||||
bind: [new Date().getTime() / 1000],
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function migrate_v3(relay: SqliteRelay) {
|
||||
relay.db?.transaction(db => {
|
||||
db.exec("CREATE VIRTUAL TABLE search_content using fts5(id UNINDEXED, content)");
|
||||
const events = db.selectArrays("select json from events where kind in (?,?)", [0, 1]);
|
||||
for (const json of events) {
|
||||
const ev = JSON.parse(json[0] as string) as NostrEvent;
|
||||
if (ev) {
|
||||
relay.insertIntoSearchIndex(db, ev);
|
||||
}
|
||||
}
|
||||
db.exec("insert into __migration values(3, ?)", {
|
||||
bind: [new Date().getTime() / 1000],
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async function migrate_v4(relay: SqliteRelay) {
|
||||
relay.db?.transaction(db => {
|
||||
db.exec("ALTER TABLE events ADD COLUMN seen_at INTEGER");
|
||||
db.exec("insert into __migration values(4, ?)", {
|
||||
bind: [new Date().getTime() / 1000],
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export default migrate;
|
@ -2,11 +2,12 @@ import sqlite3InitModule, { Database, Sqlite3Static } from "@sqlite.org/sqlite-w
|
||||
import { EventEmitter } from "eventemitter3";
|
||||
import { NostrEvent, RelayHandler, RelayHandlerEvents, ReqFilter, unixNowMs } from "./types";
|
||||
import debug from "debug";
|
||||
import migrate from "./migrations";
|
||||
|
||||
export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements RelayHandler {
|
||||
#sqlite?: Sqlite3Static;
|
||||
#log = debug("SqliteRelay");
|
||||
#db?: Database;
|
||||
db?: Database;
|
||||
#seenInserts = new Set<string>();
|
||||
|
||||
/**
|
||||
@ -17,7 +18,7 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
this.#sqlite = await sqlite3InitModule();
|
||||
this.#log(`Got SQLite version: ${this.#sqlite.version.libVersion}`);
|
||||
await this.#open(path);
|
||||
this.#migrate();
|
||||
this.db && migrate(this);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -25,13 +26,13 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
*/
|
||||
async #open(path: string) {
|
||||
if (!this.#sqlite) throw new Error("Must call init first");
|
||||
if (this.#db) return;
|
||||
if (this.db) return;
|
||||
|
||||
if ("opfs" in this.#sqlite) {
|
||||
try {
|
||||
this.#db = new this.#sqlite.oo1.OpfsDb(path, "cw");
|
||||
this.#log(`Opened ${this.#db.filename}`);
|
||||
this.#db.exec(
|
||||
this.db = new this.#sqlite.oo1.OpfsDb(path, "cw");
|
||||
this.#log(`Opened ${this.db.filename}`);
|
||||
this.db.exec(
|
||||
`PRAGMA cache_size=${
|
||||
32 * 1024
|
||||
}; PRAGMA page_size=8192; PRAGMA journal_mode=MEMORY; PRAGMA temp_store=MEMORY;`,
|
||||
@ -46,44 +47,15 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
}
|
||||
|
||||
close() {
|
||||
this.#db?.close();
|
||||
this.#db = undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Do database migration
|
||||
*/
|
||||
#migrate() {
|
||||
if (!this.#db) throw new Error("DB must be open");
|
||||
|
||||
this.#db.exec(
|
||||
'CREATE TABLE IF NOT EXISTS "__migration" (version INTEGER,migrated NUMERIC, CONSTRAINT "__migration_PK" PRIMARY KEY (version))',
|
||||
);
|
||||
const res = this.#db.exec("select max(version) from __migration", {
|
||||
returnValue: "resultRows",
|
||||
});
|
||||
|
||||
const version = (res[0][0] as number | undefined) ?? 0;
|
||||
this.#log(`Starting migration from: v${version}`);
|
||||
if (version < 1) {
|
||||
this.#migrate_v1();
|
||||
this.#log("Migrated to v1");
|
||||
}
|
||||
if (version < 2) {
|
||||
this.#migrate_v2();
|
||||
this.#log("Migrated to v2");
|
||||
}
|
||||
if (version < 3) {
|
||||
this.#migrate_v3();
|
||||
this.#log("Migrated to v3");
|
||||
}
|
||||
this.db?.close();
|
||||
this.db = undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert an event to the database
|
||||
*/
|
||||
event(ev: NostrEvent) {
|
||||
if (this.#insertEvent(this.#db!, ev)) {
|
||||
if (this.#insertEvent(this.db!, ev)) {
|
||||
this.#log(`Inserted: kind=${ev.kind},authors=${ev.pubkey},id=${ev.id}`);
|
||||
this.emit("event", [ev]);
|
||||
return true;
|
||||
@ -92,7 +64,7 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
}
|
||||
|
||||
sql(sql: string, params: Array<any>) {
|
||||
return this.#db?.selectArrays(sql, params) as Array<Array<string | number>>;
|
||||
return this.db?.selectArrays(sql, params) as Array<Array<string | number>>;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -101,7 +73,7 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
eventBatch(evs: Array<NostrEvent>) {
|
||||
const start = unixNowMs();
|
||||
let eventsInserted: Array<NostrEvent> = [];
|
||||
this.#db?.transaction(db => {
|
||||
this.db?.transaction(db => {
|
||||
for (const ev of evs) {
|
||||
if (this.#insertEvent(db, ev)) {
|
||||
eventsInserted.push(ev);
|
||||
@ -164,14 +136,14 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
db.exec("insert or ignore into events(id, pubkey, created, kind, json) values(?,?,?,?,?)", {
|
||||
bind: [ev.id, ev.pubkey, ev.created_at, ev.kind, JSON.stringify(ev)],
|
||||
});
|
||||
let eventInserted = (this.#db?.changes() as number) > 0;
|
||||
let eventInserted = (this.db?.changes() as number) > 0;
|
||||
if (eventInserted) {
|
||||
for (const t of ev.tags.filter(a => a[0].length === 1)) {
|
||||
db.exec("insert into tags(event_id, key, value) values(?, ?, ?)", {
|
||||
bind: [ev.id, t[0], t[1]],
|
||||
});
|
||||
}
|
||||
this.#insertSearchIndex(db, ev);
|
||||
this.insertIntoSearchIndex(db, ev);
|
||||
}
|
||||
this.#seenInserts.add(ev.id);
|
||||
return eventInserted;
|
||||
@ -184,7 +156,7 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
const start = unixNowMs();
|
||||
|
||||
const [sql, params] = this.#buildQuery(req);
|
||||
const res = this.#db?.selectArrays(sql, params);
|
||||
const res = this.db?.selectArrays(sql, params);
|
||||
const results =
|
||||
res?.map(a => {
|
||||
if (req.ids_only === true) {
|
||||
@ -203,7 +175,7 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
count(req: ReqFilter) {
|
||||
const start = unixNowMs();
|
||||
const [sql, params] = this.#buildQuery(req, true);
|
||||
const rows = this.#db?.exec(sql, {
|
||||
const rows = this.db?.exec(sql, {
|
||||
bind: params,
|
||||
returnValue: "resultRows",
|
||||
});
|
||||
@ -217,7 +189,7 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
* Get a summary about events table
|
||||
*/
|
||||
summary() {
|
||||
const res = this.#db?.exec("select kind, count(*) from events group by kind", {
|
||||
const res = this.db?.exec("select kind, count(*) from events group by kind", {
|
||||
returnValue: "resultRows",
|
||||
});
|
||||
return Object.fromEntries(res?.map(a => [String(a[0]), a[1] as number]) ?? []);
|
||||
@ -227,10 +199,10 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
* Dump the database file
|
||||
*/
|
||||
async dump() {
|
||||
const filePath = String(this.#db?.filename ?? "");
|
||||
const filePath = String(this.db?.filename ?? "");
|
||||
try {
|
||||
this.#db?.close();
|
||||
this.#db = undefined;
|
||||
this.db?.close();
|
||||
this.db = undefined;
|
||||
const dir = await navigator.storage.getDirectory();
|
||||
// @ts-expect-error
|
||||
for await (const [name, file] of dir) {
|
||||
@ -328,43 +300,7 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
return res;
|
||||
}
|
||||
|
||||
#migrate_v1() {
|
||||
this.#db?.transaction(db => {
|
||||
db.exec(
|
||||
"CREATE TABLE events (\
|
||||
id TEXT(64) PRIMARY KEY, \
|
||||
pubkey TEXT(64), \
|
||||
created INTEGER, \
|
||||
kind INTEGER, \
|
||||
json TEXT \
|
||||
)",
|
||||
);
|
||||
db.exec(
|
||||
"CREATE TABLE tags (\
|
||||
event_id TEXT(64), \
|
||||
key TEXT, \
|
||||
value TEXT, \
|
||||
CONSTRAINT tags_FK FOREIGN KEY (event_id) REFERENCES events(id) ON DELETE CASCADE \
|
||||
)",
|
||||
);
|
||||
db.exec("CREATE INDEX tags_key_IDX ON tags (key,value)");
|
||||
db.exec("insert into __migration values(1, ?)", {
|
||||
bind: [new Date().getTime() / 1000],
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
#migrate_v2() {
|
||||
this.#db?.transaction(db => {
|
||||
db.exec("CREATE INDEX pubkey_kind_IDX ON events (pubkey,kind)");
|
||||
db.exec("CREATE INDEX pubkey_created_IDX ON events (pubkey,created)");
|
||||
db.exec("insert into __migration values(2, ?)", {
|
||||
bind: [new Date().getTime() / 1000],
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
#insertSearchIndex(db: Database, ev: NostrEvent) {
|
||||
insertIntoSearchIndex(db: Database, ev: NostrEvent) {
|
||||
if (ev.kind === 0) {
|
||||
const profile = JSON.parse(ev.content) as {
|
||||
name?: string;
|
||||
@ -393,20 +329,4 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#migrate_v3() {
|
||||
this.#db?.transaction(db => {
|
||||
db.exec("CREATE VIRTUAL TABLE search_content using fts5(id UNINDEXED, content)");
|
||||
const events = db.selectArrays("select json from events where kind in (?,?)", [0, 1]);
|
||||
for (const json of events) {
|
||||
const ev = JSON.parse(json[0] as string) as NostrEvent;
|
||||
if (ev) {
|
||||
this.#insertSearchIndex(db, ev);
|
||||
}
|
||||
}
|
||||
db.exec("insert into __migration values(3, ?)", {
|
||||
bind: [new Date().getTime() / 1000],
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user