sub-query via query trace
This commit is contained in:
@ -16,18 +16,21 @@ class QueryTrace {
|
||||
close?: number;
|
||||
#wasForceClosed = false;
|
||||
readonly #fnClose: (id: string) => void;
|
||||
readonly #fnProgress: () => void;
|
||||
|
||||
constructor(sub: string, relay: string, connId: string, fnClose: (id: string) => void) {
|
||||
constructor(sub: string, relay: string, connId: string, fnClose: (id: string) => void, fnProgress: () => void) {
|
||||
this.id = uuid();
|
||||
this.subId = sub;
|
||||
this.relay = relay;
|
||||
this.connId = connId;
|
||||
this.start = unixNowMs();
|
||||
this.#fnClose = fnClose;
|
||||
this.#fnProgress = fnProgress;
|
||||
}
|
||||
|
||||
sentToRelay() {
|
||||
this.sent = unixNowMs();
|
||||
this.#fnProgress();
|
||||
}
|
||||
|
||||
gotEose() {
|
||||
@ -35,23 +38,28 @@ class QueryTrace {
|
||||
if (this.responseTime > 5_000) {
|
||||
console.debug(`Slow query ${this.subId} on ${this.relay} took ${this.responseTime.toLocaleString()}ms`);
|
||||
}
|
||||
this.#fnProgress();
|
||||
console.debug(`[EOSE][${this.subId}] ${this.relay}`);
|
||||
}
|
||||
|
||||
forceEose() {
|
||||
this.eose = unixNowMs();
|
||||
this.#wasForceClosed = true;
|
||||
this.#fnProgress();
|
||||
console.debug(`[F-EOSE][${this.subId}] ${this.relay}`);
|
||||
}
|
||||
|
||||
sendClose() {
|
||||
this.close = unixNowMs();
|
||||
this.#fnClose(this.subId);
|
||||
this.#fnProgress();
|
||||
}
|
||||
|
||||
log() {
|
||||
console.debug(
|
||||
`QT:${this.id}, ${this.relay}, ${this.subId}, finished=${
|
||||
`QT:${this.id}, ${this.subId}, finished=${
|
||||
this.finished
|
||||
}, queued=${this.queued.toLocaleString()}ms, runtime=${this.runtime?.toLocaleString()}ms`
|
||||
}, queued=${this.queued.toLocaleString()}ms, runtime=${this.runtime?.toLocaleString()}ms, ${this.relay}`
|
||||
);
|
||||
}
|
||||
|
||||
@ -59,7 +67,7 @@ class QueryTrace {
|
||||
* Time spent in queue
|
||||
*/
|
||||
get queued() {
|
||||
return (this.sent === undefined ? unixNowMs() : this.sent) - this.start;
|
||||
return (this.sent === undefined ? unixNowMs() : this.#wasForceClosed ? unwrap(this.eose) : this.sent) - this.start;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -84,10 +92,7 @@ class QueryTrace {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Active or queued query on the system
|
||||
*/
|
||||
export class Query {
|
||||
export interface QueryBase {
|
||||
/**
|
||||
* Uniquie ID of this query
|
||||
*/
|
||||
@ -99,9 +104,18 @@ export class Query {
|
||||
filters: Array<RawReqFilter>;
|
||||
|
||||
/**
|
||||
* Sub-Queries which are connected to this subscription
|
||||
* List of relays to send this query to
|
||||
*/
|
||||
subQueries: Array<Query> = [];
|
||||
relays?: Array<string>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Active or queued query on the system
|
||||
*/
|
||||
export class Query implements QueryBase {
|
||||
id: string;
|
||||
filters: Array<RawReqFilter>;
|
||||
relays?: Array<string>;
|
||||
|
||||
/**
|
||||
* Which relays this query has already been executed on
|
||||
@ -113,11 +127,6 @@ export class Query {
|
||||
*/
|
||||
leaveOpen = false;
|
||||
|
||||
/**
|
||||
* List of relays to send this query to
|
||||
*/
|
||||
relays: Array<string> = [];
|
||||
|
||||
/**
|
||||
* Time when this query can be removed
|
||||
*/
|
||||
@ -133,6 +142,8 @@ export class Query {
|
||||
*/
|
||||
#feed: NoteStore;
|
||||
|
||||
subQueryCounter = 0;
|
||||
|
||||
constructor(id: string, filters: Array<RawReqFilter>, feed: NoteStore) {
|
||||
this.id = id;
|
||||
this.filters = filters;
|
||||
@ -165,35 +176,36 @@ export class Query {
|
||||
}
|
||||
|
||||
sendToRelay(c: Connection) {
|
||||
if (this.relays.length > 0 && !this.relays.includes(c.Address)) {
|
||||
if (!this.#canSendQuery(c, this)) {
|
||||
return;
|
||||
}
|
||||
if (this.relays.length === 0 && c.Ephemeral) {
|
||||
console.debug("Cant send non-specific REQ to ephemeral connection");
|
||||
this.#sendQueryInternal(c, this);
|
||||
}
|
||||
|
||||
sendSubQueryToRelay(c: Connection, subq: QueryBase) {
|
||||
if (!this.#canSendQuery(c, subq)) {
|
||||
return;
|
||||
}
|
||||
if (this.filters.some(a => a.search) && !c.SupportsNip(Nips.Search)) {
|
||||
console.debug("Cant send REQ to non-search relay", c.Address);
|
||||
return;
|
||||
}
|
||||
const qt = new QueryTrace(this.id, c.Address, c.Id, x => c.CloseReq(x));
|
||||
this.#tracing.push(qt);
|
||||
c.QueueReq(["REQ", this.id, ...this.filters], () => qt.sentToRelay());
|
||||
this.#sendQueryInternal(c, subq);
|
||||
}
|
||||
|
||||
connectionLost(c: Connection, active: Array<string>, pending: Array<string>) {
|
||||
const allQueriesLost = [...active, ...pending].filter(a => this.id === a || this.subQueries.some(b => b.id === a));
|
||||
const allQueriesLost = [...active, ...pending].filter(a => this.id === a || this.#tracing.some(b => b.subId === a));
|
||||
if (allQueriesLost.length > 0) {
|
||||
console.debug("Lost", allQueriesLost, c.Address, c.Id);
|
||||
}
|
||||
for (const qLost of allQueriesLost) {
|
||||
const qt = this.#tracing.find(a => a.subId === qLost && a.connId == c.Id);
|
||||
qt?.forceEose();
|
||||
}
|
||||
}
|
||||
|
||||
sendClose() {
|
||||
for (const qt of this.#tracing) {
|
||||
qt.sendClose();
|
||||
}
|
||||
for (const sq of this.subQueries) {
|
||||
sq.sendClose();
|
||||
for (const qt of this.#tracing) {
|
||||
qt.sendClose();
|
||||
}
|
||||
this.cleanup();
|
||||
}
|
||||
@ -202,15 +214,9 @@ export class Query {
|
||||
const qt = this.#tracing.find(a => a.subId === sub && a.connId === conn.Id);
|
||||
qt?.gotEose();
|
||||
if (sub === this.id) {
|
||||
console.debug(`[EOSE][${sub}] ${conn.Address}`);
|
||||
if (!this.leaveOpen) {
|
||||
qt?.sendClose();
|
||||
}
|
||||
} else {
|
||||
const subQ = this.subQueries.find(a => a.id === sub);
|
||||
if (subQ) {
|
||||
subQ.eose(sub, conn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -218,19 +224,19 @@ export class Query {
|
||||
* Get the progress to EOSE, can be used to determine when we should load more content
|
||||
*/
|
||||
get progress() {
|
||||
let thisProgress = this.#tracing.reduce((acc, v) => (acc += v.finished ? 1 : 0), 0) / this.#tracing.length;
|
||||
const thisProgress = this.#tracing.reduce((acc, v) => (acc += v.finished ? 1 : 0), 0) / this.#tracing.length;
|
||||
if (isNaN(thisProgress)) {
|
||||
thisProgress = 0;
|
||||
}
|
||||
if (this.subQueries.length === 0) {
|
||||
return thisProgress;
|
||||
return 0;
|
||||
}
|
||||
return thisProgress;
|
||||
}
|
||||
|
||||
let totalProgress = thisProgress;
|
||||
for (const sq of this.subQueries) {
|
||||
totalProgress += sq.progress;
|
||||
#onProgress() {
|
||||
const isFinished = this.progress === 1;
|
||||
if (this.feed.loading !== isFinished) {
|
||||
console.debug(`[QT] ${this.id}, loading=${this.feed.loading}, progress=${this.progress}`);
|
||||
this.feed.loading = isFinished;
|
||||
}
|
||||
return totalProgress / (this.subQueries.length + 1);
|
||||
}
|
||||
|
||||
#stopCheckTraces() {
|
||||
@ -243,11 +249,37 @@ export class Query {
|
||||
this.#stopCheckTraces();
|
||||
this.#checkTrace = setInterval(() => {
|
||||
for (const v of this.#tracing) {
|
||||
//v.log();
|
||||
if (v.runtime > 5_000 && !v.finished) {
|
||||
v.forceEose();
|
||||
}
|
||||
}
|
||||
}, 2_000);
|
||||
}, 500);
|
||||
}
|
||||
|
||||
#canSendQuery(c: Connection, q: QueryBase) {
|
||||
if (q.relays && !q.relays.includes(c.Address)) {
|
||||
return false;
|
||||
}
|
||||
if ((q.relays?.length ?? 0) === 0 && c.Ephemeral) {
|
||||
console.debug("Cant send non-specific REQ to ephemeral connection");
|
||||
return false;
|
||||
}
|
||||
if (q.filters.some(a => a.search) && !c.SupportsNip(Nips.Search)) {
|
||||
console.debug("Cant send REQ to non-search relay", c.Address);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
#sendQueryInternal(c: Connection, q: QueryBase) {
|
||||
const qt = new QueryTrace(
|
||||
q.id,
|
||||
c.Address,
|
||||
c.Id,
|
||||
x => c.CloseReq(x),
|
||||
() => this.#onProgress()
|
||||
);
|
||||
this.#tracing.push(qt);
|
||||
c.QueueReq(["REQ", q.id, ...q.filters], () => qt.sentToRelay());
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user