apps refactor

This commit is contained in:
Doug Hoyte
2023-05-21 03:32:25 -04:00
parent ea73aca3f3
commit 03ef0958fc
21 changed files with 4 additions and 3 deletions

View File

@ -0,0 +1,30 @@
#include <unistd.h>
#include <stdio.h>
#include <docopt.h>
#include "golpe.h"
static const char USAGE[] =
R"(
Usage:
compact <output_file>
)";
// Compacts the LMDB environment into a standalone copy.
//
// subArgs: docopt-style arguments; <output_file> is the destination path,
// or "-" to stream the compacted DB to stdout.
// Throws via herr() if the destination exists or can't be opened.
void cmd_compact(const std::vector<std::string> &subArgs) {
    std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");

    std::string outputFile = args["<output_file>"].asString();

    if (outputFile == "-") {
        // fd 1 == stdout
        env.copy_fd(1);
    } else {
        // Refuse to clobber an existing file
        if (access(outputFile.c_str(), F_OK) == 0) throw herr("output file '", outputFile, "' exists, not overwriting");

        auto *f = ::fopen(outputFile.c_str(), "w");
        if (!f) throw herr("opening output file '", outputFile, "' failed: ", strerror(errno));

        env.copy_fd(::fileno(f));

        // FIX: previously the FILE* was leaked. Close it so the handle is
        // released and any close-time I/O error is surfaced to the caller.
        if (::fclose(f) != 0) throw herr("closing output file '", outputFile, "' failed: ", strerror(errno));
    }
}

View File

@ -0,0 +1,78 @@
#include <iostream>
#include <docopt.h>
#include "golpe.h"
#include "DBQuery.h"
#include "events.h"
static const char USAGE[] =
R"(
Usage:
delete [--age=<age>] [--filter=<filter>] [--dry-run]
)";
void cmd_delete(const std::vector<std::string> &subArgs) {
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");
uint64_t age = MAX_U64;
if (args["--age"]) age = args["--age"].asLong();
std::string filterStr;
if (args["--filter"]) filterStr = args["--filter"].asString();
bool dryRun = args["--dry-run"].asBool();
if (filterStr.size() == 0 && age == MAX_U64) throw herr("must specify --age and/or --filter");
if (filterStr.size() == 0) filterStr = "{}";
auto filter = tao::json::from_string(filterStr);
auto now = hoytech::curr_time_s();
if (age != MAX_U64) {
if (age > now) age = now;
if (filter.optional<uint64_t>("until")) throw herr("--age is not compatible with filter containing 'until'");
filter["until"] = now - age;
}
DBQuery query(filter);
btree_set<uint64_t> levIds;
{
auto txn = env.txn_ro();
while (1) {
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){
levIds.insert(levId);
});
if (complete) break;
}
}
if (dryRun) {
LI << "Would delete " << levIds.size() << " events";
return;
}
LI << "Deleting " << levIds.size() << " events";
{
auto txn = env.txn_rw();
for (auto levId : levIds) {
deleteEvent(txn, levId);
}
txn.commit();
}
}

View File

@ -0,0 +1,240 @@
#include <zstd.h>
#include <zdict.h>
#include <iostream>
#include <random>
#include <docopt.h>
#include "golpe.h"
#include "DBQuery.h"
#include "events.h"
static const char USAGE[] =
R"(
Usage:
dict stats [--filter=<filter>]
dict train [--filter=<filter>] [--limit=<limit>] [--dictSize=<dictSize>]
dict compress [--filter=<filter>] [--dictId=<dictId>] [--level=<level>]
dict decompress [--filter=<filter>]
)";
// Dictionary-compression maintenance tool. Sub-commands:
//   stats      - report compressed/uncompressed sizes and per-dict counts
//   train      - train a new zstd dictionary from matched events
//   compress   - recompress matched event payloads with a given dictionary
//   decompress - rewrite matched event payloads uncompressed
void cmd_dict(const std::vector<std::string> &subArgs) {
    std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");

    std::string filterStr;
    if (args["--filter"]) filterStr = args["--filter"].asString();
    else filterStr = "{}";

    uint64_t limit = MAX_U64;
    if (args["--limit"]) limit = args["--limit"].asLong();

    uint64_t dictSize = 100'000;
    if (args["--dictSize"]) dictSize = args["--dictSize"].asLong();

    uint64_t dictId = 0;
    if (args["--dictId"]) dictId = args["--dictId"].asLong();

    int level = 3;
    if (args["--level"]) level = args["--level"].asLong();

    Decompressor decomp;
    std::vector<uint64_t> levIds;

    auto txn = env.txn_ro();

    // Collect all matching levIds up-front so the sub-commands below can
    // iterate them without re-running the query.
    DBQuery query(tao::json::from_string(filterStr));

    while (1) {
        bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view){
            levIds.push_back(levId);
        });
        if (complete) break;
    }

    LI << "Filter matched " << levIds.size() << " records";

    if (args["stats"].asBool()) {
        uint64_t totalSize = 0;
        uint64_t totalCompressedSize = 0;
        uint64_t numCompressed = 0;

        btree_map<uint32_t, uint64_t> dicts;

        // Seed the table with every known dictionary so unused ones show 0
        env.foreach_CompressionDictionary(txn, [&](auto &view){
            auto dictId = view.primaryKeyId;
            if (!dicts.contains(dictId)) dicts[dictId] = 0;
            return true;
        });

        for (auto levId : levIds) {
            std::string_view raw;
            bool found = env.dbi_EventPayload.get(txn, lmdb::to_sv<uint64_t>(levId), raw);
            if (!found) throw herr("couldn't find event in EventPayload");

            uint32_t dictId;
            size_t outCompressedSize;
            auto json = decodeEventPayload(txn, decomp, raw, &dictId, &outCompressedSize);

            totalSize += json.size();
            // dictId == 0 means the payload was stored uncompressed
            totalCompressedSize += dictId ? outCompressedSize : json.size();

            if (dictId) {
                numCompressed++;
                dicts[dictId]++;
            }
        }

        auto ratio = renderPercent(1.0 - (double)totalCompressedSize / totalSize);

        std::cout << "Num compressed: " << numCompressed << " / " << levIds.size() << "\n";
        std::cout << "Uncompressed size: " << renderSize(totalSize) << "\n";
        std::cout << "Compressed size: " << renderSize(totalCompressedSize) << " (" << ratio << ")" << "\n";
        std::cout << "\ndictId : events\n";

        for (auto &[dictId, n] : dicts) {
            std::cout << " " << dictId << " : " << n << "\n";
        }
    } else if (args["train"].asBool()) {
        std::string trainingBuf;
        std::vector<size_t> trainingSizes;

        // Training is O(sample size); cap the sample with --limit
        if (levIds.size() > limit) {
            LI << "Randomly selecting " << limit << " records";
            std::random_device rd;
            std::mt19937 g(rd());
            std::shuffle(levIds.begin(), levIds.end(), g);
            levIds.resize(limit);
        }

        for (auto levId : levIds) {
            std::string json = std::string(getEventJson(txn, decomp, levId));
            trainingBuf += json;
            trainingSizes.emplace_back(json.size());
        }

        std::string dict(dictSize, '\0');

        LI << "Performing zstd training...";

        auto ret = ZDICT_trainFromBuffer(dict.data(), dict.size(), trainingBuf.data(), trainingSizes.data(), trainingSizes.size());
        if (ZDICT_isError(ret)) throw herr("zstd training failed: ", ZSTD_getErrorName(ret));
        // FIX: ZDICT_trainFromBuffer returns the actual dictionary size, which
        // may be smaller than the buffer; trim so we don't persist zero padding.
        dict.resize(ret);

        txn.abort();
        txn = env.txn_rw();

        uint64_t newDictId = env.insert_CompressionDictionary(txn, dict);

        std::cout << "Saved new dictionary, dictId = " << newDictId << std::endl;

        txn.commit();
    } else if (args["compress"].asBool()) {
        if (dictId == 0) throw herr("specify --dictId or --decompress");

        txn.abort();
        txn = env.txn_rw();

        auto view = env.lookup_CompressionDictionary(txn, dictId);
        if (!view) throw herr("couldn't find dictId ", dictId);
        auto dict = view->dict();

        auto *cctx = ZSTD_createCCtx();
        // ZSTD_createCDict copies the dictionary bytes, so `dict` need not
        // outlive the transaction it was read from (we commit mid-loop below).
        auto *cdict = ZSTD_createCDict(dict.data(), dict.size(), level);

        uint64_t origSizes = 0;
        uint64_t compressedSizes = 0;
        uint64_t pendingFlush = 0;
        uint64_t processed = 0;

        std::string compressedData(500'000, '\0');

        for (auto levId : levIds) {
            std::string_view orig;

            try {
                orig = getEventJson(txn, decomp, levId);
            } catch (std::exception &e) {
                // Event may have been deleted since the scan; skip it
                continue;
            }

            auto ret = ZSTD_compress_usingCDict(cctx, compressedData.data(), compressedData.size(), orig.data(), orig.size(), cdict);
            // FIX: results of ZSTD_* compression calls are checked with
            // ZSTD_isError, not ZDICT_isError.
            if (ZSTD_isError(ret)) throw herr("zstd compression failed: ", ZSTD_getErrorName(ret));

            origSizes += orig.size();
            compressedSizes += ret;

            std::string newVal;

            // Only store compressed if it beats the uncompressed encoding
            // after the type tag + 4-byte dictId header
            if (ret + 4 < orig.size()) {
                newVal += '\x01';
                newVal += lmdb::to_sv<uint32_t>(dictId);
                newVal += std::string_view(compressedData.data(), ret);
            } else {
                newVal += '\x00';
                newVal += orig;
            }

            env.dbi_EventPayload.put(txn, lmdb::to_sv<uint64_t>(levId), newVal);

            pendingFlush++;
            processed++;

            // Commit in batches to bound transaction size
            if (pendingFlush > 10'000) {
                txn.commit();
                LI << "Progress: " << processed << "/" << levIds.size();
                pendingFlush = 0;
                txn = env.txn_rw();
            }
        }

        txn.commit();

        // FIX: release the zstd contexts instead of leaking them
        ZSTD_freeCDict(cdict);
        ZSTD_freeCCtx(cctx);

        LI << "Original event sizes: " << origSizes;
        LI << "New event sizes: " << compressedSizes;
    } else if (args["decompress"].asBool()) {
        txn.abort();
        txn = env.txn_rw();

        uint64_t pendingFlush = 0;
        uint64_t processed = 0;

        for (auto levId : levIds) {
            std::string_view orig;

            try {
                orig = getEventJson(txn, decomp, levId);
            } catch (std::exception &e) {
                continue;
            }

            // '\x00' tag = stored uncompressed
            std::string newVal;
            newVal += '\x00';
            newVal += orig;

            env.dbi_EventPayload.put(txn, lmdb::to_sv<uint64_t>(levId), newVal);

            pendingFlush++;
            processed++;

            if (pendingFlush > 10'000) {
                txn.commit();
                LI << "Progress: " << processed << "/" << levIds.size();
                pendingFlush = 0;
                txn = env.txn_rw();
            }
        }

        txn.commit();
    }
}

