diff --git a/packages/app/src/Cache/FeedCache.ts b/packages/app/src/Cache/FeedCache.ts index 6300ebeb0..10feed3a0 100644 --- a/packages/app/src/Cache/FeedCache.ts +++ b/packages/app/src/Cache/FeedCache.ts @@ -118,6 +118,37 @@ export default abstract class FeedCache { this.notifyChange(obj.map(a => this.key(a))); } + /** + * Try to update an entry where created values exists + * @param m Profile metadata + * @returns + */ + async update(m: TCachedWithCreated) { + const k = this.key(m); + const existing = this.getFromCache(k) as TCachedWithCreated; + const updateType = (() => { + if (!existing) { + return "new"; + } + if (existing.created < m.created) { + return "updated"; + } + if (existing && existing.loaded < m.loaded) { + return "refresh"; + } + return "no_change"; + })(); + console.debug(`Updating ${k} ${updateType}`, m); + if (updateType !== "no_change") { + const updated = { + ...existing, + ...m, + }; + await this.set(updated); + } + return updateType; + } + /** * Loads a list of rows from disk cache * @param keys List of ids to load diff --git a/packages/app/src/Cache/UserCache.ts b/packages/app/src/Cache/UserCache.ts index ab6feade0..e2b8b94a6 100644 --- a/packages/app/src/Cache/UserCache.ts +++ b/packages/app/src/Cache/UserCache.ts @@ -53,35 +53,15 @@ class UserProfileCache extends FeedCache { * @param m Profile metadata * @returns */ - async update(m: MetadataCache) { - const existing = this.getFromCache(m.pubkey); - const updateType = (() => { - if (!existing) { - return "new_profile"; - } - if (existing.created < m.created) { - return "updated_profile"; - } - if (existing && existing.loaded < m.loaded) { - return "refresh_profile"; - } - return "no_change"; - })(); - console.debug(`Updating ${m.pubkey} ${updateType}`, m); - if (updateType !== "no_change") { - const writeProfile = { - ...existing, - ...m, - }; - await this.#setItem(writeProfile); - if (updateType !== "refresh_profile") { - const lnurl = m.lud16 ?? m.lud06; - if (lnurl) { - this.#zapperQueue.push({ - pubkey: m.pubkey, - lnurl, - }); - } + override async update(m: MetadataCache) { + const updateType = await super.update(m); + if (updateType !== "refresh") { + const lnurl = m.lud16 ?? m.lud06; + if (lnurl) { + this.#zapperQueue.push({ + pubkey: m.pubkey, + lnurl, + }); } if (m.nip05) { this.#nip5Queue.push({ @@ -97,15 +77,6 @@ class UserProfileCache extends FeedCache { return []; } - async #setItem(m: MetadataCache) { - this.cache.set(m.pubkey, m); - if (db.ready) { - await db.users.put(m); - this.onTable.add(m.pubkey); - } - this.notifyChange([m.pubkey]); - } - async #processZapperQueue() { await this.#batchQueue( this.#zapperQueue, diff --git a/packages/app/src/Cache/UserRelayCache.ts b/packages/app/src/Cache/UserRelayCache.ts new file mode 100644 index 000000000..2f0a6edef --- /dev/null +++ b/packages/app/src/Cache/UserRelayCache.ts @@ -0,0 +1,18 @@ +import { db, UsersRelays } from "Db"; +import FeedCache from "./FeedCache"; + +class UsersRelaysCache extends FeedCache { + constructor() { + super("UserRelays", db.userRelays); + } + + key(of: UsersRelays): string { + return of.pubkey; + } + + takeSnapshot(): Array { + return [...this.cache.values()]; + } +} + +export const UserRelays = new UsersRelaysCache(); diff --git a/packages/app/src/Cache/index.ts b/packages/app/src/Cache/index.ts index b8ac6bc4e..cf1bd971d 100644 --- a/packages/app/src/Cache/index.ts +++ b/packages/app/src/Cache/index.ts @@ -3,6 +3,7 @@ import { hexToBech32, unixNowMs } from "Util"; import { DmCache } from "./DMCache"; import { InteractionCache } from "./EventInteractionCache"; import { UserCache } from "./UserCache"; +import { UserRelays } from "./UserRelayCache"; export interface MetadataCache extends UserMetadata { /** @@ -55,6 +56,7 @@ export async function preload() { await UserCache.preload(); await DmCache.preload(); await InteractionCache.preload(); + await UserRelays.preload(); } export { UserCache, DmCache }; diff --git a/packages/app/src/Feed/LoginFeed.ts b/packages/app/src/Feed/LoginFeed.ts index 840663701..8f0be3c70 100644 --- a/packages/app/src/Feed/LoginFeed.ts +++ b/packages/app/src/Feed/LoginFeed.ts @@ -13,13 +13,15 @@ import useLogin from "Hooks/useLogin"; import { addSubscription, setBlocked, setBookmarked, setFollows, setMuted, setPinned, setRelays, setTags } from "Login"; import { SnortPubKey } from "Const"; import { SubscriptionEvent } from "Subscription"; +import useRelaysFeedFollows from "./RelaysFeedFollows"; +import { UserRelays } from "Cache/UserRelayCache"; /** * Managed loading data for the current logged in user */ export default function useLoginFeed() { const login = useLogin(); - const { publicKey: pubKey, readNotifications } = login; + const { publicKey: pubKey, readNotifications, follows } = login; const { isMuted } = useModeration(); const publisher = useEventPublisher(); @@ -171,8 +173,12 @@ export default function useLoginFeed() { } }, [listsFeed]); - /*const fRelays = useRelaysFeedFollows(follows); useEffect(() => { - FollowsRelays.bulkSet(fRelays).catch(console.error); - }, [dispatch, fRelays]);*/ + UserRelays.buffer(follows.item).catch(console.error); + }, [follows.item]); + + const fRelays = useRelaysFeedFollows(follows.item); + useEffect(() => { + UserRelays.bulkSet(fRelays).catch(console.error); + }, [fRelays]); } diff --git a/packages/app/src/Feed/RelaysFeedFollows.tsx b/packages/app/src/Feed/RelaysFeedFollows.tsx index 839fbd482..cf5ea5c58 100644 --- a/packages/app/src/Feed/RelaysFeedFollows.tsx +++ b/packages/app/src/Feed/RelaysFeedFollows.tsx @@ -5,69 +5,72 @@ import { sanitizeRelayUrl } from "Util"; import { PubkeyReplaceableNoteStore, RequestBuilder } from "System"; import useRequestBuilder from "Hooks/useRequestBuilder"; -type UserRelayMap = Record>; +interface RelayList { + pubkey: string; + created: number; + relays: FullRelaySettings[]; +} -export default function useRelaysFeedFollows(pubkeys: HexKey[]): UserRelayMap { +export default function useRelaysFeedFollows(pubkeys: HexKey[]): Array { const sub = useMemo(() => { const b = new RequestBuilder(`relays:follows`); b.withFilter().authors(pubkeys).kinds([EventKind.Relays, EventKind.ContactList]); return b; }, [pubkeys]); - function mapFromRelays(notes: Array): UserRelayMap { - return Object.fromEntries( - notes.map(ev => { - return [ - ev.pubkey, - ev.tags - .map(a => { - return { - url: sanitizeRelayUrl(a[1]), - settings: { - read: a[2] === "read" || a[2] === undefined, - write: a[2] === "write" || a[2] === undefined, - }, - } as FullRelaySettings; - }) - .filter(a => a.url !== undefined), - ]; - }) - ); + function mapFromRelays(notes: Array): Array { + return notes.map(ev => { + return { + pubkey: ev.pubkey, + created: ev.created_at, + relays: ev.tags + .map(a => { + return { + url: sanitizeRelayUrl(a[1]), + settings: { + read: a[2] === "read" || a[2] === undefined, + write: a[2] === "write" || a[2] === undefined, + }, + } as FullRelaySettings; + }) + .filter(a => a.url !== undefined), + }; + }); } - function mapFromContactList(notes: Array): UserRelayMap { - return Object.fromEntries( - notes.map(ev => { - if (ev.content !== "" && ev.content !== "{}" && ev.content.startsWith("{") && ev.content.endsWith("}")) { - try { - const relays: Record = JSON.parse(ev.content); - return [ - ev.pubkey, - Object.entries(relays) - .map(([k, v]) => { - return { - url: sanitizeRelayUrl(k), - settings: v, - } as FullRelaySettings; - }) - .filter(a => a.url !== undefined), - ]; - } catch { - // ignored - } + function mapFromContactList(notes: Array): Array { + return notes.map(ev => { + if (ev.content !== "" && ev.content !== "{}" && ev.content.startsWith("{") && ev.content.endsWith("}")) { + try { + const relays: Record = JSON.parse(ev.content); + return { + pubkey: ev.pubkey, + created: ev.created_at, + relays: Object.entries(relays) + .map(([k, v]) => { + return { + url: sanitizeRelayUrl(k), + settings: v, + } as FullRelaySettings; + }) + .filter(a => a.url !== undefined), + }; + } catch { + // ignored } - return [ev.pubkey, []]; - }) - ); + } + return { + pubkey: ev.pubkey, + created: 0, + relays: [], + }; + }); } const relays = useRequestBuilder(PubkeyReplaceableNoteStore, sub); const notesRelays = relays.data?.filter(a => a.kind === EventKind.Relays) ?? []; const notesContactLists = relays.data?.filter(a => a.kind === EventKind.ContactList) ?? []; return useMemo(() => { - return { - ...mapFromContactList(notesContactLists), - ...mapFromRelays(notesRelays), - } as UserRelayMap; + return [...mapFromContactList(notesContactLists), ...mapFromRelays(notesRelays)]; }, [relays]); } diff --git a/packages/app/src/Hooks/useRelaysForFollows.tsx b/packages/app/src/Hooks/useRelaysForFollows.tsx deleted file mode 100644 index bb14fce78..000000000 --- a/packages/app/src/Hooks/useRelaysForFollows.tsx +++ /dev/null @@ -1,65 +0,0 @@ -import { HexKey } from "@snort/nostr"; -import { useMemo } from "react"; -import { FollowsRelays } from "State/Relays"; -import { unwrap } from "Util"; - -export type RelayPicker = ReturnType; - -/** - * Number of relays to pick per pubkey - */ -const PickNRelays = 2; - -export default function useRelaysForFollows(keys: Array) { - return useMemo(() => { - if (keys.length === 0) { - return {}; - } - - const allRelays = keys.map(a => { - return { - key: a, - relays: FollowsRelays.snapshot.get(a), - }; - }); - - const missing = allRelays.filter(a => a.relays === undefined); - const hasRelays = allRelays.filter(a => a.relays !== undefined); - const relayUserMap = hasRelays.reduce((acc, v) => { - for (const r of unwrap(v.relays)) { - if (!acc.has(r.url)) { - acc.set(r.url, new Set([v.key])); - } else { - unwrap(acc.get(r.url)).add(v.key); - } - } - return acc; - }, new Map>()); - const topRelays = [...relayUserMap.entries()].sort(([, v], [, v1]) => v1.size - v.size); - - // - count keys per relay - // - pick n top relays - // - map keys per relay (for subscription filter) - - const userPickedRelays = keys.map(k => { - // pick top 3 relays for this key - const relaysForKey = topRelays - .filter(([, v]) => v.has(k)) - .slice(0, PickNRelays) - .map(([k]) => k); - return { k, relaysForKey }; - }); - - const pickedRelays = new Set(userPickedRelays.map(a => a.relaysForKey).flat()); - - const picked = Object.fromEntries( - [...pickedRelays].map(a => { - const keysOnPickedRelay = new Set(userPickedRelays.filter(b => b.relaysForKey.includes(a)).map(b => b.k)); - return [a, [...keysOnPickedRelay]]; - }) - ); - picked[""] = missing.map(a => a.key); - console.debug(picked); - return picked; - }, [keys]); -} diff --git a/packages/app/src/State/Relays/index.ts b/packages/app/src/State/Relays/index.ts deleted file mode 100644 index 6918255f0..000000000 --- a/packages/app/src/State/Relays/index.ts +++ /dev/null @@ -1,83 +0,0 @@ -import { FullRelaySettings, HexKey } from "@snort/nostr"; -import { db } from "Db"; -import { unixNowMs, unwrap } from "Util"; - -export class UserRelays { - #store: Map>; - - #snapshot: Readonly>>; - - constructor() { - this.#store = new Map(); - this.#snapshot = Object.freeze(new Map()); - } - - get snapshot() { - return this.#snapshot; - } - - async get(key: HexKey) { - if (!this.#store.has(key) && db.ready) { - const cached = await db.userRelays.get(key); - if (cached) { - this.#store.set(key, cached.relays); - return cached.relays; - } - } - return this.#store.get(key); - } - - async bulkGet(keys: Array) { - const missing = keys.filter(a => !this.#store.has(a)); - if (missing.length > 0 && db.ready) { - const cached = await db.userRelays.bulkGet(missing); - cached.forEach(a => { - if (a) { - this.#store.set(a.pubkey, a.relays); - } - }); - } - return new Map(keys.map(a => [a, this.#store.get(a) ?? []])); - } - - async set(key: HexKey, relays: Array) { - this.#store.set(key, relays); - if (db.ready) { - await db.userRelays.put({ - pubkey: key, - relays, - }); - } - this._update(); - } - - async bulkSet(obj: Record>) { - if (db.ready) { - await db.userRelays.bulkPut( - Object.entries(obj).map(([k, v]) => { - return { - pubkey: k, - relays: v, - }; - }) - ); - } - Object.entries(obj).forEach(([k, v]) => this.#store.set(k, v)); - this._update(); - } - - async preload() { - const start = unixNowMs(); - const keys = await db.userRelays.toCollection().keys(); - const fullCache = await db.userRelays.bulkGet(keys); - this.#store = new Map(fullCache.filter(a => a !== undefined).map(a => [unwrap(a).pubkey, a?.relays ?? []])); - this._update(); - console.debug(`Preloaded ${this.#store.size} users relays in ${(unixNowMs() - start).toLocaleString()} ms`); - } - - private _update() { - this.#snapshot = Object.freeze(new Map(this.#store)); - } -} - -export const FollowsRelays = new UserRelays(); diff --git a/packages/app/src/System/GossipModel.ts b/packages/app/src/System/GossipModel.ts new file mode 100644 index 000000000..ed5895013 --- /dev/null +++ b/packages/app/src/System/GossipModel.ts @@ -0,0 +1,109 @@ +import { RawReqFilter } from "@snort/nostr"; +import { UserRelays } from "Cache/UserRelayCache"; +import { unwrap } from "Util"; + +const PickNRelays = 2; + +export interface RelayTaggedFilter { + relay: string; + filter: RawReqFilter; +} + +export interface RelayTaggedFilters { + relay: string; + filters: Array; +} + +export function splitAllByWriteRelays(filters: Array) { + const allSplit = filters.map(splitByWriteRelays).reduce((acc, v) => { + for (const vn of v) { + const existing = acc.get(vn.relay); + if (existing) { + existing.push(vn.filter); + } else { + acc.set(vn.relay, [vn.filter]); + } + } + return acc; + }, new Map>()); + + return [...allSplit.entries()].map(([k, v]) => { + return { + relay: k, + filters: v, + } as RelayTaggedFilters; + }); +} + +/** + * Split filters by authors + * @param filter + * @returns + */ +export function splitByWriteRelays(filter: RawReqFilter): Array { + if ((filter.authors?.length ?? 0) === 0) + return [ + { + relay: "", + filter, + }, + ]; + + const allRelays = unwrap(filter.authors).map(a => { + return { + key: a, + relays: UserRelays.getFromCache(a)?.relays, + }; + }); + + const missing = allRelays.filter(a => a.relays === undefined); + const hasRelays = allRelays.filter(a => a.relays !== undefined); + const relayUserMap = hasRelays.reduce((acc, v) => { + for (const r of unwrap(v.relays)) { + if (!acc.has(r.url)) { + acc.set(r.url, new Set([v.key])); + } else { + unwrap(acc.get(r.url)).add(v.key); + } + } + return acc; + }, new Map>()); + + // selection algo will just pick relays with the most users + const topRelays = [...relayUserMap.entries()].sort(([, v], [, v1]) => v1.size - v.size); + + // - count keys per relay + // - pick n top relays + // - map keys per relay (for subscription filter) + + const userPickedRelays = unwrap(filter.authors).map(k => { + // pick top 3 relays for this key + const relaysForKey = topRelays + .filter(([, v]) => v.has(k)) + .slice(0, PickNRelays) + .map(([k]) => k); + return { k, relaysForKey }; + }); + + const pickedRelays = new Set(userPickedRelays.map(a => a.relaysForKey).flat()); + + const picked = [...pickedRelays].map(a => { + const keysOnPickedRelay = new Set(userPickedRelays.filter(b => b.relaysForKey.includes(a)).map(b => b.k)); + return { + relay: a, + filter: { + ...filter, + authors: [...keysOnPickedRelay], + }, + } as RelayTaggedFilter; + }); + picked.push({ + relay: "", + filter: { + ...filter, + authors: missing.map(a => a.key), + }, + }); + console.debug("GOSSIP", picked); + return picked; +} diff --git a/packages/app/src/System/Query.ts b/packages/app/src/System/Query.ts index 9b9f7a3e9..4cf26b6c6 100644 --- a/packages/app/src/System/Query.ts +++ b/packages/app/src/System/Query.ts @@ -151,7 +151,6 @@ export class Query { } cleanup() { - console.debug("Cleanup", this.id); this.#stopCheckTraces(); } diff --git a/packages/app/src/System/index.ts b/packages/app/src/System/index.ts index 42a43aef1..ffeba0739 100644 --- a/packages/app/src/System/index.ts +++ b/packages/app/src/System/index.ts @@ -12,6 +12,7 @@ import { } from "./NoteCollection"; import { diffFilters } from "./RequestSplitter"; import { Query } from "./Query"; +import { splitAllByWriteRelays } from "./GossipModel"; export { NoteStore, @@ -89,7 +90,7 @@ export class NostrSystem { try { const addr = unwrap(sanitizeRelayUrl(address)); if (!this.Sockets.has(addr)) { - const c = new Connection(addr, options, this.HandleAuth); + const c = new Connection(addr, options, this.HandleAuth?.bind(this)); this.Sockets.set(addr, c); c.OnEvent = (s, e) => this.OnEvent(s, e); c.OnEose = s => this.OnEndOfStoredEvents(c, s); @@ -146,7 +147,7 @@ export class NostrSystem { try { const addr = unwrap(sanitizeRelayUrl(address)); if (!this.Sockets.has(addr)) { - const c = new Connection(addr, { read: true, write: false }, this.HandleAuth, true); + const c = new Connection(addr, { read: true, write: false }, this.HandleAuth?.bind(this), true); this.Sockets.set(addr, c); c.OnEvent = (s, e) => this.OnEvent(s, e); c.OnEose = s => this.OnEndOfStoredEvents(c, s); @@ -212,11 +213,15 @@ export class NostrSystem { this.#changed(); return unwrap(q.feed) as Readonly; } else { - const subQ = new Query(`${q.id}-${q.subQueries.length + 1}`, filters, q.feed); - q.subQueries.push(subQ); + const splitFilters = splitAllByWriteRelays(filters); + for (const sf of splitFilters) { + const subQ = new Query(`${q.id}-${q.subQueries.length + 1}`, sf.filters, q.feed); + subQ.relays = sf.relay ? [sf.relay] : []; + q.subQueries.push(subQ); + this.SendQuery(subQ); + } q.filters = filters; q.feed.loading = true; - this.SendQuery(subQ); this.#changed(); return q.feed as Readonly; } @@ -227,7 +232,9 @@ export class NostrSystem { AddQuery(type: { new (): T }, rb: RequestBuilder): T { const store = new type(); - const q = new Query(rb.id, rb.build(), store); + + const filters = rb.build(); + const q = new Query(rb.id, filters, store); if (rb.options?.leaveOpen) { q.leaveOpen = rb.options.leaveOpen; } @@ -236,7 +243,17 @@ export class NostrSystem { } this.Queries.set(rb.id, q); - this.SendQuery(q); + const splitFilters = splitAllByWriteRelays(filters); + if (splitFilters.length > 1) { + for (const sf of splitFilters) { + const subQ = new Query(`${q.id}-${q.subQueries.length + 1}`, sf.filters, q.feed); + subQ.relays = sf.relay ? [sf.relay] : []; + q.subQueries.push(subQ); + this.SendQuery(subQ); + } + } else { + this.SendQuery(q); + } this.#changed(); return store; } @@ -248,9 +265,27 @@ export class NostrSystem { } } - SendQuery(q: Query) { - for (const [, s] of this.Sockets) { - q.sendToRelay(s); + async SendQuery(q: Query) { + if (q.relays.length > 0) { + for (const r of q.relays) { + const s = this.Sockets.get(r); + if (s) { + q.sendToRelay(s); + } else { + const nc = await this.ConnectEphemeralRelay(r); + if (nc) { + q.sendToRelay(nc); + } else { + console.warn("Failed to connect to new relay for:", r, q); + } + } + } + } else { + for (const [, s] of this.Sockets) { + if (!s.Ephemeral) { + q.sendToRelay(s); + } + } } } @@ -304,7 +339,6 @@ export class NostrSystem { if (v.closingAt && v.closingAt < now) { v.sendClose(); this.Queries.delete(k); - console.debug("Removed:", k); changed = true; } }