flat maps/sets

This commit is contained in:
Doug Hoyte
2023-01-29 17:31:28 -05:00
parent 5117485ebf
commit 8d0c9952ab
15 changed files with 42 additions and 31 deletions

4
TODO
View File

@ -9,11 +9,7 @@ features
* `strfry sync` command always takes at least 1 second due to batching delay. figure out better way to flush
bool values in config
config for compression
config for TCP keepalive
db versioning
document config options, detailed default config file
less verbose default logging
nice new config "units" feature, ie 1d instead of 86400
make it easier for a thread to setup a quadrable env
rate limits

2
golpe

Submodule golpe updated: ea1ea8f5ce...dd543e3fef

View File

@ -1,6 +1,7 @@
appName: strfry
quadrable: true
onAppStartup: true
useGlobalH: true
flatBuffers: |
include "../fbs/nostr-index.fbs";

View File

@ -15,19 +15,19 @@ struct ActiveMonitors : NonCopyable {
Monitor(Subscription &sub_) : sub(std::move(sub_)) {}
};
using ConnMonitor = std::map<SubId, Monitor>;
std::map<uint64_t, ConnMonitor> conns; // connId -> subId -> Monitor
using ConnMonitor = flat_hash_map<SubId, Monitor>;
flat_hash_map<uint64_t, ConnMonitor> conns; // connId -> subId -> Monitor
struct MonitorItem {
Monitor *mon;
uint64_t latestEventId;
};
using MonitorSet = std::map<NostrFilter*, MonitorItem>; // FIXME: flat_map here
std::map<std::string, MonitorSet> allIds;
std::map<std::string, MonitorSet> allAuthors;
std::map<std::string, MonitorSet> allTags;
std::map<uint64_t, MonitorSet> allKinds;
using MonitorSet = flat_hash_map<NostrFilter*, MonitorItem>;
btree_map<std::string, MonitorSet> allIds;
btree_map<std::string, MonitorSet> allAuthors;
btree_map<std::string, MonitorSet> allTags;
btree_map<uint64_t, MonitorSet> allKinds;
MonitorSet allOthers;
std::string tagSpecBuf = std::string(256, '\0');
@ -92,7 +92,7 @@ struct ActiveMonitors : NonCopyable {
}
};
auto processMonitorsPrefix = [&](std::map<std::string, MonitorSet> &m, const std::string &key, std::function<bool(const std::string&)> matches){
auto processMonitorsPrefix = [&](btree_map<std::string, MonitorSet> &m, const std::string &key, std::function<bool(const std::string&)> matches){
auto it = m.lower_bound(key.substr(0, 1));
if (it == m.end()) return;
@ -103,7 +103,7 @@ struct ActiveMonitors : NonCopyable {
}
};
auto processMonitorsExact = [&]<typename T>(std::map<T, MonitorSet> &m, const T &key, std::function<bool(const T &)> matches){
auto processMonitorsExact = [&]<typename T>(btree_map<T, MonitorSet> &m, const T &key, std::function<bool(const T &)> matches){
auto it = m.upper_bound(key);
if (it == m.begin()) return;

View File

@ -30,7 +30,7 @@ struct DBScan {
};
struct TagScan {
std::map<char, FilterSetBytes>::const_iterator indexTagName;
flat_hash_map<char, FilterSetBytes>::const_iterator indexTagName;
size_t indexTagVal = 0;
std::string search;
};
@ -295,7 +295,7 @@ struct DBScanQuery : NonCopyable {
size_t filterGroupIndex = 0;
bool dead = false;
std::unordered_set<uint64_t> alreadySentEvents; // FIXME: flat_set here, or roaring bitmap/judy/whatever
flat_hash_set<uint64_t> alreadySentEvents;
uint64_t currScanTime = 0;
uint64_t currScanSaveRestores = 0;

View File

@ -3,7 +3,6 @@
#include <zstd.h>
#include <zdict.h>
#include <unordered_map>
#include <mutex>
#include "golpe.h"
@ -11,7 +10,7 @@
struct DictionaryBroker {
std::mutex mutex;
std::unordered_map<uint32_t, ZSTD_DDict*> dicts;
flat_hash_map<uint32_t, ZSTD_DDict*> dicts;
ZSTD_DDict *getDict(lmdb::txn &txn, uint32_t dictId) {
std::lock_guard<std::mutex> guard(mutex);
@ -34,7 +33,7 @@ extern DictionaryBroker globalDictionaryBroker;
struct Decompressor {
ZSTD_DCtx *dctx;
std::unordered_map<uint32_t, ZSTD_DDict*> dicts;
flat_hash_map<uint32_t, ZSTD_DDict*> dicts;
std::string buffer;
Decompressor() {

View File

@ -5,8 +5,8 @@
struct ActiveQueries : NonCopyable {
Decompressor decomp;
using ConnQueries = std::map<SubId, DBScanQuery*>;
std::map<uint64_t, ConnQueries> conns; // connId -> subId -> DBScanQuery*
using ConnQueries = flat_hash_map<SubId, DBScanQuery*>;
flat_hash_map<uint64_t, ConnQueries> conns; // connId -> subId -> DBScanQuery*
std::deque<DBScanQuery*> running;
void addSub(lmdb::txn &txn, Subscription &&sub) {

View File

@ -40,7 +40,7 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
uWS::Hub hub;
uWS::Group<uWS::SERVER> *hubGroup;
std::map<uint64_t, Connection*> connIdToConnection;
flat_hash_map<uint64_t, Connection*> connIdToConnection;
uint64_t nextConnectionId = 1;
std::string tempBuf;

View File

@ -20,7 +20,7 @@ void RelayServer::runYesstr(ThreadPool<MsgYesstr>::Thread &thr) {
struct SyncStateCollection {
RelayServer *server;
quadrable::Quadrable *qdb;
std::map<uint64_t, std::map<uint64_t, SyncState>> conns; // connId -> reqId -> SyncState
flat_hash_map<uint64_t, flat_hash_map<uint64_t, SyncState>> conns; // connId -> reqId -> SyncState
SyncStateCollection(RelayServer *server_, quadrable::Quadrable *qdb_) : server(server_), qdb(qdb_) {}

View File

@ -1,5 +1,7 @@
#pragma once
#include <parallel_hashmap/phmap_utils.h>
#include "filters.h"
@ -28,10 +30,19 @@ struct SubId {
std::string str() const {
return std::string(sv());
}
bool operator==(const SubId &o) const {
return o.sv() == sv();
}
};
inline bool operator <(const SubId &s1, const SubId &s2) {
return s1.sv() < s2.sv();
namespace std {
// inject specialization of std::hash
template<> struct hash<SubId> {
std::size_t operator()(SubId const &p) const {
return phmap::HashState().combine(0, p.sv());
}
};
}

View File

@ -71,7 +71,7 @@ void cmd_dict(const std::vector<std::string> &subArgs) {
auto txn = env.txn_ro();
std::map<uint32_t, uint64_t> dicts;
btree_map<uint32_t, uint64_t> dicts;
env.foreach_CompressionDictionary(txn, [&](auto &view){
auto dictId = view.primaryKeyId;

View File

@ -31,7 +31,7 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
if (dir != "up" && dir != "down" && dir != "both") throw herr("invalid direction: ", dir, ". Should be one of up/down/both");
std::unordered_set<std::string> downloadedIds;
flat_hash_set<std::string> downloadedIds;
WriterPipeline writer;
WSConnection ws(url);
Decompressor decomp;

View File

@ -114,7 +114,7 @@ struct NostrFilter {
std::optional<FilterSetBytes> ids;
std::optional<FilterSetBytes> authors;
std::optional<FilterSetUint> kinds;
std::map<char, FilterSetBytes> tags;
flat_hash_map<char, FilterSetBytes> tags;
uint64_t since = 0;
uint64_t until = MAX_U64;

View File

@ -1,14 +1,12 @@
#pragma once
#include <parallel_hashmap/phmap.h>
#include "golpe.h"
#include "render.h"
inline void quadrableGarbageCollect(quadrable::Quadrable &qdb, int logLevel = 0) {
quadrable::Quadrable::GarbageCollector<phmap::flat_hash_set<uint64_t>> gc(qdb);
quadrable::Quadrable::GarbageCollector<flat_hash_set<uint64_t>> gc(qdb);
quadrable::Quadrable::GCStats stats;
if (logLevel >= 2) LI << "Running garbage collection";

6
src/global.h Normal file
View File

@ -0,0 +1,6 @@
#pragma once
#include <parallel_hashmap/phmap.h>
#include <parallel_hashmap/btree.h>
using namespace phmap;