mirror of
https://github.com/hoytech/strfry.git
synced 2025-06-18 01:08:51 +00:00
log connection compression stats on disconnect
This commit is contained in:
@ -1,7 +1,9 @@
|
|||||||
|
#include <stdio.h>
|
||||||
|
|
||||||
#include "RelayServer.h"
|
#include "RelayServer.h"
|
||||||
|
|
||||||
|
|
||||||
std::string preGenerateHttpResponse(const std::string &contentType, const std::string &content) {
|
static std::string preGenerateHttpResponse(const std::string &contentType, const std::string &content) {
|
||||||
std::string output = "HTTP/1.1 200 OK\r\n";
|
std::string output = "HTTP/1.1 200 OK\r\n";
|
||||||
output += std::string("Content-Type: ") + contentType + "\r\n";
|
output += std::string("Content-Type: ") + contentType + "\r\n";
|
||||||
output += "Access-Control-Allow-Origin: *\r\n";
|
output += "Access-Control-Allow-Origin: *\r\n";
|
||||||
@ -14,12 +16,59 @@ std::string preGenerateHttpResponse(const std::string &contentType, const std::s
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
static std::string renderSize(uint64_t si) {
|
||||||
|
if (si < 1024) return std::to_string(si) + " b";
|
||||||
|
|
||||||
|
double s = si;
|
||||||
|
char buf[128];
|
||||||
|
char unit;
|
||||||
|
|
||||||
|
do {
|
||||||
|
s /= 1024;
|
||||||
|
if (s < 1024) {
|
||||||
|
unit = 'K';
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
s /= 1024;
|
||||||
|
if (s < 1024) {
|
||||||
|
unit = 'M';
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
s /= 1024;
|
||||||
|
if (s < 1024) {
|
||||||
|
unit = 'G';
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
s /= 1024;
|
||||||
|
unit = 'T';
|
||||||
|
} while(0);
|
||||||
|
|
||||||
|
::snprintf(buf, sizeof(buf), "%.2f %c", s, unit);
|
||||||
|
return std::string(buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
static std::string renderPercent(double p) {
|
||||||
|
char buf[128];
|
||||||
|
::snprintf(buf, sizeof(buf), "%.1f%%", p * 100);
|
||||||
|
return std::string(buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
|
void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
|
||||||
struct Connection {
|
struct Connection {
|
||||||
uWS::WebSocket<uWS::SERVER> *websocket;
|
uWS::WebSocket<uWS::SERVER> *websocket;
|
||||||
uint64_t connId;
|
uint64_t connId;
|
||||||
uint64_t connectedTimestamp;
|
uint64_t connectedTimestamp;
|
||||||
std::string ipAddr;
|
std::string ipAddr;
|
||||||
|
struct Stats {
|
||||||
|
uint64_t bytesUp = 0;
|
||||||
|
uint64_t bytesUpCompressed = 0;
|
||||||
|
uint64_t bytesDown = 0;
|
||||||
|
uint64_t bytesDownCompressed = 0;
|
||||||
|
} stats;
|
||||||
|
|
||||||
Connection(uWS::WebSocket<uWS::SERVER> *p, uint64_t connId_)
|
Connection(uWS::WebSocket<uWS::SERVER> *p, uint64_t connId_)
|
||||||
: websocket(p), connId(connId_), connectedTimestamp(hoytech::curr_time_us()) { }
|
: websocket(p), connId(connId_), connectedTimestamp(hoytech::curr_time_us()) { }
|
||||||
@ -51,7 +100,7 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
|
|||||||
ver = cfg().version();
|
ver = cfg().version();
|
||||||
}
|
}
|
||||||
|
|
||||||
return std::string_view(rendered);
|
return std::string_view(rendered); // memory only valid until next call
|
||||||
};
|
};
|
||||||
|
|
||||||
const std::string defaultHttpResponse = preGenerateHttpResponse("text/plain", "Please use a Nostr client to connect.");
|
const std::string defaultHttpResponse = preGenerateHttpResponse("text/plain", "Please use a Nostr client to connect.");
|
||||||
@ -91,10 +140,16 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
hubGroup->onDisconnection([&](uWS::WebSocket<uWS::SERVER> *ws, int code, char *message, size_t length) {
|
hubGroup->onDisconnection([&](uWS::WebSocket<uWS::SERVER> *ws, int code, char *message, size_t length) {
|
||||||
Connection *c = (Connection*)ws->getUserData();
|
auto *c = (Connection*)ws->getUserData();
|
||||||
uint64_t connId = c->connId;
|
uint64_t connId = c->connId;
|
||||||
|
|
||||||
LI << "[" << connId << "] Disconnect from " << c->ipAddr;
|
auto upComp = renderPercent(1.0 - (double)c->stats.bytesUpCompressed / c->stats.bytesUp);
|
||||||
|
auto downComp = renderPercent(1.0 - (double)c->stats.bytesDownCompressed / c->stats.bytesDown);
|
||||||
|
|
||||||
|
LI << "[" << connId << "] Disconnect from " << c->ipAddr
|
||||||
|
<< " UP: " << renderSize(c->stats.bytesUp) << " (" << upComp << " compressed)"
|
||||||
|
<< " DN: " << renderSize(c->stats.bytesDown) << " (" << downComp << " compressed)"
|
||||||
|
;
|
||||||
|
|
||||||
tpIngester.dispatch(connId, MsgIngester{MsgIngester::CloseConn{connId}});
|
tpIngester.dispatch(connId, MsgIngester{MsgIngester::CloseConn{connId}});
|
||||||
|
|
||||||
@ -103,10 +158,11 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
hubGroup->onMessage2([&](uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length, uWS::OpCode opCode, size_t compressedSize) {
|
hubGroup->onMessage2([&](uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length, uWS::OpCode opCode, size_t compressedSize) {
|
||||||
//LI << "Decompression: " << compressedSize << " -> " << length;
|
|
||||||
|
|
||||||
auto &c = *(Connection*)ws->getUserData();
|
auto &c = *(Connection*)ws->getUserData();
|
||||||
|
|
||||||
|
c.stats.bytesDown += length;
|
||||||
|
c.stats.bytesDownCompressed += compressedSize;
|
||||||
|
|
||||||
tpIngester.dispatch(c.connId, MsgIngester{MsgIngester::ClientMessage{c.connId, std::string(message, length)}});
|
tpIngester.dispatch(c.connId, MsgIngester{MsgIngester::ClientMessage{c.connId, std::string(message, length)}});
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -122,7 +178,8 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
|
|||||||
size_t compressedSize;
|
size_t compressedSize;
|
||||||
auto cb = [](uWS::WebSocket<uWS::SERVER> *webSocket, void *data, bool cancelled, void *reserved){};
|
auto cb = [](uWS::WebSocket<uWS::SERVER> *webSocket, void *data, bool cancelled, void *reserved){};
|
||||||
c.websocket->send(payload.data(), payload.size(), opCode, cb, nullptr, true, &compressedSize);
|
c.websocket->send(payload.data(), payload.size(), opCode, cb, nullptr, true, &compressedSize);
|
||||||
//LI << "Compression: " << payload.size() << " -> " << compressedSize;
|
c.stats.bytesUp += payload.size();
|
||||||
|
c.stats.bytesUpCompressed += compressedSize;
|
||||||
};
|
};
|
||||||
|
|
||||||
for (auto &newMsg : newMsgs) {
|
for (auto &newMsg : newMsgs) {
|
||||||
@ -139,14 +196,7 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
|
|||||||
tempBuf += msg->evJson;
|
tempBuf += msg->evJson;
|
||||||
tempBuf += "]";
|
tempBuf += "]";
|
||||||
|
|
||||||
auto it = connIdToConnection.find(item.connId);
|
doSend(item.connId, tempBuf, uWS::OpCode::TEXT);
|
||||||
if (it == connIdToConnection.end()) continue;
|
|
||||||
auto &c = *it->second;
|
|
||||||
|
|
||||||
size_t compressedSize;
|
|
||||||
auto cb = [](uWS::WebSocket<uWS::SERVER> *webSocket, void *data, bool cancelled, void *reserved){};
|
|
||||||
c.websocket->send(tempBuf.data(), tempBuf.size(), uWS::OpCode::TEXT, cb, nullptr, true, &compressedSize);
|
|
||||||
//LI << "Compression: " << msg->payload.size() << " -> " << compressedSize;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user