feat: local releay search
This commit is contained in:
@ -2,7 +2,7 @@ import sqlite3InitModule, { Database, Sqlite3Static } from "@sqlite.org/sqlite-w
|
||||
import { EventEmitter } from "eventemitter3";
|
||||
import { NostrEvent, RelayHandler, RelayHandlerEvents, ReqFilter, unixNowMs } from "./types";
|
||||
|
||||
export class WorkerRelay extends EventEmitter<RelayHandlerEvents> implements RelayHandler {
|
||||
export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements RelayHandler {
|
||||
#sqlite?: Sqlite3Static;
|
||||
#log = (...args: any[]) => console.debug(...args);
|
||||
#db?: Database;
|
||||
@ -67,6 +67,10 @@ export class WorkerRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
this.#migrate_v2();
|
||||
this.#log("Migrated to v2");
|
||||
}
|
||||
if (version < 3) {
|
||||
this.#migrate_v3();
|
||||
this.#log("Migrated to v3");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -81,9 +85,6 @@ export class WorkerRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run any SQL command
|
||||
*/
|
||||
sql(sql: string, params: Array<any>) {
|
||||
return this.#db?.selectArrays(sql, params) as Array<Array<string | number>>;
|
||||
}
|
||||
@ -94,11 +95,13 @@ export class WorkerRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
eventBatch(evs: Array<NostrEvent>) {
|
||||
const start = unixNowMs();
|
||||
let eventsInserted: Array<NostrEvent> = [];
|
||||
for (const ev of evs) {
|
||||
if (this.#insertEvent(this.#db!, ev)) {
|
||||
eventsInserted.push(ev);
|
||||
this.#db?.transaction(db => {
|
||||
for (const ev of evs) {
|
||||
if (this.#insertEvent(db, ev)) {
|
||||
eventsInserted.push(ev);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
if (eventsInserted.length > 0) {
|
||||
this.#log(`Inserted Batch: ${eventsInserted.length}/${evs.length}, ${(unixNowMs() - start).toLocaleString()}ms`);
|
||||
this.emit("event", eventsInserted);
|
||||
@ -154,13 +157,12 @@ export class WorkerRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
});
|
||||
let eventInserted = (this.#db?.changes() as number) > 0;
|
||||
if (eventInserted) {
|
||||
db.transaction(db => {
|
||||
for (const t of ev.tags.filter(a => a[0].length === 1)) {
|
||||
db.exec("insert into tags(event_id, key, value) values(?, ?, ?)", {
|
||||
bind: [ev.id, t[0], t[1]],
|
||||
});
|
||||
}
|
||||
});
|
||||
for (const t of ev.tags.filter(a => a[0].length === 1)) {
|
||||
db.exec("insert into tags(event_id, key, value) values(?, ?, ?)", {
|
||||
bind: [ev.id, t[0], t[1]],
|
||||
});
|
||||
}
|
||||
this.#insertSearchIndex(db, ev);
|
||||
}
|
||||
this.#seenInserts.add(ev.id);
|
||||
return eventInserted;
|
||||
@ -245,6 +247,11 @@ export class WorkerRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
params.push(key.slice(1));
|
||||
params.push(...vArray);
|
||||
}
|
||||
if (req.search) {
|
||||
sql += " inner join search_content on search_content.id = events.id";
|
||||
conditions.push("search_content match ?");
|
||||
params.push(req.search.replaceAll(".", "+").replaceAll("@", "+"));
|
||||
}
|
||||
if (req.ids) {
|
||||
conditions.push(`id in (${this.#repeatParams(req.ids.length)})`);
|
||||
params.push(...req.ids);
|
||||
@ -335,4 +342,50 @@ export class WorkerRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
#insertSearchIndex(db: Database, ev: NostrEvent) {
|
||||
if (ev.kind === 0) {
|
||||
const profile = JSON.parse(ev.content) as {
|
||||
name?: string;
|
||||
display_name?: string;
|
||||
lud16?: string;
|
||||
nip05?: string;
|
||||
website?: string;
|
||||
about?: string;
|
||||
};
|
||||
if (profile) {
|
||||
const indexContent = [
|
||||
profile.name,
|
||||
profile.display_name,
|
||||
profile.about,
|
||||
profile.website,
|
||||
profile.lud16,
|
||||
profile.nip05,
|
||||
].join(" ");
|
||||
db.exec("insert into search_content values(?,?)", {
|
||||
bind: [ev.id, indexContent],
|
||||
});
|
||||
}
|
||||
} else if (ev.kind === 1) {
|
||||
db.exec("insert into search_content values(?,?)", {
|
||||
bind: [ev.id, ev.content],
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#migrate_v3() {
|
||||
this.#db?.transaction(db => {
|
||||
db.exec("CREATE VIRTUAL TABLE search_content using fts5(id UNINDEXED, content)");
|
||||
const events = db.selectArrays("select json from events where kind in (?,?)", [0, 1]);
|
||||
for (const json of events) {
|
||||
const ev = JSON.parse(json[0] as string) as NostrEvent;
|
||||
if (ev) {
|
||||
this.#insertSearchIndex(db, ev);
|
||||
}
|
||||
}
|
||||
db.exec("insert into __migration values(3, ?)", {
|
||||
bind: [new Date().getTime() / 1000],
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
@ -50,6 +50,10 @@ export interface RelayHandler extends EventEmitter<RelayHandlerEvents> {
|
||||
close(): void;
|
||||
event(ev: NostrEvent): boolean;
|
||||
eventBatch(evs: Array<NostrEvent>): boolean;
|
||||
|
||||
/**
|
||||
* Run any SQL command
|
||||
*/
|
||||
sql(sql: string, params: Array<string | number>): Array<Array<string | number>>;
|
||||
req(id: string, req: ReqFilter): Array<NostrEvent>;
|
||||
count(req: ReqFilter): number;
|
||||
|
@ -2,8 +2,8 @@
|
||||
|
||||
import { InMemoryRelay } from "./memory-relay";
|
||||
import { WorkQueueItem, barrierQueue, processWorkQueue } from "./queue";
|
||||
import { WorkerRelay } from "./relay";
|
||||
import { NostrEvent, RelayHandler, ReqCommand, ReqFilter, WorkerMessage, eventMatchesFilter } from "./types";
|
||||
import { SqliteRelay } from "./sqlite-relay";
|
||||
import { NostrEvent, RelayHandler, ReqCommand, ReqFilter, WorkerMessage, eventMatchesFilter, unixNowMs } from "./types";
|
||||
|
||||
interface PortedFilter {
|
||||
filters: Array<ReqFilter>;
|
||||
@ -33,10 +33,18 @@ async function insertBatch() {
|
||||
// This is to make req's execute first and not block them
|
||||
if (eventWriteQueue.length > 0 && cmdQueue.length === 0) {
|
||||
await barrierQueue(cmdQueue, async () => {
|
||||
const start = unixNowMs();
|
||||
const timeLimit = 1000;
|
||||
if (relay) {
|
||||
const toWrite = [...eventWriteQueue];
|
||||
eventWriteQueue = [];
|
||||
relay.eventBatch(toWrite);
|
||||
while (eventWriteQueue.length > 0) {
|
||||
if (unixNowMs() - start >= timeLimit) {
|
||||
console.debug("Yield insert, queue length: ", eventWriteQueue.length, ", cmds: ", cmdQueue.length);
|
||||
break;
|
||||
}
|
||||
const batch = eventWriteQueue.splice(0, 10);
|
||||
eventWriteQueue = eventWriteQueue.slice(batch.length);
|
||||
relay.eventBatch(batch);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -68,7 +76,7 @@ globalThis.onmessage = async ev => {
|
||||
case "init": {
|
||||
await barrierQueue(cmdQueue, async () => {
|
||||
if ("WebAssembly" in globalThis && (await tryOpfs())) {
|
||||
relay = new WorkerRelay();
|
||||
relay = new SqliteRelay();
|
||||
} else {
|
||||
relay = new InMemoryRelay();
|
||||
}
|
||||
|
Reference in New Issue
Block a user