diff --git a/golpe.yaml b/golpe.yaml index edf909a..d0b6d18 100644 --- a/golpe.yaml +++ b/golpe.yaml @@ -98,6 +98,19 @@ config: desc: "Maximum records that can be returned per filter" default: 500 + - name: relay__logging__dumpInAll + desc: "Dump all incoming messages" + default: false + - name: relay__logging__dumpInEvents + desc: "Dump all incoming EVENT messages" + default: false + - name: relay__logging__dumpInReqs + desc: "Dump all incoming REQ/CLOSE messages" + default: false + - name: relay__logging__dbScanPerf + desc: "Log performance metrics for initial REQ database scans" + default: false + - name: relay__numThreads__ingester default: 3 noReload: true diff --git a/src/DBScan.h b/src/DBScan.h index b69abe2..00fb479 100644 --- a/src/DBScan.h +++ b/src/DBScan.h @@ -58,8 +58,6 @@ struct DBScan { remainingLimit = f.limit; if (f.ids) { - LI << "ID Scan"; - scanState = IdScan{}; auto *state = std::get_if(&scanState); indexDbi = env.dbi_Event__id; @@ -79,8 +77,6 @@ struct DBScan { return k.starts_with(state->prefix); }; } else if (f.authors && f.kinds) { - LI << "PubkeyKind Scan"; - scanState = PubkeyKindScan{}; auto *state = std::get_if(&scanState); indexDbi = env.dbi_Event__pubkeyKind; @@ -114,8 +110,6 @@ struct DBScan { return false; }; } else if (f.authors) { - LI << "Pubkey Scan"; - scanState = PubkeyScan{}; auto *state = std::get_if(&scanState); indexDbi = env.dbi_Event__pubkey; @@ -135,8 +129,6 @@ struct DBScan { return k.starts_with(state->prefix); }; } else if (f.tags.size()) { - LI << "Tag Scan"; - scanState = TagScan{f.tags.begin()}; auto *state = std::get_if(&scanState); indexDbi = env.dbi_Event__tag; @@ -161,8 +153,6 @@ struct DBScan { return k.substr(0, state->search.size()) == state->search; }; } else if (f.kinds) { - LI << "Kind Scan"; - scanState = KindScan{}; auto *state = std::get_if(&scanState); indexDbi = env.dbi_Event__kind; @@ -183,8 +173,6 @@ struct DBScan { return parsedKey.n1 == state->kind; }; } else { - LI << "CreatedAt Scan"; - scanState = CreatedAtScan{}; auto *state = std::get_if(&scanState); indexDbi = env.dbi_Event__created_at; @@ -216,7 +204,6 @@ struct DBScan { if (doPause()) { resumeKey = std::string(k); resumeVal = lmdb::from_sv(v); - LI << "SAVING resumeKey: " << to_hex(resumeKey) << " / " << resumeVal; pause = true; return false; } @@ -295,10 +282,17 @@ struct DBScanQuery : NonCopyable { bool dead = false; std::unordered_set alreadySentEvents; // FIXME: flat_set here, or roaring bitmap/judy/whatever + uint64_t currScanTime = 0; + uint64_t currScanSaveRestores = 0; + uint64_t currScanRecordsTraversed = 0; + uint64_t currScanRecordsFound = 0; + + uint64_t totalScanTime = 0; + DBScanQuery(Subscription &sub_) : sub(std::move(sub_)) {} // If scan is complete, returns true - bool process(lmdb::txn &txn, uint64_t timeBudgetMicroseconds, std::function cb) { + bool process(lmdb::txn &txn, uint64_t timeBudgetMicroseconds, bool logMetrics, std::function cb) { uint64_t startTime = hoytech::curr_time_us(); while (filterGroupIndex < sub.filterGroup.size()) { @@ -312,15 +306,61 @@ struct DBScanQuery : NonCopyable { if (alreadySentEvents.find(quadId) != alreadySentEvents.end()) return; alreadySentEvents.insert(quadId); + currScanRecordsFound++; cb(sub, quadId); }, [&]{ + currScanRecordsTraversed++; return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds; }); - if (!complete) return false; + currScanTime += hoytech::curr_time_us() - startTime; + + if (!complete) { + currScanSaveRestores++; + return false; + } + + totalScanTime += currScanTime; + + if (logMetrics) { + std::string scanType = "?"; + + if (std::get_if(&scanner->scanState)) { + scanType = "Id"; + } else if (std::get_if(&scanner->scanState)) { + scanType = "PubkeyKind"; + } else if (std::get_if(&scanner->scanState)) { + scanType = "Pubkey"; + } else if (std::get_if(&scanner->scanState)) { + scanType = "Tag"; + } else if (std::get_if(&scanner->scanState)) { + scanType = "Kind"; + } else if (std::get_if(&scanner->scanState)) { + scanType = "CreatedAt"; + } + + LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'" + << " scan=" << scanType + << " indexOnly=" << scanner->f.indexOnlyScans + << " time=" << currScanTime << "us" + << " saveRestores=" << currScanSaveRestores + << " recsFound=" << currScanRecordsFound + << " recsScanned=" << currScanRecordsTraversed + ; + } filterGroupIndex++; scanner.reset(); + currScanTime = 0; + currScanSaveRestores = 0; + currScanRecordsTraversed = 0; + currScanRecordsFound = 0; + } + + if (logMetrics) { + LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'" + << " totalTime=" << totalScanTime << "us" + ; } return true; diff --git a/src/RelayCron.cpp b/src/RelayCron.cpp index b9e20ec..c0da185 100644 --- a/src/RelayCron.cpp +++ b/src/RelayCron.cpp @@ -45,7 +45,7 @@ void RelayServer::cleanupOldEvents() { } if (expiredEvents.size() > 0) { - LI << "Deleting " << expiredEvents.size() << " old events"; + LI << "Deleting " << expiredEvents.size() << " ephemeral events"; auto txn = env.txn_rw(); diff --git a/src/RelayIngester.cpp b/src/RelayIngester.cpp index 1d8d7c6..f02ca80 100644 --- a/src/RelayIngester.cpp +++ b/src/RelayIngester.cpp @@ -17,6 +17,8 @@ void RelayServer::runIngester(ThreadPool::Thread &thr) { if (msg->payload.starts_with('[')) { auto payload = tao::json::from_string(msg->payload); + if (cfg().relay__logging__dumpInAll) LI << "[" << msg->connId << "] dumpInAll: " << msg->payload; + if (!payload.is_array()) throw herr("message is not an array"); auto &arr = payload.get_array(); if (arr.size() < 2) throw herr("bad message"); @@ -24,6 +26,8 @@ void RelayServer::runIngester(ThreadPool::Thread &thr) { auto &cmd = arr[0].get_string(); if (cmd == "EVENT") { + if (cfg().relay__logging__dumpInEvents) LI << "[" << msg->connId << "] dumpInEvent: " << msg->payload; + try { ingesterProcessEvent(txn, msg->connId, secpCtx, arr[1], writerMsgs); } catch (std::exception &e) { @@ -31,12 +35,16 @@ void RelayServer::runIngester(ThreadPool::Thread &thr) { LI << "Rejected invalid event: " << e.what(); } } else if (cmd == "REQ") { + if (cfg().relay__logging__dumpInReqs) LI << "[" << msg->connId << "] dumpInReq: " << msg->payload; + try { ingesterProcessReq(txn, msg->connId, arr); } catch (std::exception &e) { sendNoticeError(msg->connId, std::string("bad req: ") + e.what()); } } else if (cmd == "CLOSE") { + if (cfg().relay__logging__dumpInReqs) LI << "[" << msg->connId << "] dumpInReq: " << msg->payload; + try { ingesterProcessClose(txn, msg->connId, arr); } catch (std::exception &e) { diff --git a/src/RelayReqWorker.cpp b/src/RelayReqWorker.cpp index 6ecb92b..16391ea 100644 --- a/src/RelayReqWorker.cpp +++ b/src/RelayReqWorker.cpp @@ -63,7 +63,7 @@ struct ActiveQueries : NonCopyable { return; } - bool complete = q->process(txn, cfg().relay__queryTimesliceBudgetMicroseconds, [&](const auto &sub, uint64_t quadId){ + bool complete = q->process(txn, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t quadId){ server->sendEvent(sub.connId, sub.subId, getEventJson(txn, quadId)); }); diff --git a/src/RelayYesstr.cpp b/src/RelayYesstr.cpp index 6f8d151..9eabc4c 100644 --- a/src/RelayYesstr.cpp +++ b/src/RelayYesstr.cpp @@ -59,7 +59,7 @@ void RelayServer::runYesstr(ThreadPool::Thread &thr) { DBScanQuery query(sub); while (1) { - bool complete = query.process(txn, MAX_U64, [&](const auto &sub, uint64_t quadId){ + bool complete = query.process(txn, MAX_U64, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t quadId){ quadEventIds.push_back(quadId); }); diff --git a/src/cmd_scan.cpp b/src/cmd_scan.cpp index 1f3a0d0..96b0700 100644 --- a/src/cmd_scan.cpp +++ b/src/cmd_scan.cpp @@ -10,7 +10,7 @@ static const char USAGE[] = R"( Usage: - scan [--pause=] + scan [--pause=] [--metrics] )"; @@ -20,6 +20,9 @@ void cmd_scan(const std::vector &subArgs) { uint64_t pause = 0; if (args["--pause"]) pause = args["--pause"].asLong(); + bool metrics = false; + if (args["--metrics"]) metrics = true; + std::string filterStr = args[""].asString(); auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr)); @@ -32,7 +35,7 @@ void cmd_scan(const std::vector &subArgs) { auto txn = env.txn_ro(); while (1) { - bool complete = query.process(txn, pause ? pause : MAX_U64, [&](const auto &sub, uint64_t quadId){ + bool complete = query.process(txn, pause ? pause : MAX_U64, metrics, [&](const auto &sub, uint64_t quadId){ std::cout << getEventJson(txn, quadId) << "\n"; }); diff --git a/src/cmd_sync.cpp b/src/cmd_sync.cpp index deb9643..d6723f1 100644 --- a/src/cmd_sync.cpp +++ b/src/cmd_sync.cpp @@ -167,7 +167,7 @@ void cmd_sync(const std::vector &subArgs) { auto txn = env.txn_ro(); while (1) { - bool complete = query.process(txn, MAX_U64, [&](const auto &sub, uint64_t quadId){ + bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t quadId){ quadEventIds.push_back(quadId); }); diff --git a/strfry.conf b/strfry.conf index 196c8f3..ae7ee45 100644 --- a/strfry.conf +++ b/strfry.conf @@ -41,6 +41,20 @@ relay { # Maximum records that can be returned per filter maxFilterLimit = 500 + logging { + # Dump all incoming messages + dumpInAll = false + + # Dump all incoming EVENT messages + dumpInEvents = false + + # Dump all incoming REQ/CLOSE messages + dumpInReqs = false + + # Log performance metrics for initial REQ database scans + dbScanPerf = true + } + numThreads { ingester = 3 @@ -72,5 +86,5 @@ events { maxNumTags = 250 # Maximum size for tag values, in bytes - maxTagValSize = 128 + maxTagValSize = 255 }