mirror of
https://github.com/hoytech/strfry.git
synced 2025-06-17 08:48:51 +00:00

- It is now up to the caller to do so - QueryScheduler now can optionally not bother to ensure that the events are fresh
112 lines
3.3 KiB
C++
112 lines
3.3 KiB
C++
#pragma once
|
|
|
|
#include "DBQuery.h"
|
|
|
|
|
|
struct QueryScheduler : NonCopyable {
|
|
std::function<void(lmdb::txn &txn, const Subscription &sub, uint64_t levId, std::string_view eventPayload)> onEvent;
|
|
std::function<void(lmdb::txn &txn, const Subscription &sub, const std::vector<uint64_t> &levIds)> onEventBatch;
|
|
std::function<void(lmdb::txn &txn, Subscription &sub)> onComplete;
|
|
|
|
// If false, then levIds returned to above callbacks can be stale (because they were deleted)
|
|
// If false, then onEvent's eventPayload will always be ""
|
|
bool ensureExists = true;
|
|
|
|
using ConnQueries = flat_hash_map<SubId, DBQuery*>;
|
|
flat_hash_map<uint64_t, ConnQueries> conns; // connId -> subId -> DBQuery*
|
|
std::deque<DBQuery*> running;
|
|
std::vector<uint64_t> levIdBatch;
|
|
|
|
bool addSub(lmdb::txn &txn, Subscription &&sub) {
|
|
sub.latestEventId = getMostRecentLevId(txn);
|
|
|
|
{
|
|
auto *existing = findQuery(sub.connId, sub.subId);
|
|
if (existing) removeSub(sub.connId, sub.subId);
|
|
}
|
|
|
|
auto res = conns.try_emplace(sub.connId);
|
|
auto &connQueries = res.first->second;
|
|
|
|
if (connQueries.size() >= cfg().relay__maxSubsPerConnection) {
|
|
return false;
|
|
}
|
|
|
|
DBQuery *q = new DBQuery(sub);
|
|
|
|
connQueries.try_emplace(q->sub.subId, q);
|
|
running.push_front(q);
|
|
|
|
return true;
|
|
}
|
|
|
|
DBQuery *findQuery(uint64_t connId, const SubId &subId) {
|
|
auto f1 = conns.find(connId);
|
|
if (f1 == conns.end()) return nullptr;
|
|
|
|
auto f2 = f1->second.find(subId);
|
|
if (f2 == f1->second.end()) return nullptr;
|
|
|
|
return f2->second;
|
|
}
|
|
|
|
void removeSub(uint64_t connId, const SubId &subId) {
|
|
auto *query = findQuery(connId, subId);
|
|
if (!query) return;
|
|
query->dead = true;
|
|
conns[connId].erase(subId);
|
|
if (conns[connId].empty()) conns.erase(connId);
|
|
}
|
|
|
|
void closeConn(uint64_t connId) {
|
|
auto f1 = conns.find(connId);
|
|
if (f1 == conns.end()) return;
|
|
|
|
for (auto &[k, v] : f1->second) v->dead = true;
|
|
|
|
conns.erase(connId);
|
|
}
|
|
|
|
void process(lmdb::txn &txn) {
|
|
if (running.empty()) return;
|
|
|
|
DBQuery *q = running.front();
|
|
running.pop_front();
|
|
|
|
if (q->dead) {
|
|
delete q;
|
|
return;
|
|
}
|
|
|
|
auto eventPayloadCursor = lmdb::cursor::open(txn, env.dbi_EventPayload);
|
|
|
|
bool complete = q->process(txn, [&](const auto &sub, uint64_t levId){
|
|
std::string_view eventPayload;
|
|
|
|
if (ensureExists) {
|
|
std::string_view key = lmdb::to_sv<uint64_t>(levId);
|
|
if (!eventPayloadCursor.get(key, eventPayload, MDB_SET_KEY)) return; // If not found, was deleted while scan was paused
|
|
}
|
|
|
|
if (onEvent) onEvent(txn, sub, levId, eventPayload);
|
|
if (onEventBatch) levIdBatch.push_back(levId);
|
|
}, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf);
|
|
|
|
if (onEventBatch) {
|
|
onEventBatch(txn, q->sub, levIdBatch);
|
|
levIdBatch.clear();
|
|
}
|
|
|
|
if (complete) {
|
|
auto connId = q->sub.connId;
|
|
removeSub(connId, q->sub.subId);
|
|
|
|
if (onComplete) onComplete(txn, q->sub);
|
|
|
|
delete q;
|
|
} else {
|
|
running.push_back(q);
|
|
}
|
|
}
|
|
};
|