From ac5d66203832ffd0028408893b2c29c8f146404f Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Thu, 17 Aug 2023 00:51:36 -0400 Subject: [PATCH] wip router --- src/EventStreamer.h | 2 +- src/WSConnection.h | 2 +- src/apps/mesh/cmd_router.cpp | 58 ++++++++++++++++++++++++++++++++++-- strfry-router.conf | 11 +++++++ 4 files changed, 69 insertions(+), 4 deletions(-) create mode 100644 strfry-router.conf diff --git a/src/EventStreamer.h b/src/EventStreamer.h index 9b50125..cda5541 100644 --- a/src/EventStreamer.h +++ b/src/EventStreamer.h @@ -7,7 +7,7 @@ #include "WSConnection.h" -struct EventStreamer { +struct EventStreamer : NonCopyable { std::string url; WSConnection ws; std::string dir; diff --git a/src/WSConnection.h b/src/WSConnection.h index c577174..fa73475 100644 --- a/src/WSConnection.h +++ b/src/WSConnection.h @@ -6,7 +6,7 @@ #include "golpe.h" -class WSConnection { +class WSConnection : NonCopyable { std::string url; uWS::Hub hub; diff --git a/src/apps/mesh/cmd_router.cpp b/src/apps/mesh/cmd_router.cpp index b6ce30a..ef2c153 100644 --- a/src/apps/mesh/cmd_router.cpp +++ b/src/apps/mesh/cmd_router.cpp @@ -20,12 +20,66 @@ R"( +struct StreamGroup : NonCopyable { + std::string dir; + tao::json::value filter; + std::optional pluginDown; + std::optional pluginUp; + + struct StreamerInstance : NonCopyable { + EventStreamer es; + std::thread t; + + StreamerInstance(const std::string &url, const std::string &dir, tao::json::value filter = tao::json::empty_object) : es(url, dir, filter) { + es.onEvent = [&](tao::json::value &&evJson, const WSConnection &ws) { + LI << "GOT EVENT FROM " << es.url << " : " << evJson; + }; + + t = std::thread([this]{ + es.run(); + }); + } + }; + + std::map streams; // url -> StreamerInstance + + StreamGroup(const tao::config::value &spec) { + dir = spec.at("dir").get_string(); + + for (const auto &url : spec.at("urls").get_array()) { + streams.try_emplace(url.get_string(), url.get_string(), dir); + } + } +}; + + + void cmd_router(const std::vector &subArgs) { std::map args = docopt::docopt(USAGE, subArgs, true, ""); std::string routerConfigFile = args[""].asString(); - tao::config::value configJson = loadRawTaoConfig(routerConfigFile); - std::cout << configJson << std::endl; + WriterPipeline writer; + Decompressor decomp; + + + std::mutex groupMutex; + std::map streamGroups; // group name -> StreamGroup + + + auto reconcileConfig = [&](const tao::config::value &routerConfig){ + std::lock_guard guard(groupMutex); + + for (const auto &[k, v] : routerConfig.at("streams").get_object()) { + if (!streamGroups.contains(k)) { + LI << "New stream group [" << k << "]"; + streamGroups.emplace(k, v); + } + } + }; + + reconcileConfig(loadRawTaoConfig(routerConfigFile)); + + pause(); } diff --git a/strfry-router.conf b/strfry-router.conf new file mode 100644 index 0000000..979466f --- /dev/null +++ b/strfry-router.conf @@ -0,0 +1,11 @@ +streams { + myPeers { + dir = "down" + pluginDown = "/path/to/plugin" + + urls = [ + "wss://nos.lol" + "wss://relayable.org" + ] + } +}