refactor: use nostr-social-graph

This commit is contained in:
2024-09-21 23:22:21 +01:00
parent 304976d66d
commit 415e4e3705
30 changed files with 106 additions and 895 deletions

View File

@ -1,210 +0,0 @@
import { ID, ReqFilter as Filter, STR, TaggedNostrEvent, UID } from ".";
import loki from "lokijs";
import debug from "debug";
type PackedNostrEvent = {
id: UID;
pubkey: number;
kind: number;
tags: Array<string | UID>[];
flatTags: string[];
sig: string;
created_at: number;
content?: string;
relays: string[];
saved_at: number;
};
const DEFAULT_MAX_SIZE = 5000;
class InMemoryDB {
private loki = new loki("EventDB");
private eventsCollection: Collection<PackedNostrEvent>;
private maxSize: number;
constructor(maxSize = DEFAULT_MAX_SIZE) {
this.maxSize = maxSize;
this.eventsCollection = this.loki.addCollection("events", {
unique: ["id"],
indices: ["pubkey", "kind", "flatTags", "created_at", "saved_at"],
});
this.startRemoveOldestInterval();
}
private startRemoveOldestInterval() {
const removeOldest = () => {
this.removeOldest();
setTimeout(() => removeOldest(), 3000);
};
setTimeout(() => removeOldest(), 3000);
}
#log = debug("InMemoryDB");
get(id: string): TaggedNostrEvent | undefined {
const event = this.eventsCollection.by("id", ID(id)); // throw if db not ready yet?
if (event) {
return this.unpack(event);
}
}
has(id: string): boolean {
return !!this.eventsCollection.by("id", ID(id));
}
// map to internal UIDs to save memory
private pack(event: TaggedNostrEvent): PackedNostrEvent {
return {
id: ID(event.id),
pubkey: ID(event.pubkey),
sig: event.sig,
kind: event.kind,
tags: event.tags.map(tag => {
if (["e", "p"].includes(tag[0]) && typeof tag[1] === "string") {
return [tag[0], ID(tag[1] as string), ...tag.slice(2)];
} else {
return tag;
}
}),
flatTags: event.tags.filter(tag => ["e", "p", "d"].includes(tag[0])).map(tag => `${tag[0]}_${ID(tag[1])}`),
created_at: event.created_at,
content: event.content,
relays: event.relays,
saved_at: Date.now(),
};
}
private unpack(packedEvent: PackedNostrEvent): TaggedNostrEvent {
return <TaggedNostrEvent>{
id: STR(packedEvent.id),
pubkey: STR(packedEvent.pubkey),
sig: packedEvent.sig,
kind: packedEvent.kind,
tags: packedEvent.tags.map(tag => {
if (["e", "p"].includes(tag[0] as string) && typeof tag[1] === "number") {
return [tag[0], STR(tag[1] as number), ...tag.slice(2)];
} else {
return tag;
}
}),
created_at: packedEvent.created_at,
content: packedEvent.content,
relays: packedEvent.relays,
};
}
handleEvent(event: TaggedNostrEvent): boolean {
if (!event || !event.id || !event.created_at) {
throw new Error("Invalid event");
}
const id = ID(event.id);
if (this.eventsCollection.by("id", id)) {
return false; // this prevents updating event.relays?
}
const packed = this.pack(event);
// we might want to limit the kinds of events we save, e.g. no kind 0, 3 or only 1, 6
try {
this.eventsCollection.insert(packed);
} catch (e) {
return false;
}
return true;
}
remove(eventId: string): void {
const id = ID(eventId);
this.eventsCollection.findAndRemove({ id });
}
removeOldest(): void {
const count = this.eventsCollection.count();
this.#log("InMemoryDB: count", count, this.maxSize);
if (count > this.maxSize) {
this.#log("InMemoryDB: removing oldest events", count - this.maxSize);
this.eventsCollection
.chain()
.simplesort("saved_at")
.limit(count - this.maxSize)
.remove();
}
}
find(filter: Filter, callback: (event: TaggedNostrEvent) => void): void {
this.findArray(filter).forEach(event => {
callback(event);
});
}
findArray(filter: Filter): TaggedNostrEvent[] {
const query = this.constructQuery(filter);
const searchRegex = filter.search ? new RegExp(filter.search, "i") : undefined;
let chain = this.eventsCollection
.chain()
.find(query)
.where((e: PackedNostrEvent) => {
if (searchRegex && !e.content?.match(searchRegex)) {
return false;
}
return true;
})
.simplesort("created_at", true);
if (filter.limit) {
chain = chain.limit(filter.limit);
}
return chain.data().map(e => this.unpack(e));
}
findAndRemove(filter: Filter) {
const query = this.constructQuery(filter);
this.eventsCollection.findAndRemove(query);
}
private constructQuery(filter: Filter): LokiQuery<PackedNostrEvent> {
const query: LokiQuery<PackedNostrEvent> = {};
if (filter.ids) {
query.id = { $in: filter.ids.map(ID) };
} else {
if (filter.authors) {
query.pubkey = { $in: filter.authors.map(ID) };
}
if (filter.kinds) {
query.kind = { $in: filter.kinds };
}
if (filter["#e"]) {
query.flatTags = { $contains: "e_" + filter["#e"]!.map(ID) };
} else if (filter["#p"]) {
query.flatTags = { $contains: "p_" + filter["#p"]!.map(ID) };
} else if (filter["#d"]) {
query.flatTags = { $contains: "d_" + filter["#d"]!.map(ID) };
}
if (filter.since && filter.until) {
query.created_at = { $between: [filter.since, filter.until] };
}
if (filter.since) {
query.created_at = { $gte: filter.since };
}
if (filter.until) {
query.created_at = { $lte: filter.until };
}
}
return query;
}
findOne(filter: Filter): TaggedNostrEvent | undefined {
return this.findArray(filter)[0];
}
}
export { InMemoryDB };
export default new InMemoryDB();

