diff --git a/userspace/falco/falco.cpp b/userspace/falco/falco.cpp index 14526086..71650a02 100644 --- a/userspace/falco/falco.cpp +++ b/userspace/falco/falco.cpp @@ -455,7 +455,7 @@ int falco_init(int argc, char **argv) scap_stats cstats; falco_webserver webserver; - falco_grpc_server grpc_server; + falco::grpc::falco_grpc_server grpc_server; std::thread grpc_server_thread; static struct option long_options[] = diff --git a/userspace/falco/grpc_context.cpp b/userspace/falco/grpc_context.cpp index c5257049..f689c598 100644 --- a/userspace/falco/grpc_context.cpp +++ b/userspace/falco/grpc_context.cpp @@ -20,7 +20,7 @@ limitations under the License. #include "grpc_context.h" -context::context(grpc::ServerContext* ctx): +falco::grpc::context::context(::grpc::ServerContext* ctx): m_ctx(ctx) { std::string session_id; @@ -48,12 +48,12 @@ context::context(grpc::ServerContext* ctx): m_prefix = meta.str(); } -void context::get_metadata(std::string key, std::string& val) +void falco::grpc::context::context::get_metadata(std::string key, std::string& val) { - const std::multimap& client_metadata = m_ctx->client_metadata(); + const std::multimap<::grpc::string_ref, ::grpc::string_ref>& client_metadata = m_ctx->client_metadata(); auto it = client_metadata.find(key); if(it != client_metadata.end()) { val.assign(it->second.data(), it->second.size()); } -} \ No newline at end of file +} diff --git a/userspace/falco/grpc_context.h b/userspace/falco/grpc_context.h index 4781517a..bb8eec62 100644 --- a/userspace/falco/grpc_context.h +++ b/userspace/falco/grpc_context.h @@ -26,26 +26,31 @@ limitations under the License. #include #endif +namespace falco +{ +namespace grpc +{ + const std::string meta_session = "session_id"; const std::string meta_request = "request_id"; class context { public: - context(grpc::ServerContext* ctx); + context(::grpc::ServerContext* ctx); ~context() = default; void get_metadata(std::string key, std::string& val); private: - grpc::ServerContext* m_ctx = nullptr; + ::grpc::ServerContext* m_ctx = nullptr; std::string m_prefix; }; class stream_context : public context { public: - stream_context(grpc::ServerContext* ctx): + stream_context(::grpc::ServerContext* ctx): context(ctx){}; ~stream_context() = default; @@ -59,4 +64,6 @@ public: mutable void* m_stream = nullptr; // Are there more responses to stream? mutable bool m_has_more = false; -}; \ No newline at end of file +}; +} // namespace grpc +} // namespace falco diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index d8078c9f..0c4c2cf3 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -25,14 +25,15 @@ limitations under the License. #include "logger.h" #include "grpc_server.h" #include "grpc_context.h" +#include "utils.h" template<> -void request_stream_context::start(falco_grpc_server* srv) +void falco::grpc::request_stream_context::start(falco_grpc_server* srv) { m_state = request_context_base::REQUEST; - m_srv_ctx.reset(new grpc::ServerContext); + m_srv_ctx.reset(new ::grpc::ServerContext); auto srvctx = m_srv_ctx.get(); - m_res_writer.reset(new grpc::ServerAsyncWriter(srvctx)); + m_res_writer.reset(new ::grpc::ServerAsyncWriter(srvctx)); m_stream_ctx.reset(); m_req.Clear(); auto cq = srv->m_completion_queue.get(); @@ -40,7 +41,7 @@ void request_stream_context::start(falco_grpc_server* srv) } template<> -void request_stream_context::process(falco_grpc_server* srv) +void falco::grpc::request_stream_context::process(falco_grpc_server* srv) { // When it is the 1st process call if(m_state == request_context_base::REQUEST) @@ -64,12 +65,12 @@ void request_stream_context::process(falco_grpc_server* srv) // Communicate to the gRPC runtime that we have finished. // The memory address of `this` instance uniquely identifies the event. m_state = request_context_base::FINISH; - m_res_writer->Finish(grpc::Status::OK, this); + m_res_writer->Finish(::grpc::Status::OK, this); } } template<> -void request_stream_context::end(falco_grpc_server* srv, bool errored) +void falco::grpc::request_stream_context::end(falco_grpc_server* srv, bool errored) { if(m_stream_ctx) { @@ -83,7 +84,7 @@ void request_stream_context::end(falco_grpc_server* srv, bool start(srv); } -void falco_grpc_server::thread_process(int thread_index) +void falco::grpc::falco_grpc_server::thread_process(int thread_index) { void* tag = nullptr; @@ -144,29 +145,7 @@ void falco_grpc_server::thread_process(int thread_index) ctx.start(this); \ } -// todo(fntlnz, leodido) > cleanup this part (paths from config, read, includes) -#include -#include -#include - -void read(const std::string& filename, std::string& data) -{ - std::ifstream file(filename.c_str(), std::ios::in); - - if(file.is_open()) - { - std::stringstream ss; - ss << file.rdbuf(); - - file.close(); - - data = ss.str(); - } - - return; -} - -void falco_grpc_server::init(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs) +void falco::grpc::falco_grpc_server::init(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs) { m_server_addr = server_addr; m_threadiness = threadiness; @@ -175,26 +154,26 @@ void falco_grpc_server::init(std::string server_addr, int threadiness, std::stri m_root_certs = root_certs; } -void falco_grpc_server::run() +void falco::grpc::falco_grpc_server::run() { string private_key; string cert_chain; string root_certs; - read(m_cert_chain, cert_chain); - read(m_private_key, private_key); - read(m_root_certs, root_certs); + falco::utils::read(m_cert_chain, cert_chain); + falco::utils::read(m_private_key, private_key); + falco::utils::read(m_root_certs, root_certs); - grpc::SslServerCredentialsOptions::PemKeyCertPair cert_pair{private_key, cert_chain}; + ::grpc::SslServerCredentialsOptions::PemKeyCertPair cert_pair{private_key, cert_chain}; - grpc::SslServerCredentialsOptions ssl_opts(GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY); + ::grpc::SslServerCredentialsOptions ssl_opts(GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY); ssl_opts.pem_root_certs = root_certs; ssl_opts.pem_key_cert_pairs.push_back(cert_pair); // Setup server - grpc::ServerBuilder builder; + ::grpc::ServerBuilder builder; // Listen on the given address without any authentication mechanism. - builder.AddListeningPort(m_server_addr, grpc::SslServerCredentials(ssl_opts)); + builder.AddListeningPort(m_server_addr, ::grpc::SslServerCredentials(ssl_opts)); builder.RegisterService(&m_svc); // builder.SetMaxSendMessageSize(GRPC_MAX_MESSAGE_SIZE); // testing max message size? @@ -225,7 +204,7 @@ void falco_grpc_server::run() stop(); } -void falco_grpc_server::stop() +void falco::grpc::falco_grpc_server::stop() { falco_logger::log(LOG_INFO, "Shutting down gRPC server. Waiting until external connections are closed by clients\n"); m_server->Shutdown(); diff --git a/userspace/falco/grpc_server.h b/userspace/falco/grpc_server.h index 672aec20..f40935ba 100644 --- a/userspace/falco/grpc_server.h +++ b/userspace/falco/grpc_server.h @@ -23,6 +23,10 @@ limitations under the License. #include "grpc_server_impl.h" +namespace falco +{ +namespace grpc +{ class falco_grpc_server : public falco_grpc_server_impl { public: @@ -45,7 +49,7 @@ public: void stop(); service::AsyncService m_svc; - std::unique_ptr m_completion_queue; + std::unique_ptr<::grpc::ServerCompletionQueue> m_completion_queue; private: std::string m_server_addr; @@ -54,7 +58,7 @@ private: std::string m_cert_chain; std::string m_root_certs; - std::unique_ptr m_server; + std::unique_ptr<::grpc::Server> m_server; std::vector m_threads; }; @@ -64,7 +68,7 @@ public: request_context_base() = default; ~request_context_base() = default; - std::unique_ptr m_srv_ctx; + std::unique_ptr<::grpc::ServerContext> m_srv_ctx; enum : char { UNKNOWN = 0, @@ -93,14 +97,16 @@ public: void (falco_grpc_server::*m_process_func)(const stream_context&, const Request&, Response&); // Pointer to function that requests the system to start processing given requests - void (service::AsyncService::*m_request_func)(grpc::ServerContext*, Request*, grpc::ServerAsyncWriter*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); + void (service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*); void start(falco_grpc_server* srv); void process(falco_grpc_server* srv); void end(falco_grpc_server* srv, bool isError); private: - std::unique_ptr> m_res_writer; + std::unique_ptr<::grpc::ServerAsyncWriter> m_res_writer; std::unique_ptr m_stream_ctx; Request m_req; }; +} // namespace grpc +} // namespace falco diff --git a/userspace/falco/grpc_server_impl.cpp b/userspace/falco/grpc_server_impl.cpp index 32cb99f7..67497be6 100644 --- a/userspace/falco/grpc_server_impl.cpp +++ b/userspace/falco/grpc_server_impl.cpp @@ -18,7 +18,7 @@ limitations under the License. #include "grpc_server_impl.h" -bool falco_grpc_server_impl::is_running() +bool falco::grpc::falco_grpc_server_impl::is_running() { if(m_stop) { @@ -27,7 +27,7 @@ bool falco_grpc_server_impl::is_running() return true; } -void falco_grpc_server_impl::subscribe(const stream_context& ctx, const request& req, response& res) +void falco::grpc::falco_grpc_server_impl::subscribe(const stream_context& ctx, const output::request& req, output::response& res) { if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR) { @@ -52,7 +52,7 @@ void falco_grpc_server_impl::subscribe(const stream_context& ctx, const request& } } -void falco_grpc_server_impl::shutdown() +void falco::grpc::falco_grpc_server_impl::shutdown() { m_stop = true; } diff --git a/userspace/falco/grpc_server_impl.h b/userspace/falco/grpc_server_impl.h index 54277a22..ef318556 100644 --- a/userspace/falco/grpc_server_impl.h +++ b/userspace/falco/grpc_server_impl.h @@ -24,8 +24,10 @@ limitations under the License. #include "falco_output.grpc.pb.h" #include "grpc_context.h" -using namespace falco::output; - +namespace falco +{ +namespace grpc +{ class falco_grpc_server_impl { public: @@ -37,8 +39,10 @@ public: protected: bool is_running(); - void subscribe(const stream_context& ctx, const request& req, response& res); + void subscribe(const stream_context& ctx, const output::request& req, output::response& res); private: std::atomic m_stop{false}; }; +} // namespace grpc +} // namespace falco