feat: setup gRPC client

This commit is contained in:
Kieran 2023-11-28 22:01:14 +00:00
parent ac85adbca4
commit 448eecb114
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
7 changed files with 194 additions and 69 deletions

3
.gitignore vendored
View File

@ -1 +1,2 @@
build/
build/
.DS_Store

View File

@ -84,6 +84,12 @@
"functional": "cpp",
"iterator": "cpp",
"memory_resource": "cpp",
"queue": "cpp"
"queue": "cpp",
"codecvt": "cpp",
"condition_variable": "cpp",
"csignal": "cpp",
"shared_mutex": "cpp",
"source_location": "cpp",
"typeindex": "cpp"
}
}

View File

@ -1,4 +1,4 @@
cmake_minimum_required(VERSION 3.5)
cmake_minimum_required(VERSION 3.27)
project(relay_middleware VERSION 0.1.0)
if(NOT CMAKE_BUILD_TYPE)
@ -9,11 +9,40 @@ string(TOLOWER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE_LOWER)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
find_package(Threads REQUIRED)
find_package(OpenSSL REQUIRED)
find_package(Protobuf CONFIG REQUIRED)
find_package(absl CONFIG REQUIRED)
find_package(gRPC CONFIG REQUIRED)
# Protobuf
get_filename_component(api_proto "proto/api.proto" ABSOLUTE)
get_filename_component(api_proto_path "${api_proto}" PATH)
add_custom_command(
OUTPUT "api.pb.h" "api.pb.cc" "api.grpc.pb.cc" "api.grpc.pb.h"
COMMAND $<TARGET_FILE:protobuf::protoc>
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}"
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}"
-I "${api_proto_path}"
--plugin=protoc-gen-grpc="$<TARGET_FILE:gRPC::grpc_cpp_plugin>"
"${api_proto}"
DEPENDS "${api_proto}")
# Main exe
add_executable(relay_middleware
src/main.cpp
"api.pb.cc"
"api.grpc.pb.cc"
)
target_include_directories(relay_middleware PUBLIC
"include"
"uWebSockets/src"
"uWebSockets/uSockets/src"
${CMAKE_CURRENT_BINARY_DIR}
${Protobuf_INCLUDE_DIRS}
)
target_include_directories(relay_middleware PUBLIC "include" "uWebSockets/src" "uWebSockets/uSockets/src")
# uWebSockets
add_library(uWs STATIC
uWebSockets/uSockets/src/udp.c
uWebSockets/uSockets/src/socket.c
@ -28,14 +57,18 @@ add_library(uWs STATIC
uWebSockets/uSockets/src/eventing/gcd.c
uWebSockets/uSockets/src/eventing/libuv.c
)
target_include_directories(uWs PRIVATE "uWebSockets/src" "uWebSockets/uSockets/src" "/opt/homebrew/opt/openssl@3.0/include")
target_include_directories(uWs PRIVATE "uWebSockets/src" "uWebSockets/uSockets/src" ${OPENSSL_INCLUDE_DIR})
target_compile_definitions(uWs PRIVATE LIBUS_USE_OPENSSL)
find_library(LIBSSL_LIB ssl)
find_library(LIBCRYPTO_LIB crypto)
target_link_libraries(uWs PRIVATE z ${LIBCRYPTO_LIB} ${LIBSSL_LIB})
target_link_libraries(uWs PRIVATE z ${OPENSSL_CRYPTO_LIBRARY} ${OPENSSL_SSL_LIBRARY})
# Linking
target_link_libraries(relay_middleware PRIVATE
uWs
protobuf::libprotobuf
gRPC::grpc++
)
if (${CMAKE_CXX_COMPILER_ID} STREQUAL GNU OR ${CMAKE_CXX_COMPILER_ID} STREQUAL AppleClang)
target_link_libraries(relay_middleware PRIVATE uWs)
elseif (${CMAKE_CXX_COMPILER_ID} STREQUAL MSVC)
target_include_directories(relay_middleware PUBLIC ${VCPKG_INCLUDE_DIR})
target_link_directories(relay_middleware PRIVATE ${VCPKG_LIB_DIR})

23
include/api.hpp Normal file
View File

@ -0,0 +1,23 @@
#pragma once
#include <grpc/grpc.h>
#include <grpcpp/channel.h>
#include "api.grpc.pb.h"
using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
namespace relay_middleware
{
class RelayHandlerClient
{
public:
RelayHandlerClient(std::shared_ptr<Channel> channel) : stub_(RelayHandler::NewStub(channel)) {}
private:
std::unique_ptr<RelayHandler::Stub> stub_;
CompletionQueue cq_;
};
}

View File

@ -1,7 +1,9 @@
#pragma once
#include <queue>
#include <grpcpp/grpcpp.h>
#include "request.hpp"
#include "api.hpp"
using namespace relay_middleware::nostr;
@ -11,11 +13,21 @@ namespace relay_middleware::proxy
{
private:
std::queue<const NostrRequest> req_queue;
RelayHandlerClient client;
public:
RelayProxy(const std::string &backend) : client(grpc::CreateChannel(backend, grpc::InsecureChannelCredentials()))
{
}
auto push(const NostrRequest &&req) -> void
{
req_queue.push(std::move(req));
}
auto next() -> void
{
auto req = req_queue.back();
}
};
}

