upgrade negentropy

This commit is contained in:
Doug Hoyte
2023-12-05 14:27:53 -05:00
parent 684d083c6b
commit eb24824b53
9 changed files with 161 additions and 91 deletions

View File

@ -1,6 +1,8 @@
#include <docopt.h>
#include <tao/json.hpp>
#include <Negentropy.h>
#include <negentropy.h>
#include <negentropy/storage/Vector.h>
#include <negentropy/storage/BTreeLMDB.h>
#include "golpe.h"
@ -40,18 +42,19 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
uint64_t frameSizeLimit = 60'000; // default frame limit is 128k. Halve that (hex encoding) and subtract a bit (JSON msg overhead)
if (args["--frame-size-limit"]) frameSizeLimit = args["--frame-size-limit"].asLong();
const uint64_t idSize = 16;
const bool doUp = dir == "both" || dir == "up";
const bool doDown = dir == "both" || dir == "down";
tao::json::value filter = tao::json::from_string(filterStr);
tao::json::value filterJson = tao::json::from_string(filterStr);
auto filterCompiled = NostrFilterGroup::unwrapped(filterJson);
Negentropy ne(idSize, frameSizeLimit);
bool isFullDbQuery = filterCompiled.isFullDbQuery();
negentropy::storage::Vector storageVector;
{
DBQuery query(filter);
if (!isFullDbQuery) {
DBQuery query(filterJson);
Decompressor decomp;
auto txn = env.txn_ro();
@ -72,14 +75,13 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
for (auto levId : levIds) {
auto ev = lookupEventByLevId(txn, levId);
PackedEventView packed(ev.buf);
ne.addItem(packed.created_at(), packed.id().substr(0, ne.idSize));
storageVector.insert(packed.created_at(), packed.id().substr(0, ne.idSize));
}
LI << "Filter matches " << numEvents << " events";
}
ne.seal();
storageVector.seal();
}
@ -91,13 +93,23 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
ws.reconnect = false;
ws.onConnect = [&]{
auto neMsg = to_hex(ne.initiate());
auto txn = env.txn_ro();
std::string neMsg;
if (isFullDbQuery) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0);
Negentropy ne(storageBtree, frameSizeLimit);
neMsg = ne.initiate();
} else {
Negentropy ne(storageVector, frameSizeLimit);
neMsg = ne.initiate();
}
ws.send(tao::json::to_string(tao::json::value::array({
"NEG-OPEN",
"N",
filter,
idSize,
neMsg,
filterJson,
to_hex(neMsg),
})));
};
@ -122,6 +134,7 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
ws.onMessage = [&](auto msgStr, uWS::OpCode opCode, size_t compressedSize){
try {
auto txn = env.txn_ro();
tao::json::value msg = tao::json::from_string(msgStr);
if (msg.at(0) == "NEG-MSG") {
@ -130,7 +143,18 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
std::optional<std::string> neMsg;
try {
neMsg = ne.reconcile(from_hex(msg.at(2).get_string()), have, need);
auto inputMsg = from_hex(msg.at(2).get_string());
if (isFullDbQuery) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0);
Negentropy ne(storageBtree, frameSizeLimit);
ne.setInitiator();
neMsg = ne.reconcile(inputMsg, have, need);
} else {
Negentropy ne(storageVector, frameSizeLimit);
ne.setInitiator();
neMsg = ne.reconcile(inputMsg, have, need);
}
} catch (std::exception &e) {
LE << "Unable to parse negentropy message from relay: " << e.what();
doExit(1);

View File

@ -168,12 +168,9 @@ void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp
Subscription sub(connId, arr[1].get_string(), std::move(filter));
uint64_t idSize = arr.at(3).get_unsigned();
if (idSize < 8 || idSize > 32) throw herr("idSize out of range");
std::string negPayload = from_hex(arr.at(3).get_string());
std::string negPayload = from_hex(arr.at(4).get_string());
tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), idSize, std::move(negPayload)}});
tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), std::move(negPayload)}});
} else if (arr.at(0) == "NEG-MSG") {
std::string negPayload = from_hex(arr.at(2).get_string());
tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegMsg{connId, SubId(arr[1].get_string()), std::move(negPayload)}});

View File

