fix query behaviour when limit is present (reported by @Mazin)

This commit is contained in:
Doug Hoyte
2023-02-21 16:42:45 -05:00
parent e5e5ff6817
commit 5175664e2f
11 changed files with 417 additions and 447 deletions

375
src/DBQuery.h Normal file
View File

@ -0,0 +1,375 @@
#pragma once
#include "golpe.h"
#include "Subscription.h"
#include "filters.h"
// Scans the single best-matching LMDB index for one NostrFilter, merging
// per-filter-item cursors into a stream of candidate events ordered newest
// first (descending created-at). Non-copyable: cursors hold scan state.
struct DBScan : NonCopyable {
    // One record harvested from an index: the event's levId plus its
    // created-at timestamp (low 40 bits) and originating cursor index
    // (high 24 bits) packed into a single word to keep queue entries small.
    struct CandidateEvent {
      private:
        uint64_t packed;
        uint64_t levIdStorage;

      public:
        CandidateEvent(uint64_t levId, uint64_t created, uint64_t scanIndex) : packed(scanIndex << 40 | created), levIdStorage(levId) {}

        uint64_t levId() { return levIdStorage; }
        uint64_t created() { return packed & 0xFF'FFFFFFFF; } // low 40 bits of packed
        uint64_t scanIndex() { return packed >> 40; }         // high 24 bits of packed
    };

    // Outcome of testing an index key against the current filter item.
    enum class KeyMatchResult {
        Yes,
        No,            // past the end of this item: cursor should stop
        NoButContinue, // not a match, but keep iterating (see PubkeyKind prefix case)
    };

    // Tracks one descending scan through the chosen index for a single
    // filter item (one id prefix, one tag value, one (pubkey,kind), etc).
    struct ScanCursor {
        std::string resumeKey; // position to resume from; empty string means exhausted
        uint64_t resumeVal;
        std::function<KeyMatchResult(std::string_view)> keyMatch;

        uint64_t outstanding = 0; // number of records remaining in eventQueue, decremented in DBScan::scan

        bool active() {
            return resumeKey.size() > 0;
        }

        // Pulls up to `limit` matching records from the index into `output`,
        // starting at (resumeKey, resumeVal). Returns how many were added
        // (also accumulated into `outstanding`). When the limit is hit, the
        // current position is saved so a later call continues where this one
        // stopped.
        uint64_t collect(lmdb::txn &txn, DBScan &s, uint64_t scanIndex, uint64_t limit, std::deque<CandidateEvent> &output) {
            uint64_t added = 0;

            while (active() && limit > 0) {
                bool finished = env.generic_foreachFull(txn, s.indexDbi, resumeKey, lmdb::to_sv<uint64_t>(resumeVal), [&](auto k, auto v) {
                    // Limit was used up by the previous record: remember this
                    // position so the next collect() resumes here
                    if (limit == 0) {
                        resumeKey = std::string(k);
                        resumeVal = lmdb::from_sv<uint64_t>(v);
                        return false;
                    }

                    auto matched = keyMatch(k);

                    if (matched == KeyMatchResult::No) {
                        // This filter item is done: deactivate the cursor
                        resumeKey = "";
                        return false;
                    }

                    uint64_t created;

                    {
                        ParsedKey_StringUint64 parsedKey(k);
                        created = parsedKey.n;

                        // Iterating backwards in time: dropping below `since`
                        // means no more matches for this prefix are possible
                        if (s.f.since && created < s.f.since) {
                            resumeKey = makeKey_StringUint64(parsedKey.s, 0);
                            resumeVal = 0;
                            return false;
                        }

                        // Still above `until`: jump (backwards) to the first
                        // record at or below the `until` timestamp
                        if (s.f.until && created > s.f.until) {
                            resumeKey = makeKey_StringUint64(parsedKey.s, s.f.until);
                            resumeVal = MAX_U64;
                            return false;
                        }
                    }

                    if (matched == KeyMatchResult::Yes) {
                        uint64_t levId = lmdb::from_sv<uint64_t>(v);
                        output.emplace_back(levId, created, scanIndex);
                        added++;
                        limit--;
                    }
                    // NoButContinue falls through: keep iterating without recording

                    return true;
                }, true);

                if (finished) resumeKey = "";
            }

            outstanding += added;
            return added;
        }
    };

    const NostrFilter &f;
    bool indexOnly;          // match using index data alone; never load the full event
    lmdb::dbi indexDbi;      // the index chosen for this filter
    const char *desc = "?";  // human-readable name of the chosen index (for metrics)
    std::vector<ScanCursor> cursors; // one cursor per filter item
    std::deque<CandidateEvent> eventQueue; // sorted descending by created
    uint64_t initialScanDepth;  // per-cursor batch size for the first collect
    uint64_t refillScanDepth;   // larger batch size used when refilling a drained cursor
    uint64_t nextInitIndex = 0; // next cursor awaiting its initial collect
    uint64_t approxWork = 0;    // rough work counter, reported to doPause

