work on write policy plugin

This commit is contained in:
Doug Hoyte
2023-02-08 16:44:53 -05:00
parent 51243ce62f
commit 61f2638f88
5 changed files with 60 additions and 17 deletions

View File

@ -31,7 +31,9 @@ tables:
type: ubytes
indices:
created_at:
createdAt:
integer: true
receivedAt:
integer: true
id:
comparator: StringUint64
@ -54,8 +56,9 @@ tables:
indexPrelude: |
auto *flat = v.flat_nested();
created_at = flat->created_at();
uint64_t indexTime = *created_at;
createdAt = flat->created_at();
uint64_t indexTime = *createdAt;
receivedAt = v.receivedAt();
id = makeKey_StringUint64(sv(flat->id()), indexTime);
pubkey = makeKey_StringUint64(sv(flat->pubkey()), indexTime);
@ -164,11 +167,11 @@ config:
default: 20
- name: relay__writePolicy__plugin
desc: ""
desc: "If non-empty, path to an executable script that implements the writePolicy plugin logic"
default: ""
- name: relay__writePolicy__lookbackSeconds
desc: ""
default: 21600
desc: "Number of seconds to search backwards for lookback events when starting the writePolicy plugin (0 for no lookback)"
default: 0
- name: relay__compression__enabled
desc: "Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU"

View File

@ -187,7 +187,7 @@ struct DBScan {
} else {
scanState = CreatedAtScan{};
auto *state = std::get_if<CreatedAtScan>(&scanState);
indexDbi = env.dbi_Event__created_at;
indexDbi = env.dbi_Event__createdAt;
isComplete = [&, state]{
return state->done;

View File

@ -73,14 +73,15 @@ struct PluginWritePolicy {
}
}
if (!running) setupPlugin();
auto json = tao::json::from_string(jsonStr);
if (!running) {
setupPlugin();
sendLookbackEvents();
}
auto request = tao::json::value({
{ "type", "new" },
{ "event", json },
{ "receivedAt", receivedAt },
{ "event", tao::json::from_string(jsonStr) },
{ "receivedAt", receivedAt / 1000000 },
{ "sourceType", eventSourceTypeToStr(sourceType) },
{ "sourceInfo", sourceType == EventSourceType::IP4 || sourceType == EventSourceType::IP6 ? renderIP(sourceInfo) : sourceInfo },
});
@ -88,7 +89,7 @@ struct PluginWritePolicy {
std::string output = tao::json::to_string(request);
output += "\n";
::fwrite(output.data(), output.size(), 1, running->w);
if (::fwrite(output.data(), 1, output.size(), running->w) != output.size()) throw herr("error writing to plugin");
tao::json::value response;
@ -102,7 +103,8 @@ struct PluginWritePolicy {
LW << "Got unparseable line from write policy plugin: " << buf;
continue;
}
// FIXME: verify id
if (response.at("id").get_string() != request.at("event").at("id").get_string()) throw herr("id mismatch");
break;
}
@ -175,4 +177,40 @@ struct PluginWritePolicy {
running = make_unique<RunningPlugin>(pid, inPipe.saveFd(0), outPipe.saveFd(1), path);
}
void sendLookbackEvents() {
if (cfg().relay__writePolicy__lookbackSeconds == 0) return;
Decompressor decomp;
auto now = hoytech::curr_time_us();
uint64_t start = now - (cfg().relay__writePolicy__lookbackSeconds * 1'000'000);
auto txn = env.txn_ro();
env.generic_foreachFull(txn, env.dbi_Event__receivedAt, lmdb::to_sv<uint64_t>(start), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
if (lmdb::from_sv<uint64_t>(k) > now) return false;
auto ev = env.lookup_Event(txn, lmdb::from_sv<uint64_t>(v));
if (!ev) throw herr("unable to look up event, corrupt DB?");
auto sourceType = (EventSourceType)ev->sourceType();
std::string_view sourceInfo = ev->sourceInfo();
auto request = tao::json::value({
{ "type", "lookback" },
{ "event", tao::json::from_string(getEventJson(txn, decomp, ev->primaryKeyId)) },
{ "receivedAt", ev->receivedAt() / 1000000 },
{ "sourceType", eventSourceTypeToStr(sourceType) },
{ "sourceInfo", sourceType == EventSourceType::IP4 || sourceType == EventSourceType::IP6 ? renderIP(sourceInfo) : sourceInfo },
});
std::string output = tao::json::to_string(request);
output += "\n";
if (::fwrite(output.data(), 1, output.size(), running->w) != output.size()) throw herr("error writing to plugin");
return true;
});
}
};

View File

@ -24,7 +24,7 @@ void cmd_export(const std::vector<std::string> &subArgs) {
auto txn = env.txn_ro();
env.generic_foreachFull(txn, env.dbi_Event__created_at, lmdb::to_sv<uint64_t>(since), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
env.generic_foreachFull(txn, env.dbi_Event__createdAt, lmdb::to_sv<uint64_t>(since), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
if (lmdb::from_sv<uint64_t>(k) > until) return false;
auto view = env.lookup_Event(txn, lmdb::from_sv<uint64_t>(v));

View File

@ -23,7 +23,7 @@ relay {
# Set OS-limit on maximum number of open files/sockets (if 0, don't attempt to set) (restart required)
nofiles = 1000000
# HTTP header that contains the client's real IP, before reverse proxying (ie x-real-ip) (case-insensitive)
# HTTP header that contains the client's real IP, before reverse proxying (ie x-real-ip) (MUST be all lower-case)
realIpHeader = ""
info {
@ -59,9 +59,11 @@ relay {
maxSubsPerConnection = 20
writePolicy {
# If non-empty, path to an executable script that implements the writePolicy plugin logic
plugin = ""
lookbackSeconds = 21600
# Number of seconds to search backwards for lookback events when starting the writePolicy plugin (0 for no lookback)
lookbackSeconds = 0
}
compression {