feat: add fallback memory relay

This commit is contained in:
Kieran 2024-01-19 11:56:07 +00:00
parent 8e33d10319
commit cb0b75c652
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
8 changed files with 202 additions and 103 deletions

View File

@ -1,5 +1,4 @@
/*
Content-Security-Policy: default-src 'self'; manifest-src *; child-src 'none'; worker-src 'self'; frame-src youtube.com www.youtube.com https://platform.twitter.com https://embed.tidal.com https://w.soundcloud.com https://www.mixcloud.com https://open.spotify.com https://player.twitch.tv https://embed.music.apple.com https://nostrnests.com https://embed.wavlake.com https://challenges.cloudflare.com; style-src 'self' 'unsafe-inline'; connect-src *; img-src * data: blob:; font-src 'self'; media-src * blob:; script-src 'self' 'wasm-unsafe-eval' https://analytics.v0l.io https://platform.twitter.com https://embed.tidal.com https://challenges.cloudflare.com;
Cross-Origin-Resource-Policy: corss-origin
Cross-Origin-Opener-Policy: same-origin
Cross-Origin-Embedder-Policy: credentialless
Cross-Origin-Embedder-Policy: cross-origin

View File

@ -11,11 +11,7 @@ import { ProfileCacheRelayWorker } from "./ProfileWorkeCache";
export const Relay = new WorkerRelayInterface(WorkerRelayPath);
export async function initRelayWorker() {
try {
if (await Relay.init()) {
if (await Relay.open()) {
await Relay.migrate();
}
}
await Relay.init("relay.db");
} catch (e) {
console.error(e);
}

View File

@ -49,9 +49,8 @@ export default defineConfig({
},
server: {
headers: {
"Cross-Origin-Resource-Policy": "corss-origin",
"Cross-Origin-Opener-Policy": "same-origin",
"Cross-Origin-Embedder-Policy": "credentialless",
"Cross-Origin-Embedder-Policy": "require-corp",
},
},
optimizeDeps: {

View File

@ -1,14 +1,11 @@
import debug from "debug";
import { NostrEvent, ReqCommand, WorkerMessage } from "./types";
import { NostrEvent, ReqCommand, WorkerMessage, WorkerMessageCommand } from "./types";
import { v4 as uuid } from "uuid";
export class WorkerRelayInterface {
#worker: Worker;
#log = (msg: any) => console.debug(msg);
#commandQueue: Map<string, (v: unknown, ports: ReadonlyArray<MessagePort>) => void> = new Map();
constructor(path: string) {
this.#log(`Module path: ${path}`);
this.#worker = new Worker(path, { type: "module" });
this.#worker.onmessage = e => {
const cmd = e.data as WorkerMessage<any>;
@ -20,16 +17,8 @@ export class WorkerRelayInterface {
};
}
async init() {
return (await this.#workerRpc<void, boolean>("init")).result;
}
async open() {
return (await this.#workerRpc<void, boolean>("open")).result;
}
async migrate() {
return (await this.#workerRpc<void, boolean>("migrate")).result;
async init(path: string) {
return (await this.#workerRpc<string, boolean>("init", path)).result;
}
async event(ev: NostrEvent) {
@ -60,7 +49,7 @@ export class WorkerRelayInterface {
return (await this.#workerRpc<object, Array<Array<any>>>("sql", { sql, params })).result;
}
#workerRpc<T, R>(cmd: string, args?: T) {
#workerRpc<T, R>(cmd: WorkerMessageCommand, args?: T) {
const id = uuid();
const msg = {
id,

View File

@ -0,0 +1,77 @@
import EventEmitter from "eventemitter3";
import { NostrEvent, RelayHandler, RelayHandlerEvents, ReqFilter, eventMatchesFilter } from "./types";
/**
* A very simple dumb fallback relay using a flat table
*/
export class InMemoryRelay extends EventEmitter<RelayHandlerEvents> implements RelayHandler {
#events: Map<string, NostrEvent> = new Map();
#log = (...args: any[]) => console.debug(...args);
init(path: string): Promise<void> {
this.#log("Using in-memory relay");
return Promise.resolve();
}
count(req: ReqFilter): number {
let ret = 0;
for (const [, e] of this.#events) {
if (eventMatchesFilter(e, req)) {
ret++;
}
}
return ret;
}
summary(): Record<string, number> {
let ret = {} as Record<string, number>;
for (const [k, v] of this.#events) {
ret[v.kind.toString()] ??= 0;
ret[v.kind.toString()]++;
}
return ret;
}
dump(): Promise<Uint8Array> {
return Promise.resolve(new Uint8Array());
}
close(): void {
// nothing
}
event(ev: NostrEvent) {
if (this.#events.has(ev.id)) return false;
this.#events.set(ev.id, ev);
this.emit("event", [ev]);
return true;
}
eventBatch(evs: NostrEvent[]) {
const inserted = [];
for (const ev of evs) {
if (this.#events.has(ev.id)) continue;
this.#events.set(ev.id, ev);
inserted.push(ev);
}
if (inserted.length > 0) {
this.emit("event", inserted);
return true;
}
return false;
}
sql(sql: string, params: (string | number)[]): (string | number)[][] {
return [];
}
req(id: string, filter: ReqFilter) {
const ret = [];
for (const [, e] of this.#events) {
if (eventMatchesFilter(e, filter)) {
ret.push(e);
}
}
return ret;
}
}

View File

@ -1,13 +1,8 @@
import sqlite3InitModule, { Database, Sqlite3Static } from "@sqlite.org/sqlite-wasm";
import debug from "debug";
import { EventEmitter } from "eventemitter3";
import { NostrEvent, ReqFilter, unixNowMs } from "./types";
import { NostrEvent, RelayHandler, RelayHandlerEvents, ReqFilter, unixNowMs } from "./types";
export interface WorkerRelayEvents {
event: (evs: Array<NostrEvent>) => void;
}
export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
export class WorkerRelay extends EventEmitter<RelayHandlerEvents> implements RelayHandler {
#sqlite?: Sqlite3Static;
#log = (...args: any[]) => console.debug(...args);
#db?: Database;
@ -16,16 +11,18 @@ export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
/**
* Initialize the SQLite driver
*/
async init() {
async init(path: string) {
if (this.#sqlite) return;
this.#sqlite = await sqlite3InitModule();
this.#log(`Got SQLite version: ${this.#sqlite.version.libVersion}`);
await this.#open(path);
this.#migrate();
}
/**
* Open the database from its path
*/
async open(path: string) {
async #open(path: string) {
if (!this.#sqlite) throw new Error("Must call init first");
if (this.#db) return;
@ -50,7 +47,7 @@ export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
/**
* Do database migration
*/
migrate() {
#migrate() {
if (!this.#db) throw new Error("DB must be open");
this.#db.exec(
@ -88,7 +85,7 @@ export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
* Run any SQL command
*/
sql(sql: string, params: Array<any>) {
return this.#db?.selectArrays(sql, params);
return this.#db?.selectArrays(sql, params) as Array<Array<string | number>>;
}
/**
@ -229,7 +226,7 @@ export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
} catch (e) {
console.error(e);
} finally {
this.open(filePath);
await this.#open(filePath);
}
return new Uint8Array();
}

View File

@ -1,6 +1,19 @@
import { EventEmitter } from "eventemitter3";
export type WorkerMessageCommand =
| "reply"
| "init"
| "event"
| "req"
| "count"
| "summary"
| "close"
| "dumpDb"
| "sql";
export interface WorkerMessage<T> {
id: string;
cmd: "reply" | "init" | "open" | "migrate" | "event" | "req" | "count" | "summary" | "close" | "dumpDb" | "sql";
cmd: WorkerMessageCommand;
args: T;
}
@ -32,6 +45,51 @@ export interface ReqFilter {
[key: string]: Array<string> | Array<number> | string | number | undefined | ReqFilter;
}
export interface RelayHandler extends EventEmitter<RelayHandlerEvents> {
init(path: string): Promise<void>;
close(): void;
event(ev: NostrEvent): boolean;
eventBatch(evs: Array<NostrEvent>): boolean;
sql(sql: string, params: Array<string | number>): Array<Array<string | number>>;
req(id: string, req: ReqFilter): Array<NostrEvent>;
count(req: ReqFilter): number;
summary(): Record<string, number>;
dump(): Promise<Uint8Array>;
}
export interface RelayHandlerEvents {
event: (evs: Array<NostrEvent>) => void;
}
export function unixNowMs() {
return new Date().getTime();
}
export function eventMatchesFilter(ev: NostrEvent, filter: ReqFilter) {
if (filter.since && ev.created_at < filter.since) {
return false;
}
if (filter.until && ev.created_at > filter.until) {
return false;
}
if (!(filter.ids?.includes(ev.id) ?? true)) {
return false;
}
if (!(filter.authors?.includes(ev.pubkey) ?? true)) {
return false;
}
if (!(filter.kinds?.includes(ev.kind) ?? true)) {
return false;
}
const tags = Object.entries(filter).filter(([k]) => k.startsWith("#"));
for (const [k, v] of tags) {
const vargs = v as Array<string>;
for (const x of vargs) {
if (!ev.tags.find(a => a[0] === k.slice(1) && a[1] === x)) {
return false;
}
}
}
return true;
}

View File

@ -1,8 +1,9 @@
/// <reference lib="webworker" />
import { InMemoryRelay } from "./memory-relay";
import { WorkQueueItem, barrierQueue, processWorkQueue } from "./queue";
import { WorkerRelay } from "./relay";
import { NostrEvent, ReqCommand, ReqFilter, WorkerMessage } from "./types";
import { NostrEvent, RelayHandler, ReqCommand, ReqFilter, WorkerMessage, eventMatchesFilter } from "./types";
interface PortedFilter {
filters: Array<ReqFilter>;
@ -12,23 +13,7 @@ interface PortedFilter {
// Active open subscriptions awaiting new events
const ActiveSubscriptions = new Map<string, PortedFilter>();
const relay = new WorkerRelay();
relay.on("event", evs => {
for (const pf of ActiveSubscriptions.values()) {
const pfSend = [];
for (const ev of evs) {
for (const fx of pf.filters) {
if (eventMatchesFilter(ev, fx)) {
pfSend.push(ev);
continue;
}
}
}
if (pfSend.length > 0) {
pf.port.postMessage(pfSend);
}
}
});
let relay: RelayHandler | undefined;
async function reply<T>(id: string, obj?: T, transferables?: Transferable[]) {
globalThis.postMessage(
@ -46,7 +31,7 @@ let eventWriteQueue: Array<NostrEvent> = [];
async function insertBatch() {
// Only insert event batches when the command queue is empty
// This is to make req's execute first and not block them
if (eventWriteQueue.length > 0 && cmdQueue.length === 0) {
if (relay && eventWriteQueue.length > 0 && cmdQueue.length === 0) {
relay.eventBatch(eventWriteQueue);
eventWriteQueue = [];
}
@ -57,31 +42,49 @@ setTimeout(() => insertBatch(), 100);
const cmdQueue: Array<WorkQueueItem> = [];
processWorkQueue(cmdQueue, 50);
async function tryOpfs() {
try {
await navigator.storage.getDirectory();
return true;
} catch {
// ignore
}
return false;
}
globalThis.onclose = () => {
relay.close();
relay?.close();
};
globalThis.onmessage = ev => {
globalThis.onmessage = async ev => {
const msg = ev.data as WorkerMessage<any>;
try {
switch (msg.cmd) {
case "init": {
barrierQueue(cmdQueue, async () => {
await relay.init();
reply(msg.id, true);
});
break;
await barrierQueue(cmdQueue, async () => {
if ("WebAssembly" in globalThis && (await tryOpfs())) {
relay = new WorkerRelay();
} else {
relay = new InMemoryRelay();
}
case "open": {
barrierQueue(cmdQueue, async () => {
await relay.open("/relay.db");
reply(msg.id, true);
});
break;
relay.on("event", evs => {
for (const pf of ActiveSubscriptions.values()) {
const pfSend = [];
for (const ev of evs) {
for (const fx of pf.filters) {
if (eventMatchesFilter(ev, fx)) {
pfSend.push(ev);
continue;
}
case "migrate": {
barrierQueue(cmdQueue, async () => {
relay.migrate();
}
}
if (pfSend.length > 0) {
pf.port.postMessage(pfSend);
}
}
});
await relay.init(msg.args as string);
reply(msg.id, true);
});
break;
@ -97,7 +100,7 @@ globalThis.onmessage = ev => {
break;
}
case "req": {
barrierQueue(cmdQueue, async () => {
await barrierQueue(cmdQueue, async () => {
const req = msg.args as ReqCommand;
const chan = new MessageChannel();
if (req.leaveOpen) {
@ -108,18 +111,18 @@ globalThis.onmessage = ev => {
}
const results = [];
for (const r of req.filters) {
results.push(...relay.req(req.id, r as ReqFilter));
results.push(...relay!.req(req.id, r as ReqFilter));
}
reply(msg.id, results, req.leaveOpen ? [chan.port2] : undefined);
});
break;
}
case "count": {
barrierQueue(cmdQueue, async () => {
await barrierQueue(cmdQueue, async () => {
const req = msg.args as ReqCommand;
let results = 0;
for (const r of req.filters) {
const c = relay.count(r as ReqFilter);
const c = relay!.count(r as ReqFilter);
results += c;
}
reply(msg.id, results);
@ -127,26 +130,26 @@ globalThis.onmessage = ev => {
break;
}
case "summary": {
barrierQueue(cmdQueue, async () => {
const res = relay.summary();
await barrierQueue(cmdQueue, async () => {
const res = relay!.summary();
reply(msg.id, res);
});
break;
}
case "dumpDb": {
barrierQueue(cmdQueue, async () => {
const res = await relay.dump();
await barrierQueue(cmdQueue, async () => {
const res = await relay!.dump();
reply(msg.id, res);
});
break;
}
case "sql": {
barrierQueue(cmdQueue, async () => {
await barrierQueue(cmdQueue, async () => {
const req = msg.args as {
sql: string;
params: Array<any>;
};
const res = relay.sql(req.sql, req.params);
const res = relay!.sql(req.sql, req.params);
reply(msg.id, res);
});
break;
@ -161,22 +164,3 @@ globalThis.onmessage = ev => {
reply(msg.id, { error: e });
}
};
export function eventMatchesFilter(ev: NostrEvent, filter: ReqFilter) {
if (filter.since && ev.created_at < filter.since) {
return false;
}
if (filter.until && ev.created_at > filter.until) {
return false;
}
if (!(filter.ids?.includes(ev.id) ?? true)) {
return false;
}
if (!(filter.authors?.includes(ev.pubkey) ?? true)) {
return false;
}
if (!(filter.kinds?.includes(ev.kind) ?? true)) {
return false;
}
return true;
}