minor clean-ups

This commit is contained in:
Doug Hoyte
2023-04-29 14:38:24 -04:00
parent 0aae8323b6
commit 70c0edc350
4 changed files with 9 additions and 11 deletions

View File

@ -53,7 +53,7 @@ struct PluginWritePolicy {
std::unique_ptr<RunningPlugin> running;
WritePolicyResult acceptEvent(std::string_view jsonStr, uint64_t receivedAt, EventSourceType sourceType, std::string_view sourceInfo, std::string &okMsg) {
WritePolicyResult acceptEvent(const tao::json::value &evJson, uint64_t receivedAt, EventSourceType sourceType, std::string_view sourceInfo, std::string &okMsg) {
const auto &pluginPath = cfg().relay__writePolicy__plugin;
if (pluginPath.size() == 0) {
@ -81,7 +81,7 @@ struct PluginWritePolicy {
auto request = tao::json::value({
{ "type", "new" },
{ "event", tao::json::from_string(jsonStr) },
{ "event", evJson },
{ "receivedAt", receivedAt / 1000000 },
{ "sourceType", eventSourceTypeToStr(sourceType) },
{ "sourceInfo", sourceType == EventSourceType::IP4 || sourceType == EventSourceType::IP6 ? renderIP(sourceInfo) : sourceInfo },

View File

@ -17,9 +17,10 @@ void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgWriter::AddEvent>(&newMsg.msg)) {
tao::json::value evJson = tao::json::from_string(msg->jsonStr);
EventSourceType sourceType = msg->ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6;
std::string okMsg;
auto res = writePolicy.acceptEvent(msg->jsonStr, msg->receivedAt, sourceType, msg->ipAddr, okMsg);
auto res = writePolicy.acceptEvent(evJson, msg->receivedAt, sourceType, msg->ipAddr, okMsg);
if (res == WritePolicyResult::Accept) {
newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg);

View File

@ -25,8 +25,7 @@ class WSConnection {
std::function<void()> onTrigger;
bool reconnect = true;
uint64_t reconnectDelayMilliseconds = 5'000;
// bortloff@github needed an address to work with here, for his stream writepolicy hack
std::string connected_addr;
std::string remoteAddr;
// Should only be called from the websocket thread (ie within an onConnect or onMessage callback)
void send(std::string_view msg, uWS::OpCode op = uWS::OpCode::TEXT, size_t *compressedSize = nullptr) {
@ -59,9 +58,8 @@ class WSConnection {
currWs = nullptr;
}
std::string addr = ws->getAddress().address;
LI << "Connected to " << addr;
connected_addr = addr;
remoteAddr = ws->getAddress().address;
LI << "Connected to " << remoteAddr;
{
int optval = 1;

View File

@ -68,14 +68,13 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
if (origJson.get_array().size() < 3) throw herr("array too short");
auto &evJson = origJson.at(2);
// bortloff@github hacks in writePolicy here
std::string okMsg;
auto res = writePolicy.acceptEvent(tao::json::to_string(evJson), hoytech::curr_time_s(), EventSourceType::Stream, ws.connected_addr, okMsg);
auto res = writePolicy.acceptEvent(evJson, hoytech::curr_time_s(), EventSourceType::Stream, ws.remoteAddr, okMsg);
if (res == WritePolicyResult::Accept) {
downloadedIds.emplace(from_hex(evJson.at("id").get_string()));
writer.inbox.push_move({ std::move(evJson), EventSourceType::Stream, url });
} else {
LI << "[" << ws.connected_addr << "] write policy blocked event " << evJson.at("id").get_string() << ": " << okMsg;
LI << "[" << ws.remoteAddr << "] write policy blocked event " << evJson.at("id").get_string() << ": " << okMsg;
}
} else {