patching writePolicy into cmd_stream, assuming we

want writePolicy to govern both relayIngest and stream

I really hate spam, is what I'm trying to say.
This commit is contained in:
Bonn Ortloff
2023-03-27 18:24:41 -05:00
parent e5ec135e78
commit 2308276af5
4 changed files with 21 additions and 4 deletions

1
.gitignore vendored
View File

@ -3,3 +3,4 @@
*.o
/strfry
/strfry-db/*.mdb
.vscode/c_cpp_properties.json

2
TODO
View File

@ -6,7 +6,7 @@
0.2 release
? why isn't the LMDB mapping CLOEXEC
plugin for stream
? plugin for stream: make sure bortloff@github didn't make a mess of it
fix sync
* logging of bytes up/down
* up/both directions

View File

@ -25,6 +25,8 @@ class WSConnection {
std::function<void()> onTrigger;
bool reconnect = true;
uint64_t reconnectDelayMilliseconds = 5'000;
// bortloff@github needed an address to work with here, for his stream writepolicy hack
std::string connected_addr;
// Should only be called from the websocket thread (ie within an onConnect or onMessage callback)
void send(std::string_view msg, uWS::OpCode op = uWS::OpCode::TEXT, size_t *compressedSize = nullptr) {
@ -59,6 +61,7 @@ class WSConnection {
std::string addr = ws->getAddress().address;
LI << "Connected to " << addr;
connected_addr = addr;
{
int optval = 1;

View File

@ -10,6 +10,8 @@
#include "WSConnection.h"
#include "events.h"
#include "PluginWritePolicy.h"
static const char USAGE[] =
R"(
@ -30,12 +32,14 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
if (dir != "up" && dir != "down" && dir != "both") throw herr("invalid direction: ", dir, ". Should be one of up/down/both");
flat_hash_set<std::string> downloadedIds;
WriterPipeline writer;
WSConnection ws(url);
Decompressor decomp;
PluginWritePolicy writePolicy;
ws.onConnect = [&]{
if (dir == "down" || dir == "both") {
auto encoded = tao::json::to_string(tao::json::value::array({ "REQ", "sub", tao::json::value({ { "limit", 0 } }) }));
@ -63,8 +67,17 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
if (dir == "down" || dir == "both") {
if (origJson.get_array().size() < 3) throw herr("array too short");
auto &evJson = origJson.at(2);
downloadedIds.emplace(from_hex(evJson.at("id").get_string()));
writer.inbox.push_move({ std::move(evJson), EventSourceType::Stream, url });
// bortloff@github hacks in writePolicy here
std::string okMsg;
auto res = writePolicy.acceptEvent(evJson.get_string(), hoytech::curr_time_s(), EventSourceType::Stream, ws.connected_addr, okMsg);
if (res == WritePolicyResult::Accept) {
downloadedIds.emplace(from_hex(evJson.at("id").get_string()));
writer.inbox.push_move({ std::move(evJson), EventSourceType::Stream, url });
} else {
LW << "[" << ws.connected_addr << "] write policy blocked event" << from_hex(evJson.at("id").get_string()) << ": " << okMsg;
}
} else {
LW << "Unexpected EVENT";
}