feat: use localhost relay over worker-relay
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
kieran 2024-06-18 10:49:29 +01:00
parent 44435db21d
commit 37000ecc7a
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
9 changed files with 130 additions and 38 deletions

View File

@ -1,14 +1,13 @@
import { CachedTable, CacheEvents } from "@snort/shared";
import { NostrEvent } from "@snort/system";
import { WorkerRelayInterface } from "@snort/worker-relay";
import { CacheRelay, NostrEvent } from "@snort/system";
import { EventEmitter } from "eventemitter3";
export class EventCacheWorker extends EventEmitter<CacheEvents> implements CachedTable<NostrEvent> {
#relay: WorkerRelayInterface;
#relay: CacheRelay;
#keys = new Set<string>();
#cache = new Map<string, NostrEvent>();
constructor(relay: WorkerRelayInterface) {
constructor(relay: CacheRelay) {
super();
this.#relay = relay;
}

View File

@ -1,16 +1,15 @@
import { CachedTable, CacheEvents, removeUndefined, unixNowMs, unwrap } from "@snort/shared";
import { CachedMetadata, mapEventToProfile, NostrEvent } from "@snort/system";
import { WorkerRelayInterface } from "@snort/worker-relay";
import { CachedMetadata, CacheRelay, mapEventToProfile, NostrEvent } from "@snort/system";
import debug from "debug";
import { EventEmitter } from "eventemitter3";
export class ProfileCacheRelayWorker extends EventEmitter<CacheEvents> implements CachedTable<CachedMetadata> {
#relay: WorkerRelayInterface;
#relay: CacheRelay;
#keys = new Set<string>();
#cache = new Map<string, CachedMetadata>();
#log = debug("ProfileCacheRelayWorker");
constructor(relay: WorkerRelayInterface) {
constructor(relay: CacheRelay) {
super();
this.#relay = relay;
}

View File

@ -1,16 +1,15 @@
import { CachedTable, CacheEvents, removeUndefined, unixNowMs, unwrap } from "@snort/shared";
import { EventKind, NostrEvent, UsersFollows } from "@snort/system";
import { WorkerRelayInterface } from "@snort/worker-relay";
import { CacheRelay, EventKind, NostrEvent, UsersFollows } from "@snort/system";
import debug from "debug";
import { EventEmitter } from "eventemitter3";
export class UserFollowsWorker extends EventEmitter<CacheEvents> implements CachedTable<UsersFollows> {
#relay: WorkerRelayInterface;
#relay: CacheRelay;
#keys = new Set<string>();
#cache = new Map<string, UsersFollows>();
#log = debug("UserFollowsWorker");
constructor(relay: WorkerRelayInterface) {
constructor(relay: CacheRelay) {
super();
this.#relay = relay;
}

View File

@ -1,4 +1,4 @@
import { RelayMetricCache, UserRelaysCache } from "@snort/system";
import { CacheRelay, Connection, ConnectionCacheRelay, RelayMetricCache, UserRelaysCache } from "@snort/system";
import { SnortSystemDb } from "@snort/system-web";
import { WorkerRelayInterface } from "@snort/worker-relay";
import WorkerVite from "@snort/worker-relay/src/worker?worker";
@ -8,13 +8,41 @@ import { GiftWrapCache } from "./GiftWrapCache";
import { ProfileCacheRelayWorker } from "./ProfileWorkerCache";
import { UserFollowsWorker } from "./UserFollowsWorker";
export const Relay = new WorkerRelayInterface(
const cacheRelay = localStorage.getItem("cache-relay");
const workerRelay = new WorkerRelayInterface(
import.meta.env.DEV ? new URL("@snort/worker-relay/dist/esm/worker.mjs", import.meta.url) : new WorkerVite(),
);
export async function initRelayWorker() {
export const Relay: CacheRelay = cacheRelay
? new ConnectionCacheRelay(new Connection(cacheRelay, { read: true, write: true }))
: workerRelay;
async function tryUseCacheRelay(url: string) {
try {
await Relay.debug("");
await Relay.init({
const conn = new Connection(url, { read: true, write: true });
await conn.connect(true);
localStorage.setItem("cache-relay", url);
return conn;
} catch (e) {
console.error(e);
}
}
export async function initRelayWorker() {
if (!cacheRelay) {
let conn = await tryUseCacheRelay("ws://localhost:4869");
if (!conn) {
conn = await tryUseCacheRelay("ws://umbrel:4848");
}
if (conn) return;
} else if (Relay instanceof ConnectionCacheRelay) {
await Relay.connection.connect();
}
try {
await workerRelay.debug("");
await workerRelay.init({
databasePath: "relay.db",
insertBatchSize: 100,
});

View File

@ -6,6 +6,7 @@ import { useNavigate } from "react-router-dom";
import { GiftsCache, Relay, RelayMetrics } from "@/Cache";
import AsyncButton from "@/Components/Button/AsyncButton";
import useLogin from "@/Hooks/useLogin";
import { WorkerRelayInterface } from "@snort/worker-relay";
export function CacheSettings() {
return (
@ -56,9 +57,11 @@ function RelayCacheStats() {
const navigate = useNavigate();
useEffect(() => {
Relay.summary().then(setCounts);
if (login.publicKey) {
Relay.count(["REQ", "my", { authors: [login.publicKey] }]).then(setMyEvents);
if (Relay instanceof WorkerRelayInterface) {
Relay.summary().then(setCounts);
if (login.publicKey) {
Relay.count(["REQ", "my", { authors: [login.publicKey] }]).then(setMyEvents);
}
}
}, []);

View File

@ -30,7 +30,6 @@ System.on("auth", async (c, r, cb) => {
});
System.on("event", (_, ev) => {
Relay.event(ev);
EventsCache.discover(ev);
UserCache.discover(ev);
addEventToFuzzySearch(ev);

View File

@ -0,0 +1,55 @@
import { NostrEvent, OkResponse, ReqCommand, ReqFilter, TaggedNostrEvent } from "./nostr";
import { CacheRelay } from "./cache-relay";
import { Connection } from "./connection";
import { NoteCollection } from "./note-collection";
import { v4 as uuid } from "uuid";
/**
* Use a regular connection as a CacheRelay
*/
export class ConnectionCacheRelay implements CacheRelay {
#eventsSent = new Set<string>();
constructor(readonly connection: Connection) {}
async event(ev: NostrEvent): Promise<OkResponse> {
if (this.#eventsSent.has(ev.id))
return {
ok: true,
id: ev.id,
message: "duplicate",
} as OkResponse;
this.#eventsSent.add(ev.id);
return await this.connection.publish(ev);
}
query(req: ReqCommand): Promise<NostrEvent[]> {
const id = uuid();
return new Promise((resolve, reject) => {
const results = new NoteCollection();
const evh = (s: string, e: TaggedNostrEvent) => {
if (s === id) {
results.add(e);
}
};
const eoh = (s: string) => {
if (s === id) {
resolve(results.snapshot);
this.connection.closeRequest(id);
this.connection.off("event", evh);
this.connection.off("eose", eoh);
this.connection.off("closed", eoh);
}
};
this.connection.on("event", evh);
this.connection.on("eose", eoh);
this.connection.on("closed", eoh);
this.connection.request(["REQ", id, ...(req.slice(2) as Array<ReqFilter>)]);
});
}
delete(req: ReqCommand): Promise<string[]> {
// ignored
return Promise.resolve([]);
}
}

View File

@ -100,7 +100,7 @@ export class Connection extends EventEmitter<ConnectionTypeEvents> implements Co
return [...this.#activeRequests];
}
async connect() {
async connect(awaitOpen = false) {
// already connected
if (this.isOpen || this.isConnecting) return;
// wait for re-connect timer
@ -131,24 +131,31 @@ export class Connection extends EventEmitter<ConnectionTypeEvents> implements Co
// ignored
}
const wasReconnect = this.Socket !== null;
if (this.Socket) {
this.id = uuid();
if (this.isOpen) {
this.Socket.close();
try {
const wasReconnect = this.Socket !== null;
if (this.Socket) {
this.id = uuid();
if (this.isOpen) {
this.Socket.close();
}
this.Socket.onopen = null;
this.Socket.onmessage = null;
this.Socket.onerror = null;
this.Socket.onclose = null;
this.Socket = null;
}
this.Socket.onopen = null;
this.Socket.onmessage = null;
this.Socket.onerror = null;
this.Socket.onclose = null;
this.Socket = null;
this.Socket = new WebSocket(this.address);
this.Socket.onopen = () => this.#onOpen(wasReconnect);
this.Socket.onmessage = e => this.#onMessage(e);
this.Socket.onerror = e => this.#onError(e);
this.Socket.onclose = e => this.#onClose(e);
if (awaitOpen) {
await new Promise(resolve => this.once("connected", resolve));
}
} catch (e) {
this.#connectStarted = false;
throw e;
}
this.Socket = new WebSocket(this.address);
this.Socket.onopen = () => this.#onOpen(wasReconnect);
this.Socket.onmessage = e => this.#onMessage(e);
this.Socket.onerror = e => this.#onError(e);
this.Socket.onclose = e => this.#onClose(e);
this.#connectStarted = false;
}
close() {
@ -158,6 +165,7 @@ export class Connection extends EventEmitter<ConnectionTypeEvents> implements Co
#onOpen(wasReconnect: boolean) {
this.#downCount = 0;
this.#connectStarted = false;
this.#log(`Open!`);
this.#setupEphemeral();
this.emit("connected", wasReconnect);

View File

@ -28,6 +28,8 @@ export * from "./encrypted";
export * from "./outbox";
export * from "./sync";
export * from "./user-state";
export * from "./cache-relay";
export * from "./connection-cache-relay";
export * from "./impl/nip4";
export * from "./impl/nip7";