This commit is contained in:
Kieran 2023-05-29 22:25:40 +01:00
parent 9a33466c7c
commit baec8d6904
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
8 changed files with 255 additions and 84 deletions

View File

@ -9,16 +9,7 @@ window.crypto.getRandomValues = getRandomValues as any;
describe("query", () => {
test("progress", () => {
const q = new Query(
"test",
[
{
kinds: [1],
authors: ["test"],
},
],
new FlatNoteStore()
);
const q = new Query("test", new FlatNoteStore());
const opt = {
read: true,
write: true,
@ -30,7 +21,15 @@ describe("query", () => {
const c3 = new Connection("wss://three.com", opt);
c3.Down = false;
q.sendToRelay(c1);
q.sendToRelay(c1, {
id: "test",
filters: [
{
kinds: [1],
authors: ["test"],
},
],
});
q.sendToRelay(c2);
q.sendToRelay(c3);
@ -53,12 +52,12 @@ describe("query", () => {
},
],
} as QueryBase;
q.sendSubQueryToRelay(c1, qs);
q.sendToRelay(c1, qs);
expect(q.progress).toBe(3 / 4);
q.eose(qs.id, c1);
expect(q.progress).toBe(1);
q.sendSubQueryToRelay(c2, qs);
q.sendToRelay(c2, qs);
expect(q.progress).toBe(4 / 5);
});
});

View File

@ -3,15 +3,13 @@ import debug from "debug";
import { Connection, RawReqFilter, Nips } from "@snort/nostr";
import { unixNowMs, unwrap } from "SnortUtils";
import { NoteStore } from "./NoteCollection";
import { mergeSimilar } from "./RequestMerger";
/**
* Tracing for relay query status
*/
class QueryTrace {
readonly id: string;
readonly subId: string;
readonly relay: string;
readonly connId: string;
readonly start: number;
sent?: number;
eose?: number;
@ -20,11 +18,15 @@ class QueryTrace {
readonly #fnClose: (id: string) => void;
readonly #fnProgress: () => void;
constructor(sub: string, relay: string, connId: string, fnClose: (id: string) => void, fnProgress: () => void) {
constructor(
readonly subId: string,
readonly relay: string,
readonly filters: Array<RawReqFilter>,
readonly connId: string,
fnClose: (id: string) => void,
fnProgress: () => void
) {
this.id = uuid();
this.subId = sub;
this.relay = relay;
this.connId = connId;
this.start = unixNowMs();
this.#fnClose = fnClose;
this.#fnProgress = fnProgress;
@ -102,17 +104,12 @@ export interface QueryBase {
/**
* Active or queued query on the system
*/
export class Query {
export class Query implements QueryBase {
/**
* Uniquie ID of this query
*/
id: string;
/**
* A merged set of all filters send to relays for this query
*/
filters: Array<RawReqFilter> = [];
/**
* Which relays this query has already been executed on
*/
@ -159,6 +156,11 @@ export class Query {
return this.#feed;
}
get filters() {
const filters = this.#tracing.flatMap(a => a.filters);
return mergeSimilar(filters);
}
cancel() {
this.#cancelTimeout = unixNowMs() + 5_000;
}
@ -171,11 +173,11 @@ export class Query {
this.#stopCheckTraces();
}
sendToRelay(c: Connection, subq: QueryBase) {
if (!this.#canSendQuery(c, subq)) {
sendToRelay(c: Connection, subq?: QueryBase) {
if (!this.#canSendQuery(c, subq ?? this)) {
return;
}
this.#sendQueryInternal(c, subq);
this.#sendQueryInternal(c, subq ?? this);
}
connectionLost(id: string) {
@ -252,6 +254,7 @@ export class Query {
const qt = new QueryTrace(
q.id,
c.Address,
q.filters,
c.Id,
x => c.CloseReq(x),
() => this.#onProgress()

View File

@ -1,29 +1,39 @@
import { RelayCache } from "./GossipModel";
import { RequestBuilder } from "./RequestBuilder";
import { RequestBuilder, RequestStrategy } from "./RequestBuilder";
import { describe, expect } from "@jest/globals";
describe("RequestBuilder", () => {
const relayCache = {
get: () => {
return undefined;
},
} as RelayCache;
const DummyCache = {
get: (pk?: string) => {
if (!pk) return undefined;
return [
{
url: `wss://${pk}.com/`,
settings: {
read: true,
write: true,
},
},
];
},
} as RelayCache;
describe("RequestBuilder", () => {
describe("basic", () => {
test("empty filter", () => {
const b = new RequestBuilder("test");
b.withFilter();
expect(b.build(relayCache)).toEqual([{}]);
expect(b.buildRaw()).toEqual([{}]);
});
test("only kind", () => {
const b = new RequestBuilder("test");
b.withFilter().kinds([0]);
expect(b.build(relayCache)).toEqual([{ kinds: [0] }]);
expect(b.buildRaw()).toMatchObject([{ kinds: [0] }]);
});
test("empty authors", () => {
const b = new RequestBuilder("test");
b.withFilter().authors([]);
expect(b.build(relayCache)).toEqual([{ authors: [] }]);
expect(b.buildRaw()).toMatchObject([{ authors: [] }]);
});
test("authors/kinds/ids", () => {
const authors = ["a1", "a2"];
@ -31,7 +41,7 @@ describe("RequestBuilder", () => {
const ids = ["id1", "id2", "id3"];
const b = new RequestBuilder("test");
b.withFilter().authors(authors).kinds(kinds).ids(ids);
expect(b.build(relayCache)).toEqual([{ ids, authors, kinds }]);
expect(b.buildRaw()).toMatchObject([{ ids, authors, kinds }]);
});
test("authors and kinds, duplicates removed", () => {
const authors = ["a1", "a2"];
@ -39,12 +49,12 @@ describe("RequestBuilder", () => {
const ids = ["id1", "id2", "id3"];
const b = new RequestBuilder("test");
b.withFilter().ids(ids).authors(authors).kinds(kinds).ids(ids).authors(authors).kinds(kinds);
expect(b.build(relayCache)).toEqual([{ ids, authors, kinds }]);
expect(b.buildRaw()).toMatchObject([{ ids, authors, kinds }]);
});
test("search", () => {
const b = new RequestBuilder("test");
b.withFilter().kinds([1]).search("test-search");
expect(b.build(relayCache)).toEqual([{ kinds: [1], search: "test-search" }]);
expect(b.buildRaw()).toMatchObject([{ kinds: [1], search: "test-search" }]);
});
test("timeline", () => {
const authors = ["a1", "a2"];
@ -53,7 +63,7 @@ describe("RequestBuilder", () => {
const since = 5;
const b = new RequestBuilder("test");
b.withFilter().kinds(kinds).authors(authors).since(since).until(until);
expect(b.build(relayCache)).toEqual([{ kinds, authors, until, since }]);
expect(b.buildRaw()).toMatchObject([{ kinds, authors, until, since }]);
});
test("multi-filter timeline", () => {
const authors = ["a1", "a2"];
@ -63,10 +73,94 @@ describe("RequestBuilder", () => {
const b = new RequestBuilder("test");
b.withFilter().kinds(kinds).authors(authors).since(since).until(until);
b.withFilter().kinds(kinds).authors(authors).since(since).until(until);
expect(b.build(relayCache)).toEqual([
expect(b.buildRaw()).toMatchObject([
{ kinds, authors, until, since },
{ kinds, authors, until, since },
]);
});
});
describe("diff basic", () => {
const rb = new RequestBuilder("test");
const f0 = rb.withFilter();
const a = rb.buildRaw();
f0.authors(["a"]);
expect(a).toEqual([{}]);
const b = rb.buildDiff(DummyCache, {
filters: a,
id: "test",
});
expect(b).toMatchObject([
{
filters: [{ authors: ["a"] }],
},
]);
});
describe("build gossip simply", () => {
const rb = new RequestBuilder("test");
rb.withFilter().authors(["a", "b"]).kinds([0]);
const a = rb.build(DummyCache);
expect(a).toEqual([
{
strategy: RequestStrategy.AuthorsRelays,
relay: "wss://a.com/",
filters: [
{
kinds: [0],
authors: ["a"],
},
],
},
{
strategy: RequestStrategy.AuthorsRelays,
relay: "wss://b.com/",
filters: [
{
kinds: [0],
authors: ["b"],
},
],
},
]);
});
describe("build gossip merged similar filters", () => {
const rb = new RequestBuilder("test");
rb.withFilter().authors(["a", "b"]).kinds([0]);
rb.withFilter().authors(["a", "b"]).kinds([10002]);
rb.withFilter().authors(["a"]).limit(10).kinds([4]);
const a = rb.build(DummyCache);
expect(a).toEqual([
{
strategy: RequestStrategy.AuthorsRelays,
relay: "wss://a.com/",
filters: [
{
kinds: [0, 10002],
authors: ["a"],
},
{
kinds: [4],
authors: ["a"],
limit: 10,
},
],
},
{
strategy: RequestStrategy.AuthorsRelays,
relay: "wss://b.com/",
filters: [
{
kinds: [0, 10002],
authors: ["b"],
},
],
},
]);
});
});

View File

@ -2,7 +2,7 @@ import { RawReqFilter, u256, HexKey, EventKind } from "@snort/nostr";
import { appendDedupe, dedupe } from "SnortUtils";
import { QueryBase } from "./Query";
import { diffFilters } from "./RequestSplitter";
import { RelayCache, splitByWriteRelays } from "./GossipModel";
import { RelayCache, splitAllByWriteRelays, splitByWriteRelays } from "./GossipModel";
import { mergeSimilar } from "./RequestMerger";
/**
@ -30,7 +30,7 @@ export enum RequestStrategy {
* A built REQ filter ready for sending to System
*/
export interface BuiltRawReqFilter {
filter: RawReqFilter;
filters: Array<RawReqFilter>;
relay: string;
strategy: RequestStrategy;
}
@ -97,6 +97,14 @@ export class RequestBuilder {
const next = this.buildRaw();
const diff = diffFilters(q.filters, next);
if (diff.changed) {
console.debug("DIFF", q.filters, next, diff);
return splitAllByWriteRelays(relays, diff.filters).map(a => {
return {
strategy: RequestStrategy.AuthorsRelays,
filters: a.filters,
relay: a.relay,
};
});
}
return [];
}
@ -118,9 +126,9 @@ export class RequestBuilder {
}, new Map<string, Array<BuiltRawReqFilter>>());
const filtersSquashed = [...relayMerged.values()].flatMap(a => {
return mergeSimilar(a.map(b => b.filter)).map(b => {
return mergeSimilar(a.flatMap(b => b.filters)).map(b => {
return {
filter: b,
filters: [b],
relay: a[0].relay,
strategy: a[0].strategy,
} as BuiltRawReqFilter;
@ -210,10 +218,12 @@ export class RequestFilterBuilder {
const relays = dedupe([...this.#relayHints.values()].flat());
return relays.map(r => {
return {
filter: {
...this.#filter,
ids: [...this.#relayHints.entries()].filter(([, v]) => v.includes(r)).map(([k]) => k),
},
filters: [
{
...this.#filter,
ids: [...this.#relayHints.entries()].filter(([, v]) => v.includes(r)).map(([k]) => k),
},
],
relay: r,
strategy: RequestStrategy.RelayHintedEventIds,
};
@ -225,7 +235,8 @@ export class RequestFilterBuilder {
const split = splitByWriteRelays(relays, this.#filter);
return split.map(a => {
return {
...a,
filters: [a.filter],
relay: a.relay,
strategy: RequestStrategy.AuthorsRelays,
};
});
@ -233,7 +244,7 @@ export class RequestFilterBuilder {
return [
{
filter: this.filter,
filters: [this.filter],
relay: "*",
strategy: RequestStrategy.DefaultRelays,
},

View File

@ -0,0 +1,44 @@
import { RawReqFilter } from "@snort/nostr";
import { mergeSimilar } from "./RequestMerger";
describe("RequestMerger", () => {
it("should simple merge authors", () => {
const a = {
authors: ["a"],
} as RawReqFilter;
const b = {
authors: ["b"],
} as RawReqFilter;
const merged = mergeSimilar([a, b]);
expect(merged).toMatchObject([
{
authors: ["a", "b"],
},
]);
});
it("should append non-mergable filters", () => {
const a = {
authors: ["a"],
} as RawReqFilter;
const b = {
authors: ["b"],
} as RawReqFilter;
const c = {
limit: 5,
authors: ["a"],
};
const merged = mergeSimilar([a, b, c]);
expect(merged).toMatchObject([
{
authors: ["a", "b"],
},
{
limit: 5,
authors: ["a"],
},
]);
});
});

View File

@ -1,5 +1,30 @@
import { RawReqFilter } from "@snort/nostr";
export function mergeSimilar(filters: Array<RawReqFilter>): Array<RawReqFilter> {
return filters;
const hasCriticalKeySet = (a: RawReqFilter) => {
return a.limit !== undefined || a.since !== undefined || a.until !== undefined;
};
const canEasilyMerge = filters.filter(a => !hasCriticalKeySet(a));
const cannotMerge = filters.filter(a => hasCriticalKeySet(a));
return [...(canEasilyMerge.length > 0 ? [simpleMerge(canEasilyMerge)] : []), ...cannotMerge];
}
function simpleMerge(filters: Array<RawReqFilter>) {
const result: any = {};
filters.forEach(filter => {
Object.entries(filter).forEach(([key, value]) => {
if (Array.isArray(value)) {
if (result[key] === undefined) {
result[key] = [...value];
} else {
result[key] = [...new Set([...result[key], ...value])];
}
} else {
throw new Error("Cannot simple merge with non-array filter properties");
}
});
});
return result as RawReqFilter;
}

View File

@ -1,5 +1,8 @@
import { RawReqFilter } from "@snort/nostr";
// Critical keys changing means the entire filter has changed
export const CriticalKeys = ["since", "until", "limit"];
export function diffFilters(a: Array<RawReqFilter>, b: Array<RawReqFilter>) {
const result: Array<RawReqFilter> = [];
let anyChanged = false;
@ -9,25 +12,20 @@ export function diffFilters(a: Array<RawReqFilter>, b: Array<RawReqFilter>) {
result.push(bN);
anyChanged = true;
} else {
// Critical keys changing means the entire filter has changed
const criticalKeys = ["since", "until", "limit"];
let anyCriticalKeyChanged = false;
for (const [k, v] of Object.entries(bN)) {
if (Array.isArray(v)) {
const prevArray = prev[k] as Array<string | number>;
if (!prevArray) {
throw new Error(`Tried to add new filter prop ${k} which isnt supported!`);
}
const prevArray = prev[k] as Array<string | number> | undefined;
const thisArray = v as Array<string | number>;
const added = thisArray.filter(a => !prevArray.includes(a));
const added = thisArray.filter(a => !prevArray?.includes(a));
// support adding new values to array, removing values is ignored since we only care about getting new values
result[i] = { ...result[i], [k]: added.length === 0 ? prevArray : added };
result[i] = { ...result[i], [k]: added.length === 0 ? prevArray ?? thisArray : added };
if (added.length > 0) {
anyChanged = true;
}
} else if (prev[k] !== v) {
result[i] = { ...result[i], [k]: v };
if (criticalKeys.includes(k)) {
if (CriticalKeys.includes(k)) {
anyCriticalKeyChanged = anyChanged = true;
break;
}

View File

@ -84,7 +84,7 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> {
c.OnDisconnect = id => this.OnRelayDisconnect(id);
c.OnConnected = () => {
for (const [, q] of this.Queries) {
q.sendToRelay(c);
q.sendToRelay(c, q);
}
};
await c.Connect();
@ -142,7 +142,7 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> {
c.OnConnected = () => {
for (const [, q] of this.Queries) {
if (q.progress !== 1) {
q.sendToRelay(c);
q.sendToRelay(c, q);
}
}
};
@ -206,15 +206,14 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> {
existing,
{
id: `${existing.id}-${existing.subQueryCounter++}`,
filters: [subQ.filter],
relays: [],
filters: subQ.filters,
relays: [subQ.relay],
},
(q, s, c) => q.sendSubQueryToRelay(c, s)
(q, s, c) => q.sendToRelay(c, s)
);
}
q.filters = filters;
this.notifyChange();
return q.feed as Readonly<T>;
return existing.feed as Readonly<T>;
}
} else {
const store = new type();
@ -225,22 +224,20 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> {
q.leaveOpen = req.options.leaveOpen;
}
this.Queries.set(rb.id, q);
const splitFilters = splitAllByWriteRelays(filters);
if (splitFilters.length > 1) {
for (const sf of splitFilters) {
const subQ = {
this.Queries.set(req.id, q);
for (const subQ of filters) {
this.SendQuery(
q,
{
id: `${q.id}-${q.subQueryCounter++}`,
filters: sf.filters,
relays: sf.relay ? [sf.relay] : undefined,
} as QueryBase;
this.SendQuery(q, subQ, (q, s, c) => q.sendSubQueryToRelay(c, s));
}
} else {
this.SendQuery(q, q, (q, s, c) => q.sendToRelay(c));
filters: subQ.filters,
relays: [subQ.relay],
},
(q, s, c) => q.sendToRelay(c, s)
);
}
this.notifyChange();
return store;
return q.feed as Readonly<T>;
}
}