update(userspace/falco): reorganize grpc server

Co-authored-by: Lorenzo Fontana <lo@linux.com>
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
This commit is contained in:
Leonardo Di Donato 2019-09-03 10:27:47 +00:00 committed by Leo Di Donato
parent 5d0266a09e
commit abfd6d8a1a
2 changed files with 85 additions and 80 deletions

View File

@ -17,9 +17,6 @@ limitations under the License.
*/ */
#include <iostream> #include <iostream>
#include <memory>
#include <string>
#include <thread>
#ifdef GRPC_INCLUDE_IS_GRPCPP #ifdef GRPC_INCLUDE_IS_GRPCPP
#include <grpcpp/grpcpp.h> #include <grpcpp/grpcpp.h>
@ -28,8 +25,7 @@ limitations under the License.
#endif #endif
#include "grpc_server.h" #include "grpc_server.h"
#include "falco_output.grpc.pb.h" #include "grpc_context.h"
#include "falco_output.pb.h"
bool grpc_server_impl::is_running() bool grpc_server_impl::is_running()
{ {
@ -37,22 +33,8 @@ bool grpc_server_impl::is_running()
return true; return true;
} }
class grpc_server : public grpc_server_impl void grpc_server::thread_process(int thread_index)
{ {
public:
grpc_server(const char* server_addr, int threadiness):
server_addr(server_addr),
threadiness(threadiness)
{
}
virtual ~grpc_server() = default;
// Run() is blocked. It doesn't return until Stop() is called from another thread.
void Run();
void thread_process(int threadIndex)
{
// TODO: is this right? That's what we want? // TODO: is this right? That's what we want?
// Tell pthread to not handle termination signals in the current thread // Tell pthread to not handle termination signals in the current thread
sigset_t set; sigset_t set;
@ -62,8 +44,8 @@ public:
pthread_sigmask(SIG_BLOCK, &set, nullptr); pthread_sigmask(SIG_BLOCK, &set, nullptr);
void* tag = nullptr; void* tag = nullptr;
bool eventReadSuccess = false; bool event_read_success = false;
while(completion_queue->Next(&tag, &eventReadSuccess)) while(m_completion_queue->Next(&tag, &event_read_success))
{ {
if(tag == nullptr) if(tag == nullptr)
{ {
@ -71,26 +53,25 @@ public:
continue; continue;
} }
} }
} }
// There is no shutdown handling in this code. void grpc_server::run()
void run() {
{
grpc::ServerBuilder builder; grpc::ServerBuilder builder;
// Listen on the given address without any authentication mechanism. // Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_addr, grpc::InsecureServerCredentials()); builder.AddListeningPort(m_server_addr, grpc::InsecureServerCredentials());
// builder.RegisterService(&falco_output_svc); // TODO: enable this when we do the impl // builder.RegisterService(&falco_output_svc); // TODO: enable this when we do the impl
completion_queue = builder.AddCompletionQueue(); m_completion_queue = builder.AddCompletionQueue();
server = builder.BuildAndStart(); m_server = builder.BuildAndStart();
std::cout << "Server listening on " << server_addr << std::endl; std::cout << "Server listening on " << m_server_addr << std::endl;
// int context_count = threadiness * 10; // int context_count = threadiness * 10;
threads.resize(threadiness); m_threads.resize(m_threadiness);
int thread_idx = 0; int thread_idx = 0;
for(std::thread& thread : threads) for(std::thread& thread : m_threads)
{ {
thread = std::thread(&grpc_server::thread_process, this, thread_idx++); thread = std::thread(&grpc_server::thread_process, this, thread_idx++);
} }
@ -98,22 +79,14 @@ public:
while(is_running()) while(is_running())
{ {
} }
} }
private: void grpc_server::subscribe_handler(const stream_context& ctx, falco_output_request req, falco_output_response res)
// FalcoOutputService::AsyncService falco_output_svc; {
std::unique_ptr<grpc::Server> server; }
std::string server_addr;
int threadiness = 0; void start_grpc_server(std::string server_address, int threadiness)
std::unique_ptr<grpc::ServerCompletionQueue> completion_queue; {
std::vector<std::thread> threads; grpc_server srv(server_address, threadiness);
}; srv.run();
bool start_grpc_server(unsigned short port, int threadiness)
{
// TODO: make bind address configurable
std::string server_addr = "0.0.0.0:" + std::to_string(port);
grpc_server srv(server_addr.c_str(), threadiness);
srv.run();
return true;
} }

View File

@ -18,6 +18,13 @@ limitations under the License.
#pragma once #pragma once
#include <thread>
#include <string>
#include "falco_output.grpc.pb.h"
#include "falco_output.pb.h"
#include "grpc_context.h"
class grpc_server_impl class grpc_server_impl
{ {
public: public:
@ -26,6 +33,31 @@ public:
protected: protected:
bool is_running(); bool is_running();
void subscribe_handler(const stream_context& ctx, falco_output_request req, falco_output_response res);
}; };
bool start_grpc_server(unsigned short port, int threadiness); class grpc_server : public grpc_server_impl
{
public:
grpc_server(std::string server_addr, int threadiness):
m_server_addr(server_addr),
m_threadiness(threadiness)
{
}
virtual ~grpc_server() = default;
void thread_process(int thread_index);
void run();
void subscribe_handler(const stream_context& ctx, falco_output_request req, falco_output_response res);
private:
// falco_output_service::AsyncService falco_output_svc;
std::unique_ptr<grpc::Server> m_server;
std::string m_server_addr;
int m_threadiness = 0;
std::unique_ptr<grpc::ServerCompletionQueue> m_completion_queue;
std::vector<std::thread> m_threads;
};
void start_grpc_server(std::string server_address, int threadiness);