config params to control logging verbosity

This commit is contained in:
Doug Hoyte
2023-01-22 11:33:12 -05:00
parent 4242290584
commit 0171fc9ec3
9 changed files with 100 additions and 22 deletions

View File

@ -98,6 +98,19 @@ config:
desc: "Maximum records that can be returned per filter"
default: 500
- name: relay__logging__dumpInAll
desc: "Dump all incoming messages"
default: false
- name: relay__logging__dumpInEvents
desc: "Dump all incoming EVENT messages"
default: false
- name: relay__logging__dumpInReqs
desc: "Dump all incoming REQ/CLOSE messages"
default: false
- name: relay__logging__dbScanPerf
desc: "Log performance metrics for initial REQ database scans"
default: false
- name: relay__numThreads__ingester
default: 3
noReload: true

View File

@ -58,8 +58,6 @@ struct DBScan {
remainingLimit = f.limit;
if (f.ids) {
LI << "ID Scan";
scanState = IdScan{};
auto *state = std::get_if<IdScan>(&scanState);
indexDbi = env.dbi_Event__id;
@ -79,8 +77,6 @@ struct DBScan {
return k.starts_with(state->prefix);
};
} else if (f.authors && f.kinds) {
LI << "PubkeyKind Scan";
scanState = PubkeyKindScan{};
auto *state = std::get_if<PubkeyKindScan>(&scanState);
indexDbi = env.dbi_Event__pubkeyKind;
@ -114,8 +110,6 @@ struct DBScan {
return false;
};
} else if (f.authors) {
LI << "Pubkey Scan";
scanState = PubkeyScan{};
auto *state = std::get_if<PubkeyScan>(&scanState);
indexDbi = env.dbi_Event__pubkey;
@ -135,8 +129,6 @@ struct DBScan {
return k.starts_with(state->prefix);
};
} else if (f.tags.size()) {
LI << "Tag Scan";
scanState = TagScan{f.tags.begin()};
auto *state = std::get_if<TagScan>(&scanState);
indexDbi = env.dbi_Event__tag;
@ -161,8 +153,6 @@ struct DBScan {
return k.substr(0, state->search.size()) == state->search;
};
} else if (f.kinds) {
LI << "Kind Scan";
scanState = KindScan{};
auto *state = std::get_if<KindScan>(&scanState);
indexDbi = env.dbi_Event__kind;
@ -183,8 +173,6 @@ struct DBScan {
return parsedKey.n1 == state->kind;
};
} else {
LI << "CreatedAt Scan";
scanState = CreatedAtScan{};
auto *state = std::get_if<CreatedAtScan>(&scanState);
indexDbi = env.dbi_Event__created_at;
@ -216,7 +204,6 @@ struct DBScan {
if (doPause()) {
resumeKey = std::string(k);
resumeVal = lmdb::from_sv<uint64_t>(v);
LI << "SAVING resumeKey: " << to_hex(resumeKey) << " / " << resumeVal;
pause = true;
return false;
}
@ -295,10 +282,17 @@ struct DBScanQuery : NonCopyable {
bool dead = false;
std::unordered_set<uint64_t> alreadySentEvents; // FIXME: flat_set here, or roaring bitmap/judy/whatever
uint64_t currScanTime = 0;
uint64_t currScanSaveRestores = 0;
uint64_t currScanRecordsTraversed = 0;
uint64_t currScanRecordsFound = 0;
uint64_t totalScanTime = 0;
DBScanQuery(Subscription &sub_) : sub(std::move(sub_)) {}
// If scan is complete, returns true
bool process(lmdb::txn &txn, uint64_t timeBudgetMicroseconds, std::function<void(const Subscription &, uint64_t)> cb) {
bool process(lmdb::txn &txn, uint64_t timeBudgetMicroseconds, bool logMetrics, std::function<void(const Subscription &, uint64_t)> cb) {
uint64_t startTime = hoytech::curr_time_us();
while (filterGroupIndex < sub.filterGroup.size()) {
@ -312,15 +306,61 @@ struct DBScanQuery : NonCopyable {
if (alreadySentEvents.find(quadId) != alreadySentEvents.end()) return;
alreadySentEvents.insert(quadId);
currScanRecordsFound++;
cb(sub, quadId);
}, [&]{
currScanRecordsTraversed++;
return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds;
});
if (!complete) return false;
currScanTime += hoytech::curr_time_us() - startTime;
if (!complete) {
currScanSaveRestores++;
return false;
}
totalScanTime += currScanTime;
if (logMetrics) {
std::string scanType = "?";
if (std::get_if<DBScan::IdScan>(&scanner->scanState)) {
scanType = "Id";
} else if (std::get_if<DBScan::PubkeyKindScan>(&scanner->scanState)) {
scanType = "PubkeyKind";
} else if (std::get_if<DBScan::PubkeyScan>(&scanner->scanState)) {
scanType = "Pubkey";
} else if (std::get_if<DBScan::TagScan>(&scanner->scanState)) {
scanType = "Tag";
} else if (std::get_if<DBScan::KindScan>(&scanner->scanState)) {
scanType = "Kind";
} else if (std::get_if<DBScan::CreatedAtScan>(&scanner->scanState)) {
scanType = "CreatedAt";
}
LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'"
<< " scan=" << scanType
<< " indexOnly=" << scanner->f.indexOnlyScans
<< " time=" << currScanTime << "us"
<< " saveRestores=" << currScanSaveRestores
<< " recsFound=" << currScanRecordsFound
<< " recsScanned=" << currScanRecordsTraversed
;
}
filterGroupIndex++;
scanner.reset();
currScanTime = 0;
currScanSaveRestores = 0;
currScanRecordsTraversed = 0;
currScanRecordsFound = 0;
}
if (logMetrics) {
LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'"
<< " totalTime=" << totalScanTime << "us"
;
}
return true;

View File

@ -45,7 +45,7 @@ void RelayServer::cleanupOldEvents() {
}
if (expiredEvents.size() > 0) {
LI << "Deleting " << expiredEvents.size() << " old events";
LI << "Deleting " << expiredEvents.size() << " ephemeral events";
auto txn = env.txn_rw();

View File

@ -17,6 +17,8 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
if (msg->payload.starts_with('[')) {
auto payload = tao::json::from_string(msg->payload);
if (cfg().relay__logging__dumpInAll) LI << "[" << msg->connId << "] dumpInAll: " << msg->payload;
if (!payload.is_array()) throw herr("message is not an array");
auto &arr = payload.get_array();
if (arr.size() < 2) throw herr("bad message");
@ -24,6 +26,8 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
auto &cmd = arr[0].get_string();
if (cmd == "EVENT") {
if (cfg().relay__logging__dumpInEvents) LI << "[" << msg->connId << "] dumpInEvent: " << msg->payload;
try {
ingesterProcessEvent(txn, msg->connId, secpCtx, arr[1], writerMsgs);
} catch (std::exception &e) {
@ -31,12 +35,16 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
LI << "Rejected invalid event: " << e.what();
}
} else if (cmd == "REQ") {
if (cfg().relay__logging__dumpInReqs) LI << "[" << msg->connId << "] dumpInReq: " << msg->payload;
try {
ingesterProcessReq(txn, msg->connId, arr);
} catch (std::exception &e) {
sendNoticeError(msg->connId, std::string("bad req: ") + e.what());
}
} else if (cmd == "CLOSE") {
if (cfg().relay__logging__dumpInReqs) LI << "[" << msg->connId << "] dumpInReq: " << msg->payload;
try {
ingesterProcessClose(txn, msg->connId, arr);
} catch (std::exception &e) {

View File

@ -63,7 +63,7 @@ struct ActiveQueries : NonCopyable {
return;
}
bool complete = q->process(txn, cfg().relay__queryTimesliceBudgetMicroseconds, [&](const auto &sub, uint64_t quadId){
bool complete = q->process(txn, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t quadId){
server->sendEvent(sub.connId, sub.subId, getEventJson(txn, quadId));
});

View File

@ -59,7 +59,7 @@ void RelayServer::runYesstr(ThreadPool<MsgYesstr>::Thread &thr) {
DBScanQuery query(sub);
while (1) {
bool complete = query.process(txn, MAX_U64, [&](const auto &sub, uint64_t quadId){
bool complete = query.process(txn, MAX_U64, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t quadId){
quadEventIds.push_back(quadId);
});

View File

@ -10,7 +10,7 @@
static const char USAGE[] =
R"(
Usage:
scan [--pause=<pause>] <filter>
scan [--pause=<pause>] [--metrics] <filter>
)";
@ -20,6 +20,9 @@ void cmd_scan(const std::vector<std::string> &subArgs) {
uint64_t pause = 0;
if (args["--pause"]) pause = args["--pause"].asLong();
bool metrics = false;
if (args["--metrics"]) metrics = true;
std::string filterStr = args["<filter>"].asString();
auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr));
@ -32,7 +35,7 @@ void cmd_scan(const std::vector<std::string> &subArgs) {
auto txn = env.txn_ro();
while (1) {
bool complete = query.process(txn, pause ? pause : MAX_U64, [&](const auto &sub, uint64_t quadId){
bool complete = query.process(txn, pause ? pause : MAX_U64, metrics, [&](const auto &sub, uint64_t quadId){
std::cout << getEventJson(txn, quadId) << "\n";
});

View File

@ -167,7 +167,7 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
auto txn = env.txn_ro();
while (1) {
bool complete = query.process(txn, MAX_U64, [&](const auto &sub, uint64_t quadId){
bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t quadId){
quadEventIds.push_back(quadId);
});

View File

@ -41,6 +41,20 @@ relay {
# Maximum records that can be returned per filter
maxFilterLimit = 500
logging {
# Dump all incoming messages
dumpInAll = false
# Dump all incoming EVENT messages
dumpInEvents = false
# Dump all incoming REQ/CLOSE messages
dumpInReqs = false
# Log performance metrics for initial REQ database scans
dbScanPerf = true
}
numThreads {
ingester = 3
@ -72,5 +86,5 @@ events {
maxNumTags = 250
# Maximum size for tag values, in bytes
maxTagValSize = 128
maxTagValSize = 255
}