new(userspace/falco): namespace for falco grpc

Co-Authored-By: Leonardo Di Donato <leodidonato@gmail.com>
Signed-off-by: Lorenzo Fontana <lo@linux.com>
This commit is contained in:
Lorenzo Fontana 2019-09-24 15:33:44 +02:00 committed by Leo Di Donato
parent 392499f024
commit 203226d347
7 changed files with 55 additions and 59 deletions

View File

@ -455,7 +455,7 @@ int falco_init(int argc, char **argv)
scap_stats cstats; scap_stats cstats;
falco_webserver webserver; falco_webserver webserver;
falco_grpc_server grpc_server; falco::grpc::falco_grpc_server grpc_server;
std::thread grpc_server_thread; std::thread grpc_server_thread;
static struct option long_options[] = static struct option long_options[] =

View File

@ -20,7 +20,7 @@ limitations under the License.
#include "grpc_context.h" #include "grpc_context.h"
context::context(grpc::ServerContext* ctx): falco::grpc::context::context(::grpc::ServerContext* ctx):
m_ctx(ctx) m_ctx(ctx)
{ {
std::string session_id; std::string session_id;
@ -48,12 +48,12 @@ context::context(grpc::ServerContext* ctx):
m_prefix = meta.str(); m_prefix = meta.str();
} }
void context::get_metadata(std::string key, std::string& val) void falco::grpc::context::context::get_metadata(std::string key, std::string& val)
{ {
const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = m_ctx->client_metadata(); const std::multimap<::grpc::string_ref, ::grpc::string_ref>& client_metadata = m_ctx->client_metadata();
auto it = client_metadata.find(key); auto it = client_metadata.find(key);
if(it != client_metadata.end()) if(it != client_metadata.end())
{ {
val.assign(it->second.data(), it->second.size()); val.assign(it->second.data(), it->second.size());
} }
} }

View File

@ -26,26 +26,31 @@ limitations under the License.
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#endif #endif
namespace falco
{
namespace grpc
{
const std::string meta_session = "session_id"; const std::string meta_session = "session_id";
const std::string meta_request = "request_id"; const std::string meta_request = "request_id";
class context class context
{ {
public: public:
context(grpc::ServerContext* ctx); context(::grpc::ServerContext* ctx);
~context() = default; ~context() = default;
void get_metadata(std::string key, std::string& val); void get_metadata(std::string key, std::string& val);
private: private:
grpc::ServerContext* m_ctx = nullptr; ::grpc::ServerContext* m_ctx = nullptr;
std::string m_prefix; std::string m_prefix;
}; };
class stream_context : public context class stream_context : public context
{ {
public: public:
stream_context(grpc::ServerContext* ctx): stream_context(::grpc::ServerContext* ctx):
context(ctx){}; context(ctx){};
~stream_context() = default; ~stream_context() = default;
@ -59,4 +64,6 @@ public:
mutable void* m_stream = nullptr; mutable void* m_stream = nullptr;
// Are there more responses to stream? // Are there more responses to stream?
mutable bool m_has_more = false; mutable bool m_has_more = false;
}; };
} // namespace grpc
} // namespace falco

View File

