mirror of
https://github.com/hoytech/strfry.git
synced 2025-06-16 16:28:50 +00:00
bugfix: don't crash if we can't load an event that was deleted while a DBQuery scan was paused
This commit is contained in:
@ -4,6 +4,7 @@
|
||||
|
||||
#include "Subscription.h"
|
||||
#include "filters.h"
|
||||
#include "events.h"
|
||||
|
||||
|
||||
struct DBScan : NonCopyable {
|
||||
@ -233,11 +234,13 @@ struct DBScan : NonCopyable {
|
||||
refillScanDepth = 10 * initialScanDepth;
|
||||
}
|
||||
|
||||
bool scan(lmdb::txn &txn, std::function<bool(uint64_t)> handleEvent, std::function<bool(uint64_t)> doPause) {
|
||||
bool scan(lmdb::txn &txn, std::function<bool(uint64_t, std::string_view)> handleEvent, std::function<bool(uint64_t)> doPause) {
|
||||
auto cmp = [](auto &a, auto &b){
|
||||
return a.created() == b.created() ? a.levId() > b.levId() : a.created() > b.created();
|
||||
};
|
||||
|
||||
auto eventPayloadCursor = lmdb::cursor::open(txn, env.dbi_EventPayload);
|
||||
|
||||
while (1) {
|
||||
approxWork++;
|
||||
if (doPause(approxWork)) return false;
|
||||
@ -258,18 +261,24 @@ struct DBScan : NonCopyable {
|
||||
auto ev = eventQueue.front();
|
||||
eventQueue.pop_front();
|
||||
bool doSend = false;
|
||||
uint64_t levId = ev.levId();
|
||||
std::string_view eventPayload;
|
||||
|
||||
auto loadEventPayload = [&]{
|
||||
std::string_view key = lmdb::to_sv<uint64_t>(levId);
|
||||
return eventPayloadCursor.get(key, eventPayload, MDB_SET_KEY); // If not found, was deleted while scan was paused
|
||||
};
|
||||
|
||||
if (indexOnly) {
|
||||
if (f.doesMatchTimes(ev.created())) doSend = true;
|
||||
} else {
|
||||
if (!loadEventPayload()) doSend = false;
|
||||
} else if (loadEventPayload()) {
|
||||
approxWork += 10;
|
||||
auto view = env.lookup_Event(txn, ev.levId());
|
||||
if (!view) throw herr("missing event from index, corrupt DB?");
|
||||
if (f.doesMatch(view->flat_nested())) doSend = true;
|
||||
if (f.doesMatch(lookupEventByLevId(txn, levId).flat_nested())) doSend = true;
|
||||
}
|
||||
|
||||
if (doSend) {
|
||||
if (handleEvent(ev.levId())) return true;
|
||||
if (handleEvent(levId, eventPayload)) return true;
|
||||
}
|
||||
|
||||
cursors[ev.scanIndex()].outstanding--;
|
||||
@ -306,7 +315,7 @@ struct DBQuery : NonCopyable {
|
||||
DBQuery(const tao::json::value &filter, uint64_t maxLimit = MAX_U64) : sub(Subscription(1, ".", NostrFilterGroup::unwrapped(filter, maxLimit))) {}
|
||||
|
||||
// If scan is complete, returns true
|
||||
bool process(lmdb::txn &txn, std::function<void(const Subscription &, uint64_t)> cb, uint64_t timeBudgetMicroseconds = MAX_U64, bool logMetrics = false) {
|
||||
bool process(lmdb::txn &txn, std::function<void(const Subscription &, uint64_t, std::string_view)> cb, uint64_t timeBudgetMicroseconds = MAX_U64, bool logMetrics = false) {
|
||||
while (filterGroupIndex < sub.filterGroup.size()) {
|
||||
const auto &f = sub.filterGroup.filters[filterGroupIndex];
|
||||
|
||||
@ -314,13 +323,13 @@ struct DBQuery : NonCopyable {
|
||||
|
||||
uint64_t startTime = hoytech::curr_time_us();
|
||||
|
||||
bool complete = scanner->scan(txn, [&](uint64_t levId){
|
||||
bool complete = scanner->scan(txn, [&](uint64_t levId, std::string_view eventPayload){
|
||||
// If this event came in after our query began, don't send it. It will be sent after the EOSE.
|
||||
if (levId > sub.latestEventId) return false;
|
||||
|
||||
if (sentEventsFull.find(levId) == sentEventsFull.end()) {
|
||||
sentEventsFull.insert(levId);
|
||||
cb(sub, levId);
|
||||
cb(sub, levId, eventPayload);
|
||||
}
|
||||
|
||||
sentEventsCurr.insert(levId);
|
||||
|
@ -192,16 +192,14 @@ struct PluginWritePolicy {
|
||||
env.generic_foreachFull(txn, env.dbi_Event__receivedAt, lmdb::to_sv<uint64_t>(start), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
|
||||
if (lmdb::from_sv<uint64_t>(k) > now) return false;
|
||||
|
||||
auto ev = env.lookup_Event(txn, lmdb::from_sv<uint64_t>(v));
|
||||
if (!ev) throw herr("unable to look up event, corrupt DB?");
|
||||
|
||||
auto sourceType = (EventSourceType)ev->sourceType();
|
||||
std::string_view sourceInfo = ev->sourceInfo();
|
||||
auto ev = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v));
|
||||
auto sourceType = (EventSourceType)ev.sourceType();
|
||||
std::string_view sourceInfo = ev.sourceInfo();
|
||||
|
||||
auto request = tao::json::value({
|
||||
{ "type", "lookback" },
|
||||
{ "event", tao::json::from_string(getEventJson(txn, decomp, ev->primaryKeyId)) },
|
||||
{ "receivedAt", ev->receivedAt() / 1000000 },
|
||||
{ "event", tao::json::from_string(getEventJson(txn, decomp, ev.primaryKeyId)) },
|
||||
{ "receivedAt", ev.receivedAt() / 1000000 },
|
||||
{ "sourceType", eventSourceTypeToStr(sourceType) },
|
||||
{ "sourceInfo", sourceType == EventSourceType::IP4 || sourceType == EventSourceType::IP6 ? renderIP(sourceInfo) : sourceInfo },
|
||||
});
|
||||
|
@ -70,12 +70,8 @@ struct ActiveQueries : NonCopyable {
|
||||
return;
|
||||
}
|
||||
|
||||
auto cursor = lmdb::cursor::open(txn, env.dbi_EventPayload);
|
||||
|
||||
bool complete = q->process(txn, [&](const auto &sub, uint64_t levId){
|
||||
std::string_view key = lmdb::to_sv<uint64_t>(levId), val;
|
||||
if (!cursor.get(key, val, MDB_SET_KEY)) throw herr("couldn't find event in EventPayload, corrupted DB?");
|
||||
server->sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, val, nullptr, nullptr));
|
||||
bool complete = q->process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){
|
||||
server->sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, eventPayload, nullptr, nullptr));
|
||||
}, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf);
|
||||
|
||||
if (complete) {
|
||||
|
@ -52,7 +52,7 @@ void RelayServer::runYesstr(ThreadPool<MsgYesstr>::Thread &thr) {
|
||||
DBQuery query(tao::json::from_string(filterStr));
|
||||
|
||||
while (1) {
|
||||
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
|
||||
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){
|
||||
levIds.push_back(levId);
|
||||
}, MAX_U64, cfg().relay__logging__dbScanPerf);
|
||||
|
||||
|
@ -51,7 +51,7 @@ void cmd_delete(const std::vector<std::string> &subArgs) {
|
||||
auto txn = env.txn_ro();
|
||||
|
||||
while (1) {
|
||||
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
|
||||
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){
|
||||
levIds.insert(levId);
|
||||
});
|
||||
|
||||
|
@ -50,7 +50,7 @@ void cmd_dict(const std::vector<std::string> &subArgs) {
|
||||
DBQuery query(tao::json::from_string(filterStr));
|
||||
|
||||
while (1) {
|
||||
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
|
||||
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){
|
||||
levIds.push_back(levId);
|
||||
});
|
||||
|
||||
@ -77,7 +77,7 @@ void cmd_dict(const std::vector<std::string> &subArgs) {
|
||||
std::string_view raw;
|
||||
|
||||
bool found = env.dbi_EventPayload.get(txn, lmdb::to_sv<uint64_t>(levId), raw);
|
||||
if (!found) throw herr("couldn't find event in EventPayload, corrupted DB?");
|
||||
if (!found) throw herr("couldn't find event in EventPayload");
|
||||
|
||||
uint32_t dictId;
|
||||
size_t outCompressedSize;
|
||||
|
@ -39,20 +39,19 @@ void cmd_export(const std::vector<std::string> &subArgs) {
|
||||
if (lmdb::from_sv<uint64_t>(k) > until) return false;
|
||||
}
|
||||
|
||||
auto view = env.lookup_Event(txn, lmdb::from_sv<uint64_t>(v));
|
||||
if (!view) throw herr("missing event from index, corrupt DB?");
|
||||
auto view = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v));
|
||||
|
||||
if (dbVersion == 0) {
|
||||
std::string_view raw;
|
||||
bool found = qdb.dbi_nodesLeaf.get(txn, lmdb::to_sv<uint64_t>(view->primaryKeyId), raw);
|
||||
if (!found) throw herr("couldn't find leaf node in quadrable, corrupted DB?");
|
||||
bool found = qdb.dbi_nodesLeaf.get(txn, lmdb::to_sv<uint64_t>(view.primaryKeyId), raw);
|
||||
if (!found) throw herr("couldn't find leaf node in quadrable table");
|
||||
std::cout << raw.substr(8 + 32 + 32) << "\n";
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!includeEphemeral && isEphemeralEvent(view->flat_nested()->kind())) return true;
|
||||
if (!includeEphemeral && isEphemeralEvent(view.flat_nested()->kind())) return true;
|
||||
|
||||
std::cout << getEventJson(txn, decomp, view->primaryKeyId) << "\n";
|
||||
std::cout << getEventJson(txn, decomp, view.primaryKeyId) << "\n";
|
||||
|
||||
return true;
|
||||
}, reverse);
|
||||
|
@ -35,9 +35,9 @@ void cmd_scan(const std::vector<std::string> &subArgs) {
|
||||
uint64_t numEvents = 0;
|
||||
|
||||
while (1) {
|
||||
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
|
||||
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){
|
||||
if (count) numEvents++;
|
||||
else std::cout << getEventJson(txn, decomp, levId) << "\n";
|
||||
else std::cout << getEventJson(txn, decomp, levId, eventPayload) << "\n";
|
||||
}, pause ? pause : MAX_U64, metrics);
|
||||
|
||||
if (complete) break;
|
||||
|
@ -156,7 +156,7 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
|
||||
auto txn = env.txn_ro();
|
||||
|
||||
while (1) {
|
||||
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
|
||||
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){
|
||||
levIds.push_back(levId);
|
||||
});
|
||||
|
||||
|
@ -170,6 +170,12 @@ std::optional<defaultDb::environment::View_Event> lookupEventById(lmdb::txn &txn
|
||||
return output;
|
||||
}
|
||||
|
||||
// Resolves levId to its Event view through env.lookup_Event.
// Throws herr when the lookup yields nothing, so a successful return always carries a view.
defaultDb::environment::View_Event lookupEventByLevId(lmdb::txn &txn, uint64_t levId) {
    if (auto found = env.lookup_Event(txn, levId)) return *found;

    throw herr("unable to lookup event by levId");
}
|
||||
|
||||
uint64_t getMostRecentLevId(lmdb::txn &txn) {
|
||||
uint64_t levId = 0;
|
||||
|
||||
@ -210,12 +216,16 @@ std::string_view decodeEventPayload(lmdb::txn &txn, Decompressor &decomp, std::s
|
||||
// Return result only valid until one of: next call to getEventJson/decodeEventPayload, write to/closing of txn, or any action on decomp object
|
||||
|
||||
// Fetches the EventPayload record stored under levId and returns the decoded event JSON.
// Throws herr when no payload is stored under levId.
// The returned view is subject to the same lifetime limits as the 4-argument overload's result.
std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId) {
    std::string_view eventPayload;

    const bool found = env.dbi_EventPayload.get(txn, lmdb::to_sv<uint64_t>(levId), eventPayload);
    if (!found) throw herr("couldn't find event in EventPayload");

    // Decoding lives in the overload that accepts an already-loaded payload.
    return getEventJson(txn, decomp, levId, eventPayload);
}
|
||||
|
||||
// Overload for callers that already hold the raw EventPayload record:
// hands eventPayload straight to decodeEventPayload instead of fetching it by levId.
// levId is accepted but not read here.
std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId, std::string_view eventPayload) {
    // Neither optional output of decodeEventPayload (dict id, compressed size) is requested.
    uint32_t *const noDictIdOut = nullptr;
    size_t *const noCompressedSizeOut = nullptr;

    return decodeEventPayload(txn, decomp, eventPayload, noDictIdOut, noCompressedSizeOut);
}
|
||||
|
||||
|
||||
@ -258,10 +268,9 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToW
|
||||
ParsedKey_StringUint64Uint64 parsedKey(k);
|
||||
if (parsedKey.s == sv(flat->pubkey()) && parsedKey.n1 == flat->kind()) {
|
||||
if (parsedKey.n2 < flat->created_at()) {
|
||||
auto otherEv = env.lookup_Event(txn, lmdb::from_sv<uint64_t>(v));
|
||||
if (!otherEv) throw herr("missing event from index, corrupt DB?");
|
||||
if (logLevel >= 1) LI << "Deleting event (replaceable). id=" << to_hex(sv(otherEv->flat_nested()->id()));
|
||||
deleteEvent(txn, changes, *otherEv);
|
||||
auto otherEv = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v));
|
||||
if (logLevel >= 1) LI << "Deleting event (replaceable). id=" << to_hex(sv(otherEv.flat_nested()->id()));
|
||||
deleteEvent(txn, changes, otherEv);
|
||||
} else {
|
||||
ev.status = EventWriteStatus::Replaced;
|
||||
}
|
||||
@ -285,12 +294,11 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToW
|
||||
env.generic_foreachFull(txn, env.dbi_Event__replace, searchKey, lmdb::to_sv<uint64_t>(MAX_U64), [&](auto k, auto v) {
|
||||
ParsedKey_StringUint64 parsedKey(k);
|
||||
if (parsedKey.s == searchStr && parsedKey.n == flat->kind()) {
|
||||
auto otherEv = env.lookup_Event(txn, lmdb::from_sv<uint64_t>(v));
|
||||
if (!otherEv) throw herr("missing event from index, corrupt DB?");
|
||||
auto otherEv = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v));
|
||||
|
||||
if (otherEv->flat_nested()->created_at() < flat->created_at()) {
|
||||
if (logLevel >= 1) LI << "Deleting event (d-tag). id=" << to_hex(sv(otherEv->flat_nested()->id()));
|
||||
deleteEvent(txn, changes, *otherEv);
|
||||
if (otherEv.flat_nested()->created_at() < flat->created_at()) {
|
||||
if (logLevel >= 1) LI << "Deleting event (d-tag). id=" << to_hex(sv(otherEv.flat_nested()->id()));
|
||||
deleteEvent(txn, changes, otherEv);
|
||||
} else {
|
||||
ev.status = EventWriteStatus::Replaced;
|
||||
}
|
||||
|
@ -45,9 +45,11 @@ inline const NostrIndex::Event *flatStrToFlatEvent(std::string_view flatStr) {
|
||||
|
||||
|
||||
std::optional<defaultDb::environment::View_Event> lookupEventById(lmdb::txn &txn, std::string_view id);
|
||||
defaultDb::environment::View_Event lookupEventByLevId(lmdb::txn &txn, uint64_t levId); // throws if can't find
|
||||
uint64_t getMostRecentLevId(lmdb::txn &txn);
|
||||
std::string_view decodeEventPayload(lmdb::txn &txn, Decompressor &decomp, std::string_view raw, uint32_t *outDictId, size_t *outCompressedSize);
|
||||
std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId);
|
||||
std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId, std::string_view eventPayload);
|
||||
|
||||
inline quadrable::Key flatEventToQuadrableKey(const NostrIndex::Event *flat) {
|
||||
uint64_t timestamp = flat->created_at();
|
||||
|
Reference in New Issue
Block a user