View File

@ -0,0 +1,48 @@
#include <iostream>
#include <docopt.h>
#include "golpe.h"
#include "events.h"
static const char USAGE[] =
R"(
Usage:
export [--since=<since>] [--until=<until>] [--reverse]
)";
// Dumps events as line-delimited JSON to stdout, ordered by created_at,
// optionally bounded by --since/--until and optionally newest-first.
void cmd_export(const std::vector<std::string> &subArgs) {
    std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");

    uint64_t since = 0, until = MAX_U64;
    if (args["--since"]) since = args["--since"].asLong();
    if (args["--until"]) until = args["--until"].asLong();
    bool reverse = args["--reverse"].asBool();

    Decompressor decomp;
    auto txn = env.txn_ro();

    auto dbVersion = getDBVersion(txn);
    if (dbVersion == 0) throw herr("migration from DB version 0 not supported by this version of strfry");

    // Reverse iteration starts at `until` (highest dup first) and walks down
    // to `since`; forward iteration starts at `since` and walks up to `until`.
    uint64_t startKey = reverse ? until : since;
    uint64_t startDup = reverse ? MAX_U64 : 0;

    env.generic_foreachFull(txn, env.dbi_Event__created_at, lmdb::to_sv<uint64_t>(startKey), lmdb::to_sv<uint64_t>(startDup), [&](auto k, auto v) {
        auto createdAt = lmdb::from_sv<uint64_t>(k);

        // Stop once we step outside the requested time window
        bool outOfRange = reverse ? (createdAt < since) : (createdAt > until);
        if (outOfRange) return false;

        auto view = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v));
        std::cout << getEventJson(txn, decomp, view.primaryKeyId) << "\n";

        return true;
    }, reverse);
}

View File

@ -0,0 +1,89 @@
#include <iostream>
#include <docopt.h>
#include "golpe.h"
#include "events.h"
#include "filters.h"
static const char USAGE[] =
R"(
Usage:
import [--show-rejected] [--no-verify]
)";
// Bulk-imports line-delimited JSON events from stdin into the DB.
// --no-verify skips ID/signature verification; --show-rejected logs each
// rejected line with the parse/verify error.
void cmd_import(const std::vector<std::string> &subArgs) {
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");
bool showRejected = args["--show-rejected"].asBool();
bool noVerify = args["--no-verify"].asBool();
if (noVerify) LW << "not verifying event IDs or signatures!";
auto txn = env.txn_rw();
secp256k1_context *secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
std::string line;
// Running tallies across the whole import, reported by logStatus()
uint64_t processed = 0, added = 0, rejected = 0, dups = 0;
std::vector<EventToWrite> newEvents;
auto logStatus = [&]{
LI << "Processed " << processed << " lines. " << added << " added, " << rejected << " rejected, " << dups << " dups";
};
// Writes the pending batch, tallies per-event outcomes, commits, then opens
// a fresh RW transaction. NOTE: `txn` is captured by reference and
// reassigned here, so the outer `txn` is replaced on every flush.
auto flushChanges = [&]{
writeEvents(txn, newEvents, 0);
uint64_t numCommits = 0;
for (auto &newEvent : newEvents) {
if (newEvent.status == EventWriteStatus::Written) {
added++;
numCommits++;
} else if (newEvent.status == EventWriteStatus::Duplicate) {
dups++;
} else {
// Anything not Written/Duplicate counts as rejected
rejected++;
}
}
logStatus();
LI << "Committing " << numCommits << " records";
txn.commit();
txn = env.txn_rw();
newEvents.clear();
};
while (std::cin) {
std::getline(std::cin, line);
if (!line.size()) continue;
processed++;
std::string flatStr;
std::string jsonStr;
try {
auto origJson = tao::json::from_string(line);
// Verification is disabled when --no-verify was given
parseAndVerifyEvent(origJson, secpCtx, !noVerify, false, flatStr, jsonStr);
} catch (std::exception &e) {
if (showRejected) LW << "Line " << processed << " rejected: " << e.what();
rejected++;
continue;
}
newEvents.emplace_back(std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us(), EventSourceType::Import, "");
// Flush in batches to bound memory usage and transaction size
if (newEvents.size() >= 10'000) flushChanges();
}
// Final flush for any partial batch, then commit the txn that
// flushChanges() re-opened after its own commit.
flushChanges();
txn.commit();
}

View File

@ -0,0 +1,20 @@
#include <iostream>
#include <docopt.h>
#include "golpe.h"
static const char USAGE[] =
R"(
Usage:
info
)";
// Prints basic information about the database (currently the DB version).
void cmd_info(const std::vector<std::string> &subArgs) {
    // No options beyond the command itself, but docopt still validates usage
    std::map<std::string, docopt::value> parsedArgs = docopt::docopt(USAGE, subArgs, true, "");

    auto rtxn = env.txn_ro();
    std::cout << "DB version: " << getDBVersion(rtxn) << "\n";
}

View File

@ -0,0 +1,67 @@
#include <iostream>
#include <docopt.h>
#include "golpe.h"
#include "ActiveMonitors.h"
#include "events.h"
static const char USAGE[] =
R"(
Usage:
monitor
)";
// echo '["sub",1,"mysub",{"authors":["47f7163b"]}]' | ./strfry monitor
// Debugging tool: reads monitor commands ("sub", "removeSub", "closeConn",
// "interest") from stdin until EOF, then replays every event in the DB
// through the resulting monitors, printing events delivered to the
// (connId, subId) pair marked via "interest".
void cmd_monitor(const std::vector<std::string> &subArgs) {
    std::map<std::string, docopt::value> parsedArgs = docopt::docopt(USAGE, subArgs, true, "");

    auto txn = env.txn_ro();

    Decompressor decomp;
    ActiveMonitors monitors;

    uint64_t interestConnId = 0;
    std::string interestSubId;

    // Phase 1: build up the monitor set from stdin
    std::string line;
    while (std::cin) {
        std::getline(std::cin, line);
        if (line.empty()) continue;

        auto msg = tao::json::from_string(line);
        auto &msgArr = msg.get_array();
        auto cmd = msgArr.at(0).get_string();

        if (cmd == "sub") {
            Subscription sub(msgArr.at(1).get_unsigned(), msgArr.at(2).get_string(), NostrFilterGroup::unwrapped(msgArr.at(3)));
            sub.latestEventId = 0;
            monitors.addSub(txn, std::move(sub), 0);
        } else if (cmd == "removeSub") {
            monitors.removeSub(msgArr.at(1).get_unsigned(), SubId(msgArr.at(2).get_string()));
        } else if (cmd == "closeConn") {
            monitors.closeConn(msgArr.at(1).get_unsigned());
        } else if (cmd == "interest") {
            // Only one (connId, subId) pair may be marked interesting
            if (interestConnId) throw herr("interest already set");
            interestConnId = msgArr.at(1).get_unsigned();
            interestSubId = msgArr.at(2).get_string();
        } else {
            throw herr("unknown cmd");
        }
    }

    // Phase 2: run all DB events through the monitors
    env.foreach_Event(txn, [&](auto &ev){
        monitors.process(txn, ev, [&](RecipientList &&recipients, uint64_t levId){
            for (auto &r : recipients) {
                if (r.connId != interestConnId || r.subId.str() != interestSubId) continue;
                std::cout << getEventJson(txn, decomp, levId) << "\n";
            }
        });
        return true;
    });
}

View File

@ -0,0 +1,47 @@
#include <iostream>
#include <docopt.h>
#include "golpe.h"
#include "DBQuery.h"
#include "events.h"
static const char USAGE[] =
R"(
Usage:
scan [--pause=<pause>] [--metrics] [--count] <filter>
)";
void cmd_scan(const std::vector<std::string> &subArgs) {
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");
uint64_t pause = 0;
if (args["--pause"]) pause = args["--pause"].asLong();
bool metrics = args["--metrics"].asBool();
bool count = args["--count"].asBool();
std::string filterStr = args["<filter>"].asString();
DBQuery query(tao::json::from_string(filterStr));
Decompressor decomp;
auto txn = env.txn_ro();
uint64_t numEvents = 0;
while (1) {
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){
if (count) numEvents++;
else std::cout << getEventJson(txn, decomp, levId, eventPayload) << "\n";
}, pause ? pause : MAX_U64, metrics);
if (complete) break;
}
if (count) std::cout << numEvents << std::endl;
}

