negentropy integration

This commit is contained in:
Doug Hoyte
2023-05-01 17:51:14 -04:00
parent 371f95bce3
commit de475c59ff
10 changed files with 310 additions and 2 deletions

View File

@ -5,11 +5,13 @@
struct QueryScheduler : NonCopyable {
std::function<void(lmdb::txn &txn, const Subscription &sub, uint64_t levId, std::string_view eventPayload)> onEvent;
std::function<void(lmdb::txn &txn, const Subscription &sub, const std::vector<uint64_t> &levIds)> onEventBatch;
std::function<void(Subscription &sub)> onComplete;
using ConnQueries = flat_hash_map<SubId, DBQuery*>;
flat_hash_map<uint64_t, ConnQueries> conns; // connId -> subId -> DBQuery*
std::deque<DBQuery*> running;
std::vector<uint64_t> levIdBatch;
bool addSub(lmdb::txn &txn, Subscription &&sub) {
sub.latestEventId = getMostRecentLevId(txn);
@ -73,14 +75,20 @@ struct QueryScheduler : NonCopyable {
}
bool complete = q->process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){
onEvent(txn, sub, levId, eventPayload);
if (onEvent) onEvent(txn, sub, levId, eventPayload);
if (onEventBatch) levIdBatch.push_back(levId);
}, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf);
if (onEventBatch) {
onEventBatch(txn, q->sub, levIdBatch);
levIdBatch.clear();
}
if (complete) {
auto connId = q->sub.connId;
removeSub(connId, q->sub.subId);
onComplete(q->sub);
if (onComplete) onComplete(q->sub);
delete q;
} else {

View File

@ -50,6 +50,12 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
} catch (std::exception &e) {
sendNoticeError(msg->connId, std::string("bad close: ") + e.what());
}
} else if (cmd.starts_with("NEG-")) {
try {
ingesterProcessNegentropy(txn, msg->connId, arr);
} catch (std::exception &e) {
sendNoticeError(msg->connId, std::string("bad negentropy: ") + e.what());
}
} else {
throw herr("unknown cmd");
}
@ -65,6 +71,7 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
} else if (auto msg = std::get_if<MsgIngester::CloseConn>(&newMsg.msg)) {
auto connId = msg->connId;
tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::CloseConn{connId}});
tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::CloseConn{connId}});
}
}
@ -107,3 +114,25 @@ void RelayServer::ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const ta
tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::RemoveSub{connId, SubId(arr[1].get_string())}});
}
void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) {
if (arr.at(0) == "NEG-OPEN") {
if (arr.get_array().size() < 5) throw herr("negentropy query missing elements");
Subscription sub(connId, arr[1].get_string(), NostrFilterGroup::unwrapped(arr.at(2)));
uint64_t idSize = arr.at(3).get_unsigned();
if (idSize < 8 || idSize > 32) throw herr("idSize out of range");
std::string negPayload = from_hex(arr.at(4).get_string());
tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), idSize, std::move(negPayload)}});
} else if (arr.at(0) == "NEG-MSG") {
std::string negPayload = from_hex(arr.at(2).get_string());
tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegMsg{connId, SubId(arr[1].get_string()), std::move(negPayload)}});
} else if (arr.at(0) == "NEG-CLOSE") {
tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegClose{connId, SubId(arr[1].get_string())}});
} else {
throw herr("unknown command");
}
}

143
src/RelayNegentropy.cpp Normal file
View File

