bugfixes in sync

This commit is contained in:
Doug Hoyte
2023-05-19 15:01:14 -04:00
parent af0fa71f26
commit 799da2c016
8 changed files with 64 additions and 36 deletions

View File

@ -16,17 +16,21 @@ struct WriterPipelineInput {
struct WriterPipeline {
public:
hoytech::protected_queue<WriterPipelineInput> inbox;
hoytech::protected_queue<bool> flushInbox;
uint64_t debounceDelayMilliseconds = 1'000;
uint64_t writeBatchSize = 1'000;
private:
hoytech::protected_queue<WriterPipelineInput> validatorInbox;
hoytech::protected_queue<EventToWrite> writerInbox;
hoytech::protected_queue<bool> flushInbox;
std::thread validatorThread;
std::thread writerThread;
std::condition_variable shutdownCv;
std::mutex shutdownMutex;
bool shutdown = false;
std::atomic<bool> shutdownRequested = false;
std::atomic<bool> shutdownComplete = false;
std::atomic<uint64_t> numLive = 0;
public:
WriterPipeline() {
@ -36,7 +40,7 @@ struct WriterPipeline {
secp256k1_context *secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
while (1) {
auto msgs = inbox.pop_all();
auto msgs = validatorInbox.pop_all();
for (auto &m : msgs) {
if (m.eventJson.is_null()) {
@ -64,19 +68,21 @@ struct WriterPipeline {
while (1) {
// Debounce
writerInbox.wait();
{
std::unique_lock<std::mutex> lk(shutdownMutex);
shutdownCv.wait_for(lk, std::chrono::milliseconds(1'000), [&]{return shutdown;});
auto numPendingElems = writerInbox.wait();
if (!shutdownRequested && numPendingElems < writeBatchSize) {
std::unique_lock<std::mutex> lk(shutdownMutex);
shutdownCv.wait_for(lk, std::chrono::milliseconds(debounceDelayMilliseconds), [&]{return !!shutdownRequested;});
}
}
auto newEvents = writerInbox.pop_all();
bool flush = false;
uint64_t written = 0, dups = 0;
// Collect a certain amount of records in a batch, push the rest back into the inbox
// Collect a certain amount of records in a batch, push the rest back into the writerInbox
// Pre-filter out dups in a read-only txn as an optimisation
std::vector<EventToWrite> newEventsToProc;
@ -84,16 +90,19 @@ struct WriterPipeline {
{
auto txn = env.txn_ro();
for (auto &event : newEvents) {
if (newEventsToProc.size() > 1'000) {
// Put the rest back in the inbox
while (newEvents.size()) {
if (newEventsToProc.size() >= writeBatchSize) {
// Put the rest back in the writerInbox
writerInbox.unshift_move_all(newEvents);
newEvents.clear();
break;
}
auto event = std::move(newEvents.front());
newEvents.pop_front();
if (event.flatStr.size() == 0) {
flush = true;
shutdownComplete = true;
break;
}
@ -121,21 +130,21 @@ struct WriterPipeline {
}
}
LI << "Writer: added: " << written << " dups: " << dups;
if (written || dups) LI << "Writer: added: " << written << " dups: " << dups;
if (flush) flushInbox.push_move(true);
if (shutdownComplete) flushInbox.push_move(true);
}
});
}
void flush() {
{
std::lock_guard<std::mutex> lk(shutdownMutex);
shutdown = true;
}
shutdownCv.notify_all();
void write(WriterPipelineInput &&inp) {
validatorInbox.push_move(std::move(inp));
}
inbox.push_move({ tao::json::null, EventSourceType::None, "" });
void flush() {
shutdownRequested = true;
validatorInbox.push_move({ tao::json::null, EventSourceType::None, "" });
shutdownCv.notify_all();
flushInbox.wait();
}
};