View File

@ -0,0 +1,137 @@
#include <docopt.h>
#include <tao/json.hpp>
#include <hoytech/protected_queue.h>
#include <hoytech/file_change_monitor.h>
#include "golpe.h"
#include "WriterPipeline.h"
#include "Subscription.h"
#include "WSConnection.h"
#include "events.h"
#include "PluginWritePolicy.h"
static const char USAGE[] =
R"(
Usage:
stream <url> [--dir=<dir>]
Options:
--dir=<dir> Direction: down, up, or both [default: down]
)";
// Streams events between this instance and a remote relay over a websocket.
//   down: subscribe to the remote relay and write its events locally.
//   up:   watch the local DB and send newly-stored events to the remote.
//   both: do both; `downloadedIds` prevents echoing downloaded events back up.
void cmd_stream(const std::vector<std::string> &subArgs) {
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");
std::string url = args["<url>"].asString();
std::string dir = args["--dir"] ? args["--dir"].asString() : "down";
if (dir != "up" && dir != "down" && dir != "both") throw herr("invalid direction: ", dir, ". Should be one of up/down/both");
// IDs of events we received from the remote, so the uploader can skip them
flat_hash_set<std::string> downloadedIds;
WriterPipeline writer;
WSConnection ws(url);
Decompressor decomp;
PluginWritePolicy writePolicy;
ws.onConnect = [&]{
if (dir == "down" || dir == "both") {
// Subscribe with limit:0 => only events that arrive from now on
auto encoded = tao::json::to_string(tao::json::value::array({ "REQ", "sub", tao::json::value({ { "limit", 0 } }) }));
ws.send(encoded);
}
};
ws.onMessage = [&](auto msg, uWS::OpCode, size_t){
auto origJson = tao::json::from_string(msg);
if (origJson.is_array()) {
if (origJson.get_array().size() < 2) throw herr("array too short");
auto &msgType = origJson.get_array().at(0);
if (msgType == "EOSE") {
return;
} else if (msgType == "NOTICE") {
LW << "NOTICE message: " << tao::json::to_string(origJson);
return;
} else if (msgType == "OK") {
// NOTE(review): assumes OK messages have >= 3 elements; at(2) would
// throw on a malformed message -- confirm this is acceptable here.
if (!origJson.get_array().at(2).get_boolean()) {
LW << "Event not written: " << origJson;
}
} else if (msgType == "EVENT") {
if (dir == "down" || dir == "both") {
if (origJson.get_array().size() < 3) throw herr("array too short");
auto &evJson = origJson.at(2);
// Run the local write-policy plugin before accepting the event
std::string okMsg;
auto res = writePolicy.acceptEvent(evJson, hoytech::curr_time_s(), EventSourceType::Stream, ws.remoteAddr, okMsg);
if (res == WritePolicyResult::Accept) {
// Remember the ID so the upload path won't send it back
downloadedIds.emplace(from_hex(evJson.at("id").get_string()));
writer.write({ std::move(evJson), EventSourceType::Stream, url });
} else {
LI << "[" << ws.remoteAddr << "] write policy blocked event " << evJson.at("id").get_string() << ": " << okMsg;
}
} else {
LW << "Unexpected EVENT";
}
} else {
throw herr("unexpected first element");
}
} else {
throw herr("unexpected message");
}
};
// High-water mark: only events with levId > currEventId are uploaded
uint64_t currEventId;
{
auto txn = env.txn_ro();
currEventId = getMostRecentLevId(txn);
}
// Fired (via dbChangeWatcher below) whenever the DB file changes; scans
// events stored since the last trigger and uploads the ones we didn't
// ourselves just download.
ws.onTrigger = [&]{
if (dir == "down") return;
auto txn = env.txn_ro();
env.foreach_Event(txn, [&](auto &ev){
currEventId = ev.primaryKeyId;
auto id = std::string(sv(ev.flat_nested()->id()));
if (downloadedIds.find(id) != downloadedIds.end()) {
// This event came from the remote; drop the marker and skip it
downloadedIds.erase(id);
return true;
}
std::string msg = std::string("[\"EVENT\",");
msg += getEventJson(txn, decomp, ev.primaryKeyId);
msg += "]";
ws.send(msg);
return true;
}, false, currEventId + 1);
};
std::unique_ptr<hoytech::file_change_monitor> dbChangeWatcher;
if (dir == "up" || dir == "both") {
// Debounced watch on the LMDB data file drives the upload path
dbChangeWatcher = std::make_unique<hoytech::file_change_monitor>(dbDir + "/data.mdb");
dbChangeWatcher->setDebounce(100);
dbChangeWatcher->run([&](){
ws.trigger();
});
}
ws.run();
}

198
src/apps/mesh/cmd_sync.cpp Normal file
View File

@ -0,0 +1,198 @@
#include <docopt.h>
#include <tao/json.hpp>
#include <Negentropy.h>
#include "golpe.h"
#include "WriterPipeline.h"
#include "Subscription.h"
#include "WSConnection.h"
#include "DBQuery.h"
#include "filters.h"
#include "events.h"
static const char USAGE[] =
R"(
Usage:
sync <url> [--filter=<filter>] [--dir=<dir>]
Options:
--filter=<filter> Nostr filter (either single filter object or array of filters)
--dir=<dir> Direction: both, down, up, none [default: both]
)";
// Synchronizes events with a remote relay using negentropy set reconciliation.
// Phase 1 builds the local set (truncated IDs keyed by created_at); the
// reconciliation then yields `have` (local-only, uploaded) and `need`
// (remote-only, downloaded) ID lists, drained with flow control.
void cmd_sync(const std::vector<std::string> &subArgs) {
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");
std::string url = args["<url>"].asString();
std::string filterStr;
if (args["--filter"]) filterStr = args["--filter"].asString();
else filterStr = "{}";
std::string dir = args["--dir"] ? args["--dir"].asString() : "both";
if (dir != "both" && dir != "up" && dir != "down" && dir != "none") throw herr("invalid direction: ", dir, ". Should be one of both/up/down/none");
// Truncated event-ID length used by the reconciliation protocol
const uint64_t idSize = 16;
const bool doUp = dir == "both" || dir == "up";
const bool doDown = dir == "both" || dir == "down";
tao::json::value filter = tao::json::from_string(filterStr);
Negentropy ne(idSize);
{
// Load every locally-matched event into the negentropy set
DBQuery query(filter);
Decompressor decomp;
auto txn = env.txn_ro();
uint64_t numEvents = 0;
while (1) {
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId, std::string_view eventPayload){
auto ev = lookupEventByLevId(txn, levId);
ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, ne.idSize));
numEvents++;
});
if (complete) break;
}
LI << "Filter matches " << numEvents << " events";
}
ne.seal();
WriterPipeline writer;
WSConnection ws(url);
ws.reconnect = false;
ws.onConnect = [&]{
// Kick off reconciliation with our initial negentropy message
auto neMsg = to_hex(ne.initiate());
ws.send(tao::json::to_string(tao::json::value::array({
"NEG-OPEN",
"N",
filter,
idSize,
neMsg,
})));
};
// Upload flow control: top up to highWaterUp once in-flight drops to lowWaterUp
const uint64_t highWaterUp = 100, lowWaterUp = 50;
const uint64_t batchSizeDown = 50;
uint64_t inFlightUp = 0;
bool inFlightDown = false; // bool because we can't count on getting every EVENT we request (might've been deleted mid-query)
std::vector<std::string> have, need;
bool syncDone = false;
uint64_t totalHaves = 0, totalNeeds = 0;
Decompressor decomp;
ws.onMessage = [&](auto msgStr, uWS::OpCode opCode, size_t compressedSize){
try {
tao::json::value msg = tao::json::from_string(msgStr);
if (msg.at(0) == "NEG-MSG") {
// Each round extends have/need; an empty response means we're done
uint64_t origHaves = have.size(), origNeeds = need.size();
auto neMsg = ne.reconcile(from_hex(msg.at(2).get_string()), have, need);
totalHaves += have.size() - origHaves;
totalNeeds += need.size() - origNeeds;
// Discard whichever direction the user didn't ask for
if (!doUp) have.clear();
if (!doDown) need.clear();
if (neMsg.size() == 0) {
syncDone = true;
LI << "Set reconcile complete. Have " << totalHaves << " need " << totalNeeds;
} else {
ws.send(tao::json::to_string(tao::json::value::array({
"NEG-MSG",
"N",
to_hex(neMsg),
})));
}
} else if (msg.at(0) == "OK") {
// Ack for one of our uploaded events
inFlightUp--;
if (!msg.at(2).get_boolean()) {
LW << "Unable to upload event " << msg.at(1).get_string() << ": " << msg.at(3).get_string();
}
} else if (msg.at(0) == "EVENT") {
writer.write({ std::move(msg.at(2)), EventSourceType::Sync, url });
} else if (msg.at(0) == "EOSE") {
// End of a download batch: wait for the writer to drain
inFlightDown = false;
writer.wait();
} else {
LW << "Unexpected message from relay: " << msg;
}
} catch (std::exception &e) {
LE << "Error processing websocket message: " << e.what();
LW << "MSG: " << msgStr;
}
// Upload: drain `have` while staying within the in-flight window
if (doUp && have.size() > 0 && inFlightUp <= lowWaterUp) {
auto txn = env.txn_ro();
uint64_t numSent = 0;
while (have.size() > 0 && inFlightUp < highWaterUp) {
auto id = std::move(have.back());
have.pop_back();
auto ev = lookupEventById(txn, id);
if (!ev) {
LW << "Couldn't upload event because not found (deleted?)";
continue;
}
std::string sendEventMsg = "[\"EVENT\",";
sendEventMsg += getEventJson(txn, decomp, ev->primaryKeyId);
sendEventMsg += "]";
ws.send(sendEventMsg);
numSent++;
inFlightUp++;
}
if (numSent > 0) LI << "UP: " << numSent << " events (" << have.size() << " remaining)";
}
// Download: request the next batch of needed IDs, one batch in flight at a time
if (doDown && need.size() > 0 && !inFlightDown) {
tao::json::value ids = tao::json::empty_array;
while (need.size() > 0 && ids.get_array().size() < batchSizeDown) {
ids.emplace_back(to_hex(need.back()));
need.pop_back();
}
LI << "DOWN: " << ids.get_array().size() << " events (" << need.size() << " remaining)";
ws.send(tao::json::to_string(tao::json::value::array({
"REQ",
"R",
tao::json::value({
{ "ids", std::move(ids) }
}),
})));
inFlightDown = true;
}
// All work drained in both directions: flush pending writes and exit
if (syncDone && have.size() == 0 && need.size() == 0 && inFlightUp == 0 && !inFlightDown) {
if (doDown) writer.flush();
::exit(0);
}
};
ws.run();
}

