diff --git a/src/RelayCron.cpp b/src/RelayCron.cpp index 7e19d81..5d6f30e 100644 --- a/src/RelayCron.cpp +++ b/src/RelayCron.cpp @@ -1,71 +1,86 @@ +#include + #include "RelayServer.h" #include "gc.h" -void RelayServer::cleanupOldEvents() { - std::vector expiredLevIds; - - { - auto txn = env.txn_ro(); - - auto mostRecent = getMostRecentLevId(txn); - uint64_t cutoff = hoytech::curr_time_s() - cfg().events__ephemeralEventsLifetimeSeconds; - uint64_t currKind = 20'000; - - while (currKind < 30'000) { - uint64_t numRecs = 0; - - env.generic_foreachFull(txn, env.dbi_Event__kind, makeKey_Uint64Uint64(currKind, 0), lmdb::to_sv(0), [&](auto k, auto v) { - numRecs++; - ParsedKey_Uint64Uint64 parsedKey(k); - currKind = parsedKey.n1; - - if (currKind >= 30'000) return false; - - if (parsedKey.n2 > cutoff) { - currKind++; - return false; - } - - uint64_t levId = lmdb::from_sv(v); - - if (levId != mostRecent) { // prevent levId re-use - expiredLevIds.emplace_back(levId); - } - - return true; - }); - - if (numRecs == 0) break; - } - } - - if (expiredLevIds.size() > 0) { - auto qdb = getQdbInstance(); - - auto txn = env.txn_rw(); - - uint64_t numDeleted = 0; - auto changes = qdb.change(); - - for (auto levId : expiredLevIds) { - auto view = env.lookup_Event(txn, levId); - if (!view) continue; // Deleted in between transactions - deleteEvent(txn, changes, *view); - numDeleted++; - } - - changes.apply(txn); - - txn.commit(); - - if (numDeleted) LI << "Deleted " << numDeleted << " ephemeral events"; - } -} - -void RelayServer::garbageCollect() { +void RelayServer::runCron() { auto qdb = getQdbInstance(); - quadrableGarbageCollect(qdb, 1); + hoytech::timer cron; + + cron.setupCb = []{ setThreadName("cron"); }; + + + // Delete ephemeral events + + cron.repeat(10 * 1'000'000UL, [&]{ + std::vector expiredLevIds; + + { + auto txn = env.txn_ro(); + + auto mostRecent = getMostRecentLevId(txn); + uint64_t cutoff = hoytech::curr_time_s() - cfg().events__ephemeralEventsLifetimeSeconds; + uint64_t currKind = 20'000; + + while (currKind < 30'000) { + uint64_t numRecs = 0; + + env.generic_foreachFull(txn, env.dbi_Event__kind, makeKey_Uint64Uint64(currKind, 0), lmdb::to_sv(0), [&](auto k, auto v) { + numRecs++; + ParsedKey_Uint64Uint64 parsedKey(k); + currKind = parsedKey.n1; + + if (currKind >= 30'000) return false; + + if (parsedKey.n2 > cutoff) { + currKind++; + return false; + } + + uint64_t levId = lmdb::from_sv(v); + + if (levId != mostRecent) { // prevent levId re-use + expiredLevIds.emplace_back(levId); + } + + return true; + }); + + if (numRecs == 0) break; + } + } + + if (expiredLevIds.size() > 0) { + auto txn = env.txn_rw(); + + uint64_t numDeleted = 0; + auto changes = qdb.change(); + + for (auto levId : expiredLevIds) { + auto view = env.lookup_Event(txn, levId); + if (!view) continue; // Deleted in between transactions + deleteEvent(txn, changes, *view); + numDeleted++; + } + + changes.apply(txn); + + txn.commit(); + + if (numDeleted) LI << "Deleted " << numDeleted << " ephemeral events"; + } + }); + + // Garbage collect quadrable nodes + + cron.repeat(60 * 60 * 1'000'000UL, [&]{ + quadrableGarbageCollect(qdb, 1); + }); + + cron.run(); + + while (1) std::this_thread::sleep_for(std::chrono::seconds(1'000'000)); } diff --git a/src/RelayServer.h b/src/RelayServer.h index 4585dd0..deba101 100644 --- a/src/RelayServer.h +++ b/src/RelayServer.h @@ -4,7 +4,6 @@ #include #include -#include #include #include #include @@ -142,7 +141,7 @@ struct RelayServer { ThreadPool tpReqWorker; ThreadPool tpReqMonitor; ThreadPool tpYesstr; - hoytech::timer cron; + std::thread cronThread; void run(); @@ -161,8 +160,7 @@ struct RelayServer { void runYesstr(ThreadPool::Thread &thr); - void cleanupOldEvents(); - void garbageCollect(); + void runCron(); // Utils (can be called by any thread) diff --git a/src/cmd_relay.cpp b/src/cmd_relay.cpp index d1cc0b3..f62ce3f 100644 --- a/src/cmd_relay.cpp +++ b/src/cmd_relay.cpp @@ -32,6 +32,10 @@ void RelayServer::run() { runYesstr(thr); }); + cronThread = std::thread([this]{ + runCron(); + }); + // Monitor for config file reloads auto configFileChangeWatcher = hoytech::file_change_monitor(configFile); @@ -42,19 +46,6 @@ void RelayServer::run() { loadConfig(configFile); }); - // Cron - - cron.repeat(10 * 1'000'000UL, [&]{ - cleanupOldEvents(); - }); - - cron.repeat(60 * 60 * 1'000'000UL, [&]{ - garbageCollect(); - }); - - cron.setupCb = []{ setThreadName("cron"); }; - - cron.run(); tpWebsocket.join(); }