mirror of
https://github.com/hoytech/strfry.git
synced 2025-06-18 09:17:12 +00:00
QueryScheduler refactor
This commit is contained in:
90
src/QueryScheduler.h
Normal file
90
src/QueryScheduler.h
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "DBQuery.h"
|
||||||
|
|
||||||
|
|
||||||
|
struct QueryScheduler : NonCopyable {
|
||||||
|
std::function<void(lmdb::txn &txn, const Subscription &sub, uint64_t levId, std::string_view eventPayload)> onEvent;
|
||||||
|
std::function<void(Subscription &sub)> onComplete;
|
||||||
|
|
||||||
|
using ConnQueries = flat_hash_map<SubId, DBQuery*>;
|
||||||
|
flat_hash_map<uint64_t, ConnQueries> conns; // connId -> subId -> DBQuery*
|
||||||
|
std::deque<DBQuery*> running;
|
||||||
|
|
||||||
|
bool addSub(lmdb::txn &txn, Subscription &&sub) {
|
||||||
|
sub.latestEventId = getMostRecentLevId(txn);
|
||||||
|
|
||||||
|
{
|
||||||
|
auto *existing = findQuery(sub.connId, sub.subId);
|
||||||
|
if (existing) removeSub(sub.connId, sub.subId);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto res = conns.try_emplace(sub.connId);
|
||||||
|
auto &connQueries = res.first->second;
|
||||||
|
|
||||||
|
if (connQueries.size() >= cfg().relay__maxSubsPerConnection) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
DBQuery *q = new DBQuery(sub);
|
||||||
|
|
||||||
|
connQueries.try_emplace(q->sub.subId, q);
|
||||||
|
running.push_front(q);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
DBQuery *findQuery(uint64_t connId, const SubId &subId) {
|
||||||
|
auto f1 = conns.find(connId);
|
||||||
|
if (f1 == conns.end()) return nullptr;
|
||||||
|
|
||||||
|
auto f2 = f1->second.find(subId);
|
||||||
|
if (f2 == f1->second.end()) return nullptr;
|
||||||
|
|
||||||
|
return f2->second;
|
||||||
|
}
|
||||||
|
|
||||||
|
void removeSub(uint64_t connId, const SubId &subId) {
|
||||||
|
auto *query = findQuery(connId, subId);
|
||||||
|
if (!query) return;
|
||||||
|
query->dead = true;
|
||||||
|
conns[connId].erase(subId);
|
||||||
|
if (conns[connId].empty()) conns.erase(connId);
|
||||||
|
}
|
||||||
|
|
||||||
|
void closeConn(uint64_t connId) {
|
||||||
|
auto f1 = conns.find(connId);
|
||||||
|
if (f1 == conns.end()) return;
|
||||||
|
|
||||||
|
for (auto &[k, v] : f1->second) v->dead = true;
|
||||||
|
|
||||||
|
conns.erase(connId);
|
||||||
|
}
|
||||||
|
|
||||||
|
void process(lmdb::txn &txn) {
|
||||||
|
if (running.empty()) return;
|
||||||
|
|
||||||
|
DBQuery *q = running.front();
|
||||||
|
running.pop_front();
|
||||||
|
|
||||||
|
if (q->dead) {
|
||||||
|
delete q;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool complete = q->process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){
|
||||||
|
onEvent(txn, sub, levId, eventPayload);
|
||||||
|
}, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf);
|
||||||
|
|
||||||
|
if (complete) {
|
||||||
|
auto connId = q->sub.connId;
|
||||||
|
removeSub(connId, q->sub.subId);
|
||||||
|
|
||||||
|
onComplete(q->sub);
|
||||||
|
|
||||||
|
delete q;
|
||||||
|
} else {
|
||||||
|
running.push_back(q);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
@ -1,97 +1,20 @@
|
|||||||
#include "RelayServer.h"
|
#include "RelayServer.h"
|
||||||
#include "DBQuery.h"
|
#include "DBQuery.h"
|
||||||
|
#include "QueryScheduler.h"
|
||||||
|
|
||||||
|
|
||||||
struct ActiveQueries : NonCopyable {
|
|
||||||
Decompressor decomp;
|
|
||||||
using ConnQueries = flat_hash_map<SubId, DBQuery*>;
|
|
||||||
flat_hash_map<uint64_t, ConnQueries> conns; // connId -> subId -> DBQuery*
|
|
||||||
std::deque<DBQuery*> running;
|
|
||||||
|
|
||||||
bool addSub(lmdb::txn &txn, Subscription &&sub) {
|
|
||||||
sub.latestEventId = getMostRecentLevId(txn);
|
|
||||||
|
|
||||||
{
|
|
||||||
auto *existing = findQuery(sub.connId, sub.subId);
|
|
||||||
if (existing) removeSub(sub.connId, sub.subId);
|
|
||||||
}
|
|
||||||
|
|
||||||
auto res = conns.try_emplace(sub.connId);
|
|
||||||
auto &connQueries = res.first->second;
|
|
||||||
|
|
||||||
if (connQueries.size() >= cfg().relay__maxSubsPerConnection) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
DBQuery *q = new DBQuery(sub);
|
|
||||||
|
|
||||||
connQueries.try_emplace(q->sub.subId, q);
|
|
||||||
running.push_front(q);
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
DBQuery *findQuery(uint64_t connId, const SubId &subId) {
|
|
||||||
auto f1 = conns.find(connId);
|
|
||||||
if (f1 == conns.end()) return nullptr;
|
|
||||||
|
|
||||||
auto f2 = f1->second.find(subId);
|
|
||||||
if (f2 == f1->second.end()) return nullptr;
|
|
||||||
|
|
||||||
return f2->second;
|
|
||||||
}
|
|
||||||
|
|
||||||
void removeSub(uint64_t connId, const SubId &subId) {
|
|
||||||
auto *query = findQuery(connId, subId);
|
|
||||||
if (!query) return;
|
|
||||||
query->dead = true;
|
|
||||||
conns[connId].erase(subId);
|
|
||||||
if (conns[connId].empty()) conns.erase(connId);
|
|
||||||
}
|
|
||||||
|
|
||||||
void closeConn(uint64_t connId) {
|
|
||||||
auto f1 = conns.find(connId);
|
|
||||||
if (f1 == conns.end()) return;
|
|
||||||
|
|
||||||
for (auto &[k, v] : f1->second) v->dead = true;
|
|
||||||
|
|
||||||
conns.erase(connId);
|
|
||||||
}
|
|
||||||
|
|
||||||
void process(RelayServer *server, lmdb::txn &txn) {
|
|
||||||
if (running.empty()) return;
|
|
||||||
|
|
||||||
DBQuery *q = running.front();
|
|
||||||
running.pop_front();
|
|
||||||
|
|
||||||
if (q->dead) {
|
|
||||||
delete q;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool complete = q->process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){
|
|
||||||
server->sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, eventPayload, nullptr, nullptr));
|
|
||||||
}, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf);
|
|
||||||
|
|
||||||
if (complete) {
|
|
||||||
auto connId = q->sub.connId;
|
|
||||||
|
|
||||||
server->sendToConn(connId, tao::json::to_string(tao::json::value::array({ "EOSE", q->sub.subId.str() })));
|
|
||||||
removeSub(connId, q->sub.subId);
|
|
||||||
|
|
||||||
server->tpReqMonitor.dispatch(connId, MsgReqMonitor{MsgReqMonitor::NewSub{std::move(q->sub)}});
|
|
||||||
|
|
||||||
delete q;
|
|
||||||
} else {
|
|
||||||
running.push_back(q);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
void RelayServer::runReqWorker(ThreadPool<MsgReqWorker>::Thread &thr) {
|
void RelayServer::runReqWorker(ThreadPool<MsgReqWorker>::Thread &thr) {
|
||||||
ActiveQueries queries;
|
Decompressor decomp;
|
||||||
|
QueryScheduler queries;
|
||||||
|
|
||||||
|
queries.onEvent = [&](lmdb::txn &txn, const auto &sub, uint64_t levId, std::string_view eventPayload){
|
||||||
|
sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, eventPayload, nullptr, nullptr));
|
||||||
|
};
|
||||||
|
|
||||||
|
queries.onComplete = [&](Subscription &sub){
|
||||||
|
sendToConn(sub.connId, tao::json::to_string(tao::json::value::array({ "EOSE", sub.subId.str() })));
|
||||||
|
tpReqMonitor.dispatch(sub.connId, MsgReqMonitor{MsgReqMonitor::NewSub{std::move(sub)}});
|
||||||
|
};
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
auto newMsgs = queries.running.empty() ? thr.inbox.pop_all() : thr.inbox.pop_all_no_wait();
|
auto newMsgs = queries.running.empty() ? thr.inbox.pop_all() : thr.inbox.pop_all_no_wait();
|
||||||
@ -106,7 +29,7 @@ void RelayServer::runReqWorker(ThreadPool<MsgReqWorker>::Thread &thr) {
|
|||||||
sendNoticeError(connId, std::string("too many concurrent REQs"));
|
sendNoticeError(connId, std::string("too many concurrent REQs"));
|
||||||
}
|
}
|
||||||
|
|
||||||
queries.process(this, txn);
|
queries.process(txn);
|
||||||
} else if (auto msg = std::get_if<MsgReqWorker::RemoveSub>(&newMsg.msg)) {
|
} else if (auto msg = std::get_if<MsgReqWorker::RemoveSub>(&newMsg.msg)) {
|
||||||
queries.removeSub(msg->connId, msg->subId);
|
queries.removeSub(msg->connId, msg->subId);
|
||||||
tpReqMonitor.dispatch(msg->connId, MsgReqMonitor{MsgReqMonitor::RemoveSub{msg->connId, msg->subId}});
|
tpReqMonitor.dispatch(msg->connId, MsgReqMonitor{MsgReqMonitor::RemoveSub{msg->connId, msg->subId}});
|
||||||
@ -116,7 +39,7 @@ void RelayServer::runReqWorker(ThreadPool<MsgReqWorker>::Thread &thr) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
queries.process(this, txn);
|
queries.process(txn);
|
||||||
|
|
||||||
txn.abort();
|
txn.abort();
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user