View File

@ -0,0 +1,128 @@
#include <hoytech/timer.h>
#include "RelayServer.h"
// Background maintenance thread: periodically deletes ephemeral and expired
// events. Each job scans under a read-only txn, then deletes the collected
// levIds under a short read-write txn.
void RelayServer::runCron() {
hoytech::timer cron;
cron.setupCb = []{ setThreadName("cron"); };
// Delete ephemeral events
// FIXME: This is for backwards compat during upgrades, and can be removed eventually since
// the newer style of finding ephemeral events relies on expiration=1
cron.repeat(10 * 1'000'000UL, [&]{
std::vector<uint64_t> expiredLevIds;
{
auto txn = env.txn_ro();
auto mostRecent = getMostRecentLevId(txn);
uint64_t cutoff = hoytech::curr_time_s() - cfg().events__ephemeralEventsLifetimeSeconds;
// Walk the (kind, created_at) index across the ephemeral kind range
// [20'000, 30'000). The callback advances currKind, and the outer
// loop restarts the cursor at the next kind once a kind is exhausted.
uint64_t currKind = 20'000;
while (currKind < 30'000) {
uint64_t numRecs = 0;
env.generic_foreachFull(txn, env.dbi_Event__kind, makeKey_Uint64Uint64(currKind, 0), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
numRecs++;
ParsedKey_Uint64Uint64 parsedKey(k);
currKind = parsedKey.n1;
if (currKind >= 30'000) return false;
if (parsedKey.n2 > cutoff) {
// Remaining events of this kind are too new; skip to next kind
currKind++;
return false;
}
uint64_t levId = lmdb::from_sv<uint64_t>(v);
if (levId != mostRecent) { // prevent levId re-use
expiredLevIds.emplace_back(levId);
}
return true;
});
// No records at all for this cursor position: nothing left to scan
if (numRecs == 0) break;
}
}
if (expiredLevIds.size() > 0) {
auto txn = env.txn_rw();
uint64_t numDeleted = 0;
for (auto levId : expiredLevIds) {
if (deleteEvent(txn, levId)) numDeleted++;
}
txn.commit();
if (numDeleted) LI << "Deleted " << numDeleted << " ephemeral events";
}
});
// Delete expired events
cron.repeat(9 * 1'000'000UL, [&]{
std::vector<uint64_t> expiredLevIds;
uint64_t numEphemeral = 0;
uint64_t numExpired = 0;
{
auto txn = env.txn_ro();
auto mostRecent = getMostRecentLevId(txn);
uint64_t now = hoytech::curr_time_s();
uint64_t ephemeralCutoff = now - cfg().events__ephemeralEventsLifetimeSeconds;
// The expiration index is scanned in ascending order; the callback's
// return value (expiration <= now) stops the scan at the first
// not-yet-expired entry. expiration == 1 is a sentinel for ephemeral.
env.generic_foreachFull(txn, env.dbi_Event__expiration, lmdb::to_sv<uint64_t>(0), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
auto expiration = lmdb::from_sv<uint64_t>(k);
auto levId = lmdb::from_sv<uint64_t>(v);
// Never delete the most recent event, to prevent levId re-use
if (levId == mostRecent) return true;
if (expiration == 1) { // Ephemeral event
auto view = env.lookup_Event(txn, levId);
if (!view) throw herr("missing event from index, corrupt DB?");
uint64_t created = view->flat_nested()->created_at();
if (created <= ephemeralCutoff) {
numEphemeral++;
expiredLevIds.emplace_back(levId);
}
} else {
numExpired++;
expiredLevIds.emplace_back(levId);
}
return expiration <= now;
});
}
if (expiredLevIds.size() > 0) {
auto txn = env.txn_rw();
uint64_t numDeleted = 0;
for (auto levId : expiredLevIds) {
if (deleteEvent(txn, levId)) numDeleted++;
}
txn.commit();
if (numDeleted) LI << "Deleted " << numDeleted << " events (ephemeral=" << numEphemeral << " expired=" << numExpired << ")";
}
});
cron.run();
// The timer runs on its own thread; park this one forever
while (1) std::this_thread::sleep_for(std::chrono::seconds(1'000'000));
}

View File

@ -0,0 +1,171 @@
#include "RelayServer.h"
// Ingester thread: parses raw client websocket messages, validates them, and
// dispatches to the appropriate worker thread pools (writer, req, negentropy).
void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
secp256k1_context *secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
Decompressor decomp;
while(1) {
auto newMsgs = thr.inbox.pop_all();
// One read-only txn per inbox batch
auto txn = env.txn_ro();
std::vector<MsgWriter> writerMsgs;
for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgIngester::ClientMessage>(&newMsg.msg)) {
try {
if (msg->payload.starts_with('[')) {
auto payload = tao::json::from_string(msg->payload);
if (cfg().relay__logging__dumpInAll) LI << "[" << msg->connId << "] dumpInAll: " << msg->payload;
if (!payload.is_array()) throw herr("message is not an array");
auto &arr = payload.get_array();
if (arr.size() < 2) throw herr("bad message");
auto &cmd = arr[0].get_string();
if (cmd == "EVENT") {
if (cfg().relay__logging__dumpInEvents) LI << "[" << msg->connId << "] dumpInEvent: " << msg->payload;
try {
ingesterProcessEvent(txn, msg->connId, msg->ipAddr, secpCtx, arr[1], writerMsgs);
} catch (std::exception &e) {
// Invalid events get an OK(false) response rather than a NOTICE
sendOKResponse(msg->connId, arr[1].at("id").get_string(), false, std::string("invalid: ") + e.what());
LI << "Rejected invalid event: " << e.what();
}
} else if (cmd == "REQ") {
if (cfg().relay__logging__dumpInReqs) LI << "[" << msg->connId << "] dumpInReq: " << msg->payload;
try {
ingesterProcessReq(txn, msg->connId, arr);
} catch (std::exception &e) {
sendNoticeError(msg->connId, std::string("bad req: ") + e.what());
}
} else if (cmd == "CLOSE") {
if (cfg().relay__logging__dumpInReqs) LI << "[" << msg->connId << "] dumpInReq: " << msg->payload;
try {
ingesterProcessClose(txn, msg->connId, arr);
} catch (std::exception &e) {
sendNoticeError(msg->connId, std::string("bad close: ") + e.what());
}
} else if (cmd.starts_with("NEG-")) {
// Negentropy protocol messages: NEG-OPEN / NEG-MSG / NEG-CLOSE
try {
ingesterProcessNegentropy(txn, decomp, msg->connId, arr);
} catch (std::exception &e) {
sendNoticeError(msg->connId, std::string("negentropy error: ") + e.what());
}
} else {
throw herr("unknown cmd");
}
} else if (msg->payload == "\n") {
// Do nothing.
// This is for when someone is just sending newlines on websocat for debugging purposes.
} else {
throw herr("unparseable message");
}
} catch (std::exception &e) {
sendNoticeError(msg->connId, std::string("bad msg: ") + e.what());
}
} else if (auto msg = std::get_if<MsgIngester::CloseConn>(&newMsg.msg)) {
// Propagate connection teardown to the worker pools
auto connId = msg->connId;
tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::CloseConn{connId}});
tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::CloseConn{connId}});
}
}
// Batch-dispatch all accepted events to the writer thread
if (writerMsgs.size()) {
tpWriter.dispatchMulti(0, writerMsgs);
}
}
}
// Parses and verifies an incoming EVENT, acks duplicates immediately, and
// otherwise queues an AddEvent message for the writer thread.
// Throws (via parseAndVerifyEvent) on malformed or unverifiable events.
void RelayServer::ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::string ipAddr, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector<MsgWriter> &output) {
    std::string flatStr, jsonStr;

    parseAndVerifyEvent(origJson, secpCtx, true, true, flatStr, jsonStr);

    auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(flatStr.data());

    // Duplicates are acked with OK(true) and never reach the writer
    if (auto existing = lookupEventById(txn, sv(flat->id())); existing) {
        LI << "Duplicate event, skipping";
        sendOKResponse(connId, to_hex(sv(flat->id())), true, "duplicate: have this event");
        return;
    }

    output.emplace_back(MsgWriter{MsgWriter::AddEvent{connId, std::move(ipAddr), hoytech::curr_time_us(), std::move(flatStr), std::move(jsonStr)}});
}
// Validates a REQ message (["REQ", <subId>, <filter>...]) and hands the new
// subscription to a req-worker thread.
void RelayServer::ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) {
    // Require at least one filter, and cap the number of filters at 20
    auto numElems = arr.get_array().size();
    if (numElems < 2 + 1) throw herr("arr too small");
    if (numElems > 2 + 20) throw herr("arr too big");

    Subscription sub(connId, arr[1].get_string(), NostrFilterGroup(arr));

    tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::NewSub{std::move(sub)}});
}
// Validates a CLOSE message (exactly ["CLOSE", <subId>]) and forwards the
// subscription removal to a req-worker thread.
void RelayServer::ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) {
    if (arr.get_array().size() != 2) throw herr("arr too small/big");

    auto subId = SubId(arr[1].get_string());
    tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::RemoveSub{connId, subId}});
}
// Handles negentropy protocol messages:
//   NEG-OPEN  -> builds a filter (inline, or from a stored event's content),
//                then dispatches a NegOpen to the negentropy thread pool
//   NEG-MSG   -> forwards a reconciliation round
//   NEG-CLOSE -> tears down the view
// Errors during NEG-OPEN filter resolution are reported as NEG-ERR responses.
void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp, uint64_t connId, const tao::json::value &arr) {
    if (arr.at(0) == "NEG-OPEN") {
        if (arr.get_array().size() < 5) throw herr("negentropy query missing elements");

        NostrFilterGroup filter;
        auto maxFilterLimit = cfg().relay__negentropy__maxFilterLimit;

        if (arr.at(2).is_string()) {
            // Filter supplied by reference: an event ID whose content holds the filter JSON
            auto ev = lookupEventById(txn, from_hex(arr.at(2).get_string()));
            if (!ev) {
                sendToConn(connId, tao::json::to_string(tao::json::value::array({
                    "NEG-ERR",
                    arr[1].get_string(),
                    "FILTER_NOT_FOUND"
                })));
                return;
            }

            tao::json::value json = tao::json::from_string(getEventJson(txn, decomp, ev->primaryKeyId));

            try {
                // FIX: removed redundant std::move on the prvalue returned by
                // unwrapped() -- it pessimized copy elision (clang -Wpessimizing-move)
                filter = NostrFilterGroup::unwrapped(tao::json::from_string(json.at("content").get_string()), maxFilterLimit);
            } catch (std::exception &e) {
                sendToConn(connId, tao::json::to_string(tao::json::value::array({
                    "NEG-ERR",
                    arr[1].get_string(),
                    "FILTER_INVALID"
                })));
                return;
            }
        } else {
            filter = NostrFilterGroup::unwrapped(arr.at(2), maxFilterLimit);
        }

        Subscription sub(connId, arr[1].get_string(), std::move(filter));

        uint64_t idSize = arr.at(3).get_unsigned();
        if (idSize < 8 || idSize > 32) throw herr("idSize out of range");

        std::string negPayload = from_hex(arr.at(4).get_string());

        tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), idSize, std::move(negPayload)}});
    } else if (arr.at(0) == "NEG-MSG") {
        std::string negPayload = from_hex(arr.at(2).get_string());

        tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegMsg{connId, SubId(arr[1].get_string()), std::move(negPayload)}});
    } else if (arr.at(0) == "NEG-CLOSE") {
        tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegClose{connId, SubId(arr[1].get_string())}});
    } else {
        throw herr("unknown command");
    }
}

