stream refactor

This commit is contained in:
Doug Hoyte
2023-08-16 15:49:27 -04:00
parent 6216920153
commit d66de14667

View File

@ -23,6 +23,85 @@ R"(
struct EventStreamer {
std::string url;
WSConnection ws;
std::string dir;
tao::json::value filter;
std::function<void(tao::json::value &&, const WSConnection &ws)> onEvent;
private:
hoytech::protected_queue<std::shared_ptr<std::string>> inbox;
public:
EventStreamer(const std::string &url, const std::string &dir, tao::json::value filter = tao::json::empty_object) : url(url), ws(url), dir(dir), filter(filter) {
filter["limit"] = 0;
}
void sendEvent(std::shared_ptr<std::string> msg) {
inbox.push_move(std::move(msg));
}
void trigger() {
ws.trigger();
}
void run() {
ws.onConnect = [&]{
if (dir == "down" || dir == "both") {
auto encoded = tao::json::to_string(tao::json::value::array({ "REQ", "X", filter }));
ws.send(encoded);
}
};
ws.onMessage = [&](auto msg, uWS::OpCode, size_t){
auto origJson = tao::json::from_string(msg);
if (origJson.is_array()) {
if (origJson.get_array().size() < 2) throw herr("array too short");
auto &msgType = origJson.get_array().at(0);
if (msgType == "EOSE") {
return;
} else if (msgType == "NOTICE") {
LW << "NOTICE message from " << url << " : " << tao::json::to_string(origJson);
return;
} else if (msgType == "OK") {
if (!origJson.get_array().at(2).get_boolean()) {
LW << "Event not written by " << url << " : " << origJson;
}
} else if (msgType == "EVENT") {
if (dir == "down" || dir == "both") {
if (origJson.get_array().size() < 3) throw herr("array too short");
auto &evJson = origJson.at(2);
// FIXME: validate that the event actually matches provided filter?
if (onEvent) onEvent(std::move(evJson), ws);
} else {
LW << "Unexpected EVENT from " << url;
}
} else {
throw herr("unexpected first element");
}
} else {
throw herr("unexpected message");
}
};
ws.onTrigger = [&]{
auto msgs = inbox.pop_all();
for (auto &msg : msgs) ws.send(*msg);
};
ws.run();
}
};
void cmd_stream(const std::vector<std::string> &subArgs) {
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");
@ -32,57 +111,19 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
if (dir != "up" && dir != "down" && dir != "both") throw herr("invalid direction: ", dir, ". Should be one of up/down/both");
flat_hash_set<std::string> downloadedIds;
EventStreamer streamer(url, dir);
WriterPipeline writer;
WSConnection ws(url);
Decompressor decomp;
PluginWritePolicy writePolicy;
ws.onConnect = [&]{
if (dir == "down" || dir == "both") {
auto encoded = tao::json::to_string(tao::json::value::array({ "REQ", "sub", tao::json::value({ { "limit", 0 } }) }));
ws.send(encoded);
}
};
ws.onMessage = [&](auto msg, uWS::OpCode, size_t){
auto origJson = tao::json::from_string(msg);
if (origJson.is_array()) {
if (origJson.get_array().size() < 2) throw herr("array too short");
auto &msgType = origJson.get_array().at(0);
if (msgType == "EOSE") {
return;
} else if (msgType == "NOTICE") {
LW << "NOTICE message: " << tao::json::to_string(origJson);
return;
} else if (msgType == "OK") {
if (!origJson.get_array().at(2).get_boolean()) {
LW << "Event not written: " << origJson;
}
} else if (msgType == "EVENT") {
if (dir == "down" || dir == "both") {
if (origJson.get_array().size() < 3) throw herr("array too short");
auto &evJson = origJson.at(2);
std::string okMsg;
auto res = writePolicy.acceptEvent(evJson, hoytech::curr_time_s(), EventSourceType::Stream, ws.remoteAddr, okMsg);
if (res == WritePolicyResult::Accept) {
downloadedIds.emplace(from_hex(evJson.at("id").get_string()));
writer.write({ std::move(evJson), EventSourceType::Stream, url });
} else {
LI << "[" << ws.remoteAddr << "] write policy blocked event " << evJson.at("id").get_string() << ": " << okMsg;
}
} else {
LW << "Unexpected EVENT";
}
} else {
throw herr("unexpected first element");
}
streamer.onEvent = [&](tao::json::value &&evJson, const WSConnection &ws) {
std::string okMsg;
auto res = writePolicy.acceptEvent(evJson, hoytech::curr_time_s(), EventSourceType::Stream, ws.remoteAddr, okMsg);
if (res == WritePolicyResult::Accept) {
downloadedIds.emplace(from_hex(evJson.at("id").get_string()));
writer.write({ std::move(evJson), EventSourceType::Stream, url });
} else {
throw herr("unexpected message");
LI << "[" << ws.remoteAddr << "] write policy blocked event from " << url << " : " << evJson.at("id").get_string() << " -> " << okMsg;
}
};
@ -94,30 +135,6 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
currEventId = getMostRecentLevId(txn);
}
ws.onTrigger = [&]{
if (dir == "down") return;
auto txn = env.txn_ro();
env.foreach_Event(txn, [&](auto &ev){
currEventId = ev.primaryKeyId;
auto id = std::string(sv(ev.flat_nested()->id()));
if (downloadedIds.find(id) != downloadedIds.end()) {
downloadedIds.erase(id);
return true;
}
std::string msg = std::string("[\"EVENT\",");
msg += getEventJson(txn, decomp, ev.primaryKeyId);
msg += "]";
ws.send(msg);
return true;
}, false, currEventId + 1);
};
std::unique_ptr<hoytech::file_change_monitor> dbChangeWatcher;
if (dir == "up" || dir == "both") {
@ -126,10 +143,32 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
dbChangeWatcher->setDebounce(100);
dbChangeWatcher->run([&](){
ws.trigger();
auto txn = env.txn_ro();
env.foreach_Event(txn, [&](auto &ev){
currEventId = ev.primaryKeyId;
auto id = std::string(sv(ev.flat_nested()->id()));
if (downloadedIds.find(id) != downloadedIds.end()) {
downloadedIds.erase(id);
return true;
}
std::string msg = std::string("[\"EVENT\",");
msg += getEventJson(txn, decomp, ev.primaryKeyId);
msg += "]";
auto msgPtr = std::make_shared<std::string>(std::move(msg));
streamer.sendEvent(msgPtr);
return true;
}, false, currEventId + 1);
streamer.trigger();
});
}
ws.run();
streamer.run();
}