diff --git a/src/PluginWritePolicy.h b/src/PluginWritePolicy.h index 5230927..452c4a1 100644 --- a/src/PluginWritePolicy.h +++ b/src/PluginWritePolicy.h @@ -53,7 +53,7 @@ struct PluginWritePolicy { std::unique_ptr running; - WritePolicyResult acceptEvent(std::string_view jsonStr, uint64_t receivedAt, EventSourceType sourceType, std::string_view sourceInfo, std::string &okMsg) { + WritePolicyResult acceptEvent(const tao::json::value &evJson, uint64_t receivedAt, EventSourceType sourceType, std::string_view sourceInfo, std::string &okMsg) { const auto &pluginPath = cfg().relay__writePolicy__plugin; if (pluginPath.size() == 0) { @@ -81,7 +81,7 @@ struct PluginWritePolicy { auto request = tao::json::value({ { "type", "new" }, - { "event", tao::json::from_string(jsonStr) }, + { "event", evJson }, { "receivedAt", receivedAt / 1000000 }, { "sourceType", eventSourceTypeToStr(sourceType) }, { "sourceInfo", sourceType == EventSourceType::IP4 || sourceType == EventSourceType::IP6 ? renderIP(sourceInfo) : sourceInfo }, diff --git a/src/RelayWriter.cpp b/src/RelayWriter.cpp index 217148d..c1e2639 100644 --- a/src/RelayWriter.cpp +++ b/src/RelayWriter.cpp @@ -17,9 +17,10 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { for (auto &newMsg : newMsgs) { if (auto msg = std::get_if(&newMsg.msg)) { + tao::json::value evJson = tao::json::from_string(msg->jsonStr); EventSourceType sourceType = msg->ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6; std::string okMsg; - auto res = writePolicy.acceptEvent(msg->jsonStr, msg->receivedAt, sourceType, msg->ipAddr, okMsg); + auto res = writePolicy.acceptEvent(evJson, msg->receivedAt, sourceType, msg->ipAddr, okMsg); if (res == WritePolicyResult::Accept) { newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg); diff --git a/src/WSConnection.h b/src/WSConnection.h index 7a2e7a2..9ecfdfb 100644 --- a/src/WSConnection.h +++ b/src/WSConnection.h @@ -25,8 +25,7 @@ class WSConnection { std::function onTrigger; bool reconnect = true; uint64_t reconnectDelayMilliseconds = 5'000; - // bortloff@github needed an address to work with here, for his stream writepolicy hack - std::string connected_addr; + std::string remoteAddr; // Should only be called from the websocket thread (ie within an onConnect or onMessage callback) void send(std::string_view msg, uWS::OpCode op = uWS::OpCode::TEXT, size_t *compressedSize = nullptr) { @@ -59,9 +58,8 @@ class WSConnection { currWs = nullptr; } - std::string addr = ws->getAddress().address; - LI << "Connected to " << addr; - connected_addr = addr; + remoteAddr = ws->getAddress().address; + LI << "Connected to " << remoteAddr; { int optval = 1; diff --git a/src/cmd_stream.cpp b/src/cmd_stream.cpp index 8f7a5cb..28b678d 100644 --- a/src/cmd_stream.cpp +++ b/src/cmd_stream.cpp @@ -68,14 +68,13 @@ void cmd_stream(const std::vector &subArgs) { if (origJson.get_array().size() < 3) throw herr("array too short"); auto &evJson = origJson.at(2); - // bortloff@github hacks in writePolicy here std::string okMsg; - auto res = writePolicy.acceptEvent(tao::json::to_string(evJson), hoytech::curr_time_s(), EventSourceType::Stream, ws.connected_addr, okMsg); + auto res = writePolicy.acceptEvent(evJson, hoytech::curr_time_s(), EventSourceType::Stream, ws.remoteAddr, okMsg); if (res == WritePolicyResult::Accept) { downloadedIds.emplace(from_hex(evJson.at("id").get_string())); writer.inbox.push_move({ std::move(evJson), EventSourceType::Stream, url }); } else { - LI << "[" << ws.connected_addr << "] write policy blocked event " << evJson.at("id").get_string() << ": " << okMsg; + LI << "[" << ws.remoteAddr << "] write policy blocked event " << evJson.at("id").get_string() << ": " << okMsg; } } else {