mirror of
https://github.com/hoytech/strfry.git
synced 2025-06-19 17:37:43 +00:00
sort batches of writes to reduce fragmentation
This commit is contained in:
@ -14,7 +14,7 @@ void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
|
|||||||
|
|
||||||
// Prepare messages
|
// Prepare messages
|
||||||
|
|
||||||
std::deque<EventToWrite> newEvents;
|
std::vector<EventToWrite> newEvents;
|
||||||
|
|
||||||
for (auto &newMsg : newMsgs) {
|
for (auto &newMsg : newMsgs) {
|
||||||
if (auto msg = std::get_if<MsgWriter::AddEvent>(&newMsg.msg)) {
|
if (auto msg = std::get_if<MsgWriter::AddEvent>(&newMsg.msg)) {
|
||||||
|
@ -70,7 +70,7 @@ struct WriterPipeline {
|
|||||||
// Collect a certain amount of records in a batch, push the rest back into the inbox
|
// Collect a certain amount of records in a batch, push the rest back into the inbox
|
||||||
// Pre-filter out dups in a read-only txn as an optimisation
|
// Pre-filter out dups in a read-only txn as an optimisation
|
||||||
|
|
||||||
std::deque<EventToWrite> newEventsToProc;
|
std::vector<EventToWrite> newEventsToProc;
|
||||||
|
|
||||||
{
|
{
|
||||||
auto txn = env.txn_ro();
|
auto txn = env.txn_ro();
|
||||||
|
@ -35,7 +35,7 @@ void cmd_import(const std::vector<std::string> &subArgs) {
|
|||||||
|
|
||||||
std::string line;
|
std::string line;
|
||||||
uint64_t processed = 0, added = 0, rejected = 0, dups = 0;
|
uint64_t processed = 0, added = 0, rejected = 0, dups = 0;
|
||||||
std::deque<EventToWrite> newEvents;
|
std::vector<EventToWrite> newEvents;
|
||||||
|
|
||||||
auto logStatus = [&]{
|
auto logStatus = [&]{
|
||||||
LI << "Processed " << processed << " lines. " << added << " added, " << rejected << " rejected, " << dups << " dups";
|
LI << "Processed " << processed << " lines. " << added << " added, " << rejected << " rejected, " << dups << " dups";
|
||||||
|
@ -166,23 +166,23 @@ std::string_view getEventJson(lmdb::txn &txn, uint64_t quadId) {
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::deque<EventToWrite> &evs) {
|
void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToWrite> &evs) {
|
||||||
|
std::sort(evs.begin(), evs.end(), [](auto &a, auto &b) { return a.quadKey < b.quadKey; });
|
||||||
|
|
||||||
auto changes = qdb.change();
|
auto changes = qdb.change();
|
||||||
|
|
||||||
std::vector<uint64_t> eventIdsToDelete;
|
std::vector<uint64_t> eventIdsToDelete;
|
||||||
std::set<quadrable::Key> seenAlready;
|
|
||||||
|
|
||||||
for (auto &ev : evs) {
|
for (size_t i = 0; i < evs.size(); i++) {
|
||||||
|
auto &ev = evs[i];
|
||||||
|
|
||||||
const NostrIndex::Event *flat = flatbuffers::GetRoot<NostrIndex::Event>(ev.flatStr.data());
|
const NostrIndex::Event *flat = flatbuffers::GetRoot<NostrIndex::Event>(ev.flatStr.data());
|
||||||
auto quadKey = flatEventToQuadrableKey(flat);
|
|
||||||
|
|
||||||
if (lookupEventById(txn, sv(flat->id())) || seenAlready.contains(quadKey)) {
|
if (lookupEventById(txn, sv(flat->id())) || (i != 0 && ev.quadKey == evs[i-1].quadKey)) {
|
||||||
ev.status = EventWriteStatus::Duplicate;
|
ev.status = EventWriteStatus::Duplicate;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
seenAlready.insert(quadKey);
|
|
||||||
|
|
||||||
if (env.lookup_Event__deletion(txn, std::string(sv(flat->id())) + std::string(sv(flat->pubkey())))) {
|
if (env.lookup_Event__deletion(txn, std::string(sv(flat->id())) + std::string(sv(flat->pubkey())))) {
|
||||||
ev.status = EventWriteStatus::Deleted;
|
ev.status = EventWriteStatus::Deleted;
|
||||||
continue;
|
continue;
|
||||||
@ -227,7 +227,7 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::deque<EventToWr
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (ev.status == EventWriteStatus::Pending) {
|
if (ev.status == EventWriteStatus::Pending) {
|
||||||
changes.put(quadKey, ev.jsonStr, &ev.nodeId);
|
changes.put(ev.quadKey, ev.jsonStr, &ev.nodeId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,9 +72,15 @@ struct EventToWrite {
|
|||||||
std::string jsonStr;
|
std::string jsonStr;
|
||||||
uint64_t receivedAt;
|
uint64_t receivedAt;
|
||||||
void *userData = nullptr;
|
void *userData = nullptr;
|
||||||
|
quadrable::Key quadKey;
|
||||||
uint64_t nodeId = 0;
|
uint64_t nodeId = 0;
|
||||||
EventWriteStatus status = EventWriteStatus::Pending;
|
EventWriteStatus status = EventWriteStatus::Pending;
|
||||||
|
|
||||||
|
EventToWrite(std::string flatStr, std::string jsonStr, uint64_t receivedAt, void *userData = nullptr) : flatStr(flatStr), jsonStr(jsonStr), receivedAt(receivedAt), userData(userData) {
|
||||||
|
const NostrIndex::Event *flat = flatbuffers::GetRoot<NostrIndex::Event>(flatStr.data());
|
||||||
|
quadKey = flatEventToQuadrableKey(flat);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::deque<EventToWrite> &evs);
|
void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToWrite> &evs);
|
||||||
|
Reference in New Issue
Block a user