diff --git a/src/WriterPipeline.h b/src/WriterPipeline.h index 1efdbaa..374f45b 100644 --- a/src/WriterPipeline.h +++ b/src/WriterPipeline.h @@ -30,7 +30,10 @@ struct WriterPipeline { std::mutex shutdownMutex; std::atomic shutdownRequested = false; std::atomic shutdownComplete = false; + std::atomic numLive = 0; + std::condition_variable backpressureCv; + std::mutex backpressureMutex; public: WriterPipeline() { @@ -44,7 +47,9 @@ struct WriterPipeline { for (auto &m : msgs) { if (m.eventJson.is_null()) { + shutdownRequested = true; writerInbox.push_move({}); + shutdownCv.notify_all(); break; } @@ -55,6 +60,7 @@ struct WriterPipeline { parseAndVerifyEvent(m.eventJson, secpCtx, true, true, flatStr, jsonStr); } catch (std::exception &e) { LW << "Rejected event: " << m.eventJson << " reason: " << e.what(); + numLive--; continue; } @@ -106,6 +112,8 @@ struct WriterPipeline { break; } + numLive--; + auto *flat = flatStrToFlatEvent(event.flatStr); if (lookupEventById(txn, sv(flat->id()))) { dups++; @@ -132,19 +140,30 @@ struct WriterPipeline { if (written || dups) LI << "Writer: added: " << written << " dups: " << dups; - if (shutdownComplete) flushInbox.push_move(true); + if (shutdownComplete) { + flushInbox.push_move(true); + if (numLive != 0) LW << "numLive was not 0 after shutdown!"; + } + + backpressureCv.notify_all(); } }); } void write(WriterPipelineInput &&inp) { + numLive++; validatorInbox.push_move(std::move(inp)); } void flush() { - shutdownRequested = true; validatorInbox.push_move({ tao::json::null, EventSourceType::None, "" }); - shutdownCv.notify_all(); flushInbox.wait(); } + + void wait() { + uint64_t drainUntil = writeBatchSize * 2; + if (numLive < drainUntil) return; + std::unique_lock lk(backpressureMutex); + backpressureCv.wait_for(lk, std::chrono::milliseconds(50), [&]{return numLive < drainUntil;}); + } }; diff --git a/src/cmd_sync.cpp b/src/cmd_sync.cpp index 37f2029..f22c3b7 100644 --- a/src/cmd_sync.cpp +++ b/src/cmd_sync.cpp @@ -131,6 +131,7 @@ void cmd_sync(const std::vector &subArgs) { writer.write({ std::move(msg.at(2)), EventSourceType::Sync, url }); } else if (msg.at(0) == "EOSE") { inFlightDown = false; + writer.wait(); } else { LW << "Unexpected message from relay: " << msg; } @@ -139,7 +140,7 @@ void cmd_sync(const std::vector &subArgs) { LW << "MSG: " << msgStr; } - if (doUp && have.size() > 0 && inFlightUp < lowWaterUp) { + if (doUp && have.size() > 0 && inFlightUp <= lowWaterUp) { auto txn = env.txn_ro(); uint64_t numSent = 0;