    // Chooses the most selective applicable index for the filter and builds
    // one ScanCursor per filter item, each primed to scan backwards from the
    // newest possible key for that item.
    DBScan(const NostrFilter &f) : f(f) {
        indexOnly = f.indexOnlyScans;

        if (f.ids) {
            indexDbi = env.dbi_Event__id;
            desc = "ID";

            cursors.reserve(f.ids->size());
            for (uint64_t i = 0; i < f.ids->size(); i++) {
                std::string prefix = f.ids->at(i);
                cursors.emplace_back(
                    padBytes(prefix, 32 + 8, '\xFF'),
                    MAX_U64,
                    [prefix](std::string_view k){
                        return k.starts_with(prefix) ? KeyMatchResult::Yes : KeyMatchResult::No;
                    }
                );
            }
        } else if (f.tags.size()) {
            indexDbi = env.dbi_Event__tag;
            desc = "Tag";

            // Pick the tag with the fewest values: it is the most selective
            char tagName = '\0';

            {
                uint64_t numTags = MAX_U64;

                for (const auto &[tn, filterSet] : f.tags) {
                    if (filterSet.size() < numTags) {
                        numTags = filterSet.size();
                        tagName = tn;
                    }
                }
            }

            const auto &filterSet = f.tags.at(tagName);

            cursors.reserve(filterSet.size());
            for (uint64_t i = 0; i < filterSet.size(); i++) {
                std::string search;
                search += tagName;
                search += filterSet.at(i);
                cursors.emplace_back(
                    search + std::string(8, '\xFF'),
                    MAX_U64,
                    [search](std::string_view k){
                        // Exact tag value match: key is search plus an 8-byte timestamp
                        return k.size() == search.size() + 8 && k.starts_with(search) ? KeyMatchResult::Yes : KeyMatchResult::No;
                    }
                );
            }
        } else if (f.authors && f.kinds && f.authors->size() * f.kinds->size() < 1'000) {
            // Cross-product of authors x kinds, capped so cursor count stays bounded
            indexDbi = env.dbi_Event__pubkeyKind;
            desc = "PubkeyKind";

            cursors.reserve(f.authors->size() * f.kinds->size());
            for (uint64_t i = 0; i < f.authors->size(); i++) {
                for (uint64_t j = 0; j < f.kinds->size(); j++) {
                    uint64_t kind = f.kinds->at(j);
                    std::string prefix = f.authors->at(i);
                    // Full 32-byte pubkey: bake the kind into the search prefix
                    if (prefix.size() == 32) prefix += lmdb::to_sv<uint64_t>(kind);
                    cursors.emplace_back(
                        padBytes(prefix, 32 + 8 + 8, '\xFF'),
                        MAX_U64,
                        [prefix, kind](std::string_view k){
                            if (!k.starts_with(prefix)) return KeyMatchResult::No;
                            if (prefix.size() == 32 + 8) return KeyMatchResult::Yes;

                            ParsedKey_StringUint64Uint64 parsedKey(k);
                            if (parsedKey.n1 == kind) return KeyMatchResult::Yes;

                            // With a prefix pubkey, continue scanning (pubkey,kind) backwards because with this index
                            // we don't know the next pubkey to jump back to
                            return KeyMatchResult::NoButContinue;
                        }
                    );
                }
            }
        } else if (f.authors) {
            if (f.kinds) indexOnly = false; // because of the size limit in previous test

            indexDbi = env.dbi_Event__pubkey;
            desc = "Pubkey";

            cursors.reserve(f.authors->size());
            for (uint64_t i = 0; i < f.authors->size(); i++) {
                std::string prefix = f.authors->at(i);
                cursors.emplace_back(
                    padBytes(prefix, 32 + 8, '\xFF'),
                    MAX_U64,
                    [prefix](std::string_view k){
                        return k.starts_with(prefix) ? KeyMatchResult::Yes : KeyMatchResult::No;
                    }
                );
            }
        } else if (f.kinds) {
            indexDbi = env.dbi_Event__kind;
            desc = "Kind";

            cursors.reserve(f.kinds->size());
            for (uint64_t i = 0; i < f.kinds->size(); i++) {
                uint64_t kind = f.kinds->at(i);
                cursors.emplace_back(
                    std::string(lmdb::to_sv<uint64_t>(kind)) + std::string(8, '\xFF'),
                    MAX_U64,
                    [kind](std::string_view k){
                        ParsedKey_Uint64Uint64 parsedKey(k);
                        return parsedKey.n1 == kind ? KeyMatchResult::Yes : KeyMatchResult::No;
                    }
                );
            }
        } else {
            // No selective field at all: fall back to the created_at index
            indexDbi = env.dbi_Event__created_at;
            desc = "CreatedAt";

            cursors.reserve(1);
            cursors.emplace_back(
                std::string(8, '\xFF'),
                MAX_U64,
                [](std::string_view){
                    return KeyMatchResult::Yes;
                }
            );
        }

        // Split the filter's limit across cursors, bounded to [5, 50] per cursor
        initialScanDepth = std::clamp(f.limit / cursors.size(), uint64_t(5), uint64_t(50));
        refillScanDepth = 10 * initialScanDepth;
    }