View File

@ -0,0 +1,152 @@
#include <Negentropy.h>
#include "RelayServer.h"
#include "DBQuery.h"
#include "QueryScheduler.h"
// Tracks in-progress negentropy reconciliations, keyed by (connId, subId).
struct NegentropyViews {
    // Per-subscription reconciliation state
    struct UserView {
        Negentropy ne;
        std::string initialMsg;      // client's NEG-OPEN payload, consumed once the query completes
        uint64_t startTime = hoytech::curr_time_us();
    };

    using ConnViews = flat_hash_map<SubId, UserView>;
    flat_hash_map<uint64_t, ConnViews> conns; // connId -> subId -> UserView

    // Registers a view for (connId, subId), replacing any existing one with
    // the same key. Returns false if the connection is at its sub limit.
    bool addView(uint64_t connId, const SubId &subId, uint64_t idSize, const std::string &initialMsg) {
        {
            auto *existing = findView(connId, subId);
            if (existing) removeView(connId, subId);
        }

        auto res = conns.try_emplace(connId);
        auto &connViews = res.first->second;

        if (connViews.size() >= cfg().relay__maxSubsPerConnection) {
            // FIX: if try_emplace just created this connection's (empty) map,
            // erase it again so a rejected sub doesn't leave a stale entry.
            if (res.second) conns.erase(connId);
            return false;
        }

        connViews.try_emplace(subId, UserView{ Negentropy(idSize), initialMsg });

        return true;
    }

    // Returns the view for (connId, subId), or nullptr if not registered.
    UserView *findView(uint64_t connId, const SubId &subId) {
        auto f1 = conns.find(connId);
        if (f1 == conns.end()) return nullptr;

        auto f2 = f1->second.find(subId);
        if (f2 == f1->second.end()) return nullptr;

        return &f2->second;
    }

    // Erases the view if present; drops the connection's map once empty.
    void removeView(uint64_t connId, const SubId &subId) {
        auto *view = findView(connId, subId);
        if (!view) return;
        conns[connId].erase(subId);
        if (conns[connId].empty()) conns.erase(connId);
    }

    // Drops all views belonging to a connection.
    void closeConn(uint64_t connId) {
        auto f1 = conns.find(connId);
        if (f1 == conns.end()) return;

        conns.erase(connId);
    }
};
// Negentropy set-reconciliation thread. Handles NEG-OPEN / NEG-MSG / NEG-CLOSE:
// scans the DB for events matching the subscription's filter, accumulates their
// (created_at, truncated id) pairs into a Negentropy set, then exchanges
// reconciliation rounds with the client.
void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
    QueryScheduler queries;
    NegentropyViews views;

    // Feed each batch of matching events into the subscription's Negentropy set.
    queries.onEventBatch = [&](lmdb::txn &txn, const auto &sub, const std::vector<uint64_t> &levIds){
        auto *view = views.findView(sub.connId, sub.subId);
        if (!view) return;

        for (auto levId : levIds) {
            auto ev = lookupEventByLevId(txn, levId);
            // Only the first idSize bytes of the event id participate in reconciliation
            view->ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, view->ne.idSize));
        }
    };

    // DB scan complete: seal the set and send the first reconciliation response.
    queries.onComplete = [&](Subscription &sub){
        auto *view = views.findView(sub.connId, sub.subId);
        if (!view) return;

        LI << "[" << sub.connId << "] Negentropy query matched " << view->ne.items.size() << " events in "
           << (hoytech::curr_time_us() - view->startTime) << "us";

        view->ne.seal();

        auto resp = view->ne.reconcile(view->initialMsg);
        view->initialMsg = ""; // payload consumed; release the memory

        sendToConn(sub.connId, tao::json::to_string(tao::json::value::array({
            "NEG-MSG",
            sub.subId.str(),
            to_hex(resp)
        })));
    };

    while(1) {
        // Block for inbox messages only when no queries are in flight;
        // otherwise poll so pending queries keep making progress.
        auto newMsgs = queries.running.empty() ? thr.inbox.pop_all() : thr.inbox.pop_all_no_wait();

        auto txn = env.txn_ro();

        for (auto &newMsg : newMsgs) {
            if (auto msg = std::get_if<MsgNegentropy::NegOpen>(&newMsg.msg)) {
                auto connId = msg->sub.connId;
                auto subId = msg->sub.subId;

                if (!queries.addSub(txn, std::move(msg->sub))) {
                    sendNoticeError(connId, std::string("too many concurrent REQs"));
                    continue; // FIX: previously fell through and registered a view for a sub that was never added
                }

                if (!views.addView(connId, subId, msg->idSize, msg->negPayload)) {
                    queries.removeSub(connId, subId);
                    sendNoticeError(connId, std::string("too many concurrent NEG requests"));
                    continue;
                }

                queries.process(txn);
            } else if (auto msg = std::get_if<MsgNegentropy::NegMsg>(&newMsg.msg)) {
                auto *view = views.findView(msg->connId, msg->subId);

                if (!view) {
                    sendToConn(msg->connId, tao::json::to_string(tao::json::value::array({
                        "NEG-ERR",
                        msg->subId.str(),
                        "CLOSED"
                    })));

                    continue; // FIX: was `return`, which terminated the whole negentropy thread
                }

                if (!view->ne.sealed) {
                    sendNoticeError(msg->connId, "negentropy error: got NEG-MSG before NEG-OPEN complete");
                    continue; // FIX: was `return`, which terminated the whole negentropy thread
                }

                auto resp = view->ne.reconcile(msg->negPayload);

                sendToConn(msg->connId, tao::json::to_string(tao::json::value::array({
                    "NEG-MSG",
                    msg->subId.str(),
                    to_hex(resp)
                })));
            } else if (auto msg = std::get_if<MsgNegentropy::NegClose>(&newMsg.msg)) {
                queries.removeSub(msg->connId, msg->subId);
                views.removeView(msg->connId, msg->subId);
            } else if (auto msg = std::get_if<MsgNegentropy::CloseConn>(&newMsg.msg)) {
                queries.closeConn(msg->connId);
                views.closeConn(msg->connId);
            }
        }

        queries.process(txn);
        txn.abort();
    }
}

