diff --git a/golpe.yaml b/golpe.yaml index f959654..97180df 100644 --- a/golpe.yaml +++ b/golpe.yaml @@ -31,7 +31,9 @@ tables: type: ubytes indices: - created_at: + createdAt: + integer: true + receivedAt: integer: true id: comparator: StringUint64 @@ -54,8 +56,9 @@ tables: indexPrelude: | auto *flat = v.flat_nested(); - created_at = flat->created_at(); - uint64_t indexTime = *created_at; + createdAt = flat->created_at(); + uint64_t indexTime = *createdAt; + receivedAt = v.receivedAt(); id = makeKey_StringUint64(sv(flat->id()), indexTime); pubkey = makeKey_StringUint64(sv(flat->pubkey()), indexTime); @@ -164,11 +167,11 @@ config: default: 20 - name: relay__writePolicy__plugin - desc: "" + desc: "If non-empty, path to an executable script that implements the writePolicy plugin logic" default: "" - name: relay__writePolicy__lookbackSeconds - desc: "" - default: 21600 + desc: "Number of seconds to search backwards for lookback events when starting the writePolicy plugin (0 for no lookback)" + default: 0 - name: relay__compression__enabled desc: "Use permessage-deflate compression if supported by client. Reduces bandwidth, but slight increase in CPU" diff --git a/src/DBScan.h b/src/DBScan.h index b463bd5..63a964d 100644 --- a/src/DBScan.h +++ b/src/DBScan.h @@ -187,7 +187,7 @@ struct DBScan { } else { scanState = CreatedAtScan{}; auto *state = std::get_if(&scanState); - indexDbi = env.dbi_Event__created_at; + indexDbi = env.dbi_Event__createdAt; isComplete = [&, state]{ return state->done; diff --git a/src/PluginWritePolicy.h b/src/PluginWritePolicy.h index a274c4f..76fb8a7 100644 --- a/src/PluginWritePolicy.h +++ b/src/PluginWritePolicy.h @@ -73,14 +73,15 @@ struct PluginWritePolicy { } } - if (!running) setupPlugin(); - - auto json = tao::json::from_string(jsonStr); + if (!running) { + setupPlugin(); + sendLookbackEvents(); + } auto request = tao::json::value({ { "type", "new" }, - { "event", json }, - { "receivedAt", receivedAt }, + { "event", tao::json::from_string(jsonStr) }, + { "receivedAt", receivedAt / 1000000 }, { "sourceType", eventSourceTypeToStr(sourceType) }, { "sourceInfo", sourceType == EventSourceType::IP4 || sourceType == EventSourceType::IP6 ? renderIP(sourceInfo) : sourceInfo }, }); @@ -88,7 +89,7 @@ struct PluginWritePolicy { std::string output = tao::json::to_string(request); output += "\n"; - ::fwrite(output.data(), output.size(), 1, running->w); + if (::fwrite(output.data(), 1, output.size(), running->w) != output.size()) throw herr("error writing to plugin"); tao::json::value response; @@ -102,7 +103,8 @@ struct PluginWritePolicy { LW << "Got unparseable line from write policy plugin: " << buf; continue; } - // FIXME: verify id + + if (response.at("id").get_string() != request.at("event").at("id").get_string()) throw herr("id mismatch"); break; } @@ -175,4 +177,40 @@ struct PluginWritePolicy { running = make_unique(pid, inPipe.saveFd(0), outPipe.saveFd(1), path); } + + void sendLookbackEvents() { + if (cfg().relay__writePolicy__lookbackSeconds == 0) return; + + Decompressor decomp; + auto now = hoytech::curr_time_us(); + + uint64_t start = now - (cfg().relay__writePolicy__lookbackSeconds * 1'000'000); + + auto txn = env.txn_ro(); + + env.generic_foreachFull(txn, env.dbi_Event__receivedAt, lmdb::to_sv(start), lmdb::to_sv(0), [&](auto k, auto v) { + if (lmdb::from_sv(k) > now) return false; + + auto ev = env.lookup_Event(txn, lmdb::from_sv(v)); + if (!ev) throw herr("unable to look up event, corrupt DB?"); + + auto sourceType = (EventSourceType)ev->sourceType(); + std::string_view sourceInfo = ev->sourceInfo(); + + auto request = tao::json::value({ + { "type", "lookback" }, + { "event", tao::json::from_string(getEventJson(txn, decomp, ev->primaryKeyId)) }, + { "receivedAt", ev->receivedAt() / 1000000 }, + { "sourceType", eventSourceTypeToStr(sourceType) }, + { "sourceInfo", sourceType == EventSourceType::IP4 || sourceType == EventSourceType::IP6 ? renderIP(sourceInfo) : sourceInfo }, + }); + + std::string output = tao::json::to_string(request); + output += "\n"; + + if (::fwrite(output.data(), 1, output.size(), running->w) != output.size()) throw herr("error writing to plugin"); + + return true; + }); + } }; diff --git a/src/cmd_export.cpp b/src/cmd_export.cpp index 90e94fd..8dc1b88 100644 --- a/src/cmd_export.cpp +++ b/src/cmd_export.cpp @@ -24,7 +24,7 @@ void cmd_export(const std::vector &subArgs) { auto txn = env.txn_ro(); - env.generic_foreachFull(txn, env.dbi_Event__created_at, lmdb::to_sv(since), lmdb::to_sv(0), [&](auto k, auto v) { + env.generic_foreachFull(txn, env.dbi_Event__createdAt, lmdb::to_sv(since), lmdb::to_sv(0), [&](auto k, auto v) { if (lmdb::from_sv(k) > until) return false; auto view = env.lookup_Event(txn, lmdb::from_sv(v)); diff --git a/strfry.conf b/strfry.conf index d6d440e..0b909e7 100644 --- a/strfry.conf +++ b/strfry.conf @@ -23,7 +23,7 @@ relay { # Set OS-limit on maximum number of open files/sockets (if 0, don't attempt to set) (restart required) nofiles = 1000000 - # HTTP header that contains the client's real IP, before reverse proxying (ie x-real-ip) (case-insensitive) + # HTTP header that contains the client's real IP, before reverse proxying (ie x-real-ip) (MUST be all lower-case) realIpHeader = "" info { @@ -59,9 +59,11 @@ relay { maxSubsPerConnection = 20 writePolicy { + # If non-empty, path to an executable script that implements the writePolicy plugin logic plugin = "" - lookbackSeconds = 21600 + # Number of seconds to search backwards for lookback events when starting the writePolicy plugin (0 for no lookback) + lookbackSeconds = 0 } compression {