diff --git a/src/EventStreamer.h b/src/EventStreamer.h index 44d9afe..6af81ed 100644 --- a/src/EventStreamer.h +++ b/src/EventStreamer.h @@ -13,7 +13,7 @@ struct EventStreamer : NonCopyable { std::string dir; tao::json::value filter; - std::function onEvent; + std::function onIncomingEvent; private: @@ -33,6 +33,10 @@ struct EventStreamer : NonCopyable { ws.trigger(); } + void close() { + ws.close(); + } + void run() { ws.onConnect = [&]{ if (dir == "down" || dir == "both") { @@ -63,7 +67,7 @@ struct EventStreamer : NonCopyable { auto &evJson = origJson.at(2); // FIXME: validate that the event actually matches provided filter? - if (onEvent) onEvent(std::move(evJson), ws); + if (onIncomingEvent) onIncomingEvent(std::move(evJson)); } else { LW << "Unexpected EVENT from " << url; } diff --git a/src/apps/mesh/cmd_router.cpp b/src/apps/mesh/cmd_router.cpp index 192a90b..6cfa225 100644 --- a/src/apps/mesh/cmd_router.cpp +++ b/src/apps/mesh/cmd_router.cpp @@ -24,32 +24,63 @@ R"( static std::unique_ptr globalRouterWriter; -struct StreamGroup : NonCopyable { - std::string dir; - std::string filterStr; - NostrFilterGroup filterCompiled; - - std::optional pluginDown; - std::optional pluginUp; - - struct StreamerInstance : NonCopyable { - EventStreamer es; - std::thread t; - - StreamerInstance(const std::string &url, const std::string &dir, tao::json::value filter) : es(url, dir, filter) { - es.onEvent = [&](tao::json::value &&evJson, const WSConnection &ws) { - globalRouterWriter->write({ std::move(evJson), EventSourceType::Stream, es.url }); - }; - - t = std::thread([this]{ - es.run(); - }); - } +struct IncomingEvent : NonCopyable { + struct Down { + tao::json::value evJson; + std::string url; }; + struct Up { + std::shared_ptr evStr; + std::shared_ptr evJson; + }; + + struct Shutdown { + }; + + using Var = std::variant; + Var msg; + IncomingEvent(Var &&msg_) : msg(std::move(msg_)) {} +}; + +struct StreamerInstance : NonCopyable { + hoytech::protected_queue &inbox; + EventStreamer es; + std::thread t; + + StreamerInstance(hoytech::protected_queue &inbox, const std::string &url, const std::string &dir, tao::json::value filter) : inbox(inbox), es(url, dir, filter) { + es.onIncomingEvent = [&](tao::json::value &&evJson) { + inbox.push_move(IncomingEvent{IncomingEvent::Down{ std::move(evJson), es.url }}); + }; + + t = std::thread([this]{ + es.run(); + }); + } + + ~StreamerInstance() { + es.close(); + t.join(); + } +}; + +struct StreamGroup : NonCopyable { + std::string groupName; + + std::string dir; + std::string filterStr; + std::string pluginDownCmd; + std::string pluginUpCmd; std::map streams; // url -> StreamerInstance - StreamGroup(const tao::config::value &spec) { + std::thread t; + hoytech::protected_queue inbox; + + NostrFilterGroup filterCompiled; + PluginEventSifter pluginDown; + PluginEventSifter pluginUp; + + StreamGroup(std::string groupName, const tao::config::value &spec) : groupName(groupName) { if (!spec.find("dir")) throw herr("no dir field"); dir = spec.at("dir").get_string(); @@ -62,9 +93,64 @@ struct StreamGroup : NonCopyable { filterCompiled = NostrFilterGroup::unwrapped(filter); + if (spec.find("pluginDown")) pluginDownCmd = spec.at("pluginDown").get_string(); + if (spec.find("pluginUp")) pluginUpCmd = spec.at("pluginUp").get_string(); + + if (!spec.find("urls")) throw herr("no urls field"); for (const auto &url : spec.at("urls").get_array()) { - streams.try_emplace(url.get_string(), url.get_string(), dir, filter); + streams.try_emplace(url.get_string(), inbox, url.get_string(), dir, filter); + } + + + t = std::thread([this]{ + while (1) { + auto newMsgs = inbox.pop_all(); + + for (auto &m : newMsgs) { + if (std::get_if(&m.msg)) return; + handleIncomingEvent(m); + } + } + }); + } + + void sendEvent(std::shared_ptr evStr, std::shared_ptr evJson) { + inbox.push_move(IncomingEvent{IncomingEvent::Up{ std::move(evStr), std::move(evJson) }}); + } + + ~StreamGroup() { + inbox.push_move(IncomingEvent{IncomingEvent::Shutdown{}}); + t.join(); + } + + private: + void handleIncomingEvent(IncomingEvent &m) { + if (auto ev = std::get_if(&m.msg)) { + if (dir == "up") return; + + std::string okMsg; + + auto res = pluginDown.acceptEvent(pluginDownCmd, ev->evJson, hoytech::curr_time_s(), EventSourceType::Stream, ev->url, okMsg); + if (res == PluginEventSifterResult::Accept) { + globalRouterWriter->write({ std::move(ev->evJson), EventSourceType::Stream, ev->url }); + } else { + LI << "[" << groupName << "] " << ev->url << ": pluginDown blocked event " << ev->evJson.at("id").get_string() << ": " << okMsg; + } + } else if (auto ev = std::get_if(&m.msg)) { + if (dir == "down") return; + + std::string okMsg; + + auto res = pluginUp.acceptEvent(pluginUpCmd, ev->evJson, hoytech::curr_time_s(), EventSourceType::Stream, "", okMsg); + if (res == PluginEventSifterResult::Accept) { + for (auto &[url, streamer] : streams) { + streamer.es.sendEvent(ev->evStr); + streamer.es.trigger(); + } + } else { + LI << "[" << groupName << "] pluginUp blocked event " << ev->evJson->at("id").get_string() << ": " << okMsg; + } } } }; @@ -80,30 +166,35 @@ void cmd_router(const std::vector &subArgs) { globalRouterWriter = std::make_unique(); Decompressor decomp; - std::mutex groupMutex; + std::mutex groupsMutex; std::map streamGroups; // group name -> StreamGroup // Config + bool configLoadSuccess = false; + auto reconcileConfig = [&]{ LI << "Loading router config file: " << routerConfigFile; try { auto routerConfig = loadRawTaoConfig(routerConfigFile); - std::lock_guard guard(groupMutex); + std::lock_guard guard(groupsMutex); for (const auto &[k, v] : routerConfig.at("streams").get_object()) { if (!streamGroups.contains(k)) { LI << "New stream group [" << k << "]"; - streamGroups.emplace(k, v); + streamGroups.try_emplace(k, k, v); } } } catch (std::exception &e) { LE << "Failed to parse router config: " << e.what(); + if (!configLoadSuccess) ::exit(1); return; } + + configLoadSuccess = true; }; hoytech::file_change_monitor configFileWatcher(routerConfigFile); @@ -127,28 +218,26 @@ void cmd_router(const std::vector &subArgs) { dbChangeWatcher.setDebounce(100); dbChangeWatcher.run([&](){ - std::lock_guard guard(groupMutex); + std::lock_guard guard(groupsMutex); auto txn = env.txn_ro(); env.foreach_Event(txn, [&](auto &ev){ currEventId = ev.primaryKeyId; + auto evStr = getEventJson(txn, decomp, ev.primaryKeyId); + std::string msg = std::string("[\"EVENT\","); - msg += getEventJson(txn, decomp, ev.primaryKeyId); + msg += evStr; msg += "]"; auto msgPtr = std::make_shared(std::move(msg)); + auto jsonPtr = std::make_shared(tao::json::from_string(evStr)); { for (auto &[groupName, streamGroup] : streamGroups) { - if (streamGroup.dir == "down") continue; - if (!streamGroup.filterCompiled.doesMatch(ev.flat_nested())) continue; - - for (auto &[url, streamer] : streamGroup.streams) { - streamer.es.sendEvent(msgPtr); - streamer.es.trigger(); // FIXME: do once at end - } + if (!streamGroup.filterCompiled.doesMatch(ev.flat_nested())) continue; // OK to access streamGroup innards because mutex + streamGroup.sendEvent(msgPtr, jsonPtr); } } diff --git a/src/apps/mesh/cmd_stream.cpp b/src/apps/mesh/cmd_stream.cpp index 9b64812..c7a0e28 100644 --- a/src/apps/mesh/cmd_stream.cpp +++ b/src/apps/mesh/cmd_stream.cpp @@ -35,14 +35,14 @@ void cmd_stream(const std::vector &subArgs) { Decompressor decomp; PluginEventSifter writePolicyPlugin; - streamer.onEvent = [&](tao::json::value &&evJson, const WSConnection &ws) { + streamer.onIncomingEvent = [&](tao::json::value &&evJson) { std::string okMsg; - auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, hoytech::curr_time_s(), EventSourceType::Stream, ws.remoteAddr, okMsg); + auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, hoytech::curr_time_s(), EventSourceType::Stream, url, okMsg); if (res == PluginEventSifterResult::Accept) { downloadedIds.emplace(from_hex(evJson.at("id").get_string())); writer.write({ std::move(evJson), EventSourceType::Stream, url }); } else { - LI << "[" << ws.remoteAddr << "] write policy blocked event from " << url << " : " << evJson.at("id").get_string() << " -> " << okMsg; + LI << "write policy blocked event from " << url << " : " << evJson.at("id").get_string() << " -> " << okMsg; } }; @@ -78,7 +78,6 @@ void cmd_stream(const std::vector &subArgs) { msg += "]"; auto msgPtr = std::make_shared(std::move(msg)); - streamer.sendEvent(msgPtr); return true;