From d2889bc4916cc3060908723883b909d4441e61b6 Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Tue, 6 Jun 2023 01:06:43 -0400 Subject: [PATCH] allow writePolicy plugins to filter events downloaded via sync --- src/apps/mesh/cmd_stream.cpp | 2 -- src/apps/mesh/cmd_sync.cpp | 15 ++++++++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/apps/mesh/cmd_stream.cpp b/src/apps/mesh/cmd_stream.cpp index 5d494f2..10ca745 100644 --- a/src/apps/mesh/cmd_stream.cpp +++ b/src/apps/mesh/cmd_stream.cpp @@ -9,7 +9,6 @@ #include "Subscription.h" #include "WSConnection.h" #include "events.h" - #include "PluginWritePolicy.h" @@ -76,7 +75,6 @@ void cmd_stream(const std::vector &subArgs) { } else { LI << "[" << ws.remoteAddr << "] write policy blocked event " << evJson.at("id").get_string() << ": " << okMsg; } - } else { LW << "Unexpected EVENT"; } diff --git a/src/apps/mesh/cmd_sync.cpp b/src/apps/mesh/cmd_sync.cpp index f22c3b7..4061ac5 100644 --- a/src/apps/mesh/cmd_sync.cpp +++ b/src/apps/mesh/cmd_sync.cpp @@ -10,6 +10,7 @@ #include "DBQuery.h" #include "filters.h" #include "events.h" +#include "PluginWritePolicy.h" static const char USAGE[] = @@ -73,6 +74,9 @@ void cmd_sync(const std::vector &subArgs) { WriterPipeline writer; WSConnection ws(url); + PluginWritePolicy writePolicy; + + ws.reconnect = false; ws.onConnect = [&]{ @@ -128,7 +132,16 @@ void cmd_sync(const std::vector &subArgs) { LW << "Unable to upload event " << msg.at(1).get_string() << ": " << msg.at(3).get_string(); } } else if (msg.at(0) == "EVENT") { - writer.write({ std::move(msg.at(2)), EventSourceType::Sync, url }); + if (msg.get_array().size() < 3) throw herr("array too short"); + auto &evJson = msg.at(2); + + std::string okMsg; + auto res = writePolicy.acceptEvent(evJson, hoytech::curr_time_s(), EventSourceType::Sync, ws.remoteAddr, okMsg); + if (res == WritePolicyResult::Accept) { + writer.write({ std::move(evJson), EventSourceType::Sync, url }); + } else { + LI << "[" << ws.remoteAddr << "] write policy blocked event " << evJson.at("id").get_string() << ": " << okMsg; + } } else if (msg.at(0) == "EOSE") { inFlightDown = false; writer.wait();