@ -1,21 +1,29 @@
#include <Negentropy.h>
#include <negentropy.h>
#include <negentropy/storage/Vector.h>
#include <negentropy/storage/BTreeLMDB.h>
#include "RelayServer.h"
#include "QueryScheduler.h"
struct NegentropyViews {
struct UserView {
Negentropy ne;
// View whose item set lives in an in-memory negentropy::storage::Vector.
// Member order matters: the view is created with positional aggregate
// initialisation (MemoryView{ initialMsg, }), so initialMsg must stay first.
struct MemoryView {
// Payload received with the open request; passed to the first reconcile once the query completes, then reset to "".
std::string initialMsg;
// Receives one (created_at, id) item per matched event and is sealed before reconciling.
negentropy::storage::Vector storageVector;
// levIds accumulated from query batches; cleared and shrunk after they have been loaded into storageVector.
std::vector<uint64_t> levIds;
// Timestamp in microseconds taken when the view is constructed; used to log how long the query took.
uint64_t startTime = hoytech::curr_time_us();
};
using ConnViews = flat_hash_map<SubId, UserView>;
flat_hash_map<uint64_t, ConnViews> conns; // connId -> subId -> Negentropy
// View alternative that carries only the Subscription and no in-memory item storage.
// NOTE(review): presumably intended for full-DB queries that reconcile against the
// LMDB-backed tree instead of a per-subscription vector -- confirm against the callers.
struct StatelessView {
Subscription sub;
};
bool addView(uint64_t connId, const SubId &subId, uint64_t idSize, const std::string &initialMsg) {
using UserView = std::variant<MemoryView, StatelessView>;
using ConnViews = flat_hash_map<SubId, UserView>;
flat_hash_map<uint64_t, ConnViews> conns; // connId -> subId -> UserView
bool addMemoryView(uint64_t connId, const SubId &subId, const std::string &initialMsg) {
{
auto *existing = findView(connId, subId);
if (existing) removeView(connId, subId);
@ -28,11 +36,15 @@ struct NegentropyViews {
return false;
}
connViews.try_emplace(subId, UserView{ Negentropy(idSize, 500'000), initialMsg });
connViews.try_emplace(subId, UserView{ MemoryView{ initialMsg, } });
return true;
}
// Registers a stateless view for (connId, subId).
// NOTE(review): this is currently a stub. Nothing is inserted into `conns`, `sub` is
// left untouched, and the function unconditionally reports success, so a later
// findView() for this (connId, subId) will not find anything added here.
bool addStatelessView(uint64_t connId, const SubId &subId, Subscription &&sub) {
return true;
}
UserView *findView(uint64_t connId, const SubId &subId) {
auto f1 = conns.find(connId);
if (f1 == conns.end()) return nullptr;
@ -63,11 +75,42 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
QueryScheduler queries;
NegentropyViews views;
// Runs one reconciliation round of `msg` against `storage` and reports the outcome to the connection:
//   - success: sends ["NEG-MSG", subId, <hex-encoded response>]
//   - reconcile() throws: logs the error, sends ["NEG-ERR", subId, "PROTOCOL-ERROR"] and removes the
//     (connId, subId) view.
// This helper serves the initial payload of a subscription as well as later continuation payloads,
// so the log line intentionally does not claim which kind of message failed.
// NOTE(review): `storage` and `msg` are frequently references into the view being removed, and callers
// may still hold a pointer to that view -- presumably none of these may be used after a failed call;
// confirm at every call site that touches the view after invoking this helper.
auto handleReconcile = [&](uint64_t connId, const SubId &subId, negentropy::StorageBase &storage, const std::string &msg) {
    std::string resp;

    try {
        // A fresh Negentropy object is built per round; only the storage carries state between rounds.
        Negentropy ne(storage, 500'000);
        resp = ne.reconcile(msg);
    } catch (const std::exception &e) {
        LI << "[" << connId << "] Error parsing negentropy message: " << e.what();

        sendToConn(connId, tao::json::to_string(tao::json::value::array({
            "NEG-ERR",
            subId.str(),
            "PROTOCOL-ERROR"
        })));

        views.removeView(connId, subId);
        return;
    }

    sendToConn(connId, tao::json::to_string(tao::json::value::array({
        "NEG-MSG",
        subId.str(),
        to_hex(resp)
    })));
};
queries.ensureExists = false;
queries.onEventBatch = [&](lmdb::txn &txn, const auto &sub, const std::vector<uint64_t> &levIds){
auto *view = views.findView(sub.connId, sub.subId);
if (!view) return;
auto *userView = views.findView(sub.connId, sub.subId);
if (!userView) return;
auto *view = std::get_if<NegentropyViews::MemoryView>(userView);
if (!view) throw herr("bad variant, expected memory view");
for (auto levId : levIds) {
view->levIds.push_back(levId);
@ -75,8 +118,11 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
};
queries.onComplete = [&](lmdb::txn &txn, Subscription &sub){
auto *view = views.findView(sub.connId, sub.subId);
if (!view) return;
auto *userView = views.findView(sub.connId, sub.subId);
if (!userView) return;
auto *view = std::get_if<NegentropyViews::MemoryView>(userView);
if (!view) throw herr("bad variant, expected memory view");
LI << "[" << sub.connId << "] Negentropy query matched " << view->levIds.size() << " events in "
<< (hoytech::curr_time_us() - view->startTime) << "us";
@ -100,8 +146,7 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
for (auto levId : view->levIds) {
try {
auto ev = lookupEventByLevId(txn, levId);
auto packed = PackedEventView(ev.buf);
view->ne.addItem(packed.created_at(), packed.id().substr(0, view->ne.idSize));
view->storageVector.insert(packed.created_at(), packed.id().substr(0, view->ne.idSize));
} catch (std::exception &) {
// levId was deleted when query was paused
}
@ -110,34 +155,14 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
view->levIds.clear();
view->levIds.shrink_to_fit();
view->ne.seal();
view->storageVector.seal();
std::string resp;
try {
resp = view->ne.reconcile(view->initialMsg);
} catch (std::exception &e) {
LI << "[" << sub.connId << "] Error parsing negentropy initial message: " << e.what();
sendToConn(sub.connId, tao::json::to_string(tao::json::value::array({
"NEG-ERR",
sub.subId.str(),
"PROTOCOL-ERROR"
})));
views.removeView(sub.connId, sub.subId);
return;
}
handleReconcile(sub.connId, sub.subId, view->storageVector, view->initialMsg);
view->initialMsg = "";
sendToConn(sub.connId, tao::json::to_string(tao::json::value::array({
"NEG-MSG",
sub.subId.str(),
to_hex(resp)
})));
};
while(1) {
auto newMsgs = queries.running.empty() ? thr.inbox.pop_all() : thr.inbox.pop_all_no_wait();
@ -148,19 +173,29 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
auto connId = msg->sub.connId;
auto subId = msg->sub.subId;
if (msg->sub.filterGroup.isFullDbQuery()) {
negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0);
handleReconcile(connId, subId, storage, msg->negPayload);
if (!views.addStatelessView(connId, subId, std::move(msg->sub))) {
queries.removeSub(connId, subId);
sendNoticeError(connId, std::string("too many concurrent NEG requests"));
}
} else {
if (!queries.addSub(txn, std::move(msg->sub))) {
sendNoticeError(connId, std::string("too many concurrent REQs"));
}
if (!views.addView(connId, subId, msg->idSize, msg->negPayload)) {
if (!views.addMemoryView(connId, subId, msg->negPayload)) {
queries.removeSub(connId, subId);
sendNoticeError(connId, std::string("too many concurrent NEG requests"));
}
queries.process(txn);
}
} else if (auto msg = std::get_if<MsgNegentropy::NegMsg>(&newMsg.msg)) {
auto *view = views.findView(msg->connId, msg->subId);
if (!view) {
auto *userView = views.findView(msg->connId, msg->subId);
if (!userView) {
sendToConn(msg->connId, tao::json::to_string(tao::json::value::array({
"NEG-ERR",
msg->subId.str(),
@ -170,33 +205,15 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
continue;
}
if (!view->ne.sealed) {
auto *view = std::get_if<NegentropyViews::MemoryView>(userView);
if (!view) throw herr("bad variant, expected memory view");
if (!view->storageVector.sealed) {
sendNoticeError(msg->connId, "negentropy error: got NEG-MSG before NEG-OPEN complete");
continue;
}
std::string resp;
try {
resp = view->ne.reconcile(msg->negPayload);
} catch (std::exception &e) {
LI << "[" << msg->connId << "] Error parsing negentropy continuation message: " << e.what();
sendToConn(msg->connId, tao::json::to_string(tao::json::value::array({
"NEG-ERR",
msg->subId.str(),
"PROTOCOL-ERROR"
})));
views.removeView(msg->connId, msg->subId);
continue;
}
sendToConn(msg->connId, tao::json::to_string(tao::json::value::array({
"NEG-MSG",
msg->subId.str(),
to_hex(resp)
})));
handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload);
} else if (auto msg = std::get_if<MsgNegentropy::NegClose>(&newMsg.msg)) {
queries.removeSub(msg->connId, msg->subId);
views.removeView(msg->connId, msg->subId);

View File

@ -122,7 +122,6 @@ struct MsgReqMonitor : NonCopyable {
struct MsgNegentropy : NonCopyable {
struct NegOpen {
Subscription sub;
uint64_t idSize;
std::string negPayload;
};

View File

@ -1,4 +1,6 @@
#include <openssl/sha.h>
#include <negentropy.h>
#include <negentropy/storage/BTreeLMDB.h>
#include "events.h"
@ -228,8 +230,17 @@ std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t lev
// Deletes the event stored under `levId`: its entry in the negentropy tree, its
// payload record and the event record itself.
// Returns false when no event exists for `levId`; otherwise returns the result of
// deleting the payload record.
// NOTE(review): this opens its own BTreeLMDB handle on `txn`. If a caller already
// holds another handle on the same txn with changes it has not flushed yet,
// confirm that the two handles cannot overwrite each other's updates.
bool deleteEvent(lmdb::txn &txn, uint64_t levId) {
    auto eventView = env.lookup_Event(txn, levId);
    if (!eventView) return false;

    // The negentropy entry is keyed by (created_at, id), so it is erased first,
    // while the event record is still available to read those fields from.
    auto *flatEvent = eventView->flat_nested();

    negentropy::storage::BTreeLMDB tree(txn, negentropyDbi, 0);
    tree.erase(flatEvent->created_at(), sv(flatEvent->id()));
    tree.flush();

    const bool payloadRemoved = env.dbi_EventPayload.del(txn, lmdb::to_sv<uint64_t>(levId));
    env.delete_Event(txn, levId);

    return payloadRemoved;
}
@ -246,6 +257,8 @@ void writeEvents(lmdb::txn &txn, std::vector<EventToWrite> &evs, uint64_t logLev
std::vector<uint64_t> levIdsToDelete;
std::string tmpBuf;
negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0);
for (size_t i = 0; i < evs.size(); i++) {
auto &ev = evs[i];
@ -321,6 +334,8 @@ void writeEvents(lmdb::txn &txn, std::vector<EventToWrite> &evs, uint64_t logLev
tmpBuf += ev.jsonStr;
env.dbi_EventPayload.put(txn, lmdb::to_sv<uint64_t>(ev.levId), tmpBuf);
negentropyStorage.insert(ev.createdAt(), ev.id());
ev.status = EventWriteStatus::Written;
// Deletions happen after event was written to ensure levIds are not reused
@ -331,4 +346,6 @@ void writeEvents(lmdb::txn &txn, std::vector<EventToWrite> &evs, uint64_t logLev
if (levIdsToDelete.size()) throw herr("unprocessed deletion");
}
negentropyStorage.flush();
}

View File

@ -198,6 +198,10 @@ struct NostrFilter {
return true;
}
// True when the filter sets none of ids/authors/kinds, has no tag conditions,
// and its limit is still MAX_U64.
// NOTE(review): only the fields tested below are considered. If the filter can
// carry other constraints (e.g. a time range), confirm whether those should
// disqualify it as well.
bool isFullDbQuery() {
    const bool noFieldConstraints = !ids && !authors && !kinds;
    const bool noTagConstraints = tags.size() == 0;
    const bool limitUntouched = limit == MAX_U64;

    return noFieldConstraints && noTagConstraints && limitUntouched;
}
};
struct NostrFilterGroup {
@ -242,4 +246,8 @@ struct NostrFilterGroup {
// Number of filters held in this group.
size_t size() const {
return filters.size();
}
// A group is a full-DB query only when it holds exactly one filter and that
// filter is itself a full-DB query.
bool isFullDbQuery() {
    if (size() != 1) return false;

    return filters[0].isFullDbQuery();
}
};

View File

@ -16,3 +16,5 @@ uint64_t parseUint64(const std::string &s);
std::string parseIP(const std::string &ip);
uint64_t getDBVersion(lmdb::txn &txn);
void exitOnSigPipe();
extern lmdb::dbi negentropyDbi;

View File

@ -5,6 +5,8 @@
#include "golpe.h"
#include <negentropy/storage/BTreeLMDB.h>
static void dbCheck(lmdb::txn &txn, const std::string &cmd) {
auto dbTooOld = [&](uint64_t ver) {
@ -65,8 +67,12 @@ static void setRLimits() {
}
lmdb::dbi negentropyDbi;
// Runs the startup steps in order: the DB check for `cmd`, the rlimit setup, and
// finally the setup of the "negentropy" DB inside `txn`.
void onAppStartup(lmdb::txn &txn, const std::string &cmd) {
dbCheck(txn, cmd);
setRLimits();
// Store the handle in the global negentropyDbi so other translation units can open the tree through it.
negentropyDbi = negentropy::storage::BTreeLMDB::setupDB(txn, "negentropy");
}