diff --git a/golpe b/golpe index 7705174..511e70d 160000 --- a/golpe +++ b/golpe @@ -1 +1 @@ -Subproject commit 770517466971574ed0ac9e821ef0db3b43b9f6fb +Subproject commit 511e70d7b9a295d0861884ee0b368605bfe460c3 diff --git a/golpe.yaml b/golpe.yaml index d551cb0..0babc3d 100644 --- a/golpe.yaml +++ b/golpe.yaml @@ -13,20 +13,14 @@ includes: | tables: ## DB meta-data. Single entry, with id = 1 Meta: - tableId: 2 - fields: - name: dbVersion - name: endianness - ## Stored nostr events + ## Meta-info of nostr events, suitable for indexing + ## Primary key is auto-incremented, called "levId" for Local EVent ID Event: - tableId: 1 - - primaryKey: quadId - fields: - - name: quadId - name: receivedAt # microseconds - name: flat type: ubytes @@ -72,6 +66,20 @@ tables: if (flat->kind() == 5 && tagName == 'e') deletion.push_back(std::string(tagVal) + std::string(sv(flat->pubkey()))); } + CompressionDictionary: + fields: + - name: dict + type: ubytes + +tablesRaw: + ## Raw nostr event JSON, possibly compressed + ## keys are levIds + ## vals are prefixed with a type byte: + ## 0: no compression, payload follows + ## 1: zstd compression. Followed by Dictionary ID (native endian uint32) then compressed payload + EventPayload: + flags: 'MDB_INTEGERKEY' + config: - name: db desc: "Directory that contains strfry database" diff --git a/src/DBScan.h b/src/DBScan.h index 00fb479..a6ebd86 100644 --- a/src/DBScan.h +++ b/src/DBScan.h @@ -232,18 +232,18 @@ struct DBScan { } bool sent = false; - uint64_t quadId = lmdb::from_sv(v); + uint64_t levId = lmdb::from_sv(v); if (f.indexOnlyScans) { if (f.doesMatchTimes(created)) { - handleEvent(quadId); + handleEvent(levId); sent = true; } } else { - auto view = env.lookup_Event(txn, quadId); + auto view = env.lookup_Event(txn, levId); if (!view) throw herr("missing event from index, corrupt DB?"); if (f.doesMatch(view->flat_nested())) { - handleEvent(quadId); + handleEvent(levId); sent = true; } } @@ -298,16 +298,16 @@ struct DBScanQuery : NonCopyable { while (filterGroupIndex < sub.filterGroup.size()) { if (!scanner) scanner = std::make_unique(sub.filterGroup.filters[filterGroupIndex]); - bool complete = scanner->scan(txn, [&](uint64_t quadId){ + bool complete = scanner->scan(txn, [&](uint64_t levId){ // If this event came in after our query began, don't send it. It will be sent after the EOSE. - if (quadId > sub.latestEventId) return; + if (levId > sub.latestEventId) return; // We already sent this event - if (alreadySentEvents.find(quadId) != alreadySentEvents.end()) return; - alreadySentEvents.insert(quadId); + if (alreadySentEvents.find(levId) != alreadySentEvents.end()) return; + alreadySentEvents.insert(levId); currScanRecordsFound++; - cb(sub, quadId); + cb(sub, levId); }, [&]{ currScanRecordsTraversed++; return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds; diff --git a/src/RelayCron.cpp b/src/RelayCron.cpp index c0da185..1f11cd0 100644 --- a/src/RelayCron.cpp +++ b/src/RelayCron.cpp @@ -2,17 +2,12 @@ void RelayServer::cleanupOldEvents() { - struct EventDel { - uint64_t nodeId; - uint64_t deletedNodeId; - }; - - std::vector expiredEvents; + std::vector expiredLevIds; { auto txn = env.txn_ro(); - auto mostRecent = getMostRecentEventId(txn); + auto mostRecent = getMostRecentLevId(txn); uint64_t cutoff = hoytech::curr_time_s() - cfg().events__ephemeralEventsLifetimeSeconds; uint64_t currKind = 20'000; @@ -31,10 +26,10 @@ void RelayServer::cleanupOldEvents() { return false; } - uint64_t nodeId = lmdb::from_sv(v); + uint64_t levId = lmdb::from_sv(v); - if (nodeId != mostRecent) { // prevent nodeId re-use - expiredEvents.emplace_back(nodeId, 0); + if (levId != mostRecent) { // prevent levId re-use + expiredLevIds.emplace_back(levId); } return true; @@ -44,29 +39,30 @@ void RelayServer::cleanupOldEvents() { } } - if (expiredEvents.size() > 0) { - LI << "Deleting " << expiredEvents.size() << " ephemeral events"; - + if (expiredLevIds.size() > 0) { auto txn = env.txn_rw(); quadrable::Quadrable qdb; qdb.init(txn); qdb.checkout("events"); + uint64_t numDeleted = 0; auto changes = qdb.change(); - for (auto &e : expiredEvents) { - auto view = env.lookup_Event(txn, e.nodeId); - if (!view) throw herr("missing event from index, corrupt DB?"); - changes.del(flatEventToQuadrableKey(view->flat_nested()), &e.deletedNodeId); + for (auto levId : expiredLevIds) { + auto view = env.lookup_Event(txn, levId); + if (!view) continue; // Deleted in between transactions + + numDeleted++; + changes.del(flatEventToQuadrableKey(view->flat_nested())); + env.delete_Event(txn, levId); + env.dbi_EventPayload.del(txn, lmdb::to_sv(levId)); } changes.apply(txn); - for (auto &e : expiredEvents) { - if (e.deletedNodeId) env.delete_Event(txn, e.nodeId); - } - txn.commit(); + + if (numDeleted) LI << "Deleted " << numDeleted << " ephemeral events"; } } diff --git a/src/RelayReqMonitor.cpp b/src/RelayReqMonitor.cpp index 717cb9c..7c513ab 100644 --- a/src/RelayReqMonitor.cpp +++ b/src/RelayReqMonitor.cpp @@ -22,7 +22,7 @@ void RelayServer::runReqMonitor(ThreadPool::Thread &thr) { auto txn = env.txn_ro(); - uint64_t latestEventId = getMostRecentEventId(txn); + uint64_t latestEventId = getMostRecentLevId(txn); if (currEventId > latestEventId) currEventId = latestEventId; for (auto &newMsg : newMsgs) { @@ -44,8 +44,8 @@ void RelayServer::runReqMonitor(ThreadPool::Thread &thr) { monitors.closeConn(msg->connId); } else if (std::get_if(&newMsg.msg)) { env.foreach_Event(txn, [&](auto &ev){ - monitors.process(txn, ev, [&](RecipientList &&recipients, uint64_t quadId){ - sendEventToBatch(std::move(recipients), std::string(getEventJson(txn, quadId))); + monitors.process(txn, ev, [&](RecipientList &&recipients, uint64_t levId){ + sendEventToBatch(std::move(recipients), std::string(getEventJson(txn, levId))); }); return true; }, false, currEventId + 1); diff --git a/src/RelayReqWorker.cpp b/src/RelayReqWorker.cpp index 16391ea..32e1a3b 100644 --- a/src/RelayReqWorker.cpp +++ b/src/RelayReqWorker.cpp @@ -9,7 +9,7 @@ struct ActiveQueries : NonCopyable { std::deque running; void addSub(lmdb::txn &txn, Subscription &&sub) { - sub.latestEventId = getMostRecentEventId(txn); + sub.latestEventId = getMostRecentLevId(txn); { auto *existing = findQuery(sub.connId, sub.subId); @@ -63,8 +63,8 @@ struct ActiveQueries : NonCopyable { return; } - bool complete = q->process(txn, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t quadId){ - server->sendEvent(sub.connId, sub.subId, getEventJson(txn, quadId)); + bool complete = q->process(txn, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t levId){ + server->sendEvent(sub.connId, sub.subId, getEventJson(txn, levId)); }); if (complete) { diff --git a/src/RelayWriter.cpp b/src/RelayWriter.cpp index d241552..10c60f5 100644 --- a/src/RelayWriter.cpp +++ b/src/RelayWriter.cpp @@ -37,7 +37,7 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { bool written = false; if (newEvent.status == EventWriteStatus::Written) { - LI << "Inserted event. id=" << eventIdHex << " qdbNodeId=" << newEvent.nodeId; + LI << "Inserted event. id=" << eventIdHex << " levId=" << newEvent.levId; written = true; } else if (newEvent.status == EventWriteStatus::Duplicate) { message = "duplicate: have this event"; diff --git a/src/RelayYesstr.cpp b/src/RelayYesstr.cpp index 9eabc4c..a81cf7c 100644 --- a/src/RelayYesstr.cpp +++ b/src/RelayYesstr.cpp @@ -53,20 +53,20 @@ void RelayServer::runYesstr(ThreadPool::Thread &thr) { // FIXME: The following blocks the whole thread for the query duration. Should interleave it // with other requests like RelayReqWorker does. - std::vector quadEventIds; + std::vector levIds; auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr)); Subscription sub(1, "junkSub", filterGroup); DBScanQuery query(sub); while (1) { - bool complete = query.process(txn, MAX_U64, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t quadId){ - quadEventIds.push_back(quadId); + bool complete = query.process(txn, MAX_U64, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t levId){ + levIds.push_back(levId); }); if (complete) break; } - LI << "Filter matched " << quadEventIds.size() << " local events"; + LI << "Filter matched " << levIds.size() << " local events"; qdb->withMemStore(s.m, [&]{ qdb->writeToMemStore = true; @@ -74,8 +74,8 @@ void RelayServer::runYesstr(ThreadPool::Thread &thr) { auto changes = qdb->change(); - for (auto id : quadEventIds) { - changes.putReuse(txn, id); + for (auto levId : levIds) { + changes.putReuse(txn, levId); } changes.apply(txn); diff --git a/src/cmd_monitor.cpp b/src/cmd_monitor.cpp index 03d3c00..91ad0c8 100644 --- a/src/cmd_monitor.cpp +++ b/src/cmd_monitor.cpp @@ -54,10 +54,10 @@ void cmd_monitor(const std::vector &subArgs) { } env.foreach_Event(txn, [&](auto &ev){ - monitors.process(txn, ev, [&](RecipientList &&recipients, uint64_t quadId){ + monitors.process(txn, ev, [&](RecipientList &&recipients, uint64_t levId){ for (auto &r : recipients) { if (r.connId == interestConnId && r.subId.str() == interestSubId) { - std::cout << getEventJson(txn, quadId) << "\n"; + std::cout << getEventJson(txn, levId) << "\n"; } } }); diff --git a/src/cmd_scan.cpp b/src/cmd_scan.cpp index 96b0700..10efce0 100644 --- a/src/cmd_scan.cpp +++ b/src/cmd_scan.cpp @@ -35,8 +35,8 @@ void cmd_scan(const std::vector &subArgs) { auto txn = env.txn_ro(); while (1) { - bool complete = query.process(txn, pause ? pause : MAX_U64, metrics, [&](const auto &sub, uint64_t quadId){ - std::cout << getEventJson(txn, quadId) << "\n"; + bool complete = query.process(txn, pause ? pause : MAX_U64, metrics, [&](const auto &sub, uint64_t levId){ + std::cout << getEventJson(txn, levId) << "\n"; }); if (complete) break; diff --git a/src/cmd_stream.cpp b/src/cmd_stream.cpp index 49f6ff1..ef54212 100644 --- a/src/cmd_stream.cpp +++ b/src/cmd_stream.cpp @@ -80,7 +80,7 @@ void cmd_stream(const std::vector &subArgs) { { auto txn = env.txn_ro(); - currEventId = getMostRecentEventId(txn); + currEventId = getMostRecentLevId(txn); } ws.onTrigger = [&]{ diff --git a/src/cmd_sync.cpp b/src/cmd_sync.cpp index d6723f1..0bc3e85 100644 --- a/src/cmd_sync.cpp +++ b/src/cmd_sync.cpp @@ -148,7 +148,7 @@ void cmd_sync(const std::vector &subArgs) { if (filterStr.size()) { - std::vector quadEventIds; + std::vector levIds; tao::json::value filterJson; @@ -167,14 +167,14 @@ void cmd_sync(const std::vector &subArgs) { auto txn = env.txn_ro(); while (1) { - bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t quadId){ - quadEventIds.push_back(quadId); + bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t levId){ + levIds.push_back(levId); }); if (complete) break; } - LI << "Filter matched " << quadEventIds.size() << " local events"; + LI << "Filter matched " << levIds.size() << " local events"; controller = std::make_unique(&qdb, &ws); @@ -184,8 +184,8 @@ void cmd_sync(const std::vector &subArgs) { auto changes = qdb.change(); - for (auto id : quadEventIds) { - changes.putReuse(txn, id); + for (auto levId : levIds) { + changes.putReuse(txn, levId); } changes.apply(txn); diff --git a/src/events.cpp b/src/events.cpp index 4e68df6..52dc3e5 100644 --- a/src/events.cpp +++ b/src/events.cpp @@ -158,22 +158,22 @@ std::optional lookupEventById(lmdb::txn &txn return output; } -uint64_t getMostRecentEventId(lmdb::txn &txn) { - uint64_t output = 0; +uint64_t getMostRecentLevId(lmdb::txn &txn) { + uint64_t levId = 0; env.foreach_Event(txn, [&](auto &ev){ - output = ev.primaryKeyId; + levId = ev.primaryKeyId; return false; }, true); - return output; + return levId; } -std::string_view getEventJson(lmdb::txn &txn, uint64_t quadId) { +std::string_view getEventJson(lmdb::txn &txn, uint64_t levId) { std::string_view raw; - bool found = env.dbiQuadrable_nodesLeaf.get(txn, lmdb::to_sv(quadId), raw); + bool found = env.dbi_EventPayload.get(txn, lmdb::to_sv(levId), raw); if (!found) throw herr("couldn't find leaf node in quadrable, corrupted DB?"); - return raw.substr(8 + 32 + 32); + return raw.substr(1); } @@ -183,7 +183,7 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector eventIdsToDelete; + std::vector levIdsToDelete; for (size_t i = 0; i < evs.size(); i++) { auto &ev = evs[i]; @@ -202,13 +202,13 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vectorkind())) { auto searchKey = makeKey_StringUint64Uint64(sv(flat->pubkey()), flat->kind(), MAX_U64); - uint64_t otherEventId = 0; + uint64_t otherLevId = 0; env.generic_foreachFull(txn, env.dbi_Event__pubkeyKind, searchKey, lmdb::to_sv(MAX_U64), [&](auto k, auto v) { ParsedKey_StringUint64Uint64 parsedKey(k); if (parsedKey.s == sv(flat->pubkey()) && parsedKey.n1 == flat->kind()) { if (parsedKey.n2 < flat->created_at()) { - otherEventId = lmdb::from_sv(v); + otherLevId = lmdb::from_sv(v); } else { ev.status = EventWriteStatus::Replaced; } @@ -216,11 +216,11 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vectorflat_nested())); - eventIdsToDelete.push_back(otherEventId); + levIdsToDelete.push_back(otherLevId); } } @@ -232,26 +232,33 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vectorflat_nested()->pubkey()) == sv(flat->pubkey())) { LI << "Deleting event. id=" << to_hex(sv(tagPair->val())); changes.del(flatEventToQuadrableKey(otherEv->flat_nested())); - eventIdsToDelete.push_back(otherEv->primaryKeyId); + levIdsToDelete.push_back(otherEv->primaryKeyId); } } } } - if (ev.status == EventWriteStatus::Pending) { - changes.put(ev.quadKey, ev.jsonStr, &ev.nodeId); - } + if (ev.status == EventWriteStatus::Pending) changes.put(ev.quadKey, ""); } changes.apply(txn); - for (auto eventId : eventIdsToDelete) { - env.delete_Event(txn, eventId); + for (auto levId : levIdsToDelete) { + env.delete_Event(txn, levId); + env.dbi_EventPayload.del(txn, lmdb::to_sv(levId)); } + std::string tmpBuf; + for (auto &ev : evs) { if (ev.status == EventWriteStatus::Pending) { - env.insert_Event(txn, ev.nodeId, ev.receivedAt, ev.flatStr); + ev.levId = env.insert_Event(txn, ev.receivedAt, ev.flatStr); + + tmpBuf.clear(); + tmpBuf += '\x00'; + tmpBuf += ev.jsonStr; + env.dbi_EventPayload.put(txn, lmdb::to_sv(ev.levId), tmpBuf); + ev.status = EventWriteStatus::Written; } } diff --git a/src/events.h b/src/events.h index 8d73531..d6529b8 100644 --- a/src/events.h +++ b/src/events.h @@ -45,8 +45,8 @@ inline const NostrIndex::Event *flatStrToFlatEvent(std::string_view flatStr) { std::optional lookupEventById(lmdb::txn &txn, std::string_view id); -uint64_t getMostRecentEventId(lmdb::txn &txn); -std::string_view getEventJson(lmdb::txn &txn, uint64_t quadId); +uint64_t getMostRecentLevId(lmdb::txn &txn); +std::string_view getEventJson(lmdb::txn &txn, uint64_t levId); inline quadrable::Key flatEventToQuadrableKey(const NostrIndex::Event *flat) { return quadrable::Key::fromIntegerAndHash(flat->created_at(), sv(flat->id()).substr(0, 23)); @@ -72,8 +72,8 @@ struct EventToWrite { uint64_t receivedAt; void *userData = nullptr; quadrable::Key quadKey; - uint64_t nodeId = 0; EventWriteStatus status = EventWriteStatus::Pending; + uint64_t levId = 0; EventToWrite() {}