mirror of
https://github.com/hoytech/strfry.git
synced 2025-06-20 01:40:29 +00:00
bugfixes in sync
This commit is contained in:
@ -39,6 +39,8 @@ void RelayServer::runYesstr(ThreadPool<MsgYesstr>::Thread &thr) {
|
|||||||
conns[connId].try_emplace(reqId);
|
conns[connId].try_emplace(reqId);
|
||||||
auto &s = conns[connId][reqId];
|
auto &s = conns[connId][reqId];
|
||||||
|
|
||||||
|
LI << "Yesstr sync. filter: '" << filterStr << "'";
|
||||||
|
|
||||||
if (filterStr == "{}") {
|
if (filterStr == "{}") {
|
||||||
qdb->checkout("events");
|
qdb->checkout("events");
|
||||||
uint64_t nodeId = qdb->getHeadNodeId(txn);
|
uint64_t nodeId = qdb->getHeadNodeId(txn);
|
||||||
@ -51,8 +53,6 @@ void RelayServer::runYesstr(ThreadPool<MsgYesstr>::Thread &thr) {
|
|||||||
// FIXME: The following blocks the whole thread for the query duration. Should interleave it
|
// FIXME: The following blocks the whole thread for the query duration. Should interleave it
|
||||||
// with other requests like RelayReqWorker does.
|
// with other requests like RelayReqWorker does.
|
||||||
|
|
||||||
LI << "Yesstr sync: Running filter: " << filterStr;
|
|
||||||
|
|
||||||
std::vector<uint64_t> quadEventIds;
|
std::vector<uint64_t> quadEventIds;
|
||||||
auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr));
|
auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr));
|
||||||
Subscription sub(1, "junkSub", filterGroup);
|
Subscription sub(1, "junkSub", filterGroup);
|
||||||
@ -97,7 +97,6 @@ void RelayServer::runYesstr(ThreadPool<MsgYesstr>::Thread &thr) {
|
|||||||
|
|
||||||
qdb->withMemStore(s->m, [&]{
|
qdb->withMemStore(s->m, [&]{
|
||||||
qdb->writeToMemStore = true;
|
qdb->writeToMemStore = true;
|
||||||
LI << "ZZZ NODE " << qdb->getHeadNodeId(txn);
|
|
||||||
resps = qdb->handleSyncRequests(txn, qdb->getHeadNodeId(txn), reqs, 100'000);
|
resps = qdb->handleSyncRequests(txn, qdb->getHeadNodeId(txn), reqs, 100'000);
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -137,7 +136,11 @@ void RelayServer::runYesstr(ThreadPool<MsgYesstr>::Thread &thr) {
|
|||||||
const auto *req = parseYesstrRequest(msg->yesstrMessage); // validated by ingester
|
const auto *req = parseYesstrRequest(msg->yesstrMessage); // validated by ingester
|
||||||
const auto *reqSync = req->payload_as<Yesstr::RequestSync>();
|
const auto *reqSync = req->payload_as<Yesstr::RequestSync>();
|
||||||
|
|
||||||
states.handleRequest(txn, msg->connId, req->requestId(), sv(reqSync->filter()), sv(reqSync->reqsEncoded()));
|
try {
|
||||||
|
states.handleRequest(txn, msg->connId, req->requestId(), sv(reqSync->filter()), sv(reqSync->reqsEncoded()));
|
||||||
|
} catch (std::exception &e) {
|
||||||
|
sendNoticeError(msg->connId, std::string("yesstr failure: ") + e.what());
|
||||||
|
}
|
||||||
} else if (auto msg = std::get_if<MsgYesstr::CloseConn>(&newMsg.msg)) {
|
} else if (auto msg = std::get_if<MsgYesstr::CloseConn>(&newMsg.msg)) {
|
||||||
states.closeConn(msg->connId);
|
states.closeConn(msg->connId);
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,7 @@ class WSConnection {
|
|||||||
WSConnection(const std::string &url) : url(url) {}
|
WSConnection(const std::string &url) : url(url) {}
|
||||||
|
|
||||||
std::function<void()> onConnect;
|
std::function<void()> onConnect;
|
||||||
std::function<void(std::string_view, size_t)> onMessage;
|
std::function<void(std::string_view, uWS::OpCode, size_t)> onMessage;
|
||||||
std::function<void()> onTrigger;
|
std::function<void()> onTrigger;
|
||||||
bool reconnect = true;
|
bool reconnect = true;
|
||||||
uint64_t reconnectDelayMilliseconds = 5'000;
|
uint64_t reconnectDelayMilliseconds = 5'000;
|
||||||
@ -101,7 +101,7 @@ class WSConnection {
|
|||||||
if (!onMessage) return;
|
if (!onMessage) return;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
onMessage(std::string_view(message, length), compressedSize);
|
onMessage(std::string_view(message, length), opCode, compressedSize);
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception &e) {
|
||||||
LW << "onMessage failure: " << e.what();
|
LW << "onMessage failure: " << e.what();
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
ws.onMessage = [&](auto msg, size_t){
|
ws.onMessage = [&](auto msg, uWS::OpCode, size_t){
|
||||||
auto origJson = tao::json::from_string(msg);
|
auto origJson = tao::json::from_string(msg);
|
||||||
|
|
||||||
if (origJson.is_array()) {
|
if (origJson.is_array()) {
|
||||||
|
@ -120,8 +120,11 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
|
|||||||
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");
|
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");
|
||||||
|
|
||||||
std::string url = args["<url>"].asString();
|
std::string url = args["<url>"].asString();
|
||||||
|
|
||||||
std::string filterStr;
|
std::string filterStr;
|
||||||
if (args["--filter"]) filterStr = args["--filter"].asString();
|
if (args["--filter"]) filterStr = args["--filter"].asString();
|
||||||
|
else filterStr = "{}";
|
||||||
|
|
||||||
std::string dir = args["--dir"] ? args["--dir"].asString() : "down";
|
std::string dir = args["--dir"] ? args["--dir"].asString() : "down";
|
||||||
if (dir != "up" && dir != "down" && dir != "both") throw herr("invalid direction: ", dir, ". Should be one of up/down/both");
|
if (dir != "up" && dir != "down" && dir != "both") throw herr("invalid direction: ", dir, ". Should be one of up/down/both");
|
||||||
if (dir != "down") throw herr("only down currently supported"); // FIXME
|
if (dir != "down") throw herr("only down currently supported"); // FIXME
|
||||||
@ -147,8 +150,16 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
|
|||||||
if (filterStr.size()) {
|
if (filterStr.size()) {
|
||||||
std::vector<uint64_t> quadEventIds;
|
std::vector<uint64_t> quadEventIds;
|
||||||
|
|
||||||
std::string filterStr = args["--filter"].asString();
|
tao::json::value filterJson;
|
||||||
auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr));
|
|
||||||
|
try {
|
||||||
|
filterJson = tao::json::from_string(filterStr);
|
||||||
|
} catch (std::exception &e) {
|
||||||
|
LE << "Couldn't parse filter JSON: " << filterStr;
|
||||||
|
::exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto filterGroup = NostrFilterGroup::unwrapped(filterJson);
|
||||||
|
|
||||||
Subscription sub(1, "junkSub", filterGroup);
|
Subscription sub(1, "junkSub", filterGroup);
|
||||||
|
|
||||||
@ -196,7 +207,7 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
|
|||||||
controller->sendRequests(txn, filterStr);
|
controller->sendRequests(txn, filterStr);
|
||||||
};
|
};
|
||||||
|
|
||||||
ws.onMessage = [&](auto msg, size_t compressedSize){
|
ws.onMessage = [&](auto msg, uWS::OpCode opCode, size_t compressedSize){
|
||||||
auto txn = env.txn_ro();
|
auto txn = env.txn_ro();
|
||||||
|
|
||||||
if (!controller) {
|
if (!controller) {
|
||||||
@ -204,6 +215,11 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (opCode == uWS::OpCode::TEXT) {
|
||||||
|
LW << "Unexpected non-yesstr message from relay: " << msg;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
LI << "RECV size=" << msg.size() << " compressed=" << compressedSize;
|
LI << "RECV size=" << msg.size() << " compressed=" << compressedSize;
|
||||||
controller->handleResponses(txn, msg);
|
controller->handleResponses(txn, msg);
|
||||||
|
|
||||||
@ -211,6 +227,7 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
|
|||||||
LI << "Syncing done, writing/sending events";
|
LI << "Syncing done, writing/sending events";
|
||||||
controller->finish(txn,
|
controller->finish(txn,
|
||||||
[&](std::string_view newLeaf){
|
[&](std::string_view newLeaf){
|
||||||
|
// FIXME: relay could crash client here by sending invalid JSON
|
||||||
writer.inbox.push_move(tao::json::from_string(std::string(newLeaf)));
|
writer.inbox.push_move(tao::json::from_string(std::string(newLeaf)));
|
||||||
},
|
},
|
||||||
[&](std::string_view){
|
[&](std::string_view){
|
||||||
|
Reference in New Issue
Block a user