limit on max number of concurrent REQs

This commit is contained in:
Doug Hoyte
2023-02-05 15:43:11 -05:00
parent 43cdd64956
commit 2c86254fb9
5 changed files with 31 additions and 5 deletions

View File

@ -140,6 +140,9 @@ config:
- name: relay__maxFilterLimit
desc: "Maximum records that can be returned per filter"
default: 500
- name: relay__maxSubsPerConnection
desc: "Maximum number of subscriptions (concurrent REQs) a connection can have open at any time"
default: 20
- name: relay__compression__enabled
desc: "Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU"

View File

@ -40,7 +40,7 @@ struct ActiveMonitors : NonCopyable {
public:
void addSub(lmdb::txn &txn, Subscription &&sub, uint64_t currEventId) {
bool addSub(lmdb::txn &txn, Subscription &&sub, uint64_t currEventId) {
if (sub.latestEventId != currEventId) throw herr("sub not up to date");
{
@ -51,10 +51,15 @@ struct ActiveMonitors : NonCopyable {
auto res = conns.try_emplace(sub.connId);
auto &connMonitors = res.first->second;
if (connMonitors.size() >= cfg().relay__maxSubsPerConnection) {
return false;
}
auto subId = sub.subId;
auto *m = &connMonitors.try_emplace(subId, sub).first->second;
installLookups(m, currEventId);
return true;
}
void removeSub(uint64_t connId, const SubId &subId) {

View File

@ -28,9 +28,11 @@ void RelayServer::runReqMonitor(ThreadPool<MsgReqMonitor>::Thread &thr) {
for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgReqMonitor::NewSub>(&newMsg.msg)) {
auto connId = msg->sub.connId;
env.foreach_Event(txn, [&](auto &ev){
if (msg->sub.filterGroup.doesMatch(ev.flat_nested())) {
sendEvent(msg->sub.connId, msg->sub.subId, getEventJson(txn, decomp, ev.primaryKeyId));
sendEvent(connId, msg->sub.subId, getEventJson(txn, decomp, ev.primaryKeyId));
}
return true;
@ -38,7 +40,9 @@ void RelayServer::runReqMonitor(ThreadPool<MsgReqMonitor>::Thread &thr) {
msg->sub.latestEventId = latestEventId;
monitors.addSub(txn, std::move(msg->sub), latestEventId);
if (!monitors.addSub(txn, std::move(msg->sub), latestEventId)) {
sendNoticeError(connId, std::string("too many concurrent REQs"));
}
} else if (auto msg = std::get_if<MsgReqMonitor::RemoveSub>(&newMsg.msg)) {
monitors.removeSub(msg->connId, msg->subId);
} else if (auto msg = std::get_if<MsgReqMonitor::CloseConn>(&newMsg.msg)) {

View File

@ -9,7 +9,7 @@ struct ActiveQueries : NonCopyable {
flat_hash_map<uint64_t, ConnQueries> conns; // connId -> subId -> DBScanQuery*
std::deque<DBScanQuery*> running;
void addSub(lmdb::txn &txn, Subscription &&sub) {
bool addSub(lmdb::txn &txn, Subscription &&sub) {
sub.latestEventId = getMostRecentLevId(txn);
{
@ -20,10 +20,16 @@ struct ActiveQueries : NonCopyable {
auto res = conns.try_emplace(sub.connId);
auto &connQueries = res.first->second;
if (connQueries.size() >= cfg().relay__maxSubsPerConnection) {
return false;
}
DBScanQuery *q = new DBScanQuery(sub);
connQueries.try_emplace(q->sub.subId, q);
running.push_front(q);
return true;
}
DBScanQuery *findQuery(uint64_t connId, const SubId &subId) {
@ -98,7 +104,12 @@ void RelayServer::runReqWorker(ThreadPool<MsgReqWorker>::Thread &thr) {
for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgReqWorker::NewSub>(&newMsg.msg)) {
queries.addSub(txn, std::move(msg->sub));
auto connId = msg->sub.connId;
if (!queries.addSub(txn, std::move(msg->sub))) {
sendNoticeError(connId, std::string("too many concurrent REQs"));
}
queries.process(this, txn);
} else if (auto msg = std::get_if<MsgReqWorker::RemoveSub>(&newMsg.msg)) {
queries.removeSub(msg->connId, msg->subId);

View File

@ -52,6 +52,9 @@ relay {
# Maximum records that can be returned per filter
maxFilterLimit = 500
# Maximum number of subscriptions (concurrent REQs) a connection can have open at any time
maxSubsPerConnection = 2
compression {
# Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU (restart required)
enabled = true