View File

@ -0,0 +1,62 @@
#include "RelayServer.h"
#include "ActiveMonitors.h"
// ReqMonitor thread: serves long-lived REQ subscriptions with newly written
// events. New writes are detected by watching the LMDB data file for changes
// and scanning forward from a per-thread high-water mark.
void RelayServer::runReqMonitor(ThreadPool<MsgReqMonitor>::Thread &thr) {
    // Broadcast a DBChange message to every monitor thread when the DB file
    // is modified, debounced to 100ms.
    auto dbChangeWatcher = hoytech::file_change_monitor(dbDir + "/data.mdb");

    dbChangeWatcher.setDebounce(100);

    dbChangeWatcher.run([&](){
        tpReqMonitor.dispatchToAll([]{ return MsgReqMonitor{MsgReqMonitor::DBChange{}}; });
    });

    Decompressor decomp;
    ActiveMonitors monitors;
    uint64_t currEventId = MAX_U64; // levId high-water mark already fanned out to monitors

    while (1) {
        auto newMsgs = thr.inbox.pop_all();

        auto txn = env.txn_ro();

        uint64_t latestEventId = getMostRecentLevId(txn);
        if (currEventId > latestEventId) currEventId = latestEventId; // first pass: start at the current DB head

        for (auto &newMsg : newMsgs) {
            if (auto msg = std::get_if<MsgReqMonitor::NewSub>(&newMsg.msg)) {
                auto connId = msg->sub.connId;

                // Catch the new subscription up on events written after its
                // ReqWorker scan ended, then register it for live updates.
                env.foreach_Event(txn, [&](auto &ev){
                    if (msg->sub.filterGroup.doesMatch(ev.flat_nested())) {
                        sendEvent(connId, msg->sub.subId, getEventJson(txn, decomp, ev.primaryKeyId));
                    }

                    return true;
                }, false, msg->sub.latestEventId + 1);

                msg->sub.latestEventId = latestEventId;

                if (!monitors.addSub(txn, std::move(msg->sub), latestEventId)) {
                    sendNoticeError(connId, std::string("too many concurrent REQs"));
                }
            } else if (auto msg = std::get_if<MsgReqMonitor::RemoveSub>(&newMsg.msg)) {
                monitors.removeSub(msg->connId, msg->subId);
            } else if (auto msg = std::get_if<MsgReqMonitor::CloseConn>(&newMsg.msg)) {
                monitors.closeConn(msg->connId);
            } else if (std::get_if<MsgReqMonitor::DBChange>(&newMsg.msg)) {
                // Scan everything written since the previous DBChange and fan
                // each event out to all matching subscriptions as one batch.
                env.foreach_Event(txn, [&](auto &ev){
                    monitors.process(txn, ev, [&](RecipientList &&recipients, uint64_t levId){
                        sendEventToBatch(std::move(recipients), std::string(getEventJson(txn, decomp, levId)));
                    });
                    return true;
                }, false, currEventId + 1);

                currEventId = latestEventId;
            }
        }
    }
}

View File

@ -0,0 +1,46 @@
#include "RelayServer.h"
#include "DBQuery.h"
#include "QueryScheduler.h"
// ReqWorker thread: performs the initial stored-event scan for each new REQ
// subscription, emits EOSE when the scan completes, and then transfers the
// subscription to the ReqMonitor pool for live updates.
void RelayServer::runReqWorker(ThreadPool<MsgReqWorker>::Thread &thr) {
    Decompressor decompressor;
    QueryScheduler scheduler;

    // Stream each matching stored event back to the requesting connection.
    scheduler.onEvent = [&](lmdb::txn &txn, const auto &sub, uint64_t levId, std::string_view eventPayload){
        sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decompressor, eventPayload, nullptr, nullptr));
    };

    // Historical scan done: signal EOSE, then hand the sub off to ReqMonitor.
    scheduler.onComplete = [&](Subscription &sub){
        sendToConn(sub.connId, tao::json::to_string(tao::json::value::array({ "EOSE", sub.subId.str() })));
        tpReqMonitor.dispatch(sub.connId, MsgReqMonitor{MsgReqMonitor::NewSub{std::move(sub)}});
    };

    while(1) {
        // Block when idle; poll when queries are still running so they progress.
        auto batch = scheduler.running.empty() ? thr.inbox.pop_all() : thr.inbox.pop_all_no_wait();

        auto txn = env.txn_ro();

        for (auto &item : batch) {
            if (auto m = std::get_if<MsgReqWorker::NewSub>(&item.msg)) {
                auto connId = m->sub.connId;

                if (!scheduler.addSub(txn, std::move(m->sub))) {
                    sendNoticeError(connId, std::string("too many concurrent REQs"));
                }

                scheduler.process(txn);
            } else if (auto m = std::get_if<MsgReqWorker::RemoveSub>(&item.msg)) {
                scheduler.removeSub(m->connId, m->subId);
                tpReqMonitor.dispatch(m->connId, MsgReqMonitor{MsgReqMonitor::RemoveSub{m->connId, m->subId}});
            } else if (auto m = std::get_if<MsgReqWorker::CloseConn>(&item.msg)) {
                scheduler.closeConn(m->connId);
                tpReqMonitor.dispatch(m->connId, MsgReqMonitor{MsgReqMonitor::CloseConn{m->connId}});
            }
        }

        scheduler.process(txn);
        txn.abort();
    }
}

View File

@ -0,0 +1,221 @@
#pragma once
#include <iostream>
#include <memory>
#include <algorithm>
#include <hoytech/time.h>
#include <hoytech/hex.h>
#include <hoytech/file_change_monitor.h>
#include <uWebSockets/src/uWS.h>
#include <tao/json.hpp>
#include "golpe.h"
#include "Subscription.h"
#include "ThreadPool.h"
#include "events.h"
#include "filters.h"
#include "Decompressor.h"
// Messages consumed by the websocket thread (all outbound client traffic).
struct MsgWebsocket : NonCopyable {
    // Send a text frame to one connection.
    struct Send {
        uint64_t connId;
        std::string payload;
    };

    // Send a binary frame to one connection.
    struct SendBinary {
        uint64_t connId;
        std::string payload;
    };

    // Fan one event's JSON out to many (connId, subId) recipients.
    struct SendEventToBatch {
        RecipientList list;
        std::string evJson;
    };

    using Var = std::variant<Send, SendBinary, SendEventToBatch>;
    Var msg;
    MsgWebsocket(Var &&msg_) : msg(std::move(msg_)) {}
};
// Messages consumed by the ingester threads (inbound client traffic).
struct MsgIngester : NonCopyable {
    // A raw message received from a client websocket.
    struct ClientMessage {
        uint64_t connId;
        std::string ipAddr;  // binary IP bytes (see runWebsocket)
        std::string payload;
    };

    // The client's websocket disconnected.
    struct CloseConn {
        uint64_t connId;
    };

    using Var = std::variant<ClientMessage, CloseConn>;
    Var msg;
    MsgIngester(Var &&msg_) : msg(std::move(msg_)) {}
};
// Messages consumed by the writer thread (events to persist).
struct MsgWriter : NonCopyable {
    // A validated event to be written to the DB.
    struct AddEvent {
        uint64_t connId;     // originating connection, for the OK response
        std::string ipAddr;  // binary IP bytes of the submitter
        uint64_t receivedAt; // receipt timestamp passed to the write policy
        std::string flatStr; // flatbuffer-encoded event
        std::string jsonStr; // original JSON encoding of the event
    };

    using Var = std::variant<AddEvent>;
    Var msg;
    MsgWriter(Var &&msg_) : msg(std::move(msg_)) {}
};
// Messages consumed by the ReqWorker threads (initial REQ scans).
struct MsgReqWorker : NonCopyable {
    // A new REQ subscription to scan stored events for.
    struct NewSub {
        Subscription sub;
    };

    // Client sent CLOSE for one subscription.
    struct RemoveSub {
        uint64_t connId;
        SubId subId;
    };

    // The client's websocket disconnected.
    struct CloseConn {
        uint64_t connId;
    };

    using Var = std::variant<NewSub, RemoveSub, CloseConn>;
    Var msg;
    MsgReqWorker(Var &&msg_) : msg(std::move(msg_)) {}
};
// Messages consumed by the ReqMonitor threads (live REQ subscriptions).
struct MsgReqMonitor : NonCopyable {
    // A subscription handed over from ReqWorker after its initial scan.
    struct NewSub {
        Subscription sub;
    };

    // Client sent CLOSE for one subscription.
    struct RemoveSub {
        uint64_t connId;
        SubId subId;
    };

    // The client's websocket disconnected.
    struct CloseConn {
        uint64_t connId;
    };

    // The LMDB data file changed on disk; scan for new events.
    struct DBChange {
    };

    using Var = std::variant<NewSub, RemoveSub, CloseConn, DBChange>;
    Var msg;
    MsgReqMonitor(Var &&msg_) : msg(std::move(msg_)) {}
};
// Messages consumed by the negentropy threads (NEG-* protocol).
struct MsgNegentropy : NonCopyable {
    // Client opened a reconciliation session (NEG-OPEN).
    struct NegOpen {
        Subscription sub;
        uint64_t idSize;        // truncated event-id length used for reconciliation
        std::string negPayload; // client's initial reconciliation message
    };

