mirror of
https://github.com/hoytech/strfry.git
synced 2025-06-17 00:38:50 +00:00
remove lookbehind feature from plugins
This commit is contained in:
@ -10,17 +10,15 @@ A plugin can be implemented in any programming language that supports reading li
|
||||
|
||||
Whenever the script's modification-time changes, or the plugin settings in `strfry.conf` change, the plugin will be reloaded upon the next write attempt.
|
||||
|
||||
If configured, When a plugin is loaded some number of recently stored events will be sent to it as a "lookback". This is useful for populating the initial rate-limiting state. Plugins should print nothing in response to a lookback message.
|
||||
|
||||
|
||||
## Input messages
|
||||
|
||||
Input messages contain the following keys:
|
||||
|
||||
* `type`: Either `new` or `lookback`
|
||||
* `type`: Currently always `new`
|
||||
* `event`: The event posted by the client, with all the required fields such as `id`, `pubkey`, etc
|
||||
* `receivedAt`: Unix timestamp of when this event was received by the relay
|
||||
* `sourceType`: Where this event came from. Typically will be `IP4` or `IP6`, but in lookback can also be `Import`, `Stream`, or `Sync`.
|
||||
* `sourceType`: The channel where this event came from: `IP4`, `IP6`, `Import`, `Stream`, or `Sync`.
|
||||
* `sourceInfo`: Specifics of the event's source. Either an IP address or a relay URL (for stream/sync)
|
||||
|
||||
|
||||
@ -52,10 +50,6 @@ Here is a simple example `whitelist.js` plugin that will reject all events excep
|
||||
rl.on('line', (line) => {
|
||||
let req = JSON.parse(line);
|
||||
|
||||
if (req.type === 'lookback') {
|
||||
return; // do nothing
|
||||
}
|
||||
|
||||
if (req.type !== 'new') {
|
||||
console.error("unexpected request type"); // will appear in strfry logs
|
||||
return;
|
||||
|
@ -27,12 +27,11 @@ struct PluginWritePolicy {
|
||||
struct RunningPlugin {
|
||||
pid_t pid;
|
||||
std::string currPluginPath;
|
||||
uint64_t lookbackSeconds;
|
||||
struct timespec lastModTime;
|
||||
FILE *r;
|
||||
FILE *w;
|
||||
|
||||
RunningPlugin(pid_t pid, int rfd, int wfd, std::string currPluginPath, uint64_t lookbackSeconds) : pid(pid), currPluginPath(currPluginPath), lookbackSeconds(lookbackSeconds) {
|
||||
RunningPlugin(pid_t pid, int rfd, int wfd, std::string currPluginPath) : pid(pid), currPluginPath(currPluginPath) {
|
||||
r = fdopen(rfd, "r");
|
||||
w = fdopen(wfd, "w");
|
||||
setlinebuf(w);
|
||||
@ -63,7 +62,7 @@ struct PluginWritePolicy {
|
||||
|
||||
try {
|
||||
if (running) {
|
||||
if (pluginPath != running->currPluginPath || cfg().relay__writePolicy__lookbackSeconds != running->lookbackSeconds) {
|
||||
if (pluginPath != running->currPluginPath) {
|
||||
running.reset();
|
||||
} else {
|
||||
struct stat statbuf;
|
||||
@ -76,7 +75,6 @@ struct PluginWritePolicy {
|
||||
|
||||
if (!running) {
|
||||
setupPlugin();
|
||||
sendLookbackEvents();
|
||||
}
|
||||
|
||||
auto request = tao::json::value({
|
||||
@ -176,40 +174,6 @@ struct PluginWritePolicy {
|
||||
auto ret = posix_spawn(&pid, path.c_str(), &file_actions, nullptr, argv, nullptr);
|
||||
if (ret) throw herr("posix_spawn failed to invoke '", path, "': ", strerror(errno));
|
||||
|
||||
running = make_unique<RunningPlugin>(pid, inPipe.saveFd(0), outPipe.saveFd(1), path, cfg().relay__writePolicy__lookbackSeconds);
|
||||
}
|
||||
|
||||
void sendLookbackEvents() {
|
||||
if (running->lookbackSeconds == 0) return;
|
||||
|
||||
Decompressor decomp;
|
||||
auto now = hoytech::curr_time_us();
|
||||
|
||||
uint64_t start = now - (running->lookbackSeconds * 1'000'000);
|
||||
|
||||
auto txn = env.txn_ro();
|
||||
|
||||
env.generic_foreachFull(txn, env.dbi_Event__receivedAt, lmdb::to_sv<uint64_t>(start), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
|
||||
if (lmdb::from_sv<uint64_t>(k) > now) return false;
|
||||
|
||||
auto ev = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v));
|
||||
auto sourceType = (EventSourceType)ev.sourceType();
|
||||
std::string_view sourceInfo = ev.sourceInfo();
|
||||
|
||||
auto request = tao::json::value({
|
||||
{ "type", "lookback" },
|
||||
{ "event", tao::json::from_string(getEventJson(txn, decomp, ev.primaryKeyId)) },
|
||||
{ "receivedAt", ev.receivedAt() / 1000000 },
|
||||
{ "sourceType", eventSourceTypeToStr(sourceType) },
|
||||
{ "sourceInfo", sourceType == EventSourceType::IP4 || sourceType == EventSourceType::IP6 ? renderIP(sourceInfo) : sourceInfo },
|
||||
});
|
||||
|
||||
std::string output = tao::json::to_string(request);
|
||||
output += "\n";
|
||||
|
||||
if (::fwrite(output.data(), 1, output.size(), running->w) != output.size()) throw herr("error writing to plugin");
|
||||
|
||||
return true;
|
||||
});
|
||||
running = make_unique<RunningPlugin>(pid, inPipe.saveFd(0), outPipe.saveFd(1), path);
|
||||
}
|
||||
};
|
||||
|
@ -52,9 +52,6 @@ config:
|
||||
- name: relay__writePolicy__plugin
|
||||
desc: "If non-empty, path to an executable script that implements the writePolicy plugin logic"
|
||||
default: ""
|
||||
- name: relay__writePolicy__lookbackSeconds
|
||||
desc: "Number of seconds to search backwards for lookback events when starting the writePolicy plugin (0 for no lookback)"
|
||||
default: 0
|
||||
|
||||
- name: relay__compression__enabled
|
||||
desc: "Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU"
|
||||
|
@ -87,9 +87,6 @@ relay {
|
||||
writePolicy {
|
||||
# If non-empty, path to an executable script that implements the writePolicy plugin logic
|
||||
plugin = ""
|
||||
|
||||
# Number of seconds to search backwards for lookback events when starting the writePolicy plugin (0 for no lookback)
|
||||
lookbackSeconds = 0
|
||||
}
|
||||
|
||||
compression {
|
||||
|
Reference in New Issue
Block a user