From 8d0c9952ab7c025583ba93a9330a05cba57949a7 Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Sun, 29 Jan 2023 17:31:28 -0500 Subject: [PATCH] flat maps/sets --- TODO | 4 ---- golpe | 2 +- golpe.yaml | 1 + src/ActiveMonitors.h | 18 +++++++++--------- src/DBScan.h | 4 ++-- src/Decompressor.h | 5 ++--- src/RelayReqWorker.cpp | 4 ++-- src/RelayWebsocket.cpp | 2 +- src/RelayYesstr.cpp | 2 +- src/Subscription.h | 15 +++++++++++++-- src/cmd_dict.cpp | 2 +- src/cmd_stream.cpp | 2 +- src/filters.h | 2 +- src/gc.h | 4 +--- src/global.h | 6 ++++++ 15 files changed, 42 insertions(+), 31 deletions(-) create mode 100644 src/global.h diff --git a/TODO b/TODO index e062d76..844145a 100644 --- a/TODO +++ b/TODO @@ -9,11 +9,7 @@ features * `strfry sync` command always takes at least 1 second due to batching delay. figure out better way to flush bool values in config config for compression - config for TCP keepalive - db versioning - document config options, detailed default config file less verbose default logging - nice new config "units" feature, ie 1d instead of 86400 make it easier for a thread to setup a quadrable env rate limits diff --git a/golpe b/golpe index ea1ea8f..dd543e3 160000 --- a/golpe +++ b/golpe @@ -1 +1 @@ -Subproject commit ea1ea8f5ce1208fef8fe895f68b623369276f8de +Subproject commit dd543e3fef89e976db92b6b4420bdac6fa2e2257 diff --git a/golpe.yaml b/golpe.yaml index c66eadd..c13e4b1 100644 --- a/golpe.yaml +++ b/golpe.yaml @@ -1,6 +1,7 @@ appName: strfry quadrable: true onAppStartup: true +useGlobalH: true flatBuffers: | include "../fbs/nostr-index.fbs"; diff --git a/src/ActiveMonitors.h b/src/ActiveMonitors.h index e192c15..3aa6890 100644 --- a/src/ActiveMonitors.h +++ b/src/ActiveMonitors.h @@ -15,19 +15,19 @@ struct ActiveMonitors : NonCopyable { Monitor(Subscription &sub_) : sub(std::move(sub_)) {} }; - using ConnMonitor = std::map; - std::map conns; // connId -> subId -> Monitor + using ConnMonitor = flat_hash_map; + flat_hash_map conns; // connId -> subId -> Monitor struct MonitorItem { Monitor *mon; uint64_t latestEventId; }; - using MonitorSet = std::map; // FIXME: flat_map here - std::map allIds; - std::map allAuthors; - std::map allTags; - std::map allKinds; + using MonitorSet = flat_hash_map; + btree_map allIds; + btree_map allAuthors; + btree_map allTags; + btree_map allKinds; MonitorSet allOthers; std::string tagSpecBuf = std::string(256, '\0'); @@ -92,7 +92,7 @@ struct ActiveMonitors : NonCopyable { } }; - auto processMonitorsPrefix = [&](std::map &m, const std::string &key, std::function matches){ + auto processMonitorsPrefix = [&](btree_map &m, const std::string &key, std::function matches){ auto it = m.lower_bound(key.substr(0, 1)); if (it == m.end()) return; @@ -103,7 +103,7 @@ struct ActiveMonitors : NonCopyable { } }; - auto processMonitorsExact = [&](std::map &m, const T &key, std::function matches){ + auto processMonitorsExact = [&](btree_map &m, const T &key, std::function matches){ auto it = m.upper_bound(key); if (it == m.begin()) return; diff --git a/src/DBScan.h b/src/DBScan.h index dddc178..b463bd5 100644 --- a/src/DBScan.h +++ b/src/DBScan.h @@ -30,7 +30,7 @@ struct DBScan { }; struct TagScan { - std::map::const_iterator indexTagName; + flat_hash_map::const_iterator indexTagName; size_t indexTagVal = 0; std::string search; }; @@ -295,7 +295,7 @@ struct DBScanQuery : NonCopyable { size_t filterGroupIndex = 0; bool dead = false; - std::unordered_set alreadySentEvents; // FIXME: flat_set here, or roaring bitmap/judy/whatever + flat_hash_set alreadySentEvents; uint64_t currScanTime = 0; uint64_t currScanSaveRestores = 0; diff --git a/src/Decompressor.h b/src/Decompressor.h index 0a7a6ef..a0f9905 100644 --- a/src/Decompressor.h +++ b/src/Decompressor.h @@ -3,7 +3,6 @@ #include #include -#include #include #include "golpe.h" @@ -11,7 +10,7 @@ struct DictionaryBroker { std::mutex mutex; - std::unordered_map dicts; + flat_hash_map dicts; ZSTD_DDict *getDict(lmdb::txn &txn, uint32_t dictId) { std::lock_guard guard(mutex); @@ -34,7 +33,7 @@ extern DictionaryBroker globalDictionaryBroker; struct Decompressor { ZSTD_DCtx *dctx; - std::unordered_map dicts; + flat_hash_map dicts; std::string buffer; Decompressor() { diff --git a/src/RelayReqWorker.cpp b/src/RelayReqWorker.cpp index 5b00d03..30ca2f0 100644 --- a/src/RelayReqWorker.cpp +++ b/src/RelayReqWorker.cpp @@ -5,8 +5,8 @@ struct ActiveQueries : NonCopyable { Decompressor decomp; - using ConnQueries = std::map; - std::map conns; // connId -> subId -> DBScanQuery* + using ConnQueries = flat_hash_map; + flat_hash_map conns; // connId -> subId -> DBScanQuery* std::deque running; void addSub(lmdb::txn &txn, Subscription &&sub) { diff --git a/src/RelayWebsocket.cpp b/src/RelayWebsocket.cpp index 0e856a8..a47c9f4 100644 --- a/src/RelayWebsocket.cpp +++ b/src/RelayWebsocket.cpp @@ -40,7 +40,7 @@ void RelayServer::runWebsocket(ThreadPool::Thread &thr) { uWS::Hub hub; uWS::Group *hubGroup; - std::map connIdToConnection; + flat_hash_map connIdToConnection; uint64_t nextConnectionId = 1; std::string tempBuf; diff --git a/src/RelayYesstr.cpp b/src/RelayYesstr.cpp index a81cf7c..4d5f2c5 100644 --- a/src/RelayYesstr.cpp +++ b/src/RelayYesstr.cpp @@ -20,7 +20,7 @@ void RelayServer::runYesstr(ThreadPool::Thread &thr) { struct SyncStateCollection { RelayServer *server; quadrable::Quadrable *qdb; - std::map> conns; // connId -> reqId -> SyncState + flat_hash_map> conns; // connId -> reqId -> SyncState SyncStateCollection(RelayServer *server_, quadrable::Quadrable *qdb_) : server(server_), qdb(qdb_) {} diff --git a/src/Subscription.h b/src/Subscription.h index 037bf6e..181be44 100644 --- a/src/Subscription.h +++ b/src/Subscription.h @@ -1,5 +1,7 @@ #pragma once +#include + #include "filters.h" @@ -28,10 +30,19 @@ struct SubId { std::string str() const { return std::string(sv()); } + + bool operator==(const SubId &o) const { + return o.sv() == sv(); + } }; -inline bool operator <(const SubId &s1, const SubId &s2) { - return s1.sv() < s2.sv(); +namespace std { + // inject specialization of std::hash + template<> struct hash { + std::size_t operator()(SubId const &p) const { + return phmap::HashState().combine(0, p.sv()); + } + }; } diff --git a/src/cmd_dict.cpp b/src/cmd_dict.cpp index fdf841d..79973b7 100644 --- a/src/cmd_dict.cpp +++ b/src/cmd_dict.cpp @@ -71,7 +71,7 @@ void cmd_dict(const std::vector &subArgs) { auto txn = env.txn_ro(); - std::map dicts; + btree_map dicts; env.foreach_CompressionDictionary(txn, [&](auto &view){ auto dictId = view.primaryKeyId; diff --git a/src/cmd_stream.cpp b/src/cmd_stream.cpp index 9917d10..7435e0b 100644 --- a/src/cmd_stream.cpp +++ b/src/cmd_stream.cpp @@ -31,7 +31,7 @@ void cmd_stream(const std::vector &subArgs) { if (dir != "up" && dir != "down" && dir != "both") throw herr("invalid direction: ", dir, ". Should be one of up/down/both"); - std::unordered_set downloadedIds; + flat_hash_set downloadedIds; WriterPipeline writer; WSConnection ws(url); Decompressor decomp; diff --git a/src/filters.h b/src/filters.h index cdabe44..0ef7ded 100644 --- a/src/filters.h +++ b/src/filters.h @@ -114,7 +114,7 @@ struct NostrFilter { std::optional ids; std::optional authors; std::optional kinds; - std::map tags; + flat_hash_map tags; uint64_t since = 0; uint64_t until = MAX_U64; diff --git a/src/gc.h b/src/gc.h index 334e8bb..6bfab0c 100644 --- a/src/gc.h +++ b/src/gc.h @@ -1,14 +1,12 @@ #pragma once -#include - #include "golpe.h" #include "render.h" inline void quadrableGarbageCollect(quadrable::Quadrable &qdb, int logLevel = 0) { - quadrable::Quadrable::GarbageCollector> gc(qdb); + quadrable::Quadrable::GarbageCollector> gc(qdb); quadrable::Quadrable::GCStats stats; if (logLevel >= 2) LI << "Running garbage collection"; diff --git a/src/global.h b/src/global.h new file mode 100644 index 0000000..a7a4ce7 --- /dev/null +++ b/src/global.h @@ -0,0 +1,6 @@ +#pragma once + +#include +#include + +using namespace phmap;