more work on write policy plugins

This commit is contained in:
Doug Hoyte
2023-02-08 15:04:59 -05:00
parent 50a3b5ed71
commit 4eb7a4fe53
6 changed files with 143 additions and 56 deletions

1
TODO
View File

@ -3,6 +3,7 @@
when disk is full it should log warning but not crash when disk is full it should log warning but not crash
ensure DB upgrade flow works ensure DB upgrade flow works
disable sync disable sync
get IP from HTTP header
? why isn't the LMDB mapping CLOEXEC ? why isn't the LMDB mapping CLOEXEC
0.2 release 0.2 release

View File

@ -160,9 +160,12 @@ config:
desc: "Maximum number of subscriptions (concurrent REQs) a connection can have open at any time" desc: "Maximum number of subscriptions (concurrent REQs) a connection can have open at any time"
default: 20 default: 20
- name: relay__plugins__writePolicyPath - name: relay__writePolicy__plugin
desc: "" desc: ""
default: "" default: ""
- name: relay__writePolicy__lookbackSeconds
desc: ""
default: 21600
- name: relay__compression__enabled - name: relay__compression__enabled
desc: "Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU" desc: "Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU"

View File

@ -7,13 +7,123 @@
#include <stdio.h> #include <stdio.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <signal.h>
#include <memory> #include <memory>
#include "golpe.h" #include "golpe.h"
enum class WritePolicyResult {
Accept,
Reject,
ShadowReject,
};
struct PluginWritePolicy { struct PluginWritePolicy {
struct RunningPlugin {
pid_t pid;
std::string currPluginPath;
struct timespec lastModTime;
FILE *r;
FILE *w;
RunningPlugin(pid_t pid, int rfd, int wfd, std::string currPluginPath) : pid(pid), currPluginPath(currPluginPath) {
r = fdopen(rfd, "r");
w = fdopen(wfd, "w");
setlinebuf(w);
{
struct stat statbuf;
if (stat(currPluginPath.c_str(), &statbuf)) throw herr("couldn't stat plugin: ", currPluginPath);
lastModTime = statbuf.st_mtim;
}
}
~RunningPlugin() {
fclose(r);
fclose(w);
kill(pid, SIGTERM);
waitpid(pid, nullptr, 0);
}
};
std::unique_ptr<RunningPlugin> running;
WritePolicyResult acceptEvent(std::string_view jsonStr, uint64_t receivedAt, EventSourceType sourceType, std::string_view sourceInfo, std::string &okMsg) {
const auto &pluginPath = cfg().relay__writePolicy__plugin;
if (pluginPath.size() == 0) {
running.reset();
return WritePolicyResult::Accept;
}
try {
if (running) {
if (pluginPath != running->currPluginPath) {
running.reset();
} else {
struct stat statbuf;
if (stat(pluginPath.c_str(), &statbuf)) throw herr("couldn't stat plugin: ", pluginPath);
if (statbuf.st_mtim.tv_sec != running->lastModTime.tv_sec || statbuf.st_mtim.tv_nsec != running->lastModTime.tv_nsec) {
running.reset();
}
}
}
if (!running) setupPlugin();
auto json = tao::json::from_string(jsonStr);
auto request = tao::json::value({
{ "type", "new" },
{ "event", json },
{ "receivedAt", receivedAt },
{ "sourceType", eventSourceTypeToStr(sourceType) },
{ "sourceInfo", sourceType == EventSourceType::IP4 || sourceType == EventSourceType::IP6 ? renderIP(sourceInfo) : sourceInfo },
});
std::string output = tao::json::to_string(request);
output += "\n";
::fwrite(output.data(), output.size(), 1, running->w);
tao::json::value response;
while (1) {
char buf[8192];
if (!fgets(buf, sizeof(buf), running->r)) throw herr("pipe to plugin was closed (plugin crashed?)");
try {
response = tao::json::from_string(buf);
} catch (std::exception &e) {
LW << "Got unparseable line from write policy plugin: " << buf;
continue;
}
// FIXME: verify id
break;
}
okMsg = response.optional<std::string>("msg").value_or("");
auto action = response.at("action").get_string();
if (action == "accept") return WritePolicyResult::Accept;
else if (action == "reject") return WritePolicyResult::Reject;
else if (action == "shadowReject") return WritePolicyResult::ShadowReject;
else throw herr("unknown action: ", action);
} catch (std::exception &e) {
LE << "Couldn't setup PluginWritePolicy: " << e.what();
running.reset();
okMsg = "error: internal error";
return WritePolicyResult::Reject;
}
}
struct Pipe : NonCopyable { struct Pipe : NonCopyable {
int fds[2] = { -1, -1 }; int fds[2] = { -1, -1 };
@ -38,58 +148,9 @@ struct PluginWritePolicy {
} }
}; };
struct RunningPlugin {
pid_t pid;
std::string currPluginPath;
FILE *r;
FILE *w;
RunningPlugin(pid_t pid, int rfd, int wfd, std::string currPluginPath) : pid(pid), currPluginPath(currPluginPath) {
r = fdopen(rfd, "r");
w = fdopen(wfd, "w");
setlinebuf(w);
}
~RunningPlugin() {
fclose(r);
fclose(w);
waitpid(pid, nullptr, 0);
}
};
std::unique_ptr<RunningPlugin> running;
bool acceptEvent(std::string_view jsonStr, uint64_t receivedAt, EventSourceType sourceType, std::string_view sourceInfo) {
if (cfg().relay__plugins__writePolicyPath.size() == 0) return true;
if (!running) {
try {
setupPlugin();
} catch (std::exception &e) {
LE << "Couldn't setup PluginWritePolicy: " << e.what();
return false;
}
}
std::string output;
output += jsonStr;
output += "\n";
::fwrite(output.data(), output.size(), 1, running->w);
{
char buf[4096];
fgets(buf, sizeof(buf), running->r);
auto j = tao::json::from_string(buf);
LI << "QQQ " << j;
}
return true;
}
void setupPlugin() { void setupPlugin() {
auto path = cfg().relay__plugins__writePolicyPath; auto path = cfg().relay__writePolicy__plugin;
LI << "Setting up write policy plugin: " << path;
Pipe outPipe; Pipe outPipe;
Pipe inPipe; Pipe inPipe;

View File

@ -18,8 +18,19 @@ void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
for (auto &newMsg : newMsgs) { for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgWriter::AddEvent>(&newMsg.msg)) { if (auto msg = std::get_if<MsgWriter::AddEvent>(&newMsg.msg)) {
EventSourceType sourceType = msg->ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6; EventSourceType sourceType = msg->ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6;
if (!writePolicy.acceptEvent(msg->jsonStr, msg->receivedAt, sourceType, msg->ipAddr)) continue; std::string okMsg;
auto res = writePolicy.acceptEvent(msg->jsonStr, msg->receivedAt, sourceType, msg->ipAddr, okMsg);
if (res == WritePolicyResult::Accept) {
newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg); newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg);
} else {
auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(msg->flatStr.data());
auto eventIdHex = to_hex(sv(flat->id()));
LI << "[" << msg->connId << "] write policy blocked event " << eventIdHex << ": " << okMsg;
sendOKResponse(msg->connId, eventIdHex, res == WritePolicyResult::ShadowReject, okMsg);
}
} }
} }

View File

@ -67,6 +67,15 @@ enum class EventSourceType {
Sync = 5, Sync = 5,
}; };
inline std::string eventSourceTypeToStr(EventSourceType t) {
if (t == EventSourceType::IP4) return "IP4";
else if (t == EventSourceType::IP6) return "IP6";
else if (t == EventSourceType::Import) return "Import";
else if (t == EventSourceType::Stream) return "Stream";
else if (t == EventSourceType::Sync) return "Sync";
else return "?";
}
enum class EventWriteStatus { enum class EventWriteStatus {

View File

@ -55,8 +55,10 @@ relay {
# Maximum number of subscriptions (concurrent REQs) a connection can have open at any time # Maximum number of subscriptions (concurrent REQs) a connection can have open at any time
maxSubsPerConnection = 20 maxSubsPerConnection = 20
plugins { writePolicy {
writePolicyPath = "./test.pl" plugin = ""
lookbackSeconds = 21600
} }
compression { compression {