From a5992b0b86dcd858d4fc042019516436c0c9f998 Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Fri, 20 Jan 2023 11:58:57 -0500 Subject: [PATCH] log connection compression stats on disconnect --- src/RelayWebsocket.cpp | 80 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 65 insertions(+), 15 deletions(-) diff --git a/src/RelayWebsocket.cpp b/src/RelayWebsocket.cpp index f7adc50..729d166 100644 --- a/src/RelayWebsocket.cpp +++ b/src/RelayWebsocket.cpp @@ -1,7 +1,9 @@ +#include + #include "RelayServer.h" -std::string preGenerateHttpResponse(const std::string &contentType, const std::string &content) { +static std::string preGenerateHttpResponse(const std::string &contentType, const std::string &content) { std::string output = "HTTP/1.1 200 OK\r\n"; output += std::string("Content-Type: ") + contentType + "\r\n"; output += "Access-Control-Allow-Origin: *\r\n"; @@ -14,12 +16,59 @@ std::string preGenerateHttpResponse(const std::string &contentType, const std::s }; +static std::string renderSize(uint64_t si) { + if (si < 1024) return std::to_string(si) + " b"; + + double s = si; + char buf[128]; + char unit; + + do { + s /= 1024; + if (s < 1024) { + unit = 'K'; + break; + } + + s /= 1024; + if (s < 1024) { + unit = 'M'; + break; + } + + s /= 1024; + if (s < 1024) { + unit = 'G'; + break; + } + + s /= 1024; + unit = 'T'; + } while(0); + + ::snprintf(buf, sizeof(buf), "%.2f %c", s, unit); + return std::string(buf); +} + +static std::string renderPercent(double p) { + char buf[128]; + ::snprintf(buf, sizeof(buf), "%.1f%%", p * 100); + return std::string(buf); +} + + void RelayServer::runWebsocket(ThreadPool::Thread &thr) { struct Connection { uWS::WebSocket *websocket; uint64_t connId; uint64_t connectedTimestamp; std::string ipAddr; + struct Stats { + uint64_t bytesUp = 0; + uint64_t bytesUpCompressed = 0; + uint64_t bytesDown = 0; + uint64_t bytesDownCompressed = 0; + } stats; Connection(uWS::WebSocket *p, uint64_t connId_) : websocket(p), connId(connId_), connectedTimestamp(hoytech::curr_time_us()) { } @@ -51,7 +100,7 @@ void RelayServer::runWebsocket(ThreadPool::Thread &thr) { ver = cfg().version(); } - return std::string_view(rendered); + return std::string_view(rendered); // memory only valid until next call }; const std::string defaultHttpResponse = preGenerateHttpResponse("text/plain", "Please use a Nostr client to connect."); @@ -91,10 +140,16 @@ void RelayServer::runWebsocket(ThreadPool::Thread &thr) { }); hubGroup->onDisconnection([&](uWS::WebSocket *ws, int code, char *message, size_t length) { - Connection *c = (Connection*)ws->getUserData(); + auto *c = (Connection*)ws->getUserData(); uint64_t connId = c->connId; - LI << "[" << connId << "] Disconnect from " << c->ipAddr; + auto upComp = renderPercent(1.0 - (double)c->stats.bytesUpCompressed / c->stats.bytesUp); + auto downComp = renderPercent(1.0 - (double)c->stats.bytesDownCompressed / c->stats.bytesDown); + + LI << "[" << connId << "] Disconnect from " << c->ipAddr + << " UP: " << renderSize(c->stats.bytesUp) << " (" << upComp << " compressed)" + << " DN: " << renderSize(c->stats.bytesDown) << " (" << downComp << " compressed)" + ; tpIngester.dispatch(connId, MsgIngester{MsgIngester::CloseConn{connId}}); @@ -103,10 +158,11 @@ void RelayServer::runWebsocket(ThreadPool::Thread &thr) { }); hubGroup->onMessage2([&](uWS::WebSocket *ws, char *message, size_t length, uWS::OpCode opCode, size_t compressedSize) { - //LI << "Decompression: " << compressedSize << " -> " << length; - auto &c = *(Connection*)ws->getUserData(); + c.stats.bytesDown += length; + c.stats.bytesDownCompressed += compressedSize; + tpIngester.dispatch(c.connId, MsgIngester{MsgIngester::ClientMessage{c.connId, std::string(message, length)}}); }); @@ -122,7 +178,8 @@ void RelayServer::runWebsocket(ThreadPool::Thread &thr) { size_t compressedSize; auto cb = [](uWS::WebSocket *webSocket, void *data, bool cancelled, void *reserved){}; c.websocket->send(payload.data(), payload.size(), opCode, cb, nullptr, true, &compressedSize); - //LI << "Compression: " << payload.size() << " -> " << compressedSize; + c.stats.bytesUp += payload.size(); + c.stats.bytesUpCompressed += compressedSize; }; for (auto &newMsg : newMsgs) { @@ -139,14 +196,7 @@ void RelayServer::runWebsocket(ThreadPool::Thread &thr) { tempBuf += msg->evJson; tempBuf += "]"; - auto it = connIdToConnection.find(item.connId); - if (it == connIdToConnection.end()) continue; - auto &c = *it->second; - - size_t compressedSize; - auto cb = [](uWS::WebSocket *webSocket, void *data, bool cancelled, void *reserved){}; - c.websocket->send(tempBuf.data(), tempBuf.size(), uWS::OpCode::TEXT, cb, nullptr, true, &compressedSize); - //LI << "Compression: " << msg->payload.size() << " -> " << compressedSize; + doSend(item.connId, tempBuf, uWS::OpCode::TEXT); } } }