feat: gossip model

This commit is contained in:
Kieran 2023-05-08 17:55:46 +01:00
parent 1923273f6f
commit d47aaba7d1
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
11 changed files with 276 additions and 251 deletions

View File

@ -107,6 +107,37 @@ export default abstract class FeedCache<TCached> {
this.notifyChange(obj.map(a => this.key(a)));
}
/**
* Try to update an entry where created values exists
* @param m Profile metadata
* @returns
*/
async update<TCachedWithCreated extends TCached & { created: number; loaded: number }>(m: TCachedWithCreated) {
const k = this.key(m);
const existing = this.getFromCache(k) as TCachedWithCreated;
const updateType = (() => {
if (!existing) {
return "new";
}
if (existing.created < m.created) {
return "updated";
}
if (existing && existing.loaded < m.loaded) {
return "refresh";
}
return "no_change";
})();
console.debug(`Updating ${k} ${updateType}`, m);
if (updateType !== "no_change") {
const updated = {
...existing,
...m,
};
await this.set(updated);
}
return updateType;
}
/**
* Loads a list of rows from disk cache
* @param keys List of ids to load

View File

@ -50,35 +50,15 @@ class UserProfileCache extends FeedCache<MetadataCache> {
* @param m Profile metadata
* @returns
*/
async update(m: MetadataCache) {
const existing = this.getFromCache(m.pubkey);
const updateType = (() => {
if (!existing) {
return "new_profile";
}
if (existing.created < m.created) {
return "updated_profile";
}
if (existing && existing.loaded < m.loaded) {
return "refresh_profile";
}
return "no_change";
})();
console.debug(`Updating ${m.pubkey} ${updateType}`, m);
if (updateType !== "no_change") {
const writeProfile = {
...existing,
...m,
};
await this.#setItem(writeProfile);
if (updateType !== "refresh_profile") {
const lnurl = m.lud16 ?? m.lud06;
if (lnurl) {
this.#zapperQueue.push({
pubkey: m.pubkey,
lnurl,
});
}
override async update(m: MetadataCache) {
const updateType = await super.update(m);
if (updateType !== "refresh") {
const lnurl = m.lud16 ?? m.lud06;
if (lnurl) {
this.#zapperQueue.push({
pubkey: m.pubkey,
lnurl,
});
}
}
return updateType;
@ -88,15 +68,6 @@ class UserProfileCache extends FeedCache<MetadataCache> {
return [];
}
async #setItem(m: MetadataCache) {
this.cache.set(m.pubkey, m);
if (db.ready) {
await db.users.put(m);
this.onTable.add(m.pubkey);
}
this.notifyChange([m.pubkey]);
}
async #processZapperQueue() {
while (this.#zapperQueue.length > 0) {
const i = this.#zapperQueue.shift();
@ -106,7 +77,7 @@ class UserProfileCache extends FeedCache<MetadataCache> {
await svc.load();
const p = this.getFromCache(i.pubkey);
if (p) {
this.#setItem({
await this.set({
...p,
zapService: svc.zapperPubkey,
});

View File

@ -0,0 +1,18 @@
import { db, UsersRelays } from "Db";
import FeedCache from "./FeedCache";
class UsersRelaysCache extends FeedCache<UsersRelays> {
constructor() {
super("UserRelays", db.userRelays);
}
key(of: UsersRelays): string {
return of.pubkey;
}
takeSnapshot(): Array<UsersRelays> {
return [...this.cache.values()];
}
}
export const UserRelays = new UsersRelaysCache();

View File

@ -3,6 +3,7 @@ import { hexToBech32, unixNowMs } from "Util";
import { DmCache } from "./DMCache";
import { InteractionCache } from "./EventInteractionCache";
import { UserCache } from "./UserCache";
import { UserRelays } from "./UserRelayCache";
export interface MetadataCache extends UserMetadata {
/**
@ -50,6 +51,7 @@ export async function preload() {
await UserCache.preload();
await DmCache.preload();
await InteractionCache.preload();
await UserRelays.preload();
}
export { UserCache, DmCache };

View File

@ -13,13 +13,15 @@ import useLogin from "Hooks/useLogin";
import { addSubscription, setBlocked, setBookmarked, setFollows, setMuted, setPinned, setRelays, setTags } from "Login";
import { SnortPubKey } from "Const";
import { SubscriptionEvent } from "Subscription";
import useRelaysFeedFollows from "./RelaysFeedFollows";
import { UserRelays } from "Cache/UserRelayCache";
/**
* Managed loading data for the current logged in user
*/
export default function useLoginFeed() {
const login = useLogin();
const { publicKey: pubKey, readNotifications } = login;
const { publicKey: pubKey, readNotifications, follows } = login;
const { isMuted } = useModeration();
const publisher = useEventPublisher();
@ -171,8 +173,12 @@ export default function useLoginFeed() {
}
}, [listsFeed]);
/*const fRelays = useRelaysFeedFollows(follows);
useEffect(() => {
FollowsRelays.bulkSet(fRelays).catch(console.error);
}, [dispatch, fRelays]);*/
UserRelays.buffer(follows.item).catch(console.error);
}, [follows.item]);
const fRelays = useRelaysFeedFollows(follows.item);
useEffect(() => {
UserRelays.bulkSet(fRelays).catch(console.error);
}, [fRelays]);
}

