From 0cbc937bf03889c40cc69c10149741ca5b71c6b4 Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Sat, 29 Apr 2023 15:20:04 -0400 Subject: [PATCH] remove quadrable and yesstr --- fbs/yesstr.fbs | 41 --------- golpe | 2 +- golpe.yaml | 6 +- src/RelayCron.cpp | 19 +--- src/RelayIngester.cpp | 11 --- src/RelayServer.h | 19 ---- src/RelayWriter.cpp | 4 +- src/RelayYesstr.cpp | 142 ------------------------------ src/WriterPipeline.h | 4 +- src/cmd_compact.cpp | 2 - src/cmd_delete.cpp | 9 +- src/cmd_export.cpp | 11 +-- src/cmd_gc.cpp | 22 ----- src/cmd_import.cpp | 5 +- src/cmd_info.cpp | 3 - src/cmd_relay.cpp | 4 - src/cmd_sync.cpp | 196 +----------------------------------------- src/constants.h | 1 - src/events.cpp | 25 +++--- src/events.h | 21 ++--- src/gc.h | 33 ------- src/global.h | 6 -- src/onAppStartup.cpp | 15 ---- src/yesstr.h | 29 ------- strfry.conf | 5 +- 25 files changed, 35 insertions(+), 600 deletions(-) delete mode 100644 fbs/yesstr.fbs delete mode 100644 src/RelayYesstr.cpp delete mode 100644 src/cmd_gc.cpp delete mode 100644 src/gc.h delete mode 100644 src/yesstr.h diff --git a/fbs/yesstr.fbs b/fbs/yesstr.fbs deleted file mode 100644 index e8a2eaa..0000000 --- a/fbs/yesstr.fbs +++ /dev/null @@ -1,41 +0,0 @@ -namespace Yesstr; - - - -// FilterSync - -table RequestSync { - filter: string; // only required for first in a sequence - reqsEncoded: [ubyte]; -} - -table ResponseSync { - respsEncoded: [ubyte]; -} - - - -// Request/Response wrappers - -union RequestPayload { - RequestSync, -} - -union ResponsePayload { - ResponseSync, -} - -table Request { - requestId: uint64; - payload: RequestPayload; -} - -table Response { - requestId: uint64; - payload: ResponsePayload; -} - - - -table Empty {} -root_type Empty; diff --git a/golpe b/golpe index 30b8b49..d14d64e 160000 --- a/golpe +++ b/golpe @@ -1 +1 @@ -Subproject commit 30b8b49d561ff27a6a681525121efa32088556a6 +Subproject commit d14d64e892149752e2c6b049e166ec425b6daa8f diff --git a/golpe.yaml b/golpe.yaml index 5a29e20..7ed3a48 100644 --- a/golpe.yaml +++ b/golpe.yaml @@ -146,7 +146,7 @@ config: default: "unset" - name: relay__maxWebsocketPayloadSize - desc: "Maximum accepted incoming websocket frame size (should be larger than max event and yesstr msg)" + desc: "Maximum accepted incoming websocket frame size (should be larger than max event)" default: 131072 noReload: true - name: relay__autoPingSeconds @@ -207,10 +207,6 @@ config: desc: reqMonitor threads: Handle filtering of new events default: 3 noReload: true - - name: relay__numThreads__yesstr - desc: yesstr threads: Experimental yesstr protocol - default: 1 - noReload: true - name: events__maxEventSize desc: "Maximum size of normalised JSON, in bytes" diff --git a/src/RelayCron.cpp b/src/RelayCron.cpp index c52287d..968f147 100644 --- a/src/RelayCron.cpp +++ b/src/RelayCron.cpp @@ -2,12 +2,8 @@ #include "RelayServer.h" -#include "gc.h" - void RelayServer::runCron() { - auto qdb = getQdbInstance(); - hoytech::timer cron; cron.setupCb = []{ setThreadName("cron"); }; @@ -59,17 +55,14 @@ void RelayServer::runCron() { auto txn = env.txn_rw(); uint64_t numDeleted = 0; - auto changes = qdb.change(); for (auto levId : expiredLevIds) { auto view = env.lookup_Event(txn, levId); if (!view) continue; // Deleted in between transactions - deleteEvent(txn, changes, *view); + deleteEvent(txn, *view); numDeleted++; } - changes.apply(txn); - txn.commit(); if (numDeleted) LI << "Deleted " << numDeleted << " ephemeral events"; @@ -119,17 +112,14 @@ void RelayServer::runCron() { auto txn = env.txn_rw(); uint64_t numDeleted = 0; - auto changes = qdb.change(); for (auto levId : expiredLevIds) { auto view = env.lookup_Event(txn, levId); if (!view) continue; // Deleted in between transactions - deleteEvent(txn, changes, *view); + deleteEvent(txn, *view); numDeleted++; } - changes.apply(txn); - txn.commit(); if (numDeleted) LI << "Deleted " << numDeleted << " events (ephemeral=" << numEphemeral << " expired=" << numExpired << ")"; @@ -137,11 +127,6 @@ void RelayServer::runCron() { }); - // Garbage collect quadrable nodes - - cron.repeat(60 * 60 * 1'000'000UL, [&]{ - quadrableGarbageCollect(qdb, 1); - }); cron.run(); diff --git a/src/RelayIngester.cpp b/src/RelayIngester.cpp index 2561816..ffc0152 100644 --- a/src/RelayIngester.cpp +++ b/src/RelayIngester.cpp @@ -53,16 +53,6 @@ void RelayServer::runIngester(ThreadPool::Thread &thr) { } else { throw herr("unknown cmd"); } - } else if (msg->payload.starts_with("Y")) { - verifyYesstrRequest(msg->payload); - - auto *req = parseYesstrRequest(msg->payload); - - if (req->payload_type() == Yesstr::RequestPayload::RequestPayload_RequestSync) { - tpYesstr.dispatch(msg->connId, MsgYesstr{MsgYesstr::SyncRequest{ msg->connId, std::move(msg->payload) }}); - } else { - throw herr("unrecognised yesstr request"); - } } else if (msg->payload == "\n") { // Do nothing. // This is for when someone is just sending newlines on websocat for debugging purposes. @@ -75,7 +65,6 @@ void RelayServer::runIngester(ThreadPool::Thread &thr) { } else if (auto msg = std::get_if(&newMsg.msg)) { auto connId = msg->connId; tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::CloseConn{connId}}); - tpYesstr.dispatch(connId, MsgYesstr{MsgYesstr::CloseConn{connId}}); } } diff --git a/src/RelayServer.h b/src/RelayServer.h index deba101..97d1a54 100644 --- a/src/RelayServer.h +++ b/src/RelayServer.h @@ -9,7 +9,6 @@ #include #include #include -#include #include "golpe.h" @@ -17,7 +16,6 @@ #include "ThreadPool.h" #include "events.h" #include "filters.h" -#include "yesstr.h" @@ -114,20 +112,6 @@ struct MsgReqMonitor : NonCopyable { MsgReqMonitor(Var &&msg_) : msg(std::move(msg_)) {} }; -struct MsgYesstr : NonCopyable { - struct SyncRequest { - uint64_t connId; - std::string yesstrMessage; - }; - - struct CloseConn { - uint64_t connId; - }; - - using Var = std::variant; - Var msg; - MsgYesstr(Var &&msg_) : msg(std::move(msg_)) {} -}; struct RelayServer { @@ -140,7 +124,6 @@ struct RelayServer { ThreadPool tpWriter; ThreadPool tpReqWorker; ThreadPool tpReqMonitor; - ThreadPool tpYesstr; std::thread cronThread; void run(); @@ -158,8 +141,6 @@ struct RelayServer { void runReqMonitor(ThreadPool::Thread &thr); - void runYesstr(ThreadPool::Thread &thr); - void runCron(); // Utils (can be called by any thread) diff --git a/src/RelayWriter.cpp b/src/RelayWriter.cpp index c1e2639..4dcfc05 100644 --- a/src/RelayWriter.cpp +++ b/src/RelayWriter.cpp @@ -4,8 +4,6 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { - auto qdb = getQdbInstance(); - PluginWritePolicy writePolicy; while(1) { @@ -37,7 +35,7 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { { auto txn = env.txn_rw(); - writeEvents(txn, qdb, newEvents); + writeEvents(txn, newEvents); txn.commit(); } diff --git a/src/RelayYesstr.cpp b/src/RelayYesstr.cpp deleted file mode 100644 index 5217cbb..0000000 --- a/src/RelayYesstr.cpp +++ /dev/null @@ -1,142 +0,0 @@ -#include -#include - -#include "RelayServer.h" -#include "DBQuery.h" - - -void RelayServer::runYesstr(ThreadPool::Thread &thr) { - auto qdb = getQdbInstance(); - - struct SyncState { - quadrable::MemStore m; - }; - - struct SyncStateCollection { - RelayServer *server; - quadrable::Quadrable *qdb; - flat_hash_map> conns; // connId -> reqId -> SyncState - - SyncStateCollection(RelayServer *server_, quadrable::Quadrable *qdb_) : server(server_), qdb(qdb_) {} - - SyncState *lookup(uint64_t connId, uint64_t reqId) { - if (!conns.contains(connId)) return nullptr; - if (!conns[connId].contains(reqId)) return nullptr; - return &conns[connId][reqId]; - } - - SyncState *newRequest(lmdb::txn &txn, uint64_t connId, uint64_t reqId, std::string_view filterStr) { - if (!conns.contains(connId)) conns.try_emplace(connId); - if (conns[connId].contains(reqId)) { - LI << "Client tried to re-use reqId for new filter, ignoring"; - return &conns[connId][reqId]; - } - conns[connId].try_emplace(reqId); - auto &s = conns[connId][reqId]; - - LI << "Yesstr sync. filter: '" << filterStr << "'"; - - if (filterStr == "{}") { - qdb->checkout("events"); - uint64_t nodeId = qdb->getHeadNodeId(txn); - - qdb->withMemStore(s.m, [&]{ - qdb->writeToMemStore = true; - qdb->checkout(nodeId); - }); - } else { - // FIXME: The following blocks the whole thread for the query duration. Should interleave it - // with other requests like RelayReqWorker does. - - std::vector levIds; - DBQuery query(tao::json::from_string(filterStr)); - - while (1) { - bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){ - levIds.push_back(levId); - }, MAX_U64, cfg().relay__logging__dbScanPerf); - - if (complete) break; - } - - LI << "Filter matched " << levIds.size() << " local events"; - - qdb->withMemStore(s.m, [&]{ - qdb->writeToMemStore = true; - qdb->checkout(); - - auto changes = qdb->change(); - - for (auto levId : levIds) { - changes.putReuse(txn, levId); - } - - changes.apply(txn); - }); - } - - return &s; - } - - - void handleRequest(lmdb::txn &txn, uint64_t connId, uint64_t reqId, std::string_view filterStr, std::string_view reqsEncoded) { - SyncState *s = lookup(connId, reqId); - - if (!s) s = newRequest(txn, connId, reqId, filterStr); - - auto reqs = quadrable::transport::decodeSyncRequests(reqsEncoded); - - quadrable::SyncResponses resps; - - qdb->withMemStore(s->m, [&]{ - qdb->writeToMemStore = true; - resps = qdb->handleSyncRequests(txn, qdb->getHeadNodeId(txn), reqs, 100'000); - }); - - std::string respsEncoded = quadrable::transport::encodeSyncResponses(resps); - - flatbuffers::FlatBufferBuilder builder; - - auto respOffset = Yesstr::CreateResponse(builder, - reqId, - Yesstr::ResponsePayload::ResponsePayload_ResponseSync, - Yesstr::CreateResponseSync(builder, - builder.CreateVector((uint8_t*)respsEncoded.data(), respsEncoded.size()) - ).Union() - ); - - builder.Finish(respOffset); - - std::string respMsg = std::string("Y") + std::string(reinterpret_cast(builder.GetBufferPointer()), builder.GetSize()); - server->sendToConnBinary(connId, std::move(respMsg)); - } - - void closeConn(uint64_t connId) { - conns.erase(connId); - } - }; - - SyncStateCollection states(this, &qdb); - - - while(1) { - auto newMsgs = thr.inbox.pop_all(); - - auto txn = env.txn_ro(); - - for (auto &newMsg : newMsgs) { - if (auto msg = std::get_if(&newMsg.msg)) { - const auto *req = parseYesstrRequest(msg->yesstrMessage); // validated by ingester - const auto *reqSync = req->payload_as(); - - try { - states.handleRequest(txn, msg->connId, req->requestId(), sv(reqSync->filter()), sv(reqSync->reqsEncoded())); - } catch (std::exception &e) { - sendNoticeError(msg->connId, std::string("yesstr failure: ") + e.what()); - } - } else if (auto msg = std::get_if(&newMsg.msg)) { - states.closeConn(msg->connId); - } - } - } -} diff --git a/src/WriterPipeline.h b/src/WriterPipeline.h index 0fd041f..bd10d0c 100644 --- a/src/WriterPipeline.h +++ b/src/WriterPipeline.h @@ -58,8 +58,6 @@ struct WriterPipeline { writerThread = std::thread([&]() { setThreadName("Writer"); - auto qdb = getQdbInstance(); - while (1) { // Debounce writerInbox.wait(); @@ -103,7 +101,7 @@ struct WriterPipeline { if (newEventsToProc.size()) { { auto txn = env.txn_rw(); - writeEvents(txn, qdb, newEventsToProc); + writeEvents(txn, newEventsToProc); txn.commit(); } diff --git a/src/cmd_compact.cpp b/src/cmd_compact.cpp index ebcb6bf..11604f6 100644 --- a/src/cmd_compact.cpp +++ b/src/cmd_compact.cpp @@ -4,8 +4,6 @@ #include #include "golpe.h" -#include "gc.h" - static const char USAGE[] = R"( diff --git a/src/cmd_delete.cpp b/src/cmd_delete.cpp index 8532ec9..999b0fc 100644 --- a/src/cmd_delete.cpp +++ b/src/cmd_delete.cpp @@ -5,7 +5,6 @@ #include "DBQuery.h" #include "events.h" -#include "gc.h" static const char USAGE[] = @@ -65,23 +64,17 @@ void cmd_delete(const std::vector &subArgs) { } - auto qdb = getQdbInstance(); - LI << "Deleting " << levIds.size() << " events"; { auto txn = env.txn_rw(); - auto changes = qdb.change(); - for (auto levId : levIds) { auto view = env.lookup_Event(txn, levId); if (!view) continue; // Deleted in between transactions - deleteEvent(txn, changes, *view); + deleteEvent(txn, *view); } - changes.apply(txn); - txn.commit(); } } diff --git a/src/cmd_export.cpp b/src/cmd_export.cpp index d2341ad..3cf2636 100644 --- a/src/cmd_export.cpp +++ b/src/cmd_export.cpp @@ -26,7 +26,8 @@ void cmd_export(const std::vector &subArgs) { auto txn = env.txn_ro(); auto dbVersion = getDBVersion(txn); - auto qdb = getQdbInstance(txn); + + if (dbVersion == 0) throw herr("migration from DB version 0 not supported by this version of strfry"); uint64_t start = reverse ? until : since; uint64_t startDup = reverse ? MAX_U64 : 0; @@ -40,14 +41,6 @@ void cmd_export(const std::vector &subArgs) { auto view = lookupEventByLevId(txn, lmdb::from_sv(v)); - if (dbVersion == 0) { - std::string_view raw; - bool found = qdb.dbi_nodesLeaf.get(txn, lmdb::to_sv(view.primaryKeyId), raw); - if (!found) throw herr("couldn't find leaf node in quadrable table"); - std::cout << raw.substr(8 + 32 + 32) << "\n"; - return true; - } - std::cout << getEventJson(txn, decomp, view.primaryKeyId) << "\n"; return true; diff --git a/src/cmd_gc.cpp b/src/cmd_gc.cpp deleted file mode 100644 index ca40ed1..0000000 --- a/src/cmd_gc.cpp +++ /dev/null @@ -1,22 +0,0 @@ -#include -#include - -#include -#include "golpe.h" - -#include "gc.h" - - -static const char USAGE[] = -R"( - Usage: - gc -)"; - - -void cmd_gc(const std::vector &subArgs) { - std::map args = docopt::docopt(USAGE, subArgs, true, ""); - - auto qdb = getQdbInstance(); - quadrableGarbageCollect(qdb, 2); -} diff --git a/src/cmd_import.cpp b/src/cmd_import.cpp index 45cdbeb..b70c158 100644 --- a/src/cmd_import.cpp +++ b/src/cmd_import.cpp @@ -5,7 +5,6 @@ #include "events.h" #include "filters.h" -#include "gc.h" static const char USAGE[] = @@ -23,8 +22,6 @@ void cmd_import(const std::vector &subArgs) { if (noVerify) LW << "not verifying event IDs or signatures!"; - auto qdb = getQdbInstance(); - auto txn = env.txn_rw(); secp256k1_context *secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY); @@ -38,7 +35,7 @@ void cmd_import(const std::vector &subArgs) { }; auto flushChanges = [&]{ - writeEvents(txn, qdb, newEvents, 0); + writeEvents(txn, newEvents, 0); uint64_t numCommits = 0; diff --git a/src/cmd_info.cpp b/src/cmd_info.cpp index e5d1dda..5409e81 100644 --- a/src/cmd_info.cpp +++ b/src/cmd_info.cpp @@ -14,10 +14,7 @@ R"( void cmd_info(const std::vector &subArgs) { std::map args = docopt::docopt(USAGE, subArgs, true, ""); - auto qdb = getQdbInstance(); - auto txn = env.txn_ro(); std::cout << "DB version: " << getDBVersion(txn) << "\n"; - std::cout << "merkle root: " << to_hex(qdb.root(txn)) << "\n"; } diff --git a/src/cmd_relay.cpp b/src/cmd_relay.cpp index f62ce3f..67faa55 100644 --- a/src/cmd_relay.cpp +++ b/src/cmd_relay.cpp @@ -28,10 +28,6 @@ void RelayServer::run() { runReqMonitor(thr); }); - tpYesstr.init("Yesstr", cfg().relay__numThreads__yesstr, [this](auto &thr){ - runYesstr(thr); - }); - cronThread = std::thread([this]{ runCron(); }); diff --git a/src/cmd_sync.cpp b/src/cmd_sync.cpp index 778c494..32a673f 100644 --- a/src/cmd_sync.cpp +++ b/src/cmd_sync.cpp @@ -1,9 +1,6 @@ #include #include -#include -#include - #include "golpe.h" #include "WriterPipeline.h" @@ -12,7 +9,6 @@ #include "DBQuery.h" #include "filters.h" #include "events.h" -#include "yesstr.h" static const char USAGE[] = @@ -26,95 +22,6 @@ R"( )"; -struct SyncController { - quadrable::Quadrable *qdb; - WSConnection *ws; - - quadrable::Quadrable::Sync sync; - quadrable::MemStore m; - - uint64_t ourNodeId = 0; - quadrable::SyncRequests reqs; - bool sentFirstReq = false; - - SyncController(quadrable::Quadrable *qdb_, WSConnection *ws_) : qdb(qdb_), ws(ws_), sync(qdb_) { } - - void init(lmdb::txn &txn) { - qdb->withMemStore(m, [&]{ - qdb->writeToMemStore = true; - ourNodeId = qdb->getHeadNodeId(txn); - sync.init(txn, ourNodeId); - }); - } - - bool sendRequests(lmdb::txn &txn, const std::string &filterStr) { - qdb->withMemStore(m, [&]{ - qdb->writeToMemStore = true; - reqs = sync.getReqs(txn, 10'000); - }); - - if (reqs.size() == 0) return false; - - std::string reqsEncoded = quadrable::transport::encodeSyncRequests(reqs); - - flatbuffers::FlatBufferBuilder builder; - - auto reqOffset = Yesstr::CreateRequest(builder, - 123, - Yesstr::RequestPayload::RequestPayload_RequestSync, - Yesstr::CreateRequestSync(builder, - (filterStr.size() && !sentFirstReq) ? builder.CreateString(filterStr) : 0, - builder.CreateVector((uint8_t*)reqsEncoded.data(), reqsEncoded.size()) - ).Union() - ); - - builder.Finish(reqOffset); - - std::string reqMsg = std::string("Y") + std::string(reinterpret_cast(builder.GetBufferPointer()), builder.GetSize()); - size_t compressedSize; - ws->send(reqMsg, uWS::OpCode::BINARY, &compressedSize); - LI << "SEND size=" << reqMsg.size() << " compressed=" << compressedSize; - - sentFirstReq = true; - - return true; - } - - void handleResponses(lmdb::txn &txn, std::string_view msg) { - verifyYesstrResponse(msg); - const auto *resp = parseYesstrResponse(msg); - const auto *respSync = resp->payload_as_ResponseSync(); - - auto resps = quadrable::transport::decodeSyncResponses(sv(respSync->respsEncoded())); - - qdb->withMemStore(m, [&]{ - qdb->writeToMemStore = true; - sync.addResps(txn, reqs, resps); - }); - } - - void finish(lmdb::txn &txn, std::function onNewLeaf, std::function onMissingLeaf) { - qdb->withMemStore(m, [&]{ - qdb->writeToMemStore = true; - - sync.diff(txn, ourNodeId, sync.nodeIdShadow, [&](auto dt, const auto &node){ - if (dt == quadrable::Quadrable::DiffType::Added) { - // node exists only on the provider-side - LI << "NEW LEAF: " << node.leafVal(); - onNewLeaf(node.leafVal()); - } else if (dt == quadrable::Quadrable::DiffType::Deleted) { - // node exists only on the syncer-side - LI << "MISSING LEAF: " << node.leafVal(); - onMissingLeaf(node.leafVal()); - } else if (dt == quadrable::Quadrable::DiffType::Changed) { - // nodes differ. node is the one on the provider-side - } - }); - }); - } -}; - - void cmd_sync(const std::vector &subArgs) { std::map args = docopt::docopt(USAGE, subArgs, true, ""); @@ -130,106 +37,5 @@ void cmd_sync(const std::vector &subArgs) { if (dir != "down") throw herr("only down currently supported"); // FIXME - std::unique_ptr controller; - WriterPipeline writer; - WSConnection ws(url); - auto qdb = getQdbInstance(); - - - ws.reconnect = false; - - - if (filterStr.size()) { - std::vector levIds; - - tao::json::value filterJson; - - try { - filterJson = tao::json::from_string(filterStr); - } catch (std::exception &e) { - LE << "Couldn't parse filter JSON: " << filterStr; - ::exit(1); - } - - DBQuery query(filterJson); - - auto txn = env.txn_ro(); - - while (1) { - bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){ - levIds.push_back(levId); - }); - - if (complete) break; - } - - LI << "Filter matched " << levIds.size() << " local events"; - - controller = std::make_unique(&qdb, &ws); - - qdb.withMemStore(controller->m, [&]{ - qdb.writeToMemStore = true; - qdb.checkout(); - - auto changes = qdb.change(); - - for (auto levId : levIds) { - changes.putReuse(txn, levId); - } - - changes.apply(txn); - }); - - controller->init(txn); - } else { - auto txn = env.txn_ro(); - - controller = std::make_unique(&qdb, &ws); - controller->init(txn); - } - - - - ws.onConnect = [&]{ - auto txn = env.txn_ro(); - - controller->sendRequests(txn, filterStr); - }; - - ws.onMessage = [&](auto msg, uWS::OpCode opCode, size_t compressedSize){ - auto txn = env.txn_ro(); - - if (!controller) { - LW << "No sync active, ignoring message"; - return; - } - - if (opCode == uWS::OpCode::TEXT) { - LW << "Unexpected non-yesstr message from relay: " << msg; - return; - } - - LI << "RECV size=" << msg.size() << " compressed=" << compressedSize; - controller->handleResponses(txn, msg); - - if (!controller->sendRequests(txn, filterStr)) { - LI << "Syncing done, writing/sending events"; - controller->finish(txn, - [&](std::string_view newLeaf){ - // FIXME: relay could crash client here by sending invalid JSON - writer.inbox.push_move(WriterPipelineInput{ tao::json::from_string(std::string(newLeaf)), EventSourceType::Sync, url }); - }, - [&](std::string_view){ - } - ); - - writer.flush(); - ::exit(0); - } - }; - - - - - ws.run(); + throw herr("sync is temporarily not implemented"); } diff --git a/src/constants.h b/src/constants.h index b331055..2a4782d 100644 --- a/src/constants.h +++ b/src/constants.h @@ -2,5 +2,4 @@ const uint64_t CURR_DB_VERSION = 1; const size_t MAX_SUBID_SIZE = 71; // Statically allocated size in SubId -const uint64_t MAX_TIMESTAMP = 17179869184; // Safety limit to ensure it can fit in quadrable key. Good until year 2514. const size_t MAX_INDEXED_TAG_VAL_SIZE = 255; diff --git a/src/events.cpp b/src/events.cpp index 82b5d00..6a60068 100644 --- a/src/events.cpp +++ b/src/events.cpp @@ -140,7 +140,7 @@ void verifyEventTimestamp(const NostrIndex::Event *flat) { uint64_t latest = now + cfg().events__rejectEventsNewerThanSeconds; if (ts < earliest) throw herr("created_at too early"); - if (ts > latest || ts > MAX_TIMESTAMP) throw herr("created_at too late"); + if (ts > latest) throw herr("created_at too late"); if (flat->expiration() != 0 && flat->expiration() <= now) throw herr("event expired"); } @@ -243,18 +243,21 @@ std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t lev -void deleteEvent(lmdb::txn &txn, quadrable::Quadrable::UpdateSet &changes, defaultDb::environment::View_Event &ev) { - changes.del(flatEventToQuadrableKey(ev.flat_nested())); +void deleteEvent(lmdb::txn &txn, defaultDb::environment::View_Event &ev) { env.dbi_EventPayload.del(txn, lmdb::to_sv(ev.primaryKeyId)); env.delete_Event(txn, ev.primaryKeyId); } -void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector &evs, uint64_t logLevel) { - std::sort(evs.begin(), evs.end(), [](auto &a, auto &b) { return a.quadKey < b.quadKey; }); +void writeEvents(lmdb::txn &txn, std::vector &evs, uint64_t logLevel) { + std::sort(evs.begin(), evs.end(), [](auto &a, auto &b) { + auto aC = a.createdAt(); + auto bC = b.createdAt(); + if (aC == bC) return a.id() < b.id(); + return aC < bC; + }); - auto changes = qdb.change(); std::string tmpBuf; for (size_t i = 0; i < evs.size(); i++) { @@ -262,7 +265,7 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector(ev.flatStr.data()); - if (lookupEventById(txn, sv(flat->id())) || (i != 0 && ev.quadKey == evs[i-1].quadKey)) { + if (lookupEventById(txn, sv(flat->id())) || (i != 0 && ev.id() == evs[i-1].id())) { ev.status = EventWriteStatus::Duplicate; continue; } @@ -293,7 +296,7 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vectorcreated_at() < flat->created_at()) { if (logLevel >= 1) LI << "Deleting event (d-tag). id=" << to_hex(sv(otherEv.flat_nested()->id())); - deleteEvent(txn, changes, otherEv); + deleteEvent(txn, otherEv); } else { ev.status = EventWriteStatus::Replaced; } @@ -311,7 +314,7 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vectorval())); if (otherEv && sv(otherEv->flat_nested()->pubkey()) == sv(flat->pubkey())) { if (logLevel >= 1) LI << "Deleting event (kind 5). id=" << to_hex(sv(tagPair->val())); - deleteEvent(txn, changes, *otherEv); + deleteEvent(txn, *otherEv); } } } @@ -325,11 +328,7 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector(ev.levId), tmpBuf); - changes.put(ev.quadKey, ""); - ev.status = EventWriteStatus::Written; } } - - changes.apply(txn); } diff --git a/src/events.h b/src/events.h index 415aad9..50397cb 100644 --- a/src/events.h +++ b/src/events.h @@ -52,12 +52,6 @@ std::string_view decodeEventPayload(lmdb::txn &txn, Decompressor &decomp, std::s std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId); std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId, std::string_view eventPayload); -inline quadrable::Key flatEventToQuadrableKey(const NostrIndex::Event *flat) { - uint64_t timestamp = flat->created_at(); - if (timestamp > MAX_TIMESTAMP) throw herr("timestamp is too large to encode in quadrable key"); - return quadrable::Key::fromIntegerAndHash(timestamp, sv(flat->id()).substr(0, 27)); -} - @@ -97,18 +91,25 @@ struct EventToWrite { EventSourceType sourceType; std::string sourceInfo; void *userData = nullptr; - quadrable::Key quadKey; EventWriteStatus status = EventWriteStatus::Pending; uint64_t levId = 0; EventToWrite() {} EventToWrite(std::string flatStr, std::string jsonStr, uint64_t receivedAt, EventSourceType sourceType, std::string sourceInfo, void *userData = nullptr) : flatStr(flatStr), jsonStr(jsonStr), receivedAt(receivedAt), sourceType(sourceType), sourceInfo(sourceInfo), userData(userData) { + } + + std::string_view id() { const NostrIndex::Event *flat = flatbuffers::GetRoot(flatStr.data()); - quadKey = flatEventToQuadrableKey(flat); + return sv(flat->id()); + } + + uint64_t createdAt() { + const NostrIndex::Event *flat = flatbuffers::GetRoot(flatStr.data()); + return flat->created_at(); } }; -void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector &evs, uint64_t logLevel = 1); -void deleteEvent(lmdb::txn &txn, quadrable::Quadrable::UpdateSet &changes, defaultDb::environment::View_Event &ev); +void writeEvents(lmdb::txn &txn, std::vector &evs, uint64_t logLevel = 1); +void deleteEvent(lmdb::txn &txn, defaultDb::environment::View_Event &ev); diff --git a/src/gc.h b/src/gc.h deleted file mode 100644 index 90da437..0000000 --- a/src/gc.h +++ /dev/null @@ -1,33 +0,0 @@ -#pragma once - -#include "golpe.h" - - -inline void quadrableGarbageCollect(quadrable::Quadrable &qdb, int logLevel = 0) { - quadrable::Quadrable::GarbageCollector> gc(qdb); - quadrable::Quadrable::GCStats stats; - - if (logLevel >= 2) LI << "Running garbage collection"; - - { - auto txn = env.txn_ro(); - - if (logLevel >= 2) LI << "GC: mark phase"; - gc.markAllHeads(txn); - if (logLevel >= 2) LI << "GC: sweep phase"; - stats = gc.sweep(txn); - } - - if (logLevel >= 2) { - LI << "GC: Total nodes: " << stats.total; - LI << "GC: Garbage nodes: " << stats.garbage << " (" << renderPercent((double)stats.garbage / stats.total) << ")"; - } - - if (stats.garbage) { - auto txn = env.txn_rw(); - if (logLevel >= 1) LI << "GC: deleting " << stats.garbage << " garbage nodes"; - gc.deleteNodes(txn); - txn.commit(); - } - -} diff --git a/src/global.h b/src/global.h index 5272cbb..dfd6b1e 100644 --- a/src/global.h +++ b/src/global.h @@ -6,12 +6,6 @@ using namespace phmap; -#include - -quadrable::Quadrable getQdbInstance(lmdb::txn &txn); -quadrable::Quadrable getQdbInstance(); - - #include "constants.h" diff --git a/src/onAppStartup.cpp b/src/onAppStartup.cpp index 4d43334..55fcef4 100644 --- a/src/onAppStartup.cpp +++ b/src/onAppStartup.cpp @@ -66,23 +66,8 @@ static void setRLimits() { } - -quadrable::Quadrable getQdbInstance(lmdb::txn &txn) { - quadrable::Quadrable qdb; - qdb.init(txn); - qdb.checkout("events"); - return qdb; -} - -quadrable::Quadrable getQdbInstance() { - auto txn = env.txn_ro(); - return getQdbInstance(txn); -} - void onAppStartup(lmdb::txn &txn, const std::string &cmd) { dbCheck(txn, cmd); setRLimits(); - - (void)getQdbInstance(txn); } diff --git a/src/yesstr.h b/src/yesstr.h deleted file mode 100644 index 72570a1..0000000 --- a/src/yesstr.h +++ /dev/null @@ -1,29 +0,0 @@ -#pragma once - -#include "golpe.h" - - -inline void verifyYesstrRequest(std::string_view msg) { - if (!msg.starts_with("Y")) throw herr("invalid yesstr magic char"); - msg = msg.substr(1); - auto verifier = flatbuffers::Verifier(reinterpret_cast(msg.data()), msg.size()); - bool ok = verifier.VerifyBuffer(nullptr); - if (!ok) throw herr("yesstr request verification failed"); -} - -inline void verifyYesstrResponse(std::string_view msg) { - if (!msg.starts_with("Y")) throw herr("invalid yesstr magic char"); - msg = msg.substr(1); - auto verifier = flatbuffers::Verifier(reinterpret_cast(msg.data()), msg.size()); - bool ok = verifier.VerifyBuffer(nullptr); - if (!ok) throw herr("yesstr response verification failed"); -} - - -inline const Yesstr::Request *parseYesstrRequest(std::string_view msg) { - return flatbuffers::GetRoot(msg.substr(1).data()); -} - -inline const Yesstr::Response *parseYesstrResponse(std::string_view msg) { - return flatbuffers::GetRoot(msg.substr(1).data()); -} diff --git a/strfry.conf b/strfry.conf index 0b909e7..6b304ec 100644 --- a/strfry.conf +++ b/strfry.conf @@ -40,7 +40,7 @@ relay { contact = "unset" } - # Maximum accepted incoming websocket frame size (should be larger than max event and yesstr msg) (restart required) + # Maximum accepted incoming websocket frame size (should be larger than max event) (restart required) maxWebsocketPayloadSize = 131072 # Websocket-level PING message frequency (should be less than any reverse proxy idle timeouts) (restart required) @@ -97,9 +97,6 @@ relay { # reqMonitor threads: Handle filtering of new events (restart required) reqMonitor = 3 - - # yesstr threads: Experimental yesstr protocol (restart required) - yesstr = 1 } }