diff --git a/packages/app/src/Feed/RelaysFeedFollows.tsx b/packages/app/src/Feed/RelaysFeedFollows.tsx index 0f67815f..42566875 100644 --- a/packages/app/src/Feed/RelaysFeedFollows.tsx +++ b/packages/app/src/Feed/RelaysFeedFollows.tsx @@ -42,6 +42,7 @@ export default function useRelaysFeedFollows(pubkeys: HexKey[]): Array): Array { return notes.map(ev => { if (ev.content !== "" && ev.content !== "{}" && ev.content.startsWith("{") && ev.content.endsWith("}")) { diff --git a/packages/app/src/Feed/ThreadFeed.ts b/packages/app/src/Feed/ThreadFeed.ts index 9d36cbe3..f7a41f7a 100644 --- a/packages/app/src/Feed/ThreadFeed.ts +++ b/packages/app/src/Feed/ThreadFeed.ts @@ -6,10 +6,18 @@ import { FlatNoteStore, RequestBuilder } from "System"; import useRequestBuilder from "Hooks/useRequestBuilder"; import useLogin from "Hooks/useLogin"; +interface RelayTaggedEventId { + id: u256; + relay?: string; +} export default function useThreadFeed(link: NostrLink) { - const [trackingEvents, setTrackingEvent] = useState([link.id]); + const linkTagged = { + id: link.id, + relay: link.relays?.[0], + }; + const [trackingEvents, setTrackingEvent] = useState>([linkTagged]); const [trackingATags, setTrackingATags] = useState([]); - const [allEvents, setAllEvents] = useState([link.id]); + const [allEvents, setAllEvents] = useState>([linkTagged]); const pref = useLogin().preferences; const sub = useMemo(() => { @@ -17,7 +25,10 @@ export default function useThreadFeed(link: NostrLink) { sub.withOptions({ leaveOpen: true, }); - sub.withFilter().ids(trackingEvents); + const fTracking = sub.withFilter(); + for (const te of trackingEvents) { + fTracking.id(te.id, te.relay); + } sub .withFilter() .kinds( @@ -25,7 +36,10 @@ export default function useThreadFeed(link: NostrLink) { ? [EventKind.Reaction, EventKind.TextNote, EventKind.Repost, EventKind.ZapReceipt] : [EventKind.TextNote, EventKind.ZapReceipt, EventKind.Repost] ) - .tag("e", allEvents); + .tag( + "e", + allEvents.map(a => a.id) + ); if (trackingATags.length > 0) { const parsed = trackingATags.map(a => a.split(":")); @@ -45,16 +59,27 @@ export default function useThreadFeed(link: NostrLink) { useEffect(() => { setTrackingATags([]); - setTrackingEvent([link.id]); - setAllEvents([link.id]); + setTrackingEvent([linkTagged]); + setAllEvents([linkTagged]); }, [link.id]); useEffect(() => { if (store.data) { const mainNotes = store.data?.filter(a => a.kind === EventKind.TextNote || a.kind === EventKind.Polls) ?? []; - const eTags = mainNotes.map(a => a.tags.filter(b => b[0] === "e").map(b => b[1])).flat(); - const eTagsMissing = eTags.filter(a => !mainNotes.some(b => b.id === a)); + const eTags = mainNotes + .map(a => + a.tags + .filter(b => b[0] === "e") + .map(b => { + return { + id: b[1], + relay: b[2], + }; + }) + ) + .flat(); + const eTagsMissing = eTags.filter(a => !mainNotes.some(b => b.id === a.id)); setTrackingEvent(s => appendDedupe(s, eTagsMissing)); setAllEvents(s => appendDedupe(s, eTags)); diff --git a/packages/app/src/System/GossipModel.ts b/packages/app/src/System/GossipModel.ts index 06c359f4..914f57a6 100644 --- a/packages/app/src/System/GossipModel.ts +++ b/packages/app/src/System/GossipModel.ts @@ -1,5 +1,4 @@ -import { RawReqFilter } from "@snort/nostr"; -import { UserRelays } from "Cache/UserRelayCache"; +import { FullRelaySettings, RawReqFilter } from "@snort/nostr"; import { unwrap } from "SnortUtils"; import debug from "debug"; @@ -15,18 +14,24 @@ export interface RelayTaggedFilters { filters: Array; } -export function splitAllByWriteRelays(filters: Array) { - const allSplit = filters.map(splitByWriteRelays).reduce((acc, v) => { - for (const vn of v) { - const existing = acc.get(vn.relay); - if (existing) { - existing.push(vn.filter); - } else { - acc.set(vn.relay, [vn.filter]); +export interface RelayCache { + get(pubkey?: string): Array | undefined; +} + +export function splitAllByWriteRelays(cache: RelayCache, filters: Array) { + const allSplit = filters + .map(a => splitByWriteRelays(cache, a)) + .reduce((acc, v) => { + for (const vn of v) { + const existing = acc.get(vn.relay); + if (existing) { + existing.push(vn.filter); + } else { + acc.set(vn.relay, [vn.filter]); + } } - } - return acc; - }, new Map>()); + return acc; + }, new Map>()); return [...allSplit.entries()].map(([k, v]) => { return { @@ -41,7 +46,7 @@ export function splitAllByWriteRelays(filters: Array) { * @param filter * @returns */ -export function splitByWriteRelays(filter: RawReqFilter): Array { +export function splitByWriteRelays(cache: RelayCache, filter: RawReqFilter): Array { if ((filter.authors?.length ?? 0) === 0) return [ { @@ -53,7 +58,7 @@ export function splitByWriteRelays(filter: RawReqFilter): Array { return { key: a, - relays: UserRelays.getFromCache(a)?.relays, + relays: cache.get(a)?.filter(a => a.settings.write), }; }); diff --git a/packages/app/src/System/NoteCollection.ts b/packages/app/src/System/NoteCollection.ts index 3044d32b..da7efa64 100644 --- a/packages/app/src/System/NoteCollection.ts +++ b/packages/app/src/System/NoteCollection.ts @@ -1,5 +1,5 @@ import { TaggedRawEvent, u256 } from "@snort/nostr"; -import { findTag } from "SnortUtils"; +import { appendDedupe, findTag } from "SnortUtils"; export interface StoreSnapshot { data: TSnapshot | undefined; @@ -142,6 +142,11 @@ export class FlatNoteStore extends HookedNoteStore b.id === a.id); + if (existing) { + existing.relays = appendDedupe(existing.relays, a.relays); + } } }); diff --git a/packages/app/src/System/Query.ts b/packages/app/src/System/Query.ts index 16e0b8eb..8edc1bf6 100644 --- a/packages/app/src/System/Query.ts +++ b/packages/app/src/System/Query.ts @@ -13,28 +13,18 @@ class QueryTrace { readonly relay: string; readonly connId: string; readonly start: number; - readonly leaveOpen: boolean; sent?: number; eose?: number; close?: number; #wasForceClosed = false; readonly #fnClose: (id: string) => void; readonly #fnProgress: () => void; - readonly #log = debug("QueryTrace"); - constructor( - sub: string, - relay: string, - connId: string, - leaveOpen: boolean, - fnClose: (id: string) => void, - fnProgress: () => void - ) { + constructor(sub: string, relay: string, connId: string, fnClose: (id: string) => void, fnProgress: () => void) { this.id = uuid(); this.subId = sub; this.relay = relay; this.connId = connId; - this.leaveOpen = leaveOpen; this.start = unixNowMs(); this.#fnClose = fnClose; this.#fnProgress = fnProgress; @@ -48,10 +38,6 @@ class QueryTrace { gotEose() { this.eose = unixNowMs(); this.#fnProgress(); - if (!this.leaveOpen) { - this.sendClose(); - } - //this.#log("[EOSE] %s %s", this.subId, this.relay); } forceEose() { @@ -59,7 +45,6 @@ class QueryTrace { this.#wasForceClosed = true; this.#fnProgress(); this.sendClose(); - //this.#log("[F-EOSE] %s %s", this.subId, this.relay); } sendClose() { @@ -117,10 +102,16 @@ export interface QueryBase { /** * Active or queued query on the system */ -export class Query implements QueryBase { +export class Query { + /** + * Uniquie ID of this query + */ id: string; - filters: Array; - relays?: Array; + + /** + * A merged set of all filters send to relays for this query + */ + filters: Array = []; /** * Which relays this query has already been executed on @@ -150,9 +141,8 @@ export class Query implements QueryBase { subQueryCounter = 0; #log = debug("Query"); - constructor(id: string, filters: Array, feed: NoteStore) { + constructor(id: string, feed: NoteStore) { this.id = id; - this.filters = filters; this.#feed = feed; this.#checkTraces(); } @@ -181,14 +171,7 @@ export class Query implements QueryBase { this.#stopCheckTraces(); } - sendToRelay(c: Connection) { - if (!this.#canSendQuery(c, this)) { - return; - } - this.#sendQueryInternal(c, this); - } - - sendSubQueryToRelay(c: Connection, subq: QueryBase) { + sendToRelay(c: Connection, subq: QueryBase) { if (!this.#canSendQuery(c, subq)) { return; } @@ -200,9 +183,6 @@ export class Query implements QueryBase { } sendClose() { - for (const qt of this.#tracing) { - qt.sendClose(); - } for (const qt of this.#tracing) { qt.sendClose(); } @@ -212,6 +192,9 @@ export class Query implements QueryBase { eose(sub: string, conn: Readonly) { const qt = this.#tracing.find(a => a.subId === sub && a.connId === conn.Id); qt?.gotEose(); + if (!this.leaveOpen) { + qt?.sendClose(); + } } /** @@ -270,7 +253,6 @@ export class Query implements QueryBase { q.id, c.Address, c.Id, - this.leaveOpen, x => c.CloseReq(x), () => this.#onProgress() ); diff --git a/packages/app/src/System/RequestBuilder.test.ts b/packages/app/src/System/RequestBuilder.test.ts index 29527438..fe1a84c4 100644 --- a/packages/app/src/System/RequestBuilder.test.ts +++ b/packages/app/src/System/RequestBuilder.test.ts @@ -1,22 +1,29 @@ +import { RelayCache } from "./GossipModel"; import { RequestBuilder } from "./RequestBuilder"; import { describe, expect } from "@jest/globals"; describe("RequestBuilder", () => { + const relayCache = { + get: () => { + return undefined; + }, + } as RelayCache; + describe("basic", () => { test("empty filter", () => { const b = new RequestBuilder("test"); b.withFilter(); - expect(b.build()).toEqual([{}]); + expect(b.build(relayCache)).toEqual([{}]); }); test("only kind", () => { const b = new RequestBuilder("test"); b.withFilter().kinds([0]); - expect(b.build()).toEqual([{ kinds: [0] }]); + expect(b.build(relayCache)).toEqual([{ kinds: [0] }]); }); test("empty authors", () => { const b = new RequestBuilder("test"); b.withFilter().authors([]); - expect(b.build()).toEqual([{ authors: [] }]); + expect(b.build(relayCache)).toEqual([{ authors: [] }]); }); test("authors/kinds/ids", () => { const authors = ["a1", "a2"]; @@ -24,7 +31,7 @@ describe("RequestBuilder", () => { const ids = ["id1", "id2", "id3"]; const b = new RequestBuilder("test"); b.withFilter().authors(authors).kinds(kinds).ids(ids); - expect(b.build()).toEqual([{ ids, authors, kinds }]); + expect(b.build(relayCache)).toEqual([{ ids, authors, kinds }]); }); test("authors and kinds, duplicates removed", () => { const authors = ["a1", "a2"]; @@ -32,12 +39,12 @@ describe("RequestBuilder", () => { const ids = ["id1", "id2", "id3"]; const b = new RequestBuilder("test"); b.withFilter().ids(ids).authors(authors).kinds(kinds).ids(ids).authors(authors).kinds(kinds); - expect(b.build()).toEqual([{ ids, authors, kinds }]); + expect(b.build(relayCache)).toEqual([{ ids, authors, kinds }]); }); test("search", () => { const b = new RequestBuilder("test"); b.withFilter().kinds([1]).search("test-search"); - expect(b.build()).toEqual([{ kinds: [1], search: "test-search" }]); + expect(b.build(relayCache)).toEqual([{ kinds: [1], search: "test-search" }]); }); test("timeline", () => { const authors = ["a1", "a2"]; @@ -46,7 +53,7 @@ describe("RequestBuilder", () => { const since = 5; const b = new RequestBuilder("test"); b.withFilter().kinds(kinds).authors(authors).since(since).until(until); - expect(b.build()).toEqual([{ kinds, authors, until, since }]); + expect(b.build(relayCache)).toEqual([{ kinds, authors, until, since }]); }); test("multi-filter timeline", () => { const authors = ["a1", "a2"]; @@ -56,7 +63,7 @@ describe("RequestBuilder", () => { const b = new RequestBuilder("test"); b.withFilter().kinds(kinds).authors(authors).since(since).until(until); b.withFilter().kinds(kinds).authors(authors).since(since).until(until); - expect(b.build()).toEqual([ + expect(b.build(relayCache)).toEqual([ { kinds, authors, until, since }, { kinds, authors, until, since }, ]); diff --git a/packages/app/src/System/RequestBuilder.ts b/packages/app/src/System/RequestBuilder.ts index 5d3deb5d..16c7f653 100644 --- a/packages/app/src/System/RequestBuilder.ts +++ b/packages/app/src/System/RequestBuilder.ts @@ -1,10 +1,14 @@ import { RawReqFilter, u256, HexKey, EventKind } from "@snort/nostr"; -import { appendDedupe } from "SnortUtils"; +import { appendDedupe, dedupe } from "SnortUtils"; +import { QueryBase } from "./Query"; +import { diffFilters } from "./RequestSplitter"; +import { RelayCache, splitByWriteRelays } from "./GossipModel"; +import { mergeSimilar } from "./RequestMerger"; /** * Which strategy is used when building REQ filters */ -export enum NostrRequestStrategy { +export enum RequestStrategy { /** * Use the users default relays to fetch events, * this is the fallback option when there is no better way to query a given filter set @@ -26,10 +30,9 @@ export enum NostrRequestStrategy { * A built REQ filter ready for sending to System */ export interface BuiltRawReqFilter { - id: string; - filter: Array; - relays: Array; - strategy: NostrRequestStrategy; + filter: RawReqFilter; + relay: string; + strategy: RequestStrategy; } export interface RequestBuilderOptions { @@ -76,8 +79,55 @@ export class RequestBuilder { return this; } - build(): Array { - return this.#builders.map(a => a.filter); + buildRaw(): Array { + return this.#builders.map(f => f.filter); + } + + build(relays: RelayCache): Array { + const expanded = this.#builders.map(a => a.build(relays)).flat(); + return this.#mergeSimilar(expanded); + } + + /** + * Detects a change in request from a previous set of filters + * @param q All previous filters merged + * @returns + */ + buildDiff(relays: RelayCache, q: QueryBase): Array { + const next = this.buildRaw(); + const diff = diffFilters(q.filters, next); + if (diff.changed) { + } + return []; + } + + /** + * Merge a set of expanded filters into the smallest number of subscriptions by merging similar requests + * @param expanded + * @returns + */ + #mergeSimilar(expanded: Array) { + const relayMerged = expanded.reduce((acc, v) => { + const existing = acc.get(v.relay); + if (existing) { + existing.push(v); + } else { + acc.set(v.relay, [v]); + } + return acc; + }, new Map>()); + + const filtersSquashed = [...relayMerged.values()].flatMap(a => { + return mergeSimilar(a.map(b => b.filter)).map(b => { + return { + filter: b, + relay: a[0].relay, + strategy: a[0].strategy, + } as BuiltRawReqFilter; + }); + }); + + return filtersSquashed; } } @@ -86,7 +136,7 @@ export class RequestBuilder { */ export class RequestFilterBuilder { #filter: RawReqFilter = {}; - #relayHints: Map> = new Map(); + #relayHints = new Map>(); get filter() { return { ...this.#filter }; @@ -149,4 +199,44 @@ export class RequestFilterBuilder { this.#filter.search = keyword; return this; } + + /** + * Build/expand this filter into a set of relay specific queries + */ + build(relays: RelayCache): Array { + // when querying for specific event ids with relay hints + // take the first approach which is to split the filter by relay + if (this.#filter.ids && this.#relayHints.size > 0) { + const relays = dedupe([...this.#relayHints.values()].flat()); + return relays.map(r => { + return { + filter: { + ...this.#filter, + ids: [...this.#relayHints.entries()].filter(([, v]) => v.includes(r)).map(([k]) => k), + }, + relay: r, + strategy: RequestStrategy.RelayHintedEventIds, + }; + }); + } + + // If any authors are set use the gossip model to fetch data for each author + if (this.#filter.authors) { + const split = splitByWriteRelays(relays, this.#filter); + return split.map(a => { + return { + ...a, + strategy: RequestStrategy.AuthorsRelays, + }; + }); + } + + return [ + { + filter: this.filter, + relay: "*", + strategy: RequestStrategy.DefaultRelays, + }, + ]; + } } diff --git a/packages/app/src/System/RequestMerger.ts b/packages/app/src/System/RequestMerger.ts new file mode 100644 index 00000000..3e93e753 --- /dev/null +++ b/packages/app/src/System/RequestMerger.ts @@ -0,0 +1,5 @@ +import { RawReqFilter } from "@snort/nostr"; + +export function mergeSimilar(filters: Array): Array { + return filters; +} diff --git a/packages/app/src/System/RequestSplitter.ts b/packages/app/src/System/RequestSplitter.ts index 23794565..7f422cee 100644 --- a/packages/app/src/System/RequestSplitter.ts +++ b/packages/app/src/System/RequestSplitter.ts @@ -15,6 +15,9 @@ export function diffFilters(a: Array, b: Array) { for (const [k, v] of Object.entries(bN)) { if (Array.isArray(v)) { const prevArray = prev[k] as Array; + if (!prevArray) { + throw new Error(`Tried to add new filter prop ${k} which isnt supported!`); + } const thisArray = v as Array; const added = thisArray.filter(a => !prevArray.includes(a)); // support adding new values to array, removing values is ignored since we only care about getting new values diff --git a/packages/app/src/System/index.ts b/packages/app/src/System/index.ts index ed3ba4e8..f80b2133 100644 --- a/packages/app/src/System/index.ts +++ b/packages/app/src/System/index.ts @@ -15,6 +15,7 @@ import { diffFilters } from "./RequestSplitter"; import { Query, QueryBase } from "./Query"; import { splitAllByWriteRelays } from "./GossipModel"; import ExternalStore from "ExternalStore"; +import { UserRelays } from "Cache/UserRelayCache"; export { NoteStore, @@ -59,6 +60,9 @@ export class NostrSystem extends ExternalStore { HandleAuth?: AuthHandler; #log = debug("System"); + #relayCache = { + get: (pk?: string) => UserRelays.getFromCache(pk)?.relays, + }; constructor() { super(); @@ -188,17 +192,42 @@ export class NostrSystem extends ExternalStore { if (!req) return new type(); - if (this.Queries.has(req.id)) { - const filters = req.build(); - const q = unwrap(this.Queries.get(req.id)); - q.unCancel(); + const existing = this.Queries.get(req.id); + if (existing) { + const filters = req.buildDiff(this.#relayCache, existing); + existing.unCancel(); - const diff = diffFilters(q.filters, filters); - if (!diff.changed && !req.options?.skipDiff) { + if (filters.length === 0 && !req.options?.skipDiff) { this.notifyChange(); - return unwrap(q.feed) as Readonly; + return existing.feed as Readonly; } else { - const splitFilters = splitAllByWriteRelays(filters); + for (const subQ of filters) { + this.SendQuery( + existing, + { + id: `${existing.id}-${existing.subQueryCounter++}`, + filters: [subQ.filter], + relays: [], + }, + (q, s, c) => q.sendSubQueryToRelay(c, s) + ); + } + q.filters = filters; + this.notifyChange(); + return q.feed as Readonly; + } + } else { + const store = new type(); + + const filters = req.build(this.#relayCache); + const q = new Query(req.id, store); + if (req.options?.leaveOpen) { + q.leaveOpen = req.options.leaveOpen; + } + + this.Queries.set(rb.id, q); + const splitFilters = splitAllByWriteRelays(filters); + if (splitFilters.length > 1) { for (const sf of splitFilters) { const subQ = { id: `${q.id}-${q.subQueryCounter++}`, @@ -207,45 +236,14 @@ export class NostrSystem extends ExternalStore { } as QueryBase; this.SendQuery(q, subQ, (q, s, c) => q.sendSubQueryToRelay(c, s)); } - q.filters = filters; - this.notifyChange(); - return q.feed as Readonly; + } else { + this.SendQuery(q, q, (q, s, c) => q.sendToRelay(c)); } - } else { - return this.AddQuery(type, req); + this.notifyChange(); + return store; } } - AddQuery(type: { new (): T }, rb: RequestBuilder): T { - const store = new type(); - - const filters = rb.build(); - const q = new Query(rb.id, filters, store); - if (rb.options?.leaveOpen) { - q.leaveOpen = rb.options.leaveOpen; - } - if (rb.options?.relays && (rb.options?.relays?.length ?? 0) > 0) { - q.relays = rb.options.relays; - } - - this.Queries.set(rb.id, q); - const splitFilters = splitAllByWriteRelays(filters); - if (splitFilters.length > 1) { - for (const sf of splitFilters) { - const subQ = { - id: `${q.id}-${q.subQueryCounter++}`, - filters: sf.filters, - relays: sf.relay ? [sf.relay] : undefined, - } as QueryBase; - this.SendQuery(q, subQ, (q, s, c) => q.sendSubQueryToRelay(c, s)); - } - } else { - this.SendQuery(q, q, (q, s, c) => q.sendToRelay(c)); - } - this.notifyChange(); - return store; - } - CancelQuery(sub: string) { const q = this.Queries.get(sub); if (q) { diff --git a/packages/nostr/src/legacy/Connection.ts b/packages/nostr/src/legacy/Connection.ts index 36ed4c4d..c9eff0c9 100644 --- a/packages/nostr/src/legacy/Connection.ts +++ b/packages/nostr/src/legacy/Connection.ts @@ -211,7 +211,10 @@ export class Connection { break; } case "EVENT": { - this.OnEvent?.(msg[1], msg[2]); + this.OnEvent?.(msg[1], { + ...msg[2], + relays: [this.Address] + }); this.Stats.EventsReceived++; this.#UpdateState(); break;