update for new DB format

This commit is contained in:
Doug Hoyte
2024-09-12 16:52:11 -04:00
parent b1781d6576
commit dda300a11e
4 changed files with 53 additions and 40 deletions

View File

@ -157,11 +157,13 @@ struct AlgoParseState {
if (parsedKey.s == pubkey && parsedKey.n1 == kind) { if (parsedKey.s == pubkey && parsedKey.n1 == kind) {
auto levId = lmdb::from_sv<uint64_t>(v); auto levId = lmdb::from_sv<uint64_t>(v);
auto ev = lookupEventByLevId(txn, levId); auto ev = lookupEventByLevId(txn, levId);
PackedEventView packed(ev.buf);
for (const auto &tagPair : *(ev.flat_nested()->tagsFixed32())) { packed.foreachTag([&](char tagName, std::string_view tagVal){
if ((char)tagPair->key() != 'p') continue; if (tagName != 'p') return true;
output.insert(std::string(sv(tagPair->val()))); output.insert(std::string(tagVal));
} return true;
});
} }
return false; return false;

View File

@ -36,21 +36,24 @@ struct AlgoScanner {
if (output.size() > limit) return false; if (output.size() > limit) return false;
auto ev = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v)); auto ev = lookupEventByLevId(txn, lmdb::from_sv<uint64_t>(v));
auto kind = ev.flat_nested()->kind(); PackedEventView packed(ev.buf);
auto id = sv(ev.flat_nested()->id());
auto kind = packed.kind();
auto id = packed.id();
if (kind == 1) { if (kind == 1) {
auto pubkey = std::string(sv(ev.flat_nested()->pubkey())); auto pubkey = std::string(packed.pubkey());
bool foundETag = false; bool foundETag = false;
for (const auto &tagPair : *(ev.flat_nested()->tagsFixed32())) { packed.foreachTag([&](char tagName, std::string_view tagVal){
if ((char)tagPair->key() == 'e') { if (tagName == 'e') {
auto tagEventId = std::string(sv(tagPair->val())); auto tagEventId = std::string(tagVal);
eventInfoCache.emplace(tagEventId, EventInfo{}); eventInfoCache.emplace(tagEventId, EventInfo{});
eventInfoCache[tagEventId].comments++; eventInfoCache[tagEventId].comments++;
foundETag = true; foundETag = true;
} }
} return true;
});
if (foundETag) return true; // not root event if (foundETag) return true; // not root event
eventInfoCache.emplace(id, EventInfo{}); eventInfoCache.emplace(id, EventInfo{});
@ -62,18 +65,19 @@ struct AlgoScanner {
output.emplace_back(FilteredEvent{ev.primaryKeyId, std::string(id), eventInfo}); output.emplace_back(FilteredEvent{ev.primaryKeyId, std::string(id), eventInfo});
} else if (kind == 7) { } else if (kind == 7) {
auto pubkey = std::string(sv(ev.flat_nested()->pubkey())); auto pubkey = std::string(packed.pubkey());
//if (a.voters && !a.voters->contains(pubkey)) return true; //if (a.voters && !a.voters->contains(pubkey)) return true;
const auto &tagsArr = *(ev.flat_nested()->tagsFixed32()); std::optional<std::string_view> lastETag;
for (auto it = tagsArr.rbegin(); it != tagsArr.rend(); ++it) { packed.foreachTag([&](char tagName, std::string_view tagVal){
auto tagPair = *it; if (tagName == 'e') lastETag = tagVal;
if ((char)tagPair->key() == 'e') { return true;
auto tagEventId = std::string(sv(tagPair->val())); });
eventInfoCache.emplace(tagEventId, EventInfo{});
eventInfoCache[tagEventId].score++; if (lastETag) {
break; auto tagEventId = std::string(*lastETag);
} eventInfoCache.emplace(tagEventId, EventInfo{});
eventInfoCache[tagEventId].score++;
} }
} }
@ -97,11 +101,13 @@ struct AlgoScanner {
if (parsedKey.s == pubkey && parsedKey.n1 == kind) { if (parsedKey.s == pubkey && parsedKey.n1 == kind) {
auto levId = lmdb::from_sv<uint64_t>(v); auto levId = lmdb::from_sv<uint64_t>(v);
auto ev = lookupEventByLevId(txn, levId); auto ev = lookupEventByLevId(txn, levId);
PackedEventView packed(ev.buf);
for (const auto &tagPair : *(ev.flat_nested()->tagsFixed32())) { packed.foreachTag([&](char tagName, std::string_view tagVal){
if ((char)tagPair->key() != 'p') continue; if (tagName != 'p') return true;
output.insert(std::string(sv(tagPair->val()))); output.insert(std::string(tagVal));
} return true;
});
} }
return false; return false;

View File

@ -142,9 +142,10 @@ struct User {
auto levId = lmdb::from_sv<uint64_t>(v); auto levId = lmdb::from_sv<uint64_t>(v);
auto ev = lookupEventByLevId(txn, levId); auto ev = lookupEventByLevId(txn, levId);
PackedEventView packed(ev.buf);
if (ev.flat_nested()->kind() == 3) { if (packed.kind() == 3) {
auto pubkey = std::string(sv(ev.flat_nested()->pubkey())); auto pubkey = std::string(packed.pubkey());
if (!alreadySeen.contains(pubkey)) { if (!alreadySeen.contains(pubkey)) {
alreadySeen.insert(pubkey); alreadySeen.insert(pubkey);
@ -206,19 +207,23 @@ struct Event {
std::string getId() const { std::string getId() const {
return std::string(sv(ev.flat_nested()->id())); PackedEventView packed(ev.buf);
return std::string(packed.id());
} }
uint64_t getKind() const { uint64_t getKind() const {
return ev.flat_nested()->kind(); PackedEventView packed(ev.buf);
return packed.kind();
} }
uint64_t getCreatedAt() const { uint64_t getCreatedAt() const {
return ev.flat_nested()->created_at(); PackedEventView packed(ev.buf);
return packed.created_at();
} }
std::string getPubkey() const { std::string getPubkey() const {
return std::string(sv(ev.flat_nested()->pubkey())); PackedEventView packed(ev.buf);
return std::string(packed.pubkey());
} }
std::string getNoteId() const { std::string getNoteId() const {

View File

@ -10,16 +10,16 @@ void WebServer::runWriter(ThreadPool<MsgWebWriter>::Thread &thr) {
secp256k1_context *secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY); secp256k1_context *secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
PluginEventSifter writePolicy; PluginEventSifter writePolicy;
NegentropyFilterCache neFilterCache;
while(1) { while(1) {
auto newMsgs = thr.inbox.pop_all(); auto newMsgs = thr.inbox.pop_all();
auto now = hoytech::curr_time_us();
std::vector<EventToWrite> newEvents; std::vector<EventToWrite> newEvents;
for (auto &newMsg : newMsgs) { for (auto &newMsg : newMsgs) {
if (auto msg = std::get_if<MsgWebWriter::Request>(&newMsg.msg)) { if (auto msg = std::get_if<MsgWebWriter::Request>(&newMsg.msg)) {
auto &req = msg->req; auto &req = msg->req;
EventSourceType sourceType = req.ipAddr.size() == 4 ? EventSourceType::IP4 : EventSourceType::IP6;
Url u(req.url); Url u(req.url);
if (u.path.size() != 1 || u.path[0] != "submit-post") { if (u.path.size() != 1 || u.path[0] != "submit-post") {
@ -27,23 +27,23 @@ void WebServer::runWriter(ThreadPool<MsgWebWriter>::Thread &thr) {
continue; continue;
} }
std::string flatStr, jsonStr; std::string packedStr, jsonStr;
try { try {
tao::json::value json = tao::json::from_string(req.body); tao::json::value json = tao::json::from_string(req.body);
parseAndVerifyEvent(json, secpCtx, true, true, flatStr, jsonStr); parseAndVerifyEvent(json, secpCtx, true, true, packedStr, jsonStr);
} catch(std::exception &e) { } catch(std::exception &e) {
sendHttpResponse(req, tao::json::to_string(tao::json::value({{ "message", e.what() }})), "404 Not Found", "application/json; charset=utf-8"); sendHttpResponse(req, tao::json::to_string(tao::json::value({{ "message", e.what() }})), "404 Not Found", "application/json; charset=utf-8");
continue; continue;
} }
newEvents.emplace_back(std::move(flatStr), std::move(jsonStr), now, sourceType, req.ipAddr, &req); newEvents.emplace_back(std::move(packedStr), std::move(jsonStr), &req);
} }
} }
try { try {
auto txn = env.txn_rw(); auto txn = env.txn_rw();
writeEvents(txn, newEvents); writeEvents(txn, neFilterCache, newEvents);
txn.commit(); txn.commit();
} catch (std::exception &e) { } catch (std::exception &e) {
LE << "Error writing " << newEvents.size() << " events: " << e.what(); LE << "Error writing " << newEvents.size() << " events: " << e.what();
@ -61,8 +61,8 @@ void WebServer::runWriter(ThreadPool<MsgWebWriter>::Thread &thr) {
for (auto &newEvent : newEvents) { for (auto &newEvent : newEvents) {
auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(newEvent.flatStr.data()); PackedEventView packed(newEvent.packedStr);
auto eventIdHex = to_hex(sv(flat->id())); auto eventIdHex = to_hex(packed.id());
tao::json::value output = tao::json::empty_object; tao::json::value output = tao::json::empty_object;
std::string message; std::string message;
@ -71,7 +71,7 @@ void WebServer::runWriter(ThreadPool<MsgWebWriter>::Thread &thr) {
LI << "Inserted event. id=" << eventIdHex << " levId=" << newEvent.levId; LI << "Inserted event. id=" << eventIdHex << " levId=" << newEvent.levId;
output["message"] = message = "ok"; output["message"] = message = "ok";
output["written"] = true; output["written"] = true;
output["event"] = encodeBech32Simple("note", sv(flat->id())); output["event"] = encodeBech32Simple("note", packed.id());
} else if (newEvent.status == EventWriteStatus::Duplicate) { } else if (newEvent.status == EventWriteStatus::Duplicate) {
output["message"] = message = "duplicate: have this event"; output["message"] = message = "duplicate: have this event";
output["written"] = true; output["written"] = true;