39
proto/api.proto Normal file
View File

@ -0,0 +1,39 @@
syntax = "proto3";
package relay_middleware;
service RelayHandler {
rpc HandleEvent(EventRequest) returns (EventReply) {}
}
message Event {
bytes id = 1; // 32-byte SHA256 hash of serialized event
bytes pubkey = 2; // 32-byte public key of event creator
fixed64 created_at = 3; // UNIX timestamp provided by event creator
uint64 kind = 4; // event kind
string content = 5; // arbitrary event contents
repeated TagEntry tags = 6; // event tag array
bytes sig = 7; // 32-byte signature of the event id
// Individual values for a single tag
message TagEntry { repeated string values = 1; }
}
message EventRequest {
// the event to be admitted for further relay processing
Event event = 1;
// IP address of the client that submitted the event
string ip_addr = 2;
// HTTP origin header from the client, if one exists
string origin = 3;
// HTTP user-agent header from the client, if one exists
string user_agent = 4;
// the public key associated with a NIP-42
// AUTH'd session, if authentication occurred
bytes auth_pubkey = 5;
}
message EventReply {
string id = 1;
bool accepted = 2;
string message = 3;
}

View File

@ -7,69 +7,80 @@
#include "socket.hpp"
#include "state.h"
void handleMessage(const std::string_view &message, const StateObj *state)
{
try
{
auto j = nlohmann::json::parse(message);
if (!j.is_array())
{
return;
}
auto q = j[0].get<std::string>();
if (q == "REQ")
{
auto filter = NostrRequest::from_json(j);
state->proxy->push(std::move(filter));
}
else if (q == "EVENT")
{
auto id = j[1]["id"].get<std::string>();
// do something
state->socket->ok(id, false, "not implemented");
}
else if (q == "CLOSE")
{
// nothing
}
else
{
state->socket->notice("Command not supported");
}
}
catch (const std::exception &e)
{
state->socket->notice("Unknown error: " + std::string(e.what()));
}
}
int main()
{
uWS::App().ws<StateObj>("/", {.compression = uWS::SHARED_COMPRESSOR,
.maxPayloadLength = 16 * 1024 * 1024,
.idleTimeout = 16,
.maxBackpressure = 1 * 1024 * 1024,
.closeOnBackpressureLimit = false,
.resetIdleTimeoutOnSend = false,
.sendPingsAutomatically = true,
.upgrade = nullptr,
.open = [](auto *ws)
{
StateObj *state = ws->getUserData();
std::cout << "New connection" << std::endl;
state->socket = new relay_middleware::nostr::NostrSocket(ws);
state->proxy = new relay_middleware::proxy::RelayProxy(); },
.close = [](auto *ws, auto code, auto msg)
{
StateObj *state = ws->getUserData();
delete state->proxy;
delete state->socket; },
.message = [](auto *ws, std::string_view message, uWS::OpCode opCode)
{
std::cout << "In: " << message << std::endl;
if (opCode == uWS::OpCode::TEXT)
{
StateObj *state = ws->getUserData();
try {
auto j = nlohmann::json::parse(message);
if (!j.is_array())
{
return;
}
auto q = j[0].get<std::string>();
if (q == "REQ")
{
auto filter = NostrRequest::from_json(j);
std::cout << "Out: " << filter.to_json() << std::endl;
state->socket->eose(filter.id);
}
else if (q == "EVENT")
{
auto id = j[1]["id"].get<std::string>();
// do something
state->socket->ok(id, false, "not implemented");
} else if (q == "CLOSE") {
// nothing
} else {
state->socket->notice("Command not supported");
}
}catch (const std::exception& e) {
state->socket->notice("Unknown error: " + std::string(e.what()));
}
} }})
.listen(3334, [](auto *listen_socket)
{
if (listen_socket) {
auto openHandler = [](auto *ws)
{
StateObj *state = ws->getUserData();
std::cout << "New connection" << std::endl;
state->socket = new relay_middleware::nostr::NostrSocket(ws);
state->proxy = new relay_middleware::proxy::RelayProxy("http://localhost:5167");
};
auto closeHandler = [](auto *ws, auto code, auto msg)
{
StateObj *state = ws->getUserData();
delete state->proxy;
delete state->socket;
};
auto messageHandler = [](auto *ws, std::string_view message, uWS::OpCode opCode)
{
std::cout << "In: " << message << std::endl;
if (opCode == uWS::OpCode::TEXT)
{
handleMessage(message, ws->getUserData());
}
};
auto listenHandler = [](auto *listen_socket)
{
if (listen_socket)
{
std::cout << "Listening on port " << 3334 << std::endl;
} })
.run();
}
};
uWS::App::WebSocketBehavior<StateObj> params = {};
params.open = openHandler;
params.close = closeHandler;
params.message = messageHandler;
uWS::App().ws<StateObj>("/", std::move(params)).listen(3334, listenHandler).run();
std::cout << "Failed to listen on port 3334" << std::endl;
}