View File

@ -1,251 +0,0 @@
import { ID, STR, UID } from "./UniqueIds";
import { HexKey, NostrEvent } from "..";
import EventEmitter from "eventemitter3";
import { unixNowMs } from "@snort/shared";
import debug from "debug";
export interface SocialGraphEvents {
changeRoot: () => void;
}
export default class SocialGraph extends EventEmitter<SocialGraphEvents> {
#log = debug("SocialGraph");
root: UID;
followDistanceByUser = new Map<UID, number>();
usersByFollowDistance = new Map<number, Set<UID>>();
followedByUser = new Map<UID, Set<UID>>();
followersByUser = new Map<UID, Set<UID>>();
latestFollowEventTimestamps = new Map<UID, number>();
constructor(root: HexKey) {
super();
this.root = ID(root);
this.followDistanceByUser.set(this.root, 0);
this.usersByFollowDistance.set(0, new Set([this.root]));
}
setRoot(root: HexKey) {
const rootId = ID(root);
if (rootId === this.root) {
return;
}
const start = unixNowMs();
this.root = rootId;
this.followDistanceByUser.clear();
this.usersByFollowDistance.clear();
this.followDistanceByUser.set(this.root, 0);
this.usersByFollowDistance.set(0, new Set([this.root]));
const queue = [this.root];
while (queue.length > 0) {
const user = queue.shift()!;
const distance = this.followDistanceByUser.get(user)!;
const followers = this.followersByUser.get(user) || new Set<UID>();
for (const follower of followers) {
if (!this.followDistanceByUser.has(follower)) {
const newFollowDistance = distance + 1;
this.followDistanceByUser.set(follower, newFollowDistance);
if (!this.usersByFollowDistance.has(newFollowDistance)) {
this.usersByFollowDistance.set(newFollowDistance, new Set());
}
this.usersByFollowDistance.get(newFollowDistance)!.add(follower);
queue.push(follower);
}
}
}
this.emit("changeRoot");
this.#log(`Rebuilding root took ${(unixNowMs() - start).toFixed(2)} ms`);
}
handleEvent(evs: NostrEvent | Array<NostrEvent>) {
const filtered = (Array.isArray(evs) ? evs : [evs]).filter(a => a.kind === 3);
if (filtered.length === 0) {
return;
}
queueMicrotask(() => {
try {
for (const event of filtered) {
const author = ID(event.pubkey);
const timestamp = event.created_at;
const existingTimestamp = this.latestFollowEventTimestamps.get(author);
if (existingTimestamp && timestamp <= existingTimestamp) {
return;
}
this.latestFollowEventTimestamps.set(author, timestamp);
// Collect all users followed in the new event.
const followedInEvent = new Set<UID>();
for (const tag of event.tags) {
if (tag[0] === "p") {
const followedUser = ID(tag[1]);
if (followedUser !== author) {
followedInEvent.add(followedUser);
}
}
}
// Get the set of users currently followed by the author.
const currentlyFollowed = this.followedByUser.get(author) || new Set<UID>();
// Find users that need to be removed.
for (const user of currentlyFollowed) {
if (!followedInEvent.has(user)) {
this.removeFollower(user, author);
}
}
// Add or update the followers based on the new event.
for (const user of followedInEvent) {
this.addFollower(user, author);
}
}
} catch (e) {
// might not be logged in or sth
}
});
}
isFollowing(follower: HexKey, followedUser: HexKey): boolean {
const followedUserId = ID(followedUser);
const followerId = ID(follower);
return !!this.followedByUser.get(followerId)?.has(followedUserId);
}
getFollowDistance(user: HexKey): number {
try {
const userId = ID(user);
if (userId === this.root) {
return 0;
}
const distance = this.followDistanceByUser.get(userId);
return distance === undefined ? 1000 : distance;
} catch (e) {
// might not be logged in or sth
return 1000;
}
}
addUserByFollowDistance(distance: number, user: UID) {
if (!this.usersByFollowDistance.has(distance)) {
this.usersByFollowDistance.set(distance, new Set());
}
this.usersByFollowDistance.get(distance)?.add(user);
// remove from higher distances
for (const d of this.usersByFollowDistance.keys()) {
if (d > distance) {
this.usersByFollowDistance.get(d)?.delete(user);
}
}
}
addFollower(followedUser: UID, follower: UID) {
if (typeof followedUser !== "number" || typeof follower !== "number") {
throw new Error("Invalid user id");
}
if (!this.followersByUser.has(followedUser)) {
this.followersByUser.set(followedUser, new Set<UID>());
}
this.followersByUser.get(followedUser)?.add(follower);
if (!this.followedByUser.has(follower)) {
this.followedByUser.set(follower, new Set<UID>());
}
if (followedUser !== this.root) {
let newFollowDistance;
if (follower === this.root) {
// basically same as the next "else" block, but faster
newFollowDistance = 1;
this.addUserByFollowDistance(newFollowDistance, followedUser);
this.followDistanceByUser.set(followedUser, newFollowDistance);
} else {
const existingFollowDistance = this.followDistanceByUser.get(followedUser);
const followerDistance = this.followDistanceByUser.get(follower);
newFollowDistance = followerDistance && followerDistance + 1;
if (existingFollowDistance === undefined || (newFollowDistance && newFollowDistance < existingFollowDistance)) {
this.followDistanceByUser.set(followedUser, newFollowDistance!);
this.addUserByFollowDistance(newFollowDistance!, followedUser);
}
}
}
this.followedByUser.get(follower)?.add(followedUser);
}
removeFollower(unfollowedUser: UID, follower: UID) {
this.followersByUser.get(unfollowedUser)?.delete(follower);
this.followedByUser.get(follower)?.delete(unfollowedUser);
if (unfollowedUser === this.root) {
return;
}
// iterate over remaining followers and set the smallest follow distance
let smallest = Infinity;
for (const follower of this.followersByUser.get(unfollowedUser) || []) {
const followerDistance = this.followDistanceByUser.get(follower);
if (followerDistance !== undefined && followerDistance + 1 < smallest) {
smallest = followerDistance + 1;
}
}
if (smallest === Infinity) {
this.followDistanceByUser.delete(unfollowedUser);
} else {
this.followDistanceByUser.set(unfollowedUser, smallest);
}
}
// TODO subscription methods for followersByUser and followedByUser. and maybe messagesByTime. and replies
followerCount(address: HexKey) {
const id = ID(address);
return this.followersByUser.get(id)?.size ?? 0;
}
followedByFriendsCount(address: HexKey) {
let count = 0;
const id = ID(address);
for (const follower of this.followersByUser.get(id) ?? []) {
if (this.followedByUser.get(this.root)?.has(follower)) {
count++; // should we stop at 10?
}
}
return count;
}
followedByFriends(address: HexKey) {
const id = ID(address);
const set = new Set<HexKey>();
for (const follower of this.followersByUser.get(id) ?? []) {
if (this.followedByUser.get(this.root)?.has(follower)) {
set.add(STR(follower));
}
}
return set;
}
getFollowedByUser(user: HexKey, includeSelf = false): Set<HexKey> {
const userId = ID(user);
const set = new Set<HexKey>();
for (const id of this.followedByUser.get(userId) || []) {
set.add(STR(id));
}
if (includeSelf) {
set.add(user);
}
return set;
}
getFollowersByUser(address: HexKey): Set<HexKey> {
const userId = ID(address);
const set = new Set<HexKey>();
for (const id of this.followersByUser.get(userId) || []) {
set.add(STR(id));
}
return set;
}
}
export const socialGraphInstance = new SocialGraph("");

