From 37000ecc7ab6d6cc4103af992cb9883024630663 Mon Sep 17 00:00:00 2001 From: kieran Date: Tue, 18 Jun 2024 10:49:29 +0100 Subject: [PATCH] feat: use localhost relay over worker-relay --- packages/app/src/Cache/EventCacheWorker.ts | 7 +-- packages/app/src/Cache/ProfileWorkerCache.ts | 7 +-- packages/app/src/Cache/UserFollowsWorker.ts | 7 +-- packages/app/src/Cache/index.ts | 38 +++++++++++-- packages/app/src/Pages/settings/Cache.tsx | 9 ++- packages/app/src/system.ts | 1 - packages/system/src/connection-cache-relay.ts | 55 +++++++++++++++++++ packages/system/src/connection.ts | 42 ++++++++------ packages/system/src/index.ts | 2 + 9 files changed, 130 insertions(+), 38 deletions(-) create mode 100644 packages/system/src/connection-cache-relay.ts diff --git a/packages/app/src/Cache/EventCacheWorker.ts b/packages/app/src/Cache/EventCacheWorker.ts index af6b216c..6c40f4af 100644 --- a/packages/app/src/Cache/EventCacheWorker.ts +++ b/packages/app/src/Cache/EventCacheWorker.ts @@ -1,14 +1,13 @@ import { CachedTable, CacheEvents } from "@snort/shared"; -import { NostrEvent } from "@snort/system"; -import { WorkerRelayInterface } from "@snort/worker-relay"; +import { CacheRelay, NostrEvent } from "@snort/system"; import { EventEmitter } from "eventemitter3"; export class EventCacheWorker extends EventEmitter implements CachedTable { - #relay: WorkerRelayInterface; + #relay: CacheRelay; #keys = new Set(); #cache = new Map(); - constructor(relay: WorkerRelayInterface) { + constructor(relay: CacheRelay) { super(); this.#relay = relay; } diff --git a/packages/app/src/Cache/ProfileWorkerCache.ts b/packages/app/src/Cache/ProfileWorkerCache.ts index 0843600b..b526f7d2 100644 --- a/packages/app/src/Cache/ProfileWorkerCache.ts +++ b/packages/app/src/Cache/ProfileWorkerCache.ts @@ -1,16 +1,15 @@ import { CachedTable, CacheEvents, removeUndefined, unixNowMs, unwrap } from "@snort/shared"; -import { CachedMetadata, mapEventToProfile, NostrEvent } from "@snort/system"; -import { WorkerRelayInterface } from "@snort/worker-relay"; +import { CachedMetadata, CacheRelay, mapEventToProfile, NostrEvent } from "@snort/system"; import debug from "debug"; import { EventEmitter } from "eventemitter3"; export class ProfileCacheRelayWorker extends EventEmitter implements CachedTable { - #relay: WorkerRelayInterface; + #relay: CacheRelay; #keys = new Set(); #cache = new Map(); #log = debug("ProfileCacheRelayWorker"); - constructor(relay: WorkerRelayInterface) { + constructor(relay: CacheRelay) { super(); this.#relay = relay; } diff --git a/packages/app/src/Cache/UserFollowsWorker.ts b/packages/app/src/Cache/UserFollowsWorker.ts index c128b910..3530a403 100644 --- a/packages/app/src/Cache/UserFollowsWorker.ts +++ b/packages/app/src/Cache/UserFollowsWorker.ts @@ -1,16 +1,15 @@ import { CachedTable, CacheEvents, removeUndefined, unixNowMs, unwrap } from "@snort/shared"; -import { EventKind, NostrEvent, UsersFollows } from "@snort/system"; -import { WorkerRelayInterface } from "@snort/worker-relay"; +import { CacheRelay, EventKind, NostrEvent, UsersFollows } from "@snort/system"; import debug from "debug"; import { EventEmitter } from "eventemitter3"; export class UserFollowsWorker extends EventEmitter implements CachedTable { - #relay: WorkerRelayInterface; + #relay: CacheRelay; #keys = new Set(); #cache = new Map(); #log = debug("UserFollowsWorker"); - constructor(relay: WorkerRelayInterface) { + constructor(relay: CacheRelay) { super(); this.#relay = relay; } diff --git a/packages/app/src/Cache/index.ts b/packages/app/src/Cache/index.ts index 6bb1dd22..194f5ea1 100644 --- a/packages/app/src/Cache/index.ts +++ b/packages/app/src/Cache/index.ts @@ -1,4 +1,4 @@ -import { RelayMetricCache, UserRelaysCache } from "@snort/system"; +import { CacheRelay, Connection, ConnectionCacheRelay, RelayMetricCache, UserRelaysCache } from "@snort/system"; import { SnortSystemDb } from "@snort/system-web"; import { WorkerRelayInterface } from "@snort/worker-relay"; import WorkerVite from "@snort/worker-relay/src/worker?worker"; @@ -8,13 +8,41 @@ import { GiftWrapCache } from "./GiftWrapCache"; import { ProfileCacheRelayWorker } from "./ProfileWorkerCache"; import { UserFollowsWorker } from "./UserFollowsWorker"; -export const Relay = new WorkerRelayInterface( +const cacheRelay = localStorage.getItem("cache-relay"); + +const workerRelay = new WorkerRelayInterface( import.meta.env.DEV ? new URL("@snort/worker-relay/dist/esm/worker.mjs", import.meta.url) : new WorkerVite(), ); -export async function initRelayWorker() { + +export const Relay: CacheRelay = cacheRelay + ? new ConnectionCacheRelay(new Connection(cacheRelay, { read: true, write: true })) + : workerRelay; + +async function tryUseCacheRelay(url: string) { try { - await Relay.debug(""); - await Relay.init({ + const conn = new Connection(url, { read: true, write: true }); + await conn.connect(true); + localStorage.setItem("cache-relay", url); + return conn; + } catch (e) { + console.error(e); + } +} + +export async function initRelayWorker() { + if (!cacheRelay) { + let conn = await tryUseCacheRelay("ws://localhost:4869"); + if (!conn) { + conn = await tryUseCacheRelay("ws://umbrel:4848"); + } + if (conn) return; + } else if (Relay instanceof ConnectionCacheRelay) { + await Relay.connection.connect(); + } + + try { + await workerRelay.debug(""); + await workerRelay.init({ databasePath: "relay.db", insertBatchSize: 100, }); diff --git a/packages/app/src/Pages/settings/Cache.tsx b/packages/app/src/Pages/settings/Cache.tsx index b479b609..0f38cb5c 100644 --- a/packages/app/src/Pages/settings/Cache.tsx +++ b/packages/app/src/Pages/settings/Cache.tsx @@ -6,6 +6,7 @@ import { useNavigate } from "react-router-dom"; import { GiftsCache, Relay, RelayMetrics } from "@/Cache"; import AsyncButton from "@/Components/Button/AsyncButton"; import useLogin from "@/Hooks/useLogin"; +import { WorkerRelayInterface } from "@snort/worker-relay"; export function CacheSettings() { return ( @@ -56,9 +57,11 @@ function RelayCacheStats() { const navigate = useNavigate(); useEffect(() => { - Relay.summary().then(setCounts); - if (login.publicKey) { - Relay.count(["REQ", "my", { authors: [login.publicKey] }]).then(setMyEvents); + if (Relay instanceof WorkerRelayInterface) { + Relay.summary().then(setCounts); + if (login.publicKey) { + Relay.count(["REQ", "my", { authors: [login.publicKey] }]).then(setMyEvents); + } } }, []); diff --git a/packages/app/src/system.ts b/packages/app/src/system.ts index 05e7f8fe..e62d1e3f 100644 --- a/packages/app/src/system.ts +++ b/packages/app/src/system.ts @@ -30,7 +30,6 @@ System.on("auth", async (c, r, cb) => { }); System.on("event", (_, ev) => { - Relay.event(ev); EventsCache.discover(ev); UserCache.discover(ev); addEventToFuzzySearch(ev); diff --git a/packages/system/src/connection-cache-relay.ts b/packages/system/src/connection-cache-relay.ts new file mode 100644 index 00000000..900e1780 --- /dev/null +++ b/packages/system/src/connection-cache-relay.ts @@ -0,0 +1,55 @@ +import { NostrEvent, OkResponse, ReqCommand, ReqFilter, TaggedNostrEvent } from "./nostr"; +import { CacheRelay } from "./cache-relay"; +import { Connection } from "./connection"; +import { NoteCollection } from "./note-collection"; +import { v4 as uuid } from "uuid"; + +/** + * Use a regular connection as a CacheRelay + */ +export class ConnectionCacheRelay implements CacheRelay { + #eventsSent = new Set(); + + constructor(readonly connection: Connection) {} + + async event(ev: NostrEvent): Promise { + if (this.#eventsSent.has(ev.id)) + return { + ok: true, + id: ev.id, + message: "duplicate", + } as OkResponse; + this.#eventsSent.add(ev.id); + return await this.connection.publish(ev); + } + + query(req: ReqCommand): Promise { + const id = uuid(); + return new Promise((resolve, reject) => { + const results = new NoteCollection(); + const evh = (s: string, e: TaggedNostrEvent) => { + if (s === id) { + results.add(e); + } + }; + const eoh = (s: string) => { + if (s === id) { + resolve(results.snapshot); + this.connection.closeRequest(id); + this.connection.off("event", evh); + this.connection.off("eose", eoh); + this.connection.off("closed", eoh); + } + }; + this.connection.on("event", evh); + this.connection.on("eose", eoh); + this.connection.on("closed", eoh); + this.connection.request(["REQ", id, ...(req.slice(2) as Array)]); + }); + } + + delete(req: ReqCommand): Promise { + // ignored + return Promise.resolve([]); + } +} diff --git a/packages/system/src/connection.ts b/packages/system/src/connection.ts index 18cc7df7..e41c764f 100644 --- a/packages/system/src/connection.ts +++ b/packages/system/src/connection.ts @@ -100,7 +100,7 @@ export class Connection extends EventEmitter implements Co return [...this.#activeRequests]; } - async connect() { + async connect(awaitOpen = false) { // already connected if (this.isOpen || this.isConnecting) return; // wait for re-connect timer @@ -131,24 +131,31 @@ export class Connection extends EventEmitter implements Co // ignored } - const wasReconnect = this.Socket !== null; - if (this.Socket) { - this.id = uuid(); - if (this.isOpen) { - this.Socket.close(); + try { + const wasReconnect = this.Socket !== null; + if (this.Socket) { + this.id = uuid(); + if (this.isOpen) { + this.Socket.close(); + } + this.Socket.onopen = null; + this.Socket.onmessage = null; + this.Socket.onerror = null; + this.Socket.onclose = null; + this.Socket = null; } - this.Socket.onopen = null; - this.Socket.onmessage = null; - this.Socket.onerror = null; - this.Socket.onclose = null; - this.Socket = null; + this.Socket = new WebSocket(this.address); + this.Socket.onopen = () => this.#onOpen(wasReconnect); + this.Socket.onmessage = e => this.#onMessage(e); + this.Socket.onerror = e => this.#onError(e); + this.Socket.onclose = e => this.#onClose(e); + if (awaitOpen) { + await new Promise(resolve => this.once("connected", resolve)); + } + } catch (e) { + this.#connectStarted = false; + throw e; } - this.Socket = new WebSocket(this.address); - this.Socket.onopen = () => this.#onOpen(wasReconnect); - this.Socket.onmessage = e => this.#onMessage(e); - this.Socket.onerror = e => this.#onError(e); - this.Socket.onclose = e => this.#onClose(e); - this.#connectStarted = false; } close() { @@ -158,6 +165,7 @@ export class Connection extends EventEmitter implements Co #onOpen(wasReconnect: boolean) { this.#downCount = 0; + this.#connectStarted = false; this.#log(`Open!`); this.#setupEphemeral(); this.emit("connected", wasReconnect); diff --git a/packages/system/src/index.ts b/packages/system/src/index.ts index 1002aa0c..54e0562f 100644 --- a/packages/system/src/index.ts +++ b/packages/system/src/index.ts @@ -28,6 +28,8 @@ export * from "./encrypted"; export * from "./outbox"; export * from "./sync"; export * from "./user-state"; +export * from "./cache-relay"; +export * from "./connection-cache-relay"; export * from "./impl/nip4"; export * from "./impl/nip7";