feat: automated outbox model
This commit is contained in:
parent
a80c330e5b
commit
a67263e5e1
@ -21,7 +21,6 @@ import {
|
||||
} from "@/Login";
|
||||
import { SnortPubKey } from "@/Const";
|
||||
import { SubscriptionEvent } from "@/Subscription";
|
||||
import useRelaysFeedFollows from "./RelaysFeedFollows";
|
||||
import { FollowLists, FollowsFeed, GiftsCache, Notifications, UserRelays } from "@/Cache";
|
||||
import { Nip28Chats, Nip4Chats } from "@/chat";
|
||||
import { useRefreshFeedCache } from "@/Hooks/useRefreshFeedcache";
|
||||
@ -226,11 +225,6 @@ export default function useLoginFeed() {
|
||||
|
||||
useEffect(() => {
|
||||
UserRelays.buffer(follows.item).catch(console.error);
|
||||
system.ProfileLoader.TrackMetadata(follows.item); // always track follows profiles
|
||||
system.ProfileLoader.TrackKeys(follows.item); // always track follows profiles
|
||||
}, [follows.item]);
|
||||
|
||||
const fRelays = useRelaysFeedFollows(follows.item);
|
||||
useEffect(() => {
|
||||
UserRelays.bulkSet(fRelays).catch(console.error);
|
||||
}, [fRelays]);
|
||||
}
|
||||
|
@ -1,46 +0,0 @@
|
||||
import { useMemo } from "react";
|
||||
import {
|
||||
HexKey,
|
||||
FullRelaySettings,
|
||||
TaggedNostrEvent,
|
||||
EventKind,
|
||||
NoteCollection,
|
||||
RequestBuilder,
|
||||
parseRelayTags,
|
||||
} from "@snort/system";
|
||||
import { useRequestBuilder } from "@snort/system-react";
|
||||
import debug from "debug";
|
||||
|
||||
import { UserRelays } from "@/Cache";
|
||||
|
||||
interface RelayList {
|
||||
pubkey: string;
|
||||
created_at: number;
|
||||
relays: FullRelaySettings[];
|
||||
}
|
||||
|
||||
export default function useRelaysFeedFollows(pubkeys: HexKey[]): Array<RelayList> {
|
||||
const sub = useMemo(() => {
|
||||
const b = new RequestBuilder(`relays:follows`);
|
||||
const since = UserRelays.newest();
|
||||
debug("LoginFeed")("Loading relay lists since %s", new Date(since * 1000).toISOString());
|
||||
b.withFilter().authors(pubkeys).kinds([EventKind.Relays]).since(since);
|
||||
return b;
|
||||
}, [pubkeys]);
|
||||
|
||||
function mapFromRelays(notes: Array<TaggedNostrEvent>): Array<RelayList> {
|
||||
return notes.map(ev => {
|
||||
return {
|
||||
pubkey: ev.pubkey,
|
||||
created_at: ev.created_at,
|
||||
relays: parseRelayTags(ev.tags),
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
const relays = useRequestBuilder(NoteCollection, sub);
|
||||
const notesRelays = relays.data?.filter(a => a.kind === EventKind.Relays) ?? [];
|
||||
return useMemo(() => {
|
||||
return mapFromRelays(notesRelays);
|
||||
}, [relays]);
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
import { createContext } from "react";
|
||||
import { NostrSystem, SystemInterface } from "@snort/system";
|
||||
|
||||
export const SnortContext = createContext<SystemInterface>(new NostrSystem({}));
|
||||
export const SnortContext = createContext<SystemInterface>({} as SystemInterface);
|
||||
|
@ -10,13 +10,13 @@ export function useUserProfile(pubKey?: HexKey): MetadataCache | undefined {
|
||||
return useSyncExternalStore<MetadataCache | undefined>(
|
||||
h => {
|
||||
if (pubKey) {
|
||||
system.ProfileLoader.TrackMetadata(pubKey);
|
||||
system.ProfileLoader.TrackKeys(pubKey);
|
||||
}
|
||||
const release = system.ProfileLoader.Cache.hook(h, pubKey);
|
||||
return () => {
|
||||
release();
|
||||
if (pubKey) {
|
||||
system.ProfileLoader.UntrackMetadata(pubKey);
|
||||
system.ProfileLoader.UntrackKeys(pubKey);
|
||||
}
|
||||
};
|
||||
},
|
||||
|
136
packages/system/src/background-loader.ts
Normal file
136
packages/system/src/background-loader.ts
Normal file
@ -0,0 +1,136 @@
|
||||
import debug from "debug";
|
||||
import { FeedCache, removeUndefined } from "@snort/shared";
|
||||
import { SystemInterface, TaggedNostrEvent, RequestBuilder } from ".";
|
||||
|
||||
export abstract class BackgroundLoader<T extends { loaded: number; created: number }> {
|
||||
#system: SystemInterface;
|
||||
#cache: FeedCache<T>;
|
||||
#log = debug(this.name());
|
||||
|
||||
/**
|
||||
* List of pubkeys to fetch metadata for
|
||||
*/
|
||||
#wantsKeys = new Set<string>();
|
||||
|
||||
/**
|
||||
* Custom loader function for fetching data from alternative sources
|
||||
*/
|
||||
loaderFn?: (pubkeys: Array<string>) => Promise<Array<T>>;
|
||||
|
||||
constructor(system: SystemInterface, cache: FeedCache<T>) {
|
||||
this.#system = system;
|
||||
this.#cache = cache;
|
||||
this.#FetchMetadata();
|
||||
}
|
||||
|
||||
get Cache() {
|
||||
return this.#cache;
|
||||
}
|
||||
|
||||
/**
|
||||
* Name of this loader service
|
||||
*/
|
||||
abstract name(): string;
|
||||
|
||||
/**
|
||||
* Handle fetched data
|
||||
*/
|
||||
abstract onEvent(e: Readonly<TaggedNostrEvent>): T | undefined;
|
||||
|
||||
/**
|
||||
* Get expire time as uxix milliseconds
|
||||
*/
|
||||
abstract getExpireCutoff(): number;
|
||||
|
||||
/**
|
||||
* Build subscription for missing keys
|
||||
*/
|
||||
protected abstract buildSub(missing: Array<string>): RequestBuilder;
|
||||
|
||||
/**
|
||||
* Create a placeholder value when no data can be found
|
||||
*/
|
||||
protected abstract makePlaceholder(key: string): T | undefined;
|
||||
|
||||
/**
|
||||
* Start requesting a set of keys to be loaded
|
||||
*/
|
||||
TrackKeys(pk: string | Array<string>) {
|
||||
for (const p of Array.isArray(pk) ? pk : [pk]) {
|
||||
this.#wantsKeys.add(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop requesting a set of keys to be loaded
|
||||
*/
|
||||
UntrackKeys(pk: string | Array<string>) {
|
||||
for (const p of Array.isArray(pk) ? pk : [pk]) {
|
||||
this.#wantsKeys.delete(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get object from cache or fetch if missing
|
||||
*/
|
||||
async fetch(key: string) {
|
||||
const existing = this.Cache.get(key);
|
||||
if (existing) {
|
||||
return existing;
|
||||
} else {
|
||||
return await new Promise<T>((resolve, reject) => {
|
||||
this.TrackKeys(key);
|
||||
const release = this.Cache.hook(() => {
|
||||
const existing = this.Cache.getFromCache(key);
|
||||
if (existing) {
|
||||
resolve(existing);
|
||||
release();
|
||||
this.UntrackKeys(key);
|
||||
}
|
||||
}, key);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async #FetchMetadata() {
|
||||
const loading = [...this.#wantsKeys];
|
||||
await this.#cache.buffer(loading);
|
||||
|
||||
const missing = loading.filter(a => (this.#cache.getFromCache(a)?.loaded ?? 0) < this.getExpireCutoff());
|
||||
if (missing.length > 0) {
|
||||
this.#log("Fetching keys: %O", missing);
|
||||
try {
|
||||
const found = await this.#loadData(missing);
|
||||
const noResult = removeUndefined(
|
||||
missing.filter(a => !found.some(b => a === this.#cache.key(b))).map(a => this.makePlaceholder(a)),
|
||||
);
|
||||
if (noResult.length > 0) {
|
||||
await Promise.all(noResult.map(a => this.#cache.update(a)));
|
||||
}
|
||||
} catch (e) {
|
||||
this.#log("Error: %O", e);
|
||||
debugger;
|
||||
}
|
||||
}
|
||||
|
||||
setTimeout(() => this.#FetchMetadata(), 500);
|
||||
}
|
||||
|
||||
async #loadData(missing: Array<string>) {
|
||||
if (this.loaderFn) {
|
||||
const results = await this.loaderFn(missing);
|
||||
await Promise.all(results.map(a => this.#cache.update(a)));
|
||||
return results;
|
||||
} else {
|
||||
const v = await this.#system.Fetch(this.buildSub(missing), async e => {
|
||||
for (const pe of e) {
|
||||
const m = this.onEvent(pe);
|
||||
if (m) {
|
||||
await this.#cache.update(m);
|
||||
}
|
||||
}
|
||||
});
|
||||
return removeUndefined(v.map(this.onEvent));
|
||||
}
|
||||
}
|
||||
}
|
2
packages/system/src/cache/index.ts
vendored
2
packages/system/src/cache/index.ts
vendored
@ -44,8 +44,8 @@ export interface RelayMetrics {
|
||||
|
||||
export interface UsersRelays {
|
||||
pubkey: string;
|
||||
created_at: number;
|
||||
relays: FullRelaySettings[];
|
||||
created: number;
|
||||
loaded: number;
|
||||
}
|
||||
|
||||
|
2
packages/system/src/cache/user-relays.ts
vendored
2
packages/system/src/cache/user-relays.ts
vendored
@ -19,7 +19,7 @@ export class UserRelaysCache extends FeedCache<UsersRelays> {
|
||||
|
||||
newest(): number {
|
||||
let ret = 0;
|
||||
this.cache.forEach(v => (ret = v.created_at > ret ? v.created_at : ret));
|
||||
this.cache.forEach(v => (ret = v.created > ret ? v.created : ret));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -5,7 +5,7 @@ import { unwrap, sanitizeRelayUrl, FeedCache, removeUndefined } from "@snort/sha
|
||||
import { NostrEvent, TaggedNostrEvent } from "./nostr";
|
||||
import { Connection, RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection";
|
||||
import { Query } from "./query";
|
||||
import { NoteCollection, NoteStore, NoteStoreSnapshotData } from "./note-collection";
|
||||
import { NoteCollection, NoteStore } from "./note-collection";
|
||||
import { BuiltRawReqFilter, RequestBuilder, RequestStrategy } from "./request-builder";
|
||||
import { RelayMetricHandler } from "./relay-metric-handler";
|
||||
import {
|
||||
@ -22,7 +22,7 @@ import {
|
||||
EventExt,
|
||||
} from ".";
|
||||
import { EventsCache } from "./cache/events";
|
||||
import { RelayCache, pickRelaysForReply } from "./outbox-model";
|
||||
import { RelayCache, RelayMetadataLoader, pickRelaysForReply } from "./outbox-model";
|
||||
import { QueryOptimizer, DefaultQueryOptimizer } from "./query-optimizer";
|
||||
import { trimFilters } from "./request-trim";
|
||||
|
||||
@ -88,6 +88,8 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
|
||||
*/
|
||||
checkSigs: boolean;
|
||||
|
||||
#relayLoader: RelayMetadataLoader;
|
||||
|
||||
constructor(props: {
|
||||
relayCache?: FeedCache<UsersRelays>;
|
||||
profileCache?: FeedCache<MetadataCache>;
|
||||
@ -106,6 +108,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
|
||||
|
||||
this.#profileLoader = new ProfileLoaderService(this, this.#profileCache);
|
||||
this.#relayMetrics = new RelayMetricHandler(this.#relayMetricsCache);
|
||||
this.#relayLoader = new RelayMetadataLoader(this, this.#relayCache);
|
||||
this.checkSigs = props.checkSigs ?? true;
|
||||
this.#cleanup();
|
||||
}
|
||||
@ -333,6 +336,9 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
|
||||
);
|
||||
}
|
||||
}
|
||||
if (f.authors) {
|
||||
this.#relayLoader.TrackKeys(f.authors);
|
||||
}
|
||||
}
|
||||
|
||||
// check for empty filters
|
||||
|
@ -3,6 +3,7 @@ import { dedupe, sanitizeRelayUrl, unixNowMs, unwrap } from "@snort/shared";
|
||||
import debug from "debug";
|
||||
import { FlatReqFilter } from "./query-optimizer";
|
||||
import { RelayListCacheExpire } from "./const";
|
||||
import { BackgroundLoader } from "./background-loader";
|
||||
|
||||
const PickNRelays = 2;
|
||||
|
||||
@ -224,9 +225,44 @@ export async function updateRelayLists(authors: Array<string>, system: SystemInt
|
||||
relayLists.map(a => ({
|
||||
relays: parseRelayTags(a.tags),
|
||||
pubkey: a.pubkey,
|
||||
created_at: a.created_at,
|
||||
created: a.created_at,
|
||||
loaded: unixNowMs(),
|
||||
})),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export class RelayMetadataLoader extends BackgroundLoader<UsersRelays> {
|
||||
override name(): string {
|
||||
return "RelayMetadataLoader";
|
||||
}
|
||||
|
||||
override onEvent(e: Readonly<TaggedNostrEvent>): UsersRelays | undefined {
|
||||
return {
|
||||
relays: parseRelayTags(e.tags),
|
||||
pubkey: e.pubkey,
|
||||
created: e.created_at,
|
||||
loaded: unixNowMs(),
|
||||
};
|
||||
}
|
||||
|
||||
override getExpireCutoff(): number {
|
||||
return unixNowMs() - RelayListCacheExpire;
|
||||
}
|
||||
|
||||
protected override buildSub(missing: string[]): RequestBuilder {
|
||||
const rb = new RequestBuilder("relay-loader");
|
||||
rb.withOptions({ skipDiff: true });
|
||||
rb.withFilter().authors(missing).kinds([EventKind.Relays]);
|
||||
return rb;
|
||||
}
|
||||
|
||||
protected override makePlaceholder(key: string): UsersRelays | undefined {
|
||||
return {
|
||||
relays: [],
|
||||
pubkey: key,
|
||||
created: 0,
|
||||
loaded: this.getExpireCutoff() + 300_000,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -1,156 +1,40 @@
|
||||
import debug from "debug";
|
||||
import { unixNowMs, FeedCache } from "@snort/shared";
|
||||
import { EventKind, HexKey, SystemInterface, TaggedNostrEvent, RequestBuilder } from ".";
|
||||
import { unixNowMs } from "@snort/shared";
|
||||
import { EventKind, TaggedNostrEvent, RequestBuilder } from ".";
|
||||
import { ProfileCacheExpire } from "./const";
|
||||
import { mapEventToProfile, MetadataCache } from "./cache";
|
||||
import { v4 as uuid } from "uuid";
|
||||
import { BackgroundLoader } from "./background-loader";
|
||||
|
||||
const MetadataRelays = ["wss://purplepag.es"];
|
||||
|
||||
export class ProfileLoaderService {
|
||||
#system: SystemInterface;
|
||||
#cache: FeedCache<MetadataCache>;
|
||||
|
||||
/**
|
||||
* A set of pubkeys we could not find last run,
|
||||
* This list will attempt to use known profile metadata relays
|
||||
*/
|
||||
#missingLastRun: Set<string> = new Set();
|
||||
|
||||
/**
|
||||
* List of pubkeys to fetch metadata for
|
||||
*/
|
||||
#wantsMetadata: Set<HexKey> = new Set();
|
||||
|
||||
readonly #log = debug("ProfileCache");
|
||||
|
||||
/**
|
||||
* Custom loader function for fetching profiles from alternative sources
|
||||
*/
|
||||
loaderFn?: (pubkeys: Array<string>) => Promise<Array<MetadataCache>>;
|
||||
|
||||
constructor(system: SystemInterface, cache: FeedCache<MetadataCache>) {
|
||||
this.#system = system;
|
||||
this.#cache = cache;
|
||||
this.#FetchMetadata();
|
||||
export class ProfileLoaderService extends BackgroundLoader<MetadataCache> {
|
||||
override name(): string {
|
||||
return "ProfileLoaderService";
|
||||
}
|
||||
|
||||
get Cache() {
|
||||
return this.#cache;
|
||||
override onEvent(e: Readonly<TaggedNostrEvent>): MetadataCache | undefined {
|
||||
return mapEventToProfile(e);
|
||||
}
|
||||
|
||||
/**
|
||||
* Request profile metadata for a set of pubkeys
|
||||
*/
|
||||
TrackMetadata(pk: HexKey | Array<HexKey>) {
|
||||
for (const p of Array.isArray(pk) ? pk : [pk]) {
|
||||
if (p.length === 64) {
|
||||
this.#wantsMetadata.add(p);
|
||||
}
|
||||
}
|
||||
override getExpireCutoff(): number {
|
||||
return unixNowMs() - ProfileCacheExpire;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop tracking metadata for a set of pubkeys
|
||||
*/
|
||||
UntrackMetadata(pk: HexKey | Array<HexKey>) {
|
||||
for (const p of Array.isArray(pk) ? pk : [pk]) {
|
||||
if (p.length > 0) {
|
||||
this.#wantsMetadata.delete(p);
|
||||
}
|
||||
}
|
||||
override buildSub(missing: string[]): RequestBuilder {
|
||||
const sub = new RequestBuilder(`profiles-${uuid()}`);
|
||||
sub
|
||||
.withOptions({
|
||||
skipDiff: true,
|
||||
})
|
||||
.withFilter()
|
||||
.kinds([EventKind.SetMetadata])
|
||||
.authors(missing);
|
||||
return sub;
|
||||
}
|
||||
|
||||
async onProfileEvent(e: Readonly<TaggedNostrEvent>) {
|
||||
const profile = mapEventToProfile(e);
|
||||
if (profile) {
|
||||
await this.#cache.update(profile);
|
||||
}
|
||||
}
|
||||
|
||||
async fetchProfile(key: string) {
|
||||
const existing = this.Cache.get(key);
|
||||
if (existing) {
|
||||
return existing;
|
||||
} else {
|
||||
return await new Promise<MetadataCache>((resolve, reject) => {
|
||||
this.TrackMetadata(key);
|
||||
const release = this.Cache.hook(() => {
|
||||
const existing = this.Cache.getFromCache(key);
|
||||
if (existing) {
|
||||
resolve(existing);
|
||||
release();
|
||||
this.UntrackMetadata(key);
|
||||
}
|
||||
}, key);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async #FetchMetadata() {
|
||||
const missingFromCache = await this.#cache.buffer([...this.#wantsMetadata]);
|
||||
|
||||
const expire = unixNowMs() - ProfileCacheExpire;
|
||||
const expired = [...this.#wantsMetadata]
|
||||
.filter(a => !missingFromCache.includes(a))
|
||||
.filter(a => (this.#cache.getFromCache(a)?.loaded ?? 0) < expire);
|
||||
const missing = new Set([...missingFromCache, ...expired]);
|
||||
if (missing.size > 0) {
|
||||
this.#log("Wants profiles: %d missing, %d expired", missingFromCache.length, expired.length);
|
||||
|
||||
const results = await this.#loadProfiles([...missing]);
|
||||
|
||||
const couldNotFetch = [...missing].filter(a => !results.some(b => b.pubkey === a));
|
||||
this.#missingLastRun = new Set(couldNotFetch);
|
||||
if (couldNotFetch.length > 0) {
|
||||
this.#log("No profiles: %o", couldNotFetch);
|
||||
const empty = couldNotFetch.map(a =>
|
||||
this.#cache.update({
|
||||
pubkey: a,
|
||||
loaded: unixNowMs() - ProfileCacheExpire + 30_000, // expire in 30s
|
||||
created: 69,
|
||||
} as MetadataCache),
|
||||
);
|
||||
await Promise.all(empty);
|
||||
}
|
||||
|
||||
/* When we fetch an expired profile and its the same as what we already have
|
||||
// onEvent is not fired and the loaded timestamp never gets updated
|
||||
const expiredSame = results.filter(a => !newProfiles.has(a.id) && expired.includes(a.pubkey));
|
||||
await Promise.all(expiredSame.map(v => this.onProfileEvent(v)));*/
|
||||
}
|
||||
|
||||
setTimeout(() => this.#FetchMetadata(), 500);
|
||||
}
|
||||
|
||||
async #loadProfiles(missing: Array<string>) {
|
||||
if (this.loaderFn) {
|
||||
const results = await this.loaderFn(missing);
|
||||
await Promise.all(results.map(a => this.#cache.update(a)));
|
||||
return results;
|
||||
} else {
|
||||
const sub = new RequestBuilder(`profiles-${uuid()}`);
|
||||
sub
|
||||
.withOptions({
|
||||
skipDiff: true,
|
||||
})
|
||||
.withFilter()
|
||||
.kinds([EventKind.SetMetadata])
|
||||
.authors(missing);
|
||||
|
||||
if (this.#missingLastRun.size > 0) {
|
||||
const fMissing = sub
|
||||
.withFilter()
|
||||
.kinds([EventKind.SetMetadata])
|
||||
.authors([...this.#missingLastRun]);
|
||||
MetadataRelays.forEach(r => fMissing.relay(r));
|
||||
}
|
||||
const results = (await this.#system.Fetch(sub, async e => {
|
||||
for (const pe of e) {
|
||||
await this.onProfileEvent(pe);
|
||||
}
|
||||
})) as ReadonlyArray<TaggedNostrEvent>;
|
||||
return results;
|
||||
}
|
||||
protected override makePlaceholder(key: string): MetadataCache | undefined {
|
||||
return {
|
||||
pubkey: key,
|
||||
loaded: unixNowMs() - ProfileCacheExpire + 30_000,
|
||||
created: 0,
|
||||
} as MetadataCache;
|
||||
}
|
||||
}
|
||||
|
@ -7,11 +7,12 @@ export interface RelayInfo {
|
||||
software?: string;
|
||||
version?: string;
|
||||
limitation?: {
|
||||
payment_required: boolean;
|
||||
max_subscriptions: number;
|
||||
max_filters: number;
|
||||
max_event_tags: number;
|
||||
auth_required: boolean;
|
||||
payment_required?: boolean;
|
||||
max_subscriptions?: number;
|
||||
max_filters?: number;
|
||||
max_event_tags?: number;
|
||||
auth_required?: boolean;
|
||||
write_restricted?: boolean;
|
||||
};
|
||||
relay_countries?: Array<string>;
|
||||
language_tags?: Array<string>;
|
||||
|
Loading…
x
Reference in New Issue
Block a user