From eb24824b53d83bcdacd760051b7077388503e54e Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Tue, 5 Dec 2023 14:27:53 -0500 Subject: [PATCH] upgrade negentropy --- external/negentropy | 2 +- src/apps/mesh/cmd_sync.cpp | 54 +++++++--- src/apps/relay/RelayIngester.cpp | 7 +- src/apps/relay/RelayNegentropy.cpp | 155 ++++++++++++++++------------- src/apps/relay/RelayServer.h | 1 - src/events.cpp | 17 ++++ src/filters.h | 8 ++ src/global.h | 2 + src/onAppStartup.cpp | 6 ++ 9 files changed, 161 insertions(+), 91 deletions(-) diff --git a/external/negentropy b/external/negentropy index bdd421c..d12a440 160000 --- a/external/negentropy +++ b/external/negentropy @@ -1 +1 @@ -Subproject commit bdd421c1fbf8ca6d4404597411674f09eb578eb5 +Subproject commit d12a44082f3d77b8417da2d6bcf3508a35c07146 diff --git a/src/apps/mesh/cmd_sync.cpp b/src/apps/mesh/cmd_sync.cpp index ad61a3f..08a3ecf 100644 --- a/src/apps/mesh/cmd_sync.cpp +++ b/src/apps/mesh/cmd_sync.cpp @@ -1,6 +1,8 @@ #include #include -#include +#include +#include +#include #include "golpe.h" @@ -40,18 +42,19 @@ void cmd_sync(const std::vector &subArgs) { uint64_t frameSizeLimit = 60'000; // default frame limit is 128k. Halve that (hex encoding) and subtract a bit (JSON msg overhead) if (args["--frame-size-limit"]) frameSizeLimit = args["--frame-size-limit"].asLong(); - const uint64_t idSize = 16; const bool doUp = dir == "both" || dir == "up"; const bool doDown = dir == "both" || dir == "down"; - tao::json::value filter = tao::json::from_string(filterStr); + tao::json::value filterJson = tao::json::from_string(filterStr); + auto filterCompiled = NostrFilterGroup::unwrapped(filterJson); - Negentropy ne(idSize, frameSizeLimit); + bool isFullDbQuery = filterCompiled.isFullDbQuery(); + negentropy::storage::Vector storageVector; - { - DBQuery query(filter); + if (!isFullDbQuery) { + DBQuery query(filterJson); Decompressor decomp; auto txn = env.txn_ro(); @@ -72,14 +75,13 @@ void cmd_sync(const std::vector &subArgs) { for (auto levId : levIds) { auto ev = lookupEventByLevId(txn, levId); - PackedEventView packed(ev.buf); - ne.addItem(packed.created_at(), packed.id().substr(0, ne.idSize)); + storageVector.insert(packed.created_at(), packed.id().substr(0, ne.idSize)); } LI << "Filter matches " << numEvents << " events"; - } - ne.seal(); + storageVector.seal(); + } @@ -91,13 +93,23 @@ void cmd_sync(const std::vector &subArgs) { ws.reconnect = false; ws.onConnect = [&]{ - auto neMsg = to_hex(ne.initiate()); + auto txn = env.txn_ro(); + std::string neMsg; + + if (isFullDbQuery) { + negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0); + Negentropy ne(storageBtree, frameSizeLimit); + neMsg = ne.initiate(); + } else { + Negentropy ne(storageVector, frameSizeLimit); + neMsg = ne.initiate(); + } + ws.send(tao::json::to_string(tao::json::value::array({ "NEG-OPEN", "N", - filter, - idSize, - neMsg, + filterJson, + to_hex(neMsg), }))); }; @@ -122,6 +134,7 @@ void cmd_sync(const std::vector &subArgs) { ws.onMessage = [&](auto msgStr, uWS::OpCode opCode, size_t compressedSize){ try { + auto txn = env.txn_ro(); tao::json::value msg = tao::json::from_string(msgStr); if (msg.at(0) == "NEG-MSG") { @@ -130,7 +143,18 @@ void cmd_sync(const std::vector &subArgs) { std::optional neMsg; try { - neMsg = ne.reconcile(from_hex(msg.at(2).get_string()), have, need); + auto inputMsg = from_hex(msg.at(2).get_string()); + + if (isFullDbQuery) { + negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0); + Negentropy ne(storageBtree, frameSizeLimit); + ne.setInitiator(); + neMsg = ne.reconcile(inputMsg, have, need); + } else { + Negentropy ne(storageVector, frameSizeLimit); + ne.setInitiator(); + neMsg = ne.reconcile(inputMsg, have, need); + } } catch (std::exception &e) { LE << "Unable to parse negentropy message from relay: " << e.what(); doExit(1); diff --git a/src/apps/relay/RelayIngester.cpp b/src/apps/relay/RelayIngester.cpp index 0021434..23ba371 100644 --- a/src/apps/relay/RelayIngester.cpp +++ b/src/apps/relay/RelayIngester.cpp @@ -168,12 +168,9 @@ void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp Subscription sub(connId, arr[1].get_string(), std::move(filter)); - uint64_t idSize = arr.at(3).get_unsigned(); - if (idSize < 8 || idSize > 32) throw herr("idSize out of range"); + std::string negPayload = from_hex(arr.at(3).get_string()); - std::string negPayload = from_hex(arr.at(4).get_string()); - - tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), idSize, std::move(negPayload)}}); + tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), std::move(negPayload)}}); } else if (arr.at(0) == "NEG-MSG") { std::string negPayload = from_hex(arr.at(2).get_string()); tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegMsg{connId, SubId(arr[1].get_string()), std::move(negPayload)}}); diff --git a/src/apps/relay/RelayNegentropy.cpp b/src/apps/relay/RelayNegentropy.cpp index a62d0ef..3da267c 100644 --- a/src/apps/relay/RelayNegentropy.cpp +++ b/src/apps/relay/RelayNegentropy.cpp @@ -1,21 +1,29 @@ -#include +#include +#include +#include #include "RelayServer.h" #include "QueryScheduler.h" struct NegentropyViews { - struct UserView { - Negentropy ne; + struct MemoryView { std::string initialMsg; + negentropy::storage::Vector storageVector; std::vector levIds; uint64_t startTime = hoytech::curr_time_us(); }; - using ConnViews = flat_hash_map; - flat_hash_map conns; // connId -> subId -> Negentropy + struct StatelessView { + Subscription sub; + }; - bool addView(uint64_t connId, const SubId &subId, uint64_t idSize, const std::string &initialMsg) { + using UserView = std::variant; + + using ConnViews = flat_hash_map; + flat_hash_map conns; // connId -> subId -> UserView + + bool addMemoryView(uint64_t connId, const SubId &subId, const std::string &initialMsg) { { auto *existing = findView(connId, subId); if (existing) removeView(connId, subId); @@ -28,11 +36,15 @@ struct NegentropyViews { return false; } - connViews.try_emplace(subId, UserView{ Negentropy(idSize, 500'000), initialMsg }); + connViews.try_emplace(subId, UserView{ MemoryView{ initialMsg, } }); return true; } + bool addStatelessView(uint64_t connId, const SubId &subId, Subscription &&sub) { + return true; + } + UserView *findView(uint64_t connId, const SubId &subId) { auto f1 = conns.find(connId); if (f1 == conns.end()) return nullptr; @@ -63,11 +75,42 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { QueryScheduler queries; NegentropyViews views; + + auto handleReconcile = [&](uint64_t connId, const SubId &subId, negentropy::StorageBase &storage, const std::string &msg) { + std::string resp; + + try { + Negentropy ne(storage, 500'000); + resp = ne.reconcile(msg); + } catch (std::exception &e) { + LI << "[" << connId << "] Error parsing negentropy initial message: " << e.what(); + + sendToConn(connId, tao::json::to_string(tao::json::value::array({ + "NEG-ERR", + subId.str(), + "PROTOCOL-ERROR" + }))); + + views.removeView(connId, subId); + return; + } + + sendToConn(connId, tao::json::to_string(tao::json::value::array({ + "NEG-MSG", + subId.str(), + to_hex(resp) + }))); + }; + + queries.ensureExists = false; queries.onEventBatch = [&](lmdb::txn &txn, const auto &sub, const std::vector &levIds){ - auto *view = views.findView(sub.connId, sub.subId); - if (!view) return; + auto *userView = views.findView(sub.connId, sub.subId); + if (!userView) return; + + auto *view = std::get_if(userView); + if (!view) throw herr("bad variant, expected memory view"); for (auto levId : levIds) { view->levIds.push_back(levId); @@ -75,8 +118,11 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { }; queries.onComplete = [&](lmdb::txn &txn, Subscription &sub){ - auto *view = views.findView(sub.connId, sub.subId); - if (!view) return; + auto *userView = views.findView(sub.connId, sub.subId); + if (!userView) return; + + auto *view = std::get_if(userView); + if (!view) throw herr("bad variant, expected memory view"); LI << "[" << sub.connId << "] Negentropy query matched " << view->levIds.size() << " events in " << (hoytech::curr_time_us() - view->startTime) << "us"; @@ -100,8 +146,7 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { for (auto levId : view->levIds) { try { auto ev = lookupEventByLevId(txn, levId); - auto packed = PackedEventView(ev.buf); - view->ne.addItem(packed.created_at(), packed.id().substr(0, view->ne.idSize)); + view->storageVector.insert(packed.created_at(), packed.id().substr(0, view->ne.idSize)); } catch (std::exception &) { // levId was deleted when query was paused } @@ -110,34 +155,14 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { view->levIds.clear(); view->levIds.shrink_to_fit(); - view->ne.seal(); + view->storageVector.seal(); - std::string resp; - - try { - resp = view->ne.reconcile(view->initialMsg); - } catch (std::exception &e) { - LI << "[" << sub.connId << "] Error parsing negentropy initial message: " << e.what(); - - sendToConn(sub.connId, tao::json::to_string(tao::json::value::array({ - "NEG-ERR", - sub.subId.str(), - "PROTOCOL-ERROR" - }))); - - views.removeView(sub.connId, sub.subId); - return; - } + handleReconcile(sub.connId, sub.subId, view->storageVector, view->initialMsg); view->initialMsg = ""; - - sendToConn(sub.connId, tao::json::to_string(tao::json::value::array({ - "NEG-MSG", - sub.subId.str(), - to_hex(resp) - }))); }; + while(1) { auto newMsgs = queries.running.empty() ? thr.inbox.pop_all() : thr.inbox.pop_all_no_wait(); @@ -148,19 +173,29 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { auto connId = msg->sub.connId; auto subId = msg->sub.subId; - if (!queries.addSub(txn, std::move(msg->sub))) { - sendNoticeError(connId, std::string("too many concurrent REQs")); - } + if (msg->sub.filterGroup.isFullDbQuery()) { + negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0); + handleReconcile(connId, subId, storage, msg->negPayload); - if (!views.addView(connId, subId, msg->idSize, msg->negPayload)) { - queries.removeSub(connId, subId); - sendNoticeError(connId, std::string("too many concurrent NEG requests")); - } + if (!views.addStatelessView(connId, subId, std::move(msg->sub))) { + queries.removeSub(connId, subId); + sendNoticeError(connId, std::string("too many concurrent NEG requests")); + } + } else { + if (!queries.addSub(txn, std::move(msg->sub))) { + sendNoticeError(connId, std::string("too many concurrent REQs")); + } - queries.process(txn); + if (!views.addMemoryView(connId, subId, msg->negPayload)) { + queries.removeSub(connId, subId); + sendNoticeError(connId, std::string("too many concurrent NEG requests")); + } + + queries.process(txn); + } } else if (auto msg = std::get_if(&newMsg.msg)) { - auto *view = views.findView(msg->connId, msg->subId); - if (!view) { + auto *userView = views.findView(msg->connId, msg->subId); + if (!userView) { sendToConn(msg->connId, tao::json::to_string(tao::json::value::array({ "NEG-ERR", msg->subId.str(), @@ -170,33 +205,15 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { continue; } - if (!view->ne.sealed) { + auto *view = std::get_if(userView); + if (!view) throw herr("bad variant, expected memory view"); + + if (!view->storageVector.sealed) { sendNoticeError(msg->connId, "negentropy error: got NEG-MSG before NEG-OPEN complete"); continue; } - std::string resp; - - try { - resp = view->ne.reconcile(msg->negPayload); - } catch (std::exception &e) { - LI << "[" << msg->connId << "] Error parsing negentropy continuation message: " << e.what(); - - sendToConn(msg->connId, tao::json::to_string(tao::json::value::array({ - "NEG-ERR", - msg->subId.str(), - "PROTOCOL-ERROR" - }))); - - views.removeView(msg->connId, msg->subId); - continue; - } - - sendToConn(msg->connId, tao::json::to_string(tao::json::value::array({ - "NEG-MSG", - msg->subId.str(), - to_hex(resp) - }))); + handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload); } else if (auto msg = std::get_if(&newMsg.msg)) { queries.removeSub(msg->connId, msg->subId); views.removeView(msg->connId, msg->subId); diff --git a/src/apps/relay/RelayServer.h b/src/apps/relay/RelayServer.h index cc57c69..de9b51e 100644 --- a/src/apps/relay/RelayServer.h +++ b/src/apps/relay/RelayServer.h @@ -122,7 +122,6 @@ struct MsgReqMonitor : NonCopyable { struct MsgNegentropy : NonCopyable { struct NegOpen { Subscription sub; - uint64_t idSize; std::string negPayload; }; diff --git a/src/events.cpp b/src/events.cpp index 3ab8267..34cb516 100644 --- a/src/events.cpp +++ b/src/events.cpp @@ -1,4 +1,6 @@ #include +#include +#include #include "events.h" @@ -228,8 +230,17 @@ std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t lev bool deleteEvent(lmdb::txn &txn, uint64_t levId) { + auto view = env.lookup_Event(txn, levId); + if (!view) return false; + auto *flat = view->flat_nested(); + + negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0); + negentropyStorage.erase(flat->created_at(), sv(flat->id())); + negentropyStorage.flush(); + bool deleted = env.dbi_EventPayload.del(txn, lmdb::to_sv(levId)); env.delete_Event(txn, levId); + return deleted; } @@ -246,6 +257,8 @@ void writeEvents(lmdb::txn &txn, std::vector &evs, uint64_t logLev std::vector levIdsToDelete; std::string tmpBuf; + negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0); + for (size_t i = 0; i < evs.size(); i++) { auto &ev = evs[i]; @@ -321,6 +334,8 @@ void writeEvents(lmdb::txn &txn, std::vector &evs, uint64_t logLev tmpBuf += ev.jsonStr; env.dbi_EventPayload.put(txn, lmdb::to_sv(ev.levId), tmpBuf); + negentropyStorage.insert(ev.createdAt(), ev.id()); + ev.status = EventWriteStatus::Written; // Deletions happen after event was written to ensure levIds are not reused @@ -331,4 +346,6 @@ void writeEvents(lmdb::txn &txn, std::vector &evs, uint64_t logLev if (levIdsToDelete.size()) throw herr("unprocessed deletion"); } + + negentropyStorage.flush(); } diff --git a/src/filters.h b/src/filters.h index 5c80ac9..e555e4c 100644 --- a/src/filters.h +++ b/src/filters.h @@ -198,6 +198,10 @@ struct NostrFilter { return true; } + + bool isFullDbQuery() { + return !ids && !authors && !kinds && tags.size() == 0 && limit == MAX_U64; + } }; struct NostrFilterGroup { @@ -242,4 +246,8 @@ struct NostrFilterGroup { size_t size() const { return filters.size(); } + + bool isFullDbQuery() { + return size() == 1 && filters[0].isFullDbQuery(); + } }; diff --git a/src/global.h b/src/global.h index e023f51..7aefbf4 100644 --- a/src/global.h +++ b/src/global.h @@ -16,3 +16,5 @@ uint64_t parseUint64(const std::string &s); std::string parseIP(const std::string &ip); uint64_t getDBVersion(lmdb::txn &txn); void exitOnSigPipe(); + +extern lmdb::dbi negentropyDbi; diff --git a/src/onAppStartup.cpp b/src/onAppStartup.cpp index 241da57..0a522ae 100644 --- a/src/onAppStartup.cpp +++ b/src/onAppStartup.cpp @@ -5,6 +5,8 @@ #include "golpe.h" +#include + static void dbCheck(lmdb::txn &txn, const std::string &cmd) { auto dbTooOld = [&](uint64_t ver) { @@ -65,8 +67,12 @@ static void setRLimits() { } +lmdb::dbi negentropyDbi; + void onAppStartup(lmdb::txn &txn, const std::string &cmd) { dbCheck(txn, cmd); setRLimits(); + + negentropyDbi = negentropy::storage::BTreeLMDB::setupDB(txn, "negentropy"); }