View File

@ -1,38 +0,0 @@
// should this be a class instead? convert all strings to internal representation, enable comparison
export type UID = number;
// save space by mapping strs to internal unique ids
export class UniqueIds {
static strToUniqueId = new Map<string, UID>();
static uniqueIdToStr = new Map<UID, string>();
static currentUniqueId = 0;
static id(str: string): UID {
if (str.startsWith("npub")) {
throw new Error("use hex instead of npub " + str);
}
const existing = UniqueIds.strToUniqueId.get(str);
if (existing) {
return existing;
}
const newId = UniqueIds.currentUniqueId++;
UniqueIds.strToUniqueId.set(str, newId);
UniqueIds.uniqueIdToStr.set(newId, str);
return newId;
}
static str(id: UID): string {
const pub = UniqueIds.uniqueIdToStr.get(id);
if (!pub) {
throw new Error("pub: invalid id " + id);
}
return pub;
}
static has(str: string): boolean {
return UniqueIds.strToUniqueId.has(str);
}
}
export const STR = UniqueIds.str;
export const ID = UniqueIds.id;

View File

@ -1,9 +1,7 @@
export { NostrSystem } from "./nostr-system";
export { NDKSystem } from "./ndk-system";
export { default as EventKind } from "./event-kind";
export { default as SocialGraph, socialGraphInstance } from "./SocialGraph/SocialGraph";
export * from "./system";
export * from "./SocialGraph/UniqueIds";
export * from "./nostr";
export * from "./links";
export * from "./nips";

