refactor: polish
This commit is contained in:
@ -1,7 +1,15 @@
|
||||
import { ID, STR, UID } from "./UniqueIds";
|
||||
import { HexKey, NostrEvent } from "..";
|
||||
import EventEmitter from "eventemitter3";
|
||||
import { unixNowMs } from "@snort/shared";
|
||||
import debug from "debug";
|
||||
|
||||
export default class SocialGraph {
|
||||
export interface SocialGraphEvents {
|
||||
changeRoot: () => void;
|
||||
}
|
||||
|
||||
export default class SocialGraph extends EventEmitter<SocialGraphEvents> {
|
||||
#log = debug("SocialGraph");
|
||||
root: UID;
|
||||
followDistanceByUser = new Map<UID, number>();
|
||||
usersByFollowDistance = new Map<number, Set<UID>>();
|
||||
@ -10,6 +18,7 @@ export default class SocialGraph {
|
||||
latestFollowEventTimestamps = new Map<UID, number>();
|
||||
|
||||
constructor(root: HexKey) {
|
||||
super();
|
||||
this.root = ID(root);
|
||||
this.followDistanceByUser.set(this.root, 0);
|
||||
this.usersByFollowDistance.set(0, new Set([this.root]));
|
||||
@ -20,6 +29,7 @@ export default class SocialGraph {
|
||||
if (rootId === this.root) {
|
||||
return;
|
||||
}
|
||||
const start = unixNowMs();
|
||||
this.root = rootId;
|
||||
this.followDistanceByUser.clear();
|
||||
this.usersByFollowDistance.clear();
|
||||
@ -45,6 +55,8 @@ export default class SocialGraph {
|
||||
}
|
||||
}
|
||||
}
|
||||
this.emit("changeRoot");
|
||||
this.#log(`Rebuilding root took ${(unixNowMs() - start).toFixed(2)} ms`);
|
||||
}
|
||||
|
||||
handleEvent(evs: NostrEvent | Array<NostrEvent>) {
|
||||
|
@ -130,7 +130,7 @@ export class DefaultConnectionPool<T extends ConnectionType = Connection>
|
||||
this.#connectionBuilder = builder;
|
||||
} else {
|
||||
this.#connectionBuilder = (addr, options, ephemeral) => {
|
||||
const sync = new DefaultSyncModule(this.#system.config.fallbackSync);
|
||||
const sync = new DefaultSyncModule(this.#system.config.fallbackSync, this.#system);
|
||||
return new Connection(addr, options, ephemeral, sync) as unknown as T;
|
||||
};
|
||||
}
|
||||
|
@ -240,7 +240,6 @@ export class Connection extends EventEmitter<ConnectionTypeEvents> implements Co
|
||||
return;
|
||||
}
|
||||
this.emit("event", msg[1] as string, ev);
|
||||
// todo: stats events received
|
||||
break;
|
||||
}
|
||||
case "EOSE": {
|
||||
|
@ -24,7 +24,7 @@ export interface Thread {
|
||||
export const enum EventType {
|
||||
Regular,
|
||||
Replaceable,
|
||||
ParameterizedReplaceable,
|
||||
Addressable,
|
||||
}
|
||||
|
||||
export abstract class EventExt {
|
||||
@ -164,7 +164,7 @@ export abstract class EventExt {
|
||||
static getType(kind: number) {
|
||||
const legacyReplaceable = [0, 3, 41];
|
||||
if (kind >= 30_000 && kind < 40_000) {
|
||||
return EventType.ParameterizedReplaceable;
|
||||
return EventType.Addressable;
|
||||
} else if (kind >= 10_000 && kind < 20_000) {
|
||||
return EventType.Replaceable;
|
||||
} else if (legacyReplaceable.includes(kind)) {
|
||||
@ -174,9 +174,14 @@ export abstract class EventExt {
|
||||
}
|
||||
}
|
||||
|
||||
static isReplaceable(kind: number) {
|
||||
const t = EventExt.getType(kind);
|
||||
return t === EventType.Replaceable || t === EventType.Addressable;
|
||||
}
|
||||
|
||||
static isValid(ev: NostrEvent) {
|
||||
const type = EventExt.getType(ev.kind);
|
||||
if (type === EventType.ParameterizedReplaceable) {
|
||||
if (type === EventType.Addressable) {
|
||||
if (!findTag(ev, "d")) return false;
|
||||
}
|
||||
return ev.sig !== undefined;
|
||||
|
@ -100,7 +100,7 @@ export class NoteCollection extends KeyedReplaceableNoteStore {
|
||||
constructor() {
|
||||
super(e => {
|
||||
switch (EventExt.getType(e.kind)) {
|
||||
case EventType.ParameterizedReplaceable:
|
||||
case EventType.Addressable:
|
||||
return `${e.kind}:${e.pubkey}:${findTag(e, "d")}`;
|
||||
case EventType.Replaceable:
|
||||
return `${e.kind}:${e.pubkey}`;
|
||||
|
@ -29,6 +29,7 @@ export interface Optimizer {
|
||||
flatMerge(all: Array<FlatReqFilter>): Array<ReqFilter>;
|
||||
compress(all: Array<ReqFilter>): Array<ReqFilter>;
|
||||
schnorrVerify(ev: NostrEvent): boolean;
|
||||
batchVerify(evs: Array<NostrEvent>): Array<boolean>;
|
||||
}
|
||||
|
||||
export const DefaultOptimizer = {
|
||||
|
@ -3,14 +3,18 @@ import { EventExt, EventType } from "../event-ext";
|
||||
import { NoteCollection } from "../note-collection";
|
||||
import { RangeSync } from "./range-sync";
|
||||
import { NegentropyFlow } from "../negentropy/negentropy-flow";
|
||||
import { SystemConfig } from "../system";
|
||||
import { SystemConfig, SystemInterface } from "../system";
|
||||
import { findTag } from "../utils";
|
||||
|
||||
export interface ConnectionSyncModule {
|
||||
sync: (c: Connection, item: SyncCommand, cb?: () => void) => void;
|
||||
}
|
||||
|
||||
export class DefaultSyncModule implements ConnectionSyncModule {
|
||||
constructor(readonly method: SystemConfig["fallbackSync"]) {}
|
||||
constructor(
|
||||
readonly method: SystemConfig["fallbackSync"],
|
||||
readonly system: SystemInterface,
|
||||
) {}
|
||||
|
||||
sync(c: Connection, item: SyncCommand, cb?: () => void) {
|
||||
const [_, id, eventSet, ...filters] = item;
|
||||
@ -58,6 +62,36 @@ export class DefaultSyncModule implements ConnectionSyncModule {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Split a set of filters down into individual filters
|
||||
* which can be used to since request updates to replaceable events
|
||||
*/
|
||||
#breakdownReplaceable(item: SyncCommand) {
|
||||
const [type, id, eventSet, ...filters] = item;
|
||||
|
||||
const flat = filters.flatMap(a => this.system.optimizer.expandFilter(a));
|
||||
const mapped = flat.map(a => {
|
||||
if (!a.kinds || !a.authors) return a;
|
||||
if (EventExt.isReplaceable(a.kinds)) {
|
||||
const latest = eventSet.find(
|
||||
b => b.kind === a.kinds && b.pubkey === a.authors && (!a["#d"] || findTag(b, "d") === a["#d"]),
|
||||
);
|
||||
if (latest) {
|
||||
return {
|
||||
...a,
|
||||
since: latest.created_at + 1,
|
||||
};
|
||||
}
|
||||
}
|
||||
return a;
|
||||
});
|
||||
const compressed = this.system.optimizer.flatMerge(mapped);
|
||||
if (compressed.length !== filters.length) {
|
||||
console.debug("COMPRESSED", id, filters, compressed);
|
||||
}
|
||||
return compressed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Using the latest data, fetch only newer items
|
||||
*
|
||||
@ -66,11 +100,17 @@ export class DefaultSyncModule implements ConnectionSyncModule {
|
||||
#syncSince(c: Connection, item: SyncCommand, cb?: () => void) {
|
||||
const [type, id, eventSet, ...filters] = item;
|
||||
if (type !== "SYNC") throw new Error("Must be a SYNC command");
|
||||
const latest = eventSet.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0);
|
||||
const newFilters = filters.map(a => ({
|
||||
...a,
|
||||
since: latest + 1,
|
||||
}));
|
||||
//const broken = this.#breakdownReplaceable(item);
|
||||
const latest = eventSet
|
||||
//.filter(a => !EventExt.isReplaceable(a.kind))
|
||||
.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0);
|
||||
const newFilters = filters.map(a => {
|
||||
if (a.since || latest === 0) return a;
|
||||
return {
|
||||
...a,
|
||||
since: latest + 1,
|
||||
};
|
||||
});
|
||||
c.request(["REQ", id, ...newFilters], cb);
|
||||
}
|
||||
|
||||
|
@ -129,10 +129,7 @@ export class SafeSync extends EventEmitter<SafeSyncEvents> {
|
||||
if (prevTag && prevTag[1] !== this.#base.id) {
|
||||
throw new Error("Previous tag does not match our version");
|
||||
}
|
||||
if (
|
||||
EventExt.getType(ev.kind) !== EventType.Replaceable &&
|
||||
EventExt.getType(ev.kind) !== EventType.ParameterizedReplaceable
|
||||
) {
|
||||
if (EventExt.getType(ev.kind) !== EventType.Replaceable && EventExt.getType(ev.kind) !== EventType.Addressable) {
|
||||
throw new Error("Not a replacable event kind");
|
||||
}
|
||||
if (this.#base.created_at >= ev.created_at) {
|
||||
|
Reference in New Issue
Block a user