From 6218ca3334a7a85da533e0e11623e477ea86c3d7 Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Mon, 27 Feb 2023 06:41:50 -0500 Subject: [PATCH] bugfix: don't crash if we can't load an event that was deleted while a DBQuery scan was paused --- src/DBQuery.h | 27 ++++++++++++++++++--------- src/PluginWritePolicy.h | 12 +++++------- src/RelayReqWorker.cpp | 8 ++------ src/RelayYesstr.cpp | 2 +- src/cmd_delete.cpp | 2 +- src/cmd_dict.cpp | 4 ++-- src/cmd_export.cpp | 11 +++++------ src/cmd_scan.cpp | 4 ++-- src/cmd_sync.cpp | 2 +- src/events.cpp | 34 +++++++++++++++++++++------------- src/events.h | 2 ++ 11 files changed, 60 insertions(+), 48 deletions(-) diff --git a/src/DBQuery.h b/src/DBQuery.h index 0083fcf..6a76d03 100644 --- a/src/DBQuery.h +++ b/src/DBQuery.h @@ -4,6 +4,7 @@ #include "Subscription.h" #include "filters.h" +#include "events.h" struct DBScan : NonCopyable { @@ -233,11 +234,13 @@ struct DBScan : NonCopyable { refillScanDepth = 10 * initialScanDepth; } - bool scan(lmdb::txn &txn, std::function handleEvent, std::function doPause) { + bool scan(lmdb::txn &txn, std::function handleEvent, std::function doPause) { auto cmp = [](auto &a, auto &b){ return a.created() == b.created() ? a.levId() > b.levId() : a.created() > b.created(); }; + auto eventPayloadCursor = lmdb::cursor::open(txn, env.dbi_EventPayload); + while (1) { approxWork++; if (doPause(approxWork)) return false; @@ -258,18 +261,24 @@ struct DBScan : NonCopyable { auto ev = eventQueue.front(); eventQueue.pop_front(); bool doSend = false; + uint64_t levId = ev.levId(); + std::string_view eventPayload; + + auto loadEventPayload = [&]{ + std::string_view key = lmdb::to_sv(levId); + return eventPayloadCursor.get(key, eventPayload, MDB_SET_KEY); // If not found, was deleted while scan was paused + }; if (indexOnly) { if (f.doesMatchTimes(ev.created())) doSend = true; - } else { + if (!loadEventPayload()) doSend = false; + } else if (loadEventPayload()) { approxWork += 10; - auto view = env.lookup_Event(txn, ev.levId()); - if (!view) throw herr("missing event from index, corrupt DB?"); - if (f.doesMatch(view->flat_nested())) doSend = true; + if (f.doesMatch(lookupEventByLevId(txn, levId).flat_nested())) doSend = true; } if (doSend) { - if (handleEvent(ev.levId())) return true; + if (handleEvent(levId, eventPayload)) return true; } cursors[ev.scanIndex()].outstanding--; @@ -306,7 +315,7 @@ struct DBQuery : NonCopyable { DBQuery(const tao::json::value &filter, uint64_t maxLimit = MAX_U64) : sub(Subscription(1, ".", NostrFilterGroup::unwrapped(filter, maxLimit))) {} // If scan is complete, returns true - bool process(lmdb::txn &txn, std::function cb, uint64_t timeBudgetMicroseconds = MAX_U64, bool logMetrics = false) { + bool process(lmdb::txn &txn, std::function cb, uint64_t timeBudgetMicroseconds = MAX_U64, bool logMetrics = false) { while (filterGroupIndex < sub.filterGroup.size()) { const auto &f = sub.filterGroup.filters[filterGroupIndex]; @@ -314,13 +323,13 @@ struct DBQuery : NonCopyable { uint64_t startTime = hoytech::curr_time_us(); - bool complete = scanner->scan(txn, [&](uint64_t levId){ + bool complete = scanner->scan(txn, [&](uint64_t levId, std::string_view eventPayload){ // If this event came in after our query began, don't send it. It will be sent after the EOSE. if (levId > sub.latestEventId) return false; if (sentEventsFull.find(levId) == sentEventsFull.end()) { sentEventsFull.insert(levId); - cb(sub, levId); + cb(sub, levId, eventPayload); } sentEventsCurr.insert(levId); diff --git a/src/PluginWritePolicy.h b/src/PluginWritePolicy.h index 9b80f1e..5230927 100644 --- a/src/PluginWritePolicy.h +++ b/src/PluginWritePolicy.h @@ -192,16 +192,14 @@ struct PluginWritePolicy { env.generic_foreachFull(txn, env.dbi_Event__receivedAt, lmdb::to_sv(start), lmdb::to_sv(0), [&](auto k, auto v) { if (lmdb::from_sv(k) > now) return false; - auto ev = env.lookup_Event(txn, lmdb::from_sv(v)); - if (!ev) throw herr("unable to look up event, corrupt DB?"); - - auto sourceType = (EventSourceType)ev->sourceType(); - std::string_view sourceInfo = ev->sourceInfo(); + auto ev = lookupEventByLevId(txn, lmdb::from_sv(v)); + auto sourceType = (EventSourceType)ev.sourceType(); + std::string_view sourceInfo = ev.sourceInfo(); auto request = tao::json::value({ { "type", "lookback" }, - { "event", tao::json::from_string(getEventJson(txn, decomp, ev->primaryKeyId)) }, - { "receivedAt", ev->receivedAt() / 1000000 }, + { "event", tao::json::from_string(getEventJson(txn, decomp, ev.primaryKeyId)) }, + { "receivedAt", ev.receivedAt() / 1000000 }, { "sourceType", eventSourceTypeToStr(sourceType) }, { "sourceInfo", sourceType == EventSourceType::IP4 || sourceType == EventSourceType::IP6 ? renderIP(sourceInfo) : sourceInfo }, }); diff --git a/src/RelayReqWorker.cpp b/src/RelayReqWorker.cpp index 0e9c2dc..f504bbd 100644 --- a/src/RelayReqWorker.cpp +++ b/src/RelayReqWorker.cpp @@ -70,12 +70,8 @@ struct ActiveQueries : NonCopyable { return; } - auto cursor = lmdb::cursor::open(txn, env.dbi_EventPayload); - - bool complete = q->process(txn, [&](const auto &sub, uint64_t levId){ - std::string_view key = lmdb::to_sv(levId), val; - if (!cursor.get(key, val, MDB_SET_KEY)) throw herr("couldn't find event in EventPayload, corrupted DB?"); - server->sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, val, nullptr, nullptr)); + bool complete = q->process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){ + server->sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, eventPayload, nullptr, nullptr)); }, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf); if (complete) { diff --git a/src/RelayYesstr.cpp b/src/RelayYesstr.cpp index 1969924..5217cbb 100644 --- a/src/RelayYesstr.cpp +++ b/src/RelayYesstr.cpp @@ -52,7 +52,7 @@ void RelayServer::runYesstr(ThreadPool::Thread &thr) { DBQuery query(tao::json::from_string(filterStr)); while (1) { - bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){ levIds.push_back(levId); }, MAX_U64, cfg().relay__logging__dbScanPerf); diff --git a/src/cmd_delete.cpp b/src/cmd_delete.cpp index 11f2481..8532ec9 100644 --- a/src/cmd_delete.cpp +++ b/src/cmd_delete.cpp @@ -51,7 +51,7 @@ void cmd_delete(const std::vector &subArgs) { auto txn = env.txn_ro(); while (1) { - bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){ levIds.insert(levId); }); diff --git a/src/cmd_dict.cpp b/src/cmd_dict.cpp index 58b7104..15dc652 100644 --- a/src/cmd_dict.cpp +++ b/src/cmd_dict.cpp @@ -50,7 +50,7 @@ void cmd_dict(const std::vector &subArgs) { DBQuery query(tao::json::from_string(filterStr)); while (1) { - bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){ levIds.push_back(levId); }); @@ -77,7 +77,7 @@ void cmd_dict(const std::vector &subArgs) { std::string_view raw; bool found = env.dbi_EventPayload.get(txn, lmdb::to_sv(levId), raw); - if (!found) throw herr("couldn't find event in EventPayload, corrupted DB?"); + if (!found) throw herr("couldn't find event in EventPayload"); uint32_t dictId; size_t outCompressedSize; diff --git a/src/cmd_export.cpp b/src/cmd_export.cpp index 2e62621..1031674 100644 --- a/src/cmd_export.cpp +++ b/src/cmd_export.cpp @@ -39,20 +39,19 @@ void cmd_export(const std::vector &subArgs) { if (lmdb::from_sv(k) > until) return false; } - auto view = env.lookup_Event(txn, lmdb::from_sv(v)); - if (!view) throw herr("missing event from index, corrupt DB?"); + auto view = lookupEventByLevId(txn, lmdb::from_sv(v)); if (dbVersion == 0) { std::string_view raw; - bool found = qdb.dbi_nodesLeaf.get(txn, lmdb::to_sv(view->primaryKeyId), raw); - if (!found) throw herr("couldn't find leaf node in quadrable, corrupted DB?"); + bool found = qdb.dbi_nodesLeaf.get(txn, lmdb::to_sv(view.primaryKeyId), raw); + if (!found) throw herr("couldn't find leaf node in quadrable table"); std::cout << raw.substr(8 + 32 + 32) << "\n"; return true; } - if (!includeEphemeral && isEphemeralEvent(view->flat_nested()->kind())) return true; + if (!includeEphemeral && isEphemeralEvent(view.flat_nested()->kind())) return true; - std::cout << getEventJson(txn, decomp, view->primaryKeyId) << "\n"; + std::cout << getEventJson(txn, decomp, view.primaryKeyId) << "\n"; return true; }, reverse); diff --git a/src/cmd_scan.cpp b/src/cmd_scan.cpp index 8c73eb7..ecc053b 100644 --- a/src/cmd_scan.cpp +++ b/src/cmd_scan.cpp @@ -35,9 +35,9 @@ void cmd_scan(const std::vector &subArgs) { uint64_t numEvents = 0; while (1) { - bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){ if (count) numEvents++; - else std::cout << getEventJson(txn, decomp, levId) << "\n"; + else std::cout << getEventJson(txn, decomp, levId, eventPayload) << "\n"; }, pause ? pause : MAX_U64, metrics); if (complete) break; diff --git a/src/cmd_sync.cpp b/src/cmd_sync.cpp index 4473182..778c494 100644 --- a/src/cmd_sync.cpp +++ b/src/cmd_sync.cpp @@ -156,7 +156,7 @@ void cmd_sync(const std::vector &subArgs) { auto txn = env.txn_ro(); while (1) { - bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){ levIds.push_back(levId); }); diff --git a/src/events.cpp b/src/events.cpp index c5725be..69f9aac 100644 --- a/src/events.cpp +++ b/src/events.cpp @@ -170,6 +170,12 @@ std::optional lookupEventById(lmdb::txn &txn return output; } +defaultDb::environment::View_Event lookupEventByLevId(lmdb::txn &txn, uint64_t levId) { + auto view = env.lookup_Event(txn, levId); + if (!view) throw herr("unable to lookup event by levId"); + return *view; +} + uint64_t getMostRecentLevId(lmdb::txn &txn) { uint64_t levId = 0; @@ -210,12 +216,16 @@ std::string_view decodeEventPayload(lmdb::txn &txn, Decompressor &decomp, std::s // Return result only valid until one of: next call to getEventJson/decodeEventPayload, write to/closing of txn, or any action on decomp object std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId) { - std::string_view raw; + std::string_view eventPayload; - bool found = env.dbi_EventPayload.get(txn, lmdb::to_sv(levId), raw); - if (!found) throw herr("couldn't find event in EventPayload, corrupted DB?"); + bool found = env.dbi_EventPayload.get(txn, lmdb::to_sv(levId), eventPayload); + if (!found) throw herr("couldn't find event in EventPayload"); - return decodeEventPayload(txn, decomp, raw, nullptr, nullptr); + return getEventJson(txn, decomp, levId, eventPayload); +} + +std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId, std::string_view eventPayload) { + return decodeEventPayload(txn, decomp, eventPayload, nullptr, nullptr); } @@ -258,10 +268,9 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vectorpubkey()) && parsedKey.n1 == flat->kind()) { if (parsedKey.n2 < flat->created_at()) { - auto otherEv = env.lookup_Event(txn, lmdb::from_sv(v)); - if (!otherEv) throw herr("missing event from index, corrupt DB?"); - if (logLevel >= 1) LI << "Deleting event (replaceable). id=" << to_hex(sv(otherEv->flat_nested()->id())); - deleteEvent(txn, changes, *otherEv); + auto otherEv = lookupEventByLevId(txn, lmdb::from_sv(v)); + if (logLevel >= 1) LI << "Deleting event (replaceable). id=" << to_hex(sv(otherEv.flat_nested()->id())); + deleteEvent(txn, changes, otherEv); } else { ev.status = EventWriteStatus::Replaced; } @@ -285,12 +294,11 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector(MAX_U64), [&](auto k, auto v) { ParsedKey_StringUint64 parsedKey(k); if (parsedKey.s == searchStr && parsedKey.n == flat->kind()) { - auto otherEv = env.lookup_Event(txn, lmdb::from_sv(v)); - if (!otherEv) throw herr("missing event from index, corrupt DB?"); + auto otherEv = lookupEventByLevId(txn, lmdb::from_sv(v)); - if (otherEv->flat_nested()->created_at() < flat->created_at()) { - if (logLevel >= 1) LI << "Deleting event (d-tag). id=" << to_hex(sv(otherEv->flat_nested()->id())); - deleteEvent(txn, changes, *otherEv); + if (otherEv.flat_nested()->created_at() < flat->created_at()) { + if (logLevel >= 1) LI << "Deleting event (d-tag). id=" << to_hex(sv(otherEv.flat_nested()->id())); + deleteEvent(txn, changes, otherEv); } else { ev.status = EventWriteStatus::Replaced; } diff --git a/src/events.h b/src/events.h index a59ac07..1bf0711 100644 --- a/src/events.h +++ b/src/events.h @@ -45,9 +45,11 @@ inline const NostrIndex::Event *flatStrToFlatEvent(std::string_view flatStr) { std::optional lookupEventById(lmdb::txn &txn, std::string_view id); +defaultDb::environment::View_Event lookupEventByLevId(lmdb::txn &txn, uint64_t levId); // throws if can't find uint64_t getMostRecentLevId(lmdb::txn &txn); std::string_view decodeEventPayload(lmdb::txn &txn, Decompressor &decomp, std::string_view raw, uint32_t *outDictId, size_t *outCompressedSize); std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId); +std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId, std::string_view eventPayload); inline quadrable::Key flatEventToQuadrableKey(const NostrIndex::Event *flat) { uint64_t timestamp = flat->created_at();