diff --git a/golpe.yaml b/golpe.yaml
index 8670529..2dfa186 100644
--- a/golpe.yaml
+++ b/golpe.yaml
@@ -140,6 +140,9 @@ config:
     - name: relay__maxFilterLimit
       desc: "Maximum records that can be returned per filter"
       default: 500
+    - name: relay__maxSubsPerConnection
+      desc: "Maximum number of subscriptions (concurrent REQs) a connection can have open at any time"
+      default: 20
     - name: relay__compression__enabled
       desc: "Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU"
diff --git a/src/ActiveMonitors.h b/src/ActiveMonitors.h
index 3aa6890..129b917 100644
--- a/src/ActiveMonitors.h
+++ b/src/ActiveMonitors.h
@@ -40,7 +40,7 @@ struct ActiveMonitors : NonCopyable {
 
   public:
 
-    void addSub(lmdb::txn &txn, Subscription &&sub, uint64_t currEventId) {
+    bool addSub(lmdb::txn &txn, Subscription &&sub, uint64_t currEventId) {
         if (sub.latestEventId != currEventId) throw herr("sub not up to date");
 
         {
@@ -51,10 +51,15 @@ struct ActiveMonitors : NonCopyable {
         auto res = conns.try_emplace(sub.connId);
         auto &connMonitors = res.first->second;
 
+        if (connMonitors.size() >= cfg().relay__maxSubsPerConnection) {
+            return false;
+        }
+
         auto subId = sub.subId;
         auto *m = &connMonitors.try_emplace(subId, sub).first->second;
 
         installLookups(m, currEventId);
+        return true;
     }
 
     void removeSub(uint64_t connId, const SubId &subId) {
diff --git a/src/RelayReqMonitor.cpp b/src/RelayReqMonitor.cpp
index d9feb80..4fadd70 100644
--- a/src/RelayReqMonitor.cpp
+++ b/src/RelayReqMonitor.cpp
@@ -28,9 +28,11 @@ void RelayServer::runReqMonitor(ThreadPool::Thread &thr) {
         for (auto &newMsg : newMsgs) {
             if (auto msg = std::get_if<MsgReqMonitor::NewSub>(&newMsg.msg)) {
+                auto connId = msg->sub.connId;
+
                 env.foreach_Event(txn, [&](auto &ev){
                     if (msg->sub.filterGroup.doesMatch(ev.flat_nested())) {
-                        sendEvent(msg->sub.connId, msg->sub.subId, getEventJson(txn, decomp, ev.primaryKeyId));
+                        sendEvent(connId, msg->sub.subId, getEventJson(txn, decomp, ev.primaryKeyId));
                     }
 
                     return true;
@@ -38,7 +40,9 @@ void RelayServer::runReqMonitor(ThreadPool::Thread &thr) {
 
                 msg->sub.latestEventId = latestEventId;
 
-                monitors.addSub(txn, std::move(msg->sub), latestEventId);
+                if (!monitors.addSub(txn, std::move(msg->sub), latestEventId)) {
+                    sendNoticeError(connId, std::string("too many concurrent REQs"));
+                }
             } else if (auto msg = std::get_if<MsgReqMonitor::RemoveSub>(&newMsg.msg)) {
                 monitors.removeSub(msg->connId, msg->subId);
             } else if (auto msg = std::get_if<MsgReqMonitor::CloseConn>(&newMsg.msg)) {
diff --git a/src/RelayReqWorker.cpp b/src/RelayReqWorker.cpp
index 30ca2f0..dfd4b19 100644
--- a/src/RelayReqWorker.cpp
+++ b/src/RelayReqWorker.cpp
@@ -9,7 +9,7 @@ struct ActiveQueries : NonCopyable {
     flat_hash_map<uint64_t, flat_hash_map<SubId, DBScanQuery*>> conns; // connId -> subId -> DBScanQuery*
     std::deque<DBScanQuery*> running;
 
-    void addSub(lmdb::txn &txn, Subscription &&sub) {
+    bool addSub(lmdb::txn &txn, Subscription &&sub) {
         sub.latestEventId = getMostRecentLevId(txn);
 
         {
@@ -20,10 +20,16 @@ struct ActiveQueries : NonCopyable {
         auto res = conns.try_emplace(sub.connId);
         auto &connQueries = res.first->second;
 
+        if (connQueries.size() >= cfg().relay__maxSubsPerConnection) {
+            return false;
+        }
+
         DBScanQuery *q = new DBScanQuery(sub);
         connQueries.try_emplace(q->sub.subId, q);
         running.push_front(q);
+
+        return true;
     }
 
     DBScanQuery *findQuery(uint64_t connId, const SubId &subId) {
@@ -98,7 +104,12 @@ void RelayServer::runReqWorker(ThreadPool::Thread &thr) {
         for (auto &newMsg : newMsgs) {
             if (auto msg = std::get_if<MsgReqWorker::NewSub>(&newMsg.msg)) {
-                queries.addSub(txn, std::move(msg->sub));
+                auto connId = msg->sub.connId;
+
+                if (!queries.addSub(txn, std::move(msg->sub))) {
+                    sendNoticeError(connId, std::string("too many concurrent REQs"));
+                }
+
                 queries.process(this, txn);
             } else if (auto msg = std::get_if<MsgReqWorker::RemoveSub>(&newMsg.msg)) {
                 queries.removeSub(msg->connId, msg->subId);
diff --git a/strfry.conf b/strfry.conf
index 02cd1dd..f41bebc 100644
--- a/strfry.conf
+++ b/strfry.conf
@@ -52,6 +52,9 @@ relay {
     # Maximum records that can be returned per filter
     maxFilterLimit = 500
 
+    # Maximum number of subscriptions (concurrent REQs) a connection can have open at any time
+    maxSubsPerConnection = 20
+
     compression {
         # Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU (restart required)
         enabled = true
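
Note on the enforcement pattern: both ActiveMonitors::addSub and ActiveQueries::addSub now return false instead of inserting once the connection already holds relay__maxSubsPerConnection subscriptions, and each call site reports the rejection back to the client via sendNoticeError. A minimal standalone sketch of that contract follows; the SubStore/trySub names and std::map storage are illustrative stand-ins, not strfry types.

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>

    // Illustrative stand-ins for strfry's per-connection subscription tables.
    using SubId = std::string;
    struct QueryState {};                          // placeholder for DBScanQuery / monitor state
    using SubStore = std::map<SubId, QueryState>;  // subId -> query state

    // Same shape as the patched addSub(): refuse the insert once the
    // per-connection cap is reached, and tell the caller so it can send
    // a "too many concurrent REQs" NOTICE back to the client.
    bool trySub(SubStore &subs, const SubId &subId, uint64_t maxSubsPerConnection) {
        if (subs.size() >= maxSubsPerConnection) return false;
        subs.try_emplace(subId);
        return true;
    }

    int main() {
        SubStore subs;
        for (int i = 0; i < 25; i++) {
            if (!trySub(subs, "sub" + std::to_string(i), 20))
                std::cout << "rejected sub" << i << "\n";  // sub20..sub24 rejected
        }
    }

One consequence of checking size() before try_emplace is that a connection already at the cap is refused even when it re-uses an existing subId, since the cap check runs before the map lookup.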