1
0
mirror of git://jb55.com/damus synced 2024-10-04 19:00:42 +00:00

update nostrdb

to include potential ingester thread crash fix
This commit is contained in:
William Casarin 2023-11-14 10:26:09 -08:00
parent 41c76d9de0
commit bc330ab5de
2 changed files with 220 additions and 14 deletions

View File

@ -72,18 +72,6 @@ enum ndb_writer_msgtype {
NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched
};
enum ndb_dbs {
NDB_DB_NOTE,
NDB_DB_META,
NDB_DB_PROFILE,
NDB_DB_NOTE_ID,
NDB_DB_PROFILE_PK,
NDB_DB_NDB_META,
NDB_DB_PROFILE_SEARCH,
NDB_DB_PROFILE_LAST_FETCH,
NDB_DBS,
};
// keys used for storing data in the NDB metadata database (NDB_DB_NDB_META)
enum ndb_meta_key {
NDB_META_KEY_VERSION = 1
@ -1572,6 +1560,7 @@ static void *ndb_ingester_thread(void *data)
struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out;
int i, to_write, popped, done, any_event;
MDB_txn *read_txn = NULL;
int rc;
ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
ndb_debug("started ingester thread\n");
@ -1592,8 +1581,12 @@ static void *ndb_ingester_thread(void *data)
}
}
if (any_event)
mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &read_txn);
if (any_event && (rc = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &read_txn))) {
// this is bad
fprintf(stderr, "UNUSUAL ndb_ingester: mdb_txn_begin failed: '%s'\n",
mdb_strerror(rc));
continue;
}
for (i = 0; i < popped; i++) {
msg = &msgs[i];
@ -2916,6 +2909,93 @@ int ndb_builder_new_tag(struct ndb_builder *builder)
return cursor_push_tag(&builder->note_cur, &tag);
}
void ndb_stat_counts_init(struct ndb_stat_counts *counts)
{
counts->count = 0;
counts->key_size = 0;
counts->value_size = 0;
}
static void ndb_stat_init(struct ndb_stat *stat)
{
// init stats
int i;
for (i = 0; i < NDB_CKIND_COUNT; i++) {
ndb_stat_counts_init(&stat->common_kinds[i]);
}
for (i = 0; i < NDB_DBS; i++) {
ndb_stat_counts_init(&stat->dbs[i]);
}
ndb_stat_counts_init(&stat->other_kinds);
}
int ndb_stat(struct ndb *ndb, struct ndb_stat *stat)
{
int rc;
MDB_cursor *cur;
MDB_val k, v;
MDB_dbi db;
struct ndb_txn txn;
struct ndb_note *note;
int i;
enum ndb_common_kind common_kind;
// initialize to 0
ndb_stat_init(stat);
if (!ndb_begin_query(ndb, &txn)) {
fprintf(stderr, "ndb_stat failed at ndb_begin_query\n");
return 0;
}
// stat each dbi in the database
for (i = 0; i < NDB_DBS; i++)
{
db = ndb->lmdb.dbs[i];
if ((rc = mdb_cursor_open(txn.mdb_txn, db, &cur))) {
fprintf(stderr, "ndb_stat: mdb_cursor_open failed, error '%s'\n",
mdb_strerror(rc));
return 0;
}
// loop over every entry and count kv sizes
while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
// we gather more detailed per-kind stats if we're in
// the notes db
if (i == NDB_DB_NOTE) {
note = v.mv_data;
common_kind = ndb_kind_to_common_kind(note->kind);
// uncommon kind? just count them in bulk
if (common_kind == -1) {
stat->other_kinds.count++;
stat->other_kinds.key_size += k.mv_size;
stat->other_kinds.value_size += v.mv_size;
} else {
stat->common_kinds[common_kind].count++;
stat->common_kinds[common_kind].key_size += k.mv_size;
stat->common_kinds[common_kind].value_size += v.mv_size;
}
}
stat->dbs[i].count++;
stat->dbs[i].key_size += k.mv_size;
stat->dbs[i].value_size += v.mv_size;
}
// close the cursor, they are per-dbi
mdb_cursor_close(cur);
}
ndb_end_query(&txn);
return 1;
}
/// Push an element to the current tag
///
/// Basic idea is to call ndb_builder_new_tag

View File

