From 1d2295bd7dc8ccff4111e9383d4e33118ad899fd Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Tue, 7 Feb 2023 14:00:21 -0500 Subject: [PATCH] write policy wip --- TODO | 1 + golpe | 2 +- golpe.yaml | 6 ++- src/PluginWritePolicy.h | 117 ++++++++++++++++++++++++++++++++++++++++ src/RelayWriter.cpp | 5 ++ strfry.conf | 8 ++- 6 files changed, 135 insertions(+), 4 deletions(-) create mode 100644 src/PluginWritePolicy.h diff --git a/TODO b/TODO index fa04ef1..7241902 100644 --- a/TODO +++ b/TODO @@ -6,6 +6,7 @@ fix sync when disk is full it should log warning but not crash ensure DB upgrade flow works + ? why isn't the LMDB mapping CLOEXEC features finish syncing diff --git a/golpe b/golpe index e938a71..620e823 160000 --- a/golpe +++ b/golpe @@ -1 +1 @@ -Subproject commit e938a71c0d5bda1bf89594d3f745056af70ff7ec +Subproject commit 620e8233da82fb853d9a63797a7ca0ae95bddc8e diff --git a/golpe.yaml b/golpe.yaml index e1c1262..0637677 100644 --- a/golpe.yaml +++ b/golpe.yaml @@ -95,7 +95,7 @@ config: default: 256 noReload: true - name: dbParams__mapsize - desc: "Size of mmap() to use when loading LMDB (does *not* correspond to disk-space used, default is 10TB)" + desc: "Size of mmap() to use when loading LMDB (default is 10TB, does *not* correspond to disk-space used)" default: 10995116277760 noReload: true @@ -146,6 +146,10 @@ config: desc: "Maximum number of subscriptions (concurrent REQs) a connection can have open at any time" default: 20 + - name: relay__plugins__writePolicyPath + desc: "" + default: "" + - name: relay__compression__enabled desc: "Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU" default: true diff --git a/src/PluginWritePolicy.h b/src/PluginWritePolicy.h new file mode 100644 index 0000000..11d3290 --- /dev/null +++ b/src/PluginWritePolicy.h @@ -0,0 +1,117 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "golpe.h" + + +struct PluginWritePolicy { + struct Pipe : NonCopyable { + int fds[2] = { -1, -1 }; + + Pipe() { + if (::pipe(fds)) throw herr("pipe failed: ", strerror(errno)); + } + + Pipe(int fd0, int fd1) { + fds[0] = fd0; + fds[1] = fd1; + } + + ~Pipe() { + if (fds[0] != -1) ::close(fds[0]); + if (fds[1] != -1) ::close(fds[1]); + } + + int saveFd(int offset) { + int fd = fds[offset]; + fds[offset] = -1; + return fd; + } + }; + + struct RunningPlugin { + pid_t pid; + std::string currPluginPath; + FILE *r; + FILE *w; + + RunningPlugin(pid_t pid, int rfd, int wfd, std::string currPluginPath) : pid(pid), currPluginPath(currPluginPath) { + r = fdopen(rfd, "r"); + w = fdopen(wfd, "w"); + setlinebuf(w); + } + + ~RunningPlugin() { + fclose(r); + fclose(w); + waitpid(pid, nullptr, 0); + } + }; + + std::unique_ptr running; + + bool acceptEvent(std::string_view jsonStr, uint64_t receivedAt, EventSourceType sourceType, std::string_view sourceInfo) { + if (cfg().relay__plugins__writePolicyPath.size() == 0) return true; + + if (!running) { + try { + setupPlugin(); + } catch (std::exception &e) { + LE << "Couldn't setup PluginWritePolicy: " << e.what(); + return false; + } + } + + std::string output; + output += jsonStr; + output += "\n"; + + ::fwrite(output.data(), output.size(), 1, running->w); + + { + char buf[4096]; + fgets(buf, sizeof(buf), running->r); + auto j = tao::json::from_string(buf); + LI << "QQQ " << j; + } + + return true; + } + + + void setupPlugin() { + auto path = cfg().relay__plugins__writePolicyPath; + + Pipe outPipe; + Pipe inPipe; + + pid_t pid; + char *argv[] = { nullptr, }; + + posix_spawn_file_actions_t file_actions; + + if ( + posix_spawn_file_actions_init(&file_actions) || + posix_spawn_file_actions_adddup2(&file_actions, outPipe.fds[0], 0) || + posix_spawn_file_actions_adddup2(&file_actions, inPipe.fds[1], 1) || + posix_spawn_file_actions_addclose(&file_actions, outPipe.fds[0]) || + posix_spawn_file_actions_addclose(&file_actions, outPipe.fds[1]) || + posix_spawn_file_actions_addclose(&file_actions, inPipe.fds[0]) || + posix_spawn_file_actions_addclose(&file_actions, inPipe.fds[1]) + ) throw herr("posix_span_file_actions failed: ", strerror(errno)); + + auto ret = posix_spawn(&pid, path.c_str(), &file_actions, nullptr, argv, nullptr); + if (ret) throw herr("posix_spawn failed when to invoke '", path, "': ", strerror(errno)); + + running = make_unique(pid, inPipe.saveFd(0), outPipe.saveFd(1), path); + } +}; diff --git a/src/RelayWriter.cpp b/src/RelayWriter.cpp index 94a1413..4bbc77b 100644 --- a/src/RelayWriter.cpp +++ b/src/RelayWriter.cpp @@ -1,9 +1,13 @@ #include "RelayServer.h" +#include "PluginWritePolicy.h" + void RelayServer::runWriter(ThreadPool::Thread &thr) { auto qdb = getQdbInstance(); + PluginWritePolicy writePolicy; + while(1) { auto newMsgs = thr.inbox.pop_all(); @@ -14,6 +18,7 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { for (auto &newMsg : newMsgs) { if (auto msg = std::get_if(&newMsg.msg)) { EventSourceType sourceType = msg->ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6; + if (!writePolicy.acceptEvent(msg->jsonStr, msg->receivedAt, sourceType, msg->ipAddr)) continue; newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg); } } diff --git a/strfry.conf b/strfry.conf index f41bebc..9a06608 100644 --- a/strfry.conf +++ b/strfry.conf @@ -9,7 +9,7 @@ dbParams { # Maximum number of threads/processes that can simultaneously have LMDB transactions open (restart required) maxreaders = 256 - # Size of mmap() to use when loading LMDB (does *not* correspond to disk-space used, default is 10TB) (restart required) + # Size of mmap() to use when loading LMDB (default is 10TB, does *not* correspond to disk-space used) (restart required) mapsize = 10995116277760 } @@ -53,7 +53,11 @@ relay { maxFilterLimit = 500 # Maximum number of subscriptions (concurrent REQs) a connection can have open at any time - maxSubsPerConnection = 2 + maxSubsPerConnection = 20 + + plugins { + writePolicyPath = "./test.pl" + } compression { # Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU (restart required)