@ -25,14 +25,15 @@ limitations under the License.
#include "logger.h" #include "logger.h"
#include "grpc_server.h" #include "grpc_server.h"
#include "grpc_context.h" #include "grpc_context.h"
#include "utils.h"
template<> template<>
void request_stream_context<request, response>::start(falco_grpc_server* srv) void falco::grpc::request_stream_context<falco::output::request, falco::output::response>::start(falco_grpc_server* srv)
{ {
m_state = request_context_base::REQUEST; m_state = request_context_base::REQUEST;
m_srv_ctx.reset(new grpc::ServerContext); m_srv_ctx.reset(new ::grpc::ServerContext);
auto srvctx = m_srv_ctx.get(); auto srvctx = m_srv_ctx.get();
m_res_writer.reset(new grpc::ServerAsyncWriter<response>(srvctx)); m_res_writer.reset(new ::grpc::ServerAsyncWriter<response>(srvctx));
m_stream_ctx.reset(); m_stream_ctx.reset();
m_req.Clear(); m_req.Clear();
auto cq = srv->m_completion_queue.get(); auto cq = srv->m_completion_queue.get();
@ -40,7 +41,7 @@ void request_stream_context<request, response>::start(falco_grpc_server* srv)
} }
template<> template<>
void request_stream_context<request, response>::process(falco_grpc_server* srv) void falco::grpc::request_stream_context<falco::output::request, falco::output::response>::process(falco_grpc_server* srv)
{ {
// When it is the 1st process call // When it is the 1st process call
if(m_state == request_context_base::REQUEST) if(m_state == request_context_base::REQUEST)
@ -64,12 +65,12 @@ void request_stream_context<request, response>::process(falco_grpc_server* srv)
// Communicate to the gRPC runtime that we have finished. // Communicate to the gRPC runtime that we have finished.
// The memory address of `this` instance uniquely identifies the event. // The memory address of `this` instance uniquely identifies the event.
m_state = request_context_base::FINISH; m_state = request_context_base::FINISH;
m_res_writer->Finish(grpc::Status::OK, this); m_res_writer->Finish(::grpc::Status::OK, this);
} }
} }
template<> template<>
void request_stream_context<request, response>::end(falco_grpc_server* srv, bool errored) void falco::grpc::request_stream_context<falco::output::request, falco::output::response>::end(falco_grpc_server* srv, bool errored)
{ {
if(m_stream_ctx) if(m_stream_ctx)
{ {
@ -83,7 +84,7 @@ void request_stream_context<request, response>::end(falco_grpc_server* srv, bool
start(srv); start(srv);
} }
void falco_grpc_server::thread_process(int thread_index) void falco::grpc::falco_grpc_server::thread_process(int thread_index)
{ {
void* tag = nullptr; void* tag = nullptr;
@ -144,29 +145,7 @@ void falco_grpc_server::thread_process(int thread_index)
ctx.start(this); \ ctx.start(this); \
} }
// todo(fntlnz, leodido) > cleanup this part (paths from config, read, includes) void falco::grpc::falco_grpc_server::init(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs)
#include <sstream>
#include <fstream>
#include <iostream>
void read(const std::string& filename, std::string& data)
{
std::ifstream file(filename.c_str(), std::ios::in);
if(file.is_open())
{
std::stringstream ss;
ss << file.rdbuf();
file.close();
data = ss.str();
}
return;
}
void falco_grpc_server::init(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs)
{ {
m_server_addr = server_addr; m_server_addr = server_addr;
m_threadiness = threadiness; m_threadiness = threadiness;
@ -175,26 +154,26 @@ void falco_grpc_server::init(std::string server_addr, int threadiness, std::stri
m_root_certs = root_certs; m_root_certs = root_certs;
} }
void falco_grpc_server::run() void falco::grpc::falco_grpc_server::run()
{ {
string private_key; string private_key;
string cert_chain; string cert_chain;
string root_certs; string root_certs;
read(m_cert_chain, cert_chain); falco::utils::read(m_cert_chain, cert_chain);
read(m_private_key, private_key); falco::utils::read(m_private_key, private_key);
read(m_root_certs, root_certs); falco::utils::read(m_root_certs, root_certs);
grpc::SslServerCredentialsOptions::PemKeyCertPair cert_pair{private_key, cert_chain}; ::grpc::SslServerCredentialsOptions::PemKeyCertPair cert_pair{private_key, cert_chain};
grpc::SslServerCredentialsOptions ssl_opts(GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY); ::grpc::SslServerCredentialsOptions ssl_opts(GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY);
ssl_opts.pem_root_certs = root_certs; ssl_opts.pem_root_certs = root_certs;
ssl_opts.pem_key_cert_pairs.push_back(cert_pair); ssl_opts.pem_key_cert_pairs.push_back(cert_pair);
// Setup server // Setup server
grpc::ServerBuilder builder; ::grpc::ServerBuilder builder;
// Listen on the given address without any authentication mechanism. // Listen on the given address without any authentication mechanism.
builder.AddListeningPort(m_server_addr, grpc::SslServerCredentials(ssl_opts)); builder.AddListeningPort(m_server_addr, ::grpc::SslServerCredentials(ssl_opts));
builder.RegisterService(&m_svc); builder.RegisterService(&m_svc);
// builder.SetMaxSendMessageSize(GRPC_MAX_MESSAGE_SIZE); // testing max message size? // builder.SetMaxSendMessageSize(GRPC_MAX_MESSAGE_SIZE); // testing max message size?
@ -225,7 +204,7 @@ void falco_grpc_server::run()
stop(); stop();
} }
void falco_grpc_server::stop() void falco::grpc::falco_grpc_server::stop()
{ {
falco_logger::log(LOG_INFO, "Shutting down gRPC server. Waiting until external connections are closed by clients\n"); falco_logger::log(LOG_INFO, "Shutting down gRPC server. Waiting until external connections are closed by clients\n");
m_server->Shutdown(); m_server->Shutdown();

View File

@ -23,6 +23,10 @@ limitations under the License.
#include "grpc_server_impl.h" #include "grpc_server_impl.h"
namespace falco
{
namespace grpc
{
class falco_grpc_server : public falco_grpc_server_impl class falco_grpc_server : public falco_grpc_server_impl
{ {
public: public:
@ -45,7 +49,7 @@ public:
void stop(); void stop();
service::AsyncService m_svc; service::AsyncService m_svc;
std::unique_ptr<grpc::ServerCompletionQueue> m_completion_queue; std::unique_ptr<::grpc::ServerCompletionQueue> m_completion_queue;
private: private:
std::string m_server_addr; std::string m_server_addr;
@ -54,7 +58,7 @@ private:
std::string m_cert_chain; std::string m_cert_chain;
std::string m_root_certs; std::string m_root_certs;
std::unique_ptr<grpc::Server> m_server; std::unique_ptr<::grpc::Server> m_server;
std::vector<std::thread> m_threads; std::vector<std::thread> m_threads;
}; };
@ -64,7 +68,7 @@ public:
request_context_base() = default; request_context_base() = default;
~request_context_base() = default; ~request_context_base() = default;
std::unique_ptr<grpc::ServerContext> m_srv_ctx; std::unique_ptr<::grpc::ServerContext> m_srv_ctx;
enum : char enum : char
{ {
UNKNOWN = 0, UNKNOWN = 0,
@ -93,14 +97,16 @@ public:
void (falco_grpc_server::*m_process_func)(const stream_context&, const Request&, Response&); void (falco_grpc_server::*m_process_func)(const stream_context&, const Request&, Response&);
// Pointer to function that requests the system to start processing given requests // Pointer to function that requests the system to start processing given requests
void (service::AsyncService::*m_request_func)(grpc::ServerContext*, Request*, grpc::ServerAsyncWriter<Response>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); void (service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter<Response>*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*);
void start(falco_grpc_server* srv); void start(falco_grpc_server* srv);
void process(falco_grpc_server* srv); void process(falco_grpc_server* srv);
void end(falco_grpc_server* srv, bool isError); void end(falco_grpc_server* srv, bool isError);
private: private:
std::unique_ptr<grpc::ServerAsyncWriter<Response>> m_res_writer; std::unique_ptr<::grpc::ServerAsyncWriter<Response>> m_res_writer;
std::unique_ptr<stream_context> m_stream_ctx; std::unique_ptr<stream_context> m_stream_ctx;
Request m_req; Request m_req;
}; };
} // namespace grpc
} // namespace falco

View File

@ -18,7 +18,7 @@ limitations under the License.
#include "grpc_server_impl.h" #include "grpc_server_impl.h"
bool falco_grpc_server_impl::is_running() bool falco::grpc::falco_grpc_server_impl::is_running()
{ {
if(m_stop) if(m_stop)
{ {
@ -27,7 +27,7 @@ bool falco_grpc_server_impl::is_running()
return true; return true;
} }
void falco_grpc_server_impl::subscribe(const stream_context& ctx, const request& req, response& res) void falco::grpc::falco_grpc_server_impl::subscribe(const stream_context& ctx, const output::request& req, output::response& res)
{ {
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR) if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
{ {
@ -52,7 +52,7 @@ void falco_grpc_server_impl::subscribe(const stream_context& ctx, const request&
} }
} }
void falco_grpc_server_impl::shutdown() void falco::grpc::falco_grpc_server_impl::shutdown()
{ {
m_stop = true; m_stop = true;
} }

View File

@ -24,8 +24,10 @@ limitations under the License.
#include "falco_output.grpc.pb.h" #include "falco_output.grpc.pb.h"
#include "grpc_context.h" #include "grpc_context.h"
using namespace falco::output; namespace falco
{
namespace grpc
{
class falco_grpc_server_impl class falco_grpc_server_impl
{ {
public: public:
@ -37,8 +39,10 @@ public:
protected: protected:
bool is_running(); bool is_running();
void subscribe(const stream_context& ctx, const request& req, response& res); void subscribe(const stream_context& ctx, const output::request& req, output::response& res);
private: private:
std::atomic<bool> m_stop{false}; std::atomic<bool> m_stop{false};
}; };
} // namespace grpc
} // namespace falco