diff --git a/src/PluginWritePolicy.h b/src/PluginEventSifter.h similarity index 74% rename from src/PluginWritePolicy.h rename to src/PluginEventSifter.h index 8696d2d..9ba7541 100644 --- a/src/PluginWritePolicy.h +++ b/src/PluginEventSifter.h @@ -16,28 +16,28 @@ #include "golpe.h" -enum class WritePolicyResult { +enum class PluginEventSifterResult { Accept, Reject, ShadowReject, }; -struct PluginWritePolicy { +struct PluginEventSifter { struct RunningPlugin { pid_t pid; - std::string currPluginPath; + std::string currPluginCmd; struct timespec lastModTime; FILE *r; FILE *w; - RunningPlugin(pid_t pid, int rfd, int wfd, std::string currPluginPath) : pid(pid), currPluginPath(currPluginPath) { + RunningPlugin(pid_t pid, int rfd, int wfd, std::string currPluginCmd) : pid(pid), currPluginCmd(currPluginCmd) { r = fdopen(rfd, "r"); w = fdopen(wfd, "w"); setlinebuf(w); { struct stat statbuf; - if (stat(currPluginPath.c_str(), &statbuf)) throw herr("couldn't stat plugin: ", currPluginPath); + if (stat(currPluginCmd.c_str(), &statbuf)) throw herr("couldn't stat plugin: ", currPluginCmd); lastModTime = statbuf.st_mtim; } } @@ -52,21 +52,19 @@ struct PluginWritePolicy { std::unique_ptr running; - WritePolicyResult acceptEvent(const tao::json::value &evJson, uint64_t receivedAt, EventSourceType sourceType, std::string_view sourceInfo, std::string &okMsg) { - const auto &pluginPath = cfg().relay__writePolicy__plugin; - - if (pluginPath.size() == 0) { + PluginEventSifterResult acceptEvent(const std::string &pluginCmd, const tao::json::value &evJson, uint64_t receivedAt, EventSourceType sourceType, std::string_view sourceInfo, std::string &okMsg) { + if (pluginCmd.size() == 0) { running.reset(); - return WritePolicyResult::Accept; + return PluginEventSifterResult::Accept; } try { if (running) { - if (pluginPath != running->currPluginPath) { + if (pluginCmd != running->currPluginCmd) { running.reset(); } else { struct stat statbuf; - if (stat(pluginPath.c_str(), &statbuf)) throw herr("couldn't stat plugin: ", pluginPath); + if (stat(pluginCmd.c_str(), &statbuf)) throw herr("couldn't stat plugin: ", pluginCmd); if (statbuf.st_mtim.tv_sec != running->lastModTime.tv_sec || statbuf.st_mtim.tv_nsec != running->lastModTime.tv_nsec) { running.reset(); } @@ -74,7 +72,7 @@ struct PluginWritePolicy { } if (!running) { - setupPlugin(); + setupPlugin(pluginCmd); } auto request = tao::json::value({ @@ -111,15 +109,15 @@ struct PluginWritePolicy { okMsg = response.optional("msg").value_or(""); auto action = response.at("action").get_string(); - if (action == "accept") return WritePolicyResult::Accept; - else if (action == "reject") return WritePolicyResult::Reject; - else if (action == "shadowReject") return WritePolicyResult::ShadowReject; + if (action == "accept") return PluginEventSifterResult::Accept; + else if (action == "reject") return PluginEventSifterResult::Reject; + else if (action == "shadowReject") return PluginEventSifterResult::ShadowReject; else throw herr("unknown action: ", action); } catch (std::exception &e) { - LE << "Couldn't setup PluginWritePolicy: " << e.what(); + LE << "Couldn't setup plugin: " << e.what(); running.reset(); okMsg = "error: internal error"; - return WritePolicyResult::Reject; + return PluginEventSifterResult::Reject; } } @@ -149,9 +147,9 @@ struct PluginWritePolicy { } }; - void setupPlugin() { - auto path = cfg().relay__writePolicy__plugin; - LI << "Setting up write policy plugin: " << path; + private: + void setupPlugin(const std::string &pluginCmd) { + LI << "Setting up write policy plugin: " << pluginCmd; Pipe outPipe; Pipe inPipe; @@ -171,9 +169,9 @@ struct PluginWritePolicy { posix_spawn_file_actions_addclose(&file_actions, inPipe.fds[1]) ) throw herr("posix_span_file_actions failed: ", strerror(errno)); - auto ret = posix_spawn(&pid, path.c_str(), &file_actions, nullptr, argv, nullptr); - if (ret) throw herr("posix_spawn failed to invoke '", path, "': ", strerror(errno)); + auto ret = posix_spawn(&pid, pluginCmd.c_str(), &file_actions, nullptr, argv, nullptr); + if (ret) throw herr("posix_spawn failed to invoke '", pluginCmd, "': ", strerror(errno)); - running = make_unique(pid, inPipe.saveFd(0), outPipe.saveFd(1), path); + running = make_unique(pid, inPipe.saveFd(0), outPipe.saveFd(1), pluginCmd); } }; diff --git a/src/apps/mesh/cmd_stream.cpp b/src/apps/mesh/cmd_stream.cpp index 10ca745..cffb80e 100644 --- a/src/apps/mesh/cmd_stream.cpp +++ b/src/apps/mesh/cmd_stream.cpp @@ -9,7 +9,7 @@ #include "Subscription.h" #include "WSConnection.h" #include "events.h" -#include "PluginWritePolicy.h" +#include "PluginEventSifter.h" static const char USAGE[] = @@ -35,8 +35,7 @@ void cmd_stream(const std::vector &subArgs) { WriterPipeline writer; WSConnection ws(url); Decompressor decomp; - - PluginWritePolicy writePolicy; + PluginEventSifter writePolicyPlugin; ws.onConnect = [&]{ @@ -68,8 +67,8 @@ void cmd_stream(const std::vector &subArgs) { auto &evJson = origJson.at(2); std::string okMsg; - auto res = writePolicy.acceptEvent(evJson, hoytech::curr_time_s(), EventSourceType::Stream, ws.remoteAddr, okMsg); - if (res == WritePolicyResult::Accept) { + auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, hoytech::curr_time_s(), EventSourceType::Stream, ws.remoteAddr, okMsg); + if (res == PluginEventSifterResult::Accept) { downloadedIds.emplace(from_hex(evJson.at("id").get_string())); writer.write({ std::move(evJson), EventSourceType::Stream, url }); } else { diff --git a/src/apps/mesh/cmd_sync.cpp b/src/apps/mesh/cmd_sync.cpp index 24ab415..3f9a228 100644 --- a/src/apps/mesh/cmd_sync.cpp +++ b/src/apps/mesh/cmd_sync.cpp @@ -10,7 +10,7 @@ #include "DBQuery.h" #include "filters.h" #include "events.h" -#include "PluginWritePolicy.h" +#include "PluginEventSifter.h" static const char USAGE[] = @@ -84,7 +84,7 @@ void cmd_sync(const std::vector &subArgs) { WriterPipeline writer; WSConnection ws(url); - PluginWritePolicy writePolicy; + PluginEventSifter writePolicyPlugin; ws.reconnect = false; @@ -160,8 +160,8 @@ void cmd_sync(const std::vector &subArgs) { auto &evJson = msg.at(2); std::string okMsg; - auto res = writePolicy.acceptEvent(evJson, hoytech::curr_time_s(), EventSourceType::Sync, ws.remoteAddr, okMsg); - if (res == WritePolicyResult::Accept) { + auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, hoytech::curr_time_s(), EventSourceType::Sync, ws.remoteAddr, okMsg); + if (res == PluginEventSifterResult::Accept) { writer.write({ std::move(evJson), EventSourceType::Sync, url }); } else { LI << "[" << ws.remoteAddr << "] write policy blocked event " << evJson.at("id").get_string() << ": " << okMsg; diff --git a/src/apps/relay/RelayWriter.cpp b/src/apps/relay/RelayWriter.cpp index 9123d52..3460930 100644 --- a/src/apps/relay/RelayWriter.cpp +++ b/src/apps/relay/RelayWriter.cpp @@ -1,10 +1,10 @@ #include "RelayServer.h" -#include "PluginWritePolicy.h" +#include "PluginEventSifter.h" void RelayServer::runWriter(ThreadPool::Thread &thr) { - PluginWritePolicy writePolicy; + PluginEventSifter writePolicyPlugin; while(1) { auto newMsgs = thr.inbox.pop_all(); @@ -40,9 +40,9 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { tao::json::value evJson = tao::json::from_string(msg->jsonStr); EventSourceType sourceType = msg->ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6; std::string okMsg; - auto res = writePolicy.acceptEvent(evJson, msg->receivedAt, sourceType, msg->ipAddr, okMsg); + auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, msg->receivedAt, sourceType, msg->ipAddr, okMsg); - if (res == WritePolicyResult::Accept) { + if (res == PluginEventSifterResult::Accept) { newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg); } else { auto *flat = flatbuffers::GetRoot(msg->flatStr.data()); @@ -50,7 +50,7 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { LI << "[" << msg->connId << "] write policy blocked event " << eventIdHex << ": " << okMsg; - sendOKResponse(msg->connId, eventIdHex, res == WritePolicyResult::ShadowReject, okMsg); + sendOKResponse(msg->connId, eventIdHex, res == PluginEventSifterResult::ShadowReject, okMsg); } } }