use @snort/system cache

commit fc11381ccd
parent c2a3a706de
2023-06-15 12:03:05 +01:00

79 changed files with 679 additions and 524 deletions

View File

@@ -1,6 +1,6 @@
 import { NostrEvent } from "@snort/system";
+import { FeedCache } from "@snort/shared";
 import { db } from "Db";
-import FeedCache from "./FeedCache";
 
 class DMCache extends FeedCache<NostrEvent> {
   constructor() {

View File

@@ -1,7 +1,7 @@
+import { FeedCache } from "@snort/shared";
 import { db, EventInteraction } from "Db";
 import { LoginStore } from "Login";
 import { sha256 } from "SnortUtils";
-import FeedCache from "./FeedCache";
 
 class EventInteractionCache extends FeedCache<EventInteraction> {
   constructor() {

View File

@@ -1,207 +0,0 @@
import { db } from "Db";
import debug from "debug";
import { Table } from "dexie";
import { unixNowMs, unwrap } from "SnortUtils";

type HookFn = () => void;

interface HookFilter {
  key: string;
  fn: HookFn;
}

/**
 * Abstract in-memory cache backed by an IndexedDB (Dexie) table,
 * with per-key change hooks and a memoized snapshot
 */
export default abstract class FeedCache<TCached> {
  #name: string;
  #hooks: Array<HookFilter> = [];
  #snapshot: Readonly<Array<TCached>> = [];
  #changed = true;
  #hits = 0;
  #miss = 0;
  protected table: Table<TCached>;
  protected onTable: Set<string> = new Set();
  protected cache: Map<string, TCached> = new Map();

  constructor(name: string, table: Table<TCached>) {
    this.#name = name;
    this.table = table;
    setInterval(() => {
      debug(this.#name)(
        "%d loaded, %d on-disk, %d hooks, %d% hit",
        this.cache.size,
        this.onTable.size,
        this.#hooks.length,
        ((this.#hits / (this.#hits + this.#miss)) * 100).toFixed(1)
      );
    }, 30_000);
  }

  async preload() {
    if (db.ready) {
      const keys = await this.table.toCollection().primaryKeys();
      this.onTable = new Set<string>(keys.map(a => a as string));
    }
  }

  hook(fn: HookFn, key: string | undefined) {
    if (!key) {
      return () => {
        //noop
      };
    }
    this.#hooks.push({
      key,
      fn,
    });
    // returned closure unsubscribes the hook
    return () => {
      const idx = this.#hooks.findIndex(a => a.fn === fn);
      if (idx >= 0) {
        this.#hooks.splice(idx, 1);
      }
    };
  }

  getFromCache(key?: string) {
    if (key) {
      const ret = this.cache.get(key);
      if (ret) {
        this.#hits++;
      } else {
        this.#miss++;
      }
      return ret;
    }
  }

  async get(key?: string) {
    if (key && !this.cache.has(key) && db.ready) {
      const cached = await this.table.get(key);
      if (cached) {
        this.cache.set(this.key(cached), cached);
        this.notifyChange([key]);
        return cached;
      }
    }
    return key ? this.cache.get(key) : undefined;
  }

  async bulkGet(keys: Array<string>) {
    const missing = keys.filter(a => !this.cache.has(a));
    if (missing.length > 0 && db.ready) {
      const cached = await this.table.bulkGet(missing);
      cached.forEach(a => {
        if (a) {
          this.cache.set(this.key(a), a);
        }
      });
    }
    return keys
      .map(a => this.cache.get(a))
      .filter(a => a)
      .map(a => unwrap(a));
  }

  async set(obj: TCached) {
    const k = this.key(obj);
    this.cache.set(k, obj);
    if (db.ready) {
      await this.table.put(obj);
      this.onTable.add(k);
    }
    this.notifyChange([k]);
  }

  async bulkSet(obj: Array<TCached>) {
    if (db.ready) {
      await this.table.bulkPut(obj);
      obj.forEach(a => this.onTable.add(this.key(a)));
    }
    obj.forEach(v => this.cache.set(this.key(v), v));
    this.notifyChange(obj.map(a => this.key(a)));
  }

  /**
   * Try to update an entry, comparing created/loaded timestamps against the cached version
   * @param m Updated entry, carrying created and loaded timestamps
   * @returns "new" | "updated" | "refresh" | "no_change"
   */
  async update<TCachedWithCreated extends TCached & { created: number; loaded: number }>(m: TCachedWithCreated) {
    const k = this.key(m);
    const existing = this.getFromCache(k) as TCachedWithCreated;
    const updateType = (() => {
      if (!existing) {
        return "new";
      }
      if (existing.created < m.created) {
        return "updated";
      }
      if (existing && existing.loaded < m.loaded) {
        return "refresh";
      }
      return "no_change";
    })();
    debug(this.#name)("Updating %s %s %o", k, updateType, m);
    if (updateType !== "no_change") {
      const updated = {
        ...existing,
        ...m,
      };
      await this.set(updated);
    }
    return updateType;
  }

  /**
   * Loads a list of rows from the disk cache
   * @param keys List of ids to load
   * @returns Keys that do not exist in the disk cache
   */
  async buffer(keys: Array<string>): Promise<Array<string>> {
    const needsBuffer = keys.filter(a => !this.cache.has(a));
    if (db.ready && needsBuffer.length > 0) {
      const mapped = needsBuffer.map(a => ({
        has: this.onTable.has(a),
        key: a,
      }));
      const start = unixNowMs();
      const fromCache = await this.table.bulkGet(mapped.filter(a => a.has).map(a => a.key));
      const fromCacheFiltered = fromCache.filter(a => a !== undefined).map(a => unwrap(a));
      fromCacheFiltered.forEach(a => {
        this.cache.set(this.key(a), a);
      });
      this.notifyChange(fromCacheFiltered.map(a => this.key(a)));
      debug(this.#name)(
        `Loaded %d/%d in %d ms`,
        fromCacheFiltered.length,
        keys.length,
        (unixNowMs() - start).toLocaleString()
      );
      return mapped.filter(a => !a.has).map(a => a.key);
    }
    // no IndexedDB; always return all keys
    return needsBuffer;
  }

  async clear() {
    await this.table.clear();
    this.cache.clear();
    this.onTable.clear();
  }

  snapshot() {
    if (this.#changed) {
      this.#snapshot = this.takeSnapshot();
      this.#changed = false;
    }
    return this.#snapshot;
  }

  protected notifyChange(keys: Array<string>) {
    this.#changed = true;
    this.#hooks.filter(a => keys.includes(a.key) || a.key === "*").forEach(h => h.fn());
  }

  abstract key(of: TCached): string;

  abstract takeSnapshot(): Array<TCached>;
}
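The hunks above and below swap this app-local class for the @snort/shared export. A minimal sketch of the resulting consumer pattern, assuming the shared FeedCache keeps the abstract surface shown above (constructor(name, table), key(), takeSnapshot()); the Note type and db.notes table are hypothetical, for illustration only:

import { FeedCache } from "@snort/shared";
import { db } from "Db";

// Hypothetical row type; real callers use NostrEvent, Payment,
// EventInteraction, MetadataCache, etc.
interface Note {
  id: string;
  content: string;
}

class NoteCache extends FeedCache<Note> {
  constructor() {
    // db.notes is an assumed Dexie table, not one that exists in the app
    super("NoteCache", db.notes);
  }

  // primary key used for the in-memory Map and for hook routing
  key(of: Note): string {
    return of.id;
  }

  // recomputed lazily by snapshot() after each notifyChange()
  takeSnapshot(): Array<Note> {
    return [...this.cache.values()];
  }
}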

View File

@@ -1,5 +1,5 @@
 import { Payment, db } from "Db";
-import FeedCache from "./FeedCache";
+import { FeedCache } from "@snort/shared";
 
 class Payments extends FeedCache<Payment> {
   constructor() {

View File

@@ -1,153 +0,0 @@
import FeedCache from "Cache/FeedCache";
import { db } from "Db";
import { MetadataCache } from "@snort/system";
import { LNURL } from "LNURL";
import { fetchNip05Pubkey } from "Nip05/Verifier";

class UserProfileCache extends FeedCache<MetadataCache> {
  #zapperQueue: Array<{ pubkey: string; lnurl: string }> = [];
  #nip5Queue: Array<{ pubkey: string; nip05: string }> = [];

  constructor() {
    super("UserCache", db.users);
    this.#processZapperQueue();
    this.#processNip5Queue();
  }

  key(of: MetadataCache): string {
    return of.pubkey;
  }

  override async preload(follows?: Array<string>): Promise<void> {
    await super.preload();
    // load profiles for the given follows
    if (follows) {
      await this.buffer(follows);
    }
  }

  async search(q: string): Promise<Array<MetadataCache>> {
    if (db.ready) {
      // on-disk cache will always have more data
      return (
        await db.users
          .where("npub")
          .startsWithIgnoreCase(q)
          .or("name")
          .startsWithIgnoreCase(q)
          .or("display_name")
          .startsWithIgnoreCase(q)
          .or("nip05")
          .startsWithIgnoreCase(q)
          .toArray()
      ).slice(0, 5);
    } else {
      return [...this.cache.values()]
        .filter(user => {
          const profile = user as MetadataCache;
          return (
            profile.name?.includes(q) ||
            profile.npub?.includes(q) ||
            profile.display_name?.includes(q) ||
            profile.nip05?.includes(q)
          );
        })
        .slice(0, 5);
    }
  }

  /**
   * Try to update the profile metadata cache with a new version
   * @param m Profile metadata
   * @returns Update type ("new" | "updated" | "refresh" | "no_change")
   */
  override async update(m: MetadataCache) {
    const updateType = await super.update(m);
    if (updateType !== "refresh") {
      const lnurl = m.lud16 ?? m.lud06;
      if (lnurl) {
        this.#zapperQueue.push({
          pubkey: m.pubkey,
          lnurl,
        });
      }
      if (m.nip05) {
        this.#nip5Queue.push({
          pubkey: m.pubkey,
          nip05: m.nip05,
        });
      }
    }
    return updateType;
  }

  takeSnapshot(): MetadataCache[] {
    return [];
  }

  async #processZapperQueue() {
    await this.#batchQueue(
      this.#zapperQueue,
      async i => {
        const svc = new LNURL(i.lnurl);
        await svc.load();
        const p = this.getFromCache(i.pubkey);
        if (p) {
          await this.set({
            ...p,
            zapService: svc.zapperPubkey,
          });
        }
      },
      5
    );
    setTimeout(() => this.#processZapperQueue(), 1_000);
  }

  async #processNip5Queue() {
    await this.#batchQueue(
      this.#nip5Queue,
      async i => {
        const [name, domain] = i.nip05.split("@");
        const nip5pk = await fetchNip05Pubkey(name, domain);
        const p = this.getFromCache(i.pubkey);
        if (p) {
          await this.set({
            ...p,
            isNostrAddressValid: i.pubkey === nip5pk,
          });
        }
      },
      5
    );
    setTimeout(() => this.#processNip5Queue(), 1_000);
  }

  async #batchQueue<T>(queue: Array<T>, proc: (v: T) => Promise<void>, batchSize = 3) {
    const batch: Array<Promise<void>> = [];
    while (queue.length > 0) {
      const i = queue.shift();
      if (i) {
        batch.push(
          (async () => {
            try {
              await proc(i);
            } catch {
              console.warn("Failed to process item", i);
            }
            batch.pop(); // free one slot; only the count matters, not which entry
          })()
        );
        if (batch.length === batchSize) {
          await Promise.all(batch);
        }
      } else {
        await Promise.all(batch);
      }
    }
  }
}

export const UserCache = new UserProfileCache();
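A short usage sketch of the class above: hook() subscribes to changes for one cache key (or "*" for all keys), and update() is what feeds the zapper/NIP-05 queues whenever a profile carries lud16/lud06 or nip05. The pubkey literal here is illustrative:

// Illustrative pubkey; hook() routes notifications by cache key
const pubkey = "0000aabbcc...";
const unhook = UserCache.hook(() => {
  const profile = UserCache.getFromCache(pubkey);
  console.debug("profile changed:", profile?.name, profile?.isNostrAddressValid);
}, pubkey);

// later: detach the subscription
unhook();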

View File

@@ -1,31 +0,0 @@
import { db, UsersRelays } from "Db";
import FeedCache from "./FeedCache";

export class UsersRelaysCache extends FeedCache<UsersRelays> {
  constructor() {
    super("UserRelays", db.userRelays);
  }

  key(of: UsersRelays): string {
    return of.pubkey;
  }

  override async preload(follows?: Array<string>): Promise<void> {
    await super.preload();
    if (follows) {
      await this.buffer(follows);
    }
  }

  newest(): number {
    let ret = 0;
    this.cache.forEach(v => (ret = v.created_at > ret ? v.created_at : ret));
    return ret;
  }

  takeSnapshot(): Array<UsersRelays> {
    return [...this.cache.values()];
  }
}

export const UserRelays = new UsersRelaysCache();
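How this cache tends to be driven, as a hedged sketch (pubkeys illustrative): buffer() warms memory from IndexedDB and reports which keys still need fetching from relays, while newest() gives a since-timestamp for that fetch:

const follows = ["pubkey1", "pubkey2"]; // illustrative
const missingOnDisk = await UserRelays.buffer(follows);
// missingOnDisk: keys absent from IndexedDB; fetch these from relays

const since = UserRelays.newest(); // newest created_at across loaded rows
const all = UserRelays.snapshot(); // memoized copy of every cached row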

View File

@ -1,7 +1,10 @@
import { UserProfileCache, UserRelaysCache } from "@snort/system";
import { DmCache } from "./DMCache";
import { InteractionCache } from "./EventInteractionCache";
import { UserCache } from "./UserCache";
import { UserRelays } from "./UserRelayCache";
export const UserCache = new UserProfileCache();
export const UserRelays = new UserRelaysCache();
export { DmCache };
export async function preload(follows?: Array<string>) {
const preloads = [
@@ -12,5 +15,3 @@ export async function preload(follows?: Array<string>) {
   ];
   await Promise.all(preloads);
 }
-
-export { UserCache, DmCache };
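App-side, the index above is consumed roughly like this at startup or login. A sketch only: the "Cache" module path matches the absolute-import style used elsewhere in the diff, and loginState.follows is a hypothetical stand-in for wherever the contact list actually lives:

import { preload, UserRelays } from "Cache";

// follows: pubkeys from the logged-in user's contact list (assumed source)
const follows = loginState.follows;
await preload(follows);

// caches are now warm; e.g. relay lists are available synchronously
console.debug("relay lists cached:", UserRelays.snapshot().length);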