From 80915f969dfcc140847713a4b572e5deea01cf8d Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Fri, 21 Jul 2023 07:46:37 -0400 Subject: [PATCH] If a client disconnects before its pending EVENT write messages have been processed, drop those messages instead of trying to write them --- CHANGES | 4 ++++ TODO | 1 - src/apps/relay/RelayIngester.cpp | 1 + src/apps/relay/RelayServer.h | 6 +++++- src/apps/relay/RelayWriter.cpp | 26 ++++++++++++++++++++++++++ 5 files changed, 36 insertions(+), 2 deletions(-) diff --git a/CHANGES b/CHANGES index ec0be5f..566976d 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,7 @@ +0.9.4 + * If a client disconnects before its pending EVENT write messages have been processed, + drop those messages instead of trying to write them + 0.9.3 * Limit on events that can be processed by a sync - Configurable with relay.negentropy.maxSyncEvents diff --git a/TODO b/TODO index 612716e..2b017e7 100644 --- a/TODO +++ b/TODO @@ -15,7 +15,6 @@ features * delete by receivedAt, IP addrs, etc * inverted filter: delete events that *don't* match the provided filter ? less verbose default logging - ? if a client disconnects, delete all its pending write messages ? kill plugin if it times out rate limits (maybe not needed now that we have plugins?) diff --git a/src/apps/relay/RelayIngester.cpp b/src/apps/relay/RelayIngester.cpp index ca65cb6..c49c30c 100644 --- a/src/apps/relay/RelayIngester.cpp +++ b/src/apps/relay/RelayIngester.cpp @@ -71,6 +71,7 @@ void RelayServer::runIngester(ThreadPool::Thread &thr) { } } else if (auto msg = std::get_if(&newMsg.msg)) { auto connId = msg->connId; + tpWriter.dispatch(connId, MsgWriter{MsgWriter::CloseConn{connId}}); tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::CloseConn{connId}}); tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::CloseConn{connId}}); } diff --git a/src/apps/relay/RelayServer.h b/src/apps/relay/RelayServer.h index 0acb83a..0ccea26 100644 --- a/src/apps/relay/RelayServer.h +++ b/src/apps/relay/RelayServer.h @@ -70,7 +70,11 @@ struct MsgWriter : NonCopyable { std::string jsonStr; }; - using Var = std::variant; + struct CloseConn { + uint64_t connId; + }; + + using Var = std::variant; Var msg; MsgWriter(Var &&msg_) : msg(std::move(msg_)) {} }; diff --git a/src/apps/relay/RelayWriter.cpp b/src/apps/relay/RelayWriter.cpp index ea3bfea..9123d52 100644 --- a/src/apps/relay/RelayWriter.cpp +++ b/src/apps/relay/RelayWriter.cpp @@ -9,6 +9,28 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { while(1) { auto newMsgs = thr.inbox.pop_all(); + // Filter out messages from already closed sockets + + { + flat_hash_set closedConns; + + for (auto &newMsg : newMsgs) { + if (auto msg = std::get_if(&newMsg.msg)) closedConns.insert(msg->connId); + } + + if (closedConns.size()) { + decltype(newMsgs) newMsgsFiltered; + + for (auto &newMsg : newMsgs) { + if (auto msg = std::get_if(&newMsg.msg)) { + if (!closedConns.contains(msg->connId)) newMsgsFiltered.emplace_back(std::move(newMsg)); + } + } + + std::swap(newMsgs, newMsgsFiltered); + } + } + // Prepare messages std::vector newEvents; @@ -33,6 +55,10 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { } } + if (!newEvents.size()) continue; + + // Do write + try { auto txn = env.txn_rw(); writeEvents(txn, newEvents);