@ -32,6 +32,51 @@ struct ndb_search_key
uint64_t timestamp;
};
enum ndb_dbs {
NDB_DB_NOTE,
NDB_DB_META,
NDB_DB_PROFILE,
NDB_DB_NOTE_ID,
NDB_DB_PROFILE_PK,
NDB_DB_NDB_META,
NDB_DB_PROFILE_SEARCH,
NDB_DB_PROFILE_LAST_FETCH,
NDB_DBS,
};
// common kinds. we collect stats on these in ndb_stat. mainly because I don't
// want to deal with including a hashtable to the project.
enum ndb_common_kind {
NDB_CKIND_PROFILE,
NDB_CKIND_TEXT,
NDB_CKIND_CONTACTS,
NDB_CKIND_DM,
NDB_CKIND_DELETE,
NDB_CKIND_REPOST,
NDB_CKIND_REACTION,
NDB_CKIND_ZAP,
NDB_CKIND_ZAP_REQUEST,
NDB_CKIND_NWC_REQUEST,
NDB_CKIND_NWC_RESPONSE,
NDB_CKIND_HTTP_AUTH,
NDB_CKIND_LIST,
NDB_CKIND_LONGFORM,
NDB_CKIND_STATUS,
NDB_CKIND_COUNT, // should always be last
};
struct ndb_stat_counts {
size_t key_size;
size_t value_size;
size_t count;
};
struct ndb_stat {
struct ndb_stat_counts dbs[NDB_DBS];
struct ndb_stat_counts common_kinds[NDB_CKIND_COUNT];
struct ndb_stat_counts other_kinds;
};
struct ndb_search {
struct ndb_search_key *key;
uint64_t profile_key;
@ -224,6 +269,10 @@ void ndb_builder_set_kind(struct ndb_builder *builder, uint32_t kind);
int ndb_builder_new_tag(struct ndb_builder *builder);
int ndb_builder_push_tag_str(struct ndb_builder *builder, const char *str, int len);
// stats
int ndb_stat(struct ndb *ndb, struct ndb_stat *stat);
void ndb_stat_counts_init(struct ndb_stat_counts *counts);
static inline struct ndb_str ndb_note_str(struct ndb_note *note,
union ndb_packed_str *pstr)
{
@ -349,4 +398,81 @@ static inline int ndb_tags_iterate_next(struct ndb_iterator *iter)
return 0;
}
static inline enum ndb_common_kind
ndb_kind_to_common_kind(int kind)
{
switch (kind)
{
case 0: return NDB_CKIND_PROFILE;
case 1: return NDB_CKIND_TEXT;
case 3: return NDB_CKIND_CONTACTS;
case 4: return NDB_CKIND_DM;
case 5: return NDB_CKIND_DELETE;
case 6: return NDB_CKIND_REPOST;
case 7: return NDB_CKIND_REACTION;
case 9735: return NDB_CKIND_ZAP;
case 9734: return NDB_CKIND_ZAP_REQUEST;
case 23194: return NDB_CKIND_NWC_REQUEST;
case 23195: return NDB_CKIND_NWC_RESPONSE;
case 27235: return NDB_CKIND_HTTP_AUTH;
case 30000: return NDB_CKIND_LIST;
case 30023: return NDB_CKIND_LONGFORM;
case 30315: return NDB_CKIND_STATUS;
}
return -1;
}
static inline const char *
ndb_kind_name(enum ndb_common_kind ck)
{
switch (ck) {
case NDB_CKIND_PROFILE: return "profile";
case NDB_CKIND_TEXT: return "text";
case NDB_CKIND_CONTACTS: return "contacts";
case NDB_CKIND_DM: return "dm";
case NDB_CKIND_DELETE: return "delete";
case NDB_CKIND_REPOST: return "repost";
case NDB_CKIND_REACTION: return "reaction";
case NDB_CKIND_ZAP: return "zap";
case NDB_CKIND_ZAP_REQUEST: return "zap_request";
case NDB_CKIND_NWC_REQUEST: return "nwc_request";
case NDB_CKIND_NWC_RESPONSE: return "nwc_response";
case NDB_CKIND_HTTP_AUTH: return "http_auth";
case NDB_CKIND_LIST: return "list";
case NDB_CKIND_LONGFORM: return "longform";
case NDB_CKIND_STATUS: return "status";
case NDB_CKIND_COUNT: return "unknown";
}
return "unknown";
}
static inline const char *
ndb_db_name(enum ndb_dbs db)
{
switch (db) {
case NDB_DB_NOTE:
return "note";
case NDB_DB_META:
return "note_metadata";
case NDB_DB_PROFILE:
return "profile";
case NDB_DB_NOTE_ID:
return "note_index";
case NDB_DB_PROFILE_PK:
return "profile_pubkey_index";
case NDB_DB_NDB_META:
return "nostrdb_metadata";
case NDB_DB_PROFILE_SEARCH:
return "profile_search";
case NDB_DB_PROFILE_LAST_FETCH:
return "profile_last_fetch";
case NDB_DBS:
return "count";
}
return "unknown";
}
#endif