diff --git a/src/QueryScheduler.h b/src/QueryScheduler.h new file mode 100644 index 0000000..ee17e34 --- /dev/null +++ b/src/QueryScheduler.h @@ -0,0 +1,90 @@ +#pragma once + +#include "DBQuery.h" + + +struct QueryScheduler : NonCopyable { + std::function onEvent; + std::function onComplete; + + using ConnQueries = flat_hash_map; + flat_hash_map conns; // connId -> subId -> DBQuery* + std::deque running; + + bool addSub(lmdb::txn &txn, Subscription &&sub) { + sub.latestEventId = getMostRecentLevId(txn); + + { + auto *existing = findQuery(sub.connId, sub.subId); + if (existing) removeSub(sub.connId, sub.subId); + } + + auto res = conns.try_emplace(sub.connId); + auto &connQueries = res.first->second; + + if (connQueries.size() >= cfg().relay__maxSubsPerConnection) { + return false; + } + + DBQuery *q = new DBQuery(sub); + + connQueries.try_emplace(q->sub.subId, q); + running.push_front(q); + + return true; + } + + DBQuery *findQuery(uint64_t connId, const SubId &subId) { + auto f1 = conns.find(connId); + if (f1 == conns.end()) return nullptr; + + auto f2 = f1->second.find(subId); + if (f2 == f1->second.end()) return nullptr; + + return f2->second; + } + + void removeSub(uint64_t connId, const SubId &subId) { + auto *query = findQuery(connId, subId); + if (!query) return; + query->dead = true; + conns[connId].erase(subId); + if (conns[connId].empty()) conns.erase(connId); + } + + void closeConn(uint64_t connId) { + auto f1 = conns.find(connId); + if (f1 == conns.end()) return; + + for (auto &[k, v] : f1->second) v->dead = true; + + conns.erase(connId); + } + + void process(lmdb::txn &txn) { + if (running.empty()) return; + + DBQuery *q = running.front(); + running.pop_front(); + + if (q->dead) { + delete q; + return; + } + + bool complete = q->process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){ + onEvent(txn, sub, levId, eventPayload); + }, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf); + + if (complete) { + auto connId = q->sub.connId; + removeSub(connId, q->sub.subId); + + onComplete(q->sub); + + delete q; + } else { + running.push_back(q); + } + } +}; diff --git a/src/RelayReqWorker.cpp b/src/RelayReqWorker.cpp index f504bbd..972f272 100644 --- a/src/RelayReqWorker.cpp +++ b/src/RelayReqWorker.cpp @@ -1,97 +1,20 @@ #include "RelayServer.h" #include "DBQuery.h" - - - -struct ActiveQueries : NonCopyable { - Decompressor decomp; - using ConnQueries = flat_hash_map; - flat_hash_map conns; // connId -> subId -> DBQuery* - std::deque running; - - bool addSub(lmdb::txn &txn, Subscription &&sub) { - sub.latestEventId = getMostRecentLevId(txn); - - { - auto *existing = findQuery(sub.connId, sub.subId); - if (existing) removeSub(sub.connId, sub.subId); - } - - auto res = conns.try_emplace(sub.connId); - auto &connQueries = res.first->second; - - if (connQueries.size() >= cfg().relay__maxSubsPerConnection) { - return false; - } - - DBQuery *q = new DBQuery(sub); - - connQueries.try_emplace(q->sub.subId, q); - running.push_front(q); - - return true; - } - - DBQuery *findQuery(uint64_t connId, const SubId &subId) { - auto f1 = conns.find(connId); - if (f1 == conns.end()) return nullptr; - - auto f2 = f1->second.find(subId); - if (f2 == f1->second.end()) return nullptr; - - return f2->second; - } - - void removeSub(uint64_t connId, const SubId &subId) { - auto *query = findQuery(connId, subId); - if (!query) return; - query->dead = true; - conns[connId].erase(subId); - if (conns[connId].empty()) conns.erase(connId); - } - - void closeConn(uint64_t connId) { - auto f1 = conns.find(connId); - if (f1 == conns.end()) return; - - for (auto &[k, v] : f1->second) v->dead = true; - - conns.erase(connId); - } - - void process(RelayServer *server, lmdb::txn &txn) { - if (running.empty()) return; - - DBQuery *q = running.front(); - running.pop_front(); - - if (q->dead) { - delete q; - return; - } - - bool complete = q->process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){ - server->sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, eventPayload, nullptr, nullptr)); - }, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf); - - if (complete) { - auto connId = q->sub.connId; - - server->sendToConn(connId, tao::json::to_string(tao::json::value::array({ "EOSE", q->sub.subId.str() }))); - removeSub(connId, q->sub.subId); - - server->tpReqMonitor.dispatch(connId, MsgReqMonitor{MsgReqMonitor::NewSub{std::move(q->sub)}}); - - delete q; - } else { - running.push_back(q); - } - } -}; +#include "QueryScheduler.h" void RelayServer::runReqWorker(ThreadPool::Thread &thr) { - ActiveQueries queries; + Decompressor decomp; + QueryScheduler queries; + + queries.onEvent = [&](lmdb::txn &txn, const auto &sub, uint64_t levId, std::string_view eventPayload){ + sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, eventPayload, nullptr, nullptr)); + }; + + queries.onComplete = [&](Subscription &sub){ + sendToConn(sub.connId, tao::json::to_string(tao::json::value::array({ "EOSE", sub.subId.str() }))); + tpReqMonitor.dispatch(sub.connId, MsgReqMonitor{MsgReqMonitor::NewSub{std::move(sub)}}); + }; while(1) { auto newMsgs = queries.running.empty() ? thr.inbox.pop_all() : thr.inbox.pop_all_no_wait(); @@ -106,7 +29,7 @@ void RelayServer::runReqWorker(ThreadPool::Thread &thr) { sendNoticeError(connId, std::string("too many concurrent REQs")); } - queries.process(this, txn); + queries.process(txn); } else if (auto msg = std::get_if(&newMsg.msg)) { queries.removeSub(msg->connId, msg->subId); tpReqMonitor.dispatch(msg->connId, MsgReqMonitor{MsgReqMonitor::RemoveSub{msg->connId, msg->subId}}); @@ -116,7 +39,7 @@ void RelayServer::runReqWorker(ThreadPool::Thread &thr) { } } - queries.process(this, txn); + queries.process(txn); txn.abort(); }