mirror of
https://github.com/hoytech/strfry.git
synced 2025-06-17 16:58:50 +00:00
Merge branch 'query-fixes'
This commit is contained in:
9
TODO
9
TODO
@ -1,9 +1,12 @@
|
||||
0.1 release
|
||||
when disk is full it should log warning but not crash
|
||||
disable sync
|
||||
delete expired events
|
||||
* decide what to do about generalised ephemeral events
|
||||
kill plugin if it times out
|
||||
|
||||
0.2 release
|
||||
? why isn't the LMDB mapping CLOEXEC
|
||||
plugin for stream
|
||||
fix sync
|
||||
* logging of bytes up/down
|
||||
* up/both directions
|
||||
@ -18,6 +21,9 @@ features
|
||||
multiple sync connections in one process/config
|
||||
NIP-42 AUTH
|
||||
slow-reader detection and back-pressure
|
||||
delete command
|
||||
* delete by receivedAt, IP addrs, etc
|
||||
* inverted filter: delete events that *don't* match the provided filter
|
||||
? relay block-list events
|
||||
? if a client disconnects, delete all its pending write messages
|
||||
|
||||
@ -31,3 +37,4 @@ rate limits
|
||||
|
||||
misc
|
||||
? periodic reaping of disconnected sockets (maybe autoping is doing this already)
|
||||
ensure the export command doesn't die on SIGPIPE (either handle/ignore the signal, or prevent it from being delivered)
|
||||
|
384
src/DBQuery.h
Normal file
384
src/DBQuery.h
Normal file
@ -0,0 +1,384 @@
|
||||
#pragma once
|
||||
|
||||
#include "golpe.h"
|
||||
|
||||
#include "Subscription.h"
|
||||
#include "filters.h"
|
||||
#include "events.h"
|
||||
|
||||
|
||||
// Multi-cursor index scan over one NostrFilter. Picks the most selective LMDB
// index for the filter (id / tag / pubkey+kind / pubkey / kind / created_at),
// opens one ScanCursor per filter item, and merges their results in descending
// (created, levId) order so events are emitted newest-first.
struct DBScan : NonCopyable {
    // One candidate match found by a cursor. The created timestamp and the
    // owning cursor's index are packed into a single uint64_t:
    //   high bits (>> 40): scanIndex, low 40 bits (mask 0xFF'FFFFFFFF): created.
    // NOTE(review): assumes created fits in 40 bits and scanIndex in the
    // remaining 24 bits — TODO confirm callers never exceed these ranges.
    struct CandidateEvent {
      private:
        uint64_t packed;
        uint64_t levIdStorage;

      public:
        CandidateEvent(uint64_t levId, uint64_t created, uint64_t scanIndex) : packed(scanIndex << 40 | created), levIdStorage(levId) {}

        uint64_t levId() { return levIdStorage; }
        uint64_t created() { return packed & 0xFF'FFFFFFFF; }
        uint64_t scanIndex() { return packed >> 40; }
    };

    // Result of testing an index key against the cursor's filter item:
    //   Yes           -> key matches, record is a candidate
    //   No            -> key doesn't match; stop this cursor
    //   NoButContinue -> key doesn't match but keep iterating (used by the
    //                    pubkeyKind index when only a pubkey prefix is known)
    enum class KeyMatchResult {
        Yes,
        No,
        NoButContinue,
    };

    // Iteration state for one filter item (one id prefix, one tag value, etc).
    // resumeKey/resumeVal record where to continue in the index; an empty
    // resumeKey means the cursor is exhausted.
    struct ScanCursor {
        std::string resumeKey;
        uint64_t resumeVal;
        std::function<KeyMatchResult(std::string_view)> keyMatch;
        uint64_t outstanding = 0; // number of records remaining in eventQueue, decremented in DBScan::scan

        // A cursor is active until its resumeKey is cleared.
        bool active() {
            return resumeKey.size() > 0;
        }

        // Pulls up to `limit` candidate events from this cursor's index range
        // into `output`, honouring the filter's since/until bounds by jumping
        // the resume position. Returns the number of candidates added (also
        // accumulated into `outstanding`).
        uint64_t collect(lmdb::txn &txn, DBScan &s, uint64_t scanIndex, uint64_t limit, std::deque<CandidateEvent> &output) {
            uint64_t added = 0;

            while (active() && limit > 0) {
                bool finished = env.generic_foreachFull(txn, s.indexDbi, resumeKey, lmdb::to_sv<uint64_t>(resumeVal), [&](auto k, auto v) {
                    // Batch quota reached mid-iteration: remember position and stop.
                    if (limit == 0) {
                        resumeKey = std::string(k);
                        resumeVal = lmdb::from_sv<uint64_t>(v);
                        return false;
                    }

                    auto matched = keyMatch(k);
                    if (matched == KeyMatchResult::No) {
                        resumeKey = "";
                        return false;
                    }

                    uint64_t created;

                    {
                        ParsedKey_StringUint64 parsedKey(k);
                        created = parsedKey.n;

                        // Older than `since`: skip the rest of this prefix by
                        // resuming at timestamp 0.
                        if (s.f.since && created < s.f.since) {
                            resumeKey = makeKey_StringUint64(parsedKey.s, 0);
                            resumeVal = 0;
                            return false;
                        }

                        // Newer than `until`: jump directly to the until bound.
                        if (s.f.until && created > s.f.until) {
                            resumeKey = makeKey_StringUint64(parsedKey.s, s.f.until);
                            resumeVal = MAX_U64;
                            return false;
                        }
                    }

                    if (matched == KeyMatchResult::Yes) {
                        uint64_t levId = lmdb::from_sv<uint64_t>(v);
                        output.emplace_back(levId, created, scanIndex);
                        added++;
                        limit--;
                    }

                    return true;
                }, true);

                if (finished) resumeKey = "";
            }

            outstanding += added;
            return added;
        }
    };

    const NostrFilter &f;     // filter being scanned; must outlive this object
    bool indexOnly;           // true if the filter can be matched from index keys alone
    lmdb::dbi indexDbi;       // index chosen by the constructor
    const char *desc = "?";   // human-readable name of chosen index (for metrics)
    std::vector<ScanCursor> cursors;
    std::deque<CandidateEvent> eventQueue; // sorted descending by created
    uint64_t initialScanDepth;  // per-cursor batch size for the first collect
    uint64_t refillScanDepth;   // larger batch size used for refills
    uint64_t nextInitIndex = 0; // next cursor to run its initial collect
    uint64_t approxWork = 0;    // rough work counter, fed to the doPause callback

