DB schema refactor

This commit is contained in:
Doug Hoyte
2023-01-22 03:22:28 -05:00
parent 27398fe54a
commit ec9161ce08
14 changed files with 97 additions and 86 deletions

2
golpe

Submodule golpe updated: 7705174669...511e70d7b9

View File

@ -13,20 +13,14 @@ includes: |
tables: tables:
## DB meta-data. Single entry, with id = 1 ## DB meta-data. Single entry, with id = 1
Meta: Meta:
tableId: 2
fields: fields:
- name: dbVersion - name: dbVersion
- name: endianness - name: endianness
## Stored nostr events ## Meta-info of nostr events, suitable for indexing
## Primary key is auto-incremented, called "levId" for Local EVent ID
Event: Event:
tableId: 1
primaryKey: quadId
fields: fields:
- name: quadId
- name: receivedAt # microseconds - name: receivedAt # microseconds
- name: flat - name: flat
type: ubytes type: ubytes
@ -72,6 +66,20 @@ tables:
if (flat->kind() == 5 && tagName == 'e') deletion.push_back(std::string(tagVal) + std::string(sv(flat->pubkey()))); if (flat->kind() == 5 && tagName == 'e') deletion.push_back(std::string(tagVal) + std::string(sv(flat->pubkey())));
} }
CompressionDictionary:
fields:
- name: dict
type: ubytes
tablesRaw:
## Raw nostr event JSON, possibly compressed
## keys are levIds
## vals are prefixed with a type byte:
## 0: no compression, payload follows
## 1: zstd compression. Followed by Dictionary ID (native endian uint32) then compressed payload
EventPayload:
flags: 'MDB_INTEGERKEY'
config: config:
- name: db - name: db
desc: "Directory that contains strfry database" desc: "Directory that contains strfry database"

View File

