diff --git a/src/apps/mesh/cmd_router.cpp b/src/apps/mesh/cmd_router.cpp index 6cfa225..5f4f119 100644 --- a/src/apps/mesh/cmd_router.cpp +++ b/src/apps/mesh/cmd_router.cpp @@ -38,7 +38,14 @@ struct IncomingEvent : NonCopyable { struct Shutdown { }; - using Var = std::variant; + // Only for parameters that can be changed without rebuilding group + struct ConfigUpdate { + std::string pluginDownCmd; + std::string pluginUpCmd; + tao::config::value urls; + }; + + using Var = std::variant; Var msg; IncomingEvent(Var &&msg_) : msg(std::move(msg_)) {} }; @@ -73,19 +80,20 @@ struct StreamGroup : NonCopyable { std::string pluginUpCmd; std::map streams; // url -> StreamerInstance - std::thread t; - hoytech::protected_queue inbox; - + tao::json::value filter; NostrFilterGroup filterCompiled; PluginEventSifter pluginDown; PluginEventSifter pluginUp; + std::thread t; + hoytech::protected_queue inbox; + StreamGroup(std::string groupName, const tao::config::value &spec) : groupName(groupName) { if (!spec.find("dir")) throw herr("no dir field"); dir = spec.at("dir").get_string(); - tao::json::value filter = tao::json::empty_object; + filter = tao::json::empty_object; // FIXME: Must be better way to go from config object to json, instead of round-trip through string if (spec.find("filter")) filter = tao::json::from_string(tao::json::to_string(spec.at("filter"))); @@ -102,14 +110,17 @@ struct StreamGroup : NonCopyable { streams.try_emplace(url.get_string(), inbox, url.get_string(), dir, filter); } - t = std::thread([this]{ while (1) { auto newMsgs = inbox.pop_all(); for (auto &m : newMsgs) { if (std::get_if(&m.msg)) return; - handleIncomingEvent(m); + try { + handleIncomingEvent(m); + } catch (std::exception &e) { + LE << "Got exception processing message for streamGroup: " << e.what(); + } } } }); @@ -151,6 +162,28 @@ struct StreamGroup : NonCopyable { } else { LI << "[" << groupName << "] pluginUp blocked event " << ev->evJson->at("id").get_string() << ": " << okMsg; } + } else if (auto c = std::get_if(&m.msg)) { + pluginDownCmd = c->pluginDownCmd; + pluginUpCmd = c->pluginUpCmd; + + std::set newUrls; + for (auto &url : c->urls.get_array()) newUrls.insert(url.get_string()); + + for (auto &url : newUrls) { + if (!streams.contains(url)) { + streams.try_emplace(url, inbox, url, dir, filter); + } + } + + std::vector toErase; + + for (auto &[url, v] : streams) { + if (!newUrls.contains(url)) toErase.push_back(url); + } + + for (auto &url : toErase) { + streams.erase(url); + } } } }; @@ -182,10 +215,30 @@ void cmd_router(const std::vector &subArgs) { std::lock_guard guard(groupsMutex); - for (const auto &[k, v] : routerConfig.at("streams").get_object()) { - if (!streamGroups.contains(k)) { - LI << "New stream group [" << k << "]"; - streamGroups.try_emplace(k, k, v); + for (const auto &[groupName, spec] : routerConfig.at("streams").get_object()) { + if (streamGroups.contains(groupName)) { + auto &oldGroup = streamGroups.at(groupName); + + if (spec.at("dir").get_string() != oldGroup.dir || tao::json::to_string(spec.at("filter")) != oldGroup.filterStr) { + // Need to restart group + + streamGroups.erase(groupName); + } else { + // No restart of group required + + oldGroup.inbox.push_move(IncomingEvent{IncomingEvent::ConfigUpdate{ + spec.get_object().contains("pluginDown") ? spec.at("pluginDown").get_string() : "", + spec.get_object().contains("pluginUp") ? spec.at("pluginUp").get_string() : "", + spec.at("urls") + }}); + } + } + } + + for (const auto &[groupName, spec] : routerConfig.at("streams").get_object()) { + if (!streamGroups.contains(groupName)) { + LI << "New stream group [" << groupName << "]"; + streamGroups.try_emplace(groupName, groupName, spec); } } } catch (std::exception &e) {