feat: diff-sync follows

This commit is contained in:
2024-04-15 22:31:51 +01:00
parent edf64e4125
commit 5a7657a95d
25 changed files with 320 additions and 154 deletions

View File

@ -2,7 +2,7 @@ import { EventKind, HexKey, NostrPrefix, NostrEvent, EventSigner, PowMiner } fro
import { HashtagRegex, MentionNostrEntityRegex } from "./const";
import { getPublicKey, jitter, unixNow } from "@snort/shared";
import { EventExt } from "./event-ext";
import { tryParseNostrLink } from "./nostr-link";
import { NostrLink, tryParseNostrLink } from "./nostr-link";
export class EventBuilder {
#kind?: EventKind;
@ -14,6 +14,21 @@ export class EventBuilder {
#powMiner?: PowMiner;
#jitter?: number;
/**
* Populate builder with values from link
*/
fromLink(link: NostrLink) {
if (link.kind) {
this.#kind = link.kind;
}
if (link.author) {
this.#pubkey = link.author;
}
if (link.type === NostrPrefix.Address && link.id) {
this.tag(["d", link.id]);
}
}
jitter(n: number) {
this.#jitter = n;
return this;

View File

@ -78,7 +78,10 @@ export class NostrLink implements ToNostrEventTag {
return true;
}
} else if (this.type === NostrPrefix.Event || this.type === NostrPrefix.Note) {
return this.id === ev.id;
const ifSetCheck = <T>(a: T | undefined, b: T) => {
return !Boolean(a) || a === b;
};
return ifSetCheck(this.id, ev.id) && ifSetCheck(this.author, ev.pubkey) && ifSetCheck(this.kind, ev.kind);
}
return false;

View File

@ -245,10 +245,15 @@ export class RequestFilterBuilder {
.kinds([unwrap(link.kind)])
.authors([unwrap(link.author)]);
} else {
this.ids([link.id]);
if (link.id) {
this.ids([link.id]);
}
if (link.author) {
this.authors([link.author]);
}
if (link.kind !== undefined) {
this.kinds([link.kind]);
}
}
link.relays?.forEach(v => this.relay(v));
return this;

View File

@ -0,0 +1,100 @@
import { EventBuilder, EventSigner, NostrLink, SystemInterface } from "..";
import { SafeSync } from "./safe-sync";
import debug from "debug";
interface TagDiff {
type: "add" | "remove" | "replace";
tag: Array<string> | Array<Array<string>>;
}
/**
* Add/Remove tags from event
*/
export class DiffSyncTags {
#log = debug("DiffSyncTags");
#sync = new SafeSync();
#changes: Array<TagDiff> = [];
constructor(readonly link: NostrLink) {}
/**
* Add a tag
*/
add(tag: Array<string> | Array<Array<string>>) {
this.#changes.push({
type: "add",
tag,
});
}
/**
* Remove a tag
*/
remove(tag: Array<string> | Array<Array<string>>) {
this.#changes.push({
type: "remove",
tag,
});
}
/**
* Replace all the tags
*/
replace(tag: Array<Array<string>>) {
this.#changes.push({
type: "replace",
tag,
});
}
/**
* Apply changes and save
*/
async persist(signer: EventSigner, system: SystemInterface, content?: string) {
const cloneChanges = [...this.#changes];
this.#changes = [];
// always start with sync
const res = await this.#sync.sync(this.link, system);
let isNew = false;
let next = res ? { ...res } : undefined;
if (!next) {
const eb = new EventBuilder();
eb.fromLink(this.link);
next = eb.build();
isNew = true;
}
if (content) {
next.content = content;
}
// apply changes onto next
for (const change of cloneChanges) {
for (const changeTag of Array.isArray(change.tag[0])
? (change.tag as Array<Array<string>>)
: [change.tag as Array<string>]) {
const existing = next.tags.findIndex(a => a.every((b, i) => changeTag[i] === b));
switch (change.type) {
case "add": {
if (existing === -1) {
next.tags.push(changeTag);
} else {
this.#log("Tag already exists: %O", changeTag);
}
break;
}
case "remove": {
if (existing !== -1) {
next.tags.splice(existing, 1);
} else {
this.#log("Could not find tag to remove: %O", changeTag);
}
}
}
}
}
await this.#sync.update(next, signer, system, !isNew);
}
}

View File

