refactor: Query emits Filters

This commit is contained in:
Kieran 2024-01-09 12:51:33 +00:00
parent 18beed13c3
commit 4455651d47
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
40 changed files with 416 additions and 404 deletions

View File

@ -44,6 +44,6 @@ export class FollowListCache extends RefreshFeedCache<TaggedNostrEvent> {
override async preload() {
await super.preload();
this.snapshot().forEach(e => socialGraphInstance.handleEvent(e));
this.cache.forEach(e => socialGraphInstance.handleEvent(e));
}
}

View File

@ -43,7 +43,10 @@ export class FollowsFeedCache extends RefreshFeedCache<TaggedNostrEvent> {
const filtered = evs.filter(a => this.#kinds.includes(a.kind));
if (filtered.length > 0) {
await this.bulkSet(filtered);
this.notifyChange(filtered.map(a => this.key(a)));
this.emit(
"change",
filtered.map(a => this.key(a)),
);
}
}
@ -64,7 +67,7 @@ export class FollowsFeedCache extends RefreshFeedCache<TaggedNostrEvent> {
const oldest = await this.table?.orderBy("created_at").first();
this.#oldest = oldest?.created_at;
this.notifyChange(latest?.map(a => this.key(a)) ?? []);
this.emit("change", latest?.map(a => this.key(a)) ?? []);
debug(this.name)(`Loaded %d/%d in %d ms`, latest?.length ?? 0, keys.length, (unixNowMs() - start).toLocaleString());
}
@ -96,7 +99,7 @@ export class FollowsFeedCache extends RefreshFeedCache<TaggedNostrEvent> {
this.onTable.add(k);
});
this.notifyChange(latest?.map(a => this.key(a)) ?? []);
this.emit("change", latest?.map(a => this.key(a)) ?? []);
}
}

View File

@ -33,7 +33,10 @@ export class NotificationsCache extends RefreshFeedCache<NostrEventForSession> {
forSession: pubKey,
})),
);
this.notifyChange(filtered.map(v => this.key(v)));
this.emit(
"change",
filtered.map(v => this.key(v)),
);
}
}

View File

@ -24,7 +24,6 @@ export abstract class RefreshFeedCache<T> extends FeedCache<TWithCreated<T>> {
override async preload(): Promise<void> {
await super.preload();
// load all dms to memory
await this.buffer([...this.onTable]);
}
}

View File

