wip reconfig

This commit is contained in:
Doug Hoyte
2023-08-18 22:40:50 -04:00
parent 9e07e85376
commit ca12bdbd68

View File

@ -38,7 +38,14 @@ struct IncomingEvent : NonCopyable {
struct Shutdown {
};
using Var = std::variant<Down, Up, Shutdown>;
// Only for parameters that can be changed without rebuilding group
struct ConfigUpdate {
std::string pluginDownCmd;
std::string pluginUpCmd;
tao::config::value urls;
};
using Var = std::variant<Down, Up, Shutdown, ConfigUpdate>;
Var msg;
IncomingEvent(Var &&msg_) : msg(std::move(msg_)) {}
};
@ -73,19 +80,20 @@ struct StreamGroup : NonCopyable {
std::string pluginUpCmd;
std::map<std::string, StreamerInstance> streams; // url -> StreamerInstance
std::thread t;
hoytech::protected_queue<IncomingEvent> inbox;
tao::json::value filter;
NostrFilterGroup filterCompiled;
PluginEventSifter pluginDown;
PluginEventSifter pluginUp;
std::thread t;
hoytech::protected_queue<IncomingEvent> inbox;
StreamGroup(std::string groupName, const tao::config::value &spec) : groupName(groupName) {
if (!spec.find("dir")) throw herr("no dir field");
dir = spec.at("dir").get_string();
tao::json::value filter = tao::json::empty_object;
filter = tao::json::empty_object;
// FIXME: Must be better way to go from config object to json, instead of round-trip through string
if (spec.find("filter")) filter = tao::json::from_string(tao::json::to_string(spec.at("filter")));
@ -102,14 +110,17 @@ struct StreamGroup : NonCopyable {
streams.try_emplace(url.get_string(), inbox, url.get_string(), dir, filter);
}
t = std::thread([this]{
while (1) {
auto newMsgs = inbox.pop_all();
for (auto &m : newMsgs) {
if (std::get_if<IncomingEvent::Shutdown>(&m.msg)) return;
handleIncomingEvent(m);
try {
handleIncomingEvent(m);
} catch (std::exception &e) {
LE << "Got exception processing message for streamGroup: " << e.what();
}
}
}
});
@ -151,6 +162,28 @@ struct StreamGroup : NonCopyable {
} else {
LI << "[" << groupName << "] pluginUp blocked event " << ev->evJson->at("id").get_string() << ": " << okMsg;
}
} else if (auto c = std::get_if<IncomingEvent::ConfigUpdate>(&m.msg)) {
pluginDownCmd = c->pluginDownCmd;
pluginUpCmd = c->pluginUpCmd;
std::set<std::string> newUrls;
for (auto &url : c->urls.get_array()) newUrls.insert(url.get_string());
for (auto &url : newUrls) {
if (!streams.contains(url)) {
streams.try_emplace(url, inbox, url, dir, filter);
}
}
std::vector<std::string> toErase;
for (auto &[url, v] : streams) {
if (!newUrls.contains(url)) toErase.push_back(url);
}
for (auto &url : toErase) {
streams.erase(url);
}
}
}
};
@ -182,10 +215,30 @@ void cmd_router(const std::vector<std::string> &subArgs) {
std::lock_guard<std::mutex> guard(groupsMutex);
for (const auto &[k, v] : routerConfig.at("streams").get_object()) {
if (!streamGroups.contains(k)) {
LI << "New stream group [" << k << "]";
streamGroups.try_emplace(k, k, v);
for (const auto &[groupName, spec] : routerConfig.at("streams").get_object()) {
if (streamGroups.contains(groupName)) {
auto &oldGroup = streamGroups.at(groupName);
if (spec.at("dir").get_string() != oldGroup.dir || tao::json::to_string(spec.at("filter")) != oldGroup.filterStr) {
// Need to restart group
streamGroups.erase(groupName);
} else {
// No restart of group required
oldGroup.inbox.push_move(IncomingEvent{IncomingEvent::ConfigUpdate{
spec.get_object().contains("pluginDown") ? spec.at("pluginDown").get_string() : "",
spec.get_object().contains("pluginUp") ? spec.at("pluginUp").get_string() : "",
spec.at("urls")
}});
}
}
}
for (const auto &[groupName, spec] : routerConfig.at("streams").get_object()) {
if (!streamGroups.contains(groupName)) {
LI << "New stream group [" << groupName << "]";
streamGroups.try_emplace(groupName, groupName, spec);
}
}
} catch (std::exception &e) {