diff --git a/src/apps/mesh/cmd_router.cpp b/src/apps/mesh/cmd_router.cpp new file mode 100644 index 0000000..75d52b2 --- /dev/null +++ b/src/apps/mesh/cmd_router.cpp @@ -0,0 +1,449 @@ +#include +#include +#include +#include +#include + +#include "golpe.h" + +#include "WriterPipeline.h" +#include "PluginEventSifter.h" +#include "events.h" +#include "filters.h" + + +static const char USAGE[] = +R"( + Usage: + router + + Options: +)"; + + + +struct RouterEvent : NonCopyable { + struct ConfigFileChange { + }; + + struct DBChange { + }; + + struct ReconnectCron { + }; + + using Var = std::variant; + Var msg; + RouterEvent(Var &&msg_) : msg(std::move(msg_)) {} +}; + + +struct ConnDesignator { + std::string groupName; + std::string url; +}; + + + +struct Router { + struct StreamGroup : NonCopyable { + std::string groupName; + Router *router; + + std::string dir; + std::string filterStr; + std::string pluginDownCmd; + std::string pluginUpCmd; + std::vector urls; + + struct Connection { + uWS::WebSocket *ws = nullptr; + uint64_t started = 0; + + ~Connection() { + if (ws) { + ws->close(); + ws = nullptr; + } + } + }; + std::map conns; // url -> Connection + + tao::json::value filter; + NostrFilterGroup filterCompiled; + PluginEventSifter pluginDown; + PluginEventSifter pluginUp; + + StreamGroup(std::string groupName, Router *router) : groupName(groupName), router(router) { + } + + void configure(const tao::config::value &spec) { + bool needsReconnect = false; + + { + if (!spec.find("dir")) throw herr("no dir field"); + auto newDir = spec.at("dir").get_string(); + if (newDir != dir) needsReconnect = true; + + dir = newDir; + } + + { + tao::json::value newFilter = tao::json::empty_object; + // FIXME: Must be better way to go from config object to json, instead of round-trip through string + if (spec.find("filter")) newFilter = tao::json::from_string(tao::json::to_string(spec.at("filter"))); + + std::string newFilterStr = tao::json::to_string(newFilter); + if (newFilterStr != filterStr) needsReconnect = true; + + filterStr = newFilterStr; + filterCompiled = NostrFilterGroup::unwrapped(newFilter); + filter = newFilter; + } + + + pluginDownCmd = pluginUpCmd = ""; + if (spec.find("pluginDown")) pluginDownCmd = spec.at("pluginDown").get_string(); + if (spec.find("pluginUp")) pluginUpCmd = spec.at("pluginUp").get_string(); + + + if (!spec.find("urls")) throw herr("no urls field"); + urls.clear(); + for (const auto &url : spec.at("urls").get_array()) { + urls.push_back(url.get_string()); + } + + // Disconnect any urls that were deleted + + { + std::set unneededUrls; + for (auto &[url, c] : conns) unneededUrls.insert(url); + for (const auto &url : urls) unneededUrls.erase(url); + for (const auto &url : unneededUrls) conns.erase(url); + } + + + if (needsReconnect) { + for (auto &[url, c] : conns) { + if (c.ws) c.ws->close(); + } + conns.clear(); + } + + tryConnects(); + } + + + void tryConnects() { + for (const auto &url : urls) { + if (conns.find(url) == conns.end()) conns.try_emplace(url); + auto &c = conns.at(url); + + if (!c.ws && c.started + (router->connectionTimeoutUs * 2) < hoytech::curr_time_us()) { + LI << groupName << ": Connecting to " << url; + router->hub.connect(url, (void*)(new ConnDesignator(groupName, url)), {}, router->connectionTimeoutUs / 1'000, router->hubGroup); + c.started = hoytech::curr_time_us(); + } + } + } + + void connOpen(const std::string &url, uWS::WebSocket *ws) { + if (!conns.contains(url)) return; + auto &c = conns.at(url); + + if (c.ws) { + LI << "Already had open connection to " << url << ", closing"; + ws->close(); + return; + } + + c.ws = ws; + + if (dir == "down" || dir == "both") { + tao::json::value filterToSend = filter; + filterToSend["limit"] = 0; + + auto msg = tao::json::to_string(tao::json::value::array({ "REQ", "X", filterToSend })); + ws->send(msg.data(), msg.size(), uWS::OpCode::TEXT, nullptr, nullptr, true); + } + } + + void connClose(const std::string &url, uWS::WebSocket *ws) { + if (!conns.contains(url)) return; + auto &c = conns.at(url); + + if (c.ws == ws) { + c.ws = nullptr; + c.started = 0; + } + } + + void incomingEvent(const std::string &url, tao::json::value &evJson) { + if (dir == "up") return; + + std::string okMsg; + + auto res = pluginDown.acceptEvent(pluginDownCmd, evJson, hoytech::curr_time_s(), EventSourceType::Stream, url, okMsg); + if (res == PluginEventSifterResult::Accept) { + router->writer.write({ std::move(evJson), EventSourceType::Stream, url }); + } else { + LI << groupName << " / " << url << " : pluginDown blocked event " << evJson.at("id").get_string() << ": " << okMsg; + } + } + + void outgoingEvent(lmdb::txn &txn, const defaultDb::environment::View_Event &ev, std::string &responseStr, tao::json::value &evJson) { + if (dir == "down") return; + if (!filterCompiled.doesMatch(ev.flat_nested())) return; + + if (responseStr.size() == 0) { + auto evStr = getEventJson(txn, router->decomp, ev.primaryKeyId); + evJson = tao::json::from_string(evStr); + + responseStr = std::string("[\"EVENT\","); + responseStr += evStr; + responseStr += "]"; + } + + std::string okMsg; + + auto res = pluginUp.acceptEvent(pluginUpCmd, evJson, hoytech::curr_time_s(), EventSourceType::Stream, "", okMsg); + if (res == PluginEventSifterResult::Accept) { + for (auto &[url, c] : conns) { + if (c.ws) c.ws->send(responseStr.data(), responseStr.size(), uWS::OpCode::TEXT, nullptr, nullptr, true); + } + } else { + LI << groupName << " : pluginUp blocked event " << evJson.at("id").get_string() << ": " << okMsg; + } + } + }; + + std::string routerConfigFile; + uint64_t connectionTimeoutUs = 5'000'000; + + WriterPipeline writer; + Decompressor decomp; + hoytech::protected_queue inbox; + uWS::Hub hub; + uWS::Group *hubGroup = nullptr; + uS::Async *hubTrigger = nullptr; + + std::map streamGroups; // group name -> StreamGroup + uint64_t currEventId = 0; + bool firstConfigLoadSuccess = false; + + + Router(std::string routerConfigFile) : routerConfigFile(routerConfigFile) { + { + auto txn = env.txn_ro(); + currEventId = getMostRecentLevId(txn); + } + + hubGroup = hub.createGroup(uWS::PERMESSAGE_DEFLATE | uWS::SLIDING_DEFLATE_WINDOW); + + hubGroup->onConnection([&](uWS::WebSocket *ws, uWS::HttpRequest req) { + auto *desig = (ConnDesignator*) ws->getUserData(); + LI << desig->groupName << ": Connected to " << desig->url; + + if (!streamGroups.contains(desig->groupName)) { + // Connection to streamGroup that no longer exists + ws->close(); + return; + } + + streamGroups.at(desig->groupName).connOpen(desig->url, ws); + }); + + hubGroup->onDisconnection([&](uWS::WebSocket *ws, int code, char *message, size_t length) { + auto *desig = (ConnDesignator*) ws->getUserData(); + LI << desig->groupName << ": Disconnected from " << desig->url; + + if (streamGroups.contains(desig->groupName)) { + streamGroups.at(desig->groupName).connClose(desig->url, ws); + } + + delete desig; + }); + + hubGroup->onError([&](void *userData) { + auto *desig = (ConnDesignator*) userData; + LI << desig->groupName << ": Error connecting to " << desig->url; + + delete desig; + }); + + hubGroup->onMessage2([&](uWS::WebSocket *ws, char *message, size_t length, uWS::OpCode, size_t) { + auto *desig = (ConnDesignator*) ws->getUserData(); + + if (!streamGroups.contains(desig->groupName)) { + ws->close(); + return; + } + + try { + handleIncomingMessage(ws, desig, std::string_view(message, length)); + } catch (std::exception &e) { + LW << "Failed to handle incoming message config: " << e.what(); + } + }); + + reconcileConfig(); + } + + + void reconcileConfig() { + LI << "Loading router config file: " << routerConfigFile; + + try { + auto routerConfig = loadRawTaoConfig(routerConfigFile); + + for (const auto &[groupName, spec] : routerConfig.at("streams").get_object()) { + if (!streamGroups.contains(groupName)) { + LI << "New stream group [" << groupName << "]"; + streamGroups.try_emplace(groupName, groupName, this); + } + + streamGroups.at(groupName).configure(spec); + } + + // remove streamGroups if they were deleted from config + + { + std::set unneededGroups; + for (auto &[groupName, streamGroup] : streamGroups) unneededGroups.insert(groupName); + for (const auto &[groupName, spec] : routerConfig.at("streams").get_object()) unneededGroups.erase(groupName); + for (const auto &groupName : unneededGroups) streamGroups.erase(groupName); + } + } catch (std::exception &e) { + LE << "Failed to parse router config: " << e.what(); + if (!firstConfigLoadSuccess) ::exit(1); + return; + } + + firstConfigLoadSuccess = true; + } + + void onTrigger() { + auto newMsgs = inbox.pop_all_no_wait(); + + for (auto &newMsg : newMsgs) { + if (std::get_if(&newMsg.msg)) { + reconcileConfig(); + } else if (std::get_if(&newMsg.msg)) { + handleDBChange(); + } else if (std::get_if(&newMsg.msg)) { + for (auto &[groupName, streamGroup] : streamGroups) { + streamGroup.tryConnects(); + } + } + } + } + + void handleIncomingMessage(uWS::WebSocket *ws, ConnDesignator *desig, std::string_view msg) { + auto origJson = tao::json::from_string(msg); + + if (!origJson.is_array()) throw herr("not an array"); + if (origJson.get_array().size() < 2) throw herr("array too short"); + + auto &msgType = origJson.get_array().at(0); + + if (msgType == "EOSE") { + } else if (msgType == "NOTICE") { + LW << desig->groupName << " / " << desig->url << " NOTICE: " << tao::json::to_string(origJson); + } else if (msgType == "OK") { + if (!origJson.get_array().at(2).get_boolean()) { + LW << desig->groupName << " / " << desig->url << " Event not written: " << origJson; + } + } else if (msgType == "EVENT") { + if (origJson.get_array().size() < 3) throw herr("array too short"); + auto &evJson = origJson.at(2); + streamGroups.at(desig->groupName).incomingEvent(desig->url, evJson); + } else { + LW << "Unexpected message: " << origJson; + } + } + + void handleDBChange() { + auto txn = env.txn_ro(); + + env.foreach_Event(txn, [&](auto &ev){ + currEventId = ev.primaryKeyId; + + std::string responseStr; + tao::json::value json = tao::json::null; + + for (auto &[groupName, streamGroup] : streamGroups) { + streamGroup.outgoingEvent(txn, ev, responseStr, json); + } + + return true; + }, false, currEventId + 1); + } + + void run() { + // Trigger + + hubTrigger = new uS::Async(hub.getLoop()); + + std::function asyncCb = [&]{ + onTrigger(); + }; + + hubTrigger->setData(&asyncCb); + + hubTrigger->start([](uS::Async *a){ + auto *r = static_cast *>(a->data); + (*r)(); + }); + + // Config file change monitor + + hoytech::file_change_monitor configFileWatcher(routerConfigFile); + + configFileWatcher.run([&](){ + inbox.push_move(RouterEvent{RouterEvent::ConfigFileChange{}}); + hubTrigger->send(); + }); + + + // DB change monitor + + hoytech::file_change_monitor dbChangeWatcher(dbDir + "/data.mdb"); + + dbChangeWatcher.setDebounce(100); + + dbChangeWatcher.run([&](){ + inbox.push_move(RouterEvent{RouterEvent::DBChange{}}); + hubTrigger->send(); + }); + + + // Reconnection timer + + hoytech::timer cron; + + cron.setupCb = []{ setThreadName("cron"); }; + + cron.repeat(connectionTimeoutUs, [&]{ + inbox.push_move(RouterEvent{RouterEvent::ReconnectCron{}}); + hubTrigger->send(); + }); + + cron.run(); + + + // Websocket + + hub.run(); + } +}; + + +void cmd_router(const std::vector &subArgs) { + std::map args = docopt::docopt(USAGE, subArgs, true, ""); + + std::string routerConfigFile = args[""].asString(); + + Router router(routerConfigFile); + + router.run(); +}