Cache all the things
This commit is contained in:
23
packages/system/src/cache/events.ts
vendored
Normal file
23
packages/system/src/cache/events.ts
vendored
Normal file
@ -0,0 +1,23 @@
|
||||
import { NostrEvent } from "nostr";
|
||||
import { db } from ".";
|
||||
import { FeedCache } from "@snort/shared";
|
||||
|
||||
export class EventsCache extends FeedCache<NostrEvent> {
|
||||
constructor() {
|
||||
super("EventsCache", db.events);
|
||||
}
|
||||
|
||||
key(of: NostrEvent): string {
|
||||
return of.id;
|
||||
}
|
||||
|
||||
override async preload(): Promise<void> {
|
||||
await super.preload();
|
||||
// load everything
|
||||
await this.buffer([...this.onTable]);
|
||||
}
|
||||
|
||||
takeSnapshot(): Array<NostrEvent> {
|
||||
return [...this.cache.values()];
|
||||
}
|
||||
}
|
@ -13,6 +13,7 @@ import {
|
||||
PowMiner,
|
||||
PrivateKeySigner,
|
||||
RelaySettings,
|
||||
SignerSupports,
|
||||
TaggedNostrEvent,
|
||||
u256,
|
||||
UserMetadata,
|
||||
@ -57,6 +58,10 @@ export class EventPublisher {
|
||||
return new EventPublisher(signer, signer.getPubKey());
|
||||
}
|
||||
|
||||
supports(t: SignerSupports) {
|
||||
return this.#signer.supports.includes(t);
|
||||
}
|
||||
|
||||
get pubKey() {
|
||||
return this.#pubKey;
|
||||
}
|
||||
|
@ -63,6 +63,10 @@ export class Nip46Signer implements EventSigner {
|
||||
this.#insideSigner = insideSigner ?? new PrivateKeySigner(secp256k1.utils.randomPrivateKey());
|
||||
}
|
||||
|
||||
get supports(): string[] {
|
||||
return ["nip04"]
|
||||
}
|
||||
|
||||
get relays() {
|
||||
return [this.#relay];
|
||||
}
|
||||
|
@ -21,6 +21,10 @@ declare global {
|
||||
}
|
||||
|
||||
export class Nip7Signer implements EventSigner {
|
||||
get supports(): string[] {
|
||||
return ["nip04"];
|
||||
}
|
||||
|
||||
init(): Promise<void> {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
@ -1,8 +1,8 @@
|
||||
import { AuthHandler, RelaySettings, ConnectionStateSnapshot } from "./connection";
|
||||
import { RequestBuilder } from "./request-builder";
|
||||
import { NoteStore } from "./note-collection";
|
||||
import { NoteStore, NoteStoreHook, NoteStoreSnapshotData } from "./note-collection";
|
||||
import { Query } from "./query";
|
||||
import { NostrEvent, ReqFilter } from "./nostr";
|
||||
import { NostrEvent, ReqFilter, TaggedNostrEvent } from "./nostr";
|
||||
import { ProfileLoaderService } from "./profile-cache";
|
||||
|
||||
export * from "./nostr-system";
|
||||
@ -40,13 +40,61 @@ export interface SystemInterface {
|
||||
* Handler function for NIP-42
|
||||
*/
|
||||
HandleAuth?: AuthHandler;
|
||||
|
||||
/**
|
||||
* Get a snapshot of the relay connections
|
||||
*/
|
||||
get Sockets(): Array<ConnectionStateSnapshot>;
|
||||
|
||||
/**
|
||||
* Get an active query by ID
|
||||
* @param id Query ID
|
||||
*/
|
||||
GetQuery(id: string): Query | undefined;
|
||||
Query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder | null): Query;
|
||||
|
||||
/**
|
||||
* Open a new query to relays
|
||||
* @param type Store type
|
||||
* @param req Request to send to relays
|
||||
*/
|
||||
Query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder): Query;
|
||||
|
||||
/**
|
||||
* Fetch data from nostr relays asynchronously
|
||||
* @param req Request to send to relays
|
||||
* @param cb A callback which will fire every 100ms when new data is received
|
||||
*/
|
||||
Fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void) : Promise<NoteStoreSnapshotData>;
|
||||
|
||||
/**
|
||||
* Create a new permanent connection to a relay
|
||||
* @param address Relay URL
|
||||
* @param options Read/Write settings
|
||||
*/
|
||||
ConnectToRelay(address: string, options: RelaySettings): Promise<void>;
|
||||
|
||||
/**
|
||||
* Disconnect permanent relay connection
|
||||
* @param address Relay URL
|
||||
*/
|
||||
DisconnectRelay(address: string): void;
|
||||
|
||||
/**
|
||||
* Send an event to all permanent connections
|
||||
* @param ev Event to broadcast
|
||||
*/
|
||||
BroadcastEvent(ev: NostrEvent): void;
|
||||
|
||||
/**
|
||||
* Connect to a specific relay and send an event and wait for the response
|
||||
* @param relay Relay URL
|
||||
* @param ev Event to send
|
||||
*/
|
||||
WriteOnceToRelay(relay: string, ev: NostrEvent): Promise<void>;
|
||||
|
||||
/**
|
||||
* Profile cache/loader
|
||||
*/
|
||||
get ProfileLoader(): ProfileLoaderService;
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
import { bech32ToHex, hexToBech32 } from "@snort/shared";
|
||||
import { NostrPrefix, decodeTLV, TLVEntryType, encodeTLV } from ".";
|
||||
import { bech32ToHex, hexToBech32, unwrap } from "@snort/shared";
|
||||
import { NostrPrefix, decodeTLV, TLVEntryType, encodeTLV, NostrEvent, TaggedNostrEvent } from ".";
|
||||
import { findTag } from "./utils";
|
||||
|
||||
export interface NostrLink {
|
||||
type: NostrPrefix;
|
||||
@ -10,6 +11,29 @@ export interface NostrLink {
|
||||
encode(): string;
|
||||
}
|
||||
|
||||
export function createNostrLinkToEvent(ev: TaggedNostrEvent | NostrEvent) {
|
||||
const relays = "relays" in ev ? ev.relays : undefined;
|
||||
|
||||
if (ev.kind >= 30_000 && ev.kind < 40_000) {
|
||||
const dTag = unwrap(findTag(ev, "d"));
|
||||
return createNostrLink(NostrPrefix.Address, dTag, relays, ev.kind, ev.pubkey);
|
||||
}
|
||||
return createNostrLink(NostrPrefix.Event, ev.id, relays, ev.kind, ev.pubkey);
|
||||
}
|
||||
|
||||
export function linkMatch(link: NostrLink, ev: NostrEvent) {
|
||||
if(link.type === NostrPrefix.Address) {
|
||||
const dTag = findTag(ev, "d");
|
||||
if(dTag && dTag === link.id && unwrap(link.author) === ev.pubkey && unwrap(link.kind) === ev.kind) {
|
||||
return true;
|
||||
}
|
||||
} else if(link.type === NostrPrefix.Event || link.type === NostrPrefix.Note) {
|
||||
return link.id === ev.id;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
export function createNostrLink(prefix: NostrPrefix, id: string, relays?: string[], kind?: number, author?: string) {
|
||||
return {
|
||||
type: prefix,
|
||||
|
@ -4,7 +4,7 @@ import { unwrap, sanitizeRelayUrl, ExternalStore, FeedCache } from "@snort/share
|
||||
import { NostrEvent, TaggedNostrEvent } from "./nostr";
|
||||
import { AuthHandler, Connection, RelaySettings, ConnectionStateSnapshot } from "./connection";
|
||||
import { Query } from "./query";
|
||||
import { NoteStore } from "./note-collection";
|
||||
import { NoteCollection, NoteStore, NoteStoreHook, NoteStoreSnapshotData } from "./note-collection";
|
||||
import { BuiltRawReqFilter, RequestBuilder } from "./request-builder";
|
||||
import { RelayMetricHandler } from "./relay-metric-handler";
|
||||
import {
|
||||
@ -19,6 +19,7 @@ import {
|
||||
db,
|
||||
UsersRelays,
|
||||
} from ".";
|
||||
import { EventsCache } from "./cache/events";
|
||||
|
||||
/**
|
||||
* Manages nostr content retrieval system
|
||||
@ -66,22 +67,30 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
*/
|
||||
#relayMetrics: RelayMetricHandler;
|
||||
|
||||
/**
|
||||
* General events cache
|
||||
*/
|
||||
#eventsCache: FeedCache<NostrEvent>;
|
||||
|
||||
constructor(props: {
|
||||
authHandler?: AuthHandler;
|
||||
relayCache?: FeedCache<UsersRelays>;
|
||||
profileCache?: FeedCache<MetadataCache>;
|
||||
relayMetrics?: FeedCache<RelayMetrics>;
|
||||
eventsCache?: FeedCache<NostrEvent>;
|
||||
}) {
|
||||
super();
|
||||
this.#handleAuth = props.authHandler;
|
||||
this.#relayCache = props.relayCache ?? new UserRelaysCache();
|
||||
this.#profileCache = props.profileCache ?? new UserProfileCache();
|
||||
this.#relayMetricsCache = props.relayMetrics ?? new RelayMetricCache();
|
||||
this.#eventsCache = props.eventsCache ?? new EventsCache();
|
||||
|
||||
this.#profileLoader = new ProfileLoaderService(this, this.#profileCache);
|
||||
this.#relayMetrics = new RelayMetricHandler(this.#relayMetricsCache);
|
||||
this.#cleanup();
|
||||
}
|
||||
HandleAuth?: AuthHandler | undefined;
|
||||
|
||||
/**
|
||||
* Profile loader service allows you to request profiles
|
||||
@ -99,7 +108,12 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
*/
|
||||
async Init() {
|
||||
db.ready = await db.isAvailable();
|
||||
const t = [this.#relayCache.preload(), this.#profileCache.preload(), this.#relayMetricsCache.preload()];
|
||||
const t = [
|
||||
this.#relayCache.preload(),
|
||||
this.#profileCache.preload(),
|
||||
this.#relayMetricsCache.preload(),
|
||||
this.#eventsCache.preload()
|
||||
];
|
||||
await Promise.all(t);
|
||||
}
|
||||
|
||||
@ -190,6 +204,33 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
return this.Queries.get(id);
|
||||
}
|
||||
|
||||
Fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void) {
|
||||
const q = this.Query(NoteCollection, req);
|
||||
return new Promise<NoteStoreSnapshotData>((resolve) => {
|
||||
let t: ReturnType<typeof setTimeout> | undefined;
|
||||
let tBuf: Array<TaggedNostrEvent> = [];
|
||||
const releaseOnEvent = cb ? q.feed.onEvent(evs => {
|
||||
if(!t) {
|
||||
tBuf = [...evs];
|
||||
t = setTimeout(() => {
|
||||
t = undefined;
|
||||
cb(tBuf);
|
||||
}, 100);
|
||||
} else {
|
||||
tBuf.push(...evs);
|
||||
}
|
||||
}) : undefined;
|
||||
const releaseFeedHook = q.feed.hook(() => {
|
||||
if(q.progress === 1) {
|
||||
releaseOnEvent?.();
|
||||
releaseFeedHook();
|
||||
q.cancel();
|
||||
resolve(unwrap(q.feed.snapshot.data));
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
Query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder): Query {
|
||||
const existing = this.Queries.get(req.id);
|
||||
if (existing) {
|
||||
@ -214,6 +255,11 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
|
||||
const filters = req.build(this.#relayCache);
|
||||
const q = new Query(req.id, req.instance, store, req.options?.leaveOpen);
|
||||
if(filters.some(a => a.filters.some(b=>b.ids))) {
|
||||
q.feed.onEvent(async evs => {
|
||||
await this.#eventsCache.bulkSet(evs);
|
||||
});
|
||||
}
|
||||
this.Queries.set(req.id, q);
|
||||
for (const subQ of filters) {
|
||||
this.SendQuery(q, subQ);
|
||||
@ -224,6 +270,24 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
}
|
||||
|
||||
async SendQuery(q: Query, qSend: BuiltRawReqFilter) {
|
||||
// trim query of cached ids
|
||||
for(const f of qSend.filters) {
|
||||
if (f.ids) {
|
||||
const cacheResults = await this.#eventsCache.bulkGet(f.ids);
|
||||
if(cacheResults.length > 0) {
|
||||
const resultIds = new Set(cacheResults.map(a => a.id));
|
||||
f.ids = f.ids.filter(a => !resultIds.has(a));
|
||||
q.feed.add(cacheResults as Array<TaggedNostrEvent>);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check for empty filters
|
||||
qSend.filters = qSend.filters.filter(a => Object.values(a).filter(v => Array.isArray(v)).every(b => (b as Array<string | number>).length > 0));
|
||||
if(qSend.filters.length === 0) {
|
||||
return;
|
||||
|
||||
}
|
||||
if (qSend.relay) {
|
||||
this.#log("Sending query to %s %O", qSend.relay, qSend);
|
||||
const s = this.#sockets.get(qSend.relay);
|
||||
|
@ -20,7 +20,7 @@ export const EmptySnapshot = {
|
||||
},
|
||||
} as StoreSnapshot<FlatNoteStore>;
|
||||
|
||||
export type NoteStoreSnapshotData = Readonly<Array<TaggedNostrEvent>> | Readonly<TaggedNostrEvent>;
|
||||
export type NoteStoreSnapshotData = Array<TaggedNostrEvent> | TaggedNostrEvent;
|
||||
export type NoteStoreHook = () => void;
|
||||
export type NoteStoreHookRelease = () => void;
|
||||
export type OnEventCallback = (e: Readonly<Array<TaggedNostrEvent>>) => void;
|
||||
@ -134,10 +134,28 @@ export abstract class HookedNoteStore<TSnapshot extends NoteStoreSnapshotData> i
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A store which doesnt store anything, useful for hooks only
|
||||
*/
|
||||
export class NoopStore extends HookedNoteStore<Array<TaggedNostrEvent>> {
|
||||
override add(ev: readonly TaggedNostrEvent[] | Readonly<TaggedNostrEvent>): void {
|
||||
this.onChange(Array.isArray(ev) ? ev : [ev]);
|
||||
}
|
||||
|
||||
override clear(): void {
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
protected override takeSnapshot(): TaggedNostrEvent[] | undefined {
|
||||
// nothing to do
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A simple flat container of events with no duplicates
|
||||
*/
|
||||
export class FlatNoteStore extends HookedNoteStore<Readonly<Array<TaggedNostrEvent>>> {
|
||||
export class FlatNoteStore extends HookedNoteStore<Array<TaggedNostrEvent>> {
|
||||
#events: Array<TaggedNostrEvent> = [];
|
||||
#ids: Set<u256> = new Set();
|
||||
|
||||
@ -176,7 +194,7 @@ export class FlatNoteStore extends HookedNoteStore<Readonly<Array<TaggedNostrEve
|
||||
/**
|
||||
* A note store that holds a single replaceable event for a given user defined key generator function
|
||||
*/
|
||||
export class KeyedReplaceableNoteStore extends HookedNoteStore<Readonly<Array<TaggedNostrEvent>>> {
|
||||
export class KeyedReplaceableNoteStore extends HookedNoteStore<Array<TaggedNostrEvent>> {
|
||||
#keyFn: (ev: TaggedNostrEvent) => string;
|
||||
#events: Map<string, TaggedNostrEvent> = new Map();
|
||||
|
||||
|
@ -7,6 +7,8 @@ import { MessageEncryptorPayload, MessageEncryptorVersion } from "./index";
|
||||
import { NostrEvent } from "./nostr";
|
||||
import { base64 } from "@scure/base";
|
||||
|
||||
export type SignerSupports = "nip04" | "nip44" | string;
|
||||
|
||||
export interface EventSigner {
|
||||
init(): Promise<void>;
|
||||
getPubKey(): Promise<string> | string;
|
||||
@ -15,6 +17,7 @@ export interface EventSigner {
|
||||
nip44Encrypt(content: string, key: string): Promise<string>;
|
||||
nip44Decrypt(content: string, otherKey: string): Promise<string>;
|
||||
sign(ev: NostrEvent): Promise<NostrEvent>;
|
||||
get supports(): Array<SignerSupports>;
|
||||
}
|
||||
|
||||
export class PrivateKeySigner implements EventSigner {
|
||||
@ -30,6 +33,10 @@ export class PrivateKeySigner implements EventSigner {
|
||||
this.#publicKey = getPublicKey(this.#privateKey);
|
||||
}
|
||||
|
||||
get supports(): string[] {
|
||||
return ["nip04", "nip44"]
|
||||
}
|
||||
|
||||
get privateKey() {
|
||||
return this.#privateKey;
|
||||
}
|
||||
|
@ -20,6 +20,10 @@ export class SystemWorker extends ExternalStore<SystemSnapshot> implements Syste
|
||||
throw new Error("SharedWorker is not supported");
|
||||
}
|
||||
}
|
||||
|
||||
Fetch(req: RequestBuilder): Promise<Query> {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
get ProfileLoader(): ProfileLoaderService {
|
||||
throw new Error("Method not implemented.");
|
||||
|
Reference in New Issue
Block a user