backpressure for WriterPipeline

This commit is contained in:
Doug Hoyte
2023-05-20 00:42:25 -04:00
parent 2c6852ecf9
commit 89fe196242
2 changed files with 24 additions and 4 deletions

View File

@ -30,7 +30,10 @@ struct WriterPipeline {
std::mutex shutdownMutex; std::mutex shutdownMutex;
std::atomic<bool> shutdownRequested = false; std::atomic<bool> shutdownRequested = false;
std::atomic<bool> shutdownComplete = false; std::atomic<bool> shutdownComplete = false;
std::atomic<uint64_t> numLive = 0; std::atomic<uint64_t> numLive = 0;
std::condition_variable backpressureCv;
std::mutex backpressureMutex;
public: public:
WriterPipeline() { WriterPipeline() {
@ -44,7 +47,9 @@ struct WriterPipeline {
for (auto &m : msgs) { for (auto &m : msgs) {
if (m.eventJson.is_null()) { if (m.eventJson.is_null()) {
shutdownRequested = true;
writerInbox.push_move({}); writerInbox.push_move({});
shutdownCv.notify_all();
break; break;
} }
@ -55,6 +60,7 @@ struct WriterPipeline {
parseAndVerifyEvent(m.eventJson, secpCtx, true, true, flatStr, jsonStr); parseAndVerifyEvent(m.eventJson, secpCtx, true, true, flatStr, jsonStr);
} catch (std::exception &e) { } catch (std::exception &e) {
LW << "Rejected event: " << m.eventJson << " reason: " << e.what(); LW << "Rejected event: " << m.eventJson << " reason: " << e.what();
numLive--;
continue; continue;
} }
@ -106,6 +112,8 @@ struct WriterPipeline {
break; break;
} }
numLive--;
auto *flat = flatStrToFlatEvent(event.flatStr); auto *flat = flatStrToFlatEvent(event.flatStr);
if (lookupEventById(txn, sv(flat->id()))) { if (lookupEventById(txn, sv(flat->id()))) {
dups++; dups++;
@ -132,19 +140,30 @@ struct WriterPipeline {
if (written || dups) LI << "Writer: added: " << written << " dups: " << dups; if (written || dups) LI << "Writer: added: " << written << " dups: " << dups;
if (shutdownComplete) flushInbox.push_move(true); if (shutdownComplete) {
flushInbox.push_move(true);
if (numLive != 0) LW << "numLive was not 0 after shutdown!";
}
backpressureCv.notify_all();
} }
}); });
} }
void write(WriterPipelineInput &&inp) { void write(WriterPipelineInput &&inp) {
numLive++;
validatorInbox.push_move(std::move(inp)); validatorInbox.push_move(std::move(inp));
} }
void flush() { void flush() {
shutdownRequested = true;
validatorInbox.push_move({ tao::json::null, EventSourceType::None, "" }); validatorInbox.push_move({ tao::json::null, EventSourceType::None, "" });
shutdownCv.notify_all();
flushInbox.wait(); flushInbox.wait();
} }
void wait() {
uint64_t drainUntil = writeBatchSize * 2;
if (numLive < drainUntil) return;
std::unique_lock<std::mutex> lk(backpressureMutex);
backpressureCv.wait_for(lk, std::chrono::milliseconds(50), [&]{return numLive < drainUntil;});
}
}; };

View File

@ -131,6 +131,7 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
writer.write({ std::move(msg.at(2)), EventSourceType::Sync, url }); writer.write({ std::move(msg.at(2)), EventSourceType::Sync, url });
} else if (msg.at(0) == "EOSE") { } else if (msg.at(0) == "EOSE") {
inFlightDown = false; inFlightDown = false;
writer.wait();
} else { } else {
LW << "Unexpected message from relay: " << msg; LW << "Unexpected message from relay: " << msg;
} }
@ -139,7 +140,7 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
LW << "MSG: " << msgStr; LW << "MSG: " << msgStr;
} }
if (doUp && have.size() > 0 && inFlightUp < lowWaterUp) { if (doUp && have.size() > 0 && inFlightUp <= lowWaterUp) {
auto txn = env.txn_ro(); auto txn = env.txn_ro();
uint64_t numSent = 0; uint64_t numSent = 0;