From 058c97f856c33f0b223d050b284100f7f72f9b17 Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Thu, 29 Aug 2024 20:18:05 -0400 Subject: [PATCH] use custom packing for indexable data: PackedEvent --- fbs/nostr-index.fbs | 28 ------- golpe.yaml | 45 ++++------- src/ActiveMonitors.h | 24 +++--- src/DBQuery.h | 2 +- src/PackedEvent.h | 91 ++++++++++++++++++++++ src/WriterPipeline.h | 12 +-- src/apps/mesh/cmd_router.cpp | 2 +- src/apps/mesh/cmd_stream.cpp | 2 +- src/apps/mesh/cmd_sync.cpp | 3 +- src/apps/relay/RelayCron.cpp | 2 +- src/apps/relay/RelayIngester.cpp | 14 ++-- src/apps/relay/RelayNegentropy.cpp | 3 +- src/apps/relay/RelayReqMonitor.cpp | 2 +- src/apps/relay/RelayServer.h | 2 +- src/apps/relay/RelayWriter.cpp | 14 ++-- src/events.cpp | 118 +++++++++++------------------ src/events.h | 24 +++--- src/filters.h | 32 +++----- 18 files changed, 212 insertions(+), 208 deletions(-) delete mode 100644 fbs/nostr-index.fbs create mode 100644 src/PackedEvent.h diff --git a/fbs/nostr-index.fbs b/fbs/nostr-index.fbs deleted file mode 100644 index 580e6d8..0000000 --- a/fbs/nostr-index.fbs +++ /dev/null @@ -1,28 +0,0 @@ -namespace NostrIndex; - -struct Fixed32Bytes { - val: [ubyte:32]; -} - -table TagGeneral { - key: uint8; - val: [ubyte]; -} - -table TagFixed32 { - key: uint8; - val: Fixed32Bytes; -} - -table Event { - id: Fixed32Bytes; - pubkey: Fixed32Bytes; - created_at: uint64; - kind: uint64; - tagsGeneral: [TagGeneral]; - tagsFixed32: [TagFixed32]; - expiration: uint64; -} - -table Empty {} -root_type Empty; diff --git a/golpe.yaml b/golpe.yaml index 0e9ad07..e4aac02 100644 --- a/golpe.yaml +++ b/golpe.yaml @@ -5,17 +5,11 @@ features: onAppStartup: true db: true customLMDBSetup: true - flatbuffers: true websockets: true templar: true -flatBuffers: | - include "../fbs/nostr-index.fbs"; - includes: | - inline std::string_view sv(const NostrIndex::Fixed32Bytes *f) { - return std::string_view((const char *)f->val()->data(), 32); - } + #include "PackedEvent.h" tables: ## DB meta-data. Single entry, with id = 1 @@ -29,9 +23,8 @@ tables: Event: fields: - name: receivedAt # microseconds - - name: flat + - name: packed type: ubytes - nestedFlat: NostrIndex.Event - name: sourceType - name: sourceInfo type: ubytes @@ -61,36 +54,30 @@ tables: multi: true indexPrelude: | - auto *flat = v.flat_nested(); - created_at = flat->created_at(); + PackedEventView packed(v.packed()); + created_at = packed.created_at(); uint64_t indexTime = *created_at; receivedAt = v.receivedAt(); - id = makeKey_StringUint64(sv(flat->id()), indexTime); - pubkey = makeKey_StringUint64(sv(flat->pubkey()), indexTime); - kind = makeKey_Uint64Uint64(flat->kind(), indexTime); - pubkeyKind = makeKey_StringUint64Uint64(sv(flat->pubkey()), flat->kind(), indexTime); - - for (const auto &tagPair : *(flat->tagsGeneral())) { - auto tagName = (char)tagPair->key(); - auto tagVal = sv(tagPair->val()); + id = makeKey_StringUint64(packed.id(), indexTime); + pubkey = makeKey_StringUint64(packed.pubkey(), indexTime); + kind = makeKey_Uint64Uint64(packed.kind(), indexTime); + pubkeyKind = makeKey_StringUint64Uint64(packed.pubkey(), packed.kind(), indexTime); + packed.foreachTag([&](char tagName, std::string_view tagVal){ tag.push_back(makeKey_StringUint64(std::string(1, tagName) + std::string(tagVal), indexTime)); if (tagName == 'd' && replace.size() == 0) { - replace.push_back(makeKey_StringUint64(std::string(sv(flat->pubkey())) + std::string(tagVal), flat->kind())); + replace.push_back(makeKey_StringUint64(std::string(packed.pubkey()) + std::string(tagVal), packed.kind())); + } else if (tagName == 'e' && packed.kind() == 5) { + deletion.push_back(std::string(tagVal) + std::string(packed.pubkey())); } - } - for (const auto &tagPair : *(flat->tagsFixed32())) { - auto tagName = (char)tagPair->key(); - auto tagVal = sv(tagPair->val()); - tag.push_back(makeKey_StringUint64(std::string(1, tagName) + std::string(tagVal), indexTime)); - if (flat->kind() == 5 && tagName == 'e') deletion.push_back(std::string(tagVal) + std::string(sv(flat->pubkey()))); - } + return true; + }); - if (flat->expiration() != 0) { - expiration.push_back(flat->expiration()); + if (packed.expiration() != 0) { + expiration.push_back(packed.expiration()); } CompressionDictionary: diff --git a/src/ActiveMonitors.h b/src/ActiveMonitors.h index 700fb11..62e1d68 100644 --- a/src/ActiveMonitors.h +++ b/src/ActiveMonitors.h @@ -92,7 +92,7 @@ struct ActiveMonitors : NonCopyable { if (item.latestEventId >= ev.primaryKeyId || item.mon->sub.latestEventId >= ev.primaryKeyId) continue; item.latestEventId = ev.primaryKeyId; - if (f->doesMatch(ev.flat_nested())) { + if (f->doesMatch(PackedEventView(ev.packed()))) { recipients.emplace_back(item.mon->sub.connId, item.mon->sub.subId); item.mon->sub.latestEventId = ev.primaryKeyId; continue; @@ -124,38 +124,32 @@ struct ActiveMonitors : NonCopyable { } }; - auto *flat = ev.flat_nested(); + auto packed = PackedEventView(ev.packed()); { - auto id = std::string(sv(flat->id())); + auto id = std::string(packed.id()); processMonitorsPrefix(allIds, id, static_cast>([&](const std::string &val){ return id.starts_with(val); })); } { - auto pubkey = std::string(sv(flat->pubkey())); + auto pubkey = std::string(packed.pubkey()); processMonitorsPrefix(allAuthors, pubkey, static_cast>([&](const std::string &val){ return pubkey.starts_with(val); })); } - for (const auto &tag : *flat->tagsFixed32()) { - auto &tagSpec = getTagSpec(tag->key(), sv(tag->val())); + packed.foreachTag([&](char tagName, std::string_view tagVal){ + auto &tagSpec = getTagSpec(tagName, tagVal); processMonitorsExact(allTags, tagSpec, static_cast>([&](const std::string &val){ return tagSpec == val; })); - } - - for (const auto &tag : *flat->tagsGeneral()) { - auto &tagSpec = getTagSpec(tag->key(), sv(tag->val())); - processMonitorsExact(allTags, tagSpec, static_cast>([&](const std::string &val){ - return tagSpec == val; - })); - } + return true; + }); { - auto kind = flat->kind(); + auto kind = packed.kind(); processMonitorsExact(allKinds, kind, static_cast>([&](const uint64_t &val){ return kind == val; })); diff --git a/src/DBQuery.h b/src/DBQuery.h index cd6cacc..3a662d6 100644 --- a/src/DBQuery.h +++ b/src/DBQuery.h @@ -266,7 +266,7 @@ struct DBScan : NonCopyable { } else { approxWork += 10; auto view = env.lookup_Event(txn, levId); - if (view && f.doesMatch(view->flat_nested())) doSend = true; + if (view && f.doesMatch(PackedEventView(view->packed()))) doSend = true; } if (doSend) { diff --git a/src/PackedEvent.h b/src/PackedEvent.h new file mode 100644 index 0000000..9ad2054 --- /dev/null +++ b/src/PackedEvent.h @@ -0,0 +1,91 @@ +#pragma once + +#include + +#include "golpe.h" + + +// PackedEvent (summary of indexable data in a nostr event) +// 0: id (32) +// 32: pubkey (32) +// 64: created_at (8) +// 72: kind (8) +// 80: expiration (8) +// 88: tags[] (variable) +// +// each tag: +// 0: tag char (1) +// 1: length (1) +// 2: value (variable) + +struct PackedEventView { + std::string_view buf; + + PackedEventView(const std::string &str) : buf(std::string_view(str)) { + if (buf.size() < 88) throw hoytech::error("PackedEventView too short"); + } + + PackedEventView(std::string_view sv) : buf(sv) { + if (buf.size() < 88) throw hoytech::error("PackedEventView too short"); + } + + std::string_view id() const { + return buf.substr(0, 32); + } + + std::string_view pubkey() const { + return buf.substr(32, 32); + } + + uint64_t created_at() const { + return lmdb::from_sv(buf.substr(64, 8)); + } + + uint64_t kind() const { + return lmdb::from_sv(buf.substr(72, 8)); + } + + uint64_t expiration() const { + return lmdb::from_sv(buf.substr(80, 8)); + } + + void foreachTag(const std::function &cb) { + std::string_view b = buf.substr(88); + + while (b.size()) { + bool done = cb(b[0], b.substr(2, (size_t)b[1])); + if (done) break; + b = b.substr(2 + b[1]); + } + } +}; + +struct PackedEventTagBuilder { + std::string buf; + + void add(char tagKey, std::string_view tagVal) { + if (tagVal.size() > 255) throw hoytech::error("tagVal too long"); + + buf += tagKey; + buf += (unsigned char) tagVal.size(); + buf += tagVal; + } +}; + +struct PackedEventBuilder { + std::string buf; + + PackedEventBuilder(std::string_view id, std::string_view pubkey, uint64_t created_at, uint64_t kind, uint64_t expiration, const PackedEventTagBuilder &tagBuilder) { + if (id.size() != 32) throw hoytech::error("unexpected id size"); + if (pubkey.size() != 32) throw hoytech::error("unexpected pubkey size"); + + buf.reserve(88 + tagBuilder.buf.size()); + + buf += id; + buf += pubkey; + buf += lmdb::to_sv(created_at); + buf += lmdb::to_sv(kind); + buf += lmdb::to_sv(expiration); + buf += tagBuilder.buf; + } +}; diff --git a/src/WriterPipeline.h b/src/WriterPipeline.h index d9e2c23..bdc25ff 100644 --- a/src/WriterPipeline.h +++ b/src/WriterPipeline.h @@ -67,11 +67,11 @@ struct WriterPipeline { return; } - std::string flatStr; + std::string packedStr; std::string jsonStr; try { - parseAndVerifyEvent(m.eventJson, secpCtx, verifyMsg, verifyTime, flatStr, jsonStr); + parseAndVerifyEvent(m.eventJson, secpCtx, verifyMsg, verifyTime, packedStr, jsonStr); } catch (std::exception &e) { if (verboseReject) LW << "Rejected event: " << m.eventJson << " reason: " << e.what(); numLive--; @@ -79,7 +79,7 @@ struct WriterPipeline { continue; } - writerInbox.push_move({ std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us(), m.sourceType, std::move(m.sourceInfo) }); + writerInbox.push_move({ std::move(packedStr), std::move(jsonStr), hoytech::curr_time_us(), m.sourceType, std::move(m.sourceInfo) }); } } }); @@ -122,15 +122,15 @@ struct WriterPipeline { auto event = std::move(newEvents.front()); newEvents.pop_front(); - if (event.flatStr.size() == 0) { + if (event.packedStr.size() == 0) { shutdownComplete = true; break; } numLive--; - auto *flat = flatStrToFlatEvent(event.flatStr); - if (lookupEventById(txn, sv(flat->id()))) { + PackedEventView packed(event.packedStr); + if (lookupEventById(txn, packed.id())) { dups++; totalDups++; continue; diff --git a/src/apps/mesh/cmd_router.cpp b/src/apps/mesh/cmd_router.cpp index 248b3ef..edd0d83 100644 --- a/src/apps/mesh/cmd_router.cpp +++ b/src/apps/mesh/cmd_router.cpp @@ -193,7 +193,7 @@ struct Router { void outgoingEvent(lmdb::txn &txn, defaultDb::environment::View_Event &ev, std::string &responseStr, tao::json::value &evJson) { if (dir == "down") return; - if (!filterCompiled.doesMatch(ev.flat_nested())) return; + if (!filterCompiled.doesMatch(PackedEventView(ev.packed()))) return; if (responseStr.size() == 0) { auto evStr = getEventJson(txn, router->decomp, ev.primaryKeyId); diff --git a/src/apps/mesh/cmd_stream.cpp b/src/apps/mesh/cmd_stream.cpp index 9d5f11c..d9ca432 100644 --- a/src/apps/mesh/cmd_stream.cpp +++ b/src/apps/mesh/cmd_stream.cpp @@ -101,7 +101,7 @@ void cmd_stream(const std::vector &subArgs) { env.foreach_Event(txn, [&](auto &ev){ currEventId = ev.primaryKeyId; - auto id = std::string(sv(ev.flat_nested()->id())); + auto id = std::string(PackedEventView(ev.packed()).id()); if (downloadedIds.find(id) != downloadedIds.end()) { downloadedIds.erase(id); return true; diff --git a/src/apps/mesh/cmd_sync.cpp b/src/apps/mesh/cmd_sync.cpp index 80c4ecb..1131c2d 100644 --- a/src/apps/mesh/cmd_sync.cpp +++ b/src/apps/mesh/cmd_sync.cpp @@ -72,7 +72,8 @@ void cmd_sync(const std::vector &subArgs) { for (auto levId : levIds) { auto ev = lookupEventByLevId(txn, levId); - ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, ne.idSize)); + PackedEventView packed(ev.packed()); + ne.addItem(packed.created_at(), packed.id().substr(0, ne.idSize)); } LI << "Filter matches " << numEvents << " events"; diff --git a/src/apps/relay/RelayCron.cpp b/src/apps/relay/RelayCron.cpp index ca776f4..2c112dc 100644 --- a/src/apps/relay/RelayCron.cpp +++ b/src/apps/relay/RelayCron.cpp @@ -91,7 +91,7 @@ void RelayServer::runCron() { if (expiration == 1) { // Ephemeral event auto view = env.lookup_Event(txn, levId); if (!view) throw herr("missing event from index, corrupt DB?"); - uint64_t created = view->flat_nested()->created_at(); + uint64_t created = PackedEventView(view->packed()).created_at(); if (created <= ephemeralCutoff) { numEphemeral++; diff --git a/src/apps/relay/RelayIngester.cpp b/src/apps/relay/RelayIngester.cpp index 77362f5..d51875b 100644 --- a/src/apps/relay/RelayIngester.cpp +++ b/src/apps/relay/RelayIngester.cpp @@ -86,33 +86,33 @@ void RelayServer::runIngester(ThreadPool::Thread &thr) { } void RelayServer::ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::string ipAddr, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector &output) { - std::string flatStr, jsonStr; + std::string packedStr, jsonStr; - parseAndVerifyEvent(origJson, secpCtx, true, true, flatStr, jsonStr); + parseAndVerifyEvent(origJson, secpCtx, true, true, packedStr, jsonStr); - auto *flat = flatbuffers::GetRoot(flatStr.data()); + PackedEventView packed(packedStr); { for (const auto &tagArr : origJson.at("tags").get_array()) { auto tag = tagArr.get_array(); if (tag.size() == 1 && tag.at(0).get_string() == "-") { LI << "Protected event, skipping"; - sendOKResponse(connId, to_hex(sv(flat->id())), false, "blocked: event marked as protected"); + sendOKResponse(connId, to_hex(packed.id()), false, "blocked: event marked as protected"); return; } } } { - auto existing = lookupEventById(txn, sv(flat->id())); + auto existing = lookupEventById(txn, packed.id()); if (existing) { LI << "Duplicate event, skipping"; - sendOKResponse(connId, to_hex(sv(flat->id())), true, "duplicate: have this event"); + sendOKResponse(connId, to_hex(packed.id()), true, "duplicate: have this event"); return; } } - output.emplace_back(MsgWriter{MsgWriter::AddEvent{connId, std::move(ipAddr), hoytech::curr_time_us(), std::move(flatStr), std::move(jsonStr)}}); + output.emplace_back(MsgWriter{MsgWriter::AddEvent{connId, std::move(ipAddr), hoytech::curr_time_us(), std::move(packedStr), std::move(jsonStr)}}); } void RelayServer::ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) { diff --git a/src/apps/relay/RelayNegentropy.cpp b/src/apps/relay/RelayNegentropy.cpp index 84c0e34..9f752a8 100644 --- a/src/apps/relay/RelayNegentropy.cpp +++ b/src/apps/relay/RelayNegentropy.cpp @@ -100,7 +100,8 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { for (auto levId : view->levIds) { try { auto ev = lookupEventByLevId(txn, levId); - view->ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, view->ne.idSize)); + auto packed = PackedEventView(ev.packed()); + view->ne.addItem(packed.created_at(), packed.id().substr(0, view->ne.idSize)); } catch (std::exception &) { // levId was deleted when query was paused } diff --git a/src/apps/relay/RelayReqMonitor.cpp b/src/apps/relay/RelayReqMonitor.cpp index 4fadd70..5840bd7 100644 --- a/src/apps/relay/RelayReqMonitor.cpp +++ b/src/apps/relay/RelayReqMonitor.cpp @@ -31,7 +31,7 @@ void RelayServer::runReqMonitor(ThreadPool::Thread &thr) { auto connId = msg->sub.connId; env.foreach_Event(txn, [&](auto &ev){ - if (msg->sub.filterGroup.doesMatch(ev.flat_nested())) { + if (msg->sub.filterGroup.doesMatch(PackedEventView(ev.packed()))) { sendEvent(connId, msg->sub.subId, getEventJson(txn, decomp, ev.primaryKeyId)); } diff --git a/src/apps/relay/RelayServer.h b/src/apps/relay/RelayServer.h index 407e09f..aae6d98 100644 --- a/src/apps/relay/RelayServer.h +++ b/src/apps/relay/RelayServer.h @@ -66,7 +66,7 @@ struct MsgWriter : NonCopyable { uint64_t connId; std::string ipAddr; uint64_t receivedAt; - std::string flatStr; + std::string packedStr; std::string jsonStr; }; diff --git a/src/apps/relay/RelayWriter.cpp b/src/apps/relay/RelayWriter.cpp index e1d95a8..bc5b4de 100644 --- a/src/apps/relay/RelayWriter.cpp +++ b/src/apps/relay/RelayWriter.cpp @@ -43,10 +43,10 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, msg->receivedAt, sourceType, msg->ipAddr, okMsg); if (res == PluginEventSifterResult::Accept) { - newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg); + newEvents.emplace_back(std::move(msg->packedStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg); } else { - auto *flat = flatbuffers::GetRoot(msg->flatStr.data()); - auto eventIdHex = to_hex(sv(flat->id())); + PackedEventView packed(msg->packedStr); + auto eventIdHex = to_hex(packed.id()); if (okMsg.size()) LI << "[" << msg->connId << "] write policy blocked event " << eventIdHex << ": " << okMsg; @@ -67,8 +67,8 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { LE << "Error writing " << newEvents.size() << " events: " << e.what(); for (auto &newEvent : newEvents) { - auto *flat = flatbuffers::GetRoot(newEvent.flatStr.data()); - auto eventIdHex = to_hex(sv(flat->id())); + PackedEventView packed(newEvent.packedStr); + auto eventIdHex = to_hex(packed.id()); MsgWriter::AddEvent *addEventMsg = static_cast(newEvent.userData); std::string message = "Write error: "; @@ -83,8 +83,8 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { // Log for (auto &newEvent : newEvents) { - auto *flat = flatbuffers::GetRoot(newEvent.flatStr.data()); - auto eventIdHex = to_hex(sv(flat->id())); + PackedEventView packed(newEvent.packedStr); + auto eventIdHex = to_hex(packed.id()); std::string message; bool written = false; diff --git a/src/events.cpp b/src/events.cpp index f0c53a3..76f2d60 100644 --- a/src/events.cpp +++ b/src/events.cpp @@ -3,8 +3,8 @@ #include "events.h" -std::string nostrJsonToFlat(const tao::json::value &v) { - flatbuffers::FlatBufferBuilder builder; // FIXME: pre-allocate size approximately the same as orig JSON? +std::string nostrJsonToPackedEvent(const tao::json::value &v) { + PackedEventTagBuilder tagBuilder; // Extract values from JSON, add strings to builder @@ -16,17 +16,11 @@ std::string nostrJsonToFlat(const tao::json::value &v) { if (id.size() != 32) throw herr("unexpected id size"); if (pubkey.size() != 32) throw herr("unexpected pubkey size"); - std::vector> tagsGeneral; - std::vector> tagsFixed32; - uint64_t expiration = 0; if (isReplaceableKind(kind)) { // Prepend virtual d-tag - tagsGeneral.emplace_back(NostrIndex::CreateTagGeneral(builder, - 'd', - builder.CreateVector((uint8_t*)"", 0) - )); + tagBuilder.add('d', ""); } if (v.at("tags").get_array().size() > cfg().events__maxNumTags) throw herr("too many tags: ", v.at("tags").get_array().size()); @@ -41,10 +35,7 @@ std::string nostrJsonToFlat(const tao::json::value &v) { tagVal = from_hex(tagVal, false); if (tagVal.size() != 32) throw herr("unexpected size for fixed-size tag"); - tagsFixed32.emplace_back(NostrIndex::CreateTagFixed32(builder, - (uint8_t)tagName[0], - (NostrIndex::Fixed32Bytes*)tagVal.data() - )); + tagBuilder.add(tagName[0], tagVal); } else if (tagName == "expiration") { if (expiration == 0) { expiration = parseUint64(tagVal); @@ -54,41 +45,23 @@ std::string nostrJsonToFlat(const tao::json::value &v) { if (tagVal.size() > cfg().events__maxTagValSize) throw herr("tag val too large: ", tagVal.size()); if (tagVal.size() <= MAX_INDEXED_TAG_VAL_SIZE) { - tagsGeneral.emplace_back(NostrIndex::CreateTagGeneral(builder, - (uint8_t)tagName[0], - builder.CreateVector((uint8_t*)tagVal.data(), tagVal.size()) - )); + tagBuilder.add(tagName[0], tagVal); } } } if (isParamReplaceableKind(kind)) { // Append virtual d-tag - tagsGeneral.emplace_back(NostrIndex::CreateTagGeneral(builder, - 'd', - builder.CreateVector((uint8_t*)"", 0) - )); + tagBuilder.add('d', ""); } if (isEphemeralKind(kind)) { expiration = 1; } - // Create flatbuffer + PackedEventBuilder builder(id, pubkey, created_at, kind, expiration, tagBuilder); - auto eventPtr = NostrIndex::CreateEvent(builder, - (NostrIndex::Fixed32Bytes*)id.data(), - (NostrIndex::Fixed32Bytes*)pubkey.data(), - created_at, - kind, - builder.CreateVector>(tagsGeneral), - builder.CreateVector>(tagsFixed32), - expiration - ); - - builder.Finish(eventPtr); - - return std::string(reinterpret_cast(builder.GetBufferPointer()), builder.GetSize()); + return std::move(builder.buf); } std::string nostrHash(const tao::json::value &origJson) { @@ -127,11 +100,11 @@ bool verifySig(secp256k1_context* ctx, std::string_view sig, std::string_view ha ); } -void verifyNostrEvent(secp256k1_context *secpCtx, const NostrIndex::Event *flat, const tao::json::value &origJson) { +void verifyNostrEvent(secp256k1_context *secpCtx, PackedEventView packed, const tao::json::value &origJson) { auto hash = nostrHash(origJson); - if (hash != sv(flat->id())) throw herr("bad event id"); + if (hash != packed.id()) throw herr("bad event id"); - bool valid = verifySig(secpCtx, from_hex(origJson.at("sig").get_string(), false), sv(flat->id()), sv(flat->pubkey())); + bool valid = verifySig(secpCtx, from_hex(origJson.at("sig").get_string(), false), packed.id(), packed.pubkey()); if (!valid) throw herr("bad signature"); } @@ -139,11 +112,11 @@ void verifyNostrEventJsonSize(std::string_view jsonStr) { if (jsonStr.size() > cfg().events__maxEventSize) throw herr("event too large: ", jsonStr.size()); } -void verifyEventTimestamp(const NostrIndex::Event *flat) { +void verifyEventTimestamp(PackedEventView packed) { auto now = hoytech::curr_time_s(); - auto ts = flat->created_at(); + auto ts = packed.created_at(); - uint64_t earliest = now - (flat->expiration() == 1 ? cfg().events__rejectEphemeralEventsOlderThanSeconds : cfg().events__rejectEventsOlderThanSeconds); + uint64_t earliest = now - (packed.expiration() == 1 ? cfg().events__rejectEphemeralEventsOlderThanSeconds : cfg().events__rejectEventsOlderThanSeconds); uint64_t latest = now + cfg().events__rejectEventsNewerThanSeconds; // overflows @@ -153,14 +126,14 @@ void verifyEventTimestamp(const NostrIndex::Event *flat) { if (ts < earliest) throw herr("created_at too early"); if (ts > latest) throw herr("created_at too late"); - if (flat->expiration() > 1 && flat->expiration() <= now) throw herr("event expired"); + if (packed.expiration() > 1 && packed.expiration() <= now) throw herr("event expired"); } -void parseAndVerifyEvent(const tao::json::value &origJson, secp256k1_context *secpCtx, bool verifyMsg, bool verifyTime, std::string &flatStr, std::string &jsonStr) { - flatStr = nostrJsonToFlat(origJson); - auto *flat = flatbuffers::GetRoot(flatStr.data()); - if (verifyTime) verifyEventTimestamp(flat); - if (verifyMsg) verifyNostrEvent(secpCtx, flat, origJson); +void parseAndVerifyEvent(const tao::json::value &origJson, secp256k1_context *secpCtx, bool verifyMsg, bool verifyTime, std::string &packedStr, std::string &jsonStr) { + packedStr = nostrJsonToPackedEvent(origJson); + PackedEventView packed(packedStr); + if (verifyTime) verifyEventTimestamp(packed); + if (verifyMsg) verifyNostrEvent(secpCtx, packed, origJson); // Build new object to remove unknown top-level fields from json jsonStr = tao::json::to_string(tao::json::value({ @@ -276,14 +249,14 @@ void writeEvents(lmdb::txn &txn, std::vector &evs, uint64_t logLev for (size_t i = 0; i < evs.size(); i++) { auto &ev = evs[i]; - const NostrIndex::Event *flat = flatbuffers::GetRoot(ev.flatStr.data()); + PackedEventView packed(ev.packedStr); - if (lookupEventById(txn, sv(flat->id())) || (i != 0 && ev.id() == evs[i-1].id())) { + if (lookupEventById(txn, packed.id()) || (i != 0 && ev.id() == evs[i-1].id())) { ev.status = EventWriteStatus::Duplicate; continue; } - if (env.lookup_Event__deletion(txn, std::string(sv(flat->id())) + std::string(sv(flat->pubkey())))) { + if (env.lookup_Event__deletion(txn, std::string(packed.id()) + std::string(packed.pubkey()))) { ev.status = EventWriteStatus::Deleted; continue; } @@ -291,30 +264,30 @@ void writeEvents(lmdb::txn &txn, std::vector &evs, uint64_t logLev { std::optional replace; - if (isReplaceableKind(flat->kind()) || isParamReplaceableKind(flat->kind())) { - for (const auto &tagPair : *(flat->tagsGeneral())) { - auto tagName = (char)tagPair->key(); - if (tagName != 'd') continue; - replace = std::string(sv(tagPair->val())); - break; - } + if (isReplaceableKind(packed.kind()) || isParamReplaceableKind(packed.kind())) { + packed.foreachTag([&](char tagName, std::string_view tagVal){ + if (tagName != 'd') return true; + replace = std::string(tagVal); + return false; + }); } if (replace) { - auto searchStr = std::string(sv(flat->pubkey())) + *replace; - auto searchKey = makeKey_StringUint64(searchStr, flat->kind()); + auto searchStr = std::string(packed.pubkey()) + *replace; + auto searchKey = makeKey_StringUint64(searchStr, packed.kind()); env.generic_foreachFull(txn, env.dbi_Event__replace, searchKey, lmdb::to_sv(MAX_U64), [&](auto k, auto v) { ParsedKey_StringUint64 parsedKey(k); - if (parsedKey.s == searchStr && parsedKey.n == flat->kind()) { + if (parsedKey.s == searchStr && parsedKey.n == packed.kind()) { auto otherEv = lookupEventByLevId(txn, lmdb::from_sv(v)); - auto thisTimestamp = flat->created_at(); - auto otherTimestamp = otherEv.flat_nested()->created_at(); + auto thisTimestamp = packed.created_at(); + auto otherPacked = PackedEventView(otherEv.packed()); + auto otherTimestamp = otherPacked.created_at(); if (otherTimestamp < thisTimestamp || - (otherTimestamp == thisTimestamp && sv(flat->id()) < sv(otherEv.flat_nested()->id()))) { - if (logLevel >= 1) LI << "Deleting event (d-tag). id=" << to_hex(sv(otherEv.flat_nested()->id())); + (otherTimestamp == thisTimestamp && packed.id() < otherPacked.id())) { + if (logLevel >= 1) LI << "Deleting event (d-tag). id=" << to_hex(otherPacked.id()); levIdsToDelete.push_back(otherEv.primaryKeyId); } else { ev.status = EventWriteStatus::Replaced; @@ -326,21 +299,22 @@ void writeEvents(lmdb::txn &txn, std::vector &evs, uint64_t logLev } } - if (flat->kind() == 5) { + if (packed.kind() == 5) { // Deletion event, delete all referenced events - for (const auto &tagPair : *(flat->tagsFixed32())) { - if (tagPair->key() == 'e') { - auto otherEv = lookupEventById(txn, sv(tagPair->val())); - if (otherEv && sv(otherEv->flat_nested()->pubkey()) == sv(flat->pubkey())) { - if (logLevel >= 1) LI << "Deleting event (kind 5). id=" << to_hex(sv(tagPair->val())); + packed.foreachTag([&](char tagName, std::string_view tagVal){ + if (tagName == 'e') { + auto otherEv = lookupEventById(txn, tagVal); + if (otherEv && PackedEventView(otherEv->packed()).pubkey() == packed.pubkey()) { + if (logLevel >= 1) LI << "Deleting event (kind 5). id=" << to_hex(tagVal); levIdsToDelete.push_back(otherEv->primaryKeyId); } } - } + return true; + }); } if (ev.status == EventWriteStatus::Pending) { - ev.levId = env.insert_Event(txn, ev.receivedAt, ev.flatStr, (uint64_t)ev.sourceType, ev.sourceInfo); + ev.levId = env.insert_Event(txn, ev.receivedAt, ev.packedStr, (uint64_t)ev.sourceType, ev.sourceInfo); tmpBuf.clear(); tmpBuf += '\x00'; diff --git a/src/events.h b/src/events.h index 94c89d5..f8a9459 100644 --- a/src/events.h +++ b/src/events.h @@ -4,6 +4,7 @@ #include "golpe.h" +#include "PackedEvent.h" #include "Decompressor.h" @@ -33,22 +34,17 @@ inline bool isEphemeralKind(uint64_t kind) { -std::string nostrJsonToFlat(const tao::json::value &v); +std::string nostrJsonToPackedEvent(const tao::json::value &v); std::string nostrHash(const tao::json::value &origJson); bool verifySig(secp256k1_context* ctx, std::string_view sig, std::string_view hash, std::string_view pubkey); -void verifyNostrEvent(secp256k1_context *secpCtx, const NostrIndex::Event *flat, const tao::json::value &origJson); +void verifyNostrEvent(secp256k1_context *secpCtx, PackedEventView packed, const tao::json::value &origJson); void verifyNostrEventJsonSize(std::string_view jsonStr); -void verifyEventTimestamp(const NostrIndex::Event *flat); +void verifyEventTimestamp(PackedEventView packed); -void parseAndVerifyEvent(const tao::json::value &origJson, secp256k1_context *secpCtx, bool verifyMsg, bool verifyTime, std::string &flatStr, std::string &jsonStr); +void parseAndVerifyEvent(const tao::json::value &origJson, secp256k1_context *secpCtx, bool verifyMsg, bool verifyTime, std::string &packedStr, std::string &jsonStr); -// Does not do verification! -inline const NostrIndex::Event *flatStrToFlatEvent(std::string_view flatStr) { - return flatbuffers::GetRoot(flatStr.data()); -} - std::optional lookupEventById(lmdb::txn &txn, std::string_view id); defaultDb::environment::View_Event lookupEventByLevId(lmdb::txn &txn, uint64_t levId); // throws if can't find @@ -90,7 +86,7 @@ enum class EventWriteStatus { struct EventToWrite { - std::string flatStr; + std::string packedStr; std::string jsonStr; uint64_t receivedAt; EventSourceType sourceType; @@ -101,17 +97,15 @@ struct EventToWrite { EventToWrite() {} - EventToWrite(std::string flatStr, std::string jsonStr, uint64_t receivedAt, EventSourceType sourceType, std::string sourceInfo, void *userData = nullptr) : flatStr(flatStr), jsonStr(jsonStr), receivedAt(receivedAt), sourceType(sourceType), sourceInfo(sourceInfo), userData(userData) { + EventToWrite(std::string packedStr, std::string jsonStr, uint64_t receivedAt, EventSourceType sourceType, std::string sourceInfo, void *userData = nullptr) : packedStr(packedStr), jsonStr(jsonStr), receivedAt(receivedAt), sourceType(sourceType), sourceInfo(sourceInfo), userData(userData) { } std::string_view id() { - const NostrIndex::Event *flat = flatbuffers::GetRoot(flatStr.data()); - return sv(flat->id()); + return PackedEventView(packedStr).id(); } uint64_t createdAt() { - const NostrIndex::Event *flat = flatbuffers::GetRoot(flatStr.data()); - return flat->created_at(); + return PackedEventView(packedStr).created_at(); } }; diff --git a/src/filters.h b/src/filters.h index 22a82d3..8c2f7e3 100644 --- a/src/filters.h +++ b/src/filters.h @@ -172,35 +172,25 @@ struct NostrFilter { return true; } - bool doesMatch(const NostrIndex::Event *ev) const { + bool doesMatch(PackedEventView ev) const { if (neverMatch) return false; - if (!doesMatchTimes(ev->created_at())) return false; + if (!doesMatchTimes(ev.created_at())) return false; - if (ids && !ids->doesMatch(sv(ev->id()))) return false; - if (authors && !authors->doesMatch(sv(ev->pubkey()))) return false; - if (kinds && !kinds->doesMatch(ev->kind())) return false; + if (ids && !ids->doesMatch(ev.id())) return false; + if (authors && !authors->doesMatch(ev.pubkey())) return false; + if (kinds && !kinds->doesMatch(ev.kind())) return false; for (const auto &[tag, filt] : tags) { bool foundMatch = false; - for (const auto &tagPair : *(ev->tagsFixed32())) { - auto eventTag = tagPair->key(); - if (eventTag == tag && filt.doesMatch(sv(tagPair->val()))) { + ev.foreachTag([&](char tagName, std::string_view tagVal){ + if (tagName == tag && filt.doesMatch(tagVal)) { foundMatch = true; - break; + return false; } - } - - if (!foundMatch) { - for (const auto &tagPair : *(ev->tagsGeneral())) { - auto eventTag = tagPair->key(); - if (eventTag == tag && filt.doesMatch(sv(tagPair->val()))) { - foundMatch = true; - break; - } - } - } + return true; + }); if (!foundMatch) return false; } @@ -240,7 +230,7 @@ struct NostrFilterGroup { return NostrFilterGroup(pretendReqQuery, maxFilterLimit); } - bool doesMatch(const NostrIndex::Event *ev) const { + bool doesMatch(PackedEventView ev) const { for (const auto &f : filters) { if (f.doesMatch(ev)) return true; }