From 29a9d653e795da7330564d3840d7c60e6859d0bb Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Wed, 4 Sep 2024 16:47:23 -0400 Subject: [PATCH] detect and ignore duplicates in negentropy sync --- src/apps/mesh/cmd_sync.cpp | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/apps/mesh/cmd_sync.cpp b/src/apps/mesh/cmd_sync.cpp index 76ececa..fadea8d 100644 --- a/src/apps/mesh/cmd_sync.cpp +++ b/src/apps/mesh/cmd_sync.cpp @@ -151,6 +151,7 @@ void cmd_sync(const std::vector &subArgs) { uint64_t inFlightUp = 0; bool inFlightDown = false; // bool because we can't count on getting every EVENT we request (might've been deleted mid-query) std::vector have, need; + flat_hash_set seenHave, seenNeed; bool syncDone = false; uint64_t totalHaves = 0, totalNeeds = 0; Decompressor decomp; @@ -168,6 +169,8 @@ void cmd_sync(const std::vector &subArgs) { try { auto inputMsg = from_hex(msg.at(2).get_string()); + std::vector currHave, currNeed; + if (treeId) { negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, *treeId); @@ -176,11 +179,23 @@ void cmd_sync(const std::vector &subArgs) { Negentropy ne(subStorage, frameSizeLimit); ne.setInitiator(); - neMsg = ne.reconcile(inputMsg, have, need); + neMsg = ne.reconcile(inputMsg, currHave, currNeed); } else { Negentropy ne(storageVector, frameSizeLimit); ne.setInitiator(); - neMsg = ne.reconcile(inputMsg, have, need); + neMsg = ne.reconcile(inputMsg, currHave, currNeed); + } + + for (auto &id : currHave) { + if (seenHave.contains(id)) continue; + seenHave.insert(id); + have.push_back(std::move(id)); + } + + for (auto &id : currNeed) { + if (seenNeed.contains(id)) continue; + seenNeed.insert(id); + need.push_back(std::move(id)); } } catch (std::exception &e) { LE << "Unable to parse negentropy message from relay: " << e.what();