    // A reconciliation round from the client (NEG-MSG).
    struct NegMsg {
        uint64_t connId;
        SubId subId;
        std::string negPayload;
    };

    // Client closed the session (NEG-CLOSE).
    struct NegClose {
        uint64_t connId;
        SubId subId;
    };

    // The client's websocket disconnected.
    struct CloseConn {
        uint64_t connId;
    };

    using Var = std::variant<NegOpen, NegMsg, NegClose, CloseConn>;
    Var msg;
    MsgNegentropy(Var &&msg_) : msg(std::move(msg_)) {}
};
// Top-level relay server: owns every thread pool plus thread-safe helpers
// for queueing outbound websocket replies.
struct RelayServer {
    std::unique_ptr<uS::Async> hubTrigger; // wakes the uWS loop so the websocket thread drains its inbox

    // Thread Pools
    ThreadPool<MsgWebsocket> tpWebsocket;
    ThreadPool<MsgIngester> tpIngester;
    ThreadPool<MsgWriter> tpWriter;
    ThreadPool<MsgReqWorker> tpReqWorker;
    ThreadPool<MsgReqMonitor> tpReqMonitor;
    ThreadPool<MsgNegentropy> tpNegentropy;
    std::thread cronThread;

    // Starts all thread pools and blocks until shutdown.
    void run();

    // Per-pool thread entry points.
    void runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr);

    void runIngester(ThreadPool<MsgIngester>::Thread &thr);
    void ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::string ipAddr, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector<MsgWriter> &output);
    void ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
    void ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
    void ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp, uint64_t connId, const tao::json::value &origJson);

    void runWriter(ThreadPool<MsgWriter>::Thread &thr);
    void runReqWorker(ThreadPool<MsgReqWorker>::Thread &thr);
    void runReqMonitor(ThreadPool<MsgReqMonitor>::Thread &thr);
    void runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr);
    void runCron();

    // Utils (can be called by any thread)

    // Queue a text frame for connId and wake the websocket loop.
    void sendToConn(uint64_t connId, std::string &&payload) {
        tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::Send{connId, std::move(payload)}});
        hubTrigger->send();
    }

    // Queue a binary frame for connId and wake the websocket loop.
    void sendToConnBinary(uint64_t connId, std::string &&payload) {
        tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::SendBinary{connId, std::move(payload)}});
        hubTrigger->send();
    }

    // Send ["EVENT","<subId>",<evJson>] to one connection. The frame is
    // assembled by hand so the event JSON is never re-serialized.
    void sendEvent(uint64_t connId, const SubId &subId, std::string_view evJson) {
        auto subIdSv = subId.sv();

        std::string reply;
        reply.reserve(13 + subIdSv.size() + evJson.size()); // 13 = fixed chars of the envelope
        reply += "[\"EVENT\",\"";
        reply += subIdSv;
        reply += "\",";
        reply += evJson;
        reply += "]";

        sendToConn(connId, std::move(reply));
    }

    // Send one event's JSON to many (connId, subId) recipients at once.
    void sendEventToBatch(RecipientList &&list, std::string &&evJson) {
        tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::SendEventToBatch{std::move(list), std::move(evJson)}});
        hubTrigger->send();
    }

    // Send a ["NOTICE","ERROR: ..."] message to the client and log it.
    void sendNoticeError(uint64_t connId, std::string &&payload) {
        LI << "sending error to [" << connId << "]: " << payload;
        auto reply = tao::json::value::array({ "NOTICE", std::string("ERROR: ") + payload });
        tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::Send{connId, std::move(tao::json::to_string(reply))}});
        hubTrigger->send();
    }

    // Send an ["OK",<eventIdHex>,<written>,<message>] command result.
    void sendOKResponse(uint64_t connId, std::string_view eventIdHex, bool written, std::string_view message) {
        auto reply = tao::json::value::array({ "OK", eventIdHex, written, message });
        tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::Send{connId, std::move(tao::json::to_string(reply))}});
        hubTrigger->send();
    }
};

View File

@ -0,0 +1,209 @@
#include "RelayServer.h"
#include "app_git_version.h"
// Builds a complete, ready-to-send HTTP/1.1 200 response (status line,
// headers, blank line, body) for the given content type and body. Callers
// cache the result and write it to sockets verbatim, hence the explicit
// CRLF line endings.
//
// Changes vs. previous version: single pre-reserved buffer with appends
// instead of per-header `std::string(...) + ...` temporaries, and the stray
// `;` after the function body removed.
static std::string preGenerateHttpResponse(const std::string &contentType, const std::string &content) {
    std::string output;
    output.reserve(160 + contentType.size() + content.size()); // rough header budget; avoids reallocations

    output += "HTTP/1.1 200 OK\r\n";
    output += "Content-Type: ";
    output += contentType;
    output += "\r\n";
    output += "Access-Control-Allow-Origin: *\r\n";
    output += "Connection: keep-alive\r\n";
    output += "Server: strfry\r\n";
    output += "Content-Length: ";
    output += std::to_string(content.size());
    output += "\r\n";
    output += "\r\n";
    output += content;

    return output;
}
// Websocket thread: owns the uWS event loop, all client connections, and all
// outbound socket writes. Other threads reach it via tpWebsocket messages
// plus a hubTrigger wake-up.
void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
    // Per-connection bookkeeping; created in onConnection, freed in onDisconnection.
    struct Connection {
        uWS::WebSocket<uWS::SERVER> *websocket;
        uint64_t connId;
        uint64_t connectedTimestamp;
        std::string ipAddr; // binary IP bytes (4 = IPv4, 16 = IPv6)

        struct Stats {
            uint64_t bytesUp = 0;             // pre-compression bytes sent
            uint64_t bytesUpCompressed = 0;   // on-the-wire bytes sent
            uint64_t bytesDown = 0;           // pre-compression bytes received
            uint64_t bytesDownCompressed = 0; // on-the-wire bytes received
        } stats;

        Connection(uWS::WebSocket<uWS::SERVER> *p, uint64_t connId_)
            : websocket(p), connId(connId_), connectedTimestamp(hoytech::curr_time_us()) { }
        Connection(const Connection &) = delete;
        Connection(Connection &&) = delete;
    };

    uWS::Hub hub;
    uWS::Group<uWS::SERVER> *hubGroup;
    flat_hash_map<uint64_t, Connection*> connIdToConnection;
    uint64_t nextConnectionId = 1;

    // Scratch buffer reused across SendEventToBatch messages (see below).
    std::string tempBuf;
    tempBuf.reserve(cfg().events__maxEventSize + MAX_SUBID_SIZE + 100);

    // Relay-info JSON document, re-rendered only when the config version changes.
    auto getServerInfoHttpResponse = [ver = uint64_t(0), rendered = std::string("")]() mutable {
        if (ver != cfg().version()) {
            rendered = preGenerateHttpResponse("application/json", tao::json::to_string(tao::json::value({
                { "name", cfg().relay__info__name },
                { "description", cfg().relay__info__description },
                { "pubkey", cfg().relay__info__pubkey },
                { "contact", cfg().relay__info__contact },
                { "supported_nips", tao::json::value::array({ 1, 9, 11, 12, 15, 16, 20, 22 }) },
                { "software", "git+https://github.com/hoytech/strfry.git" },
                { "version", APP_GIT_VERSION },
            })));
            ver = cfg().version();
        }

        return std::string_view(rendered); // memory only valid until next call
    };

    const std::string defaultHttpResponse = preGenerateHttpResponse("text/plain", "Please use a Nostr client to connect.");

    {
        int extensionOptions = 0;

        if (cfg().relay__compression__enabled) extensionOptions |= uWS::PERMESSAGE_DEFLATE;
        if (cfg().relay__compression__slidingWindow) extensionOptions |= uWS::SLIDING_DEFLATE_WINDOW;

        hubGroup = hub.createGroup<uWS::SERVER>(extensionOptions, cfg().relay__maxWebsocketPayloadSize);
    }

    if (cfg().relay__autoPingSeconds) hubGroup->startAutoPing(cfg().relay__autoPingSeconds * 1'000);

    // Plain HTTP: serve the relay-info JSON when requested via the Accept
    // header, otherwise a human-readable hint page.
    hubGroup->onHttpRequest([&](uWS::HttpResponse *res, uWS::HttpRequest req, char *data, size_t length, size_t remainingBytes){
        LI << "HTTP request for [" << req.getUrl().toString() << "]";

        if (req.getHeader("accept").toString() == "application/nostr+json") {
            auto info = getServerInfoHttpResponse();
            res->write(info.data(), info.size());
        } else {
            res->write(defaultHttpResponse.data(), defaultHttpResponse.size());
        }
    });

    hubGroup->onConnection([&](uWS::WebSocket<uWS::SERVER> *ws, uWS::HttpRequest req) {
        uint64_t connId = nextConnectionId++;
        Connection *c = new Connection(ws, connId); // owned via connIdToConnection; deleted in onDisconnection

        // Prefer the reverse-proxy-supplied IP header (e.g. X-Real-IP) when configured.
        if (cfg().relay__realIpHeader.size()) {
            auto header = req.getHeader(cfg().relay__realIpHeader.c_str()).toString();
            c->ipAddr = parseIP(header);
            if (c->ipAddr.size() == 0) LW << "Couldn't parse IP from header " << cfg().relay__realIpHeader << ": " << header;
        }

        // Fall back to the socket's own peer address.
        if (c->ipAddr.size() == 0) c->ipAddr = ws->getAddressBytes();

        ws->setUserData((void*)c);
        connIdToConnection.emplace(connId, c);

        bool compEnabled, compSlidingWindow;
        ws->getCompressionState(compEnabled, compSlidingWindow);
        LI << "[" << connId << "] Connect from " << renderIP(c->ipAddr)
           << " compression=" << (compEnabled ? 'Y' : 'N')
           << " sliding=" << (compSlidingWindow ? 'Y' : 'N')
        ;

        if (cfg().relay__enableTcpKeepalive) {
            int optval = 1;
            if (setsockopt(ws->getFd(), SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval))) {
                LW << "Failed to enable TCP keepalive: " << strerror(errno);
            }
        }
    });

    hubGroup->onDisconnection([&](uWS::WebSocket<uWS::SERVER> *ws, int code, char *message, size_t length) {
        auto *c = (Connection*)ws->getUserData();
        uint64_t connId = c->connId;

        // NOTE(review): these divisions produce NaN when bytesUp/bytesDown are
        // 0 (no traffic) — presumably renderPercent tolerates that; confirm.
        auto upComp = renderPercent(1.0 - (double)c->stats.bytesUpCompressed / c->stats.bytesUp);
        auto downComp = renderPercent(1.0 - (double)c->stats.bytesDownCompressed / c->stats.bytesDown);

        LI << "[" << connId << "] Disconnect from " << renderIP(c->ipAddr)
           << " UP: " << renderSize(c->stats.bytesUp) << " (" << upComp << " compressed)"
           << " DN: " << renderSize(c->stats.bytesDown) << " (" << downComp << " compressed)"
        ;

        // Tell the ingester so downstream threads drop this connection's state.
        tpIngester.dispatch(connId, MsgIngester{MsgIngester::CloseConn{connId}});
        connIdToConnection.erase(connId);
        delete c;
    });

    // onMessage2 also reports the compressed frame size (uWS fork extension).
    hubGroup->onMessage2([&](uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length, uWS::OpCode opCode, size_t compressedSize) {
        auto &c = *(Connection*)ws->getUserData();

        c.stats.bytesDown += length;
        c.stats.bytesDownCompressed += compressedSize;

        tpIngester.dispatch(c.connId, MsgIngester{MsgIngester::ClientMessage{c.connId, c.ipAddr, std::string(message, length)}});
    });

    // Drains this thread's inbox; invoked on the uWS loop via hubTrigger.
    std::function<void()> asyncCb = [&]{
        auto newMsgs = thr.inbox.pop_all_no_wait();

        auto doSend = [&](uint64_t connId, std::string_view payload, uWS::OpCode opCode){
            auto it = connIdToConnection.find(connId);
            if (it == connIdToConnection.end()) return; // client already disconnected; drop silently
            auto &c = *it->second;

            size_t compressedSize;
            auto cb = [](uWS::WebSocket<uWS::SERVER> *webSocket, void *data, bool cancelled, void *reserved){};
            c.websocket->send(payload.data(), payload.size(), opCode, cb, nullptr, true, &compressedSize);
            c.stats.bytesUp += payload.size();
            c.stats.bytesUpCompressed += compressedSize;
        };

        for (auto &newMsg : newMsgs) {
            if (auto msg = std::get_if<MsgWebsocket::Send>(&newMsg.msg)) {
                doSend(msg->connId, msg->payload, uWS::OpCode::TEXT);
            } else if (auto msg = std::get_if<MsgWebsocket::SendBinary>(&newMsg.msg)) {
                doSend(msg->connId, msg->payload, uWS::OpCode::BINARY);
            } else if (auto msg = std::get_if<MsgWebsocket::SendEventToBatch>(&newMsg.msg)) {
                // Copy the event JSON into tempBuf once, leaving a
                // (10 + MAX_SUBID_SIZE)-byte slot at the front. For each
                // recipient, the '["EVENT","<subId>' prefix is written
                // right-aligned into that slot, so the (potentially large)
                // event JSON is never copied per recipient.
                tempBuf.reserve(13 + MAX_SUBID_SIZE + msg->evJson.size());
                tempBuf.resize(10 + MAX_SUBID_SIZE);
                tempBuf += "\",";
                tempBuf += msg->evJson;
                tempBuf += "]";

                for (auto &item : msg->list) {
                    auto subIdSv = item.subId.sv();
                    // p points at where this recipient's frame begins within the slot
                    auto *p = tempBuf.data() + MAX_SUBID_SIZE - subIdSv.size();
                    memcpy(p, "[\"EVENT\",\"", 10);
                    memcpy(p + 10, subIdSv.data(), subIdSv.size());

                    doSend(item.connId, std::string_view(p, 13 + subIdSv.size() + msg->evJson.size()), uWS::OpCode::TEXT);
                }
            }
        }
    };

    hubTrigger = std::make_unique<uS::Async>(hub.getLoop());
    hubTrigger->setData(&asyncCb);

    hubTrigger->start([](uS::Async *a){
        auto *r = static_cast<std::function<void()> *>(a->data);
        (*r)();
    });

    int port = cfg().relay__port;
    std::string bindHost = cfg().relay__bind;

    if (!hub.listen(bindHost.c_str(), port, nullptr, uS::REUSE_PORT, hubGroup)) throw herr("unable to listen on port ", port);

    LI << "Started websocket server on " << bindHost << ":" << port;

    // Blocks for the lifetime of the server.
    hub.run();
}

