upgrade negentropy

This commit is contained in:
Doug Hoyte
2023-12-05 14:27:53 -05:00
parent 8b634939ee
commit ed0ff4fc79
9 changed files with 161 additions and 91 deletions

View File

@ -1,6 +1,8 @@
#include <docopt.h>
#include <tao/json.hpp>
#include <Negentropy.h>
#include <negentropy.h>
#include <negentropy/storage/Vector.h>
#include <negentropy/storage/BTreeLMDB.h>
#include "golpe.h"
@ -40,18 +42,19 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
uint64_t frameSizeLimit = 60'000; // default frame limit is 128k. Halve that (hex encoding) and subtract a bit (JSON msg overhead)
if (args["--frame-size-limit"]) frameSizeLimit = args["--frame-size-limit"].asLong();
const uint64_t idSize = 16;
const bool doUp = dir == "both" || dir == "up";
const bool doDown = dir == "both" || dir == "down";
tao::json::value filter = tao::json::from_string(filterStr);
tao::json::value filterJson = tao::json::from_string(filterStr);
auto filterCompiled = NostrFilterGroup::unwrapped(filterJson);
Negentropy ne(idSize, frameSizeLimit);
bool isFullDbQuery = filterCompiled.isFullDbQuery();
negentropy::storage::Vector storageVector;
{
DBQuery query(filter);
if (!isFullDbQuery) {
DBQuery query(filterJson);
Decompressor decomp;
auto txn = env.txn_ro();
@ -72,14 +75,13 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
for (auto levId : levIds) {
auto ev = lookupEventByLevId(txn, levId);
PackedEventView packed(ev.buf);
ne.addItem(packed.created_at(), packed.id().substr(0, ne.idSize));
storageVector.insert(packed.created_at(), packed.id().substr(0, ne.idSize));
}
LI << "Filter matches " << numEvents << " events";
}
ne.seal();
storageVector.seal();
}
@ -91,13 +93,23 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
ws.reconnect = false;
ws.onConnect = [&]{
auto neMsg = to_hex(ne.initiate());
auto txn = env.txn_ro();
std::string neMsg;
if (isFullDbQuery) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0);
Negentropy ne(storageBtree, frameSizeLimit);
neMsg = ne.initiate();
} else {
Negentropy ne(storageVector, frameSizeLimit);
neMsg = ne.initiate();
}
ws.send(tao::json::to_string(tao::json::value::array({
"NEG-OPEN",
"N",
filter,
idSize,
neMsg,
filterJson,
to_hex(neMsg),
})));
};
@ -122,6 +134,7 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
ws.onMessage = [&](auto msgStr, uWS::OpCode opCode, size_t compressedSize){
try {
auto txn = env.txn_ro();
tao::json::value msg = tao::json::from_string(msgStr);
if (msg.at(0) == "NEG-MSG") {
@ -130,7 +143,18 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
std::optional<std::string> neMsg;
try {
neMsg = ne.reconcile(from_hex(msg.at(2).get_string()), have, need);
auto inputMsg = from_hex(msg.at(2).get_string());
if (isFullDbQuery) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0);
Negentropy ne(storageBtree, frameSizeLimit);
ne.setInitiator();
neMsg = ne.reconcile(inputMsg, have, need);
} else {
Negentropy ne(storageVector, frameSizeLimit);
ne.setInitiator();
neMsg = ne.reconcile(inputMsg, have, need);
}
} catch (std::exception &e) {
LE << "Unable to parse negentropy message from relay: " << e.what();
doExit(1);