From d6151072fe026a3e2ca163b79d013c5a83354b8a Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Tue, 21 Feb 2023 14:48:44 -0500 Subject: [PATCH] simple sorting implementation, for benchmarking --- src/DBScan.h | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/src/DBScan.h b/src/DBScan.h index b463bd5..9bfe11f 100644 --- a/src/DBScan.h +++ b/src/DBScan.h @@ -206,7 +206,7 @@ struct DBScan { } // If scan is complete, returns true - bool scan(lmdb::txn &txn, std::function handleEvent, std::function doPause) { + bool scan(lmdb::txn &txn, std::function handleEvent, std::function doPause) { while (remainingLimit && !isComplete()) { if (resumeKey == "") resetResume(); @@ -251,14 +251,14 @@ struct DBScan { // Don't attempt to match filter } else if (f.indexOnlyScans) { if (f.doesMatchTimes(created)) { - handleEvent(levId); + handleEvent(levId, created); sent = true; } } else { auto view = env.lookup_Event(txn, levId); if (!view) throw herr("missing event from index, corrupt DB?"); if (f.doesMatch(view->flat_nested())) { - handleEvent(levId); + handleEvent(levId, created); sent = true; } } @@ -274,6 +274,7 @@ struct DBScan { if (pause) return false; if (!skipBack) { + remainingLimit = f.limit; nextFilterItem(); resumeKey = ""; } @@ -290,8 +291,14 @@ struct DBScan { struct DBScanQuery : NonCopyable { + struct PendingRecord { + uint64_t levId; + uint64_t created; + }; + Subscription sub; std::unique_ptr scanner; + std::vector pending; size_t filterGroupIndex = 0; bool dead = false; @@ -313,7 +320,7 @@ struct DBScanQuery : NonCopyable { while (filterGroupIndex < sub.filterGroup.size()) { if (!scanner) scanner = std::make_unique(sub.filterGroup.filters[filterGroupIndex]); - bool complete = scanner->scan(txn, [&](uint64_t levId){ + bool complete = scanner->scan(txn, [&](uint64_t levId, uint64_t created){ // If this event came in after our query began, don't send it. It will be sent after the EOSE. if (levId > sub.latestEventId) return; @@ -322,7 +329,7 @@ struct DBScanQuery : NonCopyable { alreadySentEvents.insert(levId); currScanRecordsFound++; - cb(sub, levId); + pending.emplace_back(levId, created); }, [&]{ currScanRecordsTraversed++; return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds; @@ -335,6 +342,20 @@ struct DBScanQuery : NonCopyable { return false; } + + std::sort(pending.begin(), pending.end(), [](auto &a, auto &b){ + return a.created == b.created ? a.levId > b.levId : a.created > b.created; + }); + + + uint64_t limit = sub.filterGroup.filters[filterGroupIndex].limit; + if (pending.size() > limit) pending.resize(limit); + + for (const auto &p : pending) { + cb(sub, p.levId); + } + + totalScanTime += currScanTime; if (logMetrics) { @@ -366,6 +387,7 @@ struct DBScanQuery : NonCopyable { filterGroupIndex++; scanner.reset(); + pending.clear(); currScanTime = 0; currScanSaveRestores = 0; currScanRecordsTraversed = 0;