re-org plugins

This commit is contained in:
Doug Hoyte
2023-08-17 17:41:00 -04:00
parent 93330c4cbc
commit 98f1020ef3
4 changed files with 35 additions and 38 deletions

View File

@ -16,28 +16,28 @@
#include "golpe.h" #include "golpe.h"
enum class WritePolicyResult { enum class PluginEventSifterResult {
Accept, Accept,
Reject, Reject,
ShadowReject, ShadowReject,
}; };
struct PluginWritePolicy { struct PluginEventSifter {
struct RunningPlugin { struct RunningPlugin {
pid_t pid; pid_t pid;
std::string currPluginPath; std::string currPluginCmd;
struct timespec lastModTime; struct timespec lastModTime;
FILE *r; FILE *r;
FILE *w; FILE *w;
RunningPlugin(pid_t pid, int rfd, int wfd, std::string currPluginPath) : pid(pid), currPluginPath(currPluginPath) { RunningPlugin(pid_t pid, int rfd, int wfd, std::string currPluginCmd) : pid(pid), currPluginCmd(currPluginCmd) {
r = fdopen(rfd, "r"); r = fdopen(rfd, "r");
w = fdopen(wfd, "w"); w = fdopen(wfd, "w");
setlinebuf(w); setlinebuf(w);
{ {
struct stat statbuf; struct stat statbuf;
if (stat(currPluginPath.c_str(), &statbuf)) throw herr("couldn't stat plugin: ", currPluginPath); if (stat(currPluginCmd.c_str(), &statbuf)) throw herr("couldn't stat plugin: ", currPluginCmd);
lastModTime = statbuf.st_mtim; lastModTime = statbuf.st_mtim;
} }
} }
@ -52,21 +52,19 @@ struct PluginWritePolicy {
std::unique_ptr<RunningPlugin> running; std::unique_ptr<RunningPlugin> running;
WritePolicyResult acceptEvent(const tao::json::value &evJson, uint64_t receivedAt, EventSourceType sourceType, std::string_view sourceInfo, std::string &okMsg) { PluginEventSifterResult acceptEvent(const std::string &pluginCmd, const tao::json::value &evJson, uint64_t receivedAt, EventSourceType sourceType, std::string_view sourceInfo, std::string &okMsg) {
const auto &pluginPath = cfg().relay__writePolicy__plugin; if (pluginCmd.size() == 0) {
if (pluginPath.size() == 0) {
running.reset(); running.reset();
return WritePolicyResult::Accept; return PluginEventSifterResult::Accept;
} }
try { try {
if (running) { if (running) {
if (pluginPath != running->currPluginPath) { if (pluginCmd != running->currPluginCmd) {
running.reset(); running.reset();
} else { } else {
struct stat statbuf; struct stat statbuf;
if (stat(pluginPath.c_str(), &statbuf)) throw herr("couldn't stat plugin: ", pluginPath); if (stat(pluginCmd.c_str(), &statbuf)) throw herr("couldn't stat plugin: ", pluginCmd);
if (statbuf.st_mtim.tv_sec != running->lastModTime.tv_sec || statbuf.st_mtim.tv_nsec != running->lastModTime.tv_nsec) { if (statbuf.st_mtim.tv_sec != running->lastModTime.tv_sec || statbuf.st_mtim.tv_nsec != running->lastModTime.tv_nsec) {
running.reset(); running.reset();
} }
@ -74,7 +72,7 @@ struct PluginWritePolicy {
} }
if (!running) { if (!running) {
setupPlugin(); setupPlugin(pluginCmd);
} }
auto request = tao::json::value({ auto request = tao::json::value({
@ -111,15 +109,15 @@ struct PluginWritePolicy {
okMsg = response.optional<std::string>("msg").value_or(""); okMsg = response.optional<std::string>("msg").value_or("");
auto action = response.at("action").get_string(); auto action = response.at("action").get_string();
if (action == "accept") return WritePolicyResult::Accept; if (action == "accept") return PluginEventSifterResult::Accept;
else if (action == "reject") return WritePolicyResult::Reject; else if (action == "reject") return PluginEventSifterResult::Reject;
else if (action == "shadowReject") return WritePolicyResult::ShadowReject; else if (action == "shadowReject") return PluginEventSifterResult::ShadowReject;
else throw herr("unknown action: ", action); else throw herr("unknown action: ", action);
} catch (std::exception &e) { } catch (std::exception &e) {
LE << "Couldn't setup PluginWritePolicy: " << e.what(); LE << "Couldn't setup plugin: " << e.what();
running.reset(); running.reset();
okMsg = "error: internal error"; okMsg = "error: internal error";
return WritePolicyResult::Reject; return PluginEventSifterResult::Reject;
} }
} }
@ -149,9 +147,9 @@ struct PluginWritePolicy {
} }
}; };
void setupPlugin() { private:
auto path = cfg().relay__writePolicy__plugin; void setupPlugin(const std::string &pluginCmd) {
LI << "Setting up write policy plugin: " << path; LI << "Setting up write policy plugin: " << pluginCmd;
Pipe outPipe; Pipe outPipe;
Pipe inPipe; Pipe inPipe;
@ -171,9 +169,9 @@ struct PluginWritePolicy {
posix_spawn_file_actions_addclose(&file_actions, inPipe.fds[1]) posix_spawn_file_actions_addclose(&file_actions, inPipe.fds[1])
) throw herr("posix_span_file_actions failed: ", strerror(errno)); ) throw herr("posix_span_file_actions failed: ", strerror(errno));
auto ret = posix_spawn(&pid, path.c_str(), &file_actions, nullptr, argv, nullptr); auto ret = posix_spawn(&pid, pluginCmd.c_str(), &file_actions, nullptr, argv, nullptr);
if (ret) throw herr("posix_spawn failed to invoke '", path, "': ", strerror(errno)); if (ret) throw herr("posix_spawn failed to invoke '", pluginCmd, "': ", strerror(errno));
running = make_unique<RunningPlugin>(pid, inPipe.saveFd(0), outPipe.saveFd(1), path); running = make_unique<RunningPlugin>(pid, inPipe.saveFd(0), outPipe.saveFd(1), pluginCmd);
} }
}; };

View File

@ -9,7 +9,7 @@
#include "Subscription.h" #include "Subscription.h"
#include "WSConnection.h" #include "WSConnection.h"
#include "events.h" #include "events.h"
#include "PluginWritePolicy.h" #include "PluginEventSifter.h"
static const char USAGE[] = static const char USAGE[] =
@ -35,8 +35,7 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
WriterPipeline writer; WriterPipeline writer;
WSConnection ws(url); WSConnection ws(url);
Decompressor decomp; Decompressor decomp;
PluginEventSifter writePolicyPlugin;
PluginWritePolicy writePolicy;
ws.onConnect = [&]{ ws.onConnect = [&]{
@ -68,8 +67,8 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
auto &evJson = origJson.at(2); auto &evJson = origJson.at(2);
std::string okMsg; std::string okMsg;
auto res = writePolicy.acceptEvent(evJson, hoytech::curr_time_s(), EventSourceType::Stream, ws.remoteAddr, okMsg); auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, hoytech::curr_time_s(), EventSourceType::Stream, ws.remoteAddr, okMsg);
if (res == WritePolicyResult::Accept) { if (res == PluginEventSifterResult::Accept) {
downloadedIds.emplace(from_hex(evJson.at("id").get_string())); downloadedIds.emplace(from_hex(evJson.at("id").get_string()));
writer.write({ std::move(evJson), EventSourceType::Stream, url }); writer.write({ std::move(evJson), EventSourceType::Stream, url });
} else { } else {

View File

@ -10,7 +10,7 @@
#include "DBQuery.h" #include "DBQuery.h"
#include "filters.h" #include "filters.h"
#include "events.h" #include "events.h"
#include "PluginWritePolicy.h" #include "PluginEventSifter.h"
static const char USAGE[] = static const char USAGE[] =
@ -84,7 +84,7 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
WriterPipeline writer; WriterPipeline writer;
WSConnection ws(url); WSConnection ws(url);
PluginWritePolicy writePolicy; PluginEventSifter writePolicyPlugin;
ws.reconnect = false; ws.reconnect = false;
@ -160,8 +160,8 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
auto &evJson = msg.at(2); auto &evJson = msg.at(2);
std::string okMsg; std::string okMsg;
auto res = writePolicy.acceptEvent(evJson, hoytech::curr_time_s(), EventSourceType::Sync, ws.remoteAddr, okMsg); auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, hoytech::curr_time_s(), EventSourceType::Sync, ws.remoteAddr, okMsg);
if (res == WritePolicyResult::Accept) { if (res == PluginEventSifterResult::Accept) {
writer.write({ std::move(evJson), EventSourceType::Sync, url }); writer.write({ std::move(evJson), EventSourceType::Sync, url });
} else { } else {
LI << "[" << ws.remoteAddr << "] write policy blocked event " << evJson.at("id").get_string() << ": " << okMsg; LI << "[" << ws.remoteAddr << "] write policy blocked event " << evJson.at("id").get_string() << ": " << okMsg;

View File

@ -1,10 +1,10 @@
#include "RelayServer.h" #include "RelayServer.h"
#include "PluginWritePolicy.h" #include "PluginEventSifter.h"
void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) { void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
PluginWritePolicy writePolicy; PluginEventSifter writePolicyPlugin;
while(1) { while(1) {
auto newMsgs = thr.inbox.pop_all(); auto newMsgs = thr.inbox.pop_all();
@ -40,9 +40,9 @@ void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
tao::json::value evJson = tao::json::from_string(msg->jsonStr); tao::json::value evJson = tao::json::from_string(msg->jsonStr);
EventSourceType sourceType = msg->ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6; EventSourceType sourceType = msg->ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6;
std::string okMsg; std::string okMsg;
auto res = writePolicy.acceptEvent(evJson, msg->receivedAt, sourceType, msg->ipAddr, okMsg); auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, msg->receivedAt, sourceType, msg->ipAddr, okMsg);
if (res == WritePolicyResult::Accept) { if (res == PluginEventSifterResult::Accept) {
newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg); newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg);
} else { } else {
auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(msg->flatStr.data()); auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(msg->flatStr.data());
@ -50,7 +50,7 @@ void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
LI << "[" << msg->connId << "] write policy blocked event " << eventIdHex << ": " << okMsg; LI << "[" << msg->connId << "] write policy blocked event " << eventIdHex << ": " << okMsg;
sendOKResponse(msg->connId, eventIdHex, res == WritePolicyResult::ShadowReject, okMsg); sendOKResponse(msg->connId, eventIdHex, res == PluginEventSifterResult::ShadowReject, okMsg);
} }
} }
} }