    // Drains candidates in descending (created, levId) order, invoking
    // handleEvent(levId) for each event that passes the filter. Returns true
    // when the scan is complete (all cursors exhausted, or handleEvent
    // returned true to stop early); returns false when doPause(approxWork)
    // requested a pause — call scan() again later to resume.
    bool scan(lmdb::txn &txn, std::function<bool(uint64_t)> handleEvent, std::function<bool(uint64_t)> doPause) {
        // Order: newest first; ties broken by larger levId first
        auto cmp = [](auto &a, auto &b){
            return a.created() == b.created() ? a.levId() > b.levId() : a.created() > b.created();
        };

        while (1) {
            approxWork++;
            if (doPause(approxWork)) return false;

            if (nextInitIndex < cursors.size()) {
                // Still initializing: run one cursor's first collect per loop
                // iteration so pauses stay responsive
                approxWork += cursors[nextInitIndex].collect(txn, *this, nextInitIndex, initialScanDepth, eventQueue);
                nextInitIndex++;
                if (nextInitIndex == cursors.size()) {
                    std::sort(eventQueue.begin(), eventQueue.end(), cmp);
                }
                continue;
            } else if (eventQueue.size() == 0) {
                return true;
            }

            auto ev = eventQueue.front();
            eventQueue.pop_front();

            bool doSend = false;

            if (indexOnly) {
                // Index carries enough info: only the time bounds need checking
                if (f.doesMatchTimes(ev.created())) doSend = true;
            } else {
                approxWork += 10; // loading the full event is comparatively expensive
                auto view = env.lookup_Event(txn, ev.levId());
                if (!view) throw herr("missing event from index, corrupt DB?");
                if (f.doesMatch(view->flat_nested())) doSend = true;
            }

            if (doSend) {
                // handleEvent returning true means the caller is satisfied (limit hit)
                if (handleEvent(ev.levId())) return true;
            }

            cursors[ev.scanIndex()].outstanding--;
            if (cursors[ev.scanIndex()].outstanding == 0) {
                // This cursor has no queued events left: its next (older)
                // records could sort before everything still queued, so
                // refill now and merge to keep the queue globally ordered
                std::deque<CandidateEvent> moreEvents;
                std::deque<CandidateEvent> newEventQueue;
                approxWork += cursors[ev.scanIndex()].collect(txn, *this, ev.scanIndex(), refillScanDepth, moreEvents);
                std::merge(eventQueue.begin(), eventQueue.end(), moreEvents.begin(), moreEvents.end(), std::back_inserter(newEventQueue), cmp);
                eventQueue.swap(newEventQueue);
            }
        }
    }
};
// Executes a Subscription's filter group against the DB, one filter at a
// time, each via its own DBScan. Supports incremental processing under a
// time budget: call process() repeatedly until it returns true.
struct DBQuery : NonCopyable {
    Subscription sub;

    std::unique_ptr<DBScan> scanner; // scanner for the filter currently being processed
    size_t filterGroupIndex = 0;     // index into sub.filterGroup of the current filter

    bool dead = false; // external flag: owner marks the query as cancelled

    flat_hash_set<uint64_t> sentEventsFull; // levIds sent over the whole query (dedups across filters)
    flat_hash_set<uint64_t> sentEventsCurr; // levIds matched by the current filter (enforces its limit)

    uint64_t lastWorkChecked = 0;     // approxWork value at the last time-budget check
    uint64_t currScanTime = 0;        // accumulated scan time for the current filter, us
    uint64_t currScanSaveRestores = 0; // pauses/resumes for the current filter
    uint64_t totalTime = 0;
    uint64_t totalWork = 0;

    DBQuery(Subscription &sub) : sub(std::move(sub)) {}

    DBQuery(const tao::json::value &filter, uint64_t maxLimit = MAX_U64) : sub(Subscription(1, ".", NostrFilterGroup::unwrapped(filter, maxLimit))) {}

