mirror of
https://github.com/hoytech/strfry.git
synced 2025-06-19 01:34:57 +00:00
refactor cron
This commit is contained in:
@ -1,71 +1,86 @@
|
||||
#include <hoytech/timer.h>
|
||||
|
||||
#include "RelayServer.h"
|
||||
|
||||
#include "gc.h"
|
||||
|
||||
|
||||
void RelayServer::cleanupOldEvents() {
|
||||
std::vector<uint64_t> expiredLevIds;
|
||||
|
||||
{
|
||||
auto txn = env.txn_ro();
|
||||
|
||||
auto mostRecent = getMostRecentLevId(txn);
|
||||
uint64_t cutoff = hoytech::curr_time_s() - cfg().events__ephemeralEventsLifetimeSeconds;
|
||||
uint64_t currKind = 20'000;
|
||||
|
||||
while (currKind < 30'000) {
|
||||
uint64_t numRecs = 0;
|
||||
|
||||
env.generic_foreachFull(txn, env.dbi_Event__kind, makeKey_Uint64Uint64(currKind, 0), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
|
||||
numRecs++;
|
||||
ParsedKey_Uint64Uint64 parsedKey(k);
|
||||
currKind = parsedKey.n1;
|
||||
|
||||
if (currKind >= 30'000) return false;
|
||||
|
||||
if (parsedKey.n2 > cutoff) {
|
||||
currKind++;
|
||||
return false;
|
||||
}
|
||||
|
||||
uint64_t levId = lmdb::from_sv<uint64_t>(v);
|
||||
|
||||
if (levId != mostRecent) { // prevent levId re-use
|
||||
expiredLevIds.emplace_back(levId);
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
if (numRecs == 0) break;
|
||||
}
|
||||
}
|
||||
|
||||
if (expiredLevIds.size() > 0) {
|
||||
auto qdb = getQdbInstance();
|
||||
|
||||
auto txn = env.txn_rw();
|
||||
|
||||
uint64_t numDeleted = 0;
|
||||
auto changes = qdb.change();
|
||||
|
||||
for (auto levId : expiredLevIds) {
|
||||
auto view = env.lookup_Event(txn, levId);
|
||||
if (!view) continue; // Deleted in between transactions
|
||||
deleteEvent(txn, changes, *view);
|
||||
numDeleted++;
|
||||
}
|
||||
|
||||
changes.apply(txn);
|
||||
|
||||
txn.commit();
|
||||
|
||||
if (numDeleted) LI << "Deleted " << numDeleted << " ephemeral events";
|
||||
}
|
||||
}
|
||||
|
||||
void RelayServer::garbageCollect() {
|
||||
void RelayServer::runCron() {
|
||||
auto qdb = getQdbInstance();
|
||||
|
||||
quadrableGarbageCollect(qdb, 1);
|
||||
hoytech::timer cron;
|
||||
|
||||
cron.setupCb = []{ setThreadName("cron"); };
|
||||
|
||||
|
||||
// Delete ephemeral events
|
||||
|
||||
cron.repeat(10 * 1'000'000UL, [&]{
|
||||
std::vector<uint64_t> expiredLevIds;
|
||||
|
||||
{
|
||||
auto txn = env.txn_ro();
|
||||
|
||||
auto mostRecent = getMostRecentLevId(txn);
|
||||
uint64_t cutoff = hoytech::curr_time_s() - cfg().events__ephemeralEventsLifetimeSeconds;
|
||||
uint64_t currKind = 20'000;
|
||||
|
||||
while (currKind < 30'000) {
|
||||
uint64_t numRecs = 0;
|
||||
|
||||
env.generic_foreachFull(txn, env.dbi_Event__kind, makeKey_Uint64Uint64(currKind, 0), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
|
||||
numRecs++;
|
||||
ParsedKey_Uint64Uint64 parsedKey(k);
|
||||
currKind = parsedKey.n1;
|
||||
|
||||
if (currKind >= 30'000) return false;
|
||||
|
||||
if (parsedKey.n2 > cutoff) {
|
||||
currKind++;
|
||||
return false;
|
||||
}
|
||||
|
||||
uint64_t levId = lmdb::from_sv<uint64_t>(v);
|
||||
|
||||
if (levId != mostRecent) { // prevent levId re-use
|
||||
expiredLevIds.emplace_back(levId);
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
if (numRecs == 0) break;
|
||||
}
|
||||
}
|
||||
|
||||
if (expiredLevIds.size() > 0) {
|
||||
auto txn = env.txn_rw();
|
||||
|
||||
uint64_t numDeleted = 0;
|
||||
auto changes = qdb.change();
|
||||
|
||||
for (auto levId : expiredLevIds) {
|
||||
auto view = env.lookup_Event(txn, levId);
|
||||
if (!view) continue; // Deleted in between transactions
|
||||
deleteEvent(txn, changes, *view);
|
||||
numDeleted++;
|
||||
}
|
||||
|
||||
changes.apply(txn);
|
||||
|
||||
txn.commit();
|
||||
|
||||
if (numDeleted) LI << "Deleted " << numDeleted << " ephemeral events";
|
||||
}
|
||||
});
|
||||
|
||||
// Garbage collect quadrable nodes
|
||||
|
||||
cron.repeat(60 * 60 * 1'000'000UL, [&]{
|
||||
quadrableGarbageCollect(qdb, 1);
|
||||
});
|
||||
|
||||
cron.run();
|
||||
|
||||
while (1) std::this_thread::sleep_for(std::chrono::seconds(1'000'000));
|
||||
}
|
||||
|
@ -4,7 +4,6 @@
|
||||
#include <memory>
|
||||
#include <algorithm>
|
||||
|
||||
#include <hoytech/timer.h>
|
||||
#include <hoytech/time.h>
|
||||
#include <hoytech/hex.h>
|
||||
#include <hoytech/file_change_monitor.h>
|
||||
@ -142,7 +141,7 @@ struct RelayServer {
|
||||
ThreadPool<MsgReqWorker> tpReqWorker;
|
||||
ThreadPool<MsgReqMonitor> tpReqMonitor;
|
||||
ThreadPool<MsgYesstr> tpYesstr;
|
||||
hoytech::timer cron;
|
||||
std::thread cronThread;
|
||||
|
||||
void run();
|
||||
|
||||
@ -161,8 +160,7 @@ struct RelayServer {
|
||||
|
||||
void runYesstr(ThreadPool<MsgYesstr>::Thread &thr);
|
||||
|
||||
void cleanupOldEvents();
|
||||
void garbageCollect();
|
||||
void runCron();
|
||||
|
||||
// Utils (can be called by any thread)
|
||||
|
||||
|
@ -32,6 +32,10 @@ void RelayServer::run() {
|
||||
runYesstr(thr);
|
||||
});
|
||||
|
||||
cronThread = std::thread([this]{
|
||||
runCron();
|
||||
});
|
||||
|
||||
// Monitor for config file reloads
|
||||
|
||||
auto configFileChangeWatcher = hoytech::file_change_monitor(configFile);
|
||||
@ -42,19 +46,6 @@ void RelayServer::run() {
|
||||
loadConfig(configFile);
|
||||
});
|
||||
|
||||
// Cron
|
||||
|
||||
cron.repeat(10 * 1'000'000UL, [&]{
|
||||
cleanupOldEvents();
|
||||
});
|
||||
|
||||
cron.repeat(60 * 60 * 1'000'000UL, [&]{
|
||||
garbageCollect();
|
||||
});
|
||||
|
||||
cron.setupCb = []{ setThreadName("cron"); };
|
||||
|
||||
cron.run();
|
||||
|
||||
tpWebsocket.join();
|
||||
}
|
||||
|
Reference in New Issue
Block a user