track sources of events

This commit is contained in:
Doug Hoyte
2023-02-07 08:08:29 -05:00
parent 89540bc4c7
commit 376d7cbf1f
12 changed files with 42 additions and 18 deletions

2
golpe

Submodule golpe updated: 31b657a426...c95388c461

View File

@ -27,6 +27,9 @@ tables:
- name: flat
type: ubytes
nestedFlat: NostrIndex.Event
- name: sourceType
- name: sourceInfo
type: ubytes
indices:
created_at:

View File

@ -29,7 +29,7 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
if (cfg().relay__logging__dumpInEvents) LI << "[" << msg->connId << "] dumpInEvent: " << msg->payload;
try {
ingesterProcessEvent(txn, msg->connId, secpCtx, arr[1], writerMsgs);
ingesterProcessEvent(txn, msg->connId, msg->ipAddr, secpCtx, arr[1], writerMsgs);
} catch (std::exception &e) {
sendOKResponse(msg->connId, arr[1].at("id").get_string(), false, std::string("invalid: ") + e.what());
LI << "Rejected invalid event: " << e.what();
@ -82,7 +82,7 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
}
}
void RelayServer::ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector<MsgWriter> &output) {
void RelayServer::ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::string ipAddr, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector<MsgWriter> &output) {
std::string flatStr, jsonStr;
parseAndVerifyEvent(origJson, secpCtx, true, true, flatStr, jsonStr);
@ -98,7 +98,7 @@ void RelayServer::ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, secp256k
}
}
output.emplace_back(MsgWriter{MsgWriter::AddEvent{connId, hoytech::curr_time_us(), std::move(flatStr), std::move(jsonStr)}});
output.emplace_back(MsgWriter{MsgWriter::AddEvent{connId, std::move(ipAddr), hoytech::curr_time_us(), std::move(flatStr), std::move(jsonStr)}});
}
void RelayServer::ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) {

View File

@ -47,6 +47,7 @@ struct MsgWebsocket : NonCopyable {
struct MsgIngester : NonCopyable {
struct ClientMessage {
uint64_t connId;
std::string ipAddr;
std::string payload;
};
@ -62,6 +63,7 @@ struct MsgIngester : NonCopyable {
struct MsgWriter : NonCopyable {
struct AddEvent {
uint64_t connId;
std::string ipAddr;
uint64_t receivedAt;
std::string flatStr;
std::string jsonStr;
@ -147,7 +149,7 @@ struct RelayServer {
void runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr);
void runIngester(ThreadPool<MsgIngester>::Thread &thr);
void ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector<MsgWriter> &output);
void ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::string ipAddr, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector<MsgWriter> &output);
void ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
void ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);

View File

@ -139,7 +139,7 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
c.stats.bytesDown += length;
c.stats.bytesDownCompressed += compressedSize;
tpIngester.dispatch(c.connId, MsgIngester{MsgIngester::ClientMessage{c.connId, std::string(message, length)}});
tpIngester.dispatch(c.connId, MsgIngester{MsgIngester::ClientMessage{c.connId, std::string(message, length), ws->getAddressBytes()}});
});

View File

@ -18,7 +18,8 @@ void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgWriter::AddEvent>(&newMsg.msg)) {
newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, msg);
EventSourceType sourceType = msg->ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6;
newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg);
}
}

View File

