mirror of
https://github.com/hoytech/strfry.git
synced 2025-06-18 17:27:11 +00:00
remove lookbehind feature from plugins
This commit is contained in:
@ -10,17 +10,15 @@ A plugin can be implemented in any programming language that supports reading li
|
|||||||
|
|
||||||
Whenever the script's modification-time changes, or the plugin settings in `strfry.conf` change, the plugin will be reloaded upon the next write attempt.
|
Whenever the script's modification-time changes, or the plugin settings in `strfry.conf` change, the plugin will be reloaded upon the next write attempt.
|
||||||
|
|
||||||
If configured, When a plugin is loaded some number of recently stored events will be sent to it as a "lookback". This is useful for populating the initial rate-limiting state. Plugins should print nothing in response to a lookback message.
|
|
||||||
|
|
||||||
|
|
||||||
## Input messages
|
## Input messages
|
||||||
|
|
||||||
Input messages contain the following keys:
|
Input messages contain the following keys:
|
||||||
|
|
||||||
* `type`: Either `new` or `lookback`
|
* `type`: Currently always `new`
|
||||||
* `event`: The event posted by the client, with all the required fields such as `id`, `pubkey`, etc
|
* `event`: The event posted by the client, with all the required fields such as `id`, `pubkey`, etc
|
||||||
* `receivedAt`: Unix timestamp of when this event was received by the relay
|
* `receivedAt`: Unix timestamp of when this event was received by the relay
|
||||||
* `sourceType`: Where this event came from. Typically will be `IP4` or `IP6`, but in lookback can also be `Import`, `Stream`, or `Sync`.
|
* `sourceType`: The channel where this event came from: `IP4`, `IP6`, `Import`, `Stream`, or `Sync`.
|
||||||
* `sourceInfo`: Specifics of the event's source. Either an IP address or a relay URL (for stream/sync)
|
* `sourceInfo`: Specifics of the event's source. Either an IP address or a relay URL (for stream/sync)
|
||||||
|
|
||||||
|
|
||||||
@ -52,10 +50,6 @@ Here is a simple example `whitelist.js` plugin that will reject all events excep
|
|||||||
rl.on('line', (line) => {
|
rl.on('line', (line) => {
|
||||||
let req = JSON.parse(line);
|
let req = JSON.parse(line);
|
||||||
|
|
||||||
if (req.type === 'lookback') {
|
|
||||||
return; // do nothing
|
|
||||||
}
|
|
||||||
|
|
||||||
if (req.type !== 'new') {
|
if (req.type !== 'new') {
|
||||||
console.error("unexpected request type"); // will appear in strfry logs
|
console.error("unexpected request type"); // will appear in strfry logs
|
||||||
return;
|
return;
|
||||||
|
@ -27,12 +27,11 @@ struct PluginWritePolicy {
|
|||||||
struct RunningPlugin {
|
struct RunningPlugin {
|
||||||
pid_t pid;
|
pid_t pid;
|
||||||
std::string currPluginPath;
|
std::string currPluginPath;
|
||||||
uint64_t lookbackSeconds;
|
|
||||||
struct timespec lastModTime;
|
struct timespec lastModTime;
|
||||||
FILE *r;
|
FILE *r;
|
||||||
FILE *w;
|
FILE *w;
|
||||||
|
|
||||||
RunningPlugin(pid_t pid, int rfd, int wfd, std::string currPluginPath, uint64_t lookbackSeconds) : pid(pid), currPluginPath(currPluginPath), lookbackSeconds(lookbackSeconds) {
|
RunningPlugin(pid_t pid, int rfd, int wfd, std::string currPluginPath) : pid(pid), currPluginPath(currPluginPath) {
|
||||||
r = fdopen(rfd, "r");
|
r = fdopen(rfd, "r");
|
||||||
w = fdopen(wfd, "w");
|
w = fdopen(wfd, "w");
|
||||||
setlinebuf(w);
|
setlinebuf(w);
|
||||||
@ -63,7 +62,7 @@ struct PluginWritePolicy {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
if (running) {
|
if (running) {
|
||||||
if (pluginPath != running->currPluginPath || cfg().relay__writePolicy__lookbackSeconds != running->lookbackSeconds) {
|
if (pluginPath != running->currPluginPath) {
|
||||||
running.reset();
|
running.reset();
|
||||||
} else {
|
} else {
|
||||||
struct stat statbuf;
|
struct stat statbuf;
|
||||||
@ -76,7 +75,6 @@ struct PluginWritePolicy {
|
|||||||
|
|
||||||
if (!running) {
|
if (!running) {
|
||||||
setupPlugin();
|
setupPlugin();
|
||||||
sendLookbackEvents();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
auto request = tao::json::value({
|
auto request = tao::json::value({
|
||||||
@ -176,40 +174,6 @@ struct PluginWritePolicy {
|
|||||||
auto ret = posix_spawn(&pid, path.c_str(), &file_actions, nullptr, argv, nullptr);
|
auto ret = posix_spawn(&pid, path.c_str(), &file_actions, nullptr, argv, nullptr);
|
||||||
if (ret) throw herr("posix_spawn failed to invoke '", path, "': ", strerror(errno));
|
if (ret) throw herr("posix_spawn failed to invoke '", path, "': ", strerror(errno));
|
||||||
|
|
||||||
running = make_unique<RunningPlugin>(pid, inPipe.saveFd(0), outPipe.saveFd(1), path, cfg().relay__writePolicy__lookbackSeconds);
|
running = make_unique<RunningPlugin>(pid, inPipe.saveFd(0), outPipe.saveFd(1), path);
|
||||||
}
|
|
||||||
|
|
||||||
void sendLookbackEvents() {
|
|
||||||
if (running->lookbackSeconds == 0) return;
|
|
||||||
|
|
||||||
Decompressor decomp;
|
|
||||||
auto now = hoytech::curr_time_us();
|
|
||||||
|
|
||||||
uint64_t start = now - (running->lookbackSeconds * 1'000'000);
|
|
||||||
|
|
||||||
auto txn = env.txn_ro();
|
|
||||||
|
|
||||||
env.generic_foreachFull(txn, env.dbi_Event__receivedAt, lmdb::to_sv<uint64_t>(start), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
|
|
||||||
if (lmdb::from_sv<uint64_t>(k) > now) return false;
|
|
||||||
|
|
||||||
auto ev = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v));
|
|
||||||
auto sourceType = (EventSourceType)ev.sourceType();
|
|
||||||
std::string_view sourceInfo = ev.sourceInfo();
|
|
||||||
|
|
||||||
auto request = tao::json::value({
|
|
||||||
{ "type", "lookback" },
|
|
||||||
{ "event", tao::json::from_string(getEventJson(txn, decomp, ev.primaryKeyId)) },
|
|
||||||
{ "receivedAt", ev.receivedAt() / 1000000 },
|
|
||||||
{ "sourceType", eventSourceTypeToStr(sourceType) },
|
|
||||||
{ "sourceInfo", sourceType == EventSourceType::IP4 || sourceType == EventSourceType::IP6 ? renderIP(sourceInfo) : sourceInfo },
|
|
||||||
});
|
|
||||||
|
|
||||||
std::string output = tao::json::to_string(request);
|
|
||||||
output += "\n";
|
|
||||||
|
|
||||||
if (::fwrite(output.data(), 1, output.size(), running->w) != output.size()) throw herr("error writing to plugin");
|
|
||||||
|
|
||||||
return true;
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -52,9 +52,6 @@ config:
|
|||||||
- name: relay__writePolicy__plugin
|
- name: relay__writePolicy__plugin
|
||||||
desc: "If non-empty, path to an executable script that implements the writePolicy plugin logic"
|
desc: "If non-empty, path to an executable script that implements the writePolicy plugin logic"
|
||||||
default: ""
|
default: ""
|
||||||
- name: relay__writePolicy__lookbackSeconds
|
|
||||||
desc: "Number of seconds to search backwards for lookback events when starting the writePolicy plugin (0 for no lookback)"
|
|
||||||
default: 0
|
|
||||||
|
|
||||||
- name: relay__compression__enabled
|
- name: relay__compression__enabled
|
||||||
desc: "Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU"
|
desc: "Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU"
|
||||||
|
@ -87,9 +87,6 @@ relay {
|
|||||||
writePolicy {
|
writePolicy {
|
||||||
# If non-empty, path to an executable script that implements the writePolicy plugin logic
|
# If non-empty, path to an executable script that implements the writePolicy plugin logic
|
||||||
plugin = ""
|
plugin = ""
|
||||||
|
|
||||||
# Number of seconds to search backwards for lookback events when starting the writePolicy plugin (0 for no lookback)
|
|
||||||
lookbackSeconds = 0
|
|
||||||
}
|
}
|
||||||
|
|
||||||
compression {
|
compression {
|
||||||
|
Reference in New Issue
Block a user