wip router

This commit is contained in:
Doug Hoyte
2023-08-17 16:21:56 -04:00
parent ac5d662038
commit e6f0c90aab
2 changed files with 90 additions and 14 deletions

View File

@ -21,7 +21,7 @@ struct EventStreamer : NonCopyable {
public:
EventStreamer(const std::string &url, const std::string &dir, tao::json::value filter = tao::json::empty_object) : url(url), ws(url), dir(dir), filter(filter) {
EventStreamer(const std::string &url, const std::string &dir, tao::json::value filter_ = tao::json::empty_object) : url(url), ws(url), dir(dir), filter(filter_) {
filter["limit"] = 0;
}

View File

@ -8,6 +8,7 @@
#include "WriterPipeline.h"
#include "PluginWritePolicy.h"
#include "events.h"
#include "filters.h"
static const char USAGE[] =
@ -20,9 +21,14 @@ R"(
static std::unique_ptr<WriterPipeline> globalRouterWriter;
struct StreamGroup : NonCopyable {
std::string dir;
tao::json::value filter;
std::string filterStr;
NostrFilterGroup filterCompiled;
std::optional<PluginWritePolicy> pluginDown;
std::optional<PluginWritePolicy> pluginUp;
@ -30,9 +36,9 @@ struct StreamGroup : NonCopyable {
EventStreamer es;
std::thread t;
StreamerInstance(const std::string &url, const std::string &dir, tao::json::value filter = tao::json::empty_object) : es(url, dir, filter) {
StreamerInstance(const std::string &url, const std::string &dir, tao::json::value filter) : es(url, dir, filter) {
es.onEvent = [&](tao::json::value &&evJson, const WSConnection &ws) {
LI << "GOT EVENT FROM " << es.url << " : " << evJson;
globalRouterWriter->write({ std::move(evJson), EventSourceType::Stream, es.url });
};
t = std::thread([this]{
@ -44,10 +50,21 @@ struct StreamGroup : NonCopyable {
std::map<std::string, StreamerInstance> streams; // url -> StreamerInstance
StreamGroup(const tao::config::value &spec) {
if (!spec.find("dir")) throw herr("no dir field");
dir = spec.at("dir").get_string();
tao::json::value filter = tao::json::empty_object;
// FIXME: Must be better way to go from config object to json, instead of round-trip through string
if (spec.find("filter")) filter = tao::json::from_string(tao::json::to_string(spec.at("filter")));
filterStr = tao::json::to_string(filter);
filterCompiled = NostrFilterGroup::unwrapped(filter);
if (!spec.find("urls")) throw herr("no urls field");
for (const auto &url : spec.at("urls").get_array()) {
streams.try_emplace(url.get_string(), url.get_string(), dir);
streams.try_emplace(url.get_string(), url.get_string(), dir, filter);
}
}
};
@ -60,26 +77,85 @@ void cmd_router(const std::vector<std::string> &subArgs) {
std::string routerConfigFile = args["<routerConfigFile>"].asString();
WriterPipeline writer;
globalRouterWriter = std::make_unique<WriterPipeline>();
Decompressor decomp;
std::mutex groupMutex;
std::map<std::string, StreamGroup> streamGroups; // group name -> StreamGroup
auto reconcileConfig = [&](const tao::config::value &routerConfig){
std::lock_guard<std::mutex> guard(groupMutex);
// Config
for (const auto &[k, v] : routerConfig.at("streams").get_object()) {
if (!streamGroups.contains(k)) {
LI << "New stream group [" << k << "]";
streamGroups.emplace(k, v);
auto reconcileConfig = [&]{
LI << "Loading router config file: " << routerConfigFile;
try {
auto routerConfig = loadRawTaoConfig(routerConfigFile);
std::lock_guard<std::mutex> guard(groupMutex);
for (const auto &[k, v] : routerConfig.at("streams").get_object()) {
if (!streamGroups.contains(k)) {
LI << "New stream group [" << k << "]";
streamGroups.emplace(k, v);
}
}
} catch (std::exception &e) {
LE << "Failed to parse router config: " << e.what();
return;
}
};
reconcileConfig(loadRawTaoConfig(routerConfigFile));
hoytech::file_change_monitor configFileWatcher(routerConfigFile);
reconcileConfig();
configFileWatcher.run(reconcileConfig);
// DB change monitor
uint64_t currEventId;
{
auto txn = env.txn_ro();
currEventId = getMostRecentLevId(txn);
}
hoytech::file_change_monitor dbChangeWatcher(dbDir + "/data.mdb");
dbChangeWatcher.setDebounce(100);
dbChangeWatcher.run([&](){
std::lock_guard<std::mutex> guard(groupMutex);
auto txn = env.txn_ro();
env.foreach_Event(txn, [&](auto &ev){
currEventId = ev.primaryKeyId;
std::string msg = std::string("[\"EVENT\",");
msg += getEventJson(txn, decomp, ev.primaryKeyId);
msg += "]";
auto msgPtr = std::make_shared<std::string>(std::move(msg));
{
for (auto &[groupName, streamGroup] : streamGroups) {
if (streamGroup.dir == "down") continue;
if (!streamGroup.filterCompiled.doesMatch(ev.flat_nested())) continue;
for (auto &[url, streamer] : streamGroup.streams) {
streamer.es.sendEvent(msgPtr);
streamer.es.trigger(); // FIXME: do once at end
}
}
}
return true;
}, false, currEventId + 1);
});
pause();
}