diff --git a/TODO b/TODO
index cc7710a..3cb9593 100644
--- a/TODO
+++ b/TODO
@@ -7,14 +7,11 @@
 0.2 release
     ? why isn't the LMDB mapping CLOEXEC
     ? plugin for stream: make sure bortloff@github didn't make a mess of it
-    fix sync
-        * logging of bytes up/down
-        * up/both directions
-        * error handling and reporting
-        * way to close sync request
-        * limit on number of concurrent sync requests
-        * full-db scan limited by since/until
-        * `strfry sync` command always takes at least 1 second due to batching delay. figure out better way to flush
+
+sync
+    logging of bytes up/down
+    pre-calcuated tree of XOR
+    full-db scan limited by since/until
 
 features
     less verbose default logging
diff --git a/src/RelayNegentropy.cpp b/src/RelayNegentropy.cpp
index f35e0b0..4403630 100644
--- a/src/RelayNegentropy.cpp
+++ b/src/RelayNegentropy.cpp
@@ -1,4 +1,4 @@
-#include "Negentropy.h"
+#include
 
 #include "RelayServer.h"
 #include "DBQuery.h"
diff --git a/src/WriterPipeline.h b/src/WriterPipeline.h
index bd10d0c..ed6ce48 100644
--- a/src/WriterPipeline.h
+++ b/src/WriterPipeline.h
@@ -24,6 +24,10 @@ struct WriterPipeline {
     std::thread validatorThread;
     std::thread writerThread;
 
+    std::condition_variable shutdownCv;
+    std::mutex shutdownMutex;
+    bool shutdown = false;
+
   public:
     WriterPipeline() {
        validatorThread = std::thread([&]() {
@@ -61,7 +65,12 @@ struct WriterPipeline {
             while (1) {
                 // Debounce
                 writerInbox.wait();
-                std::this_thread::sleep_for(std::chrono::milliseconds(1'000));
+
+                {
+                    std::unique_lock lk(shutdownMutex);
+                    shutdownCv.wait_for(lk, std::chrono::milliseconds(1'000), [&]{return shutdown;});
+                }
+
                 auto newEvents = writerInbox.pop_all();
 
                 bool flush = false;
@@ -120,6 +129,12 @@ struct WriterPipeline {
     }
 
     void flush() {
+        {
+            std::lock_guard lk(shutdownMutex);
+            shutdown = true;
+        }
+        shutdownCv.notify_all();
+
         inbox.push_move({ tao::json::null, EventSourceType::None, "" });
         flushInbox.wait();
     }
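
The WriterPipeline change above swaps the fixed one-second sleep_for for a condition_variable wait so that flush() can cut the write-batching debounce short, addressing the old TODO item about `strfry sync` always taking at least one second. A minimal, self-contained sketch of that pattern (the names here are illustrative, not strfry's own):

    // Interruptible debounce: the worker batches for up to one second, but a
    // flush/shutdown request wakes it immediately instead of waiting out the timer.
    #include <chrono>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    int main() {
        std::condition_variable cv;
        std::mutex mtx;
        bool shutdown = false;

        std::thread worker([&]{
            std::unique_lock lk(mtx);
            // wait_for returns true if the predicate became true (woken early),
            // false if the full timeout elapsed.
            bool early = cv.wait_for(lk, std::chrono::milliseconds(1'000), [&]{ return shutdown; });
            std::cout << (early ? "flushed early\n" : "debounce timeout\n");
        });

        // What flush() does in the patch: set the flag under the mutex, then notify.
        {
            std::lock_guard lk(mtx);
            shutdown = true;
        }
        cv.notify_all();

        worker.join();
    }
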
diff --git a/src/cmd_sync.cpp b/src/cmd_sync.cpp
index 32a673f..f4893a9 100644
--- a/src/cmd_sync.cpp
+++ b/src/cmd_sync.cpp
@@ -1,5 +1,6 @@
 #include
 #include
+#include
 
 #include "golpe.h"
@@ -18,7 +19,7 @@ R"(
 
     Options:
       --filter=<filter>  Nostr filter (either single filter object or array of filters)
-      --dir=<dir>        Direction: down, up, or both [default: down]
+      --dir=<dir>        Direction: both, down, up, none [default: both]
 
 )";
@@ -31,11 +32,163 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
     std::string filterStr;
     if (args["--filter"]) filterStr = args["--filter"].asString(); else filterStr = "{}";
 
+    std::string dir = args["--dir"] ? args["--dir"].asString() : "both";
+    if (dir != "both" && dir != "up" && dir != "down" && dir != "none") throw herr("invalid direction: ", dir, ". Should be one of both/up/down/none");
-    std::string dir = args["--dir"] ? args["--dir"].asString() : "down";
-    if (dir != "up" && dir != "down" && dir != "both") throw herr("invalid direction: ", dir, ". Should be one of up/down/both");
-    if (dir != "down") throw herr("only down currently supported"); // FIXME
+    const uint64_t idSize = 16;
+    const bool doUp = dir == "both" || dir == "up";
+    const bool doDown = dir == "both" || dir == "down";
 
-    throw herr("sync is temporarily not implemented");
+    tao::json::value filter = tao::json::from_string(filterStr);
+
+
+    Negentropy ne(idSize);
+
+    {
+        DBQuery query(filter);
+        Decompressor decomp;
+
+        auto txn = env.txn_ro();
+
+        uint64_t numEvents = 0;
+
+        while (1) {
+            bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){
+                auto ev = lookupEventByLevId(txn, levId);
+                ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, ne.idSize));
+
+                numEvents++;
+            });
+
+            if (complete) break;
+        }
+
+        LI << "Filter matches " << numEvents << " events";
+    }
+
+    ne.seal();
+
+
+
+    WriterPipeline writer;
+    WSConnection ws(url);
+    ws.reconnect = false;
+
+    ws.onConnect = [&]{
+        auto neMsg = to_hex(ne.initiate());
+        ws.send(tao::json::to_string(tao::json::value::array({
+            "NEG-OPEN",
+            "N",
+            filter,
+            idSize,
+            neMsg,
+        })));
+    };
+
+
+    const uint64_t highWaterUp = 100, lowWaterUp = 50;
+    const uint64_t batchSizeDown = 50;
+    uint64_t inFlightUp = 0, inFlightDown = 0;
+    std::vector<std::string> have, need;
+    bool syncDone = false;
+    uint64_t totalHaves = 0, totalNeeds = 0;
+    Decompressor decomp;
+
+    ws.onMessage = [&](auto msgStr, uWS::OpCode opCode, size_t compressedSize){
+        try {
+            tao::json::value msg = tao::json::from_string(msgStr);
+
+            if (msg.at(0) == "NEG-MSG") {
+                uint64_t origHaves = have.size(), origNeeds = need.size();
+
+                auto neMsg = ne.reconcile(from_hex(msg.at(2).get_string()), have, need);
+
+                totalHaves += have.size() - origHaves;
+                totalNeeds += need.size() - origNeeds;
+
+                if (!doUp) have.clear();
+                if (!doDown) need.clear();
+
+                if (neMsg.size() == 0) {
+                    syncDone = true;
+                    LI << "Set reconcile complete. Have " << totalHaves << " need " << totalNeeds;
Have " << totalHaves << " need " << totalNeeds; + } else { + ws.send(tao::json::to_string(tao::json::value::array({ + "NEG-MSG", + "N", + to_hex(neMsg), + }))); + } + } else if (msg.at(0) == "OK") { + inFlightUp--; + + if (!msg.at(2).get_boolean()) { + LW << "Unable to upload event " << msg.at(1).get_string() << ": " << msg.at(3).get_string(); + } + } else if (msg.at(0) == "EVENT") { + writer.inbox.push_move({ std::move(msg.at(2)), EventSourceType::Sync, url }); + } else if (msg.at(0) == "EOSE") { + inFlightDown = 0; + } + } catch (std::exception &e) { + LE << "Error processing websocket message: " << e.what(); + LW << "MSG: " << msgStr; + } + + if (doUp && have.size() > 0 && inFlightUp < lowWaterUp) { + auto txn = env.txn_ro(); + + uint64_t numSent = 0; + + while (have.size() > 0 && inFlightUp < highWaterUp) { + auto id = std::move(have.back()); + have.pop_back(); + + auto ev = lookupEventById(txn, id); + if (!ev) { + LW << "Couldn't upload event because not found (deleted?)"; + continue; + } + + std::string sendEventMsg = "[\"EVENT\","; + sendEventMsg += getEventJson(txn, decomp, ev->primaryKeyId); + sendEventMsg += "]"; + ws.send(sendEventMsg); + + numSent++; + inFlightUp++; + } + + if (numSent > 0) LI << "UP: " << numSent << " events (" << have.size() << " remaining)"; + } + + if (doDown && need.size() > 0 && inFlightDown == 0) { + tao::json::value ids = tao::json::empty_array; + + while (need.size() > 0 && ids.get_array().size() < batchSizeDown) { + ids.emplace_back(to_hex(need.back())); + need.pop_back(); + } + + LI << "DOWN: " << ids.get_array().size() << " events (" << need.size() << " remaining)"; + + ws.send(tao::json::to_string(tao::json::value::array({ + "REQ", + "R", + tao::json::value({ + { "ids", std::move(ids) } + }), + }))); + + inFlightDown = 1; + } + + if (syncDone && have.size() == 0 && need.size() == 0 && inFlightUp == 0 && inFlightDown == 0) { + if (doDown) writer.flush(); + ::exit(0); + } + }; + + ws.run(); }