    // Runs the query until either every filter in the group has completed
    // (returns true) or the time budget is exceeded (returns false; call
    // again with the same cb to resume). cb receives this query's
    // Subscription and each matching event's levId, at most once per levId.
    bool process(lmdb::txn &txn, std::function<void(const Subscription &, uint64_t)> cb, uint64_t timeBudgetMicroseconds = MAX_U64, bool logMetrics = false) {
        while (filterGroupIndex < sub.filterGroup.size()) {
            const auto &f = sub.filterGroup.filters[filterGroupIndex];
            if (!scanner) scanner = std::make_unique<DBScan>(f);

            uint64_t startTime = hoytech::curr_time_us();

            bool complete = scanner->scan(txn, [&](uint64_t levId){
                // If this event came in after our query began, don't send it. It will be sent after the EOSE.
                if (levId > sub.latestEventId) return false;

                // Only emit events not already sent by an earlier filter in the group
                if (sentEventsFull.find(levId) == sentEventsFull.end()) {
                    sentEventsFull.insert(levId);
                    cb(sub, levId);
                }

                sentEventsCurr.insert(levId);
                // Returning true stops the scan: this filter's limit is satisfied
                return sentEventsCurr.size() >= f.limit;
            }, [&](uint64_t approxWork){
                // Only consult the clock every ~2000 work units to keep overhead low
                if (approxWork > lastWorkChecked + 2'000) {
                    lastWorkChecked = approxWork;
                    return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds;
                }
                return false;
            });

            currScanTime += hoytech::curr_time_us() - startTime;

            if (!complete) {
                // Paused on time budget: record the save/restore and resume next call
                currScanSaveRestores++;
                return false;
            }

            totalTime += currScanTime;
            totalWork += scanner->approxWork;

            if (logMetrics) {
                LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'"
                   << " scan=" << scanner->desc
                   << " indexOnly=" << scanner->indexOnly
                   << " time=" << currScanTime << "us"
                   << " saveRestores=" << currScanSaveRestores
                   << " recsFound=" << sentEventsCurr.size()
                   << " work=" << scanner->approxWork
                ;
            }

            // Advance to the next filter in the group, resetting per-filter state
            scanner.reset();
            filterGroupIndex++;
            sentEventsCurr.clear();
            currScanTime = 0;
            currScanSaveRestores = 0;
        }

        if (logMetrics) {
            LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'"
               << " totalTime=" << totalTime << "us"
               << " totalWork=" << totalWork
               << " recsSent=" << sentEventsFull.size()
            ;
        }

        return true;
    }
};

View File

@ -1,405 +0,0 @@
#pragma once
#include "golpe.h"
#include "Subscription.h"
#include "filters.h"
// (Removed in this commit, replaced by the cursor-based DBScan in DBQuery.h.)
// Scans one LMDB index for a NostrFilter. Scan strategy is selected in the
// constructor and expressed as a set of std::function callbacks that operate
// on a per-strategy state struct held in `scanState`.
struct DBScan {
    const NostrFilter &f;
    uint64_t remainingLimit; // events still allowed for the current filter item

    // Per-strategy scan state; exactly one of these lives in `scanState`.
    struct NullState {
    };

    struct IdScan {
        size_t index = 0;    // position within f.ids
        std::string prefix;
    };

    struct PubkeyKindScan {
        size_t indexAuthor = 0; // position within f.authors
        size_t indexKind = 0;   // position within f.kinds
        std::string prefix;
    };

    struct PubkeyScan {
        size_t index = 0;    // position within f.authors
        std::string prefix;
    };

    struct TagScan {
        flat_hash_map<char, FilterSetBytes>::const_iterator indexTagName;
        size_t indexTagVal = 0;
        std::string search;
    };

    struct KindScan {
        size_t index = 0;    // position within f.kinds
        uint64_t kind;
    };

    struct CreatedAtScan {
        bool done = false;   // single pass over the whole created_at index
    };

    std::variant<NullState, IdScan, PubkeyKindScan, PubkeyScan, TagScan, KindScan, CreatedAtScan> scanState = NullState{};
    lmdb::dbi indexDbi;
    std::string resumeKey;   // empty means: call resetResume before scanning
    uint64_t resumeVal;

    enum class KeyMatchResult {
        Yes,
        No,
        NoButContinue, // no match, but keep iterating (PubkeyKind prefix case)
    };

    // Strategy callbacks, bound in the constructor to the active state struct:
    std::function<bool()> isComplete;      // all filter items consumed?
    std::function<void()> nextFilterItem;  // advance to the next filter item
    std::function<void()> resetResume;     // set resumeKey/resumeVal for the current item
    std::function<KeyMatchResult(std::string_view, bool&)> keyMatch; // test key; may set skipBack

