mirror of
git://jb55.com/damus
synced 2024-09-18 19:23:49 +00:00
ndb: update nostrdb
This includes the new profile fetched_at logic and reaction stats. When receiving new profiles, nostrdb will record when it was last received in a new database. This database is a mapping from Pubkey to timestamp. You can manually read/write to this table using: ndb_read_last_profile_fetch ndb_write_last_profile_fetch This patch also includes the new reaction counting metadata table. It is not used yet (but reactions are still counted!) Changelog-Added: Added reaction counters to nostrdb Changelog-Added: Record when profile is last fetched in nostrdb
This commit is contained in:
parent
502ceee6d4
commit
1cf898e0b2
@ -16,6 +16,8 @@
|
|||||||
|
|
||||||
#include "bindings/c/profile_json_parser.h"
|
#include "bindings/c/profile_json_parser.h"
|
||||||
#include "bindings/c/profile_builder.h"
|
#include "bindings/c/profile_builder.h"
|
||||||
|
#include "bindings/c/meta_builder.h"
|
||||||
|
#include "bindings/c/meta_reader.h"
|
||||||
#include "bindings/c/profile_verifier.h"
|
#include "bindings/c/profile_verifier.h"
|
||||||
#include "secp256k1.h"
|
#include "secp256k1.h"
|
||||||
#include "secp256k1_ecdh.h"
|
#include "secp256k1_ecdh.h"
|
||||||
@ -155,8 +157,7 @@ static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id,
|
|||||||
key->search[sizeof(key->search) - 1] = '\0';
|
key->search[sizeof(key->search) - 1] = '\0';
|
||||||
}
|
}
|
||||||
|
|
||||||
static int ndb_write_profile_search_index(struct ndb_lmdb *lmdb,
|
static int ndb_write_profile_search_index(struct ndb_txn *txn,
|
||||||
MDB_txn *txn,
|
|
||||||
struct ndb_search_key *index_key,
|
struct ndb_search_key *index_key,
|
||||||
uint64_t profile_key)
|
uint64_t profile_key)
|
||||||
{
|
{
|
||||||
@ -168,7 +169,9 @@ static int ndb_write_profile_search_index(struct ndb_lmdb *lmdb,
|
|||||||
val.mv_data = &profile_key;
|
val.mv_data = &profile_key;
|
||||||
val.mv_size = sizeof(profile_key);
|
val.mv_size = sizeof(profile_key);
|
||||||
|
|
||||||
if ((rc = mdb_put(txn, lmdb->dbs[NDB_DB_PROFILE_SEARCH], &key, &val, 0))) {
|
if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH],
|
||||||
|
&key, &val, 0)))
|
||||||
|
{
|
||||||
ndb_debug("ndb_write_profile_search_index failed: %s\n",
|
ndb_debug("ndb_write_profile_search_index failed: %s\n",
|
||||||
mdb_strerror(rc));
|
mdb_strerror(rc));
|
||||||
return 0;
|
return 0;
|
||||||
@ -179,8 +182,7 @@ static int ndb_write_profile_search_index(struct ndb_lmdb *lmdb,
|
|||||||
|
|
||||||
|
|
||||||
// map usernames and display names to profile keys for user searching
|
// map usernames and display names to profile keys for user searching
|
||||||
static int ndb_write_profile_search_indices(struct ndb_lmdb *lmdb,
|
static int ndb_write_profile_search_indices(struct ndb_txn *txn,
|
||||||
MDB_txn *txn,
|
|
||||||
struct ndb_note *note,
|
struct ndb_note *note,
|
||||||
uint64_t profile_key,
|
uint64_t profile_key,
|
||||||
void *profile_root)
|
void *profile_root)
|
||||||
@ -199,8 +201,7 @@ static int ndb_write_profile_search_indices(struct ndb_lmdb *lmdb,
|
|||||||
if (name) {
|
if (name) {
|
||||||
ndb_make_search_key(&index, note->pubkey, note->created_at,
|
ndb_make_search_key(&index, note->pubkey, note->created_at,
|
||||||
name);
|
name);
|
||||||
if (!ndb_write_profile_search_index(lmdb, txn, &index,
|
if (!ndb_write_profile_search_index(txn, &index, profile_key))
|
||||||
profile_key))
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -211,19 +212,30 @@ static int ndb_write_profile_search_indices(struct ndb_lmdb *lmdb,
|
|||||||
}
|
}
|
||||||
ndb_make_search_key(&index, note->pubkey, note->created_at,
|
ndb_make_search_key(&index, note->pubkey, note->created_at,
|
||||||
display_name);
|
display_name);
|
||||||
if (!ndb_write_profile_search_index(lmdb, txn, &index,
|
if (!ndb_write_profile_search_index(txn, &index, profile_key))
|
||||||
profile_key))
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int _ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn, int flags)
|
||||||
|
{
|
||||||
|
txn->lmdb = &ndb->lmdb;
|
||||||
|
MDB_txn **mdb_txn = (MDB_txn **)&txn->mdb_txn;
|
||||||
|
return mdb_txn_begin(txn->lmdb->env, NULL, flags, mdb_txn) == 0;
|
||||||
|
}
|
||||||
|
|
||||||
int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn)
|
int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn)
|
||||||
{
|
{
|
||||||
txn->ndb = ndb;
|
return _ndb_begin_query(ndb, txn, MDB_RDONLY);
|
||||||
MDB_txn **mdb_txn = (MDB_txn **)&txn->mdb_txn;
|
}
|
||||||
return mdb_txn_begin(ndb->lmdb.env, NULL, 0, mdb_txn) == 0;
|
|
||||||
|
// this should only be used in migrations, etc
|
||||||
|
static int ndb_begin_rw_query(struct ndb *ndb, struct ndb_txn *txn)
|
||||||
|
{
|
||||||
|
return _ndb_begin_query(ndb, txn, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -243,8 +255,8 @@ static int ndb_migrate_user_search_indices(struct ndb *ndb)
|
|||||||
size_t len;
|
size_t len;
|
||||||
int count;
|
int count;
|
||||||
|
|
||||||
if (!ndb_begin_query(ndb, &txn)) {
|
if (!ndb_begin_rw_query(ndb, &txn)) {
|
||||||
fprintf(stderr, "ndb_migrate_user_search_indices: ndb_begin_query failed\n");
|
fprintf(stderr, "ndb_migrate_user_search_indices: ndb_begin_rw_query failed\n");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -268,8 +280,7 @@ static int ndb_migrate_user_search_indices(struct ndb *ndb)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!ndb_write_profile_search_indices(&ndb->lmdb, txn.mdb_txn,
|
if (!ndb_write_profile_search_indices(&txn, note, profile_key,
|
||||||
note, profile_key,
|
|
||||||
profile_root)) {
|
profile_root)) {
|
||||||
|
|
||||||
fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n");
|
fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n");
|
||||||
@ -282,7 +293,8 @@ static int ndb_migrate_user_search_indices(struct ndb *ndb)
|
|||||||
fprintf(stderr, "migrated %d profiles to include search indices\n", count);
|
fprintf(stderr, "migrated %d profiles to include search indices\n", count);
|
||||||
|
|
||||||
mdb_cursor_close(cur);
|
mdb_cursor_close(cur);
|
||||||
mdb_txn_commit(txn.mdb_txn);
|
|
||||||
|
ndb_end_query(&txn);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
@ -442,11 +454,15 @@ struct ndb_writer_ndb_meta {
|
|||||||
uint64_t version;
|
uint64_t version;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Used in the writer thread when writing ndb_profile_fetch_record's
|
||||||
|
// kv = pubkey: recor
|
||||||
struct ndb_writer_last_fetch {
|
struct ndb_writer_last_fetch {
|
||||||
unsigned char pubkey[32];
|
unsigned char pubkey[32];
|
||||||
uint64_t fetched_at;
|
uint64_t fetched_at;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// The different types of messages that the writer thread can write to the
|
||||||
|
// database
|
||||||
struct ndb_writer_msg {
|
struct ndb_writer_msg {
|
||||||
enum ndb_writer_msgtype type;
|
enum ndb_writer_msgtype type;
|
||||||
union {
|
union {
|
||||||
@ -457,9 +473,10 @@ struct ndb_writer_msg {
|
|||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
void ndb_end_query(struct ndb_txn *txn)
|
int ndb_end_query(struct ndb_txn *txn)
|
||||||
{
|
{
|
||||||
mdb_txn_abort(txn->mdb_txn);
|
// this works on read or write queries.
|
||||||
|
return mdb_txn_commit(txn->mdb_txn) == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32],
|
int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32],
|
||||||
@ -504,18 +521,21 @@ static int ndb_writer_queue_note(struct ndb_writer *writer,
|
|||||||
return prot_queue_push(&writer->inbox, &msg);
|
return prot_queue_push(&writer->inbox, &msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void ndb_writer_last_profile_fetch(struct ndb_lmdb *lmdb, MDB_txn *txn,
|
static void ndb_writer_last_profile_fetch(struct ndb_txn *txn,
|
||||||
struct ndb_writer_last_fetch *w)
|
const unsigned char *pubkey,
|
||||||
|
uint64_t fetched_at)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
MDB_val key, val;
|
MDB_val key, val;
|
||||||
|
|
||||||
key.mv_data = (unsigned char*)&w->pubkey;
|
key.mv_data = (unsigned char*)pubkey;
|
||||||
key.mv_size = sizeof(w->pubkey);
|
key.mv_size = 32;
|
||||||
val.mv_data = &w->fetched_at;
|
val.mv_data = &fetched_at;
|
||||||
val.mv_size = sizeof(w->fetched_at);
|
val.mv_size = sizeof(fetched_at);
|
||||||
|
|
||||||
if ((rc = mdb_put(txn, lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &key, &val, 0))) {
|
if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH],
|
||||||
|
&key, &val, 0)))
|
||||||
|
{
|
||||||
ndb_debug("write version to ndb_meta failed: %s\n",
|
ndb_debug("write version to ndb_meta failed: %s\n",
|
||||||
mdb_strerror(rc));
|
mdb_strerror(rc));
|
||||||
return;
|
return;
|
||||||
@ -524,6 +544,46 @@ static void ndb_writer_last_profile_fetch(struct ndb_lmdb *lmdb, MDB_txn *txn,
|
|||||||
//fprintf(stderr, "writing version %" PRIu64 "\n", version);
|
//fprintf(stderr, "writing version %" PRIu64 "\n", version);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// We just received a profile that we haven't processed yet, but it could
|
||||||
|
// be an older one! Make sure we only write last fetched profile if it's a new
|
||||||
|
// one
|
||||||
|
//
|
||||||
|
// To do this, we first check the latest profile in the database. If the
|
||||||
|
// created_date for this profile note is newer, then we write a
|
||||||
|
// last_profile_fetch record, otherwise we do not.
|
||||||
|
//
|
||||||
|
// WARNING: This function is only valid when called from the writer thread
|
||||||
|
static int ndb_maybe_write_last_profile_fetch(struct ndb_txn *txn,
|
||||||
|
struct ndb_note *note)
|
||||||
|
{
|
||||||
|
size_t len;
|
||||||
|
uint64_t profile_key, note_key;
|
||||||
|
void *root;
|
||||||
|
struct ndb_note *last_profile;
|
||||||
|
NdbProfileRecord_table_t record;
|
||||||
|
|
||||||
|
if ((root = ndb_get_profile_by_pubkey(txn, note->pubkey, &len, &profile_key))) {
|
||||||
|
record = NdbProfileRecord_as_root(root);
|
||||||
|
note_key = NdbProfileRecord_note_key(record);
|
||||||
|
last_profile = ndb_get_note_by_key(txn, note_key, &len);
|
||||||
|
if (last_profile == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// found profile, let's see if it's newer than ours
|
||||||
|
if (note->created_at > last_profile->created_at) {
|
||||||
|
// this is a new profile note, record last fetched time
|
||||||
|
ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// couldn't fetch profile. record last fetched time
|
||||||
|
ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL));
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey,
|
int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey,
|
||||||
uint64_t fetched_at)
|
uint64_t fetched_at)
|
||||||
{
|
{
|
||||||
@ -536,8 +596,8 @@ int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// get some value based on a clustered id key
|
// get some value based on a clustered id key
|
||||||
int ndb_get_tsid(MDB_txn *txn, struct ndb_lmdb *lmdb, enum ndb_dbs db,
|
int ndb_get_tsid(struct ndb_txn *txn, enum ndb_dbs db, const unsigned char *id,
|
||||||
const unsigned char *id, MDB_val *val)
|
MDB_val *val)
|
||||||
{
|
{
|
||||||
MDB_val k, v;
|
MDB_val k, v;
|
||||||
MDB_cursor *cur;
|
MDB_cursor *cur;
|
||||||
@ -550,7 +610,7 @@ int ndb_get_tsid(MDB_txn *txn, struct ndb_lmdb *lmdb, enum ndb_dbs db,
|
|||||||
k.mv_data = &tsid;
|
k.mv_data = &tsid;
|
||||||
k.mv_size = sizeof(tsid);
|
k.mv_size = sizeof(tsid);
|
||||||
|
|
||||||
mdb_cursor_open(txn, lmdb->dbs[db], &cur);
|
mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[db], &cur);
|
||||||
|
|
||||||
// Position cursor at the next key greater than or equal to the specified key
|
// Position cursor at the next key greater than or equal to the specified key
|
||||||
if (mdb_cursor_get(cur, &k, &v, MDB_SET_RANGE)) {
|
if (mdb_cursor_get(cur, &k, &v, MDB_SET_RANGE)) {
|
||||||
@ -582,7 +642,7 @@ static void *ndb_lookup_by_key(struct ndb_txn *txn, uint64_t key,
|
|||||||
k.mv_data = &key;
|
k.mv_data = &key;
|
||||||
k.mv_size = sizeof(key);
|
k.mv_size = sizeof(key);
|
||||||
|
|
||||||
if (mdb_get(txn->mdb_txn, txn->ndb->lmdb.dbs[store], &k, &v)) {
|
if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) {
|
||||||
ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n");
|
ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@ -602,7 +662,7 @@ static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind,
|
|||||||
if (len)
|
if (len)
|
||||||
*len = 0;
|
*len = 0;
|
||||||
|
|
||||||
if (!ndb_get_tsid(txn->mdb_txn, &txn->ndb->lmdb, ind, pk, &k)) {
|
if (!ndb_get_tsid(txn, ind, pk, &k)) {
|
||||||
//ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n");
|
//ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -610,7 +670,7 @@ static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind,
|
|||||||
if (primkey)
|
if (primkey)
|
||||||
*primkey = *(uint64_t*)k.mv_data;
|
*primkey = *(uint64_t*)k.mv_data;
|
||||||
|
|
||||||
if (mdb_get(txn->mdb_txn, txn->ndb->lmdb.dbs[store], &k, &v)) {
|
if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) {
|
||||||
ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n");
|
ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -638,7 +698,7 @@ static inline uint64_t ndb_get_indexkey_by_id(struct ndb_txn *txn,
|
|||||||
{
|
{
|
||||||
MDB_val k;
|
MDB_val k;
|
||||||
|
|
||||||
if (!ndb_get_tsid(txn->mdb_txn, &txn->ndb->lmdb, db, id, &k))
|
if (!ndb_get_tsid(txn, db, id, &k))
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
return *(uint32_t*)k.mv_data;
|
return *(uint32_t*)k.mv_data;
|
||||||
@ -664,37 +724,52 @@ void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len)
|
|||||||
return ndb_lookup_by_key(txn, key, NDB_DB_PROFILE, len);
|
return ndb_lookup_by_key(txn, key, NDB_DB_PROFILE, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t ndb_read_last_profile_fetch(struct ndb_txn *txn, uint64_t profile_key)
|
uint64_t
|
||||||
|
ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey)
|
||||||
{
|
{
|
||||||
size_t len;
|
MDB_val k, v;
|
||||||
void *ret = ndb_lookup_by_key(txn, profile_key, NDB_DB_PROFILE_LAST_FETCH, &len);
|
|
||||||
if (ret == NULL)
|
k.mv_data = (unsigned char*)pubkey;
|
||||||
|
k.mv_size = 32;
|
||||||
|
|
||||||
|
if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &k, &v)) {
|
||||||
|
//ndb_debug("ndb_read_last_profile_fetch: mdb_get note failed\n");
|
||||||
return 0;
|
return 0;
|
||||||
assert(len == sizeof(uint64_t));
|
}
|
||||||
return *((uint64_t*)ret);
|
|
||||||
|
return *((uint64_t*)v.mv_data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int ndb_has_note(MDB_txn *txn, struct ndb_lmdb *lmdb, const unsigned char *id)
|
static int ndb_has_note(struct ndb_txn *txn, const unsigned char *id)
|
||||||
{
|
{
|
||||||
MDB_val val;
|
MDB_val val;
|
||||||
|
|
||||||
if (!ndb_get_tsid(txn, lmdb, NDB_DB_NOTE_ID, id, &val))
|
if (!ndb_get_tsid(txn, NDB_DB_NOTE_ID, id, &val))
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void ndb_txn_from_mdb(struct ndb_txn *txn, struct ndb_lmdb *lmdb,
|
||||||
|
MDB_txn *mdb_txn)
|
||||||
|
{
|
||||||
|
txn->lmdb = lmdb;
|
||||||
|
txn->mdb_txn = mdb_txn;
|
||||||
|
}
|
||||||
|
|
||||||
static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid)
|
static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid)
|
||||||
{
|
{
|
||||||
unsigned char id[32];
|
unsigned char id[32];
|
||||||
struct ndb_ingest_controller *c = data;
|
struct ndb_ingest_controller *c = data;
|
||||||
|
struct ndb_txn txn;
|
||||||
|
|
||||||
hex_decode(hexid, 64, id, sizeof(id));
|
hex_decode(hexid, 64, id, sizeof(id));
|
||||||
|
|
||||||
// let's see if we already have it
|
// let's see if we already have it
|
||||||
|
|
||||||
if (!ndb_has_note(c->read_txn, c->lmdb, id))
|
ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn);
|
||||||
|
if (!ndb_has_note(&txn, id))
|
||||||
return NDB_IDRES_CONT;
|
return NDB_IDRES_CONT;
|
||||||
|
|
||||||
return NDB_IDRES_STOP;
|
return NDB_IDRES_STOP;
|
||||||
@ -785,8 +860,12 @@ static int ndb_ingester_process_note(secp256k1_context *ctx,
|
|||||||
size_t note_size,
|
size_t note_size,
|
||||||
struct ndb_writer_msg *out)
|
struct ndb_writer_msg *out)
|
||||||
{
|
{
|
||||||
|
//printf("ndb_ingester_process_note ");
|
||||||
|
//print_hex(note->id, 32);
|
||||||
|
//printf("\n");
|
||||||
|
|
||||||
// Verify! If it's an invalid note we don't need to
|
// Verify! If it's an invalid note we don't need to
|
||||||
// bothter writing it to the database
|
// bother writing it to the database
|
||||||
if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) {
|
if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) {
|
||||||
ndb_debug("signature verification failed\n");
|
ndb_debug("signature verification failed\n");
|
||||||
return 0;
|
return 0;
|
||||||
@ -957,8 +1036,8 @@ int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const cha
|
|||||||
k.mv_size = sizeof(s);
|
k.mv_size = sizeof(s);
|
||||||
|
|
||||||
if ((rc = mdb_cursor_open(txn->mdb_txn,
|
if ((rc = mdb_cursor_open(txn->mdb_txn,
|
||||||
txn->ndb->lmdb.dbs[NDB_DB_PROFILE_SEARCH],
|
txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH],
|
||||||
cursor))) {
|
cursor))) {
|
||||||
printf("search_profile: cursor opened failed: %s\n",
|
printf("search_profile: cursor opened failed: %s\n",
|
||||||
mdb_strerror(rc));
|
mdb_strerror(rc));
|
||||||
return 0;
|
return 0;
|
||||||
@ -1040,7 +1119,7 @@ static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn,
|
static int ndb_write_profile(struct ndb_txn *txn,
|
||||||
struct ndb_writer_profile *profile,
|
struct ndb_writer_profile *profile,
|
||||||
uint64_t note_key)
|
uint64_t note_key)
|
||||||
{
|
{
|
||||||
@ -1071,11 +1150,11 @@ static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn,
|
|||||||
//assert(NdbProfileRecord_verify_as_root(flatbuf, flatbuf_len) == 0);
|
//assert(NdbProfileRecord_verify_as_root(flatbuf, flatbuf_len) == 0);
|
||||||
|
|
||||||
// get dbs
|
// get dbs
|
||||||
profile_db = lmdb->dbs[NDB_DB_PROFILE];
|
profile_db = txn->lmdb->dbs[NDB_DB_PROFILE];
|
||||||
pk_db = lmdb->dbs[NDB_DB_PROFILE_PK];
|
pk_db = txn->lmdb->dbs[NDB_DB_PROFILE_PK];
|
||||||
|
|
||||||
// get new key
|
// get new key
|
||||||
profile_key = ndb_get_last_key(txn, profile_db) + 1;
|
profile_key = ndb_get_last_key(txn->mdb_txn, profile_db) + 1;
|
||||||
|
|
||||||
// write profile to profile store
|
// write profile to profile store
|
||||||
key.mv_data = &profile_key;
|
key.mv_data = &profile_key;
|
||||||
@ -1084,7 +1163,7 @@ static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn,
|
|||||||
val.mv_size = flatbuf_len;
|
val.mv_size = flatbuf_len;
|
||||||
//ndb_debug("profile_len %ld\n", profile->profile_len);
|
//ndb_debug("profile_len %ld\n", profile->profile_len);
|
||||||
|
|
||||||
if ((rc = mdb_put(txn, profile_db, &key, &val, 0))) {
|
if ((rc = mdb_put(txn->mdb_txn, profile_db, &key, &val, 0))) {
|
||||||
ndb_debug("write profile to db failed: %s\n", mdb_strerror(rc));
|
ndb_debug("write profile to db failed: %s\n", mdb_strerror(rc));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -1097,14 +1176,20 @@ static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn,
|
|||||||
val.mv_data = &profile_key;
|
val.mv_data = &profile_key;
|
||||||
val.mv_size = sizeof(profile_key);
|
val.mv_size = sizeof(profile_key);
|
||||||
|
|
||||||
if ((rc = mdb_put(txn, pk_db, &key, &val, 0))) {
|
// write last fetched record
|
||||||
|
if (!ndb_maybe_write_last_profile_fetch(txn, note)) {
|
||||||
|
ndb_debug("failed to write last profile fetched record\n");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((rc = mdb_put(txn->mdb_txn, pk_db, &key, &val, 0))) {
|
||||||
ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n",
|
ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n",
|
||||||
profile_key, mdb_strerror(rc));
|
profile_key, mdb_strerror(rc));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// write name, display_name profile search indices
|
// write name, display_name profile search indices
|
||||||
if (!ndb_write_profile_search_indices(lmdb, txn, note, profile_key,
|
if (!ndb_write_profile_search_indices(txn, note, profile_key,
|
||||||
flatbuf)) {
|
flatbuf)) {
|
||||||
ndb_debug("failed to write profile search indices\n");
|
ndb_debug("failed to write profile search indices\n");
|
||||||
return 0;
|
return 0;
|
||||||
@ -1113,7 +1198,127 @@ static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn,
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint64_t ndb_write_note(struct ndb_lmdb *lmdb, MDB_txn *txn,
|
// find the last id tag in a note (e, p, etc)
|
||||||
|
static unsigned char *ndb_note_last_id_tag(struct ndb_note *note, char type)
|
||||||
|
{
|
||||||
|
unsigned char *last = NULL;
|
||||||
|
struct ndb_iterator iter;
|
||||||
|
struct ndb_str str;
|
||||||
|
|
||||||
|
// get the liked event id (last id)
|
||||||
|
ndb_tags_iterate_start(note, &iter);
|
||||||
|
|
||||||
|
while (ndb_tags_iterate_next(&iter)) {
|
||||||
|
if (iter.tag->count < 2)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
str = ndb_note_str(note, &iter.tag->strs[0]);
|
||||||
|
|
||||||
|
// assign liked to the last e tag
|
||||||
|
if (str.flag == NDB_PACKED_STR && str.str[0] == type) {
|
||||||
|
str = ndb_note_str(note, &iter.tag->strs[1]);
|
||||||
|
if (str.flag == NDB_PACKED_ID)
|
||||||
|
last = str.id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return last;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len)
|
||||||
|
{
|
||||||
|
MDB_val k, v;
|
||||||
|
|
||||||
|
k.mv_data = (unsigned char*)id;
|
||||||
|
k.mv_size = 32;
|
||||||
|
|
||||||
|
if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &k, &v)) {
|
||||||
|
ndb_debug("ndb_get_note_meta: mdb_get note failed\n");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (len)
|
||||||
|
*len = v.mv_size;
|
||||||
|
|
||||||
|
return v.mv_data;
|
||||||
|
}
|
||||||
|
|
||||||
|
// When receiving a reaction note, look for the liked id and increase the
|
||||||
|
// reaction counter in the note metadata database
|
||||||
|
//
|
||||||
|
// TODO: I found some bugs when implementing this feature. If the same note id
|
||||||
|
// is processed multiple times in the same ingestion block, then it will count
|
||||||
|
// the like twice. This is because it hasn't been written to the DB yet and the
|
||||||
|
// ingestor doesn't know about notes that are being processed at the same time.
|
||||||
|
// One fix for this is to maintain a hashtable in the ingestor and make sure
|
||||||
|
// the same note is not processed twice.
|
||||||
|
//
|
||||||
|
// I'm not sure how common this would be, so I'm not going to worry about it
|
||||||
|
// for now, but it's something to keep in mind.
|
||||||
|
static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note)
|
||||||
|
{
|
||||||
|
size_t len;
|
||||||
|
void *root;
|
||||||
|
int reactions, rc;
|
||||||
|
MDB_val key, val;
|
||||||
|
NdbEventMeta_table_t meta;
|
||||||
|
unsigned char *liked = ndb_note_last_id_tag(note, 'e');
|
||||||
|
|
||||||
|
if (liked == NULL)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
root = ndb_get_note_meta(txn, liked, &len);
|
||||||
|
|
||||||
|
flatcc_builder_t builder;
|
||||||
|
flatcc_builder_init(&builder);
|
||||||
|
NdbEventMeta_start_as_root(&builder);
|
||||||
|
|
||||||
|
// no meta record, let's make one
|
||||||
|
if (root == NULL) {
|
||||||
|
NdbEventMeta_reactions_add(&builder, 1);
|
||||||
|
} else {
|
||||||
|
// clone existing and add to it
|
||||||
|
meta = NdbEventMeta_as_root(root);
|
||||||
|
|
||||||
|
reactions = NdbEventMeta_reactions_get(meta);
|
||||||
|
NdbEventMeta_clone(&builder, meta);
|
||||||
|
NdbEventMeta_reactions_add(&builder, reactions + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
NdbProfileRecord_end_as_root(&builder);
|
||||||
|
root = flatcc_builder_finalize_aligned_buffer(&builder, &len);
|
||||||
|
assert(((uint64_t)root % 8) == 0);
|
||||||
|
|
||||||
|
if (root == NULL) {
|
||||||
|
ndb_debug("failed to create note metadata record\n");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// metadata is keyed on id because we want to collect stats regardless
|
||||||
|
// if we have the note yet or not
|
||||||
|
key.mv_data = liked;
|
||||||
|
key.mv_size = 32;
|
||||||
|
|
||||||
|
val.mv_data = root;
|
||||||
|
val.mv_size = len;
|
||||||
|
|
||||||
|
// write the new meta record
|
||||||
|
//ndb_debug("writing stats record for ");
|
||||||
|
//print_hex(liked, 32);
|
||||||
|
//ndb_debug("\n");
|
||||||
|
|
||||||
|
if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &key, &val, 0))) {
|
||||||
|
ndb_debug("write reaction stats to db failed: %s\n", mdb_strerror(rc));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
free(root);
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static uint64_t ndb_write_note(struct ndb_txn *txn,
|
||||||
struct ndb_writer_note *note)
|
struct ndb_writer_note *note)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
@ -1123,11 +1328,11 @@ static uint64_t ndb_write_note(struct ndb_lmdb *lmdb, MDB_txn *txn,
|
|||||||
MDB_val key, val;
|
MDB_val key, val;
|
||||||
|
|
||||||
// get dbs
|
// get dbs
|
||||||
note_db = lmdb->dbs[NDB_DB_NOTE];
|
note_db = txn->lmdb->dbs[NDB_DB_NOTE];
|
||||||
id_db = lmdb->dbs[NDB_DB_NOTE_ID];
|
id_db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
|
||||||
|
|
||||||
// get new key
|
// get new key
|
||||||
note_key = ndb_get_last_key(txn, note_db) + 1;
|
note_key = ndb_get_last_key(txn->mdb_txn, note_db) + 1;
|
||||||
|
|
||||||
// write note to event store
|
// write note to event store
|
||||||
key.mv_data = ¬e_key;
|
key.mv_data = ¬e_key;
|
||||||
@ -1135,7 +1340,7 @@ static uint64_t ndb_write_note(struct ndb_lmdb *lmdb, MDB_txn *txn,
|
|||||||
val.mv_data = note->note;
|
val.mv_data = note->note;
|
||||||
val.mv_size = note->note_len;
|
val.mv_size = note->note_len;
|
||||||
|
|
||||||
if ((rc = mdb_put(txn, note_db, &key, &val, 0))) {
|
if ((rc = mdb_put(txn->mdb_txn, note_db, &key, &val, 0))) {
|
||||||
ndb_debug("write note to db failed: %s\n", mdb_strerror(rc));
|
ndb_debug("write note to db failed: %s\n", mdb_strerror(rc));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -1148,17 +1353,21 @@ static uint64_t ndb_write_note(struct ndb_lmdb *lmdb, MDB_txn *txn,
|
|||||||
val.mv_data = ¬e_key;
|
val.mv_data = ¬e_key;
|
||||||
val.mv_size = sizeof(note_key);
|
val.mv_size = sizeof(note_key);
|
||||||
|
|
||||||
if ((rc = mdb_put(txn, id_db, &key, &val, 0))) {
|
if ((rc = mdb_put(txn->mdb_txn, id_db, &key, &val, 0))) {
|
||||||
ndb_debug("write note id index to db failed: %s\n",
|
ndb_debug("write note id index to db failed: %s\n",
|
||||||
mdb_strerror(rc));
|
mdb_strerror(rc));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (note->note->kind == 7) {
|
||||||
|
ndb_write_reaction_stats(txn, note->note);
|
||||||
|
}
|
||||||
|
|
||||||
return note_key;
|
return note_key;
|
||||||
}
|
}
|
||||||
|
|
||||||
// only to be called from the writer thread
|
// only to be called from the writer thread
|
||||||
static void ndb_write_version(struct ndb_lmdb *lmdb, MDB_txn *txn, uint64_t version)
|
static void ndb_write_version(struct ndb_txn *txn, uint64_t version)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
MDB_val key, val;
|
MDB_val key, val;
|
||||||
@ -1171,7 +1380,7 @@ static void ndb_write_version(struct ndb_lmdb *lmdb, MDB_txn *txn, uint64_t vers
|
|||||||
val.mv_data = &version;
|
val.mv_data = &version;
|
||||||
val.mv_size = sizeof(version);
|
val.mv_size = sizeof(version);
|
||||||
|
|
||||||
if ((rc = mdb_put(txn, lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) {
|
if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) {
|
||||||
ndb_debug("write version to ndb_meta failed: %s\n",
|
ndb_debug("write version to ndb_meta failed: %s\n",
|
||||||
mdb_strerror(rc));
|
mdb_strerror(rc));
|
||||||
return;
|
return;
|
||||||
@ -1186,11 +1395,13 @@ static void *ndb_writer_thread(void *data)
|
|||||||
struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
|
struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
|
||||||
int i, popped, done, any_note;
|
int i, popped, done, any_note;
|
||||||
uint64_t note_nkey;
|
uint64_t note_nkey;
|
||||||
MDB_txn *txn;
|
MDB_txn *mdb_txn = NULL;
|
||||||
|
struct ndb_txn txn;
|
||||||
|
ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn);
|
||||||
|
|
||||||
done = 0;
|
done = 0;
|
||||||
while (!done) {
|
while (!done) {
|
||||||
txn = NULL;
|
txn.mdb_txn = NULL;
|
||||||
popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
|
popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
|
||||||
//ndb_debug("writer popped %d items\n", popped);
|
//ndb_debug("writer popped %d items\n", popped);
|
||||||
|
|
||||||
@ -1206,7 +1417,7 @@ static void *ndb_writer_thread(void *data)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (any_note && mdb_txn_begin(writer->lmdb->env, NULL, 0, &txn))
|
if (any_note && mdb_txn_begin(txn.lmdb->env, NULL, 0, (MDB_txn **)&txn.mdb_txn))
|
||||||
{
|
{
|
||||||
fprintf(stderr, "writer thread txn_begin failed");
|
fprintf(stderr, "writer thread txn_begin failed");
|
||||||
// should definitely not happen unless DB is full
|
// should definitely not happen unless DB is full
|
||||||
@ -1224,33 +1435,37 @@ static void *ndb_writer_thread(void *data)
|
|||||||
continue;
|
continue;
|
||||||
case NDB_WRITER_PROFILE:
|
case NDB_WRITER_PROFILE:
|
||||||
note_nkey =
|
note_nkey =
|
||||||
ndb_write_note(writer->lmdb, txn, &msg->note);
|
ndb_write_note(&txn, &msg->note);
|
||||||
if (msg->profile.record.builder) {
|
if (msg->profile.record.builder) {
|
||||||
// only write if parsing didn't fail
|
// only write if parsing didn't fail
|
||||||
ndb_write_profile(writer->lmdb, txn,
|
ndb_write_profile(&txn, &msg->profile,
|
||||||
&msg->profile,
|
|
||||||
note_nkey);
|
note_nkey);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case NDB_WRITER_NOTE:
|
case NDB_WRITER_NOTE:
|
||||||
ndb_write_note(writer->lmdb, txn, &msg->note);
|
ndb_write_note(&txn, &msg->note);
|
||||||
|
//printf("wrote note ");
|
||||||
|
//print_hex(msg->note.note->id, 32);
|
||||||
|
//printf("\n");
|
||||||
break;
|
break;
|
||||||
case NDB_WRITER_DBMETA:
|
case NDB_WRITER_DBMETA:
|
||||||
ndb_write_version(writer->lmdb, txn, msg->ndb_meta.version);
|
ndb_write_version(&txn, msg->ndb_meta.version);
|
||||||
break;
|
break;
|
||||||
case NDB_WRITER_PROFILE_LAST_FETCH:
|
case NDB_WRITER_PROFILE_LAST_FETCH:
|
||||||
ndb_writer_last_profile_fetch(writer->lmdb, txn, &msg->last_fetch);
|
ndb_writer_last_profile_fetch(&txn,
|
||||||
|
msg->last_fetch.pubkey,
|
||||||
|
msg->last_fetch.fetched_at
|
||||||
|
);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// commit writes
|
// commit writes
|
||||||
if (any_note && mdb_txn_commit(txn)) {
|
if (any_note && !ndb_end_query(&txn)) {
|
||||||
fprintf(stderr, "writer thread txn commit failed");
|
fprintf(stderr, "writer thread txn commit failed");
|
||||||
assert(false);
|
assert(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// free notes
|
// free notes
|
||||||
for (i = 0; i < popped; i++) {
|
for (i = 0; i < popped; i++) {
|
||||||
msg = &msgs[i];
|
msg = &msgs[i];
|
||||||
@ -1459,7 +1674,7 @@ static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t map
|
|||||||
}
|
}
|
||||||
|
|
||||||
// note metadata db
|
// note metadata db
|
||||||
if ((rc = mdb_dbi_open(txn, "meta", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_META]))) {
|
if ((rc = mdb_dbi_open(txn, "meta", MDB_CREATE, &lmdb->dbs[NDB_DB_META]))) {
|
||||||
fprintf(stderr, "mdb_dbi_open meta failed, error %d\n", rc);
|
fprintf(stderr, "mdb_dbi_open meta failed, error %d\n", rc);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -40,7 +40,7 @@ struct ndb_search {
|
|||||||
|
|
||||||
// required to keep a read
|
// required to keep a read
|
||||||
struct ndb_txn {
|
struct ndb_txn {
|
||||||
struct ndb *ndb;
|
struct ndb_lmdb *lmdb;
|
||||||
void *mdb_txn;
|
void *mdb_txn;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -196,15 +196,16 @@ int ndb_begin_query(struct ndb *, struct ndb_txn *);
|
|||||||
int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query);
|
int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query);
|
||||||
int ndb_search_profile_next(struct ndb_search *search);
|
int ndb_search_profile_next(struct ndb_search *search);
|
||||||
void ndb_search_profile_end(struct ndb_search *search);
|
void ndb_search_profile_end(struct ndb_search *search);
|
||||||
void ndb_end_query(struct ndb_txn *);
|
int ndb_end_query(struct ndb_txn *);
|
||||||
int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey, uint64_t fetched_at);
|
int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey, uint64_t fetched_at);
|
||||||
uint64_t ndb_read_last_profile_fetch(struct ndb_txn *txn, uint64_t profile_key);
|
uint64_t ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey);
|
||||||
void *ndb_get_profile_by_pubkey(struct ndb_txn *txn, const unsigned char *pubkey, size_t *len, uint64_t *primkey);
|
void *ndb_get_profile_by_pubkey(struct ndb_txn *txn, const unsigned char *pubkey, size_t *len, uint64_t *primkey);
|
||||||
void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len);
|
void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len);
|
||||||
uint64_t ndb_get_notekey_by_id(struct ndb_txn *txn, const unsigned char *id);
|
uint64_t ndb_get_notekey_by_id(struct ndb_txn *txn, const unsigned char *id);
|
||||||
uint64_t ndb_get_profilekey_by_pubkey(struct ndb_txn *txn, const unsigned char *id);
|
uint64_t ndb_get_profilekey_by_pubkey(struct ndb_txn *txn, const unsigned char *id);
|
||||||
struct ndb_note *ndb_get_note_by_id(struct ndb_txn *txn, const unsigned char *id, size_t *len, uint64_t *primkey);
|
struct ndb_note *ndb_get_note_by_id(struct ndb_txn *txn, const unsigned char *id, size_t *len, uint64_t *primkey);
|
||||||
struct ndb_note *ndb_get_note_by_key(struct ndb_txn *txn, uint64_t key, size_t *len);
|
struct ndb_note *ndb_get_note_by_key(struct ndb_txn *txn, uint64_t key, size_t *len);
|
||||||
|
void *ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len);
|
||||||
void ndb_destroy(struct ndb *);
|
void ndb_destroy(struct ndb *);
|
||||||
|
|
||||||
// BUILDER
|
// BUILDER
|
||||||
|
Loading…
Reference in New Issue
Block a user