refactor: extract connection pool

wip: setup system-worker
This commit is contained in:
2023-12-28 17:40:26 +00:00
parent e7e7fdc14d
commit c2e1215667
7 changed files with 455 additions and 169 deletions

View File

@ -1,9 +1,9 @@
import debug from "debug";
import EventEmitter from "eventemitter3";
import { unwrap, sanitizeRelayUrl, FeedCache, removeUndefined } from "@snort/shared";
import { unwrap, FeedCache } from "@snort/shared";
import { NostrEvent, TaggedNostrEvent } from "./nostr";
import { Connection, RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection";
import { RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection";
import { Query } from "./query";
import { NoteCollection, NoteStore } from "./note-collection";
import { BuiltRawReqFilter, RequestBuilder, RequestStrategy } from "./request-builder";
@ -22,14 +22,25 @@ import {
EventExt,
} from ".";
import { EventsCache } from "./cache/events";
import { RelayCache, RelayMetadataLoader, pickRelaysForReply } from "./outbox-model";
import { RelayCache, RelayMetadataLoader } from "./outbox-model";
import { QueryOptimizer, DefaultQueryOptimizer } from "./query-optimizer";
import { trimFilters } from "./request-trim";
import { NostrConnectionPool } from "./nostr-connection-pool";
interface NostrSystemEvents {
export interface NostrSystemEvents {
change: (state: SystemSnapshot) => void;
auth: (challenge: string, relay: string, cb: (ev: NostrEvent) => void) => void;
event: (ev: TaggedNostrEvent) => void;
event: (id: string, ev: TaggedNostrEvent) => void;
}
export interface NostrsystemProps {
relayCache?: FeedCache<UsersRelays>;
profileCache?: FeedCache<MetadataCache>;
relayMetrics?: FeedCache<RelayMetrics>;
eventsCache?: FeedCache<NostrEvent>;
queryOptimizer?: QueryOptimizer;
db?: SnortSystemDb;
checkSigs?: boolean;
}
/**
@ -37,11 +48,7 @@ interface NostrSystemEvents {
*/
export class NostrSystem extends EventEmitter<NostrSystemEvents> implements SystemInterface {
#log = debug("System");
/**
* All currently connected websockets
*/
#sockets = new Map<string, Connection>();
#pool = new NostrConnectionPool();
/**
* All active queries
@ -90,15 +97,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
#relayLoader: RelayMetadataLoader;
constructor(props: {
relayCache?: FeedCache<UsersRelays>;
profileCache?: FeedCache<MetadataCache>;
relayMetrics?: FeedCache<RelayMetrics>;
eventsCache?: FeedCache<NostrEvent>;
queryOptimizer?: QueryOptimizer;
db?: SnortSystemDb;
checkSigs?: boolean;
}) {
constructor(props: NostrsystemProps) {
super();
this.#relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays);
this.#profileCache = props.profileCache ?? new UserProfileCache(props.db?.users);
@ -111,6 +110,67 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
this.#relayLoader = new RelayMetadataLoader(this, this.#relayCache);
this.checkSigs = props.checkSigs ?? true;
this.#cleanup();
// hook connection pool
this.#pool.on("connected", (id, wasReconnect) => {
const c = this.#pool.getConnection(id);
if (c) {
this.#relayMetrics.onConnect(c.Address);
if (wasReconnect) {
for (const [, q] of this.Queries) {
q.connectionRestored(c);
}
}
}
});
this.#pool.on("connectFailed", address => {
this.#relayMetrics.onDisconnect(address, 0);
});
this.#pool.on("event", (_, sub, ev) => {
ev.relays?.length && this.#relayMetrics.onEvent(ev.relays[0]);
if (!EventExt.isValid(ev)) {
this.#log("Rejecting invalid event %O", ev);
return;
}
if (this.checkSigs) {
const id = EventExt.createId(ev);
if (!this.#queryOptimizer.schnorrVerify(id, ev.sig, ev.pubkey)) {
this.#log("Invalid sig %O", ev);
return;
}
}
this.emit("event", sub, ev);
});
this.#pool.on("disconnect", (id, code) => {
const c = this.#pool.getConnection(id);
if (c) {
this.#relayMetrics.onDisconnect(c.Address, code);
for (const [, q] of this.Queries) {
q.connectionLost(c.Id);
}
}
});
this.#pool.on("eose", (id, sub) => {
const c = this.#pool.getConnection(id);
if (c) {
for (const [, v] of this.Queries) {
v.eose(sub, c);
}
}
});
this.#pool.on("auth", (_, c, r, cb) => this.emit("auth", c, r, cb));
this.#pool.on("notice", (addr, msg) => {
this.#log("NOTICE: %s %s", addr, msg);
});
// internal handler for on-event
this.on("event", (sub, ev) => {
for (const [, v] of this.Queries) {
v.handleEvent(sub, ev);
}
});
}
get ProfileLoader() {
@ -118,7 +178,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
}
get Sockets(): ConnectionStateSnapshot[] {
return [...this.#sockets.values()].map(a => a.takeSnapshot());
return this.#pool.getState();
}
get RelayCache(): RelayCache {
@ -129,9 +189,6 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
return this.#queryOptimizer;
}
/**
* Setup caches
*/
async Init() {
const t = [
this.#relayCache.preload(),
@ -142,109 +199,16 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
await Promise.all(t);
}
/**
* Connect to a NOSTR relay if not already connected
*/
async ConnectToRelay(address: string, options: RelaySettings) {
const addr = unwrap(sanitizeRelayUrl(address));
try {
const existing = this.#sockets.get(addr);
if (!existing) {
const c = new Connection(addr, options);
this.#sockets.set(addr, c);
c.on("event", (s, e) => this.#onEvent(s, e));
c.on("eose", s => this.#onEndOfStoredEvents(c, s));
c.on("disconnect", code => this.#onRelayDisconnect(c, code));
c.on("connected", r => this.#onRelayConnected(c, r));
c.on("auth", (c, r, cb) => this.emit("auth", c, r, cb));
await c.Connect();
} else {
// update settings if already connected
existing.Settings = options;
existing.Ephemeral = false;
}
} catch (e) {
console.error(e);
this.#relayMetrics.onDisconnect(addr, 0);
}
await this.#pool.connect(address, options, false);
}
#onRelayConnected(c: Connection, wasReconnect: boolean) {
this.#relayMetrics.onConnect(c.Address);
if (wasReconnect) {
for (const [, q] of this.Queries) {
q.connectionRestored(c);
}
}
ConnectEphemeralRelay(address: string) {
return this.#pool.connect(address, { read: true, write: true }, true);
}
#onRelayDisconnect(c: Connection, code: number) {
this.#relayMetrics.onDisconnect(c.Address, code);
for (const [, q] of this.Queries) {
q.connectionLost(c.Id);
}
}
#onEndOfStoredEvents(c: Readonly<Connection>, sub: string) {
for (const [, v] of this.Queries) {
v.eose(sub, c);
}
}
#onEvent(sub: string, ev: TaggedNostrEvent) {
ev.relays?.length && this.#relayMetrics.onEvent(ev.relays[0]);
if (!EventExt.isValid(ev)) {
this.#log("Rejecting invalid event %O", ev);
return;
}
if (this.checkSigs) {
const id = EventExt.createId(ev);
if (!this.#queryOptimizer.schnorrVerify(id, ev.sig, ev.pubkey)) {
this.#log("Invalid sig %O", ev);
return;
}
}
this.emit("event", ev);
for (const [, v] of this.Queries) {
v.handleEvent(sub, ev);
}
}
/**
*
* @param address Relay address URL
*/
async ConnectEphemeralRelay(address: string): Promise<Connection | undefined> {
try {
const addr = unwrap(sanitizeRelayUrl(address));
if (!this.#sockets.has(addr)) {
const c = new Connection(addr, { read: true, write: true }, true);
this.#sockets.set(addr, c);
c.on("event", (s, e) => this.#onEvent(s, e));
c.on("eose", s => this.#onEndOfStoredEvents(c, s));
c.on("disconnect", code => this.#onRelayDisconnect(c, code));
c.on("connected", r => this.#onRelayConnected(c, r));
c.on("auth", (c, r, cb) => this.emit("auth", c, r, cb));
await c.Connect();
return c;
}
} catch (e) {
console.error(e);
}
}
/**
* Disconnect from a relay
*/
DisconnectRelay(address: string) {
const c = this.#sockets.get(address);
if (c) {
this.#sockets.delete(address);
c.Close();
}
this.#pool.disconnect(address);
}
GetQuery(id: string): Query | undefined {
@ -354,7 +318,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
if (qSend.relay) {
this.#log("Sending query to %s %O", qSend.relay, qSend);
const s = this.#sockets.get(qSend.relay);
const s = this.#pool.getConnection(qSend.relay);
if (s) {
const qt = q.sendToRelay(s, qSend);
if (qt) {
@ -373,7 +337,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
}
} else {
const ret = [];
for (const [a, s] of this.#sockets) {
for (const [a, s] of this.#pool) {
if (!s.Ephemeral) {
this.#log("Sending query to %s %O", a, qSend);
const qt = q.sendToRelay(s, qSend);
@ -388,58 +352,16 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
}
HandleEvent(ev: TaggedNostrEvent) {
this.#onEvent("*", ev);
this.emit("event", "*", ev);
}
/**
* Send events to writable relays
*/
async BroadcastEvent(ev: NostrEvent, cb?: (rsp: OkResponse) => void) {
async BroadcastEvent(ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise<OkResponse[]> {
this.HandleEvent({ ...ev, relays: [] });
const socks = [...this.#sockets.values()].filter(a => !a.Ephemeral && a.Settings.write);
const replyRelays = await pickRelaysForReply(ev, this);
const oks = await Promise.all([
...socks.map(async s => {
try {
const rsp = await s.SendAsync(ev);
cb?.(rsp);
return rsp;
} catch (e) {
console.error(e);
}
return;
}),
...replyRelays.filter(a => !this.#sockets.has(a)).map(a => this.WriteOnceToRelay(a, ev)),
]);
return removeUndefined(oks);
return await this.#pool.broadcast(this, ev, cb);
}
/**
* Write an event to a relay then disconnect
*/
async WriteOnceToRelay(address: string, ev: NostrEvent): Promise<OkResponse> {
const addrClean = sanitizeRelayUrl(address);
if (!addrClean) {
throw new Error("Invalid relay address");
}
const existing = this.#sockets.get(addrClean);
if (existing) {
return await existing.SendAsync(ev);
} else {
return await new Promise<OkResponse>((resolve, reject) => {
const c = new Connection(address, { write: true, read: true }, true);
const t = setTimeout(reject, 10_000);
c.once("connected", async () => {
clearTimeout(t);
const rsp = await c.SendAsync(ev);
c.Close();
resolve(rsp);
});
c.Connect();
});
}
return await this.#pool.broadcastTo(address, ev);
}
takeSnapshot(): SystemSnapshot {