diff --git a/src/RelayWriter.cpp b/src/RelayWriter.cpp index a082487..d241552 100644 --- a/src/RelayWriter.cpp +++ b/src/RelayWriter.cpp @@ -14,7 +14,7 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { // Prepare messages - std::deque newEvents; + std::vector newEvents; for (auto &newMsg : newMsgs) { if (auto msg = std::get_if(&newMsg.msg)) { diff --git a/src/WriterPipeline.h b/src/WriterPipeline.h index 21d1851..85a81fa 100644 --- a/src/WriterPipeline.h +++ b/src/WriterPipeline.h @@ -70,7 +70,7 @@ struct WriterPipeline { // Collect a certain amount of records in a batch, push the rest back into the inbox // Pre-filter out dups in a read-only txn as an optimisation - std::deque newEventsToProc; + std::vector newEventsToProc; { auto txn = env.txn_ro(); diff --git a/src/cmd_import.cpp b/src/cmd_import.cpp index 3a84636..f7e2de5 100644 --- a/src/cmd_import.cpp +++ b/src/cmd_import.cpp @@ -35,7 +35,7 @@ void cmd_import(const std::vector &subArgs) { std::string line; uint64_t processed = 0, added = 0, rejected = 0, dups = 0; - std::deque newEvents; + std::vector newEvents; auto logStatus = [&]{ LI << "Processed " << processed << " lines. " << added << " added, " << rejected << " rejected, " << dups << " dups"; diff --git a/src/events.cpp b/src/events.cpp index a7dbd6f..759aa49 100644 --- a/src/events.cpp +++ b/src/events.cpp @@ -166,23 +166,23 @@ std::string_view getEventJson(lmdb::txn &txn, uint64_t quadId) { -void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::deque &evs) { +void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector &evs) { + std::sort(evs.begin(), evs.end(), [](auto &a, auto &b) { return a.quadKey < b.quadKey; }); + auto changes = qdb.change(); std::vector eventIdsToDelete; - std::set seenAlready; - for (auto &ev : evs) { + for (size_t i = 0; i < evs.size(); i++) { + auto &ev = evs[i]; + const NostrIndex::Event *flat = flatbuffers::GetRoot(ev.flatStr.data()); - auto quadKey = flatEventToQuadrableKey(flat); - if (lookupEventById(txn, sv(flat->id())) || seenAlready.contains(quadKey)) { + if (lookupEventById(txn, sv(flat->id())) || (i != 0 && ev.quadKey == evs[i-1].quadKey)) { ev.status = EventWriteStatus::Duplicate; continue; } - seenAlready.insert(quadKey); - if (env.lookup_Event__deletion(txn, std::string(sv(flat->id())) + std::string(sv(flat->pubkey())))) { ev.status = EventWriteStatus::Deleted; continue; @@ -227,7 +227,7 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::deque(flatStr.data()); + quadKey = flatEventToQuadrableKey(flat); + } }; -void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::deque &evs); +void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector &evs);