make bulk export commands gracefully exit on SIGPIPE (for example, if piped to head)

This commit is contained in:
Doug Hoyte
2023-07-25 00:37:56 -04:00
parent 80915f969d
commit 0fe929ffcb
6 changed files with 15 additions and 1 deletions

1
TODO
View File

@ -28,5 +28,4 @@ rate limits (maybe not needed now that we have plugins?)
misc
? periodic reaping of disconnected sockets (maybe autoping is doing this already)
? export not dying on SIGPIPE, or not getting SIGPIPE
? why isn't the LMDB mapping CLOEXEC

View File

@ -32,6 +32,8 @@ void cmd_export(const std::vector<std::string> &subArgs) {
uint64_t start = reverse ? until : since;
uint64_t startDup = reverse ? MAX_U64 : 0;
exitOnSigPipe();
env.generic_foreachFull(txn, env.dbi_Event__created_at, lmdb::to_sv<uint64_t>(start), lmdb::to_sv<uint64_t>(startDup), [&](auto k, auto v) {
if (reverse) {
if (lmdb::from_sv<uint64_t>(k) < since) return false;

View File

@ -54,6 +54,8 @@ void cmd_monitor(const std::vector<std::string> &subArgs) {
}
}
exitOnSigPipe();
env.foreach_Event(txn, [&](auto &ev){
monitors.process(txn, ev, [&](RecipientList &&recipients, uint64_t levId){
for (auto &r : recipients) {

View File

@ -34,6 +34,8 @@ void cmd_scan(const std::vector<std::string> &subArgs) {
uint64_t numEvents = 0;
exitOnSigPipe();
while (1) {
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){
if (count) numEvents++;

View File

@ -16,3 +16,4 @@ uint64_t parseUint64(const std::string &s);
std::string parseIP(const std::string &ip);
uint64_t getDBVersion(lmdb::txn &txn);
std::string padBytes(std::string_view str, size_t n, char padChar);
void exitOnSigPipe();

View File

@ -1,5 +1,6 @@
#include <arpa/inet.h>
#include <stdio.h>
#include <signal.h>
#include <algorithm>
#include <string>
@ -109,3 +110,10 @@ std::string padBytes(std::string_view str, size_t n, char padChar) {
if (str.size() > n) throw herr("unable to pad, string longer than expected");
return std::string(str) + std::string(n - str.size(), padChar);
}
void exitOnSigPipe() {
struct sigaction act;
memset(&act, 0, sizeof act);
act.sa_sigaction = [](int, siginfo_t*, void*){ ::exit(1); };
if (sigaction(SIGPIPE, &act, nullptr)) throw herr("couldn't run sigaction(): ", strerror(errno));
}