diff --git a/docs/negentropy.md b/docs/negentropy.md index 313c16e..ac79133 100644 --- a/docs/negentropy.md +++ b/docs/negentropy.md @@ -58,6 +58,8 @@ Current reason codes are: * Because the `NEG-OPEN` queries are stateful, relays may choose to time-out inactive queries to recover memory resources * `FILTER_NOT_FOUND` * If an event ID is used as the filter, this error will be returned if the relay does not have this event. The client should retry with the full filter, or upload the event to the relay. +* `FILTER_INVALID` + * The event's `content` was not valid JSON, or the filter was invalid for some other reason. After a `NEG-ERR` is issued, the subscription is considered to be closed. diff --git a/src/RelayIngester.cpp b/src/RelayIngester.cpp index 3f31b57..66c6432 100644 --- a/src/RelayIngester.cpp +++ b/src/RelayIngester.cpp @@ -3,6 +3,7 @@ void RelayServer::runIngester(ThreadPool::Thread &thr) { secp256k1_context *secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY); + Decompressor decomp; while(1) { auto newMsgs = thr.inbox.pop_all(); @@ -52,7 +53,7 @@ void RelayServer::runIngester(ThreadPool::Thread &thr) { } } else if (cmd.starts_with("NEG-")) { try { - ingesterProcessNegentropy(txn, msg->connId, arr); + ingesterProcessNegentropy(txn, decomp, msg->connId, arr); } catch (std::exception &e) { sendNoticeError(msg->connId, std::string("bad negentropy: ") + e.what()); } @@ -115,11 +116,42 @@ void RelayServer::ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const ta tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::RemoveSub{connId, SubId(arr[1].get_string())}}); } -void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) { +void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp, uint64_t connId, const tao::json::value &arr) { if (arr.at(0) == "NEG-OPEN") { if (arr.get_array().size() < 5) throw herr("negentropy query missing elements"); - Subscription sub(connId, arr[1].get_string(), NostrFilterGroup::unwrapped(arr.at(2))); + NostrFilterGroup filter({}); + + if (arr.at(2).is_string()) { + auto ev = lookupEventById(txn, from_hex(arr.at(2).get_string())); + if (!ev) { + sendToConn(connId, tao::json::to_string(tao::json::value::array({ + "NEG-ERR", + arr[1].get_string(), + "FILTER_NOT_FOUND" + }))); + + return; + } + + tao::json::value json = tao::json::from_string(getEventJson(txn, decomp, ev->primaryKeyId)); + + try { + filter = std::move(NostrFilterGroup::unwrapped(tao::json::from_string(json.at("content").get_string()))); + } catch (std::exception &e) { + sendToConn(connId, tao::json::to_string(tao::json::value::array({ + "NEG-ERR", + arr[1].get_string(), + "FILTER_INVALID" + }))); + + return; + } + } else { + filter = std::move(NostrFilterGroup::unwrapped(arr.at(2))); + } + + Subscription sub(connId, arr[1].get_string(), std::move(filter)); uint64_t idSize = arr.at(3).get_unsigned(); if (idSize < 8 || idSize > 32) throw herr("idSize out of range"); diff --git a/src/RelayServer.h b/src/RelayServer.h index 52aeee7..236267c 100644 --- a/src/RelayServer.h +++ b/src/RelayServer.h @@ -16,6 +16,7 @@ #include "ThreadPool.h" #include "events.h" #include "filters.h" +#include "Decompressor.h" @@ -161,7 +162,7 @@ struct RelayServer { void ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::string ipAddr, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector &output); void ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson); void ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson); - void ingesterProcessNegentropy(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson); + void ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp, uint64_t connId, const tao::json::value &origJson); void runWriter(ThreadPool::Thread &thr);