    // Selects the index and builds one ScanCursor per filter item. Branches
    // are ordered by selectivity: ids, then tags (smallest tag set), then
    // pubkey+kind (bounded product), then pubkey, then kind, then created_at.
    DBScan(const NostrFilter &f) : f(f) {
        indexOnly = f.indexOnlyScans;

        if (f.ids) {
            indexDbi = env.dbi_Event__id;
            desc = "ID";

            cursors.reserve(f.ids->size());
            for (uint64_t i = 0; i < f.ids->size(); i++) {
                std::string prefix = f.ids->at(i);

                cursors.emplace_back(
                    padBytes(prefix, 32 + 8, '\xFF'),
                    MAX_U64,
                    [prefix](std::string_view k){
                        return k.starts_with(prefix) ? KeyMatchResult::Yes : KeyMatchResult::No;
                    }
                );
            }
        } else if (f.tags.size()) {
            indexDbi = env.dbi_Event__tag;
            desc = "Tag";

            // Choose the tag with the fewest values: fewest cursors to merge.
            char tagName = '\0';
            {
                uint64_t numTags = MAX_U64;
                for (const auto &[tn, filterSet] : f.tags) {
                    if (filterSet.size() < numTags) {
                        numTags = filterSet.size();
                        tagName = tn;
                    }
                }
            }

            const auto &filterSet = f.tags.at(tagName);

            cursors.reserve(filterSet.size());
            for (uint64_t i = 0; i < filterSet.size(); i++) {
                std::string search;
                search += tagName;
                search += filterSet.at(i);

                cursors.emplace_back(
                    search + std::string(8, '\xFF'),
                    MAX_U64,
                    [search](std::string_view k){
                        // Exact tag match: key is search + 8-byte timestamp.
                        return k.size() == search.size() + 8 && k.starts_with(search) ? KeyMatchResult::Yes : KeyMatchResult::No;
                    }
                );
            }
        } else if (f.authors && f.kinds && f.authors->size() * f.kinds->size() < 1'000) {
            indexDbi = env.dbi_Event__pubkeyKind;
            desc = "PubkeyKind";

            cursors.reserve(f.authors->size() * f.kinds->size());
            for (uint64_t i = 0; i < f.authors->size(); i++) {
                for (uint64_t j = 0; j < f.kinds->size(); j++) {
                    uint64_t kind = f.kinds->at(j);

                    std::string prefix = f.authors->at(i);
                    if (prefix.size() == 32) prefix += lmdb::to_sv<uint64_t>(kind);

                    cursors.emplace_back(
                        padBytes(prefix, 32 + 8 + 8, '\xFF'),
                        MAX_U64,
                        [prefix, kind](std::string_view k){
                            if (!k.starts_with(prefix)) return KeyMatchResult::No;
                            if (prefix.size() == 32 + 8) return KeyMatchResult::Yes;

                            ParsedKey_StringUint64Uint64 parsedKey(k);
                            if (parsedKey.n1 == kind) return KeyMatchResult::Yes;

                            // With a prefix pubkey, continue scanning (pubkey,kind) backwards because with this index
                            // we don't know the next pubkey to jump back to
                            return KeyMatchResult::NoButContinue;
                        }
                    );
                }
            }
        } else if (f.authors) {
            if (f.kinds) indexOnly = false; // because of the size limit in previous test

            indexDbi = env.dbi_Event__pubkey;
            desc = "Pubkey";

            cursors.reserve(f.authors->size());
            for (uint64_t i = 0; i < f.authors->size(); i++) {
                std::string prefix = f.authors->at(i);

                cursors.emplace_back(
                    padBytes(prefix, 32 + 8, '\xFF'),
                    MAX_U64,
                    [prefix](std::string_view k){
                        return k.starts_with(prefix) ? KeyMatchResult::Yes : KeyMatchResult::No;
                    }
                );
            }
        } else if (f.kinds) {
            indexDbi = env.dbi_Event__kind;
            desc = "Kind";

            cursors.reserve(f.kinds->size());
            for (uint64_t i = 0; i < f.kinds->size(); i++) {
                uint64_t kind = f.kinds->at(i);

                cursors.emplace_back(
                    std::string(lmdb::to_sv<uint64_t>(kind)) + std::string(8, '\xFF'),
                    MAX_U64,
                    [kind](std::string_view k){
                        ParsedKey_Uint64Uint64 parsedKey(k);
                        return parsedKey.n1 == kind ? KeyMatchResult::Yes : KeyMatchResult::No;
                    }
                );
            }
        } else {
            // No selective field: single full scan over the created_at index.
            indexDbi = env.dbi_Event__created_at;
            desc = "CreatedAt";

            cursors.reserve(1);
            cursors.emplace_back(
                std::string(8, '\xFF'),
                MAX_U64,
                [](std::string_view){
                    return KeyMatchResult::Yes;
                }
            );
        }

        // Split the filter's limit across cursors, clamped to [5, 50] per batch.
        initialScanDepth = std::clamp(f.limit / cursors.size(), uint64_t(5), uint64_t(50));
        refillScanDepth = 10 * initialScanDepth;
    }

