fix query close bug
This commit is contained in:
parent
8e6a1ecbc2
commit
2b80109e3b
@ -30,10 +30,10 @@ function Queries() {
|
||||
return total;
|
||||
}
|
||||
|
||||
function queryInfo(q: { id: string; filters: Array<ReqFilter>; closing: boolean; subFilters: Array<ReqFilter> }) {
|
||||
function queryInfo(q: { id: string; filters: Array<ReqFilter>; subFilters: Array<ReqFilter> }) {
|
||||
return (
|
||||
<div key={q.id}>
|
||||
{q.closing ? <s>{q.id}</s> : <>{q.id}</>}
|
||||
{q.id}
|
||||
<br />
|
||||
<span onClick={() => copy(JSON.stringify(q.filters))} className="pointer">
|
||||
Filters: {q.filters.length} ({countElements(q.filters)} elements)
|
||||
|
@ -12,6 +12,7 @@ const useRequestBuilder = <TStore extends NoteStore, TSnapshot = ReturnType<TSto
|
||||
if (rb) {
|
||||
const q = System.Query<TStore>(type, rb);
|
||||
const release = q.feed.hook(onChanged);
|
||||
q.uncancel();
|
||||
return () => {
|
||||
q.cancel();
|
||||
release();
|
||||
|
@ -122,7 +122,9 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
Query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder): Query {
|
||||
const existing = this.Queries.get(req.id);
|
||||
if (existing) {
|
||||
const filters = req.buildDiff(this.#relayCache, existing.filters);
|
||||
const filters = !req.options?.skipDiff
|
||||
? req.buildDiff(this.#relayCache, existing.filters)
|
||||
: req.build(this.#relayCache);
|
||||
if (filters.length === 0 && !!req.options?.skipDiff) {
|
||||
return existing;
|
||||
} else {
|
||||
@ -138,11 +140,7 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
const store = new type();
|
||||
|
||||
const filters = req.build(this.#relayCache);
|
||||
const q = new Query(req.id, store);
|
||||
if (req.options?.leaveOpen) {
|
||||
q.leaveOpen = req.options.leaveOpen;
|
||||
}
|
||||
|
||||
const q = new Query(req.id, store, req.options?.leaveOpen);
|
||||
this.Queries.set(req.id, q);
|
||||
for (const subQ of filters) {
|
||||
this.SendQuery(q, subQ).then(qta =>
|
||||
@ -222,7 +220,6 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
return {
|
||||
id: a.id,
|
||||
filters: a.filters,
|
||||
closing: a.closing,
|
||||
subFilters: [],
|
||||
};
|
||||
}),
|
||||
@ -230,12 +227,12 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
}
|
||||
|
||||
#cleanup() {
|
||||
const now = unixNowMs();
|
||||
let changed = false;
|
||||
for (const [k, v] of this.Queries) {
|
||||
if (v.closingAt && v.closingAt < now) {
|
||||
if (v.canRemove()) {
|
||||
v.sendClose();
|
||||
this.Queries.delete(k);
|
||||
this.#log("Deleted query %s", k);
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
|
@ -120,12 +120,12 @@ export class Query implements QueryBase {
|
||||
/**
|
||||
* Leave the query open until its removed
|
||||
*/
|
||||
leaveOpen = false;
|
||||
#leaveOpen = false;
|
||||
|
||||
/**
|
||||
* Time when this query can be removed
|
||||
*/
|
||||
#cancelTimeout?: number;
|
||||
#cancelAt?: number;
|
||||
|
||||
/**
|
||||
* Timer used to track tracing status
|
||||
@ -140,18 +140,15 @@ export class Query implements QueryBase {
|
||||
#log = debug("Query");
|
||||
#allFilters: Array<ReqFilter> = [];
|
||||
|
||||
constructor(id: string, feed: NoteStore) {
|
||||
constructor(id: string, feed: NoteStore, leaveOpen?: boolean) {
|
||||
this.id = id;
|
||||
this.#feed = feed;
|
||||
this.#leaveOpen = leaveOpen ?? false;
|
||||
this.#checkTraces();
|
||||
}
|
||||
|
||||
get closing() {
|
||||
return this.#cancelTimeout !== undefined;
|
||||
}
|
||||
|
||||
get closingAt() {
|
||||
return this.#cancelTimeout;
|
||||
canRemove() {
|
||||
return this.#cancelAt !== undefined && this.#cancelAt < unixNowMs();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -174,8 +171,15 @@ export class Query implements QueryBase {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This function should be called when this Query object and FeedStore is no longer needed
|
||||
*/
|
||||
cancel() {
|
||||
this.#cancelTimeout = unixNowMs() + 5_000;
|
||||
this.#cancelAt = unixNowMs() + 5_000;
|
||||
}
|
||||
|
||||
uncancel() {
|
||||
this.#cancelAt = undefined;
|
||||
}
|
||||
|
||||
cleanup() {
|
||||
@ -203,7 +207,7 @@ export class Query implements QueryBase {
|
||||
eose(sub: string, conn: Readonly<Connection>) {
|
||||
const qt = this.#tracing.find(a => a.id === sub && a.connId === conn.Id);
|
||||
qt?.gotEose();
|
||||
if (!this.leaveOpen) {
|
||||
if (!this.#leaveOpen) {
|
||||
qt?.sendClose();
|
||||
}
|
||||
}
|
||||
@ -272,6 +276,7 @@ export class Query implements QueryBase {
|
||||
c.QueueReq(["REQ", qt.id, ...q.filters], () => qt.sentToRelay());
|
||||
return qt;
|
||||
}
|
||||
|
||||
#reComputeFilters() {
|
||||
console.time("reComputeFilters");
|
||||
this.#allFilters = flatMerge(this.#tracing.flatMap(a => a.filters).flatMap(expandFilter));
|
||||
|
@ -7,7 +7,7 @@ import { distance } from "./Util";
|
||||
*/
|
||||
const DiscriminatorKeys = ["since", "until", "limit", "search"];
|
||||
|
||||
export function canMergeFilters(a: FlatReqFilter, b: FlatReqFilter): boolean {
|
||||
export function canMergeFilters(a: FlatReqFilter | ReqFilter, b: FlatReqFilter | ReqFilter): boolean {
|
||||
const aObj = a as Record<string, string | number | undefined>;
|
||||
const bObj = b as Record<string, string | number | undefined>;
|
||||
for (const key of DiscriminatorKeys) {
|
||||
@ -17,34 +17,21 @@ export function canMergeFilters(a: FlatReqFilter, b: FlatReqFilter): boolean {
|
||||
}
|
||||
}
|
||||
}
|
||||
const keys1 = Object.keys(aObj);
|
||||
const keys2 = Object.keys(bObj);
|
||||
const maxKeys = keys1.length > keys2.length ? keys1 : keys2;
|
||||
|
||||
let distance = 0;
|
||||
for (const key of maxKeys) {
|
||||
if (key in aObj && key in bObj) {
|
||||
if (aObj[key] !== bObj[key]) {
|
||||
distance++;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return distance <= 1;
|
||||
return distance(aObj, bObj) <= 1;
|
||||
}
|
||||
|
||||
export function mergeSimilar(filters: Array<ReqFilter>): Array<ReqFilter> {
|
||||
console.time("mergeSimilar");
|
||||
const ret = [];
|
||||
|
||||
while (filters.length > 0) {
|
||||
const current = filters.shift()!;
|
||||
const fCopy = [...filters];
|
||||
while (fCopy.length > 0) {
|
||||
const current = fCopy.shift()!;
|
||||
const mergeSet = [current];
|
||||
for (let i = 0; i < filters.length; i++) {
|
||||
const f = filters[i];
|
||||
for (let i = 0; i < fCopy.length; i++) {
|
||||
const f = fCopy[i];
|
||||
if (mergeSet.every(v => canMergeFilters(v, f))) {
|
||||
mergeSet.push(filters.splice(i, 1)[0]);
|
||||
mergeSet.push(fCopy.splice(i, 1)[0]);
|
||||
i--;
|
||||
}
|
||||
}
|
||||
|
@ -38,7 +38,6 @@ export interface SystemSnapshot {
|
||||
id: string;
|
||||
filters: Array<ReqFilter>;
|
||||
subFilters: Array<ReqFilter>;
|
||||
closing: boolean;
|
||||
}>;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user