commit
aad92ec887
@ -30,9 +30,9 @@ const useRequestBuilder = <TStore extends NoteStore, TSnapshot = ReturnType<TSto
|
|||||||
};
|
};
|
||||||
const getState = (): StoreSnapshot<TSnapshot> => {
|
const getState = (): StoreSnapshot<TSnapshot> => {
|
||||||
if (rb?.id) {
|
if (rb?.id) {
|
||||||
const feed = System.GetFeed(rb.id);
|
const q = System.GetQuery(rb.id);
|
||||||
if (feed) {
|
if (q) {
|
||||||
return unwrap(feed).snapshot as StoreSnapshot<TSnapshot>;
|
return unwrap(q).feed?.snapshot as StoreSnapshot<TSnapshot>;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return EmptySnapshot as StoreSnapshot<TSnapshot>;
|
return EmptySnapshot as StoreSnapshot<TSnapshot>;
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
import { Connection } from "@snort/nostr";
|
import { Connection } from "@snort/nostr";
|
||||||
import { unixNow } from "Util";
|
|
||||||
import { Query } from "./Query";
|
import { Query } from "./Query";
|
||||||
import { getRandomValues } from "crypto";
|
import { getRandomValues } from "crypto";
|
||||||
|
|
||||||
@ -8,15 +7,12 @@ window.crypto.getRandomValues = getRandomValues as any;
|
|||||||
|
|
||||||
describe("query", () => {
|
describe("query", () => {
|
||||||
test("progress", () => {
|
test("progress", () => {
|
||||||
const q = new Query("test", {
|
const q = new Query("test", [
|
||||||
filters: [
|
{
|
||||||
{
|
kinds: [1],
|
||||||
kinds: [1],
|
authors: ["test"],
|
||||||
authors: ["test"],
|
},
|
||||||
},
|
]);
|
||||||
],
|
|
||||||
started: unixNow(),
|
|
||||||
});
|
|
||||||
const opt = {
|
const opt = {
|
||||||
read: true,
|
read: true,
|
||||||
write: true,
|
write: true,
|
||||||
@ -33,29 +29,26 @@ describe("query", () => {
|
|||||||
q.sendToRelay(c3);
|
q.sendToRelay(c3);
|
||||||
|
|
||||||
expect(q.progress).toBe(0);
|
expect(q.progress).toBe(0);
|
||||||
q.eose(q.id, c1.Address);
|
q.eose(q.id, c1);
|
||||||
expect(q.progress).toBe(1 / 3);
|
expect(q.progress).toBe(1 / 3);
|
||||||
q.eose(q.id, c1.Address);
|
q.eose(q.id, c1);
|
||||||
expect(q.progress).toBe(1 / 3);
|
expect(q.progress).toBe(1 / 3);
|
||||||
q.eose(q.id, c2.Address);
|
q.eose(q.id, c2);
|
||||||
expect(q.progress).toBe(2 / 3);
|
expect(q.progress).toBe(2 / 3);
|
||||||
q.eose(q.id, c3.Address);
|
q.eose(q.id, c3);
|
||||||
expect(q.progress).toBe(1);
|
expect(q.progress).toBe(1);
|
||||||
|
|
||||||
const qs = new Query("test-1", {
|
const qs = new Query("test-1", [
|
||||||
filters: [
|
{
|
||||||
{
|
kinds: [1],
|
||||||
kinds: [1],
|
authors: ["test-sub"],
|
||||||
authors: ["test-sub"],
|
},
|
||||||
},
|
]);
|
||||||
],
|
|
||||||
started: unixNow(),
|
|
||||||
});
|
|
||||||
q.subQueries.push(qs);
|
q.subQueries.push(qs);
|
||||||
qs.sendToRelay(c1);
|
qs.sendToRelay(c1);
|
||||||
|
|
||||||
expect(q.progress).toBe(0.5);
|
expect(q.progress).toBe(0.5);
|
||||||
q.eose(qs.id, c1.Address);
|
q.eose(qs.id, c1);
|
||||||
expect(q.progress).toBe(1);
|
expect(q.progress).toBe(1);
|
||||||
qs.sendToRelay(c2);
|
qs.sendToRelay(c2);
|
||||||
// 1 + 0.5 (1/2 sent sub query)
|
// 1 + 0.5 (1/2 sent sub query)
|
||||||
|
@ -1,10 +1,77 @@
|
|||||||
|
import { v4 as uuid } from "uuid";
|
||||||
import { Connection, RawReqFilter, Nips } from "@snort/nostr";
|
import { Connection, RawReqFilter, Nips } from "@snort/nostr";
|
||||||
import { unixNowMs } from "Util";
|
import { unixNowMs } from "Util";
|
||||||
|
import { NoteStore } from "./NoteCollection";
|
||||||
|
/**
|
||||||
|
* Tracing for relay query status
|
||||||
|
*/
|
||||||
|
class QueryTrace {
|
||||||
|
readonly id: string;
|
||||||
|
readonly subId: string;
|
||||||
|
readonly relay: string;
|
||||||
|
readonly connId: string;
|
||||||
|
readonly start: number;
|
||||||
|
sent?: number;
|
||||||
|
eose?: number;
|
||||||
|
close?: number;
|
||||||
|
#wasForceClosed = false;
|
||||||
|
readonly #fnClose: (id: string) => void;
|
||||||
|
|
||||||
export interface QueryRequest {
|
constructor(sub: string, relay: string, connId: string, fnClose: (id: string) => void) {
|
||||||
filters: Array<RawReqFilter>;
|
this.id = uuid();
|
||||||
started: number;
|
this.subId = sub;
|
||||||
finished?: number;
|
this.relay = relay;
|
||||||
|
this.connId = connId;
|
||||||
|
this.start = unixNowMs();
|
||||||
|
this.#fnClose = fnClose;
|
||||||
|
}
|
||||||
|
|
||||||
|
sentToRelay() {
|
||||||
|
this.sent = unixNowMs();
|
||||||
|
}
|
||||||
|
|
||||||
|
gotEose() {
|
||||||
|
this.eose = unixNowMs();
|
||||||
|
}
|
||||||
|
|
||||||
|
forceEose() {
|
||||||
|
this.eose = unixNowMs();
|
||||||
|
this.#wasForceClosed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
sendClose() {
|
||||||
|
this.close = unixNowMs();
|
||||||
|
this.#fnClose(this.subId);
|
||||||
|
}
|
||||||
|
|
||||||
|
log() {
|
||||||
|
console.debug(
|
||||||
|
`QT:${this.id}, ${this.relay}, ${this.subId}, finished=${
|
||||||
|
this.finished
|
||||||
|
}, queued=${this.queued.toLocaleString()}ms, runtime=${this.runtime?.toLocaleString()}ms`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Time spent in queue
|
||||||
|
*/
|
||||||
|
get queued() {
|
||||||
|
return (this.sent === undefined ? unixNowMs() : this.sent) - this.start;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Total query runtime
|
||||||
|
*/
|
||||||
|
get runtime() {
|
||||||
|
return (this.eose === undefined ? unixNowMs() : this.eose) - this.start;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If tracing is finished, we got EOSE or timeout
|
||||||
|
*/
|
||||||
|
get finished() {
|
||||||
|
return this.eose !== undefined;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -19,7 +86,7 @@ export class Query {
|
|||||||
/**
|
/**
|
||||||
* The query payload (REQ filters)
|
* The query payload (REQ filters)
|
||||||
*/
|
*/
|
||||||
request: QueryRequest;
|
filters: Array<RawReqFilter>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sub-Queries which are connected to this subscription
|
* Sub-Queries which are connected to this subscription
|
||||||
@ -29,12 +96,7 @@ export class Query {
|
|||||||
/**
|
/**
|
||||||
* Which relays this query has already been executed on
|
* Which relays this query has already been executed on
|
||||||
*/
|
*/
|
||||||
#sentToRelays: Array<Readonly<Connection>> = [];
|
#tracing: Array<QueryTrace> = [];
|
||||||
|
|
||||||
/**
|
|
||||||
* When each relay returned EOSE
|
|
||||||
*/
|
|
||||||
#eoseRelays: Map<string, number> = new Map();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Leave the query open until its removed
|
* Leave the query open until its removed
|
||||||
@ -51,9 +113,21 @@ export class Query {
|
|||||||
*/
|
*/
|
||||||
#cancelTimeout?: number;
|
#cancelTimeout?: number;
|
||||||
|
|
||||||
constructor(id: string, request: QueryRequest) {
|
/**
|
||||||
|
* Timer used to track tracing status
|
||||||
|
*/
|
||||||
|
#checkTrace?: ReturnType<typeof setInterval>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Feed object which collects events
|
||||||
|
*/
|
||||||
|
#feed?: NoteStore;
|
||||||
|
|
||||||
|
constructor(id: string, filters: Array<RawReqFilter>, feed?: NoteStore) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.request = request;
|
this.filters = filters;
|
||||||
|
this.#feed = feed;
|
||||||
|
this.#checkTraces();
|
||||||
}
|
}
|
||||||
|
|
||||||
get closing() {
|
get closing() {
|
||||||
@ -64,6 +138,10 @@ export class Query {
|
|||||||
return this.#cancelTimeout;
|
return this.#cancelTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
get feed() {
|
||||||
|
return this.#feed;
|
||||||
|
}
|
||||||
|
|
||||||
cancel() {
|
cancel() {
|
||||||
this.#cancelTimeout = unixNowMs() + 5_000;
|
this.#cancelTimeout = unixNowMs() + 5_000;
|
||||||
}
|
}
|
||||||
@ -72,6 +150,11 @@ export class Query {
|
|||||||
this.#cancelTimeout = undefined;
|
this.#cancelTimeout = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cleanup() {
|
||||||
|
console.debug("Cleanup", this.id);
|
||||||
|
this.#stopCheckTraces();
|
||||||
|
}
|
||||||
|
|
||||||
sendToRelay(c: Connection) {
|
sendToRelay(c: Connection) {
|
||||||
if (this.relays.length > 0 && !this.relays.includes(c.Address)) {
|
if (this.relays.length > 0 && !this.relays.includes(c.Address)) {
|
||||||
return;
|
return;
|
||||||
@ -80,31 +163,47 @@ export class Query {
|
|||||||
console.debug("Cant send non-specific REQ to ephemeral connection");
|
console.debug("Cant send non-specific REQ to ephemeral connection");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (this.request.filters.some(a => a.search) && !c.SupportsNip(Nips.Search)) {
|
if (this.filters.some(a => a.search) && !c.SupportsNip(Nips.Search)) {
|
||||||
console.debug("Cant send REQ to non-search relay", c.Address);
|
console.debug("Cant send REQ to non-search relay", c.Address);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
c.QueueReq(["REQ", this.id, ...this.request.filters]);
|
const qt = new QueryTrace(this.id, c.Address, c.Id, x => c.CloseReq(x));
|
||||||
this.#sentToRelays.push(c);
|
this.#tracing.push(qt);
|
||||||
|
c.QueueReq(["REQ", this.id, ...this.filters], () => qt.sentToRelay());
|
||||||
|
}
|
||||||
|
|
||||||
|
connectionLost(c: Connection, active: Array<string>, pending: Array<string>) {
|
||||||
|
const allQueriesLost = [...active, ...pending].filter(a => this.id === a || this.subQueries.some(b => b.id === a));
|
||||||
|
if (allQueriesLost.length > 0) {
|
||||||
|
console.debug("Lost", allQueriesLost, c.Address, c.Id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sendClose() {
|
sendClose() {
|
||||||
for (const c of this.#sentToRelays) {
|
for (const qt of this.#tracing) {
|
||||||
c.CloseReq(this.id);
|
qt.sendClose();
|
||||||
}
|
}
|
||||||
for (const sq of this.subQueries) {
|
for (const sq of this.subQueries) {
|
||||||
sq.sendClose();
|
sq.sendClose();
|
||||||
}
|
}
|
||||||
|
this.cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
eose(sub: string, relay: string) {
|
eose(sub: string, conn: Readonly<Connection>) {
|
||||||
|
const qt = this.#tracing.filter(a => a.subId === sub && a.connId === conn.Id);
|
||||||
if (sub === this.id) {
|
if (sub === this.id) {
|
||||||
console.debug(`[EOSE][${sub}] ${relay}`);
|
console.debug(`[EOSE][${sub}] ${conn.Address}`);
|
||||||
this.#eoseRelays.set(relay, unixNowMs());
|
qt.forEach(a => a.gotEose());
|
||||||
|
if (this.#feed) {
|
||||||
|
this.#feed.loading = this.progress < 1;
|
||||||
|
}
|
||||||
|
if (!this.leaveOpen) {
|
||||||
|
this.sendClose();
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
const subQ = this.subQueries.find(a => a.id === sub);
|
const subQ = this.subQueries.find(a => a.id === sub);
|
||||||
if (subQ) {
|
if (subQ) {
|
||||||
subQ.eose(sub, relay);
|
subQ.eose(sub, conn);
|
||||||
} else {
|
} else {
|
||||||
throw new Error("No query found");
|
throw new Error("No query found");
|
||||||
}
|
}
|
||||||
@ -115,7 +214,7 @@ export class Query {
|
|||||||
* Get the progress to EOSE, can be used to determine when we should load more content
|
* Get the progress to EOSE, can be used to determine when we should load more content
|
||||||
*/
|
*/
|
||||||
get progress() {
|
get progress() {
|
||||||
let thisProgress = this.#eoseRelays.size / this.#sentToRelays.reduce((acc, v) => (acc += v.Down ? 0 : 1), 0);
|
let thisProgress = this.#tracing.reduce((acc, v) => (acc += v.finished ? 1 : 0), 0) / this.#tracing.length;
|
||||||
if (isNaN(thisProgress)) {
|
if (isNaN(thisProgress)) {
|
||||||
thisProgress = 0;
|
thisProgress = 0;
|
||||||
}
|
}
|
||||||
@ -129,4 +228,22 @@ export class Query {
|
|||||||
}
|
}
|
||||||
return totalProgress / (this.subQueries.length + 1);
|
return totalProgress / (this.subQueries.length + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#stopCheckTraces() {
|
||||||
|
if (this.#checkTrace) {
|
||||||
|
clearInterval(this.#checkTrace);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#checkTraces() {
|
||||||
|
this.#stopCheckTraces();
|
||||||
|
this.#checkTrace = setInterval(() => {
|
||||||
|
for (const v of this.#tracing) {
|
||||||
|
//v.log();
|
||||||
|
if (v.runtime > 5_000 && !v.finished) {
|
||||||
|
v.forceEose();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, 2_000);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -50,11 +50,6 @@ export class NostrSystem {
|
|||||||
*/
|
*/
|
||||||
Queries: Map<string, Query> = new Map();
|
Queries: Map<string, Query> = new Map();
|
||||||
|
|
||||||
/**
|
|
||||||
* Collection of all feeds which are keyed by subscription id
|
|
||||||
*/
|
|
||||||
Feeds: Map<string, NoteStore> = new Map();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handler function for NIP-42
|
* Handler function for NIP-42
|
||||||
*/
|
*/
|
||||||
@ -98,6 +93,7 @@ export class NostrSystem {
|
|||||||
this.Sockets.set(addr, c);
|
this.Sockets.set(addr, c);
|
||||||
c.OnEvent = (s, e) => this.OnEvent(s, e);
|
c.OnEvent = (s, e) => this.OnEvent(s, e);
|
||||||
c.OnEose = s => this.OnEndOfStoredEvents(c, s);
|
c.OnEose = s => this.OnEndOfStoredEvents(c, s);
|
||||||
|
c.OnDisconnect = (a, p) => this.OnRelayDisconnect(c, a, p);
|
||||||
c.OnConnected = () => {
|
c.OnConnected = () => {
|
||||||
for (const [, q] of this.Queries) {
|
for (const [, q] of this.Queries) {
|
||||||
q.sendToRelay(c);
|
q.sendToRelay(c);
|
||||||
@ -113,37 +109,26 @@ export class NostrSystem {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
OnEndOfStoredEvents(c: Connection, sub: string) {
|
OnRelayDisconnect(c: Connection, active: Array<string>, pending: Array<string>) {
|
||||||
|
for (const [, q] of this.Queries) {
|
||||||
|
q.connectionLost(c, active, pending);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
OnEndOfStoredEvents(c: Readonly<Connection>, sub: string) {
|
||||||
const q = this.GetQuery(sub);
|
const q = this.GetQuery(sub);
|
||||||
if (q) {
|
if (q) {
|
||||||
q.eose(sub, c.Address);
|
q.eose(sub, c);
|
||||||
const f = this.Feeds.get(q.id);
|
|
||||||
if (f) {
|
|
||||||
f.loading = q.progress <= 0.5;
|
|
||||||
console.debug(`${sub} loading=${f.loading}, progress=${q.progress}`);
|
|
||||||
}
|
|
||||||
if (!q.leaveOpen) {
|
|
||||||
c.CloseReq(sub);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
OnEvent(sub: string, ev: TaggedRawEvent) {
|
OnEvent(sub: string, ev: TaggedRawEvent) {
|
||||||
const feed = this.GetFeed(sub);
|
const q = this.GetQuery(sub);
|
||||||
if (feed) {
|
if (q?.feed) {
|
||||||
feed.add(ev);
|
q.feed.add(ev);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
GetFeed(sub: string) {
|
|
||||||
const subFilterId = /-\d+$/i;
|
|
||||||
if (sub.match(subFilterId)) {
|
|
||||||
// feed events back into parent query
|
|
||||||
sub = sub.split(subFilterId)[0];
|
|
||||||
}
|
|
||||||
return this.Feeds.get(sub);
|
|
||||||
}
|
|
||||||
|
|
||||||
GetQuery(sub: string) {
|
GetQuery(sub: string) {
|
||||||
const subFilterId = /-\d+$/i;
|
const subFilterId = /-\d+$/i;
|
||||||
if (sub.match(subFilterId)) {
|
if (sub.match(subFilterId)) {
|
||||||
@ -165,6 +150,7 @@ export class NostrSystem {
|
|||||||
this.Sockets.set(addr, c);
|
this.Sockets.set(addr, c);
|
||||||
c.OnEvent = (s, e) => this.OnEvent(s, e);
|
c.OnEvent = (s, e) => this.OnEvent(s, e);
|
||||||
c.OnEose = s => this.OnEndOfStoredEvents(c, s);
|
c.OnEose = s => this.OnEndOfStoredEvents(c, s);
|
||||||
|
c.OnDisconnect = (a, p) => this.OnRelayDisconnect(c, a, p);
|
||||||
c.OnConnected = () => {
|
c.OnConnected = () => {
|
||||||
for (const [, q] of this.Queries) {
|
for (const [, q] of this.Queries) {
|
||||||
q.sendToRelay(c);
|
q.sendToRelay(c);
|
||||||
@ -221,18 +207,15 @@ export class NostrSystem {
|
|||||||
const q = unwrap(this.Queries.get(req.id));
|
const q = unwrap(this.Queries.get(req.id));
|
||||||
q.unCancel();
|
q.unCancel();
|
||||||
|
|
||||||
const diff = diffFilters(q.request.filters, filters);
|
const diff = diffFilters(q.filters, filters);
|
||||||
if (!diff.changed && !req.options?.skipDiff) {
|
if (!diff.changed && !req.options?.skipDiff) {
|
||||||
this.#changed();
|
this.#changed();
|
||||||
return unwrap(this.Feeds.get(req.id)) as Readonly<T>;
|
return unwrap(q.feed) as Readonly<T>;
|
||||||
} else {
|
} else {
|
||||||
const subQ = new Query(`${q.id}-${q.subQueries.length + 1}`, {
|
const subQ = new Query(`${q.id}-${q.subQueries.length + 1}`, filters);
|
||||||
filters: diff.filters,
|
|
||||||
started: unixNowMs(),
|
|
||||||
});
|
|
||||||
q.subQueries.push(subQ);
|
q.subQueries.push(subQ);
|
||||||
q.request.filters = filters;
|
q.filters = filters;
|
||||||
const f = unwrap(this.Feeds.get(req.id));
|
const f = unwrap(q.feed);
|
||||||
f.loading = true;
|
f.loading = true;
|
||||||
this.SendQuery(subQ);
|
this.SendQuery(subQ);
|
||||||
this.#changed();
|
this.#changed();
|
||||||
@ -244,11 +227,8 @@ export class NostrSystem {
|
|||||||
}
|
}
|
||||||
|
|
||||||
AddQuery<T extends NoteStore>(type: { new (): T }, rb: RequestBuilder): T {
|
AddQuery<T extends NoteStore>(type: { new (): T }, rb: RequestBuilder): T {
|
||||||
const q = new Query(rb.id, {
|
const store = new type();
|
||||||
filters: rb.build(),
|
const q = new Query(rb.id, rb.build(), store);
|
||||||
started: unixNowMs(),
|
|
||||||
finished: 0,
|
|
||||||
});
|
|
||||||
if (rb.options?.leaveOpen) {
|
if (rb.options?.leaveOpen) {
|
||||||
q.leaveOpen = rb.options.leaveOpen;
|
q.leaveOpen = rb.options.leaveOpen;
|
||||||
}
|
}
|
||||||
@ -257,8 +237,6 @@ export class NostrSystem {
|
|||||||
}
|
}
|
||||||
|
|
||||||
this.Queries.set(rb.id, q);
|
this.Queries.set(rb.id, q);
|
||||||
const store = new type();
|
|
||||||
this.Feeds.set(rb.id, store);
|
|
||||||
this.SendQuery(q);
|
this.SendQuery(q);
|
||||||
this.#changed();
|
this.#changed();
|
||||||
return store;
|
return store;
|
||||||
@ -301,9 +279,9 @@ export class NostrSystem {
|
|||||||
queries: [...this.Queries.values()].map(a => {
|
queries: [...this.Queries.values()].map(a => {
|
||||||
return {
|
return {
|
||||||
id: a.id,
|
id: a.id,
|
||||||
filters: a.request.filters,
|
filters: a.filters,
|
||||||
closing: a.closing,
|
closing: a.closing,
|
||||||
subFilters: a.subQueries.map(a => a.request.filters).flat(),
|
subFilters: a.subQueries.map(a => a.filters).flat(),
|
||||||
};
|
};
|
||||||
}),
|
}),
|
||||||
});
|
});
|
||||||
@ -319,7 +297,6 @@ export class NostrSystem {
|
|||||||
if (v.closingAt && v.closingAt < now) {
|
if (v.closingAt && v.closingAt < now) {
|
||||||
v.sendClose();
|
v.sendClose();
|
||||||
this.Queries.delete(k);
|
this.Queries.delete(k);
|
||||||
this.Feeds.delete(k);
|
|
||||||
console.debug("Removed:", k);
|
console.debug("Removed:", k);
|
||||||
changed = true;
|
changed = true;
|
||||||
}
|
}
|
||||||
|
@ -43,7 +43,10 @@ export class Connection {
|
|||||||
Socket: WebSocket | null = null;
|
Socket: WebSocket | null = null;
|
||||||
|
|
||||||
PendingRaw: Array<object> = [];
|
PendingRaw: Array<object> = [];
|
||||||
PendingRequests: Array<ReqCommand> = [];
|
PendingRequests: Array<{
|
||||||
|
cmd: ReqCommand,
|
||||||
|
cb: () => void
|
||||||
|
}> = [];
|
||||||
ActiveRequests: Set<string> = new Set();
|
ActiveRequests: Set<string> = new Set();
|
||||||
|
|
||||||
Settings: RelaySettings;
|
Settings: RelaySettings;
|
||||||
@ -60,6 +63,7 @@ export class Connection {
|
|||||||
OnConnected?: () => void;
|
OnConnected?: () => void;
|
||||||
OnEvent?: (sub: string, e: TaggedRawEvent) => void;
|
OnEvent?: (sub: string, e: TaggedRawEvent) => void;
|
||||||
OnEose?: (sub: string) => void;
|
OnEose?: (sub: string) => void;
|
||||||
|
OnDisconnect?: (active: Array<string>, pending: Array<string>) => void;
|
||||||
Auth?: AuthHandler;
|
Auth?: AuthHandler;
|
||||||
AwaitingAuth: Map<string, boolean>;
|
AwaitingAuth: Map<string, boolean>;
|
||||||
Authed = false;
|
Authed = false;
|
||||||
@ -162,7 +166,12 @@ export class Connection {
|
|||||||
|
|
||||||
OnClose(e: CloseEvent) {
|
OnClose(e: CloseEvent) {
|
||||||
if (!this.IsClosed) {
|
if (!this.IsClosed) {
|
||||||
|
this.OnDisconnect?.([...this.ActiveRequests], this.PendingRequests.map(a => a.cmd[1]))
|
||||||
this.#ResetQueues();
|
this.#ResetQueues();
|
||||||
|
|
||||||
|
// reset connection Id on disconnect, for query-tracking
|
||||||
|
this.Id = uuid();
|
||||||
|
|
||||||
this.ConnectTimeout = this.ConnectTimeout * 2;
|
this.ConnectTimeout = this.ConnectTimeout * 2;
|
||||||
console.log(
|
console.log(
|
||||||
`[${this.Address}] Closed (${e.reason}), trying again in ${(
|
`[${this.Address}] Closed (${e.reason}), trying again in ${(
|
||||||
@ -303,13 +312,16 @@ export class Connection {
|
|||||||
* Queue or send command to the relay
|
* Queue or send command to the relay
|
||||||
* @param cmd The REQ to send to the server
|
* @param cmd The REQ to send to the server
|
||||||
*/
|
*/
|
||||||
QueueReq(cmd: ReqCommand) {
|
QueueReq(cmd: ReqCommand, cbSent: () => void) {
|
||||||
if (this.ActiveRequests.size >= this.#maxSubscriptions) {
|
if (this.ActiveRequests.size >= this.#maxSubscriptions) {
|
||||||
this.PendingRequests.push(cmd);
|
this.PendingRequests.push({
|
||||||
|
cmd, cb: cbSent
|
||||||
|
});
|
||||||
console.debug("Queuing:", this.Address, cmd);
|
console.debug("Queuing:", this.Address, cmd);
|
||||||
} else {
|
} else {
|
||||||
this.ActiveRequests.add(cmd[1]);
|
this.ActiveRequests.add(cmd[1]);
|
||||||
this.#SendJson(cmd);
|
this.#SendJson(cmd);
|
||||||
|
cbSent();
|
||||||
}
|
}
|
||||||
this.#UpdateState();
|
this.#UpdateState();
|
||||||
}
|
}
|
||||||
@ -327,21 +339,18 @@ export class Connection {
|
|||||||
const canSend = this.#maxSubscriptions - this.ActiveRequests.size;
|
const canSend = this.#maxSubscriptions - this.ActiveRequests.size;
|
||||||
if (canSend > 0) {
|
if (canSend > 0) {
|
||||||
for (let x = 0; x < canSend; x++) {
|
for (let x = 0; x < canSend; x++) {
|
||||||
const cmd = this.PendingRequests.shift();
|
const p = this.PendingRequests.shift();
|
||||||
if (cmd) {
|
if (p) {
|
||||||
this.ActiveRequests.add(cmd[1]);
|
this.ActiveRequests.add(p.cmd[1]);
|
||||||
this.#SendJson(cmd);
|
this.#SendJson(p.cmd);
|
||||||
console.debug("Sent pending REQ", this.Address, cmd);
|
p.cb();
|
||||||
|
console.debug("Sent pending REQ", this.Address, p.cmd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#ResetQueues() {
|
#ResetQueues() {
|
||||||
//send EOSE on disconnect for active subs
|
|
||||||
this.ActiveRequests.forEach((v) => this.OnEose?.(v));
|
|
||||||
this.PendingRequests.forEach((v) => this.OnEose?.(v[1]));
|
|
||||||
|
|
||||||
this.ActiveRequests.clear();
|
this.ActiveRequests.clear();
|
||||||
this.PendingRequests = [];
|
this.PendingRequests = [];
|
||||||
this.PendingRaw = [];
|
this.PendingRaw = [];
|
||||||
@ -360,9 +369,7 @@ export class Connection {
|
|||||||
this.CurrentState.disconnects = this.Stats.Disconnects;
|
this.CurrentState.disconnects = this.Stats.Disconnects;
|
||||||
this.CurrentState.info = this.Info;
|
this.CurrentState.info = this.Info;
|
||||||
this.CurrentState.id = this.Id;
|
this.CurrentState.id = this.Id;
|
||||||
this.CurrentState.pendingRequests = [
|
this.CurrentState.pendingRequests = [...this.PendingRequests.map(a => a.cmd[1])];
|
||||||
...this.PendingRequests.map((a) => a[1]),
|
|
||||||
];
|
|
||||||
this.CurrentState.activeRequests = [...this.ActiveRequests];
|
this.CurrentState.activeRequests = [...this.ActiveRequests];
|
||||||
this.Stats.Latency = this.Stats.Latency.slice(-20); // trim
|
this.Stats.Latency = this.Stats.Latency.slice(-20); // trim
|
||||||
this.HasStateChange = true;
|
this.HasStateChange = true;
|
||||||
@ -380,7 +387,7 @@ export class Connection {
|
|||||||
const authPending = !this.Authed && (this.AwaitingAuth.size > 0 || this.Info?.limitation?.auth_required === true);
|
const authPending = !this.Authed && (this.AwaitingAuth.size > 0 || this.Info?.limitation?.auth_required === true);
|
||||||
if (this.Socket?.readyState !== WebSocket.OPEN || authPending) {
|
if (this.Socket?.readyState !== WebSocket.OPEN || authPending) {
|
||||||
this.PendingRaw.push(obj);
|
this.PendingRaw.push(obj);
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.#sendPendingRaw();
|
this.#sendPendingRaw();
|
||||||
@ -402,6 +409,7 @@ export class Connection {
|
|||||||
}
|
}
|
||||||
const json = JSON.stringify(obj);
|
const json = JSON.stringify(obj);
|
||||||
this.Socket.send(json);
|
this.Socket.send(json);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
async _OnAuthAsync(challenge: string): Promise<void> {
|
async _OnAuthAsync(challenge: string): Promise<void> {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user