From abfd6d8a1a2d8da4358e0b3898e4f3ad2aef261e Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Tue, 3 Sep 2019 10:27:47 +0000 Subject: [PATCH] update(userspace/falco): reorganize grpc server Co-authored-by: Lorenzo Fontana Signed-off-by: Leonardo Di Donato --- userspace/falco/grpc_server.cpp | 131 +++++++++++++------------------- userspace/falco/grpc_server.h | 34 ++++++++- 2 files changed, 85 insertions(+), 80 deletions(-) diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index 1a7c5d80..5530bb4b 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -17,9 +17,6 @@ limitations under the License. */ #include -#include -#include -#include #ifdef GRPC_INCLUDE_IS_GRPCPP #include @@ -28,8 +25,7 @@ limitations under the License. #endif #include "grpc_server.h" -#include "falco_output.grpc.pb.h" -#include "falco_output.pb.h" +#include "grpc_context.h" bool grpc_server_impl::is_running() { @@ -37,83 +33,60 @@ bool grpc_server_impl::is_running() return true; } -class grpc_server : public grpc_server_impl +void grpc_server::thread_process(int thread_index) { -public: - grpc_server(const char* server_addr, int threadiness): - server_addr(server_addr), - threadiness(threadiness) + // TODO: is this right? That's what we want? + // Tell pthread to not handle termination signals in the current thread + sigset_t set; + sigemptyset(&set); + sigaddset(&set, SIGHUP); + sigaddset(&set, SIGINT); + pthread_sigmask(SIG_BLOCK, &set, nullptr); + + void* tag = nullptr; + bool event_read_success = false; + while(m_completion_queue->Next(&tag, &event_read_success)) { - } - - virtual ~grpc_server() = default; - - // Run() is blocked. It doesn't return until Stop() is called from another thread. - void Run(); - - void thread_process(int threadIndex) - { - // TODO: is this right? That's what we want? - // Tell pthread to not handle termination signals in the current thread - sigset_t set; - sigemptyset(&set); - sigaddset(&set, SIGHUP); - sigaddset(&set, SIGINT); - pthread_sigmask(SIG_BLOCK, &set, nullptr); - - void* tag = nullptr; - bool eventReadSuccess = false; - while(completion_queue->Next(&tag, &eventReadSuccess)) + if(tag == nullptr) { - if(tag == nullptr) - { - // TODO: empty tag returned, log, what to do? - continue; - } + // TODO: empty tag returned, log, what to do? + continue; } } - - // There is no shutdown handling in this code. - void run() - { - grpc::ServerBuilder builder; - // Listen on the given address without any authentication mechanism. - builder.AddListeningPort(server_addr, grpc::InsecureServerCredentials()); - // builder.RegisterService(&falco_output_svc); // TODO: enable this when we do the impl - - completion_queue = builder.AddCompletionQueue(); - server = builder.BuildAndStart(); - std::cout << "Server listening on " << server_addr << std::endl; - - // int context_count = threadiness * 10; - - threads.resize(threadiness); - - int thread_idx = 0; - for(std::thread& thread : threads) - { - thread = std::thread(&grpc_server::thread_process, this, thread_idx++); - } - - while(is_running()) - { - } - } - -private: - // FalcoOutputService::AsyncService falco_output_svc; - std::unique_ptr server; - std::string server_addr; - int threadiness = 0; - std::unique_ptr completion_queue; - std::vector threads; -}; - -bool start_grpc_server(unsigned short port, int threadiness) -{ - // TODO: make bind address configurable - std::string server_addr = "0.0.0.0:" + std::to_string(port); - grpc_server srv(server_addr.c_str(), threadiness); - srv.run(); - return true; +} + +void grpc_server::run() +{ + grpc::ServerBuilder builder; + // Listen on the given address without any authentication mechanism. + builder.AddListeningPort(m_server_addr, grpc::InsecureServerCredentials()); + // builder.RegisterService(&falco_output_svc); // TODO: enable this when we do the impl + + m_completion_queue = builder.AddCompletionQueue(); + m_server = builder.BuildAndStart(); + std::cout << "Server listening on " << m_server_addr << std::endl; + + // int context_count = threadiness * 10; + + m_threads.resize(m_threadiness); + + int thread_idx = 0; + for(std::thread& thread : m_threads) + { + thread = std::thread(&grpc_server::thread_process, this, thread_idx++); + } + + while(is_running()) + { + } +} + +void grpc_server::subscribe_handler(const stream_context& ctx, falco_output_request req, falco_output_response res) +{ +} + +void start_grpc_server(std::string server_address, int threadiness) +{ + grpc_server srv(server_address, threadiness); + srv.run(); } diff --git a/userspace/falco/grpc_server.h b/userspace/falco/grpc_server.h index 35587f4c..df1ff86a 100644 --- a/userspace/falco/grpc_server.h +++ b/userspace/falco/grpc_server.h @@ -18,6 +18,13 @@ limitations under the License. #pragma once +#include +#include + +#include "falco_output.grpc.pb.h" +#include "falco_output.pb.h" +#include "grpc_context.h" + class grpc_server_impl { public: @@ -26,6 +33,31 @@ public: protected: bool is_running(); + + void subscribe_handler(const stream_context& ctx, falco_output_request req, falco_output_response res); }; -bool start_grpc_server(unsigned short port, int threadiness); +class grpc_server : public grpc_server_impl +{ +public: + grpc_server(std::string server_addr, int threadiness): + m_server_addr(server_addr), + m_threadiness(threadiness) + { + } + virtual ~grpc_server() = default; + + void thread_process(int thread_index); + void run(); + void subscribe_handler(const stream_context& ctx, falco_output_request req, falco_output_response res); + +private: + // falco_output_service::AsyncService falco_output_svc; + std::unique_ptr m_server; + std::string m_server_addr; + int m_threadiness = 0; + std::unique_ptr m_completion_queue; + std::vector m_threads; +}; + +void start_grpc_server(std::string server_address, int threadiness);