update negentropy

This commit is contained in:
Doug Hoyte
2023-12-05 18:17:58 -05:00
parent ed0ff4fc79
commit 2262d2ac54
12 changed files with 143 additions and 20 deletions

View File

@ -68,11 +68,13 @@ void cmd_delete(const std::vector<std::string> &subArgs) {
{
auto txn = env.txn_rw();
negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0);
for (auto levId : levIds) {
deleteEvent(txn, levId);
deleteEvent(txn, levId, negentropyStorage);
}
negentropyStorage.flush();
txn.commit();
}
}

View File

@ -107,13 +107,15 @@ void RelayServer::runCron() {
if (expiredLevIds.size() > 0) {
auto txn = env.txn_rw();
negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0);
uint64_t numDeleted = 0;
for (auto levId : expiredLevIds) {
if (deleteEvent(txn, levId)) numDeleted++;
if (deleteEvent(txn, levId, negentropyStorage)) numDeleted++;
}
negentropyStorage.flush();
txn.commit();
if (numDeleted) LI << "Deleted " << numDeleted << " events (ephemeral=" << numEphemeral << " expired=" << numExpired << ")";

View File

@ -132,7 +132,7 @@ void RelayServer::ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const ta
void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp, uint64_t connId, const tao::json::value &arr) {
if (arr.at(0) == "NEG-OPEN") {
if (arr.get_array().size() < 5) throw herr("negentropy query missing elements");
if (arr.get_array().size() < 4) throw herr("negentropy query missing elements");
NostrFilterGroup filter;
auto maxFilterLimit = cfg().relay__negentropy__maxSyncEvents + 1;

View File

@ -42,6 +42,20 @@ struct NegentropyViews {
}
bool addStatelessView(uint64_t connId, const SubId &subId, Subscription &&sub) {
{
auto *existing = findView(connId, subId);
if (existing) removeView(connId, subId);
}
auto res = conns.try_emplace(connId);
auto &connViews = res.first->second;
if (connViews.size() >= cfg().relay__maxSubsPerConnection) {
return false;
}
connViews.try_emplace(subId, UserView{ StatelessView{ std::move(sub), } });
return true;
}
@ -205,15 +219,16 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
continue;
}
auto *view = std::get_if<NegentropyViews::MemoryView>(userView);
if (!view) throw herr("bad variant, expected memory view");
if (!view->storageVector.sealed) {
sendNoticeError(msg->connId, "negentropy error: got NEG-MSG before NEG-OPEN complete");
continue;
if (auto *view = std::get_if<NegentropyViews::MemoryView>(userView)) {
if (!view->storageVector.sealed) {
sendNoticeError(msg->connId, "negentropy error: got NEG-MSG before NEG-OPEN complete");
continue;
}
handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload);
} else if (std::get_if<NegentropyViews::StatelessView>(userView)) {
negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0);
handleReconcile(msg->connId, msg->subId, storage, msg->negPayload);
}
handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload);
} else if (auto msg = std::get_if<MsgNegentropy::NegClose>(&newMsg.msg)) {
queries.removeSub(msg->connId, msg->subId);
views.removeView(msg->connId, msg->subId);

View File

@ -1,6 +1,5 @@
#include <openssl/sha.h>
#include <negentropy.h>
#include <negentropy/storage/BTreeLMDB.h>
#include "events.h"
@ -229,14 +228,12 @@ std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t lev
bool deleteEvent(lmdb::txn &txn, uint64_t levId) {
bool deleteEvent(lmdb::txn &txn, uint64_t levId, negentropy::storage::BTreeLMDB &negentropyStorage) {
auto view = env.lookup_Event(txn, levId);
if (!view) return false;
auto *flat = view->flat_nested();
negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0);
negentropyStorage.erase(flat->created_at(), sv(flat->id()));
negentropyStorage.flush();
bool deleted = env.dbi_EventPayload.del(txn, lmdb::to_sv<uint64_t>(levId));
env.delete_Event(txn, levId);
@ -340,7 +337,7 @@ void writeEvents(lmdb::txn &txn, std::vector<EventToWrite> &evs, uint64_t logLev
// Deletions happen after event was written to ensure levIds are not reused
for (auto levId : levIdsToDelete) deleteEvent(txn, levId);
for (auto levId : levIdsToDelete) deleteEvent(txn, levId, negentropyStorage);
levIdsToDelete.clear();
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <secp256k1_schnorrsig.h>
#include <negentropy/storage/BTreeLMDB.h>
#include "golpe.h"
@ -109,4 +110,4 @@ struct EventToWrite {
void writeEvents(lmdb::txn &txn, std::vector<EventToWrite> &evs, uint64_t logLevel = 1);
bool deleteEvent(lmdb::txn &txn, uint64_t levId);
bool deleteEvent(lmdb::txn &txn, uint64_t levId, negentropy::storage::BTreeLMDB &negentropyStorage);

View File

@ -200,7 +200,7 @@ struct NostrFilter {
}
bool isFullDbQuery() {
return !ids && !authors && !kinds && tags.size() == 0 && limit == MAX_U64;
return !ids && !authors && !kinds && tags.size() == 0;
}
};

11
test/cfgs/syncTest1.conf Normal file
View File

@ -0,0 +1,11 @@
db = "./strfry-db-test-1/"
events {
rejectEventsOlderThanSeconds = 9999999999
}
relay {
port = 40551
#logging { dumpInAll = true }
}

9
test/cfgs/syncTest2.conf Normal file
View File

@ -0,0 +1,9 @@
db = "./strfry-db-test-2/"
events {
rejectEventsOlderThanSeconds = 9999999999
}
relay {
port = 40552
}

86
test/syncTest.pl Executable file
View File

@ -0,0 +1,86 @@
#!/usr/bin/env perl
## zstdcat ../nostr-dumps/nostr-wellorder-early-500k-v1.jsonl.zst | head -100000 | perl test/syncTest.pl 1 1 10000 '{}'
use strict;
my $prob1 = shift // 1;
my $prob2 = shift // 1;
my $prob3 = shift // 98;
my $filter = shift // '{}';
{
my $total = $prob1 + $prob2 + $prob3;
die "zero prob" if $total == 0;
$prob1 = $prob1 / $total;
$prob2 = $prob2 / $total;
$prob3 = $prob3 / $total;
}
srand($ENV{SEED} || 0);
system("mkdir -p strfry-db-test-1 strfry-db-test-2");
system("rm -f strfry-db-test-1/data.mdb strfry-db-test-2/data.mdb");
my $ids1 = {};
my $ids2 = {};
{
open(my $infile1, '|-', "./strfry --config test/cfgs/syncTest1.conf import");
open(my $infile2, '|-', "./strfry --config test/cfgs/syncTest2.conf import");
while (<STDIN>) {
/"id":"(\w+)"/ || next;
my $id = $1;
my $modeRnd = rand();
if ($modeRnd < $prob1) {
print $infile1 $_;
$ids1->{$id} = 1;
} elsif ($modeRnd < $prob1 + $prob2) {
print $infile2 $_;
$ids2->{$id} = 1;
} else {
print $infile1 $_;
print $infile2 $_;
$ids1->{$id} = 1;
$ids2->{$id} = 1;
}
}
}
withRelay(sub {
system("./strfry --config test/cfgs/syncTest2.conf sync ws://127.0.0.1:40551 --dir both --filter '$filter'");
});
my $hash1 = `./strfry --config test/cfgs/syncTest1.conf export | perl test/dumbFilter.pl '$filter' | sort | sha256sum`;
my $hash2 = `./strfry --config test/cfgs/syncTest2.conf export | perl test/dumbFilter.pl '$filter' | sort | sha256sum`;
die "hashes differ" unless $hash1 eq $hash2;
print "OK.\n";
sub withRelay {
my $cb = shift;
my $relayPid = startRelay();
$cb->();
kill 'KILL', $relayPid;
wait;
}
sub startRelay {
my $pid = fork();
if (!$pid) {
exec("./strfry --config test/cfgs/syncTest1.conf relay") || die "couldn't exec strfry";
}
sleep 1; ## FIXME
return $pid;
}

View File

@ -250,7 +250,7 @@ sub doTest {
my $finalEventIds = [];
{
open(my $fh, '-|', './strfry --config test/strfry.conf export 2>/dev/null') || die "$!";
open(my $fh, '-|', './strfry --config test/cfgs/writeTest.conf export 2>/dev/null') || die "$!";
while(<$fh>) {
push @$finalEventIds, decode_json($_)->{id};
}
@ -276,7 +276,7 @@ sub addEvent {
my $eventJson = `cat test-eventXYZ.json`;
system(qq{ <test-eventXYZ.json ./strfry --config test/strfry.conf import 2>/dev/null });
system(qq{ <test-eventXYZ.json ./strfry --config test/cfgs/writeTest.conf import 2>/dev/null });
system(qq{ rm test-eventXYZ.json });