diff --git a/src/EventStreamer.h b/src/EventStreamer.h new file mode 100644 index 0000000..9b50125 --- /dev/null +++ b/src/EventStreamer.h @@ -0,0 +1,85 @@ +#pragma once + +#include + +#include "golpe.h" + +#include "WSConnection.h" + + +struct EventStreamer { + std::string url; + WSConnection ws; + std::string dir; + tao::json::value filter; + + std::function onEvent; + + private: + + hoytech::protected_queue> inbox; + + public: + + EventStreamer(const std::string &url, const std::string &dir, tao::json::value filter = tao::json::empty_object) : url(url), ws(url), dir(dir), filter(filter) { + filter["limit"] = 0; + } + + void sendEvent(std::shared_ptr msg) { + inbox.push_move(std::move(msg)); + } + + void trigger() { + ws.trigger(); + } + + void run() { + ws.onConnect = [&]{ + if (dir == "down" || dir == "both") { + auto encoded = tao::json::to_string(tao::json::value::array({ "REQ", "X", filter })); + ws.send(encoded); + } + }; + + ws.onMessage = [&](auto msg, uWS::OpCode, size_t){ + auto origJson = tao::json::from_string(msg); + + if (origJson.is_array()) { + if (origJson.get_array().size() < 2) throw herr("array too short"); + + auto &msgType = origJson.get_array().at(0); + if (msgType == "EOSE") { + return; + } else if (msgType == "NOTICE") { + LW << "NOTICE message from " << url << " : " << tao::json::to_string(origJson); + return; + } else if (msgType == "OK") { + if (!origJson.get_array().at(2).get_boolean()) { + LW << "Event not written by " << url << " : " << origJson; + } + } else if (msgType == "EVENT") { + if (dir == "down" || dir == "both") { + if (origJson.get_array().size() < 3) throw herr("array too short"); + auto &evJson = origJson.at(2); + + // FIXME: validate that the event actually matches provided filter? + if (onEvent) onEvent(std::move(evJson), ws); + } else { + LW << "Unexpected EVENT from " << url; + } + } else { + throw herr("unexpected first element"); + } + } else { + throw herr("unexpected message"); + } + }; + + ws.onTrigger = [&]{ + auto msgs = inbox.pop_all(); + for (auto &msg : msgs) ws.send(*msg); + }; + + ws.run(); + } +}; diff --git a/src/apps/mesh/cmd_router.cpp b/src/apps/mesh/cmd_router.cpp new file mode 100644 index 0000000..b6ce30a --- /dev/null +++ b/src/apps/mesh/cmd_router.cpp @@ -0,0 +1,31 @@ +#include +#include +#include + +#include "golpe.h" + +#include "EventStreamer.h" +#include "WriterPipeline.h" +#include "PluginWritePolicy.h" +#include "events.h" + + +static const char USAGE[] = +R"( + Usage: + router + + Options: +)"; + + + +void cmd_router(const std::vector &subArgs) { + std::map args = docopt::docopt(USAGE, subArgs, true, ""); + + std::string routerConfigFile = args[""].asString(); + + tao::config::value configJson = loadRawTaoConfig(routerConfigFile); + + std::cout << configJson << std::endl; +} diff --git a/src/apps/mesh/cmd_stream.cpp b/src/apps/mesh/cmd_stream.cpp index 9babaab..250363e 100644 --- a/src/apps/mesh/cmd_stream.cpp +++ b/src/apps/mesh/cmd_stream.cpp @@ -1,15 +1,13 @@ #include #include -#include #include #include "golpe.h" +#include "EventStreamer.h" #include "WriterPipeline.h" -#include "Subscription.h" -#include "WSConnection.h" -#include "events.h" #include "PluginWritePolicy.h" +#include "events.h" static const char USAGE[] = @@ -23,85 +21,6 @@ R"( -struct EventStreamer { - std::string url; - WSConnection ws; - std::string dir; - tao::json::value filter; - - std::function onEvent; - - private: - - hoytech::protected_queue> inbox; - - public: - - EventStreamer(const std::string &url, const std::string &dir, tao::json::value filter = tao::json::empty_object) : url(url), ws(url), dir(dir), filter(filter) { - filter["limit"] = 0; - } - - void sendEvent(std::shared_ptr msg) { - inbox.push_move(std::move(msg)); - } - - void trigger() { - ws.trigger(); - } - - void run() { - ws.onConnect = [&]{ - if (dir == "down" || dir == "both") { - auto encoded = tao::json::to_string(tao::json::value::array({ "REQ", "X", filter })); - ws.send(encoded); - } - }; - - ws.onMessage = [&](auto msg, uWS::OpCode, size_t){ - auto origJson = tao::json::from_string(msg); - - if (origJson.is_array()) { - if (origJson.get_array().size() < 2) throw herr("array too short"); - - auto &msgType = origJson.get_array().at(0); - if (msgType == "EOSE") { - return; - } else if (msgType == "NOTICE") { - LW << "NOTICE message from " << url << " : " << tao::json::to_string(origJson); - return; - } else if (msgType == "OK") { - if (!origJson.get_array().at(2).get_boolean()) { - LW << "Event not written by " << url << " : " << origJson; - } - } else if (msgType == "EVENT") { - if (dir == "down" || dir == "both") { - if (origJson.get_array().size() < 3) throw herr("array too short"); - auto &evJson = origJson.at(2); - - // FIXME: validate that the event actually matches provided filter? - if (onEvent) onEvent(std::move(evJson), ws); - } else { - LW << "Unexpected EVENT from " << url; - } - } else { - throw herr("unexpected first element"); - } - } else { - throw herr("unexpected message"); - } - }; - - ws.onTrigger = [&]{ - auto msgs = inbox.pop_all(); - for (auto &msg : msgs) ws.send(*msg); - }; - - ws.run(); - } -}; - - - void cmd_stream(const std::vector &subArgs) { std::map args = docopt::docopt(USAGE, subArgs, true, "");