Mirror of https://github.com/hoytech/strfry.git
switch import to use WriterPipeline
- This makes it do the verification and writing in parallel.
- Also, it will now flush periodically (default 1s) even if it has read fewer than N (default 10k) records from stdin. This lets import be used as a general-purpose non-relay event ingester. To do so, users must ensure that the stdout of the process they pipe into import is line buffered.
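
As an aside for users wiring this up: the line-buffering requirement applies to the producer side of the pipe. Below is a minimal illustrative sketch (not part of this commit) of a producer, here a simple pass-through filter, that forwards one JSON event per line with line-buffered stdout; a producer built on C stdio can alternatively be wrapped with coreutils' stdbuf -oL.

    // line_buffered_producer.cpp -- illustrative sketch, not part of this commit.
    // Forwards one JSON event per line and forces line-buffered stdout, so the
    // importer on the other end of the pipe sees each event promptly instead of
    // waiting for a full stdio block buffer.
    #include <cstdio>
    #include <iostream>
    #include <string>

    int main() {
        // When stdout is a pipe it is block-buffered by default; switch it to
        // line buffering so every newline flushes the event downstream.
        setvbuf(stdout, nullptr, _IOLBF, 0);

        std::string line;
        while (std::getline(std::cin, line)) {   // hypothetical event source: stdin
            std::fputs(line.c_str(), stdout);
            std::fputc('\n', stdout);
        }

        return 0;
    }

Piped as `line_buffered_producer | strfry import`, each event reaches the importer as soon as its newline is written, so the periodic flush keeps the database close to real time.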
Submodule golpe updated: dc3096fbe0...19ce1715c2
@@ -16,8 +16,22 @@ struct WriterPipelineInput {
 
 struct WriterPipeline {
   public:
+    // Params:
+
+    uint64_t debounceDelayMilliseconds = 1'000;
+    uint64_t writeBatchSize = 1'000;
+    bool verifyMsg = true;
+    bool verifyTime = true;
+    bool verboseReject = true;
+    bool verboseCommit = true;
     std::function<void(uint64_t)> onCommit;
 
+    // For logging:
+
+    std::atomic<uint64_t> totalProcessed = 0;
+    std::atomic<uint64_t> totalWritten = 0;
+    std::atomic<uint64_t> totalRejected = 0;
+    std::atomic<uint64_t> totalDups = 0;
 
   private:
     hoytech::protected_queue<WriterPipelineInput> validatorInbox;
@@ -57,10 +71,11 @@ struct WriterPipeline {
                 std::string jsonStr;
 
                 try {
-                    parseAndVerifyEvent(m.eventJson, secpCtx, true, true, flatStr, jsonStr);
+                    parseAndVerifyEvent(m.eventJson, secpCtx, verifyMsg, verifyTime, flatStr, jsonStr);
                 } catch (std::exception &e) {
-                    LW << "Rejected event: " << m.eventJson << " reason: " << e.what();
+                    if (verboseReject) LW << "Rejected event: " << m.eventJson << " reason: " << e.what();
                     numLive--;
+                    totalRejected++;
                     continue;
                 }
 
@@ -117,6 +132,7 @@ struct WriterPipeline {
                 auto *flat = flatStrToFlatEvent(event.flatStr);
                 if (lookupEventById(txn, sv(flat->id()))) {
                     dups++;
+                    totalDups++;
                     continue;
                 }
 
@@ -132,13 +148,19 @@ struct WriterPipeline {
                 }
 
                 for (auto &ev : newEventsToProc) {
-                    if (ev.status == EventWriteStatus::Written) written++;
-                    else dups++;
+                    // FIXME: log rejected stats too
+                    if (ev.status == EventWriteStatus::Written) {
+                        written++;
+                        totalWritten++;
+                    } else {
+                        dups++;
+                        totalDups++;
+                    }
                 }
-
+                if (onCommit) onCommit(written);
             }
 
-            if (written || dups) LI << "Writer: added: " << written << " dups: " << dups;
+            if (verboseCommit && (written || dups)) LI << "Writer: added: " << written << " dups: " << dups;
 
             if (shutdownComplete) {
                 flushInbox.push_move(true);
@@ -158,6 +180,8 @@ struct WriterPipeline {
     }
 
     void write(WriterPipelineInput &&inp) {
+        if (inp.eventJson.is_null()) return;
+        totalProcessed++;
         numLive++;
        validatorInbox.push_move(std::move(inp));
     }
@@ -3,14 +3,13 @@
 #include <docopt.h>
 #include "golpe.h"
 
-#include "events.h"
-#include "filters.h"
+#include "WriterPipeline.h"
 
 
 static const char USAGE[] =
 R"(
     Usage:
-        import [--show-rejected] [--no-verify]
+        import [--show-rejected] [--no-verify] [--debounce-millis=<debounce-millis>] [--write-batch=<write-batch>]
 )";
 
 
@@ -19,71 +18,48 @@ void cmd_import(const std::vector<std::string> &subArgs) {
 
     bool showRejected = args["--show-rejected"].asBool();
     bool noVerify = args["--no-verify"].asBool();
+    uint64_t debounceMillis = 1'000;
+    if (args["--debounce-millis"]) debounceMillis = args["--debounce-millis"].asLong();
+    uint64_t writeBatch = 10'000;
+    if (args["--write-batch"]) writeBatch = args["--write-batch"].asLong();
 
     if (noVerify) LW << "not verifying event IDs or signatures!";
 
-    auto txn = env.txn_rw();
+    WriterPipeline writer;
 
-    secp256k1_context *secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
+    writer.debounceDelayMilliseconds = debounceMillis;
+    writer.writeBatchSize = writeBatch;
+    writer.verifyMsg = !noVerify;
+    writer.verifyTime = false;
+    writer.verboseReject = showRejected;
+    writer.verboseCommit = false;
+    writer.onCommit = [&](uint64_t numCommitted){
+        LI << "Committed " << numCommitted
+           << ". Processed " << writer.totalProcessed << " lines. " << writer.totalWritten << " added, " << writer.totalRejected << " rejected, " << writer.totalDups << " dups";
+    };
 
     std::string line;
-    uint64_t processed = 0, added = 0, rejected = 0, dups = 0;
-    std::vector<EventToWrite> newEvents;
-
-    auto logStatus = [&]{
-        LI << "Processed " << processed << " lines. " << added << " added, " << rejected << " rejected, " << dups << " dups";
-    };
-
-    auto flushChanges = [&]{
-        writeEvents(txn, newEvents, 0);
-
-        uint64_t numCommits = 0;
-
-        for (auto &newEvent : newEvents) {
-            if (newEvent.status == EventWriteStatus::Written) {
-                added++;
-                numCommits++;
-            } else if (newEvent.status == EventWriteStatus::Duplicate) {
-                dups++;
-            } else {
-                rejected++;
-            }
-        }
-
-        logStatus();
-        LI << "Committing " << numCommits << " records";
-
-        txn.commit();
-
-        txn = env.txn_rw();
-        newEvents.clear();
-    };
-
+    uint64_t currLine = 0;
 
     while (std::cin) {
+        currLine++;
        std::getline(std::cin, line);
         if (!line.size()) continue;
 
-        processed++;
-
-        std::string flatStr;
-        std::string jsonStr;
+        tao::json::value evJson;
 
         try {
-            auto origJson = tao::json::from_string(line);
-            parseAndVerifyEvent(origJson, secpCtx, !noVerify, false, flatStr, jsonStr);
+            evJson = tao::json::from_string(line);
         } catch (std::exception &e) {
-            if (showRejected) LW << "Line " << processed << " rejected: " << e.what();
-            rejected++;
+            LW << "Unable to parse JSON on line " << currLine;
             continue;
         }
 
-        newEvents.emplace_back(std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us(), EventSourceType::Import, "");
-
-        if (newEvents.size() >= 10'000) flushChanges();
+        writer.write({ std::move(evJson), EventSourceType::Import, "" });
+        writer.wait();
     }
 
-    flushChanges();
+    writer.flush();
 
-    txn.commit();
+    LI << "Done. Processed " << writer.totalProcessed << " lines. " << writer.totalWritten << " added, " << writer.totalRejected << " rejected, " << writer.totalDups << " dups";
 }
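
For readers who want to reuse the pipeline outside the import command, the calling pattern introduced above boils down to the following sketch. It assumes compilation inside the strfry tree (it relies on WriterPipeline.h, the golpe logging macros, and the tao JSON parser used above); the tuning values and the function name ingestFromStdin are arbitrary examples, not part of this commit.

    #include <iostream>
    #include <string>

    #include "golpe.h"
    #include "WriterPipeline.h"

    // Illustrative driver: read one JSON event per line and hand it to WriterPipeline,
    // which verifies and writes in parallel and flushes on its own debounce timer.
    void ingestFromStdin() {
        WriterPipeline writer;

        writer.debounceDelayMilliseconds = 500;   // arbitrary example values
        writer.writeBatchSize = 5'000;
        writer.verifyTime = false;                // accept events regardless of created_at
        writer.onCommit = [&](uint64_t numCommitted){
            LI << "Committed " << numCommitted << " events";
        };

        std::string line;
        while (std::getline(std::cin, line)) {
            if (line.empty()) continue;

            tao::json::value evJson;
            try {
                evJson = tao::json::from_string(line);
            } catch (std::exception &e) {
                LW << "Skipping unparseable line: " << e.what();
                continue;
            }

            writer.write({ std::move(evJson), EventSourceType::Import, "" });
            writer.wait();   // backpressure: stay behind the validator/writer threads
        }

        writer.flush();      // block until everything queued has been committed
    }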