mirror of
https://github.com/hoytech/strfry.git
synced 2025-06-17 08:48:51 +00:00
Merge pull request #32 from bortloff/sharivegas-cmd_stream_plugin
cmd_stream writePolicy plugin support
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@ -3,3 +3,4 @@
|
|||||||
*.o
|
*.o
|
||||||
/strfry
|
/strfry
|
||||||
/strfry-db/*.mdb
|
/strfry-db/*.mdb
|
||||||
|
.vscode/*
|
||||||
|
2
TODO
2
TODO
@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
0.2 release
|
0.2 release
|
||||||
? why isn't the LMDB mapping CLOEXEC
|
? why isn't the LMDB mapping CLOEXEC
|
||||||
plugin for stream
|
? plugin for stream: make sure bortloff@github didn't make a mess of it
|
||||||
fix sync
|
fix sync
|
||||||
* logging of bytes up/down
|
* logging of bytes up/down
|
||||||
* up/both directions
|
* up/both directions
|
||||||
|
@ -25,6 +25,8 @@ class WSConnection {
|
|||||||
std::function<void()> onTrigger;
|
std::function<void()> onTrigger;
|
||||||
bool reconnect = true;
|
bool reconnect = true;
|
||||||
uint64_t reconnectDelayMilliseconds = 5'000;
|
uint64_t reconnectDelayMilliseconds = 5'000;
|
||||||
|
// bortloff@github needed an address to work with here, for his stream writepolicy hack
|
||||||
|
std::string connected_addr;
|
||||||
|
|
||||||
// Should only be called from the websocket thread (ie within an onConnect or onMessage callback)
|
// Should only be called from the websocket thread (ie within an onConnect or onMessage callback)
|
||||||
void send(std::string_view msg, uWS::OpCode op = uWS::OpCode::TEXT, size_t *compressedSize = nullptr) {
|
void send(std::string_view msg, uWS::OpCode op = uWS::OpCode::TEXT, size_t *compressedSize = nullptr) {
|
||||||
@ -59,6 +61,7 @@ class WSConnection {
|
|||||||
|
|
||||||
std::string addr = ws->getAddress().address;
|
std::string addr = ws->getAddress().address;
|
||||||
LI << "Connected to " << addr;
|
LI << "Connected to " << addr;
|
||||||
|
connected_addr = addr;
|
||||||
|
|
||||||
{
|
{
|
||||||
int optval = 1;
|
int optval = 1;
|
||||||
|
@ -10,6 +10,8 @@
|
|||||||
#include "WSConnection.h"
|
#include "WSConnection.h"
|
||||||
#include "events.h"
|
#include "events.h"
|
||||||
|
|
||||||
|
#include "PluginWritePolicy.h"
|
||||||
|
|
||||||
|
|
||||||
static const char USAGE[] =
|
static const char USAGE[] =
|
||||||
R"(
|
R"(
|
||||||
@ -30,12 +32,14 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
|
|||||||
|
|
||||||
if (dir != "up" && dir != "down" && dir != "both") throw herr("invalid direction: ", dir, ". Should be one of up/down/both");
|
if (dir != "up" && dir != "down" && dir != "both") throw herr("invalid direction: ", dir, ". Should be one of up/down/both");
|
||||||
|
|
||||||
|
|
||||||
flat_hash_set<std::string> downloadedIds;
|
flat_hash_set<std::string> downloadedIds;
|
||||||
WriterPipeline writer;
|
WriterPipeline writer;
|
||||||
WSConnection ws(url);
|
WSConnection ws(url);
|
||||||
Decompressor decomp;
|
Decompressor decomp;
|
||||||
|
|
||||||
|
PluginWritePolicy writePolicy;
|
||||||
|
|
||||||
|
|
||||||
ws.onConnect = [&]{
|
ws.onConnect = [&]{
|
||||||
if (dir == "down" || dir == "both") {
|
if (dir == "down" || dir == "both") {
|
||||||
auto encoded = tao::json::to_string(tao::json::value::array({ "REQ", "sub", tao::json::value({ { "limit", 0 } }) }));
|
auto encoded = tao::json::to_string(tao::json::value::array({ "REQ", "sub", tao::json::value({ { "limit", 0 } }) }));
|
||||||
@ -63,8 +67,17 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
|
|||||||
if (dir == "down" || dir == "both") {
|
if (dir == "down" || dir == "both") {
|
||||||
if (origJson.get_array().size() < 3) throw herr("array too short");
|
if (origJson.get_array().size() < 3) throw herr("array too short");
|
||||||
auto &evJson = origJson.at(2);
|
auto &evJson = origJson.at(2);
|
||||||
|
|
||||||
|
// bortloff@github hacks in writePolicy here
|
||||||
|
std::string okMsg;
|
||||||
|
auto res = writePolicy.acceptEvent(tao::json::to_string(evJson), hoytech::curr_time_s(), EventSourceType::Stream, ws.connected_addr, okMsg);
|
||||||
|
if (res == WritePolicyResult::Accept) {
|
||||||
downloadedIds.emplace(from_hex(evJson.at("id").get_string()));
|
downloadedIds.emplace(from_hex(evJson.at("id").get_string()));
|
||||||
writer.inbox.push_move({ std::move(evJson), EventSourceType::Stream, url });
|
writer.inbox.push_move({ std::move(evJson), EventSourceType::Stream, url });
|
||||||
|
} else {
|
||||||
|
LI << "[" << ws.connected_addr << "] write policy blocked event " << evJson.at("id").get_string() << ": " << okMsg;
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
LW << "Unexpected EVENT";
|
LW << "Unexpected EVENT";
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user