From a93ad7ff3db286a9f66bdbc8031ff6b6b0d03110 Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Fri, 3 Mar 2023 02:16:57 -0500 Subject: [PATCH] wip --- golpe.yaml | 4 + src/QueryScheduler.h | 12 ++- src/RelayIngester.cpp | 24 +++++ src/RelayServer.h | 31 ++++++ src/RelayXor.cpp | 231 ++++++++++++++++++++++++++++++++++++++++++ src/cmd_relay.cpp | 4 + src/transport.h | 54 ++++++++++ 7 files changed, 358 insertions(+), 2 deletions(-) create mode 100644 src/RelayXor.cpp create mode 100644 src/transport.h diff --git a/golpe.yaml b/golpe.yaml index 5a29e20..71393f8 100644 --- a/golpe.yaml +++ b/golpe.yaml @@ -211,6 +211,10 @@ config: desc: yesstr threads: Experimental yesstr protocol default: 1 noReload: true + - name: relay__numThreads__xor + desc: xor threads: Experimental XOR protocol + default: 3 + noReload: true - name: events__maxEventSize desc: "Maximum size of normalised JSON, in bytes" diff --git a/src/QueryScheduler.h b/src/QueryScheduler.h index ee17e34..be2269d 100644 --- a/src/QueryScheduler.h +++ b/src/QueryScheduler.h @@ -5,11 +5,13 @@ struct QueryScheduler : NonCopyable { std::function onEvent; + std::function &levIds)> onEventBatch; std::function onComplete; using ConnQueries = flat_hash_map; flat_hash_map conns; // connId -> subId -> DBQuery* std::deque running; + std::vector levIdBatch; bool addSub(lmdb::txn &txn, Subscription &&sub) { sub.latestEventId = getMostRecentLevId(txn); @@ -73,14 +75,20 @@ struct QueryScheduler : NonCopyable { } bool complete = q->process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){ - onEvent(txn, sub, levId, eventPayload); + if (onEvent) onEvent(txn, sub, levId, eventPayload); + if (onEventBatch) levIdBatch.push_back(levId); }, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf); + if (onEventBatch) { + onEventBatch(txn, q->sub, levIdBatch); + levIdBatch.clear(); + } + if (complete) { auto connId = q->sub.connId; removeSub(connId, q->sub.subId); - onComplete(q->sub); + if (onComplete) onComplete(q->sub); delete q; } else { diff --git a/src/RelayIngester.cpp b/src/RelayIngester.cpp index dc8139a..0456e93 100644 --- a/src/RelayIngester.cpp +++ b/src/RelayIngester.cpp @@ -50,6 +50,12 @@ void RelayServer::runIngester(ThreadPool::Thread &thr) { } catch (std::exception &e) { sendNoticeError(msg->connId, std::string("bad close: ") + e.what()); } + } else if (cmd.starts_with("XOR-")) { + try { + ingesterProcessXor(txn, msg->connId, arr); + } catch (std::exception &e) { + sendNoticeError(msg->connId, std::string("bad xor: ") + e.what()); + } } else { throw herr("unknown cmd"); } @@ -73,6 +79,7 @@ void RelayServer::runIngester(ThreadPool::Thread &thr) { auto connId = msg->connId; tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::CloseConn{connId}}); tpYesstr.dispatch(connId, MsgYesstr{MsgYesstr::CloseConn{connId}}); + tpXor.dispatch(connId, MsgXor{MsgXor::CloseConn{connId}}); } } @@ -115,3 +122,20 @@ void RelayServer::ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const ta tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::RemoveSub{connId, SubId(arr[1].get_string())}}); } + +void RelayServer::ingesterProcessXor(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) { + if (arr.at(0) == "XOR-OPEN") { + if (arr.get_array().size() < 5) throw herr("xor arr too small"); + + Subscription sub(connId, arr[1].get_string(), NostrFilterGroup::unwrapped(arr.at(2))); + + uint64_t idSize = arr.at(3).get_unsigned(); + if (idSize < 8 || idSize > 32) throw herr("idSize out of range"); + + std::string query = from_hex(arr.at(4).get_string()); + + tpXor.dispatch(connId, MsgXor{MsgXor::NewView{std::move(sub), idSize, std::move(query)}}); + } else { + throw herr("unknown command"); + } +} diff --git a/src/RelayServer.h b/src/RelayServer.h index deba101..fc56f6c 100644 --- a/src/RelayServer.h +++ b/src/RelayServer.h @@ -129,6 +129,33 @@ struct MsgYesstr : NonCopyable { MsgYesstr(Var &&msg_) : msg(std::move(msg_)) {} }; +struct MsgXor : NonCopyable { + struct NewView { + Subscription sub; + uint64_t idSize; + std::string query; + }; + + struct QueryView { + uint64_t connId; + SubId subId; + uint64_t queryId; + std::string query; + }; + + struct RemoveView { + uint64_t connId; + SubId subId; + }; + + struct CloseConn { + uint64_t connId; + }; + + using Var = std::variant; + Var msg; + MsgXor(Var &&msg_) : msg(std::move(msg_)) {} +}; struct RelayServer { std::unique_ptr hubTrigger; @@ -141,6 +168,7 @@ struct RelayServer { ThreadPool tpReqWorker; ThreadPool tpReqMonitor; ThreadPool tpYesstr; + ThreadPool tpXor; std::thread cronThread; void run(); @@ -151,6 +179,7 @@ struct RelayServer { void ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::string ipAddr, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector &output); void ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson); void ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson); + void ingesterProcessXor(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson); void runWriter(ThreadPool::Thread &thr); @@ -160,6 +189,8 @@ struct RelayServer { void runYesstr(ThreadPool::Thread &thr); + void runXor(ThreadPool::Thread &thr); + void runCron(); // Utils (can be called by any thread) diff --git a/src/RelayXor.cpp b/src/RelayXor.cpp new file mode 100644 index 0000000..ab17d02 --- /dev/null +++ b/src/RelayXor.cpp @@ -0,0 +1,231 @@ +#include "RelayServer.h" +#include "DBQuery.h" +#include "QueryScheduler.h" +#include "transport.h" + + +struct XorViews { + struct Elem { + char data[5 * 8]; + + Elem() { + memset(data, '\0', sizeof(data)); + } + + Elem(uint64_t created, std::string_view id, uint64_t idSize) { + memset(data, '\0', sizeof(data)); + data[3] = (created >> (4*8)) & 0xFF; + data[4] = (created >> (3*8)) & 0xFF; + data[5] = (created >> (2*8)) & 0xFF; + data[6] = (created >> (1*8)) & 0xFF; + data[7] = (created >> (0*8)) & 0xFF; + memcpy(data + 8, id.data(), idSize); + } + + Elem(std::string_view id) { + memset(data, '\0', sizeof(data)); + memcpy(data + 3, id.data(), id.size()); + } + + std::string_view getCompare(uint64_t idSize) const { + return std::string_view(data + 3, idSize + 5); + } + + std::string_view getId(uint64_t idSize) const { + return std::string_view(data + 8, idSize); + } + + bool isZero() { + uint64_t *ours = reinterpret_cast(data + 8); + return ours[0] == 0 && ours[1] == 0 && ours[2] == 0 && ours[3] == 0; + } + + void doXor(const Elem &other) { + uint64_t *ours = reinterpret_cast(data + 8); + const uint64_t *theirs = reinterpret_cast(other.data + 8); + + ours[0] ^= theirs[0]; + ours[1] ^= theirs[1]; + ours[2] ^= theirs[2]; + ours[3] ^= theirs[3]; + } + }; + + struct View { + uint64_t connId; + SubId subId; + uint64_t idSize; + std::string initialQuery; + + std::vector elems; + bool ready = false; + + View(uint64_t connId, SubId subId, uint64_t idSize, const std::string &initialQuery) : connId(connId), subId(subId), idSize(idSize), initialQuery(initialQuery) { + if (idSize < 8 || idSize > 32) throw herr("idSize out of range"); + } + + void addElem(uint64_t createdAt, std::string_view id) { + elems.emplace_back(createdAt, id, idSize); + } + + void finalise() { + std::reverse(elems.begin(), elems.end()); // pushed in approximately descending order, so hopefully this speeds up the sort + + std::sort(elems.begin(), elems.end(), [&](const auto &a, const auto &b) { return a.getCompare(idSize) < b.getCompare(idSize); }); + + ready = true; + + handleQuery(initialQuery); + initialQuery = ""; + } + + void handleQuery(std::string_view query) { // FIXME: this can throw + std::string output; + std::vector idsToSend; + + auto cmp = [&](const auto &a, const auto &b){ return a.getCompare(idSize) < b.getCompare(idSize); }; + + while (query.size()) { + uint64_t lowerLength = decodeVarInt(query); + if (lowerLength > idSize + 5) throw herr("lower too long"); + Elem lowerKey(getBytes(query, lowerLength)); + + uint64_t upperLength = decodeVarInt(query); + if (upperLength > idSize + 5) throw herr("upper too long"); + Elem upperKey(getBytes(query, upperLength)); + + std::string xorSet = getBytes(query, idSize); + + auto lower = std::lower_bound(elems.begin(), elems.end(), lowerKey, cmp); + auto upper = std::upper_bound(elems.begin(), elems.end(), upperKey, cmp); + + Elem myXorSet; + for (auto i = lower; i < upper; ++i) myXorSet.doXor(*i); + } + + /* + sendToConn(sub.connId, tao::json::to_string(tao::json::value::array({ + "XOR-RES", + sub.subId.str(), + to_hex(view->xorRange(0, view->elems.size())), + view->elems.size(), + }))); + */ + } + + std::string xorRange(uint64_t start, uint64_t len) { + Elem output; + + for (uint64_t i = 0; i < len; i++) { + output.doXor(elems[i]); + } + + return std::string(output.getId(idSize)); + } + }; + + using ConnViews = flat_hash_map; + flat_hash_map conns; // connId -> subId -> View + + bool addView(uint64_t connId, const SubId &subId, uint64_t idSize, const std::string &query) { + { + auto *existing = findView(connId, subId); + if (existing) removeView(connId, subId); + } + + auto res = conns.try_emplace(connId); + auto &connViews = res.first->second; + + if (connViews.size() >= cfg().relay__maxSubsPerConnection) { + return false; + } + + connViews.try_emplace(subId, connId, subId, idSize, query); + + return true; + } + + View *findView(uint64_t connId, const SubId &subId) { + auto f1 = conns.find(connId); + if (f1 == conns.end()) return nullptr; + + auto f2 = f1->second.find(subId); + if (f2 == f1->second.end()) return nullptr; + + return &f2->second; + } + + void removeView(uint64_t connId, const SubId &subId) { + auto *view = findView(connId, subId); + if (!view) return; + conns[connId].erase(subId); + if (conns[connId].empty()) conns.erase(connId); + } + + void closeConn(uint64_t connId) { + auto f1 = conns.find(connId); + if (f1 == conns.end()) return; + + conns.erase(connId); + } +}; + + +void RelayServer::runXor(ThreadPool::Thread &thr) { + QueryScheduler queries; + XorViews views; + + queries.onEventBatch = [&](lmdb::txn &txn, const auto &sub, const std::vector &levIds){ + auto *view = views.findView(sub.connId, sub.subId); + if (!view) return; + + for (auto levId : levIds) { + auto ev = lookupEventByLevId(txn, levId); + view->addElem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, view->idSize)); + } + }; + + queries.onComplete = [&](Subscription &sub){ + auto *view = views.findView(sub.connId, sub.subId); + if (!view) return; + + view->finalise(); + }; + + while(1) { + auto newMsgs = queries.running.empty() ? thr.inbox.pop_all() : thr.inbox.pop_all_no_wait(); + + auto txn = env.txn_ro(); + + for (auto &newMsg : newMsgs) { + if (auto msg = std::get_if(&newMsg.msg)) { + auto connId = msg->sub.connId; + auto subId = msg->sub.subId; + + if (!queries.addSub(txn, std::move(msg->sub))) { + sendNoticeError(connId, std::string("too many concurrent REQs")); + } + + if (!views.addView(connId, subId, msg->idSize, msg->query)) { + queries.removeSub(connId, subId); + sendNoticeError(connId, std::string("too many concurrent XORs")); + } + + queries.process(txn); + } else if (auto msg = std::get_if(&newMsg.msg)) { + (void)msg; + //... + } else if (auto msg = std::get_if(&newMsg.msg)) { + queries.removeSub(msg->connId, msg->subId); + views.removeView(msg->connId, msg->subId); + } else if (auto msg = std::get_if(&newMsg.msg)) { + queries.closeConn(msg->connId); + views.closeConn(msg->connId); + } + } + + queries.process(txn); + + txn.abort(); + } +} diff --git a/src/cmd_relay.cpp b/src/cmd_relay.cpp index f62ce3f..ac723df 100644 --- a/src/cmd_relay.cpp +++ b/src/cmd_relay.cpp @@ -32,6 +32,10 @@ void RelayServer::run() { runYesstr(thr); }); + tpXor.init("Xor", cfg().relay__numThreads__xor, [this](auto &thr){ + runXor(thr); + }); + cronThread = std::thread([this]{ runCron(); }); diff --git a/src/transport.h b/src/transport.h new file mode 100644 index 0000000..d5b9693 --- /dev/null +++ b/src/transport.h @@ -0,0 +1,54 @@ +#pragma once + +#include +#include + +#include "golpe.h" + + +inline unsigned char getByte(std::string_view &encoded){ + if (encoded.size() < 1) throw herr("parse ends prematurely"); + auto res = static_cast(encoded[0]); + encoded = encoded.substr(1); + return res; +}; + +inline std::string getBytes(std::string_view &encoded, size_t n) { + if (encoded.size() < n) throw herr("parse ends prematurely"); + auto res = encoded.substr(0, n); + encoded = encoded.substr(n); + return std::string(res); +}; + +inline std::string encodeVarInt(uint64_t n) { + if (n == 0) return std::string(1, '\0'); + + std::string o; + + while (n) { + o.push_back(static_cast(n & 0x7F)); + n >>= 7; + } + + std::reverse(o.begin(), o.end()); + + for (size_t i = 0; i < o.size() - 1; i++) { + o[i] |= 0x80; + } + + return o; +} + +inline uint64_t decodeVarInt(std::string_view &encoded) { + uint64_t res = 0; + + while (1) { + if (encoded.size() == 0) throw herr("premature end of varint"); + uint64_t byte = encoded[0]; + encoded = encoded.substr(1); + res = (res << 7) | (byte & 0b0111'1111); + if ((byte & 0b1000'0000) == 0) break; + } + + return res; +}