From 206b14a473d350ef836fcccbb0180f0cf9aea6aa Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Sat, 29 Jul 2023 01:14:38 -0400 Subject: [PATCH] sync optimisations, DBQuery no longer loads eventPayload - It is now up to the caller to do so - QueryScheduler now can optionally not bother to ensure that the events are fresh --- src/DBQuery.h | 27 +++++++++------------------ src/QueryScheduler.h | 19 ++++++++++++++++--- src/apps/dbutils/cmd_delete.cpp | 2 +- src/apps/dbutils/cmd_dict.cpp | 2 +- src/apps/dbutils/cmd_scan.cpp | 4 ++-- src/apps/mesh/cmd_sync.cpp | 14 ++++++++++---- src/apps/relay/RelayNegentropy.cpp | 23 +++++++++++++++++++---- src/apps/relay/RelayReqWorker.cpp | 3 +-- 8 files changed, 59 insertions(+), 35 deletions(-) diff --git a/src/DBQuery.h b/src/DBQuery.h index 2f2475d..9ca6b7e 100644 --- a/src/DBQuery.h +++ b/src/DBQuery.h @@ -234,13 +234,11 @@ struct DBScan : NonCopyable { refillScanDepth = 10 * initialScanDepth; } - bool scan(lmdb::txn &txn, std::function handleEvent, std::function doPause) { + bool scan(lmdb::txn &txn, std::function handleEvent, std::function doPause) { auto cmp = [](auto &a, auto &b){ return a.created() == b.created() ? a.levId() > b.levId() : a.created() > b.created(); }; - auto eventPayloadCursor = lmdb::cursor::open(txn, env.dbi_EventPayload); - while (1) { approxWork++; if (doPause(approxWork)) return false; @@ -262,23 +260,16 @@ struct DBScan : NonCopyable { eventQueue.pop_front(); bool doSend = false; uint64_t levId = ev.levId(); - std::string_view eventPayload; - - auto loadEventPayload = [&]{ - std::string_view key = lmdb::to_sv(levId); - return eventPayloadCursor.get(key, eventPayload, MDB_SET_KEY); // If not found, was deleted while scan was paused - }; if (indexOnly) { if (f.doesMatchTimes(ev.created())) doSend = true; - if (!loadEventPayload()) doSend = false; - } else if (loadEventPayload()) { + } else { approxWork += 10; if (f.doesMatch(lookupEventByLevId(txn, levId).flat_nested())) doSend = true; } if (doSend) { - if (handleEvent(levId, eventPayload)) return true; + if (handleEvent(levId)) return true; } cursors[ev.scanIndex()].outstanding--; @@ -315,7 +306,7 @@ struct DBQuery : NonCopyable { DBQuery(const tao::json::value &filter, uint64_t maxLimit = MAX_U64) : sub(Subscription(1, ".", NostrFilterGroup::unwrapped(filter, maxLimit))) {} // If scan is complete, returns true - bool process(lmdb::txn &txn, std::function cb, uint64_t timeBudgetMicroseconds = MAX_U64, bool logMetrics = false) { + bool process(lmdb::txn &txn, std::function cb, uint64_t timeBudgetMicroseconds = MAX_U64, bool logMetrics = false) { while (filterGroupIndex < sub.filterGroup.size()) { const auto &f = sub.filterGroup.filters[filterGroupIndex]; @@ -323,7 +314,7 @@ struct DBQuery : NonCopyable { uint64_t startTime = hoytech::curr_time_us(); - bool complete = scanner->scan(txn, [&](uint64_t levId, std::string_view eventPayload){ + bool complete = scanner->scan(txn, [&](uint64_t levId){ if (f.limit == 0) return true; // If this event came in after our query began, don't send it. It will be sent after the EOSE. @@ -331,7 +322,7 @@ struct DBQuery : NonCopyable { if (sentEventsFull.find(levId) == sentEventsFull.end()) { sentEventsFull.insert(levId); - cb(sub, levId, eventPayload); + cb(sub, levId); } sentEventsCurr.insert(levId); @@ -386,10 +377,10 @@ struct DBQuery : NonCopyable { }; -inline void foreachByFilter(lmdb::txn &txn, const tao::json::value &filter, std::function cb) { +inline void foreachByFilter(lmdb::txn &txn, const tao::json::value &filter, std::function cb) { DBQuery query(filter); - query.process(txn, [&](const auto &, uint64_t levId, std::string_view eventPayload){ - cb(levId, eventPayload); + query.process(txn, [&](const auto &, uint64_t levId){ + cb(levId); }); } diff --git a/src/QueryScheduler.h b/src/QueryScheduler.h index be2269d..1343c66 100644 --- a/src/QueryScheduler.h +++ b/src/QueryScheduler.h @@ -6,7 +6,11 @@ struct QueryScheduler : NonCopyable { std::function onEvent; std::function &levIds)> onEventBatch; - std::function onComplete; + std::function onComplete; + + // If false, then levIds returned to above callbacks can be stale (because they were deleted) + // If false, then onEvent's eventPayload will always be "" + bool ensureExists = true; using ConnQueries = flat_hash_map; flat_hash_map conns; // connId -> subId -> DBQuery* @@ -74,7 +78,16 @@ struct QueryScheduler : NonCopyable { return; } - bool complete = q->process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){ + auto eventPayloadCursor = lmdb::cursor::open(txn, env.dbi_EventPayload); + + bool complete = q->process(txn, [&](const auto &sub, uint64_t levId){ + std::string_view eventPayload; + + if (ensureExists) { + std::string_view key = lmdb::to_sv(levId); + if (!eventPayloadCursor.get(key, eventPayload, MDB_SET_KEY)) return; // If not found, was deleted while scan was paused + } + if (onEvent) onEvent(txn, sub, levId, eventPayload); if (onEventBatch) levIdBatch.push_back(levId); }, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf); @@ -88,7 +101,7 @@ struct QueryScheduler : NonCopyable { auto connId = q->sub.connId; removeSub(connId, q->sub.subId); - if (onComplete) onComplete(q->sub); + if (onComplete) onComplete(txn, q->sub); delete q; } else { diff --git a/src/apps/dbutils/cmd_delete.cpp b/src/apps/dbutils/cmd_delete.cpp index b3d8363..e95a8fb 100644 --- a/src/apps/dbutils/cmd_delete.cpp +++ b/src/apps/dbutils/cmd_delete.cpp @@ -50,7 +50,7 @@ void cmd_delete(const std::vector &subArgs) { auto txn = env.txn_ro(); while (1) { - bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){ + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ levIds.insert(levId); }); diff --git a/src/apps/dbutils/cmd_dict.cpp b/src/apps/dbutils/cmd_dict.cpp index 15dc652..a0fcf0b 100644 --- a/src/apps/dbutils/cmd_dict.cpp +++ b/src/apps/dbutils/cmd_dict.cpp @@ -50,7 +50,7 @@ void cmd_dict(const std::vector &subArgs) { DBQuery query(tao::json::from_string(filterStr)); while (1) { - bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){ + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ levIds.push_back(levId); }); diff --git a/src/apps/dbutils/cmd_scan.cpp b/src/apps/dbutils/cmd_scan.cpp index 713c29d..66a2ccd 100644 --- a/src/apps/dbutils/cmd_scan.cpp +++ b/src/apps/dbutils/cmd_scan.cpp @@ -37,9 +37,9 @@ void cmd_scan(const std::vector &subArgs) { exitOnSigPipe(); while (1) { - bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){ + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ if (count) numEvents++; - else std::cout << getEventJson(txn, decomp, levId, eventPayload) << "\n"; + else std::cout << getEventJson(txn, decomp, levId) << "\n"; }, pause ? pause : MAX_U64, metrics); if (complete) break; diff --git a/src/apps/mesh/cmd_sync.cpp b/src/apps/mesh/cmd_sync.cpp index 69b2d90..fdc4adc 100644 --- a/src/apps/mesh/cmd_sync.cpp +++ b/src/apps/mesh/cmd_sync.cpp @@ -57,18 +57,24 @@ void cmd_sync(const std::vector &subArgs) { auto txn = env.txn_ro(); uint64_t numEvents = 0; + std::vector levIds; while (1) { - bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){ - auto ev = lookupEventByLevId(txn, levId); - ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, ne.idSize)); - + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ + levIds.push_back(levId); numEvents++; }); if (complete) break; } + std::sort(levIds.begin(), levIds.end()); + + for (auto levId : levIds) { + auto ev = lookupEventByLevId(txn, levId); + ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, ne.idSize)); + } + LI << "Filter matches " << numEvents << " events"; } diff --git a/src/apps/relay/RelayNegentropy.cpp b/src/apps/relay/RelayNegentropy.cpp index 2a58410..bf0465c 100644 --- a/src/apps/relay/RelayNegentropy.cpp +++ b/src/apps/relay/RelayNegentropy.cpp @@ -1,7 +1,6 @@ #include #include "RelayServer.h" -#include "DBQuery.h" #include "QueryScheduler.h" @@ -9,6 +8,7 @@ struct NegentropyViews { struct UserView { Negentropy ne; std::string initialMsg; + std::vector levIds; uint64_t startTime = hoytech::curr_time_us(); }; @@ -63,17 +63,18 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { QueryScheduler queries; NegentropyViews views; + queries.ensureExists = false; + queries.onEventBatch = [&](lmdb::txn &txn, const auto &sub, const std::vector &levIds){ auto *view = views.findView(sub.connId, sub.subId); if (!view) return; for (auto levId : levIds) { - auto ev = lookupEventByLevId(txn, levId); - view->ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, view->ne.idSize)); + view->levIds.push_back(levId); } }; - queries.onComplete = [&](Subscription &sub){ + queries.onComplete = [&](lmdb::txn &txn, Subscription &sub){ auto *view = views.findView(sub.connId, sub.subId); if (!view) return; @@ -94,6 +95,20 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { return; } + std::sort(view->levIds.begin(), view->levIds.end()); + + for (auto levId : view->levIds) { + try { + auto ev = lookupEventByLevId(txn, levId); + view->ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, view->ne.idSize)); + } catch (std::exception &) { + // levId was deleted when query was paused + } + } + + view->levIds.clear(); + view->levIds.shrink_to_fit(); + view->ne.seal(); auto resp = view->ne.reconcile(view->initialMsg); diff --git a/src/apps/relay/RelayReqWorker.cpp b/src/apps/relay/RelayReqWorker.cpp index 972f272..92ec2f3 100644 --- a/src/apps/relay/RelayReqWorker.cpp +++ b/src/apps/relay/RelayReqWorker.cpp @@ -1,5 +1,4 @@ #include "RelayServer.h" -#include "DBQuery.h" #include "QueryScheduler.h" @@ -11,7 +10,7 @@ void RelayServer::runReqWorker(ThreadPool::Thread &thr) { sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, eventPayload, nullptr, nullptr)); }; - queries.onComplete = [&](Subscription &sub){ + queries.onComplete = [&](lmdb::txn &, Subscription &sub){ sendToConn(sub.connId, tao::json::to_string(tao::json::value::array({ "EOSE", sub.subId.str() }))); tpReqMonitor.dispatch(sub.connId, MsgReqMonitor{MsgReqMonitor::NewSub{std::move(sub)}}); };