    // Chooses the index and installs the matching callback set. Branch order
    // determines index priority: ids, then pubkey+kind, pubkey, tag, kind,
    // finally the catch-all created_at scan.
    DBScan(const NostrFilter &f_) : f(f_) {
        remainingLimit = f.limit;

        if (f.ids) {
            scanState = IdScan{};
            auto *state = std::get_if<IdScan>(&scanState);
            indexDbi = env.dbi_Event__id;

            isComplete = [&, state]{
                return state->index >= f.ids->size();
            };
            nextFilterItem = [&, state]{
                state->index++;
            };
            resetResume = [&, state]{
                state->prefix = f.ids->at(state->index);
                resumeKey = padBytes(state->prefix, 32 + 8, '\xFF');
                resumeVal = MAX_U64;
            };
            keyMatch = [&, state](std::string_view k, bool&){
                return k.starts_with(state->prefix) ? KeyMatchResult::Yes : KeyMatchResult::No;
            };
        } else if (f.authors && f.kinds) {
            scanState = PubkeyKindScan{};
            auto *state = std::get_if<PubkeyKindScan>(&scanState);
            indexDbi = env.dbi_Event__pubkeyKind;

            isComplete = [&, state]{
                return state->indexAuthor >= f.authors->size();
            };
            nextFilterItem = [&, state]{
                // Iterate kinds within each author
                state->indexKind++;
                if (state->indexKind >= f.kinds->size()) {
                    state->indexAuthor++;
                    state->indexKind = 0;
                }
            };
            resetResume = [&, state]{
                state->prefix = f.authors->at(state->indexAuthor);
                // Full pubkey: bake the kind into the search prefix
                if (state->prefix.size() == 32) state->prefix += lmdb::to_sv<uint64_t>(f.kinds->at(state->indexKind));
                resumeKey = padBytes(state->prefix, 32 + 8 + 8, '\xFF');
                resumeVal = MAX_U64;
            };
            keyMatch = [&, state](std::string_view k, bool &skipBack){
                if (!k.starts_with(state->prefix)) return KeyMatchResult::No;
                if (state->prefix.size() == 32 + 8) return KeyMatchResult::Yes;

                ParsedKey_StringUint64Uint64 parsedKey(k);
                if (parsedKey.n1 == f.kinds->at(state->indexKind)) {
                    return KeyMatchResult::Yes;
                } else if (parsedKey.n1 < f.kinds->at(state->indexKind)) {
                    // With a prefix pubkey, continue scanning (pubkey,kind) backwards because with this index
                    // we don't know the next pubkey to jump back to
                    return KeyMatchResult::NoButContinue;
                }

                // Overshot the kind: jump directly to the wanted (pubkey,kind)
                resumeKey = makeKey_StringUint64Uint64(parsedKey.s, f.kinds->at(state->indexKind), MAX_U64);
                resumeVal = MAX_U64;
                skipBack = true;
                return KeyMatchResult::No;
            };
        } else if (f.authors) {
            scanState = PubkeyScan{};
            auto *state = std::get_if<PubkeyScan>(&scanState);
            indexDbi = env.dbi_Event__pubkey;

            isComplete = [&, state]{
                return state->index >= f.authors->size();
            };
            nextFilterItem = [&, state]{
                state->index++;
            };
            resetResume = [&, state]{
                state->prefix = f.authors->at(state->index);
                resumeKey = padBytes(state->prefix, 32 + 8, '\xFF');
                resumeVal = MAX_U64;
            };
            keyMatch = [&, state](std::string_view k, bool&){
                return k.starts_with(state->prefix) ? KeyMatchResult::Yes : KeyMatchResult::No;
            };
        } else if (f.tags.size()) {
            scanState = TagScan{f.tags.begin()};
            auto *state = std::get_if<TagScan>(&scanState);
            indexDbi = env.dbi_Event__tag;

            isComplete = [&, state]{
                return state->indexTagName == f.tags.end();
            };
            nextFilterItem = [&, state]{
                // Iterate values within each tag name
                state->indexTagVal++;
                if (state->indexTagVal >= state->indexTagName->second.size()) {
                    state->indexTagName = std::next(state->indexTagName);
                    state->indexTagVal = 0;
                }
            };
            resetResume = [&, state]{
                state->search = state->indexTagName->first;
                state->search += state->indexTagName->second.at(state->indexTagVal);
                resumeKey = state->search + std::string(8, '\xFF');
                resumeVal = MAX_U64;
            };
            keyMatch = [&, state](std::string_view k, bool&){
                return k.substr(0, state->search.size()) == state->search ? KeyMatchResult::Yes : KeyMatchResult::No;
            };
        } else if (f.kinds) {
            scanState = KindScan{};
            auto *state = std::get_if<KindScan>(&scanState);
            indexDbi = env.dbi_Event__kind;

            isComplete = [&, state]{
                return state->index >= f.kinds->size();
            };
            nextFilterItem = [&, state]{
                state->index++;
            };
            resetResume = [&, state]{
                state->kind = f.kinds->at(state->index);
                resumeKey = std::string(lmdb::to_sv<uint64_t>(state->kind)) + std::string(8, '\xFF');
                resumeVal = MAX_U64;
            };
            keyMatch = [&, state](std::string_view k, bool&){
                ParsedKey_Uint64Uint64 parsedKey(k);
                return parsedKey.n1 == state->kind ? KeyMatchResult::Yes : KeyMatchResult::No;
            };
        } else {
            // No selective field at all: single pass over the created_at index
            scanState = CreatedAtScan{};
            auto *state = std::get_if<CreatedAtScan>(&scanState);
            indexDbi = env.dbi_Event__created_at;

            isComplete = [&, state]{
                return state->done;
            };
            nextFilterItem = [&, state]{
                state->done = true;
            };
            resetResume = [&, state]{
                resumeKey = std::string(8, '\xFF');
                resumeVal = MAX_U64;
            };
            keyMatch = [&, state](std::string_view k, bool&){
                return KeyMatchResult::Yes;
            };
        }
    }

