diff --git a/src/RelayYesstr.cpp b/src/RelayYesstr.cpp index 0c7c00f..6f8d151 100644 --- a/src/RelayYesstr.cpp +++ b/src/RelayYesstr.cpp @@ -39,6 +39,8 @@ void RelayServer::runYesstr(ThreadPool::Thread &thr) { conns[connId].try_emplace(reqId); auto &s = conns[connId][reqId]; + LI << "Yesstr sync. filter: '" << filterStr << "'"; + if (filterStr == "{}") { qdb->checkout("events"); uint64_t nodeId = qdb->getHeadNodeId(txn); @@ -51,8 +53,6 @@ void RelayServer::runYesstr(ThreadPool::Thread &thr) { // FIXME: The following blocks the whole thread for the query duration. Should interleave it // with other requests like RelayReqWorker does. - LI << "Yesstr sync: Running filter: " << filterStr; - std::vector quadEventIds; auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr)); Subscription sub(1, "junkSub", filterGroup); @@ -97,7 +97,6 @@ void RelayServer::runYesstr(ThreadPool::Thread &thr) { qdb->withMemStore(s->m, [&]{ qdb->writeToMemStore = true; - LI << "ZZZ NODE " << qdb->getHeadNodeId(txn); resps = qdb->handleSyncRequests(txn, qdb->getHeadNodeId(txn), reqs, 100'000); }); @@ -137,7 +136,11 @@ void RelayServer::runYesstr(ThreadPool::Thread &thr) { const auto *req = parseYesstrRequest(msg->yesstrMessage); // validated by ingester const auto *reqSync = req->payload_as(); - states.handleRequest(txn, msg->connId, req->requestId(), sv(reqSync->filter()), sv(reqSync->reqsEncoded())); + try { + states.handleRequest(txn, msg->connId, req->requestId(), sv(reqSync->filter()), sv(reqSync->reqsEncoded())); + } catch (std::exception &e) { + sendNoticeError(msg->connId, std::string("yesstr failure: ") + e.what()); + } } else if (auto msg = std::get_if(&newMsg.msg)) { states.closeConn(msg->connId); } diff --git a/src/WSConnection.h b/src/WSConnection.h index ef596a0..73ecb5e 100644 --- a/src/WSConnection.h +++ b/src/WSConnection.h @@ -21,7 +21,7 @@ class WSConnection { WSConnection(const std::string &url) : url(url) {} std::function onConnect; - std::function onMessage; + std::function onMessage; std::function onTrigger; bool reconnect = true; uint64_t reconnectDelayMilliseconds = 5'000; @@ -101,7 +101,7 @@ class WSConnection { if (!onMessage) return; try { - onMessage(std::string_view(message, length), compressedSize); + onMessage(std::string_view(message, length), opCode, compressedSize); } catch (std::exception &e) { LW << "onMessage failure: " << e.what(); } diff --git a/src/cmd_stream.cpp b/src/cmd_stream.cpp index eb51bdf..49f6ff1 100644 --- a/src/cmd_stream.cpp +++ b/src/cmd_stream.cpp @@ -42,7 +42,7 @@ void cmd_stream(const std::vector &subArgs) { } }; - ws.onMessage = [&](auto msg, size_t){ + ws.onMessage = [&](auto msg, uWS::OpCode, size_t){ auto origJson = tao::json::from_string(msg); if (origJson.is_array()) { diff --git a/src/cmd_sync.cpp b/src/cmd_sync.cpp index 5a1d92e..deb9643 100644 --- a/src/cmd_sync.cpp +++ b/src/cmd_sync.cpp @@ -120,8 +120,11 @@ void cmd_sync(const std::vector &subArgs) { std::map args = docopt::docopt(USAGE, subArgs, true, ""); std::string url = args[""].asString(); + std::string filterStr; if (args["--filter"]) filterStr = args["--filter"].asString(); + else filterStr = "{}"; + std::string dir = args["--dir"] ? args["--dir"].asString() : "down"; if (dir != "up" && dir != "down" && dir != "both") throw herr("invalid direction: ", dir, ". Should be one of up/down/both"); if (dir != "down") throw herr("only down currently supported"); // FIXME @@ -147,8 +150,16 @@ void cmd_sync(const std::vector &subArgs) { if (filterStr.size()) { std::vector quadEventIds; - std::string filterStr = args["--filter"].asString(); - auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr)); + tao::json::value filterJson; + + try { + filterJson = tao::json::from_string(filterStr); + } catch (std::exception &e) { + LE << "Couldn't parse filter JSON: " << filterStr; + ::exit(1); + } + + auto filterGroup = NostrFilterGroup::unwrapped(filterJson); Subscription sub(1, "junkSub", filterGroup); @@ -196,7 +207,7 @@ void cmd_sync(const std::vector &subArgs) { controller->sendRequests(txn, filterStr); }; - ws.onMessage = [&](auto msg, size_t compressedSize){ + ws.onMessage = [&](auto msg, uWS::OpCode opCode, size_t compressedSize){ auto txn = env.txn_ro(); if (!controller) { @@ -204,6 +215,11 @@ void cmd_sync(const std::vector &subArgs) { return; } + if (opCode == uWS::OpCode::TEXT) { + LW << "Unexpected non-yesstr message from relay: " << msg; + return; + } + LI << "RECV size=" << msg.size() << " compressed=" << compressedSize; controller->handleResponses(txn, msg); @@ -211,6 +227,7 @@ void cmd_sync(const std::vector &subArgs) { LI << "Syncing done, writing/sending events"; controller->finish(txn, [&](std::string_view newLeaf){ + // FIXME: relay could crash client here by sending invalid JSON writer.inbox.push_move(tao::json::from_string(std::string(newLeaf))); }, [&](std::string_view){