@ -232,18 +232,18 @@ struct DBScan {
} }
bool sent = false; bool sent = false;
uint64_t quadId = lmdb::from_sv<uint64_t>(v); uint64_t levId = lmdb::from_sv<uint64_t>(v);
if (f.indexOnlyScans) { if (f.indexOnlyScans) {
if (f.doesMatchTimes(created)) { if (f.doesMatchTimes(created)) {
handleEvent(quadId); handleEvent(levId);
sent = true; sent = true;
} }
} else { } else {
auto view = env.lookup_Event(txn, quadId); auto view = env.lookup_Event(txn, levId);
if (!view) throw herr("missing event from index, corrupt DB?"); if (!view) throw herr("missing event from index, corrupt DB?");
if (f.doesMatch(view->flat_nested())) { if (f.doesMatch(view->flat_nested())) {
handleEvent(quadId); handleEvent(levId);
sent = true; sent = true;
} }
} }
@ -298,16 +298,16 @@ struct DBScanQuery : NonCopyable {
while (filterGroupIndex < sub.filterGroup.size()) { while (filterGroupIndex < sub.filterGroup.size()) {
if (!scanner) scanner = std::make_unique<DBScan>(sub.filterGroup.filters[filterGroupIndex]); if (!scanner) scanner = std::make_unique<DBScan>(sub.filterGroup.filters[filterGroupIndex]);
bool complete = scanner->scan(txn, [&](uint64_t quadId){ bool complete = scanner->scan(txn, [&](uint64_t levId){
// If this event came in after our query began, don't send it. It will be sent after the EOSE. // If this event came in after our query began, don't send it. It will be sent after the EOSE.
if (quadId > sub.latestEventId) return; if (levId > sub.latestEventId) return;
// We already sent this event // We already sent this event
if (alreadySentEvents.find(quadId) != alreadySentEvents.end()) return; if (alreadySentEvents.find(levId) != alreadySentEvents.end()) return;
alreadySentEvents.insert(quadId); alreadySentEvents.insert(levId);
currScanRecordsFound++; currScanRecordsFound++;
cb(sub, quadId); cb(sub, levId);
}, [&]{ }, [&]{
currScanRecordsTraversed++; currScanRecordsTraversed++;
return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds; return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds;

View File

@ -2,17 +2,12 @@
void RelayServer::cleanupOldEvents() { void RelayServer::cleanupOldEvents() {
struct EventDel { std::vector<uint64_t> expiredLevIds;
uint64_t nodeId;
uint64_t deletedNodeId;
};
std::vector<EventDel> expiredEvents;
{ {
auto txn = env.txn_ro(); auto txn = env.txn_ro();
auto mostRecent = getMostRecentEventId(txn); auto mostRecent = getMostRecentLevId(txn);
uint64_t cutoff = hoytech::curr_time_s() - cfg().events__ephemeralEventsLifetimeSeconds; uint64_t cutoff = hoytech::curr_time_s() - cfg().events__ephemeralEventsLifetimeSeconds;
uint64_t currKind = 20'000; uint64_t currKind = 20'000;
@ -31,10 +26,10 @@ void RelayServer::cleanupOldEvents() {
return false; return false;
} }
uint64_t nodeId = lmdb::from_sv<uint64_t>(v); uint64_t levId = lmdb::from_sv<uint64_t>(v);
if (nodeId != mostRecent) { // prevent nodeId re-use if (levId != mostRecent) { // prevent levId re-use
expiredEvents.emplace_back(nodeId, 0); expiredLevIds.emplace_back(levId);
} }
return true; return true;
@ -44,29 +39,30 @@ void RelayServer::cleanupOldEvents() {
} }
} }
if (expiredEvents.size() > 0) { if (expiredLevIds.size() > 0) {
LI << "Deleting " << expiredEvents.size() << " ephemeral events";
auto txn = env.txn_rw(); auto txn = env.txn_rw();
quadrable::Quadrable qdb; quadrable::Quadrable qdb;
qdb.init(txn); qdb.init(txn);
qdb.checkout("events"); qdb.checkout("events");
uint64_t numDeleted = 0;
auto changes = qdb.change(); auto changes = qdb.change();
for (auto &e : expiredEvents) { for (auto levId : expiredLevIds) {
auto view = env.lookup_Event(txn, e.nodeId); auto view = env.lookup_Event(txn, levId);
if (!view) throw herr("missing event from index, corrupt DB?"); if (!view) continue; // Deleted in between transactions
changes.del(flatEventToQuadrableKey(view->flat_nested()), &e.deletedNodeId);
numDeleted++;
changes.del(flatEventToQuadrableKey(view->flat_nested()));
env.delete_Event(txn, levId);
env.dbi_EventPayload.del(txn, lmdb::to_sv<uint64_t>(levId));
} }
changes.apply(txn); changes.apply(txn);
for (auto &e : expiredEvents) {
if (e.deletedNodeId) env.delete_Event(txn, e.nodeId);
}
txn.commit(); txn.commit();
if (numDeleted) LI << "Deleted " << numDeleted << " ephemeral events";
} }
} }

View File

@ -22,7 +22,7 @@ void RelayServer::runReqMonitor(ThreadPool<MsgReqMonitor>::Thread &thr) {
auto txn = env.txn_ro(); auto txn = env.txn_ro();
uint64_t latestEventId = getMostRecentEventId(txn); uint64_t latestEventId = getMostRecentLevId(txn);
if (currEventId > latestEventId) currEventId = latestEventId; if (currEventId > latestEventId) currEventId = latestEventId;
for (auto &newMsg : newMsgs) { for (auto &newMsg : newMsgs) {
@ -44,8 +44,8 @@ void RelayServer::runReqMonitor(ThreadPool<MsgReqMonitor>::Thread &thr) {
monitors.closeConn(msg->connId); monitors.closeConn(msg->connId);
} else if (std::get_if<MsgReqMonitor::DBChange>(&newMsg.msg)) { } else if (std::get_if<MsgReqMonitor::DBChange>(&newMsg.msg)) {
env.foreach_Event(txn, [&](auto &ev){ env.foreach_Event(txn, [&](auto &ev){
monitors.process(txn, ev, [&](RecipientList &&recipients, uint64_t quadId){ monitors.process(txn, ev, [&](RecipientList &&recipients, uint64_t levId){
sendEventToBatch(std::move(recipients), std::string(getEventJson(txn, quadId))); sendEventToBatch(std::move(recipients), std::string(getEventJson(txn, levId)));
}); });
return true; return true;
}, false, currEventId + 1); }, false, currEventId + 1);

View File

@ -9,7 +9,7 @@ struct ActiveQueries : NonCopyable {
std::deque<DBScanQuery*> running; std::deque<DBScanQuery*> running;
void addSub(lmdb::txn &txn, Subscription &&sub) { void addSub(lmdb::txn &txn, Subscription &&sub) {
sub.latestEventId = getMostRecentEventId(txn); sub.latestEventId = getMostRecentLevId(txn);
{ {
auto *existing = findQuery(sub.connId, sub.subId); auto *existing = findQuery(sub.connId, sub.subId);
@ -63,8 +63,8 @@ struct ActiveQueries : NonCopyable {
return; return;
} }
bool complete = q->process(txn, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t quadId){ bool complete = q->process(txn, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t levId){
server->sendEvent(sub.connId, sub.subId, getEventJson(txn, quadId)); server->sendEvent(sub.connId, sub.subId, getEventJson(txn, levId));
}); });
if (complete) { if (complete) {

View File

@ -37,7 +37,7 @@ void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
bool written = false; bool written = false;
if (newEvent.status == EventWriteStatus::Written) { if (newEvent.status == EventWriteStatus::Written) {
LI << "Inserted event. id=" << eventIdHex << " qdbNodeId=" << newEvent.nodeId; LI << "Inserted event. id=" << eventIdHex << " levId=" << newEvent.levId;
written = true; written = true;
} else if (newEvent.status == EventWriteStatus::Duplicate) { } else if (newEvent.status == EventWriteStatus::Duplicate) {
message = "duplicate: have this event"; message = "duplicate: have this event";

View File

@ -53,20 +53,20 @@ void RelayServer::runYesstr(ThreadPool<MsgYesstr>::Thread &thr) {
// FIXME: The following blocks the whole thread for the query duration. Should interleave it // FIXME: The following blocks the whole thread for the query duration. Should interleave it
// with other requests like RelayReqWorker does. // with other requests like RelayReqWorker does.
std::vector<uint64_t> quadEventIds; std::vector<uint64_t> levIds;
auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr)); auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr));
Subscription sub(1, "junkSub", filterGroup); Subscription sub(1, "junkSub", filterGroup);
DBScanQuery query(sub); DBScanQuery query(sub);
while (1) { while (1) {
bool complete = query.process(txn, MAX_U64, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t quadId){ bool complete = query.process(txn, MAX_U64, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t levId){
quadEventIds.push_back(quadId); levIds.push_back(levId);
}); });
if (complete) break; if (complete) break;
} }
LI << "Filter matched " << quadEventIds.size() << " local events"; LI << "Filter matched " << levIds.size() << " local events";
qdb->withMemStore(s.m, [&]{ qdb->withMemStore(s.m, [&]{
qdb->writeToMemStore = true; qdb->writeToMemStore = true;
@ -74,8 +74,8 @@ void RelayServer::runYesstr(ThreadPool<MsgYesstr>::Thread &thr) {
auto changes = qdb->change(); auto changes = qdb->change();
for (auto id : quadEventIds) { for (auto levId : levIds) {
changes.putReuse(txn, id); changes.putReuse(txn, levId);
} }
changes.apply(txn); changes.apply(txn);

View File

@ -54,10 +54,10 @@ void cmd_monitor(const std::vector<std::string> &subArgs) {
} }
env.foreach_Event(txn, [&](auto &ev){ env.foreach_Event(txn, [&](auto &ev){
monitors.process(txn, ev, [&](RecipientList &&recipients, uint64_t quadId){ monitors.process(txn, ev, [&](RecipientList &&recipients, uint64_t levId){
for (auto &r : recipients) { for (auto &r : recipients) {
if (r.connId == interestConnId && r.subId.str() == interestSubId) { if (r.connId == interestConnId && r.subId.str() == interestSubId) {
std::cout << getEventJson(txn, quadId) << "\n"; std::cout << getEventJson(txn, levId) << "\n";
} }
} }
}); });

View File

@ -35,8 +35,8 @@ void cmd_scan(const std::vector<std::string> &subArgs) {
auto txn = env.txn_ro(); auto txn = env.txn_ro();
while (1) { while (1) {
bool complete = query.process(txn, pause ? pause : MAX_U64, metrics, [&](const auto &sub, uint64_t quadId){ bool complete = query.process(txn, pause ? pause : MAX_U64, metrics, [&](const auto &sub, uint64_t levId){
std::cout << getEventJson(txn, quadId) << "\n"; std::cout << getEventJson(txn, levId) << "\n";
}); });
if (complete) break; if (complete) break;

View File

@ -80,7 +80,7 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
{ {
auto txn = env.txn_ro(); auto txn = env.txn_ro();
currEventId = getMostRecentEventId(txn); currEventId = getMostRecentLevId(txn);
} }
ws.onTrigger = [&]{ ws.onTrigger = [&]{

View File

@ -148,7 +148,7 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
if (filterStr.size()) { if (filterStr.size()) {
std::vector<uint64_t> quadEventIds; std::vector<uint64_t> levIds;
tao::json::value filterJson; tao::json::value filterJson;
@ -167,14 +167,14 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
auto txn = env.txn_ro(); auto txn = env.txn_ro();
while (1) { while (1) {
bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t quadId){ bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t levId){
quadEventIds.push_back(quadId); levIds.push_back(levId);
}); });
if (complete) break; if (complete) break;
} }
LI << "Filter matched " << quadEventIds.size() << " local events"; LI << "Filter matched " << levIds.size() << " local events";
controller = std::make_unique<SyncController>(&qdb, &ws); controller = std::make_unique<SyncController>(&qdb, &ws);
@ -184,8 +184,8 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
auto changes = qdb.change(); auto changes = qdb.change();
for (auto id : quadEventIds) { for (auto levId : levIds) {
changes.putReuse(txn, id); changes.putReuse(txn, levId);
} }
changes.apply(txn); changes.apply(txn);

View File

@ -158,22 +158,22 @@ std::optional<defaultDb::environment::View_Event> lookupEventById(lmdb::txn &txn
return output; return output;
} }
uint64_t getMostRecentEventId(lmdb::txn &txn) { uint64_t getMostRecentLevId(lmdb::txn &txn) {
uint64_t output = 0; uint64_t levId = 0;
env.foreach_Event(txn, [&](auto &ev){ env.foreach_Event(txn, [&](auto &ev){
output = ev.primaryKeyId; levId = ev.primaryKeyId;
return false; return false;
}, true); }, true);
return output; return levId;
} }
std::string_view getEventJson(lmdb::txn &txn, uint64_t quadId) { std::string_view getEventJson(lmdb::txn &txn, uint64_t levId) {
std::string_view raw; std::string_view raw;
bool found = env.dbiQuadrable_nodesLeaf.get(txn, lmdb::to_sv<uint64_t>(quadId), raw); bool found = env.dbi_EventPayload.get(txn, lmdb::to_sv<uint64_t>(levId), raw);
if (!found) throw herr("couldn't find leaf node in quadrable, corrupted DB?"); if (!found) throw herr("couldn't find leaf node in quadrable, corrupted DB?");
return raw.substr(8 + 32 + 32); return raw.substr(1);
} }
@ -183,7 +183,7 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToW
auto changes = qdb.change(); auto changes = qdb.change();
std::vector<uint64_t> eventIdsToDelete; std::vector<uint64_t> levIdsToDelete;
for (size_t i = 0; i < evs.size(); i++) { for (size_t i = 0; i < evs.size(); i++) {
auto &ev = evs[i]; auto &ev = evs[i];
@ -202,13 +202,13 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToW
if (isReplaceableEvent(flat->kind())) { if (isReplaceableEvent(flat->kind())) {
auto searchKey = makeKey_StringUint64Uint64(sv(flat->pubkey()), flat->kind(), MAX_U64); auto searchKey = makeKey_StringUint64Uint64(sv(flat->pubkey()), flat->kind(), MAX_U64);
uint64_t otherEventId = 0; uint64_t otherLevId = 0;
env.generic_foreachFull(txn, env.dbi_Event__pubkeyKind, searchKey, lmdb::to_sv<uint64_t>(MAX_U64), [&](auto k, auto v) { env.generic_foreachFull(txn, env.dbi_Event__pubkeyKind, searchKey, lmdb::to_sv<uint64_t>(MAX_U64), [&](auto k, auto v) {
ParsedKey_StringUint64Uint64 parsedKey(k); ParsedKey_StringUint64Uint64 parsedKey(k);
if (parsedKey.s == sv(flat->pubkey()) && parsedKey.n1 == flat->kind()) { if (parsedKey.s == sv(flat->pubkey()) && parsedKey.n1 == flat->kind()) {
if (parsedKey.n2 < flat->created_at()) { if (parsedKey.n2 < flat->created_at()) {
otherEventId = lmdb::from_sv<uint64_t>(v); otherLevId = lmdb::from_sv<uint64_t>(v);
} else { } else {
ev.status = EventWriteStatus::Replaced; ev.status = EventWriteStatus::Replaced;
} }
@ -216,11 +216,11 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToW
return false; return false;
}, true); }, true);
if (otherEventId) { if (otherLevId) {
auto otherEv = env.lookup_Event(txn, otherEventId); auto otherEv = env.lookup_Event(txn, otherLevId);
if (!otherEv) throw herr("missing event from index, corrupt DB?"); if (!otherEv) throw herr("missing event from index, corrupt DB?");
changes.del(flatEventToQuadrableKey(otherEv->flat_nested())); changes.del(flatEventToQuadrableKey(otherEv->flat_nested()));
eventIdsToDelete.push_back(otherEventId); levIdsToDelete.push_back(otherLevId);
} }
} }
@ -232,26 +232,33 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToW
if (otherEv && sv(otherEv->flat_nested()->pubkey()) == sv(flat->pubkey())) { if (otherEv && sv(otherEv->flat_nested()->pubkey()) == sv(flat->pubkey())) {
LI << "Deleting event. id=" << to_hex(sv(tagPair->val())); LI << "Deleting event. id=" << to_hex(sv(tagPair->val()));
changes.del(flatEventToQuadrableKey(otherEv->flat_nested())); changes.del(flatEventToQuadrableKey(otherEv->flat_nested()));
eventIdsToDelete.push_back(otherEv->primaryKeyId); levIdsToDelete.push_back(otherEv->primaryKeyId);
} }
} }
} }
} }
if (ev.status == EventWriteStatus::Pending) { if (ev.status == EventWriteStatus::Pending) changes.put(ev.quadKey, "");
changes.put(ev.quadKey, ev.jsonStr, &ev.nodeId);
}
} }
changes.apply(txn); changes.apply(txn);
for (auto eventId : eventIdsToDelete) { for (auto levId : levIdsToDelete) {
env.delete_Event(txn, eventId); env.delete_Event(txn, levId);
env.dbi_EventPayload.del(txn, lmdb::to_sv<uint64_t>(levId));
} }
std::string tmpBuf;
for (auto &ev : evs) { for (auto &ev : evs) {
if (ev.status == EventWriteStatus::Pending) { if (ev.status == EventWriteStatus::Pending) {
env.insert_Event(txn, ev.nodeId, ev.receivedAt, ev.flatStr); ev.levId = env.insert_Event(txn, ev.receivedAt, ev.flatStr);
tmpBuf.clear();
tmpBuf += '\x00';
tmpBuf += ev.jsonStr;
env.dbi_EventPayload.put(txn, lmdb::to_sv<uint64_t>(ev.levId), tmpBuf);
ev.status = EventWriteStatus::Written; ev.status = EventWriteStatus::Written;
} }
} }

View File

@ -45,8 +45,8 @@ inline const NostrIndex::Event *flatStrToFlatEvent(std::string_view flatStr) {
std::optional<defaultDb::environment::View_Event> lookupEventById(lmdb::txn &txn, std::string_view id); std::optional<defaultDb::environment::View_Event> lookupEventById(lmdb::txn &txn, std::string_view id);
uint64_t getMostRecentEventId(lmdb::txn &txn); uint64_t getMostRecentLevId(lmdb::txn &txn);
std::string_view getEventJson(lmdb::txn &txn, uint64_t quadId); std::string_view getEventJson(lmdb::txn &txn, uint64_t levId);
inline quadrable::Key flatEventToQuadrableKey(const NostrIndex::Event *flat) { inline quadrable::Key flatEventToQuadrableKey(const NostrIndex::Event *flat) {
return quadrable::Key::fromIntegerAndHash(flat->created_at(), sv(flat->id()).substr(0, 23)); return quadrable::Key::fromIntegerAndHash(flat->created_at(), sv(flat->id()).substr(0, 23));
@ -72,8 +72,8 @@ struct EventToWrite {
uint64_t receivedAt; uint64_t receivedAt;
void *userData = nullptr; void *userData = nullptr;
quadrable::Key quadKey; quadrable::Key quadKey;
uint64_t nodeId = 0;
EventWriteStatus status = EventWriteStatus::Pending; EventWriteStatus status = EventWriteStatus::Pending;
uint64_t levId = 0;
EventToWrite() {} EventToWrite() {}