diff --git a/src/EventStreamer.h b/src/EventStreamer.h index cda5541..44d9afe 100644 --- a/src/EventStreamer.h +++ b/src/EventStreamer.h @@ -21,7 +21,7 @@ struct EventStreamer : NonCopyable { public: - EventStreamer(const std::string &url, const std::string &dir, tao::json::value filter = tao::json::empty_object) : url(url), ws(url), dir(dir), filter(filter) { + EventStreamer(const std::string &url, const std::string &dir, tao::json::value filter_ = tao::json::empty_object) : url(url), ws(url), dir(dir), filter(filter_) { filter["limit"] = 0; } diff --git a/src/apps/mesh/cmd_router.cpp b/src/apps/mesh/cmd_router.cpp index ef2c153..8501581 100644 --- a/src/apps/mesh/cmd_router.cpp +++ b/src/apps/mesh/cmd_router.cpp @@ -8,6 +8,7 @@ #include "WriterPipeline.h" #include "PluginWritePolicy.h" #include "events.h" +#include "filters.h" static const char USAGE[] = @@ -20,9 +21,14 @@ R"( +static std::unique_ptr globalRouterWriter; + + struct StreamGroup : NonCopyable { std::string dir; - tao::json::value filter; + std::string filterStr; + NostrFilterGroup filterCompiled; + std::optional pluginDown; std::optional pluginUp; @@ -30,9 +36,9 @@ struct StreamGroup : NonCopyable { EventStreamer es; std::thread t; - StreamerInstance(const std::string &url, const std::string &dir, tao::json::value filter = tao::json::empty_object) : es(url, dir, filter) { + StreamerInstance(const std::string &url, const std::string &dir, tao::json::value filter) : es(url, dir, filter) { es.onEvent = [&](tao::json::value &&evJson, const WSConnection &ws) { - LI << "GOT EVENT FROM " << es.url << " : " << evJson; + globalRouterWriter->write({ std::move(evJson), EventSourceType::Stream, es.url }); }; t = std::thread([this]{ @@ -44,10 +50,21 @@ struct StreamGroup : NonCopyable { std::map streams; // url -> StreamerInstance StreamGroup(const tao::config::value &spec) { + if (!spec.find("dir")) throw herr("no dir field"); dir = spec.at("dir").get_string(); + + tao::json::value filter = tao::json::empty_object; + // FIXME: Must be better way to go from config object to json, instead of round-trip through string + if (spec.find("filter")) filter = tao::json::from_string(tao::json::to_string(spec.at("filter"))); + + filterStr = tao::json::to_string(filter); + filterCompiled = NostrFilterGroup::unwrapped(filter); + + + if (!spec.find("urls")) throw herr("no urls field"); for (const auto &url : spec.at("urls").get_array()) { - streams.try_emplace(url.get_string(), url.get_string(), dir); + streams.try_emplace(url.get_string(), url.get_string(), dir, filter); } } }; @@ -60,26 +77,85 @@ void cmd_router(const std::vector &subArgs) { std::string routerConfigFile = args[""].asString(); - WriterPipeline writer; + globalRouterWriter = std::make_unique(); Decompressor decomp; - std::mutex groupMutex; std::map streamGroups; // group name -> StreamGroup - auto reconcileConfig = [&](const tao::config::value &routerConfig){ - std::lock_guard guard(groupMutex); + // Config - for (const auto &[k, v] : routerConfig.at("streams").get_object()) { - if (!streamGroups.contains(k)) { - LI << "New stream group [" << k << "]"; - streamGroups.emplace(k, v); + auto reconcileConfig = [&]{ + LI << "Loading router config file: " << routerConfigFile; + + try { + auto routerConfig = loadRawTaoConfig(routerConfigFile); + + std::lock_guard guard(groupMutex); + + for (const auto &[k, v] : routerConfig.at("streams").get_object()) { + if (!streamGroups.contains(k)) { + LI << "New stream group [" << k << "]"; + streamGroups.emplace(k, v); + } } + } catch (std::exception &e) { + LE << "Failed to parse router config: " << e.what(); + return; } }; - reconcileConfig(loadRawTaoConfig(routerConfigFile)); + hoytech::file_change_monitor configFileWatcher(routerConfigFile); + + reconcileConfig(); + + configFileWatcher.run(reconcileConfig); + + + // DB change monitor + + uint64_t currEventId; + + { + auto txn = env.txn_ro(); + currEventId = getMostRecentLevId(txn); + } + + hoytech::file_change_monitor dbChangeWatcher(dbDir + "/data.mdb"); + + dbChangeWatcher.setDebounce(100); + + dbChangeWatcher.run([&](){ + std::lock_guard guard(groupMutex); + + auto txn = env.txn_ro(); + + env.foreach_Event(txn, [&](auto &ev){ + currEventId = ev.primaryKeyId; + + std::string msg = std::string("[\"EVENT\","); + msg += getEventJson(txn, decomp, ev.primaryKeyId); + msg += "]"; + + auto msgPtr = std::make_shared(std::move(msg)); + + { + for (auto &[groupName, streamGroup] : streamGroups) { + if (streamGroup.dir == "down") continue; + if (!streamGroup.filterCompiled.doesMatch(ev.flat_nested())) continue; + + for (auto &[url, streamer] : streamGroup.streams) { + streamer.es.sendEvent(msgPtr); + streamer.es.trigger(); // FIXME: do once at end + } + } + } + + return true; + }, false, currEventId + 1); + }); + pause(); }