refactor: fix followgraph / add indexes
This commit is contained in:
@ -60,7 +60,7 @@ export class WorkerRelayInterface {
|
||||
return (await this.#workerRpc<object, Array<Array<any>>>("sql", { sql, params })).result;
|
||||
}
|
||||
|
||||
#workerRpc<T, R>(cmd: string, args?: T, timeout = 30_000) {
|
||||
#workerRpc<T, R>(cmd: string, args?: T) {
|
||||
const id = uuid();
|
||||
const msg = {
|
||||
id,
|
||||
@ -71,20 +71,14 @@ export class WorkerRelayInterface {
|
||||
return new Promise<{
|
||||
result: R;
|
||||
port: MessagePort | undefined;
|
||||
}>((resolve, reject) => {
|
||||
let t: ReturnType<typeof setTimeout>;
|
||||
}>(resolve => {
|
||||
this.#commandQueue.set(id, (v, port) => {
|
||||
clearTimeout(t);
|
||||
const cmdReply = v as WorkerMessage<R>;
|
||||
resolve({
|
||||
result: cmdReply.args,
|
||||
port: port.length > 0 ? port[0] : undefined,
|
||||
});
|
||||
});
|
||||
t = setTimeout(() => {
|
||||
reject("timeout");
|
||||
this.#commandQueue.delete(id);
|
||||
}, timeout);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -66,21 +66,22 @@ export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
|
||||
this.#migrate_v1();
|
||||
this.#log("Migrated to v1");
|
||||
}
|
||||
if (version < 2) {
|
||||
this.#migrate_v2();
|
||||
this.#log("Migrated to v2");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert an event to the database
|
||||
*/
|
||||
event(ev: NostrEvent) {
|
||||
let eventInserted = false;
|
||||
this.#db?.transaction(db => {
|
||||
eventInserted = this.#insertEvent(db, ev);
|
||||
});
|
||||
if (eventInserted) {
|
||||
if (this.#insertEvent(this.#db!, ev)) {
|
||||
this.#log(`Inserted: kind=${ev.kind},authors=${ev.pubkey},id=${ev.id}`);
|
||||
this.emit("event", [ev]);
|
||||
return true;
|
||||
}
|
||||
return eventInserted;
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -96,13 +97,11 @@ export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
|
||||
eventBatch(evs: Array<NostrEvent>) {
|
||||
const start = unixNowMs();
|
||||
let eventsInserted: Array<NostrEvent> = [];
|
||||
this.#db?.transaction(db => {
|
||||
for (const ev of evs) {
|
||||
if (this.#insertEvent(db, ev)) {
|
||||
eventsInserted.push(ev);
|
||||
}
|
||||
for (const ev of evs) {
|
||||
if (this.#insertEvent(this.#db!, ev)) {
|
||||
eventsInserted.push(ev);
|
||||
}
|
||||
});
|
||||
}
|
||||
if (eventsInserted.length > 0) {
|
||||
this.#log(`Inserted Batch: ${eventsInserted.length}/${evs.length}, ${(unixNowMs() - start).toLocaleString()}ms`);
|
||||
this.emit("event", eventsInserted);
|
||||
@ -114,6 +113,7 @@ export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
|
||||
db.exec(`delete from events where id in (${this.#repeatParams(ids.length)})`, {
|
||||
bind: ids,
|
||||
});
|
||||
this.#log("Deleted", ids, db.changes());
|
||||
}
|
||||
|
||||
#insertEvent(db: Database, ev: NostrEvent) {
|
||||
@ -326,4 +326,14 @@ export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
#migrate_v2() {
|
||||
this.#db?.transaction(db => {
|
||||
db.exec("CREATE INDEX pubkey_kind_IDX ON events (pubkey,kind)");
|
||||
db.exec("CREATE INDEX pubkey_created_IDX ON events (pubkey,created)");
|
||||
db.exec("insert into __migration values(2, ?)", {
|
||||
bind: [new Date().getTime() / 1000],
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user