get rid of thread locking: just do simple round robin

This commit is contained in:
Doug Hoyte
2024-08-29 15:12:54 -04:00
parent 9f99053ec2
commit b1781d6576
3 changed files with 10 additions and 53 deletions

View File

@ -11,12 +11,10 @@ void WebServer::runHttpsocket(ThreadPool<MsgHttpsocket>::Thread &thr) {
uWS::Group<uWS::SERVER> *hubGroup;
flat_hash_map<uint64_t, Connection*> connIdToConnection;
uint64_t nextConnectionId = 1;
uint64_t requestCounter = 0;
flat_hash_map<uWS::HttpResponse *, HTTPRequest> receivingRequests;
std::vector<bool> tpReaderLock(tpReader.numThreads, false);
std::queue<MsgWebReader> pendingReaderMessages;
{
int extensionOptions = 0;
@ -49,20 +47,8 @@ void WebServer::runHttpsocket(ThreadPool<MsgHttpsocket>::Thread &thr) {
c->pendingRequests.insert(res);
if (req.method == uWS::HttpMethod::METHOD_GET) {
auto m = MsgWebReader{MsgWebReader::Request{std::move(req), MAX_U64}};
bool didDispatch = false;
for (uint64_t i = 0; i < tpReader.numThreads; i++) {
if (tpReaderLock[i] == false) {
tpReaderLock[i] = true;
std::get<MsgWebReader::Request>(m.msg).lockedThreadId = i;
tpReader.dispatch(i, std::move(m));
didDispatch = true;
break;
}
}
if (!didDispatch) pendingReaderMessages.emplace(std::move(m));
auto m = MsgWebReader{MsgWebReader::Request{std::move(req)}};
tpReader.dispatch(requestCounter++, std::move(m));
} else if (req.method == uWS::HttpMethod::METHOD_POST) {
if (remainingBytes) {
receivingRequests.emplace(res, std::move(req));
@ -87,20 +73,6 @@ void WebServer::runHttpsocket(ThreadPool<MsgHttpsocket>::Thread &thr) {
});
auto unlockThread = [&](uint64_t lockedThreadId){
if (lockedThreadId == MAX_U64) return;
if (tpReaderLock.at(lockedThreadId) == false) throw herr("tried to unlock already unlocked reader lock!");
if (pendingReaderMessages.empty()) {
tpReaderLock[lockedThreadId] = false;
} else {
std::get<MsgWebReader::Request>(pendingReaderMessages.front().msg).lockedThreadId = lockedThreadId;
tpReader.dispatch(lockedThreadId, std::move(pendingReaderMessages.front()));
pendingReaderMessages.pop();
}
};
std::function<void()> asyncCb = [&]{
auto newMsgs = thr.inbox.pop_all_no_wait();
@ -118,10 +90,6 @@ void WebServer::runHttpsocket(ThreadPool<MsgHttpsocket>::Thread &thr) {
c.pendingRequests.erase(msg->res);
msg->res->end(msg->payload.data(), msg->payload.size());
unlockThread(msg->lockedThreadId);
} else if (auto msg = std::get_if<MsgHttpsocket::Unlock>(&newMsg.msg)) {
unlockThread(msg->lockedThreadId);
}
}
};

View File

@ -367,7 +367,7 @@ void WebServer::runReader(ThreadPool<MsgWebReader>::Thread &thr) {
try {
HTTPResponse resp = generateReadResponse(txn, decomp, msg->req);
std::string payload = resp.encode(msg->req.acceptGzip);
sendHttpResponseAndUnlock(msg->lockedThreadId, msg->req, payload);
sendHttpResponseRaw(msg->req, payload);
} catch (std::exception &e) {
HTTPResponse res;
res.code = "500 Server Error";
@ -375,7 +375,7 @@ void WebServer::runReader(ThreadPool<MsgWebReader>::Thread &thr) {
std::string payload = res.encode(false);
sendHttpResponseAndUnlock(msg->lockedThreadId, msg->req, payload);
sendHttpResponseRaw(msg->req, payload);
LE << "500 server error: " << e.what();
}
}

View File

@ -39,14 +39,9 @@ struct MsgHttpsocket : NonCopyable {
uint64_t connId;
uWS::HttpResponse *res;
std::string payload;
uint64_t lockedThreadId;
};
struct Unlock {
uint64_t lockedThreadId;
};
using Var = std::variant<Send, Unlock>;
using Var = std::variant<Send>;
Var msg;
MsgHttpsocket(Var &&msg_) : msg(std::move(msg_)) {}
};
@ -54,7 +49,6 @@ struct MsgHttpsocket : NonCopyable {
struct MsgWebReader : NonCopyable {
struct Request {
HTTPRequest req;
uint64_t lockedThreadId;
};
using Var = std::variant<Request>;
@ -96,14 +90,9 @@ struct WebServer {
// Utils
void unlockThread(uint64_t lockedThreadId) {
tpHttpsocket.dispatch(0, MsgHttpsocket{MsgHttpsocket::Unlock{lockedThreadId}});
hubTrigger->send();
}
// Moves from payload!
void sendHttpResponseAndUnlock(uint64_t lockedThreadId, const HTTPRequest &req, std::string &payload) {
tpHttpsocket.dispatch(0, MsgHttpsocket{MsgHttpsocket::Send{req.connId, req.res, std::move(payload), lockedThreadId}});
void sendHttpResponseRaw(const HTTPRequest &req, std::string &payload) {
tpHttpsocket.dispatch(0, MsgHttpsocket{MsgHttpsocket::Send{req.connId, req.res, std::move(payload)}});
hubTrigger->send();
}
@ -111,10 +100,10 @@ struct WebServer {
HTTPResponse res;
res.code = code;
res.contentType = contentType;
res.body = std::string(body); // FIXME: copy
res.body = std::string(body); // FIXME: don't copy
std::string payload = res.encode(false);
sendHttpResponseAndUnlock(MAX_U64, req, payload);
sendHttpResponseRaw(req, payload);
}
};