From 4eb7a4fe53a2aaa2e1b71dff0173244d47067aa0 Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Wed, 8 Feb 2023 15:04:59 -0500 Subject: [PATCH] more work on write policy plugins --- TODO | 1 + golpe.yaml | 5 +- src/PluginWritePolicy.h | 163 +++++++++++++++++++++++++++------------- src/RelayWriter.cpp | 15 +++- src/events.h | 9 +++ strfry.conf | 6 +- 6 files changed, 143 insertions(+), 56 deletions(-) diff --git a/TODO b/TODO index 96d623c..cdd0745 100644 --- a/TODO +++ b/TODO @@ -3,6 +3,7 @@ when disk is full it should log warning but not crash ensure DB upgrade flow works disable sync + get IP from HTTP header ? why isn't the LMDB mapping CLOEXEC 0.2 release diff --git a/golpe.yaml b/golpe.yaml index 12f758c..97f5c08 100644 --- a/golpe.yaml +++ b/golpe.yaml @@ -160,9 +160,12 @@ config: desc: "Maximum number of subscriptions (concurrent REQs) a connection can have open at any time" default: 20 - - name: relay__plugins__writePolicyPath + - name: relay__writePolicy__plugin desc: "" default: "" + - name: relay__writePolicy__lookbackSeconds + desc: "" + default: 21600 - name: relay__compression__enabled desc: "Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU" diff --git a/src/PluginWritePolicy.h b/src/PluginWritePolicy.h index 11d3290..a274c4f 100644 --- a/src/PluginWritePolicy.h +++ b/src/PluginWritePolicy.h @@ -7,13 +7,123 @@ #include #include #include +#include +#include +#include #include #include "golpe.h" +enum class WritePolicyResult { + Accept, + Reject, + ShadowReject, +}; + + struct PluginWritePolicy { + struct RunningPlugin { + pid_t pid; + std::string currPluginPath; + struct timespec lastModTime; + FILE *r; + FILE *w; + + RunningPlugin(pid_t pid, int rfd, int wfd, std::string currPluginPath) : pid(pid), currPluginPath(currPluginPath) { + r = fdopen(rfd, "r"); + w = fdopen(wfd, "w"); + setlinebuf(w); + { + struct stat statbuf; + if (stat(currPluginPath.c_str(), &statbuf)) throw herr("couldn't stat plugin: ", currPluginPath); + lastModTime = statbuf.st_mtim; + } + } + + ~RunningPlugin() { + fclose(r); + fclose(w); + kill(pid, SIGTERM); + waitpid(pid, nullptr, 0); + } + }; + + std::unique_ptr running; + + WritePolicyResult acceptEvent(std::string_view jsonStr, uint64_t receivedAt, EventSourceType sourceType, std::string_view sourceInfo, std::string &okMsg) { + const auto &pluginPath = cfg().relay__writePolicy__plugin; + + if (pluginPath.size() == 0) { + running.reset(); + return WritePolicyResult::Accept; + } + + try { + if (running) { + if (pluginPath != running->currPluginPath) { + running.reset(); + } else { + struct stat statbuf; + if (stat(pluginPath.c_str(), &statbuf)) throw herr("couldn't stat plugin: ", pluginPath); + if (statbuf.st_mtim.tv_sec != running->lastModTime.tv_sec || statbuf.st_mtim.tv_nsec != running->lastModTime.tv_nsec) { + running.reset(); + } + } + } + + if (!running) setupPlugin(); + + auto json = tao::json::from_string(jsonStr); + + auto request = tao::json::value({ + { "type", "new" }, + { "event", json }, + { "receivedAt", receivedAt }, + { "sourceType", eventSourceTypeToStr(sourceType) }, + { "sourceInfo", sourceType == EventSourceType::IP4 || sourceType == EventSourceType::IP6 ? renderIP(sourceInfo) : sourceInfo }, + }); + + std::string output = tao::json::to_string(request); + output += "\n"; + + ::fwrite(output.data(), output.size(), 1, running->w); + + tao::json::value response; + + while (1) { + char buf[8192]; + if (!fgets(buf, sizeof(buf), running->r)) throw herr("pipe to plugin was closed (plugin crashed?)"); + + try { + response = tao::json::from_string(buf); + } catch (std::exception &e) { + LW << "Got unparseable line from write policy plugin: " << buf; + continue; + } + // FIXME: verify id + + break; + } + + okMsg = response.optional("msg").value_or(""); + + auto action = response.at("action").get_string(); + if (action == "accept") return WritePolicyResult::Accept; + else if (action == "reject") return WritePolicyResult::Reject; + else if (action == "shadowReject") return WritePolicyResult::ShadowReject; + else throw herr("unknown action: ", action); + } catch (std::exception &e) { + LE << "Couldn't setup PluginWritePolicy: " << e.what(); + running.reset(); + okMsg = "error: internal error"; + return WritePolicyResult::Reject; + } + } + + + struct Pipe : NonCopyable { int fds[2] = { -1, -1 }; @@ -38,58 +148,9 @@ struct PluginWritePolicy { } }; - struct RunningPlugin { - pid_t pid; - std::string currPluginPath; - FILE *r; - FILE *w; - - RunningPlugin(pid_t pid, int rfd, int wfd, std::string currPluginPath) : pid(pid), currPluginPath(currPluginPath) { - r = fdopen(rfd, "r"); - w = fdopen(wfd, "w"); - setlinebuf(w); - } - - ~RunningPlugin() { - fclose(r); - fclose(w); - waitpid(pid, nullptr, 0); - } - }; - - std::unique_ptr running; - - bool acceptEvent(std::string_view jsonStr, uint64_t receivedAt, EventSourceType sourceType, std::string_view sourceInfo) { - if (cfg().relay__plugins__writePolicyPath.size() == 0) return true; - - if (!running) { - try { - setupPlugin(); - } catch (std::exception &e) { - LE << "Couldn't setup PluginWritePolicy: " << e.what(); - return false; - } - } - - std::string output; - output += jsonStr; - output += "\n"; - - ::fwrite(output.data(), output.size(), 1, running->w); - - { - char buf[4096]; - fgets(buf, sizeof(buf), running->r); - auto j = tao::json::from_string(buf); - LI << "QQQ " << j; - } - - return true; - } - - void setupPlugin() { - auto path = cfg().relay__plugins__writePolicyPath; + auto path = cfg().relay__writePolicy__plugin; + LI << "Setting up write policy plugin: " << path; Pipe outPipe; Pipe inPipe; diff --git a/src/RelayWriter.cpp b/src/RelayWriter.cpp index 4bbc77b..217148d 100644 --- a/src/RelayWriter.cpp +++ b/src/RelayWriter.cpp @@ -18,8 +18,19 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { for (auto &newMsg : newMsgs) { if (auto msg = std::get_if(&newMsg.msg)) { EventSourceType sourceType = msg->ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6; - if (!writePolicy.acceptEvent(msg->jsonStr, msg->receivedAt, sourceType, msg->ipAddr)) continue; - newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg); + std::string okMsg; + auto res = writePolicy.acceptEvent(msg->jsonStr, msg->receivedAt, sourceType, msg->ipAddr, okMsg); + + if (res == WritePolicyResult::Accept) { + newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg); + } else { + auto *flat = flatbuffers::GetRoot(msg->flatStr.data()); + auto eventIdHex = to_hex(sv(flat->id())); + + LI << "[" << msg->connId << "] write policy blocked event " << eventIdHex << ": " << okMsg; + + sendOKResponse(msg->connId, eventIdHex, res == WritePolicyResult::ShadowReject, okMsg); + } } } diff --git a/src/events.h b/src/events.h index 21f839b..a380abb 100644 --- a/src/events.h +++ b/src/events.h @@ -67,6 +67,15 @@ enum class EventSourceType { Sync = 5, }; +inline std::string eventSourceTypeToStr(EventSourceType t) { + if (t == EventSourceType::IP4) return "IP4"; + else if (t == EventSourceType::IP6) return "IP6"; + else if (t == EventSourceType::Import) return "Import"; + else if (t == EventSourceType::Stream) return "Stream"; + else if (t == EventSourceType::Sync) return "Sync"; + else return "?"; +} + enum class EventWriteStatus { diff --git a/strfry.conf b/strfry.conf index 9a06608..8c06e02 100644 --- a/strfry.conf +++ b/strfry.conf @@ -55,8 +55,10 @@ relay { # Maximum number of subscriptions (concurrent REQs) a connection can have open at any time maxSubsPerConnection = 20 - plugins { - writePolicyPath = "./test.pl" + writePolicy { + plugin = "" + + lookbackSeconds = 21600 } compression {