diff --git a/src/DBQuery.h b/src/DBQuery.h new file mode 100644 index 0000000..0083fcf --- /dev/null +++ b/src/DBQuery.h @@ -0,0 +1,375 @@ +#pragma once + +#include "golpe.h" + +#include "Subscription.h" +#include "filters.h" + + +struct DBScan : NonCopyable { + struct CandidateEvent { + private: + uint64_t packed; + uint64_t levIdStorage; + + public: + CandidateEvent(uint64_t levId, uint64_t created, uint64_t scanIndex) : packed(scanIndex << 40 | created), levIdStorage(levId) {} + + uint64_t levId() { return levIdStorage; } + uint64_t created() { return packed & 0xFF'FFFFFFFF; } + uint64_t scanIndex() { return packed >> 40; } + }; + + enum class KeyMatchResult { + Yes, + No, + NoButContinue, + }; + + struct ScanCursor { + std::string resumeKey; + uint64_t resumeVal; + std::function keyMatch; + uint64_t outstanding = 0; // number of records remaining in eventQueue, decremented in DBScan::scan + + bool active() { + return resumeKey.size() > 0; + } + + uint64_t collect(lmdb::txn &txn, DBScan &s, uint64_t scanIndex, uint64_t limit, std::deque &output) { + uint64_t added = 0; + + while (active() && limit > 0) { + bool finished = env.generic_foreachFull(txn, s.indexDbi, resumeKey, lmdb::to_sv(resumeVal), [&](auto k, auto v) { + if (limit == 0) { + resumeKey = std::string(k); + resumeVal = lmdb::from_sv(v); + return false; + } + + auto matched = keyMatch(k); + if (matched == KeyMatchResult::No) { + resumeKey = ""; + return false; + } + + uint64_t created; + + { + ParsedKey_StringUint64 parsedKey(k); + created = parsedKey.n; + + if (s.f.since && created < s.f.since) { + resumeKey = makeKey_StringUint64(parsedKey.s, 0); + resumeVal = 0; + return false; + } + + if (s.f.until && created > s.f.until) { + resumeKey = makeKey_StringUint64(parsedKey.s, s.f.until); + resumeVal = MAX_U64; + return false; + } + } + + if (matched == KeyMatchResult::Yes) { + uint64_t levId = lmdb::from_sv(v); + output.emplace_back(levId, created, scanIndex); + added++; + limit--; + } + + return true; + }, true); + + if (finished) resumeKey = ""; + } + + outstanding += added; + return added; + } + }; + + const NostrFilter &f; + bool indexOnly; + lmdb::dbi indexDbi; + const char *desc = "?"; + std::vector cursors; + std::deque eventQueue; // sorted descending by created + uint64_t initialScanDepth; + uint64_t refillScanDepth; + uint64_t nextInitIndex = 0; + uint64_t approxWork = 0; + + DBScan(const NostrFilter &f) : f(f) { + indexOnly = f.indexOnlyScans; + + if (f.ids) { + indexDbi = env.dbi_Event__id; + desc = "ID"; + + cursors.reserve(f.ids->size()); + for (uint64_t i = 0; i < f.ids->size(); i++) { + std::string prefix = f.ids->at(i); + + cursors.emplace_back( + padBytes(prefix, 32 + 8, '\xFF'), + MAX_U64, + [prefix](std::string_view k){ + return k.starts_with(prefix) ? KeyMatchResult::Yes : KeyMatchResult::No; + } + ); + } + } else if (f.tags.size()) { + indexDbi = env.dbi_Event__tag; + desc = "Tag"; + + char tagName = '\0'; + { + uint64_t numTags = MAX_U64; + for (const auto &[tn, filterSet] : f.tags) { + if (filterSet.size() < numTags) { + numTags = filterSet.size(); + tagName = tn; + } + } + } + + const auto &filterSet = f.tags.at(tagName); + + cursors.reserve(filterSet.size()); + for (uint64_t i = 0; i < filterSet.size(); i++) { + std::string search; + search += tagName; + search += filterSet.at(i); + + cursors.emplace_back( + search + std::string(8, '\xFF'), + MAX_U64, + [search](std::string_view k){ + return k.size() == search.size() + 8 && k.starts_with(search) ? KeyMatchResult::Yes : KeyMatchResult::No; + } + ); + } + } else if (f.authors && f.kinds && f.authors->size() * f.kinds->size() < 1'000) { + indexDbi = env.dbi_Event__pubkeyKind; + desc = "PubkeyKind"; + + cursors.reserve(f.authors->size() * f.kinds->size()); + for (uint64_t i = 0; i < f.authors->size(); i++) { + for (uint64_t j = 0; j < f.kinds->size(); j++) { + uint64_t kind = f.kinds->at(j); + + std::string prefix = f.authors->at(i); + if (prefix.size() == 32) prefix += lmdb::to_sv(kind); + + cursors.emplace_back( + padBytes(prefix, 32 + 8 + 8, '\xFF'), + MAX_U64, + [prefix, kind](std::string_view k){ + if (!k.starts_with(prefix)) return KeyMatchResult::No; + if (prefix.size() == 32 + 8) return KeyMatchResult::Yes; + + ParsedKey_StringUint64Uint64 parsedKey(k); + if (parsedKey.n1 == kind) return KeyMatchResult::Yes; + + // With a prefix pubkey, continue scanning (pubkey,kind) backwards because with this index + // we don't know the next pubkey to jump back to + return KeyMatchResult::NoButContinue; + } + ); + } + } + } else if (f.authors) { + if (f.kinds) indexOnly = false; // because of the size limit in previous test + + indexDbi = env.dbi_Event__pubkey; + desc = "Pubkey"; + + cursors.reserve(f.authors->size()); + for (uint64_t i = 0; i < f.authors->size(); i++) { + std::string prefix = f.authors->at(i); + + cursors.emplace_back( + padBytes(prefix, 32 + 8, '\xFF'), + MAX_U64, + [prefix](std::string_view k){ + return k.starts_with(prefix) ? KeyMatchResult::Yes : KeyMatchResult::No; + } + ); + } + } else if (f.kinds) { + indexDbi = env.dbi_Event__kind; + desc = "Kind"; + + cursors.reserve(f.kinds->size()); + for (uint64_t i = 0; i < f.kinds->size(); i++) { + uint64_t kind = f.kinds->at(i); + + cursors.emplace_back( + std::string(lmdb::to_sv(kind)) + std::string(8, '\xFF'), + MAX_U64, + [kind](std::string_view k){ + ParsedKey_Uint64Uint64 parsedKey(k); + return parsedKey.n1 == kind ? KeyMatchResult::Yes : KeyMatchResult::No; + } + ); + } + } else { + indexDbi = env.dbi_Event__created_at; + desc = "CreatedAt"; + + cursors.reserve(1); + cursors.emplace_back( + std::string(8, '\xFF'), + MAX_U64, + [](std::string_view){ + return KeyMatchResult::Yes; + } + ); + } + + initialScanDepth = std::clamp(f.limit / cursors.size(), uint64_t(5), uint64_t(50)); + refillScanDepth = 10 * initialScanDepth; + } + + bool scan(lmdb::txn &txn, std::function handleEvent, std::function doPause) { + auto cmp = [](auto &a, auto &b){ + return a.created() == b.created() ? a.levId() > b.levId() : a.created() > b.created(); + }; + + while (1) { + approxWork++; + if (doPause(approxWork)) return false; + + if (nextInitIndex < cursors.size()) { + approxWork += cursors[nextInitIndex].collect(txn, *this, nextInitIndex, initialScanDepth, eventQueue); + nextInitIndex++; + + if (nextInitIndex == cursors.size()) { + std::sort(eventQueue.begin(), eventQueue.end(), cmp); + } + + continue; + } else if (eventQueue.size() == 0) { + return true; + } + + auto ev = eventQueue.front(); + eventQueue.pop_front(); + bool doSend = false; + + if (indexOnly) { + if (f.doesMatchTimes(ev.created())) doSend = true; + } else { + approxWork += 10; + auto view = env.lookup_Event(txn, ev.levId()); + if (!view) throw herr("missing event from index, corrupt DB?"); + if (f.doesMatch(view->flat_nested())) doSend = true; + } + + if (doSend) { + if (handleEvent(ev.levId())) return true; + } + + cursors[ev.scanIndex()].outstanding--; + + if (cursors[ev.scanIndex()].outstanding == 0) { + std::deque moreEvents; + std::deque newEventQueue; + approxWork += cursors[ev.scanIndex()].collect(txn, *this, ev.scanIndex(), refillScanDepth, moreEvents); + + std::merge(eventQueue.begin(), eventQueue.end(), moreEvents.begin(), moreEvents.end(), std::back_inserter(newEventQueue), cmp); + eventQueue.swap(newEventQueue); + } + } + } +}; + + +struct DBQuery : NonCopyable { + Subscription sub; + + std::unique_ptr scanner; + size_t filterGroupIndex = 0; + bool dead = false; // external flag + flat_hash_set sentEventsFull; + flat_hash_set sentEventsCurr; + uint64_t lastWorkChecked = 0; + + uint64_t currScanTime = 0; + uint64_t currScanSaveRestores = 0; + uint64_t totalTime = 0; + uint64_t totalWork = 0; + + DBQuery(Subscription &sub) : sub(std::move(sub)) {} + DBQuery(const tao::json::value &filter, uint64_t maxLimit = MAX_U64) : sub(Subscription(1, ".", NostrFilterGroup::unwrapped(filter, maxLimit))) {} + + // If scan is complete, returns true + bool process(lmdb::txn &txn, std::function cb, uint64_t timeBudgetMicroseconds = MAX_U64, bool logMetrics = false) { + while (filterGroupIndex < sub.filterGroup.size()) { + const auto &f = sub.filterGroup.filters[filterGroupIndex]; + + if (!scanner) scanner = std::make_unique(f); + + uint64_t startTime = hoytech::curr_time_us(); + + bool complete = scanner->scan(txn, [&](uint64_t levId){ + // If this event came in after our query began, don't send it. It will be sent after the EOSE. + if (levId > sub.latestEventId) return false; + + if (sentEventsFull.find(levId) == sentEventsFull.end()) { + sentEventsFull.insert(levId); + cb(sub, levId); + } + + sentEventsCurr.insert(levId); + return sentEventsCurr.size() >= f.limit; + }, [&](uint64_t approxWork){ + if (approxWork > lastWorkChecked + 2'000) { + lastWorkChecked = approxWork; + return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds; + } + return false; + }); + + currScanTime += hoytech::curr_time_us() - startTime; + + if (!complete) { + currScanSaveRestores++; + return false; + } + + totalTime += currScanTime; + totalWork += scanner->approxWork; + + if (logMetrics) { + LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'" + << " scan=" << scanner->desc + << " indexOnly=" << scanner->indexOnly + << " time=" << currScanTime << "us" + << " saveRestores=" << currScanSaveRestores + << " recsFound=" << sentEventsCurr.size() + << " work=" << scanner->approxWork; + ; + } + + scanner.reset(); + filterGroupIndex++; + sentEventsCurr.clear(); + + currScanTime = 0; + currScanSaveRestores = 0; + } + + if (logMetrics) { + LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'" + << " totalTime=" << totalTime << "us" + << " totalWork=" << totalWork + << " recsSent=" << sentEventsFull.size() + ; + } + + return true; + } +}; diff --git a/src/DBScan.h b/src/DBScan.h deleted file mode 100644 index 9bfe11f..0000000 --- a/src/DBScan.h +++ /dev/null @@ -1,405 +0,0 @@ -#pragma once - -#include "golpe.h" - -#include "Subscription.h" -#include "filters.h" - - -struct DBScan { - const NostrFilter &f; - uint64_t remainingLimit; - - struct NullState { - }; - - struct IdScan { - size_t index = 0; - std::string prefix; - }; - - struct PubkeyKindScan { - size_t indexAuthor = 0; - size_t indexKind = 0; - std::string prefix; - }; - - struct PubkeyScan { - size_t index = 0; - std::string prefix; - }; - - struct TagScan { - flat_hash_map::const_iterator indexTagName; - size_t indexTagVal = 0; - std::string search; - }; - - struct KindScan { - size_t index = 0; - uint64_t kind; - }; - - struct CreatedAtScan { - bool done = false; - }; - - std::variant scanState = NullState{}; - lmdb::dbi indexDbi; - std::string resumeKey; - uint64_t resumeVal; - - enum class KeyMatchResult { - Yes, - No, - NoButContinue, - }; - - std::function isComplete; - std::function nextFilterItem; - std::function resetResume; - std::function keyMatch; - - DBScan(const NostrFilter &f_) : f(f_) { - remainingLimit = f.limit; - - if (f.ids) { - scanState = IdScan{}; - auto *state = std::get_if(&scanState); - indexDbi = env.dbi_Event__id; - - isComplete = [&, state]{ - return state->index >= f.ids->size(); - }; - nextFilterItem = [&, state]{ - state->index++; - }; - resetResume = [&, state]{ - state->prefix = f.ids->at(state->index); - resumeKey = padBytes(state->prefix, 32 + 8, '\xFF'); - resumeVal = MAX_U64; - }; - keyMatch = [&, state](std::string_view k, bool&){ - return k.starts_with(state->prefix) ? KeyMatchResult::Yes : KeyMatchResult::No; - }; - } else if (f.authors && f.kinds) { - scanState = PubkeyKindScan{}; - auto *state = std::get_if(&scanState); - indexDbi = env.dbi_Event__pubkeyKind; - - isComplete = [&, state]{ - return state->indexAuthor >= f.authors->size(); - }; - nextFilterItem = [&, state]{ - state->indexKind++; - if (state->indexKind >= f.kinds->size()) { - state->indexAuthor++; - state->indexKind = 0; - } - }; - resetResume = [&, state]{ - state->prefix = f.authors->at(state->indexAuthor); - if (state->prefix.size() == 32) state->prefix += lmdb::to_sv(f.kinds->at(state->indexKind)); - resumeKey = padBytes(state->prefix, 32 + 8 + 8, '\xFF'); - resumeVal = MAX_U64; - }; - keyMatch = [&, state](std::string_view k, bool &skipBack){ - if (!k.starts_with(state->prefix)) return KeyMatchResult::No; - if (state->prefix.size() == 32 + 8) return KeyMatchResult::Yes; - - ParsedKey_StringUint64Uint64 parsedKey(k); - if (parsedKey.n1 == f.kinds->at(state->indexKind)) { - return KeyMatchResult::Yes; - } else if (parsedKey.n1 < f.kinds->at(state->indexKind)) { - // With a prefix pubkey, continue scanning (pubkey,kind) backwards because with this index - // we don't know the next pubkey to jump back to - return KeyMatchResult::NoButContinue; - } - - resumeKey = makeKey_StringUint64Uint64(parsedKey.s, f.kinds->at(state->indexKind), MAX_U64); - resumeVal = MAX_U64; - skipBack = true; - return KeyMatchResult::No; - }; - } else if (f.authors) { - scanState = PubkeyScan{}; - auto *state = std::get_if(&scanState); - indexDbi = env.dbi_Event__pubkey; - - isComplete = [&, state]{ - return state->index >= f.authors->size(); - }; - nextFilterItem = [&, state]{ - state->index++; - }; - resetResume = [&, state]{ - state->prefix = f.authors->at(state->index); - resumeKey = padBytes(state->prefix, 32 + 8, '\xFF'); - resumeVal = MAX_U64; - }; - keyMatch = [&, state](std::string_view k, bool&){ - return k.starts_with(state->prefix) ? KeyMatchResult::Yes : KeyMatchResult::No; - }; - } else if (f.tags.size()) { - scanState = TagScan{f.tags.begin()}; - auto *state = std::get_if(&scanState); - indexDbi = env.dbi_Event__tag; - - isComplete = [&, state]{ - return state->indexTagName == f.tags.end(); - }; - nextFilterItem = [&, state]{ - state->indexTagVal++; - if (state->indexTagVal >= state->indexTagName->second.size()) { - state->indexTagName = std::next(state->indexTagName); - state->indexTagVal = 0; - } - }; - resetResume = [&, state]{ - state->search = state->indexTagName->first; - state->search += state->indexTagName->second.at(state->indexTagVal); - resumeKey = state->search + std::string(8, '\xFF'); - resumeVal = MAX_U64; - }; - keyMatch = [&, state](std::string_view k, bool&){ - return k.substr(0, state->search.size()) == state->search ? KeyMatchResult::Yes : KeyMatchResult::No; - }; - } else if (f.kinds) { - scanState = KindScan{}; - auto *state = std::get_if(&scanState); - indexDbi = env.dbi_Event__kind; - - isComplete = [&, state]{ - return state->index >= f.kinds->size(); - }; - nextFilterItem = [&, state]{ - state->index++; - }; - resetResume = [&, state]{ - state->kind = f.kinds->at(state->index); - resumeKey = std::string(lmdb::to_sv(state->kind)) + std::string(8, '\xFF'); - resumeVal = MAX_U64; - }; - keyMatch = [&, state](std::string_view k, bool&){ - ParsedKey_Uint64Uint64 parsedKey(k); - return parsedKey.n1 == state->kind ? KeyMatchResult::Yes : KeyMatchResult::No; - }; - } else { - scanState = CreatedAtScan{}; - auto *state = std::get_if(&scanState); - indexDbi = env.dbi_Event__created_at; - - isComplete = [&, state]{ - return state->done; - }; - nextFilterItem = [&, state]{ - state->done = true; - }; - resetResume = [&, state]{ - resumeKey = std::string(8, '\xFF'); - resumeVal = MAX_U64; - }; - keyMatch = [&, state](std::string_view k, bool&){ - return KeyMatchResult::Yes; - }; - } - } - - // If scan is complete, returns true - bool scan(lmdb::txn &txn, std::function handleEvent, std::function doPause) { - while (remainingLimit && !isComplete()) { - if (resumeKey == "") resetResume(); - - bool pause = false, skipBack = false; - - env.generic_foreachFull(txn, indexDbi, resumeKey, lmdb::to_sv(resumeVal), [&](auto k, auto v) { - if (doPause()) { - resumeKey = std::string(k); - resumeVal = lmdb::from_sv(v); - pause = true; - return false; - } - - auto matched = keyMatch(k, skipBack); - if (matched == KeyMatchResult::No) return false; - - uint64_t created; - - { - ParsedKey_StringUint64 parsedKey(k); - created = parsedKey.n; - - if ((f.since && created < f.since)) { - resumeKey = makeKey_StringUint64(parsedKey.s, 0); - resumeVal = 0; - skipBack = true; - return false; - } - - if (f.until && created > f.until) { - resumeKey = makeKey_StringUint64(parsedKey.s, f.until); - resumeVal = MAX_U64; - skipBack = true; - return false; - } - } - - bool sent = false; - uint64_t levId = lmdb::from_sv(v); - - if (matched == KeyMatchResult::NoButContinue) { - // Don't attempt to match filter - } else if (f.indexOnlyScans) { - if (f.doesMatchTimes(created)) { - handleEvent(levId, created); - sent = true; - } - } else { - auto view = env.lookup_Event(txn, levId); - if (!view) throw herr("missing event from index, corrupt DB?"); - if (f.doesMatch(view->flat_nested())) { - handleEvent(levId, created); - sent = true; - } - } - - if (sent) { - if (remainingLimit) remainingLimit--; - if (!remainingLimit) return false; - } - - return true; - }, true); - - if (pause) return false; - - if (!skipBack) { - remainingLimit = f.limit; - nextFilterItem(); - resumeKey = ""; - } - } - - return true; - } - - std::string padBytes(std::string_view str, size_t n, char padChar) { - if (str.size() > n) throw herr("unable to pad, string longer than expected"); - return std::string(str) + std::string(n - str.size(), padChar); - } -}; - - -struct DBScanQuery : NonCopyable { - struct PendingRecord { - uint64_t levId; - uint64_t created; - }; - - Subscription sub; - std::unique_ptr scanner; - std::vector pending; - - size_t filterGroupIndex = 0; - bool dead = false; - flat_hash_set alreadySentEvents; - - uint64_t currScanTime = 0; - uint64_t currScanSaveRestores = 0; - uint64_t currScanRecordsTraversed = 0; - uint64_t currScanRecordsFound = 0; - - uint64_t totalScanTime = 0; - - DBScanQuery(Subscription &sub_) : sub(std::move(sub_)) {} - - // If scan is complete, returns true - bool process(lmdb::txn &txn, uint64_t timeBudgetMicroseconds, bool logMetrics, std::function cb) { - uint64_t startTime = hoytech::curr_time_us(); - - while (filterGroupIndex < sub.filterGroup.size()) { - if (!scanner) scanner = std::make_unique(sub.filterGroup.filters[filterGroupIndex]); - - bool complete = scanner->scan(txn, [&](uint64_t levId, uint64_t created){ - // If this event came in after our query began, don't send it. It will be sent after the EOSE. - if (levId > sub.latestEventId) return; - - // We already sent this event - if (alreadySentEvents.find(levId) != alreadySentEvents.end()) return; - alreadySentEvents.insert(levId); - - currScanRecordsFound++; - pending.emplace_back(levId, created); - }, [&]{ - currScanRecordsTraversed++; - return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds; - }); - - currScanTime += hoytech::curr_time_us() - startTime; - - if (!complete) { - currScanSaveRestores++; - return false; - } - - - std::sort(pending.begin(), pending.end(), [](auto &a, auto &b){ - return a.created == b.created ? a.levId > b.levId : a.created > b.created; - }); - - - uint64_t limit = sub.filterGroup.filters[filterGroupIndex].limit; - if (pending.size() > limit) pending.resize(limit); - - for (const auto &p : pending) { - cb(sub, p.levId); - } - - - totalScanTime += currScanTime; - - if (logMetrics) { - std::string scanType = "?"; - - if (std::get_if(&scanner->scanState)) { - scanType = "Id"; - } else if (std::get_if(&scanner->scanState)) { - scanType = "PubkeyKind"; - } else if (std::get_if(&scanner->scanState)) { - scanType = "Pubkey"; - } else if (std::get_if(&scanner->scanState)) { - scanType = "Tag"; - } else if (std::get_if(&scanner->scanState)) { - scanType = "Kind"; - } else if (std::get_if(&scanner->scanState)) { - scanType = "CreatedAt"; - } - - LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'" - << " scan=" << scanType - << " indexOnly=" << scanner->f.indexOnlyScans - << " time=" << currScanTime << "us" - << " saveRestores=" << currScanSaveRestores - << " recsFound=" << currScanRecordsFound - << " recsScanned=" << currScanRecordsTraversed - ; - } - - filterGroupIndex++; - scanner.reset(); - pending.clear(); - currScanTime = 0; - currScanSaveRestores = 0; - currScanRecordsTraversed = 0; - currScanRecordsFound = 0; - } - - if (logMetrics) { - LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'" - << " totalTime=" << totalScanTime << "us" - ; - } - - return true; - } -}; diff --git a/src/RelayReqWorker.cpp b/src/RelayReqWorker.cpp index dfd4b19..0e9c2dc 100644 --- a/src/RelayReqWorker.cpp +++ b/src/RelayReqWorker.cpp @@ -1,13 +1,13 @@ #include "RelayServer.h" -#include "DBScan.h" +#include "DBQuery.h" struct ActiveQueries : NonCopyable { Decompressor decomp; - using ConnQueries = flat_hash_map; - flat_hash_map conns; // connId -> subId -> DBScanQuery* - std::deque running; + using ConnQueries = flat_hash_map; + flat_hash_map conns; // connId -> subId -> DBQuery* + std::deque running; bool addSub(lmdb::txn &txn, Subscription &&sub) { sub.latestEventId = getMostRecentLevId(txn); @@ -24,7 +24,7 @@ struct ActiveQueries : NonCopyable { return false; } - DBScanQuery *q = new DBScanQuery(sub); + DBQuery *q = new DBQuery(sub); connQueries.try_emplace(q->sub.subId, q); running.push_front(q); @@ -32,7 +32,7 @@ struct ActiveQueries : NonCopyable { return true; } - DBScanQuery *findQuery(uint64_t connId, const SubId &subId) { + DBQuery *findQuery(uint64_t connId, const SubId &subId) { auto f1 = conns.find(connId); if (f1 == conns.end()) return nullptr; @@ -62,7 +62,7 @@ struct ActiveQueries : NonCopyable { void process(RelayServer *server, lmdb::txn &txn) { if (running.empty()) return; - DBScanQuery *q = running.front(); + DBQuery *q = running.front(); running.pop_front(); if (q->dead) { @@ -72,11 +72,11 @@ struct ActiveQueries : NonCopyable { auto cursor = lmdb::cursor::open(txn, env.dbi_EventPayload); - bool complete = q->process(txn, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t levId){ + bool complete = q->process(txn, [&](const auto &sub, uint64_t levId){ std::string_view key = lmdb::to_sv(levId), val; if (!cursor.get(key, val, MDB_SET_KEY)) throw herr("couldn't find event in EventPayload, corrupted DB?"); server->sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, val, nullptr, nullptr)); - }); + }, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf); if (complete) { auto connId = q->sub.connId; diff --git a/src/RelayYesstr.cpp b/src/RelayYesstr.cpp index fef3eb8..1969924 100644 --- a/src/RelayYesstr.cpp +++ b/src/RelayYesstr.cpp @@ -2,7 +2,7 @@ #include #include "RelayServer.h" -#include "DBScan.h" +#include "DBQuery.h" void RelayServer::runYesstr(ThreadPool::Thread &thr) { @@ -49,14 +49,12 @@ void RelayServer::runYesstr(ThreadPool::Thread &thr) { // with other requests like RelayReqWorker does. std::vector levIds; - auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr)); - Subscription sub(1, "junkSub", filterGroup); - DBScanQuery query(sub); + DBQuery query(tao::json::from_string(filterStr)); while (1) { - bool complete = query.process(txn, MAX_U64, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t levId){ + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ levIds.push_back(levId); - }); + }, MAX_U64, cfg().relay__logging__dbScanPerf); if (complete) break; } diff --git a/src/cmd_delete.cpp b/src/cmd_delete.cpp index 5c4cb7f..11f2481 100644 --- a/src/cmd_delete.cpp +++ b/src/cmd_delete.cpp @@ -3,7 +3,7 @@ #include #include "golpe.h" -#include "DBScan.h" +#include "DBQuery.h" #include "events.h" #include "gc.h" @@ -43,10 +43,7 @@ void cmd_delete(const std::vector &subArgs) { } - auto filterGroup = NostrFilterGroup::unwrapped(filter, MAX_U64); - Subscription sub(1, "junkSub", filterGroup); - DBScanQuery query(sub); - + DBQuery query(filter); btree_set levIds; @@ -54,7 +51,7 @@ void cmd_delete(const std::vector &subArgs) { auto txn = env.txn_ro(); while (1) { - bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t levId){ + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ levIds.insert(levId); }); diff --git a/src/cmd_dict.cpp b/src/cmd_dict.cpp index 088d147..58b7104 100644 --- a/src/cmd_dict.cpp +++ b/src/cmd_dict.cpp @@ -7,7 +7,7 @@ #include #include "golpe.h" -#include "DBScan.h" +#include "DBQuery.h" #include "events.h" @@ -47,12 +47,10 @@ void cmd_dict(const std::vector &subArgs) { auto txn = env.txn_ro(); - auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr), MAX_U64); - Subscription sub(1, "junkSub", filterGroup); - DBScanQuery query(sub); + DBQuery query(tao::json::from_string(filterStr)); while (1) { - bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t levId){ + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ levIds.push_back(levId); }); diff --git a/src/cmd_scan.cpp b/src/cmd_scan.cpp index 73786f3..8c73eb7 100644 --- a/src/cmd_scan.cpp +++ b/src/cmd_scan.cpp @@ -3,14 +3,14 @@ #include #include "golpe.h" -#include "DBScan.h" +#include "DBQuery.h" #include "events.h" static const char USAGE[] = R"( Usage: - scan [--pause=] [--metrics] + scan [--pause=] [--metrics] [--count] )"; @@ -21,24 +21,27 @@ void cmd_scan(const std::vector &subArgs) { if (args["--pause"]) pause = args["--pause"].asLong(); bool metrics = args["--metrics"].asBool(); + bool count = args["--count"].asBool(); std::string filterStr = args[""].asString(); - auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr), MAX_U64); - Subscription sub(1, "junkSub", filterGroup); - DBScanQuery query(sub); - + DBQuery query(tao::json::from_string(filterStr)); Decompressor decomp; auto txn = env.txn_ro(); + uint64_t numEvents = 0; + while (1) { - bool complete = query.process(txn, pause ? pause : MAX_U64, metrics, [&](const auto &sub, uint64_t levId){ - std::cout << getEventJson(txn, decomp, levId) << "\n"; - }); + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ + if (count) numEvents++; + else std::cout << getEventJson(txn, decomp, levId) << "\n"; + }, pause ? pause : MAX_U64, metrics); if (complete) break; } + + if (count) std::cout << numEvents << std::endl; } diff --git a/src/cmd_sync.cpp b/src/cmd_sync.cpp index cf1dc71..4473182 100644 --- a/src/cmd_sync.cpp +++ b/src/cmd_sync.cpp @@ -9,7 +9,7 @@ #include "WriterPipeline.h" #include "Subscription.h" #include "WSConnection.h" -#include "DBScan.h" +#include "DBQuery.h" #include "filters.h" #include "events.h" #include "yesstr.h" @@ -151,15 +151,12 @@ void cmd_sync(const std::vector &subArgs) { ::exit(1); } - auto filterGroup = NostrFilterGroup::unwrapped(filterJson); + DBQuery query(filterJson); - Subscription sub(1, "junkSub", filterGroup); - - DBScanQuery query(sub); auto txn = env.txn_ro(); while (1) { - bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t levId){ + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ levIds.push_back(levId); }); diff --git a/src/global.h b/src/global.h index 8291ca4..5272cbb 100644 --- a/src/global.h +++ b/src/global.h @@ -21,3 +21,4 @@ std::string renderPercent(double p); uint64_t parseUint64(const std::string &s); std::string parseIP(const std::string &ip); uint64_t getDBVersion(lmdb::txn &txn); +std::string padBytes(std::string_view str, size_t n, char padChar); diff --git a/src/misc.cpp b/src/misc.cpp index d1f5589..df13ff7 100644 --- a/src/misc.cpp +++ b/src/misc.cpp @@ -103,3 +103,9 @@ uint64_t getDBVersion(lmdb::txn &txn) { return dbVersion; } + + +std::string padBytes(std::string_view str, size_t n, char padChar) { + if (str.size() > n) throw herr("unable to pad, string longer than expected"); + return std::string(str) + std::string(n - str.size(), padChar); +} diff --git a/test/filterFuzzTest.pl b/test/filterFuzzTest.pl index 8cfe613..2fb87c7 100644 --- a/test/filterFuzzTest.pl +++ b/test/filterFuzzTest.pl @@ -198,7 +198,7 @@ sub testScan { my $headCmd = @$fg == 1 && $fg->[0]->{limit} ? "| head -n $fg->[0]->{limit}" : ""; my $resA = `./strfry export --reverse 2>/dev/null | perl test/dumbFilter.pl '$fge' $headCmd | jq -r .id | sort | sha256sum`; - my $resB = `./strfry scan --metrics '$fge' | jq -r .id | sort | sha256sum`; + my $resB = `./strfry scan --pause 1 --metrics '$fge' | jq -r .id | sort | sha256sum`; print "$resA\n$resB\n";