diff --git a/packages/app/src/Cache/FollowListCache.ts b/packages/app/src/Cache/FollowListCache.ts index c7be1c5d..34845ee5 100644 --- a/packages/app/src/Cache/FollowListCache.ts +++ b/packages/app/src/Cache/FollowListCache.ts @@ -44,6 +44,6 @@ export class FollowListCache extends RefreshFeedCache { override async preload() { await super.preload(); - this.snapshot().forEach(e => socialGraphInstance.handleEvent(e)); + this.cache.forEach(e => socialGraphInstance.handleEvent(e)); } } diff --git a/packages/app/src/Cache/FollowsFeed.ts b/packages/app/src/Cache/FollowsFeed.ts index 6136a258..09ef6e4b 100644 --- a/packages/app/src/Cache/FollowsFeed.ts +++ b/packages/app/src/Cache/FollowsFeed.ts @@ -43,7 +43,10 @@ export class FollowsFeedCache extends RefreshFeedCache { const filtered = evs.filter(a => this.#kinds.includes(a.kind)); if (filtered.length > 0) { await this.bulkSet(filtered); - this.notifyChange(filtered.map(a => this.key(a))); + this.emit( + "change", + filtered.map(a => this.key(a)), + ); } } @@ -64,7 +67,7 @@ export class FollowsFeedCache extends RefreshFeedCache { const oldest = await this.table?.orderBy("created_at").first(); this.#oldest = oldest?.created_at; - this.notifyChange(latest?.map(a => this.key(a)) ?? []); + this.emit("change", latest?.map(a => this.key(a)) ?? []); debug(this.name)(`Loaded %d/%d in %d ms`, latest?.length ?? 0, keys.length, (unixNowMs() - start).toLocaleString()); } @@ -96,7 +99,7 @@ export class FollowsFeedCache extends RefreshFeedCache { this.onTable.add(k); }); - this.notifyChange(latest?.map(a => this.key(a)) ?? []); + this.emit("change", latest?.map(a => this.key(a)) ?? []); } } diff --git a/packages/app/src/Cache/Notifications.ts b/packages/app/src/Cache/Notifications.ts index 69f2743c..48b82dca 100644 --- a/packages/app/src/Cache/Notifications.ts +++ b/packages/app/src/Cache/Notifications.ts @@ -33,7 +33,10 @@ export class NotificationsCache extends RefreshFeedCache { forSession: pubKey, })), ); - this.notifyChange(filtered.map(v => this.key(v))); + this.emit( + "change", + filtered.map(v => this.key(v)), + ); } } diff --git a/packages/app/src/Cache/RefreshFeedCache.ts b/packages/app/src/Cache/RefreshFeedCache.ts index 6a331ce4..0399f7de 100644 --- a/packages/app/src/Cache/RefreshFeedCache.ts +++ b/packages/app/src/Cache/RefreshFeedCache.ts @@ -24,7 +24,6 @@ export abstract class RefreshFeedCache extends FeedCache> { override async preload(): Promise { await super.preload(); - // load all dms to memory await this.buffer([...this.onTable]); } } diff --git a/packages/app/src/Components/IrisAccount/IrisAccount.tsx b/packages/app/src/Components/IrisAccount/IrisAccount.tsx index 2aebb307..3396d12d 100644 --- a/packages/app/src/Components/IrisAccount/IrisAccount.tsx +++ b/packages/app/src/Components/IrisAccount/IrisAccount.tsx @@ -285,15 +285,20 @@ class IrisAccount extends Component { componentDidMount() { const session = LoginStore.snapshot(); const myPub = session.publicKey; - System.ProfileLoader.Cache.hook(() => { - const profile = System.ProfileLoader.Cache.getFromCache(myPub); - const irisToActive = profile && profile.nip05 && profile.nip05.endsWith("@iris.to"); - this.setState({ profile, irisToActive }); - if (profile && !irisToActive) { - this.checkExistingAccount(myPub); - } - }, myPub); - this.checkExistingAccount(myPub); + if (myPub) { + System.profileLoader.cache.on("change", keys => { + if (keys.includes(myPub)) { + const profile = System.profileLoader.cache.getFromCache(myPub); + const irisToActive = profile && profile.nip05 && profile.nip05.endsWith("@iris.to"); + this.setState({ profile, irisToActive }); + if (profile && !irisToActive) { + this.checkExistingAccount(myPub); + } + } + }); + + this.checkExistingAccount(myPub); + } } async checkExistingAccount(pub: any) { diff --git a/packages/app/src/Components/User/ProfileLink.tsx b/packages/app/src/Components/User/ProfileLink.tsx index 98feee1c..b70a9f60 100644 --- a/packages/app/src/Components/User/ProfileLink.tsx +++ b/packages/app/src/Components/User/ProfileLink.tsx @@ -18,7 +18,7 @@ export function ProfileLink({ children?: ReactNode; } & Omit) { const system = useContext(SnortContext); - const relays = system.RelayCache.getFromCache(pubkey) + const relays = system.relayCache.getFromCache(pubkey) ?.relays?.filter(a => a.settings.write) ?.map(a => a.url); diff --git a/packages/app/src/Feed/ArticlesFeed.ts b/packages/app/src/Feed/ArticlesFeed.ts index 231819f0..fae90ffb 100644 --- a/packages/app/src/Feed/ArticlesFeed.ts +++ b/packages/app/src/Feed/ArticlesFeed.ts @@ -1,4 +1,4 @@ -import { EventKind, NoteCollection, RequestBuilder } from "@snort/system"; +import { EventKind, RequestBuilder } from "@snort/system"; import { useRequestBuilder } from "@snort/system-react"; import { useMemo } from "react"; @@ -15,5 +15,5 @@ export function useArticles() { return rb; }, [follows.timestamp]); - return useRequestBuilder(NoteCollection, sub); + return useRequestBuilder(sub); } diff --git a/packages/app/src/Feed/BadgesFeed.ts b/packages/app/src/Feed/BadgesFeed.ts index 37f8927c..630e83d2 100644 --- a/packages/app/src/Feed/BadgesFeed.ts +++ b/packages/app/src/Feed/BadgesFeed.ts @@ -1,4 +1,4 @@ -import { EventKind, HexKey, NoteCollection, ReplaceableNoteStore, RequestBuilder } from "@snort/system"; +import { EventKind, HexKey, RequestBuilder } from "@snort/system"; import { useRequestBuilder } from "@snort/system-react"; import { useMemo } from "react"; @@ -17,12 +17,12 @@ export default function useProfileBadges(pubkey?: HexKey) { return b; }, [pubkey]); - const profileBadges = useRequestBuilder(ReplaceableNoteStore, sub); + const profileBadges = useRequestBuilder(sub); const profile = useMemo(() => { if (profileBadges.data) { return chunks( - profileBadges.data.tags.filter(t => t[0] === "a" || t[0] === "e"), + profileBadges.data[0].tags.filter(t => t[0] === "a" || t[0] === "e"), 2, ).reduce((acc, [a, e]) => { return { @@ -57,7 +57,7 @@ export default function useProfileBadges(pubkey?: HexKey) { return b; }, [profile, ds]); - const awards = useRequestBuilder(NoteCollection, awardsSub); + const awards = useRequestBuilder(awardsSub); const result = useMemo(() => { if (awards.data) { diff --git a/packages/app/src/Feed/FollowersFeed.ts b/packages/app/src/Feed/FollowersFeed.ts index f2793f5f..6ebd6e99 100644 --- a/packages/app/src/Feed/FollowersFeed.ts +++ b/packages/app/src/Feed/FollowersFeed.ts @@ -1,4 +1,4 @@ -import { EventKind, HexKey, NoteCollection, RequestBuilder, socialGraphInstance } from "@snort/system"; +import { EventKind, HexKey, RequestBuilder, socialGraphInstance } from "@snort/system"; import { useRequestBuilder } from "@snort/system-react"; import { useMemo } from "react"; @@ -10,7 +10,7 @@ export default function useFollowersFeed(pubkey?: HexKey) { return b; }, [pubkey]); - const followersFeed = useRequestBuilder(NoteCollection, sub); + const followersFeed = useRequestBuilder(sub); const followers = useMemo(() => { const contactLists = followersFeed.data?.filter( diff --git a/packages/app/src/Feed/FollowsFeed.ts b/packages/app/src/Feed/FollowsFeed.ts index a80d1902..d5efa388 100644 --- a/packages/app/src/Feed/FollowsFeed.ts +++ b/packages/app/src/Feed/FollowsFeed.ts @@ -1,4 +1,4 @@ -import { EventKind, HexKey, NoteCollection, RequestBuilder, TaggedNostrEvent } from "@snort/system"; +import { EventKind, HexKey, RequestBuilder, TaggedNostrEvent } from "@snort/system"; import { useRequestBuilder } from "@snort/system-react"; import { useMemo } from "react"; @@ -15,7 +15,7 @@ export default function useFollowsFeed(pubkey?: HexKey) { return b; }, [isMe, pubkey]); - const contactFeed = useRequestBuilder(NoteCollection, sub); + const contactFeed = useRequestBuilder(sub); return useMemo(() => { if (isMe) { return follows.item; diff --git a/packages/app/src/Feed/HashtagsFeed.ts b/packages/app/src/Feed/HashtagsFeed.ts index e68cf38d..8e82a181 100644 --- a/packages/app/src/Feed/HashtagsFeed.ts +++ b/packages/app/src/Feed/HashtagsFeed.ts @@ -1,5 +1,5 @@ import { unixNow } from "@snort/shared"; -import { EventKind, NoteCollection, RequestBuilder } from "@snort/system"; +import { EventKind, RequestBuilder } from "@snort/system"; import { useRequestBuilder } from "@snort/system-react"; import { useMemo } from "react"; @@ -18,7 +18,7 @@ export default function useHashtagsFeed() { }, [hashtags]); return { - data: useRequestBuilder(NoteCollection, sub), + data: useRequestBuilder(sub), hashtags, }; } diff --git a/packages/app/src/Feed/LoginFeed.ts b/packages/app/src/Feed/LoginFeed.ts index b1ef3d02..d848d952 100644 --- a/packages/app/src/Feed/LoginFeed.ts +++ b/packages/app/src/Feed/LoginFeed.ts @@ -1,9 +1,9 @@ -import { EventKind, NostrLink, NoteCollection, parseRelayTags, RequestBuilder, TaggedNostrEvent } from "@snort/system"; +import { EventKind, NostrLink, parseRelayTags, RequestBuilder, TaggedNostrEvent } from "@snort/system"; import { useRequestBuilder } from "@snort/system-react"; import { usePrevious } from "@uidotdev/usehooks"; import { useEffect, useMemo } from "react"; -import { FollowLists, FollowsFeed, GiftsCache, Notifications, UserRelays } from "@/Cache"; +import { FollowLists, FollowsFeed, GiftsCache, Notifications } from "@/Cache"; import { Nip4Chats, Nip28Chats } from "@/chat"; import { Nip28ChatSystem } from "@/chat/nip28"; import useEventPublisher from "@/Hooks/useEventPublisher"; @@ -96,7 +96,7 @@ export default function useLoginFeed() { return b; }, [login]); - const loginFeed = useRequestBuilder(NoteCollection, subLogin); + const loginFeed = useRequestBuilder(subLogin); // update relays and follow lists useEffect(() => { @@ -219,7 +219,6 @@ export default function useLoginFeed() { }, [loginFeed]); useEffect(() => { - UserRelays.buffer(follows.item).catch(console.error); - system.ProfileLoader.TrackKeys(follows.item); // always track follows profiles + system.profileLoader.TrackKeys(follows.item); // always track follows profiles }, [follows.item]); } diff --git a/packages/app/src/Feed/RelaysFeed.tsx b/packages/app/src/Feed/RelaysFeed.tsx index 493d3b79..c2c717ed 100644 --- a/packages/app/src/Feed/RelaysFeed.tsx +++ b/packages/app/src/Feed/RelaysFeed.tsx @@ -1,4 +1,4 @@ -import { EventKind, HexKey, parseRelayTags, ReplaceableNoteStore, RequestBuilder } from "@snort/system"; +import { EventKind, HexKey, parseRelayTags, RequestBuilder } from "@snort/system"; import { useRequestBuilder } from "@snort/system-react"; import { useMemo } from "react"; @@ -10,6 +10,6 @@ export default function useRelaysFeed(pubkey?: HexKey) { return b; }, [pubkey]); - const relays = useRequestBuilder(ReplaceableNoteStore, sub); - return parseRelayTags(relays.data?.tags.filter(a => a[0] === "r") ?? []); + const relays = useRequestBuilder(sub); + return parseRelayTags(relays.data?.[0].tags.filter(a => a[0] === "r") ?? []); } diff --git a/packages/app/src/Feed/StatusFeed.ts b/packages/app/src/Feed/StatusFeed.ts index ff96180b..4cedfe53 100644 --- a/packages/app/src/Feed/StatusFeed.ts +++ b/packages/app/src/Feed/StatusFeed.ts @@ -1,5 +1,5 @@ import { unixNow } from "@snort/shared"; -import { EventKind, NoteCollection, RequestBuilder } from "@snort/system"; +import { EventKind, RequestBuilder } from "@snort/system"; import { useRequestBuilder } from "@snort/system-react"; import { useMemo } from "react"; @@ -18,7 +18,7 @@ export function useStatusFeed(id?: string, leaveOpen = false) { return rb; }, [id]); - const status = useRequestBuilder(NoteCollection, sub); + const status = useRequestBuilder(sub); const statusFiltered = status.data?.filter(a => { const exp = Number(findTag(a, "expiration")); diff --git a/packages/app/src/Feed/ThreadFeed.ts b/packages/app/src/Feed/ThreadFeed.ts index c32a8af8..aca4e372 100644 --- a/packages/app/src/Feed/ThreadFeed.ts +++ b/packages/app/src/Feed/ThreadFeed.ts @@ -1,4 +1,4 @@ -import { EventExt, EventKind, NostrLink, NoteCollection, RequestBuilder } from "@snort/system"; +import { EventExt, EventKind, NostrLink, RequestBuilder } from "@snort/system"; import { useReactions, useRequestBuilder } from "@snort/system-react"; import { useEffect, useMemo, useState } from "react"; @@ -30,7 +30,7 @@ export default function useThreadFeed(link: NostrLink) { return sub; }, [allEvents.length]); - const store = useRequestBuilder(NoteCollection, sub); + const store = useRequestBuilder(sub); useEffect(() => { if (store.data) { diff --git a/packages/app/src/Feed/TimelineFeed.ts b/packages/app/src/Feed/TimelineFeed.ts index dadcc929..fd38f44b 100644 --- a/packages/app/src/Feed/TimelineFeed.ts +++ b/packages/app/src/Feed/TimelineFeed.ts @@ -1,5 +1,5 @@ import { unixNow } from "@snort/shared"; -import { EventKind, NoteCollection, RequestBuilder } from "@snort/system"; +import { EventKind, RequestBuilder } from "@snort/system"; import { useRequestBuilder } from "@snort/system-react"; import { useCallback, useMemo } from "react"; @@ -116,7 +116,7 @@ export default function useTimelineFeed(subject: TimelineSubject, options: Timel return rb?.builder ?? null; }, [until, since, options.method, pref, createBuilder]); - const main = useRequestBuilder(NoteCollection, sub); + const main = useRequestBuilder(sub); const subRealtime = useMemo(() => { const rb = createBuilder(); @@ -130,7 +130,7 @@ export default function useTimelineFeed(subject: TimelineSubject, options: Timel return rb?.builder ?? null; }, [pref.autoShowLatest, createBuilder]); - const latest = useRequestBuilder(NoteCollection, subRealtime); + const latest = useRequestBuilder(subRealtime); return { main: main.data, diff --git a/packages/app/src/Feed/ZapsFeed.ts b/packages/app/src/Feed/ZapsFeed.ts index 0e03395f..d6a5062e 100644 --- a/packages/app/src/Feed/ZapsFeed.ts +++ b/packages/app/src/Feed/ZapsFeed.ts @@ -1,4 +1,4 @@ -import { EventKind, NostrLink, NoteCollection, parseZap, RequestBuilder } from "@snort/system"; +import { EventKind, NostrLink, parseZap, RequestBuilder } from "@snort/system"; import { useRequestBuilder } from "@snort/system-react"; import { useMemo } from "react"; @@ -10,7 +10,7 @@ export default function useZapsFeed(link?: NostrLink) { return b; }, [link]); - const zapsFeed = useRequestBuilder(NoteCollection, sub); + const zapsFeed = useRequestBuilder(sub); const zaps = useMemo(() => { if (zapsFeed.data) { diff --git a/packages/app/src/Pages/NetworkGraph.tsx b/packages/app/src/Pages/NetworkGraph.tsx index 20645daa..5e3d7749 100644 --- a/packages/app/src/Pages/NetworkGraph.tsx +++ b/packages/app/src/Pages/NetworkGraph.tsx @@ -146,7 +146,7 @@ const NetworkGraph = () => { const node = { id: UID, address: pubkey, - profile: system.ProfileLoader.Cache.getFromCache(pubkey), + profile: system.profileLoader.cache.getFromCache(pubkey), distance, inboundCount, outboundCount, diff --git a/packages/app/src/Pages/settings/tools/follows-relay-health.tsx b/packages/app/src/Pages/settings/tools/follows-relay-health.tsx index c06cf27a..0ecf8df0 100644 --- a/packages/app/src/Pages/settings/tools/follows-relay-health.tsx +++ b/packages/app/src/Pages/settings/tools/follows-relay-health.tsx @@ -23,7 +23,7 @@ export function FollowsRelayHealth({ const uniqueFollows = dedupe(follows.item); const hasRelays = useMemo(() => { - return uniqueFollows.filter(a => (system.RelayCache.getFromCache(a)?.relays.length ?? 0) > 0); + return uniqueFollows.filter(a => (system.relayCache.getFromCache(a)?.relays.length ?? 0) > 0); }, [uniqueFollows]); const missingRelays = useMemo(() => { @@ -31,7 +31,7 @@ export function FollowsRelayHealth({ }, [hasRelays]); const topWriteRelays = useMemo(() => { - return pickTopRelays(system.RelayCache, uniqueFollows, 1e31, "write"); + return pickTopRelays(system.relayCache, uniqueFollows, 1e31, "write"); }, [uniqueFollows]); return ( diff --git a/packages/app/src/chat/nip24.ts b/packages/app/src/chat/nip24.ts index 454f4967..ac2c7968 100644 --- a/packages/app/src/chat/nip24.ts +++ b/packages/app/src/chat/nip24.ts @@ -12,7 +12,7 @@ export class Nip24ChatSystem extends ExternalStore> implements ChatS constructor(cache: GiftWrapCache) { super(); this.#cache = cache; - this.#cache.hook(() => this.notifyChange(), "*"); + this.#cache.on("change", () => this.notifyChange()); } subscription() { diff --git a/packages/app/src/chat/nip28.ts b/packages/app/src/chat/nip28.ts index d0290547..e7f34259 100644 --- a/packages/app/src/chat/nip28.ts +++ b/packages/app/src/chat/nip28.ts @@ -19,7 +19,7 @@ import { LoginSession } from "@/Utils/Login"; export class Nip28ChatSystem extends ExternalStore> implements ChatSystem { #cache: FeedCache; - #log = debug("NIP-04"); + #log = debug("NIP-28"); readonly ChannelKinds = [ EventKind.PublicChatChannel, EventKind.PublicChatMessage, diff --git a/packages/app/src/system.ts b/packages/app/src/system.ts index 09047a86..52003da3 100644 --- a/packages/app/src/system.ts +++ b/packages/app/src/system.ts @@ -30,21 +30,20 @@ System.on("event", (_, ev) => { socialGraphInstance.handleEvent(ev); }); +System.profileCache.on("change", keys => { + const changed = removeUndefined(keys.map(a => System.profileCache.getFromCache(a))); + changed.forEach(addCachedMetadataToFuzzySearch); +}); + /** * Add profile loader fn */ if (CONFIG.httpCache) { - System.ProfileLoader.loaderFn = async (keys: Array) => { + System.profileLoader.loaderFn = async (keys: Array) => { return removeUndefined(await Promise.all(keys.map(a => fetchProfile(a)))); }; } -setTimeout(() => { - System.UserProfileCache.snapshot().forEach(a => { - addCachedMetadataToFuzzySearch(a); - }); -}, 2000); - export async function fetchProfile(key: string) { try { throwIfOffline(); diff --git a/packages/shared/src/feed-cache.ts b/packages/shared/src/feed-cache.ts index 710f7e8e..764c73a3 100644 --- a/packages/shared/src/feed-cache.ts +++ b/packages/shared/src/feed-cache.ts @@ -1,6 +1,7 @@ import debug from "debug"; -import { removeUndefined, unixNowMs, unwrap } from "./utils"; +import { removeUndefined, unixNowMs } from "./utils"; import { DexieTableLike } from "./dexie-like"; +import EventEmitter from "eventemitter3"; type HookFn = () => void; @@ -9,14 +10,17 @@ export interface KeyedHookFilter { fn: HookFn; } +export interface FeedCacheEvents { + change: (keys: Array) => void; +} + /** * Dexie backed generic hookable store */ -export abstract class FeedCache { - #name: string; - #hooks: Array = []; +export abstract class FeedCache extends EventEmitter { + readonly name: string; #snapshot: Array = []; - #changed = true; + #log: ReturnType; #hits = 0; #miss = 0; protected table?: DexieTableLike; @@ -24,21 +28,22 @@ export abstract class FeedCache { protected cache: Map = new Map(); constructor(name: string, table?: DexieTableLike) { - this.#name = name; + super(); + this.name = name; this.table = table; + this.#log = debug(name); setInterval(() => { - debug(this.#name)( + this.#log( "%d loaded, %d on-disk, %d hooks, %d% hit", this.cache.size, this.onTable.size, - this.#hooks.length, + this.listenerCount("change"), ((this.#hits / (this.#hits + this.#miss)) * 100).toFixed(1), ); }, 30_000); - } - - get name() { - return this.#name; + this.on("change", () => { + this.#snapshot = this.takeSnapshot(); + }); } async preload() { @@ -49,28 +54,25 @@ export abstract class FeedCache { } } - keysOnTable() { - return [...this.onTable]; - } - hook(fn: HookFn, key: string | undefined) { - if (!key) { - return () => { - //noop + if (key) { + const handle = (keys: Array) => { + if (keys.includes(key)) { + fn(); + } }; + this.on("change", handle); + return () => this.off("change", handle); } - this.#hooks.push({ - key, - fn, - }); return () => { - const idx = this.#hooks.findIndex(a => a.fn === fn); - if (idx >= 0) { - this.#hooks.splice(idx, 1); - } + // noop }; } + + keysOnTable() { + return [...this.onTable]; + } getFromCache(key?: string) { if (key) { @@ -89,7 +91,7 @@ export abstract class FeedCache { const cached = await this.table.get(key); if (cached) { this.cache.set(this.key(cached), cached); - this.notifyChange([key]); + this.emit("change", [key]); return cached; } } @@ -120,7 +122,7 @@ export abstract class FeedCache { console.error(e); } } - this.notifyChange([k]); + this.emit("change", [k]); } async bulkSet(obj: Array | Readonly>) { @@ -133,7 +135,10 @@ export abstract class FeedCache { } } obj.forEach(v => this.cache.set(this.key(v), v)); - this.notifyChange(obj.map(a => this.key(a))); + this.emit( + "change", + obj.map(a => this.key(a)), + ); } /** @@ -156,7 +161,7 @@ export abstract class FeedCache { } return "no_change"; })(); - debug(this.#name)("Updating %s %s %o", k, updateType, m); + this.#log("Updating %s %s %o", k, updateType, m); if (updateType !== "no_change") { const updated = { ...existing, @@ -184,8 +189,11 @@ export abstract class FeedCache { fromCache.forEach(a => { this.cache.set(this.key(a), a); }); - this.notifyChange(fromCache.map(a => this.key(a))); - debug(this.#name)(`Loaded %d/%d in %d ms`, fromCache.length, keys.length, (unixNowMs() - start).toLocaleString()); + this.emit( + "change", + fromCache.map(a => this.key(a)), + ); + this.#log(`Loaded %d/%d in %d ms`, fromCache.length, keys.length, (unixNowMs() - start).toLocaleString()); return mapped.filter(a => !a.has).map(a => a.key); } @@ -197,23 +205,12 @@ export abstract class FeedCache { await this.table?.clear(); this.cache.clear(); this.onTable.clear(); - this.#changed = true; - this.#hooks.forEach(h => h.fn()); } snapshot() { - if (this.#changed) { - this.#snapshot = this.takeSnapshot(); - this.#changed = false; - } return this.#snapshot; } - protected notifyChange(keys: Array) { - this.#changed = true; - this.#hooks.filter(a => keys.includes(a.key) || a.key === "*").forEach(h => h.fn()); - } - abstract key(of: TCached): string; abstract takeSnapshot(): Array; } diff --git a/packages/system-react/src/useEventFeed.ts b/packages/system-react/src/useEventFeed.ts index 14d1b450..5e03aa7b 100644 --- a/packages/system-react/src/useEventFeed.ts +++ b/packages/system-react/src/useEventFeed.ts @@ -1,5 +1,5 @@ import { useMemo } from "react"; -import { RequestBuilder, ReplaceableNoteStore, NostrLink, NoteCollection } from "@snort/system"; +import { RequestBuilder, NostrLink } from "@snort/system"; import { useRequestBuilder } from "./useRequestBuilder"; export function useEventFeed(link: NostrLink) { @@ -9,7 +9,7 @@ export function useEventFeed(link: NostrLink) { return b; }, [link]); - return useRequestBuilder(ReplaceableNoteStore, sub); + return useRequestBuilder(sub); } export function useEventsFeed(id: string, links: Array) { @@ -19,5 +19,5 @@ export function useEventsFeed(id: string, links: Array) { return b; }, [id, links]); - return useRequestBuilder(NoteCollection, sub); + return useRequestBuilder(sub); } diff --git a/packages/system-react/src/useReactions.ts b/packages/system-react/src/useReactions.ts index 3de1a96b..3b2b583e 100644 --- a/packages/system-react/src/useReactions.ts +++ b/packages/system-react/src/useReactions.ts @@ -30,5 +30,5 @@ export function useReactions( return rb.numFilters > 0 ? rb : null; }, [ids]); - return useRequestBuilder(NoteCollection, sub); + return useRequestBuilder(sub); } diff --git a/packages/system-react/src/useRequestBuilder.tsx b/packages/system-react/src/useRequestBuilder.tsx index 0d685708..11bfa16c 100644 --- a/packages/system-react/src/useRequestBuilder.tsx +++ b/packages/system-react/src/useRequestBuilder.tsx @@ -6,14 +6,13 @@ import { SnortContext } from "./context"; /** * Send a query to the relays and wait for data */ -const useRequestBuilder = >( - type: { new (): TStore }, +const useRequestBuilder = ( rb: RequestBuilder | null, ) => { const system = useContext(SnortContext); const subscribe = (onChanged: () => void) => { if (rb) { - const q = system.Query(type, rb); + const q = system.Query(rb); q.on("event", onChanged); q.uncancel(); return () => { @@ -25,14 +24,14 @@ const useRequestBuilder = => { + const getState = () => { const q = system.GetQuery(rb?.id ?? ""); if (q) { - return q.snapshot as StoreSnapshot; + return q.snapshot; } - return EmptySnapshot as StoreSnapshot; + return EmptySnapshot; }; - return useSyncExternalStore>( + return useSyncExternalStore( v => subscribe(v), () => getState(), ); diff --git a/packages/system-react/src/useUserProfile.ts b/packages/system-react/src/useUserProfile.ts index 4410e6e6..1bd894ef 100644 --- a/packages/system-react/src/useUserProfile.ts +++ b/packages/system-react/src/useUserProfile.ts @@ -10,16 +10,23 @@ export function useUserProfile(pubKey?: HexKey): CachedMetadata | undefined { return useSyncExternalStore( h => { if (pubKey) { - system.ProfileLoader.TrackKeys(pubKey); + const handler = (keys: Array) => { + if (keys.includes(pubKey)) { + h(); + } + }; + system.profileLoader.cache.on("change", handler); + system.profileLoader.TrackKeys(pubKey); + + return () => { + system.profileLoader.cache.off("change", handler); + system.profileLoader.UntrackKeys(pubKey); + }; } - const release = system.ProfileLoader.Cache.hook(h, pubKey); return () => { - release(); - if (pubKey) { - system.ProfileLoader.UntrackKeys(pubKey); - } + // noop }; }, - () => system.ProfileLoader.Cache.getFromCache(pubKey), + () => system.profileLoader.cache.getFromCache(pubKey), ); } diff --git a/packages/system-react/src/useUserSearch.tsx b/packages/system-react/src/useUserSearch.tsx index e8c3d2c6..c0be4edb 100644 --- a/packages/system-react/src/useUserSearch.tsx +++ b/packages/system-react/src/useUserSearch.tsx @@ -5,7 +5,7 @@ import { SnortContext } from "./context"; export function useUserSearch() { const system = useContext(SnortContext); - const cache = system.ProfileLoader.Cache as UserProfileCache; + const cache = system.profileLoader.cache as UserProfileCache; async function search(input: string): Promise> { // try exact match first diff --git a/packages/system/src/background-loader.ts b/packages/system/src/background-loader.ts index f48be16c..5ab617c1 100644 --- a/packages/system/src/background-loader.ts +++ b/packages/system/src/background-loader.ts @@ -4,7 +4,7 @@ import { SystemInterface, TaggedNostrEvent, RequestBuilder } from "."; export abstract class BackgroundLoader { #system: SystemInterface; - #cache: FeedCache; + readonly cache: FeedCache; #log = debug(this.name()); /** @@ -19,14 +19,10 @@ export abstract class BackgroundLoader) { this.#system = system; - this.#cache = cache; + this.cache = cache; this.#FetchMetadata(); } - get Cache() { - return this.#cache; - } - /** * Name of this loader service */ @@ -74,38 +70,40 @@ export abstract class BackgroundLoader((resolve, reject) => { this.TrackKeys(key); - const release = this.Cache.hook(() => { - const existing = this.Cache.getFromCache(key); - if (existing) { - resolve(existing); - release(); - this.UntrackKeys(key); + this.cache.on("change", keys => { + if (keys.includes(key)) { + const existing = this.cache.getFromCache(key); + if (existing) { + resolve(existing); + this.UntrackKeys(key); + this.cache.off("change"); + } } - }, key); + }); }); } } async #FetchMetadata() { const loading = [...this.#wantsKeys]; - await this.#cache.buffer(loading); + await this.cache.buffer(loading); - const missing = loading.filter(a => (this.#cache.getFromCache(a)?.loaded ?? 0) < this.getExpireCutoff()); + const missing = loading.filter(a => (this.cache.getFromCache(a)?.loaded ?? 0) < this.getExpireCutoff()); if (missing.length > 0) { this.#log("Fetching keys: %O", missing); try { const found = await this.#loadData(missing); const noResult = removeUndefined( - missing.filter(a => !found.some(b => a === this.#cache.key(b))).map(a => this.makePlaceholder(a)), + missing.filter(a => !found.some(b => a === this.cache.key(b))).map(a => this.makePlaceholder(a)), ); if (noResult.length > 0) { - await Promise.all(noResult.map(a => this.#cache.update(a))); + await Promise.all(noResult.map(a => this.cache.update(a))); } } catch (e) { this.#log("Error: %O", e); @@ -119,14 +117,14 @@ export abstract class BackgroundLoader) { if (this.loaderFn) { const results = await this.loaderFn(missing); - await Promise.all(results.map(a => this.#cache.update(a))); + await Promise.all(results.map(a => this.cache.update(a))); return results; } else { const v = await this.#system.Fetch(this.buildSub(missing), async e => { for (const pe of e) { const m = this.onEvent(pe); if (m) { - await this.#cache.update(m); + await this.cache.update(m); } } }); diff --git a/packages/system/src/index.ts b/packages/system/src/index.ts index 794f40d9..e0730ebf 100644 --- a/packages/system/src/index.ts +++ b/packages/system/src/index.ts @@ -1,11 +1,13 @@ import { RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection"; import { RequestBuilder } from "./request-builder"; -import { NoteStore, NoteStoreSnapshotData, StoreSnapshot } from "./note-collection"; +import { NoteCollection, NoteStore, NoteStoreSnapshotData, StoreSnapshot } from "./note-collection"; import { NostrEvent, ReqFilter, TaggedNostrEvent } from "./nostr"; import { ProfileLoaderService } from "./profile-cache"; -import { RelayCache } from "./outbox-model"; +import { RelayCache, RelayMetadataLoader } from "./outbox-model"; import { Optimizer } from "./query-optimizer"; import { base64 } from "@scure/base"; +import { FeedCache } from "@snort/shared"; +import { ConnectionPool } from "nostr-connection-pool"; export { NostrSystem } from "./nostr-system"; export { default as EventKind } from "./event-kind"; @@ -49,7 +51,7 @@ export interface QueryLike { off: (event: "event", fn?: (evs: Array) => void) => void; cancel: () => void; uncancel: () => void; - get snapshot(): StoreSnapshot; + get snapshot(): StoreSnapshot>; } export interface SystemInterface { @@ -76,10 +78,9 @@ export interface SystemInterface { /** * Open a new query to relays - * @param type Store type * @param req Request to send to relays */ - Query(type: { new (): T }, req: RequestBuilder): QueryLike; + Query(req: RequestBuilder): QueryLike; /** * Fetch data from nostr relays asynchronously @@ -123,17 +124,32 @@ export interface SystemInterface { /** * Profile cache/loader */ - get ProfileLoader(): ProfileLoaderService; + get profileLoader(): ProfileLoaderService; /** * Relay cache for "Gossip" model */ - get RelayCache(): RelayCache; + get relayCache(): RelayCache; /** * Query optimizer */ - get Optimizer(): Optimizer; + get optimizer(): Optimizer; + + /** + * Generic cache store for events + */ + get eventsCache(): FeedCache; + + /** + * Relay loader loads relay metadata for a set of profiles + */ + get relayLoader(): RelayMetadataLoader; + + /** + * Main connection pool + */ + get pool(): ConnectionPool; } export interface SystemSnapshot { diff --git a/packages/system/src/nostr-connection-pool.ts b/packages/system/src/nostr-connection-pool.ts index 2732ae1b..2679643f 100644 --- a/packages/system/src/nostr-connection-pool.ts +++ b/packages/system/src/nostr-connection-pool.ts @@ -24,7 +24,8 @@ export type ConnectionPool = { disconnect(address: string): void; broadcast(system: SystemInterface, ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise; broadcastTo(address: string, ev: NostrEvent): Promise; -} & EventEmitter; +} & EventEmitter & + Iterable<[string, Connection]>; /** * Simple connection pool containing connections to multiple nostr relays diff --git a/packages/system/src/nostr-query-manager.ts b/packages/system/src/nostr-query-manager.ts index c32d4fb8..9647b8b3 100644 --- a/packages/system/src/nostr-query-manager.ts +++ b/packages/system/src/nostr-query-manager.ts @@ -1,15 +1,19 @@ import debug from "debug"; import EventEmitter from "eventemitter3"; -import { BuiltRawReqFilter, NoteCollection, NoteStore, RequestBuilder, SystemInterface, TaggedNostrEvent } from "."; +import { BuiltRawReqFilter, RequestBuilder, SystemInterface, TaggedNostrEvent } from "."; import { Query, TraceReport } from "./query"; import { unwrap } from "@snort/shared"; +import { FilterCacheLayer, IdsFilterCacheLayer } from "./filter-cache-layer"; +import { trimFilters } from "./request-trim"; interface NostrQueryManagerEvents { change: () => void; trace: (report: TraceReport) => void; - sendQuery: (q: Query, filter: BuiltRawReqFilter) => void; } +/** + * Query manager handles sending requests to the nostr network + */ export class NostrQueryManager extends EventEmitter { #log = debug("NostrQueryManager"); @@ -23,9 +27,15 @@ export class NostrQueryManager extends EventEmitter { */ #system: SystemInterface; + /** + * Query cache processing layers which can take data from a cache + */ + #queryCacheLayers: Array = []; + constructor(system: SystemInterface) { super(); this.#system = system; + this.#queryCacheLayers.push(new IdsFilterCacheLayer(system.eventsCache)); setInterval(() => this.#cleanup(), 1_000); } @@ -37,34 +47,21 @@ export class NostrQueryManager extends EventEmitter { /** * Compute query to send to relays */ - query(type: { new (): T }, req: RequestBuilder): Query { + query(req: RequestBuilder): Query { const existing = this.#queries.get(req.id); if (existing) { - // if same instance, just return query - if (existing.fromInstance === req.instance) { - return existing; - } - const filters = !req.options?.skipDiff ? req.buildDiff(this.#system, existing.filters) : req.build(this.#system); - if (filters.length === 0 && !!req.options?.skipDiff) { - return existing; - } else { - for (const subQ of filters) { - this.emit("sendQuery", existing, subQ); - } + if (existing.addRequest(req)) { this.emit("change"); - return existing; } + return existing; } else { - const store = new type(); - - const filters = req.build(this.#system); - const q = new Query(req.id, req.instance, store, req.options?.leaveOpen, req.options?.timeout); + const q = new Query(this.#system, req); q.on("trace", r => this.emit("trace", r)); + q.on("filters", fx => { + this.#send(q, fx); + }); this.#queries.set(req.id, q); - for (const subQ of filters) { - this.emit("sendQuery", q, subQ); - } this.emit("change"); return q; } @@ -74,10 +71,8 @@ export class NostrQueryManager extends EventEmitter { * Async fetch results */ fetch(req: RequestBuilder, cb?: (evs: ReadonlyArray) => void) { - const q = this.query(NoteCollection, req); + const q = this.query(req); return new Promise>(resolve => { - let t: ReturnType | undefined; - let tBuf: Array = []; if (cb) { q.feed.on("event", cb); } @@ -85,7 +80,7 @@ export class NostrQueryManager extends EventEmitter { if (!loading) { q.feed.off("event"); q.cancel(); - resolve(unwrap((q.feed as NoteCollection).snapshot.data)); + resolve(unwrap(q.snapshot.data)); } }); }); @@ -97,6 +92,56 @@ export class NostrQueryManager extends EventEmitter { } } + async #send(q: Query, qSend: BuiltRawReqFilter) { + for (const qfl of this.#queryCacheLayers) { + qSend = await qfl.processFilter(q, qSend); + } + for (const f of qSend.filters) { + if (f.authors) { + this.#system.relayLoader.TrackKeys(f.authors); + } + } + + // check for empty filters + const fNew = trimFilters(qSend.filters); + if (fNew.length === 0) { + return; + } + qSend.filters = fNew; + + if (qSend.relay) { + this.#log("Sending query to %s %O", qSend.relay, qSend); + const s = this.#system.pool.getConnection(qSend.relay); + if (s) { + const qt = q.sendToRelay(s, qSend); + if (qt) { + return [qt]; + } + } else { + const nc = await this.#system.pool.connect(qSend.relay, { read: true, write: true }, true); + if (nc) { + const qt = q.sendToRelay(nc, qSend); + if (qt) { + return [qt]; + } + } else { + console.warn("Failed to connect to new relay for:", qSend.relay, q); + } + } + } else { + const ret = []; + for (const [a, s] of this.#system.pool) { + if (!s.Ephemeral) { + this.#log("Sending query to %s %O", a, qSend); + const qt = q.sendToRelay(s, qSend); + if (qt) { + ret.push(qt); + } + } + } + return ret; + } + } #cleanup() { let changed = false; for (const [k, v] of this.#queries) { diff --git a/packages/system/src/nostr-system.ts b/packages/system/src/nostr-system.ts index 2734c6b6..726d1524 100644 --- a/packages/system/src/nostr-system.ts +++ b/packages/system/src/nostr-system.ts @@ -4,9 +4,7 @@ import EventEmitter from "eventemitter3"; import { FeedCache } from "@snort/shared"; import { NostrEvent, ReqFilter, TaggedNostrEvent } from "./nostr"; import { RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection"; -import { Query } from "./query"; -import { NoteStore } from "./note-collection"; -import { BuiltRawReqFilter, RequestBuilder } from "./request-builder"; +import { RequestBuilder } from "./request-builder"; import { RelayMetricHandler } from "./relay-metric-handler"; import { CachedMetadata, @@ -23,12 +21,10 @@ import { QueryLike, } from "."; import { EventsCache } from "./cache/events"; -import { RelayCache, RelayMetadataLoader } from "./outbox-model"; +import { RelayMetadataLoader } from "./outbox-model"; import { Optimizer, DefaultOptimizer } from "./query-optimizer"; -import { trimFilters } from "./request-trim"; import { NostrConnectionPool } from "./nostr-connection-pool"; import { NostrQueryManager } from "./nostr-query-manager"; -import { FilterCacheLayer, IdsFilterCacheLayer } from "./filter-cache-layer"; export interface NostrSystemEvents { change: (state: SystemSnapshot) => void; @@ -52,77 +48,67 @@ export interface NostrsystemProps { */ export class NostrSystem extends EventEmitter implements SystemInterface { #log = debug("System"); - #pool = new NostrConnectionPool(); #queryManager: NostrQueryManager; /** * Storage class for user relay lists */ - #relayCache: FeedCache; + readonly relayCache: FeedCache; /** * Storage class for user profiles */ - #profileCache: FeedCache; + readonly profileCache: FeedCache; /** * Storage class for relay metrics (connects/disconnects) */ - #relayMetricsCache: FeedCache; + readonly relayMetricsCache: FeedCache; /** * Profile loading service */ - #profileLoader: ProfileLoaderService; + readonly profileLoader: ProfileLoaderService; /** * Relay metrics handler cache */ - #relayMetrics: RelayMetricHandler; - - /** - * General events cache - */ - #eventsCache: FeedCache; + readonly relayMetricsHandler: RelayMetricHandler; /** * Optimizer instance, contains optimized functions for processing data */ - #optimizer: Optimizer; + readonly optimizer: Optimizer; + + readonly pool = new NostrConnectionPool(); + readonly eventsCache: FeedCache; + readonly relayLoader: RelayMetadataLoader; /** * Check event signatures (reccomended) */ checkSigs: boolean; - #relayLoader: RelayMetadataLoader; - - /** - * Query cache processing layers which can take data from a cache - */ - #queryCacheLayers: Array = []; - constructor(props: NostrsystemProps) { super(); - this.#relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays); - this.#profileCache = props.profileCache ?? new UserProfileCache(props.db?.users); - this.#relayMetricsCache = props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics); - this.#eventsCache = props.eventsCache ?? new EventsCache(props.db?.events); - this.#optimizer = props.optimizer ?? DefaultOptimizer; + this.relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays); + this.profileCache = props.profileCache ?? new UserProfileCache(props.db?.users); + this.relayMetricsCache = props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics); + this.eventsCache = props.eventsCache ?? new EventsCache(props.db?.events); + this.optimizer = props.optimizer ?? DefaultOptimizer; - this.#profileLoader = new ProfileLoaderService(this, this.#profileCache); - this.#relayMetrics = new RelayMetricHandler(this.#relayMetricsCache); - this.#relayLoader = new RelayMetadataLoader(this, this.#relayCache); + this.profileLoader = new ProfileLoaderService(this, this.profileCache); + this.relayMetricsHandler = new RelayMetricHandler(this.relayMetricsCache); + this.relayLoader = new RelayMetadataLoader(this, this.relayCache); this.checkSigs = props.checkSigs ?? true; this.#queryManager = new NostrQueryManager(this); - this.#queryCacheLayers.push(new IdsFilterCacheLayer(this.#eventsCache)); // hook connection pool - this.#pool.on("connected", (id, wasReconnect) => { - const c = this.#pool.getConnection(id); + this.pool.on("connected", (id, wasReconnect) => { + const c = this.pool.getConnection(id); if (c) { - this.#relayMetrics.onConnect(c.Address); + this.relayMetricsHandler.onConnect(c.Address); if (wasReconnect) { for (const [, q] of this.#queryManager) { q.connectionRestored(c); @@ -130,18 +116,18 @@ export class NostrSystem extends EventEmitter implements Syst } } }); - this.#pool.on("connectFailed", address => { - this.#relayMetrics.onDisconnect(address, 0); + this.pool.on("connectFailed", address => { + this.relayMetricsHandler.onDisconnect(address, 0); }); - this.#pool.on("event", (_, sub, ev) => { - ev.relays?.length && this.#relayMetrics.onEvent(ev.relays[0]); + this.pool.on("event", (_, sub, ev) => { + ev.relays?.length && this.relayMetricsHandler.onEvent(ev.relays[0]); if (!EventExt.isValid(ev)) { this.#log("Rejecting invalid event %O", ev); return; } if (this.checkSigs) { - if (!this.#optimizer.schnorrVerify(ev)) { + if (!this.optimizer.schnorrVerify(ev)) { this.#log("Invalid sig %O", ev); return; } @@ -149,84 +135,68 @@ export class NostrSystem extends EventEmitter implements Syst this.emit("event", sub, ev); }); - this.#pool.on("disconnect", (id, code) => { - const c = this.#pool.getConnection(id); + this.pool.on("disconnect", (id, code) => { + const c = this.pool.getConnection(id); if (c) { - this.#relayMetrics.onDisconnect(c.Address, code); + this.relayMetricsHandler.onDisconnect(c.Address, code); for (const [, q] of this.#queryManager) { q.connectionLost(c.Id); } } }); - this.#pool.on("eose", (id, sub) => { - const c = this.#pool.getConnection(id); + this.pool.on("eose", (id, sub) => { + const c = this.pool.getConnection(id); if (c) { for (const [, v] of this.#queryManager) { v.eose(sub, c); } } }); - this.#pool.on("auth", (_, c, r, cb) => this.emit("auth", c, r, cb)); - this.#pool.on("notice", (addr, msg) => { + this.pool.on("auth", (_, c, r, cb) => this.emit("auth", c, r, cb)); + this.pool.on("notice", (addr, msg) => { this.#log("NOTICE: %s %s", addr, msg); }); this.#queryManager.on("change", () => this.emit("change", this.takeSnapshot())); - this.#queryManager.on("sendQuery", (q, f) => this.#sendQuery(q, f)); this.#queryManager.on("trace", t => { - this.#relayMetrics.onTraceReport(t); + this.relayMetricsHandler.onTraceReport(t); }); // internal handler for on-event this.on("event", (sub, ev) => { for (const [, v] of this.#queryManager) { const trace = v.handleEvent(sub, ev); + // inject events to cache if query by id if (trace && trace.filters.some(a => a.ids)) { - this.#eventsCache.set(ev); + this.eventsCache.set(ev); } } }); } - get ProfileLoader() { - return this.#profileLoader; - } - get Sockets(): ConnectionStateSnapshot[] { - return this.#pool.getState(); - } - - get RelayCache(): RelayCache { - return this.#relayCache; - } - - get UserProfileCache(): FeedCache { - return this.#profileCache; - } - - get Optimizer(): Optimizer { - return this.#optimizer; + return this.pool.getState(); } async Init() { const t = [ - this.#relayCache.preload(), - this.#profileCache.preload(), - this.#relayMetricsCache.preload(), - this.#eventsCache.preload(), + this.relayCache.preload(), + this.profileCache.preload(), + this.relayMetricsCache.preload(), + this.eventsCache.preload(), ]; await Promise.all(t); } async ConnectToRelay(address: string, options: RelaySettings) { - await this.#pool.connect(address, options, false); + await this.pool.connect(address, options, false); } ConnectEphemeralRelay(address: string) { - return this.#pool.connect(address, { read: true, write: true }, true); + return this.pool.connect(address, { read: true, write: true }, true); } DisconnectRelay(address: string) { - this.#pool.disconnect(address); + this.pool.disconnect(address); } GetQuery(id: string): QueryLike | undefined { @@ -237,60 +207,8 @@ export class NostrSystem extends EventEmitter implements Syst return this.#queryManager.fetch(req, cb); } - Query(type: { new (): T }, req: RequestBuilder): QueryLike { - return this.#queryManager.query(type, req) as QueryLike; - } - - async #sendQuery(q: Query, qSend: BuiltRawReqFilter) { - for (const qfl of this.#queryCacheLayers) { - qSend = await qfl.processFilter(q, qSend); - } - for (const f of qSend.filters) { - if (f.authors) { - this.#relayLoader.TrackKeys(f.authors); - } - } - - // check for empty filters - const fNew = trimFilters(qSend.filters); - if (fNew.length === 0) { - return; - } - qSend.filters = fNew; - - if (qSend.relay) { - this.#log("Sending query to %s %O", qSend.relay, qSend); - const s = this.#pool.getConnection(qSend.relay); - if (s) { - const qt = q.sendToRelay(s, qSend); - if (qt) { - return [qt]; - } - } else { - const nc = await this.ConnectEphemeralRelay(qSend.relay); - if (nc) { - const qt = q.sendToRelay(nc, qSend); - if (qt) { - return [qt]; - } - } else { - console.warn("Failed to connect to new relay for:", qSend.relay, q); - } - } - } else { - const ret = []; - for (const [a, s] of this.#pool) { - if (!s.Ephemeral) { - this.#log("Sending query to %s %O", a, qSend); - const qt = q.sendToRelay(s, qSend); - if (qt) { - ret.push(qt); - } - } - } - return ret; - } - return []; + Query(req: RequestBuilder): QueryLike { + return this.#queryManager.query(req) as QueryLike; } HandleEvent(ev: TaggedNostrEvent) { @@ -299,11 +217,11 @@ export class NostrSystem extends EventEmitter implements Syst async BroadcastEvent(ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise { this.HandleEvent({ ...ev, relays: [] }); - return await this.#pool.broadcast(this, ev, cb); + return await this.pool.broadcast(this, ev, cb); } async WriteOnceToRelay(address: string, ev: NostrEvent): Promise { - return await this.#pool.broadcastTo(address, ev); + return await this.pool.broadcastTo(address, ev); } takeSnapshot(): SystemSnapshot { diff --git a/packages/system/src/note-collection.ts b/packages/system/src/note-collection.ts index f296b67c..f24b5261 100644 --- a/packages/system/src/note-collection.ts +++ b/packages/system/src/note-collection.ts @@ -3,7 +3,7 @@ import { EventExt, EventType, TaggedNostrEvent, u256 } from "."; import { findTag } from "./utils"; import EventEmitter from "eventemitter3"; -export interface StoreSnapshot { +export interface StoreSnapshot { data: TSnapshot | undefined; clear: () => void; loading: () => boolean; @@ -19,7 +19,7 @@ export const EmptySnapshot = { add: () => { // empty }, -} as StoreSnapshot; +} as StoreSnapshot>; export type NoteStoreSnapshotData = Array | TaggedNostrEvent; export type NoteStoreHook = () => void; diff --git a/packages/system/src/outbox-model.ts b/packages/system/src/outbox-model.ts index b907c2d1..b4fc883c 100644 --- a/packages/system/src/outbox-model.ts +++ b/packages/system/src/outbox-model.ts @@ -205,7 +205,7 @@ export function pickTopRelays(cache: RelayCache, authors: Array, n: numb export async function pickRelaysForReply(ev: NostrEvent, system: SystemInterface, pickN?: number) { const recipients = dedupe(ev.tags.filter(a => a[0] === "p").map(a => a[1])); await updateRelayLists(recipients, system); - const relays = pickTopRelays(system.RelayCache, recipients, pickN ?? DefaultPickNRelays, "read"); + const relays = pickTopRelays(system.relayCache, recipients, pickN ?? DefaultPickNRelays, "read"); const ret = removeUndefined(dedupe(relays.map(a => a.relays).flat())); logger("Picked %O from authors %O", ret, recipients); return ret; @@ -247,15 +247,15 @@ export function parseRelaysFromKind(ev: NostrEvent) { } export async function updateRelayLists(authors: Array, system: SystemInterface) { - await system.RelayCache.buffer(authors); + await system.relayCache.buffer(authors); const expire = unixNowMs() - RelayListCacheExpire; - const expired = authors.filter(a => (system.RelayCache.getFromCache(a)?.loaded ?? 0) < expire); + const expired = authors.filter(a => (system.relayCache.getFromCache(a)?.loaded ?? 0) < expire); if (expired.length > 0) { logger("Updating relays for authors: %O", expired); const rb = new RequestBuilder("system-update-relays-for-outbox"); rb.withFilter().authors(expired).kinds([EventKind.Relays, EventKind.ContactList]); const relayLists = await system.Fetch(rb); - await system.RelayCache.bulkSet( + await system.relayCache.bulkSet( removeUndefined( relayLists.map(a => { const relays = parseRelaysFromKind(a); diff --git a/packages/system/src/query.ts b/packages/system/src/query.ts index b19ac528..6a8451dc 100644 --- a/packages/system/src/query.ts +++ b/packages/system/src/query.ts @@ -3,9 +3,9 @@ import debug from "debug"; import EventEmitter from "eventemitter3"; import { unixNowMs, unwrap } from "@snort/shared"; -import { Connection, ReqFilter, Nips, TaggedNostrEvent } from "."; -import { NoteStore } from "./note-collection"; -import { BuiltRawReqFilter } from "./request-builder"; +import { Connection, ReqFilter, Nips, TaggedNostrEvent, SystemInterface } from "."; +import { NoteCollection, NoteStore } from "./note-collection"; +import { BuiltRawReqFilter, RequestBuilder } from "./request-builder"; import { eventMatchesFilter } from "./request-matcher"; interface QueryTraceEvents { @@ -89,23 +89,6 @@ export class QueryTrace extends EventEmitter { } } -export interface QueryBase { - /** - * Uniquie ID of this query - */ - id: string; - - /** - * The query payload (REQ filters) - */ - filters: Array; - - /** - * List of relays to send this query to - */ - relays?: Array; -} - export interface TraceReport { id: string; conn: Connection; @@ -116,22 +99,28 @@ export interface TraceReport { interface QueryEvents { trace: (report: TraceReport) => void; + filters: (req: BuiltRawReqFilter) => void; event: (evs: ReadonlyArray) => void; } /** * Active or queued query on the system */ -export class Query extends EventEmitter implements QueryBase { +export class Query extends EventEmitter { /** - * Uniquie ID of this query + * Unique id of this query */ - id: string; + readonly id: string; /** * RequestBuilder instance */ - fromInstance: string; + requests: Array = []; + + /** + * Nostr system interface + */ + #system: SystemInterface; /** * Which relays this query has already been executed on @@ -156,27 +145,66 @@ export class Query extends EventEmitter implements QueryBase { /** * Feed object which collects events */ - #feed: NoteStore; + #feed: NoteCollection; /** * Maximum waiting time for this query */ #timeout: number; + /** + * Milliseconds to wait before sending query (debounce) + */ + #groupingDelay?: number; + + /** + * Timer which waits for no-change before emitting filters + */ + #groupTimeout?: ReturnType; + #log = debug("Query"); - constructor(id: string, instance: string, feed: NoteStore, leaveOpen?: boolean, timeout?: number) { + constructor(system: SystemInterface, req: RequestBuilder) { super(); - this.id = id; - this.#feed = feed; - this.fromInstance = instance; - this.#leaveOpen = leaveOpen ?? false; - this.#timeout = timeout ?? 5_000; + this.id = uuid(); + this.requests.push(req); + this.#system = system; + this.#feed = new NoteCollection(); + this.#leaveOpen = req.options?.leaveOpen ?? false; + this.#timeout = req.options?.timeout ?? 5_000; + this.#groupingDelay = req.options?.groupingDelay ?? 100; this.#checkTraces(); this.feed.on("event", evs => this.emit("event", evs)); } + /** + * Adds another request to this one + */ + addRequest(req: RequestBuilder) { + if (this.#groupTimeout) { + clearTimeout(this.#groupTimeout); + this.#groupTimeout = undefined; + } + if (this.requests.some(a => a.instance === req.instance)) { + // already exists, nothing to add + return false; + } + if (this.requests.some(a => a.options?.skipDiff !== req.options?.skipDiff)) { + throw new Error("Mixing skipDiff option is not supported"); + } + this.requests.push(req); + + if (this.#groupingDelay) { + this.#groupTimeout = setTimeout(() => { + this.#emitFilters(); + }, this.#groupingDelay); + } else { + this.#emitFilters(); + } + return true; + } + isOpen() { return this.#cancelAt === undefined && this.#leaveOpen; } @@ -232,7 +260,7 @@ export class Query extends EventEmitter implements QueryBase { /** * Insert a new trace as a placeholder */ - insertCompletedTrace(subq: BuiltRawReqFilter, data: Readonly>) { + insertCompletedTrace(subq: BuiltRawReqFilter, data: Array) { const qt = new QueryTrace(subq.relay, subq.filters, ""); qt.sentToRelay(); qt.gotEose(); @@ -288,6 +316,24 @@ export class Query extends EventEmitter implements QueryBase { return thisProgress; } + #emitFilters() { + if (this.requests.every(a => !!a.options?.skipDiff)) { + const existing = this.filters; + const rb = new RequestBuilder(this.id); + this.requests.forEach(a => rb.add(a)); + const filters = rb.buildDiff(this.#system, existing); + filters.forEach(f => this.emit("filters", f)); + this.requests = []; + } else { + // send without diff + const rb = new RequestBuilder(this.id); + this.requests.forEach(a => rb.add(a)); + const filters = rb.build(this.#system); + filters.forEach(f => this.emit("filters", f)); + this.requests = []; + } + } + #onProgress() { const isFinished = this.progress === 1; if (this.feed.loading !== isFinished) { diff --git a/packages/system/src/relay-metric-handler.ts b/packages/system/src/relay-metric-handler.ts index 4ca768c0..8a717728 100644 --- a/packages/system/src/relay-metric-handler.ts +++ b/packages/system/src/relay-metric-handler.ts @@ -1,7 +1,6 @@ import { FeedCache, unixNowMs } from "@snort/shared"; -import { Connection } from "connection"; -import { RelayMetrics } from "cache"; -import { TraceReport } from "query"; +import { RelayMetrics } from "./cache"; +import { TraceReport } from "./query"; export class RelayMetricHandler { readonly #cache: FeedCache; diff --git a/packages/system/src/request-builder.ts b/packages/system/src/request-builder.ts index 1745f67e..701b91ed 100644 --- a/packages/system/src/request-builder.ts +++ b/packages/system/src/request-builder.ts @@ -53,6 +53,11 @@ export interface RequestBuilderOptions { * Max wait time for this request */ timeout?: number; + + /** + * How many milli-seconds to wait to allow grouping + */ + groupingDelay?: number; } /** @@ -111,7 +116,7 @@ export class RequestBuilder { } build(system: SystemInterface): Array { - const expanded = this.#builders.flatMap(a => a.build(system.RelayCache, this.#options)); + const expanded = this.#builders.flatMap(a => a.build(system.relayCache, this.#options)); return this.#groupByRelay(system, expanded); } @@ -121,14 +126,14 @@ export class RequestBuilder { buildDiff(system: SystemInterface, prev: Array): Array { const start = unixNowMs(); - const diff = system.Optimizer.getDiff(prev, this.buildRaw()); + const diff = system.optimizer.getDiff(prev, this.buildRaw()); const ts = unixNowMs() - start; this.#log("buildDiff %s %d ms +%d", this.id, ts, diff.length); if (diff.length > 0) { - return splitFlatByWriteRelays(system.RelayCache, diff).map(a => { + return splitFlatByWriteRelays(system.relayCache, diff).map(a => { return { strategy: RequestStrategy.AuthorsRelays, - filters: system.Optimizer.flatMerge(a.filters), + filters: system.optimizer.flatMerge(a.filters), relay: a.relay, }; }); @@ -154,7 +159,7 @@ export class RequestBuilder { const filtersSquashed = [...relayMerged.values()].map(a => { return { - filters: system.Optimizer.flatMerge(a.flatMap(b => b.filters.flatMap(c => system.Optimizer.expandFilter(c)))), + filters: system.optimizer.flatMerge(a.flatMap(b => b.filters.flatMap(c => system.optimizer.expandFilter(c)))), relay: a[0].relay, strategy: a[0].strategy, } as BuiltRawReqFilter; diff --git a/packages/system/src/worker/system-worker.ts b/packages/system/src/worker/system-worker.ts index 8ffaed2d..bcd9371d 100644 --- a/packages/system/src/worker/system-worker.ts +++ b/packages/system/src/worker/system-worker.ts @@ -3,11 +3,8 @@ import EventEmitter from "eventemitter3"; import { ConnectionStateSnapshot, NostrEvent, - NoteStore, OkResponse, ProfileLoaderService, - Optimizer, - RelayCache, RelaySettings, RequestBuilder, SystemInterface, @@ -28,18 +25,19 @@ import { FeedCache } from "@snort/shared"; import { EventsCache } from "../cache/events"; import { RelayMetricHandler } from "../relay-metric-handler"; import debug from "debug"; +import { ConnectionPool } from "nostr-connection-pool"; export class SystemWorker extends EventEmitter implements SystemInterface { #log = debug("SystemWorker"); #worker: Worker; #commandQueue: Map void> = new Map(); - #relayCache: FeedCache; - #profileCache: FeedCache; - #relayMetricsCache: FeedCache; - #profileLoader: ProfileLoaderService; - #relayMetrics: RelayMetricHandler; - #eventsCache: FeedCache; - #relayLoader: RelayMetadataLoader; + readonly relayCache: FeedCache; + readonly profileCache: FeedCache; + readonly relayMetricsCache: FeedCache; + readonly profileLoader: ProfileLoaderService; + readonly relayMetricsHandler: RelayMetricHandler; + readonly eventsCache: FeedCache; + readonly relayLoader: RelayMetadataLoader; get checkSigs() { return true; @@ -49,17 +47,25 @@ export class SystemWorker extends EventEmitter implements Sys // not used } + get optimizer() { + return DefaultOptimizer; + } + + get pool() { + return {} as ConnectionPool; + } + constructor(scriptPath: string, props: NostrsystemProps) { super(); - this.#relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays); - this.#profileCache = props.profileCache ?? new UserProfileCache(props.db?.users); - this.#relayMetricsCache = props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics); - this.#eventsCache = props.eventsCache ?? new EventsCache(props.db?.events); + this.relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays); + this.profileCache = props.profileCache ?? new UserProfileCache(props.db?.users); + this.relayMetricsCache = props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics); + this.eventsCache = props.eventsCache ?? new EventsCache(props.db?.events); - this.#profileLoader = new ProfileLoaderService(this, this.#profileCache); - this.#relayMetrics = new RelayMetricHandler(this.#relayMetricsCache); - this.#relayLoader = new RelayMetadataLoader(this, this.#relayCache); + this.profileLoader = new ProfileLoaderService(this, this.profileCache); + this.relayMetricsHandler = new RelayMetricHandler(this.relayMetricsCache); + this.relayLoader = new RelayMetadataLoader(this, this.relayCache); this.#worker = new Worker(scriptPath, { name: "SystemWorker", type: "module", @@ -86,7 +92,7 @@ export class SystemWorker extends EventEmitter implements Sys return undefined; } - Query(type: new () => T, req: RequestBuilder): QueryLike { + Query(req: RequestBuilder): QueryLike { const chan = this.#workerRpc<[RequestBuilder], { id: string; port: MessagePort }>(WorkerCommand.Query, [req]); return { on: (_: "event", cb) => { @@ -130,18 +136,6 @@ export class SystemWorker extends EventEmitter implements Sys throw new Error("Method not implemented."); } - get ProfileLoader(): ProfileLoaderService { - return this.#profileLoader; - } - - get RelayCache(): RelayCache { - return this.#relayCache; - } - - get Optimizer(): Optimizer { - return DefaultOptimizer; - } - #workerRpc(type: WorkerCommand, data?: T, timeout = 5_000) { const id = uuid(); const msg = { diff --git a/packages/system/worker.ts b/packages/system/worker.ts deleted file mode 100644 index 88315188..00000000 --- a/packages/system/worker.ts +++ /dev/null @@ -1,21 +0,0 @@ -/// -import { UsersRelaysCache } from "../Cache/UserRelayCache"; -import { NostrSystem } from "."; -declare const self: SharedWorkerGlobalScope; - -const RelayCache = new UsersRelaysCache(); -const System = new NostrSystem({ - get: pk => RelayCache.getFromCache(pk)?.relays, -}); - -self.onconnect = e => { - const port = e.ports[0]; - - port.addEventListener("message", async e1 => { - console.debug(e1); - const [cmd, ...others] = e1.data; - switch (cmd) { - } - }); - port.start(); -};