diff --git a/README.md b/README.md index 695106f..af96ddf 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ strfry is a relay for the [nostr protocol](https://github.com/nostr-protocol/nos * Supports most applicable NIPs: 1, 2, 4, 9, 11, 12, 15, 16, 20, 22, 28, 33, 40 * No external database required: All data is stored locally on the filesystem in LMDB * Hot reloading of config file: No server restart needed for many config param changes +* Zero downtime restarts, for upgrading the binary without impacting users * Websocket compression: permessage-deflate with optional sliding window, when supported by clients * Built-in support for real-time streaming (up/down/both) events from remote relays, and bulk import/export of events from/to jsonl files * [negentropy](https://github.com/hoytech/negentropy)-based set reconcilliation for efficient syncing with remote relays @@ -71,6 +72,29 @@ In order to upgrade the DB, you should export and then import again: After you have confirmed everything is working OK, the `dbdump.jsonl` and `data.mdb.bak` files can be deleted. +### Zero Downtime Restarts + +strfry can have multiple different running instances simultaneously listening on the same port, because it uses the `SO_REUSEPORT` Linux socket option. One of the reasons you may want to do this is to restart the relay without impacting currently connected users. This allows you to upgrade the strfry binary, or perform major configuration changes (for the subset of config options that require a restart). + +If you send a `SIGUSR1` signal to a strfry process, it will initiate a "graceful shutdown". This means that it will no longer accept new websocket connections, and after its last existing websocket connection is closed, it will exit. + +So, the typical flow for a zero downtime restart is: + +* Record the PID of the currently running strfry instance. 
+ +* Start a new relay process using the same configuration as the currently running instance: + + strfry relay + + At this point, both instances will be accepting new connections. + +* Initiate the graceful shutdown: + + kill -USR1 $OLD_PID + + Now only the new strfry instance will be accepting connections. The old one will exit once all its connections have been closed. + + ### Stream This command opens a websocket connection to the specified relay and makes a nostr `REQ` request with filter `{"limit":0}`: diff --git a/golpe b/golpe index e670693..2b9d149 160000 --- a/golpe +++ b/golpe @@ -1 +1 @@ -Subproject commit e6706930d151fa327042371a6f38c169f5526108 +Subproject commit 2b9d149e28d70dc188a3dec6b8dddbe284631f17 diff --git a/src/apps/relay/RelayServer.h b/src/apps/relay/RelayServer.h index 236267c..0acb83a 100644 --- a/src/apps/relay/RelayServer.h +++ b/src/apps/relay/RelayServer.h @@ -37,7 +37,10 @@ struct MsgWebsocket : NonCopyable { std::string evJson; }; - using Var = std::variant; + struct GracefulShutdown { + }; + + using Var = std::variant; Var msg; MsgWebsocket(Var &&msg_) : msg(std::move(msg_)) {} }; @@ -153,6 +156,7 @@ struct RelayServer { ThreadPool tpReqMonitor; ThreadPool tpNegentropy; std::thread cronThread; + std::thread signalHandlerThread; void run(); @@ -174,6 +178,8 @@ struct RelayServer { void runCron(); + void runSignalHandler(); + // Utils (can be called by any thread) void sendToConn(uint64_t connId, std::string &&payload) { diff --git a/src/apps/relay/RelaySignalHandler.cpp b/src/apps/relay/RelaySignalHandler.cpp new file mode 100644 index 0000000..4455b82 --- /dev/null +++ b/src/apps/relay/RelaySignalHandler.cpp @@ -0,0 +1,25 @@ +#include + +#include "RelayServer.h" + + +void RelayServer::runSignalHandler() { + setThreadName("signalHandler"); + + sigset_t sigset; + sigemptyset(&sigset); + sigaddset(&sigset, SIGUSR1); + + while (1) { + int sig; + int s = sigwait(&sigset, &sig); + if (s != 0) throw herr("unable to sigwait: ", 
strerror(errno)); + + if (sig == SIGUSR1) { + tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::GracefulShutdown{}}); + hubTrigger->send(); + } else { + LW << "Got unexpected signal: " << sig; + } + } +} diff --git a/src/apps/relay/RelayWebsocket.cpp b/src/apps/relay/RelayWebsocket.cpp index 0542ee9..b295f4f 100644 --- a/src/apps/relay/RelayWebsocket.cpp +++ b/src/apps/relay/RelayWebsocket.cpp @@ -41,6 +41,7 @@ void RelayServer::runWebsocket(ThreadPool::Thread &thr) { uWS::Group *hubGroup; flat_hash_map connIdToConnection; uint64_t nextConnectionId = 1; + bool gracefulShutdown = false; std::string tempBuf; tempBuf.reserve(cfg().events__maxEventSize + MAX_SUBID_SIZE + 100); @@ -137,6 +138,14 @@ void RelayServer::runWebsocket(ThreadPool::Thread &thr) { connIdToConnection.erase(connId); delete c; + + if (gracefulShutdown) { + LI << "Graceful shutdown in progress: " << connIdToConnection.size() << " connections remaining"; + if (connIdToConnection.size() == 0) { + LW << "All connections closed, shutting down"; + ::exit(0); + } + } }); hubGroup->onMessage2([&](uWS::WebSocket *ws, char *message, size_t length, uWS::OpCode opCode, size_t compressedSize) { @@ -183,6 +192,10 @@ void RelayServer::runWebsocket(ThreadPool::Thread &thr) { memcpy(p + 10, subIdSv.data(), subIdSv.size()); doSend(item.connId, std::string_view(p, 13 + subIdSv.size() + msg->evJson.size()), uWS::OpCode::TEXT); } + } else if (std::get_if(&newMsg.msg)) { + LW << "Initiating graceful shutdown: " << connIdToConnection.size() << " connections remaining"; + gracefulShutdown = true; + hubGroup->stopListening(); } } }; diff --git a/src/apps/relay/cmd_relay.cpp b/src/apps/relay/cmd_relay.cpp index c9d0a3c..0ba5c0c 100644 --- a/src/apps/relay/cmd_relay.cpp +++ b/src/apps/relay/cmd_relay.cpp @@ -1,3 +1,6 @@ +#include +#include + #include "RelayServer.h" @@ -8,6 +11,14 @@ void cmd_relay(const std::vector &subArgs) { } void RelayServer::run() { + { + sigset_t set; + sigemptyset(&set); + sigaddset(&set, SIGUSR1); 
+ int s = pthread_sigmask(SIG_BLOCK, &set, NULL); + if (s != 0) throw herr("Unable to set sigmask: ", strerror(errno)); + } + tpWebsocket.init("Websocket", 1, [this](auto &thr){ runWebsocket(thr); }); @@ -36,6 +47,10 @@ void RelayServer::run() { runCron(); }); + signalHandlerThread = std::thread([this]{ + runSignalHandler(); + }); + // Monitor for config file reloads auto configFileChangeWatcher = hoytech::file_change_monitor(configFile);