diff --git a/packages/app/public/snort/_headers b/packages/app/public/snort/_headers index 59ae3bb9..0fb8e4cc 100644 --- a/packages/app/public/snort/_headers +++ b/packages/app/public/snort/_headers @@ -1,5 +1,4 @@ /* Content-Security-Policy: default-src 'self'; manifest-src *; child-src 'none'; worker-src 'self'; frame-src youtube.com www.youtube.com https://platform.twitter.com https://embed.tidal.com https://w.soundcloud.com https://www.mixcloud.com https://open.spotify.com https://player.twitch.tv https://embed.music.apple.com https://nostrnests.com https://embed.wavlake.com https://challenges.cloudflare.com; style-src 'self' 'unsafe-inline'; connect-src *; img-src * data: blob:; font-src 'self'; media-src * blob:; script-src 'self' 'wasm-unsafe-eval' https://analytics.v0l.io https://platform.twitter.com https://embed.tidal.com https://challenges.cloudflare.com; - Cross-Origin-Resource-Policy: corss-origin Cross-Origin-Opener-Policy: same-origin - Cross-Origin-Embedder-Policy: credentialless \ No newline at end of file + Cross-Origin-Embedder-Policy: cross-origin \ No newline at end of file diff --git a/packages/app/src/Cache/index.ts b/packages/app/src/Cache/index.ts index be0f9888..73c98604 100644 --- a/packages/app/src/Cache/index.ts +++ b/packages/app/src/Cache/index.ts @@ -11,11 +11,7 @@ import { ProfileCacheRelayWorker } from "./ProfileWorkeCache"; export const Relay = new WorkerRelayInterface(WorkerRelayPath); export async function initRelayWorker() { try { - if (await Relay.init()) { - if (await Relay.open()) { - await Relay.migrate(); - } - } + await Relay.init("relay.db"); } catch (e) { console.error(e); } diff --git a/packages/app/vite.config.ts b/packages/app/vite.config.ts index 88882809..5d4d8ebd 100644 --- a/packages/app/vite.config.ts +++ b/packages/app/vite.config.ts @@ -49,9 +49,8 @@ export default defineConfig({ }, server: { headers: { - "Cross-Origin-Resource-Policy": "corss-origin", "Cross-Origin-Opener-Policy": "same-origin", - "Cross-Origin-Embedder-Policy": "credentialless", + "Cross-Origin-Embedder-Policy": "require-corp", }, }, optimizeDeps: { diff --git a/packages/worker-relay/src/interface.ts b/packages/worker-relay/src/interface.ts index 92b58c64..c41e3991 100644 --- a/packages/worker-relay/src/interface.ts +++ b/packages/worker-relay/src/interface.ts @@ -1,14 +1,11 @@ -import debug from "debug"; -import { NostrEvent, ReqCommand, WorkerMessage } from "./types"; +import { NostrEvent, ReqCommand, WorkerMessage, WorkerMessageCommand } from "./types"; import { v4 as uuid } from "uuid"; export class WorkerRelayInterface { #worker: Worker; - #log = (msg: any) => console.debug(msg); #commandQueue: Map) => void> = new Map(); constructor(path: string) { - this.#log(`Module path: ${path}`); this.#worker = new Worker(path, { type: "module" }); this.#worker.onmessage = e => { const cmd = e.data as WorkerMessage; @@ -20,16 +17,8 @@ export class WorkerRelayInterface { }; } - async init() { - return (await this.#workerRpc("init")).result; - } - - async open() { - return (await this.#workerRpc("open")).result; - } - - async migrate() { - return (await this.#workerRpc("migrate")).result; + async init(path: string) { + return (await this.#workerRpc("init", path)).result; } async event(ev: NostrEvent) { @@ -60,7 +49,7 @@ export class WorkerRelayInterface { return (await this.#workerRpc>>("sql", { sql, params })).result; } - #workerRpc(cmd: string, args?: T) { + #workerRpc(cmd: WorkerMessageCommand, args?: T) { const id = uuid(); const msg = { id, diff --git a/packages/worker-relay/src/memory-relay.ts b/packages/worker-relay/src/memory-relay.ts new file mode 100644 index 00000000..088dcb10 --- /dev/null +++ b/packages/worker-relay/src/memory-relay.ts @@ -0,0 +1,77 @@ +import EventEmitter from "eventemitter3"; +import { NostrEvent, RelayHandler, RelayHandlerEvents, ReqFilter, eventMatchesFilter } from "./types"; + +/** + * A very simple dumb fallback relay using a flat table + */ +export class InMemoryRelay extends EventEmitter implements RelayHandler { + #events: Map = new Map(); + #log = (...args: any[]) => console.debug(...args); + + init(path: string): Promise { + this.#log("Using in-memory relay"); + return Promise.resolve(); + } + + count(req: ReqFilter): number { + let ret = 0; + for (const [, e] of this.#events) { + if (eventMatchesFilter(e, req)) { + ret++; + } + } + return ret; + } + + summary(): Record { + let ret = {} as Record; + for (const [k, v] of this.#events) { + ret[v.kind.toString()] ??= 0; + ret[v.kind.toString()]++; + } + return ret; + } + + dump(): Promise { + return Promise.resolve(new Uint8Array()); + } + + close(): void { + // nothing + } + + event(ev: NostrEvent) { + if (this.#events.has(ev.id)) return false; + this.#events.set(ev.id, ev); + this.emit("event", [ev]); + return true; + } + + eventBatch(evs: NostrEvent[]) { + const inserted = []; + for (const ev of evs) { + if (this.#events.has(ev.id)) continue; + this.#events.set(ev.id, ev); + inserted.push(ev); + } + if (inserted.length > 0) { + this.emit("event", inserted); + return true; + } + return false; + } + + sql(sql: string, params: (string | number)[]): (string | number)[][] { + return []; + } + + req(id: string, filter: ReqFilter) { + const ret = []; + for (const [, e] of this.#events) { + if (eventMatchesFilter(e, filter)) { + ret.push(e); + } + } + return ret; + } +} diff --git a/packages/worker-relay/src/relay.ts b/packages/worker-relay/src/relay.ts index b443aa50..0326da09 100644 --- a/packages/worker-relay/src/relay.ts +++ b/packages/worker-relay/src/relay.ts @@ -1,13 +1,8 @@ import sqlite3InitModule, { Database, Sqlite3Static } from "@sqlite.org/sqlite-wasm"; -import debug from "debug"; import { EventEmitter } from "eventemitter3"; -import { NostrEvent, ReqFilter, unixNowMs } from "./types"; +import { NostrEvent, RelayHandler, RelayHandlerEvents, ReqFilter, unixNowMs } from "./types"; -export interface WorkerRelayEvents { - event: (evs: Array) => void; -} - -export class WorkerRelay extends EventEmitter { +export class WorkerRelay extends EventEmitter implements RelayHandler { #sqlite?: Sqlite3Static; #log = (...args: any[]) => console.debug(...args); #db?: Database; @@ -16,16 +11,18 @@ export class WorkerRelay extends EventEmitter { /** * Initialize the SQLite driver */ - async init() { + async init(path: string) { if (this.#sqlite) return; this.#sqlite = await sqlite3InitModule(); this.#log(`Got SQLite version: ${this.#sqlite.version.libVersion}`); + await this.#open(path); + this.#migrate(); } /** * Open the database from its path */ - async open(path: string) { + async #open(path: string) { if (!this.#sqlite) throw new Error("Must call init first"); if (this.#db) return; @@ -50,7 +47,7 @@ export class WorkerRelay extends EventEmitter { /** * Do database migration */ - migrate() { + #migrate() { if (!this.#db) throw new Error("DB must be open"); this.#db.exec( @@ -88,7 +85,7 @@ export class WorkerRelay extends EventEmitter { * Run any SQL command */ sql(sql: string, params: Array) { - return this.#db?.selectArrays(sql, params); + return this.#db?.selectArrays(sql, params) as Array>; } /** @@ -229,7 +226,7 @@ export class WorkerRelay extends EventEmitter { } catch (e) { console.error(e); } finally { - this.open(filePath); + await this.#open(filePath); } return new Uint8Array(); } diff --git a/packages/worker-relay/src/types.ts b/packages/worker-relay/src/types.ts index 856f3566..bdb168fb 100644 --- a/packages/worker-relay/src/types.ts +++ b/packages/worker-relay/src/types.ts @@ -1,6 +1,19 @@ +import { EventEmitter } from "eventemitter3"; + +export type WorkerMessageCommand = + | "reply" + | "init" + | "event" + | "req" + | "count" + | "summary" + | "close" + | "dumpDb" + | "sql"; + export interface WorkerMessage { id: string; - cmd: "reply" | "init" | "open" | "migrate" | "event" | "req" | "count" | "summary" | "close" | "dumpDb" | "sql"; + cmd: WorkerMessageCommand; args: T; } @@ -32,6 +45,51 @@ export interface ReqFilter { [key: string]: Array | Array | string | number | undefined | ReqFilter; } +export interface RelayHandler extends EventEmitter { + init(path: string): Promise; + close(): void; + event(ev: NostrEvent): boolean; + eventBatch(evs: Array): boolean; + sql(sql: string, params: Array): Array>; + req(id: string, req: ReqFilter): Array; + count(req: ReqFilter): number; + summary(): Record; + dump(): Promise; +} + +export interface RelayHandlerEvents { + event: (evs: Array) => void; +} + export function unixNowMs() { return new Date().getTime(); } + +export function eventMatchesFilter(ev: NostrEvent, filter: ReqFilter) { + if (filter.since && ev.created_at < filter.since) { + return false; + } + if (filter.until && ev.created_at > filter.until) { + return false; + } + if (!(filter.ids?.includes(ev.id) ?? true)) { + return false; + } + if (!(filter.authors?.includes(ev.pubkey) ?? true)) { + return false; + } + if (!(filter.kinds?.includes(ev.kind) ?? true)) { + return false; + } + const tags = Object.entries(filter).filter(([k]) => k.startsWith("#")); + for (const [k, v] of tags) { + const vargs = v as Array; + for (const x of vargs) { + if (!ev.tags.find(a => a[0] === k.slice(1) && a[1] === x)) { + return false; + } + } + } + + return true; +} diff --git a/packages/worker-relay/src/worker.ts b/packages/worker-relay/src/worker.ts index 46c3ec32..4dfafb8f 100644 --- a/packages/worker-relay/src/worker.ts +++ b/packages/worker-relay/src/worker.ts @@ -1,8 +1,9 @@ /// +import { InMemoryRelay } from "./memory-relay"; import { WorkQueueItem, barrierQueue, processWorkQueue } from "./queue"; import { WorkerRelay } from "./relay"; -import { NostrEvent, ReqCommand, ReqFilter, WorkerMessage } from "./types"; +import { NostrEvent, RelayHandler, ReqCommand, ReqFilter, WorkerMessage, eventMatchesFilter } from "./types"; interface PortedFilter { filters: Array; @@ -12,23 +13,7 @@ interface PortedFilter { // Active open subscriptions awaiting new events const ActiveSubscriptions = new Map(); -const relay = new WorkerRelay(); -relay.on("event", evs => { - for (const pf of ActiveSubscriptions.values()) { - const pfSend = []; - for (const ev of evs) { - for (const fx of pf.filters) { - if (eventMatchesFilter(ev, fx)) { - pfSend.push(ev); - continue; - } - } - } - if (pfSend.length > 0) { - pf.port.postMessage(pfSend); - } - } -}); +let relay: RelayHandler | undefined; async function reply(id: string, obj?: T, transferables?: Transferable[]) { globalThis.postMessage( @@ -46,7 +31,7 @@ let eventWriteQueue: Array = []; async function insertBatch() { // Only insert event batches when the command queue is empty // This is to make req's execute first and not block them - if (eventWriteQueue.length > 0 && cmdQueue.length === 0) { + if (relay && eventWriteQueue.length > 0 && cmdQueue.length === 0) { relay.eventBatch(eventWriteQueue); eventWriteQueue = []; } @@ -57,31 +42,49 @@ setTimeout(() => insertBatch(), 100); const cmdQueue: Array = []; processWorkQueue(cmdQueue, 50); +async function tryOpfs() { + try { + await navigator.storage.getDirectory(); + return true; + } catch { + // ignore + } + return false; +} + globalThis.onclose = () => { - relay.close(); + relay?.close(); }; -globalThis.onmessage = ev => { +globalThis.onmessage = async ev => { const msg = ev.data as WorkerMessage; try { switch (msg.cmd) { case "init": { - barrierQueue(cmdQueue, async () => { - await relay.init(); - reply(msg.id, true); - }); - break; - } - case "open": { - barrierQueue(cmdQueue, async () => { - await relay.open("/relay.db"); - reply(msg.id, true); - }); - break; - } - case "migrate": { - barrierQueue(cmdQueue, async () => { - relay.migrate(); + await barrierQueue(cmdQueue, async () => { + if ("WebAssembly" in globalThis && (await tryOpfs())) { + relay = new WorkerRelay(); + } else { + relay = new InMemoryRelay(); + } + + relay.on("event", evs => { + for (const pf of ActiveSubscriptions.values()) { + const pfSend = []; + for (const ev of evs) { + for (const fx of pf.filters) { + if (eventMatchesFilter(ev, fx)) { + pfSend.push(ev); + continue; + } + } + } + if (pfSend.length > 0) { + pf.port.postMessage(pfSend); + } + } + }); + await relay.init(msg.args as string); reply(msg.id, true); }); break; @@ -97,7 +100,7 @@ globalThis.onmessage = ev => { break; } case "req": { - barrierQueue(cmdQueue, async () => { + await barrierQueue(cmdQueue, async () => { const req = msg.args as ReqCommand; const chan = new MessageChannel(); if (req.leaveOpen) { @@ -108,18 +111,18 @@ globalThis.onmessage = ev => { } const results = []; for (const r of req.filters) { - results.push(...relay.req(req.id, r as ReqFilter)); + results.push(...relay!.req(req.id, r as ReqFilter)); } reply(msg.id, results, req.leaveOpen ? [chan.port2] : undefined); }); break; } case "count": { - barrierQueue(cmdQueue, async () => { + await barrierQueue(cmdQueue, async () => { const req = msg.args as ReqCommand; let results = 0; for (const r of req.filters) { - const c = relay.count(r as ReqFilter); + const c = relay!.count(r as ReqFilter); results += c; } reply(msg.id, results); @@ -127,26 +130,26 @@ globalThis.onmessage = ev => { break; } case "summary": { - barrierQueue(cmdQueue, async () => { - const res = relay.summary(); + await barrierQueue(cmdQueue, async () => { + const res = relay!.summary(); reply(msg.id, res); }); break; } case "dumpDb": { - barrierQueue(cmdQueue, async () => { - const res = await relay.dump(); + await barrierQueue(cmdQueue, async () => { + const res = await relay!.dump(); reply(msg.id, res); }); break; } case "sql": { - barrierQueue(cmdQueue, async () => { + await barrierQueue(cmdQueue, async () => { const req = msg.args as { sql: string; params: Array; }; - const res = relay.sql(req.sql, req.params); + const res = relay!.sql(req.sql, req.params); reply(msg.id, res); }); break; @@ -161,22 +164,3 @@ globalThis.onmessage = ev => { reply(msg.id, { error: e }); } }; - -export function eventMatchesFilter(ev: NostrEvent, filter: ReqFilter) { - if (filter.since && ev.created_at < filter.since) { - return false; - } - if (filter.until && ev.created_at > filter.until) { - return false; - } - if (!(filter.ids?.includes(ev.id) ?? true)) { - return false; - } - if (!(filter.authors?.includes(ev.pubkey) ?? true)) { - return false; - } - if (!(filter.kinds?.includes(ev.kind) ?? true)) { - return false; - } - return true; -}