refactor request builder to handle relay hints

This commit is contained in:
Kieran 2023-05-25 19:52:03 +01:00
parent ca92b365e0
commit 9a33466c7c
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
11 changed files with 242 additions and 118 deletions

View File

@ -42,6 +42,7 @@ export default function useRelaysFeedFollows(pubkeys: HexKey[]): Array<RelayList
});
}
// instead of discarding the follow list we should also use it for follow graph
function mapFromContactList(notes: Array<TaggedRawEvent>): Array<RelayList> {
return notes.map(ev => {
if (ev.content !== "" && ev.content !== "{}" && ev.content.startsWith("{") && ev.content.endsWith("}")) {

View File

@ -6,10 +6,18 @@ import { FlatNoteStore, RequestBuilder } from "System";
import useRequestBuilder from "Hooks/useRequestBuilder";
import useLogin from "Hooks/useLogin";
interface RelayTaggedEventId {
id: u256;
relay?: string;
}
export default function useThreadFeed(link: NostrLink) {
const [trackingEvents, setTrackingEvent] = useState<u256[]>([link.id]);
const linkTagged = {
id: link.id,
relay: link.relays?.[0],
};
const [trackingEvents, setTrackingEvent] = useState<Array<RelayTaggedEventId>>([linkTagged]);
const [trackingATags, setTrackingATags] = useState<string[]>([]);
const [allEvents, setAllEvents] = useState<u256[]>([link.id]);
const [allEvents, setAllEvents] = useState<Array<RelayTaggedEventId>>([linkTagged]);
const pref = useLogin().preferences;
const sub = useMemo(() => {
@ -17,7 +25,10 @@ export default function useThreadFeed(link: NostrLink) {
sub.withOptions({
leaveOpen: true,
});
sub.withFilter().ids(trackingEvents);
const fTracking = sub.withFilter();
for (const te of trackingEvents) {
fTracking.id(te.id, te.relay);
}
sub
.withFilter()
.kinds(
@ -25,7 +36,10 @@ export default function useThreadFeed(link: NostrLink) {
? [EventKind.Reaction, EventKind.TextNote, EventKind.Repost, EventKind.ZapReceipt]
: [EventKind.TextNote, EventKind.ZapReceipt, EventKind.Repost]
)
.tag("e", allEvents);
.tag(
"e",
allEvents.map(a => a.id)
);
if (trackingATags.length > 0) {
const parsed = trackingATags.map(a => a.split(":"));
@ -45,16 +59,27 @@ export default function useThreadFeed(link: NostrLink) {
useEffect(() => {
setTrackingATags([]);
setTrackingEvent([link.id]);
setAllEvents([link.id]);
setTrackingEvent([linkTagged]);
setAllEvents([linkTagged]);
}, [link.id]);
useEffect(() => {
if (store.data) {
const mainNotes = store.data?.filter(a => a.kind === EventKind.TextNote || a.kind === EventKind.Polls) ?? [];
const eTags = mainNotes.map(a => a.tags.filter(b => b[0] === "e").map(b => b[1])).flat();
const eTagsMissing = eTags.filter(a => !mainNotes.some(b => b.id === a));
const eTags = mainNotes
.map(a =>
a.tags
.filter(b => b[0] === "e")
.map(b => {
return {
id: b[1],
relay: b[2],
};
})
)
.flat();
const eTagsMissing = eTags.filter(a => !mainNotes.some(b => b.id === a.id));
setTrackingEvent(s => appendDedupe(s, eTagsMissing));
setAllEvents(s => appendDedupe(s, eTags));

View File

@ -1,5 +1,4 @@
import { RawReqFilter } from "@snort/nostr";
import { UserRelays } from "Cache/UserRelayCache";
import { FullRelaySettings, RawReqFilter } from "@snort/nostr";
import { unwrap } from "SnortUtils";
import debug from "debug";
@ -15,18 +14,24 @@ export interface RelayTaggedFilters {
filters: Array<RawReqFilter>;
}
export function splitAllByWriteRelays(filters: Array<RawReqFilter>) {
const allSplit = filters.map(splitByWriteRelays).reduce((acc, v) => {
for (const vn of v) {
const existing = acc.get(vn.relay);
if (existing) {
existing.push(vn.filter);
} else {
acc.set(vn.relay, [vn.filter]);
export interface RelayCache {
get(pubkey?: string): Array<FullRelaySettings> | undefined;
}
export function splitAllByWriteRelays(cache: RelayCache, filters: Array<RawReqFilter>) {
const allSplit = filters
.map(a => splitByWriteRelays(cache, a))
.reduce((acc, v) => {
for (const vn of v) {
const existing = acc.get(vn.relay);
if (existing) {
existing.push(vn.filter);
} else {
acc.set(vn.relay, [vn.filter]);
}
}
}
return acc;
}, new Map<string, Array<RawReqFilter>>());
return acc;
}, new Map<string, Array<RawReqFilter>>());
return [...allSplit.entries()].map(([k, v]) => {
return {
@ -41,7 +46,7 @@ export function splitAllByWriteRelays(filters: Array<RawReqFilter>) {
* @param filter
* @returns
*/
export function splitByWriteRelays(filter: RawReqFilter): Array<RelayTaggedFilter> {
export function splitByWriteRelays(cache: RelayCache, filter: RawReqFilter): Array<RelayTaggedFilter> {
if ((filter.authors?.length ?? 0) === 0)
return [
{
@ -53,7 +58,7 @@ export function splitByWriteRelays(filter: RawReqFilter): Array<RelayTaggedFilte
const allRelays = unwrap(filter.authors).map(a => {
return {
key: a,
relays: UserRelays.getFromCache(a)?.relays,
relays: cache.get(a)?.filter(a => a.settings.write),
};
});

View File

@ -1,5 +1,5 @@
import { TaggedRawEvent, u256 } from "@snort/nostr";
import { findTag } from "SnortUtils";
import { appendDedupe, findTag } from "SnortUtils";
export interface StoreSnapshot<TSnapshot> {
data: TSnapshot | undefined;
@ -142,6 +142,11 @@ export class FlatNoteStore extends HookedNoteStore<Readonly<Array<TaggedRawEvent
this.#events.push(a);
this.#ids.add(a.id);
changes.push(a);
} else {
const existing = this.#events.find(b => b.id === a.id);
if (existing) {
existing.relays = appendDedupe(existing.relays, a.relays);
}
}
});

View File

@ -13,28 +13,18 @@ class QueryTrace {
readonly relay: string;
readonly connId: string;
readonly start: number;
readonly leaveOpen: boolean;
sent?: number;
eose?: number;
close?: number;
#wasForceClosed = false;
readonly #fnClose: (id: string) => void;
readonly #fnProgress: () => void;
readonly #log = debug("QueryTrace");
constructor(
sub: string,
relay: string,
connId: string,
leaveOpen: boolean,
fnClose: (id: string) => void,
fnProgress: () => void
) {
constructor(sub: string, relay: string, connId: string, fnClose: (id: string) => void, fnProgress: () => void) {
this.id = uuid();
this.subId = sub;
this.relay = relay;
this.connId = connId;
this.leaveOpen = leaveOpen;
this.start = unixNowMs();
this.#fnClose = fnClose;
this.#fnProgress = fnProgress;
@ -48,10 +38,6 @@ class QueryTrace {
gotEose() {
this.eose = unixNowMs();
this.#fnProgress();
if (!this.leaveOpen) {
this.sendClose();
}
//this.#log("[EOSE] %s %s", this.subId, this.relay);
}
forceEose() {
@ -59,7 +45,6 @@ class QueryTrace {
this.#wasForceClosed = true;
this.#fnProgress();
this.sendClose();
//this.#log("[F-EOSE] %s %s", this.subId, this.relay);
}
sendClose() {
@ -117,10 +102,16 @@ export interface QueryBase {
/**
* Active or queued query on the system
*/
export class Query implements QueryBase {
export class Query {
/**
* Uniquie ID of this query
*/
id: string;
filters: Array<RawReqFilter>;
relays?: Array<string>;
/**
* A merged set of all filters send to relays for this query
*/
filters: Array<RawReqFilter> = [];
/**
* Which relays this query has already been executed on
@ -150,9 +141,8 @@ export class Query implements QueryBase {
subQueryCounter = 0;
#log = debug("Query");
constructor(id: string, filters: Array<RawReqFilter>, feed: NoteStore) {
constructor(id: string, feed: NoteStore) {
this.id = id;
this.filters = filters;
this.#feed = feed;
this.#checkTraces();
}
@ -181,14 +171,7 @@ export class Query implements QueryBase {
this.#stopCheckTraces();
}
sendToRelay(c: Connection) {
if (!this.#canSendQuery(c, this)) {
return;
}
this.#sendQueryInternal(c, this);
}
sendSubQueryToRelay(c: Connection, subq: QueryBase) {
sendToRelay(c: Connection, subq: QueryBase) {
if (!this.#canSendQuery(c, subq)) {
return;
}
@ -200,9 +183,6 @@ export class Query implements QueryBase {
}
sendClose() {
for (const qt of this.#tracing) {
qt.sendClose();
}
for (const qt of this.#tracing) {
qt.sendClose();
}
@ -212,6 +192,9 @@ export class Query implements QueryBase {
eose(sub: string, conn: Readonly<Connection>) {
const qt = this.#tracing.find(a => a.subId === sub && a.connId === conn.Id);
qt?.gotEose();
if (!this.leaveOpen) {
qt?.sendClose();
}
}
/**
@ -270,7 +253,6 @@ export class Query implements QueryBase {
q.id,
c.Address,
c.Id,
this.leaveOpen,
x => c.CloseReq(x),
() => this.#onProgress()
);

View File

@ -1,22 +1,29 @@
import { RelayCache } from "./GossipModel";
import { RequestBuilder } from "./RequestBuilder";
import { describe, expect } from "@jest/globals";
describe("RequestBuilder", () => {
const relayCache = {
get: () => {
return undefined;
},
} as RelayCache;
describe("basic", () => {
test("empty filter", () => {
const b = new RequestBuilder("test");
b.withFilter();
expect(b.build()).toEqual([{}]);
expect(b.build(relayCache)).toEqual([{}]);
});
test("only kind", () => {
const b = new RequestBuilder("test");
b.withFilter().kinds([0]);
expect(b.build()).toEqual([{ kinds: [0] }]);
expect(b.build(relayCache)).toEqual([{ kinds: [0] }]);
});
test("empty authors", () => {
const b = new RequestBuilder("test");
b.withFilter().authors([]);
expect(b.build()).toEqual([{ authors: [] }]);
expect(b.build(relayCache)).toEqual([{ authors: [] }]);
});
test("authors/kinds/ids", () => {
const authors = ["a1", "a2"];
@ -24,7 +31,7 @@ describe("RequestBuilder", () => {
const ids = ["id1", "id2", "id3"];
const b = new RequestBuilder("test");
b.withFilter().authors(authors).kinds(kinds).ids(ids);
expect(b.build()).toEqual([{ ids, authors, kinds }]);
expect(b.build(relayCache)).toEqual([{ ids, authors, kinds }]);
});
test("authors and kinds, duplicates removed", () => {
const authors = ["a1", "a2"];
@ -32,12 +39,12 @@ describe("RequestBuilder", () => {
const ids = ["id1", "id2", "id3"];
const b = new RequestBuilder("test");
b.withFilter().ids(ids).authors(authors).kinds(kinds).ids(ids).authors(authors).kinds(kinds);
expect(b.build()).toEqual([{ ids, authors, kinds }]);
expect(b.build(relayCache)).toEqual([{ ids, authors, kinds }]);
});
test("search", () => {
const b = new RequestBuilder("test");
b.withFilter().kinds([1]).search("test-search");
expect(b.build()).toEqual([{ kinds: [1], search: "test-search" }]);
expect(b.build(relayCache)).toEqual([{ kinds: [1], search: "test-search" }]);
});
test("timeline", () => {
const authors = ["a1", "a2"];
@ -46,7 +53,7 @@ describe("RequestBuilder", () => {
const since = 5;
const b = new RequestBuilder("test");
b.withFilter().kinds(kinds).authors(authors).since(since).until(until);
expect(b.build()).toEqual([{ kinds, authors, until, since }]);
expect(b.build(relayCache)).toEqual([{ kinds, authors, until, since }]);
});
test("multi-filter timeline", () => {
const authors = ["a1", "a2"];
@ -56,7 +63,7 @@ describe("RequestBuilder", () => {
const b = new RequestBuilder("test");
b.withFilter().kinds(kinds).authors(authors).since(since).until(until);
b.withFilter().kinds(kinds).authors(authors).since(since).until(until);
expect(b.build()).toEqual([
expect(b.build(relayCache)).toEqual([
{ kinds, authors, until, since },
{ kinds, authors, until, since },
]);

View File

@ -1,10 +1,14 @@
import { RawReqFilter, u256, HexKey, EventKind } from "@snort/nostr";
import { appendDedupe } from "SnortUtils";
import { appendDedupe, dedupe } from "SnortUtils";
import { QueryBase } from "./Query";
import { diffFilters } from "./RequestSplitter";
import { RelayCache, splitByWriteRelays } from "./GossipModel";
import { mergeSimilar } from "./RequestMerger";
/**
* Which strategy is used when building REQ filters
*/
export enum NostrRequestStrategy {
export enum RequestStrategy {
/**
* Use the users default relays to fetch events,
* this is the fallback option when there is no better way to query a given filter set
@ -26,10 +30,9 @@ export enum NostrRequestStrategy {
* A built REQ filter ready for sending to System
*/
export interface BuiltRawReqFilter {
id: string;
filter: Array<RawReqFilter>;
relays: Array<string>;
strategy: NostrRequestStrategy;
filter: RawReqFilter;
relay: string;
strategy: RequestStrategy;
}
export interface RequestBuilderOptions {
@ -76,8 +79,55 @@ export class RequestBuilder {
return this;
}
build(): Array<RawReqFilter> {
return this.#builders.map(a => a.filter);
buildRaw(): Array<RawReqFilter> {
return this.#builders.map(f => f.filter);
}
build(relays: RelayCache): Array<BuiltRawReqFilter> {
const expanded = this.#builders.map(a => a.build(relays)).flat();
return this.#mergeSimilar(expanded);
}
/**
* Detects a change in request from a previous set of filters
* @param q All previous filters merged
* @returns
*/
buildDiff(relays: RelayCache, q: QueryBase): Array<BuiltRawReqFilter> {
const next = this.buildRaw();
const diff = diffFilters(q.filters, next);
if (diff.changed) {
}
return [];
}
/**
* Merge a set of expanded filters into the smallest number of subscriptions by merging similar requests
* @param expanded
* @returns
*/
#mergeSimilar(expanded: Array<BuiltRawReqFilter>) {
const relayMerged = expanded.reduce((acc, v) => {
const existing = acc.get(v.relay);
if (existing) {
existing.push(v);
} else {
acc.set(v.relay, [v]);
}
return acc;
}, new Map<string, Array<BuiltRawReqFilter>>());
const filtersSquashed = [...relayMerged.values()].flatMap(a => {
return mergeSimilar(a.map(b => b.filter)).map(b => {
return {
filter: b,
relay: a[0].relay,
strategy: a[0].strategy,
} as BuiltRawReqFilter;
});
});
return filtersSquashed;
}
}
@ -86,7 +136,7 @@ export class RequestBuilder {
*/
export class RequestFilterBuilder {
#filter: RawReqFilter = {};
#relayHints: Map<u256, Array<string>> = new Map();
#relayHints = new Map<u256, Array<string>>();
get filter() {
return { ...this.#filter };
@ -149,4 +199,44 @@ export class RequestFilterBuilder {
this.#filter.search = keyword;
return this;
}
/**
* Build/expand this filter into a set of relay specific queries
*/
build(relays: RelayCache): Array<BuiltRawReqFilter> {
// when querying for specific event ids with relay hints
// take the first approach which is to split the filter by relay
if (this.#filter.ids && this.#relayHints.size > 0) {
const relays = dedupe([...this.#relayHints.values()].flat());
return relays.map(r => {
return {
filter: {
...this.#filter,
ids: [...this.#relayHints.entries()].filter(([, v]) => v.includes(r)).map(([k]) => k),
},
relay: r,
strategy: RequestStrategy.RelayHintedEventIds,
};
});
}
// If any authors are set use the gossip model to fetch data for each author
if (this.#filter.authors) {
const split = splitByWriteRelays(relays, this.#filter);
return split.map(a => {
return {
...a,
strategy: RequestStrategy.AuthorsRelays,
};
});
}
return [
{
filter: this.filter,
relay: "*",
strategy: RequestStrategy.DefaultRelays,
},
];
}
}

View File

@ -0,0 +1,5 @@
import { RawReqFilter } from "@snort/nostr";
export function mergeSimilar(filters: Array<RawReqFilter>): Array<RawReqFilter> {
return filters;
}

View File

@ -15,6 +15,9 @@ export function diffFilters(a: Array<RawReqFilter>, b: Array<RawReqFilter>) {
for (const [k, v] of Object.entries(bN)) {
if (Array.isArray(v)) {
const prevArray = prev[k] as Array<string | number>;
if (!prevArray) {
throw new Error(`Tried to add new filter prop ${k} which isnt supported!`);
}
const thisArray = v as Array<string | number>;
const added = thisArray.filter(a => !prevArray.includes(a));
// support adding new values to array, removing values is ignored since we only care about getting new values

View File

@ -15,6 +15,7 @@ import { diffFilters } from "./RequestSplitter";
import { Query, QueryBase } from "./Query";
import { splitAllByWriteRelays } from "./GossipModel";
import ExternalStore from "ExternalStore";
import { UserRelays } from "Cache/UserRelayCache";
export {
NoteStore,
@ -59,6 +60,9 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> {
HandleAuth?: AuthHandler;
#log = debug("System");
#relayCache = {
get: (pk?: string) => UserRelays.getFromCache(pk)?.relays,
};
constructor() {
super();
@ -188,17 +192,42 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> {
if (!req) return new type();
if (this.Queries.has(req.id)) {
const filters = req.build();
const q = unwrap(this.Queries.get(req.id));
q.unCancel();
const existing = this.Queries.get(req.id);
if (existing) {
const filters = req.buildDiff(this.#relayCache, existing);
existing.unCancel();
const diff = diffFilters(q.filters, filters);
if (!diff.changed && !req.options?.skipDiff) {
if (filters.length === 0 && !req.options?.skipDiff) {
this.notifyChange();
return unwrap(q.feed) as Readonly<T>;
return existing.feed as Readonly<T>;
} else {
const splitFilters = splitAllByWriteRelays(filters);
for (const subQ of filters) {
this.SendQuery(
existing,
{
id: `${existing.id}-${existing.subQueryCounter++}`,
filters: [subQ.filter],
relays: [],
},
(q, s, c) => q.sendSubQueryToRelay(c, s)
);
}
q.filters = filters;
this.notifyChange();
return q.feed as Readonly<T>;
}
} else {
const store = new type();
const filters = req.build(this.#relayCache);
const q = new Query(req.id, store);
if (req.options?.leaveOpen) {
q.leaveOpen = req.options.leaveOpen;
}
this.Queries.set(rb.id, q);
const splitFilters = splitAllByWriteRelays(filters);
if (splitFilters.length > 1) {
for (const sf of splitFilters) {
const subQ = {
id: `${q.id}-${q.subQueryCounter++}`,
@ -207,45 +236,14 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> {
} as QueryBase;
this.SendQuery(q, subQ, (q, s, c) => q.sendSubQueryToRelay(c, s));
}
q.filters = filters;
this.notifyChange();
return q.feed as Readonly<T>;
} else {
this.SendQuery(q, q, (q, s, c) => q.sendToRelay(c));
}
} else {
return this.AddQuery<T>(type, req);
this.notifyChange();
return store;
}
}
AddQuery<T extends NoteStore>(type: { new (): T }, rb: RequestBuilder): T {
const store = new type();
const filters = rb.build();
const q = new Query(rb.id, filters, store);
if (rb.options?.leaveOpen) {
q.leaveOpen = rb.options.leaveOpen;
}
if (rb.options?.relays && (rb.options?.relays?.length ?? 0) > 0) {
q.relays = rb.options.relays;
}
this.Queries.set(rb.id, q);
const splitFilters = splitAllByWriteRelays(filters);
if (splitFilters.length > 1) {
for (const sf of splitFilters) {
const subQ = {
id: `${q.id}-${q.subQueryCounter++}`,
filters: sf.filters,
relays: sf.relay ? [sf.relay] : undefined,
} as QueryBase;
this.SendQuery(q, subQ, (q, s, c) => q.sendSubQueryToRelay(c, s));
}
} else {
this.SendQuery(q, q, (q, s, c) => q.sendToRelay(c));
}
this.notifyChange();
return store;
}
CancelQuery(sub: string) {
const q = this.Queries.get(sub);
if (q) {

View File

@ -211,7 +211,10 @@ export class Connection {
break;
}
case "EVENT": {
this.OnEvent?.(msg[1], msg[2]);
this.OnEvent?.(msg[1], {
...msg[2],
relays: [this.Address]
});
this.Stats.EventsReceived++;
this.#UpdateState();
break;