feat: safe sync appdata
This commit is contained in:
7
packages/system/src/sync/index.ts
Normal file
7
packages/system/src/sync/index.ts
Normal file
@ -0,0 +1,7 @@
|
||||
export interface HasId {
|
||||
id: string;
|
||||
}
|
||||
|
||||
export * from "./safe-sync";
|
||||
export * from "./range-sync";
|
||||
export * from "./json-in-event-sync";
|
76
packages/system/src/sync/json-in-event-sync.ts
Normal file
76
packages/system/src/sync/json-in-event-sync.ts
Normal file
@ -0,0 +1,76 @@
|
||||
import { SafeSync } from "./safe-sync";
|
||||
import { HasId } from ".";
|
||||
import { EventExt, EventSigner, NostrEvent, NostrLink, SystemInterface } from "..";
|
||||
import debug from "debug";
|
||||
import EventEmitter from "eventemitter3";
|
||||
import { unixNow } from "@snort/shared";
|
||||
|
||||
export interface JsonSyncEvents {
|
||||
change: () => void;
|
||||
}
|
||||
|
||||
export class JsonEventSync<T extends HasId> extends EventEmitter<JsonSyncEvents> {
|
||||
#log = debug("JsonEventSync");
|
||||
#sync: SafeSync;
|
||||
#json: T;
|
||||
|
||||
constructor(
|
||||
initValue: T,
|
||||
readonly encrypt: boolean,
|
||||
) {
|
||||
super();
|
||||
this.#sync = new SafeSync();
|
||||
this.#json = initValue;
|
||||
|
||||
this.#sync.on("change", () => this.emit("change"));
|
||||
}
|
||||
|
||||
get json(): Readonly<T> {
|
||||
const ret = { ...this.#json };
|
||||
return Object.freeze(ret);
|
||||
}
|
||||
|
||||
async sync(link: NostrLink, signer: EventSigner, system: SystemInterface) {
|
||||
const res = await this.#sync.sync(link, system);
|
||||
this.#log("Sync result %O", res);
|
||||
if (res) {
|
||||
if (this.encrypt) {
|
||||
this.#json = JSON.parse(await signer.nip4Decrypt(res.content, await signer.getPubKey())) as T;
|
||||
} else {
|
||||
this.#json = JSON.parse(res.content) as T;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the json content in the event
|
||||
* @param val
|
||||
* @param signer
|
||||
*/
|
||||
async updateJson(val: T, signer: EventSigner, system: SystemInterface) {
|
||||
this.#log("Updating: %O", val);
|
||||
const next = this.#sync.value ? ({ ...this.#sync.value } as NostrEvent) : undefined;
|
||||
if (!next) {
|
||||
throw new Error("Cannot update with no previous value");
|
||||
}
|
||||
|
||||
next.content = JSON.stringify(val);
|
||||
next.created_at = unixNow();
|
||||
|
||||
const prevTag = next.tags.find(a => a[0] === "previous");
|
||||
if (prevTag) {
|
||||
prevTag[1] = next.id;
|
||||
} else {
|
||||
next.tags.push(["previous", next.id]);
|
||||
}
|
||||
if (this.encrypt) {
|
||||
next.content = await signer.nip4Encrypt(next.content, await signer.getPubKey());
|
||||
}
|
||||
next.id = EventExt.createId(next);
|
||||
const signed = await signer.sign(next);
|
||||
|
||||
await this.#sync.update(signed, system);
|
||||
this.#json = val;
|
||||
this.#json.id = next.id;
|
||||
}
|
||||
}
|
70
packages/system/src/sync/range-sync.ts
Normal file
70
packages/system/src/sync/range-sync.ts
Normal file
@ -0,0 +1,70 @@
|
||||
import { unixNow } from "@snort/shared";
|
||||
import EventEmitter from "eventemitter3";
|
||||
import { ReqFilter, RequestBuilder, SystemInterface, TaggedNostrEvent } from "..";
|
||||
|
||||
/**
|
||||
* When nostr was created
|
||||
*/
|
||||
const NostrBirthday: number = new Date(2021, 1, 1).getTime() / 1000;
|
||||
|
||||
interface RangeSyncEvents {
|
||||
event: (ev: Array<TaggedNostrEvent>) => void;
|
||||
scan: (from: number) => void;
|
||||
}
|
||||
|
||||
/**
|
||||
* A simple time based sync for pulling lots of data from nostr
|
||||
*/
|
||||
export class RangeSync extends EventEmitter<RangeSyncEvents> {
|
||||
#start: number = NostrBirthday;
|
||||
#windowSize: number = 60 * 60 * 12;
|
||||
|
||||
constructor(readonly system: SystemInterface) {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set window size in seconds
|
||||
*/
|
||||
setWindowSize(n: number) {
|
||||
if (n < 60) {
|
||||
throw new Error("Window size too small");
|
||||
}
|
||||
this.#windowSize = n;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set start time for range sync
|
||||
* @param n Unix timestamp
|
||||
*/
|
||||
setStartPoint(n: number) {
|
||||
if (n < NostrBirthday) {
|
||||
throw new Error("Start point cannot be before nostr's birthday");
|
||||
}
|
||||
this.#start = n;
|
||||
}
|
||||
|
||||
/**
|
||||
* Request to sync with a given filter
|
||||
*/
|
||||
async sync(filter: ReqFilter) {
|
||||
if (filter.since !== undefined || filter.until !== undefined || filter.limit !== undefined) {
|
||||
throw new Error("Filter must not contain since/until/limit");
|
||||
}
|
||||
|
||||
if (!this.system.requestRouter) {
|
||||
throw new Error("RangeSync cannot work without request router!");
|
||||
}
|
||||
|
||||
const now = unixNow();
|
||||
for (let end = now; end > this.#start; end -= this.#windowSize) {
|
||||
const rb = new RequestBuilder(`range-query:${end}`);
|
||||
rb.withBareFilter(filter)
|
||||
.since(end - this.#windowSize)
|
||||
.until(end);
|
||||
this.emit("scan", end);
|
||||
const results = await this.system.Fetch(rb);
|
||||
this.emit("event", results);
|
||||
}
|
||||
}
|
||||
}
|
101
packages/system/src/sync/safe-sync.ts
Normal file
101
packages/system/src/sync/safe-sync.ts
Normal file
@ -0,0 +1,101 @@
|
||||
import EventEmitter from "eventemitter3";
|
||||
import { EventExt, EventType, NostrEvent, NostrLink, RequestBuilder, SystemInterface } from "..";
|
||||
|
||||
export interface SafeSyncEvents {
|
||||
change: () => void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Safely sync replacable events to nostr
|
||||
*
|
||||
* Usefule for the following critical kinds:
|
||||
* 0 (Metadata)
|
||||
* 3 (Contacts)
|
||||
* 10002 (Relays)
|
||||
* 30078 (AppData)
|
||||
*/
|
||||
export class SafeSync extends EventEmitter<SafeSyncEvents> {
|
||||
#base: NostrEvent | undefined;
|
||||
|
||||
get value() {
|
||||
return this.#base;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the latest version
|
||||
* @param link A link to the kind
|
||||
*/
|
||||
async sync(link: NostrLink, system: SystemInterface) {
|
||||
if (link.kind === undefined) {
|
||||
throw new Error("Kind must be set");
|
||||
}
|
||||
|
||||
const rb = new RequestBuilder("sync");
|
||||
const f = rb.withFilter().link(link);
|
||||
if (this.#base) {
|
||||
f.since(this.#base.created_at);
|
||||
}
|
||||
const results = await system.Fetch(rb);
|
||||
const res = results.find(a => link.matchesEvent(a));
|
||||
if (res && res.created_at > (this.#base?.created_at ?? 0)) {
|
||||
this.#base = res;
|
||||
this.emit("change");
|
||||
}
|
||||
return this.#base;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the base value
|
||||
* @param ev
|
||||
*/
|
||||
setBase(ev: NostrEvent) {
|
||||
this.#checkForUpdate(ev, false);
|
||||
this.#base = ev;
|
||||
this.emit("change");
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish an update for this event
|
||||
* @param ev
|
||||
*/
|
||||
async update(ev: NostrEvent, system: SystemInterface) {
|
||||
console.debug(this.#base, ev);
|
||||
this.#checkForUpdate(ev, true);
|
||||
|
||||
const link = NostrLink.fromEvent(ev);
|
||||
// always attempt to get a newer version before broadcasting
|
||||
await this.sync(link, system);
|
||||
this.#checkForUpdate(ev, true);
|
||||
|
||||
system.BroadcastEvent(ev);
|
||||
this.#base = ev;
|
||||
this.emit("change");
|
||||
}
|
||||
|
||||
#checkForUpdate(ev: NostrEvent, mustExist: boolean) {
|
||||
if (!this.#base) {
|
||||
if (mustExist) {
|
||||
throw new Error("No previous version detected");
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
const prevTag = ev.tags.find(a => a[0] === "previous");
|
||||
if (prevTag && prevTag[1] !== this.#base.id) {
|
||||
throw new Error("Previous tag does not match our version");
|
||||
}
|
||||
if (
|
||||
EventExt.getType(ev.kind) !== EventType.Replaceable &&
|
||||
EventExt.getType(ev.kind) !== EventType.ParameterizedReplaceable
|
||||
) {
|
||||
throw new Error("Not a replacable event kind");
|
||||
}
|
||||
if (this.#base.created_at >= ev.created_at) {
|
||||
throw new Error("Same version, cannot update");
|
||||
}
|
||||
const link = NostrLink.fromEvent(ev);
|
||||
if (!link.matchesEvent(this.#base)) {
|
||||
throw new Error("Invalid event");
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user