forked from Kieran/snort
chore: move sqlite files
fix: debug print for deletes
This commit is contained in:
parent
b2b25377cd
commit
ebc45ae9c1
@ -13,6 +13,7 @@ export const Relay = new WorkerRelayInterface(
|
||||
);
|
||||
export async function initRelayWorker() {
|
||||
try {
|
||||
await Relay.debug("");
|
||||
await Relay.init({
|
||||
databasePath: "relay.db",
|
||||
insertBatchSize: 100,
|
||||
|
@ -1,16 +1,16 @@
|
||||
import { Connection, SyncCommand } from "../connection";
|
||||
import { FallbackSyncMethod } from "../system";
|
||||
import { EventExt, EventType } from "../event-ext";
|
||||
import { NoteCollection } from "../note-collection";
|
||||
import { RangeSync } from "./range-sync";
|
||||
import { NegentropyFlow } from "../negentropy/negentropy-flow";
|
||||
import { SystemConfig } from "../system";
|
||||
|
||||
export interface ConnectionSyncModule {
|
||||
sync: (c: Connection, item: SyncCommand, cb?: () => void) => void;
|
||||
}
|
||||
|
||||
export class DefaultSyncModule implements ConnectionSyncModule {
|
||||
constructor(readonly method: FallbackSyncMethod) {}
|
||||
constructor(readonly method: SystemConfig["fallbackSync"]) {}
|
||||
|
||||
sync(c: Connection, item: SyncCommand, cb?: () => void) {
|
||||
const [_, id, eventSet, ...filters] = item;
|
||||
@ -49,9 +49,9 @@ export class DefaultSyncModule implements ConnectionSyncModule {
|
||||
);
|
||||
if (filters.some(a => a.since || a.until || a.ids || a.limit) || isReplaceableSync) {
|
||||
c.request(["REQ", id, ...filters], cb);
|
||||
} else if (this.method === FallbackSyncMethod.Since) {
|
||||
} else if (this.method === "since") {
|
||||
this.#syncSince(c, item, cb);
|
||||
} else if (this.method === FallbackSyncMethod.RangeSync) {
|
||||
} else if (this.method === "range-sync") {
|
||||
this.#syncRangeSync(c, item, cb);
|
||||
} else {
|
||||
throw new Error("No fallback sync method");
|
||||
|
@ -5,7 +5,7 @@ import { EventsCache } from "./cache/events";
|
||||
import { UserFollowsCache } from "./cache/user-follows-lists";
|
||||
import { UserRelaysCache, UserProfileCache, RelayMetricCache, NostrEvent } from "./index";
|
||||
import { DefaultOptimizer, Optimizer } from "./query-optimizer";
|
||||
import { FallbackSyncMethod, NostrSystemEvents, SystemConfig } from "./system";
|
||||
import { NostrSystemEvents, SystemConfig } from "./system";
|
||||
import { EventEmitter } from "eventemitter3";
|
||||
|
||||
export abstract class SystemBase extends EventEmitter<NostrSystemEvents> {
|
||||
@ -30,7 +30,7 @@ export abstract class SystemBase extends EventEmitter<NostrSystemEvents> {
|
||||
db: props.db,
|
||||
automaticOutboxModel: props.automaticOutboxModel ?? true,
|
||||
buildFollowGraph: props.buildFollowGraph ?? false,
|
||||
fallbackSync: props.fallbackSync ?? FallbackSyncMethod.Since,
|
||||
fallbackSync: props.fallbackSync ?? "since",
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -95,12 +95,7 @@ export interface SystemConfig {
|
||||
/**
|
||||
* Pick a fallback sync method when negentropy is not available
|
||||
*/
|
||||
fallbackSync: FallbackSyncMethod;
|
||||
}
|
||||
|
||||
export enum FallbackSyncMethod {
|
||||
Since = "since",
|
||||
RangeSync = "range-sync",
|
||||
fallbackSync: "since" | "range-sync";
|
||||
}
|
||||
|
||||
export interface SystemInterface {
|
||||
|
5
packages/worker-relay/src/sqlite/fixers.ts
Normal file
5
packages/worker-relay/src/sqlite/fixers.ts
Normal file
@ -0,0 +1,5 @@
|
||||
import { SqliteRelay } from "./sqlite-relay";
|
||||
|
||||
export async function runFixers(relay: SqliteRelay) {
|
||||
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
import { NostrEvent } from "./types";
|
||||
import { NostrEvent } from "../types";
|
||||
import { SqliteRelay } from "./sqlite-relay";
|
||||
import { debugLog } from "./debug";
|
||||
import { debugLog } from "../debug";
|
||||
|
||||
const log = (msg: string, ...args: Array<any>) => debugLog("SqliteRelay:migrations", msg, ...args);
|
||||
|
@ -1,11 +1,12 @@
|
||||
import sqlite3InitModule, { Database, Sqlite3Static } from "@sqlite.org/sqlite-wasm";
|
||||
import { EventEmitter } from "eventemitter3";
|
||||
import { EventMetadata, NostrEvent, RelayHandler, RelayHandlerEvents, ReqFilter, unixNowMs } from "./types";
|
||||
import { EventMetadata, NostrEvent, RelayHandler, RelayHandlerEvents, ReqFilter, unixNowMs } from "../types";
|
||||
import migrate from "./migrations";
|
||||
import { debugLog } from "./debug";
|
||||
import { debugLog } from "../debug";
|
||||
|
||||
// import wasm file directly, this needs to be copied from https://sqlite.org/download.html
|
||||
import SqlitePath from "./sqlite3.wasm?url";
|
||||
import { runFixers } from "./fixers";
|
||||
|
||||
export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements RelayHandler {
|
||||
#sqlite?: Sqlite3Static;
|
||||
@ -30,7 +31,11 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
});
|
||||
this.#log(`Got SQLite version: ${this.#sqlite.version.libVersion}`);
|
||||
await this.#open(path);
|
||||
this.db && migrate(this);
|
||||
if (this.db) {
|
||||
await migrate(this);
|
||||
// dont await to avoid timeout
|
||||
runFixers(this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -103,10 +108,11 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
db.exec(`delete from events where id in (${this.#repeatParams(ids.length)})`, {
|
||||
bind: ids,
|
||||
});
|
||||
const deleted = db.changes();
|
||||
db.exec(`delete from search_content where id in (${this.#repeatParams(ids.length)})`, {
|
||||
bind: ids,
|
||||
});
|
||||
this.#log("Deleted", ids, db.changes());
|
||||
this.#log("Deleted", ids, deleted);
|
||||
}
|
||||
|
||||
#insertEvent(db: Database, ev: NostrEvent) {
|
||||
@ -154,17 +160,19 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
db.exec("insert or ignore into events(id, pubkey, created, kind, json) values(?,?,?,?,?)", {
|
||||
bind: [ev.id, ev.pubkey, ev.created_at, ev.kind, JSON.stringify(ev)],
|
||||
});
|
||||
let eventInserted = (this.db?.changes() as number) > 0;
|
||||
if (eventInserted) {
|
||||
const insertedEvents = db.changes();
|
||||
if (insertedEvents > 0) {
|
||||
for (const t of ev.tags.filter(a => a[0].length === 1)) {
|
||||
db.exec("insert into tags(event_id, key, value) values(?, ?, ?)", {
|
||||
bind: [ev.id, t[0], t[1]],
|
||||
});
|
||||
}
|
||||
this.insertIntoSearchIndex(db, ev);
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
this.#seenInserts.add(ev.id);
|
||||
return eventInserted;
|
||||
return insertedEvents;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -379,4 +387,8 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#fixMissingTags(db: Database) {
|
||||
|
||||
}
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
/// <reference lib="webworker" />
|
||||
|
||||
import { SqliteRelay } from "./sqlite-relay";
|
||||
import { SqliteRelay } from "./sqlite/sqlite-relay";
|
||||
import { InMemoryRelay } from "./memory-relay";
|
||||
import { setLogging } from "./debug";
|
||||
import { WorkQueueItem, barrierQueue, processWorkQueue } from "./queue";
|
||||
|
Loading…
Reference in New Issue
Block a user