From de475c59ff48bdabb2a211e91ca6e13848c0727b Mon Sep 17 00:00:00 2001
From: Doug Hoyte
Date: Mon, 1 May 2023 17:51:14 -0400
Subject: [PATCH] negentropy integration

---
 .gitmodules             |   3 +
 Makefile                |   1 +
 docs/negentropy.md      |  85 ++++++++++++++++++++++++
 external/negentropy     |   1 +
 golpe.yaml              |   4 ++
 src/QueryScheduler.h    |  12 +++-
 src/RelayIngester.cpp   |  29 ++++++++
 src/RelayNegentropy.cpp | 143 ++++++++++++++++++++++++++++++++++++++++
 src/RelayServer.h       |  30 +++++++++
 src/cmd_relay.cpp       |   4 ++
 10 files changed, 310 insertions(+), 2 deletions(-)
 create mode 100644 docs/negentropy.md
 create mode 160000 external/negentropy
 create mode 100644 src/RelayNegentropy.cpp

diff --git a/.gitmodules b/.gitmodules
index fd042f5..e59bb8c 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,6 @@
 [submodule "golpe"]
 	path = golpe
 	url = https://github.com/hoytech/golpe.git
+[submodule "external/negentropy"]
+	path = external/negentropy
+	url = https://github.com/hoytech/negentropy.git
diff --git a/Makefile b/Makefile
index 87d74c9..f448aa0 100644
--- a/Makefile
+++ b/Makefile
@@ -4,3 +4,4 @@ OPT = -O3 -g
 include golpe/rules.mk
 
 LDLIBS += -lsecp256k1 -lb2 -lzstd
+INCS += -Iexternal/negentropy/cpp
diff --git a/docs/negentropy.md b/docs/negentropy.md
new file mode 100644
index 0000000..2cfd3b7
--- /dev/null
+++ b/docs/negentropy.md
@@ -0,0 +1,85 @@
+# Negentropy Syncing
+
+This document describes a nostr protocol extension for syncing nostr events. It works for both client-relay and relay-relay scenarios. If both sides of the sync have events in common, this protocol will use less bandwidth than transferring the full set of events (or even just their IDs).
+
+It is a nostr-friendly wrapper around the [Negentropy](https://github.com/hoytech/negentropy) protocol.
+
+## High-Level Protocol Description
+
+We're going to call the two sides engaged in the sync the client and the relay (even though the initiator could be another relay instead of a client).
+
+1. Client (initiator) chooses a nostr filter, and retrieves the set of events that it has locally that match this filter.
+    * Client creates a `Negentropy` object, adds all events to it, seals it, and then calls `initiate()` to create the initial message.
+1. Client sends a `NEG-OPEN` message to the relay, which includes the filter and the initial message.
+1. Relay selects the set of events that it has locally that match the filter.
+    * Relay creates a `Negentropy` object, adds all events to it, and seals it.
+1. Relay calls `reconcile()` on its `Negentropy` object, and returns the result as a `NEG-MSG` answer to the client.
+1. Client calls `reconcile()` on its `Negentropy` object using the value sent by the relay.
+    1. This call returns `have` and `need` arrays, which contain nostr IDs (or ID prefixes, if `idSize < 32`) that should be uploaded and downloaded, respectively.
+    1. If the empty string is returned, the sync is complete.
+    1. Otherwise, the result is sent back to the relay in another `NEG-MSG`. Goto step 4.
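+
+As a non-normative illustration, the initiator's loop might look like the following sketch. It assumes the C++ `Negentropy` class from the bundled submodule (the same `addItem()`/`seal()`/`initiate()`/`reconcile()` calls used in `RelayNegentropy.cpp`), and that the initiator-side `reconcile()` fills `have`/`need` vectors and returns the next message (empty when done), as described above. `subId`, `myEvents`, `sendMsg()`, and `recvMsgPayload()` are hypothetical stand-ins for the caller's state and transport; `recvMsgPayload()` is assumed to return the hex payload of the next incoming `NEG-MSG`.
+
+```cpp
+uint64_t idSize = 16;
+Negentropy ne(idSize);
+
+// Add every local (created_at, id) pair matching the filter; IDs are truncated to idSize
+for (const auto &[createdAt, id] : myEvents) ne.addItem(createdAt, id.substr(0, idSize));
+ne.seal();
+
+sendMsg("NEG-OPEN", subId, filterJson, idSize, to_hex(ne.initiate()));
+
+while (true) {
+    std::vector<std::string> have, need; // ID (prefixes) we should upload / download
+    std::string resp = ne.reconcile(from_hex(recvMsgPayload()), have, need);
+
+    // ...upload events in `have`, fetch events in `need`...
+
+    if (resp.empty()) break; // empty string: sync complete
+    sendMsg("NEG-MSG", subId, to_hex(resp));
+}
+```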
+
+## Nostr Messages
+
+### Initial message (client to relay):
+
+```json
+[
+    "NEG-OPEN",
+    <subscription ID string>,
+    <filter or event ID>,
+    <idSize>,
+    <initialMessage, hex-encoded>
+]
+```
+
+* The subscription ID is used by each side to identify which query a message refers to. It only needs to be long enough to distinguish it from any other concurrent NEG requests on this websocket connection (an integer that increments once per `NEG-OPEN` is fine). If a `NEG-OPEN` is issued for a currently open subscription ID, the existing subscription is first closed.
+* The nostr filter is as described in [NIP-01](https://github.com/nostr-protocol/nips/blob/master/01.md), or is the ID of an event whose `content` contains the JSON-encoded filter (or array of filters).
+* `idSize` indicates the truncation byte size for IDs. It should be an integer between 8 and 32, inclusive. Smaller values reduce the amount of bandwidth used, but increase the chance of a collision. 16 is a good default: by the birthday bound, a collision among `N` synced events then occurs with probability on the order of `N^2 / 2^129`, negligible for realistic set sizes.
+* `initialMessage` is the string returned by `initiate()`, hex-encoded.
+
+### Error message (relay to client):
+
+If a request cannot be serviced by the relay, an error is returned to the client:
+
+```json
+[
+    "NEG-ERR",
+    <subscription ID string>,
+    <reason code string>
+]
+```
+
+Current reason codes are:
+
+* `RESULTS_TOO_BIG`
+    * Relays can optionally reject queries that would require them to process too many records, or records that are too old
+* `CLOSED`
+    * Because the `NEG-OPEN` queries are stateful, relays may choose to time out inactive queries to recover memory resources
+* `FILTER_NOT_FOUND`
+    * If an event ID is used as the filter, this error will be returned if the relay does not have this event. The client should retry with the full filter, or upload the event to the relay.
+
+After a `NEG-ERR` is issued, the subscription is considered to be closed.
+
+### Subsequent messages (bidirectional):
+
+Relay and client alternate sending each other `NEG-MSG`s:
+
+```json
+[
+    "NEG-MSG",
+    <subscription ID string>,
+    <message, hex-encoded>
+]
+```
+
+### Close message (client to relay):
+
+When finished, the client should tell the relay it can release its resources with a `NEG-CLOSE`:
+
+```json
+[
+    "NEG-CLOSE",
+    <subscription ID string>
+]
+```
diff --git a/external/negentropy b/external/negentropy
new file mode 160000
index 0000000..4a9bd3b
--- /dev/null
+++ b/external/negentropy
@@ -0,0 +1 @@
+Subproject commit 4a9bd3b9be6616b51fc6b80e34bed10ac129e1a7
diff --git a/golpe.yaml b/golpe.yaml
index 7ed3a48..38dcc31 100644
--- a/golpe.yaml
+++ b/golpe.yaml
@@ -207,6 +207,10 @@ config:
       desc: "reqMonitor threads: Handle filtering of new events"
       default: 3
       noReload: true
+    - name: relay__numThreads__negentropy
+      desc: "negentropy threads: Handle negentropy protocol messages"
+      default: 2
+      noReload: true
 
     - name: events__maxEventSize
       desc: "Maximum size of normalised JSON, in bytes"
diff --git a/src/QueryScheduler.h b/src/QueryScheduler.h
index ee17e34..be2269d 100644
--- a/src/QueryScheduler.h
+++ b/src/QueryScheduler.h
@@ -5,11 +5,13 @@
 
 struct QueryScheduler : NonCopyable {
     std::function<void(lmdb::txn &txn, const Subscription &sub, uint64_t levId, std::string_view eventPayload)> onEvent;
+    std::function<void(lmdb::txn &txn, const Subscription &sub, const std::vector<uint64_t> &levIds)> onEventBatch;
     std::function<void(Subscription &sub)> onComplete;
 
     using ConnQueries = flat_hash_map<SubId, DBQuery*>;
     flat_hash_map<uint64_t, ConnQueries> conns; // connId -> subId -> DBQuery*
     std::deque<DBQuery*> running;
+    std::vector<uint64_t> levIdBatch;
 
     bool addSub(lmdb::txn &txn, Subscription &&sub) {
         sub.latestEventId = getMostRecentLevId(txn);
@@ -73,14 +75,20 @@ struct QueryScheduler : NonCopyable {
         }
 
         bool complete = q->process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){
-            onEvent(txn, sub, levId, eventPayload);
+            if (onEvent) onEvent(txn, sub, levId, eventPayload);
+            if (onEventBatch) levIdBatch.push_back(levId);
        }, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf);
 
+        if (onEventBatch) {
+            onEventBatch(txn, q->sub, levIdBatch);
+            levIdBatch.clear();
+        }
+
         if (complete) {
             auto connId = q->sub.connId;
             removeSub(connId, q->sub.subId);
 
-            onComplete(q->sub);
+            if (onComplete) onComplete(q->sub);
 
             delete q;
         } else {
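As a sketch of the intended usage of the new batching hook (this mirrors what `RelayNegentropy.cpp` below does; `handleLevId()` is a hypothetical handler): a consumer may now register `onEvent`, `onEventBatch`, or both, since `process()` checks each callback before invoking it.

```cpp
QueryScheduler queries;

// Called once per process() timeslice with every levId matched in that slice,
// instead of once per matching event as with onEvent.
queries.onEventBatch = [&](lmdb::txn &txn, const Subscription &sub, const std::vector<uint64_t> &levIds){
    for (auto levId : levIds) handleLevId(txn, levId); // hypothetical per-event handler
};

queries.onComplete = [&](Subscription &sub){
    // the DB scan for this subscription has finished
};
```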
diff --git a/src/RelayIngester.cpp b/src/RelayIngester.cpp
index ffc0152..3f31b57 100644
--- a/src/RelayIngester.cpp
+++ b/src/RelayIngester.cpp
@@ -50,6 +50,12 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
                         } catch (std::exception &e) {
                             sendNoticeError(msg->connId, std::string("bad close: ") + e.what());
                         }
+                    } else if (cmd.starts_with("NEG-")) {
+                        try {
+                            ingesterProcessNegentropy(txn, msg->connId, arr);
+                        } catch (std::exception &e) {
+                            sendNoticeError(msg->connId, std::string("bad negentropy: ") + e.what());
+                        }
                     } else {
                         throw herr("unknown cmd");
                     }
@@ -65,6 +71,7 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
             } else if (auto msg = std::get_if<MsgIngester::CloseConn>(&newMsg.msg)) {
                 auto connId = msg->connId;
                 tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::CloseConn{connId}});
+                tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::CloseConn{connId}});
             }
         }
 
@@ -107,3 +114,25 @@ void RelayServer::ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const ta
 
     tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::RemoveSub{connId, SubId(arr[1].get_string())}});
 }
+
+void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) {
+    if (arr.at(0) == "NEG-OPEN") {
+        if (arr.get_array().size() < 5) throw herr("negentropy query missing elements");
+
+        Subscription sub(connId, arr[1].get_string(), NostrFilterGroup::unwrapped(arr.at(2)));
+
+        uint64_t idSize = arr.at(3).get_unsigned();
+        if (idSize < 8 || idSize > 32) throw herr("idSize out of range");
+
+        std::string negPayload = from_hex(arr.at(4).get_string());
+
+        tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), idSize, std::move(negPayload)}});
+    } else if (arr.at(0) == "NEG-MSG") {
+        std::string negPayload = from_hex(arr.at(2).get_string());
+        tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegMsg{connId, SubId(arr[1].get_string()), std::move(negPayload)}});
+    } else if (arr.at(0) == "NEG-CLOSE") {
+        tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegClose{connId, SubId(arr[1].get_string())}});
+    } else {
+        throw herr("unknown command");
+    }
+}
diff --git a/src/RelayNegentropy.cpp b/src/RelayNegentropy.cpp
new file mode 100644
index 0000000..f35e0b0
--- /dev/null
+++ b/src/RelayNegentropy.cpp
@@ -0,0 +1,143 @@
+#include "Negentropy.h"
+
+#include "RelayServer.h"
+#include "DBQuery.h"
+#include "QueryScheduler.h"
+
+
+struct NegentropyViews {
+    struct UserView {
+        Negentropy ne;
+        std::string initialMsg;
+    };
+
+    using ConnViews = flat_hash_map<SubId, UserView>;
+    flat_hash_map<uint64_t, ConnViews> conns; // connId -> subId -> UserView
+
+    bool addView(uint64_t connId, const SubId &subId, uint64_t idSize, const std::string &initialMsg) {
+        {
+            auto *existing = findView(connId, subId);
+            if (existing) removeView(connId, subId);
+        }
+
+        auto res = conns.try_emplace(connId);
+        auto &connViews = res.first->second;
+
+        if (connViews.size() >= cfg().relay__maxSubsPerConnection) {
+            return false;
+        }
+
+        connViews.try_emplace(subId, UserView{ Negentropy(idSize), initialMsg });
+
+        return true;
+    }
+
+    UserView *findView(uint64_t connId, const SubId &subId) {
+        auto f1 = conns.find(connId);
+        if (f1 == conns.end()) return nullptr;
+
+        auto f2 = f1->second.find(subId);
+        if (f2 == f1->second.end()) return nullptr;
+
+        return &f2->second;
+    }
+
+    void removeView(uint64_t connId, const SubId &subId) {
+        auto *view = findView(connId, subId);
+        if (!view) return;
+        conns[connId].erase(subId);
+        if (conns[connId].empty()) conns.erase(connId);
+    }
+
+    void closeConn(uint64_t connId) {
+        auto f1 = conns.find(connId);
+        if (f1 == conns.end()) return;
+
+        conns.erase(connId);
+    }
+};
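+
+// View lifecycle: NEG-OPEN creates a view (addView), each subsequent NEG-MSG
+// looks it up (findView), and NEG-CLOSE (removeView) or a dropped websocket
+// (closeConn) discards it. Views are keyed by (connId, subId), mirroring
+// QueryScheduler, so a subscription's query and view are torn down together.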
+
+void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
+    QueryScheduler queries;
+    NegentropyViews views;
+
+    queries.onEventBatch = [&](lmdb::txn &txn, const auto &sub, const std::vector<uint64_t> &levIds){
+        auto *view = views.findView(sub.connId, sub.subId);
+        if (!view) return;
+
+        for (auto levId : levIds) {
+            auto ev = lookupEventByLevId(txn, levId);
+            view->ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, view->ne.idSize));
+        }
+    };
+
+    queries.onComplete = [&](Subscription &sub){
+        auto *view = views.findView(sub.connId, sub.subId);
+        if (!view) return;
+
+        view->ne.seal();
+
+        auto resp = view->ne.reconcile(view->initialMsg);
+        view->initialMsg = "";
+
+        sendToConn(sub.connId, tao::json::to_string(tao::json::value::array({
+            "NEG-MSG",
+            sub.subId.str(),
+            to_hex(resp)
+        })));
+    };
+
+    while(1) {
+        auto newMsgs = queries.running.empty() ? thr.inbox.pop_all() : thr.inbox.pop_all_no_wait();
+
+        auto txn = env.txn_ro();
+
+        for (auto &newMsg : newMsgs) {
+            if (auto msg = std::get_if<MsgNegentropy::NegOpen>(&newMsg.msg)) {
+                auto connId = msg->sub.connId;
+                auto subId = msg->sub.subId;
+
+                if (!queries.addSub(txn, std::move(msg->sub))) {
+                    sendNoticeError(connId, std::string("too many concurrent REQs"));
+                    continue;
+                }
+
+                if (!views.addView(connId, subId, msg->idSize, msg->negPayload)) {
+                    queries.removeSub(connId, subId);
+                    sendNoticeError(connId, std::string("too many concurrent NEG requests"));
+                    continue;
+                }
+
+                queries.process(txn);
+            } else if (auto msg = std::get_if<MsgNegentropy::NegMsg>(&newMsg.msg)) {
+                auto *view = views.findView(msg->connId, msg->subId);
+                if (!view) {
+                    sendToConn(msg->connId, tao::json::to_string(tao::json::value::array({
+                        "NEG-ERR",
+                        msg->subId.str(),
+                        "CLOSED"
+                    })));
+
+                    continue;
+                }
+
+                auto resp = view->ne.reconcile(msg->negPayload);
+
+                sendToConn(msg->connId, tao::json::to_string(tao::json::value::array({
+                    "NEG-MSG",
+                    msg->subId.str(),
+                    to_hex(resp)
+                })));
+            } else if (auto msg = std::get_if<MsgNegentropy::NegClose>(&newMsg.msg)) {
+                queries.removeSub(msg->connId, msg->subId);
+                views.removeView(msg->connId, msg->subId);
+            } else if (auto msg = std::get_if<MsgNegentropy::CloseConn>(&newMsg.msg)) {
+                queries.closeConn(msg->connId);
+                views.closeConn(msg->connId);
+            }
+        }
+
+        queries.process(txn);
+
+        txn.abort();
+    }
+}
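Two details of `runNegentropy()` worth noting: the thread blocks on its inbox only when no DB scans are in flight (otherwise it polls, so long-running scans keep making progress between messages), and all state lives in plain thread-local objects with no locking. The latter is safe under the assumption, consistent with how the other pools are used in this patch but not stated by it, that `ThreadPool::dispatch()` routes each message by its `connId` key so one connection's messages always land on the same thread, in order.

```cpp
// Poll pattern from runNegentropy(): block while idle, otherwise drain
// whatever is queued and return immediately so queries.process() can run.
auto newMsgs = queries.running.empty() ? thr.inbox.pop_all()           // idle: wait for work
                                       : thr.inbox.pop_all_no_wait();  // scans pending: don't block
```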
diff --git a/src/RelayServer.h b/src/RelayServer.h
index 97d1a54..52aeee7 100644
--- a/src/RelayServer.h
+++ b/src/RelayServer.h
@@ -112,6 +112,32 @@ struct MsgReqMonitor : NonCopyable {
     MsgReqMonitor(Var &&msg_) : msg(std::move(msg_)) {}
 };
 
+struct MsgNegentropy : NonCopyable {
+    struct NegOpen {
+        Subscription sub;
+        uint64_t idSize;
+        std::string negPayload;
+    };
+
+    struct NegMsg {
+        uint64_t connId;
+        SubId subId;
+        std::string negPayload;
+    };
+
+    struct NegClose {
+        uint64_t connId;
+        SubId subId;
+    };
+
+    struct CloseConn {
+        uint64_t connId;
+    };
+
+    using Var = std::variant<NegOpen, NegMsg, NegClose, CloseConn>;
+    Var msg;
+    MsgNegentropy(Var &&msg_) : msg(std::move(msg_)) {}
+};
+
 
 struct RelayServer {
@@ -124,6 +150,7 @@ struct RelayServer {
     ThreadPool<MsgWriter> tpWriter;
     ThreadPool<MsgReqWorker> tpReqWorker;
     ThreadPool<MsgReqMonitor> tpReqMonitor;
+    ThreadPool<MsgNegentropy> tpNegentropy;
     std::thread cronThread;
 
     void run();
@@ -134,6 +161,7 @@ struct RelayServer {
     void ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::string ipAddr, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector<MsgWriter> &output);
     void ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
     void ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
+    void ingesterProcessNegentropy(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
 
     void runWriter(ThreadPool<MsgWriter>::Thread &thr);
@@ -141,6 +169,8 @@ struct RelayServer {
     void runReqMonitor(ThreadPool<MsgReqMonitor>::Thread &thr);
 
+    void runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr);
+
     void runCron();
 
     // Utils (can be called by any thread)
diff --git a/src/cmd_relay.cpp b/src/cmd_relay.cpp
index 67faa55..c9d0a3c 100644
--- a/src/cmd_relay.cpp
+++ b/src/cmd_relay.cpp
@@ -28,6 +28,10 @@ void RelayServer::run() {
         runReqMonitor(thr);
     });
 
+    tpNegentropy.init("Negentropy", cfg().relay__numThreads__negentropy, [this](auto &thr){
+        runNegentropy(thr);
+    });
+
     cronThread = std::thread([this]{
         runCron();
     });
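With the pool wired in, its size is operator-tunable like the other thread pools. Assuming golpe's usual mapping of `relay__numThreads__negentropy` onto nested config sections (matching the existing `ingester`/`reqWorker`/`reqMonitor` entries), the corresponding strfry.conf stanza would look like:

```
relay {
    numThreads {
        # negentropy threads: Handle negentropy protocol messages
        negentropy = 2
    }
}
```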