feat: automate social graph

This commit is contained in:
2024-02-22 11:12:26 +00:00
parent 3f0bd88db8
commit 7558e91d28
16 changed files with 285 additions and 336 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@snort/system",
"version": "1.2.9",
"version": "1.2.10",
"description": "Snort nostr system package",
"type": "module",
"main": "dist/index.js",

View File

@ -47,44 +47,47 @@ export default class SocialGraph {
}
}
handleEvent(event: NostrEvent) {
if (event.kind !== 3) {
handleEvent(evs: NostrEvent | Array<NostrEvent>) {
const filtered = (Array.isArray(evs) ? evs : [evs]).filter(a => a.kind === 3);
if (filtered.length === 0) {
return;
}
queueMicrotask(() => {
try {
const author = ID(event.pubkey);
const timestamp = event.created_at;
const existingTimestamp = this.latestFollowEventTimestamps.get(author);
if (existingTimestamp && timestamp <= existingTimestamp) {
return;
}
this.latestFollowEventTimestamps.set(author, timestamp);
for (const event of filtered) {
const author = ID(event.pubkey);
const timestamp = event.created_at;
const existingTimestamp = this.latestFollowEventTimestamps.get(author);
if (existingTimestamp && timestamp <= existingTimestamp) {
return;
}
this.latestFollowEventTimestamps.set(author, timestamp);
// Collect all users followed in the new event.
const followedInEvent = new Set<UID>();
for (const tag of event.tags) {
if (tag[0] === "p") {
const followedUser = ID(tag[1]);
if (followedUser !== author) {
followedInEvent.add(followedUser);
// Collect all users followed in the new event.
const followedInEvent = new Set<UID>();
for (const tag of event.tags) {
if (tag[0] === "p") {
const followedUser = ID(tag[1]);
if (followedUser !== author) {
followedInEvent.add(followedUser);
}
}
}
}
// Get the set of users currently followed by the author.
const currentlyFollowed = this.followedByUser.get(author) || new Set<UID>();
// Get the set of users currently followed by the author.
const currentlyFollowed = this.followedByUser.get(author) || new Set<UID>();
// Find users that need to be removed.
for (const user of currentlyFollowed) {
if (!followedInEvent.has(user)) {
this.removeFollower(user, author);
// Find users that need to be removed.
for (const user of currentlyFollowed) {
if (!followedInEvent.has(user)) {
this.removeFollower(user, author);
}
}
}
// Add or update the followers based on the new event.
for (const user of followedInEvent) {
this.addFollower(user, author);
// Add or update the followers based on the new event.
for (const user of followedInEvent) {
this.addFollower(user, author);
}
}
} catch (e) {
// might not be logged in or sth

View File

@ -44,9 +44,16 @@ export interface RelayMetrics {
export interface UsersRelays {
pubkey: string;
relays: FullRelaySettings[];
created: number;
loaded: number;
relays: FullRelaySettings[];
}
export interface UsersFollows {
pubkey: string;
created: number;
loaded: number;
follows: Array<Array<string>>;
}
export function mapEventToProfile(ev: NostrEvent) {
@ -78,6 +85,7 @@ export interface SnortSystemDb {
relayMetrics: DexieTableLike<RelayMetrics>;
userRelays: DexieTableLike<UsersRelays>;
events: DexieTableLike<NostrEvent>;
contacts: DexieTableLike<UsersFollows>;
isAvailable(): Promise<boolean>;
}

View File

@ -0,0 +1,29 @@
import { UsersFollows } from ".";
import { DexieTableLike, FeedCache } from "@snort/shared";
export class UserFollowsCache extends FeedCache<UsersFollows> {
constructor(table?: DexieTableLike<UsersFollows>) {
super("UserFollowsCache", table);
}
key(of: UsersFollows): string {
return of.pubkey;
}
override async preload(follows?: Array<string>): Promise<void> {
await super.preload();
if (follows) {
await this.buffer(follows);
}
}
newest(): number {
let ret = 0;
this.cache.forEach(v => (ret = v.created > ret ? v.created : ret));
return ret;
}
takeSnapshot(): Array<UsersFollows> {
return [...this.cache.values()];
}
}

View File

@ -12,6 +12,7 @@ import EventEmitter from "eventemitter3";
import { QueryEvents } from "./query";
import { CacheRelay } from "./cache-relay";
import { RequestRouter } from "./request-router";
import { UsersFollows } from "./cache/index";
export { NostrSystem } from "./nostr-system";
export { default as EventKind } from "./event-kind";
@ -48,8 +49,6 @@ export * from "./cache/user-relays";
export * from "./cache/user-metadata";
export * from "./cache/relay-metric";
export * from "./worker/system-worker";
export type QueryLike = {
get progress(): number;
feed: {
@ -143,6 +142,11 @@ export interface SystemInterface {
*/
get eventsCache(): CachedTable<NostrEvent>;
/**
* ContactList cache
*/
get userFollowsCache(): CachedTable<UsersFollows>;
/**
* Relay loader loads relay metadata for a set of profiles
*/

View File

@ -1,7 +1,7 @@
import debug from "debug";
import EventEmitter from "eventemitter3";
import { CachedTable } from "@snort/shared";
import { CachedTable, isHex, unixNowMs } from "@snort/shared";
import { NostrEvent, TaggedNostrEvent, OkResponse } from "./nostr";
import { Connection, RelaySettings } from "./connection";
import { BuiltRawReqFilter, RequestBuilder } from "./request-builder";
@ -19,6 +19,10 @@ import {
SnortSystemDb,
QueryLike,
OutboxModel,
socialGraphInstance,
EventKind,
UsersFollows,
ID,
} from ".";
import { EventsCache } from "./cache/events";
import { RelayMetadataLoader } from "./outbox";
@ -26,7 +30,8 @@ import { Optimizer, DefaultOptimizer } from "./query-optimizer";
import { ConnectionPool, DefaultConnectionPool } from "./connection-pool";
import { QueryManager } from "./query-manager";
import { CacheRelay } from "./cache-relay";
import { RequestRouter } from "request-router";
import { RequestRouter } from "./request-router";
import { UserFollowsCache } from "./cache/user-follows-lists";
export interface NostrSystemEvents {
change: (state: SystemSnapshot) => void;
@ -56,6 +61,11 @@ export interface SystemConfig {
*/
events: CachedTable<NostrEvent>;
/**
* Cache of user ContactLists (kind 3)
*/
contactLists: CachedTable<UsersFollows>;
/**
* Optimized cache relay, usually `@snort/worker-relay`
*/
@ -83,6 +93,14 @@ export interface SystemConfig {
* 2. Write to inbox for all `p` tagged users in broadcasting events
*/
automaticOutboxModel: boolean;
/**
* Automatically populate SocialGraph from kind 3 events fetched.
*
* This is basically free because we always load relays (which includes kind 3 contact lists)
* for users when fetching by author.
*/
buildFollowGraph: boolean;
}
/**
@ -125,6 +143,10 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
return this.#config.events;
}
get userFollowsCache(): CachedTable<UsersFollows> {
return this.#config.contactLists;
}
get cacheRelay(): CacheRelay | undefined {
return this.#config.cachingRelay;
}
@ -153,11 +175,13 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
profiles: props.profiles ?? new UserProfileCache(props.db?.users),
relayMetrics: props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics),
events: props.events ?? new EventsCache(props.db?.events),
contactLists: props.contactLists ?? new UserFollowsCache(props.db?.contacts),
optimizer: props.optimizer ?? DefaultOptimizer,
checkSigs: props.checkSigs ?? false,
cachingRelay: props.cachingRelay,
db: props.db,
automaticOutboxModel: props.automaticOutboxModel ?? true,
buildFollowGraph: props.buildFollowGraph ?? false,
};
this.profileLoader = new ProfileLoaderService(this, this.profileCache);
@ -169,6 +193,32 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
this.requestRouter = OutboxModel.fromSystem(this);
}
// Hook on-event when building follow graph
if (this.#config.buildFollowGraph) {
let evBuf: Array<TaggedNostrEvent> = [];
let t: ReturnType<typeof setTimeout> | undefined;
this.on("event", (_, ev) => {
if (ev.kind === EventKind.ContactList) {
// fire&forget update
this.userFollowsCache.update({
loaded: unixNowMs(),
created: ev.created_at,
pubkey: ev.pubkey,
follows: ev.tags,
});
// buffer social graph updates into 500ms window
evBuf.push(ev);
if (!t) {
t = setTimeout(() => {
socialGraphInstance.handleEvent(evBuf);
evBuf = [];
}, 500);
}
}
});
}
this.pool = new DefaultConnectionPool(this);
this.#queryManager = new QueryManager(this);
@ -225,8 +275,24 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
this.profileCache.preload(),
this.relayMetricsCache.preload(),
this.eventsCache.preload(),
this.userFollowsCache.preload(),
];
await Promise.all(t);
await this.PreloadSocialGraph();
}
async PreloadSocialGraph() {
// Insert data to socialGraph from cache
if (this.#config.buildFollowGraph) {
for (const list of this.userFollowsCache.snapshot()) {
const user = ID(list.pubkey);
for (const fx of list.follows) {
if (fx[0] === "p" && fx[1].length === 64) {
socialGraphInstance.addFollower(ID(fx[1]), user);
}
}
}
}
}
async ConnectToRelay(address: string, options: RelaySettings) {

View File

@ -69,8 +69,12 @@ export class KeyedReplaceableNoteStore extends HookedNoteStore {
const changes: Array<TaggedNostrEvent> = [];
ev.forEach(a => {
const keyOnEvent = this.#keyFn(a);
const existingCreated = this.#events.get(keyOnEvent)?.created_at ?? 0;
const existing = this.#events.get(keyOnEvent);
const existingCreated = existing?.created_at ?? 0;
if (a.created_at > existingCreated) {
if (existing) {
a.relays.push(...existing.relays);
}
this.#events.set(keyOnEvent, a);
changes.push(a);
}

View File

@ -1,15 +0,0 @@
export const enum WorkerCommand {
OkResponse,
ErrorResponse,
Init,
ConnectRelay,
DisconnectRelay,
Query,
QueryResult,
}
export interface WorkerMessage<T> {
id: string;
type: WorkerCommand;
data: T;
}

View File

@ -1,49 +0,0 @@
/// <reference lib="webworker" />
import { NostrSystem } from "../nostr-system";
import { WorkerMessage, WorkerCommand } from ".";
const system = new NostrSystem({
checkSigs: true,
});
function reply<T>(id: string, type: WorkerCommand, data: T) {
globalThis.postMessage({
id,
type,
data,
} as WorkerMessage<T>);
}
function okReply(id: string, message?: string) {
reply<string | undefined>(id, WorkerCommand.OkResponse, message);
}
function errorReply(id: string, message: string) {
reply<string>(id, WorkerCommand.ErrorResponse, message);
}
globalThis.onmessage = async ev => {
console.debug(ev);
const data = ev.data as { id: string; type: WorkerCommand };
try {
switch (data.type) {
case WorkerCommand.Init: {
await system.Init();
okReply(data.id);
break;
}
case WorkerCommand.ConnectRelay: {
const cmd = ev.data as WorkerMessage<[string, { read: boolean; write: boolean }]>;
await system.ConnectToRelay(cmd.data[0], cmd.data[1]);
okReply(data.id, "Connected");
break;
}
default: {
errorReply(data.id, "Unknown command");
break;
}
}
} catch (e) {
if (e instanceof Error) {
errorReply(data.id, e.message);
}
}
};

View File

@ -1,208 +0,0 @@
import { v4 as uuid } from "uuid";
import EventEmitter from "eventemitter3";
import {
NostrEvent,
OkResponse,
ProfileLoaderService,
RelaySettings,
RequestBuilder,
SystemInterface,
TaggedNostrEvent,
CachedMetadata,
RelayMetadataLoader,
RelayMetricCache,
RelayMetrics,
UserProfileCache,
UserRelaysCache,
UsersRelays,
QueryLike,
Optimizer,
DefaultOptimizer,
} from "..";
import { NostrSystemEvents, SystemConfig } from "../nostr-system";
import { WorkerCommand, WorkerMessage } from ".";
import { CachedTable } from "@snort/shared";
import { EventsCache } from "../cache/events";
import { RelayMetricHandler } from "../relay-metric-handler";
import debug from "debug";
import { ConnectionPool } from "../connection-pool";
import { CacheRelay } from "../cache-relay";
export class SystemWorker extends EventEmitter<NostrSystemEvents> implements SystemInterface {
#log = debug("SystemWorker");
#worker: Worker;
#commandQueue: Map<string, (v: unknown) => void> = new Map();
#config: SystemConfig;
/**
* Storage class for user relay lists
*/
get relayCache(): CachedTable<UsersRelays> {
return this.#config.relays;
}
/**
* Storage class for user profiles
*/
get profileCache(): CachedTable<CachedMetadata> {
return this.#config.profiles;
}
/**
* Storage class for relay metrics (connects/disconnects)
*/
get relayMetricsCache(): CachedTable<RelayMetrics> {
return this.#config.relayMetrics;
}
/**
* Optimizer instance, contains optimized functions for processing data
*/
get optimizer(): Optimizer {
return this.#config.optimizer;
}
get eventsCache(): CachedTable<NostrEvent> {
return this.#config.events;
}
/**
* Check event signatures (recommended)
*/
get checkSigs(): boolean {
return this.#config.checkSigs;
}
set checkSigs(v: boolean) {
this.#config.checkSigs = v;
}
get requestRouter() {
return undefined;
}
get cacheRelay(): CacheRelay | undefined {
return this.#config.cachingRelay;
}
get pool() {
return {} as ConnectionPool;
}
readonly relayLoader: RelayMetadataLoader;
readonly profileLoader: ProfileLoaderService;
readonly relayMetricsHandler: RelayMetricHandler;
constructor(scriptPath: string, props: Partial<SystemConfig>) {
super();
this.#config = {
relays: props.relays ?? new UserRelaysCache(props.db?.userRelays),
profiles: props.profiles ?? new UserProfileCache(props.db?.users),
relayMetrics: props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics),
events: props.events ?? new EventsCache(props.db?.events),
optimizer: props.optimizer ?? DefaultOptimizer,
checkSigs: props.checkSigs ?? false,
cachingRelay: props.cachingRelay,
db: props.db,
automaticOutboxModel: props.automaticOutboxModel ?? true,
};
this.profileLoader = new ProfileLoaderService(this, this.profileCache);
this.relayMetricsHandler = new RelayMetricHandler(this.relayMetricsCache);
this.relayLoader = new RelayMetadataLoader(this, this.relayCache);
this.#worker = new Worker(scriptPath, {
name: "SystemWorker",
type: "module",
});
this.#worker.onmessage = async e => {
const cmd = e.data as { id: string; type: WorkerCommand; data?: unknown };
if (cmd.type === WorkerCommand.OkResponse) {
const q = this.#commandQueue.get(cmd.id);
q?.(cmd.data);
this.#commandQueue.delete(cmd.id);
}
};
}
get Sockets(): never[] {
return [];
}
async Init() {
await this.#workerRpc(WorkerCommand.Init);
}
GetQuery(id: string): QueryLike | undefined {
return undefined;
}
Query(req: RequestBuilder): QueryLike {
const chan = this.#workerRpc<[RequestBuilder], { id: string; port: MessagePort }>(WorkerCommand.Query, [req]);
return {
on: (_: "event", cb) => {
chan.then(c => {
c.port.onmessage = e => {
//cb(e.data as Array<TaggedNostrEvent>);
};
});
},
off: (_: "event", cb) => {
chan.then(c => {
c.port.close();
});
},
cancel: () => {},
uncancel: () => {},
} as QueryLike;
}
Fetch(req: RequestBuilder, cb?: ((evs: TaggedNostrEvent[]) => void) | undefined): Promise<TaggedNostrEvent[]> {
throw new Error("Method not implemented.");
}
async ConnectToRelay(address: string, options: RelaySettings) {
await this.#workerRpc(WorkerCommand.ConnectRelay, [address, options, false]);
}
DisconnectRelay(address: string): void {
this.#workerRpc(WorkerCommand.DisconnectRelay, address);
}
HandleEvent(subId: string, ev: TaggedNostrEvent): void {
throw new Error("Method not implemented.");
}
BroadcastEvent(ev: NostrEvent, cb?: ((rsp: OkResponse) => void) | undefined): Promise<OkResponse[]> {
throw new Error("Method not implemented.");
}
WriteOnceToRelay(relay: string, ev: NostrEvent): Promise<OkResponse> {
throw new Error("Method not implemented.");
}
#workerRpc<T, R>(type: WorkerCommand, data?: T, timeout = 5_000) {
const id = uuid();
const msg = {
id,
type,
data,
} as WorkerMessage<T>;
this.#log(msg);
this.#worker.postMessage(msg);
return new Promise<R>((resolve, reject) => {
let t: ReturnType<typeof setTimeout>;
this.#commandQueue.set(id, v => {
clearTimeout(t);
const cmdReply = v as WorkerMessage<R>;
if (cmdReply.type === WorkerCommand.OkResponse) {
resolve(cmdReply.data);
} else {
reject(cmdReply.data);
}
});
t = setTimeout(() => {
reject("timeout");
}, timeout);
});
}
}