diff --git a/nostrdb/nostrdb.c b/nostrdb/nostrdb.c index d973b67a..72650be9 100644 --- a/nostrdb/nostrdb.c +++ b/nostrdb/nostrdb.c @@ -7,6 +7,7 @@ #include "sha256.h" #include "lmdb.h" #include "util.h" +#include "cpu.h" #include "threadpool.h" #include "protected_queue.h" #include "memchr.h" @@ -115,6 +116,8 @@ struct ndb_ingester { uint32_t flags; struct threadpool tp; struct ndb_writer *writer; + void *filter_context; + ndb_ingest_filter_fn filter; }; @@ -1577,16 +1580,23 @@ static int ndb_ingester_process_note(secp256k1_context *ctx, struct ndb_note *note, size_t note_size, struct ndb_writer_msg *out, - uint32_t flags) + struct ndb_ingester *ingester) { - //printf("ndb_ingester_process_note "); - //print_hex(note->id, 32); - //printf("\n"); + enum ndb_ingest_filter_action action; + action = NDB_INGEST_ACCEPT; + + if (ingester->filter) + action = ingester->filter(ingester->filter_context, note); + + if (action == NDB_INGEST_REJECT) + return 0; // some special situations we might want to skip sig validation, // like during large imports - if (!(flags & NDB_FLAG_SKIP_NOTE_VERIFY)) { - // Verify! If it's an invalid note we don't need to + if (action == NDB_INGEST_SKIP_VALIDATION || (ingester->flags & NDB_FLAG_SKIP_NOTE_VERIFY)) { + // if we're skipping validation we don't need to verify + } else { + // verify! If it's an invalid note we don't need to // bother writing it to the database if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) { ndb_debug("signature verification failed\n"); @@ -1678,7 +1688,7 @@ static int ndb_ingester_process_event(secp256k1_context *ctx, } if (!ndb_ingester_process_note(ctx, note, note_size, - out, ingester->flags)) { + out, ingester)) { goto cleanup; } else { // we're done with the original json, free it @@ -1699,7 +1709,7 @@ static int ndb_ingester_process_event(secp256k1_context *ctx, } if (!ndb_ingester_process_note(ctx, note, note_size, - out, ingester->flags)) { + out, ingester)) { goto cleanup; } else { // we're done with the original json, free it @@ -3000,8 +3010,8 @@ static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb) // initialize the ingester queue and then spawn the thread static int ndb_ingester_init(struct ndb_ingester *ingester, - struct ndb_writer *writer, int num_threads, - int flags) + struct ndb_writer *writer, + struct ndb_config *config) { int elem_size, num_elems; static struct ndb_ingester_msg quit_msg = { .type = NDB_INGEST_QUIT }; @@ -3011,10 +3021,13 @@ static int ndb_ingester_init(struct ndb_ingester *ingester, num_elems = DEFAULT_QUEUE_SIZE; ingester->writer = writer; - ingester->flags = flags; + ingester->flags = config->flags; + ingester->filter = config->ingest_filter; + ingester->filter_context = config->filter_context; - if (!threadpool_init(&ingester->tp, num_threads, elem_size, num_elems, - &quit_msg, ingester, ndb_ingester_thread)) + if (!threadpool_init(&ingester->tp, config->ingester_threads, + elem_size, num_elems, &quit_msg, ingester, + ndb_ingester_thread)) { fprintf(stderr, "ndb ingester threadpool failed to init\n"); return 0; @@ -3221,20 +3234,20 @@ static int ndb_run_migrations(struct ndb *ndb) return 1; } -int ndb_init(struct ndb **pndb, const char *filename, size_t mapsize, int ingester_threads, int flags) +int ndb_init(struct ndb **pndb, const char *filename, struct ndb_config *config) { struct ndb *ndb; //MDB_dbi ind_id; // TODO: ind_pk, etc ndb = *pndb = calloc(1, sizeof(struct ndb)); - ndb->flags = flags; + ndb->flags = config->flags; if (ndb == NULL) { fprintf(stderr, "ndb_init: malloc failed\n"); return 0; } - if (!ndb_init_lmdb(filename, &ndb->lmdb, mapsize)) + if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize)) return 0; if (!ndb_writer_init(&ndb->writer, &ndb->lmdb)) { @@ -3242,14 +3255,14 @@ int ndb_init(struct ndb **pndb, const char *filename, size_t mapsize, int ingest return 0; } - if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, ingester_threads, - ndb->flags)) { + if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, config)) { fprintf(stderr, "failed to initialize %d ingester thread(s)\n", - ingester_threads); + config->ingester_threads); return 0; } - if (!ndb_flag_set(flags, NDB_FLAG_NOMIGRATE) && !ndb_run_migrations(ndb)) { + if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE) && + !ndb_run_migrations(ndb)) { fprintf(stderr, "failed to run migrations\n"); return 0; } @@ -4380,4 +4393,37 @@ inline int ndb_builder_push_tag_str(struct ndb_builder *builder, return ndb_builder_finalize_tag(builder, pstr); } +// +// CONFIG +// +void ndb_default_config(struct ndb_config *config) +{ + config->mapsize = 1024UL * 1024UL * 1024UL * 32UL; // 32 GiB + config->ingester_threads = 4; // TODO: figure this out from platform apis + config->flags = 0; + config->ingest_filter = NULL; + config->filter_context = NULL; +} +void ndb_config_set_ingest_threads(struct ndb_config *config, int threads) +{ + int cores = get_physical_cores(); + config->ingester_threads = cores == -1 ? 4 : cores; +} + +void ndb_config_set_flags(struct ndb_config *config, int flags) +{ + config->flags = flags; +} + +void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize) +{ + config->mapsize = mapsize; +} + +void ndb_config_set_ingest_filter(struct ndb_config *config, + ndb_ingest_filter_fn fn, void *filter_ctx) +{ + config->ingest_filter = fn; + config->filter_context = filter_ctx; +} diff --git a/nostrdb/nostrdb.h b/nostrdb/nostrdb.h index 89347750..9b4c0ff9 100644 --- a/nostrdb/nostrdb.h +++ b/nostrdb/nostrdb.h @@ -105,6 +105,12 @@ enum tce_type { NDB_TCE_EOSE = 0x4, }; +enum ndb_ingest_filter_action { + NDB_INGEST_REJECT, + NDB_INGEST_ACCEPT, + NDB_INGEST_SKIP_VALIDATION +}; + // function pointer for controlling what to do after we parse an id typedef enum ndb_idres (*ndb_id_fn)(void *, const char *); @@ -233,6 +239,8 @@ struct ndb_note { #pragma pack(pop) +typedef enum ndb_ingest_filter_action (*ndb_ingest_filter_fn)(void *, struct ndb_note *); + struct ndb_builder { struct cursor mem; struct cursor note_cur; @@ -295,6 +303,20 @@ struct ndb_filter { struct ndb_filter_elements *elements[NDB_NUM_FILTERS]; }; +struct ndb_config { + int flags; + int ingester_threads; + size_t mapsize; + void *filter_context; + ndb_ingest_filter_fn ingest_filter; +}; + +// CONFIG +void ndb_default_config(struct ndb_config *); +void ndb_config_set_ingest_threads(struct ndb_config *config, int threads); +void ndb_config_set_flags(struct ndb_config *config, int flags); +void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize); +void ndb_config_set_ingest_filter(struct ndb_config *config, ndb_ingest_filter_fn fn, void *); // HELPERS int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen); @@ -304,7 +326,7 @@ int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair); int ndb_note_verify(void *secp_ctx, unsigned char pubkey[32], unsigned char id[32], unsigned char signature[64]); // NDB -int ndb_init(struct ndb **ndb, const char *dbdir, size_t mapsize, int ingester_threads, int flags); +int ndb_init(struct ndb **ndb, const char *dbdir, struct ndb_config *); int ndb_db_version(struct ndb *ndb); int ndb_process_event(struct ndb *, const char *json, int len); int ndb_process_events(struct ndb *, const char *ldjson, size_t len); @@ -354,7 +376,6 @@ void ndb_filter_reset(struct ndb_filter *); void ndb_filter_end_field(struct ndb_filter *); void ndb_filter_free(struct ndb_filter *filter); - // FULLTEXT SEARCH int ndb_text_search(struct ndb_txn *txn, const char *query, struct ndb_text_search_results *);