diff --git a/golpe b/golpe
index d14d64e..dde080d 160000
--- a/golpe
+++ b/golpe
@@ -1 +1 @@
-Subproject commit d14d64e892149752e2c6b049e166ec425b6daa8f
+Subproject commit dde080d39a47197080cc11190797953783233a67
diff --git a/golpe.yaml b/golpe.yaml
index 38dcc31..9723c0d 100644
--- a/golpe.yaml
+++ b/golpe.yaml
@@ -212,6 +212,10 @@ config:
     default: 2
     noReload: true

+  - name: relay__negentropy__maxFilterLimit
+    desc: "Maximum records that can be processed per filter"
+    default: 1000000
+
   - name: events__maxEventSize
     desc: "Maximum size of normalised JSON, in bytes"
     default: 65536
diff --git a/src/RelayIngester.cpp b/src/RelayIngester.cpp
index 66c6432..65d2d61 100644
--- a/src/RelayIngester.cpp
+++ b/src/RelayIngester.cpp
@@ -55,7 +55,7 @@ void RelayServer::runIngester(ThreadPool::Thread &thr) {
                         try {
                             ingesterProcessNegentropy(txn, decomp, msg->connId, arr);
                         } catch (std::exception &e) {
-                            sendNoticeError(msg->connId, std::string("bad negentropy: ") + e.what());
+                            sendNoticeError(msg->connId, std::string("negentropy error: ") + e.what());
                         }
                     } else {
                         throw herr("unknown cmd");
@@ -120,7 +120,8 @@ void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp
     if (arr.at(0) == "NEG-OPEN") {
         if (arr.get_array().size() < 5) throw herr("negentropy query missing elements");

-        NostrFilterGroup filter({});
+        NostrFilterGroup filter;
+        auto maxFilterLimit = cfg().relay__negentropy__maxFilterLimit;

         if (arr.at(2).is_string()) {
             auto ev = lookupEventById(txn, from_hex(arr.at(2).get_string()));
@@ -137,7 +138,7 @@ void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp
             tao::json::value json = tao::json::from_string(getEventJson(txn, decomp, ev->primaryKeyId));

             try {
-                filter = std::move(NostrFilterGroup::unwrapped(tao::json::from_string(json.at("content").get_string())));
+                filter = std::move(NostrFilterGroup::unwrapped(tao::json::from_string(json.at("content").get_string()), maxFilterLimit));
             } catch (std::exception &e) {
                 sendToConn(connId, tao::json::to_string(tao::json::value::array({
                     "NEG-ERR",
@@ -148,7 +149,7 @@ void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp
                 return;
             }
         } else {
-            filter = std::move(NostrFilterGroup::unwrapped(arr.at(2)));
+            filter = std::move(NostrFilterGroup::unwrapped(arr.at(2), maxFilterLimit));
         }

         Subscription sub(connId, arr[1].get_string(), std::move(filter));
diff --git a/src/RelayNegentropy.cpp b/src/RelayNegentropy.cpp
index 4403630..5aa4bce 100644
--- a/src/RelayNegentropy.cpp
+++ b/src/RelayNegentropy.cpp
@@ -9,6 +9,7 @@ struct NegentropyViews {
     struct UserView {
         Negentropy ne;
         std::string initialMsg;
+        uint64_t startTime = hoytech::curr_time_us();
     };

     using ConnViews = flat_hash_map<SubId, UserView>;
@@ -76,6 +77,9 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) {
             auto *view = views.findView(sub.connId, sub.subId);
             if (!view) return;

+            LI << "[" << sub.connId << "] Negentropy query matched " << view->ne.items.size() << " events in "
+               << (hoytech::curr_time_us() - view->startTime) << "us";
+
             view->ne.seal();

             auto resp = view->ne.reconcile(view->initialMsg);
@@ -120,7 +124,12 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) {
                 return;
             }

-            auto resp = view->ne.reconcile(view->initialMsg);
+            if (!view->ne.sealed) {
+                sendNoticeError(msg->connId, "negentropy error: got NEG-MSG before NEG-OPEN complete");
+                return;
+            }
+
+            auto resp = view->ne.reconcile(msg->negPayload);

             sendToConn(msg->connId, tao::json::to_string(tao::json::value::array({
                 "NEG-MSG",
diff --git a/src/WriterPipeline.h b/src/WriterPipeline.h
index ed6ce48..1efdbaa 100644
--- a/src/WriterPipeline.h
+++ b/src/WriterPipeline.h
@@ -16,17 +16,21 @@ struct WriterPipelineInput {

 struct WriterPipeline {
   public:
-    hoytech::protected_queue<WriterPipelineInput> inbox;
-    hoytech::protected_queue<bool> flushInbox;
+    uint64_t debounceDelayMilliseconds = 1'000;
+    uint64_t writeBatchSize = 1'000;

   private:
+    hoytech::protected_queue<WriterPipelineInput> validatorInbox;
     hoytech::protected_queue<EventToWrite> writerInbox;
+    hoytech::protected_queue<bool> flushInbox;
     std::thread validatorThread;
     std::thread writerThread;

     std::condition_variable shutdownCv;
     std::mutex shutdownMutex;
-    bool shutdown = false;
+    std::atomic<bool> shutdownRequested = false;
+    std::atomic<bool> shutdownComplete = false;
+    std::atomic<uint64_t> numLive = 0;

   public:
     WriterPipeline() {
@@ -36,7 +40,7 @@ struct WriterPipeline {
             secp256k1_context *secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);

             while (1) {
-                auto msgs = inbox.pop_all();
+                auto msgs = validatorInbox.pop_all();

                 for (auto &m : msgs) {
                     if (m.eventJson.is_null()) {
@@ -64,19 +68,21 @@ struct WriterPipeline {

             while (1) {
                 // Debounce
-                writerInbox.wait();

                 {
-                    std::unique_lock<std::mutex> lk(shutdownMutex);
-                    shutdownCv.wait_for(lk, std::chrono::milliseconds(1'000), [&]{return shutdown;});
+                    auto numPendingElems = writerInbox.wait();
+
+                    if (!shutdownRequested && numPendingElems < writeBatchSize) {
+                        std::unique_lock<std::mutex> lk(shutdownMutex);
+                        shutdownCv.wait_for(lk, std::chrono::milliseconds(debounceDelayMilliseconds), [&]{return !!shutdownRequested;});
+                    }
                 }

                 auto newEvents = writerInbox.pop_all();

-                bool flush = false;
                 uint64_t written = 0, dups = 0;

-                // Collect a certain amount of records in a batch, push the rest back into the inbox
+                // Collect a certain amount of records in a batch, push the rest back into the writerInbox
                 // Pre-filter out dups in a read-only txn as an optimisation

                 std::vector<EventToWrite> newEventsToProc;
@@ -84,16 +90,19 @@ struct WriterPipeline {
                 {
                     auto txn = env.txn_ro();

-                    for (auto &event : newEvents) {
-                        if (newEventsToProc.size() > 1'000) {
-                            // Put the rest back in the inbox
+                    while (newEvents.size()) {
+                        if (newEventsToProc.size() >= writeBatchSize) {
+                            // Put the rest back in the writerInbox
                             writerInbox.unshift_move_all(newEvents);
                             newEvents.clear();
                             break;
                         }

+                        auto event = std::move(newEvents.front());
+                        newEvents.pop_front();
+
                         if (event.flatStr.size() == 0) {
-                            flush = true;
+                            shutdownComplete = true;
                             break;
                         }

@@ -121,21 +130,21 @@ struct WriterPipeline {
                     }
                 }

-                LI << "Writer: added: " << written << " dups: " << dups;
+                if (written || dups) LI << "Writer: added: " << written << " dups: " << dups;

-                if (flush) flushInbox.push_move(true);
+                if (shutdownComplete) flushInbox.push_move(true);
             }
         });
     }

-    void flush() {
-        {
-            std::lock_guard<std::mutex> lk(shutdownMutex);
-            shutdown = true;
-        }
-        shutdownCv.notify_all();
+    void write(WriterPipelineInput &&inp) {
+        validatorInbox.push_move(std::move(inp));
+    }

-        inbox.push_move({ tao::json::null, EventSourceType::None, "" });
+    void flush() {
+        shutdownRequested = true;
+        validatorInbox.push_move({ tao::json::null, EventSourceType::None, "" });
+        shutdownCv.notify_all();

         flushInbox.wait();
     }
 };
diff --git a/src/cmd_stream.cpp b/src/cmd_stream.cpp
index 28b678d..5d494f2 100644
--- a/src/cmd_stream.cpp
+++ b/src/cmd_stream.cpp
@@ -72,7 +72,7 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
             auto res = writePolicy.acceptEvent(evJson, hoytech::curr_time_s(), EventSourceType::Stream, ws.remoteAddr, okMsg);
             if (res == WritePolicyResult::Accept) {
                 downloadedIds.emplace(from_hex(evJson.at("id").get_string()));
-                writer.inbox.push_move({ std::move(evJson), EventSourceType::Stream, url });
+                writer.write({ std::move(evJson), EventSourceType::Stream, url });
             } else {
                 LI << "[" << ws.remoteAddr << "] write policy blocked event " << evJson.at("id").get_string() << ": " << okMsg;
             }
diff --git a/src/cmd_sync.cpp b/src/cmd_sync.cpp
index f4893a9..37f2029 100644
--- a/src/cmd_sync.cpp
+++ b/src/cmd_sync.cpp
@@ -89,7 +89,8 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
     const uint64_t highWaterUp = 100, lowWaterUp = 50;
     const uint64_t batchSizeDown = 50;

-    uint64_t inFlightUp = 0, inFlightDown = 0;
+    uint64_t inFlightUp = 0;
+    bool inFlightDown = false; // bool because we can't count on getting every EVENT we request (might've been deleted mid-query)
     std::vector<std::string> have, need;
     bool syncDone = false;
     uint64_t totalHaves = 0, totalNeeds = 0;
@@ -127,9 +128,11 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
                     LW << "Unable to upload event " << msg.at(1).get_string() << ": " << msg.at(3).get_string();
                 }
             } else if (msg.at(0) == "EVENT") {
-                writer.inbox.push_move({ std::move(msg.at(2)), EventSourceType::Sync, url });
+                writer.write({ std::move(msg.at(2)), EventSourceType::Sync, url });
             } else if (msg.at(0) == "EOSE") {
-                inFlightDown = 0;
+                inFlightDown = false;
+            } else {
+                LW << "Unexpected message from relay: " << msg;
             }
         } catch (std::exception &e) {
             LE << "Error processing websocket message: " << e.what();
@@ -163,7 +166,7 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
             if (numSent > 0) LI << "UP: " << numSent << " events (" << have.size() << " remaining)";
         }

-        if (doDown && need.size() > 0 && inFlightDown == 0) {
+        if (doDown && need.size() > 0 && !inFlightDown) {
             tao::json::value ids = tao::json::empty_array;

             while (need.size() > 0 && ids.get_array().size() < batchSizeDown) {
@@ -181,10 +184,10 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
                 }),
             })));

-            inFlightDown = 1;
+            inFlightDown = true;
         }

-        if (syncDone && have.size() == 0 && need.size() == 0 && inFlightUp == 0 && inFlightDown == 0) {
+        if (syncDone && have.size() == 0 && need.size() == 0 && inFlightUp == 0 && !inFlightDown) {
             if (doDown) writer.flush();
             ::exit(0);
         }
diff --git a/src/filters.h b/src/filters.h
index 0363749..b7398a6 100644
--- a/src/filters.h
+++ b/src/filters.h
@@ -214,6 +214,8 @@ struct NostrFilter {
 struct NostrFilterGroup {
     std::vector<NostrFilter> filters;

+    NostrFilterGroup() {}
+
     // Note that this expects the full array, so the first two items are "REQ" and the subId
     NostrFilterGroup(const tao::json::value &req, uint64_t maxFilterLimit = cfg().relay__maxFilterLimit) {
         const auto &arr = req.get_array();
@@ -225,7 +227,7 @@ struct NostrFilterGroup {
         }
     }

-    // Hacky! Deserves a refactor
+    // FIXME refactor: Make unwrapped the default constructor
     static NostrFilterGroup unwrapped(tao::json::value filter, uint64_t maxFilterLimit = cfg().relay__maxFilterLimit) {
         if (!filter.is_array()) {
             filter = tao::json::value::array({ filter });
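
On the RelayNegentropy change: NEG-OPEN processing (the DB scan that fills view->ne.items, followed by seal()) runs asynchronously on the negentropy thread, so a fast client can deliver a NEG-MSG while its view is still unsealed; the old code also reconciled against view->initialMsg again instead of the new payload. A minimal, self-contained model of the guard follows, with hypothetical stand-in types (View, handleNegMsg); the real Negentropy class lives in the negentropy library.

#include <iostream>
#include <string>

struct View {
    bool sealed = false;        // set once the DB scan completes and seal() runs
    std::string initialMsg;
};

// Stand-in for the NEG-MSG branch of runNegentropy()
void handleNegMsg(View &view, const std::string &negPayload) {
    if (!view.sealed) {
        std::cout << "NOTICE: negentropy error: got NEG-MSG before NEG-OPEN complete\n";
        return;
    }
    std::cout << "reconcile(" << negPayload << ")\n";  // uses the client payload, not initialMsg
}

int main() {
    View v;
    handleNegMsg(v, "abcd");  // too early: the view is not sealed yet, so it is rejected
    v.sealed = true;          // NEG-OPEN processing finished (scan + seal)
    handleNegMsg(v, "ef01");  // now reconciled normally
}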
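
On the WriterPipeline change: the writer thread now wakes on the first pending event, debounces for up to debounceDelayMilliseconds so a batch can accumulate, skips the debounce when writeBatchSize events are already pending or shutdown has been requested, and drains at most writeBatchSize events per iteration. A self-contained sketch of that pattern is below; BatchWriter, the std::deque, and the std::cout "write" are hypothetical stand-ins for hoytech::protected_queue and the LMDB writer, not strfry's actual code.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

struct BatchWriter {
    uint64_t debounceDelayMilliseconds = 1'000;
    uint64_t writeBatchSize = 1'000;

    BatchWriter() {
        writerThread = std::thread([this]{
            while (true) {
                std::vector<std::string> batch;
                bool drained;

                {
                    std::unique_lock<std::mutex> lk(mutex);

                    // Wake as soon as anything is pending (or shutdown begins)
                    cv.wait(lk, [&]{ return !queue.empty() || shutdownRequested; });

                    // Debounce: give more items a chance to arrive, unless a
                    // full batch is already pending or we are shutting down
                    if (!shutdownRequested && queue.size() < writeBatchSize) {
                        cv.wait_for(lk, std::chrono::milliseconds(debounceDelayMilliseconds),
                                    [&]{ return queue.size() >= writeBatchSize || !!shutdownRequested; });
                    }

                    // Take at most writeBatchSize items; the rest stay queued
                    // for the next iteration (the patch's unshift_move_all idea)
                    while (!queue.empty() && batch.size() < writeBatchSize) {
                        batch.emplace_back(std::move(queue.front()));
                        queue.pop_front();
                    }

                    drained = queue.empty();
                }

                if (!batch.empty()) std::cout << "wrote batch of " << batch.size() << "\n";
                if (shutdownRequested && drained) return;
            }
        });
    }

    void write(std::string item) {
        {
            std::lock_guard<std::mutex> lk(mutex);
            queue.push_back(std::move(item));
        }
        cv.notify_all();
    }

    // Like the patch's flush(): request shutdown, then wait until the writer
    // thread has drained everything queued before the call
    void flush() {
        {
            std::lock_guard<std::mutex> lk(mutex);
            shutdownRequested = true;
        }
        cv.notify_all();
        writerThread.join();
    }

  private:
    std::deque<std::string> queue;
    std::mutex mutex;
    std::condition_variable cv;
    std::atomic<bool> shutdownRequested{false};
    std::thread writerThread;
};

int main() {
    BatchWriter w;
    for (int i = 0; i < 2'500; i++) w.write("event " + std::to_string(i));
    w.flush();  // prints a few batch lines, then returns once the queue is empty
}

As in the patch, callers only see write() and flush(); the queues and shutdown flags are private, which is the point of the cmd_stream.cpp and cmd_sync.cpp call-site changes.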