diff --git a/packages/app/src/Cache/index.ts b/packages/app/src/Cache/index.ts index 17360594..6bb1dd22 100644 --- a/packages/app/src/Cache/index.ts +++ b/packages/app/src/Cache/index.ts @@ -13,6 +13,7 @@ export const Relay = new WorkerRelayInterface( ); export async function initRelayWorker() { try { + await Relay.debug(""); await Relay.init({ databasePath: "relay.db", insertBatchSize: 100, diff --git a/packages/system/src/sync/connection.ts b/packages/system/src/sync/connection.ts index 35e97710..70fee5f3 100644 --- a/packages/system/src/sync/connection.ts +++ b/packages/system/src/sync/connection.ts @@ -1,16 +1,16 @@ import { Connection, SyncCommand } from "../connection"; -import { FallbackSyncMethod } from "../system"; import { EventExt, EventType } from "../event-ext"; import { NoteCollection } from "../note-collection"; import { RangeSync } from "./range-sync"; import { NegentropyFlow } from "../negentropy/negentropy-flow"; +import { SystemConfig } from "../system"; export interface ConnectionSyncModule { sync: (c: Connection, item: SyncCommand, cb?: () => void) => void; } export class DefaultSyncModule implements ConnectionSyncModule { - constructor(readonly method: FallbackSyncMethod) {} + constructor(readonly method: SystemConfig["fallbackSync"]) {} sync(c: Connection, item: SyncCommand, cb?: () => void) { const [_, id, eventSet, ...filters] = item; @@ -49,9 +49,9 @@ export class DefaultSyncModule implements ConnectionSyncModule { ); if (filters.some(a => a.since || a.until || a.ids || a.limit) || isReplaceableSync) { c.request(["REQ", id, ...filters], cb); - } else if (this.method === FallbackSyncMethod.Since) { + } else if (this.method === "since") { this.#syncSince(c, item, cb); - } else if (this.method === FallbackSyncMethod.RangeSync) { + } else if (this.method === "range-sync") { this.#syncRangeSync(c, item, cb); } else { throw new Error("No fallback sync method"); diff --git a/packages/system/src/system-base.ts b/packages/system/src/system-base.ts index 2cc1e908..d7ceddf0 100644 --- a/packages/system/src/system-base.ts +++ b/packages/system/src/system-base.ts @@ -5,7 +5,7 @@ import { EventsCache } from "./cache/events"; import { UserFollowsCache } from "./cache/user-follows-lists"; import { UserRelaysCache, UserProfileCache, RelayMetricCache, NostrEvent } from "./index"; import { DefaultOptimizer, Optimizer } from "./query-optimizer"; -import { FallbackSyncMethod, NostrSystemEvents, SystemConfig } from "./system"; +import { NostrSystemEvents, SystemConfig } from "./system"; import { EventEmitter } from "eventemitter3"; export abstract class SystemBase extends EventEmitter { @@ -30,7 +30,7 @@ export abstract class SystemBase extends EventEmitter { db: props.db, automaticOutboxModel: props.automaticOutboxModel ?? true, buildFollowGraph: props.buildFollowGraph ?? false, - fallbackSync: props.fallbackSync ?? FallbackSyncMethod.Since, + fallbackSync: props.fallbackSync ?? "since", }; } diff --git a/packages/system/src/system.ts b/packages/system/src/system.ts index 760fe480..55f035de 100644 --- a/packages/system/src/system.ts +++ b/packages/system/src/system.ts @@ -95,12 +95,7 @@ export interface SystemConfig { /** * Pick a fallback sync method when negentropy is not available */ - fallbackSync: FallbackSyncMethod; -} - -export enum FallbackSyncMethod { - Since = "since", - RangeSync = "range-sync", + fallbackSync: "since" | "range-sync"; } export interface SystemInterface { diff --git a/packages/worker-relay/src/sqlite/fixers.ts b/packages/worker-relay/src/sqlite/fixers.ts new file mode 100644 index 00000000..a4d69b3e --- /dev/null +++ b/packages/worker-relay/src/sqlite/fixers.ts @@ -0,0 +1,5 @@ +import { SqliteRelay } from "./sqlite-relay"; + +export async function runFixers(relay: SqliteRelay) { + +} \ No newline at end of file diff --git a/packages/worker-relay/src/migrations.ts b/packages/worker-relay/src/sqlite/migrations.ts similarity index 97% rename from packages/worker-relay/src/migrations.ts rename to packages/worker-relay/src/sqlite/migrations.ts index debf0caa..ea3b412c 100644 --- a/packages/worker-relay/src/migrations.ts +++ b/packages/worker-relay/src/sqlite/migrations.ts @@ -1,6 +1,6 @@ -import { NostrEvent } from "./types"; +import { NostrEvent } from "../types"; import { SqliteRelay } from "./sqlite-relay"; -import { debugLog } from "./debug"; +import { debugLog } from "../debug"; const log = (msg: string, ...args: Array) => debugLog("SqliteRelay:migrations", msg, ...args); diff --git a/packages/worker-relay/src/sqlite-relay.ts b/packages/worker-relay/src/sqlite/sqlite-relay.ts similarity index 95% rename from packages/worker-relay/src/sqlite-relay.ts rename to packages/worker-relay/src/sqlite/sqlite-relay.ts index d48250a7..9e426202 100644 --- a/packages/worker-relay/src/sqlite-relay.ts +++ b/packages/worker-relay/src/sqlite/sqlite-relay.ts @@ -1,11 +1,12 @@ import sqlite3InitModule, { Database, Sqlite3Static } from "@sqlite.org/sqlite-wasm"; import { EventEmitter } from "eventemitter3"; -import { EventMetadata, NostrEvent, RelayHandler, RelayHandlerEvents, ReqFilter, unixNowMs } from "./types"; +import { EventMetadata, NostrEvent, RelayHandler, RelayHandlerEvents, ReqFilter, unixNowMs } from "../types"; import migrate from "./migrations"; -import { debugLog } from "./debug"; +import { debugLog } from "../debug"; // import wasm file directly, this needs to be copied from https://sqlite.org/download.html import SqlitePath from "./sqlite3.wasm?url"; +import { runFixers } from "./fixers"; export class SqliteRelay extends EventEmitter implements RelayHandler { #sqlite?: Sqlite3Static; @@ -30,7 +31,11 @@ export class SqliteRelay extends EventEmitter implements Rel }); this.#log(`Got SQLite version: ${this.#sqlite.version.libVersion}`); await this.#open(path); - this.db && migrate(this); + if (this.db) { + await migrate(this); + // dont await to avoid timeout + runFixers(this); + } } /** @@ -103,10 +108,11 @@ export class SqliteRelay extends EventEmitter implements Rel db.exec(`delete from events where id in (${this.#repeatParams(ids.length)})`, { bind: ids, }); + const deleted = db.changes(); db.exec(`delete from search_content where id in (${this.#repeatParams(ids.length)})`, { bind: ids, }); - this.#log("Deleted", ids, db.changes()); + this.#log("Deleted", ids, deleted); } #insertEvent(db: Database, ev: NostrEvent) { @@ -154,17 +160,19 @@ export class SqliteRelay extends EventEmitter implements Rel db.exec("insert or ignore into events(id, pubkey, created, kind, json) values(?,?,?,?,?)", { bind: [ev.id, ev.pubkey, ev.created_at, ev.kind, JSON.stringify(ev)], }); - let eventInserted = (this.db?.changes() as number) > 0; - if (eventInserted) { + const insertedEvents = db.changes(); + if (insertedEvents > 0) { for (const t of ev.tags.filter(a => a[0].length === 1)) { db.exec("insert into tags(event_id, key, value) values(?, ?, ?)", { bind: [ev.id, t[0], t[1]], }); } this.insertIntoSearchIndex(db, ev); + } else { + return 0; } this.#seenInserts.add(ev.id); - return eventInserted; + return insertedEvents; } /** @@ -379,4 +387,8 @@ export class SqliteRelay extends EventEmitter implements Rel }); } } + + #fixMissingTags(db: Database) { + + } } diff --git a/packages/worker-relay/src/sqlite3.wasm b/packages/worker-relay/src/sqlite/sqlite3.wasm similarity index 100% rename from packages/worker-relay/src/sqlite3.wasm rename to packages/worker-relay/src/sqlite/sqlite3.wasm diff --git a/packages/worker-relay/src/worker.ts b/packages/worker-relay/src/worker.ts index 90f01bb4..f014a2cd 100644 --- a/packages/worker-relay/src/worker.ts +++ b/packages/worker-relay/src/worker.ts @@ -1,6 +1,6 @@ /// -import { SqliteRelay } from "./sqlite-relay"; +import { SqliteRelay } from "./sqlite/sqlite-relay"; import { InMemoryRelay } from "./memory-relay"; import { setLogging } from "./debug"; import { WorkQueueItem, barrierQueue, processWorkQueue } from "./queue";