diff --git a/src/apps/dbutils/cmd_delete.cpp b/src/apps/dbutils/cmd_delete.cpp index e95a8fb..058f285 100644 --- a/src/apps/dbutils/cmd_delete.cpp +++ b/src/apps/dbutils/cmd_delete.cpp @@ -68,11 +68,13 @@ void cmd_delete(const std::vector &subArgs) { { auto txn = env.txn_rw(); + negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0); for (auto levId : levIds) { - deleteEvent(txn, levId); + deleteEvent(txn, levId, negentropyStorage); } + negentropyStorage.flush(); txn.commit(); } } diff --git a/src/apps/relay/RelayCron.cpp b/src/apps/relay/RelayCron.cpp index d78a17b..d15709f 100644 --- a/src/apps/relay/RelayCron.cpp +++ b/src/apps/relay/RelayCron.cpp @@ -107,13 +107,15 @@ void RelayServer::runCron() { if (expiredLevIds.size() > 0) { auto txn = env.txn_rw(); + negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0); uint64_t numDeleted = 0; for (auto levId : expiredLevIds) { - if (deleteEvent(txn, levId)) numDeleted++; + if (deleteEvent(txn, levId, negentropyStorage)) numDeleted++; } + negentropyStorage.flush(); txn.commit(); if (numDeleted) LI << "Deleted " << numDeleted << " events (ephemeral=" << numEphemeral << " expired=" << numExpired << ")"; diff --git a/src/apps/relay/RelayIngester.cpp b/src/apps/relay/RelayIngester.cpp index 23ba371..ebeb31d 100644 --- a/src/apps/relay/RelayIngester.cpp +++ b/src/apps/relay/RelayIngester.cpp @@ -132,7 +132,7 @@ void RelayServer::ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const ta void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp, uint64_t connId, const tao::json::value &arr) { if (arr.at(0) == "NEG-OPEN") { - if (arr.get_array().size() < 5) throw herr("negentropy query missing elements"); + if (arr.get_array().size() < 4) throw herr("negentropy query missing elements"); NostrFilterGroup filter; auto maxFilterLimit = cfg().relay__negentropy__maxSyncEvents + 1; diff --git a/src/apps/relay/RelayNegentropy.cpp b/src/apps/relay/RelayNegentropy.cpp index 3da267c..83936d4 100644 --- a/src/apps/relay/RelayNegentropy.cpp +++ b/src/apps/relay/RelayNegentropy.cpp @@ -42,6 +42,20 @@ struct NegentropyViews { } bool addStatelessView(uint64_t connId, const SubId &subId, Subscription &&sub) { + { + auto *existing = findView(connId, subId); + if (existing) removeView(connId, subId); + } + + auto res = conns.try_emplace(connId); + auto &connViews = res.first->second; + + if (connViews.size() >= cfg().relay__maxSubsPerConnection) { + return false; + } + + connViews.try_emplace(subId, UserView{ StatelessView{ std::move(sub), } }); + return true; } @@ -205,15 +219,16 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { continue; } - auto *view = std::get_if(userView); - if (!view) throw herr("bad variant, expected memory view"); - - if (!view->storageVector.sealed) { - sendNoticeError(msg->connId, "negentropy error: got NEG-MSG before NEG-OPEN complete"); - continue; + if (auto *view = std::get_if(userView)) { + if (!view->storageVector.sealed) { + sendNoticeError(msg->connId, "negentropy error: got NEG-MSG before NEG-OPEN complete"); + continue; + } + handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload); + } else if (std::get_if(userView)) { + negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0); + handleReconcile(msg->connId, msg->subId, storage, msg->negPayload); } - - handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload); } else if (auto msg = std::get_if(&newMsg.msg)) { queries.removeSub(msg->connId, msg->subId); views.removeView(msg->connId, msg->subId); diff --git a/src/events.cpp b/src/events.cpp index 34cb516..ee64ee2 100644 --- a/src/events.cpp +++ b/src/events.cpp @@ -1,6 +1,5 @@ #include #include -#include #include "events.h" @@ -229,14 +228,12 @@ std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t lev -bool deleteEvent(lmdb::txn &txn, uint64_t levId) { +bool deleteEvent(lmdb::txn &txn, uint64_t levId, negentropy::storage::BTreeLMDB &negentropyStorage) { auto view = env.lookup_Event(txn, levId); if (!view) return false; auto *flat = view->flat_nested(); - negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0); negentropyStorage.erase(flat->created_at(), sv(flat->id())); - negentropyStorage.flush(); bool deleted = env.dbi_EventPayload.del(txn, lmdb::to_sv(levId)); env.delete_Event(txn, levId); @@ -340,7 +337,7 @@ void writeEvents(lmdb::txn &txn, std::vector &evs, uint64_t logLev // Deletions happen after event was written to ensure levIds are not reused - for (auto levId : levIdsToDelete) deleteEvent(txn, levId); + for (auto levId : levIdsToDelete) deleteEvent(txn, levId, negentropyStorage); levIdsToDelete.clear(); } diff --git a/src/events.h b/src/events.h index d05bc71..9191397 100644 --- a/src/events.h +++ b/src/events.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include "golpe.h" @@ -109,4 +110,4 @@ struct EventToWrite { void writeEvents(lmdb::txn &txn, std::vector &evs, uint64_t logLevel = 1); -bool deleteEvent(lmdb::txn &txn, uint64_t levId); +bool deleteEvent(lmdb::txn &txn, uint64_t levId, negentropy::storage::BTreeLMDB &negentropyStorage); diff --git a/src/filters.h b/src/filters.h index e555e4c..b849494 100644 --- a/src/filters.h +++ b/src/filters.h @@ -200,7 +200,7 @@ struct NostrFilter { } bool isFullDbQuery() { - return !ids && !authors && !kinds && tags.size() == 0 && limit == MAX_U64; + return !ids && !authors && !kinds && tags.size() == 0; } }; diff --git a/test/cfgs/syncTest1.conf b/test/cfgs/syncTest1.conf new file mode 100644 index 0000000..e587a38 --- /dev/null +++ b/test/cfgs/syncTest1.conf @@ -0,0 +1,11 @@ +db = "./strfry-db-test-1/" + +events { + rejectEventsOlderThanSeconds = 9999999999 +} + +relay { + port = 40551 + + #logging { dumpInAll = true } +} diff --git a/test/cfgs/syncTest2.conf b/test/cfgs/syncTest2.conf new file mode 100644 index 0000000..576f6f1 --- /dev/null +++ b/test/cfgs/syncTest2.conf @@ -0,0 +1,9 @@ +db = "./strfry-db-test-2/" + +events { + rejectEventsOlderThanSeconds = 9999999999 +} + +relay { + port = 40552 +} diff --git a/test/strfry.conf b/test/cfgs/writeTest.conf similarity index 100% rename from test/strfry.conf rename to test/cfgs/writeTest.conf diff --git a/test/syncTest.pl b/test/syncTest.pl new file mode 100755 index 0000000..c2943ce --- /dev/null +++ b/test/syncTest.pl @@ -0,0 +1,86 @@ +#!/usr/bin/env perl + +## zstdcat ../nostr-dumps/nostr-wellorder-early-500k-v1.jsonl.zst | head -100000 | perl test/syncTest.pl 1 1 10000 '{}' + +use strict; + +my $prob1 = shift // 1; +my $prob2 = shift // 1; +my $prob3 = shift // 98; +my $filter = shift // '{}'; + +{ + my $total = $prob1 + $prob2 + $prob3; + die "zero prob" if $total == 0; + $prob1 = $prob1 / $total; + $prob2 = $prob2 / $total; + $prob3 = $prob3 / $total; +} + +srand($ENV{SEED} || 0); +system("mkdir -p strfry-db-test-1 strfry-db-test-2"); +system("rm -f strfry-db-test-1/data.mdb strfry-db-test-2/data.mdb"); + + +my $ids1 = {}; +my $ids2 = {}; + +{ + open(my $infile1, '|-', "./strfry --config test/cfgs/syncTest1.conf import"); + open(my $infile2, '|-', "./strfry --config test/cfgs/syncTest2.conf import"); + + while () { + /"id":"(\w+)"/ || next; + my $id = $1; + + my $modeRnd = rand(); + + if ($modeRnd < $prob1) { + print $infile1 $_; + $ids1->{$id} = 1; + } elsif ($modeRnd < $prob1 + $prob2) { + print $infile2 $_; + $ids2->{$id} = 1; + } else { + print $infile1 $_; + print $infile2 $_; + $ids1->{$id} = 1; + $ids2->{$id} = 1; + } + } +} + + +withRelay(sub { + system("./strfry --config test/cfgs/syncTest2.conf sync ws://127.0.0.1:40551 --dir both --filter '$filter'"); +}); + +my $hash1 = `./strfry --config test/cfgs/syncTest1.conf export | perl test/dumbFilter.pl '$filter' | sort | sha256sum`; +my $hash2 = `./strfry --config test/cfgs/syncTest2.conf export | perl test/dumbFilter.pl '$filter' | sort | sha256sum`; + +die "hashes differ" unless $hash1 eq $hash2; + +print "OK.\n"; + + +sub withRelay { + my $cb = shift; + + my $relayPid = startRelay(); + + $cb->(); + + kill 'KILL', $relayPid; + wait; +} + +sub startRelay { + my $pid = fork(); + + if (!$pid) { + exec("./strfry --config test/cfgs/syncTest1.conf relay") || die "couldn't exec strfry"; + } + + sleep 1; ## FIXME + return $pid; +} diff --git a/test/writeTest.pl b/test/writeTest.pl index 9d7e3be..d5f387d 100644 --- a/test/writeTest.pl +++ b/test/writeTest.pl @@ -250,7 +250,7 @@ sub doTest { my $finalEventIds = []; { - open(my $fh, '-|', './strfry --config test/strfry.conf export 2>/dev/null') || die "$!"; + open(my $fh, '-|', './strfry --config test/cfgs/writeTest.conf export 2>/dev/null') || die "$!"; while(<$fh>) { push @$finalEventIds, decode_json($_)->{id}; } @@ -276,7 +276,7 @@ sub addEvent { my $eventJson = `cat test-eventXYZ.json`; - system(qq{ /dev/null }); + system(qq{ /dev/null }); system(qq{ rm test-eventXYZ.json });