diff --git a/packages/app/src/Components/Embed/NostrLink.tsx b/packages/app/src/Components/Embed/NostrLink.tsx
index 5af0eb56..d9722f42 100644
--- a/packages/app/src/Components/Embed/NostrLink.tsx
+++ b/packages/app/src/Components/Embed/NostrLink.tsx
@@ -8,10 +8,6 @@ export default function NostrLink({ link, depth }: { link: string; depth?: numbe
const nav = tryParseNostrLink(link);
if (nav?.type === NostrPrefix.PublicKey || nav?.type === NostrPrefix.Profile) {
- if (nav.id.startsWith("npub")) {
- // eslint-disable-next-line no-debugger
- debugger;
- }
return ;
} else if (nav?.type === NostrPrefix.Note || nav?.type === NostrPrefix.Event || nav?.type === NostrPrefix.Address) {
if ((depth ?? 0) > 0) {
diff --git a/packages/app/src/Pages/onboarding/start.tsx b/packages/app/src/Pages/onboarding/start.tsx
index 2667b090..49c57603 100644
--- a/packages/app/src/Pages/onboarding/start.tsx
+++ b/packages/app/src/Pages/onboarding/start.tsx
@@ -104,7 +104,7 @@ export function SignIn() {
id: "X7xU8J",
})}
value={key}
- onChange={onChange} // TODO should log in directly if nsec or npub is pasted
+ onChange={onChange}
className="new-username"
/>
{error && {error}}
diff --git a/packages/system/src/connection.ts b/packages/system/src/connection.ts
index 96a21e93..ff17d4fe 100644
--- a/packages/system/src/connection.ts
+++ b/packages/system/src/connection.ts
@@ -31,7 +31,15 @@ interface ConnectionEvents {
unknownMessage: (obj: Array) => void;
}
+/**
+ * SYNC command is an internal command that requests the connection to devise a strategy
+ * to synchronize based on a set of existing cached events and a filter set.
+ */
export type SyncCommand = ["SYNC", id: string, fromSet: Array, ...filters: Array];
+
+/**
+ * Pending REQ queue
+ */
interface ConnectionQueueItem {
obj: ReqCommand | SyncCommand;
cb: () => void;
@@ -228,7 +236,6 @@ export class Connection extends EventEmitter {
break;
}
default: {
- this.#log(`Unknown tag: ${tag}`);
this.emit("unknownMessage", msg);
break;
}
@@ -333,7 +340,10 @@ export class Connection extends EventEmitter {
this.#log("Queuing: %O", cmd);
} else {
this.ActiveRequests.add(cmd[1]);
- this.#sendRequestCommand(cmd);
+ this.#sendRequestCommand({
+ obj: cmd,
+ cb: cbSent,
+ });
cbSent();
}
this.emit("change");
@@ -354,30 +364,42 @@ export class Connection extends EventEmitter {
for (let x = 0; x < canSend; x++) {
const p = this.PendingRequests.shift();
if (p) {
- this.#sendRequestCommand(p.obj);
- p.cb();
+ this.#sendRequestCommand(p);
this.#log("Sent pending REQ %O", p.obj);
}
}
}
}
- #sendRequestCommand(cmd: ReqCommand | SyncCommand) {
+ #sendRequestCommand(item: ConnectionQueueItem) {
try {
+ const cmd = item.obj;
if (cmd[0] === "REQ") {
this.ActiveRequests.add(cmd[1]);
this.send(cmd);
} else if (cmd[0] === "SYNC") {
+ const [_, id, eventSet, ...filters] = cmd;
+ const lastResortSync = () => {
+ const latest = eventSet.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0);
+ const newFilters = filters.map(a => ({
+ ...a,
+ since: latest,
+ }));
+ this.queueReq(["REQ", id, ...newFilters], item.cb);
+ };
if (this.Info?.software?.includes("strfry")) {
- const neg = new NegentropyFlow(cmd[1], this, cmd[2], cmd.slice(3) as Array);
+ const neg = new NegentropyFlow(id, this, eventSet, filters);
neg.once("finish", filters => {
if (filters.length > 0) {
- this.queueReq(["REQ", cmd[1], ...filters], () => {});
+ this.queueReq(["REQ", cmd[1], ...filters], item.cb);
}
});
+ neg.once("error", () => {
+ lastResortSync();
+ });
neg.start();
} else {
- throw new Error("SYNC not supported");
+ lastResortSync();
}
}
} catch (e) {
diff --git a/packages/system/src/negentropy/negentropy-flow.ts b/packages/system/src/negentropy/negentropy-flow.ts
index 7d0f34fc..396ddf4b 100644
--- a/packages/system/src/negentropy/negentropy-flow.ts
+++ b/packages/system/src/negentropy/negentropy-flow.ts
@@ -11,6 +11,11 @@ export interface NegentropyFlowEvents {
* When sync is finished emit a set of filters which can resolve sync
*/
finish: (req: Array) => void;
+
+ /**
+ * If an error is detected and Negentropy flow is not supported
+ */
+ error: () => void;
}
/**
@@ -31,8 +36,8 @@ export class NegentropyFlow extends EventEmitter {
this.#connection = conn;
this.#filters = filters;
- this.#connection.on("unknownMessage", this.#handleMessage.bind(this));
- this.#connection.on("notice", n => this.#handleMessage.bind(this));
+ this.#connection.on("unknownMessage", msg => this.#handleMessage(msg));
+ this.#connection.on("notice", n => this.#handleMessage(["NOTICE", n]));
const storage = new NegentropyStorageVector();
set.forEach(a => storage.insert(a.created_at, a.id));
@@ -52,22 +57,25 @@ export class NegentropyFlow extends EventEmitter {
try {
switch (msg[0] as string) {
case "NOTICE": {
- if ((msg[1] as string).includes("negentropy disabled")) {
- this.#log("SYNC ERROR: %s", msg[1]);
- this.#cleanup();
+ const [_, errorMsg] = msg as [string, string];
+ if (errorMsg.includes("negentropy disabled") || errorMsg.includes("negentropy error")) {
+ this.#log("SYNC ERROR: %s", errorMsg);
+ this.#cleanup(true);
}
break;
}
case "NEG-ERROR": {
- if (msg[1] !== this.#id) break;
- this.#log("SYNC ERROR %s", msg[2]);
- this.#cleanup();
+ const [_, id, errorMsg] = msg as [string, string, string];
+ if (id !== this.#id) break;
+ this.#log("SYNC ERROR %s", errorMsg);
+ this.#cleanup(true);
break;
}
case "NEG-MSG": {
- if (msg[1] !== this.#id) break;
- const query = hexToBytes(msg[2] as string);
- const [nextMsg, _, need] = this.#negentropy.reconcile(query);
+ const [, id, payload] = msg as [string, string, string];
+ if (id !== this.#id) break;
+ const query = hexToBytes(payload);
+ const [nextMsg, , need] = this.#negentropy.reconcile(query);
if (need.length > 0) {
this.#need.push(...need.map(bytesToHex));
}
@@ -81,14 +89,16 @@ export class NegentropyFlow extends EventEmitter {
}
}
} catch (e) {
- debugger;
console.error(e);
}
}
- #cleanup() {
- this.#connection.off("unknownMessage", this.#handleMessage.bind(this));
- this.#connection.off("notice", n => this.#handleMessage.bind(this));
+ #cleanup(error = false) {
+ this.#connection.off("unknownMessage", msg => this.#handleMessage(msg));
+ this.#connection.off("notice", n => this.#handleMessage(["NOTICE", n]));
this.emit("finish", this.#need.length > 0 ? [{ ids: this.#need }] : []);
+ if (error) {
+ this.emit("error");
+ }
}
}
diff --git a/packages/system/src/negentropy/vector-storage.ts b/packages/system/src/negentropy/vector-storage.ts
index 5d3c89b2..e206831b 100644
--- a/packages/system/src/negentropy/vector-storage.ts
+++ b/packages/system/src/negentropy/vector-storage.ts
@@ -39,7 +39,6 @@ export class NegentropyStorageVector {
for (let i = 1; i < this.#items.length; i++) {
if (itemCompare(this.#items[i - 1], this.#items[i]) === 0) {
- debugger;
throw Error("duplicate item inserted");
}
}
diff --git a/packages/system/src/request-builder.ts b/packages/system/src/request-builder.ts
index 1e4a7251..b71b8ed4 100644
--- a/packages/system/src/request-builder.ts
+++ b/packages/system/src/request-builder.ts
@@ -147,7 +147,7 @@ export class RequestBuilder {
const ts = unixNowMs() - start;
this.#log("buildDiff %s %d ms +%d", this.id, ts, diff.length);
if (diff.length > 0) {
- // todo: fix
+ // todo: fix for explicit relays
return splitFlatByWriteRelays(system.relayCache, diff).map(a => {
return {
strategy: RequestStrategy.AuthorsRelays,