mirror of
git://jb55.com/damus
synced 2024-10-04 19:00:42 +00:00
ndb: update nostrdb, fix alignment issues
This commit is contained in:
parent
2f60888fb1
commit
882f6e2534
@ -15,6 +15,8 @@
|
|||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
|
||||||
#include "bindings/c/profile_json_parser.h"
|
#include "bindings/c/profile_json_parser.h"
|
||||||
|
#include "bindings/c/profile_builder.h"
|
||||||
|
#include "bindings/c/profile_verifier.h"
|
||||||
#include "secp256k1.h"
|
#include "secp256k1.h"
|
||||||
#include "secp256k1_ecdh.h"
|
#include "secp256k1_ecdh.h"
|
||||||
#include "secp256k1_schnorrsig.h"
|
#include "secp256k1_schnorrsig.h"
|
||||||
@ -35,6 +37,10 @@ static const int DEFAULT_QUEUE_SIZE = 1000000;
|
|||||||
#define NDB_PARSED_TAGS (1 << 6)
|
#define NDB_PARSED_TAGS (1 << 6)
|
||||||
#define NDB_PARSED_ALL (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS)
|
#define NDB_PARSED_ALL (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS)
|
||||||
|
|
||||||
|
struct ndb_profile_record_builder {
|
||||||
|
flatcc_builder_t *builder;
|
||||||
|
void *flatbuf;
|
||||||
|
};
|
||||||
|
|
||||||
// controls whether to continue or stop the json parser
|
// controls whether to continue or stop the json parser
|
||||||
enum ndb_idres {
|
enum ndb_idres {
|
||||||
@ -183,8 +189,7 @@ struct ndb_writer_note {
|
|||||||
|
|
||||||
struct ndb_writer_profile {
|
struct ndb_writer_profile {
|
||||||
struct ndb_writer_note note;
|
struct ndb_writer_note note;
|
||||||
void *profile_flatbuf;
|
struct ndb_profile_record_builder record;
|
||||||
size_t profile_len;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ndb_ingester_msg {
|
struct ndb_ingester_msg {
|
||||||
@ -275,6 +280,32 @@ cleanup:
|
|||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void *ndb_lookup_by_key(struct ndb *ndb, uint64_t key,
|
||||||
|
enum ndb_dbs store, size_t *len)
|
||||||
|
{
|
||||||
|
MDB_val k, v;
|
||||||
|
MDB_txn *txn;
|
||||||
|
|
||||||
|
k.mv_data = &key;
|
||||||
|
k.mv_size = sizeof(key);
|
||||||
|
|
||||||
|
if (mdb_txn_begin(ndb->lmdb.env, 0, 0, &txn)) {
|
||||||
|
ndb_debug("ndb_get_note_by_id: mdb_txn_begin failed\n");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mdb_get(txn, ndb->lmdb.dbs[store], &k, &v)) {
|
||||||
|
ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n");
|
||||||
|
mdb_txn_abort(txn);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (len)
|
||||||
|
*len = v.mv_size;
|
||||||
|
|
||||||
|
return v.mv_data;
|
||||||
|
}
|
||||||
|
|
||||||
static void *ndb_lookup_tsid(struct ndb *ndb, enum ndb_dbs ind,
|
static void *ndb_lookup_tsid(struct ndb *ndb, enum ndb_dbs ind,
|
||||||
enum ndb_dbs store, const unsigned char *pk,
|
enum ndb_dbs store, const unsigned char *pk,
|
||||||
size_t *len)
|
size_t *len)
|
||||||
@ -291,7 +322,7 @@ static void *ndb_lookup_tsid(struct ndb *ndb, enum ndb_dbs ind,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!ndb_get_tsid(txn, &ndb->lmdb, ind, pk, &k)) {
|
if (!ndb_get_tsid(txn, &ndb->lmdb, ind, pk, &k)) {
|
||||||
ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n");
|
//ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n");
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -301,6 +332,7 @@ static void *ndb_lookup_tsid(struct ndb *ndb, enum ndb_dbs ind,
|
|||||||
}
|
}
|
||||||
|
|
||||||
res = v.mv_data;
|
res = v.mv_data;
|
||||||
|
assert(((uint64_t)res % 4) == 0);
|
||||||
if (len)
|
if (len)
|
||||||
*len = v.mv_size;
|
*len = v.mv_size;
|
||||||
cleanup:
|
cleanup:
|
||||||
@ -318,6 +350,11 @@ struct ndb_note *ndb_get_note_by_id(struct ndb *ndb, const unsigned char *id, si
|
|||||||
return ndb_lookup_tsid(ndb, NDB_DB_NOTE_ID, NDB_DB_NOTE, id, len);
|
return ndb_lookup_tsid(ndb, NDB_DB_NOTE_ID, NDB_DB_NOTE, id, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ndb_note *ndb_get_note_by_key(struct ndb *ndb, uint64_t key, size_t *len)
|
||||||
|
{
|
||||||
|
return ndb_lookup_by_key(ndb, key, NDB_DB_NOTE, len);
|
||||||
|
}
|
||||||
|
|
||||||
static int ndb_has_note(MDB_txn *txn, struct ndb_lmdb *lmdb, const unsigned char *id)
|
static int ndb_has_note(MDB_txn *txn, struct ndb_lmdb *lmdb, const unsigned char *id)
|
||||||
{
|
{
|
||||||
MDB_val val;
|
MDB_val val;
|
||||||
@ -343,30 +380,83 @@ static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid
|
|||||||
return NDB_IDRES_STOP;
|
return NDB_IDRES_STOP;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int ndbprofile_parse_json(flatcc_builder_t *B,
|
||||||
|
const char *buf, size_t bufsiz, int flags, NdbProfile_ref_t *profile)
|
||||||
|
{
|
||||||
|
flatcc_json_parser_t parser, *ctx = &parser;
|
||||||
|
flatcc_json_parser_init(ctx, B, buf, buf + bufsiz, flags);
|
||||||
|
|
||||||
static int ndb_process_profile_note(struct ndb_note *note, void **profile,
|
if (flatcc_builder_start_buffer(B, 0, 0, 0))
|
||||||
size_t *profile_len)
|
return 0;
|
||||||
|
|
||||||
|
NdbProfile_parse_json_table(ctx, buf, buf + bufsiz, profile);
|
||||||
|
if (ctx->error)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
if (!flatcc_builder_end_buffer(B, *profile))
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
ctx->end_loc = buf;
|
||||||
|
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ndb_profile_record_builder_init(struct ndb_profile_record_builder *b)
|
||||||
|
{
|
||||||
|
b->builder = malloc(sizeof(*b->builder));
|
||||||
|
b->flatbuf = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ndb_profile_record_builder_free(struct ndb_profile_record_builder *b)
|
||||||
|
{
|
||||||
|
if (b->builder)
|
||||||
|
free(b->builder);
|
||||||
|
if (b->flatbuf)
|
||||||
|
free(b->flatbuf);
|
||||||
|
|
||||||
|
b->builder = NULL;
|
||||||
|
b->flatbuf = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int ndb_process_profile_note(struct ndb_note *note,
|
||||||
|
struct ndb_profile_record_builder *profile)
|
||||||
{
|
{
|
||||||
int res;
|
int res;
|
||||||
|
|
||||||
flatcc_builder_t builder;
|
NdbProfile_ref_t profile_table;
|
||||||
flatcc_json_parser_t json_parser;
|
flatcc_builder_t *builder;
|
||||||
|
|
||||||
flatcc_builder_init(&builder);
|
ndb_profile_record_builder_init(profile);
|
||||||
|
builder = profile->builder;
|
||||||
|
flatcc_builder_init(builder);
|
||||||
|
|
||||||
|
NdbProfileRecord_start_as_root(builder);
|
||||||
|
|
||||||
//printf("parsing profile '%.*s'\n", note->content_length, ndb_note_content(note));
|
//printf("parsing profile '%.*s'\n", note->content_length, ndb_note_content(note));
|
||||||
res = profile_parse_json(&builder, &json_parser,
|
if (!(res = ndbprofile_parse_json(builder, ndb_note_content(note),
|
||||||
ndb_note_content(note),
|
note->content_length,
|
||||||
note->content_length,
|
flatcc_json_parser_f_skip_unknown,
|
||||||
flatcc_json_parser_f_skip_unknown);
|
&profile_table)))
|
||||||
|
{
|
||||||
if (res != 0) {
|
|
||||||
ndb_debug("profile_parse_json failed %d '%.*s'\n", res,
|
ndb_debug("profile_parse_json failed %d '%.*s'\n", res,
|
||||||
note->content_length, ndb_note_content(note));
|
note->content_length, ndb_note_content(note));
|
||||||
|
ndb_profile_record_builder_free(profile);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
*profile = flatcc_builder_finalize_aligned_buffer(&builder, profile_len);
|
uint64_t received_at = time(NULL);
|
||||||
|
const char *lnurl = "fixme";
|
||||||
|
|
||||||
|
NdbProfileRecord_profile_add(builder, profile_table);
|
||||||
|
NdbProfileRecord_received_at_add(builder, received_at);
|
||||||
|
|
||||||
|
flatcc_builder_ref_t lnurl_off;
|
||||||
|
lnurl_off = flatcc_builder_create_string_str(builder, lnurl);
|
||||||
|
|
||||||
|
NdbProfileRecord_lnurl_add(builder, lnurl_off);
|
||||||
|
|
||||||
|
//*profile = flatcc_builder_finalize_aligned_buffer(builder, profile_len);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -382,8 +472,8 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
|
|||||||
struct ndb_note *note;
|
struct ndb_note *note;
|
||||||
struct ndb_ingest_controller controller;
|
struct ndb_ingest_controller controller;
|
||||||
struct ndb_id_cb cb;
|
struct ndb_id_cb cb;
|
||||||
void *buf, *flatbuf;
|
void *buf;
|
||||||
size_t bufsize, note_size, profile_len;
|
size_t bufsize, note_size;
|
||||||
|
|
||||||
// we will use this to check if we already have it in the DB during
|
// we will use this to check if we already have it in the DB during
|
||||||
// ID parsing
|
// ID parsing
|
||||||
@ -436,14 +526,17 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
|
|||||||
// we didn't find anything. let's send it
|
// we didn't find anything. let's send it
|
||||||
// to the writer thread
|
// to the writer thread
|
||||||
note = realloc(note, note_size);
|
note = realloc(note, note_size);
|
||||||
|
assert(((uint64_t)note % 4) == 0);
|
||||||
|
|
||||||
|
if (note->kind == 0) {
|
||||||
|
struct ndb_profile_record_builder *b =
|
||||||
|
&out->profile.record;
|
||||||
|
|
||||||
|
ndb_process_profile_note(note, b);
|
||||||
|
|
||||||
if (note->kind == 0 &&
|
|
||||||
ndb_process_profile_note(note, &flatbuf, &profile_len)) {
|
|
||||||
out->type = NDB_WRITER_PROFILE;
|
out->type = NDB_WRITER_PROFILE;
|
||||||
out->profile.note.note = note;
|
out->profile.note.note = note;
|
||||||
out->profile.note.note_len = note_size;
|
out->profile.note.note_len = note_size;
|
||||||
out->profile.profile_flatbuf = flatbuf;
|
|
||||||
out->profile.profile_len = profile_len;
|
|
||||||
} else {
|
} else {
|
||||||
out->type = NDB_WRITER_NOTE;
|
out->type = NDB_WRITER_NOTE;
|
||||||
out->note.note = note;
|
out->note.note = note;
|
||||||
@ -482,11 +575,14 @@ static uint64_t ndb_get_last_key(MDB_txn *txn, MDB_dbi db)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn,
|
static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn,
|
||||||
struct ndb_writer_profile *profile)
|
struct ndb_writer_profile *profile,
|
||||||
|
uint64_t note_key)
|
||||||
{
|
{
|
||||||
uint64_t profile_key;
|
uint64_t profile_key;
|
||||||
struct ndb_tsid tsid;
|
struct ndb_tsid tsid;
|
||||||
struct ndb_note *note;
|
struct ndb_note *note;
|
||||||
|
void *flatbuf;
|
||||||
|
size_t flatbuf_len;
|
||||||
int rc;
|
int rc;
|
||||||
|
|
||||||
MDB_val key, val;
|
MDB_val key, val;
|
||||||
@ -494,6 +590,20 @@ static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn,
|
|||||||
|
|
||||||
note = profile->note.note;
|
note = profile->note.note;
|
||||||
|
|
||||||
|
// add note_key to profile record
|
||||||
|
NdbProfileRecord_note_key_add(profile->record.builder, note_key);
|
||||||
|
NdbProfileRecord_end_as_root(profile->record.builder);
|
||||||
|
|
||||||
|
flatbuf = profile->record.flatbuf =
|
||||||
|
flatcc_builder_finalize_aligned_buffer(profile->record.builder, &flatbuf_len);
|
||||||
|
|
||||||
|
assert(((uint64_t)flatbuf % 8) == 0);
|
||||||
|
|
||||||
|
// TODO: this may not be safe!?
|
||||||
|
flatbuf_len = (flatbuf_len + 7) & ~7;
|
||||||
|
|
||||||
|
//assert(NdbProfileRecord_verify_as_root(flatbuf, flatbuf_len) == 0);
|
||||||
|
|
||||||
// get dbs
|
// get dbs
|
||||||
profile_db = lmdb->dbs[NDB_DB_PROFILE];
|
profile_db = lmdb->dbs[NDB_DB_PROFILE];
|
||||||
pk_db = lmdb->dbs[NDB_DB_PROFILE_PK];
|
pk_db = lmdb->dbs[NDB_DB_PROFILE_PK];
|
||||||
@ -504,8 +614,8 @@ static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn,
|
|||||||
// write profile to profile store
|
// write profile to profile store
|
||||||
key.mv_data = &profile_key;
|
key.mv_data = &profile_key;
|
||||||
key.mv_size = sizeof(profile_key);
|
key.mv_size = sizeof(profile_key);
|
||||||
val.mv_data = profile->profile_flatbuf + 4;
|
val.mv_data = flatbuf;
|
||||||
val.mv_size = profile->profile_len - 4;
|
val.mv_size = flatbuf_len;
|
||||||
//ndb_debug("profile_len %ld\n", profile->profile_len);
|
//ndb_debug("profile_len %ld\n", profile->profile_len);
|
||||||
|
|
||||||
if ((rc = mdb_put(txn, profile_db, &key, &val, 0))) {
|
if ((rc = mdb_put(txn, profile_db, &key, &val, 0))) {
|
||||||
@ -579,13 +689,14 @@ static void *ndb_writer_thread(void *data)
|
|||||||
struct ndb_writer *writer = data;
|
struct ndb_writer *writer = data;
|
||||||
struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
|
struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
|
||||||
int i, popped, done, any_note;
|
int i, popped, done, any_note;
|
||||||
|
uint64_t note_nkey;
|
||||||
MDB_txn *txn;
|
MDB_txn *txn;
|
||||||
|
|
||||||
done = 0;
|
done = 0;
|
||||||
while (!done) {
|
while (!done) {
|
||||||
txn = NULL;
|
txn = NULL;
|
||||||
popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
|
popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
|
||||||
ndb_debug("writer popped %d items\n", popped);
|
//ndb_debug("writer popped %d items\n", popped);
|
||||||
|
|
||||||
any_note = 0;
|
any_note = 0;
|
||||||
for (i = 0 ; i < popped; i++) {
|
for (i = 0 ; i < popped; i++) {
|
||||||
@ -614,9 +725,14 @@ static void *ndb_writer_thread(void *data)
|
|||||||
done = 1;
|
done = 1;
|
||||||
continue;
|
continue;
|
||||||
case NDB_WRITER_PROFILE:
|
case NDB_WRITER_PROFILE:
|
||||||
ndb_write_note(writer->lmdb, txn, &msg->note);
|
note_nkey =
|
||||||
// TODO: save note_key with profile
|
ndb_write_note(writer->lmdb, txn, &msg->note);
|
||||||
ndb_write_profile(writer->lmdb, txn, &msg->profile);
|
if (msg->profile.record.builder) {
|
||||||
|
// only write if parsing didn't fail
|
||||||
|
ndb_write_profile(writer->lmdb, txn,
|
||||||
|
&msg->profile,
|
||||||
|
note_nkey);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case NDB_WRITER_NOTE:
|
case NDB_WRITER_NOTE:
|
||||||
ndb_write_note(writer->lmdb, txn, &msg->note);
|
ndb_write_note(writer->lmdb, txn, &msg->note);
|
||||||
@ -637,8 +753,8 @@ static void *ndb_writer_thread(void *data)
|
|||||||
if (msg->type == NDB_WRITER_NOTE)
|
if (msg->type == NDB_WRITER_NOTE)
|
||||||
free(msg->note.note);
|
free(msg->note.note);
|
||||||
else if (msg->type == NDB_WRITER_PROFILE) {
|
else if (msg->type == NDB_WRITER_PROFILE) {
|
||||||
free(msg->profile.profile_flatbuf);
|
|
||||||
free(msg->profile.note.note);
|
free(msg->profile.note.note);
|
||||||
|
ndb_profile_record_builder_free(&msg->profile.record);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -667,7 +783,7 @@ static void *ndb_ingester_thread(void *data)
|
|||||||
any_event = 0;
|
any_event = 0;
|
||||||
|
|
||||||
popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH);
|
popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH);
|
||||||
ndb_debug("ingester popped %d items\n", popped);
|
//ndb_debug("ingester popped %d items\n", popped);
|
||||||
|
|
||||||
for (i = 0; i < popped; i++) {
|
for (i = 0; i < popped; i++) {
|
||||||
msg = &msgs[i];
|
msg = &msgs[i];
|
||||||
@ -1336,6 +1452,9 @@ int ndb_builder_finalize(struct ndb_builder *builder, struct ndb_note **note,
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// make sure we're aligned as a whole
|
||||||
|
total_size = (total_size + 7) & ~7;
|
||||||
|
assert((total_size % 8) == 0);
|
||||||
return total_size;
|
return total_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,7 +120,6 @@ struct ndb_note {
|
|||||||
uint32_t content_length;
|
uint32_t content_length;
|
||||||
union ndb_packed_str content;
|
union ndb_packed_str content;
|
||||||
uint32_t strings;
|
uint32_t strings;
|
||||||
uint32_t reserved[4]; // expansion slots
|
|
||||||
// nothing can come after tags since it contains variadic data
|
// nothing can come after tags since it contains variadic data
|
||||||
struct ndb_tags tags;
|
struct ndb_tags tags;
|
||||||
};
|
};
|
||||||
@ -157,6 +156,7 @@ int ndb_process_event(struct ndb *, const char *json, int len);
|
|||||||
int ndb_process_events(struct ndb *, const char *ldjson, size_t len);
|
int ndb_process_events(struct ndb *, const char *ldjson, size_t len);
|
||||||
void *ndb_get_profile_by_pubkey(struct ndb *, const unsigned char *pubkey, size_t *len);
|
void *ndb_get_profile_by_pubkey(struct ndb *, const unsigned char *pubkey, size_t *len);
|
||||||
struct ndb_note *ndb_get_note_by_id(struct ndb *, const unsigned char *id, size_t *len);
|
struct ndb_note *ndb_get_note_by_id(struct ndb *, const unsigned char *id, size_t *len);
|
||||||
|
struct ndb_note *ndb_get_note_by_key(struct ndb *, uint64_t key, size_t *len);
|
||||||
void ndb_destroy(struct ndb *);
|
void ndb_destroy(struct ndb *);
|
||||||
|
|
||||||
// BUILDER
|
// BUILDER
|
||||||
|
Loading…
Reference in New Issue
Block a user