From 2308276af54ba3f44684ca34189bc4f460a81bc9 Mon Sep 17 00:00:00 2001 From: Bonn Ortloff Date: Mon, 27 Mar 2023 18:24:41 -0500 Subject: [PATCH] patching writePolicy into cmd_stream, assuming we want writePolicy to govern both relayIngest and stream I really hate spam, is what I'm trying to say. --- .gitignore | 1 + TODO | 2 +- src/WSConnection.h | 3 +++ src/cmd_stream.cpp | 19 ++++++++++++++++--- 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 920b466..99f7481 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ *.o /strfry /strfry-db/*.mdb +.vscode/c_cpp_properties.json diff --git a/TODO b/TODO index 88c68b4..b5fb7eb 100644 --- a/TODO +++ b/TODO @@ -6,7 +6,7 @@ 0.2 release ? why isn't the LMDB mapping CLOEXEC - plugin for stream + ? plugin for stream: make sure bortloff@github didn't make a mess of it fix sync * logging of bytes up/down * up/both directions diff --git a/src/WSConnection.h b/src/WSConnection.h index 73ecb5e..7a2e7a2 100644 --- a/src/WSConnection.h +++ b/src/WSConnection.h @@ -25,6 +25,8 @@ class WSConnection { std::function onTrigger; bool reconnect = true; uint64_t reconnectDelayMilliseconds = 5'000; + // bortloff@github needed an address to work with here, for his stream writepolicy hack + std::string connected_addr; // Should only be called from the websocket thread (ie within an onConnect or onMessage callback) void send(std::string_view msg, uWS::OpCode op = uWS::OpCode::TEXT, size_t *compressedSize = nullptr) { @@ -59,6 +61,7 @@ class WSConnection { std::string addr = ws->getAddress().address; LI << "Connected to " << addr; + connected_addr = addr; { int optval = 1; diff --git a/src/cmd_stream.cpp b/src/cmd_stream.cpp index 96799d7..ee47853 100644 --- a/src/cmd_stream.cpp +++ b/src/cmd_stream.cpp @@ -10,6 +10,8 @@ #include "WSConnection.h" #include "events.h" +#include "PluginWritePolicy.h" + static const char USAGE[] = R"( @@ -30,12 +32,14 @@ void cmd_stream(const std::vector &subArgs) { if (dir != "up" && dir != "down" && dir != "both") throw herr("invalid direction: ", dir, ". Should be one of up/down/both"); - flat_hash_set downloadedIds; WriterPipeline writer; WSConnection ws(url); Decompressor decomp; + PluginWritePolicy writePolicy; + + ws.onConnect = [&]{ if (dir == "down" || dir == "both") { auto encoded = tao::json::to_string(tao::json::value::array({ "REQ", "sub", tao::json::value({ { "limit", 0 } }) })); @@ -63,8 +67,17 @@ void cmd_stream(const std::vector &subArgs) { if (dir == "down" || dir == "both") { if (origJson.get_array().size() < 3) throw herr("array too short"); auto &evJson = origJson.at(2); - downloadedIds.emplace(from_hex(evJson.at("id").get_string())); - writer.inbox.push_move({ std::move(evJson), EventSourceType::Stream, url }); + + // bortloff@github hacks in writePolicy here + std::string okMsg; + auto res = writePolicy.acceptEvent(evJson.get_string(), hoytech::curr_time_s(), EventSourceType::Stream, ws.connected_addr, okMsg); + if (res == WritePolicyResult::Accept) { + downloadedIds.emplace(from_hex(evJson.at("id").get_string())); + writer.inbox.push_move({ std::move(evJson), EventSourceType::Stream, url }); + } else { + LW << "[" << ws.connected_addr << "] write policy blocked event" << from_hex(evJson.at("id").get_string()) << ": " << okMsg; + } + } else { LW << "Unexpected EVENT"; }