From 391f632b73f29b744281bf526059056e7536d82f Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Fri, 30 Aug 2024 16:51:43 -0400 Subject: [PATCH] don't store receivedAt and source metadata --- golpe.yaml | 7 ------- src/PluginEventSifter.h | 4 ++-- src/WriterPipeline.h | 6 ++---- src/apps/dbutils/cmd_import.cpp | 2 +- src/apps/mesh/cmd_router.cpp | 6 +++--- src/apps/mesh/cmd_stream.cpp | 4 ++-- src/apps/mesh/cmd_sync.cpp | 4 ++-- src/apps/relay/RelayIngester.cpp | 2 +- src/apps/relay/RelayServer.h | 1 - src/apps/relay/RelayWriter.cpp | 4 ++-- src/events.cpp | 2 +- src/events.h | 6 ++---- 12 files changed, 18 insertions(+), 30 deletions(-) diff --git a/golpe.yaml b/golpe.yaml index e4aac02..d3aae31 100644 --- a/golpe.yaml +++ b/golpe.yaml @@ -22,18 +22,12 @@ tables: ## Primary key is auto-incremented, called "levId" for Local EVent ID Event: fields: - - name: receivedAt # microseconds - name: packed type: ubytes - - name: sourceType - - name: sourceInfo - type: ubytes indices: created_at: integer: true - receivedAt: - integer: true id: comparator: StringUint64 pubkey: @@ -57,7 +51,6 @@ tables: PackedEventView packed(v.packed()); created_at = packed.created_at(); uint64_t indexTime = *created_at; - receivedAt = v.receivedAt(); id = makeKey_StringUint64(packed.id(), indexTime); pubkey = makeKey_StringUint64(packed.pubkey(), indexTime); diff --git a/src/PluginEventSifter.h b/src/PluginEventSifter.h index f04e445..89a74ee 100644 --- a/src/PluginEventSifter.h +++ b/src/PluginEventSifter.h @@ -59,7 +59,7 @@ struct PluginEventSifter { std::unique_ptr running; - PluginEventSifterResult acceptEvent(const std::string &pluginCmd, const tao::json::value &evJson, uint64_t receivedAtUs, EventSourceType sourceType, std::string_view sourceInfo, std::string &okMsg) { + PluginEventSifterResult acceptEvent(const std::string &pluginCmd, const tao::json::value &evJson, EventSourceType sourceType, std::string_view sourceInfo, std::string &okMsg) { if (pluginCmd.size() == 0) { running.reset(); return PluginEventSifterResult::Accept; @@ -85,7 +85,7 @@ struct PluginEventSifter { auto request = tao::json::value({ { "type", "new" }, { "event", evJson }, - { "receivedAt", receivedAtUs / 1'000'000 }, + { "receivedAt", ::time(nullptr) }, { "sourceType", eventSourceTypeToStr(sourceType) }, { "sourceInfo", sourceType == EventSourceType::IP4 || sourceType == EventSourceType::IP6 ? renderIP(sourceInfo) : sourceInfo }, }); diff --git a/src/WriterPipeline.h b/src/WriterPipeline.h index bdc25ff..8d43de0 100644 --- a/src/WriterPipeline.h +++ b/src/WriterPipeline.h @@ -9,8 +9,6 @@ struct WriterPipelineInput { tao::json::value eventJson; - EventSourceType sourceType; - std::string sourceInfo; }; @@ -79,7 +77,7 @@ struct WriterPipeline { continue; } - writerInbox.push_move({ std::move(packedStr), std::move(jsonStr), hoytech::curr_time_us(), m.sourceType, std::move(m.sourceInfo) }); + writerInbox.push_move({ std::move(packedStr), std::move(jsonStr), }); } } }); @@ -187,7 +185,7 @@ struct WriterPipeline { } void flush() { - validatorInbox.push_move({ tao::json::null, EventSourceType::None, "" }); + validatorInbox.push_move({ tao::json::null, }); flushInbox.wait(); } diff --git a/src/apps/dbutils/cmd_import.cpp b/src/apps/dbutils/cmd_import.cpp index 492b110..1ff0b04 100644 --- a/src/apps/dbutils/cmd_import.cpp +++ b/src/apps/dbutils/cmd_import.cpp @@ -55,7 +55,7 @@ void cmd_import(const std::vector &subArgs) { continue; } - writer.write({ std::move(evJson), EventSourceType::Import, "" }); + writer.write({ std::move(evJson), }); writer.wait(); } diff --git a/src/apps/mesh/cmd_router.cpp b/src/apps/mesh/cmd_router.cpp index edd0d83..131d106 100644 --- a/src/apps/mesh/cmd_router.cpp +++ b/src/apps/mesh/cmd_router.cpp @@ -183,9 +183,9 @@ struct Router { std::string okMsg; - auto res = pluginDown.acceptEvent(pluginDownCmd, evJson, hoytech::curr_time_us(), EventSourceType::Stream, url, okMsg); + auto res = pluginDown.acceptEvent(pluginDownCmd, evJson, EventSourceType::Stream, url, okMsg); if (res == PluginEventSifterResult::Accept) { - router->writer.write({ std::move(evJson), EventSourceType::Stream, url }); + router->writer.write({ std::move(evJson), }); } else { if (okMsg.size()) LI << groupName << " / " << url << " : pluginDown blocked event " << evJson.at("id").get_string() << ": " << okMsg; } @@ -206,7 +206,7 @@ struct Router { std::string okMsg; - auto res = pluginUp.acceptEvent(pluginUpCmd, evJson, ev.receivedAt(), (EventSourceType)ev.sourceType(), ev.sourceInfo(), okMsg); + auto res = pluginUp.acceptEvent(pluginUpCmd, evJson, EventSourceType::Stored, "", okMsg); if (res == PluginEventSifterResult::Accept) { for (auto &[url, c] : conns) { if (c.ws) c.ws->send(responseStr.data(), responseStr.size(), uWS::OpCode::TEXT, nullptr, nullptr, true); diff --git a/src/apps/mesh/cmd_stream.cpp b/src/apps/mesh/cmd_stream.cpp index d9ca432..4862a66 100644 --- a/src/apps/mesh/cmd_stream.cpp +++ b/src/apps/mesh/cmd_stream.cpp @@ -67,10 +67,10 @@ void cmd_stream(const std::vector &subArgs) { auto &evJson = origJson.at(2); std::string okMsg; - auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, hoytech::curr_time_us(), EventSourceType::Stream, ws.remoteAddr, okMsg); + auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, EventSourceType::Stream, ws.remoteAddr, okMsg); if (res == PluginEventSifterResult::Accept) { downloadedIds.emplace(from_hex(evJson.at("id").get_string())); - writer.write({ std::move(evJson), EventSourceType::Stream, url }); + writer.write({ std::move(evJson), }); } else { if (okMsg.size()) LI << "[" << ws.remoteAddr << "] write policy blocked event " << evJson.at("id").get_string() << ": " << okMsg; } diff --git a/src/apps/mesh/cmd_sync.cpp b/src/apps/mesh/cmd_sync.cpp index 1131c2d..5ab2513 100644 --- a/src/apps/mesh/cmd_sync.cpp +++ b/src/apps/mesh/cmd_sync.cpp @@ -168,9 +168,9 @@ void cmd_sync(const std::vector &subArgs) { auto &evJson = msg.at(2); std::string okMsg; - auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, hoytech::curr_time_us(), EventSourceType::Sync, ws.remoteAddr, okMsg); + auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, EventSourceType::Sync, ws.remoteAddr, okMsg); if (res == PluginEventSifterResult::Accept) { - writer.write({ std::move(evJson), EventSourceType::Sync, url }); + writer.write({ std::move(evJson), }); } else { if (okMsg.size()) LI << "[" << ws.remoteAddr << "] write policy blocked event " << evJson.at("id").get_string() << ": " << okMsg; } diff --git a/src/apps/relay/RelayIngester.cpp b/src/apps/relay/RelayIngester.cpp index d51875b..0021434 100644 --- a/src/apps/relay/RelayIngester.cpp +++ b/src/apps/relay/RelayIngester.cpp @@ -112,7 +112,7 @@ void RelayServer::ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::str } } - output.emplace_back(MsgWriter{MsgWriter::AddEvent{connId, std::move(ipAddr), hoytech::curr_time_us(), std::move(packedStr), std::move(jsonStr)}}); + output.emplace_back(MsgWriter{MsgWriter::AddEvent{connId, std::move(ipAddr), std::move(packedStr), std::move(jsonStr)}}); } void RelayServer::ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) { diff --git a/src/apps/relay/RelayServer.h b/src/apps/relay/RelayServer.h index aae6d98..cc57c69 100644 --- a/src/apps/relay/RelayServer.h +++ b/src/apps/relay/RelayServer.h @@ -65,7 +65,6 @@ struct MsgWriter : NonCopyable { struct AddEvent { uint64_t connId; std::string ipAddr; - uint64_t receivedAt; std::string packedStr; std::string jsonStr; }; diff --git a/src/apps/relay/RelayWriter.cpp b/src/apps/relay/RelayWriter.cpp index bc5b4de..73394bc 100644 --- a/src/apps/relay/RelayWriter.cpp +++ b/src/apps/relay/RelayWriter.cpp @@ -40,10 +40,10 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { tao::json::value evJson = tao::json::from_string(msg->jsonStr); EventSourceType sourceType = msg->ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6; std::string okMsg; - auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, msg->receivedAt, sourceType, msg->ipAddr, okMsg); + auto res = writePolicyPlugin.acceptEvent(cfg().relay__writePolicy__plugin, evJson, sourceType, msg->ipAddr, okMsg); if (res == PluginEventSifterResult::Accept) { - newEvents.emplace_back(std::move(msg->packedStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg); + newEvents.emplace_back(std::move(msg->packedStr), std::move(msg->jsonStr), msg); } else { PackedEventView packed(msg->packedStr); auto eventIdHex = to_hex(packed.id()); diff --git a/src/events.cpp b/src/events.cpp index 76f2d60..2d0e3e7 100644 --- a/src/events.cpp +++ b/src/events.cpp @@ -314,7 +314,7 @@ void writeEvents(lmdb::txn &txn, std::vector &evs, uint64_t logLev } if (ev.status == EventWriteStatus::Pending) { - ev.levId = env.insert_Event(txn, ev.receivedAt, ev.packedStr, (uint64_t)ev.sourceType, ev.sourceInfo); + ev.levId = env.insert_Event(txn, ev.packedStr); tmpBuf.clear(); tmpBuf += '\x00'; diff --git a/src/events.h b/src/events.h index f8a9459..d05bc71 100644 --- a/src/events.h +++ b/src/events.h @@ -63,6 +63,7 @@ enum class EventSourceType { Import = 3, Stream = 4, Sync = 5, + Stored = 6, }; inline std::string eventSourceTypeToStr(EventSourceType t) { @@ -88,16 +89,13 @@ enum class EventWriteStatus { struct EventToWrite { std::string packedStr; std::string jsonStr; - uint64_t receivedAt; - EventSourceType sourceType; - std::string sourceInfo; void *userData = nullptr; EventWriteStatus status = EventWriteStatus::Pending; uint64_t levId = 0; EventToWrite() {} - EventToWrite(std::string packedStr, std::string jsonStr, uint64_t receivedAt, EventSourceType sourceType, std::string sourceInfo, void *userData = nullptr) : packedStr(packedStr), jsonStr(jsonStr), receivedAt(receivedAt), sourceType(sourceType), sourceInfo(sourceInfo), userData(userData) { + EventToWrite(std::string packedStr, std::string jsonStr, void *userData = nullptr) : packedStr(packedStr), jsonStr(jsonStr), userData(userData) { } std::string_view id() {