@ -0,0 +1,143 @@
#include "Negentropy.h"
#include "RelayServer.h"
#include "DBQuery.h"
#include "QueryScheduler.h"
struct NegentropyViews {
struct UserView {
Negentropy ne;
std::string initialMsg;
};
using ConnViews = flat_hash_map<SubId, UserView>;
flat_hash_map<uint64_t, ConnViews> conns; // connId -> subId -> Negentropy
bool addView(uint64_t connId, const SubId &subId, uint64_t idSize, const std::string &initialMsg) {
{
auto *existing = findView(connId, subId);
if (existing) removeView(connId, subId);
}
auto res = conns.try_emplace(connId);
auto &connViews = res.first->second;
if (connViews.size() >= cfg().relay__maxSubsPerConnection) {
return false;
}
connViews.try_emplace(subId, UserView{ Negentropy(idSize), initialMsg });
return true;
}
UserView *findView(uint64_t connId, const SubId &subId) {
auto f1 = conns.find(connId);
if (f1 == conns.end()) return nullptr;
auto f2 = f1->second.find(subId);
if (f2 == f1->second.end()) return nullptr;
return &f2->second;
}
void removeView(uint64_t connId, const SubId &subId) {
auto *view = findView(connId, subId);
if (!view) return;
conns[connId].erase(subId);
if (conns[connId].empty()) conns.erase(connId);
}
void closeConn(uint64_t connId) {
auto f1 = conns.find(connId);
if (f1 == conns.end()) return;
conns.erase(connId);
}
};
void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
QueryScheduler queries;
NegentropyViews views;
queries.onEventBatch = [&](lmdb::txn &txn, const auto &sub, const std::vector<uint64_t> &levIds){
auto *view = views.findView(sub.connId, sub.subId);
if (!view) return;
for (auto levId : levIds) {
auto ev = lookupEventByLevId(txn, levId);
view->ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, view->ne.idSize));
}
};
queries.onComplete = [&](Subscription &sub){
auto *view = views.findView(sub.connId, sub.subId);
if (!view) return;
view->ne.seal();
auto resp = view->ne.reconcile(view->initialMsg);
view->initialMsg = "";
sendToConn(sub.connId, tao::json::to_string(tao::json::value::array({
"NEG-MSG",
sub.subId.str(),
to_hex(resp)
})));
};
while(1) {
auto newMsgs = queries.running.empty() ? thr.inbox.pop_all() : thr.inbox.pop_all_no_wait();
auto txn = env.txn_ro();
for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgNegentropy::NegOpen>(&newMsg.msg)) {
auto connId = msg->sub.connId;
auto subId = msg->sub.subId;
if (!queries.addSub(txn, std::move(msg->sub))) {
sendNoticeError(connId, std::string("too many concurrent REQs"));
}
if (!views.addView(connId, subId, msg->idSize, msg->negPayload)) {
queries.removeSub(connId, subId);
sendNoticeError(connId, std::string("too many concurrent NEG requests"));
}
queries.process(txn);
} else if (auto msg = std::get_if<MsgNegentropy::NegMsg>(&newMsg.msg)) {
auto *view = views.findView(msg->connId, msg->subId);
if (!view) {
sendToConn(msg->connId, tao::json::to_string(tao::json::value::array({
"NEG-ERR",
msg->subId.str(),
"CLOSED"
})));
return;
}
auto resp = view->ne.reconcile(view->initialMsg);
sendToConn(msg->connId, tao::json::to_string(tao::json::value::array({
"NEG-MSG",
msg->subId.str(),
to_hex(resp)
})));
} else if (auto msg = std::get_if<MsgNegentropy::NegClose>(&newMsg.msg)) {
queries.removeSub(msg->connId, msg->subId);
views.removeView(msg->connId, msg->subId);
} else if (auto msg = std::get_if<MsgNegentropy::CloseConn>(&newMsg.msg)) {
queries.closeConn(msg->connId);
views.closeConn(msg->connId);
}
}
queries.process(txn);
txn.abort();
}
}

View File

@ -112,6 +112,32 @@ struct MsgReqMonitor : NonCopyable {
MsgReqMonitor(Var &&msg_) : msg(std::move(msg_)) {}
};
struct MsgNegentropy : NonCopyable {
struct NegOpen {
Subscription sub;
uint64_t idSize;
std::string negPayload;
};
struct NegMsg {
uint64_t connId;
SubId subId;
std::string negPayload;
};
struct NegClose {
uint64_t connId;
SubId subId;
};
struct CloseConn {
uint64_t connId;
};
using Var = std::variant<NegOpen, NegMsg, NegClose, CloseConn>;
Var msg;
MsgNegentropy(Var &&msg_) : msg(std::move(msg_)) {}
};
struct RelayServer {
@ -124,6 +150,7 @@ struct RelayServer {
ThreadPool<MsgWriter> tpWriter;
ThreadPool<MsgReqWorker> tpReqWorker;
ThreadPool<MsgReqMonitor> tpReqMonitor;
ThreadPool<MsgNegentropy> tpNegentropy;
std::thread cronThread;
void run();
@ -134,6 +161,7 @@ struct RelayServer {
void ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::string ipAddr, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector<MsgWriter> &output);
void ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
void ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
void ingesterProcessNegentropy(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
void runWriter(ThreadPool<MsgWriter>::Thread &thr);
@ -141,6 +169,8 @@ struct RelayServer {
void runReqMonitor(ThreadPool<MsgReqMonitor>::Thread &thr);
void runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr);
void runCron();
// Utils (can be called by any thread)

View File

@ -28,6 +28,10 @@ void RelayServer::run() {
runReqMonitor(thr);
});
tpNegentropy.init("Negentropy", cfg().relay__numThreads__negentropy, [this](auto &thr){
runNegentropy(thr);
});
cronThread = std::thread([this]{
runCron();
});