mirror of
https://github.com/hoytech/strfry.git
synced 2025-06-17 16:58:50 +00:00
generalise replaceable and ephemeral events
This commit is contained in:
1
TODO
1
TODO
@ -26,6 +26,7 @@ features
|
|||||||
* inverted filter: delete events that *don't* match the provided filter
|
* inverted filter: delete events that *don't* match the provided filter
|
||||||
? relay block-list events
|
? relay block-list events
|
||||||
? if a client disconnects, delete all its pending write messages
|
? if a client disconnects, delete all its pending write messages
|
||||||
|
? support filtering on empty value tags
|
||||||
|
|
||||||
rate limits
|
rate limits
|
||||||
! event writes per second per ip
|
! event writes per second per ip
|
||||||
|
@ -48,7 +48,7 @@ tables:
|
|||||||
multi: true
|
multi: true
|
||||||
deletion: # eventId, pubkey
|
deletion: # eventId, pubkey
|
||||||
multi: true
|
multi: true
|
||||||
expiration:
|
expiration: # unix timestamp, value of 1 is special-case for ephemeral event
|
||||||
integer: true
|
integer: true
|
||||||
multi: true
|
multi: true
|
||||||
replace: # pubkey, d-tag, kind
|
replace: # pubkey, d-tag, kind
|
||||||
|
@ -14,6 +14,8 @@ void RelayServer::runCron() {
|
|||||||
|
|
||||||
|
|
||||||
// Delete ephemeral events
|
// Delete ephemeral events
|
||||||
|
// FIXME: This is for backwards compat during upgrades, and can be removed eventually since
|
||||||
|
// the newer style of finding ephemeral events relies on expiration=1
|
||||||
|
|
||||||
cron.repeat(10 * 1'000'000UL, [&]{
|
cron.repeat(10 * 1'000'000UL, [&]{
|
||||||
std::vector<uint64_t> expiredLevIds;
|
std::vector<uint64_t> expiredLevIds;
|
||||||
@ -74,6 +76,67 @@ void RelayServer::runCron() {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
|
// Delete expired events
|
||||||
|
|
||||||
|
cron.repeat(9 * 1'000'000UL, [&]{
|
||||||
|
std::vector<uint64_t> expiredLevIds;
|
||||||
|
uint64_t numEphemeral = 0;
|
||||||
|
uint64_t numExpired = 0;
|
||||||
|
|
||||||
|
{
|
||||||
|
auto txn = env.txn_ro();
|
||||||
|
|
||||||
|
auto mostRecent = getMostRecentLevId(txn);
|
||||||
|
uint64_t now = hoytech::curr_time_s();
|
||||||
|
uint64_t ephemeralCutoff = now - cfg().events__ephemeralEventsLifetimeSeconds;
|
||||||
|
|
||||||
|
env.generic_foreachFull(txn, env.dbi_Event__expiration, lmdb::to_sv<uint64_t>(0), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
|
||||||
|
auto expiration = lmdb::from_sv<uint64_t>(k);
|
||||||
|
auto levId = lmdb::from_sv<uint64_t>(v);
|
||||||
|
|
||||||
|
if (levId == mostRecent) return true;
|
||||||
|
|
||||||
|
if (expiration == 1) { // Ephemeral event
|
||||||
|
auto view = env.lookup_Event(txn, levId);
|
||||||
|
if (!view) throw herr("missing event from index, corrupt DB?");
|
||||||
|
uint64_t created = view->flat_nested()->created_at();
|
||||||
|
|
||||||
|
if (created <= ephemeralCutoff) {
|
||||||
|
numEphemeral++;
|
||||||
|
expiredLevIds.emplace_back(levId);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
numExpired++;
|
||||||
|
expiredLevIds.emplace_back(levId);
|
||||||
|
}
|
||||||
|
|
||||||
|
return expiration <= now;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (expiredLevIds.size() > 0) {
|
||||||
|
auto txn = env.txn_rw();
|
||||||
|
|
||||||
|
uint64_t numDeleted = 0;
|
||||||
|
auto changes = qdb.change();
|
||||||
|
|
||||||
|
for (auto levId : expiredLevIds) {
|
||||||
|
auto view = env.lookup_Event(txn, levId);
|
||||||
|
if (!view) continue; // Deleted in between transactions
|
||||||
|
deleteEvent(txn, changes, *view);
|
||||||
|
numDeleted++;
|
||||||
|
}
|
||||||
|
|
||||||
|
changes.apply(txn);
|
||||||
|
|
||||||
|
txn.commit();
|
||||||
|
|
||||||
|
if (numDeleted) LI << "Deleted " << numDeleted << " events (ephemeral=" << numEphemeral << " expired=" << numExpired << ")";
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
// Garbage collect quadrable nodes
|
// Garbage collect quadrable nodes
|
||||||
|
|
||||||
cron.repeat(60 * 60 * 1'000'000UL, [&]{
|
cron.repeat(60 * 60 * 1'000'000UL, [&]{
|
||||||
|
@ -9,7 +9,7 @@
|
|||||||
static const char USAGE[] =
|
static const char USAGE[] =
|
||||||
R"(
|
R"(
|
||||||
Usage:
|
Usage:
|
||||||
export [--since=<since>] [--until=<until>] [--reverse] [--include-ephemeral]
|
export [--since=<since>] [--until=<until>] [--reverse]
|
||||||
)";
|
)";
|
||||||
|
|
||||||
|
|
||||||
@ -19,7 +19,6 @@ void cmd_export(const std::vector<std::string> &subArgs) {
|
|||||||
uint64_t since = 0, until = MAX_U64;
|
uint64_t since = 0, until = MAX_U64;
|
||||||
if (args["--since"]) since = args["--since"].asLong();
|
if (args["--since"]) since = args["--since"].asLong();
|
||||||
if (args["--until"]) until = args["--until"].asLong();
|
if (args["--until"]) until = args["--until"].asLong();
|
||||||
bool includeEphemeral = args["--include-ephemeral"].asBool();
|
|
||||||
bool reverse = args["--reverse"].asBool();
|
bool reverse = args["--reverse"].asBool();
|
||||||
|
|
||||||
Decompressor decomp;
|
Decompressor decomp;
|
||||||
@ -49,8 +48,6 @@ void cmd_export(const std::vector<std::string> &subArgs) {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!includeEphemeral && isEphemeralEvent(view.flat_nested()->kind())) return true;
|
|
||||||
|
|
||||||
std::cout << getEventJson(txn, decomp, view.primaryKeyId) << "\n";
|
std::cout << getEventJson(txn, decomp, view.primaryKeyId) << "\n";
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@ -24,11 +24,10 @@ std::string nostrJsonToFlat(const tao::json::value &v) {
|
|||||||
if (v.at("tags").get_array().size() > cfg().events__maxNumTags) throw herr("too many tags: ", v.at("tags").get_array().size());
|
if (v.at("tags").get_array().size() > cfg().events__maxNumTags) throw herr("too many tags: ", v.at("tags").get_array().size());
|
||||||
for (auto &tagArr : v.at("tags").get_array()) {
|
for (auto &tagArr : v.at("tags").get_array()) {
|
||||||
auto &tag = tagArr.get_array();
|
auto &tag = tagArr.get_array();
|
||||||
if (tag.size() < 2) throw herr("too few fields in tag");
|
if (tag.size() < 1) throw herr("too few fields in tag");
|
||||||
|
|
||||||
auto tagName = tag.at(0).get_string();
|
auto tagName = tag.at(0).get_string();
|
||||||
|
auto tagVal = tag.size() >= 2 ? tag.at(1).get_string() : "";
|
||||||
auto tagVal = tag.at(1).get_string();
|
|
||||||
|
|
||||||
if (tagName == "e" || tagName == "p") {
|
if (tagName == "e" || tagName == "p") {
|
||||||
tagVal = from_hex(tagVal, false);
|
tagVal = from_hex(tagVal, false);
|
||||||
@ -41,10 +40,11 @@ std::string nostrJsonToFlat(const tao::json::value &v) {
|
|||||||
} else if (tagName == "expiration") {
|
} else if (tagName == "expiration") {
|
||||||
if (expiration == 0) {
|
if (expiration == 0) {
|
||||||
expiration = parseUint64(tagVal);
|
expiration = parseUint64(tagVal);
|
||||||
if (expiration == 0) expiration = 1; // special value to indicate expiration of 0 was set
|
if (expiration < 100) throw herr("invalid expiration");
|
||||||
}
|
}
|
||||||
|
} else if (tagName == "ephemeral") {
|
||||||
|
expiration = 1;
|
||||||
} else if (tagName.size() == 1) {
|
} else if (tagName.size() == 1) {
|
||||||
if (tagVal.size() == 0) throw herr("tag val empty");
|
|
||||||
if (tagVal.size() > cfg().events__maxTagValSize) throw herr("tag val too large: ", tagVal.size());
|
if (tagVal.size() > cfg().events__maxTagValSize) throw herr("tag val too large: ", tagVal.size());
|
||||||
|
|
||||||
if (tagVal.size() <= MAX_INDEXED_TAG_VAL_SIZE) {
|
if (tagVal.size() <= MAX_INDEXED_TAG_VAL_SIZE) {
|
||||||
@ -56,6 +56,17 @@ std::string nostrJsonToFlat(const tao::json::value &v) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isDefaultReplaceableKind(kind)) {
|
||||||
|
tagsGeneral.emplace_back(NostrIndex::CreateTagGeneral(builder,
|
||||||
|
'd',
|
||||||
|
builder.CreateVector((uint8_t*)"", 0)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isDefaultEphemeralKind(kind)) {
|
||||||
|
expiration = 1;
|
||||||
|
}
|
||||||
|
|
||||||
// Create flatbuffer
|
// Create flatbuffer
|
||||||
|
|
||||||
auto eventPtr = NostrIndex::CreateEvent(builder,
|
auto eventPtr = NostrIndex::CreateEvent(builder,
|
||||||
@ -125,7 +136,7 @@ void verifyEventTimestamp(const NostrIndex::Event *flat) {
|
|||||||
auto now = hoytech::curr_time_s();
|
auto now = hoytech::curr_time_s();
|
||||||
auto ts = flat->created_at();
|
auto ts = flat->created_at();
|
||||||
|
|
||||||
uint64_t earliest = now - (isEphemeralEvent(flat->kind()) ? cfg().events__rejectEphemeralEventsOlderThanSeconds : cfg().events__rejectEventsOlderThanSeconds);
|
uint64_t earliest = now - (flat->expiration() == 1 ? cfg().events__rejectEphemeralEventsOlderThanSeconds : cfg().events__rejectEventsOlderThanSeconds);
|
||||||
uint64_t latest = now + cfg().events__rejectEventsNewerThanSeconds;
|
uint64_t latest = now + cfg().events__rejectEventsNewerThanSeconds;
|
||||||
|
|
||||||
if (ts < earliest) throw herr("created_at too early");
|
if (ts < earliest) throw herr("created_at too early");
|
||||||
@ -261,24 +272,8 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToW
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isReplaceableEvent(flat->kind())) {
|
{
|
||||||
auto searchKey = makeKey_StringUint64Uint64(sv(flat->pubkey()), flat->kind(), MAX_U64);
|
std::optional<std::string> replace;
|
||||||
|
|
||||||
env.generic_foreachFull(txn, env.dbi_Event__pubkeyKind, searchKey, lmdb::to_sv<uint64_t>(MAX_U64), [&](auto k, auto v) {
|
|
||||||
ParsedKey_StringUint64Uint64 parsedKey(k);
|
|
||||||
if (parsedKey.s == sv(flat->pubkey()) && parsedKey.n1 == flat->kind()) {
|
|
||||||
if (parsedKey.n2 < flat->created_at()) {
|
|
||||||
auto otherEv = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v));
|
|
||||||
if (logLevel >= 1) LI << "Deleting event (replaceable). id=" << to_hex(sv(otherEv.flat_nested()->id()));
|
|
||||||
deleteEvent(txn, changes, otherEv);
|
|
||||||
} else {
|
|
||||||
ev.status = EventWriteStatus::Replaced;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}, true);
|
|
||||||
} else {
|
|
||||||
std::string replace;
|
|
||||||
|
|
||||||
for (const auto &tagPair : *(flat->tagsGeneral())) {
|
for (const auto &tagPair : *(flat->tagsGeneral())) {
|
||||||
auto tagName = (char)tagPair->key();
|
auto tagName = (char)tagPair->key();
|
||||||
@ -287,8 +282,8 @@ void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::vector<EventToW
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (replace.size()) {
|
if (replace) {
|
||||||
auto searchStr = std::string(sv(flat->pubkey())) + replace;
|
auto searchStr = std::string(sv(flat->pubkey())) + *replace;
|
||||||
auto searchKey = makeKey_StringUint64(searchStr, flat->kind());
|
auto searchKey = makeKey_StringUint64(searchStr, flat->kind());
|
||||||
|
|
||||||
env.generic_foreachFull(txn, env.dbi_Event__replace, searchKey, lmdb::to_sv<uint64_t>(MAX_U64), [&](auto k, auto v) {
|
env.generic_foreachFull(txn, env.dbi_Event__replace, searchKey, lmdb::to_sv<uint64_t>(MAX_U64), [&](auto k, auto v) {
|
||||||
|
@ -9,16 +9,17 @@
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
inline bool isReplaceableEvent(uint64_t kind) {
|
inline bool isDefaultReplaceableKind(uint64_t kind) {
|
||||||
return (
|
return (
|
||||||
kind == 0 ||
|
kind == 0 ||
|
||||||
kind == 3 ||
|
kind == 3 ||
|
||||||
kind == 41 ||
|
kind == 41 ||
|
||||||
(kind >= 10'000 && kind < 20'000)
|
(kind >= 10'000 && kind < 20'000) ||
|
||||||
|
(kind >= 30'000 && kind < 40'000)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline bool isEphemeralEvent(uint64_t kind) {
|
inline bool isDefaultEphemeralKind(uint64_t kind) {
|
||||||
return (
|
return (
|
||||||
(kind >= 20'000 && kind < 30'000)
|
(kind >= 20'000 && kind < 30'000)
|
||||||
);
|
);
|
||||||
|
@ -2,6 +2,9 @@
|
|||||||
|
|
||||||
use strict;
|
use strict;
|
||||||
|
|
||||||
|
use Carp;
|
||||||
|
$SIG{ __DIE__ } = \&Carp::confess;
|
||||||
|
|
||||||
use Data::Dumper;
|
use Data::Dumper;
|
||||||
use JSON::XS;
|
use JSON::XS;
|
||||||
|
|
||||||
@ -40,6 +43,17 @@ doTest({
|
|||||||
verify => [ 1, ],
|
verify => [ 1, ],
|
||||||
});
|
});
|
||||||
|
|
||||||
|
## Same, but explicit empty d tag
|
||||||
|
|
||||||
|
doTest({
|
||||||
|
events => [
|
||||||
|
qq{--sec $ids->[0]->{sec} --content "hi" --kind 10000 --created-at 5000 },
|
||||||
|
qq{--sec $ids->[0]->{sec} --content "hi 2" --kind 10000 --created-at 5001 --tag d '' },
|
||||||
|
qq{--sec $ids->[0]->{sec} --content "hi" --kind 10000 --created-at 5000 },
|
||||||
|
],
|
||||||
|
verify => [ 1, ],
|
||||||
|
});
|
||||||
|
|
||||||
## Replacement is dropped
|
## Replacement is dropped
|
||||||
|
|
||||||
doTest({
|
doTest({
|
||||||
@ -159,6 +173,9 @@ doTest({
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
print "OK\n";
|
||||||
|
|
||||||
|
|
||||||
sub doTest {
|
sub doTest {
|
||||||
my $spec = shift;
|
my $spec = shift;
|
||||||
|
|
||||||
@ -200,7 +217,7 @@ sub addEvent {
|
|||||||
|
|
||||||
my $eventJson = `cat test-eventXYZ.json`;
|
my $eventJson = `cat test-eventXYZ.json`;
|
||||||
|
|
||||||
system(qq{ <test-eventXYZ.json ./strfry --config test/strfry.conf import --no-gc 2>/dev/null });
|
system(qq{ <test-eventXYZ.json ./strfry --config test/strfry.conf import 2>/dev/null });
|
||||||
|
|
||||||
system(qq{ rm test-eventXYZ.json });
|
system(qq{ rm test-eventXYZ.json });
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user