mirror of
https://github.com/hoytech/strfry.git
synced 2025-06-19 17:37:43 +00:00
make sync command use negentropy
This commit is contained in:
13
TODO
13
TODO
@ -7,14 +7,11 @@
|
|||||||
0.2 release
|
0.2 release
|
||||||
? why isn't the LMDB mapping CLOEXEC
|
? why isn't the LMDB mapping CLOEXEC
|
||||||
? plugin for stream: make sure bortloff@github didn't make a mess of it
|
? plugin for stream: make sure bortloff@github didn't make a mess of it
|
||||||
fix sync
|
|
||||||
* logging of bytes up/down
|
sync
|
||||||
* up/both directions
|
logging of bytes up/down
|
||||||
* error handling and reporting
|
pre-calcuated tree of XOR
|
||||||
* way to close sync request
|
full-db scan limited by since/until
|
||||||
* limit on number of concurrent sync requests
|
|
||||||
* full-db scan limited by since/until
|
|
||||||
* `strfry sync` command always takes at least 1 second due to batching delay. figure out better way to flush
|
|
||||||
|
|
||||||
features
|
features
|
||||||
less verbose default logging
|
less verbose default logging
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
#include "Negentropy.h"
|
#include <Negentropy.h>
|
||||||
|
|
||||||
#include "RelayServer.h"
|
#include "RelayServer.h"
|
||||||
#include "DBQuery.h"
|
#include "DBQuery.h"
|
||||||
|
@ -24,6 +24,10 @@ struct WriterPipeline {
|
|||||||
std::thread validatorThread;
|
std::thread validatorThread;
|
||||||
std::thread writerThread;
|
std::thread writerThread;
|
||||||
|
|
||||||
|
std::condition_variable shutdownCv;
|
||||||
|
std::mutex shutdownMutex;
|
||||||
|
bool shutdown = false;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
WriterPipeline() {
|
WriterPipeline() {
|
||||||
validatorThread = std::thread([&]() {
|
validatorThread = std::thread([&]() {
|
||||||
@ -61,7 +65,12 @@ struct WriterPipeline {
|
|||||||
while (1) {
|
while (1) {
|
||||||
// Debounce
|
// Debounce
|
||||||
writerInbox.wait();
|
writerInbox.wait();
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(1'000));
|
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(shutdownMutex);
|
||||||
|
shutdownCv.wait_for(lk, std::chrono::milliseconds(1'000), [&]{return shutdown;});
|
||||||
|
}
|
||||||
|
|
||||||
auto newEvents = writerInbox.pop_all();
|
auto newEvents = writerInbox.pop_all();
|
||||||
|
|
||||||
bool flush = false;
|
bool flush = false;
|
||||||
@ -120,6 +129,12 @@ struct WriterPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void flush() {
|
void flush() {
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lk(shutdownMutex);
|
||||||
|
shutdown = true;
|
||||||
|
}
|
||||||
|
shutdownCv.notify_all();
|
||||||
|
|
||||||
inbox.push_move({ tao::json::null, EventSourceType::None, "" });
|
inbox.push_move({ tao::json::null, EventSourceType::None, "" });
|
||||||
flushInbox.wait();
|
flushInbox.wait();
|
||||||
}
|
}
|
||||||
|
163
src/cmd_sync.cpp
163
src/cmd_sync.cpp
@ -1,5 +1,6 @@
|
|||||||
#include <docopt.h>
|
#include <docopt.h>
|
||||||
#include <tao/json.hpp>
|
#include <tao/json.hpp>
|
||||||
|
#include <Negentropy.h>
|
||||||
|
|
||||||
#include "golpe.h"
|
#include "golpe.h"
|
||||||
|
|
||||||
@ -18,7 +19,7 @@ R"(
|
|||||||
|
|
||||||
Options:
|
Options:
|
||||||
--filter=<filter> Nostr filter (either single filter object or array of filters)
|
--filter=<filter> Nostr filter (either single filter object or array of filters)
|
||||||
--dir=<dir> Direction: down, up, or both [default: down]
|
--dir=<dir> Direction: both, down, up, none [default: both]
|
||||||
)";
|
)";
|
||||||
|
|
||||||
|
|
||||||
@ -31,11 +32,163 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
|
|||||||
std::string filterStr;
|
std::string filterStr;
|
||||||
if (args["--filter"]) filterStr = args["--filter"].asString();
|
if (args["--filter"]) filterStr = args["--filter"].asString();
|
||||||
else filterStr = "{}";
|
else filterStr = "{}";
|
||||||
|
std::string dir = args["--dir"] ? args["--dir"].asString() : "both";
|
||||||
|
if (dir != "both" && dir != "up" && dir != "down" && dir != "none") throw herr("invalid direction: ", dir, ". Should be one of both/up/down/none");
|
||||||
|
|
||||||
std::string dir = args["--dir"] ? args["--dir"].asString() : "down";
|
const uint64_t idSize = 16;
|
||||||
if (dir != "up" && dir != "down" && dir != "both") throw herr("invalid direction: ", dir, ". Should be one of up/down/both");
|
const bool doUp = dir == "both" || dir == "up";
|
||||||
if (dir != "down") throw herr("only down currently supported"); // FIXME
|
const bool doDown = dir == "both" || dir == "down";
|
||||||
|
|
||||||
|
|
||||||
throw herr("sync is temporarily not implemented");
|
tao::json::value filter = tao::json::from_string(filterStr);
|
||||||
|
|
||||||
|
|
||||||
|
Negentropy ne(idSize);
|
||||||
|
|
||||||
|
{
|
||||||
|
DBQuery query(filter);
|
||||||
|
Decompressor decomp;
|
||||||
|
|
||||||
|
auto txn = env.txn_ro();
|
||||||
|
|
||||||
|
uint64_t numEvents = 0;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){
|
||||||
|
auto ev = lookupEventByLevId(txn, levId);
|
||||||
|
ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, ne.idSize));
|
||||||
|
|
||||||
|
numEvents++;
|
||||||
|
});
|
||||||
|
|
||||||
|
if (complete) break;
|
||||||
|
}
|
||||||
|
|
||||||
|
LI << "Filter matches " << numEvents << " events";
|
||||||
|
}
|
||||||
|
|
||||||
|
ne.seal();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
WriterPipeline writer;
|
||||||
|
WSConnection ws(url);
|
||||||
|
ws.reconnect = false;
|
||||||
|
|
||||||
|
ws.onConnect = [&]{
|
||||||
|
auto neMsg = to_hex(ne.initiate());
|
||||||
|
ws.send(tao::json::to_string(tao::json::value::array({
|
||||||
|
"NEG-OPEN",
|
||||||
|
"N",
|
||||||
|
filter,
|
||||||
|
idSize,
|
||||||
|
neMsg,
|
||||||
|
})));
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
const uint64_t highWaterUp = 100, lowWaterUp = 50;
|
||||||
|
const uint64_t batchSizeDown = 50;
|
||||||
|
uint64_t inFlightUp = 0, inFlightDown = 0;
|
||||||
|
std::vector<std::string> have, need;
|
||||||
|
bool syncDone = false;
|
||||||
|
uint64_t totalHaves = 0, totalNeeds = 0;
|
||||||
|
Decompressor decomp;
|
||||||
|
|
||||||
|
ws.onMessage = [&](auto msgStr, uWS::OpCode opCode, size_t compressedSize){
|
||||||
|
try {
|
||||||
|
tao::json::value msg = tao::json::from_string(msgStr);
|
||||||
|
|
||||||
|
if (msg.at(0) == "NEG-MSG") {
|
||||||
|
uint64_t origHaves = have.size(), origNeeds = need.size();
|
||||||
|
|
||||||
|
auto neMsg = ne.reconcile(from_hex(msg.at(2).get_string()), have, need);
|
||||||
|
|
||||||
|
totalHaves += have.size() - origHaves;
|
||||||
|
totalNeeds += need.size() - origNeeds;
|
||||||
|
|
||||||
|
if (!doUp) have.clear();
|
||||||
|
if (!doDown) need.clear();
|
||||||
|
|
||||||
|
if (neMsg.size() == 0) {
|
||||||
|
syncDone = true;
|
||||||
|
LI << "Set reconcile complete. Have " << totalHaves << " need " << totalNeeds;
|
||||||
|
} else {
|
||||||
|
ws.send(tao::json::to_string(tao::json::value::array({
|
||||||
|
"NEG-MSG",
|
||||||
|
"N",
|
||||||
|
to_hex(neMsg),
|
||||||
|
})));
|
||||||
|
}
|
||||||
|
} else if (msg.at(0) == "OK") {
|
||||||
|
inFlightUp--;
|
||||||
|
|
||||||
|
if (!msg.at(2).get_boolean()) {
|
||||||
|
LW << "Unable to upload event " << msg.at(1).get_string() << ": " << msg.at(3).get_string();
|
||||||
|
}
|
||||||
|
} else if (msg.at(0) == "EVENT") {
|
||||||
|
writer.inbox.push_move({ std::move(msg.at(2)), EventSourceType::Sync, url });
|
||||||
|
} else if (msg.at(0) == "EOSE") {
|
||||||
|
inFlightDown = 0;
|
||||||
|
}
|
||||||
|
} catch (std::exception &e) {
|
||||||
|
LE << "Error processing websocket message: " << e.what();
|
||||||
|
LW << "MSG: " << msgStr;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (doUp && have.size() > 0 && inFlightUp < lowWaterUp) {
|
||||||
|
auto txn = env.txn_ro();
|
||||||
|
|
||||||
|
uint64_t numSent = 0;
|
||||||
|
|
||||||
|
while (have.size() > 0 && inFlightUp < highWaterUp) {
|
||||||
|
auto id = std::move(have.back());
|
||||||
|
have.pop_back();
|
||||||
|
|
||||||
|
auto ev = lookupEventById(txn, id);
|
||||||
|
if (!ev) {
|
||||||
|
LW << "Couldn't upload event because not found (deleted?)";
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string sendEventMsg = "[\"EVENT\",";
|
||||||
|
sendEventMsg += getEventJson(txn, decomp, ev->primaryKeyId);
|
||||||
|
sendEventMsg += "]";
|
||||||
|
ws.send(sendEventMsg);
|
||||||
|
|
||||||
|
numSent++;
|
||||||
|
inFlightUp++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (numSent > 0) LI << "UP: " << numSent << " events (" << have.size() << " remaining)";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (doDown && need.size() > 0 && inFlightDown == 0) {
|
||||||
|
tao::json::value ids = tao::json::empty_array;
|
||||||
|
|
||||||
|
while (need.size() > 0 && ids.get_array().size() < batchSizeDown) {
|
||||||
|
ids.emplace_back(to_hex(need.back()));
|
||||||
|
need.pop_back();
|
||||||
|
}
|
||||||
|
|
||||||
|
LI << "DOWN: " << ids.get_array().size() << " events (" << need.size() << " remaining)";
|
||||||
|
|
||||||
|
ws.send(tao::json::to_string(tao::json::value::array({
|
||||||
|
"REQ",
|
||||||
|
"R",
|
||||||
|
tao::json::value({
|
||||||
|
{ "ids", std::move(ids) }
|
||||||
|
}),
|
||||||
|
})));
|
||||||
|
|
||||||
|
inFlightDown = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (syncDone && have.size() == 0 && need.size() == 0 && inFlightUp == 0 && inFlightDown == 0) {
|
||||||
|
if (doDown) writer.flush();
|
||||||
|
::exit(0);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ws.run();
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user