diff --git a/src/apps/mesh/cmd_sync.cpp b/src/apps/mesh/cmd_sync.cpp index 08a3ecf..a677eb6 100644 --- a/src/apps/mesh/cmd_sync.cpp +++ b/src/apps/mesh/cmd_sync.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "golpe.h" @@ -98,7 +99,11 @@ void cmd_sync(const std::vector &subArgs) { if (isFullDbQuery) { negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0); - Negentropy ne(storageBtree, frameSizeLimit); + + const auto &f = filterCompiled.filters.at(0); + negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1)); + + Negentropy ne(subStorage, frameSizeLimit); neMsg = ne.initiate(); } else { Negentropy ne(storageVector, frameSizeLimit); @@ -147,7 +152,11 @@ void cmd_sync(const std::vector &subArgs) { if (isFullDbQuery) { negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0); - Negentropy ne(storageBtree, frameSizeLimit); + + const auto &f = filterCompiled.filters.at(0); + negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1)); + + Negentropy ne(subStorage, frameSizeLimit); ne.setInitiator(); neMsg = ne.reconcile(inputMsg, have, need); } else { diff --git a/src/apps/relay/RelayNegentropy.cpp b/src/apps/relay/RelayNegentropy.cpp index 83936d4..23cf2d1 100644 --- a/src/apps/relay/RelayNegentropy.cpp +++ b/src/apps/relay/RelayNegentropy.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include "RelayServer.h" #include "QueryScheduler.h" @@ -97,7 +98,7 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { Negentropy ne(storage, 500'000); resp = ne.reconcile(msg); } catch (std::exception &e) { - LI << "[" << connId << "] Error parsing negentropy initial message: " << e.what(); + LI << "[" << connId << "] Error parsing negentropy message: " << e.what(); sendToConn(connId, tao::json::to_string(tao::json::value::array({ "NEG-ERR", @@ -189,7 +190,10 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { if (msg->sub.filterGroup.isFullDbQuery()) { negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0); - handleReconcile(connId, subId, storage, msg->negPayload); + + const auto &f = msg->sub.filterGroup.filters.at(0); + negentropy::storage::SubRange subStorage(storage, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1)); + handleReconcile(connId, subId, subStorage, msg->negPayload); if (!views.addStatelessView(connId, subId, std::move(msg->sub))) { queries.removeSub(connId, subId); @@ -225,9 +229,13 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { continue; } handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload); - } else if (std::get_if(userView)) { + } else if (auto *view = std::get_if(userView)) { negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0); - handleReconcile(msg->connId, msg->subId, storage, msg->negPayload); + + const auto &f = view->sub.filterGroup.filters.at(0); + negentropy::storage::SubRange subStorage(storage, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1)); + + handleReconcile(msg->connId, msg->subId, subStorage, msg->negPayload); } } else if (auto msg = std::get_if(&newMsg.msg)) { queries.removeSub(msg->connId, msg->subId); diff --git a/test/runSyncTests.pl b/test/runSyncTests.pl new file mode 100644 index 0000000..5bbb875 --- /dev/null +++ b/test/runSyncTests.pl @@ -0,0 +1,52 @@ +#!/usr/bin/env perl + +use strict; + +## Full DB sync tests +{ + my $f = '{}'; + test(qq{ 1 0 0 '$f' }); + test(qq{ 0 1 0 '$f' }); + test(qq{ 0 0 1 '$f' }); + test(qq{ 1 1 1000 '$f' }); +} + +## Vector DB sync tests +{ + my $f = '{"kinds":[1]}'; + test(qq{ 1 0 0 '$f' }); + test(qq{ 0 1 0 '$f' }); + test(qq{ 0 0 1 '$f' }); + test(qq{ 1 1 1000 '$f' }); +} + +## Full DB sync tests with time bounds +{ + my $f = '{"since":1652985767,"until":1662969916}'; + test(qq{ 1 1 1000 '$f' }, 100000); + test(qq{ 0 0 1 '$f' }, 100000); + + $f = '{"since":1652985767}'; + test(qq{ 1 1 1100 '$f' }, 100000); + + $f = '{"until":1662969916}'; + test(qq{ 1 1 1200 '$f' }, 100000); +} + + +print "All OK\n"; + +sub test { + my $params = shift; + my $num = shift // 1000; + + print "---------------------------\n"; + print "TEST: params = $params num = $num\n"; + + my $redir = $ENV{VERBOSE} ? '' : '2>/dev/null'; + + my $cmd = qq{ zstdcat ../nostr-dumps/nostr-wellorder-early-500k-v1.jsonl.zst | head -$num | perl test/syncTest.pl $params $redir}; + print "CMD: $cmd\n"; + system($cmd) && die "failed"; + print "\n"; +}