quadrable node gc, clean-up writes, consolidate deletion

This commit is contained in:
Doug Hoyte
2023-01-25 15:16:39 -05:00
parent eac2095c83
commit 485abee8ed
5 changed files with 66 additions and 34 deletions

View File

@ -195,7 +195,7 @@ std::string_view decodeEventPayload(lmdb::txn &txn, Decompressor &decomp, std::s
}
}
// Return result only valid until on of: next call to getEventJson/decodeEventPayload, write on or closing of txn, or any action on decomp object
// Return result only valid until one of: next call to getEventJson/decodeEventPayload, write to/closing of txn, or any action on decomp object
std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId) {
std::string_view raw;
@ -209,12 +209,20 @@ std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t lev
void deleteEvent(lmdb::txn &txn, quadrable::Quadrable::UpdateSet &changes, defaultDb::environment::View_Event &ev) {
changes.del(flatEventToQuadrableKey(ev.flat_nested()));
env.dbi_EventPayload.del(txn, lmdb::to_sv<uint64_t>(ev.primaryKeyId));
env.delete_Event(txn, ev.primaryKeyId);
}
void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToWrite> &evs) {
std::sort(evs.begin(), evs.end(), [](auto &a, auto &b) { return a.quadKey < b.quadKey; });
auto changes = qdb.change();
std::vector<uint64_t> levIdsToDelete;
std::string tmpBuf;
for (size_t i = 0; i < evs.size(); i++) {
auto &ev = evs[i];
@ -250,8 +258,7 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToW
if (otherLevId) {
auto otherEv = env.lookup_Event(txn, otherLevId);
if (!otherEv) throw herr("missing event from index, corrupt DB?");
changes.del(flatEventToQuadrableKey(otherEv->flat_nested()));
levIdsToDelete.push_back(otherLevId);
deleteEvent(txn, changes, *otherEv);
}
}
@ -262,26 +269,12 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToW
auto otherEv = lookupEventById(txn, sv(tagPair->val()));
if (otherEv && sv(otherEv->flat_nested()->pubkey()) == sv(flat->pubkey())) {
LI << "Deleting event. id=" << to_hex(sv(tagPair->val()));
changes.del(flatEventToQuadrableKey(otherEv->flat_nested()));
levIdsToDelete.push_back(otherEv->primaryKeyId);
deleteEvent(txn, changes, *otherEv);
}
}
}
}
if (ev.status == EventWriteStatus::Pending) changes.put(ev.quadKey, "");
}
changes.apply(txn);
for (auto levId : levIdsToDelete) {
env.delete_Event(txn, levId);
env.dbi_EventPayload.del(txn, lmdb::to_sv<uint64_t>(levId));
}
std::string tmpBuf;
for (auto &ev : evs) {
if (ev.status == EventWriteStatus::Pending) {
ev.levId = env.insert_Event(txn, ev.receivedAt, ev.flatStr);
@ -290,7 +283,11 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToW
tmpBuf += ev.jsonStr;
env.dbi_EventPayload.put(txn, lmdb::to_sv<uint64_t>(ev.levId), tmpBuf);
changes.put(ev.quadKey, "");
ev.status = EventWriteStatus::Written;
}
}
changes.apply(txn);
}