simple sorting implementation, for benchmarking

This commit is contained in:
Doug Hoyte
2023-02-21 14:48:44 -05:00
parent e7706be1b6
commit d6151072fe

View File

@ -206,7 +206,7 @@ struct DBScan {
} }
// If scan is complete, returns true // If scan is complete, returns true
bool scan(lmdb::txn &txn, std::function<void(uint64_t)> handleEvent, std::function<bool()> doPause) { bool scan(lmdb::txn &txn, std::function<void(uint64_t, uint64_t)> handleEvent, std::function<bool()> doPause) {
while (remainingLimit && !isComplete()) { while (remainingLimit && !isComplete()) {
if (resumeKey == "") resetResume(); if (resumeKey == "") resetResume();
@ -251,14 +251,14 @@ struct DBScan {
// Don't attempt to match filter // Don't attempt to match filter
} else if (f.indexOnlyScans) { } else if (f.indexOnlyScans) {
if (f.doesMatchTimes(created)) { if (f.doesMatchTimes(created)) {
handleEvent(levId); handleEvent(levId, created);
sent = true; sent = true;
} }
} else { } else {
auto view = env.lookup_Event(txn, levId); auto view = env.lookup_Event(txn, levId);
if (!view) throw herr("missing event from index, corrupt DB?"); if (!view) throw herr("missing event from index, corrupt DB?");
if (f.doesMatch(view->flat_nested())) { if (f.doesMatch(view->flat_nested())) {
handleEvent(levId); handleEvent(levId, created);
sent = true; sent = true;
} }
} }
@ -274,6 +274,7 @@ struct DBScan {
if (pause) return false; if (pause) return false;
if (!skipBack) { if (!skipBack) {
remainingLimit = f.limit;
nextFilterItem(); nextFilterItem();
resumeKey = ""; resumeKey = "";
} }
@ -290,8 +291,14 @@ struct DBScan {
struct DBScanQuery : NonCopyable { struct DBScanQuery : NonCopyable {
struct PendingRecord {
uint64_t levId;
uint64_t created;
};
Subscription sub; Subscription sub;
std::unique_ptr<DBScan> scanner; std::unique_ptr<DBScan> scanner;
std::vector<PendingRecord> pending;
size_t filterGroupIndex = 0; size_t filterGroupIndex = 0;
bool dead = false; bool dead = false;
@ -313,7 +320,7 @@ struct DBScanQuery : NonCopyable {
while (filterGroupIndex < sub.filterGroup.size()) { while (filterGroupIndex < sub.filterGroup.size()) {
if (!scanner) scanner = std::make_unique<DBScan>(sub.filterGroup.filters[filterGroupIndex]); if (!scanner) scanner = std::make_unique<DBScan>(sub.filterGroup.filters[filterGroupIndex]);
bool complete = scanner->scan(txn, [&](uint64_t levId){ bool complete = scanner->scan(txn, [&](uint64_t levId, uint64_t created){
// If this event came in after our query began, don't send it. It will be sent after the EOSE. // If this event came in after our query began, don't send it. It will be sent after the EOSE.
if (levId > sub.latestEventId) return; if (levId > sub.latestEventId) return;
@ -322,7 +329,7 @@ struct DBScanQuery : NonCopyable {
alreadySentEvents.insert(levId); alreadySentEvents.insert(levId);
currScanRecordsFound++; currScanRecordsFound++;
cb(sub, levId); pending.emplace_back(levId, created);
}, [&]{ }, [&]{
currScanRecordsTraversed++; currScanRecordsTraversed++;
return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds; return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds;
@ -335,6 +342,20 @@ struct DBScanQuery : NonCopyable {
return false; return false;
} }
std::sort(pending.begin(), pending.end(), [](auto &a, auto &b){
return a.created == b.created ? a.levId > b.levId : a.created > b.created;
});
uint64_t limit = sub.filterGroup.filters[filterGroupIndex].limit;
if (pending.size() > limit) pending.resize(limit);
for (const auto &p : pending) {
cb(sub, p.levId);
}
totalScanTime += currScanTime; totalScanTime += currScanTime;
if (logMetrics) { if (logMetrics) {
@ -366,6 +387,7 @@ struct DBScanQuery : NonCopyable {
filterGroupIndex++; filterGroupIndex++;
scanner.reset(); scanner.reset();
pending.clear();
currScanTime = 0; currScanTime = 0;
currScanSaveRestores = 0; currScanSaveRestores = 0;
currScanRecordsTraversed = 0; currScanRecordsTraversed = 0;