mirror of
https://github.com/hoytech/strfry.git
synced 2025-06-17 08:48:51 +00:00
don't use flatbuffers for Event index table (make it opaque)
This commit is contained in:
@ -21,9 +21,7 @@ tables:
|
|||||||
## Meta-info of nostr events, suitable for indexing
|
## Meta-info of nostr events, suitable for indexing
|
||||||
## Primary key is auto-incremented, called "levId" for Local EVent ID
|
## Primary key is auto-incremented, called "levId" for Local EVent ID
|
||||||
Event:
|
Event:
|
||||||
fields:
|
opaque: true
|
||||||
- name: packed
|
|
||||||
type: ubytes
|
|
||||||
|
|
||||||
indices:
|
indices:
|
||||||
created_at:
|
created_at:
|
||||||
@ -48,7 +46,7 @@ tables:
|
|||||||
multi: true
|
multi: true
|
||||||
|
|
||||||
indexPrelude: |
|
indexPrelude: |
|
||||||
PackedEventView packed(v.packed());
|
PackedEventView packed(v.buf);
|
||||||
created_at = packed.created_at();
|
created_at = packed.created_at();
|
||||||
uint64_t indexTime = *created_at;
|
uint64_t indexTime = *created_at;
|
||||||
|
|
||||||
|
@ -92,7 +92,7 @@ struct ActiveMonitors : NonCopyable {
|
|||||||
if (item.latestEventId >= ev.primaryKeyId || item.mon->sub.latestEventId >= ev.primaryKeyId) continue;
|
if (item.latestEventId >= ev.primaryKeyId || item.mon->sub.latestEventId >= ev.primaryKeyId) continue;
|
||||||
item.latestEventId = ev.primaryKeyId;
|
item.latestEventId = ev.primaryKeyId;
|
||||||
|
|
||||||
if (f->doesMatch(PackedEventView(ev.packed()))) {
|
if (f->doesMatch(PackedEventView(ev.buf))) {
|
||||||
recipients.emplace_back(item.mon->sub.connId, item.mon->sub.subId);
|
recipients.emplace_back(item.mon->sub.connId, item.mon->sub.subId);
|
||||||
item.mon->sub.latestEventId = ev.primaryKeyId;
|
item.mon->sub.latestEventId = ev.primaryKeyId;
|
||||||
continue;
|
continue;
|
||||||
@ -113,7 +113,7 @@ struct ActiveMonitors : NonCopyable {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
auto packed = PackedEventView(ev.packed());
|
auto packed = PackedEventView(ev.buf);
|
||||||
|
|
||||||
{
|
{
|
||||||
auto id = std::string(packed.id());
|
auto id = std::string(packed.id());
|
||||||
|
@ -258,7 +258,7 @@ struct DBScan : NonCopyable {
|
|||||||
} else {
|
} else {
|
||||||
approxWork += 10;
|
approxWork += 10;
|
||||||
auto view = env.lookup_Event(txn, levId);
|
auto view = env.lookup_Event(txn, levId);
|
||||||
if (view && f.doesMatch(PackedEventView(view->packed()))) doSend = true;
|
if (view && f.doesMatch(PackedEventView(view->buf))) doSend = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (doSend) {
|
if (doSend) {
|
||||||
|
@ -193,7 +193,7 @@ struct Router {
|
|||||||
|
|
||||||
void outgoingEvent(lmdb::txn &txn, defaultDb::environment::View_Event &ev, std::string &responseStr, tao::json::value &evJson) {
|
void outgoingEvent(lmdb::txn &txn, defaultDb::environment::View_Event &ev, std::string &responseStr, tao::json::value &evJson) {
|
||||||
if (dir == "down") return;
|
if (dir == "down") return;
|
||||||
if (!filterCompiled.doesMatch(PackedEventView(ev.packed()))) return;
|
if (!filterCompiled.doesMatch(PackedEventView(ev.buf))) return;
|
||||||
|
|
||||||
if (responseStr.size() == 0) {
|
if (responseStr.size() == 0) {
|
||||||
auto evStr = getEventJson(txn, router->decomp, ev.primaryKeyId);
|
auto evStr = getEventJson(txn, router->decomp, ev.primaryKeyId);
|
||||||
|
@ -101,7 +101,7 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
|
|||||||
env.foreach_Event(txn, [&](auto &ev){
|
env.foreach_Event(txn, [&](auto &ev){
|
||||||
currEventId = ev.primaryKeyId;
|
currEventId = ev.primaryKeyId;
|
||||||
|
|
||||||
auto id = std::string(PackedEventView(ev.packed()).id());
|
auto id = std::string(PackedEventView(ev.buf).id());
|
||||||
if (downloadedIds.find(id) != downloadedIds.end()) {
|
if (downloadedIds.find(id) != downloadedIds.end()) {
|
||||||
downloadedIds.erase(id);
|
downloadedIds.erase(id);
|
||||||
return true;
|
return true;
|
||||||
|
@ -72,7 +72,7 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
|
|||||||
|
|
||||||
for (auto levId : levIds) {
|
for (auto levId : levIds) {
|
||||||
auto ev = lookupEventByLevId(txn, levId);
|
auto ev = lookupEventByLevId(txn, levId);
|
||||||
PackedEventView packed(ev.packed());
|
PackedEventView packed(ev.buf);
|
||||||
ne.addItem(packed.created_at(), packed.id().substr(0, ne.idSize));
|
ne.addItem(packed.created_at(), packed.id().substr(0, ne.idSize));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,7 +90,7 @@ void RelayServer::runCron() {
|
|||||||
if (expiration == 1) { // Ephemeral event
|
if (expiration == 1) { // Ephemeral event
|
||||||
auto view = env.lookup_Event(txn, levId);
|
auto view = env.lookup_Event(txn, levId);
|
||||||
if (!view) throw herr("missing event from index, corrupt DB?");
|
if (!view) throw herr("missing event from index, corrupt DB?");
|
||||||
uint64_t created = PackedEventView(view->packed()).created_at();
|
uint64_t created = PackedEventView(view->buf).created_at();
|
||||||
|
|
||||||
if (created <= ephemeralCutoff) {
|
if (created <= ephemeralCutoff) {
|
||||||
numEphemeral++;
|
numEphemeral++;
|
||||||
|
@ -100,7 +100,7 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
|
|||||||
for (auto levId : view->levIds) {
|
for (auto levId : view->levIds) {
|
||||||
try {
|
try {
|
||||||
auto ev = lookupEventByLevId(txn, levId);
|
auto ev = lookupEventByLevId(txn, levId);
|
||||||
auto packed = PackedEventView(ev.packed());
|
auto packed = PackedEventView(ev.buf);
|
||||||
view->ne.addItem(packed.created_at(), packed.id().substr(0, view->ne.idSize));
|
view->ne.addItem(packed.created_at(), packed.id().substr(0, view->ne.idSize));
|
||||||
} catch (std::exception &) {
|
} catch (std::exception &) {
|
||||||
// levId was deleted when query was paused
|
// levId was deleted when query was paused
|
||||||
|
@ -31,7 +31,7 @@ void RelayServer::runReqMonitor(ThreadPool<MsgReqMonitor>::Thread &thr) {
|
|||||||
auto connId = msg->sub.connId;
|
auto connId = msg->sub.connId;
|
||||||
|
|
||||||
env.foreach_Event(txn, [&](auto &ev){
|
env.foreach_Event(txn, [&](auto &ev){
|
||||||
if (msg->sub.filterGroup.doesMatch(PackedEventView(ev.packed()))) {
|
if (msg->sub.filterGroup.doesMatch(PackedEventView(ev.buf))) {
|
||||||
sendEvent(connId, msg->sub.subId, getEventJson(txn, decomp, ev.primaryKeyId));
|
sendEvent(connId, msg->sub.subId, getEventJson(txn, decomp, ev.primaryKeyId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -282,7 +282,7 @@ void writeEvents(lmdb::txn &txn, std::vector<EventToWrite> &evs, uint64_t logLev
|
|||||||
auto otherEv = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v));
|
auto otherEv = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v));
|
||||||
|
|
||||||
auto thisTimestamp = packed.created_at();
|
auto thisTimestamp = packed.created_at();
|
||||||
auto otherPacked = PackedEventView(otherEv.packed());
|
auto otherPacked = PackedEventView(otherEv.buf);
|
||||||
auto otherTimestamp = otherPacked.created_at();
|
auto otherTimestamp = otherPacked.created_at();
|
||||||
|
|
||||||
if (otherTimestamp < thisTimestamp ||
|
if (otherTimestamp < thisTimestamp ||
|
||||||
@ -304,7 +304,7 @@ void writeEvents(lmdb::txn &txn, std::vector<EventToWrite> &evs, uint64_t logLev
|
|||||||
packed.foreachTag([&](char tagName, std::string_view tagVal){
|
packed.foreachTag([&](char tagName, std::string_view tagVal){
|
||||||
if (tagName == 'e') {
|
if (tagName == 'e') {
|
||||||
auto otherEv = lookupEventById(txn, tagVal);
|
auto otherEv = lookupEventById(txn, tagVal);
|
||||||
if (otherEv && PackedEventView(otherEv->packed()).pubkey() == packed.pubkey()) {
|
if (otherEv && PackedEventView(otherEv->buf).pubkey() == packed.pubkey()) {
|
||||||
if (logLevel >= 1) LI << "Deleting event (kind 5). id=" << to_hex(tagVal);
|
if (logLevel >= 1) LI << "Deleting event (kind 5). id=" << to_hex(tagVal);
|
||||||
levIdsToDelete.push_back(otherEv->primaryKeyId);
|
levIdsToDelete.push_back(otherEv->primaryKeyId);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user