@ -285,15 +285,20 @@ class IrisAccount extends Component<Props> {
componentDidMount() {
const session = LoginStore.snapshot();
const myPub = session.publicKey;
System.ProfileLoader.Cache.hook(() => {
const profile = System.ProfileLoader.Cache.getFromCache(myPub);
const irisToActive = profile && profile.nip05 && profile.nip05.endsWith("@iris.to");
this.setState({ profile, irisToActive });
if (profile && !irisToActive) {
this.checkExistingAccount(myPub);
}
}, myPub);
this.checkExistingAccount(myPub);
if (myPub) {
System.profileLoader.cache.on("change", keys => {
if (keys.includes(myPub)) {
const profile = System.profileLoader.cache.getFromCache(myPub);
const irisToActive = profile && profile.nip05 && profile.nip05.endsWith("@iris.to");
this.setState({ profile, irisToActive });
if (profile && !irisToActive) {
this.checkExistingAccount(myPub);
}
}
});
this.checkExistingAccount(myPub);
}
}
async checkExistingAccount(pub: any) {

View File

@ -18,7 +18,7 @@ export function ProfileLink({
children?: ReactNode;
} & Omit<LinkProps, "to">) {
const system = useContext(SnortContext);
const relays = system.RelayCache.getFromCache(pubkey)
const relays = system.relayCache.getFromCache(pubkey)
?.relays?.filter(a => a.settings.write)
?.map(a => a.url);

View File

@ -1,4 +1,4 @@
import { EventKind, NoteCollection, RequestBuilder } from "@snort/system";
import { EventKind, RequestBuilder } from "@snort/system";
import { useRequestBuilder } from "@snort/system-react";
import { useMemo } from "react";
@ -15,5 +15,5 @@ export function useArticles() {
return rb;
}, [follows.timestamp]);
return useRequestBuilder(NoteCollection, sub);
return useRequestBuilder(sub);
}

View File

@ -1,4 +1,4 @@
import { EventKind, HexKey, NoteCollection, ReplaceableNoteStore, RequestBuilder } from "@snort/system";
import { EventKind, HexKey, RequestBuilder } from "@snort/system";
import { useRequestBuilder } from "@snort/system-react";
import { useMemo } from "react";
@ -17,12 +17,12 @@ export default function useProfileBadges(pubkey?: HexKey) {
return b;
}, [pubkey]);
const profileBadges = useRequestBuilder(ReplaceableNoteStore, sub);
const profileBadges = useRequestBuilder(sub);
const profile = useMemo(() => {
if (profileBadges.data) {
return chunks(
profileBadges.data.tags.filter(t => t[0] === "a" || t[0] === "e"),
profileBadges.data[0].tags.filter(t => t[0] === "a" || t[0] === "e"),
2,
).reduce((acc, [a, e]) => {
return {
@ -57,7 +57,7 @@ export default function useProfileBadges(pubkey?: HexKey) {
return b;
}, [profile, ds]);
const awards = useRequestBuilder(NoteCollection, awardsSub);
const awards = useRequestBuilder(awardsSub);
const result = useMemo(() => {
if (awards.data) {

View File

@ -1,4 +1,4 @@
import { EventKind, HexKey, NoteCollection, RequestBuilder, socialGraphInstance } from "@snort/system";
import { EventKind, HexKey, RequestBuilder, socialGraphInstance } from "@snort/system";
import { useRequestBuilder } from "@snort/system-react";
import { useMemo } from "react";
@ -10,7 +10,7 @@ export default function useFollowersFeed(pubkey?: HexKey) {
return b;
}, [pubkey]);
const followersFeed = useRequestBuilder(NoteCollection, sub);
const followersFeed = useRequestBuilder(sub);
const followers = useMemo(() => {
const contactLists = followersFeed.data?.filter(

View File

@ -1,4 +1,4 @@
import { EventKind, HexKey, NoteCollection, RequestBuilder, TaggedNostrEvent } from "@snort/system";
import { EventKind, HexKey, RequestBuilder, TaggedNostrEvent } from "@snort/system";
import { useRequestBuilder } from "@snort/system-react";
import { useMemo } from "react";
@ -15,7 +15,7 @@ export default function useFollowsFeed(pubkey?: HexKey) {
return b;
}, [isMe, pubkey]);
const contactFeed = useRequestBuilder(NoteCollection, sub);
const contactFeed = useRequestBuilder(sub);
return useMemo(() => {
if (isMe) {
return follows.item;

View File

@ -1,5 +1,5 @@
import { unixNow } from "@snort/shared";
import { EventKind, NoteCollection, RequestBuilder } from "@snort/system";
import { EventKind, RequestBuilder } from "@snort/system";
import { useRequestBuilder } from "@snort/system-react";
import { useMemo } from "react";
@ -18,7 +18,7 @@ export default function useHashtagsFeed() {
}, [hashtags]);
return {
data: useRequestBuilder(NoteCollection, sub),
data: useRequestBuilder(sub),
hashtags,
};
}

View File

@ -1,9 +1,9 @@
import { EventKind, NostrLink, NoteCollection, parseRelayTags, RequestBuilder, TaggedNostrEvent } from "@snort/system";
import { EventKind, NostrLink, parseRelayTags, RequestBuilder, TaggedNostrEvent } from "@snort/system";
import { useRequestBuilder } from "@snort/system-react";
import { usePrevious } from "@uidotdev/usehooks";
import { useEffect, useMemo } from "react";
import { FollowLists, FollowsFeed, GiftsCache, Notifications, UserRelays } from "@/Cache";
import { FollowLists, FollowsFeed, GiftsCache, Notifications } from "@/Cache";
import { Nip4Chats, Nip28Chats } from "@/chat";
import { Nip28ChatSystem } from "@/chat/nip28";
import useEventPublisher from "@/Hooks/useEventPublisher";
@ -96,7 +96,7 @@ export default function useLoginFeed() {
return b;
}, [login]);
const loginFeed = useRequestBuilder(NoteCollection, subLogin);
const loginFeed = useRequestBuilder(subLogin);
// update relays and follow lists
useEffect(() => {
@ -219,7 +219,6 @@ export default function useLoginFeed() {
}, [loginFeed]);
useEffect(() => {
UserRelays.buffer(follows.item).catch(console.error);
system.ProfileLoader.TrackKeys(follows.item); // always track follows profiles
system.profileLoader.TrackKeys(follows.item); // always track follows profiles
}, [follows.item]);
}

View File

@ -1,4 +1,4 @@
import { EventKind, HexKey, parseRelayTags, ReplaceableNoteStore, RequestBuilder } from "@snort/system";
import { EventKind, HexKey, parseRelayTags, RequestBuilder } from "@snort/system";
import { useRequestBuilder } from "@snort/system-react";
import { useMemo } from "react";
@ -10,6 +10,6 @@ export default function useRelaysFeed(pubkey?: HexKey) {
return b;
}, [pubkey]);
const relays = useRequestBuilder(ReplaceableNoteStore, sub);
return parseRelayTags(relays.data?.tags.filter(a => a[0] === "r") ?? []);
const relays = useRequestBuilder(sub);
return parseRelayTags(relays.data?.[0].tags.filter(a => a[0] === "r") ?? []);
}

View File

@ -1,5 +1,5 @@
import { unixNow } from "@snort/shared";
import { EventKind, NoteCollection, RequestBuilder } from "@snort/system";
import { EventKind, RequestBuilder } from "@snort/system";
import { useRequestBuilder } from "@snort/system-react";
import { useMemo } from "react";
@ -18,7 +18,7 @@ export function useStatusFeed(id?: string, leaveOpen = false) {
return rb;
}, [id]);
const status = useRequestBuilder(NoteCollection, sub);
const status = useRequestBuilder(sub);
const statusFiltered = status.data?.filter(a => {
const exp = Number(findTag(a, "expiration"));

View File

@ -1,4 +1,4 @@
import { EventExt, EventKind, NostrLink, NoteCollection, RequestBuilder } from "@snort/system";
import { EventExt, EventKind, NostrLink, RequestBuilder } from "@snort/system";
import { useReactions, useRequestBuilder } from "@snort/system-react";
import { useEffect, useMemo, useState } from "react";
@ -30,7 +30,7 @@ export default function useThreadFeed(link: NostrLink) {
return sub;
}, [allEvents.length]);
const store = useRequestBuilder(NoteCollection, sub);
const store = useRequestBuilder(sub);
useEffect(() => {
if (store.data) {

View File

@ -1,5 +1,5 @@
import { unixNow } from "@snort/shared";
import { EventKind, NoteCollection, RequestBuilder } from "@snort/system";
import { EventKind, RequestBuilder } from "@snort/system";
import { useRequestBuilder } from "@snort/system-react";
import { useCallback, useMemo } from "react";
@ -116,7 +116,7 @@ export default function useTimelineFeed(subject: TimelineSubject, options: Timel
return rb?.builder ?? null;
}, [until, since, options.method, pref, createBuilder]);
const main = useRequestBuilder(NoteCollection, sub);
const main = useRequestBuilder(sub);
const subRealtime = useMemo(() => {
const rb = createBuilder();
@ -130,7 +130,7 @@ export default function useTimelineFeed(subject: TimelineSubject, options: Timel
return rb?.builder ?? null;
}, [pref.autoShowLatest, createBuilder]);
const latest = useRequestBuilder(NoteCollection, subRealtime);
const latest = useRequestBuilder(subRealtime);
return {
main: main.data,

View File

@ -1,4 +1,4 @@
import { EventKind, NostrLink, NoteCollection, parseZap, RequestBuilder } from "@snort/system";
import { EventKind, NostrLink, parseZap, RequestBuilder } from "@snort/system";
import { useRequestBuilder } from "@snort/system-react";
import { useMemo } from "react";
@ -10,7 +10,7 @@ export default function useZapsFeed(link?: NostrLink) {
return b;
}, [link]);
const zapsFeed = useRequestBuilder(NoteCollection, sub);
const zapsFeed = useRequestBuilder(sub);
const zaps = useMemo(() => {
if (zapsFeed.data) {

View File

@ -146,7 +146,7 @@ const NetworkGraph = () => {
const node = {
id: UID,
address: pubkey,
profile: system.ProfileLoader.Cache.getFromCache(pubkey),
profile: system.profileLoader.cache.getFromCache(pubkey),
distance,
inboundCount,
outboundCount,

View File

@ -23,7 +23,7 @@ export function FollowsRelayHealth({
const uniqueFollows = dedupe(follows.item);
const hasRelays = useMemo(() => {
return uniqueFollows.filter(a => (system.RelayCache.getFromCache(a)?.relays.length ?? 0) > 0);
return uniqueFollows.filter(a => (system.relayCache.getFromCache(a)?.relays.length ?? 0) > 0);
}, [uniqueFollows]);
const missingRelays = useMemo(() => {
@ -31,7 +31,7 @@ export function FollowsRelayHealth({
}, [hasRelays]);
const topWriteRelays = useMemo(() => {
return pickTopRelays(system.RelayCache, uniqueFollows, 1e31, "write");
return pickTopRelays(system.relayCache, uniqueFollows, 1e31, "write");
}, [uniqueFollows]);
return (

View File

@ -12,7 +12,7 @@ export class Nip24ChatSystem extends ExternalStore<Array<Chat>> implements ChatS
constructor(cache: GiftWrapCache) {
super();
this.#cache = cache;
this.#cache.hook(() => this.notifyChange(), "*");
this.#cache.on("change", () => this.notifyChange());
}
subscription() {

View File

@ -19,7 +19,7 @@ import { LoginSession } from "@/Utils/Login";
export class Nip28ChatSystem extends ExternalStore<Array<Chat>> implements ChatSystem {
#cache: FeedCache<NostrEvent>;
#log = debug("NIP-04");
#log = debug("NIP-28");
readonly ChannelKinds = [
EventKind.PublicChatChannel,
EventKind.PublicChatMessage,

View File

@ -30,21 +30,20 @@ System.on("event", (_, ev) => {
socialGraphInstance.handleEvent(ev);
});
System.profileCache.on("change", keys => {
const changed = removeUndefined(keys.map(a => System.profileCache.getFromCache(a)));
changed.forEach(addCachedMetadataToFuzzySearch);
});
/**
* Add profile loader fn
*/
if (CONFIG.httpCache) {
System.ProfileLoader.loaderFn = async (keys: Array<string>) => {
System.profileLoader.loaderFn = async (keys: Array<string>) => {
return removeUndefined(await Promise.all(keys.map(a => fetchProfile(a))));
};
}
setTimeout(() => {
System.UserProfileCache.snapshot().forEach(a => {
addCachedMetadataToFuzzySearch(a);
});
}, 2000);
export async function fetchProfile(key: string) {
try {
throwIfOffline();

View File

@ -1,6 +1,7 @@
import debug from "debug";
import { removeUndefined, unixNowMs, unwrap } from "./utils";
import { removeUndefined, unixNowMs } from "./utils";
import { DexieTableLike } from "./dexie-like";
import EventEmitter from "eventemitter3";
type HookFn = () => void;
@ -9,14 +10,17 @@ export interface KeyedHookFilter {
fn: HookFn;
}
export interface FeedCacheEvents {
change: (keys: Array<string>) => void;
}
/**
* Dexie backed generic hookable store
*/
export abstract class FeedCache<TCached> {
#name: string;
#hooks: Array<KeyedHookFilter> = [];
export abstract class FeedCache<TCached> extends EventEmitter<FeedCacheEvents> {
readonly name: string;
#snapshot: Array<TCached> = [];
#changed = true;
#log: ReturnType<typeof debug>;
#hits = 0;
#miss = 0;
protected table?: DexieTableLike<TCached>;
@ -24,21 +28,22 @@ export abstract class FeedCache<TCached> {
protected cache: Map<string, TCached> = new Map();
constructor(name: string, table?: DexieTableLike<TCached>) {
this.#name = name;
super();
this.name = name;
this.table = table;
this.#log = debug(name);
setInterval(() => {
debug(this.#name)(
this.#log(
"%d loaded, %d on-disk, %d hooks, %d% hit",
this.cache.size,
this.onTable.size,
this.#hooks.length,
this.listenerCount("change"),
((this.#hits / (this.#hits + this.#miss)) * 100).toFixed(1),
);
}, 30_000);
}
get name() {
return this.#name;
this.on("change", () => {
this.#snapshot = this.takeSnapshot();
});
}
async preload() {
@ -49,28 +54,25 @@ export abstract class FeedCache<TCached> {
}
}
keysOnTable() {
return [...this.onTable];
}
hook(fn: HookFn, key: string | undefined) {
if (!key) {
return () => {
//noop
if (key) {
const handle = (keys: Array<string>) => {
if (keys.includes(key)) {
fn();
}
};
this.on("change", handle);
return () => this.off("change", handle);
}
this.#hooks.push({
key,
fn,
});
return () => {
const idx = this.#hooks.findIndex(a => a.fn === fn);
if (idx >= 0) {
this.#hooks.splice(idx, 1);
}
// noop
};
}
keysOnTable() {
return [...this.onTable];
}
getFromCache(key?: string) {
if (key) {
@ -89,7 +91,7 @@ export abstract class FeedCache<TCached> {
const cached = await this.table.get(key);
if (cached) {
this.cache.set(this.key(cached), cached);
this.notifyChange([key]);
this.emit("change", [key]);
return cached;
}
}
@ -120,7 +122,7 @@ export abstract class FeedCache<TCached> {
console.error(e);
}
}
this.notifyChange([k]);
this.emit("change", [k]);
}
async bulkSet(obj: Array<TCached> | Readonly<Array<TCached>>) {
@ -133,7 +135,10 @@ export abstract class FeedCache<TCached> {
}
}
obj.forEach(v => this.cache.set(this.key(v), v));
this.notifyChange(obj.map(a => this.key(a)));
this.emit(
"change",
obj.map(a => this.key(a)),
);
}
/**
@ -156,7 +161,7 @@ export abstract class FeedCache<TCached> {
}
return "no_change";
})();
debug(this.#name)("Updating %s %s %o", k, updateType, m);
this.#log("Updating %s %s %o", k, updateType, m);
if (updateType !== "no_change") {
const updated = {
...existing,
@ -184,8 +189,11 @@ export abstract class FeedCache<TCached> {
fromCache.forEach(a => {
this.cache.set(this.key(a), a);
});
this.notifyChange(fromCache.map(a => this.key(a)));
debug(this.#name)(`Loaded %d/%d in %d ms`, fromCache.length, keys.length, (unixNowMs() - start).toLocaleString());
this.emit(
"change",
fromCache.map(a => this.key(a)),
);
this.#log(`Loaded %d/%d in %d ms`, fromCache.length, keys.length, (unixNowMs() - start).toLocaleString());
return mapped.filter(a => !a.has).map(a => a.key);
}
@ -197,23 +205,12 @@ export abstract class FeedCache<TCached> {
await this.table?.clear();
this.cache.clear();
this.onTable.clear();
this.#changed = true;
this.#hooks.forEach(h => h.fn());
}
snapshot() {
if (this.#changed) {
this.#snapshot = this.takeSnapshot();
this.#changed = false;
}
return this.#snapshot;
}
protected notifyChange(keys: Array<string>) {
this.#changed = true;
this.#hooks.filter(a => keys.includes(a.key) || a.key === "*").forEach(h => h.fn());
}
abstract key(of: TCached): string;
abstract takeSnapshot(): Array<TCached>;
}

View File

@ -1,5 +1,5 @@
import { useMemo } from "react";
import { RequestBuilder, ReplaceableNoteStore, NostrLink, NoteCollection } from "@snort/system";
import { RequestBuilder, NostrLink } from "@snort/system";
import { useRequestBuilder } from "./useRequestBuilder";
export function useEventFeed(link: NostrLink) {
@ -9,7 +9,7 @@ export function useEventFeed(link: NostrLink) {
return b;
}, [link]);
return useRequestBuilder(ReplaceableNoteStore, sub);
return useRequestBuilder(sub);
}
export function useEventsFeed(id: string, links: Array<NostrLink>) {
@ -19,5 +19,5 @@ export function useEventsFeed(id: string, links: Array<NostrLink>) {
return b;
}, [id, links]);
return useRequestBuilder(NoteCollection, sub);
return useRequestBuilder(sub);
}

View File

@ -30,5 +30,5 @@ export function useReactions(
return rb.numFilters > 0 ? rb : null;
}, [ids]);
return useRequestBuilder(NoteCollection, sub);
return useRequestBuilder(sub);
}

View File

@ -6,14 +6,13 @@ import { SnortContext } from "./context";
/**
* Send a query to the relays and wait for data
*/
const useRequestBuilder = <TStore extends NoteStore, TSnapshot = ReturnType<TStore["getSnapshotData"]>>(
type: { new (): TStore },
const useRequestBuilder = (
rb: RequestBuilder | null,
) => {
const system = useContext(SnortContext);
const subscribe = (onChanged: () => void) => {
if (rb) {
const q = system.Query<TStore>(type, rb);
const q = system.Query(rb);
q.on("event", onChanged);
q.uncancel();
return () => {
@ -25,14 +24,14 @@ const useRequestBuilder = <TStore extends NoteStore, TSnapshot = ReturnType<TSto
// noop
};
};
const getState = (): StoreSnapshot<TSnapshot> => {
const getState = () => {
const q = system.GetQuery(rb?.id ?? "");
if (q) {
return q.snapshot as StoreSnapshot<TSnapshot>;
return q.snapshot;
}
return EmptySnapshot as StoreSnapshot<TSnapshot>;
return EmptySnapshot;
};
return useSyncExternalStore<StoreSnapshot<TSnapshot>>(
return useSyncExternalStore(
v => subscribe(v),
() => getState(),
);

View File

@ -10,16 +10,23 @@ export function useUserProfile(pubKey?: HexKey): CachedMetadata | undefined {
return useSyncExternalStore<CachedMetadata | undefined>(
h => {
if (pubKey) {
system.ProfileLoader.TrackKeys(pubKey);
const handler = (keys: Array<string>) => {
if (keys.includes(pubKey)) {
h();
}
};
system.profileLoader.cache.on("change", handler);
system.profileLoader.TrackKeys(pubKey);
return () => {
system.profileLoader.cache.off("change", handler);
system.profileLoader.UntrackKeys(pubKey);
};
}
const release = system.ProfileLoader.Cache.hook(h, pubKey);
return () => {
release();
if (pubKey) {
system.ProfileLoader.UntrackKeys(pubKey);
}
// noop
};
},
() => system.ProfileLoader.Cache.getFromCache(pubKey),
() => system.profileLoader.cache.getFromCache(pubKey),
);
}

View File

@ -5,7 +5,7 @@ import { SnortContext } from "./context";
export function useUserSearch() {
const system = useContext(SnortContext);
const cache = system.ProfileLoader.Cache as UserProfileCache;
const cache = system.profileLoader.cache as UserProfileCache;
async function search(input: string): Promise<Array<string>> {
// try exact match first

View File

@ -4,7 +4,7 @@ import { SystemInterface, TaggedNostrEvent, RequestBuilder } from ".";
export abstract class BackgroundLoader<T extends { loaded: number; created: number }> {
#system: SystemInterface;
#cache: FeedCache<T>;
readonly cache: FeedCache<T>;
#log = debug(this.name());
/**
@ -19,14 +19,10 @@ export abstract class BackgroundLoader<T extends { loaded: number; created: numb
constructor(system: SystemInterface, cache: FeedCache<T>) {
this.#system = system;
this.#cache = cache;
this.cache = cache;
this.#FetchMetadata();
}
get Cache() {
return this.#cache;
}
/**
* Name of this loader service
*/
@ -74,38 +70,40 @@ export abstract class BackgroundLoader<T extends { loaded: number; created: numb
* Get object from cache or fetch if missing
*/
async fetch(key: string) {
const existing = this.Cache.get(key);
const existing = this.cache.get(key);
if (existing) {
return existing;
} else {
return await new Promise<T>((resolve, reject) => {
this.TrackKeys(key);
const release = this.Cache.hook(() => {
const existing = this.Cache.getFromCache(key);
if (existing) {
resolve(existing);
release();
this.UntrackKeys(key);
this.cache.on("change", keys => {
if (keys.includes(key)) {
const existing = this.cache.getFromCache(key);
if (existing) {
resolve(existing);
this.UntrackKeys(key);
this.cache.off("change");
}
}
}, key);
});
});
}
}
async #FetchMetadata() {
const loading = [...this.#wantsKeys];
await this.#cache.buffer(loading);
await this.cache.buffer(loading);
const missing = loading.filter(a => (this.#cache.getFromCache(a)?.loaded ?? 0) < this.getExpireCutoff());
const missing = loading.filter(a => (this.cache.getFromCache(a)?.loaded ?? 0) < this.getExpireCutoff());
if (missing.length > 0) {
this.#log("Fetching keys: %O", missing);
try {
const found = await this.#loadData(missing);
const noResult = removeUndefined(
missing.filter(a => !found.some(b => a === this.#cache.key(b))).map(a => this.makePlaceholder(a)),
missing.filter(a => !found.some(b => a === this.cache.key(b))).map(a => this.makePlaceholder(a)),
);
if (noResult.length > 0) {
await Promise.all(noResult.map(a => this.#cache.update(a)));
await Promise.all(noResult.map(a => this.cache.update(a)));
}
} catch (e) {
this.#log("Error: %O", e);
@ -119,14 +117,14 @@ export abstract class BackgroundLoader<T extends { loaded: number; created: numb
async #loadData(missing: Array<string>) {
if (this.loaderFn) {
const results = await this.loaderFn(missing);
await Promise.all(results.map(a => this.#cache.update(a)));
await Promise.all(results.map(a => this.cache.update(a)));
return results;
} else {
const v = await this.#system.Fetch(this.buildSub(missing), async e => {
for (const pe of e) {
const m = this.onEvent(pe);
if (m) {
await this.#cache.update(m);
await this.cache.update(m);
}
}
});

View File

@ -1,11 +1,13 @@
import { RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection";
import { RequestBuilder } from "./request-builder";
import { NoteStore, NoteStoreSnapshotData, StoreSnapshot } from "./note-collection";
import { NoteCollection, NoteStore, NoteStoreSnapshotData, StoreSnapshot } from "./note-collection";
import { NostrEvent, ReqFilter, TaggedNostrEvent } from "./nostr";
import { ProfileLoaderService } from "./profile-cache";
import { RelayCache } from "./outbox-model";
import { RelayCache, RelayMetadataLoader } from "./outbox-model";
import { Optimizer } from "./query-optimizer";
import { base64 } from "@scure/base";
import { FeedCache } from "@snort/shared";
import { ConnectionPool } from "nostr-connection-pool";
export { NostrSystem } from "./nostr-system";
export { default as EventKind } from "./event-kind";
@ -49,7 +51,7 @@ export interface QueryLike {
off: (event: "event", fn?: (evs: Array<TaggedNostrEvent>) => void) => void;
cancel: () => void;
uncancel: () => void;
get snapshot(): StoreSnapshot<NoteStoreSnapshotData>;
get snapshot(): StoreSnapshot<Array<TaggedNostrEvent>>;
}
export interface SystemInterface {
@ -76,10 +78,9 @@ export interface SystemInterface {
/**
* Open a new query to relays
* @param type Store type
* @param req Request to send to relays
*/
Query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder): QueryLike;
Query(req: RequestBuilder): QueryLike;
/**
* Fetch data from nostr relays asynchronously
@ -123,17 +124,32 @@ export interface SystemInterface {
/**
* Profile cache/loader
*/
get ProfileLoader(): ProfileLoaderService;
get profileLoader(): ProfileLoaderService;
/**
* Relay cache for "Gossip" model
*/
get RelayCache(): RelayCache;
get relayCache(): RelayCache;
/**
* Query optimizer
*/
get Optimizer(): Optimizer;
get optimizer(): Optimizer;
/**
* Generic cache store for events
*/
get eventsCache(): FeedCache<NostrEvent>;
/**
* Relay loader loads relay metadata for a set of profiles
*/
get relayLoader(): RelayMetadataLoader;
/**
* Main connection pool
*/
get pool(): ConnectionPool;
}
export interface SystemSnapshot {

View File

@ -24,7 +24,8 @@ export type ConnectionPool = {
disconnect(address: string): void;
broadcast(system: SystemInterface, ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise<OkResponse[]>;
broadcastTo(address: string, ev: NostrEvent): Promise<OkResponse>;
} & EventEmitter<NostrConnectionPoolEvents>;
} & EventEmitter<NostrConnectionPoolEvents> &
Iterable<[string, Connection]>;
/**
* Simple connection pool containing connections to multiple nostr relays

View File

@ -1,15 +1,19 @@
import debug from "debug";
import EventEmitter from "eventemitter3";
import { BuiltRawReqFilter, NoteCollection, NoteStore, RequestBuilder, SystemInterface, TaggedNostrEvent } from ".";
import { BuiltRawReqFilter, RequestBuilder, SystemInterface, TaggedNostrEvent } from ".";
import { Query, TraceReport } from "./query";
import { unwrap } from "@snort/shared";
import { FilterCacheLayer, IdsFilterCacheLayer } from "./filter-cache-layer";
import { trimFilters } from "./request-trim";
interface NostrQueryManagerEvents {
change: () => void;
trace: (report: TraceReport) => void;
sendQuery: (q: Query, filter: BuiltRawReqFilter) => void;
}
/**
* Query manager handles sending requests to the nostr network
*/
export class NostrQueryManager extends EventEmitter<NostrQueryManagerEvents> {
#log = debug("NostrQueryManager");
@ -23,9 +27,15 @@ export class NostrQueryManager extends EventEmitter<NostrQueryManagerEvents> {
*/
#system: SystemInterface;
/**
* Query cache processing layers which can take data from a cache
*/
#queryCacheLayers: Array<FilterCacheLayer> = [];
constructor(system: SystemInterface) {
super();
this.#system = system;
this.#queryCacheLayers.push(new IdsFilterCacheLayer(system.eventsCache));
setInterval(() => this.#cleanup(), 1_000);
}
@ -37,34 +47,21 @@ export class NostrQueryManager extends EventEmitter<NostrQueryManagerEvents> {
/**
* Compute query to send to relays
*/
query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder): Query {
query(req: RequestBuilder): Query {
const existing = this.#queries.get(req.id);
if (existing) {
// if same instance, just return query
if (existing.fromInstance === req.instance) {
return existing;
}
const filters = !req.options?.skipDiff ? req.buildDiff(this.#system, existing.filters) : req.build(this.#system);
if (filters.length === 0 && !!req.options?.skipDiff) {
return existing;
} else {
for (const subQ of filters) {
this.emit("sendQuery", existing, subQ);
}
if (existing.addRequest(req)) {
this.emit("change");
return existing;
}
return existing;
} else {
const store = new type();
const filters = req.build(this.#system);
const q = new Query(req.id, req.instance, store, req.options?.leaveOpen, req.options?.timeout);
const q = new Query(this.#system, req);
q.on("trace", r => this.emit("trace", r));
q.on("filters", fx => {
this.#send(q, fx);
});
this.#queries.set(req.id, q);
for (const subQ of filters) {
this.emit("sendQuery", q, subQ);
}
this.emit("change");
return q;
}
@ -74,10 +71,8 @@ export class NostrQueryManager extends EventEmitter<NostrQueryManagerEvents> {
* Async fetch results
*/
fetch(req: RequestBuilder, cb?: (evs: ReadonlyArray<TaggedNostrEvent>) => void) {
const q = this.query(NoteCollection, req);
const q = this.query(req);
return new Promise<Array<TaggedNostrEvent>>(resolve => {
let t: ReturnType<typeof setTimeout> | undefined;
let tBuf: Array<TaggedNostrEvent> = [];
if (cb) {
q.feed.on("event", cb);
}
@ -85,7 +80,7 @@ export class NostrQueryManager extends EventEmitter<NostrQueryManagerEvents> {
if (!loading) {
q.feed.off("event");
q.cancel();
resolve(unwrap((q.feed as NoteCollection).snapshot.data));
resolve(unwrap(q.snapshot.data));
}
});
});
@ -97,6 +92,56 @@ export class NostrQueryManager extends EventEmitter<NostrQueryManagerEvents> {
}
}
async #send(q: Query, qSend: BuiltRawReqFilter) {
for (const qfl of this.#queryCacheLayers) {
qSend = await qfl.processFilter(q, qSend);
}
for (const f of qSend.filters) {
if (f.authors) {
this.#system.relayLoader.TrackKeys(f.authors);
}
}
// check for empty filters
const fNew = trimFilters(qSend.filters);
if (fNew.length === 0) {
return;
}
qSend.filters = fNew;
if (qSend.relay) {
this.#log("Sending query to %s %O", qSend.relay, qSend);
const s = this.#system.pool.getConnection(qSend.relay);
if (s) {
const qt = q.sendToRelay(s, qSend);
if (qt) {
return [qt];
}
} else {
const nc = await this.#system.pool.connect(qSend.relay, { read: true, write: true }, true);
if (nc) {
const qt = q.sendToRelay(nc, qSend);
if (qt) {
return [qt];
}
} else {
console.warn("Failed to connect to new relay for:", qSend.relay, q);
}
}
} else {
const ret = [];
for (const [a, s] of this.#system.pool) {
if (!s.Ephemeral) {
this.#log("Sending query to %s %O", a, qSend);
const qt = q.sendToRelay(s, qSend);
if (qt) {
ret.push(qt);
}
}
}
return ret;
}
}
#cleanup() {
let changed = false;
for (const [k, v] of this.#queries) {

View File

@ -4,9 +4,7 @@ import EventEmitter from "eventemitter3";
import { FeedCache } from "@snort/shared";
import { NostrEvent, ReqFilter, TaggedNostrEvent } from "./nostr";
import { RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection";
import { Query } from "./query";
import { NoteStore } from "./note-collection";
import { BuiltRawReqFilter, RequestBuilder } from "./request-builder";
import { RequestBuilder } from "./request-builder";
import { RelayMetricHandler } from "./relay-metric-handler";
import {
CachedMetadata,
@ -23,12 +21,10 @@ import {
QueryLike,
} from ".";
import { EventsCache } from "./cache/events";
import { RelayCache, RelayMetadataLoader } from "./outbox-model";
import { RelayMetadataLoader } from "./outbox-model";
import { Optimizer, DefaultOptimizer } from "./query-optimizer";
import { trimFilters } from "./request-trim";
import { NostrConnectionPool } from "./nostr-connection-pool";
import { NostrQueryManager } from "./nostr-query-manager";
import { FilterCacheLayer, IdsFilterCacheLayer } from "./filter-cache-layer";
export interface NostrSystemEvents {
change: (state: SystemSnapshot) => void;
@ -52,77 +48,67 @@ export interface NostrsystemProps {
*/
export class NostrSystem extends EventEmitter<NostrSystemEvents> implements SystemInterface {
#log = debug("System");
#pool = new NostrConnectionPool();
#queryManager: NostrQueryManager;
/**
* Storage class for user relay lists
*/
#relayCache: FeedCache<UsersRelays>;
readonly relayCache: FeedCache<UsersRelays>;
/**
* Storage class for user profiles
*/
#profileCache: FeedCache<CachedMetadata>;
readonly profileCache: FeedCache<CachedMetadata>;
/**
* Storage class for relay metrics (connects/disconnects)
*/
#relayMetricsCache: FeedCache<RelayMetrics>;
readonly relayMetricsCache: FeedCache<RelayMetrics>;
/**
* Profile loading service
*/
#profileLoader: ProfileLoaderService;
readonly profileLoader: ProfileLoaderService;
/**
* Relay metrics handler cache
*/
#relayMetrics: RelayMetricHandler;
/**
* General events cache
*/
#eventsCache: FeedCache<NostrEvent>;
readonly relayMetricsHandler: RelayMetricHandler;
/**
* Optimizer instance, contains optimized functions for processing data
*/
#optimizer: Optimizer;
readonly optimizer: Optimizer;
readonly pool = new NostrConnectionPool();
readonly eventsCache: FeedCache<NostrEvent>;
readonly relayLoader: RelayMetadataLoader;
/**
* Check event signatures (reccomended)
*/
checkSigs: boolean;
#relayLoader: RelayMetadataLoader;
/**
* Query cache processing layers which can take data from a cache
*/
#queryCacheLayers: Array<FilterCacheLayer> = [];
constructor(props: NostrsystemProps) {
super();
this.#relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays);
this.#profileCache = props.profileCache ?? new UserProfileCache(props.db?.users);
this.#relayMetricsCache = props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics);
this.#eventsCache = props.eventsCache ?? new EventsCache(props.db?.events);
this.#optimizer = props.optimizer ?? DefaultOptimizer;
this.relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays);
this.profileCache = props.profileCache ?? new UserProfileCache(props.db?.users);
this.relayMetricsCache = props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics);
this.eventsCache = props.eventsCache ?? new EventsCache(props.db?.events);
this.optimizer = props.optimizer ?? DefaultOptimizer;
this.#profileLoader = new ProfileLoaderService(this, this.#profileCache);
this.#relayMetrics = new RelayMetricHandler(this.#relayMetricsCache);
this.#relayLoader = new RelayMetadataLoader(this, this.#relayCache);
this.profileLoader = new ProfileLoaderService(this, this.profileCache);
this.relayMetricsHandler = new RelayMetricHandler(this.relayMetricsCache);
this.relayLoader = new RelayMetadataLoader(this, this.relayCache);
this.checkSigs = props.checkSigs ?? true;
this.#queryManager = new NostrQueryManager(this);
this.#queryCacheLayers.push(new IdsFilterCacheLayer(this.#eventsCache));
// hook connection pool
this.#pool.on("connected", (id, wasReconnect) => {
const c = this.#pool.getConnection(id);
this.pool.on("connected", (id, wasReconnect) => {
const c = this.pool.getConnection(id);
if (c) {
this.#relayMetrics.onConnect(c.Address);
this.relayMetricsHandler.onConnect(c.Address);
if (wasReconnect) {
for (const [, q] of this.#queryManager) {
q.connectionRestored(c);
@ -130,18 +116,18 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
}
}
});
this.#pool.on("connectFailed", address => {
this.#relayMetrics.onDisconnect(address, 0);
this.pool.on("connectFailed", address => {
this.relayMetricsHandler.onDisconnect(address, 0);
});
this.#pool.on("event", (_, sub, ev) => {
ev.relays?.length && this.#relayMetrics.onEvent(ev.relays[0]);
this.pool.on("event", (_, sub, ev) => {
ev.relays?.length && this.relayMetricsHandler.onEvent(ev.relays[0]);
if (!EventExt.isValid(ev)) {
this.#log("Rejecting invalid event %O", ev);
return;
}
if (this.checkSigs) {
if (!this.#optimizer.schnorrVerify(ev)) {
if (!this.optimizer.schnorrVerify(ev)) {
this.#log("Invalid sig %O", ev);
return;
}
@ -149,84 +135,68 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
this.emit("event", sub, ev);
});
this.#pool.on("disconnect", (id, code) => {
const c = this.#pool.getConnection(id);
this.pool.on("disconnect", (id, code) => {
const c = this.pool.getConnection(id);
if (c) {
this.#relayMetrics.onDisconnect(c.Address, code);
this.relayMetricsHandler.onDisconnect(c.Address, code);
for (const [, q] of this.#queryManager) {
q.connectionLost(c.Id);
}
}
});
this.#pool.on("eose", (id, sub) => {
const c = this.#pool.getConnection(id);
this.pool.on("eose", (id, sub) => {
const c = this.pool.getConnection(id);
if (c) {
for (const [, v] of this.#queryManager) {
v.eose(sub, c);
}
}
});
this.#pool.on("auth", (_, c, r, cb) => this.emit("auth", c, r, cb));
this.#pool.on("notice", (addr, msg) => {
this.pool.on("auth", (_, c, r, cb) => this.emit("auth", c, r, cb));
this.pool.on("notice", (addr, msg) => {
this.#log("NOTICE: %s %s", addr, msg);
});
this.#queryManager.on("change", () => this.emit("change", this.takeSnapshot()));
this.#queryManager.on("sendQuery", (q, f) => this.#sendQuery(q, f));
this.#queryManager.on("trace", t => {
this.#relayMetrics.onTraceReport(t);
this.relayMetricsHandler.onTraceReport(t);
});
// internal handler for on-event
this.on("event", (sub, ev) => {
for (const [, v] of this.#queryManager) {
const trace = v.handleEvent(sub, ev);
// inject events to cache if query by id
if (trace && trace.filters.some(a => a.ids)) {
this.#eventsCache.set(ev);
this.eventsCache.set(ev);
}
}
});
}
get ProfileLoader() {
return this.#profileLoader;
}
get Sockets(): ConnectionStateSnapshot[] {
return this.#pool.getState();
}
get RelayCache(): RelayCache {
return this.#relayCache;
}
get UserProfileCache(): FeedCache<CachedMetadata> {
return this.#profileCache;
}
get Optimizer(): Optimizer {
return this.#optimizer;
return this.pool.getState();
}
async Init() {
const t = [
this.#relayCache.preload(),
this.#profileCache.preload(),
this.#relayMetricsCache.preload(),
this.#eventsCache.preload(),
this.relayCache.preload(),
this.profileCache.preload(),
this.relayMetricsCache.preload(),
this.eventsCache.preload(),
];
await Promise.all(t);
}
async ConnectToRelay(address: string, options: RelaySettings) {
await this.#pool.connect(address, options, false);
await this.pool.connect(address, options, false);
}
ConnectEphemeralRelay(address: string) {
return this.#pool.connect(address, { read: true, write: true }, true);
return this.pool.connect(address, { read: true, write: true }, true);
}
DisconnectRelay(address: string) {
this.#pool.disconnect(address);
this.pool.disconnect(address);
}
GetQuery(id: string): QueryLike | undefined {
@ -237,60 +207,8 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
return this.#queryManager.fetch(req, cb);
}
Query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder): QueryLike {
return this.#queryManager.query(type, req) as QueryLike;
}
async #sendQuery(q: Query, qSend: BuiltRawReqFilter) {
for (const qfl of this.#queryCacheLayers) {
qSend = await qfl.processFilter(q, qSend);
}
for (const f of qSend.filters) {
if (f.authors) {
this.#relayLoader.TrackKeys(f.authors);
}
}
// check for empty filters
const fNew = trimFilters(qSend.filters);
if (fNew.length === 0) {
return;
}
qSend.filters = fNew;
if (qSend.relay) {
this.#log("Sending query to %s %O", qSend.relay, qSend);
const s = this.#pool.getConnection(qSend.relay);
if (s) {
const qt = q.sendToRelay(s, qSend);
if (qt) {
return [qt];
}
} else {
const nc = await this.ConnectEphemeralRelay(qSend.relay);
if (nc) {
const qt = q.sendToRelay(nc, qSend);
if (qt) {
return [qt];
}
} else {
console.warn("Failed to connect to new relay for:", qSend.relay, q);
}
}
} else {
const ret = [];
for (const [a, s] of this.#pool) {
if (!s.Ephemeral) {
this.#log("Sending query to %s %O", a, qSend);
const qt = q.sendToRelay(s, qSend);
if (qt) {
ret.push(qt);
}
}
}
return ret;
}
return [];
Query(req: RequestBuilder): QueryLike {
return this.#queryManager.query(req) as QueryLike;
}
HandleEvent(ev: TaggedNostrEvent) {
@ -299,11 +217,11 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
async BroadcastEvent(ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise<OkResponse[]> {
this.HandleEvent({ ...ev, relays: [] });
return await this.#pool.broadcast(this, ev, cb);
return await this.pool.broadcast(this, ev, cb);
}
async WriteOnceToRelay(address: string, ev: NostrEvent): Promise<OkResponse> {
return await this.#pool.broadcastTo(address, ev);
return await this.pool.broadcastTo(address, ev);
}
takeSnapshot(): SystemSnapshot {

View File

@ -3,7 +3,7 @@ import { EventExt, EventType, TaggedNostrEvent, u256 } from ".";
import { findTag } from "./utils";
import EventEmitter from "eventemitter3";
export interface StoreSnapshot<TSnapshot> {
export interface StoreSnapshot<TSnapshot extends NoteStoreSnapshotData> {
data: TSnapshot | undefined;
clear: () => void;
loading: () => boolean;
@ -19,7 +19,7 @@ export const EmptySnapshot = {
add: () => {
// empty
},
} as StoreSnapshot<FlatNoteStore>;
} as StoreSnapshot<Array<TaggedNostrEvent>>;
export type NoteStoreSnapshotData = Array<TaggedNostrEvent> | TaggedNostrEvent;
export type NoteStoreHook = () => void;

View File

@ -205,7 +205,7 @@ export function pickTopRelays(cache: RelayCache, authors: Array<string>, n: numb
export async function pickRelaysForReply(ev: NostrEvent, system: SystemInterface, pickN?: number) {
const recipients = dedupe(ev.tags.filter(a => a[0] === "p").map(a => a[1]));
await updateRelayLists(recipients, system);
const relays = pickTopRelays(system.RelayCache, recipients, pickN ?? DefaultPickNRelays, "read");
const relays = pickTopRelays(system.relayCache, recipients, pickN ?? DefaultPickNRelays, "read");
const ret = removeUndefined(dedupe(relays.map(a => a.relays).flat()));
logger("Picked %O from authors %O", ret, recipients);
return ret;
@ -247,15 +247,15 @@ export function parseRelaysFromKind(ev: NostrEvent) {
}
export async function updateRelayLists(authors: Array<string>, system: SystemInterface) {
await system.RelayCache.buffer(authors);
await system.relayCache.buffer(authors);
const expire = unixNowMs() - RelayListCacheExpire;
const expired = authors.filter(a => (system.RelayCache.getFromCache(a)?.loaded ?? 0) < expire);
const expired = authors.filter(a => (system.relayCache.getFromCache(a)?.loaded ?? 0) < expire);
if (expired.length > 0) {
logger("Updating relays for authors: %O", expired);
const rb = new RequestBuilder("system-update-relays-for-outbox");
rb.withFilter().authors(expired).kinds([EventKind.Relays, EventKind.ContactList]);
const relayLists = await system.Fetch(rb);
await system.RelayCache.bulkSet(
await system.relayCache.bulkSet(
removeUndefined(
relayLists.map(a => {
const relays = parseRelaysFromKind(a);

View File

@ -3,9 +3,9 @@ import debug from "debug";
import EventEmitter from "eventemitter3";
import { unixNowMs, unwrap } from "@snort/shared";
import { Connection, ReqFilter, Nips, TaggedNostrEvent } from ".";
import { NoteStore } from "./note-collection";
import { BuiltRawReqFilter } from "./request-builder";
import { Connection, ReqFilter, Nips, TaggedNostrEvent, SystemInterface } from ".";
import { NoteCollection, NoteStore } from "./note-collection";
import { BuiltRawReqFilter, RequestBuilder } from "./request-builder";
import { eventMatchesFilter } from "./request-matcher";
interface QueryTraceEvents {
@ -89,23 +89,6 @@ export class QueryTrace extends EventEmitter<QueryTraceEvents> {
}
}
export interface QueryBase {
/**
* Uniquie ID of this query
*/
id: string;
/**
* The query payload (REQ filters)
*/
filters: Array<ReqFilter>;
/**
* List of relays to send this query to
*/
relays?: Array<string>;
}
export interface TraceReport {
id: string;
conn: Connection;
@ -116,22 +99,28 @@ export interface TraceReport {
interface QueryEvents {
trace: (report: TraceReport) => void;
filters: (req: BuiltRawReqFilter) => void;
event: (evs: ReadonlyArray<TaggedNostrEvent>) => void;
}
/**
* Active or queued query on the system
*/
export class Query extends EventEmitter<QueryEvents> implements QueryBase {
export class Query extends EventEmitter<QueryEvents> {
/**
* Uniquie ID of this query
* Unique id of this query
*/
id: string;
readonly id: string;
/**
* RequestBuilder instance
*/
fromInstance: string;
requests: Array<RequestBuilder> = [];
/**
* Nostr system interface
*/
#system: SystemInterface;
/**
* Which relays this query has already been executed on
@ -156,27 +145,66 @@ export class Query extends EventEmitter<QueryEvents> implements QueryBase {
/**
* Feed object which collects events
*/
#feed: NoteStore;
#feed: NoteCollection;
/**
* Maximum waiting time for this query
*/
#timeout: number;
/**
* Milliseconds to wait before sending query (debounce)
*/
#groupingDelay?: number;
/**
* Timer which waits for no-change before emitting filters
*/
#groupTimeout?: ReturnType<typeof setTimeout>;
#log = debug("Query");
constructor(id: string, instance: string, feed: NoteStore, leaveOpen?: boolean, timeout?: number) {
constructor(system: SystemInterface, req: RequestBuilder) {
super();
this.id = id;
this.#feed = feed;
this.fromInstance = instance;
this.#leaveOpen = leaveOpen ?? false;
this.#timeout = timeout ?? 5_000;
this.id = uuid();
this.requests.push(req);
this.#system = system;
this.#feed = new NoteCollection();
this.#leaveOpen = req.options?.leaveOpen ?? false;
this.#timeout = req.options?.timeout ?? 5_000;
this.#groupingDelay = req.options?.groupingDelay ?? 100;
this.#checkTraces();
this.feed.on("event", evs => this.emit("event", evs));
}
/**
* Adds another request to this one
*/
addRequest(req: RequestBuilder) {
if (this.#groupTimeout) {
clearTimeout(this.#groupTimeout);
this.#groupTimeout = undefined;
}
if (this.requests.some(a => a.instance === req.instance)) {
// already exists, nothing to add
return false;
}
if (this.requests.some(a => a.options?.skipDiff !== req.options?.skipDiff)) {
throw new Error("Mixing skipDiff option is not supported");
}
this.requests.push(req);
if (this.#groupingDelay) {
this.#groupTimeout = setTimeout(() => {
this.#emitFilters();
}, this.#groupingDelay);
} else {
this.#emitFilters();
}
return true;
}
isOpen() {
return this.#cancelAt === undefined && this.#leaveOpen;
}
@ -232,7 +260,7 @@ export class Query extends EventEmitter<QueryEvents> implements QueryBase {
/**
* Insert a new trace as a placeholder
*/
insertCompletedTrace(subq: BuiltRawReqFilter, data: Readonly<Array<TaggedNostrEvent>>) {
insertCompletedTrace(subq: BuiltRawReqFilter, data: Array<TaggedNostrEvent>) {
const qt = new QueryTrace(subq.relay, subq.filters, "");
qt.sentToRelay();
qt.gotEose();
@ -288,6 +316,24 @@ export class Query extends EventEmitter<QueryEvents> implements QueryBase {
return thisProgress;
}
#emitFilters() {
if (this.requests.every(a => !!a.options?.skipDiff)) {
const existing = this.filters;
const rb = new RequestBuilder(this.id);
this.requests.forEach(a => rb.add(a));
const filters = rb.buildDiff(this.#system, existing);
filters.forEach(f => this.emit("filters", f));
this.requests = [];
} else {
// send without diff
const rb = new RequestBuilder(this.id);
this.requests.forEach(a => rb.add(a));
const filters = rb.build(this.#system);
filters.forEach(f => this.emit("filters", f));
this.requests = [];
}
}
#onProgress() {
const isFinished = this.progress === 1;
if (this.feed.loading !== isFinished) {

View File

@ -1,7 +1,6 @@
import { FeedCache, unixNowMs } from "@snort/shared";
import { Connection } from "connection";
import { RelayMetrics } from "cache";
import { TraceReport } from "query";
import { RelayMetrics } from "./cache";
import { TraceReport } from "./query";
export class RelayMetricHandler {
readonly #cache: FeedCache<RelayMetrics>;

View File

@ -53,6 +53,11 @@ export interface RequestBuilderOptions {
* Max wait time for this request
*/
timeout?: number;
/**
* How many milli-seconds to wait to allow grouping
*/
groupingDelay?: number;
}
/**
@ -111,7 +116,7 @@ export class RequestBuilder {
}
build(system: SystemInterface): Array<BuiltRawReqFilter> {
const expanded = this.#builders.flatMap(a => a.build(system.RelayCache, this.#options));
const expanded = this.#builders.flatMap(a => a.build(system.relayCache, this.#options));
return this.#groupByRelay(system, expanded);
}
@ -121,14 +126,14 @@ export class RequestBuilder {
buildDiff(system: SystemInterface, prev: Array<ReqFilter>): Array<BuiltRawReqFilter> {
const start = unixNowMs();
const diff = system.Optimizer.getDiff(prev, this.buildRaw());
const diff = system.optimizer.getDiff(prev, this.buildRaw());
const ts = unixNowMs() - start;
this.#log("buildDiff %s %d ms +%d", this.id, ts, diff.length);
if (diff.length > 0) {
return splitFlatByWriteRelays(system.RelayCache, diff).map(a => {
return splitFlatByWriteRelays(system.relayCache, diff).map(a => {
return {
strategy: RequestStrategy.AuthorsRelays,
filters: system.Optimizer.flatMerge(a.filters),
filters: system.optimizer.flatMerge(a.filters),
relay: a.relay,
};
});
@ -154,7 +159,7 @@ export class RequestBuilder {
const filtersSquashed = [...relayMerged.values()].map(a => {
return {
filters: system.Optimizer.flatMerge(a.flatMap(b => b.filters.flatMap(c => system.Optimizer.expandFilter(c)))),
filters: system.optimizer.flatMerge(a.flatMap(b => b.filters.flatMap(c => system.optimizer.expandFilter(c)))),
relay: a[0].relay,
strategy: a[0].strategy,
} as BuiltRawReqFilter;

View File

@ -3,11 +3,8 @@ import EventEmitter from "eventemitter3";
import {
ConnectionStateSnapshot,
NostrEvent,
NoteStore,
OkResponse,
ProfileLoaderService,
Optimizer,
RelayCache,
RelaySettings,
RequestBuilder,
SystemInterface,
@ -28,18 +25,19 @@ import { FeedCache } from "@snort/shared";
import { EventsCache } from "../cache/events";
import { RelayMetricHandler } from "../relay-metric-handler";
import debug from "debug";
import { ConnectionPool } from "nostr-connection-pool";
export class SystemWorker extends EventEmitter<NostrSystemEvents> implements SystemInterface {
#log = debug("SystemWorker");
#worker: Worker;
#commandQueue: Map<string, (v: unknown) => void> = new Map();
#relayCache: FeedCache<UsersRelays>;
#profileCache: FeedCache<CachedMetadata>;
#relayMetricsCache: FeedCache<RelayMetrics>;
#profileLoader: ProfileLoaderService;
#relayMetrics: RelayMetricHandler;
#eventsCache: FeedCache<NostrEvent>;
#relayLoader: RelayMetadataLoader;
readonly relayCache: FeedCache<UsersRelays>;
readonly profileCache: FeedCache<CachedMetadata>;
readonly relayMetricsCache: FeedCache<RelayMetrics>;
readonly profileLoader: ProfileLoaderService;
readonly relayMetricsHandler: RelayMetricHandler;
readonly eventsCache: FeedCache<NostrEvent>;
readonly relayLoader: RelayMetadataLoader;
get checkSigs() {
return true;
@ -49,17 +47,25 @@ export class SystemWorker extends EventEmitter<NostrSystemEvents> implements Sys
// not used
}
get optimizer() {
return DefaultOptimizer;
}
get pool() {
return {} as ConnectionPool;
}
constructor(scriptPath: string, props: NostrsystemProps) {
super();
this.#relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays);
this.#profileCache = props.profileCache ?? new UserProfileCache(props.db?.users);
this.#relayMetricsCache = props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics);
this.#eventsCache = props.eventsCache ?? new EventsCache(props.db?.events);
this.relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays);
this.profileCache = props.profileCache ?? new UserProfileCache(props.db?.users);
this.relayMetricsCache = props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics);
this.eventsCache = props.eventsCache ?? new EventsCache(props.db?.events);
this.#profileLoader = new ProfileLoaderService(this, this.#profileCache);
this.#relayMetrics = new RelayMetricHandler(this.#relayMetricsCache);
this.#relayLoader = new RelayMetadataLoader(this, this.#relayCache);
this.profileLoader = new ProfileLoaderService(this, this.profileCache);
this.relayMetricsHandler = new RelayMetricHandler(this.relayMetricsCache);
this.relayLoader = new RelayMetadataLoader(this, this.relayCache);
this.#worker = new Worker(scriptPath, {
name: "SystemWorker",
type: "module",
@ -86,7 +92,7 @@ export class SystemWorker extends EventEmitter<NostrSystemEvents> implements Sys
return undefined;
}
Query<T extends NoteStore>(type: new () => T, req: RequestBuilder): QueryLike {
Query(req: RequestBuilder): QueryLike {
const chan = this.#workerRpc<[RequestBuilder], { id: string; port: MessagePort }>(WorkerCommand.Query, [req]);
return {
on: (_: "event", cb) => {
@ -130,18 +136,6 @@ export class SystemWorker extends EventEmitter<NostrSystemEvents> implements Sys
throw new Error("Method not implemented.");
}
get ProfileLoader(): ProfileLoaderService {
return this.#profileLoader;
}
get RelayCache(): RelayCache {
return this.#relayCache;
}
get Optimizer(): Optimizer {
return DefaultOptimizer;
}
#workerRpc<T, R>(type: WorkerCommand, data?: T, timeout = 5_000) {
const id = uuid();
const msg = {

View File

@ -1,21 +0,0 @@
/// <reference lib="webworker" />
import { UsersRelaysCache } from "../Cache/UserRelayCache";
import { NostrSystem } from ".";
declare const self: SharedWorkerGlobalScope;
const RelayCache = new UsersRelaysCache();
const System = new NostrSystem({
get: pk => RelayCache.getFromCache(pk)?.relays,
});
self.onconnect = e => {
const port = e.ports[0];
port.addEventListener("message", async e1 => {
console.debug(e1);
const [cmd, ...others] = e1.data;
switch (cmd) {
}
});
port.start();
};