View File

@ -5,69 +5,72 @@ import { sanitizeRelayUrl } from "Util";
import { PubkeyReplaceableNoteStore, RequestBuilder } from "System";
import useRequestBuilder from "Hooks/useRequestBuilder";
type UserRelayMap = Record<HexKey, Array<FullRelaySettings>>;
interface RelayList {
pubkey: string;
created: number;
relays: FullRelaySettings[];
}
export default function useRelaysFeedFollows(pubkeys: HexKey[]): UserRelayMap {
export default function useRelaysFeedFollows(pubkeys: HexKey[]): Array<RelayList> {
const sub = useMemo(() => {
const b = new RequestBuilder(`relays:follows`);
b.withFilter().authors(pubkeys).kinds([EventKind.Relays, EventKind.ContactList]);
return b;
}, [pubkeys]);
function mapFromRelays(notes: Array<TaggedRawEvent>): UserRelayMap {
return Object.fromEntries(
notes.map(ev => {
return [
ev.pubkey,
ev.tags
.map(a => {
return {
url: sanitizeRelayUrl(a[1]),
settings: {
read: a[2] === "read" || a[2] === undefined,
write: a[2] === "write" || a[2] === undefined,
},
} as FullRelaySettings;
})
.filter(a => a.url !== undefined),
];
})
);
function mapFromRelays(notes: Array<TaggedRawEvent>): Array<RelayList> {
return notes.map(ev => {
return {
pubkey: ev.pubkey,
created: ev.created_at,
relays: ev.tags
.map(a => {
return {
url: sanitizeRelayUrl(a[1]),
settings: {
read: a[2] === "read" || a[2] === undefined,
write: a[2] === "write" || a[2] === undefined,
},
} as FullRelaySettings;
})
.filter(a => a.url !== undefined),
};
});
}
function mapFromContactList(notes: Array<TaggedRawEvent>): UserRelayMap {
return Object.fromEntries(
notes.map(ev => {
if (ev.content !== "" && ev.content !== "{}" && ev.content.startsWith("{") && ev.content.endsWith("}")) {
try {
const relays: Record<string, RelaySettings> = JSON.parse(ev.content);
return [
ev.pubkey,
Object.entries(relays)
.map(([k, v]) => {
return {
url: sanitizeRelayUrl(k),
settings: v,
} as FullRelaySettings;
})
.filter(a => a.url !== undefined),
];
} catch {
// ignored
}
function mapFromContactList(notes: Array<TaggedRawEvent>): Array<RelayList> {
return notes.map(ev => {
if (ev.content !== "" && ev.content !== "{}" && ev.content.startsWith("{") && ev.content.endsWith("}")) {
try {
const relays: Record<string, RelaySettings> = JSON.parse(ev.content);
return {
pubkey: ev.pubkey,
created: ev.created_at,
relays: Object.entries(relays)
.map(([k, v]) => {
return {
url: sanitizeRelayUrl(k),
settings: v,
} as FullRelaySettings;
})
.filter(a => a.url !== undefined),
};
} catch {
// ignored
}
return [ev.pubkey, []];
})
);
}
return {
pubkey: ev.pubkey,
created: 0,
relays: [],
};
});
}
const relays = useRequestBuilder<PubkeyReplaceableNoteStore>(PubkeyReplaceableNoteStore, sub);
const notesRelays = relays.data?.filter(a => a.kind === EventKind.Relays) ?? [];
const notesContactLists = relays.data?.filter(a => a.kind === EventKind.ContactList) ?? [];
return useMemo(() => {
return {
...mapFromContactList(notesContactLists),
...mapFromRelays(notesRelays),
} as UserRelayMap;
return [...mapFromContactList(notesContactLists), ...mapFromRelays(notesRelays)];
}, [relays]);
}

View File

@ -1,65 +0,0 @@
import { HexKey } from "@snort/nostr";
import { useMemo } from "react";
import { FollowsRelays } from "State/Relays";
import { unwrap } from "Util";
export type RelayPicker = ReturnType<typeof useRelaysForFollows>;
/**
* Number of relays to pick per pubkey
*/
const PickNRelays = 2;
export default function useRelaysForFollows(keys: Array<HexKey>) {
return useMemo(() => {
if (keys.length === 0) {
return {};
}
const allRelays = keys.map(a => {
return {
key: a,
relays: FollowsRelays.snapshot.get(a),
};
});
const missing = allRelays.filter(a => a.relays === undefined);
const hasRelays = allRelays.filter(a => a.relays !== undefined);
const relayUserMap = hasRelays.reduce((acc, v) => {
for (const r of unwrap(v.relays)) {
if (!acc.has(r.url)) {
acc.set(r.url, new Set([v.key]));
} else {
unwrap(acc.get(r.url)).add(v.key);
}
}
return acc;
}, new Map<string, Set<HexKey>>());
const topRelays = [...relayUserMap.entries()].sort(([, v], [, v1]) => v1.size - v.size);
// <relay, key[]> - count keys per relay
// <key, relay[]> - pick n top relays
// <relay, key[]> - map keys per relay (for subscription filter)
const userPickedRelays = keys.map(k => {
// pick top 3 relays for this key
const relaysForKey = topRelays
.filter(([, v]) => v.has(k))
.slice(0, PickNRelays)
.map(([k]) => k);
return { k, relaysForKey };
});
const pickedRelays = new Set(userPickedRelays.map(a => a.relaysForKey).flat());
const picked = Object.fromEntries(
[...pickedRelays].map(a => {
const keysOnPickedRelay = new Set(userPickedRelays.filter(b => b.relaysForKey.includes(a)).map(b => b.k));
return [a, [...keysOnPickedRelay]];
})
);
picked[""] = missing.map(a => a.key);
console.debug(picked);
return picked;
}, [keys]);
}

View File

@ -1,83 +0,0 @@
import { FullRelaySettings, HexKey } from "@snort/nostr";
import { db } from "Db";
import { unixNowMs, unwrap } from "Util";
export class UserRelays {
#store: Map<HexKey, Array<FullRelaySettings>>;
#snapshot: Readonly<Map<HexKey, Array<FullRelaySettings>>>;
constructor() {
this.#store = new Map();
this.#snapshot = Object.freeze(new Map());
}
get snapshot() {
return this.#snapshot;
}
async get(key: HexKey) {
if (!this.#store.has(key) && db.ready) {
const cached = await db.userRelays.get(key);
if (cached) {
this.#store.set(key, cached.relays);
return cached.relays;
}
}
return this.#store.get(key);
}
async bulkGet(keys: Array<HexKey>) {
const missing = keys.filter(a => !this.#store.has(a));
if (missing.length > 0 && db.ready) {
const cached = await db.userRelays.bulkGet(missing);
cached.forEach(a => {
if (a) {
this.#store.set(a.pubkey, a.relays);
}
});
}
return new Map(keys.map(a => [a, this.#store.get(a) ?? []]));
}
async set(key: HexKey, relays: Array<FullRelaySettings>) {
this.#store.set(key, relays);
if (db.ready) {
await db.userRelays.put({
pubkey: key,
relays,
});
}
this._update();
}
async bulkSet(obj: Record<HexKey, Array<FullRelaySettings>>) {
if (db.ready) {
await db.userRelays.bulkPut(
Object.entries(obj).map(([k, v]) => {
return {
pubkey: k,
relays: v,
};
})
);
}
Object.entries(obj).forEach(([k, v]) => this.#store.set(k, v));
this._update();
}
async preload() {
const start = unixNowMs();
const keys = await db.userRelays.toCollection().keys();
const fullCache = await db.userRelays.bulkGet(keys);
this.#store = new Map(fullCache.filter(a => a !== undefined).map(a => [unwrap(a).pubkey, a?.relays ?? []]));
this._update();
console.debug(`Preloaded ${this.#store.size} users relays in ${(unixNowMs() - start).toLocaleString()} ms`);
}
private _update() {
this.#snapshot = Object.freeze(new Map(this.#store));
}
}
export const FollowsRelays = new UserRelays();

View File

@ -0,0 +1,109 @@
import { RawReqFilter } from "@snort/nostr";
import { UserRelays } from "Cache/UserRelayCache";
import { unwrap } from "Util";
const PickNRelays = 2;
export interface RelayTaggedFilter {
relay: string;
filter: RawReqFilter;
}
export interface RelayTaggedFilters {
relay: string;
filters: Array<RawReqFilter>;
}
export function splitAllByWriteRelays(filters: Array<RawReqFilter>) {
const allSplit = filters.map(splitByWriteRelays).reduce((acc, v) => {
for (const vn of v) {
const existing = acc.get(vn.relay);
if (existing) {
existing.push(vn.filter);
} else {
acc.set(vn.relay, [vn.filter]);
}
}
return acc;
}, new Map<string, Array<RawReqFilter>>());
return [...allSplit.entries()].map(([k, v]) => {
return {
relay: k,
filters: v,
} as RelayTaggedFilters;
});
}
/**
* Split filters by authors
* @param filter
* @returns
*/
export function splitByWriteRelays(filter: RawReqFilter): Array<RelayTaggedFilter> {
if ((filter.authors?.length ?? 0) === 0)
return [
{
relay: "",
filter,
},
];
const allRelays = unwrap(filter.authors).map(a => {
return {
key: a,
relays: UserRelays.getFromCache(a)?.relays,
};
});
const missing = allRelays.filter(a => a.relays === undefined);
const hasRelays = allRelays.filter(a => a.relays !== undefined);
const relayUserMap = hasRelays.reduce((acc, v) => {
for (const r of unwrap(v.relays)) {
if (!acc.has(r.url)) {
acc.set(r.url, new Set([v.key]));
} else {
unwrap(acc.get(r.url)).add(v.key);
}
}
return acc;
}, new Map<string, Set<string>>());
// selection algo will just pick relays with the most users
const topRelays = [...relayUserMap.entries()].sort(([, v], [, v1]) => v1.size - v.size);
// <relay, key[]> - count keys per relay
// <key, relay[]> - pick n top relays
// <relay, key[]> - map keys per relay (for subscription filter)
const userPickedRelays = unwrap(filter.authors).map(k => {
// pick top 3 relays for this key
const relaysForKey = topRelays
.filter(([, v]) => v.has(k))
.slice(0, PickNRelays)
.map(([k]) => k);
return { k, relaysForKey };
});
const pickedRelays = new Set(userPickedRelays.map(a => a.relaysForKey).flat());
const picked = [...pickedRelays].map(a => {
const keysOnPickedRelay = new Set(userPickedRelays.filter(b => b.relaysForKey.includes(a)).map(b => b.k));
return {
relay: a,
filter: {
...filter,
authors: [...keysOnPickedRelay],
},
} as RelayTaggedFilter;
});
picked.push({
relay: "",
filter: {
...filter,
authors: missing.map(a => a.key),
},
});
console.debug("GOSSIP", picked);
return picked;
}

View File

@ -151,7 +151,6 @@ export class Query {
}
cleanup() {
console.debug("Cleanup", this.id);
this.#stopCheckTraces();
}

View File

@ -12,6 +12,7 @@ import {
} from "./NoteCollection";
import { diffFilters } from "./RequestSplitter";
import { Query } from "./Query";
import { splitAllByWriteRelays } from "./GossipModel";
export {
NoteStore,
@ -89,7 +90,7 @@ export class NostrSystem {
try {
const addr = unwrap(sanitizeRelayUrl(address));
if (!this.Sockets.has(addr)) {
const c = new Connection(addr, options, this.HandleAuth);
const c = new Connection(addr, options, this.HandleAuth?.bind(this));
this.Sockets.set(addr, c);
c.OnEvent = (s, e) => this.OnEvent(s, e);
c.OnEose = s => this.OnEndOfStoredEvents(c, s);
@ -146,7 +147,7 @@ export class NostrSystem {
try {
const addr = unwrap(sanitizeRelayUrl(address));
if (!this.Sockets.has(addr)) {
const c = new Connection(addr, { read: true, write: false }, this.HandleAuth, true);
const c = new Connection(addr, { read: true, write: false }, this.HandleAuth?.bind(this), true);
this.Sockets.set(addr, c);
c.OnEvent = (s, e) => this.OnEvent(s, e);
c.OnEose = s => this.OnEndOfStoredEvents(c, s);
@ -212,11 +213,15 @@ export class NostrSystem {
this.#changed();
return unwrap(q.feed) as Readonly<T>;
} else {
const subQ = new Query(`${q.id}-${q.subQueries.length + 1}`, filters, q.feed);
q.subQueries.push(subQ);
const splitFilters = splitAllByWriteRelays(filters);
for (const sf of splitFilters) {
const subQ = new Query(`${q.id}-${q.subQueries.length + 1}`, sf.filters, q.feed);
subQ.relays = sf.relay ? [sf.relay] : [];
q.subQueries.push(subQ);
this.SendQuery(subQ);
}
q.filters = filters;
q.feed.loading = true;
this.SendQuery(subQ);
this.#changed();
return q.feed as Readonly<T>;
}
@ -227,7 +232,9 @@ export class NostrSystem {
AddQuery<T extends NoteStore>(type: { new (): T }, rb: RequestBuilder): T {
const store = new type();
const q = new Query(rb.id, rb.build(), store);
const filters = rb.build();
const q = new Query(rb.id, filters, store);
if (rb.options?.leaveOpen) {
q.leaveOpen = rb.options.leaveOpen;
}
@ -236,7 +243,17 @@ export class NostrSystem {
}
this.Queries.set(rb.id, q);
this.SendQuery(q);
const splitFilters = splitAllByWriteRelays(filters);
if (splitFilters.length > 1) {
for (const sf of splitFilters) {
const subQ = new Query(`${q.id}-${q.subQueries.length + 1}`, sf.filters, q.feed);
subQ.relays = sf.relay ? [sf.relay] : [];
q.subQueries.push(subQ);
this.SendQuery(subQ);
}
} else {
this.SendQuery(q);
}
this.#changed();
return store;
}
@ -248,9 +265,27 @@ export class NostrSystem {
}
}
SendQuery(q: Query) {
for (const [, s] of this.Sockets) {
q.sendToRelay(s);
async SendQuery(q: Query) {
if (q.relays.length > 0) {
for (const r of q.relays) {
const s = this.Sockets.get(r);
if (s) {
q.sendToRelay(s);
} else {
const nc = await this.ConnectEphemeralRelay(r);
if (nc) {
q.sendToRelay(nc);
} else {
console.warn("Failed to connect to new relay for:", r, q);
}
}
}
} else {
for (const [, s] of this.Sockets) {
if (!s.Ephemeral) {
q.sendToRelay(s);
}
}
}
}
@ -304,7 +339,6 @@ export class NostrSystem {
if (v.closingAt && v.closingAt < now) {
v.sendClose();
this.Queries.delete(k);
console.debug("Removed:", k);
changed = true;
}
}