    // If scan is complete, returns true
    // Iterates filter items in sequence, scanning the index for each and
    // invoking handleEvent(levId, created) on matches. doPause() is checked
    // per record: when it returns true the current position is saved and
    // scan() returns false so it can be resumed later.
    bool scan(lmdb::txn &txn, std::function<void(uint64_t, uint64_t)> handleEvent, std::function<bool()> doPause) {
        while (remainingLimit && !isComplete()) {
            if (resumeKey == "") resetResume();

            bool pause = false, skipBack = false;

            env.generic_foreachFull(txn, indexDbi, resumeKey, lmdb::to_sv<uint64_t>(resumeVal), [&](auto k, auto v) {
                if (doPause()) {
                    // Save position for resume, then abort the foreach
                    resumeKey = std::string(k);
                    resumeVal = lmdb::from_sv<uint64_t>(v);
                    pause = true;
                    return false;
                }

                auto matched = keyMatch(k, skipBack);
                if (matched == KeyMatchResult::No) return false;

                uint64_t created;

                {
                    ParsedKey_StringUint64 parsedKey(k);
                    created = parsedKey.n;

                    // Scanning descending: below `since` ends this item
                    if ((f.since && created < f.since)) {
                        resumeKey = makeKey_StringUint64(parsedKey.s, 0);
                        resumeVal = 0;
                        skipBack = true;
                        return false;
                    }

                    // Above `until`: jump back to the until boundary
                    if (f.until && created > f.until) {
                        resumeKey = makeKey_StringUint64(parsedKey.s, f.until);
                        resumeVal = MAX_U64;
                        skipBack = true;
                        return false;
                    }
                }

                bool sent = false;
                uint64_t levId = lmdb::from_sv<uint64_t>(v);

                if (matched == KeyMatchResult::NoButContinue) {
                    // Don't attempt to match filter
                } else if (f.indexOnlyScans) {
                    if (f.doesMatchTimes(created)) {
                        handleEvent(levId, created);
                        sent = true;
                    }
                } else {
                    auto view = env.lookup_Event(txn, levId);
                    if (!view) throw herr("missing event from index, corrupt DB?");
                    if (f.doesMatch(view->flat_nested())) {
                        handleEvent(levId, created);
                        sent = true;
                    }
                }

                if (sent) {
                    if (remainingLimit) remainingLimit--;
                    if (!remainingLimit) return false;
                }

                return true;
            }, true);

            if (pause) return false;

            if (!skipBack) {
                // Item finished normally: reset the limit and move on.
                // skipBack means the foreach was aborted only to reposition,
                // so the same item continues from the saved resumeKey.
                remainingLimit = f.limit;
                nextFilterItem();
                resumeKey = "";
            }
        }

        return true;
    }

    // Right-pads str with padChar up to n bytes; throws if str is longer.
    std::string padBytes(std::string_view str, size_t n, char padChar) {
        if (str.size() > n) throw herr("unable to pad, string longer than expected");
        return std::string(str) + std::string(n - str.size(), padChar);
    }
};
// (Removed in this commit, replaced by DBQuery.) Drives a DBScan per filter
// in the subscription's filter group, buffering matches in `pending` and
// applying the filter's limit only after the whole scan completes.
struct DBScanQuery : NonCopyable {
    // One buffered match, kept until the scan completes so results can be
    // sorted and truncated to the filter's limit.
    struct PendingRecord {
        uint64_t levId;
        uint64_t created;
    };

    Subscription sub;

    std::unique_ptr<DBScan> scanner;   // scanner for the current filter
    std::vector<PendingRecord> pending; // matches buffered for the current filter
    size_t filterGroupIndex = 0;        // index of the current filter in the group

    bool dead = false; // external flag: owner marks the query as cancelled

    flat_hash_set<uint64_t> alreadySentEvents; // dedups across filters in the group

    // Per-filter and cumulative metrics:
    uint64_t currScanTime = 0;
    uint64_t currScanSaveRestores = 0;
    uint64_t currScanRecordsTraversed = 0;
    uint64_t currScanRecordsFound = 0;
    uint64_t totalScanTime = 0;

    DBScanQuery(Subscription &sub_) : sub(std::move(sub_)) {}

