Merge remote-tracking branch 'kieran/main'

This commit is contained in:
Martti Malmi
2023-11-26 15:52:50 +02:00
69 changed files with 761 additions and 437 deletions

View File

@ -0,0 +1,136 @@
import debug from "debug";
import { FeedCache, removeUndefined } from "@snort/shared";
import { SystemInterface, TaggedNostrEvent, RequestBuilder } from ".";
export abstract class BackgroundLoader<T extends { loaded: number; created: number }> {
#system: SystemInterface;
#cache: FeedCache<T>;
#log = debug(this.name());
/**
* List of pubkeys to fetch metadata for
*/
#wantsKeys = new Set<string>();
/**
* Custom loader function for fetching data from alternative sources
*/
loaderFn?: (pubkeys: Array<string>) => Promise<Array<T>>;
constructor(system: SystemInterface, cache: FeedCache<T>) {
this.#system = system;
this.#cache = cache;
this.#FetchMetadata();
}
get Cache() {
return this.#cache;
}
/**
* Name of this loader service
*/
abstract name(): string;
/**
* Handle fetched data
*/
abstract onEvent(e: Readonly<TaggedNostrEvent>): T | undefined;
/**
* Get expire time as uxix milliseconds
*/
abstract getExpireCutoff(): number;
/**
* Build subscription for missing keys
*/
protected abstract buildSub(missing: Array<string>): RequestBuilder;
/**
* Create a placeholder value when no data can be found
*/
protected abstract makePlaceholder(key: string): T | undefined;
/**
* Start requesting a set of keys to be loaded
*/
TrackKeys(pk: string | Array<string>) {
for (const p of Array.isArray(pk) ? pk : [pk]) {
this.#wantsKeys.add(p);
}
}
/**
* Stop requesting a set of keys to be loaded
*/
UntrackKeys(pk: string | Array<string>) {
for (const p of Array.isArray(pk) ? pk : [pk]) {
this.#wantsKeys.delete(p);
}
}
/**
* Get object from cache or fetch if missing
*/
async fetch(key: string) {
const existing = this.Cache.get(key);
if (existing) {
return existing;
} else {
return await new Promise<T>((resolve, reject) => {
this.TrackKeys(key);
const release = this.Cache.hook(() => {
const existing = this.Cache.getFromCache(key);
if (existing) {
resolve(existing);
release();
this.UntrackKeys(key);
}
}, key);
});
}
}
async #FetchMetadata() {
const loading = [...this.#wantsKeys];
await this.#cache.buffer(loading);
const missing = loading.filter(a => (this.#cache.getFromCache(a)?.loaded ?? 0) < this.getExpireCutoff());
if (missing.length > 0) {
this.#log("Fetching keys: %O", missing);
try {
const found = await this.#loadData(missing);
const noResult = removeUndefined(
missing.filter(a => !found.some(b => a === this.#cache.key(b))).map(a => this.makePlaceholder(a)),
);
if (noResult.length > 0) {
await Promise.all(noResult.map(a => this.#cache.update(a)));
}
} catch (e) {
this.#log("Error: %O", e);
debugger;
}
}
setTimeout(() => this.#FetchMetadata(), 500);
}
async #loadData(missing: Array<string>) {
if (this.loaderFn) {
const results = await this.loaderFn(missing);
await Promise.all(results.map(a => this.#cache.update(a)));
return results;
} else {
const v = await this.#system.Fetch(this.buildSub(missing), async e => {
for (const pe of e) {
const m = this.onEvent(pe);
if (m) {
await this.#cache.update(m);
}
}
});
return removeUndefined(v.map(this.onEvent));
}
}
}

View File

@ -44,8 +44,9 @@ export interface RelayMetrics {
export interface UsersRelays {
pubkey: string;
created_at: number;
relays: FullRelaySettings[];
created: number;
loaded: number;
}
export function mapEventToProfile(ev: NostrEvent) {

View File

@ -19,7 +19,7 @@ export class UserRelaysCache extends FeedCache<UsersRelays> {
newest(): number {
let ret = 0;
this.cache.forEach(v => (ret = v.created_at > ret ? v.created_at : ret));
this.cache.forEach(v => (ret = v.created > ret ? v.created : ret));
return ret;
}

View File

@ -19,6 +19,11 @@ export const TagRefRegex = /(#\[\d+\])/gm;
*/
export const ProfileCacheExpire = 1_000 * 60 * 60 * 6;
/**
* How long before relay lists should be refreshed
*/
export const RelayListCacheExpire = 1_000 * 60 * 60 * 12;
/**
* Extract file extensions regex
*/

View File

@ -1,4 +1,4 @@
enum EventKind {
const enum EventKind {
Unknown = -1,
SetMetadata = 0,
TextNote = 1,
@ -27,9 +27,20 @@ enum EventKind {
MuteList = 10_000, // NIP-51
PinList = 10_001, // NIP-51
BookmarksList = 10_003, // NIP-51
CommunitiesList = 10_004, // NIP-51
PublicChatsList = 10_005, // NIP-51
BlockedRelaysList = 10_006, // NIP-51
SearchRelaysList = 10_007, // NIP-51
InterestsList = 10_015, // NIP-51
EmojisList = 10_030, // NIP-51
CategorizedPeople = 30000, // NIP-51a
CategorizedBookmarks = 30001, // NIP-51b
FollowSet = 30_000, // NIP-51
RelaySet = 30_002, // NIP-51
BookmarkSet = 30_003, // NIP-51
CurationSet = 30_004, // NIP-51
InterestSet = 30_015, // NIP-15
EmojiSet = 30_030, // NIP-51
Badge = 30009, // NIP-58
ProfileBadges = 30008, // NIP-58

View File

@ -137,9 +137,8 @@ export class EventPublisher {
* Build a categorized bookmarks event with a given label
* @param notes List of bookmarked links
*/
async bookmarks(notes: Array<ToNostrEventTag>, list: "bookmark" | "follow") {
const eb = this.#eb(EventKind.CategorizedBookmarks);
eb.tag(["d", list]);
async bookmarks(notes: Array<ToNostrEventTag>) {
const eb = this.#eb(EventKind.BookmarksList);
notes.forEach(n => {
eb.tag(unwrap(n.toEventTag()));
});

View File

@ -4,7 +4,7 @@ import { NoteStore, NoteStoreSnapshotData } from "./note-collection";
import { Query } from "./query";
import { NostrEvent, ReqFilter, TaggedNostrEvent } from "./nostr";
import { ProfileLoaderService } from "./profile-cache";
import { RelayCache } from "./gossip-model";
import { RelayCache } from "./outbox-model";
import { QueryOptimizer } from "./query-optimizer";
import { base64 } from "@scure/base";
@ -31,6 +31,7 @@ export * from "./pow";
export * from "./pow-util";
export * from "./query-optimizer";
export * from "./encrypted";
export * from "./outbox-model";
export * from "./impl/nip4";
export * from "./impl/nip44";
@ -71,7 +72,7 @@ export interface SystemInterface {
* @param req Request to send to relays
* @param cb A callback which will fire every 100ms when new data is received
*/
Fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void): Promise<NoteStoreSnapshotData>;
Fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void): Promise<Array<TaggedNostrEvent>>;
/**
* Create a new permanent connection to a relay

View File

@ -5,7 +5,7 @@ import { unwrap, sanitizeRelayUrl, FeedCache, removeUndefined } from "@snort/sha
import { NostrEvent, TaggedNostrEvent } from "./nostr";
import { Connection, RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection";
import { Query } from "./query";
import { NoteCollection, NoteStore, NoteStoreSnapshotData } from "./note-collection";
import { NoteCollection, NoteStore } from "./note-collection";
import { BuiltRawReqFilter, RequestBuilder, RequestStrategy } from "./request-builder";
import { RelayMetricHandler } from "./relay-metric-handler";
import {
@ -22,7 +22,7 @@ import {
EventExt,
} from ".";
import { EventsCache } from "./cache/events";
import { RelayCache } from "./gossip-model";
import { RelayCache, RelayMetadataLoader, pickRelaysForReply } from "./outbox-model";
import { QueryOptimizer, DefaultQueryOptimizer } from "./query-optimizer";
import { trimFilters } from "./request-trim";
@ -88,6 +88,8 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
*/
checkSigs: boolean;
#relayLoader: RelayMetadataLoader;
constructor(props: {
relayCache?: FeedCache<UsersRelays>;
profileCache?: FeedCache<MetadataCache>;
@ -106,6 +108,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
this.#profileLoader = new ProfileLoaderService(this, this.#profileCache);
this.#relayMetrics = new RelayMetricHandler(this.#relayMetricsCache);
this.#relayLoader = new RelayMetadataLoader(this, this.#relayCache);
this.checkSigs = props.checkSigs ?? true;
this.#cleanup();
}
@ -246,7 +249,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
Fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void) {
const q = this.Query(NoteCollection, req);
return new Promise<NoteStoreSnapshotData>(resolve => {
return new Promise<Array<TaggedNostrEvent>>(resolve => {
let t: ReturnType<typeof setTimeout> | undefined;
let tBuf: Array<TaggedNostrEvent> = [];
const releaseOnEvent = cb
@ -267,7 +270,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
releaseOnEvent?.();
releaseFeedHook();
q.cancel();
resolve(unwrap(q.feed.snapshot.data));
resolve(unwrap((q.feed as NoteCollection).snapshot.data));
}
});
});
@ -333,6 +336,9 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
);
}
}
if (f.authors) {
this.#relayLoader.TrackKeys(f.authors);
}
}
// check for empty filters
@ -382,8 +388,9 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
*/
async BroadcastEvent(ev: NostrEvent, cb?: (rsp: OkResponse) => void) {
const socks = [...this.#sockets.values()].filter(a => !a.Ephemeral && a.Settings.write);
const oks = await Promise.all(
socks.map(async s => {
const replyRelays = await pickRelaysForReply(ev, this);
const oks = await Promise.all([
...socks.map(async s => {
try {
const rsp = await s.SendAsync(ev);
cb?.(rsp);
@ -393,7 +400,8 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
}
return;
}),
);
...replyRelays.filter(a => !this.#sockets.has(a)).map(a => this.WriteOnceToRelay(a, ev)),
]);
return removeUndefined(oks);
}

View File

@ -1,7 +1,18 @@
import { ReqFilter, UsersRelays } from ".";
import { dedupe, unwrap } from "@snort/shared";
import {
EventKind,
FullRelaySettings,
NostrEvent,
ReqFilter,
RequestBuilder,
SystemInterface,
TaggedNostrEvent,
UsersRelays,
} from ".";
import { dedupe, removeUndefined, sanitizeRelayUrl, unixNowMs, unwrap } from "@snort/shared";
import debug from "debug";
import { FlatReqFilter } from "./query-optimizer";
import { RelayListCacheExpire } from "./const";
import { BackgroundLoader } from "./background-loader";
const PickNRelays = 2;
@ -20,8 +31,13 @@ export interface RelayTaggedFilters {
filters: Array<ReqFilter>;
}
const logger = debug("OutboxModel");
export interface RelayCache {
getFromCache(pubkey?: string): UsersRelays | undefined;
update(obj: UsersRelays): Promise<"new" | "updated" | "refresh" | "no_change">;
buffer(keys: Array<string>): Promise<Array<string>>;
bulkSet(objs: Array<UsersRelays>): Promise<void>;
}
export function splitAllByWriteRelays(cache: RelayCache, filters: Array<ReqFilter>) {
@ -61,7 +77,7 @@ export function splitByWriteRelays(cache: RelayCache, filter: ReqFilter): Array<
];
}
const topRelays = pickTopRelays(cache, unwrap(authors), PickNRelays);
const topRelays = pickTopRelays(cache, unwrap(authors), PickNRelays, "write");
const pickedRelays = dedupe(topRelays.flatMap(a => a.relays));
const picked = pickedRelays.map(a => {
@ -84,7 +100,7 @@ export function splitByWriteRelays(cache: RelayCache, filter: ReqFilter): Array<
},
});
}
debug("GOSSIP")("Picked %O => %O", filter, picked);
logger("Picked %O => %O", filter, picked);
return picked;
}
@ -101,7 +117,7 @@ export function splitFlatByWriteRelays(cache: RelayCache, input: Array<FlatReqFi
},
];
}
const topRelays = pickTopRelays(cache, authors, PickNRelays);
const topRelays = pickTopRelays(cache, authors, PickNRelays, "write");
const pickedRelays = dedupe(topRelays.flatMap(a => a.relays));
const picked = pickedRelays.map(a => {
@ -119,21 +135,21 @@ export function splitFlatByWriteRelays(cache: RelayCache, input: Array<FlatReqFi
} as RelayTaggedFlatFilters);
}
debug("GOSSIP")("Picked %d relays from %d filters", picked.length, input.length);
logger("Picked %d relays from %d filters", picked.length, input.length);
return picked;
}
/**
* Pick most popular relays for each authors
*/
function pickTopRelays(cache: RelayCache, authors: Array<string>, n: number) {
function pickTopRelays(cache: RelayCache, authors: Array<string>, n: number, type: "write" | "read") {
// map of pubkey -> [write relays]
const allRelays = authors.map(a => {
return {
key: a,
relays: cache
.getFromCache(a)
?.relays?.filter(a => a.settings.write)
?.relays?.filter(a => (type === "write" ? a.settings.write : a.settings.read))
.sort(() => (Math.random() < 0.5 ? 1 : -1)),
};
});
@ -178,3 +194,84 @@ function pickTopRelays(cache: RelayCache, authors: Array<string>, n: number) {
}),
);
}
/**
* Pick read relays for sending reply events
*/
export async function pickRelaysForReply(ev: NostrEvent, system: SystemInterface) {
const recipients = dedupe(ev.tags.filter(a => a[0] === "p").map(a => a[1]));
await updateRelayLists(recipients, system);
const relays = pickTopRelays(system.RelayCache, recipients, 2, "read");
const ret = removeUndefined(dedupe(relays.map(a => a.relays).flat()));
logger("Picked %O from authors %O", ret, recipients);
return ret;
}
export function parseRelayTag(tag: Array<string>) {
return {
url: sanitizeRelayUrl(tag[1]),
settings: {
read: tag[2] === "read" || tag[2] === undefined,
write: tag[2] === "write" || tag[2] === undefined,
},
} as FullRelaySettings;
}
export function parseRelayTags(tag: Array<Array<string>>) {
return tag.map(parseRelayTag).filter(a => a !== null);
}
export async function updateRelayLists(authors: Array<string>, system: SystemInterface) {
await system.RelayCache.buffer(authors);
const expire = unixNowMs() - RelayListCacheExpire;
const expired = authors.filter(a => (system.RelayCache.getFromCache(a)?.loaded ?? 0) < expire);
if (expired.length > 0) {
logger("Updating relays for authors: %O", expired);
const rb = new RequestBuilder("system-update-relays-for-outbox");
rb.withFilter().authors(expired).kinds([EventKind.Relays]);
const relayLists = await system.Fetch(rb);
await system.RelayCache.bulkSet(
relayLists.map(a => ({
relays: parseRelayTags(a.tags),
pubkey: a.pubkey,
created: a.created_at,
loaded: unixNowMs(),
})),
);
}
}
export class RelayMetadataLoader extends BackgroundLoader<UsersRelays> {
override name(): string {
return "RelayMetadataLoader";
}
override onEvent(e: Readonly<TaggedNostrEvent>): UsersRelays | undefined {
return {
relays: parseRelayTags(e.tags),
pubkey: e.pubkey,
created: e.created_at,
loaded: unixNowMs(),
};
}
override getExpireCutoff(): number {
return unixNowMs() - RelayListCacheExpire;
}
protected override buildSub(missing: string[]): RequestBuilder {
const rb = new RequestBuilder("relay-loader");
rb.withOptions({ skipDiff: true });
rb.withFilter().authors(missing).kinds([EventKind.Relays]);
return rb;
}
protected override makePlaceholder(key: string): UsersRelays | undefined {
return {
relays: [],
pubkey: key,
created: 0,
loaded: this.getExpireCutoff() + 300_000,
};
}
}

View File

@ -1,156 +1,40 @@
import debug from "debug";
import { unixNowMs, FeedCache } from "@snort/shared";
import { EventKind, HexKey, SystemInterface, TaggedNostrEvent, RequestBuilder } from ".";
import { unixNowMs } from "@snort/shared";
import { EventKind, TaggedNostrEvent, RequestBuilder } from ".";
import { ProfileCacheExpire } from "./const";
import { mapEventToProfile, MetadataCache } from "./cache";
import { v4 as uuid } from "uuid";
import { BackgroundLoader } from "./background-loader";
const MetadataRelays = ["wss://purplepag.es"];
export class ProfileLoaderService {
#system: SystemInterface;
#cache: FeedCache<MetadataCache>;
/**
* A set of pubkeys we could not find last run,
* This list will attempt to use known profile metadata relays
*/
#missingLastRun: Set<string> = new Set();
/**
* List of pubkeys to fetch metadata for
*/
#wantsMetadata: Set<HexKey> = new Set();
readonly #log = debug("ProfileCache");
/**
* Custom loader function for fetching profiles from alternative sources
*/
loaderFn?: (pubkeys: Array<string>) => Promise<Array<MetadataCache>>;
constructor(system: SystemInterface, cache: FeedCache<MetadataCache>) {
this.#system = system;
this.#cache = cache;
this.#FetchMetadata();
export class ProfileLoaderService extends BackgroundLoader<MetadataCache> {
override name(): string {
return "ProfileLoaderService";
}
get Cache() {
return this.#cache;
override onEvent(e: Readonly<TaggedNostrEvent>): MetadataCache | undefined {
return mapEventToProfile(e);
}
/**
* Request profile metadata for a set of pubkeys
*/
TrackMetadata(pk: HexKey | Array<HexKey>) {
for (const p of Array.isArray(pk) ? pk : [pk]) {
if (p.length === 64) {
this.#wantsMetadata.add(p);
}
}
override getExpireCutoff(): number {
return unixNowMs() - ProfileCacheExpire;
}
/**
* Stop tracking metadata for a set of pubkeys
*/
UntrackMetadata(pk: HexKey | Array<HexKey>) {
for (const p of Array.isArray(pk) ? pk : [pk]) {
if (p.length > 0) {
this.#wantsMetadata.delete(p);
}
}
override buildSub(missing: string[]): RequestBuilder {
const sub = new RequestBuilder(`profiles-${uuid()}`);
sub
.withOptions({
skipDiff: true,
})
.withFilter()
.kinds([EventKind.SetMetadata])
.authors(missing);
return sub;
}
async onProfileEvent(e: Readonly<TaggedNostrEvent>) {
const profile = mapEventToProfile(e);
if (profile) {
await this.#cache.update(profile);
}
}
async fetchProfile(key: string) {
const existing = this.Cache.get(key);
if (existing) {
return existing;
} else {
return await new Promise<MetadataCache>((resolve, reject) => {
this.TrackMetadata(key);
const release = this.Cache.hook(() => {
const existing = this.Cache.getFromCache(key);
if (existing) {
resolve(existing);
release();
this.UntrackMetadata(key);
}
}, key);
});
}
}
async #FetchMetadata() {
const missingFromCache = await this.#cache.buffer([...this.#wantsMetadata]);
const expire = unixNowMs() - ProfileCacheExpire;
const expired = [...this.#wantsMetadata]
.filter(a => !missingFromCache.includes(a))
.filter(a => (this.#cache.getFromCache(a)?.loaded ?? 0) < expire);
const missing = new Set([...missingFromCache, ...expired]);
if (missing.size > 0) {
this.#log("Wants profiles: %d missing, %d expired", missingFromCache.length, expired.length);
const results = await this.#loadProfiles([...missing]);
const couldNotFetch = [...missing].filter(a => !results.some(b => b.pubkey === a));
this.#missingLastRun = new Set(couldNotFetch);
if (couldNotFetch.length > 0) {
this.#log("No profiles: %o", couldNotFetch);
const empty = couldNotFetch.map(a =>
this.#cache.update({
pubkey: a,
loaded: unixNowMs() - ProfileCacheExpire + 30_000, // expire in 30s
created: 69,
} as MetadataCache),
);
await Promise.all(empty);
}
/* When we fetch an expired profile and its the same as what we already have
// onEvent is not fired and the loaded timestamp never gets updated
const expiredSame = results.filter(a => !newProfiles.has(a.id) && expired.includes(a.pubkey));
await Promise.all(expiredSame.map(v => this.onProfileEvent(v)));*/
}
setTimeout(() => this.#FetchMetadata(), 500);
}
async #loadProfiles(missing: Array<string>) {
if (this.loaderFn) {
const results = await this.loaderFn(missing);
await Promise.all(results.map(a => this.#cache.update(a)));
return results;
} else {
const sub = new RequestBuilder(`profiles-${uuid()}`);
sub
.withOptions({
skipDiff: true,
})
.withFilter()
.kinds([EventKind.SetMetadata])
.authors(missing);
if (this.#missingLastRun.size > 0) {
const fMissing = sub
.withFilter()
.kinds([EventKind.SetMetadata])
.authors([...this.#missingLastRun]);
MetadataRelays.forEach(r => fMissing.relay(r));
}
const results = (await this.#system.Fetch(sub, async e => {
for (const pe of e) {
await this.onProfileEvent(pe);
}
})) as ReadonlyArray<TaggedNostrEvent>;
return results;
}
protected override makePlaceholder(key: string): MetadataCache | undefined {
return {
pubkey: key,
loaded: unixNowMs() - ProfileCacheExpire + 30_000,
created: 0,
} as MetadataCache;
}
}

View File

@ -7,11 +7,12 @@ export interface RelayInfo {
software?: string;
version?: string;
limitation?: {
payment_required: boolean;
max_subscriptions: number;
max_filters: number;
max_event_tags: number;
auth_required: boolean;
payment_required?: boolean;
max_subscriptions?: number;
max_filters?: number;
max_event_tags?: number;
auth_required?: boolean;
write_restricted?: boolean;
};
relay_countries?: Array<string>;
language_tags?: Array<string>;

View File

@ -5,7 +5,7 @@ import { appendDedupe, dedupe, sanitizeRelayUrl, unixNowMs, unwrap } from "@snor
import EventKind from "./event-kind";
import { NostrLink, NostrPrefix, SystemInterface } from ".";
import { ReqFilter, u256, HexKey } from "./nostr";
import { RelayCache, splitByWriteRelays, splitFlatByWriteRelays } from "./gossip-model";
import { RelayCache, splitByWriteRelays, splitFlatByWriteRelays } from "./outbox-model";
/**
* Which strategy is used when building REQ filters