View File

@ -0,0 +1,86 @@
#include "RelayServer.h"
#include "PluginWritePolicy.h"
void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
PluginWritePolicy writePolicy;
while(1) {
auto newMsgs = thr.inbox.pop_all();
// Prepare messages
std::vector<EventToWrite> newEvents;
for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgWriter::AddEvent>(&newMsg.msg)) {
tao::json::value evJson = tao::json::from_string(msg->jsonStr);
EventSourceType sourceType = msg->ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6;
std::string okMsg;
auto res = writePolicy.acceptEvent(evJson, msg->receivedAt, sourceType, msg->ipAddr, okMsg);
if (res == WritePolicyResult::Accept) {
newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, sourceType, std::move(msg->ipAddr), msg);
} else {
auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(msg->flatStr.data());
auto eventIdHex = to_hex(sv(flat->id()));
LI << "[" << msg->connId << "] write policy blocked event " << eventIdHex << ": " << okMsg;
sendOKResponse(msg->connId, eventIdHex, res == WritePolicyResult::ShadowReject, okMsg);
}
}
}
try {
auto txn = env.txn_rw();
writeEvents(txn, newEvents);
txn.commit();
} catch (std::exception &e) {
LE << "Error writing " << newEvents.size() << " events: " << e.what();
for (auto &newEvent : newEvents) {
auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(newEvent.flatStr.data());
auto eventIdHex = to_hex(sv(flat->id()));
MsgWriter::AddEvent *addEventMsg = static_cast<MsgWriter::AddEvent*>(newEvent.userData);
std::string message = "Write error: ";
message += e.what();
sendOKResponse(addEventMsg->connId, eventIdHex, false, message);
}
continue;
}
// Log
for (auto &newEvent : newEvents) {
auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(newEvent.flatStr.data());
auto eventIdHex = to_hex(sv(flat->id()));
std::string message;
bool written = false;
if (newEvent.status == EventWriteStatus::Written) {
LI << "Inserted event. id=" << eventIdHex << " levId=" << newEvent.levId;
written = true;
} else if (newEvent.status == EventWriteStatus::Duplicate) {
message = "duplicate: have this event";
written = true;
} else if (newEvent.status == EventWriteStatus::Replaced) {
message = "replaced: have newer event";
} else if (newEvent.status == EventWriteStatus::Deleted) {
message = "deleted: user requested deletion";
}
if (newEvent.status != EventWriteStatus::Written) {
LI << "Rejected event. " << message << ", id=" << eventIdHex;
}
MsgWriter::AddEvent *addEventMsg = static_cast<MsgWriter::AddEvent*>(newEvent.userData);
sendOKResponse(addEventMsg->connId, eventIdHex, written, message);
}
}
}

View File

@ -0,0 +1,51 @@
#include "RelayServer.h"
void cmd_relay(const std::vector<std::string> &subArgs) {
RelayServer s;
s.run();
}
// Starts every worker thread pool, the cron thread, and a config-file
// watcher, then blocks on the websocket pool (which runs for the lifetime
// of the server).
void RelayServer::run() {
    // Single websocket thread: it owns the uWS event loop and all connections.
    tpWebsocket.init("Websocket", 1, [this](auto &thr){
        runWebsocket(thr);
    });

    tpIngester.init("Ingester", cfg().relay__numThreads__ingester, [this](auto &thr){
        runIngester(thr);
    });

    // Single writer thread: serializes all read-write DB transactions.
    tpWriter.init("Writer", 1, [this](auto &thr){
        runWriter(thr);
    });

    tpReqWorker.init("ReqWorker", cfg().relay__numThreads__reqWorker, [this](auto &thr){
        runReqWorker(thr);
    });

    tpReqMonitor.init("ReqMonitor", cfg().relay__numThreads__reqMonitor, [this](auto &thr){
        runReqMonitor(thr);
    });

    tpNegentropy.init("Negentropy", cfg().relay__numThreads__negentropy, [this](auto &thr){
        runNegentropy(thr);
    });

    cronThread = std::thread([this]{
        runCron();
    });

    // Monitor for config file reloads
    // NOTE(review): run() presumably starts a background watcher (execution
    // continues to the join below) — confirm against file_change_monitor.
    auto configFileChangeWatcher = hoytech::file_change_monitor(configFile);

    configFileChangeWatcher.setDebounce(100);

    configFileChangeWatcher.run([&](){
        loadConfig(configFile);
    });

    tpWebsocket.join();
}