use custom packing for indexable data: PackedEvent

This commit is contained in:
Doug Hoyte
2024-08-29 20:18:05 -04:00
parent 3905f84ec8
commit 058c97f856
18 changed files with 212 additions and 208 deletions

View File

@ -193,7 +193,7 @@ struct Router {
void outgoingEvent(lmdb::txn &txn, defaultDb::environment::View_Event &ev, std::string &responseStr, tao::json::value &evJson) {
if (dir == "down") return;
if (!filterCompiled.doesMatch(ev.flat_nested())) return;
if (!filterCompiled.doesMatch(PackedEventView(ev.packed()))) return;
if (responseStr.size() == 0) {
auto evStr = getEventJson(txn, router->decomp, ev.primaryKeyId);

View File

@ -101,7 +101,7 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
env.foreach_Event(txn, [&](auto &ev){
currEventId = ev.primaryKeyId;
auto id = std::string(sv(ev.flat_nested()->id()));
auto id = std::string(PackedEventView(ev.packed()).id());
if (downloadedIds.find(id) != downloadedIds.end()) {
downloadedIds.erase(id);
return true;

View File

@ -72,7 +72,8 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
for (auto levId : levIds) {
auto ev = lookupEventByLevId(txn, levId);
ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, ne.idSize));
PackedEventView packed(ev.packed());
ne.addItem(packed.created_at(), packed.id().substr(0, ne.idSize));
}
LI << "Filter matches " << numEvents << " events";

View File

@ -91,7 +91,7 @@ void RelayServer::runCron() {
if (expiration == 1) { // Ephemeral event
auto view = env.lookup_Event(txn, levId);
if (!view) throw herr("missing event from index, corrupt DB?");
uint64_t created = view->flat_nested()->created_at();
uint64_t created = PackedEventView(view->packed()).created_at();
if (created <= ephemeralCutoff) {
numEphemeral++;

View File

@ -86,33 +86,33 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
}
void RelayServer::ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::string ipAddr, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector<MsgWriter> &output) {
std::string flatStr, jsonStr;
std::string packedStr, jsonStr;
parseAndVerifyEvent(origJson, secpCtx, true, true, flatStr, jsonStr);
parseAndVerifyEvent(origJson, secpCtx, true, true, packedStr, jsonStr);
auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(flatStr.data());
PackedEventView packed(packedStr);
{
for (const auto &tagArr : origJson.at("tags").get_array()) {
auto tag = tagArr.get_array();
if (tag.size() == 1 && tag.at(0).get_string() == "-") {
LI << "Protected event, skipping";
sendOKResponse(connId, to_hex(sv(flat->id())), false, "blocked: event marked as protected");
sendOKResponse(connId, to_hex(packed.id()), false, "blocked: event marked as protected");
return;
}
}
}
{
auto existing = lookupEventById(txn, sv(flat->id()));
auto existing = lookupEventById(txn, packed.id());
if (existing) {
LI << "Duplicate event, skipping";
sendOKResponse(connId, to_hex(sv(flat->id())), true, "duplicate: have this event");
sendOKResponse(connId, to_hex(packed.id()), true, "duplicate: have this event");
return;
}
}
output.emplace_back(MsgWriter{MsgWriter::AddEvent{connId, std::move(ipAddr), hoytech::curr_time_us(), std::move(flatStr), std::move(jsonStr)}});
output.emplace_back(MsgWriter{MsgWriter::AddEvent{connId, std::move(ipAddr), hoytech::curr_time_us(), std::move(packedStr), std::move(jsonStr)}});
}
void RelayServer::ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) {

View File

@ -100,7 +100,8 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
for (auto levId : view->levIds) {
try {
auto ev = lookupEventByLevId(txn, levId);
view->ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, view->ne.idSize));
auto packed = PackedEventView(ev.packed());
view->ne.addItem(packed.created_at(), packed.id().substr(0, view->ne.idSize));
} catch (std::exception &) {
// levId was deleted when query was paused
}

View File

@ -31,7 +31,7 @@ void RelayServer::runReqMonitor(ThreadPool<MsgReqMonitor>::Thread &thr) {
auto connId = msg->sub.connId;
env.foreach_Event(txn, [&](auto &ev){
if (msg->sub.filterGroup.doesMatch(ev.flat_nested())) {
if (msg->sub.filterGroup.doesMatch(PackedEventView(ev.packed()))) {
sendEvent(connId, msg->sub.subId, getEventJson(txn, decomp, ev.primaryKeyId));
}

View File

@ -66,7 +66,7 @@ struct MsgWriter : NonCopyable {
uint64_t connId;
std::string ipAddr;
uint64_t receivedAt;
std::string flatStr;
std::string packedStr;
std::string jsonStr;
};

View File

@ -43,10 +43,10 @@ void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, msg->receivedAt, sourceType, msg->ipAddr, okMsg);
if (res == PluginEventSifterResult::Accept) {
newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg);
newEvents.emplace_back(std::move(msg->packedStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg);
} else {
auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(msg->flatStr.data());
auto eventIdHex = to_hex(sv(flat->id()));
PackedEventView packed(msg->packedStr);
auto eventIdHex = to_hex(packed.id());
if (okMsg.size()) LI << "[" << msg->connId << "] write policy blocked event " << eventIdHex << ": " << okMsg;
@ -67,8 +67,8 @@ void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
LE << "Error writing " << newEvents.size() << " events: " << e.what();
for (auto &newEvent : newEvents) {
auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(newEvent.flatStr.data());
auto eventIdHex = to_hex(sv(flat->id()));
PackedEventView packed(newEvent.packedStr);
auto eventIdHex = to_hex(packed.id());
MsgWriter::AddEvent *addEventMsg = static_cast<MsgWriter::AddEvent*>(newEvent.userData);
std::string message = "Write error: ";
@ -83,8 +83,8 @@ void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
// Log
for (auto &newEvent : newEvents) {
auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(newEvent.flatStr.data());
auto eventIdHex = to_hex(sv(flat->id()));
PackedEventView packed(newEvent.packedStr);
auto eventIdHex = to_hex(packed.id());
std::string message;
bool written = false;