@ -5,3 +5,4 @@ export interface HasId {
export * from "./safe-sync";
export * from "./range-sync";
export * from "./json-in-event-sync";
export * from "./diff-sync";

View File

@ -1,9 +1,8 @@
import { SafeSync } from "./safe-sync";
import { HasId } from ".";
import { EventExt, EventSigner, NostrEvent, NostrLink, SystemInterface } from "..";
import { EventBuilder, EventSigner, NostrEvent, NostrLink, NostrPrefix, SystemInterface } from "..";
import debug from "debug";
import EventEmitter from "eventemitter3";
import { unixNow } from "@snort/shared";
export interface JsonSyncEvents {
change: () => void;
@ -16,6 +15,7 @@ export class JsonEventSync<T extends HasId> extends EventEmitter<JsonSyncEvents>
constructor(
initValue: T,
readonly link: NostrLink,
readonly encrypt: boolean,
) {
super();
@ -30,8 +30,8 @@ export class JsonEventSync<T extends HasId> extends EventEmitter<JsonSyncEvents>
return Object.freeze(ret);
}
async sync(link: NostrLink, signer: EventSigner, system: SystemInterface) {
const res = await this.#sync.sync(link, system);
async sync(signer: EventSigner, system: SystemInterface) {
const res = await this.#sync.sync(this.link, system);
this.#log("Sync result %O", res);
if (res) {
if (this.encrypt) {
@ -40,6 +40,7 @@ export class JsonEventSync<T extends HasId> extends EventEmitter<JsonSyncEvents>
this.#json = JSON.parse(res.content) as T;
}
}
return res;
}
/**
@ -49,27 +50,26 @@ export class JsonEventSync<T extends HasId> extends EventEmitter<JsonSyncEvents>
*/
async updateJson(val: T, signer: EventSigner, system: SystemInterface) {
this.#log("Updating: %O", val);
const next = this.#sync.value ? ({ ...this.#sync.value } as NostrEvent) : undefined;
let next = this.#sync.value ? ({ ...this.#sync.value } as NostrEvent) : undefined;
let isNew = false;
if (!next) {
throw new Error("Cannot update with no previous value");
// create a new event if we already did sync and still undefined
if (this.#sync.didSync) {
const eb = new EventBuilder();
eb.fromLink(this.link);
next = eb.build();
isNew = true;
} else {
throw new Error("Cannot update with no previous value");
}
}
next.content = JSON.stringify(val);
next.created_at = unixNow();
const prevTag = next.tags.find(a => a[0] === "previous");
if (prevTag) {
prevTag[1] = next.id;
} else {
next.tags.push(["previous", next.id]);
}
if (this.encrypt) {
next.content = await signer.nip4Encrypt(next.content, await signer.getPubKey());
}
next.id = EventExt.createId(next);
const signed = await signer.sign(next);
await this.#sync.update(signed, system);
await this.#sync.update(next, signer, system, !isNew);
this.#json = val;
this.#json.id = next.id;
}

View File

@ -1,5 +1,7 @@
import EventEmitter from "eventemitter3";
import { EventExt, EventType, NostrEvent, NostrLink, RequestBuilder, SystemInterface } from "..";
import { EventExt, EventSigner, EventType, NostrEvent, NostrLink, RequestBuilder, SystemInterface } from "..";
import { unixNow } from "@snort/shared";
import debug from "debug";
export interface SafeSyncEvents {
change: () => void;
@ -15,10 +17,16 @@ export interface SafeSyncEvents {
* 30078 (AppData)
*/
export class SafeSync extends EventEmitter<SafeSyncEvents> {
#log = debug("SafeSync");
#base: NostrEvent | undefined;
#didSync = false;
get value() {
return this.#base;
return this.#base ? Object.freeze({ ...this.#base }) : undefined;
}
get didSync() {
return this.#didSync;
}
/**
@ -26,22 +34,11 @@ export class SafeSync extends EventEmitter<SafeSyncEvents> {
* @param link A link to the kind
*/
async sync(link: NostrLink, system: SystemInterface) {
if (link.kind === undefined) {
if (link.kind === undefined || link.author === undefined) {
throw new Error("Kind must be set");
}
const rb = new RequestBuilder("sync");
const f = rb.withFilter().link(link);
if (this.#base) {
f.since(this.#base.created_at);
}
const results = await system.Fetch(rb);
const res = results.find(a => link.matchesEvent(a));
if (res && res.created_at > (this.#base?.created_at ?? 0)) {
this.#base = res;
this.emit("change");
}
return this.#base;
return await this.#sync(link, system);
}
/**
@ -56,22 +53,57 @@ export class SafeSync extends EventEmitter<SafeSyncEvents> {
/**
* Publish an update for this event
*
* Event will be signed again inside
* @param ev
*/
async update(ev: NostrEvent, system: SystemInterface) {
console.debug(this.#base, ev);
this.#checkForUpdate(ev, true);
async update(next: NostrEvent, signer: EventSigner, system: SystemInterface, mustExist?: boolean) {
next.id = "";
next.sig = "";
console.debug(this.#base, next);
const link = NostrLink.fromEvent(ev);
const signed = await this.#signEvent(next, signer);
const link = NostrLink.fromEvent(signed);
// always attempt to get a newer version before broadcasting
await this.sync(link, system);
this.#checkForUpdate(ev, true);
await this.#sync(link, system);
this.#checkForUpdate(signed, mustExist ?? true);
system.BroadcastEvent(ev);
this.#base = ev;
system.BroadcastEvent(signed);
this.#base = signed;
this.emit("change");
}
async #signEvent(next: NostrEvent, signer: EventSigner) {
next.created_at = unixNow();
if (this.#base) {
const prevTag = next.tags.find(a => a[0] === "previous");
if (prevTag) {
prevTag[1] = this.#base.id;
} else {
next.tags.push(["previous", this.#base.id]);
}
}
next.id = EventExt.createId(next);
return await signer.sign(next);
}
async #sync(link: NostrLink, system: SystemInterface) {
const rb = new RequestBuilder(`sync:${link.encode()}`);
const f = rb.withFilter().link(link);
if (this.#base) {
f.since(this.#base.created_at);
}
const results = await system.Fetch(rb);
const res = results.find(a => link.matchesEvent(a));
this.#log("Got result %O", res);
if (res && res.created_at > (this.#base?.created_at ?? 0)) {
this.#base = res;
this.emit("change");
}
this.#didSync = true;
return this.#base;
}
#checkForUpdate(ev: NostrEvent, mustExist: boolean) {
if (!this.#base) {
if (mustExist) {
@ -93,9 +125,5 @@ export class SafeSync extends EventEmitter<SafeSyncEvents> {
if (this.#base.created_at >= ev.created_at) {
throw new Error("Same version, cannot update");
}
const link = NostrLink.fromEvent(ev);
if (!link.matchesEvent(this.#base)) {
throw new Error("Invalid event");
}
}
}