@ -7,9 +7,16 @@
#include "events.h"
struct WriterPipelineInput {
tao::json::value eventJson;
EventSourceType sourceType;
std::string sourceInfo;
};
struct WriterPipeline {
public:
hoytech::protected_queue<tao::json::value> inbox;
hoytech::protected_queue<WriterPipelineInput> inbox;
hoytech::protected_queue<bool> flushInbox;
private:
@ -28,7 +35,7 @@ struct WriterPipeline {
auto msgs = inbox.pop_all();
for (auto &m : msgs) {
if (m.is_null()) {
if (m.eventJson.is_null()) {
writerInbox.push_move({});
break;
}
@ -37,13 +44,13 @@ struct WriterPipeline {
std::string jsonStr;
try {
parseAndVerifyEvent(m, secpCtx, true, true, flatStr, jsonStr);
parseAndVerifyEvent(m.eventJson, secpCtx, true, true, flatStr, jsonStr);
} catch (std::exception &e) {
LW << "Rejected event: " << m << " reason: " << e.what();
LW << "Rejected event: " << m.eventJson << " reason: " << e.what();
continue;
}
writerInbox.push_move({ std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us() });
writerInbox.push_move({ std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us(), m.sourceType, std::move(m.sourceInfo) });
}
}
});
@ -120,7 +127,7 @@ struct WriterPipeline {
}
void flush() {
inbox.push_move(tao::json::null);
inbox.push_move({ tao::json::null, EventSourceType::None, "" });
flushInbox.wait();
}
};

View File

@ -86,7 +86,7 @@ void cmd_import(const std::vector<std::string> &subArgs) {
continue;
}
newEvents.emplace_back(std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us());
newEvents.emplace_back(std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us(), EventSourceType::Import, "");
if (newEvents.size() >= 10'000) flushChanges();
}

View File

@ -64,7 +64,7 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
if (origJson.get_array().size() < 3) throw herr("array too short");
auto &evJson = origJson.at(2);
downloadedIds.emplace(from_hex(evJson.at("id").get_string()));
writer.inbox.push_move(std::move(evJson));
writer.inbox.push_move({ std::move(evJson), EventSourceType::Stream, url });
} else {
LW << "Unexpected EVENT";
}

View File

@ -228,7 +228,7 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
controller->finish(txn,
[&](std::string_view newLeaf){
// FIXME: relay could crash client here by sending invalid JSON
writer.inbox.push_move(tao::json::from_string(std::string(newLeaf)));
writer.inbox.push_move(WriterPipelineInput{ tao::json::from_string(std::string(newLeaf)), EventSourceType::Sync, url });
},
[&](std::string_view){
}

View File

@ -276,7 +276,7 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToW
}
if (ev.status == EventWriteStatus::Pending) {
ev.levId = env.insert_Event(txn, ev.receivedAt, ev.flatStr);
ev.levId = env.insert_Event(txn, ev.receivedAt, ev.flatStr, (uint64_t)ev.sourceType, ev.sourceInfo);
tmpBuf.clear();
tmpBuf += '\x00';

View File

@ -59,6 +59,15 @@ inline quadrable::Key flatEventToQuadrableKey(const NostrIndex::Event *flat) {
enum class EventSourceType {
None = 0,
IP4 = 1,
IP6 = 2,
Import = 3,
Stream = 4,
Sync = 5,
};
enum class EventWriteStatus {
@ -74,6 +83,8 @@ struct EventToWrite {
std::string flatStr;
std::string jsonStr;
uint64_t receivedAt;
EventSourceType sourceType;
std::string sourceInfo;
void *userData = nullptr;
quadrable::Key quadKey;
EventWriteStatus status = EventWriteStatus::Pending;
@ -81,7 +92,7 @@ struct EventToWrite {
EventToWrite() {}
EventToWrite(std::string flatStr, std::string jsonStr, uint64_t receivedAt, void *userData = nullptr) : flatStr(flatStr), jsonStr(jsonStr), receivedAt(receivedAt), userData(userData) {
EventToWrite(std::string flatStr, std::string jsonStr, uint64_t receivedAt, EventSourceType sourceType, std::string sourceInfo, void *userData = nullptr) : flatStr(flatStr), jsonStr(jsonStr), receivedAt(receivedAt), sourceType(sourceType), sourceInfo(sourceInfo), userData(userData) {
const NostrIndex::Event *flat = flatbuffers::GetRoot<NostrIndex::Event>(flatStr.data());
quadKey = flatEventToQuadrableKey(flat);
}