write policy wip

This commit is contained in:
Doug Hoyte
2023-02-07 14:00:21 -05:00
parent 79dcceaee0
commit 1d2295bd7d
6 changed files with 135 additions and 4 deletions

1
TODO
View File

@ -6,6 +6,7 @@
fix sync fix sync
when disk is full it should log warning but not crash when disk is full it should log warning but not crash
ensure DB upgrade flow works ensure DB upgrade flow works
? why isn't the LMDB mapping CLOEXEC
features features
finish syncing finish syncing

2
golpe

Submodule golpe updated: e938a71c0d...620e8233da

View File

@ -95,7 +95,7 @@ config:
default: 256 default: 256
noReload: true noReload: true
- name: dbParams__mapsize - name: dbParams__mapsize
desc: "Size of mmap() to use when loading LMDB (does *not* correspond to disk-space used, default is 10TB)" desc: "Size of mmap() to use when loading LMDB (default is 10TB, does *not* correspond to disk-space used)"
default: 10995116277760 default: 10995116277760
noReload: true noReload: true
@ -146,6 +146,10 @@ config:
desc: "Maximum number of subscriptions (concurrent REQs) a connection can have open at any time" desc: "Maximum number of subscriptions (concurrent REQs) a connection can have open at any time"
default: 20 default: 20
- name: relay__plugins__writePolicyPath
desc: ""
default: ""
- name: relay__compression__enabled - name: relay__compression__enabled
desc: "Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU" desc: "Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU"
default: true default: true

117
src/PluginWritePolicy.h Normal file
View File

@ -0,0 +1,117 @@
#pragma once
#include <string.h>
#include <errno.h>
#include <spawn.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <memory>
#include "golpe.h"
struct PluginWritePolicy {
struct Pipe : NonCopyable {
int fds[2] = { -1, -1 };
Pipe() {
if (::pipe(fds)) throw herr("pipe failed: ", strerror(errno));
}
Pipe(int fd0, int fd1) {
fds[0] = fd0;
fds[1] = fd1;
}
~Pipe() {
if (fds[0] != -1) ::close(fds[0]);
if (fds[1] != -1) ::close(fds[1]);
}
int saveFd(int offset) {
int fd = fds[offset];
fds[offset] = -1;
return fd;
}
};
struct RunningPlugin {
pid_t pid;
std::string currPluginPath;
FILE *r;
FILE *w;
RunningPlugin(pid_t pid, int rfd, int wfd, std::string currPluginPath) : pid(pid), currPluginPath(currPluginPath) {
r = fdopen(rfd, "r");
w = fdopen(wfd, "w");
setlinebuf(w);
}
~RunningPlugin() {
fclose(r);
fclose(w);
waitpid(pid, nullptr, 0);
}
};
std::unique_ptr<RunningPlugin> running;
bool acceptEvent(std::string_view jsonStr, uint64_t receivedAt, EventSourceType sourceType, std::string_view sourceInfo) {
if (cfg().relay__plugins__writePolicyPath.size() == 0) return true;
if (!running) {
try {
setupPlugin();
} catch (std::exception &e) {
LE << "Couldn't setup PluginWritePolicy: " << e.what();
return false;
}
}
std::string output;
output += jsonStr;
output += "\n";
::fwrite(output.data(), output.size(), 1, running->w);
{
char buf[4096];
fgets(buf, sizeof(buf), running->r);
auto j = tao::json::from_string(buf);
LI << "QQQ " << j;
}
return true;
}
void setupPlugin() {
auto path = cfg().relay__plugins__writePolicyPath;
Pipe outPipe;
Pipe inPipe;
pid_t pid;
char *argv[] = { nullptr, };
posix_spawn_file_actions_t file_actions;
if (
posix_spawn_file_actions_init(&file_actions) ||
posix_spawn_file_actions_adddup2(&file_actions, outPipe.fds[0], 0) ||
posix_spawn_file_actions_adddup2(&file_actions, inPipe.fds[1], 1) ||
posix_spawn_file_actions_addclose(&file_actions, outPipe.fds[0]) ||
posix_spawn_file_actions_addclose(&file_actions, outPipe.fds[1]) ||
posix_spawn_file_actions_addclose(&file_actions, inPipe.fds[0]) ||
posix_spawn_file_actions_addclose(&file_actions, inPipe.fds[1])
) throw herr("posix_span_file_actions failed: ", strerror(errno));
auto ret = posix_spawn(&pid, path.c_str(), &file_actions, nullptr, argv, nullptr);
if (ret) throw herr("posix_spawn failed when to invoke '", path, "': ", strerror(errno));
running = make_unique<RunningPlugin>(pid, inPipe.saveFd(0), outPipe.saveFd(1), path);
}
};

View File

@ -1,9 +1,13 @@
#include "RelayServer.h" #include "RelayServer.h"
#include "PluginWritePolicy.h"
void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) { void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
auto qdb = getQdbInstance(); auto qdb = getQdbInstance();
PluginWritePolicy writePolicy;
while(1) { while(1) {
auto newMsgs = thr.inbox.pop_all(); auto newMsgs = thr.inbox.pop_all();
@ -14,6 +18,7 @@ void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
for (auto &newMsg : newMsgs) { for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgWriter::AddEvent>(&newMsg.msg)) { if (auto msg = std::get_if<MsgWriter::AddEvent>(&newMsg.msg)) {
EventSourceType sourceType = msg->ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6; EventSourceType sourceType = msg->ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6;
if (!writePolicy.acceptEvent(msg->jsonStr, msg->receivedAt, sourceType, msg->ipAddr)) continue;
newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg); newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg);
} }
} }

View File

@ -9,7 +9,7 @@ dbParams {
# Maximum number of threads/processes that can simultaneously have LMDB transactions open (restart required) # Maximum number of threads/processes that can simultaneously have LMDB transactions open (restart required)
maxreaders = 256 maxreaders = 256
# Size of mmap() to use when loading LMDB (does *not* correspond to disk-space used, default is 10TB) (restart required) # Size of mmap() to use when loading LMDB (default is 10TB, does *not* correspond to disk-space used) (restart required)
mapsize = 10995116277760 mapsize = 10995116277760
} }
@ -53,7 +53,11 @@ relay {
maxFilterLimit = 500 maxFilterLimit = 500
# Maximum number of subscriptions (concurrent REQs) a connection can have open at any time # Maximum number of subscriptions (concurrent REQs) a connection can have open at any time
maxSubsPerConnection = 2 maxSubsPerConnection = 20
plugins {
writePolicyPath = "./test.pl"
}
compression { compression {
# Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU (restart required) # Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU (restart required)