This commit is contained in:
Doug Hoyte
2023-03-06 00:30:24 -05:00
parent 45cc598b22
commit f4cd35d2ed

View File

@ -66,7 +66,8 @@ struct XorView {
if (!ready) throw herr("xor view not ready");
std::string output;
splitRange(elems.begin(), elems.end(), 0, "", 1678053147ULL, "", output);
uint64_t lastTimestampOut = 0;
splitRange(elems.begin(), elems.end(), 0, "", MAX_U64, "", lastTimestampOut, output);
return output;
}
@ -76,15 +77,26 @@ struct XorView {
std::string output;
auto prevUpper = elems.begin();
uint64_t lastTimestampIn = 0;
uint64_t lastTimestampOut = 0;
auto decodeTimestampIn = [&](std::string_view &query){
uint64_t timestamp = decodeVarInt(query);
timestamp = timestamp == 0 ? MAX_U64 : timestamp - 1;
timestamp += lastTimestampIn;
if (timestamp < lastTimestampIn) timestamp = MAX_U64; // saturate
lastTimestampIn = timestamp;
return timestamp;
};
while (query.size()) {
uint64_t lowerTimestamp = decodeVarInt(query);
uint64_t lowerTimestamp = decodeTimestampIn(query);
uint64_t lowerLength = decodeVarInt(query);
if (lowerLength > idSize) throw herr("lower too long");
auto lowerKeyRaw = getBytes(query, lowerLength);
XorElem lowerKey(lowerTimestamp, lowerKeyRaw);
uint64_t upperTimestamp = decodeVarInt(query);
uint64_t upperTimestamp = decodeTimestampIn(query);
uint64_t upperLength = decodeVarInt(query);
if (upperLength > idSize) throw herr("upper too long");
auto upperKeyRaw = getBytes(query, upperLength);
@ -102,7 +114,7 @@ struct XorView {
XorElem ourXorSet;
for (auto i = lower; i < upper; ++i) ourXorSet ^= *i;
if (theirXorSet.getId(idSize) != ourXorSet.getId(idSize)) splitRange(lower, upper, lowerTimestamp, lowerKeyRaw, upperTimestamp, upperKeyRaw, output);
if (theirXorSet.getId(idSize) != ourXorSet.getId(idSize)) splitRange(lower, upper, lowerTimestamp, lowerKeyRaw, upperTimestamp, upperKeyRaw, lastTimestampOut, output);
} else if (mode >= 8) {
flat_hash_map<XorElem, bool> theirElems;
for (uint64_t i = 0; i < mode - 8; i++) {
@ -136,7 +148,45 @@ struct XorView {
private:
void splitRange(std::vector<XorElem>::iterator lower, std::vector<XorElem>::iterator upper, uint64_t lowerTimestamp, const std::string &lowerKey, uint64_t upperTimestamp, const std::string &upperKey, std::string &output) {
void splitRange(std::vector<XorElem>::iterator lower, std::vector<XorElem>::iterator upper, uint64_t lowerTimestamp, const std::string &lowerKey, uint64_t upperTimestamp, const std::string &upperKey, uint64_t &lastTimestampOut, std::string &output) {
auto encodeTimestampOut = [&](uint64_t timestamp){
if (timestamp == MAX_U64) {
lastTimestampOut = MAX_U64;
return encodeVarInt(0);
}
uint64_t temp = timestamp;
timestamp -= lastTimestampOut;
lastTimestampOut = temp;
return encodeVarInt(timestamp + 1);
};
auto appendBoundKey = [&](uint64_t t, std::string k, std::string &output) {
output += encodeTimestampOut(t);
output += encodeVarInt(k.size());
output += k;
};
auto appendMinimalBoundKey = [&](const XorElem &curr, const XorElem &prev, std::string &output) {
output += encodeTimestampOut(curr.timestamp);
if (curr.timestamp != prev.timestamp) {
output += encodeVarInt(0);
} else {
uint64_t sharedPrefixBytes = 0;
auto currKey = curr.getId(idSize);
auto prevKey = prev.getId(idSize);
for (uint64_t i = 0; i < idSize; i++) {
if (currKey[i] != prevKey[i]) break;
sharedPrefixBytes++;
}
output += encodeVarInt(sharedPrefixBytes + 1);
output += currKey.substr(0, sharedPrefixBytes + 1);
}
};
// Split our range
uint64_t numElems = upper - lower;
const uint64_t buckets = 16;
@ -169,32 +219,6 @@ struct XorView {
}
}
}
void appendBoundKey(uint64_t t, std::string k, std::string &output) {
output += encodeVarInt(t);
output += encodeVarInt(k.size());
output += k;
}
void appendMinimalBoundKey(const XorElem &curr, const XorElem &prev, std::string &output) {
output += encodeVarInt(curr.timestamp);
if (curr.timestamp != prev.timestamp) {
output += encodeVarInt(0);
} else {
uint64_t sharedPrefixBytes = 0;
auto currKey = curr.getId(idSize);
auto prevKey = prev.getId(idSize);
for (uint64_t i = 0; i < idSize; i++) {
if (currKey[i] != prevKey[i]) break;
sharedPrefixBytes++;
}
output += encodeVarInt(sharedPrefixBytes + 1);
output += currKey.substr(0, sharedPrefixBytes + 1);
}
}
};