router wip

This commit is contained in:
Doug Hoyte
2023-08-18 01:05:55 -04:00
parent e8538651ac
commit d24d36134a
3 changed files with 133 additions and 41 deletions

View File

@ -13,7 +13,7 @@ struct EventStreamer : NonCopyable {
std::string dir; std::string dir;
tao::json::value filter; tao::json::value filter;
std::function<void(tao::json::value &&, const WSConnection &ws)> onEvent; std::function<void(tao::json::value &&)> onIncomingEvent;
private: private:
@ -33,6 +33,10 @@ struct EventStreamer : NonCopyable {
ws.trigger(); ws.trigger();
} }
void close() {
ws.close();
}
void run() { void run() {
ws.onConnect = [&]{ ws.onConnect = [&]{
if (dir == "down" || dir == "both") { if (dir == "down" || dir == "both") {
@ -63,7 +67,7 @@ struct EventStreamer : NonCopyable {
auto &evJson = origJson.at(2); auto &evJson = origJson.at(2);
// FIXME: validate that the event actually matches provided filter? // FIXME: validate that the event actually matches provided filter?
if (onEvent) onEvent(std::move(evJson), ws); if (onIncomingEvent) onIncomingEvent(std::move(evJson));
} else { } else {
LW << "Unexpected EVENT from " << url; LW << "Unexpected EVENT from " << url;
} }

View File

@ -24,32 +24,63 @@ R"(
static std::unique_ptr<WriterPipeline> globalRouterWriter; static std::unique_ptr<WriterPipeline> globalRouterWriter;
struct StreamGroup : NonCopyable { struct IncomingEvent : NonCopyable {
std::string dir; struct Down {
std::string filterStr; tao::json::value evJson;
NostrFilterGroup filterCompiled; std::string url;
};
std::optional<PluginEventSifter> pluginDown; struct Up {
std::optional<PluginEventSifter> pluginUp; std::shared_ptr<std::string> evStr;
std::shared_ptr<tao::json::value> evJson;
};
struct Shutdown {
};
using Var = std::variant<Down, Up, Shutdown>;
Var msg;
IncomingEvent(Var &&msg_) : msg(std::move(msg_)) {}
};
struct StreamerInstance : NonCopyable { struct StreamerInstance : NonCopyable {
hoytech::protected_queue<IncomingEvent> &inbox;
EventStreamer es; EventStreamer es;
std::thread t; std::thread t;
StreamerInstance(const std::string &url, const std::string &dir, tao::json::value filter) : es(url, dir, filter) { StreamerInstance(hoytech::protected_queue<IncomingEvent> &inbox, const std::string &url, const std::string &dir, tao::json::value filter) : inbox(inbox), es(url, dir, filter) {
es.onEvent = [&](tao::json::value &&evJson, const WSConnection &ws) { es.onIncomingEvent = [&](tao::json::value &&evJson) {
globalRouterWriter->write({ std::move(evJson), EventSourceType::Stream, es.url }); inbox.push_move(IncomingEvent{IncomingEvent::Down{ std::move(evJson), es.url }});
}; };
t = std::thread([this]{ t = std::thread([this]{
es.run(); es.run();
}); });
} }
~StreamerInstance() {
es.close();
t.join();
}
}; };
struct StreamGroup : NonCopyable {
std::string groupName;
std::string dir;
std::string filterStr;
std::string pluginDownCmd;
std::string pluginUpCmd;
std::map<std::string, StreamerInstance> streams; // url -> StreamerInstance std::map<std::string, StreamerInstance> streams; // url -> StreamerInstance
StreamGroup(const tao::config::value &spec) { std::thread t;
hoytech::protected_queue<IncomingEvent> inbox;
NostrFilterGroup filterCompiled;
PluginEventSifter pluginDown;
PluginEventSifter pluginUp;
StreamGroup(std::string groupName, const tao::config::value &spec) : groupName(groupName) {
if (!spec.find("dir")) throw herr("no dir field"); if (!spec.find("dir")) throw herr("no dir field");
dir = spec.at("dir").get_string(); dir = spec.at("dir").get_string();
@ -62,9 +93,64 @@ struct StreamGroup : NonCopyable {
filterCompiled = NostrFilterGroup::unwrapped(filter); filterCompiled = NostrFilterGroup::unwrapped(filter);
if (spec.find("pluginDown")) pluginDownCmd = spec.at("pluginDown").get_string();
if (spec.find("pluginUp")) pluginUpCmd = spec.at("pluginUp").get_string();
if (!spec.find("urls")) throw herr("no urls field"); if (!spec.find("urls")) throw herr("no urls field");
for (const auto &url : spec.at("urls").get_array()) { for (const auto &url : spec.at("urls").get_array()) {
streams.try_emplace(url.get_string(), url.get_string(), dir, filter); streams.try_emplace(url.get_string(), inbox, url.get_string(), dir, filter);
}
t = std::thread([this]{
while (1) {
auto newMsgs = inbox.pop_all();
for (auto &m : newMsgs) {
if (std::get_if<IncomingEvent::Shutdown>(&m.msg)) return;
handleIncomingEvent(m);
}
}
});
}
void sendEvent(std::shared_ptr<std::string> evStr, std::shared_ptr<tao::json::value> evJson) {
inbox.push_move(IncomingEvent{IncomingEvent::Up{ std::move(evStr), std::move(evJson) }});
}
~StreamGroup() {
inbox.push_move(IncomingEvent{IncomingEvent::Shutdown{}});
t.join();
}
private:
void handleIncomingEvent(IncomingEvent &m) {
if (auto ev = std::get_if<IncomingEvent::Down>(&m.msg)) {
if (dir == "up") return;
std::string okMsg;
auto res = pluginDown.acceptEvent(pluginDownCmd, ev->evJson, hoytech::curr_time_s(), EventSourceType::Stream, ev->url, okMsg);
if (res == PluginEventSifterResult::Accept) {
globalRouterWriter->write({ std::move(ev->evJson), EventSourceType::Stream, ev->url });
} else {
LI << "[" << groupName << "] " << ev->url << ": pluginDown blocked event " << ev->evJson.at("id").get_string() << ": " << okMsg;
}
} else if (auto ev = std::get_if<IncomingEvent::Up>(&m.msg)) {
if (dir == "down") return;
std::string okMsg;
auto res = pluginUp.acceptEvent(pluginUpCmd, ev->evJson, hoytech::curr_time_s(), EventSourceType::Stream, "", okMsg);
if (res == PluginEventSifterResult::Accept) {
for (auto &[url, streamer] : streams) {
streamer.es.sendEvent(ev->evStr);
streamer.es.trigger();
}
} else {
LI << "[" << groupName << "] pluginUp blocked event " << ev->evJson->at("id").get_string() << ": " << okMsg;
}
} }
} }
}; };
@ -80,30 +166,35 @@ void cmd_router(const std::vector<std::string> &subArgs) {
globalRouterWriter = std::make_unique<WriterPipeline>(); globalRouterWriter = std::make_unique<WriterPipeline>();
Decompressor decomp; Decompressor decomp;
std::mutex groupMutex; std::mutex groupsMutex;
std::map<std::string, StreamGroup> streamGroups; // group name -> StreamGroup std::map<std::string, StreamGroup> streamGroups; // group name -> StreamGroup
// Config // Config
bool configLoadSuccess = false;
auto reconcileConfig = [&]{ auto reconcileConfig = [&]{
LI << "Loading router config file: " << routerConfigFile; LI << "Loading router config file: " << routerConfigFile;
try { try {
auto routerConfig = loadRawTaoConfig(routerConfigFile); auto routerConfig = loadRawTaoConfig(routerConfigFile);
std::lock_guard<std::mutex> guard(groupMutex); std::lock_guard<std::mutex> guard(groupsMutex);
for (const auto &[k, v] : routerConfig.at("streams").get_object()) { for (const auto &[k, v] : routerConfig.at("streams").get_object()) {
if (!streamGroups.contains(k)) { if (!streamGroups.contains(k)) {
LI << "New stream group [" << k << "]"; LI << "New stream group [" << k << "]";
streamGroups.emplace(k, v); streamGroups.try_emplace(k, k, v);
} }
} }
} catch (std::exception &e) { } catch (std::exception &e) {
LE << "Failed to parse router config: " << e.what(); LE << "Failed to parse router config: " << e.what();
if (!configLoadSuccess) ::exit(1);
return; return;
} }
configLoadSuccess = true;
}; };
hoytech::file_change_monitor configFileWatcher(routerConfigFile); hoytech::file_change_monitor configFileWatcher(routerConfigFile);
@ -127,28 +218,26 @@ void cmd_router(const std::vector<std::string> &subArgs) {
dbChangeWatcher.setDebounce(100); dbChangeWatcher.setDebounce(100);
dbChangeWatcher.run([&](){ dbChangeWatcher.run([&](){
std::lock_guard<std::mutex> guard(groupMutex); std::lock_guard<std::mutex> guard(groupsMutex);
auto txn = env.txn_ro(); auto txn = env.txn_ro();
env.foreach_Event(txn, [&](auto &ev){ env.foreach_Event(txn, [&](auto &ev){
currEventId = ev.primaryKeyId; currEventId = ev.primaryKeyId;
auto evStr = getEventJson(txn, decomp, ev.primaryKeyId);
std::string msg = std::string("[\"EVENT\","); std::string msg = std::string("[\"EVENT\",");
msg += getEventJson(txn, decomp, ev.primaryKeyId); msg += evStr;
msg += "]"; msg += "]";
auto msgPtr = std::make_shared<std::string>(std::move(msg)); auto msgPtr = std::make_shared<std::string>(std::move(msg));
auto jsonPtr = std::make_shared<tao::json::value>(tao::json::from_string(evStr));
{ {
for (auto &[groupName, streamGroup] : streamGroups) { for (auto &[groupName, streamGroup] : streamGroups) {
if (streamGroup.dir == "down") continue; if (!streamGroup.filterCompiled.doesMatch(ev.flat_nested())) continue; // OK to access streamGroup innards because mutex
if (!streamGroup.filterCompiled.doesMatch(ev.flat_nested())) continue; streamGroup.sendEvent(msgPtr, jsonPtr);
for (auto &[url, streamer] : streamGroup.streams) {
streamer.es.sendEvent(msgPtr);
streamer.es.trigger(); // FIXME: do once at end
}
} }
} }

View File

@ -35,14 +35,14 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
Decompressor decomp; Decompressor decomp;
PluginEventSifter writePolicyPlugin; PluginEventSifter writePolicyPlugin;
streamer.onEvent = [&](tao::json::value &&evJson, const WSConnection &ws) { streamer.onIncomingEvent = [&](tao::json::value &&evJson) {
std::string okMsg; std::string okMsg;
auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, hoytech::curr_time_s(), EventSourceType::Stream, ws.remoteAddr, okMsg); auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, hoytech::curr_time_s(), EventSourceType::Stream, url, okMsg);
if (res == PluginEventSifterResult::Accept) { if (res == PluginEventSifterResult::Accept) {
downloadedIds.emplace(from_hex(evJson.at("id").get_string())); downloadedIds.emplace(from_hex(evJson.at("id").get_string()));
writer.write({ std::move(evJson), EventSourceType::Stream, url }); writer.write({ std::move(evJson), EventSourceType::Stream, url });
} else { } else {
LI << "[" << ws.remoteAddr << "] write policy blocked event from " << url << " : " << evJson.at("id").get_string() << " -> " << okMsg; LI << "write policy blocked event from " << url << " : " << evJson.at("id").get_string() << " -> " << okMsg;
} }
}; };
@ -78,7 +78,6 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
msg += "]"; msg += "]";
auto msgPtr = std::make_shared<std::string>(std::move(msg)); auto msgPtr = std::make_shared<std::string>(std::move(msg));
streamer.sendEvent(msgPtr); streamer.sendEvent(msgPtr);
return true; return true;