update negentropy

This commit is contained in:
Doug Hoyte
2023-12-06 15:20:42 -05:00
parent 9f2b6dcc66
commit fc43a2260b
3 changed files with 75 additions and 6 deletions

View File

@ -3,6 +3,7 @@
#include <negentropy.h> #include <negentropy.h>
#include <negentropy/storage/Vector.h> #include <negentropy/storage/Vector.h>
#include <negentropy/storage/BTreeLMDB.h> #include <negentropy/storage/BTreeLMDB.h>
#include <negentropy/storage/SubRange.h>
#include "golpe.h" #include "golpe.h"
@ -98,7 +99,11 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
if (isFullDbQuery) { if (isFullDbQuery) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0); negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0);
Negentropy ne(storageBtree, frameSizeLimit);
const auto &f = filterCompiled.filters.at(0);
negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));
Negentropy ne(subStorage, frameSizeLimit);
neMsg = ne.initiate(); neMsg = ne.initiate();
} else { } else {
Negentropy ne(storageVector, frameSizeLimit); Negentropy ne(storageVector, frameSizeLimit);
@ -147,7 +152,11 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
if (isFullDbQuery) { if (isFullDbQuery) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0); negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0);
Negentropy ne(storageBtree, frameSizeLimit);
const auto &f = filterCompiled.filters.at(0);
negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));
Negentropy ne(subStorage, frameSizeLimit);
ne.setInitiator(); ne.setInitiator();
neMsg = ne.reconcile(inputMsg, have, need); neMsg = ne.reconcile(inputMsg, have, need);
} else { } else {

View File

@ -1,6 +1,7 @@
#include <negentropy.h> #include <negentropy.h>
#include <negentropy/storage/Vector.h> #include <negentropy/storage/Vector.h>
#include <negentropy/storage/BTreeLMDB.h> #include <negentropy/storage/BTreeLMDB.h>
#include <negentropy/storage/SubRange.h>
#include "RelayServer.h" #include "RelayServer.h"
#include "QueryScheduler.h" #include "QueryScheduler.h"
@ -97,7 +98,7 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
Negentropy ne(storage, 500'000); Negentropy ne(storage, 500'000);
resp = ne.reconcile(msg); resp = ne.reconcile(msg);
} catch (std::exception &e) { } catch (std::exception &e) {
LI << "[" << connId << "] Error parsing negentropy initial message: " << e.what(); LI << "[" << connId << "] Error parsing negentropy message: " << e.what();
sendToConn(connId, tao::json::to_string(tao::json::value::array({ sendToConn(connId, tao::json::to_string(tao::json::value::array({
"NEG-ERR", "NEG-ERR",
@ -189,7 +190,10 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
if (msg->sub.filterGroup.isFullDbQuery()) { if (msg->sub.filterGroup.isFullDbQuery()) {
negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0); negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0);
handleReconcile(connId, subId, storage, msg->negPayload);
const auto &f = msg->sub.filterGroup.filters.at(0);
negentropy::storage::SubRange subStorage(storage, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));
handleReconcile(connId, subId, subStorage, msg->negPayload);
if (!views.addStatelessView(connId, subId, std::move(msg->sub))) { if (!views.addStatelessView(connId, subId, std::move(msg->sub))) {
queries.removeSub(connId, subId); queries.removeSub(connId, subId);
@ -225,9 +229,13 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
continue; continue;
} }
handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload); handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload);
} else if (std::get_if<NegentropyViews::StatelessView>(userView)) { } else if (auto *view = std::get_if<NegentropyViews::StatelessView>(userView)) {
negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0); negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0);
handleReconcile(msg->connId, msg->subId, storage, msg->negPayload);
const auto &f = view->sub.filterGroup.filters.at(0);
negentropy::storage::SubRange subStorage(storage, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));
handleReconcile(msg->connId, msg->subId, subStorage, msg->negPayload);
} }
} else if (auto msg = std::get_if<MsgNegentropy::NegClose>(&newMsg.msg)) { } else if (auto msg = std::get_if<MsgNegentropy::NegClose>(&newMsg.msg)) {
queries.removeSub(msg->connId, msg->subId); queries.removeSub(msg->connId, msg->subId);

52
test/runSyncTests.pl Normal file
View File

@ -0,0 +1,52 @@
#!/usr/bin/env perl
use strict;
## Full DB sync tests
{
my $f = '{}';
test(qq{ 1 0 0 '$f' });
test(qq{ 0 1 0 '$f' });
test(qq{ 0 0 1 '$f' });
test(qq{ 1 1 1000 '$f' });
}
## Vector DB sync tests
{
my $f = '{"kinds":[1]}';
test(qq{ 1 0 0 '$f' });
test(qq{ 0 1 0 '$f' });
test(qq{ 0 0 1 '$f' });
test(qq{ 1 1 1000 '$f' });
}
## Full DB sync tests with time bounds
{
my $f = '{"since":1652985767,"until":1662969916}';
test(qq{ 1 1 1000 '$f' }, 100000);
test(qq{ 0 0 1 '$f' }, 100000);
$f = '{"since":1652985767}';
test(qq{ 1 1 1100 '$f' }, 100000);
$f = '{"until":1662969916}';
test(qq{ 1 1 1200 '$f' }, 100000);
}
print "All OK\n";
sub test {
my $params = shift;
my $num = shift // 1000;
print "---------------------------\n";
print "TEST: params = $params num = $num\n";
my $redir = $ENV{VERBOSE} ? '' : '2>/dev/null';
my $cmd = qq{ zstdcat ../nostr-dumps/nostr-wellorder-early-500k-v1.jsonl.zst | head -$num | perl test/syncTest.pl $params $redir};
print "CMD: $cmd\n";
system($cmd) && die "failed";
print "\n";
}