    // Drains the cursors in merged descending order, invoking handleEvent for
    // each record that matches the filter. handleEvent returning true aborts
    // the scan early (treated as complete). doPause(approxWork) returning true
    // suspends the scan; state is preserved so scan() can be called again.
    // Returns true when the scan is complete, false when paused.
    bool scan(lmdb::txn &txn, std::function<bool(uint64_t, std::string_view)> handleEvent, std::function<bool(uint64_t)> doPause) {
        // Descending by created, ties broken by descending levId.
        auto cmp = [](auto &a, auto &b){
            return a.created() == b.created() ? a.levId() > b.levId() : a.created() > b.created();
        };

        auto eventPayloadCursor = lmdb::cursor::open(txn, env.dbi_EventPayload);

        while (1) {
            approxWork++;
            if (doPause(approxWork)) return false;

            // Phase 1: run each cursor's initial collect, then sort the merged queue once.
            if (nextInitIndex < cursors.size()) {
                approxWork += cursors[nextInitIndex].collect(txn, *this, nextInitIndex, initialScanDepth, eventQueue);
                nextInitIndex++;

                if (nextInitIndex == cursors.size()) {
                    std::sort(eventQueue.begin(), eventQueue.end(), cmp);
                }

                continue;
            } else if (eventQueue.size() == 0) {
                return true;
            }

            // Phase 2: pop the newest candidate and try to match/emit it.
            auto ev = eventQueue.front();
            eventQueue.pop_front();
            bool doSend = false;
            uint64_t levId = ev.levId();
            std::string_view eventPayload;

            auto loadEventPayload = [&]{
                std::string_view key = lmdb::to_sv<uint64_t>(levId);
                return eventPayloadCursor.get(key, eventPayload, MDB_SET_KEY); // If not found, was deleted while scan was paused
            };

            if (indexOnly) {
                // Index keys carry enough info; only time bounds need checking.
                if (f.doesMatchTimes(ev.created())) doSend = true;
                if (!loadEventPayload()) doSend = false;
            } else if (loadEventPayload()) {
                approxWork += 10;
                if (f.doesMatch(lookupEventByLevId(txn, levId).flat_nested())) doSend = true;
            }

            if (doSend) {
                if (handleEvent(levId, eventPayload)) return true;
            }

            // When a cursor's last queued candidate is consumed, refill it and
            // re-merge so the queue stays globally sorted.
            cursors[ev.scanIndex()].outstanding--;

            if (cursors[ev.scanIndex()].outstanding == 0) {
                std::deque<CandidateEvent> moreEvents;
                std::deque<CandidateEvent> newEventQueue;
                approxWork += cursors[ev.scanIndex()].collect(txn, *this, ev.scanIndex(), refillScanDepth, moreEvents);

                std::merge(eventQueue.begin(), eventQueue.end(), moreEvents.begin(), moreEvents.end(), std::back_inserter(newEventQueue), cmp);
                eventQueue.swap(newEventQueue);
            }
        }
    }
};
|
||||
|
||||
|
||||
struct DBQuery : NonCopyable {
|
||||
Subscription sub;
|
||||
|
||||
std::unique_ptr<DBScan> scanner;
|
||||
size_t filterGroupIndex = 0;
|
||||
bool dead = false; // external flag
|
||||
flat_hash_set<uint64_t> sentEventsFull;
|
||||
flat_hash_set<uint64_t> sentEventsCurr;
|
||||
uint64_t lastWorkChecked = 0;
|
||||
|
||||
uint64_t currScanTime = 0;
|
||||
uint64_t currScanSaveRestores = 0;
|
||||
uint64_t totalTime = 0;
|
||||
uint64_t totalWork = 0;
|
||||
|
||||
DBQuery(Subscription &sub) : sub(std::move(sub)) {}
|
||||
DBQuery(const tao::json::value &filter, uint64_t maxLimit = MAX_U64) : sub(Subscription(1, ".", NostrFilterGroup::unwrapped(filter, maxLimit))) {}
|
||||
|
||||
// If scan is complete, returns true
|
||||
bool process(lmdb::txn &txn, std::function<void(const Subscription &, uint64_t, std::string_view)> cb, uint64_t timeBudgetMicroseconds = MAX_U64, bool logMetrics = false) {
|
||||
while (filterGroupIndex < sub.filterGroup.size()) {
|
||||
const auto &f = sub.filterGroup.filters[filterGroupIndex];
|
||||
|
||||
if (!scanner) scanner = std::make_unique<DBScan>(f);
|
||||
|
||||
uint64_t startTime = hoytech::curr_time_us();
|
||||
|
||||
bool complete = scanner->scan(txn, [&](uint64_t levId, std::string_view eventPayload){
|
||||
// If this event came in after our query began, don't send it. It will be sent after the EOSE.
|
||||
if (levId > sub.latestEventId) return false;
|
||||
|
||||
if (sentEventsFull.find(levId) == sentEventsFull.end()) {
|
||||
sentEventsFull.insert(levId);
|
||||
cb(sub, levId, eventPayload);
|
||||
}
|
||||
|
||||
sentEventsCurr.insert(levId);
|
||||
return sentEventsCurr.size() >= f.limit;
|
||||
}, [&](uint64_t approxWork){
|
||||
if (approxWork > lastWorkChecked + 2'000) {
|
||||
lastWorkChecked = approxWork;
|
||||
return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
|
||||
currScanTime += hoytech::curr_time_us() - startTime;
|
||||
|
||||
if (!complete) {
|
||||
currScanSaveRestores++;
|
||||
return false;
|
||||
}
|
||||
|
||||
totalTime += currScanTime;
|
||||
totalWork += scanner->approxWork;
|
||||
|
||||
if (logMetrics) {
|
||||
LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'"
|
||||
<< " scan=" << scanner->desc
|
||||
<< " indexOnly=" << scanner->indexOnly
|
||||
<< " time=" << currScanTime << "us"
|
||||
<< " saveRestores=" << currScanSaveRestores
|
||||
<< " recsFound=" << sentEventsCurr.size()
|
||||
<< " work=" << scanner->approxWork;
|
||||
;
|
||||
}
|
||||
|
||||
scanner.reset();
|
||||
filterGroupIndex++;
|
||||
sentEventsCurr.clear();
|
||||
|
||||
currScanTime = 0;
|
||||
currScanSaveRestores = 0;
|
||||
}
|
||||
|
||||
if (logMetrics) {
|
||||
LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'"
|
||||
<< " totalTime=" << totalTime << "us"
|
||||
<< " totalWork=" << totalWork
|
||||
<< " recsSent=" << sentEventsFull.size()
|
||||
;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
};
|
383
src/DBScan.h
383
src/DBScan.h
@ -1,383 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include "golpe.h"
|
||||
|
||||
#include "Subscription.h"
|
||||
#include "filters.h"
|
||||
|
||||
|
||||
// Legacy single-cursor index scan over one NostrFilter (superseded by the
// merged multi-cursor DBScan in DBQuery.h). One index is chosen per filter;
// the per-index behavior is expressed as four std::function hooks bound to a
// std::variant state struct. Events are emitted in per-filter-item index
// order, NOT globally sorted by created-at.
struct DBScan {
    const NostrFilter &f;     // filter being scanned; must outlive this object
    uint64_t remainingLimit;  // decremented per sent event; 0 terminates the scan

    // Per-index iteration state. Exactly one of these is active in scanState.
    struct NullState {
    };

    struct IdScan {
        size_t index = 0;     // position within f.ids
        std::string prefix;
    };

    struct PubkeyKindScan {
        size_t indexAuthor = 0;
        size_t indexKind = 0;
        std::string prefix;
    };

    struct PubkeyScan {
        size_t index = 0;
        std::string prefix;
    };

    struct TagScan {
        flat_hash_map<char, FilterSetBytes>::const_iterator indexTagName;
        size_t indexTagVal = 0;
        std::string search;   // tagName byte + tag value, used as key prefix
    };

    struct KindScan {
        size_t index = 0;
        uint64_t kind;
    };

    struct CreatedAtScan {
        bool done = false;    // single pass over the whole created_at index
    };

    std::variant<NullState, IdScan, PubkeyKindScan, PubkeyScan, TagScan, KindScan, CreatedAtScan> scanState = NullState{};
    lmdb::dbi indexDbi;
    std::string resumeKey;    // "" means: call resetResume() before iterating
    uint64_t resumeVal;

    enum class KeyMatchResult {
        Yes,
        No,
        NoButContinue,
    };

    // Strategy hooks, bound in the constructor. Each captures a raw pointer
    // into scanState — valid only while scanState holds that alternative.
    std::function<bool()> isComplete;                                // all filter items consumed?
    std::function<void()> nextFilterItem;                            // advance to next filter item
    std::function<void()> resetResume;                               // position resumeKey at the top of the current item's range
    std::function<KeyMatchResult(std::string_view, bool&)> keyMatch; // test key; may set skipBack to re-enter without advancing

    DBScan(const NostrFilter &f_) : f(f_) {
        remainingLimit = f.limit;

        if (f.ids) {
            scanState = IdScan{};
            auto *state = std::get_if<IdScan>(&scanState);
            indexDbi = env.dbi_Event__id;

            isComplete = [&, state]{
                return state->index >= f.ids->size();
            };
            nextFilterItem = [&, state]{
                state->index++;
            };
            resetResume = [&, state]{
                state->prefix = f.ids->at(state->index);
                resumeKey = padBytes(state->prefix, 32 + 8, '\xFF');
                resumeVal = MAX_U64;
            };
            keyMatch = [&, state](std::string_view k, bool&){
                return k.starts_with(state->prefix) ? KeyMatchResult::Yes : KeyMatchResult::No;
            };
        } else if (f.authors && f.kinds) {
            scanState = PubkeyKindScan{};
            auto *state = std::get_if<PubkeyKindScan>(&scanState);
            indexDbi = env.dbi_Event__pubkeyKind;

            isComplete = [&, state]{
                return state->indexAuthor >= f.authors->size();
            };
            // Iterate the (author, kind) cross product, kinds innermost.
            nextFilterItem = [&, state]{
                state->indexKind++;
                if (state->indexKind >= f.kinds->size()) {
                    state->indexAuthor++;
                    state->indexKind = 0;
                }
            };
            resetResume = [&, state]{
                state->prefix = f.authors->at(state->indexAuthor);
                // Full 32-byte pubkey: kind can be baked into the prefix.
                if (state->prefix.size() == 32) state->prefix += lmdb::to_sv<uint64_t>(f.kinds->at(state->indexKind));
                resumeKey = padBytes(state->prefix, 32 + 8 + 8, '\xFF');
                resumeVal = MAX_U64;
            };
            keyMatch = [&, state](std::string_view k, bool &skipBack){
                if (!k.starts_with(state->prefix)) return KeyMatchResult::No;
                if (state->prefix.size() == 32 + 8) return KeyMatchResult::Yes;

                ParsedKey_StringUint64Uint64 parsedKey(k);
                if (parsedKey.n1 == f.kinds->at(state->indexKind)) {
                    return KeyMatchResult::Yes;
                } else if (parsedKey.n1 < f.kinds->at(state->indexKind)) {
                    // With a prefix pubkey, continue scanning (pubkey,kind) backwards because with this index
                    // we don't know the next pubkey to jump back to
                    return KeyMatchResult::NoButContinue;
                }

                // Kind too large: jump straight to (pubkey, wanted kind).
                resumeKey = makeKey_StringUint64Uint64(parsedKey.s, f.kinds->at(state->indexKind), MAX_U64);
                resumeVal = MAX_U64;
                skipBack = true;
                return KeyMatchResult::No;
            };
        } else if (f.authors) {
            scanState = PubkeyScan{};
            auto *state = std::get_if<PubkeyScan>(&scanState);
            indexDbi = env.dbi_Event__pubkey;

            isComplete = [&, state]{
                return state->index >= f.authors->size();
            };
            nextFilterItem = [&, state]{
                state->index++;
            };
            resetResume = [&, state]{
                state->prefix = f.authors->at(state->index);
                resumeKey = padBytes(state->prefix, 32 + 8, '\xFF');
                resumeVal = MAX_U64;
            };
            keyMatch = [&, state](std::string_view k, bool&){
                return k.starts_with(state->prefix) ? KeyMatchResult::Yes : KeyMatchResult::No;
            };
        } else if (f.tags.size()) {
            scanState = TagScan{f.tags.begin()};
            auto *state = std::get_if<TagScan>(&scanState);
            indexDbi = env.dbi_Event__tag;

            isComplete = [&, state]{
                return state->indexTagName == f.tags.end();
            };
            // Iterate every value of every tag name.
            nextFilterItem = [&, state]{
                state->indexTagVal++;
                if (state->indexTagVal >= state->indexTagName->second.size()) {
                    state->indexTagName = std::next(state->indexTagName);
                    state->indexTagVal = 0;
                }
            };
            resetResume = [&, state]{
                state->search = state->indexTagName->first;
                state->search += state->indexTagName->second.at(state->indexTagVal);
                resumeKey = state->search + std::string(8, '\xFF');
                resumeVal = MAX_U64;
            };
            keyMatch = [&, state](std::string_view k, bool&){
                return k.substr(0, state->search.size()) == state->search ? KeyMatchResult::Yes : KeyMatchResult::No;
            };
        } else if (f.kinds) {
            scanState = KindScan{};
            auto *state = std::get_if<KindScan>(&scanState);
            indexDbi = env.dbi_Event__kind;

            isComplete = [&, state]{
                return state->index >= f.kinds->size();
            };
            nextFilterItem = [&, state]{
                state->index++;
            };
            resetResume = [&, state]{
                state->kind = f.kinds->at(state->index);
                resumeKey = std::string(lmdb::to_sv<uint64_t>(state->kind)) + std::string(8, '\xFF');
                resumeVal = MAX_U64;
            };
            keyMatch = [&, state](std::string_view k, bool&){
                ParsedKey_Uint64Uint64 parsedKey(k);
                return parsedKey.n1 == state->kind ? KeyMatchResult::Yes : KeyMatchResult::No;
            };
        } else {
            // No selective field: single pass over the created_at index.
            scanState = CreatedAtScan{};
            auto *state = std::get_if<CreatedAtScan>(&scanState);
            indexDbi = env.dbi_Event__created_at;

            isComplete = [&, state]{
                return state->done;
            };
            nextFilterItem = [&, state]{
                state->done = true;
            };
            resetResume = [&, state]{
                resumeKey = std::string(8, '\xFF');
                resumeVal = MAX_U64;
            };
            keyMatch = [&, state](std::string_view k, bool&){
                return KeyMatchResult::Yes;
            };
        }
    }

    // If scan is complete, returns true
    // Iterates each filter item's index range, invoking handleEvent for
    // matched events. doPause() returning true suspends the scan, preserving
    // resumeKey/resumeVal so a later call continues where it stopped.
    bool scan(lmdb::txn &txn, std::function<void(uint64_t)> handleEvent, std::function<bool()> doPause) {
        while (remainingLimit && !isComplete()) {
            if (resumeKey == "") resetResume();

            bool pause = false, skipBack = false;

            env.generic_foreachFull(txn, indexDbi, resumeKey, lmdb::to_sv<uint64_t>(resumeVal), [&](auto k, auto v) {
                if (doPause()) {
                    resumeKey = std::string(k);
                    resumeVal = lmdb::from_sv<uint64_t>(v);
                    pause = true;
                    return false;
                }

                auto matched = keyMatch(k, skipBack);
                if (matched == KeyMatchResult::No) return false;

                uint64_t created;

                {
                    ParsedKey_StringUint64 parsedKey(k);
                    created = parsedKey.n;

                    // Older than `since`: skip the rest of this prefix.
                    if ((f.since && created < f.since)) {
                        resumeKey = makeKey_StringUint64(parsedKey.s, 0);
                        resumeVal = 0;
                        skipBack = true;
                        return false;
                    }

                    // Newer than `until`: jump directly to the until bound.
                    if (f.until && created > f.until) {
                        resumeKey = makeKey_StringUint64(parsedKey.s, f.until);
                        resumeVal = MAX_U64;
                        skipBack = true;
                        return false;
                    }
                }

                bool sent = false;
                uint64_t levId = lmdb::from_sv<uint64_t>(v);

                if (matched == KeyMatchResult::NoButContinue) {
                    // Don't attempt to match filter
                } else if (f.indexOnlyScans) {
                    if (f.doesMatchTimes(created)) {
                        handleEvent(levId);
                        sent = true;
                    }
                } else {
                    auto view = env.lookup_Event(txn, levId);
                    if (!view) throw herr("missing event from index, corrupt DB?");
                    if (f.doesMatch(view->flat_nested())) {
                        handleEvent(levId);
                        sent = true;
                    }
                }

                if (sent) {
                    if (remainingLimit) remainingLimit--;
                    if (!remainingLimit) return false;
                }

                return true;
            }, true);

            if (pause) return false;

            // skipBack means keyMatch/time-bounds repositioned resumeKey within
            // the SAME filter item; only advance when it wasn't set.
            if (!skipBack) {
                nextFilterItem();
                resumeKey = "";
            }
        }

        return true;
    }

    // Right-pads `str` to exactly `n` bytes with padChar; throws if already longer.
    std::string padBytes(std::string_view str, size_t n, char padChar) {
        if (str.size() > n) throw herr("unable to pad, string longer than expected");
        return std::string(str) + std::string(n - str.size(), padChar);
    }
};
|
||||
|
||||
|
||||
// Legacy time-sliced query driver (superseded by DBQuery): runs the legacy
// DBScan over each filter in a Subscription's group, de-duplicating events
// across filters and pausing/resuming to honour a per-call time budget.
struct DBScanQuery : NonCopyable {
    Subscription sub;
    std::unique_ptr<DBScan> scanner;  // scan for the current filter; reset between filters

    size_t filterGroupIndex = 0;      // index of the filter currently being scanned
    bool dead = false;                // external kill flag, checked by the owner
    flat_hash_set<uint64_t> alreadySentEvents; // levIds already delivered (dedup across filters)

    // Per-filter metrics, reset after each filter completes.
    uint64_t currScanTime = 0;
    uint64_t currScanSaveRestores = 0;
    uint64_t currScanRecordsTraversed = 0;
    uint64_t currScanRecordsFound = 0;

    uint64_t totalScanTime = 0;

    // Takes ownership of the caller's Subscription (moved-from afterwards).
    DBScanQuery(Subscription &sub_) : sub(std::move(sub_)) {}

    // If scan is complete, returns true
    // Runs (or resumes) the query; cb receives each newly-matched levId.
    // Returns false when the time budget was exhausted (call again to resume).
    bool process(lmdb::txn &txn, uint64_t timeBudgetMicroseconds, bool logMetrics, std::function<void(const Subscription &, uint64_t)> cb) {
        uint64_t startTime = hoytech::curr_time_us();

        while (filterGroupIndex < sub.filterGroup.size()) {
            if (!scanner) scanner = std::make_unique<DBScan>(sub.filterGroup.filters[filterGroupIndex]);

            bool complete = scanner->scan(txn, [&](uint64_t levId){
                // If this event came in after our query began, don't send it. It will be sent after the EOSE.
                if (levId > sub.latestEventId) return;

                // We already sent this event
                if (alreadySentEvents.find(levId) != alreadySentEvents.end()) return;
                alreadySentEvents.insert(levId);

                currScanRecordsFound++;
                cb(sub, levId);
            }, [&]{
                // Budget check is performed once per traversed index record.
                currScanRecordsTraversed++;
                return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds;
            });

            currScanTime += hoytech::curr_time_us() - startTime;

            if (!complete) {
                currScanSaveRestores++;
                return false; // paused: caller should re-invoke process() later
            }

            totalScanTime += currScanTime;

            if (logMetrics) {
                // Derive a human-readable scan name from the active variant alternative.
                std::string scanType = "?";

                if (std::get_if<DBScan::IdScan>(&scanner->scanState)) {
                    scanType = "Id";
                } else if (std::get_if<DBScan::PubkeyKindScan>(&scanner->scanState)) {
                    scanType = "PubkeyKind";
                } else if (std::get_if<DBScan::PubkeyScan>(&scanner->scanState)) {
                    scanType = "Pubkey";
                } else if (std::get_if<DBScan::TagScan>(&scanner->scanState)) {
                    scanType = "Tag";
                } else if (std::get_if<DBScan::KindScan>(&scanner->scanState)) {
                    scanType = "Kind";
                } else if (std::get_if<DBScan::CreatedAtScan>(&scanner->scanState)) {
                    scanType = "CreatedAt";
                }

                LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'"
                   << " scan=" << scanType
                   << " indexOnly=" << scanner->f.indexOnlyScans
                   << " time=" << currScanTime << "us"
                   << " saveRestores=" << currScanSaveRestores
                   << " recsFound=" << currScanRecordsFound
                   << " recsScanned=" << currScanRecordsTraversed
                ;
            }

            // Advance to the next filter, resetting per-filter state.
            filterGroupIndex++;
            scanner.reset();
            currScanTime = 0;
            currScanSaveRestores = 0;
            currScanRecordsTraversed = 0;
            currScanRecordsFound = 0;
        }

        if (logMetrics) {
            LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'"
               << " totalTime=" << totalScanTime << "us"
            ;
        }

        return true;
    }
};
|
@ -192,16 +192,14 @@ struct PluginWritePolicy {
|
||||
env.generic_foreachFull(txn, env.dbi_Event__receivedAt, lmdb::to_sv<uint64_t>(start), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
|
||||
if (lmdb::from_sv<uint64_t>(k) > now) return false;
|
||||
|
||||
auto ev = env.lookup_Event(txn, lmdb::from_sv<uint64_t>(v));
|
||||
if (!ev) throw herr("unable to look up event, corrupt DB?");
|
||||
|
||||
auto sourceType = (EventSourceType)ev->sourceType();
|
||||
std::string_view sourceInfo = ev->sourceInfo();
|
||||
auto ev = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v));
|
||||
auto sourceType = (EventSourceType)ev.sourceType();
|
||||
std::string_view sourceInfo = ev.sourceInfo();
|
||||
|
||||
auto request = tao::json::value({
|
||||
{ "type", "lookback" },
|
||||
{ "event", tao::json::from_string(getEventJson(txn, decomp, ev->primaryKeyId)) },
|
||||
{ "receivedAt", ev->receivedAt() / 1000000 },
|
||||
{ "event", tao::json::from_string(getEventJson(txn, decomp, ev.primaryKeyId)) },
|
||||
{ "receivedAt", ev.receivedAt() / 1000000 },
|
||||
{ "sourceType", eventSourceTypeToStr(sourceType) },
|
||||
{ "sourceInfo", sourceType == EventSourceType::IP4 || sourceType == EventSourceType::IP6 ? renderIP(sourceInfo) : sourceInfo },
|
||||
});
|
||||
|
@ -1,9 +1,21 @@
|
||||
#include <hoytech/timer.h>
|
||||
|
||||
#include "RelayServer.h"
|
||||
|
||||
#include "gc.h"
|
||||
|
||||
|
||||
void RelayServer::cleanupOldEvents() {
|
||||
void RelayServer::runCron() {
|
||||
auto qdb = getQdbInstance();
|
||||
|
||||
hoytech::timer cron;
|
||||
|
||||
cron.setupCb = []{ setThreadName("cron"); };
|
||||
|
||||
|
||||
// Delete ephemeral events
|
||||
|
||||
cron.repeat(10 * 1'000'000UL, [&]{
|
||||
std::vector<uint64_t> expiredLevIds;
|
||||
|
||||
{
|
||||
@ -42,8 +54,6 @@ void RelayServer::cleanupOldEvents() {
|
||||
}
|
||||
|
||||
if (expiredLevIds.size() > 0) {
|
||||
auto qdb = getQdbInstance();
|
||||
|
||||
auto txn = env.txn_rw();
|
||||
|
||||
uint64_t numDeleted = 0;
|
||||
@ -62,10 +72,15 @@ void RelayServer::cleanupOldEvents() {
|
||||
|
||||
if (numDeleted) LI << "Deleted " << numDeleted << " ephemeral events";
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
void RelayServer::garbageCollect() {
|
||||
auto qdb = getQdbInstance();
|
||||
// Garbage collect quadrable nodes
|
||||
|
||||
cron.repeat(60 * 60 * 1'000'000UL, [&]{
|
||||
quadrableGarbageCollect(qdb, 1);
|
||||
});
|
||||
|
||||
cron.run();
|
||||
|
||||
while (1) std::this_thread::sleep_for(std::chrono::seconds(1'000'000));
|
||||
}
|
||||
|
@ -1,13 +1,13 @@
|
||||
#include "RelayServer.h"
|
||||
#include "DBScan.h"
|
||||
#include "DBQuery.h"
|
||||
|
||||
|
||||
|
||||
struct ActiveQueries : NonCopyable {
|
||||
Decompressor decomp;
|
||||
using ConnQueries = flat_hash_map<SubId, DBScanQuery*>;
|
||||
flat_hash_map<uint64_t, ConnQueries> conns; // connId -> subId -> DBScanQuery*
|
||||
std::deque<DBScanQuery*> running;
|
||||
using ConnQueries = flat_hash_map<SubId, DBQuery*>;
|
||||
flat_hash_map<uint64_t, ConnQueries> conns; // connId -> subId -> DBQuery*
|
||||
std::deque<DBQuery*> running;
|
||||
|
||||
bool addSub(lmdb::txn &txn, Subscription &&sub) {
|
||||
sub.latestEventId = getMostRecentLevId(txn);
|
||||
@ -24,7 +24,7 @@ struct ActiveQueries : NonCopyable {
|
||||
return false;
|
||||
}
|
||||
|
||||
DBScanQuery *q = new DBScanQuery(sub);
|
||||
DBQuery *q = new DBQuery(sub);
|
||||
|
||||
connQueries.try_emplace(q->sub.subId, q);
|
||||
running.push_front(q);
|
||||
@ -32,7 +32,7 @@ struct ActiveQueries : NonCopyable {
|
||||
return true;
|
||||
}
|
||||
|
||||
DBScanQuery *findQuery(uint64_t connId, const SubId &subId) {
|
||||
DBQuery *findQuery(uint64_t connId, const SubId &subId) {
|
||||
auto f1 = conns.find(connId);
|
||||
if (f1 == conns.end()) return nullptr;
|
||||
|
||||
@ -62,7 +62,7 @@ struct ActiveQueries : NonCopyable {
|
||||
void process(RelayServer *server, lmdb::txn &txn) {
|
||||
if (running.empty()) return;
|
||||
|
||||
DBScanQuery *q = running.front();
|
||||
DBQuery *q = running.front();
|
||||
running.pop_front();
|
||||
|
||||
if (q->dead) {
|
||||
@ -70,13 +70,9 @@ struct ActiveQueries : NonCopyable {
|
||||
return;
|
||||
}
|
||||
|
||||
auto cursor = lmdb::cursor::open(txn, env.dbi_EventPayload);
|
||||
|
||||
bool complete = q->process(txn, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t levId){
|
||||
std::string_view key = lmdb::to_sv<uint64_t>(levId), val;
|
||||
if (!cursor.get(key, val, MDB_SET_KEY)) throw herr("couldn't find event in EventPayload, corrupted DB?");
|
||||
server->sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, val, nullptr, nullptr));
|
||||
});
|
||||
bool complete = q->process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){
|
||||
server->sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, eventPayload, nullptr, nullptr));
|
||||
}, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf);
|
||||
|
||||
if (complete) {
|
||||
auto connId = q->sub.connId;
|
||||
|
@ -4,7 +4,6 @@
|
||||
#include <memory>
|
||||
#include <algorithm>
|
||||
|
||||
#include <hoytech/timer.h>
|
||||
#include <hoytech/time.h>
|
||||
#include <hoytech/hex.h>
|
||||
#include <hoytech/file_change_monitor.h>
|
||||
@ -142,7 +141,7 @@ struct RelayServer {
|
||||
ThreadPool<MsgReqWorker> tpReqWorker;
|
||||
ThreadPool<MsgReqMonitor> tpReqMonitor;
|
||||
ThreadPool<MsgYesstr> tpYesstr;
|
||||
hoytech::timer cron;
|
||||
std::thread cronThread;
|
||||
|
||||
void run();
|
||||
|
||||
@ -161,8 +160,7 @@ struct RelayServer {
|
||||
|
||||
void runYesstr(ThreadPool<MsgYesstr>::Thread &thr);
|
||||
|
||||
void cleanupOldEvents();
|
||||
void garbageCollect();
|
||||
void runCron();
|
||||
|
||||
// Utils (can be called by any thread)
|
||||
|
||||
|
@ -2,7 +2,7 @@
|
||||
#include <quadrable/transport.h>
|
||||
|
||||
#include "RelayServer.h"
|
||||
#include "DBScan.h"
|
||||
#include "DBQuery.h"
|
||||
|
||||
|
||||
void RelayServer::runYesstr(ThreadPool<MsgYesstr>::Thread &thr) {
|
||||
@ -49,14 +49,12 @@ void RelayServer::runYesstr(ThreadPool<MsgYesstr>::Thread &thr) {
|
||||
// with other requests like RelayReqWorker does.
|
||||
|
||||
std::vector<uint64_t> levIds;
|
||||
auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr));
|
||||
Subscription sub(1, "junkSub", filterGroup);
|
||||
DBScanQuery query(sub);
|
||||
DBQuery query(tao::json::from_string(filterStr));
|
||||
|
||||
while (1) {
|
||||
bool complete = query.process(txn, MAX_U64, cfg().relay__logging__dbScanPerf, [&](const auto &sub, uint64_t levId){
|
||||
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){
|
||||
levIds.push_back(levId);
|
||||
});
|
||||
}, MAX_U64, cfg().relay__logging__dbScanPerf);
|
||||
|
||||
if (complete) break;
|
||||
}
|
||||
|
@ -3,7 +3,7 @@
|
||||
#include <docopt.h>
|
||||
#include "golpe.h"
|
||||
|
||||
#include "DBScan.h"
|
||||
#include "DBQuery.h"
|
||||
#include "events.h"
|
||||
#include "gc.h"
|
||||
|
||||
@ -11,7 +11,7 @@
|
||||
static const char USAGE[] =
|
||||
R"(
|
||||
Usage:
|
||||
delete [--age=<age>] [--filter=<filter>] [--dry-run] [--no-gc]
|
||||
delete [--age=<age>] [--filter=<filter>] [--dry-run]
|
||||
)";
|
||||
|
||||
|
||||
@ -25,7 +25,6 @@ void cmd_delete(const std::vector<std::string> &subArgs) {
|
||||
if (args["--filter"]) filterStr = args["--filter"].asString();
|
||||
|
||||
bool dryRun = args["--dry-run"].asBool();
|
||||
bool noGc = args["--no-gc"].asBool();
|
||||
|
||||
|
||||
|
||||
@ -44,10 +43,7 @@ void cmd_delete(const std::vector<std::string> &subArgs) {
|
||||
}
|
||||
|
||||
|
||||
auto filterGroup = NostrFilterGroup::unwrapped(filter, MAX_U64);
|
||||
Subscription sub(1, "junkSub", filterGroup);
|
||||
DBScanQuery query(sub);
|
||||
|
||||
DBQuery query(filter);
|
||||
|
||||
btree_set<uint64_t> levIds;
|
||||
|
||||
@ -55,7 +51,7 @@ void cmd_delete(const std::vector<std::string> &subArgs) {
|
||||
auto txn = env.txn_ro();
|
||||
|
||||
while (1) {
|
||||
bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t levId){
|
||||
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){
|
||||
levIds.insert(levId);
|
||||
});
|
||||
|
||||
@ -88,6 +84,4 @@ void cmd_delete(const std::vector<std::string> &subArgs) {
|
||||
|
||||
txn.commit();
|
||||
}
|
||||
|
||||
if (!noGc) quadrableGarbageCollect(qdb, 2);
|
||||
}
|
||||
|
@ -7,7 +7,7 @@
|
||||
#include <docopt.h>
|
||||
#include "golpe.h"
|
||||
|
||||
#include "DBScan.h"
|
||||
#include "DBQuery.h"
|
||||
#include "events.h"
|
||||
|
||||
|
||||
@ -47,12 +47,10 @@ void cmd_dict(const std::vector<std::string> &subArgs) {
|
||||
|
||||
auto txn = env.txn_ro();
|
||||
|
||||
auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr), MAX_U64);
|
||||
Subscription sub(1, "junkSub", filterGroup);
|
||||
DBScanQuery query(sub);
|
||||
DBQuery query(tao::json::from_string(filterStr));
|
||||
|
||||
while (1) {
|
||||
bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t levId){
|
||||
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){
|
||||
levIds.push_back(levId);
|
||||
});
|
||||
|
||||
@ -79,7 +77,7 @@ void cmd_dict(const std::vector<std::string> &subArgs) {
|
||||
std::string_view raw;
|
||||
|
||||
bool found = env.dbi_EventPayload.get(txn, lmdb::to_sv<uint64_t>(levId), raw);
|
||||
if (!found) throw herr("couldn't find event in EventPayload, corrupted DB?");
|
||||
if (!found) throw herr("couldn't find event in EventPayload");
|
||||
|
||||
uint32_t dictId;
|
||||
size_t outCompressedSize;
|
||||
|
@ -9,7 +9,7 @@
|
||||
static const char USAGE[] =
|
||||
R"(
|
||||
Usage:
|
||||
export [--since=<since>] [--until=<until>] [--include-ephemeral]
|
||||
export [--since=<since>] [--until=<until>] [--reverse] [--include-ephemeral]
|
||||
)";
|
||||
|
||||
|
||||
@ -19,6 +19,8 @@ void cmd_export(const std::vector<std::string> &subArgs) {
|
||||
uint64_t since = 0, until = MAX_U64;
|
||||
if (args["--since"]) since = args["--since"].asLong();
|
||||
if (args["--until"]) until = args["--until"].asLong();
|
||||
bool includeEphemeral = args["--include-ephemeral"].asBool();
|
||||
bool reverse = args["--reverse"].asBool();
|
||||
|
||||
Decompressor decomp;
|
||||
|
||||
@ -27,26 +29,30 @@ void cmd_export(const std::vector<std::string> &subArgs) {
|
||||
auto dbVersion = getDBVersion(txn);
|
||||
auto qdb = getQdbInstance(txn);
|
||||
|
||||
env.generic_foreachFull(txn, env.dbi_Event__created_at, lmdb::to_sv<uint64_t>(since), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
|
||||
if (lmdb::from_sv<uint64_t>(k) > until) return false;
|
||||
uint64_t start = reverse ? until : since;
|
||||
uint64_t startDup = reverse ? MAX_U64 : 0;
|
||||
|
||||
auto view = env.lookup_Event(txn, lmdb::from_sv<uint64_t>(v));
|
||||
if (!view) throw herr("missing event from index, corrupt DB?");
|
||||
env.generic_foreachFull(txn, env.dbi_Event__created_at, lmdb::to_sv<uint64_t>(start), lmdb::to_sv<uint64_t>(startDup), [&](auto k, auto v) {
|
||||
if (reverse) {
|
||||
if (lmdb::from_sv<uint64_t>(k) < since) return false;
|
||||
} else {
|
||||
if (lmdb::from_sv<uint64_t>(k) > until) return false;
|
||||
}
|
||||
|
||||
auto view = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v));
|
||||
|
||||
if (dbVersion == 0) {
|
||||
std::string_view raw;
|
||||
bool found = qdb.dbi_nodesLeaf.get(txn, lmdb::to_sv<uint64_t>(view->primaryKeyId), raw);
|
||||
if (!found) throw herr("couldn't find leaf node in quadrable, corrupted DB?");
|
||||
bool found = qdb.dbi_nodesLeaf.get(txn, lmdb::to_sv<uint64_t>(view.primaryKeyId), raw);
|
||||
if (!found) throw herr("couldn't find leaf node in quadrable table");
|
||||
std::cout << raw.substr(8 + 32 + 32) << "\n";
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!args["--include-ephemeral"].asBool()) {
|
||||
if (isEphemeralEvent(view->flat_nested()->kind())) return true;
|
||||
}
|
||||
if (!includeEphemeral && isEphemeralEvent(view.flat_nested()->kind())) return true;
|
||||
|
||||
std::cout << getEventJson(txn, decomp, view->primaryKeyId) << "\n";
|
||||
std::cout << getEventJson(txn, decomp, view.primaryKeyId) << "\n";
|
||||
|
||||
return true;
|
||||
});
|
||||
}, reverse);
|
||||
}
|
||||
|
@ -11,7 +11,7 @@
|
||||
static const char USAGE[] =
|
||||
R"(
|
||||
Usage:
|
||||
import [--show-rejected] [--no-verify] [--no-gc]
|
||||
import [--show-rejected] [--no-verify]
|
||||
)";
|
||||
|
||||
|
||||
@ -20,7 +20,6 @@ void cmd_import(const std::vector<std::string> &subArgs) {
|
||||
|
||||
bool showRejected = args["--show-rejected"].asBool();
|
||||
bool noVerify = args["--no-verify"].asBool();
|
||||
bool noGc = args["--no-gc"].asBool();
|
||||
|
||||
if (noVerify) LW << "not verifying event IDs or signatures!";
|
||||
|
||||
@ -90,6 +89,4 @@ void cmd_import(const std::vector<std::string> &subArgs) {
|
||||
flushChanges();
|
||||
|
||||
txn.commit();
|
||||
|
||||
if (!noGc) quadrableGarbageCollect(qdb, 2);
|
||||
}
|
||||
|
@ -32,6 +32,10 @@ void RelayServer::run() {
|
||||
runYesstr(thr);
|
||||
});
|
||||
|
||||
cronThread = std::thread([this]{
|
||||
runCron();
|
||||
});
|
||||
|
||||
// Monitor for config file reloads
|
||||
|
||||
auto configFileChangeWatcher = hoytech::file_change_monitor(configFile);
|
||||
@ -42,19 +46,6 @@ void RelayServer::run() {
|
||||
loadConfig(configFile);
|
||||
});
|
||||
|
||||
// Cron
|
||||
|
||||
cron.repeat(10 * 1'000'000UL, [&]{
|
||||
cleanupOldEvents();
|
||||
});
|
||||
|
||||
cron.repeat(60 * 60 * 1'000'000UL, [&]{
|
||||
garbageCollect();
|
||||
});
|
||||
|
||||
cron.setupCb = []{ setThreadName("cron"); };
|
||||
|
||||
cron.run();
|
||||
|
||||
tpWebsocket.join();
|
||||
}
|
||||
|
@ -3,14 +3,14 @@
|
||||
#include <docopt.h>
|
||||
#include "golpe.h"
|
||||
|
||||
#include "DBScan.h"
|
||||
#include "DBQuery.h"
|
||||
#include "events.h"
|
||||
|
||||
|
||||
static const char USAGE[] =
|
||||
R"(
|
||||
Usage:
|
||||
scan [--pause=<pause>] [--metrics] <filter>
|
||||
scan [--pause=<pause>] [--metrics] [--count] <filter>
|
||||
)";
|
||||
|
||||
|
||||
@ -21,24 +21,27 @@ void cmd_scan(const std::vector<std::string> &subArgs) {
|
||||
if (args["--pause"]) pause = args["--pause"].asLong();
|
||||
|
||||
bool metrics = args["--metrics"].asBool();
|
||||
bool count = args["--count"].asBool();
|
||||
|
||||
std::string filterStr = args["<filter>"].asString();
|
||||
|
||||
|
||||
auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr), MAX_U64);
|
||||
Subscription sub(1, "junkSub", filterGroup);
|
||||
DBScanQuery query(sub);
|
||||
|
||||
DBQuery query(tao::json::from_string(filterStr));
|
||||
|
||||
Decompressor decomp;
|
||||
|
||||
auto txn = env.txn_ro();
|
||||
|
||||
uint64_t numEvents = 0;
|
||||
|
||||
while (1) {
|
||||
bool complete = query.process(txn, pause ? pause : MAX_U64, metrics, [&](const auto &sub, uint64_t levId){
|
||||
std::cout << getEventJson(txn, decomp, levId) << "\n";
|
||||
});
|
||||
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){
|
||||
if (count) numEvents++;
|
||||
else std::cout << getEventJson(txn, decomp, levId, eventPayload) << "\n";
|
||||
}, pause ? pause : MAX_U64, metrics);
|
||||
|
||||
if (complete) break;
|
||||
}
|
||||
|
||||
if (count) std::cout << numEvents << std::endl;
|
||||
}
|
||||
|
@ -9,7 +9,7 @@
|
||||
#include "WriterPipeline.h"
|
||||
#include "Subscription.h"
|
||||
#include "WSConnection.h"
|
||||
#include "DBScan.h"
|
||||
#include "DBQuery.h"
|
||||
#include "filters.h"
|
||||
#include "events.h"
|
||||
#include "yesstr.h"
|
||||
@ -151,15 +151,12 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
|
||||
::exit(1);
|
||||
}
|
||||
|
||||
auto filterGroup = NostrFilterGroup::unwrapped(filterJson);
|
||||
DBQuery query(filterJson);
|
||||
|
||||
Subscription sub(1, "junkSub", filterGroup);
|
||||
|
||||
DBScanQuery query(sub);
|
||||
auto txn = env.txn_ro();
|
||||
|
||||
while (1) {
|
||||
bool complete = query.process(txn, MAX_U64, false, [&](const auto &sub, uint64_t levId){
|
||||
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){
|
||||
levIds.push_back(levId);
|
||||
});
|
||||
|
||||
|
@ -170,6 +170,12 @@ std::optional<defaultDb::environment::View_Event> lookupEventById(lmdb::txn &txn
|
||||
return output;
|
||||
}
|
||||
|
||||
defaultDb::environment::View_Event lookupEventByLevId(lmdb::txn &txn, uint64_t levId) {
|
||||
auto view = env.lookup_Event(txn, levId);
|
||||
if (!view) throw herr("unable to lookup event by levId");
|
||||
return *view;
|
||||
}
|
||||
|
||||
uint64_t getMostRecentLevId(lmdb::txn &txn) {
|
||||
uint64_t levId = 0;
|
||||
|
||||
@ -210,12 +216,16 @@ std::string_view decodeEventPayload(lmdb::txn &txn, Decompressor &decomp, std::s
|
||||
// Return result only valid until one of: next call to getEventJson/decodeEventPayload, write to/closing of txn, or any action on decomp object
|
||||
|
||||
std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId) {
|
||||
std::string_view raw;
|
||||
std::string_view eventPayload;
|
||||
|
||||
bool found = env.dbi_EventPayload.get(txn, lmdb::to_sv<uint64_t>(levId), raw);
|
||||
if (!found) throw herr("couldn't find event in EventPayload, corrupted DB?");
|
||||
bool found = env.dbi_EventPayload.get(txn, lmdb::to_sv<uint64_t>(levId), eventPayload);
|
||||
if (!found) throw herr("couldn't find event in EventPayload");
|
||||
|
||||
return decodeEventPayload(txn, decomp, raw, nullptr, nullptr);
|
||||
return getEventJson(txn, decomp, levId, eventPayload);
|
||||
}
|
||||
|
||||
std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId, std::string_view eventPayload) {
|
||||
return decodeEventPayload(txn, decomp, eventPayload, nullptr, nullptr);
|
||||
}
|
||||
|
||||
|
||||
@ -258,10 +268,9 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToW
|
||||
ParsedKey_StringUint64Uint64 parsedKey(k);
|
||||
if (parsedKey.s == sv(flat->pubkey()) && parsedKey.n1 == flat->kind()) {
|
||||
if (parsedKey.n2 < flat->created_at()) {
|
||||
auto otherEv = env.lookup_Event(txn, lmdb::from_sv<uint64_t>(v));
|
||||
if (!otherEv) throw herr("missing event from index, corrupt DB?");
|
||||
if (logLevel >= 1) LI << "Deleting event (replaceable). id=" << to_hex(sv(otherEv->flat_nested()->id()));
|
||||
deleteEvent(txn, changes, *otherEv);
|
||||
auto otherEv = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v));
|
||||
if (logLevel >= 1) LI << "Deleting event (replaceable). id=" << to_hex(sv(otherEv.flat_nested()->id()));
|
||||
deleteEvent(txn, changes, otherEv);
|
||||
} else {
|
||||
ev.status = EventWriteStatus::Replaced;
|
||||
}
|
||||
@ -285,12 +294,11 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToW
|
||||
env.generic_foreachFull(txn, env.dbi_Event__replace, searchKey, lmdb::to_sv<uint64_t>(MAX_U64), [&](auto k, auto v) {
|
||||
ParsedKey_StringUint64 parsedKey(k);
|
||||
if (parsedKey.s == searchStr && parsedKey.n == flat->kind()) {
|
||||
auto otherEv = env.lookup_Event(txn, lmdb::from_sv<uint64_t>(v));
|
||||
if (!otherEv) throw herr("missing event from index, corrupt DB?");
|
||||
auto otherEv = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v));
|
||||
|
||||
if (otherEv->flat_nested()->created_at() < flat->created_at()) {
|
||||
if (logLevel >= 1) LI << "Deleting event (d-tag). id=" << to_hex(sv(otherEv->flat_nested()->id()));
|
||||
deleteEvent(txn, changes, *otherEv);
|
||||
if (otherEv.flat_nested()->created_at() < flat->created_at()) {
|
||||
if (logLevel >= 1) LI << "Deleting event (d-tag). id=" << to_hex(sv(otherEv.flat_nested()->id()));
|
||||
deleteEvent(txn, changes, otherEv);
|
||||
} else {
|
||||
ev.status = EventWriteStatus::Replaced;
|
||||
}
|
||||
|
@ -45,9 +45,11 @@ inline const NostrIndex::Event *flatStrToFlatEvent(std::string_view flatStr) {
|
||||
|
||||
|
||||
std::optional<defaultDb::environment::View_Event> lookupEventById(lmdb::txn &txn, std::string_view id);
|
||||
defaultDb::environment::View_Event lookupEventByLevId(lmdb::txn &txn, uint64_t levId); // throws if can't find
|
||||
uint64_t getMostRecentLevId(lmdb::txn &txn);
|
||||
std::string_view decodeEventPayload(lmdb::txn &txn, Decompressor &decomp, std::string_view raw, uint32_t *outDictId, size_t *outCompressedSize);
|
||||
std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId);
|
||||
std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t levId, std::string_view eventPayload);
|
||||
|
||||
inline quadrable::Key flatEventToQuadrableKey(const NostrIndex::Event *flat) {
|
||||
uint64_t timestamp = flat->created_at();
|
||||
|
@ -21,3 +21,4 @@ std::string renderPercent(double p);
|
||||
uint64_t parseUint64(const std::string &s);
|
||||
std::string parseIP(const std::string &ip);
|
||||
uint64_t getDBVersion(lmdb::txn &txn);
|
||||
std::string padBytes(std::string_view str, size_t n, char padChar);
|
||||
|
@ -103,3 +103,9 @@ uint64_t getDBVersion(lmdb::txn &txn) {
|
||||
|
||||
return dbVersion;
|
||||
}
|
||||
|
||||
|
||||
std::string padBytes(std::string_view str, size_t n, char padChar) {
|
||||
if (str.size() > n) throw herr("unable to pad, string longer than expected");
|
||||
return std::string(str) + std::string(n - str.size(), padChar);
|
||||
}
|
||||
|
@ -8,8 +8,6 @@ use IPC::Open2;
|
||||
|
||||
# ./strfry export|perl -MJSON::XS -nE '$z=decode_json($_); for my $t (@{$z->{tags}}) { say $t->[1] if $t->[0] eq "e"}'|sort|uniq -c|sort -rn|head -50|perl -nE '/\d+\s+(\w+)/ && say $1'
|
||||
|
||||
# Don't forget to set 'maxFilterLimit = 1000000000000' in config
|
||||
|
||||
|
||||
my $kinds = [qw/1 7 4 42 0 30 3 6/];
|
||||
|
||||
@ -90,7 +88,9 @@ c1e5e04d92d9bd20701bff4cbdac1cdc317d405035883b7adcf9a6a5308d0f54
|
||||
}];
|
||||
|
||||
sub genRandomFilterGroup {
|
||||
my $numFilters = (rand()*10)+1;
|
||||
my $useLimit = shift;
|
||||
|
||||
my $numFilters = $useLimit ? 1 : (rand()*10)+1;
|
||||
|
||||
my @filters;
|
||||
for (1..$numFilters) {
|
||||
@ -100,14 +100,14 @@ sub genRandomFilterGroup {
|
||||
if (rand() < .15) {
|
||||
$f->{ids} = [];
|
||||
for (1..(rand()*10)) {
|
||||
push @{$f->{ids}}, randPrefix($ids->[int(rand() * @$ids)]);
|
||||
push @{$f->{ids}}, randPrefix($ids->[int(rand() * @$ids)], $useLimit);
|
||||
}
|
||||
}
|
||||
|
||||
if (rand() < .3) {
|
||||
$f->{authors} = [];
|
||||
for (1..(rand()*5)) {
|
||||
push @{$f->{authors}}, randPrefix($pubkeys->[int(rand() * @$pubkeys)]);
|
||||
push @{$f->{authors}}, randPrefix($pubkeys->[int(rand() * @$pubkeys)], $useLimit);
|
||||
}
|
||||
}
|
||||
|
||||
@ -141,6 +141,10 @@ sub genRandomFilterGroup {
|
||||
$f->{until} = 1640300802 + int(rand() * 86400*365);
|
||||
}
|
||||
|
||||
if ($useLimit) {
|
||||
$f->{limit} = 1 + int(rand() * 1000);
|
||||
}
|
||||
|
||||
if ($f->{since} && $f->{until} && $f->{since} > $f->{until}) {
|
||||
delete $f->{since};
|
||||
delete $f->{until};
|
||||
@ -154,7 +158,8 @@ sub genRandomFilterGroup {
|
||||
|
||||
sub randPrefix {
|
||||
my $v = shift;
|
||||
return $v if rand() < .5;
|
||||
my $noPrefix = shift;
|
||||
return $v if $noPrefix || rand() < .5;
|
||||
return substr($v, 0, (int(rand() * 20) + 1) * 2);
|
||||
}
|
||||
|
||||
@ -190,8 +195,10 @@ sub testScan {
|
||||
#print JSON::XS->new->pretty(1)->encode($fg);
|
||||
print "$fge\n";
|
||||
|
||||
my $resA = `./strfry export 2>/dev/null | perl test/dumbFilter.pl '$fge' | jq -r .pubkey | sort | sha256sum`;
|
||||
my $resB = `./strfry scan '$fge' | jq -r .pubkey | sort | sha256sum`;
|
||||
my $headCmd = @$fg == 1 && $fg->[0]->{limit} ? "| head -n $fg->[0]->{limit}" : "";
|
||||
|
||||
my $resA = `./strfry export --reverse 2>/dev/null | perl test/dumbFilter.pl '$fge' $headCmd | jq -r .id | sort | sha256sum`;
|
||||
my $resB = `./strfry scan --pause 1 --metrics '$fge' | jq -r .id | sort | sha256sum`;
|
||||
|
||||
print "$resA\n$resB\n";
|
||||
|
||||
@ -212,7 +219,7 @@ sub testMonitor {
|
||||
print "filt: $fge\n\n";
|
||||
|
||||
print "DOING MONS\n";
|
||||
my $pid = open2(my $outfile, my $infile, './strfry monitor | jq -r .pubkey | sort | sha256sum');
|
||||
my $pid = open2(my $outfile, my $infile, './strfry monitor | jq -r .id | sort | sha256sum');
|
||||
for my $c (@$monCmds) { print $infile encode_json($c), "\n"; }
|
||||
close($infile);
|
||||
|
||||
@ -223,7 +230,7 @@ sub testMonitor {
|
||||
die "monitor cmd died" if $child_exit_status;
|
||||
|
||||
print "DOING SCAN\n";
|
||||
my $resB = `./strfry scan '$fge' 2>/dev/null | jq -r .pubkey | sort | sha256sum`;
|
||||
my $resB = `./strfry scan '$fge' 2>/dev/null | jq -r .id | sort | sha256sum`;
|
||||
|
||||
print "$resA\n$resB\n";
|
||||
|
||||
@ -246,6 +253,11 @@ if ($cmd eq 'scan') {
|
||||
my $fg = genRandomFilterGroup();
|
||||
testScan($fg);
|
||||
}
|
||||
} elsif ($cmd eq 'scan-limit') {
|
||||
while (1) {
|
||||
my $fg = genRandomFilterGroup(1);
|
||||
testScan($fg);
|
||||
}
|
||||
} elsif ($cmd eq 'monitor') {
|
||||
while (1) {
|
||||
my ($monCmds, $interestFg) = genRandomMonitorCmds();
|
||||
|
Reference in New Issue
Block a user