update negentropy

This commit is contained in:
Doug Hoyte
2023-12-05 18:17:58 -05:00
parent eb24824b53
commit fa5ae606c6
12 changed files with 143 additions and 20 deletions

View File

@ -68,11 +68,13 @@ void cmd_delete(const std::vector<std::string> &subArgs) {
{
auto txn = env.txn_rw();
negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0);
for (auto levId : levIds) {
deleteEvent(txn, levId);
deleteEvent(txn, levId, negentropyStorage);
}
negentropyStorage.flush();
txn.commit();
}
}

View File

@ -108,13 +108,15 @@ void RelayServer::runCron() {
if (expiredLevIds.size() > 0) {
auto txn = env.txn_rw();
negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0);
uint64_t numDeleted = 0;
for (auto levId : expiredLevIds) {
if (deleteEvent(txn, levId)) numDeleted++;
if (deleteEvent(txn, levId, negentropyStorage)) numDeleted++;
}
negentropyStorage.flush();
txn.commit();
if (numDeleted) LI << "Deleted " << numDeleted << " events (ephemeral=" << numEphemeral << " expired=" << numExpired << ")";

View File

@ -132,7 +132,7 @@ void RelayServer::ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const ta
void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp, uint64_t connId, const tao::json::value &arr) {
if (arr.at(0) == "NEG-OPEN") {
if (arr.get_array().size() < 5) throw herr("negentropy query missing elements");
if (arr.get_array().size() < 4) throw herr("negentropy query missing elements");
NostrFilterGroup filter;
auto maxFilterLimit = cfg().relay__negentropy__maxSyncEvents + 1;

View File

@ -42,6 +42,20 @@ struct NegentropyViews {
}
bool addStatelessView(uint64_t connId, const SubId &subId, Subscription &&sub) {
{
auto *existing = findView(connId, subId);
if (existing) removeView(connId, subId);
}
auto res = conns.try_emplace(connId);
auto &connViews = res.first->second;
if (connViews.size() >= cfg().relay__maxSubsPerConnection) {
return false;
}
connViews.try_emplace(subId, UserView{ StatelessView{ std::move(sub), } });
return true;
}
@ -205,15 +219,16 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
continue;
}
auto *view = std::get_if<NegentropyViews::MemoryView>(userView);
if (!view) throw herr("bad variant, expected memory view");
if (!view->storageVector.sealed) {
sendNoticeError(msg->connId, "negentropy error: got NEG-MSG before NEG-OPEN complete");
continue;
if (auto *view = std::get_if<NegentropyViews::MemoryView>(userView)) {
if (!view->storageVector.sealed) {
sendNoticeError(msg->connId, "negentropy error: got NEG-MSG before NEG-OPEN complete");
continue;
}
handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload);
} else if (std::get_if<NegentropyViews::StatelessView>(userView)) {
negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0);
handleReconcile(msg->connId, msg->subId, storage, msg->negPayload);
}
handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload);
} else if (auto msg = std::get_if<MsgNegentropy::NegClose>(&newMsg.msg)) {
queries.removeSub(msg->connId, msg->subId);
views.removeView(msg->connId, msg->subId);