From 60628d18c370dabc1269b20307c3d3f6c52af521 Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Wed, 25 Jan 2023 00:26:03 -0500 Subject: [PATCH] DB compression --- Makefile | 2 +- src/Decompressor.cpp | 5 + src/Decompressor.h | 69 ++++++++++++ src/RelayReqMonitor.cpp | 5 +- src/RelayReqWorker.cpp | 7 +- src/RelayServer.h | 17 +-- src/RelayWebsocket.cpp | 62 ++--------- src/cmd_dict.cpp | 236 ++++++++++++++++++++++++++++++++++++++++ src/cmd_export.cpp | 4 +- src/cmd_monitor.cpp | 3 +- src/cmd_scan.cpp | 9 +- src/cmd_stream.cpp | 3 +- src/events.cpp | 41 ++++++- src/events.h | 4 +- src/filters.h | 12 +- src/render.h | 44 ++++++++ test/filterFuzzTest.pl | 8 +- test/strfry.conf | 6 - 18 files changed, 446 insertions(+), 91 deletions(-) create mode 100644 src/Decompressor.cpp create mode 100644 src/Decompressor.h create mode 100644 src/cmd_dict.cpp create mode 100644 src/render.h delete mode 100644 test/strfry.conf diff --git a/Makefile b/Makefile index 708da0b..87d74c9 100644 --- a/Makefile +++ b/Makefile @@ -3,4 +3,4 @@ OPT = -O3 -g include golpe/rules.mk -LDLIBS += -lsecp256k1 -lb2 +LDLIBS += -lsecp256k1 -lb2 -lzstd diff --git a/src/Decompressor.cpp b/src/Decompressor.cpp new file mode 100644 index 0000000..7455ddb --- /dev/null +++ b/src/Decompressor.cpp @@ -0,0 +1,5 @@ +#include "golpe.h" + +#include "Decompressor.h" + +DictionaryBroker globalDictionaryBroker; diff --git a/src/Decompressor.h b/src/Decompressor.h new file mode 100644 index 0000000..0a7a6ef --- /dev/null +++ b/src/Decompressor.h @@ -0,0 +1,69 @@ +#pragma once + +#include +#include + +#include +#include + +#include "golpe.h" + + +struct DictionaryBroker { + std::mutex mutex; + std::unordered_map dicts; + + ZSTD_DDict *getDict(lmdb::txn &txn, uint32_t dictId) { + std::lock_guard guard(mutex); + + auto it = dicts.find(dictId); + if (it != dicts.end()) return it->second; + + auto view = env.lookup_CompressionDictionary(txn, dictId); + if (!view) throw herr("couldn't find dictId ", dictId); + auto 
dictBuffer = view->dict(); + + auto *dict = dicts[dictId] = ZSTD_createDDict(dictBuffer.data(), dictBuffer.size()); + + return dict; + } }; + +extern DictionaryBroker globalDictionaryBroker; + + +struct Decompressor { + ZSTD_DCtx *dctx; + std::unordered_map dicts; + std::string buffer; + + Decompressor() { + dctx = ZSTD_createDCtx(); + } + + ~Decompressor() { + ZSTD_freeDCtx(dctx); + } + + void reserve(size_t n) { + buffer.resize(n); + } + + // Return result only valid until one of: a) next call to decompress()/reserve(), or Decompressor destroyed + + std::string_view decompress(lmdb::txn &txn, uint32_t dictId, std::string_view src) { + auto it = dicts.find(dictId); + ZSTD_DDict *dict; + + if (it == dicts.end()) { + dict = dicts[dictId] = globalDictionaryBroker.getDict(txn, dictId); + } else { + dict = it->second; + } + + auto ret = ZSTD_decompress_usingDDict(dctx, buffer.data(), buffer.size(), src.data(), src.size(), dict); + if (ZSTD_isError(ret)) throw herr("zstd decompression failed: ", ZSTD_getErrorName(ret)); + + return std::string_view(buffer.data(), ret); + } +}; diff --git a/src/RelayReqMonitor.cpp b/src/RelayReqMonitor.cpp index 7c513ab..d9feb80 100644 --- a/src/RelayReqMonitor.cpp +++ b/src/RelayReqMonitor.cpp @@ -14,6 +14,7 @@ void RelayServer::runReqMonitor(ThreadPool::Thread &thr) { }); + Decompressor decomp; ActiveMonitors monitors; uint64_t currEventId = MAX_U64; @@ -29,7 +30,7 @@ void RelayServer::runReqMonitor(ThreadPool::Thread &thr) { if (auto msg = std::get_if(&newMsg.msg)) { env.foreach_Event(txn, [&](auto &ev){ if (msg->sub.filterGroup.doesMatch(ev.flat_nested())) { - sendEvent(msg->sub.connId, msg->sub.subId, getEventJson(txn, ev.primaryKeyId)); + sendEvent(msg->sub.connId, msg->sub.subId, getEventJson(txn, decomp, ev.primaryKeyId)); } return true; @@ -45,7 +46,7 @@ void RelayServer::runReqMonitor(ThreadPool::Thread &thr) { } else if (std::get_if(&newMsg.msg)) { env.foreach_Event(txn, [&](auto &ev){ monitors.process(txn, ev, 
[&](RecipientList &&recipients, uint64_t levId){ - sendEventToBatch(std::move(recipients), std::string(getEventJson(txn, levId))); + sendEventToBatch(std::move(recipients), std::string(getEventJson(txn, decomp, levId))); }); return true; }, false, currEventId + 1); diff --git a/src/RelayReqWorker.cpp b/src/RelayReqWorker.cpp index 32e1a3b..5b00d03 100644 --- a/src/RelayReqWorker.cpp +++ b/src/RelayReqWorker.cpp @@ -4,6 +4,7 @@ struct ActiveQueries : NonCopyable { + Decompressor decomp; using ConnQueries = std::map; std::map conns; // connId -> subId -> DBScanQuery* std::deque running; @@ -63,8 +64,12 @@ struct ActiveQueries : NonCopyable { return; } + auto cursor = lmdb::cursor::open(txn, env.dbi_EventPayload); + bool complete = q->process(txn, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t levId){ - server->sendEvent(sub.connId, sub.subId, getEventJson(txn, levId)); + std::string_view key = lmdb::to_sv(levId), val; + if (!cursor.get(key, val, MDB_SET_KEY)) throw herr("couldn't find event in EventPayload, corrupted DB?"); + server->sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, val, nullptr, nullptr)); }); if (complete) { diff --git a/src/RelayServer.h b/src/RelayServer.h index bcad465..00b1ea1 100644 --- a/src/RelayServer.h +++ b/src/RelayServer.h @@ -168,23 +168,24 @@ struct RelayServer { hubTrigger->send(); } - void sendToConn(uint64_t connId, std::string &payload) { - tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::Send{connId, std::move(payload)}}); - hubTrigger->send(); - } - void sendToConnBinary(uint64_t connId, std::string &&payload) { tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::SendBinary{connId, std::move(payload)}}); hubTrigger->send(); } void sendEvent(uint64_t connId, const SubId &subId, std::string_view evJson) { - std::string reply = std::string("[\"EVENT\",\""); - reply += subId.sv(); + auto subIdSv = subId.sv(); + + std::string reply; + reply.reserve(13 + 
subIdSv.size() + evJson.size()); + + reply += "[\"EVENT\",\""; + reply += subIdSv; reply += "\","; reply += evJson; reply += "]"; - sendToConn(connId, reply); + + sendToConn(connId, std::move(reply)); } void sendEventToBatch(RecipientList &&list, std::string &&evJson) { diff --git a/src/RelayWebsocket.cpp b/src/RelayWebsocket.cpp index cea338c..0e856a8 100644 --- a/src/RelayWebsocket.cpp +++ b/src/RelayWebsocket.cpp @@ -1,6 +1,5 @@ -#include - #include "RelayServer.h" +#include "render.h" #include "app_git_version.h" @@ -19,46 +18,6 @@ static std::string preGenerateHttpResponse(const std::string &contentType, const }; -static std::string renderSize(uint64_t si) { - if (si < 1024) return std::to_string(si) + "b"; - - double s = si; - char buf[128]; - char unit; - - do { - s /= 1024; - if (s < 1024) { - unit = 'K'; - break; - } - - s /= 1024; - if (s < 1024) { - unit = 'M'; - break; - } - - s /= 1024; - if (s < 1024) { - unit = 'G'; - break; - } - - s /= 1024; - unit = 'T'; - } while(0); - - ::snprintf(buf, sizeof(buf), "%.2f%c", s, unit); - return std::string(buf); -} - -static std::string renderPercent(double p) { - char buf[128]; - ::snprintf(buf, sizeof(buf), "%.1f%%", p * 100); - return std::string(buf); -} - void RelayServer::runWebsocket(ThreadPool::Thread &thr) { struct Connection { @@ -198,15 +157,18 @@ void RelayServer::runWebsocket(ThreadPool::Thread &thr) { } else if (auto msg = std::get_if(&newMsg.msg)) { doSend(msg->connId, msg->payload, uWS::OpCode::BINARY); } else if (auto msg = std::get_if(&newMsg.msg)) { - for (auto &item : msg->list) { - tempBuf.clear(); - tempBuf += "[\"EVENT\",\""; - tempBuf += item.subId.sv(); - tempBuf += "\","; - tempBuf += msg->evJson; - tempBuf += "]"; + tempBuf.reserve(13 + MAX_SUBID_SIZE + msg->evJson.size()); + tempBuf.resize(10 + MAX_SUBID_SIZE); + tempBuf += "\","; + tempBuf += msg->evJson; + tempBuf += "]"; - doSend(item.connId, tempBuf, uWS::OpCode::TEXT); + for (auto &item : msg->list) { + auto subIdSv = 
item.subId.sv(); + auto *p = tempBuf.data() + MAX_SUBID_SIZE - subIdSv.size(); + memcpy(p, "[\"EVENT\",\"", 10); + memcpy(p + 10, subIdSv.data(), subIdSv.size()); + doSend(item.connId, std::string_view(p, 13 + subIdSv.size() + msg->evJson.size()), uWS::OpCode::TEXT); } } } diff --git a/src/cmd_dict.cpp b/src/cmd_dict.cpp new file mode 100644 index 0000000..fdf841d --- /dev/null +++ b/src/cmd_dict.cpp @@ -0,0 +1,236 @@ +#include +#include + +#include +#include + +#include +#include "golpe.h" + +#include "DBScan.h" +#include "events.h" +#include "render.h" + + +static const char USAGE[] = +R"( + Usage: + dict stats [--filter=] + dict train [--filter=] [--limit=] [--dictSize=] + dict compress [--filter=] [--dictId=] [--level=] + dict decompress [--filter=] +)"; + + +void cmd_dict(const std::vector &subArgs) { + std::map args = docopt::docopt(USAGE, subArgs, true, ""); + + std::string filterStr; + if (args["--filter"]) filterStr = args["--filter"].asString(); + else filterStr = "{}"; + + uint64_t limit = MAX_U64; + if (args["--limit"]) limit = args["--limit"].asLong(); + + uint64_t dictSize = 100'000; + if (args["--dictSize"]) dictSize = args["--dictSize"].asLong(); + + uint64_t dictId = 0; + if (args["--dictId"]) dictId = args["--dictId"].asLong(); + + int level = 3; + if (args["--level"]) level = args["--level"].asLong(); + + + Decompressor decomp; + std::vector levIds; + + { + auto txn = env.txn_ro(); + + auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr), MAX_U64); + Subscription sub(1, "junkSub", filterGroup); + DBScanQuery query(sub); + + while (1) { + bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t levId){ + levIds.push_back(levId); + }); + + if (complete) break; + } + + LI << "Filter matched " << levIds.size() << " records"; + } + + + if (args["stats"].asBool()) { + uint64_t totalSize = 0; + uint64_t totalCompressedSize = 0; + uint64_t numCompressed = 0; + + auto txn = env.txn_ro(); + + std::map 
dicts; + + env.foreach_CompressionDictionary(txn, [&](auto &view){ + auto dictId = view.primaryKeyId; + if (!dicts.contains(dictId)) dicts[dictId] = 0; + return true; + }); + + for (auto levId : levIds) { + std::string_view raw; + + bool found = env.dbi_EventPayload.get(txn, lmdb::to_sv(levId), raw); + if (!found) throw herr("couldn't find event in EventPayload, corrupted DB?"); + + uint32_t dictId; + size_t outCompressedSize; + + auto json = decodeEventPayload(txn, decomp, raw, &dictId, &outCompressedSize); + + totalSize += json.size(); + totalCompressedSize += dictId ? outCompressedSize : json.size(); + + if (dictId) { + numCompressed++; + dicts[dictId]++; + } + } + + auto ratio = renderPercent(1.0 - (double)totalCompressedSize / totalSize); + + std::cout << "Num compressed: " << numCompressed << " / " << levIds.size() << "\n"; + std::cout << "Uncompressed size: " << renderSize(totalSize) << "\n"; + std::cout << "Compressed size: " << renderSize(totalCompressedSize) << " (" << ratio << ")" << "\n"; + std::cout << "\ndictId : events\n"; + + for (auto &[dictId, n] : dicts) { + std::cout << " " << dictId << " : " << n << "\n"; + } + } else if (args["train"].asBool()) { + std::string trainingBuf; + std::vector trainingSizes; + + { + auto txn = env.txn_ro(); + + if (levIds.size() > limit) { + LI << "Randomly selecting " << limit << " records"; + std::random_device rd; + std::mt19937 g(rd()); + std::shuffle(levIds.begin(), levIds.end(), g); + levIds.resize(limit); + } + + for (auto levId : levIds) { + std::string json = std::string(getEventJson(txn, decomp, levId)); + trainingBuf += json; + trainingSizes.emplace_back(json.size()); + } + } + + std::string dict(dictSize, '\0'); + + LI << "Performing zstd training..."; + + auto ret = ZDICT_trainFromBuffer(dict.data(), dict.size(), trainingBuf.data(), trainingSizes.data(), trainingSizes.size()); + if (ZDICT_isError(ret)) throw herr("zstd training failed: ", ZSTD_getErrorName(ret)); + + { + auto txn = env.txn_rw(); + + 
uint64_t newDictId = env.insert_CompressionDictionary(txn, dict); + + std::cout << "Saved new dictionary, dictId = " << newDictId << std::endl; + + txn.commit(); + } + } else if (args["compress"].asBool()) { + if (dictId == 0) throw herr("specify --dictId or --decompress"); + + auto txn = env.txn_rw(); + + auto view = env.lookup_CompressionDictionary(txn, dictId); + if (!view) throw herr("couldn't find dictId ", dictId); + auto dict = view->dict(); + + auto *cctx = ZSTD_createCCtx(); + auto *cdict = ZSTD_createCDict(dict.data(), dict.size(), level); + + uint64_t origSizes = 0; + uint64_t compressedSizes = 0; + uint64_t pendingFlush = 0; + uint64_t processed = 0; + + std::string compressedData(500'000, '\0'); + + for (auto levId : levIds) { + auto orig = getEventJson(txn, decomp, levId); + auto ret = ZSTD_compress_usingCDict(cctx, compressedData.data(), compressedData.size(), orig.data(), orig.size(), cdict); + if (ZSTD_isError(ret)) throw herr("zstd compression failed: ", ZSTD_getErrorName(ret)); + + origSizes += orig.size(); + compressedSizes += ret; + + std::string newVal; + + if (ret + 4 < orig.size()) { + newVal += '\x01'; + newVal += lmdb::to_sv(dictId); + newVal += std::string_view(compressedData.data(), ret); + } else { + newVal += '\x00'; + newVal += orig; + } + + env.dbi_EventPayload.put(txn, lmdb::to_sv(levId), newVal); + + pendingFlush++; + processed++; + if (pendingFlush > 10'000) { + txn.commit(); + + LI << "Progress: " << processed << "/" << levIds.size(); + pendingFlush = 0; + + txn = env.txn_rw(); + } + } + + txn.commit(); + + LI << "Original event sizes: " << origSizes; + LI << "New event sizes: " << compressedSizes; + } else if (args["decompress"].asBool()) { + auto txn = env.txn_rw(); + + uint64_t pendingFlush = 0; + uint64_t processed = 0; + + for (auto levId : levIds) { + auto orig = getEventJson(txn, decomp, levId); + + std::string newVal; + + newVal += '\x00'; + newVal += orig; + + env.dbi_EventPayload.put(txn, lmdb::to_sv(levId), newVal); + 
+ pendingFlush++; + processed++; + if (pendingFlush > 10'000) { + txn.commit(); + + LI << "Progress: " << processed << "/" << levIds.size(); + pendingFlush = 0; + + txn = env.txn_rw(); + } + } + + txn.commit(); + } +} diff --git a/src/cmd_export.cpp b/src/cmd_export.cpp index 5228d41..90e94fd 100644 --- a/src/cmd_export.cpp +++ b/src/cmd_export.cpp @@ -20,6 +20,8 @@ void cmd_export(const std::vector &subArgs) { if (args["--since"]) since = args["--since"].asLong(); if (args["--until"]) until = args["--until"].asLong(); + Decompressor decomp; + auto txn = env.txn_ro(); env.generic_foreachFull(txn, env.dbi_Event__created_at, lmdb::to_sv(since), lmdb::to_sv(0), [&](auto k, auto v) { @@ -32,7 +34,7 @@ void cmd_export(const std::vector &subArgs) { if (isEphemeralEvent(view->flat_nested()->kind())) return true; } - std::cout << getEventJson(txn, view->primaryKeyId) << "\n"; + std::cout << getEventJson(txn, decomp, view->primaryKeyId) << "\n"; return true; }); diff --git a/src/cmd_monitor.cpp b/src/cmd_monitor.cpp index 91ad0c8..6c7304f 100644 --- a/src/cmd_monitor.cpp +++ b/src/cmd_monitor.cpp @@ -21,6 +21,7 @@ void cmd_monitor(const std::vector &subArgs) { auto txn = env.txn_ro(); + Decompressor decomp; ActiveMonitors monitors; std::string line; @@ -57,7 +58,7 @@ void cmd_monitor(const std::vector &subArgs) { monitors.process(txn, ev, [&](RecipientList &&recipients, uint64_t levId){ for (auto &r : recipients) { if (r.connId == interestConnId && r.subId.str() == interestSubId) { - std::cout << getEventJson(txn, levId) << "\n"; + std::cout << getEventJson(txn, decomp, levId) << "\n"; } } }); diff --git a/src/cmd_scan.cpp b/src/cmd_scan.cpp index 10efce0..b533ac4 100644 --- a/src/cmd_scan.cpp +++ b/src/cmd_scan.cpp @@ -23,20 +23,21 @@ void cmd_scan(const std::vector &subArgs) { bool metrics = false; if (args["--metrics"]) metrics = true; - std::string filterStr = args[""].asString(); - auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr)); + + 
auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr), MAX_U64); Subscription sub(1, "junkSub", filterGroup); - DBScanQuery query(sub); + Decompressor decomp; + auto txn = env.txn_ro(); while (1) { bool complete = query.process(txn, pause ? pause : MAX_U64, metrics, [&](const auto &sub, uint64_t levId){ - std::cout << getEventJson(txn, levId) << "\n"; + std::cout << getEventJson(txn, decomp, levId) << "\n"; }); if (complete) break; diff --git a/src/cmd_stream.cpp b/src/cmd_stream.cpp index ef54212..9917d10 100644 --- a/src/cmd_stream.cpp +++ b/src/cmd_stream.cpp @@ -34,6 +34,7 @@ void cmd_stream(const std::vector &subArgs) { std::unordered_set downloadedIds; WriterPipeline writer; WSConnection ws(url); + Decompressor decomp; ws.onConnect = [&]{ if (dir == "down" || dir == "both") { @@ -98,7 +99,7 @@ void cmd_stream(const std::vector &subArgs) { } std::string msg = std::string("[\"EVENT\","); - msg += getEventJson(txn, ev.primaryKeyId); + msg += getEventJson(txn, decomp, ev.primaryKeyId); msg += "]"; ws.send(msg); diff --git a/src/events.cpp b/src/events.cpp index 52dc3e5..80364e6 100644 --- a/src/events.cpp +++ b/src/events.cpp @@ -169,13 +169,44 @@ uint64_t getMostRecentLevId(lmdb::txn &txn) { return levId; } -std::string_view getEventJson(lmdb::txn &txn, uint64_t levId) { - std::string_view raw; - bool found = env.dbi_EventPayload.get(txn, lmdb::to_sv(levId), raw); - if (!found) throw herr("couldn't find leaf node in quadrable, corrupted DB?"); - return raw.substr(1); + +// Return result validity same as getEventJson(), see below + +std::string_view decodeEventPayload(lmdb::txn &txn, Decompressor &decomp, std::string_view raw, uint32_t *outDictId, size_t *outCompressedSize) { + if (raw.size() == 0) throw herr("empty event in EventPayload"); + + if (raw[0] == '\x00') { + if (outDictId) *outDictId = 0; + return raw.substr(1); + } else if (raw[0] == '\x01') { + raw = raw.substr(1); + if (raw.size() < 4) throw herr("EventPayload record too 
short to read dictId"); + uint32_t dictId = lmdb::from_sv(raw.substr(0, 4)); + raw = raw.substr(4); + + decomp.reserve(cfg().events__maxEventSize); + std::string_view buf = decomp.decompress(txn, dictId, raw); + + if (outDictId) *outDictId = dictId; + if (outCompressedSize) *outCompressedSize = raw.size(); + return buf; + } else { + throw herr("Unexpected first byte in EventPayload"); + } } +// Return result only valid until one of: next call to getEventJson/decodeEventPayload, write on or closing of txn, or any action on decomp object + +std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId) { + std::string_view raw; + + bool found = env.dbi_EventPayload.get(txn, lmdb::to_sv(levId), raw); + if (!found) throw herr("couldn't find event in EventPayload, corrupted DB?"); + + return decodeEventPayload(txn, decomp, raw, nullptr, nullptr); +} + + void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector &evs) { diff --git a/src/events.h b/src/events.h index d6529b8..b87b49b 100644 --- a/src/events.h +++ b/src/events.h @@ -4,6 +4,7 @@ #include "golpe.h" +#include "Decompressor.h" #include "constants.h" @@ -46,7 +47,8 @@ inline const NostrIndex::Event *flatStrToFlatEvent(std::string_view flatStr) { std::optional lookupEventById(lmdb::txn &txn, std::string_view id); uint64_t getMostRecentLevId(lmdb::txn &txn); -std::string_view getEventJson(lmdb::txn &txn, uint64_t levId); +std::string_view decodeEventPayload(lmdb::txn &txn, Decompressor &decomp, std::string_view raw, uint32_t *outDictId, size_t *outCompressedSize); +std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId); inline quadrable::Key flatEventToQuadrableKey(const NostrIndex::Event *flat) { return quadrable::Key::fromIntegerAndHash(flat->created_at(), sv(flat->id()).substr(0, 23)); } diff --git a/src/filters.h b/src/filters.h index 02f121d..ab1d054 100644 --- a/src/filters.h +++ b/src/filters.h @@ -122,7 +122,7 @@ struct NostrFilter { bool neverMatch = 
false; bool indexOnlyScans = false; - explicit NostrFilter(const tao::json::value &filterObj) { + explicit NostrFilter(const tao::json::value &filterObj, uint64_t maxFilterLimit) { uint64_t numMajorFields = 0; for (const auto &[k, v] : filterObj.get_object()) { @@ -166,7 +166,7 @@ struct NostrFilter { if (tags.size() > 2) throw herr("too many tags in filter"); // O(N^2) in matching, just prohibit it - if (limit > cfg().relay__maxFilterLimit) limit = cfg().relay__maxFilterLimit; + if (limit > maxFilterLimit) limit = maxFilterLimit; indexOnlyScans = numMajorFields <= 1; // FIXME: pubkeyKind scan could be serviced index-only too @@ -219,18 +219,18 @@ struct NostrFilterGroup { std::vector filters; // Note that this expects the full array, so the first two items are "REQ" and the subId - NostrFilterGroup(const tao::json::value &req) { + NostrFilterGroup(const tao::json::value &req, uint64_t maxFilterLimit = cfg().relay__maxFilterLimit) { const auto &arr = req.get_array(); if (arr.size() < 3) throw herr("too small"); for (size_t i = 2; i < arr.size(); i++) { - filters.emplace_back(arr[i]); + filters.emplace_back(arr[i], maxFilterLimit); if (filters.back().neverMatch) filters.pop_back(); } } // Hacky! 
Deserves a refactor - static NostrFilterGroup unwrapped(tao::json::value filter) { + static NostrFilterGroup unwrapped(tao::json::value filter, uint64_t maxFilterLimit = cfg().relay__maxFilterLimit) { if (!filter.is_array()) { filter = tao::json::value::array({ filter }); } @@ -241,7 +241,7 @@ struct NostrFilterGroup { pretendReqQuery.push_back(e); } - return NostrFilterGroup(pretendReqQuery); + return NostrFilterGroup(pretendReqQuery, maxFilterLimit); } bool doesMatch(const NostrIndex::Event *ev) const { diff --git a/src/render.h b/src/render.h new file mode 100644 index 0000000..2acfb6b --- /dev/null +++ b/src/render.h @@ -0,0 +1,44 @@ +#pragma once + +#include + + +inline std::string renderSize(uint64_t si) { + if (si < 1024) return std::to_string(si) + "b"; + + double s = si; + char buf[128]; + char unit; + + do { + s /= 1024; + if (s < 1024) { + unit = 'K'; + break; + } + + s /= 1024; + if (s < 1024) { + unit = 'M'; + break; + } + + s /= 1024; + if (s < 1024) { + unit = 'G'; + break; + } + + s /= 1024; + unit = 'T'; + } while(0); + + ::snprintf(buf, sizeof(buf), "%.2f%c", s, unit); + return std::string(buf); +} + +inline std::string renderPercent(double p) { + char buf[128]; + ::snprintf(buf, sizeof(buf), "%.1f%%", p * 100); + return std::string(buf); +} diff --git a/test/filterFuzzTest.pl b/test/filterFuzzTest.pl index eab026d..ec5d869 100644 --- a/test/filterFuzzTest.pl +++ b/test/filterFuzzTest.pl @@ -188,8 +188,8 @@ sub testScan { #print JSON::XS->new->pretty(1)->encode($fg); print "$fge\n"; - my $resA = `./strfry --config test/strfry.conf export 2>/dev/null | perl test/dumbFilter.pl '$fge' | jq -r .pubkey | sort | sha256sum`; - my $resB = `./strfry --config test/strfry.conf scan '$fge' | jq -r .pubkey | sort | sha256sum`; + my $resA = `./strfry export 2>/dev/null | perl test/dumbFilter.pl '$fge' | jq -r .pubkey | sort | sha256sum`; + my $resB = `./strfry scan '$fge' | jq -r .pubkey | sort | sha256sum`; print "$resA\n$resB\n"; @@ -220,7 +220,7 @@ if ($cmd 
eq 'scan') { print "filt: $fge\n\n"; print "DOING MONS\n"; - my $pid = open2(my $outfile, my $infile, './strfry --config test/strfry.conf monitor | jq -r .pubkey | sort | sha256sum'); + my $pid = open2(my $outfile, my $infile, './strfry monitor | jq -r .pubkey | sort | sha256sum'); for my $c (@$monCmds) { print $infile encode_json($c), "\n"; } close($infile); @@ -231,7 +231,7 @@ if ($cmd eq 'scan') { die "monitor cmd died" if $child_exit_status; print "DOING SCAN\n"; - my $resB = `./strfry --config test/strfry.conf scan '$fge' 2>/dev/null | jq -r .pubkey | sort | sha256sum`; + my $resB = `./strfry scan '$fge' 2>/dev/null | jq -r .pubkey | sort | sha256sum`; print "$resA\n$resB\n"; diff --git a/test/strfry.conf b/test/strfry.conf deleted file mode 100644 index 1b4b462..0000000 --- a/test/strfry.conf +++ /dev/null @@ -1,6 +0,0 @@ -db = "./strfry-db/" - -relay { - port = 7777 - maxFilterLimit = 1000000000000 -}