support multiple concurrent negentropy trees

Doug Hoyte
2024-09-04 14:15:16 -04:00
parent 09b26a2297
commit e181627842
14 changed files with 371 additions and 138 deletions

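Summary of the change: instead of one negentropy tree hard-coded as treeId 0 covering the whole DB, the relay now supports any number of trees, each defined by a nostr filter stored in a NegentropyFilter table (its schema change is among the files not shown here) and kept up to date through a NegentropyFilterCache on the write and delete paths. Consumers (`strfry sync`, NEG-OPEN handling) resolve a filter to a tree by comparing canonical filter strings with since/until stripped. The helper below is only a sketch of the lookup idiom that this commit repeats in cmd_sync.cpp and the negentropy thread; `findTreeId` does not exist in the codebase, and the sketch assumes the golpe-generated `env` handle and `lmdb::txn` from "golpe.h".

```cpp
// Sketch only: condenses the foreach_NegentropyFilter scan used at each lookup site.
static std::optional<uint64_t> findTreeId(lmdb::txn &txn, const std::string &canonicalFilterStr) {
    std::optional<uint64_t> treeId;

    env.foreach_NegentropyFilter(txn, [&](auto &f){
        if (f.filter() == canonicalFilterStr) {
            treeId = f.primaryKeyId; // the record's primary key doubles as the BTreeLMDB tree id
            return false;            // stop iterating: at most one tree per canonical filter
        }
        return true;
    });

    return treeId;
}
```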
View File

@@ -68,13 +68,10 @@ void cmd_delete(const std::vector<std::string> &subArgs) {
     {
         auto txn = env.txn_rw();
-        negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0);
+        NegentropyFilterCache neFilterCache;
-        for (auto levId : levIds) {
-            deleteEvent(txn, levId, negentropyStorage);
-        }
+        deleteEvents(txn, neFilterCache, levIds);
-        negentropyStorage.flush();
         txn.commit();
     }
 }

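The delete path no longer opens tree 0 and removes ids one at a time; it hands the whole batch to a bulk helper that consults the NegentropyFilterCache so each id is removed from every tree whose filter matches. The declaration below is inferred from the two call sites in this commit (here and in RelayCron); the real one lives in the events header, which is not part of the hunks shown.

```cpp
// Inferred from call sites only; argument and return types are assumptions.
// Returns the number of events actually deleted (used by runCron's log line).
uint64_t deleteEvents(lmdb::txn &txn, NegentropyFilterCache &neFilterCache,
                      const std::vector<uint64_t> &levIds);
```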
View File

@@ -0,0 +1,122 @@
#include <iostream>

#include <docopt.h>

#include "golpe.h"

#include "NegentropyFilterCache.h"
#include "events.h"
#include "DBQuery.h"

static const char USAGE[] =
R"(
    Usage:
      negentropy list
      negentropy add <filter>
      negentropy build <treeId>
)";

static void increaseModCounter(lmdb::txn &txn) {
    auto m = env.lookup_Meta(txn, 1);
    if (!m) throw herr("no Meta entry?");
    env.update_Meta(txn, *m, { .negentropyModificationCounter = m->negentropyModificationCounter() + 1 });
}

void cmd_negentropy(const std::vector<std::string> &subArgs) {
    std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");

    if (args["list"].asBool()) {
        auto txn = env.txn_ro();

        env.foreach_NegentropyFilter(txn, [&](auto &f){
            auto treeId = f.primaryKeyId;

            std::cout << "tree " << treeId << "\n";
            std::cout << " filter: " << f.filter() << "\n";

            negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, treeId);
            auto size = storage.size();
            std::cout << " size: " << size << "\n";
            std::cout << " fingerprint: " << to_hex(storage.fingerprint(0, size).sv()) << "\n";

            return true;
        });
    } else if (args["add"].asBool()) {
        std::string filterStr = args["<filter>"].asString();

        tao::json::value filterJson = tao::json::from_string(filterStr);
        auto compiledFilter = NostrFilterGroup::unwrapped(filterJson);

        if (compiledFilter.filters.size() == 1 && (compiledFilter.filters[0].since != 0 || compiledFilter.filters[0].until != MAX_U64)) {
            throw herr("single filters should not have since/until");
        }
        if (compiledFilter.filters.size() == 0) throw herr("filter will never match");

        filterStr = tao::json::to_string(filterJson); // make canonical

        auto txn = env.txn_rw();
        increaseModCounter(txn);

        env.foreach_NegentropyFilter(txn, [&](auto &f){
            if (f.filter() == filterStr) throw herr("filter already exists as tree: ", f.primaryKeyId);
            return true;
        });

        uint64_t treeId = env.insert_NegentropyFilter(txn, filterStr);
        txn.commit();

        std::cout << "created tree " << treeId << "\n";
        std::cout << " to populate, run: strfry negentropy build " << treeId << "\n";
    } else if (args["build"].asBool()) {
        uint64_t treeId = args["<treeId>"].asLong();

        struct Record {
            uint64_t created_at;
            uint8_t id[32];
        };

        std::vector<Record> recs;

        auto txn = env.txn_rw(); // FIXME: split this into a read-only phase followed by a write
        increaseModCounter(txn);

        // Get filter

        std::string filterStr;

        {
            auto view = env.lookup_NegentropyFilter(txn, treeId);
            if (!view) throw herr("couldn't find treeId: ", treeId);
            filterStr = view->filter();
        }

        // Query all matching events

        DBQuery query(tao::json::from_string(filterStr));

        while (1) {
            bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
                auto ev = lookupEventByLevId(txn, levId);
                auto packed = PackedEventView(ev.buf);

                recs.push_back({ packed.created_at(), });
                memcpy(recs.back().id, packed.id().data(), 32);
            });

            if (complete) break;
        }

        // Store events in negentropy tree

        negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, treeId);

        for (const auto &r : recs) {
            storage.insert(r.created_at, std::string_view((char*)r.id, 32));
        }

        storage.flush();
        txn.commit();
    }
}

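The new `strfry negentropy` subcommand manages the trees: `list` prints each tree's filter, size, and fingerprint; `add` validates and canonicalizes a filter, rejects single filters carrying since/until (time bounds are applied per query, not per tree), and inserts a NegentropyFilter row; `build` re-queries the DB for matching events and inserts their (created_at, id) pairs into that tree's BTreeLMDB storage. Every mutation bumps negentropyModificationCounter via increaseModCounter(), presumably so NegentropyFilterCache instances can notice that the set of trees changed. Canonicalization is just a parse/serialize round trip through tao::json, as in this small illustration (the filter value is made up):

```cpp
// Hypothetical input: formatting differences disappear after the round trip,
// so plain string comparison against stored filters becomes reliable.
tao::json::value filterJson = tao::json::from_string(R"({ "kinds" : [ 1 ] })");
std::string canonical = tao::json::to_string(filterJson); // {"kinds":[1]}
```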
View File

@@ -50,39 +50,56 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
     tao::json::value filterJson = tao::json::from_string(filterStr);
     auto filterCompiled = NostrFilterGroup::unwrapped(filterJson);
 
-    bool isFullDbQuery = filterCompiled.isFullDbQuery();
+    std::optional<uint64_t> treeId;
     negentropy::storage::Vector storageVector;
 
-    if (!isFullDbQuery) {
-        DBQuery query(filterJson);
-        Decompressor decomp;
+    {
         auto txn = env.txn_ro();
 
-        uint64_t numEvents = 0;
-        std::vector<uint64_t> levIds;
-
-        while (1) {
-            bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
-                levIds.push_back(levId);
-                numEvents++;
-            });
-
-            if (complete) break;
-        }
+        auto filterJsonNoTimes = filterJson;
+
+        if (filterJsonNoTimes.is_object()) {
+            filterJsonNoTimes.get_object().erase("since");
+            filterJsonNoTimes.get_object().erase("until");
+        }
+
+        auto filterJsonNoTimesStr = tao::json::to_string(filterJsonNoTimes);
 
-        std::sort(levIds.begin(), levIds.end());
+        env.foreach_NegentropyFilter(txn, [&](auto &f){
+            if (f.filter() == filterJsonNoTimesStr) {
+                treeId = f.primaryKeyId;
+                return false;
+            }
+            return true;
+        });
 
-        for (auto levId : levIds) {
-            auto ev = lookupEventByLevId(txn, levId);
-            PackedEventView packed(ev.buf);
-            storageVector.insert(packed.created_at(), packed.id());
+        if (!treeId) {
+            DBQuery query(filterJson);
+            Decompressor decomp;
+
+            uint64_t numEvents = 0;
+            std::vector<uint64_t> levIds;
+
+            while (1) {
+                bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
+                    levIds.push_back(levId);
+                    numEvents++;
+                });
+
+                if (complete) break;
+            }
+
+            std::sort(levIds.begin(), levIds.end());
+
+            for (auto levId : levIds) {
+                auto ev = lookupEventByLevId(txn, levId);
+                PackedEventView packed(ev.buf);
+                storageVector.insert(packed.created_at(), packed.id());
+            }
+
+            LI << "Filter matches " << numEvents << " events";
+
+            storageVector.seal();
         }
-
-        LI << "Filter matches " << numEvents << " events";
-
-        storageVector.seal();
     }
@@ -98,8 +115,8 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
         auto txn = env.txn_ro();
         std::string neMsg;
 
-        if (isFullDbQuery) {
-            negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0);
+        if (treeId) {
+            negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, *treeId);
 
             const auto &f = filterCompiled.filters.at(0);
             negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));

@@ -151,8 +168,8 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
            try {
                auto inputMsg = from_hex(msg.at(2).get_string());
 
-                if (isFullDbQuery) {
-                    negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0);
+                if (treeId) {
+                    negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, *treeId);
 
                    const auto &f = filterCompiled.filters.at(0);
                    negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));

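cmd_sync now resolves the filter to a server-side tree instead of special-casing full-DB queries: since/until are stripped, the remainder is serialized canonically and compared against the NegentropyFilter rows, and only when no tree matches does it fall back to the old behaviour of querying the DB and sealing a one-off negentropy::storage::Vector. When a tree is found, the time bounds are still honoured by wrapping the BTreeLMDB storage in a SubRange; the `f.until + 1` (guarded at MAX_U64 to avoid overflow) suggests the upper Bound is exclusive, so events created exactly at `until` remain included.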
View File

@@ -50,15 +50,10 @@ void RelayServer::runCron() {
         if (expiredLevIds.size() > 0) {
             auto txn = env.txn_rw();
-            negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0);
+            NegentropyFilterCache neFilterCache;
 
-            uint64_t numDeleted = 0;
+            uint64_t numDeleted = deleteEvents(txn, neFilterCache, expiredLevIds);
 
-            for (auto levId : expiredLevIds) {
-                if (deleteEvent(txn, levId, negentropyStorage)) numDeleted++;
-            }
-
-            negentropyStorage.flush();
             txn.commit();
 
             if (numDeleted) LI << "Deleted " << numDeleted << " events (ephemeral=" << numEphemeral << " expired=" << numExpired << ")";

View File

@@ -134,15 +134,22 @@ void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp
        if (arr.at(0) == "NEG-OPEN") {
            if (arr.get_array().size() < 4) throw herr("negentropy query missing elements");
 
-            NostrFilterGroup filter;
            auto maxFilterLimit = cfg().relay__negentropy__maxSyncEvents + 1;
-            filter = std::move(NostrFilterGroup::unwrapped(arr.at(2), maxFilterLimit));
+
+            auto filterJson = arr.at(2);
+            NostrFilterGroup filter = NostrFilterGroup::unwrapped(filterJson, maxFilterLimit);
 
            Subscription sub(connId, arr[1].get_string(), std::move(filter));
 
+            if (filterJson.is_object()) {
+                filterJson.get_object().erase("since");
+                filterJson.get_object().erase("until");
+            }
+
+            std::string filterStr = tao::json::to_string(filterJson);
+
            std::string negPayload = from_hex(arr.at(3).get_string());
 
-            tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), std::move(negPayload)}});
+            tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), std::move(filterStr), std::move(negPayload)}});
        } else if (arr.at(0) == "NEG-MSG") {
            std::string negPayload = from_hex(arr.at(2).get_string());
 
            tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegMsg{connId, SubId(arr[1].get_string()), std::move(negPayload)}});

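On the ingester side, NEG-OPEN now parses the filter JSON once, builds the NostrFilterGroup from it, strips since/until, and forwards the resulting canonical filter string to the negentropy thread inside MsgNegentropy::NegOpen (the new filterStr field appears in the RelayServer.h hunk below), so the tree lookup itself happens off the ingester thread.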
View File

@@ -17,6 +17,7 @@ struct NegentropyViews {
     struct StatelessView {
         Subscription sub;
+        uint64_t treeId;
     };
 
     using UserView = std::variant<MemoryView, StatelessView>;

@@ -42,7 +43,7 @@ struct NegentropyViews {
         return true;
     }
 
-    bool addStatelessView(uint64_t connId, const SubId &subId, Subscription &&sub) {
+    bool addStatelessView(uint64_t connId, const SubId &subId, Subscription &&sub, uint64_t treeId) {
         {
             auto *existing = findView(connId, subId);
             if (existing) removeView(connId, subId);

@@ -55,7 +56,7 @@ struct NegentropyViews {
             return false;
         }
 
-        connViews.try_emplace(subId, UserView{ StatelessView{ std::move(sub), } });
+        connViews.try_emplace(subId, UserView{ StatelessView{ std::move(sub), treeId, } });
 
         return true;
     }

@@ -188,15 +189,24 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
        if (auto msg = std::get_if<MsgNegentropy::NegOpen>(&newMsg.msg)) {
            auto connId = msg->sub.connId;
            auto subId = msg->sub.subId;
 
-            if (msg->sub.filterGroup.isFullDbQuery()) {
-                negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0);
+            std::optional<uint64_t> treeId;
+
+            env.foreach_NegentropyFilter(txn, [&](auto &f){
+                if (f.filter() == msg->filterStr) {
+                    treeId = f.primaryKeyId;
+                    return false;
+                }
+                return true;
+            });
+
+            if (treeId) {
+                negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, *treeId);
 
                const auto &f = msg->sub.filterGroup.filters.at(0);
                negentropy::storage::SubRange subStorage(storage, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));
 
                handleReconcile(connId, subId, subStorage, msg->negPayload);
 
-                if (!views.addStatelessView(connId, subId, std::move(msg->sub))) {
+                if (!views.addStatelessView(connId, subId, std::move(msg->sub), *treeId)) {
                    queries.removeSub(connId, subId);
                    sendNoticeError(connId, std::string("too many concurrent NEG requests"));
                }

@@ -231,7 +241,7 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
            }
 
            handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload);
        } else if (auto *view = std::get_if<NegentropyViews::StatelessView>(userView)) {
-            negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0);
+            negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, view->treeId);
 
            const auto &f = view->sub.filterGroup.filters.at(0);
            negentropy::storage::SubRange subStorage(storage, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));

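StatelessView records which treeId it is serving, and addStatelessView threads it through. On NEG-OPEN the negentropy thread runs the same foreach_NegentropyFilter scan against msg->filterStr; if a tree matches, reconciliation runs against that tree (again bounded by a SubRange built from the subscription's since/until), and subsequent NEG-MSG frames reopen the BTreeLMDB with view->treeId. Filters with no configured tree presumably fall through to the pre-existing in-memory MemoryView path; that branch lies outside the displayed hunks.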
View File

@@ -122,6 +122,7 @@ struct MsgReqMonitor : NonCopyable {
 struct MsgNegentropy : NonCopyable {
     struct NegOpen {
         Subscription sub;
+        std::string filterStr;
         std::string negPayload;
     };

View File

@@ -5,6 +5,7 @@
 void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
     PluginEventSifter writePolicyPlugin;
+    NegentropyFilterCache neFilterCache;
 
     while(1) {
         auto newMsgs = thr.inbox.pop_all();

@@ -61,7 +62,7 @@ void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
            try {
                auto txn = env.txn_rw();
-                writeEvents(txn, newEvents);
+                writeEvents(txn, neFilterCache, newEvents);
                txn.commit();
            } catch (std::exception &e) {
                LE << "Error writing " << newEvents.size() << " events: " << e.what();
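The writer thread keeps one NegentropyFilterCache for its lifetime and passes it to writeEvents, so each stored event is also inserted into every tree whose filter it matches. As with deleteEvents, the signature below is only inferred from the call site; the real declaration lives in the events header, which is not part of the hunks shown.

```cpp
// Assumption: same shape as before this commit, with the cache threaded through.
// EventToWrite is strfry's existing struct for events pending a write.
void writeEvents(lmdb::txn &txn, NegentropyFilterCache &neFilterCache,
                 std::vector<EventToWrite> &evs);
```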