From 448eecb114791a75dbf6e2c735ae0608897d205c Mon Sep 17 00:00:00 2001 From: Kieran Date: Tue, 28 Nov 2023 22:01:14 +0000 Subject: [PATCH] feat: setup gRPC client --- .gitignore | 3 +- .vscode/settings.json | 8 ++- CMakeLists.txt | 47 ++++++++++++--- include/api.hpp | 23 ++++++++ include/proxy.hpp | 12 ++++ proto/api.proto | 39 +++++++++++++ src/main.cpp | 131 +++++++++++++++++++++++------------------- 7 files changed, 194 insertions(+), 69 deletions(-) create mode 100644 include/api.hpp create mode 100644 proto/api.proto diff --git a/.gitignore b/.gitignore index d163863..d56cb93 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -build/ \ No newline at end of file +build/ +.DS_Store \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index f0845a5..ed7f2ba 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -84,6 +84,12 @@ "functional": "cpp", "iterator": "cpp", "memory_resource": "cpp", - "queue": "cpp" + "queue": "cpp", + "codecvt": "cpp", + "condition_variable": "cpp", + "csignal": "cpp", + "shared_mutex": "cpp", + "source_location": "cpp", + "typeindex": "cpp" } } \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index bf1b302..626bfe9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.5) +cmake_minimum_required(VERSION 3.27) project(relay_middleware VERSION 0.1.0) if(NOT CMAKE_BUILD_TYPE) @@ -9,11 +9,40 @@ string(TOLOWER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE_LOWER) set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED ON) +find_package(Threads REQUIRED) +find_package(OpenSSL REQUIRED) +find_package(Protobuf CONFIG REQUIRED) +find_package(absl CONFIG REQUIRED) +find_package(gRPC CONFIG REQUIRED) + +# Protobuf +get_filename_component(api_proto "proto/api.proto" ABSOLUTE) +get_filename_component(api_proto_path "${api_proto}" PATH) +add_custom_command( + OUTPUT "api.pb.h" "api.pb.cc" "api.grpc.pb.cc" "api.grpc.pb.h" + COMMAND $ + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${api_proto_path}" + --plugin=protoc-gen-grpc="$" + "${api_proto}" + DEPENDS "${api_proto}") + +# Main exe add_executable(relay_middleware src/main.cpp + "api.pb.cc" + "api.grpc.pb.cc" +) +target_include_directories(relay_middleware PUBLIC + "include" + "uWebSockets/src" + "uWebSockets/uSockets/src" + ${CMAKE_CURRENT_BINARY_DIR} + ${Protobuf_INCLUDE_DIRS} ) -target_include_directories(relay_middleware PUBLIC "include" "uWebSockets/src" "uWebSockets/uSockets/src") +# uWebSockets add_library(uWs STATIC uWebSockets/uSockets/src/udp.c uWebSockets/uSockets/src/socket.c @@ -28,14 +57,18 @@ add_library(uWs STATIC uWebSockets/uSockets/src/eventing/gcd.c uWebSockets/uSockets/src/eventing/libuv.c ) -target_include_directories(uWs PRIVATE "uWebSockets/src" "uWebSockets/uSockets/src" "/opt/homebrew/opt/openssl@3.0/include") +target_include_directories(uWs PRIVATE "uWebSockets/src" "uWebSockets/uSockets/src" ${OPENSSL_INCLUDE_DIR}) target_compile_definitions(uWs PRIVATE LIBUS_USE_OPENSSL) -find_library(LIBSSL_LIB ssl) -find_library(LIBCRYPTO_LIB crypto) -target_link_libraries(uWs PRIVATE z ${LIBCRYPTO_LIB} ${LIBSSL_LIB}) +target_link_libraries(uWs PRIVATE z ${OPENSSL_CRYPTO_LIBRARY} ${OPENSSL_SSL_LIBRARY}) +# Linking +target_link_libraries(relay_middleware PRIVATE + uWs + protobuf::libprotobuf + gRPC::grpc++ +) if (${CMAKE_CXX_COMPILER_ID} STREQUAL GNU OR ${CMAKE_CXX_COMPILER_ID} STREQUAL AppleClang) - target_link_libraries(relay_middleware PRIVATE uWs) + elseif (${CMAKE_CXX_COMPILER_ID} STREQUAL MSVC) target_include_directories(relay_middleware PUBLIC ${VCPKG_INCLUDE_DIR}) target_link_directories(relay_middleware PRIVATE ${VCPKG_LIB_DIR}) diff --git a/include/api.hpp b/include/api.hpp new file mode 100644 index 0000000..5b6e7ed --- /dev/null +++ b/include/api.hpp @@ -0,0 +1,23 @@ +#pragma once +#include +#include +#include "api.grpc.pb.h" + +using grpc::Channel; +using grpc::ClientAsyncResponseReader; +using grpc::ClientContext; +using grpc::CompletionQueue; +using grpc::Status; + +namespace relay_middleware +{ + class RelayHandlerClient + { + public: + RelayHandlerClient(std::shared_ptr channel) : stub_(RelayHandler::NewStub(channel)) {} + + private: + std::unique_ptr stub_; + CompletionQueue cq_; + }; +} \ No newline at end of file diff --git a/include/proxy.hpp b/include/proxy.hpp index 906ad50..edba714 100644 --- a/include/proxy.hpp +++ b/include/proxy.hpp @@ -1,7 +1,9 @@ #pragma once #include +#include #include "request.hpp" +#include "api.hpp" using namespace relay_middleware::nostr; @@ -11,11 +13,21 @@ namespace relay_middleware::proxy { private: std::queue req_queue; + RelayHandlerClient client; public: + RelayProxy(const std::string &backend) : client(grpc::CreateChannel(backend, grpc::InsecureChannelCredentials())) + { + } + auto push(const NostrRequest &&req) -> void { req_queue.push(std::move(req)); } + + auto next() -> void + { + auto req = req_queue.back(); + } }; } \ No newline at end of file diff --git a/proto/api.proto b/proto/api.proto new file mode 100644 index 0000000..f375ea0 --- /dev/null +++ b/proto/api.proto @@ -0,0 +1,39 @@ +syntax = "proto3"; + +package relay_middleware; + +service RelayHandler { + rpc HandleEvent(EventRequest) returns (EventReply) {} +} + +message Event { + bytes id = 1; // 32-byte SHA256 hash of serialized event + bytes pubkey = 2; // 32-byte public key of event creator + fixed64 created_at = 3; // UNIX timestamp provided by event creator + uint64 kind = 4; // event kind + string content = 5; // arbitrary event contents + repeated TagEntry tags = 6; // event tag array + bytes sig = 7; // 32-byte signature of the event id + // Individual values for a single tag + message TagEntry { repeated string values = 1; } +} + +message EventRequest { + // the event to be admitted for further relay processing + Event event = 1; + // IP address of the client that submitted the event + string ip_addr = 2; + // HTTP origin header from the client, if one exists + string origin = 3; + // HTTP user-agent header from the client, if one exists + string user_agent = 4; + // the public key associated with a NIP-42 + // AUTH'd session, if authentication occurred + bytes auth_pubkey = 5; +} + +message EventReply { + string id = 1; + bool accepted = 2; + string message = 3; +} \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 6cbf65a..3d4237e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -7,69 +7,80 @@ #include "socket.hpp" #include "state.h" +void handleMessage(const std::string_view &message, const StateObj *state) +{ + try + { + auto j = nlohmann::json::parse(message); + if (!j.is_array()) + { + return; + } + + auto q = j[0].get(); + if (q == "REQ") + { + auto filter = NostrRequest::from_json(j); + state->proxy->push(std::move(filter)); + } + else if (q == "EVENT") + { + auto id = j[1]["id"].get(); + // do something + state->socket->ok(id, false, "not implemented"); + } + else if (q == "CLOSE") + { + // nothing + } + else + { + state->socket->notice("Command not supported"); + } + } + catch (const std::exception &e) + { + state->socket->notice("Unknown error: " + std::string(e.what())); + } +} + int main() { - uWS::App().ws("/", {.compression = uWS::SHARED_COMPRESSOR, - .maxPayloadLength = 16 * 1024 * 1024, - .idleTimeout = 16, - .maxBackpressure = 1 * 1024 * 1024, - .closeOnBackpressureLimit = false, - .resetIdleTimeoutOnSend = false, - .sendPingsAutomatically = true, - .upgrade = nullptr, - .open = [](auto *ws) - { - StateObj *state = ws->getUserData(); - std::cout << "New connection" << std::endl; - state->socket = new relay_middleware::nostr::NostrSocket(ws); - state->proxy = new relay_middleware::proxy::RelayProxy(); }, - .close = [](auto *ws, auto code, auto msg) - { - StateObj *state = ws->getUserData(); - delete state->proxy; - delete state->socket; }, - .message = [](auto *ws, std::string_view message, uWS::OpCode opCode) - { - std::cout << "In: " << message << std::endl; - if (opCode == uWS::OpCode::TEXT) - { - StateObj *state = ws->getUserData(); - try { - - auto j = nlohmann::json::parse(message); - if (!j.is_array()) - { - return; - } - - auto q = j[0].get(); - if (q == "REQ") - { - auto filter = NostrRequest::from_json(j); - std::cout << "Out: " << filter.to_json() << std::endl; - - state->socket->eose(filter.id); - } - else if (q == "EVENT") - { - auto id = j[1]["id"].get(); - // do something - state->socket->ok(id, false, "not implemented"); - } else if (q == "CLOSE") { - // nothing - } else { - state->socket->notice("Command not supported"); - } - }catch (const std::exception& e) { - state->socket->notice("Unknown error: " + std::string(e.what())); - } - } }}) - .listen(3334, [](auto *listen_socket) - { - if (listen_socket) { + auto openHandler = [](auto *ws) + { + StateObj *state = ws->getUserData(); + std::cout << "New connection" << std::endl; + state->socket = new relay_middleware::nostr::NostrSocket(ws); + state->proxy = new relay_middleware::proxy::RelayProxy("http://localhost:5167"); + }; + auto closeHandler = [](auto *ws, auto code, auto msg) + { + StateObj *state = ws->getUserData(); + delete state->proxy; + delete state->socket; + }; + auto messageHandler = [](auto *ws, std::string_view message, uWS::OpCode opCode) + { + std::cout << "In: " << message << std::endl; + if (opCode == uWS::OpCode::TEXT) + { + handleMessage(message, ws->getUserData()); + } + }; + auto listenHandler = [](auto *listen_socket) + { + if (listen_socket) + { std::cout << "Listening on port " << 3334 << std::endl; - } }) - .run(); + } + }; + + uWS::App::WebSocketBehavior params = {}; + params.open = openHandler; + params.close = closeHandler; + params.message = messageHandler; + + uWS::App().ws("/", std::move(params)).listen(3334, listenHandler).run(); std::cout << "Failed to listen on port 3334" << std::endl; } \ No newline at end of file