zero downtime restarts

This commit is contained in:
Doug Hoyte
2023-06-05 15:31:42 -04:00
parent c2f9d61e91
commit abb488a570
6 changed files with 85 additions and 2 deletions

View File

@ -37,7 +37,10 @@ struct MsgWebsocket : NonCopyable {
std::string evJson;
};
using Var = std::variant<Send, SendBinary, SendEventToBatch>;
struct GracefulShutdown {
};
using Var = std::variant<Send, SendBinary, SendEventToBatch, GracefulShutdown>;
Var msg;
MsgWebsocket(Var &&msg_) : msg(std::move(msg_)) {}
};
@ -153,6 +156,7 @@ struct RelayServer {
ThreadPool<MsgReqMonitor> tpReqMonitor;
ThreadPool<MsgNegentropy> tpNegentropy;
std::thread cronThread;
std::thread signalHandlerThread;
void run();
@ -174,6 +178,8 @@ struct RelayServer {
void runCron();
void runSignalHandler();
// Utils (can be called by any thread)
void sendToConn(uint64_t connId, std::string &&payload) {

View File

@ -0,0 +1,25 @@
#include <signal.h>
#include "RelayServer.h"
void RelayServer::runSignalHandler() {
setThreadName("signalHandler");
sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGUSR1);
while (1) {
int sig;
int s = sigwait(&sigset, &sig);
if (s != 0) throw herr("unable to sigwait: ", strerror(errno));
if (sig == SIGUSR1) {
tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::GracefulShutdown{}});
hubTrigger->send();
} else {
LW << "Got unexpected signal: " << sig;
}
}
}

View File

@ -41,6 +41,7 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
uWS::Group<uWS::SERVER> *hubGroup;
flat_hash_map<uint64_t, Connection*> connIdToConnection;
uint64_t nextConnectionId = 1;
bool gracefulShutdown = false;
std::string tempBuf;
tempBuf.reserve(cfg().events__maxEventSize + MAX_SUBID_SIZE + 100);
@ -137,6 +138,14 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
connIdToConnection.erase(connId);
delete c;
if (gracefulShutdown) {
LI << "Graceful shutdown in progress: " << connIdToConnection.size() << " connections remaining";
if (connIdToConnection.size() == 0) {
LW << "All connections closed, shutting down";
::exit(0);
}
}
});
hubGroup->onMessage2([&](uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length, uWS::OpCode opCode, size_t compressedSize) {
@ -183,6 +192,10 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
memcpy(p + 10, subIdSv.data(), subIdSv.size());
doSend(item.connId, std::string_view(p, 13 + subIdSv.size() + msg->evJson.size()), uWS::OpCode::TEXT);
}
} else if (std::get_if<MsgWebsocket::GracefulShutdown>(&newMsg.msg)) {
LW << "Initiating graceful shutdown: " << connIdToConnection.size() << " connections remaining";
gracefulShutdown = true;
hubGroup->stopListening();
}
}
};

View File

@ -1,3 +1,6 @@
#include <pthread.h>
#include <signal.h>
#include "RelayServer.h"
@ -8,6 +11,14 @@ void cmd_relay(const std::vector<std::string> &subArgs) {
}
void RelayServer::run() {
{
sigset_t set;
sigemptyset(&set);
sigaddset(&set, SIGUSR1);
int s = pthread_sigmask(SIG_BLOCK, &set, NULL);
if (s != 0) throw herr("Unable to set sigmask: ", strerror(errno));
}
tpWebsocket.init("Websocket", 1, [this](auto &thr){
runWebsocket(thr);
});
@ -36,6 +47,10 @@ void RelayServer::run() {
runCron();
});
signalHandlerThread = std::thread([this]{
runSignalHandler();
});
// Monitor for config file reloads
auto configFileChangeWatcher = hoytech::file_change_monitor(configFile);