diff --git a/golpe b/golpe index dc3096f..19ce171 160000 --- a/golpe +++ b/golpe @@ -1 +1 @@ -Subproject commit dc3096fbe06e0b09e4969f3afa34b038a8f1c54d +Subproject commit 19ce1715c271eb5c1ae3dc9f422093e4afe6c5e7 diff --git a/src/WriterPipeline.h b/src/WriterPipeline.h index cc293e2..d9e2c23 100644 --- a/src/WriterPipeline.h +++ b/src/WriterPipeline.h @@ -16,8 +16,22 @@ struct WriterPipelineInput { struct WriterPipeline { public: + // Params: + uint64_t debounceDelayMilliseconds = 1'000; uint64_t writeBatchSize = 1'000; + bool verifyMsg = true; + bool verifyTime = true; + bool verboseReject = true; + bool verboseCommit = true; + std::function onCommit; + + // For logging: + + std::atomic totalProcessed = 0; + std::atomic totalWritten = 0; + std::atomic totalRejected = 0; + std::atomic totalDups = 0; private: hoytech::protected_queue validatorInbox; @@ -57,10 +71,11 @@ struct WriterPipeline { std::string jsonStr; try { - parseAndVerifyEvent(m.eventJson, secpCtx, true, true, flatStr, jsonStr); + parseAndVerifyEvent(m.eventJson, secpCtx, verifyMsg, verifyTime, flatStr, jsonStr); } catch (std::exception &e) { - LW << "Rejected event: " << m.eventJson << " reason: " << e.what(); + if (verboseReject) LW << "Rejected event: " << m.eventJson << " reason: " << e.what(); numLive--; + totalRejected++; continue; } @@ -117,6 +132,7 @@ struct WriterPipeline { auto *flat = flatStrToFlatEvent(event.flatStr); if (lookupEventById(txn, sv(flat->id()))) { dups++; + totalDups++; continue; } @@ -132,13 +148,19 @@ struct WriterPipeline { } for (auto &ev : newEventsToProc) { - if (ev.status == EventWriteStatus::Written) written++; - else dups++; - // FIXME: log rejected stats too + if (ev.status == EventWriteStatus::Written) { + written++; + totalWritten++; + } else { + dups++; + totalDups++; + } } + + if (onCommit) onCommit(written); } - if (written || dups) LI << "Writer: added: " << written << " dups: " << dups; + if (verboseCommit && (written || dups)) LI << "Writer: added: " << written << " dups: " << dups; if (shutdownComplete) { flushInbox.push_move(true); @@ -158,6 +180,8 @@ struct WriterPipeline { } void write(WriterPipelineInput &&inp) { + if (inp.eventJson.is_null()) return; + totalProcessed++; numLive++; validatorInbox.push_move(std::move(inp)); } diff --git a/src/apps/dbutils/cmd_import.cpp b/src/apps/dbutils/cmd_import.cpp index b70c158..492b110 100644 --- a/src/apps/dbutils/cmd_import.cpp +++ b/src/apps/dbutils/cmd_import.cpp @@ -3,14 +3,13 @@ #include #include "golpe.h" -#include "events.h" -#include "filters.h" +#include "WriterPipeline.h" static const char USAGE[] = R"( Usage: - import [--show-rejected] [--no-verify] + import [--show-rejected] [--no-verify] [--debounce-millis=] [--write-batch=] )"; @@ -19,71 +18,48 @@ void cmd_import(const std::vector &subArgs) { bool showRejected = args["--show-rejected"].asBool(); bool noVerify = args["--no-verify"].asBool(); + uint64_t debounceMillis = 1'000; + if (args["--debounce-millis"]) debounceMillis = args["--debounce-millis"].asLong(); + uint64_t writeBatch = 10'000; + if (args["--write-batch"]) writeBatch = args["--write-batch"].asLong(); if (noVerify) LW << "not verifying event IDs or signatures!"; - auto txn = env.txn_rw(); + WriterPipeline writer; - secp256k1_context *secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY); + writer.debounceDelayMilliseconds = debounceMillis; + writer.writeBatchSize = writeBatch; + writer.verifyMsg = !noVerify; + writer.verifyTime = false; + writer.verboseReject = showRejected; + writer.verboseCommit = false; + writer.onCommit = [&](uint64_t numCommitted){ + LI << "Committed " << numCommitted + << ". Processed " << writer.totalProcessed << " lines. " << writer.totalWritten << " added, " << writer.totalRejected << " rejected, " << writer.totalDups << " dups"; + }; std::string line; - uint64_t processed = 0, added = 0, rejected = 0, dups = 0; - std::vector newEvents; - - auto logStatus = [&]{ - LI << "Processed " << processed << " lines. " << added << " added, " << rejected << " rejected, " << dups << " dups"; - }; - - auto flushChanges = [&]{ - writeEvents(txn, newEvents, 0); - - uint64_t numCommits = 0; - - for (auto &newEvent : newEvents) { - if (newEvent.status == EventWriteStatus::Written) { - added++; - numCommits++; - } else if (newEvent.status == EventWriteStatus::Duplicate) { - dups++; - } else { - rejected++; - } - } - - logStatus(); - LI << "Committing " << numCommits << " records"; - - txn.commit(); - - txn = env.txn_rw(); - newEvents.clear(); - }; - + uint64_t currLine = 0; while (std::cin) { + currLine++; std::getline(std::cin, line); if (!line.size()) continue; - processed++; - - std::string flatStr; - std::string jsonStr; + tao::json::value evJson; try { - auto origJson = tao::json::from_string(line); - parseAndVerifyEvent(origJson, secpCtx, !noVerify, false, flatStr, jsonStr); + evJson = tao::json::from_string(line); } catch (std::exception &e) { - if (showRejected) LW << "Line " << processed << " rejected: " << e.what(); - rejected++; + LW << "Unable to parse JSON on line " << currLine; continue; } - newEvents.emplace_back(std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us(), EventSourceType::Import, ""); - - if (newEvents.size() >= 10'000) flushChanges(); + writer.write({ std::move(evJson), EventSourceType::Import, "" }); + writer.wait(); } - flushChanges(); + writer.flush(); - txn.commit(); + LI << "Done. Processed " << writer.totalProcessed << " lines. " << writer.totalWritten << " added, " << writer.totalRejected << " rejected, " << writer.totalDups << " dups"; }