update(userspace/falco): falco_grpc_server is now just server

Co-Authored-By: Leonardo Di Donato <leodidonato@gmail.com>
Signed-off-by: Lorenzo Fontana <lo@linux.com>
This commit is contained in:
Lorenzo Fontana 2019-09-24 15:41:39 +02:00 committed by Leo Di Donato
parent 203226d347
commit 6cf2ccf857
5 changed files with 31 additions and 31 deletions

View File

@ -455,7 +455,7 @@ int falco_init(int argc, char **argv)
scap_stats cstats; scap_stats cstats;
falco_webserver webserver; falco_webserver webserver;
falco::grpc::falco_grpc_server grpc_server; falco::grpc::server grpc_server;
std::thread grpc_server_thread; std::thread grpc_server_thread;
static struct option long_options[] = static struct option long_options[] =

View File

@ -28,7 +28,7 @@ limitations under the License.
#include "utils.h" #include "utils.h"
template<> template<>
void falco::grpc::request_stream_context<falco::output::request, falco::output::response>::start(falco_grpc_server* srv) void falco::grpc::request_stream_context<falco::output::request, falco::output::response>::start(server* srv)
{ {
m_state = request_context_base::REQUEST; m_state = request_context_base::REQUEST;
m_srv_ctx.reset(new ::grpc::ServerContext); m_srv_ctx.reset(new ::grpc::ServerContext);
@ -41,7 +41,7 @@ void falco::grpc::request_stream_context<falco::output::request, falco::output::
} }
template<> template<>
void falco::grpc::request_stream_context<falco::output::request, falco::output::response>::process(falco_grpc_server* srv) void falco::grpc::request_stream_context<falco::output::request, falco::output::response>::process(server* srv)
{ {
// When it is the 1st process call // When it is the 1st process call
if(m_state == request_context_base::REQUEST) if(m_state == request_context_base::REQUEST)
@ -70,7 +70,7 @@ void falco::grpc::request_stream_context<falco::output::request, falco::output::
} }
template<> template<>
void falco::grpc::request_stream_context<falco::output::request, falco::output::response>::end(falco_grpc_server* srv, bool errored) void falco::grpc::request_stream_context<falco::output::request, falco::output::response>::end(server* srv, bool errored)
{ {
if(m_stream_ctx) if(m_stream_ctx)
{ {
@ -84,7 +84,7 @@ void falco::grpc::request_stream_context<falco::output::request, falco::output::
start(srv); start(srv);
} }
void falco::grpc::falco_grpc_server::thread_process(int thread_index) void falco::grpc::server::thread_process(int thread_index)
{ {
void* tag = nullptr; void* tag = nullptr;
@ -140,12 +140,12 @@ void falco::grpc::falco_grpc_server::thread_process(int thread_index)
std::vector<request_stream_context<REQ, RESP>> RPC##_contexts(CONTEXT_COUNT); \ std::vector<request_stream_context<REQ, RESP>> RPC##_contexts(CONTEXT_COUNT); \
for(request_stream_context<REQ, RESP> & ctx : RPC##_contexts) \ for(request_stream_context<REQ, RESP> & ctx : RPC##_contexts) \
{ \ { \
ctx.m_process_func = &falco_grpc_server::IMPL; \ ctx.m_process_func = &server::IMPL; \
ctx.m_request_func = &service::AsyncService::Request##RPC; \ ctx.m_request_func = &service::AsyncService::Request##RPC; \
ctx.start(this); \ ctx.start(this); \
} }
void falco::grpc::falco_grpc_server::init(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs) void falco::grpc::server::init(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs)
{ {
m_server_addr = server_addr; m_server_addr = server_addr;
m_threadiness = threadiness; m_threadiness = threadiness;
@ -154,7 +154,7 @@ void falco::grpc::falco_grpc_server::init(std::string server_addr, int threadine
m_root_certs = root_certs; m_root_certs = root_certs;
} }
void falco::grpc::falco_grpc_server::run() void falco::grpc::server::run()
{ {
string private_key; string private_key;
string cert_chain; string cert_chain;
@ -181,12 +181,12 @@ void falco::grpc::falco_grpc_server::run()
m_completion_queue = builder.AddCompletionQueue(); m_completion_queue = builder.AddCompletionQueue();
m_server = builder.BuildAndStart(); m_server = builder.BuildAndStart();
falco_logger::log(LOG_INFO, "Starting gRPC webserver at " + m_server_addr + "\n"); falco_logger::log(LOG_INFO, "Starting gRPC server at " + m_server_addr + "\n");
// Create context for server threads // Create context for server threads
// The number of contexts is multiple of the number of threads // The number of contexts is multiple of the number of threads
// This defines the number of simultaneous completion queue requests of the same type (service::AsyncService::Request##RPC) // This defines the number of simultaneous completion queue requests of the same type (service::AsyncService::Request##RPC)
// For this approach to be sufficient falco_grpc_server::IMPL have to be fast // For this approach to be sufficient server::IMPL have to be fast
int context_count = m_threadiness * 10; int context_count = m_threadiness * 10;
PROCESS_STREAM(request, response, subscribe, subscribe, context_count) PROCESS_STREAM(request, response, subscribe, subscribe, context_count)
@ -194,17 +194,17 @@ void falco::grpc::falco_grpc_server::run()
int thread_idx = 0; int thread_idx = 0;
for(std::thread& thread : m_threads) for(std::thread& thread : m_threads)
{ {
thread = std::thread(&falco_grpc_server::thread_process, this, thread_idx++); thread = std::thread(&server::thread_process, this, thread_idx++);
} }
while(falco_grpc_server_impl::is_running()) while(server_impl::is_running())
{ {
sleep(1); sleep(1);
} }
stop(); stop();
} }
void falco::grpc::falco_grpc_server::stop() void falco::grpc::server::stop()
{ {
falco_logger::log(LOG_INFO, "Shutting down gRPC server. Waiting until external connections are closed by clients\n"); falco_logger::log(LOG_INFO, "Shutting down gRPC server. Waiting until external connections are closed by clients\n");
m_server->Shutdown(); m_server->Shutdown();
@ -228,5 +228,5 @@ void falco::grpc::falco_grpc_server::stop()
{ {
} }
falco_logger::log(LOG_INFO, "gRPC shutdown is now complete\n"); falco_logger::log(LOG_INFO, "Shutting down gRPC server complete\n");
} }

View File

@ -27,13 +27,13 @@ namespace falco
{ {
namespace grpc namespace grpc
{ {
class falco_grpc_server : public falco_grpc_server_impl class server : public server_impl
{ {
public: public:
falco_grpc_server() server()
{ {
} }
falco_grpc_server(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs): server(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs):
m_server_addr(server_addr), m_server_addr(server_addr),
m_threadiness(threadiness), m_threadiness(threadiness),
m_private_key(private_key), m_private_key(private_key),
@ -41,7 +41,7 @@ public:
m_root_certs(root_certs) m_root_certs(root_certs)
{ {
} }
virtual ~falco_grpc_server() = default; virtual ~server() = default;
void init(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs); void init(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs);
void thread_process(int thread_index); void thread_process(int thread_index);
@ -76,9 +76,9 @@ public:
WRITE, WRITE,
FINISH FINISH
} m_state = UNKNOWN; } m_state = UNKNOWN;
virtual void start(falco_grpc_server* srv) = 0; virtual void start(server* srv) = 0;
virtual void process(falco_grpc_server* srv) = 0; virtual void process(server* srv) = 0;
virtual void end(falco_grpc_server* srv, bool isError) = 0; virtual void end(server* srv, bool isError) = 0;
}; };
// //
@ -94,14 +94,14 @@ public:
~request_stream_context() = default; ~request_stream_context() = default;
// Pointer to function that does actual processing // Pointer to function that does actual processing
void (falco_grpc_server::*m_process_func)(const stream_context&, const Request&, Response&); void (server::*m_process_func)(const stream_context&, const Request&, Response&);
// Pointer to function that requests the system to start processing given requests // Pointer to function that requests the system to start processing given requests
void (service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter<Response>*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*); void (service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter<Response>*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*);
void start(falco_grpc_server* srv); void start(server* srv);
void process(falco_grpc_server* srv); void process(server* srv);
void end(falco_grpc_server* srv, bool isError); void end(server* srv, bool isError);
private: private:
std::unique_ptr<::grpc::ServerAsyncWriter<Response>> m_res_writer; std::unique_ptr<::grpc::ServerAsyncWriter<Response>> m_res_writer;

View File

@ -18,7 +18,7 @@ limitations under the License.
#include "grpc_server_impl.h" #include "grpc_server_impl.h"
bool falco::grpc::falco_grpc_server_impl::is_running() bool falco::grpc::server_impl::is_running()
{ {
if(m_stop) if(m_stop)
{ {
@ -27,7 +27,7 @@ bool falco::grpc::falco_grpc_server_impl::is_running()
return true; return true;
} }
void falco::grpc::falco_grpc_server_impl::subscribe(const stream_context& ctx, const output::request& req, output::response& res) void falco::grpc::server_impl::subscribe(const stream_context& ctx, const output::request& req, output::response& res)
{ {
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR) if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
{ {
@ -52,7 +52,7 @@ void falco::grpc::falco_grpc_server_impl::subscribe(const stream_context& ctx, c
} }
} }
void falco::grpc::falco_grpc_server_impl::shutdown() void falco::grpc::server_impl::shutdown()
{ {
m_stop = true; m_stop = true;
} }

View File

@ -28,11 +28,11 @@ namespace falco
{ {
namespace grpc namespace grpc
{ {
class falco_grpc_server_impl class server_impl
{ {
public: public:
falco_grpc_server_impl() = default; server_impl() = default;
~falco_grpc_server_impl() = default; ~server_impl() = default;
void shutdown(); void shutdown();