diff --git a/golpe b/golpe index 31b657a..c95388c 160000 --- a/golpe +++ b/golpe @@ -1 +1 @@ -Subproject commit 31b657a426277a14847e40512b621d6a89b85e51 +Subproject commit c95388c4619fffad4ff6632ccafc2780a6d32663 diff --git a/golpe.yaml b/golpe.yaml index 2dfa186..013acf6 100644 --- a/golpe.yaml +++ b/golpe.yaml @@ -27,6 +27,9 @@ tables: - name: flat type: ubytes nestedFlat: NostrIndex.Event + - name: sourceType + - name: sourceInfo + type: ubytes indices: created_at: diff --git a/src/RelayIngester.cpp b/src/RelayIngester.cpp index f02ca80..dc8139a 100644 --- a/src/RelayIngester.cpp +++ b/src/RelayIngester.cpp @@ -29,7 +29,7 @@ void RelayServer::runIngester(ThreadPool::Thread &thr) { if (cfg().relay__logging__dumpInEvents) LI << "[" << msg->connId << "] dumpInEvent: " << msg->payload; try { - ingesterProcessEvent(txn, msg->connId, secpCtx, arr[1], writerMsgs); + ingesterProcessEvent(txn, msg->connId, msg->ipAddr, secpCtx, arr[1], writerMsgs); } catch (std::exception &e) { sendOKResponse(msg->connId, arr[1].at("id").get_string(), false, std::string("invalid: ") + e.what()); LI << "Rejected invalid event: " << e.what(); @@ -82,7 +82,7 @@ void RelayServer::runIngester(ThreadPool::Thread &thr) { } } -void RelayServer::ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector &output) { +void RelayServer::ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::string ipAddr, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector &output) { std::string flatStr, jsonStr; parseAndVerifyEvent(origJson, secpCtx, true, true, flatStr, jsonStr); @@ -98,7 +98,7 @@ void RelayServer::ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, secp256k } } - output.emplace_back(MsgWriter{MsgWriter::AddEvent{connId, hoytech::curr_time_us(), std::move(flatStr), std::move(jsonStr)}}); + output.emplace_back(MsgWriter{MsgWriter::AddEvent{connId, std::move(ipAddr), hoytech::curr_time_us(), std::move(flatStr), std::move(jsonStr)}}); } void RelayServer::ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) { diff --git a/src/RelayServer.h b/src/RelayServer.h index d044e74..4585dd0 100644 --- a/src/RelayServer.h +++ b/src/RelayServer.h @@ -47,6 +47,7 @@ struct MsgWebsocket : NonCopyable { struct MsgIngester : NonCopyable { struct ClientMessage { uint64_t connId; + std::string ipAddr; std::string payload; }; @@ -62,6 +63,7 @@ struct MsgIngester : NonCopyable { struct MsgWriter : NonCopyable { struct AddEvent { uint64_t connId; + std::string ipAddr; uint64_t receivedAt; std::string flatStr; std::string jsonStr; @@ -147,7 +149,7 @@ struct RelayServer { void runWebsocket(ThreadPool::Thread &thr); void runIngester(ThreadPool::Thread &thr); - void ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector &output); + void ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::string ipAddr, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector &output); void ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson); void ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson); diff --git a/src/RelayWebsocket.cpp b/src/RelayWebsocket.cpp index d286d91..b6dc1d3 100644 --- a/src/RelayWebsocket.cpp +++ b/src/RelayWebsocket.cpp @@ -139,7 +139,7 @@ void RelayServer::runWebsocket(ThreadPool::Thread &thr) { c.stats.bytesDown += length; c.stats.bytesDownCompressed += compressedSize; - tpIngester.dispatch(c.connId, MsgIngester{MsgIngester::ClientMessage{c.connId, std::string(message, length)}}); + tpIngester.dispatch(c.connId, MsgIngester{MsgIngester::ClientMessage{c.connId, std::string(message, length), ws->getAddressBytes()}}); }); diff --git a/src/RelayWriter.cpp b/src/RelayWriter.cpp index 10c60f5..7810a69 100644 --- a/src/RelayWriter.cpp +++ b/src/RelayWriter.cpp @@ -18,7 +18,8 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { for (auto &newMsg : newMsgs) { if (auto msg = std::get_if(&newMsg.msg)) { - newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, msg); + EventSourceType sourceType = msg->ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6; + newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg); } } diff --git a/src/WriterPipeline.h b/src/WriterPipeline.h index de82c7f..6eb4011 100644 --- a/src/WriterPipeline.h +++ b/src/WriterPipeline.h @@ -7,9 +7,16 @@ #include "events.h" +struct WriterPipelineInput { + tao::json::value eventJson; + EventSourceType sourceType; + std::string sourceInfo; +}; + + struct WriterPipeline { public: - hoytech::protected_queue inbox; + hoytech::protected_queue inbox; hoytech::protected_queue flushInbox; private: @@ -28,7 +35,7 @@ struct WriterPipeline { auto msgs = inbox.pop_all(); for (auto &m : msgs) { - if (m.is_null()) { + if (m.eventJson.is_null()) { writerInbox.push_move({}); break; } @@ -37,13 +44,13 @@ struct WriterPipeline { std::string jsonStr; try { - parseAndVerifyEvent(m, secpCtx, true, true, flatStr, jsonStr); + parseAndVerifyEvent(m.eventJson, secpCtx, true, true, flatStr, jsonStr); } catch (std::exception &e) { - LW << "Rejected event: " << m << " reason: " << e.what(); + LW << "Rejected event: " << m.eventJson << " reason: " << e.what(); continue; } - writerInbox.push_move({ std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us() }); + writerInbox.push_move({ std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us(), m.sourceType, std::move(m.sourceInfo) }); } } }); @@ -120,7 +127,7 @@ struct WriterPipeline { } void flush() { - inbox.push_move(tao::json::null); + inbox.push_move({ tao::json::null, EventSourceType::None, "" }); flushInbox.wait(); } }; diff --git a/src/cmd_import.cpp b/src/cmd_import.cpp index 0100f01..cb0f1df 100644 --- a/src/cmd_import.cpp +++ b/src/cmd_import.cpp @@ -86,7 +86,7 @@ void cmd_import(const std::vector &subArgs) { continue; } - newEvents.emplace_back(std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us()); + newEvents.emplace_back(std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us(), EventSourceType::Import, ""); if (newEvents.size() >= 10'000) flushChanges(); } diff --git a/src/cmd_stream.cpp b/src/cmd_stream.cpp index 7435e0b..96799d7 100644 --- a/src/cmd_stream.cpp +++ b/src/cmd_stream.cpp @@ -64,7 +64,7 @@ void cmd_stream(const std::vector &subArgs) { if (origJson.get_array().size() < 3) throw herr("array too short"); auto &evJson = origJson.at(2); downloadedIds.emplace(from_hex(evJson.at("id").get_string())); - writer.inbox.push_move(std::move(evJson)); + writer.inbox.push_move({ std::move(evJson), EventSourceType::Stream, url }); } else { LW << "Unexpected EVENT"; } diff --git a/src/cmd_sync.cpp b/src/cmd_sync.cpp index 0bc3e85..25b3d4e 100644 --- a/src/cmd_sync.cpp +++ b/src/cmd_sync.cpp @@ -228,7 +228,7 @@ void cmd_sync(const std::vector &subArgs) { controller->finish(txn, [&](std::string_view newLeaf){ // FIXME: relay could crash client here by sending invalid JSON - writer.inbox.push_move(tao::json::from_string(std::string(newLeaf))); + writer.inbox.push_move(WriterPipelineInput{ tao::json::from_string(std::string(newLeaf)), EventSourceType::Sync, url }); }, [&](std::string_view){ } diff --git a/src/events.cpp b/src/events.cpp index e66d9b1..758c61c 100644 --- a/src/events.cpp +++ b/src/events.cpp @@ -276,7 +276,7 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector(flatStr.data()); quadKey = flatEventToQuadrableKey(flat); }