From 98cdc30aa33592c8871e3f8314fac29649fef6a4 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Wed, 25 Sep 2019 11:35:14 +0000 Subject: [PATCH] chore(userspace): addressing review comments and typos Co-authored-by: Lorenzo Fontana Signed-off-by: Leonardo Di Donato --- userspace/engine/formats.cpp | 1 - userspace/falco/configuration.cpp | 1 - userspace/falco/grpc_context.h | 5 ++--- userspace/falco/grpc_server.cpp | 31 ++++++++++++---------------- userspace/falco/grpc_server_impl.cpp | 5 +---- userspace/falco/lua/output.lua | 8 ++++--- 6 files changed, 21 insertions(+), 30 deletions(-) diff --git a/userspace/engine/formats.cpp b/userspace/engine/formats.cpp index bea9e5f0..5ae1a471 100644 --- a/userspace/engine/formats.cpp +++ b/userspace/engine/formats.cpp @@ -290,7 +290,6 @@ int falco_formats::resolve_tokens(lua_State *ls) json_event_formatter json_formatter(s_engine->json_factory(), sformat); values = json_formatter.tomap((json_event*) evt); } - // todo(leodido, fntlnz) > check explicitly for k8s_audit, otherwise throw exception lua_newtable(ls); for(auto const& v : values) diff --git a/userspace/falco/configuration.cpp b/userspace/falco/configuration.cpp index bfe6512e..ae871690 100644 --- a/userspace/falco/configuration.cpp +++ b/userspace/falco/configuration.cpp @@ -151,7 +151,6 @@ void falco_configuration::init(string conf_filename, list &cmdline_optio m_grpc_enabled = m_config->get_scalar("grpc", "enabled", false); m_grpc_bind_address = m_config->get_scalar("grpc", "bind_address", "0.0.0.0:5060"); m_grpc_threadiness = m_config->get_scalar("grpc", "threadiness", 8); // todo > limit it to avoid overshubscription? std::thread::hardware_concurrency() - // todo(fntlnz,leodido) > chose correct paths m_grpc_private_key = m_config->get_scalar("grpc", "private_key", "/etc/falco/certs/server.key"); m_grpc_cert_chain = m_config->get_scalar("grpc", "cert_chain", "/etc/falco/certs/server.crt"); m_grpc_root_certs = m_config->get_scalar("grpc", "root_certs", "/etc/falco/certs/ca.crt"); diff --git a/userspace/falco/grpc_context.h b/userspace/falco/grpc_context.h index bb8eec62..b086792d 100644 --- a/userspace/falco/grpc_context.h +++ b/userspace/falco/grpc_context.h @@ -60,9 +60,8 @@ public: SUCCESS, ERROR } m_status = STREAMING; - // Request-specific stream data - mutable void* m_stream = nullptr; - // Are there more responses to stream? + + mutable void* m_stream = nullptr; // todo(fntlnz, leodido) > useful in the future mutable bool m_has_more = false; }; } // namespace grpc diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index a0a448ce..2ec98c5d 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -27,6 +27,15 @@ limitations under the License. #include "grpc_context.h" #include "utils.h" +#define REGISTER_STREAM(req, res, svc, rpc, impl, num) \ + std::vector> rpc##_contexts(num); \ + for(request_stream_context & ctx : rpc##_contexts) \ + { \ + ctx.m_process_func = &server::impl; \ + ctx.m_request_func = &svc::AsyncService::Request##rpc; \ + ctx.start(this); \ + } + template<> void falco::grpc::request_stream_context::start(server* srv) { @@ -52,7 +61,7 @@ void falco::grpc::request_stream_context*m_process_func)(*m_stream_ctx, m_req, res); + (srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() // When there still are more responses to stream if(m_stream_ctx->m_has_more) @@ -86,14 +95,12 @@ void falco::grpc::request_stream_contextNext(&tag, &event_read_success)) { if(tag == nullptr) { - // TODO: empty tag returned, log "completion queue with empty tag" continue; } @@ -105,7 +112,6 @@ void falco::grpc::server::thread_process(int thread_index) { if(ctx->m_state != request_context_base::REQUEST) { - // todo > log "server completion queue failed to read event for tag `tag`" // End the context with error ctx->end(this, true); } @@ -127,24 +133,11 @@ void falco::grpc::server::thread_process(int thread_index) break; default: // todo > log "unkown completion queue event" - // todo > abort? break; } } } -// -// Create array of contexts and start processing streaming RPC request. -// -#define REGISTER_STREAM(REQ, RESP, RPC, IMPL, CONTEXT_NUM) \ - std::vector> RPC##_contexts(CONTEXT_NUM); \ - for(request_stream_context & ctx : RPC##_contexts) \ - { \ - ctx.m_process_func = &server::IMPL; \ - ctx.m_request_func = &output::service::AsyncService::Request##RPC; \ - ctx.start(this); \ - } - void falco::grpc::server::init(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs) { m_server_addr = server_addr; @@ -182,7 +175,9 @@ void falco::grpc::server::run() // This defines the number of simultaneous completion queue requests of the same type (service::AsyncService::Request##RPC) // For this approach to be sufficient server::IMPL have to be fast int context_num = m_threadiness * 10; - REGISTER_STREAM(output::request, output::response, subscribe, subscribe, context_num) + REGISTER_STREAM(output::request, output::response, output::service, subscribe, subscribe, context_num) + + // register_stream(subscribe, context_num) m_threads.resize(m_threadiness); int thread_idx = 0; diff --git a/userspace/falco/grpc_server_impl.cpp b/userspace/falco/grpc_server_impl.cpp index f423b5af..d4712290 100644 --- a/userspace/falco/grpc_server_impl.cpp +++ b/userspace/falco/grpc_server_impl.cpp @@ -32,14 +32,11 @@ void falco::grpc::server_impl::subscribe(const stream_context& ctx, const output { if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR) { - // todo > logic - ctx.m_stream = nullptr; } else { - // Start (or continue) streaming - // ctx.m_status == stream_context::STREAMING + // Streaming if(output::queue::get().try_pop(res) && !req.keepalive()) { ctx.m_has_more = true; diff --git a/userspace/falco/lua/output.lua b/userspace/falco/lua/output.lua index 6a635d35..8c845480 100644 --- a/userspace/falco/lua/output.lua +++ b/userspace/falco/lua/output.lua @@ -170,12 +170,14 @@ function mod.http_reopen() end function mod.grpc(event, rule, source, priority, priority_num, msg, format, options) - fields = formats.resolve_tokens(event, source, format) - c_outputs.handle_grpc(event, rule, source, priority, msg, fields, options) + if options.enabled == true then + fields = formats.resolve_tokens(event, source, format) + c_outputs.handle_grpc(event, rule, source, priority, msg, fields, options) + end end function mod.grpc_message(priority, priority_num, msg, options) - -- todo + -- todo(fntlnz, leodido) > gRPC does not support subscribing to dropped events yet end