View File

@ -11,9 +11,7 @@ import {
SystemSnapshot,
QueryLike,
OutboxModel,
socialGraphInstance,
EventKind,
ID,
SystemConfig,
} from ".";
import { RelayMetadataLoader } from "./outbox";
@ -21,6 +19,7 @@ import { ConnectionPool, DefaultConnectionPool } from "./connection-pool";
import { QueryManager } from "./query-manager";
import { RequestRouter } from "./request-router";
import { SystemBase } from "./system-base";
import { SerializedSocialGraph, SocialGraph, UniqueIds } from "nostr-social-graph";
/**
* Manages nostr content retrieval system
@ -72,7 +71,7 @@ export class NostrSystem extends SystemBase implements SystemInterface {
evBuf.push(ev);
if (!t) {
t = setTimeout(() => {
socialGraphInstance.handleEvent(evBuf);
this.config.socialGraphInstance.handleEvent(evBuf);
evBuf = [];
}, 500);
}
@ -134,18 +133,35 @@ export class NostrSystem extends SystemBase implements SystemInterface {
await this.PreloadSocialGraph(follows);
}
async PreloadSocialGraph(follows?: Array<string>) {
async PreloadSocialGraph(follows?: Array<string>, root?: string) {
// Insert data to socialGraph from cache
if (this.config.buildFollowGraph) {
for (const list of this.userFollowsCache.snapshot()) {
if (follows && !follows.includes(list.pubkey)) continue;
const user = ID(list.pubkey);
for (const fx of list.follows) {
if (fx[0] === "p" && fx[1]?.length === 64) {
socialGraphInstance.addFollower(ID(fx[1]), user);
// load saved social graph
if ("localStorage" in globalThis) {
const saved = localStorage.getItem("social-graph");
if (saved) {
try {
const data = JSON.parse(saved) as SerializedSocialGraph;
this.config.socialGraphInstance = new SocialGraph(root ?? "", data);
} catch (e) {
this.#log("Failed to load serialzied social-graph: %O", e);
localStorage.removeItem("social-graph");
}
}
}
this.config.socialGraphInstance.setRoot(root ?? "");
for (const list of this.userFollowsCache.snapshot()) {
if (follows && !follows.includes(list.pubkey)) continue;
this.config.socialGraphInstance.handleEvent({
id: "",
sig: "",
content: "",
kind: 3,
pubkey: list.pubkey,
created_at: list.created,
tags: list.follows,
});
}
}
}

View File

@ -44,13 +44,7 @@ export class DefaultSyncModule implements ConnectionSyncModule {
// if the event is replaceable there is no need to use any special sync query,
// just send the filters directly
const isReplaceableSync = filters.every(
a =>
a.kinds?.every(
b =>
EventExt.getType(b) === EventType.Replaceable || EventExt.getType(b) === EventType.ParameterizedReplaceable,
) ?? false,
);
const isReplaceableSync = filters.every(a => a.kinds?.every(b => EventExt.isReplaceable(b) ?? false));
if (filters.some(a => a.since || a.until || a.ids || a.limit) || isReplaceableSync) {
c.request(["REQ", id, ...filters], cb);
} else if (this.method === "since") {

View File

@ -7,6 +7,7 @@ import { UserRelaysCache, UserProfileCache, RelayMetricCache, NostrEvent } from
import { DefaultOptimizer, Optimizer } from "./query-optimizer";
import { NostrSystemEvents, SystemConfig } from "./system";
import { EventEmitter } from "eventemitter3";
import { SocialGraph } from "nostr-social-graph";
export abstract class SystemBase extends EventEmitter<NostrSystemEvents> {
#config: SystemConfig;
@ -31,6 +32,7 @@ export abstract class SystemBase extends EventEmitter<NostrSystemEvents> {
automaticOutboxModel: props.automaticOutboxModel ?? true,
buildFollowGraph: props.buildFollowGraph ?? false,
fallbackSync: props.fallbackSync ?? "since",
socialGraphInstance: props.socialGraphInstance ?? new SocialGraph(""),
};
}

View File

@ -11,6 +11,7 @@ import { BuiltRawReqFilter, RequestBuilder } from "./request-builder";
import { RequestRouter } from "./request-router";
import { QueryEvents } from "./query";
import EventEmitter from "eventemitter3";
import { SocialGraph } from "nostr-social-graph";
export type QueryLike = {
get progress(): number;
@ -96,6 +97,11 @@ export interface SystemConfig {
* Pick a fallback sync method when negentropy is not available
*/
fallbackSync: "since" | "range-sync";
/**
* Internal social graph used for WoT filtering
*/
socialGraphInstance: SocialGraph;
}
export interface SystemInterface {