feat: negentropy

This commit is contained in:
2024-01-25 15:21:42 +00:00
parent 9a0bbb8b74
commit d7460651c8
31 changed files with 924 additions and 266 deletions

View File

@ -0,0 +1,60 @@
import { sha256 } from "@noble/hashes/sha256";
import { encodeVarInt, FINGERPRINT_SIZE } from "./utils";
export class Accumulator {
#buf!: Uint8Array;
constructor() {
this.setToZero();
}
setToZero() {
this.#buf = new Uint8Array(32);
}
add(otherBuf: Uint8Array) {
let currCarry = 0,
nextCarry = 0;
const p = new DataView(this.#buf.buffer);
const po = new DataView(otherBuf.buffer);
for (let i = 0; i < 8; i++) {
const offset = i * 4;
const orig = p.getUint32(offset, true);
const otherV = po.getUint32(offset, true);
let next = orig;
next += currCarry;
next += otherV;
if (next > 4294967295) nextCarry = 1;
p.setUint32(offset, next & 4294967295, true);
currCarry = nextCarry;
nextCarry = 0;
}
}
negate() {
const p = new DataView(this.#buf.buffer);
for (let i = 0; i < 8; i++) {
let offset = i * 4;
p.setUint32(offset, ~p.getUint32(offset, true));
}
const one = new Uint8Array(32);
one[0] = 1;
this.add(one);
}
getFingerprint(n: number) {
const varInt = encodeVarInt(n);
const copy = new Uint8Array(this.#buf.length + varInt.length);
copy.set(this.#buf);
copy.set(varInt, this.#buf.length);
const hash = sha256(copy);
return hash.subarray(0, FINGERPRINT_SIZE);
}
}

View File

@ -0,0 +1,94 @@
import { bytesToHex, hexToBytes } from "@noble/hashes/utils";
import { Connection } from "../connection";
import { ReqFilter, TaggedNostrEvent } from "../nostr";
import { Negentropy } from "./negentropy";
import { NegentropyStorageVector } from "./vector-storage";
import debug from "debug";
import EventEmitter from "eventemitter3";
export interface NegentropyFlowEvents {
/**
* When sync is finished emit a set of filters which can resolve sync
*/
finish: (req: Array<ReqFilter>) => void;
}
/**
* Negentropy sync flow on connection
*/
export class NegentropyFlow extends EventEmitter<NegentropyFlowEvents> {
readonly idSize: number = 16;
#log = debug("NegentropyFlow");
#id: string;
#connection: Connection;
#filters: Array<ReqFilter>;
#negentropy: Negentropy;
#need: Array<string> = [];
constructor(id: string, conn: Connection, set: Array<TaggedNostrEvent>, filters: Array<ReqFilter>) {
super();
this.#id = id;
this.#connection = conn;
this.#filters = filters;
this.#connection.on("unknownMessage", this.#handleMessage.bind(this));
this.#connection.on("notice", n => this.#handleMessage.bind(this));
const storage = new NegentropyStorageVector();
set.forEach(a => storage.insert(a.created_at, a.id));
storage.seal();
this.#negentropy = new Negentropy(storage, 50_000);
}
/**
* Start sync
*/
start() {
const init = this.#negentropy.initiate();
this.#connection.send(["NEG-OPEN", this.#id, this.#filters, bytesToHex(init)]);
}
#handleMessage(msg: Array<any>) {
try {
switch (msg[0] as string) {
case "NOTICE": {
if ((msg[1] as string).includes("negentropy disabled")) {
this.#log("SYNC ERROR: %s", msg[1]);
this.#cleanup();
}
break;
}
case "NEG-ERROR": {
if (msg[1] !== this.#id) break;
this.#log("SYNC ERROR %s", msg[2]);
this.#cleanup();
break;
}
case "NEG-MSG": {
if (msg[1] !== this.#id) break;
const query = hexToBytes(msg[2] as string);
const [nextMsg, _, need] = this.#negentropy.reconcile(query);
if (need.length > 0) {
this.#need.push(...need.map(bytesToHex));
}
if (nextMsg) {
this.#connection.send(["NEG-MSG", this.#id, bytesToHex(nextMsg)]);
} else {
this.#connection.send(["NEG-CLOSE", this.#id]);
this.#cleanup();
}
break;
}
}
} catch (e) {
debugger;
console.error(e);
}
}
#cleanup() {
this.#connection.off("unknownMessage", this.#handleMessage.bind(this));
this.#connection.off("notice", n => this.#handleMessage.bind(this));
this.emit("finish", this.#need.length > 0 ? [{ ids: this.#need }] : []);
}
}

View File

@ -0,0 +1,303 @@
import { bytesToHex } from "@noble/hashes/utils";
import { WrappedBuffer } from "./wrapped-buffer";
import { NegentropyStorageVector, VectorStorageItem } from "./vector-storage";
import {
PROTOCOL_VERSION,
getByte,
encodeVarInt,
Mode,
decodeVarInt,
getBytes,
FINGERPRINT_SIZE,
compareUint8Array,
} from "./utils";
export class Negentropy {
readonly #storage: NegentropyStorageVector;
readonly #frameSizeLimit: number;
#lastTimestampIn: number;
#lastTimestampOut: number;
#isInitiator: boolean = false;
constructor(storage: NegentropyStorageVector, frameSizeLimit = 0) {
if (frameSizeLimit !== 0 && frameSizeLimit < 4096) throw Error("frameSizeLimit too small");
this.#storage = storage;
this.#frameSizeLimit = frameSizeLimit;
this.#lastTimestampIn = 0;
this.#lastTimestampOut = 0;
}
#bound(timestamp: number, id?: Uint8Array) {
return { timestamp, id: id ? id : new Uint8Array(0) };
}
initiate() {
if (this.#isInitiator) throw Error("already initiated");
this.#isInitiator = true;
const output = new WrappedBuffer();
output.set([PROTOCOL_VERSION]);
this.splitRange(0, this.#storage.size(), this.#bound(Number.MAX_VALUE), output);
return this.#renderOutput(output);
}
setInitiator() {
this.#isInitiator = true;
}
reconcile(query: WrappedBuffer | Uint8Array): [Uint8Array | undefined, Array<Uint8Array>, Array<Uint8Array>] {
let haveIds: Array<Uint8Array> = [],
needIds: Array<Uint8Array> = [];
query = query instanceof WrappedBuffer ? query : new WrappedBuffer(query);
this.#lastTimestampIn = this.#lastTimestampOut = 0; // reset for each message
const fullOutput = new WrappedBuffer();
fullOutput.set([PROTOCOL_VERSION]);
const protocolVersion = getByte(query);
if (protocolVersion < 96 || protocolVersion > 111) throw Error("invalid negentropy protocol version byte");
if (protocolVersion !== PROTOCOL_VERSION) {
if (this.#isInitiator)
throw Error("unsupported negentropy protocol version requested: " + (protocolVersion - 96));
else return [this.#renderOutput(fullOutput), haveIds, needIds];
}
const storageSize = this.#storage.size();
let prevBound = this.#bound(0);
let prevIndex = 0;
let skip = false;
while (query.length !== 0) {
let o = new WrappedBuffer();
let doSkip = () => {
if (skip) {
skip = false;
o.append(this.encodeBound(prevBound));
o.append(encodeVarInt(Mode.Skip));
}
};
let currBound = this.decodeBound(query);
let mode = query.length === 0 ? 0 : decodeVarInt(query);
let lower = prevIndex;
let upper = this.#storage.findLowerBound(prevIndex, storageSize, currBound);
if (mode === Mode.Skip) {
skip = true;
} else if (mode === Mode.Fingerprint) {
let theirFingerprint = getBytes(query, FINGERPRINT_SIZE);
let ourFingerprint = this.#storage.fingerprint(lower, upper);
if (compareUint8Array(theirFingerprint, ourFingerprint) !== 0) {
doSkip();
this.splitRange(lower, upper, currBound, o);
} else {
skip = true;
}
} else if (mode === Mode.IdList) {
let numIds = decodeVarInt(query);
let theirElems = {} as Record<string, Uint8Array>; // stringified Uint8Array -> original Uint8Array (or hex)
for (let i = 0; i < numIds; i++) {
let e = getBytes(query, this.#storage.idSize);
theirElems[bytesToHex(e)] = e;
}
this.#storage.iterate(lower, upper, item => {
let k = bytesToHex(item.id);
if (!theirElems[k]) {
// ID exists on our side, but not their side
if (this.#isInitiator) haveIds.push(item.id);
} else {
// ID exists on both sides
delete theirElems[k];
}
return true;
});
if (this.#isInitiator) {
skip = true;
for (let v of Object.values(theirElems)) {
// ID exists on their side, but not our side
needIds.push(v);
}
} else {
doSkip();
let responseIds = new WrappedBuffer();
let numResponseIds = 0;
let endBound = currBound;
this.#storage.iterate(lower, upper, (item, index) => {
if (this.exceededFrameSizeLimit(fullOutput.length + responseIds.length)) {
endBound = item;
upper = index; // shrink upper so that remaining range gets correct fingerprint
return false;
}
responseIds.append(item.id);
numResponseIds++;
return true;
});
o.append(this.encodeBound(endBound));
o.append(encodeVarInt(Mode.IdList));
o.append(encodeVarInt(numResponseIds));
o.append(responseIds.unwrap());
fullOutput.append(o.unwrap());
o.clear();
}
} else {
throw Error("unexpected mode");
}
if (this.exceededFrameSizeLimit(fullOutput.length + o.length)) {
// frameSizeLimit exceeded: Stop range processing and return a fingerprint for the remaining range
let remainingFingerprint = this.#storage.fingerprint(upper, storageSize);
fullOutput.append(this.encodeBound(this.#bound(Number.MAX_VALUE)));
fullOutput.append(encodeVarInt(Mode.Fingerprint));
fullOutput.append(remainingFingerprint);
break;
} else {
fullOutput.append(o.unwrap());
}
prevIndex = upper;
prevBound = currBound;
}
return [
fullOutput.length === 1 && this.#isInitiator ? undefined : this.#renderOutput(fullOutput),
haveIds,
needIds,
];
}
async splitRange(lower: number, upper: number, upperBound: VectorStorageItem, o: WrappedBuffer) {
const numElems = upper - lower;
const buckets = 16;
if (numElems < buckets * 2) {
o.append(this.encodeBound(upperBound));
o.append(encodeVarInt(Mode.IdList));
o.append(encodeVarInt(numElems));
this.#storage.iterate(lower, upper, item => {
o.append(item.id);
return true;
});
} else {
const itemsPerBucket = Math.floor(numElems / buckets);
const bucketsWithExtra = numElems % buckets;
let curr = lower;
for (let i = 0; i < buckets; i++) {
let bucketSize = itemsPerBucket + (i < bucketsWithExtra ? 1 : 0);
let ourFingerprint = this.#storage.fingerprint(curr, curr + bucketSize);
curr += bucketSize;
let nextBound;
if (curr === upper) {
nextBound = upperBound;
} else {
let prevItem: VectorStorageItem, currItem: VectorStorageItem;
this.#storage.iterate(curr - 1, curr + 1, (item, index) => {
if (index === curr - 1) prevItem = item;
else currItem = item;
return true;
});
nextBound = this.getMinimalBound(prevItem!, currItem!);
}
o.append(this.encodeBound(nextBound));
o.append(encodeVarInt(Mode.Fingerprint));
o.append(ourFingerprint);
}
}
}
#renderOutput(o: WrappedBuffer) {
return o.unwrap();
}
exceededFrameSizeLimit(n: number) {
return this.#frameSizeLimit && n > this.#frameSizeLimit - 200;
}
// Decoding
decodeTimestampIn(encoded: Uint8Array | WrappedBuffer) {
let timestamp = decodeVarInt(encoded);
timestamp = timestamp === 0 ? Number.MAX_VALUE : timestamp - 1;
if (this.#lastTimestampIn === Number.MAX_VALUE || timestamp === Number.MAX_VALUE) {
this.#lastTimestampIn = Number.MAX_VALUE;
return Number.MAX_VALUE;
}
timestamp += this.#lastTimestampIn;
this.#lastTimestampIn = timestamp;
return timestamp;
}
decodeBound(encoded: Uint8Array | WrappedBuffer) {
const timestamp = this.decodeTimestampIn(encoded);
const len = decodeVarInt(encoded);
if (len > this.#storage.idSize) throw Error("bound key too long");
const id = new Uint8Array(this.#storage.idSize);
const encodedId = getBytes(encoded, Math.min(len, encoded.length));
id.set(encodedId);
return { timestamp, id };
}
// Encoding
encodeTimestampOut(timestamp: number) {
if (timestamp === Number.MAX_VALUE) {
this.#lastTimestampOut = Number.MAX_VALUE;
return encodeVarInt(0);
}
let temp = timestamp;
timestamp -= this.#lastTimestampOut;
this.#lastTimestampOut = temp;
return encodeVarInt(timestamp + 1);
}
encodeBound(key: VectorStorageItem) {
const tsBytes = this.encodeTimestampOut(key.timestamp);
const idLenBytes = encodeVarInt(key.id.length);
const output = new Uint8Array(tsBytes.length + idLenBytes.length + key.id.length);
output.set(tsBytes);
output.set(idLenBytes, tsBytes.length);
output.set(key.id, tsBytes.length + idLenBytes.length);
return output;
}
getMinimalBound(prev: VectorStorageItem, curr: VectorStorageItem) {
if (curr.timestamp !== prev.timestamp) {
return this.#bound(curr.timestamp);
} else {
let sharedPrefixBytes = 0;
let currKey = curr.id;
let prevKey = prev.id;
for (let i = 0; i < this.#storage.idSize; i++) {
if (currKey[i] !== prevKey[i]) break;
sharedPrefixBytes++;
}
return this.#bound(curr.timestamp, curr.id.subarray(0, sharedPrefixBytes + 1));
}
}
}

View File

@ -0,0 +1,83 @@
import { VectorStorageItem } from "./vector-storage";
import { WrappedBuffer } from "./wrapped-buffer";
export const PROTOCOL_VERSION = 0x61; // Version 1
export const FINGERPRINT_SIZE = 16;
export const enum Mode {
Skip = 0,
Fingerprint = 1,
IdList = 2,
}
/**
* Decode variable int, also consumes the bytes from buf
*/
export function decodeVarInt(buf: Uint8Array | WrappedBuffer) {
let res = 0;
while (1) {
if (buf.length === 0) throw Error("parse ends prematurely");
let byte = 0;
if (buf instanceof WrappedBuffer) {
byte = buf.shift();
} else {
byte = buf[0];
buf = buf.subarray(1);
}
res = (res << 7) | (byte & 127);
if ((byte & 128) === 0) break;
}
return res;
}
export function encodeVarInt(n: number) {
if (n === 0) return new Uint8Array([0]);
let o = [];
while (n !== 0) {
o.push(n & 127);
n >>>= 7;
}
o.reverse();
for (let i = 0; i < o.length - 1; i++) o[i] |= 128;
return new Uint8Array(o);
}
export function getByte(buf: WrappedBuffer) {
return getBytes(buf, 1)[0];
}
export function getBytes(buf: WrappedBuffer | Uint8Array, n: number) {
if (buf.length < n) throw Error("parse ends prematurely");
if (buf instanceof WrappedBuffer) {
return buf.shiftN(n);
} else {
const ret = buf.subarray(0, n);
buf = buf.subarray(n);
return ret;
}
}
export function compareUint8Array(a: Uint8Array, b: Uint8Array) {
for (let i = 0; i < a.byteLength; i++) {
if (a[i] < b[i]) return -1;
if (a[i] > b[i]) return 1;
}
if (a.byteLength > b.byteLength) return 1;
if (a.byteLength < b.byteLength) return -1;
return 0;
}
export function itemCompare(a: VectorStorageItem, b: VectorStorageItem) {
if (a.timestamp === b.timestamp) {
return compareUint8Array(a.id, b.id);
}
return a.timestamp - b.timestamp;
}

View File

@ -0,0 +1,116 @@
import { hexToBytes } from "@noble/hashes/utils";
import { Accumulator } from "./accumulator";
import { itemCompare } from "./utils";
export interface VectorStorageItem {
timestamp: number;
id: Uint8Array;
}
const IdSize = 32;
export class NegentropyStorageVector {
#items: Array<VectorStorageItem> = [];
#sealed = false;
constructor(other?: Array<VectorStorageItem>) {
if (other) {
this.#items = other;
this.#sealed = true;
}
}
get idSize() {
return IdSize;
}
insert(timestamp: number, id: string) {
if (this.#sealed) throw Error("already sealed");
const idData = hexToBytes(id);
if (idData.byteLength !== IdSize) throw Error("bad id size for added item");
this.#items.push({ timestamp, id: idData });
}
seal() {
if (this.#sealed) throw Error("already sealed");
this.#sealed = true;
this.#items.sort(itemCompare);
for (let i = 1; i < this.#items.length; i++) {
if (itemCompare(this.#items[i - 1], this.#items[i]) === 0) {
debugger;
throw Error("duplicate item inserted");
}
}
}
unseal() {
this.#sealed = false;
}
size() {
this.#checkSealed();
return this.#items.length;
}
getItem(i: number) {
this.#checkSealed();
if (i >= this.#items.length) throw Error("out of range");
return this.#items[i];
}
iterate(begin: number, end: number, cb: (item: VectorStorageItem, index: number) => boolean) {
this.#checkSealed();
this.#checkBounds(begin, end);
for (let i = begin; i < end; ++i) {
if (!cb(this.#items[i], i)) break;
}
}
findLowerBound(begin: number, end: number, bound: VectorStorageItem) {
this.#checkSealed();
this.#checkBounds(begin, end);
return this.#binarySearch(this.#items, begin, end, a => itemCompare(a, bound) < 0);
}
fingerprint(begin: number, end: number) {
const out = new Accumulator();
this.iterate(begin, end, item => {
out.add(item.id);
return true;
});
return out.getFingerprint(end - begin);
}
#checkSealed() {
if (!this.#sealed) throw Error("not sealed");
}
#checkBounds(begin: number, end: number) {
if (begin > end || end > this.#items.length) throw Error("bad range");
}
#binarySearch(arr: Array<VectorStorageItem>, first: number, last: number, cmp: (item: VectorStorageItem) => boolean) {
let count = last - first;
while (count > 0) {
let it = first;
let step = Math.floor(count / 2);
it += step;
if (cmp(arr[it])) {
first = ++it;
count -= step + 1;
} else {
count = step;
}
}
return first;
}
}

View File

@ -0,0 +1,62 @@
export class WrappedBuffer {
#raw: Uint8Array;
#length: number;
constructor(buffer?: Uint8Array) {
this.#raw = buffer ? new Uint8Array(buffer) : new Uint8Array(512);
this.#length = buffer ? buffer.length : 0;
}
unwrap() {
return this.#raw.subarray(0, this.#length);
}
get capacity() {
return this.#raw.byteLength;
}
get length() {
return this.#length;
}
set(val: ArrayLike<number>, offset?: number) {
this.#raw.set(val, offset);
this.#length = (offset ?? 0) + val.length;
}
append(val: ArrayLike<number>) {
const targetSize = val.length + this.#length;
this.resize(targetSize);
this.#raw.set(val, this.#length);
this.#length += val.length;
}
clear() {
this.#length = 0;
this.#raw.fill(0);
}
resize(newSize: number) {
if (this.capacity < newSize) {
const newCapacity = Math.max(this.capacity * 2, newSize);
const newArr = new Uint8Array(newCapacity);
newArr.set(this.#raw);
this.#raw = newArr;
}
}
shift() {
const first = this.#raw[0];
this.#raw = this.#raw.subarray(1);
this.#length--;
return first;
}
shiftN(n = 1) {
const firstSubarray = this.#raw.subarray(0, n);
this.#raw = this.#raw.subarray(n);
this.#length -= n;
return firstSubarray;
}
}