feat: collect relay metrics

This commit is contained in:
Kieran 2023-11-08 09:41:12 +00:00
parent 8dbbb24729
commit 3326aedc52
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
5 changed files with 125 additions and 35 deletions

View File

@ -36,6 +36,8 @@ export interface MetadataCache extends UserMetadata {
export interface RelayMetrics {
addr: string;
events: number;
connects: number;
lastSeen: number;
disconnects: number;
latency: number[];
}

View File

@ -183,7 +183,11 @@ export class Connection extends EventEmitter {
`Closed (code=${e.code}), trying again in ${(this.ConnectTimeout / 1000).toFixed(0).toLocaleString()} sec`,
);
this.ReconnectTimer = setTimeout(() => {
this.Connect();
try {
this.Connect();
} catch {
this.emit("disconnect", -1);
}
}, this.ConnectTimeout);
this.Stats.Disconnects++;
} else {
@ -191,7 +195,7 @@ export class Connection extends EventEmitter {
this.ReconnectTimer = undefined;
}
this.emit("disconnected", e.code);
this.emit("disconnect", e.code);
this.#reset();
this.notifyChange();
}

View File

@ -147,8 +147,8 @@ export class NostrSystem extends EventEmitter implements SystemInterface {
* Connect to a NOSTR relay if not already connected
*/
async ConnectToRelay(address: string, options: RelaySettings) {
const addr = unwrap(sanitizeRelayUrl(address));
try {
const addr = unwrap(sanitizeRelayUrl(address));
const existing = this.#sockets.get(addr);
if (!existing) {
const c = new Connection(addr, options);
@ -165,10 +165,12 @@ export class NostrSystem extends EventEmitter implements SystemInterface {
}
} catch (e) {
console.error(e);
this.#relayMetrics.onDisconnect(addr, 0);
}
}
#onRelayConnected(c: Connection, wasReconnect: boolean) {
this.#relayMetrics.onConnect(c.Address);
if (wasReconnect) {
for (const [, q] of this.Queries) {
q.connectionRestored(c);
@ -177,7 +179,7 @@ export class NostrSystem extends EventEmitter implements SystemInterface {
}
#onRelayDisconnect(c: Connection, code: number) {
this.#relayMetrics.onDisconnect(c, code);
this.#relayMetrics.onDisconnect(c.Address, code);
for (const [, q] of this.Queries) {
q.connectionLost(c.Id);
}
@ -190,6 +192,7 @@ export class NostrSystem extends EventEmitter implements SystemInterface {
}
#onEvent(sub: string, ev: TaggedNostrEvent) {
this.#relayMetrics.onEvent(ev.relays[0]);
if (!EventExt.isValid(ev)) {
this.#log("Rejecting invalid event %O", ev);
return;
@ -292,6 +295,8 @@ export class NostrSystem extends EventEmitter implements SystemInterface {
const filters = req.build(this);
const q = new Query(req.id, req.instance, store, req.options?.leaveOpen);
q.on("trace", r => this.#relayMetrics.onTraceReport(r));
if (filters.some(a => a.filters.some(b => b.ids))) {
const expectIds = new Set(filters.flatMap(a => a.filters).flatMap(a => a.ids ?? []));
q.feed.onEvent(async evs => {

View File

@ -6,53 +6,62 @@ import { Connection, ReqFilter, Nips, TaggedNostrEvent } from ".";
import { NoteStore } from "./note-collection";
import { BuiltRawReqFilter } from "./request-builder";
import { eventMatchesFilter } from "./request-matcher";
import EventEmitter from "events";
interface QueryTraceEvents {
change: () => void;
close: (id: string) => void;
eose: (id: string, connId: string, wasForced: boolean) => void;
}
export declare interface QueryTrace {
on<U extends keyof QueryTraceEvents>(event: U, listener: QueryTraceEvents[U]): this;
once<U extends keyof QueryTraceEvents>(event: U, listener: QueryTraceEvents[U]): this;
}
/**
* Tracing for relay query status
*/
class QueryTrace {
export class QueryTrace extends EventEmitter {
readonly id: string;
readonly start: number;
sent?: number;
eose?: number;
close?: number;
#wasForceClosed = false;
readonly #fnClose: (id: string) => void;
readonly #fnProgress: () => void;
constructor(
readonly relay: string,
readonly filters: Array<ReqFilter>,
readonly connId: string,
fnClose: (id: string) => void,
fnProgress: () => void,
) {
super();
this.id = uuid();
this.start = unixNowMs();
this.#fnClose = fnClose;
this.#fnProgress = fnProgress;
}
sentToRelay() {
this.sent = unixNowMs();
this.#fnProgress();
this.emit("change");
}
gotEose() {
this.eose = unixNowMs();
this.#fnProgress();
this.emit("change");
this.emit("eose", this.id, this.connId, false);
}
forceEose() {
this.eose = unixNowMs();
this.#wasForceClosed = true;
this.sendClose();
this.emit("eose", this.id, this.connId, true);
}
sendClose() {
this.close = unixNowMs();
this.#fnClose(this.id);
this.#fnProgress();
this.emit("close", this.id);
this.emit("change");
}
/**
@ -101,10 +110,27 @@ export interface QueryBase {
relays?: Array<string>;
}
export interface TraceReport {
id: string;
conn: Connection;
wasForced: boolean;
queued: number;
responseTime: number;
}
interface QueryEvents {
trace: (report: TraceReport) => void;
}
export declare interface Query {
on<U extends keyof QueryEvents>(event: U, listener: QueryEvents[U]): this;
once<U extends keyof QueryEvents>(event: U, listener: QueryEvents[U]): this;
}
/**
* Active or queued query on the system
*/
export class Query implements QueryBase {
export class Query extends EventEmitter implements QueryBase {
/**
* Uniquie ID of this query
*/
@ -143,6 +169,7 @@ export class Query implements QueryBase {
#log = debug("Query");
constructor(id: string, instance: string, feed: NoteStore, leaveOpen?: boolean) {
super();
this.id = id;
this.#feed = feed;
this.fromInstance = instance;
@ -201,17 +228,7 @@ export class Query implements QueryBase {
* Insert a new trace as a placeholder
*/
insertCompletedTrace(subq: BuiltRawReqFilter, data: Readonly<Array<TaggedNostrEvent>>) {
const qt = new QueryTrace(
subq.relay,
subq.filters,
"",
() => {
// nothing to close
},
() => {
// nothing to progress
},
);
const qt = new QueryTrace(subq.relay, subq.filters, "");
qt.sentToRelay();
qt.gotEose();
this.#tracing.push(qt);
@ -307,12 +324,17 @@ export class Query implements QueryBase {
}
#sendQueryInternal(c: Connection, q: BuiltRawReqFilter) {
const qt = new QueryTrace(
c.Address,
q.filters,
c.Id,
x => c.CloseReq(x),
() => this.#onProgress(),
const qt = new QueryTrace(c.Address, q.filters, c.Id);
qt.on("close", x => c.CloseReq(x));
qt.on("change", () => this.#onProgress());
qt.on("eose", (id, connId, forced) =>
this.emit("trace", {
id,
conn: c,
wasForced: forced,
queued: qt.queued,
responseTime: qt.responseTime,
} as TraceReport),
);
this.#tracing.push(qt);
c.QueueReq(["REQ", qt.id, ...qt.filters], () => qt.sentToRelay());

View File

@ -1,13 +1,70 @@
import { FeedCache } from "@snort/shared";
import { FeedCache, unixNowMs } from "@snort/shared";
import { Connection } from "connection";
import { RelayMetrics } from "cache";
import { TraceReport } from "query";
export class RelayMetricHandler {
readonly #cache: FeedCache<RelayMetrics>;
constructor(cache: FeedCache<RelayMetrics>) {
this.#cache = cache;
setInterval(() => {
this.#flush();
}, 10_000);
}
onDisconnect(c: Connection, code: number) {}
async onEvent(addr: string) {
const v = await this.#cache.get(addr);
if (v) {
v.events++;
v.lastSeen = unixNowMs();
}
}
async onConnect(addr: string) {
const v = await this.#cache.get(addr);
if (v) {
v.connects++;
v.lastSeen = unixNowMs();
} else {
await this.#cache.set({
addr: addr,
connects: 1,
disconnects: 0,
events: 0,
lastSeen: unixNowMs(),
latency: [],
});
}
}
async onDisconnect(addr: string, code: number) {
const v = await this.#cache.get(addr);
if (v) {
v.disconnects++;
} else {
await this.#cache.set({
addr: addr,
connects: 0,
disconnects: 1,
events: 0,
lastSeen: unixNowMs(),
latency: [],
});
}
}
onTraceReport(t: TraceReport) {
const v = this.#cache.getFromCache(t.conn.Address);
if (v) {
v.latency.push(t.responseTime);
v.latency = v.latency.slice(-50);
}
}
async #flush() {
const data = this.#cache.snapshot();
await this.#cache.bulkSet(data);
}
}