feat: add fallback sync
This commit is contained in:
@ -11,6 +11,11 @@ export interface NegentropyFlowEvents {
|
||||
* When sync is finished emit a set of filters which can resolve sync
|
||||
*/
|
||||
finish: (req: Array<ReqFilter>) => void;
|
||||
|
||||
/**
|
||||
* If an error is detected and Negentropy flow is not supported
|
||||
*/
|
||||
error: () => void;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -31,8 +36,8 @@ export class NegentropyFlow extends EventEmitter<NegentropyFlowEvents> {
|
||||
this.#connection = conn;
|
||||
this.#filters = filters;
|
||||
|
||||
this.#connection.on("unknownMessage", this.#handleMessage.bind(this));
|
||||
this.#connection.on("notice", n => this.#handleMessage.bind(this));
|
||||
this.#connection.on("unknownMessage", msg => this.#handleMessage(msg));
|
||||
this.#connection.on("notice", n => this.#handleMessage(["NOTICE", n]));
|
||||
|
||||
const storage = new NegentropyStorageVector();
|
||||
set.forEach(a => storage.insert(a.created_at, a.id));
|
||||
@ -52,22 +57,25 @@ export class NegentropyFlow extends EventEmitter<NegentropyFlowEvents> {
|
||||
try {
|
||||
switch (msg[0] as string) {
|
||||
case "NOTICE": {
|
||||
if ((msg[1] as string).includes("negentropy disabled")) {
|
||||
this.#log("SYNC ERROR: %s", msg[1]);
|
||||
this.#cleanup();
|
||||
const [_, errorMsg] = msg as [string, string];
|
||||
if (errorMsg.includes("negentropy disabled") || errorMsg.includes("negentropy error")) {
|
||||
this.#log("SYNC ERROR: %s", errorMsg);
|
||||
this.#cleanup(true);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case "NEG-ERROR": {
|
||||
if (msg[1] !== this.#id) break;
|
||||
this.#log("SYNC ERROR %s", msg[2]);
|
||||
this.#cleanup();
|
||||
const [_, id, errorMsg] = msg as [string, string, string];
|
||||
if (id !== this.#id) break;
|
||||
this.#log("SYNC ERROR %s", errorMsg);
|
||||
this.#cleanup(true);
|
||||
break;
|
||||
}
|
||||
case "NEG-MSG": {
|
||||
if (msg[1] !== this.#id) break;
|
||||
const query = hexToBytes(msg[2] as string);
|
||||
const [nextMsg, _, need] = this.#negentropy.reconcile(query);
|
||||
const [, id, payload] = msg as [string, string, string];
|
||||
if (id !== this.#id) break;
|
||||
const query = hexToBytes(payload);
|
||||
const [nextMsg, , need] = this.#negentropy.reconcile(query);
|
||||
if (need.length > 0) {
|
||||
this.#need.push(...need.map(bytesToHex));
|
||||
}
|
||||
@ -81,14 +89,16 @@ export class NegentropyFlow extends EventEmitter<NegentropyFlowEvents> {
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
debugger;
|
||||
console.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
#cleanup() {
|
||||
this.#connection.off("unknownMessage", this.#handleMessage.bind(this));
|
||||
this.#connection.off("notice", n => this.#handleMessage.bind(this));
|
||||
#cleanup(error = false) {
|
||||
this.#connection.off("unknownMessage", msg => this.#handleMessage(msg));
|
||||
this.#connection.off("notice", n => this.#handleMessage(["NOTICE", n]));
|
||||
this.emit("finish", this.#need.length > 0 ? [{ ids: this.#need }] : []);
|
||||
if (error) {
|
||||
this.emit("error");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -39,7 +39,6 @@ export class NegentropyStorageVector {
|
||||
|
||||
for (let i = 1; i < this.#items.length; i++) {
|
||||
if (itemCompare(this.#items[i - 1], this.#items[i]) === 0) {
|
||||
debugger;
|
||||
throw Error("duplicate item inserted");
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user