initial commit

Doug Hoyte
2022-12-19 14:42:40 -05:00
commit c47d07e985
42 changed files with 4652 additions and 0 deletions

227
src/ActiveMonitors.h Normal file
@@ -0,0 +1,227 @@
#pragma once
#include "golpe.h"
#include "Subscription.h"
#include "filters.h"
struct ActiveMonitors : NonCopyable {
private:
struct Monitor : NonCopyable {
Subscription sub;
Monitor(Subscription &sub_) : sub(std::move(sub_)) {}
};
using ConnMonitor = std::map<SubId, Monitor>;
std::map<uint64_t, ConnMonitor> conns; // connId -> subId -> Monitor
struct MonitorItem {
Monitor *mon;
uint64_t latestEventId;
};
using MonitorSet = std::map<NostrFilter*, MonitorItem>; // FIXME: flat_map here
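// Each filter is indexed under exactly one of the maps below (possibly under several
// keys), chosen by installLookups(): ids, then authors, then tags, then kinds,
// falling back to the catch-all allOthers set.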
std::map<std::string, MonitorSet> allIds;
std::map<std::string, MonitorSet> allAuthors;
std::map<std::string, MonitorSet> allTags;
std::map<uint64_t, MonitorSet> allKinds;
MonitorSet allOthers;
public:
void addSub(lmdb::txn &txn, Subscription &&sub, uint64_t currEventId) {
if (sub.latestEventId != currEventId) throw herr("sub not up to date");
{
auto *existing = findMonitor(sub.connId, sub.subId);
if (existing) removeSub(sub.connId, sub.subId);
}
auto res = conns.try_emplace(sub.connId);
auto &connMonitors = res.first->second;
auto subId = sub.subId;
auto *m = &connMonitors.try_emplace(subId, sub).first->second;
installLookups(m, currEventId);
}
void removeSub(uint64_t connId, const SubId &subId) {
auto *monitor = findMonitor(connId, subId);
if (!monitor) return;
uninstallLookups(monitor);
conns[connId].erase(subId);
if (conns[connId].empty()) conns.erase(connId);
}
void closeConn(uint64_t connId) {
auto f1 = conns.find(connId);
if (f1 == conns.end()) return;
for (auto &[k, v] : f1->second) uninstallLookups(&v);
conns.erase(connId);
}
void process(lmdb::txn &txn, defaultDb::environment::View_Event &ev, std::function<void(RecipientList &&, uint64_t)> cb) {
RecipientList recipients;
auto processMonitorSet = [&](MonitorSet &ms){
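// A filter can be reachable through several index entries (e.g. several matching tags)
// and a subscription through several filters; the latestEventId checks below prevent
// delivering the same event to a subscription more than once.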
for (auto &[f, item] : ms) {
if (item.latestEventId >= ev.primaryKeyId || item.mon->sub.latestEventId >= ev.primaryKeyId) continue;
item.latestEventId = ev.primaryKeyId;
if (f->doesMatch(ev.flat_nested())) {
recipients.emplace_back(item.mon->sub.connId, item.mon->sub.subId);
item.mon->sub.latestEventId = ev.primaryKeyId;
continue;
}
}
};
auto processMonitorsPrefix = [&](std::map<std::string, MonitorSet> &m, const std::string &key, std::function<bool(const std::string&)> matches){
auto it = m.lower_bound(key.substr(0, 1));
if (it == m.end()) return;
while (it != m.end() && it->first[0] == key[0]) {
if (matches(it->first)) processMonitorSet(it->second);
it = std::next(it);
}
};
auto processMonitorsExact = [&]<typename T>(std::map<T, MonitorSet> &m, const T &key, std::function<bool(const T &)> matches){
auto it = m.upper_bound(key);
if (it == m.begin()) return;
it = std::prev(it);
while (matches(it->first)) {
processMonitorSet(it->second);
if (it == m.begin()) break;
it = std::prev(it);
}
};
auto *flat = ev.flat_nested();
{
auto id = std::string(sv(flat->id()));
processMonitorsPrefix(allIds, id, static_cast<std::function<bool(const std::string&)>>([&](const std::string &val){
return id.starts_with(val);
}));
}
{
auto pubkey = std::string(sv(flat->pubkey()));
processMonitorsPrefix(allAuthors, pubkey, static_cast<std::function<bool(const std::string&)>>([&](const std::string &val){
return pubkey.starts_with(val);
}));
}
for (const auto &tag : *flat->tags()) {
// FIXME: can avoid this allocation:
auto tagSpec = std::string(1, (char)tag->key()) + std::string(sv(tag->val()));
processMonitorsExact(allTags, tagSpec, static_cast<std::function<bool(const std::string&)>>([&](const std::string &val){
return tagSpec == val;
}));
}
{
auto kind = flat->kind();
processMonitorsExact(allKinds, kind, static_cast<std::function<bool(const uint64_t&)>>([&](const uint64_t &val){
return kind == val;
}));
}
processMonitorSet(allOthers);
if (recipients.size()) {
cb(std::move(recipients), ev.primaryKeyId);
}
}
private:
Monitor *findMonitor(uint64_t connId, const SubId &subId) {
auto f1 = conns.find(connId);
if (f1 == conns.end()) return nullptr;
auto f2 = f1->second.find(subId);
if (f2 == f1->second.end()) return nullptr;
return &f2->second;
}
void installLookups(Monitor *m, uint64_t currEventId) {
for (auto &f : m->sub.filterGroup.filters) {
if (f.ids.size()) {
for (size_t i = 0; i < f.ids.size(); i++) {
auto res = allIds.try_emplace(f.ids.at(i));
res.first->second.try_emplace(&f, MonitorItem{m, currEventId});
}
} else if (f.authors.size()) {
for (size_t i = 0; i < f.authors.size(); i++) {
auto res = allAuthors.try_emplace(f.authors.at(i));
res.first->second.try_emplace(&f, MonitorItem{m, currEventId});
}
} else if (f.tags.size()) {
for (const auto &[tagName, filterSet] : f.tags) {
for (size_t i = 0; i < filterSet.size(); i++) {
std::string tagSpec = std::string(1, tagName) + filterSet.at(i);
auto res = allTags.try_emplace(tagSpec);
res.first->second.try_emplace(&f, MonitorItem{m, currEventId});
}
}
} else if (f.kinds.size()) {
for (size_t i = 0; i < f.kinds.size(); i++) {
auto res = allKinds.try_emplace(f.kinds.at(i));
res.first->second.try_emplace(&f, MonitorItem{m, currEventId});
}
} else {
allOthers.try_emplace(&f, MonitorItem{m, currEventId});
}
}
}
void uninstallLookups(Monitor *m) {
for (auto &f : m->sub.filterGroup.filters) {
if (f.ids.size()) {
for (size_t i = 0; i < f.ids.size(); i++) {
auto &monSet = allIds.at(f.ids.at(i));
monSet.erase(&f);
if (monSet.empty()) allIds.erase(f.ids.at(i));
}
} else if (f.authors.size()) {
for (size_t i = 0; i < f.authors.size(); i++) {
auto &monSet = allAuthors.at(f.authors.at(i));
monSet.erase(&f);
if (monSet.empty()) allAuthors.erase(f.authors.at(i));
}
} else if (f.tags.size()) {
for (const auto &[tagName, filterSet] : f.tags) {
for (size_t i = 0; i < filterSet.size(); i++) {
std::string tagSpec = std::string(1, tagName) + filterSet.at(i);
auto &monSet = allTags.at(tagSpec);
monSet.erase(&f);
if (monSet.empty()) allTags.erase(tagSpec);
}
}
} else if (f.kinds.size()) {
for (size_t i = 0; i < f.kinds.size(); i++) {
auto &monSet = allKinds.at(f.kinds.at(i));
monSet.erase(&f);
if (monSet.empty()) allKinds.erase(f.kinds.at(i));
}
} else {
allOthers.erase(&f);
}
}
}
};

328
src/DBScan.h Normal file
@@ -0,0 +1,328 @@
#pragma once
#include "golpe.h"
#include "Subscription.h"
#include "filters.h"
struct DBScan {
const NostrFilter &f;
uint64_t remainingLimit;
struct NullState {
};
struct IdScan {
size_t index = 0;
std::string prefix;
};
struct PubkeyKindScan {
size_t indexAuthor = 0;
size_t indexKind = 0;
std::string prefix;
};
struct PubkeyScan {
size_t index = 0;
std::string prefix;
};
struct TagScan {
std::map<char, FilterSetBytes>::const_iterator indexTagName;
size_t indexTagVal = 0;
std::string search;
};
struct KindScan {
size_t index = 0;
uint64_t kind;
};
struct CreatedAtScan {
bool done = false;
};
std::variant<NullState, IdScan, PubkeyKindScan, PubkeyScan, TagScan, KindScan, CreatedAtScan> scanState = NullState{};
lmdb::dbi indexDbi;
std::string resumeKey;
uint64_t resumeVal;
std::function<bool()> isComplete;
std::function<void()> nextFilterItem;
std::function<void()> resetResume;
std::function<bool(std::string_view, bool&)> keyMatch;
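// The constructor installs one strategy per filter shape (id / pubkey+kind / pubkey /
// tag / kind / created_at): isComplete, nextFilterItem, resetResume and keyMatch are
// then driven generically by scan() below.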
DBScan(const NostrFilter &f_) : f(f_) {
remainingLimit = f.limit;
if (f.ids.size()) {
LI << "ID Scan";
scanState = IdScan{};
auto *state = std::get_if<IdScan>(&scanState);
indexDbi = env.dbi_Event__id;
isComplete = [&, state]{
return state->index >= f.ids.size();
};
nextFilterItem = [&, state]{
state->index++;
};
resetResume = [&, state]{
state->prefix = f.ids.at(state->index);
resumeKey = padBytes(state->prefix, 32 + 8, '\xFF');
resumeVal = MAX_U64;
};
keyMatch = [&, state](std::string_view k, bool&){
return k.starts_with(state->prefix);
};
} else if (f.authors.size() && f.kinds.size()) {
LI << "PubkeyKind Scan";
scanState = PubkeyKindScan{};
auto *state = std::get_if<PubkeyKindScan>(&scanState);
indexDbi = env.dbi_Event__pubkeyKind;
isComplete = [&, state]{
return state->indexAuthor >= f.authors.size();
};
nextFilterItem = [&, state]{
state->indexKind++;
if (state->indexKind >= f.kinds.size()) {
state->indexAuthor++;
state->indexKind = 0;
}
};
resetResume = [&, state]{
state->prefix = f.authors.at(state->indexAuthor);
if (state->prefix.size() == 32) state->prefix += lmdb::to_sv<uint64_t>(f.kinds.at(state->indexKind));
resumeKey = padBytes(state->prefix, 32 + 8 + 8, '\xFF');
resumeVal = MAX_U64;
};
keyMatch = [&, state](std::string_view k, bool &skipBack){
if (!k.starts_with(state->prefix)) return false;
if (state->prefix.size() == 32 + 8) return true;
ParsedKey_StringUint64Uint64 parsedKey(k);
if (parsedKey.n1 <= f.kinds.at(state->indexKind)) return true;
resumeKey = makeKey_StringUint64Uint64(parsedKey.s, f.kinds.at(state->indexKind), MAX_U64);
resumeVal = MAX_U64;
skipBack = true;
return false;
};
} else if (f.authors.size()) {
LI << "Pubkey Scan";
scanState = PubkeyScan{};
auto *state = std::get_if<PubkeyScan>(&scanState);
indexDbi = env.dbi_Event__pubkey;
isComplete = [&, state]{
return state->index >= f.authors.size();
};
nextFilterItem = [&, state]{
state->index++;
};
resetResume = [&, state]{
state->prefix = f.authors.at(state->index);
resumeKey = padBytes(state->prefix, 32 + 8, '\xFF');
resumeVal = MAX_U64;
};
keyMatch = [&, state](std::string_view k, bool&){
return k.starts_with(state->prefix);
};
} else if (f.tags.size()) {
LI << "Tag Scan";
scanState = TagScan{f.tags.begin()};
auto *state = std::get_if<TagScan>(&scanState);
indexDbi = env.dbi_Event__tag;
isComplete = [&, state]{
return state->indexTagName == f.tags.end();
};
nextFilterItem = [&, state]{
state->indexTagVal++;
if (state->indexTagVal >= state->indexTagName->second.size()) {
state->indexTagName = std::next(state->indexTagName);
state->indexTagVal = 0;
}
};
resetResume = [&, state]{
state->search = state->indexTagName->first;
state->search += state->indexTagName->second.at(state->indexTagVal);
resumeKey = state->search + std::string(8, '\xFF');
resumeVal = MAX_U64;
};
keyMatch = [&, state](std::string_view k, bool&){
return k.substr(0, state->search.size()) == state->search;
};
} else if (f.kinds.size()) {
LI << "Kind Scan";
scanState = KindScan{};
auto *state = std::get_if<KindScan>(&scanState);
indexDbi = env.dbi_Event__kind;
isComplete = [&, state]{
return state->index >= f.kinds.size();
};
nextFilterItem = [&, state]{
state->index++;
};
resetResume = [&, state]{
state->kind = f.kinds.at(state->index);
resumeKey = std::string(lmdb::to_sv<uint64_t>(state->kind)) + std::string(8, '\xFF');
resumeVal = MAX_U64;
};
keyMatch = [&, state](std::string_view k, bool&){
ParsedKey_Uint64Uint64 parsedKey(k);
return parsedKey.n1 == state->kind;
};
} else {
LI << "CreatedAt Scan";
scanState = CreatedAtScan{};
auto *state = std::get_if<CreatedAtScan>(&scanState);
indexDbi = env.dbi_Event__created_at;
isComplete = [&, state]{
return state->done;
};
nextFilterItem = [&, state]{
state->done = true;
};
resetResume = [&, state]{
resumeKey = std::string(8, '\xFF');
resumeVal = MAX_U64;
};
keyMatch = [&, state](std::string_view k, bool&){
return true;
};
}
}
// Returns true if the scan is complete
bool scan(lmdb::txn &txn, std::function<void(uint64_t)> handleEvent, std::function<bool()> doPause) {
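// Walks the chosen index starting from resumeKey/resumeVal (reset to the highest
// possible key for the current filter item), moving toward older entries. keyMatch
// and the since/until checks may rewrite resumeKey and set skipBack to re-enter the
// scan at a new position instead of advancing to the next filter item.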
while (remainingLimit && !isComplete()) {
if (resumeKey == "") resetResume();
bool pause = false, skipBack = false;
env.generic_foreachFull(txn, indexDbi, resumeKey, lmdb::to_sv<uint64_t>(resumeVal), [&](auto k, auto v) {
if (doPause()) {
resumeKey = std::string(k);
resumeVal = lmdb::from_sv<uint64_t>(v);
LI << "SAVING resumeKey: " << to_hex(resumeKey) << " / " << resumeVal;
pause = true;
return false;
}
if (!keyMatch(k, skipBack)) return false;
uint64_t created;
{
ParsedKey_StringUint64 parsedKey(k);
created = parsedKey.n;
if ((f.since && created < f.since)) {
resumeKey = makeKey_StringUint64(parsedKey.s, 0);
resumeVal = 0;
skipBack = true;
return false;
}
if (f.until && created > f.until) {
resumeKey = makeKey_StringUint64(parsedKey.s, f.until);
resumeVal = MAX_U64;
skipBack = true;
return false;
}
}
bool sent = false;
uint64_t quadId = lmdb::from_sv<uint64_t>(v);
if (f.indexOnlyScans) {
if (f.doesMatchTimes(created)) {
handleEvent(quadId);
sent = true;
}
} else {
auto view = env.lookup_Event(txn, quadId);
if (!view) throw herr("missing event from index, corrupt DB?");
if (f.doesMatch(view->flat_nested())) {
handleEvent(quadId);
sent = true;
}
}
if (sent) {
if (remainingLimit) remainingLimit--;
if (!remainingLimit) return false;
}
return true;
}, true);
if (pause) return false;
if (!skipBack) {
nextFilterItem();
resumeKey = "";
}
}
return true;
}
std::string padBytes(std::string_view str, size_t n, char padChar) {
if (str.size() > n) throw herr("unable to pad, string longer than expected");
return std::string(str) + std::string(n - str.size(), padChar);
}
};
struct DBScanQuery : NonCopyable {
Subscription sub;
std::unique_ptr<DBScan> scanner;
size_t filterGroupIndex = 0;
bool dead = false;
std::unordered_set<uint64_t> alreadySentEvents; // FIXME: flat_set here, or roaring bitmap/judy/whatever
DBScanQuery(Subscription &sub_) : sub(std::move(sub_)) {}
// Returns true if the scan is complete
bool process(lmdb::txn &txn, uint64_t timeBudgetMicroseconds, std::function<void(const Subscription &, uint64_t)> cb) {
uint64_t startTime = hoytech::curr_time_us();
while (filterGroupIndex < sub.filterGroup.size()) {
if (!scanner) scanner = std::make_unique<DBScan>(sub.filterGroup.filters[filterGroupIndex]);
bool complete = scanner->scan(txn, [&](uint64_t quadId){
// If this event came in after our query began, don't send it. It will be sent after the EOSE.
if (quadId > sub.latestEventId) return;
// We already sent this event
if (alreadySentEvents.find(quadId) != alreadySentEvents.end()) return;
alreadySentEvents.insert(quadId);
cb(sub, quadId);
}, [&]{
return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds;
});
if (!complete) return false;
filterGroupIndex++;
scanner.reset();
}
return true;
}
};

72
src/RelayCron.cpp Normal file
@@ -0,0 +1,72 @@
#include "RelayServer.h"
void RelayServer::cleanupOldEvents() {
struct EventDel {
uint64_t nodeId;
uint64_t deletedNodeId;
};
std::vector<EventDel> expiredEvents;
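// Pass 1 (read-only txn): collect ephemeral events (kinds 20000-29999) older than the
// configured lifetime. Pass 2 (read-write txn below): remove them from quadrable and
// from the event table.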
{
auto txn = env.txn_ro();
auto mostRecent = getMostRecentEventId(txn);
uint64_t cutoff = hoytech::curr_time_s() - cfg().events__ephemeralEventsLifetimeSeconds;
uint64_t currKind = 20'000;
while (currKind < 30'000) {
uint64_t numRecs = 0;
env.generic_foreachFull(txn, env.dbi_Event__kind, makeKey_Uint64Uint64(currKind, 0), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
numRecs++;
ParsedKey_Uint64Uint64 parsedKey(k);
currKind = parsedKey.n1;
if (currKind >= 30'000) return false;
if (parsedKey.n2 > cutoff) {
currKind++;
return false;
}
uint64_t nodeId = lmdb::from_sv<uint64_t>(v);
if (nodeId != mostRecent) { // prevent nodeId re-use
expiredEvents.emplace_back(nodeId, 0);
}
return true;
});
if (numRecs == 0) break;
}
}
if (expiredEvents.size() > 0) {
LI << "Deleting " << expiredEvents.size() << " old events";
auto txn = env.txn_rw();
quadrable::Quadrable qdb;
qdb.init(txn);
qdb.checkout("events");
auto changes = qdb.change();
for (auto &e : expiredEvents) {
auto view = env.lookup_Event(txn, e.nodeId);
if (!view) throw herr("missing event from index, corrupt DB?");
changes.del(flatEventToQuadrableKey(view->flat_nested()), &e.deletedNodeId);
}
changes.apply(txn);
for (auto &e : expiredEvents) {
if (e.deletedNodeId) env.delete_Event(txn, e.nodeId);
}
txn.commit();
}
}

109
src/RelayIngester.cpp Normal file
@@ -0,0 +1,109 @@
#include "RelayServer.h"
void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
secp256k1_context *secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
while(1) {
auto newMsgs = thr.inbox.pop_all();
auto txn = env.txn_ro();
std::vector<MsgWriter> writerMsgs;
for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgIngester::ClientMessage>(&newMsg.msg)) {
try {
if (msg->payload.starts_with('[')) {
auto payload = tao::json::from_string(msg->payload);
if (!payload.is_array()) throw herr("message is not an array");
auto &arr = payload.get_array();
if (arr.size() < 2) throw herr("bad message");
auto &cmd = arr[0].get_string();
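// NIP-01 client commands, e.g.:
//   ["EVENT", {..event..}]
//   ["REQ", "<subId>", {..filter..}, ...]
//   ["CLOSE", "<subId>"]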
if (cmd == "EVENT") {
try {
ingesterProcessEvent(txn, msg->connId, secpCtx, arr[1], writerMsgs);
} catch (std::exception &e) {
sendOKResponse(msg->connId, arr[1].at("id").get_string(), false, std::string("invalid: ") + e.what());
LI << "Rejected invalid event: " << e.what();
}
} else if (cmd == "REQ") {
try {
ingesterProcessReq(txn, msg->connId, arr);
} catch (std::exception &e) {
sendNoticeError(msg->connId, std::string("bad req: ") + e.what());
}
} else if (cmd == "CLOSE") {
try {
ingesterProcessClose(txn, msg->connId, arr);
} catch (std::exception &e) {
sendNoticeError(msg->connId, std::string("bad close: ") + e.what());
}
} else {
throw herr("unknown cmd");
}
} else if (msg->payload.starts_with("Y")) {
verifyYesstrRequest(msg->payload);
auto *req = parseYesstrRequest(msg->payload);
if (req->payload_type() == Yesstr::RequestPayload::RequestPayload_RequestSync) {
tpYesstr.dispatch(msg->connId, MsgYesstr{MsgYesstr::SyncRequest{ msg->connId, std::move(msg->payload) }});
} else {
throw herr("unrecognised yesstr request");
}
} else {
throw herr("unparseable message");
}
} catch (std::exception &e) {
sendNoticeError(msg->connId, std::string("bad msg: ") + e.what());
}
} else if (auto msg = std::get_if<MsgIngester::CloseConn>(&newMsg.msg)) {
auto connId = msg->connId;
tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::CloseConn{connId}});
tpYesstr.dispatch(connId, MsgYesstr{MsgYesstr::CloseConn{connId}});
}
}
if (writerMsgs.size()) {
tpWriter.dispatchMulti(0, writerMsgs);
}
}
}
void RelayServer::ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector<MsgWriter> &output) {
std::string flatStr, jsonStr;
parseAndVerifyEvent(origJson, secpCtx, true, true, flatStr, jsonStr);
auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(flatStr.data());
{
auto existing = lookupEventById(txn, sv(flat->id()));
if (existing) {
LI << "Duplicate event, skipping";
sendOKResponse(connId, to_hex(sv(flat->id())), false, "duplicate: have this event");
return;
}
}
output.emplace_back(MsgWriter{MsgWriter::AddEvent{connId, hoytech::curr_time_us(), std::move(flatStr), std::move(jsonStr)}});
}
void RelayServer::ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) {
if (arr.get_array().size() < 2 + 1) throw herr("arr too small");
if (arr.get_array().size() > 2 + 20) throw herr("arr too big");
Subscription sub(connId, arr[1].get_string(), NostrFilterGroup(arr));
tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::NewSub{std::move(sub)}});
}
void RelayServer::ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) {
if (arr.get_array().size() != 2) throw herr("arr too small/big");
tpReqWorker.dispatch(connId, MsgReqWorker{MsgReqWorker::RemoveSub{connId, SubId(arr[1].get_string())}});
}

57
src/RelayReqMonitor.cpp Normal file
@@ -0,0 +1,57 @@
#include "RelayServer.h"
#include "ActiveMonitors.h"
void RelayServer::runReqMonitor(ThreadPool<MsgReqMonitor>::Thread &thr) {
auto dbChangeWatcher = hoytech::file_change_monitor(dbDir + "/data.mdb");
dbChangeWatcher.setDebounce(100);
dbChangeWatcher.run([&](){
tpReqMonitor.dispatchToAll([]{ return MsgReqMonitor{MsgReqMonitor::DBChange{}}; });
});
ActiveMonitors monitors;
uint64_t currEventId = MAX_U64;
while (1) {
auto newMsgs = thr.inbox.pop_all();
auto txn = env.txn_ro();
uint64_t latestEventId = getMostRecentEventId(txn);
if (currEventId > latestEventId) currEventId = latestEventId;
for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgReqMonitor::NewSub>(&newMsg.msg)) {
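// First catch up on events written after the ReqWorker's scan finished
// (starting at sub.latestEventId + 1), then register the sub for live matching.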
env.foreach_Event(txn, [&](auto &ev){
if (msg->sub.filterGroup.doesMatch(ev.flat_nested())) {
sendEvent(msg->sub.connId, msg->sub.subId, getEventJson(txn, ev.primaryKeyId));
}
return true;
}, false, msg->sub.latestEventId + 1);
msg->sub.latestEventId = latestEventId;
monitors.addSub(txn, std::move(msg->sub), latestEventId);
} else if (auto msg = std::get_if<MsgReqMonitor::RemoveSub>(&newMsg.msg)) {
monitors.removeSub(msg->connId, msg->subId);
} else if (auto msg = std::get_if<MsgReqMonitor::CloseConn>(&newMsg.msg)) {
monitors.closeConn(msg->connId);
} else if (std::get_if<MsgReqMonitor::DBChange>(&newMsg.msg)) {
env.foreach_Event(txn, [&](auto &ev){
monitors.process(txn, ev, [&](RecipientList &&recipients, uint64_t quadId){
sendEventToBatch(std::move(recipients), std::string(getEventJson(txn, quadId)));
});
return true;
}, false, currEventId + 1);
currEventId = latestEventId;
}
}
}
}

111
src/RelayReqWorker.cpp Normal file
@@ -0,0 +1,111 @@
#include "RelayServer.h"
#include "DBScan.h"
struct ActiveQueries : NonCopyable {
using ConnQueries = std::map<SubId, DBScanQuery*>;
std::map<uint64_t, ConnQueries> conns; // connId -> subId -> DBScanQuery*
std::deque<DBScanQuery*> running;
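// Queries are time-sliced: process() pops one query, runs it for up to the configured
// budget, and pushes it to the back of the queue if it hasn't finished yet.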
void addSub(lmdb::txn &txn, Subscription &&sub) {
sub.latestEventId = getMostRecentEventId(txn);
{
auto *existing = findQuery(sub.connId, sub.subId);
if (existing) removeSub(sub.connId, sub.subId);
}
auto res = conns.try_emplace(sub.connId);
auto &connQueries = res.first->second;
DBScanQuery *q = new DBScanQuery(sub);
connQueries.try_emplace(q->sub.subId, q);
running.push_front(q);
}
DBScanQuery *findQuery(uint64_t connId, const SubId &subId) {
auto f1 = conns.find(connId);
if (f1 == conns.end()) return nullptr;
auto f2 = f1->second.find(subId);
if (f2 == f1->second.end()) return nullptr;
return f2->second;
}
void removeSub(uint64_t connId, const SubId &subId) {
auto *query = findQuery(connId, subId);
if (!query) return;
query->dead = true;
conns[connId].erase(subId);
if (conns[connId].empty()) conns.erase(connId);
}
void closeConn(uint64_t connId) {
auto f1 = conns.find(connId);
if (f1 == conns.end()) return;
for (auto &[k, v] : f1->second) v->dead = true;
conns.erase(connId);
}
void process(RelayServer *server, lmdb::txn &txn) {
if (running.empty()) return;
DBScanQuery *q = running.front();
running.pop_front();
if (q->dead) {
delete q;
return;
}
bool complete = q->process(txn, cfg().relay__queryTimesliceBudgetMicroseconds, [&](const auto &sub, uint64_t quadId){
server->sendEvent(sub.connId, sub.subId, getEventJson(txn, quadId));
});
if (complete) {
auto connId = q->sub.connId;
server->sendToConn(connId, tao::json::to_string(tao::json::value::array({ "EOSE", q->sub.subId.str() })));
removeSub(connId, q->sub.subId);
server->tpReqMonitor.dispatch(connId, MsgReqMonitor{MsgReqMonitor::NewSub{std::move(q->sub)}});
delete q;
} else {
running.push_back(q);
}
}
};
void RelayServer::runReqWorker(ThreadPool<MsgReqWorker>::Thread &thr) {
ActiveQueries queries;
while(1) {
auto newMsgs = queries.running.empty() ? thr.inbox.pop_all() : thr.inbox.pop_all_no_wait();
auto txn = env.txn_ro();
for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgReqWorker::NewSub>(&newMsg.msg)) {
queries.addSub(txn, std::move(msg->sub));
queries.process(this, txn);
} else if (auto msg = std::get_if<MsgReqWorker::RemoveSub>(&newMsg.msg)) {
queries.removeSub(msg->connId, msg->subId);
tpReqMonitor.dispatch(msg->connId, MsgReqMonitor{MsgReqMonitor::RemoveSub{msg->connId, msg->subId}});
} else if (auto msg = std::get_if<MsgReqWorker::CloseConn>(&newMsg.msg)) {
queries.closeConn(msg->connId);
tpReqMonitor.dispatch(msg->connId, MsgReqMonitor{MsgReqMonitor::CloseConn{msg->connId}});
}
}
queries.process(this, txn);
txn.abort();
}
}

207
src/RelayServer.h Normal file
@@ -0,0 +1,207 @@
#pragma once
#include <iostream>
#include <memory>
#include <algorithm>
#include <hoytech/timer.h>
#include <hoytech/time.h>
#include <hoytech/hex.h>
#include <hoytech/file_change_monitor.h>
#include <uWebSockets/src/uWS.h>
#include <tao/json.hpp>
#include <quadrable.h>
#include "golpe.h"
#include "Subscription.h"
#include "ThreadPool.h"
#include "events.h"
#include "filters.h"
#include "yesstr.h"
struct MsgWebsocket : NonCopyable {
struct Send {
uint64_t connId;
std::string payload;
};
struct SendBinary {
uint64_t connId;
std::string payload;
};
struct SendEventToBatch {
RecipientList list;
std::string evJson;
};
using Var = std::variant<Send, SendBinary, SendEventToBatch>;
Var msg;
MsgWebsocket(Var &&msg_) : msg(std::move(msg_)) {}
};
struct MsgIngester : NonCopyable {
struct ClientMessage {
uint64_t connId;
std::string payload;
};
struct CloseConn {
uint64_t connId;
};
using Var = std::variant<ClientMessage, CloseConn>;
Var msg;
MsgIngester(Var &&msg_) : msg(std::move(msg_)) {}
};
struct MsgWriter : NonCopyable {
struct AddEvent {
uint64_t connId;
uint64_t receivedAt;
std::string flatStr;
std::string jsonStr;
};
using Var = std::variant<AddEvent>;
Var msg;
MsgWriter(Var &&msg_) : msg(std::move(msg_)) {}
};
struct MsgReqWorker : NonCopyable {
struct NewSub {
Subscription sub;
};
struct RemoveSub {
uint64_t connId;
SubId subId;
};
struct CloseConn {
uint64_t connId;
};
using Var = std::variant<NewSub, RemoveSub, CloseConn>;
Var msg;
MsgReqWorker(Var &&msg_) : msg(std::move(msg_)) {}
};
struct MsgReqMonitor : NonCopyable {
struct NewSub {
Subscription sub;
};
struct RemoveSub {
uint64_t connId;
SubId subId;
};
struct CloseConn {
uint64_t connId;
};
struct DBChange {
};
using Var = std::variant<NewSub, RemoveSub, CloseConn, DBChange>;
Var msg;
MsgReqMonitor(Var &&msg_) : msg(std::move(msg_)) {}
};
struct MsgYesstr : NonCopyable {
struct SyncRequest {
uint64_t connId;
std::string yesstrMessage;
};
struct CloseConn {
uint64_t connId;
};
using Var = std::variant<SyncRequest, CloseConn>;
Var msg;
MsgYesstr(Var &&msg_) : msg(std::move(msg_)) {}
};
struct RelayServer {
std::unique_ptr<uS::Async> hubTrigger;
// Thread Pools
ThreadPool<MsgWebsocket> tpWebsocket;
ThreadPool<MsgIngester> tpIngester;
ThreadPool<MsgWriter> tpWriter;
ThreadPool<MsgReqWorker> tpReqWorker;
ThreadPool<MsgReqMonitor> tpReqMonitor;
ThreadPool<MsgYesstr> tpYesstr;
hoytech::timer cron;
void run();
void runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr);
void runIngester(ThreadPool<MsgIngester>::Thread &thr);
void ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector<MsgWriter> &output);
void ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
void ingesterProcessClose(lmdb::txn &txn, uint64_t connId, const tao::json::value &origJson);
void runWriter(ThreadPool<MsgWriter>::Thread &thr);
void runReqWorker(ThreadPool<MsgReqWorker>::Thread &thr);
void runReqMonitor(ThreadPool<MsgReqMonitor>::Thread &thr);
void runYesstr(ThreadPool<MsgYesstr>::Thread &thr);
void cleanupOldEvents();
// Utils (can be called by any thread)
void sendToConn(uint64_t connId, std::string &&payload) {
tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::Send{connId, std::move(payload)}});
hubTrigger->send();
}
void sendToConn(uint64_t connId, std::string &payload) {
tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::Send{connId, std::move(payload)}});
hubTrigger->send();
}
void sendToConnBinary(uint64_t connId, std::string &&payload) {
tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::SendBinary{connId, std::move(payload)}});
hubTrigger->send();
}
void sendEvent(uint64_t connId, const SubId &subId, std::string_view evJson) {
std::string reply = std::string("[\"EVENT\",\"");
reply += subId.sv();
reply += "\",";
reply += evJson;
reply += "]";
sendToConn(connId, reply);
}
void sendEventToBatch(RecipientList &&list, std::string &&evJson) {
tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::SendEventToBatch{std::move(list), std::move(evJson)}});
hubTrigger->send();
}
void sendNoticeError(uint64_t connId, std::string &&payload) {
LI << "sending error to [" << connId << "]: " << payload;
auto reply = tao::json::value::array({ "NOTICE", std::string("ERROR: ") + payload });
tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::Send{connId, std::move(tao::json::to_string(reply))}});
hubTrigger->send();
}
void sendOKResponse(uint64_t connId, std::string_view eventIdHex, bool written, std::string_view message) {
auto reply = tao::json::value::array({ "OK", eventIdHex, written, message });
tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::Send{connId, std::move(tao::json::to_string(reply))}});
hubTrigger->send();
}
};

172
src/RelayWebsocket.cpp Normal file
@@ -0,0 +1,172 @@
#include "RelayServer.h"
std::string preGenerateHttpResponse(const std::string &contentType, const std::string &content) {
std::string output = "HTTP/1.1 200 OK\r\n";
output += std::string("Content-Type: ") + contentType + "\r\n";
output += "Access-Control-Allow-Origin: *\r\n";
output += "Connection: keep-alive\r\n";
output += "Server: strfry\r\n";
output += std::string("Content-Length: ") + std::to_string(content.size()) + "\r\n";
output += "\r\n";
output += content;
return output;
};
void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
struct Connection {
uWS::WebSocket<uWS::SERVER> *websocket;
uint64_t connId;
uint64_t connectedTimestamp;
std::string ipAddr;
Connection(uWS::WebSocket<uWS::SERVER> *p, uint64_t connId_)
: websocket(p), connId(connId_), connectedTimestamp(hoytech::curr_time_us()) { }
Connection(const Connection &) = delete;
Connection(Connection &&) = delete;
};
uWS::Hub hub;
uWS::Group<uWS::SERVER> *hubGroup;
std::map<uint64_t, Connection*> connIdToConnection;
uint64_t nextConnectionId = 1;
std::string tempBuf;
tempBuf.reserve(cfg().events__maxEventSize + MAX_SUBID_SIZE + 100);
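// NIP-11 relay information document, re-rendered only when the config version changes.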
auto getServerInfoHttpResponse = [ver = uint64_t(0), rendered = std::string("")]() mutable {
if (ver != cfg().version()) {
rendered = preGenerateHttpResponse("application/json", tao::json::to_string(tao::json::value({
{ "name", cfg().relay__info__name },
{ "description", cfg().relay__info__description },
{ "pubkey", cfg().relay__info__pubkey },
{ "contact", cfg().relay__info__contact },
{ "supported_nips", tao::json::value::array({ 1, 9, 11, 12, 15, 16, 20, 22 }) },
{ "software", "git+https://github.com/hoytech/strfry.git" },
{ "version", GOLPE_GIT_VER },
})));
ver = cfg().version();
}
return std::string_view(rendered);
};
const std::string defaultHttpResponse = preGenerateHttpResponse("text/plain", "Please use a Nostr client to connect.");
hubGroup = hub.createGroup<uWS::SERVER>(uWS::PERMESSAGE_DEFLATE | uWS::SLIDING_DEFLATE_WINDOW, cfg().relay__maxWebsocketPayloadSize);
hubGroup->onHttpRequest([&](uWS::HttpResponse *res, uWS::HttpRequest req, char *data, size_t length, size_t remainingBytes){
LI << "HTTP request for [" << req.getUrl().toString() << "]";
if (req.getHeader("accept").toString() == "application/nostr+json") {
auto info = getServerInfoHttpResponse();
res->write(info.data(), info.size());
} else {
res->write(defaultHttpResponse.data(), defaultHttpResponse.size());
}
});
hubGroup->onConnection([&](uWS::WebSocket<uWS::SERVER> *ws, uWS::HttpRequest req) {
std::string addr = ws->getAddress().address;
uint64_t connId = nextConnectionId++;
LI << "[" << connId << "] Connect from " << addr;
Connection *c = new Connection(ws, connId);
c->ipAddr = addr;
ws->setUserData((void*)c);
connIdToConnection.emplace(connId, c);
{
int optval = 1;
if (setsockopt(ws->getFd(), SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval))) {
LW << "Failed to enable TCP keepalive: " << strerror(errno);
}
}
});
hubGroup->onDisconnection([&](uWS::WebSocket<uWS::SERVER> *ws, int code, char *message, size_t length) {
Connection *c = (Connection*)ws->getUserData();
uint64_t connId = c->connId;
LI << "[" << connId << "] Disconnect from " << c->ipAddr;
tpIngester.dispatch(connId, MsgIngester{MsgIngester::CloseConn{connId}});
connIdToConnection.erase(connId);
delete c;
});
hubGroup->onMessage2([&](uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length, uWS::OpCode opCode, size_t compressedSize) {
//LI << "Decompression: " << compressedSize << " -> " << length;
auto &c = *(Connection*)ws->getUserData();
tpIngester.dispatch(c.connId, MsgIngester{MsgIngester::ClientMessage{c.connId, std::string(message, length)}});
});
std::function<void()> asyncCb = [&]{
auto newMsgs = thr.inbox.pop_all_no_wait();
auto doSend = [&](uint64_t connId, std::string_view payload, uWS::OpCode opCode){
auto it = connIdToConnection.find(connId);
if (it == connIdToConnection.end()) return;
auto &c = *it->second;
size_t compressedSize;
auto cb = [](uWS::WebSocket<uWS::SERVER> *webSocket, void *data, bool cancelled, void *reserved){};
c.websocket->send(payload.data(), payload.size(), opCode, cb, nullptr, true, &compressedSize);
//LI << "Compression: " << payload.size() << " -> " << compressedSize;
};
for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgWebsocket::Send>(&newMsg.msg)) {
doSend(msg->connId, msg->payload, uWS::OpCode::TEXT);
} else if (auto msg = std::get_if<MsgWebsocket::SendBinary>(&newMsg.msg)) {
doSend(msg->connId, msg->payload, uWS::OpCode::BINARY);
} else if (auto msg = std::get_if<MsgWebsocket::SendEventToBatch>(&newMsg.msg)) {
for (auto &item : msg->list) {
tempBuf.clear();
tempBuf += "[\"EVENT\",\"";
tempBuf += item.subId.sv();
tempBuf += "\",";
tempBuf += msg->evJson;
tempBuf += "]";
auto it = connIdToConnection.find(item.connId);
if (it == connIdToConnection.end()) continue;
auto &c = *it->second;
size_t compressedSize;
auto cb = [](uWS::WebSocket<uWS::SERVER> *webSocket, void *data, bool cancelled, void *reserved){};
c.websocket->send(tempBuf.data(), tempBuf.size(), uWS::OpCode::TEXT, cb, nullptr, true, &compressedSize);
//LI << "Compression: " << msg->payload.size() << " -> " << compressedSize;
}
}
}
};
hubTrigger = std::make_unique<uS::Async>(hub.getLoop());
hubTrigger->setData(&asyncCb);
hubTrigger->start([](uS::Async *a){
auto *r = static_cast<std::function<void()> *>(a->data);
(*r)();
});
int port = cfg().relay__port;
std::string bindHost = cfg().relay__bind;
if (!hub.listen(bindHost.c_str(), port, nullptr, uS::REUSE_PORT, hubGroup)) throw herr("unable to listen on port ", port);
LI << "Started websocket server on " << bindHost << ":" << port;
hub.run();
}

59
src/RelayWriter.cpp Normal file
@@ -0,0 +1,59 @@
#include "RelayServer.h"
void RelayServer::runWriter(ThreadPool<MsgWriter>::Thread &thr) {
quadrable::Quadrable qdb;
{
auto txn = env.txn_ro();
qdb.init(txn);
}
qdb.checkout("events");
while(1) {
auto newMsgs = thr.inbox.pop_all();
// Prepare messages
std::deque<EventToWrite> newEvents;
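// All events collected from this batch of messages are written in a single read-write
// txn; an OK response is then sent back for each event.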
for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgWriter::AddEvent>(&newMsg.msg)) {
newEvents.emplace_back(std::move(msg->flatStr), std::move(msg->jsonStr), msg->receivedAt, msg);
}
}
{
auto txn = env.txn_rw();
writeEvents(txn, qdb, newEvents);
txn.commit();
}
// Log
for (auto &newEvent : newEvents) {
auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(newEvent.flatStr.data());
auto eventIdHex = to_hex(sv(flat->id()));
std::string message;
bool written = false;
if (newEvent.status == EventWriteStatus::Written) {
LI << "Inserted event. id=" << eventIdHex << " qdbNodeId=" << newEvent.nodeId;
written = true;
} else if (newEvent.status == EventWriteStatus::Duplicate) {
message = "duplicate: have this event";
} else if (newEvent.status == EventWriteStatus::Replaced) {
message = "replaced: have newer event";
} else if (newEvent.status == EventWriteStatus::Deleted) {
message = "deleted: user requested deletion";
}
if (newEvent.status != EventWriteStatus::Written) {
LI << "Rejected event. " << message << ", id=" << eventIdHex;
}
MsgWriter::AddEvent *addEventMsg = static_cast<MsgWriter::AddEvent*>(newEvent.userData);
sendOKResponse(addEventMsg->connId, eventIdHex, written, message);
}
}
}

146
src/RelayYesstr.cpp Normal file
@@ -0,0 +1,146 @@
#include <quadrable.h>
#include <quadrable/transport.h>
#include "RelayServer.h"
#include "DBScan.h"
void RelayServer::runYesstr(ThreadPool<MsgYesstr>::Thread &thr) {
quadrable::Quadrable qdb;
{
auto txn = env.txn_ro();
qdb.init(txn);
}
struct SyncState {
quadrable::MemStore m;
};
struct SyncStateCollection {
RelayServer *server;
quadrable::Quadrable *qdb;
std::map<uint64_t, std::map<uint64_t, SyncState>> conns; // connId -> reqId -> SyncState
SyncStateCollection(RelayServer *server_, quadrable::Quadrable *qdb_) : server(server_), qdb(qdb_) {}
SyncState *lookup(uint64_t connId, uint64_t reqId) {
if (!conns.contains(connId)) return nullptr;
if (!conns[connId].contains(reqId)) return nullptr;
return &conns[connId][reqId];
}
SyncState *newRequest(lmdb::txn &txn, uint64_t connId, uint64_t reqId, std::string_view filterStr) {
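// For the trivial "{}" filter, sync against the head of the full "events" tree;
// otherwise evaluate the filter with DBScanQuery and build the matching events into a
// temporary in-memory tree that the client then syncs against.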
if (!conns.contains(connId)) conns.try_emplace(connId);
if (conns[connId].contains(reqId)) {
LI << "Client tried to re-use reqId for new filter, ignoring";
return &conns[connId][reqId];
}
conns[connId].try_emplace(reqId);
auto &s = conns[connId][reqId];
if (filterStr == "{}") {
qdb->checkout("events");
uint64_t nodeId = qdb->getHeadNodeId(txn);
qdb->withMemStore(s.m, [&]{
qdb->writeToMemStore = true;
qdb->checkout(nodeId);
});
} else {
// FIXME: The following blocks the whole thread for the query duration. Should interleave it
// with other requests like RelayReqWorker does.
LI << "Yesstr sync: Running filter: " << filterStr;
std::vector<uint64_t> quadEventIds;
auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr));
Subscription sub(1, "junkSub", filterGroup);
DBScanQuery query(sub);
while (1) {
bool complete = query.process(txn, MAX_U64, [&](const auto &sub, uint64_t quadId){
quadEventIds.push_back(quadId);
});
if (complete) break;
}
LI << "Filter matched " << quadEventIds.size() << " local events";
qdb->withMemStore(s.m, [&]{
qdb->writeToMemStore = true;
qdb->checkout();
auto changes = qdb->change();
for (auto id : quadEventIds) {
changes.putReuse(txn, id);
}
changes.apply(txn);
});
}
return &s;
}
void handleRequest(lmdb::txn &txn, uint64_t connId, uint64_t reqId, std::string_view filterStr, std::string_view reqsEncoded) {
SyncState *s = lookup(connId, reqId);
if (!s) s = newRequest(txn, connId, reqId, filterStr);
auto reqs = quadrable::transport::decodeSyncRequests(reqsEncoded);
quadrable::SyncResponses resps;
qdb->withMemStore(s->m, [&]{
qdb->writeToMemStore = true;
LI << "ZZZ NODE " << qdb->getHeadNodeId(txn);
resps = qdb->handleSyncRequests(txn, qdb->getHeadNodeId(txn), reqs, 100'000);
});
std::string respsEncoded = quadrable::transport::encodeSyncResponses(resps);
flatbuffers::FlatBufferBuilder builder;
auto respOffset = Yesstr::CreateResponse(builder,
reqId,
Yesstr::ResponsePayload::ResponsePayload_ResponseSync,
Yesstr::CreateResponseSync(builder,
builder.CreateVector((uint8_t*)respsEncoded.data(), respsEncoded.size())
).Union()
);
builder.Finish(respOffset);
std::string respMsg = std::string("Y") + std::string(reinterpret_cast<char*>(builder.GetBufferPointer()), builder.GetSize());
server->sendToConnBinary(connId, std::move(respMsg));
}
void closeConn(uint64_t connId) {
conns.erase(connId);
}
};
SyncStateCollection states(this, &qdb);
while(1) {
auto newMsgs = thr.inbox.pop_all();
auto txn = env.txn_ro();
for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgYesstr::SyncRequest>(&newMsg.msg)) {
const auto *req = parseYesstrRequest(msg->yesstrMessage); // validated by ingester
const auto *reqSync = req->payload_as<Yesstr::RequestSync>();
states.handleRequest(txn, msg->connId, req->requestId(), sv(reqSync->filter()), sv(reqSync->reqsEncoded()));
} else if (auto msg = std::get_if<MsgYesstr::CloseConn>(&newMsg.msg)) {
states.closeConn(msg->connId);
}
}
}
}

58
src/Subscription.h Normal file
@@ -0,0 +1,58 @@
#pragma once
#include "filters.h"
struct SubId {
char buf[40];
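// Fixed-size, trivially-copyable subscription id: buf[0] holds the length and the id
// bytes follow in buf[1..], which keeps SubId cheap to use as a map key.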
SubId(std::string_view val) {
static_assert(MAX_SUBID_SIZE == 39, "MAX_SUBID_SIZE mismatch");
if (val.size() > 39) throw herr("subscription id too long");
if (val.size() == 0) throw herr("subscription id too short");
auto badChar = [](char c){
return c < 0x20 || c == '\\' || c == '"' || c >= 0x7F;
};
if (std::any_of(val.begin(), val.end(), badChar)) throw herr("invalid character in subscription id");
buf[0] = (char)val.size();
memcpy(&buf[1], val.data(), val.size());
}
std::string_view sv() const {
return std::string_view(&buf[1], (size_t)buf[0]);
}
std::string str() const {
return std::string(sv());
}
};
inline bool operator <(const SubId &s1, const SubId &s2) {
return s1.sv() < s2.sv();
}
struct Subscription : NonCopyable {
Subscription(uint64_t connId_, std::string subId_, NostrFilterGroup filterGroup_) : connId(connId_), subId(subId_), filterGroup(filterGroup_) {}
// Params
uint64_t connId;
SubId subId;
NostrFilterGroup filterGroup;
// State
uint64_t latestEventId = MAX_U64;
};
struct ConnIdSubId {
uint64_t connId;
SubId subId;
};
using RecipientList = std::vector<ConnIdSubId>;

61
src/ThreadPool.h Normal file
@@ -0,0 +1,61 @@
#pragma once
#include <hoytech/protected_queue.h>
template <typename M>
struct ThreadPool {
uint64_t numThreads;
struct Thread {
uint64_t id;
std::thread thread;
hoytech::protected_queue<M> inbox;
};
std::deque<Thread> pool;
~ThreadPool() {
join();
}
void init(std::string name, uint64_t numThreads_, std::function<void(Thread &t)> cb) {
if (numThreads_ == 0) throw herr("must have more than 0 threads");
numThreads = numThreads_;
for (size_t i = 0; i < numThreads; i++) {
std::string myName = name;
if (numThreads != 1) myName += std::string(" ") + std::to_string(i);
pool.emplace_back();
auto &t = pool.back();
t.id = i;
t.thread = std::thread([&t, cb, myName]() {
setThreadName(myName.c_str());
cb(t);
});
}
}
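// Messages with the same key always land on the same thread (key % numThreads),
// which preserves per-key ordering.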
void dispatch(uint64_t key, M &&m) {
uint64_t who = key % numThreads;
pool[who].inbox.push_move(std::move(m));
}
void dispatchMulti(uint64_t key, std::vector<M> &m) {
uint64_t who = key % numThreads;
pool[who].inbox.push_move_all(m);
}
void dispatchToAll(std::function<M()> cb) {
for (size_t i = 0; i < numThreads; i++) pool[i].inbox.push_move(cb());
}
void join() {
for (size_t i = 0; i < numThreads; i++) {
pool[i].thread.join();
}
}
};

133
src/WSConnection.h Normal file
@@ -0,0 +1,133 @@
#include <thread>
#include <chrono>
#include <uWebSockets/src/uWS.h>
#include "golpe.h"
class WSConnection {
std::string url;
uWS::Hub hub;
uWS::Group<uWS::CLIENT> *hubGroup;
std::unique_ptr<uS::Async> hubTrigger;
uWS::WebSocket<uWS::CLIENT> *currWs = nullptr;
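// Minimal reconnecting websocket client: user code supplies the onConnect/onMessage/
// onTrigger callbacks below, all of which are invoked on the websocket thread.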
public:
WSConnection(const std::string &url) : url(url) {}
std::function<void()> onConnect;
std::function<void(std::string_view, size_t)> onMessage;
std::function<void()> onTrigger;
bool reconnect = true;
uint64_t reconnectDelayMilliseconds = 5'000;
// Should only be called from the websocket thread (ie within an onConnect or onMessage callback)
void send(std::string_view msg, uWS::OpCode op = uWS::OpCode::TEXT, size_t *compressedSize = nullptr) {
if (currWs) {
currWs->send(msg.data(), msg.size(), op, nullptr, nullptr, true, compressedSize);
} else {
LI << "Tried to send message, but websocket is disconnected";
}
}
// Can be called from any thread, invokes onTrigger in websocket thread context
void trigger() {
if (hubTrigger) hubTrigger->send();
}
void run() {
hubGroup = hub.createGroup<uWS::CLIENT>(uWS::PERMESSAGE_DEFLATE | uWS::SLIDING_DEFLATE_WINDOW);
auto doConnect = [&](uint64_t delay = 0){
if (delay) std::this_thread::sleep_for(std::chrono::milliseconds(delay));
LI << "Attempting to connect to " << url;
hub.connect(url, nullptr, {}, 5000, hubGroup);
};
hubGroup->onConnection([&](uWS::WebSocket<uWS::CLIENT> *ws, uWS::HttpRequest req) {
if (currWs) {
currWs->terminate();
currWs = nullptr;
}
std::string addr = ws->getAddress().address;
LI << "Connected to " << addr;
{
int optval = 1;
if (setsockopt(ws->getFd(), SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval))) {
LW << "Failed to enable TCP keepalive: " << strerror(errno);
}
}
currWs = ws;
if (!onConnect) return;
try {
onConnect();
} catch (std::exception &e) {
LW << "onConnect failure: " << e.what();
}
});
hubGroup->onDisconnection([&](uWS::WebSocket<uWS::CLIENT> *ws, int code, char *message, size_t length) {
LI << "Disconnected";
if (ws == currWs) {
currWs = nullptr;
if (!reconnect) ::exit(1);
doConnect(reconnectDelayMilliseconds);
} else {
LI << "Got disconnect for unexpected connection?";
}
});
hubGroup->onError([&](void *) {
LI << "Websocket connection error";
if (!reconnect) ::exit(1);
doConnect(reconnectDelayMilliseconds);
});
hubGroup->onMessage2([&](uWS::WebSocket<uWS::CLIENT> *ws, char *message, size_t length, uWS::OpCode opCode, size_t compressedSize) {
if (!onMessage) return;
try {
onMessage(std::string_view(message, length), compressedSize);
} catch (std::exception &e) {
LW << "onMessage failure: " << e.what();
}
});
std::function<void()> asyncCb = [&]{
if (!onTrigger) return;
try {
onTrigger();
} catch (std::exception &e) {
LW << "onTrigger failure: " << e.what();
}
};
hubTrigger = std::make_unique<uS::Async>(hub.getLoop());
hubTrigger->setData(&asyncCb);
hubTrigger->start([](uS::Async *a){
auto *r = static_cast<std::function<void()> *>(a->data);
(*r)();
});
doConnect();
hub.run();
}
};

126
src/WriterPipeline.h Normal file
@@ -0,0 +1,126 @@
#pragma once
#include <hoytech/protected_queue.h>
#include "golpe.h"
#include "events.h"
struct WriterPipeline {
public:
hoytech::protected_queue<tao::json::value> inbox;
hoytech::protected_queue<bool> flushInbox;
private:
hoytech::protected_queue<EventToWrite> writerInbox;
std::thread validatorThread;
std::thread writerThread;
public:
WriterPipeline() {
validatorThread = std::thread([&]() {
setThreadName("Validator");
secp256k1_context *secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
while (1) {
auto msgs = inbox.pop_all();
for (auto &m : msgs) {
if (m.is_null()) {
writerInbox.push_move({ "", "", 0 });
break;
}
std::string flatStr;
std::string jsonStr;
try {
parseAndVerifyEvent(m, secpCtx, true, true, flatStr, jsonStr);
} catch (std::exception &e) {
LW << "Rejected event: " << m << " reason: " << e.what();
continue;
}
writerInbox.push_move({ std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us() });
}
}
});
writerThread = std::thread([&]() {
setThreadName("Writer");
quadrable::Quadrable qdb;
{
auto txn = env.txn_ro();
qdb.init(txn);
}
qdb.checkout("events");
while (1) {
// Debounce
writerInbox.wait();
std::this_thread::sleep_for(std::chrono::milliseconds(1'000));
auto newEvents = writerInbox.pop_all();
bool flush = false;
uint64_t written = 0, dups = 0;
// Collect a certain amount of records in a batch, push the rest back into the inbox
// Pre-filter out dups in a read-only txn as an optimisation
std::deque<EventToWrite> newEventsToProc;
{
auto txn = env.txn_ro();
for (auto &event : newEvents) {
if (newEventsToProc.size() > 1'000) {
// Put the rest back in the inbox
writerInbox.unshift_move_all(newEvents);
newEvents.clear();
break;
}
if (event.flatStr.size() == 0) {
flush = true;
break;
}
auto *flat = flatStrToFlatEvent(event.flatStr);
if (lookupEventById(txn, sv(flat->id()))) {
dups++;
continue;
}
newEventsToProc.emplace_back(std::move(event));
}
}
if (newEventsToProc.size()) {
{
auto txn = env.txn_rw();
writeEvents(txn, qdb, newEventsToProc);
txn.commit();
}
for (auto &ev : newEventsToProc) {
if (ev.status == EventWriteStatus::Written) written++;
else dups++;
// FIXME: log rejected stats too
}
}
LI << "Writer: added: " << written << " dups: " << dups;
if (flush) flushInbox.push_move(true);
}
});
}
void flush() {
inbox.push_move(tao::json::null);
flushInbox.wait();
}
};

39
src/cmd_export.cpp Normal file
@@ -0,0 +1,39 @@
#include <iostream>
#include <docopt.h>
#include "golpe.h"
#include "events.h"
static const char USAGE[] =
R"(
Usage:
export [--since=<since>] [--until=<until>] [--include-ephemeral]
)";
void cmd_export(const std::vector<std::string> &subArgs) {
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");
uint64_t since = 0, until = MAX_U64;
if (args["--since"]) since = args["--since"].asLong();
if (args["--until"]) until = args["--until"].asLong();
auto txn = env.txn_ro();
env.generic_foreachFull(txn, env.dbi_Event__created_at, lmdb::to_sv<uint64_t>(since), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
if (lmdb::from_sv<uint64_t>(k) > until) return false;
auto view = env.lookup_Event(txn, lmdb::from_sv<uint64_t>(v));
if (!view) throw herr("missing event from index, corrupt DB?");
if (!args["--include-ephemeral"].asBool()) {
if (isEphemeralEvent(view->flat_nested()->kind())) return true;
}
std::cout << getEventJson(txn, view->primaryKeyId) << "\n";
return true;
});
}

95
src/cmd_import.cpp Normal file
@@ -0,0 +1,95 @@
#include <iostream>
#include <docopt.h>
#include "golpe.h"
#include "events.h"
#include "filters.h"
static const char USAGE[] =
R"(
Usage:
import [--show-rejected] [--no-verify]
)";
void cmd_import(const std::vector<std::string> &subArgs) {
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");
bool showRejected = args["--show-rejected"].asBool();
bool noVerify = args["--no-verify"].asBool();
if (noVerify) LW << "not verifying event IDs or signatures!";
quadrable::Quadrable qdb;
{
auto txn = env.txn_ro();
qdb.init(txn);
}
qdb.checkout("events");
auto txn = env.txn_rw();
secp256k1_context *secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
std::string line;
uint64_t processed = 0, added = 0, rejected = 0, dups = 0;
std::deque<EventToWrite> newEvents;
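// Events are parsed and verified one line at a time, then written in batches of up to
// 10,000 per read-write transaction.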
auto logStatus = [&]{
LI << "Processed " << processed << " lines. " << added << " added, " << rejected << " rejected, " << dups << " dups";
};
auto flushChanges = [&]{
writeEvents(txn, qdb, newEvents);
uint64_t numCommits = 0;
for (auto &newEvent : newEvents) {
if (newEvent.status == EventWriteStatus::Written) {
added++;
numCommits++;
} else if (newEvent.status == EventWriteStatus::Duplicate) {
dups++;
} else {
rejected++;
}
}
logStatus();
LI << "Committing " << numCommits << " records";
txn.commit();
txn = env.txn_rw();
newEvents.clear();
};
while (std::cin) {
std::getline(std::cin, line);
if (!line.size()) continue;
processed++;
std::string flatStr;
std::string jsonStr;
try {
auto origJson = tao::json::from_string(line);
parseAndVerifyEvent(origJson, secpCtx, !noVerify, false, flatStr, jsonStr);
} catch (std::exception &e) {
if (showRejected) LW << "Line " << processed << " rejected: " << e.what();
rejected++;
continue;
}
newEvents.emplace_back(std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us());
if (newEvents.size() >= 10'000) flushChanges();
}
flushChanges();
txn.commit();
}

27
src/cmd_info.cpp Normal file
@@ -0,0 +1,27 @@
#include <iostream>
#include <docopt.h>
#include "golpe.h"
static const char USAGE[] =
R"(
Usage:
info
)";
void cmd_info(const std::vector<std::string> &subArgs) {
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");
quadrable::Quadrable qdb;
{
auto txn = env.txn_ro();
qdb.init(txn);
}
qdb.checkout("events");
auto txn = env.txn_ro();
std::cout << "merkle root: " << to_hex(qdb.root(txn)) << "\n";
}

66
src/cmd_monitor.cpp Normal file
@@ -0,0 +1,66 @@
#include <iostream>
#include <docopt.h>
#include "golpe.h"
#include "ActiveMonitors.h"
#include "events.h"
static const char USAGE[] =
R"(
Usage:
monitor
)";
// echo '["sub",1,"mysub",{"authors":["47f7163b"]}]' | ./strfry monitor
void cmd_monitor(const std::vector<std::string> &subArgs) {
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");
auto txn = env.txn_ro();
ActiveMonitors monitors;
std::string line;
uint64_t interestConnId = 0;
std::string interestSubId;
while (std::cin) {
std::getline(std::cin, line);
if (!line.size()) continue;
auto msg = tao::json::from_string(line);
auto &msgArr = msg.get_array();
auto cmd = msgArr.at(0).get_string();
if (cmd == "sub") {
Subscription sub(msgArr.at(1).get_unsigned(), msgArr.at(2).get_string(), NostrFilterGroup::unwrapped(msgArr.at(3)));
sub.latestEventId = 0;
monitors.addSub(txn, std::move(sub), 0);
} else if (cmd == "removeSub") {
monitors.removeSub(msgArr.at(1).get_unsigned(), SubId(msgArr.at(2).get_string()));
} else if (cmd == "closeConn") {
monitors.closeConn(msgArr.at(1).get_unsigned());
} else if (cmd == "interest") {
if (interestConnId) throw herr("interest already set");
interestConnId = msgArr.at(1).get_unsigned();
interestSubId = msgArr.at(2).get_string();
} else {
throw herr("unknown cmd");
}
}
env.foreach_Event(txn, [&](auto &ev){
monitors.process(txn, ev, [&](RecipientList &&recipients, uint64_t quadId){
for (auto &r : recipients) {
if (r.connId == interestConnId && r.subId.str() == interestSubId) {
std::cout << getEventJson(txn, quadId) << "\n";
}
}
});
return true;
});
}

56
src/cmd_relay.cpp Normal file
@@ -0,0 +1,56 @@
#include "RelayServer.h"
void cmd_relay(const std::vector<std::string> &subArgs) {
RelayServer s;
s.run();
}
void RelayServer::run() {
tpWebsocket.init("Websocket", 1, [this](auto &thr){
runWebsocket(thr);
});
tpIngester.init("Ingester", cfg().relay__numThreads__ingester, [this](auto &thr){
runIngester(thr);
});
tpWriter.init("Writer", 1, [this](auto &thr){
runWriter(thr);
});
tpReqWorker.init("ReqWorker", cfg().relay__numThreads__reqWorker, [this](auto &thr){
runReqWorker(thr);
});
tpReqMonitor.init("ReqMonitor", cfg().relay__numThreads__reqMonitor, [this](auto &thr){
runReqMonitor(thr);
});
tpYesstr.init("Yesstr", cfg().relay__numThreads__yesstr, [this](auto &thr){
runYesstr(thr);
});
// Monitor for config file reloads
auto configFileChangeWatcher = hoytech::file_change_monitor(configFile);
configFileChangeWatcher.setDebounce(100);
configFileChangeWatcher.run([&](){
loadConfig(configFile);
});
// Cron
cron.repeat(10 * 1'000'000UL, [&]{
cleanupOldEvents();
});
cron.setupCb = []{ setThreadName("cron"); };
cron.run();
tpWebsocket.join();
}

41
src/cmd_scan.cpp Normal file
@@ -0,0 +1,41 @@
#include <iostream>
#include <docopt.h>
#include "golpe.h"
#include "DBScan.h"
#include "events.h"
static const char USAGE[] =
R"(
Usage:
scan [--pause=<pause>] <filter>
)";
void cmd_scan(const std::vector<std::string> &subArgs) {
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");
uint64_t pause = 0;
if (args["--pause"]) pause = args["--pause"].asLong();
std::string filterStr = args["<filter>"].asString();
auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr));
Subscription sub(1, "junkSub", filterGroup);
DBScanQuery query(sub);
auto txn = env.txn_ro();
while (1) {
bool complete = query.process(txn, pause ? pause : MAX_U64, [&](const auto &sub, uint64_t quadId){
std::cout << getEventJson(txn, quadId) << "\n";
});
if (complete) break;
}
}

124
src/cmd_stream.cpp Normal file
@@ -0,0 +1,124 @@
#include <docopt.h>
#include <tao/json.hpp>
#include <hoytech/protected_queue.h>
#include <hoytech/file_change_monitor.h>
#include "golpe.h"
#include "WriterPipeline.h"
#include "Subscription.h"
#include "WSConnection.h"
#include "events.h"
static const char USAGE[] =
R"(
Usage:
stream <url> [--dir=<dir>]
Options:
--dir=<dir> Direction: down, up, or both [default: down]
)";
void cmd_stream(const std::vector<std::string> &subArgs) {
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");
std::string url = args["<url>"].asString();
std::string dir = args["--dir"] ? args["--dir"].asString() : "down";
if (dir != "up" && dir != "down" && dir != "both") throw herr("invalid direction: ", dir, ". Should be one of up/down/both");
std::unordered_set<std::string> downloadedIds;
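// Ids of events received from the remote relay; used to avoid echoing them straight
// back when streaming in the "up" direction.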
WriterPipeline writer;
WSConnection ws(url);
ws.onConnect = [&]{
if (dir == "down" || dir == "both") {
auto encoded = tao::json::to_string(tao::json::value::array({ "REQ", "sub", tao::json::value({ { "limit", 0 } }) }));
ws.send(encoded);
}
};
ws.onMessage = [&](auto msg, size_t){
auto origJson = tao::json::from_string(msg);
if (origJson.is_array()) {
if (origJson.get_array().size() < 2) throw herr("array too short");
auto &msgType = origJson.get_array().at(0);
if (msgType == "EOSE") {
return;
} else if (msgType == "NOTICE") {
LW << "NOTICE message: " << tao::json::to_string(origJson);
return;
} else if (msgType == "OK") {
if (!origJson.get_array().at(2).get_boolean()) {
LW << "Event not written: " << origJson;
}
} else if (msgType == "EVENT") {
if (dir == "down" || dir == "both") {
if (origJson.get_array().size() < 3) throw herr("array too short");
auto &evJson = origJson.at(2);
downloadedIds.emplace(from_hex(evJson.at("id").get_string()));
writer.inbox.push_move(std::move(evJson));
} else {
LW << "Unexpected EVENT";
}
} else {
throw herr("unexpected first element");
}
} else {
throw herr("unexpected message");
}
};
uint64_t currEventId;
{
auto txn = env.txn_ro();
currEventId = getMostRecentEventId(txn);
}
ws.onTrigger = [&]{
if (dir == "down") return;
auto txn = env.txn_ro();
env.foreach_Event(txn, [&](auto &ev){
currEventId = ev.primaryKeyId;
auto id = std::string(sv(ev.flat_nested()->id()));
if (downloadedIds.find(id) != downloadedIds.end()) {
downloadedIds.erase(id);
return true;
}
std::string msg = std::string("[\"EVENT\",");
msg += getEventJson(txn, ev.primaryKeyId);
msg += "]";
ws.send(msg);
return true;
}, false, currEventId + 1);
};
std::unique_ptr<hoytech::file_change_monitor> dbChangeWatcher;
if (dir == "up" || dir == "both") {
dbChangeWatcher = std::make_unique<hoytech::file_change_monitor>(dbDir + "/data.mdb");
dbChangeWatcher->setDebounce(100);
dbChangeWatcher->run([&](){
ws.trigger();
});
}
ws.run();
}

229
src/cmd_sync.cpp Normal file
View File

@ -0,0 +1,229 @@
#include <docopt.h>
#include <tao/json.hpp>
#include <quadrable.h>
#include <quadrable/transport.h>
#include "golpe.h"
#include "WriterPipeline.h"
#include "Subscription.h"
#include "WSConnection.h"
#include "DBScan.h"
#include "filters.h"
#include "events.h"
#include "yesstr.h"
static const char USAGE[] =
R"(
Usage:
sync <url> [--filter=<filter>] [--dir=<dir>]
Options:
--filter=<filter> Nostr filter (either single filter object or array of filters)
--dir=<dir> Direction: down, up, or both [default: down]
)";
struct SyncController {
quadrable::Quadrable *qdb;
WSConnection *ws;
quadrable::Quadrable::Sync sync;
quadrable::MemStore m;
uint64_t ourNodeId = 0;
quadrable::SyncRequests reqs;
bool sentFirstReq = false;
SyncController(quadrable::Quadrable *qdb_, WSConnection *ws_) : qdb(qdb_), ws(ws_), sync(qdb_) { }
void init(lmdb::txn &txn) {
qdb->withMemStore(m, [&]{
qdb->writeToMemStore = true;
ourNodeId = qdb->getHeadNodeId(txn);
sync.init(txn, ourNodeId);
});
}
bool sendRequests(lmdb::txn &txn, const std::string &filterStr) {
qdb->withMemStore(m, [&]{
qdb->writeToMemStore = true;
reqs = sync.getReqs(txn, 10'000);
});
if (reqs.size() == 0) return false;
std::string reqsEncoded = quadrable::transport::encodeSyncRequests(reqs);
flatbuffers::FlatBufferBuilder builder;
auto reqOffset = Yesstr::CreateRequest(builder,
123,
Yesstr::RequestPayload::RequestPayload_RequestSync,
Yesstr::CreateRequestSync(builder,
(filterStr.size() && !sentFirstReq) ? builder.CreateString(filterStr) : 0,
builder.CreateVector((uint8_t*)reqsEncoded.data(), reqsEncoded.size())
).Union()
);
builder.Finish(reqOffset);
std::string reqMsg = std::string("Y") + std::string(reinterpret_cast<char*>(builder.GetBufferPointer()), builder.GetSize());
size_t compressedSize;
ws->send(reqMsg, uWS::OpCode::BINARY, &compressedSize);
LI << "SEND size=" << reqMsg.size() << " compressed=" << compressedSize;
sentFirstReq = true;
return true;
}
void handleResponses(lmdb::txn &txn, std::string_view msg) {
verifyYesstrResponse(msg);
const auto *resp = parseYesstrResponse(msg);
const auto *respSync = resp->payload_as_ResponseSync();
auto resps = quadrable::transport::decodeSyncResponses(sv(respSync->respsEncoded()));
qdb->withMemStore(m, [&]{
qdb->writeToMemStore = true;
sync.addResps(txn, reqs, resps);
});
}
void finish(lmdb::txn &txn, std::function<void(std::string_view)> onNewLeaf, std::function<void(std::string_view)> onMissingLeaf) {
qdb->withMemStore(m, [&]{
qdb->writeToMemStore = true;
sync.diff(txn, ourNodeId, sync.nodeIdShadow, [&](auto dt, const auto &node){
if (dt == quadrable::Quadrable::DiffType::Added) {
// node exists only on the provider-side
LI << "NEW LEAF: " << node.leafVal();
onNewLeaf(node.leafVal());
} else if (dt == quadrable::Quadrable::DiffType::Deleted) {
// node exists only on the syncer-side
LI << "MISSING LEAF: " << node.leafVal();
onMissingLeaf(node.leafVal());
} else if (dt == quadrable::Quadrable::DiffType::Changed) {
// nodes differ. node is the one on the provider-side
}
});
});
}
};
void cmd_sync(const std::vector<std::string> &subArgs) {
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");
std::string url = args["<url>"].asString();
std::string filterStr;
if (args["--filter"]) filterStr = args["--filter"].asString();
std::string dir = args["--dir"] ? args["--dir"].asString() : "down";
if (dir != "up" && dir != "down" && dir != "both") throw herr("invalid direction: ", dir, ". Should be one of up/down/both");
if (dir != "down") throw herr("only down currently supported"); // FIXME
std::unique_ptr<SyncController> controller;
WriterPipeline writer;
WSConnection ws(url);
quadrable::Quadrable qdb;
{
auto txn = env.txn_ro();
qdb.init(txn);
}
qdb.checkout("events");
ws.reconnect = false;
if (filterStr.size()) {
std::vector<uint64_t> quadEventIds;
auto filterGroup = NostrFilterGroup::unwrapped(tao::json::from_string(filterStr));
Subscription sub(1, "junkSub", filterGroup);
DBScanQuery query(sub);
auto txn = env.txn_ro();
while (1) {
bool complete = query.process(txn, MAX_U64, [&](const auto &sub, uint64_t quadId){
quadEventIds.push_back(quadId);
});
if (complete) break;
}
LI << "Filter matched " << quadEventIds.size() << " local events";
controller = std::make_unique<SyncController>(&qdb, &ws);
qdb.withMemStore(controller->m, [&]{
qdb.writeToMemStore = true;
qdb.checkout();
auto changes = qdb.change();
for (auto id : quadEventIds) {
changes.putReuse(txn, id);
}
changes.apply(txn);
});
controller->init(txn);
} else {
auto txn = env.txn_ro();
controller = std::make_unique<SyncController>(&qdb, &ws);
controller->init(txn);
}
ws.onConnect = [&]{
auto txn = env.txn_ro();
controller->sendRequests(txn, filterStr);
};
ws.onMessage = [&](auto msg, size_t compressedSize){
auto txn = env.txn_ro();
if (!controller) {
LW << "No sync active, ignoring message";
return;
}
LI << "RECV size=" << msg.size() << " compressed=" << compressedSize;
controller->handleResponses(txn, msg);
if (!controller->sendRequests(txn, filterStr)) {
LI << "Syncing done, writing/sending events";
controller->finish(txn,
[&](std::string_view newLeaf){
writer.inbox.push_move(tao::json::from_string(std::string(newLeaf)));
},
[&](std::string_view){
}
);
writer.flush();
::exit(0);
}
};
ws.run();
}

3
src/constants.h Normal file
View File

@ -0,0 +1,3 @@
#pragma once
const size_t MAX_SUBID_SIZE = 39;

234
src/events.cpp Normal file
View File

@ -0,0 +1,234 @@
#include "events.h"
std::string nostrJsonToFlat(const tao::json::value &v) {
flatbuffers::FlatBufferBuilder builder; // FIXME: pre-allocate size approximately the same as orig JSON?
// Extract values from JSON, add strings to builder
auto loadHexStr = [&](std::string_view k, uint64_t size){
auto s = from_hex(v.at(k).get_string(), false);
if (s.size() != size) throw herr("unexpected size of hex data");
return builder.CreateVector((uint8_t*)s.data(), s.size());
};
auto idPtr = loadHexStr("id", 32);
auto pubkeyPtr = loadHexStr("pubkey", 32);
uint64_t created_at = v.at("created_at").get_unsigned();
uint64_t kind = v.at("kind").get_unsigned();
std::vector<flatbuffers::Offset<NostrIndex::Tag>> tagPtrs;
if (v.at("tags").get_array().size() > cfg().events__maxNumTags) throw herr("too many tags: ", v.at("tags").get_array().size());
for (auto &tagArr : v.at("tags").get_array()) {
auto &tag = tagArr.get_array();
if (tag.size() < 2) throw herr("too few fields in tag");
auto tagName = tag.at(0).get_string();
if (tagName.size() != 1) continue; // only single-char tags need indexing
auto tagVal = tag.at(1).get_string();
if (tagVal.size() < 1 || tagVal.size() > cfg().events__maxTagValSize) throw herr("tag val too small/large: ", tagVal.size());
if (tagName == "e" || tagName == "p") {
tagVal = from_hex(tagVal, false);
if (tagVal.size() != 32) throw herr("unexpected size for e/p tag");
}
auto tagValPtr = builder.CreateVector((uint8_t*)tagVal.data(), tagVal.size());
tagPtrs.push_back(NostrIndex::CreateTag(builder, (uint8_t)tagName[0], tagValPtr));
}
auto tagsPtr = builder.CreateVector<flatbuffers::Offset<NostrIndex::Tag>>(tagPtrs);
// Create flatbuffer
auto eventPtr = NostrIndex::CreateEvent(builder, idPtr, pubkeyPtr, created_at, kind, tagsPtr);
builder.Finish(eventPtr);
return std::string(reinterpret_cast<char*>(builder.GetBufferPointer()), builder.GetSize());
}
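// Event id as defined by NIP-01: the SHA-256 of the canonical JSON array
// [0, <pubkey>, <created_at>, <kind>, <tags>, <content>]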
std::string nostrHash(const tao::json::value &origJson) {
tao::json::value arr = tao::json::empty_array;
arr.emplace_back(0);
arr.emplace_back(origJson.at("pubkey"));
arr.emplace_back(origJson.at("created_at"));
arr.emplace_back(origJson.at("kind"));
arr.emplace_back(origJson.at("tags"));
arr.emplace_back(origJson.at("content"));
std::string encoded = tao::json::to_string(arr);
unsigned char hash[SHA256_DIGEST_LENGTH];
SHA256_CTX sha256;
SHA256_Init(&sha256);
SHA256_Update(&sha256, encoded.data(), encoded.size());
SHA256_Final(hash, &sha256);
return std::string(((char*)hash), SHA256_DIGEST_LENGTH);
}
bool verifySig(secp256k1_context* ctx, std::string_view sig, std::string_view hash, std::string_view pubkey) {
if (sig.size() != 64 || hash.size() != 32 || pubkey.size() != 32) throw herr("verify sig: bad input size");
secp256k1_xonly_pubkey pubkeyParsed;
if (!secp256k1_xonly_pubkey_parse(ctx, &pubkeyParsed, (const uint8_t*)pubkey.data())) throw herr("verify sig: bad pubkey");
return secp256k1_schnorrsig_verify(ctx, (const uint8_t*)sig.data(), (const uint8_t*)hash.data(), &pubkeyParsed);
}
void verifyNostrEvent(secp256k1_context *secpCtx, const NostrIndex::Event *flat, const tao::json::value &origJson) {
auto hash = nostrHash(origJson);
if (hash != sv(flat->id())) throw herr("bad event id");
bool valid = verifySig(secpCtx, from_hex(origJson.at("sig").get_string(), false), sv(flat->id()), sv(flat->pubkey()));
if (!valid) throw herr("bad signature");
}
void verifyNostrEventJsonSize(std::string_view jsonStr) {
if (jsonStr.size() > cfg().events__maxEventSize) throw herr("event too large: ", jsonStr.size());
}
void verifyEventTimestamp(const NostrIndex::Event *flat) {
auto now = hoytech::curr_time_s();
auto ts = flat->created_at();
uint64_t earliest = now - (isEphemeralEvent(flat->kind()) ? cfg().events__rejectEphemeralEventsOlderThanSeconds : cfg().events__rejectEventsOlderThanSeconds);
uint64_t latest = now + cfg().events__rejectEventsNewerThanSeconds;
if (ts < earliest) throw herr("created_at too early");
if (ts > latest) throw herr("created_at too late");
}
void parseAndVerifyEvent(const tao::json::value &origJson, secp256k1_context *secpCtx, bool verifyMsg, bool verifyTime, std::string &flatStr, std::string &jsonStr) {
flatStr = nostrJsonToFlat(origJson);
auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(flatStr.data());
if (verifyTime) verifyEventTimestamp(flat);
if (verifyMsg) verifyNostrEvent(secpCtx, flat, origJson);
// Build new object to remove unknown top-level fields from json
jsonStr = tao::json::to_string(tao::json::value({
{ "content", &origJson.at("content") },
{ "created_at", &origJson.at("created_at") },
{ "id", &origJson.at("id") },
{ "kind", &origJson.at("kind") },
{ "pubkey", &origJson.at("pubkey") },
{ "sig", &origJson.at("sig") },
{ "tags", &origJson.at("tags") },
}));
if (verifyMsg) verifyNostrEventJsonSize(jsonStr);
}
std::optional<defaultDb::environment::View_Event> lookupEventById(lmdb::txn &txn, std::string_view id) {
std::optional<defaultDb::environment::View_Event> output;
env.generic_foreachFull(txn, env.dbi_Event__id, makeKey_StringUint64(id, 0), lmdb::to_sv<uint64_t>(0), [&](auto k, auto v) {
if (k.starts_with(id)) output = env.lookup_Event(txn, lmdb::from_sv<uint64_t>(v));
return false;
});
return output;
}
uint64_t getMostRecentEventId(lmdb::txn &txn) {
uint64_t output = 0;
env.foreach_Event(txn, [&](auto &ev){
output = ev.primaryKeyId;
return false;
}, true);
return output;
}
std::string_view getEventJson(lmdb::txn &txn, uint64_t quadId) {
std::string_view raw;
bool found = env.dbiQuadrable_nodesLeaf.get(txn, lmdb::to_sv<uint64_t>(quadId), raw);
if (!found) throw herr("couldn't find leaf node in quadrable, corrupted DB?");
return raw.substr(8 + 32 + 32);
}
void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::deque<EventToWrite> &evs) {
auto changes = qdb.change();
std::vector<uint64_t> eventIdsToDelete;
for (auto &ev : evs) {
const NostrIndex::Event *flat = flatbuffers::GetRoot<NostrIndex::Event>(ev.flatStr.data());
if (lookupEventById(txn, sv(flat->id()))) {
ev.status = EventWriteStatus::Duplicate;
continue;
}
if (env.lookup_Event__deletion(txn, std::string(sv(flat->id())) + std::string(sv(flat->pubkey())))) {
ev.status = EventWriteStatus::Deleted;
continue;
}
if (isReplaceableEvent(flat->kind())) {
auto searchKey = makeKey_StringUint64Uint64(sv(flat->pubkey()), flat->kind(), MAX_U64);
uint64_t otherEventId = 0;
env.generic_foreachFull(txn, env.dbi_Event__pubkeyKind, searchKey, lmdb::to_sv<uint64_t>(MAX_U64), [&](auto k, auto v) {
ParsedKey_StringUint64Uint64 parsedKey(k);
if (parsedKey.s == sv(flat->pubkey()) && parsedKey.n1 == flat->kind()) {
if (parsedKey.n2 < flat->created_at()) {
otherEventId = lmdb::from_sv<uint64_t>(v);
} else {
ev.status = EventWriteStatus::Replaced;
}
}
return false;
}, true);
if (otherEventId) {
auto otherEv = env.lookup_Event(txn, otherEventId);
if (!otherEv) throw herr("missing event from index, corrupt DB?");
changes.del(flatEventToQuadrableKey(otherEv->flat_nested()));
eventIdsToDelete.push_back(otherEventId);
}
}
if (flat->kind() == 5) {
// Deletion event, delete all referenced events
for (const auto &tagPair : *(flat->tags())) {
if (tagPair->key() == 'e') {
auto otherEv = lookupEventById(txn, sv(tagPair->val()));
if (otherEv && sv(otherEv->flat_nested()->pubkey()) == sv(flat->pubkey())) {
LI << "Deleting event. id=" << to_hex(sv(tagPair->val()));
changes.del(flatEventToQuadrableKey(otherEv->flat_nested()));
eventIdsToDelete.push_back(otherEv->primaryKeyId);
}
}
}
}
if (ev.status == EventWriteStatus::Pending) {
changes.put(flatEventToQuadrableKey(flat), ev.jsonStr, &ev.nodeId);
}
}
changes.apply(txn);
for (auto eventId : eventIdsToDelete) {
env.delete_Event(txn, eventId);
}
for (auto &ev : evs) {
if (ev.status == EventWriteStatus::Pending) {
env.insert_Event(txn, ev.nodeId, ev.receivedAt, ev.flatStr);
ev.status = EventWriteStatus::Written;
}
}
}

80
src/events.h Normal file
View File

@ -0,0 +1,80 @@
#pragma once
#include <openssl/sha.h>
#include <secp256k1_schnorrsig.h>
#include "golpe.h"
#include "constants.h"
inline bool isReplaceableEvent(uint64_t kind) {
return (
kind == 0 ||
kind == 3 ||
kind == 41 ||
(kind >= 10'000 && kind < 20'000)
);
}
inline bool isEphemeralEvent(uint64_t kind) {
return (
(kind >= 20'000 && kind < 30'000)
);
}
std::string nostrJsonToFlat(const tao::json::value &v);
std::string nostrHash(const tao::json::value &origJson);
bool verifySig(secp256k1_context* ctx, std::string_view sig, std::string_view hash, std::string_view pubkey);
void verifyNostrEvent(secp256k1_context *secpCtx, const NostrIndex::Event *flat, const tao::json::value &origJson);
void verifyNostrEventJsonSize(std::string_view jsonStr);
void verifyEventTimestamp(const NostrIndex::Event *flat);
void parseAndVerifyEvent(const tao::json::value &origJson, secp256k1_context *secpCtx, bool verifyMsg, bool verifyTime, std::string &flatStr, std::string &jsonStr);
// Does not do verification!
inline const NostrIndex::Event *flatStrToFlatEvent(std::string_view flatStr) {
return flatbuffers::GetRoot<NostrIndex::Event>(flatStr.data());
}
std::optional<defaultDb::environment::View_Event> lookupEventById(lmdb::txn &txn, std::string_view id);
uint64_t getMostRecentEventId(lmdb::txn &txn);
std::string_view getEventJson(lmdb::txn &txn, uint64_t quadId);
inline quadrable::Key flatEventToQuadrableKey(const NostrIndex::Event *flat) {
return quadrable::Key::fromIntegerAndHash(flat->created_at(), sv(flat->id()).substr(0, 23));
}
enum class EventWriteStatus {
Pending,
Written,
Duplicate,
Replaced,
Deleted,
};
struct EventToWrite {
std::string flatStr;
std::string jsonStr;
uint64_t receivedAt;
void *userData = nullptr;
uint64_t nodeId = 0;
EventWriteStatus status = EventWriteStatus::Pending;
};
void writeEvents(lmdb::txn &txn, quadrable::Quadrable &qdb, std::deque<EventToWrite> &evs);

248
src/filters.h Normal file
View File

@ -0,0 +1,248 @@
#pragma once
#include "golpe.h"
#include "constants.h"
struct FilterSetBytes {
struct Item {
uint16_t offset;
uint8_t size;
uint8_t firstByte;
};
std::vector<Item> items;
std::string buf;
FilterSetBytes() {}
// Sizes are post-hex decode
void init(const tao::json::value &arrHex, bool hexDecode, size_t minSize, size_t maxSize) {
std::vector<std::string> arr;
uint64_t totalSize = 0;
for (const auto &i : arrHex.get_array()) {
arr.emplace_back(hexDecode ? from_hex(i.get_string(), false) : i.get_string());
size_t itemSize = arr.back().size();
if (itemSize < minSize) throw herr("filter item too small");
if (itemSize > maxSize) throw herr("filter item too large");
totalSize += itemSize;
}
if (arr.size() == 0) throw herr("empty filter item");
std::sort(arr.begin(), arr.end());
for (const auto &item : arr) {
if (items.size() > 0 && item.starts_with(at(items.size() - 1))) continue; // remove duplicates and redundant prefixes
items.emplace_back(Item{ (uint16_t)buf.size(), (uint8_t)item.size(), (uint8_t)item[0] });
buf += item;
}
if (buf.size() > 65535) throw herr("total filter items too large");
}
std::string at(size_t n) const {
if (n >= items.size()) throw herr("FilterSetBytes access out of bounds");
auto &item = items[n];
return buf.substr(item.offset, item.size);
}
size_t size() const {
return items.size();
}
bool doesMatch(std::string_view candidate) const {
if (candidate.size() == 0) throw herr("invalid candidate");
// Binary search for upper-bound: https://en.cppreference.com/w/cpp/algorithm/upper_bound
ssize_t first = 0, last = items.size(), curr;
ssize_t count = last - first, step;
while (count > 0) {
curr = first;
step = count / 2;
curr += step;
bool comp = (uint8_t)candidate[0] != items[curr].firstByte
? (uint8_t)candidate[0] < items[curr].firstByte
: candidate < std::string_view(buf.data() + items[curr].offset, items[curr].size);
if (!comp) {
first = ++curr;
count -= step + 1;
} else {
count = step;
}
}
if (first == 0) return false;
if (candidate.starts_with(std::string_view(buf.data() + items[first - 1].offset, items[first - 1].size))) return true;
return false;
}
};
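// Usage sketch (illustrative values, not from this repo): initialized with hexDecode=true from
// ["ab","abcd","ff00"], the decoded items are sorted and "abcd" is dropped as redundant with the
// prefix "ab"; doesMatch() then accepts any candidate whose bytes begin with 0xab, or with 0xff 0x00.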
struct FilterSetUint {
std::vector<uint64_t> items;
FilterSetUint() {}
void init(const tao::json::value &arr) {
for (const auto &i : arr.get_array()) {
items.push_back(i.get_unsigned());
}
if (items.size() == 0) throw herr("empty filter item");
std::sort(items.begin(), items.end());
items.erase(std::unique(items.begin(), items.end()), items.end()); // remove duplicates
}
uint64_t at(size_t n) const {
if (n >= items.size()) throw herr("FilterSetUint access out of bounds");
return items[n];
}
size_t size() const {
return items.size();
}
bool doesMatch(uint64_t candidate) const {
return std::binary_search(items.begin(), items.end(), candidate);
}
};
struct NostrFilter {
FilterSetBytes ids;
FilterSetBytes authors;
FilterSetUint kinds;
std::map<char, FilterSetBytes> tags;
uint64_t since = 0;
uint64_t until = MAX_U64;
uint64_t limit = MAX_U64;
bool indexOnlyScans = false;
explicit NostrFilter(const tao::json::value &filterObj) {
uint64_t numMajorFields = 0;
for (const auto &[k, v] : filterObj.get_object()) {
if (k == "ids") {
ids.init(v, true, 1, 32);
numMajorFields++;
} else if (k == "authors") {
authors.init(v, true, 1, 32);
numMajorFields++;
} else if (k == "kinds") {
kinds.init(v);
numMajorFields++;
} else if (k.starts_with('#')) {
numMajorFields++;
if (k.size() == 2) {
char tag = k[1];
auto [it, _] = tags.emplace(tag, FilterSetBytes{});
if (tag == 'p' || tag == 'e') {
it->second.init(v, true, 32, 32);
} else {
it->second.init(v, false, 1, cfg().events__maxTagValSize);
}
} else {
throw herr("unindexed tag filter");
}
} else if (k == "since") {
since = v.get_unsigned();
} else if (k == "until") {
until = v.get_unsigned();
} else if (k == "limit") {
limit = v.get_unsigned();
} else {
throw herr("unrecognised filter item");
}
}
if (tags.size() > 2) throw herr("too many tags in filter"); // O(N^2) in matching, just prohibit it
if (limit > cfg().relay__maxFilterLimit) limit = cfg().relay__maxFilterLimit;
indexOnlyScans = numMajorFields <= 1;
// FIXME: pubkeyKind scan could be serviced index-only too
}
bool doesMatchTimes(uint64_t created) const {
if (created < since) return false;
if (created > until) return false;
return true;
}
bool doesMatch(const NostrIndex::Event *ev) const {
if (!doesMatchTimes(ev->created_at())) return false;
if (ids.size() && !ids.doesMatch(sv(ev->id()))) return false;
if (authors.size() && !authors.doesMatch(sv(ev->pubkey()))) return false;
if (kinds.size() && !kinds.doesMatch(ev->kind())) return false;
for (const auto &[tag, filt] : tags) {
bool foundMatch = false;
for (const auto &tagPair : *(ev->tags())) {
auto eventTag = tagPair->key();
if (eventTag == tag && filt.doesMatch(sv(tagPair->val()))) {
foundMatch = true;
break;
}
}
if (!foundMatch) return false;
}
return true;
}
};
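// Example filter object this accepts (values are illustrative):
//   {"authors":["8f2e"],"kinds":[1,7],"since":1670000000,"limit":50}
// "ids"/"authors" entries are hex prefixes (1..32 bytes after decoding); "#e"/"#p" tag filters require full 32-byte hex values.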
struct NostrFilterGroup {
std::vector<NostrFilter> filters;
// Note that this expects the full array, so the first two items are "REQ" and the subId
NostrFilterGroup(const tao::json::value &req) {
const auto &arr = req.get_array();
if (arr.size() < 3) throw herr("too small");
for (size_t i = 2; i < arr.size(); i++) {
filters.emplace_back(arr[i]);
}
}
// Hacky! Deserves a refactor
static NostrFilterGroup unwrapped(tao::json::value filter) {
if (!filter.is_array()) {
filter = tao::json::value::array({ filter });
}
tao::json::value pretendReqQuery = tao::json::value::array({ "REQ", "junkSub" });
for (auto &e : filter.get_array()) {
pretendReqQuery.push_back(e);
}
return NostrFilterGroup(pretendReqQuery);
}
bool doesMatch(const NostrIndex::Event *ev) const {
for (const auto &f : filters) {
if (f.doesMatch(ev)) return true;
}
return false;
}
size_t size() const {
return filters.size();
}
};
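// Usage sketch (mirrors how the scan/sync commands build groups; the filter string is illustrative):
//   auto fg = NostrFilterGroup::unwrapped(tao::json::from_string(R"({"kinds":[1]})"));
//   bool ok = fg.doesMatch(ev.flat_nested()); // ev is a hypothetical View_Event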

29
src/yesstr.h Normal file
View File

@ -0,0 +1,29 @@
#pragma once
#include "golpe.h"
inline void verifyYesstrRequest(std::string_view msg) {
if (!msg.starts_with("Y")) throw herr("invalid yesstr magic char");
msg = msg.substr(1);
auto verifier = flatbuffers::Verifier(reinterpret_cast<const uint8_t*>(msg.data()), msg.size());
bool ok = verifier.VerifyBuffer<Yesstr::Request>(nullptr);
if (!ok) throw herr("yesstr request verification failed");
}
inline void verifyYesstrResponse(std::string_view msg) {
if (!msg.starts_with("Y")) throw herr("invalid yesstr magic char");
msg = msg.substr(1);
auto verifier = flatbuffers::Verifier(reinterpret_cast<const uint8_t*>(msg.data()), msg.size());
bool ok = verifier.VerifyBuffer<Yesstr::Response>(nullptr);
if (!ok) throw herr("yesstr response verification failed");
}
inline const Yesstr::Request *parseYesstrRequest(std::string_view msg) {
return flatbuffers::GetRoot<Yesstr::Request>(msg.substr(1).data());
}
inline const Yesstr::Response *parseYesstrResponse(std::string_view msg) {
return flatbuffers::GetRoot<Yesstr::Response>(msg.substr(1).data());
}
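// Usage sketch (mirrors cmd_sync.cpp): verify before parsing, since the parse* helpers do no validation:
//   verifyYesstrResponse(msg);
//   const auto *resp = parseYesstrResponse(msg);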