    // If scan is complete, returns true
    // Otherwise the time budget was exceeded; call again to resume.
    // cb receives the Subscription and each matching levId, newest first,
    // truncated to the filter's limit.
    bool process(lmdb::txn &txn, uint64_t timeBudgetMicroseconds, bool logMetrics, std::function<void(const Subscription &, uint64_t)> cb) {
        uint64_t startTime = hoytech::curr_time_us();

        while (filterGroupIndex < sub.filterGroup.size()) {
            if (!scanner) scanner = std::make_unique<DBScan>(sub.filterGroup.filters[filterGroupIndex]);

            bool complete = scanner->scan(txn, [&](uint64_t levId, uint64_t created){
                // If this event came in after our query began, don't send it. It will be sent after the EOSE.
                if (levId > sub.latestEventId) return;

                // We already sent this event
                if (alreadySentEvents.find(levId) != alreadySentEvents.end()) return;
                alreadySentEvents.insert(levId);

                currScanRecordsFound++;
                pending.emplace_back(levId, created);
            }, [&]{
                currScanRecordsTraversed++;
                return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds;
            });

            currScanTime += hoytech::curr_time_us() - startTime;

            if (!complete) {
                currScanSaveRestores++;
                return false;
            }

            // Scan done: order newest first (ties broken by larger levId),
            // then enforce the filter's limit before delivering
            std::sort(pending.begin(), pending.end(), [](auto &a, auto &b){
                return a.created == b.created ? a.levId > b.levId : a.created > b.created;
            });

            uint64_t limit = sub.filterGroup.filters[filterGroupIndex].limit;
            if (pending.size() > limit) pending.resize(limit);

            for (const auto &p : pending) {
                cb(sub, p.levId);
            }

            totalScanTime += currScanTime;

            if (logMetrics) {
                // Derive a human-readable name from the scanner's active state
                std::string scanType = "?";

                if (std::get_if<DBScan::IdScan>(&scanner->scanState)) {
                    scanType = "Id";
                } else if (std::get_if<DBScan::PubkeyKindScan>(&scanner->scanState)) {
                    scanType = "PubkeyKind";
                } else if (std::get_if<DBScan::PubkeyScan>(&scanner->scanState)) {
                    scanType = "Pubkey";
                } else if (std::get_if<DBScan::TagScan>(&scanner->scanState)) {
                    scanType = "Tag";
                } else if (std::get_if<DBScan::KindScan>(&scanner->scanState)) {
                    scanType = "Kind";
                } else if (std::get_if<DBScan::CreatedAtScan>(&scanner->scanState)) {
                    scanType = "CreatedAt";
                }

                LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'"
                   << " scan=" << scanType
                   << " indexOnly=" << scanner->f.indexOnlyScans
                   << " time=" << currScanTime << "us"
                   << " saveRestores=" << currScanSaveRestores
                   << " recsFound=" << currScanRecordsFound
                   << " recsScanned=" << currScanRecordsTraversed
                ;
            }

            // Advance to the next filter, resetting per-filter state
            filterGroupIndex++;
            scanner.reset();
            pending.clear();
            currScanTime = 0;
            currScanSaveRestores = 0;
            currScanRecordsTraversed = 0;
            currScanRecordsFound = 0;
        }

        if (logMetrics) {
            LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'"
               << " totalTime=" << totalScanTime << "us"
            ;
        }

        return true;
    }
};

View File

@ -1,13 +1,13 @@
#include "RelayServer.h"
#include "DBScan.h"
#include "DBQuery.h"
struct ActiveQueries : NonCopyable {
Decompressor decomp;
using ConnQueries = flat_hash_map<SubId, DBScanQuery*>;
flat_hash_map<uint64_t, ConnQueries> conns; // connId -> subId -> DBScanQuery*
std::deque<DBScanQuery*> running;
using ConnQueries = flat_hash_map<SubId, DBQuery*>;
flat_hash_map<uint64_t, ConnQueries> conns; // connId -> subId -> DBQuery*
std::deque<DBQuery*> running;
bool addSub(lmdb::txn &txn, Subscription &&sub) {
sub.latestEventId = getMostRecentLevId(txn);
@ -24,7 +24,7 @@ struct ActiveQueries : NonCopyable {
return false;
}
DBScanQuery *q = new DBScanQuery(sub);
DBQuery *q = new DBQuery(sub);
connQueries.try_emplace(q->sub.subId, q);
running.push_front(q);
@ -32,7 +32,7 @@ struct ActiveQueries : NonCopyable {
return true;
}
DBScanQuery *findQuery(uint64_t connId, const SubId &subId) {
DBQuery *findQuery(uint64_t connId, const SubId &subId) {
auto f1 = conns.find(connId);
if (f1 == conns.end()) return nullptr;
@ -62,7 +62,7 @@ struct ActiveQueries : NonCopyable {
void process(RelayServer *server, lmdb::txn &txn) {
if (running.empty()) return;
DBScanQuery *q = running.front();
DBQuery *q = running.front();
running.pop_front();
if (q->dead) {
@ -72,11 +72,11 @@ struct ActiveQueries : NonCopyable {
auto cursor = lmdb::cursor::open(txn, env.dbi_EventPayload);
bool complete = q->process(txn, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t levId){
bool complete = q->process(txn, [&](const auto &sub, uint64_t levId){
std::string_view key = lmdb::to_sv<uint64_t>(levId), val;
if (!cursor.get(key, val, MDB_SET_KEY)) throw herr("couldn't find event in EventPayload, corrupted DB?");
server->sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, val, nullptr, nullptr));
});
}, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf);
if (complete) {
auto connId = q->sub.connId;

View File

@ -2,7 +2,7 @@
#include <quadrable/transport.h>
#include "RelayServer.h"
#include "DBScan.h"
#include "DBQuery.h"
void RelayServer::runYesstr(ThreadPool<MsgYesstr>::Thread &thr) {
@ -49,14 +49,12 @@ void RelayServer::runYesstr(ThreadPool<MsgYesstr>::Thread &thr) {
// with other requests like RelayReqWorker does.
std::vector<uint64_t> levIds;
auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr));
Subscription sub(1, "junkSub", filterGroup);
DBScanQuery query(sub);
DBQuery query(tao::json::from_string(filterStr));
while (1) {
bool complete = query.process(txn, MAX_U64, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t levId){
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
levIds.push_back(levId);
});
}, MAX_U64, cfg().relay__logging__dbScanPerf);
if (complete) break;
}

View File

@ -3,7 +3,7 @@
#include <docopt.h>
#include "golpe.h"
#include "DBScan.h"
#include "DBQuery.h"
#include "events.h"
#include "gc.h"
@ -43,10 +43,7 @@ void cmd_delete(const std::vector<std::string> &subArgs) {
}
auto filterGroup = NostrFilterGroup::unwrapped(filter, MAX_U64);
Subscription sub(1, "junkSub", filterGroup);
DBScanQuery query(sub);
DBQuery query(filter);
btree_set<uint64_t> levIds;
@ -54,7 +51,7 @@ void cmd_delete(const std::vector<std::string> &subArgs) {
auto txn = env.txn_ro();
while (1) {
bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t levId){
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
levIds.insert(levId);
});

View File

@ -7,7 +7,7 @@
#include <docopt.h>
#include "golpe.h"
#include "DBScan.h"
#include "DBQuery.h"
#include "events.h"
@ -47,12 +47,10 @@ void cmd_dict(const std::vector<std::string> &subArgs) {
auto txn = env.txn_ro();
auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr), MAX_U64);
Subscription sub(1, "junkSub", filterGroup);
DBScanQuery query(sub);
DBQuery query(tao::json::from_string(filterStr));
while (1) {
bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t levId){
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
levIds.push_back(levId);
});

View File

@ -3,14 +3,14 @@
#include <docopt.h>
#include "golpe.h"
#include "DBScan.h"
#include "DBQuery.h"
#include "events.h"
static const char USAGE[] =
R"(
Usage:
scan [--pause=<pause>] [--metrics] <filter>
scan [--pause=<pause>] [--metrics] [--count] <filter>
)";
@ -21,24 +21,27 @@ void cmd_scan(const std::vector<std::string> &subArgs) {
if (args["--pause"]) pause = args["--pause"].asLong();
bool metrics = args["--metrics"].asBool();
bool count = args["--count"].asBool();
std::string filterStr = args["<filter>"].asString();
auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr), MAX_U64);
Subscription sub(1, "junkSub", filterGroup);
DBScanQuery query(sub);
DBQuery query(tao::json::from_string(filterStr));
Decompressor decomp;
auto txn = env.txn_ro();
uint64_t numEvents = 0;
while (1) {
bool complete = query.process(txn, pause ? pause : MAX_U64, metrics, [&](const auto &sub, uint64_t levId){
std::cout << getEventJson(txn, decomp, levId) << "\n";
});
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
if (count) numEvents++;
else std::cout << getEventJson(txn, decomp, levId) << "\n";
}, pause ? pause : MAX_U64, metrics);
if (complete) break;
}
if (count) std::cout << numEvents << std::endl;
}

