in negentropy handler, implement queries stored in events

This commit is contained in:
Doug Hoyte
2023-05-19 01:58:03 -04:00
parent e2816f6bd5
commit af0fa71f26
3 changed files with 39 additions and 4 deletions

View File

@ -58,6 +58,8 @@ Current reason codes are:
* Because the `NEG-OPEN` queries are stateful, relays may choose to time-out inactive queries to recover memory resources
* `FILTER_NOT_FOUND`
* If an event ID is used as the filter, this error will be returned if the relay does not have this event. The client should retry with the full filter, or upload the event to the relay.
* `FILTER_INVALID`
* The event's `content` was not valid JSON, or the filter was invalid for some other reason.
After a `NEG-ERR` is issued, the subscription is considered to be closed.

View File

@ -3,6 +3,7 @@
void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
secp256k1_context *secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
Decompressor decomp;
while(1) {
auto newMsgs = thr.inbox.pop_all();
@ -52,7 +53,7 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
}
} else if (cmd.starts_with("NEG-")) {
try {
ingesterProcessNegentropy(txn, msg->connId, arr);
ingesterProcessNegentropy(txn, decomp, msg->connId, arr);
} catch (std::exception &e) {
sendNoticeError(msg->connId, std::string("bad negentropy: ") + e.what());
}
@ -115,11 +116,42 @@ void RelayServer::ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const ta
tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::RemoveSub{connId, SubId(arr[1].get_string())}});
}
void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) {
void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp, uint64_t connId, const tao::json::value &arr) {
if (arr.at(0) == "NEG-OPEN") {
if (arr.get_array().size() < 5) throw herr("negentropy query missing elements");
Subscription sub(connId, arr[1].get_string(), NostrFilterGroup::unwrapped(arr.at(2)));
NostrFilterGroup filter({});
if (arr.at(2).is_string()) {
auto ev = lookupEventById(txn, from_hex(arr.at(2).get_string()));
if (!ev) {
sendToConn(connId, tao::json::to_string(tao::json::value::array({
"NEG-ERR",
arr[1].get_string(),
"FILTER_NOT_FOUND"
})));
return;
}
tao::json::value json = tao::json::from_string(getEventJson(txn, decomp, ev->primaryKeyId));
try {
filter = std::move(NostrFilterGroup::unwrapped(tao::json::from_string(json.at("content").get_string())));
} catch (std::exception &e) {
sendToConn(connId, tao::json::to_string(tao::json::value::array({
"NEG-ERR",
arr[1].get_string(),
"FILTER_INVALID"
})));
return;
}
} else {
filter = std::move(NostrFilterGroup::unwrapped(arr.at(2)));
}
Subscription sub(connId, arr[1].get_string(), std::move(filter));
uint64_t idSize = arr.at(3).get_unsigned();
if (idSize < 8 || idSize > 32) throw herr("idSize out of range");

View File

@ -16,6 +16,7 @@
#include "ThreadPool.h"
#include "events.h"
#include "filters.h"
#include "Decompressor.h"
@ -161,7 +162,7 @@ struct RelayServer {
void ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::string ipAddr, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector<MsgWriter> &output);
void ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
void ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
void ingesterProcessNegentropy(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
void ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp, uint64_t connId, const tao::json::value &origJson);
void runWriter(ThreadPool<MsgWriter>::Thread &thr);