View File

@ -9,7 +9,7 @@
#include "WriterPipeline.h"
#include "Subscription.h"
#include "WSConnection.h"
#include "DBScan.h"
#include "DBQuery.h"
#include "filters.h"
#include "events.h"
#include "yesstr.h"
@ -151,15 +151,12 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
::exit(1);
}
auto filterGroup = NostrFilterGroup::unwrapped(filterJson);
DBQuery query(filterJson);
Subscription sub(1, "junkSub", filterGroup);
DBScanQuery query(sub);
auto txn = env.txn_ro();
while (1) {
bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t levId){
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
levIds.push_back(levId);
});

View File

@ -21,3 +21,4 @@ std::string renderPercent(double p);
uint64_t parseUint64(const std::string &s);
std::string parseIP(const std::string &ip);
uint64_t getDBVersion(lmdb::txn &txn);
std::string padBytes(std::string_view str, size_t n, char padChar);

View File

@ -103,3 +103,9 @@ uint64_t getDBVersion(lmdb::txn &txn) {
return dbVersion;
}
std::string padBytes(std::string_view str, size_t n, char padChar) {
if (str.size() > n) throw herr("unable to pad, string longer than expected");
return std::string(str) + std::string(n - str.size(), padChar);
}

View File

@ -198,7 +198,7 @@ sub testScan {
my $headCmd = @$fg == 1 && $fg->[0]->{limit} ? "| head -n $fg->[0]->{limit}" : "";
my $resA = `./strfry export --reverse 2>/dev/null | perl test/dumbFilter.pl '$fge' $headCmd | jq -r .id | sort | sha256sum`;
my $resB = `./strfry scan --metrics '$fge' | jq -r .id | sort | sha256sum`;
my $resB = `./strfry scan --pause 1 --metrics '$fge' | jq -r .id | sort | sha256sum`;
print "$resA\n$resB\n";