diff --git a/proposals/20190826-grpc-outputs.md b/proposals/20190826-grpc-outputs.md index 04ee6f87..0e2a6c4a 100644 --- a/proposals/20190826-grpc-outputs.md +++ b/proposals/20190826-grpc-outputs.md @@ -2,15 +2,14 @@ -- [Falco gRPC Outputs](#falco-grpc-outputs) - - [Summary](#summary) - - [Motivation](#motivation) - - [Goals](#goals) - - [Non-Goals](#non-goals) - - [Proposal](#proposal) - - [Use cases](#use-cases) - - [Diagrams](#diagrams) - - [Design Details](#design-details) +- [Summary](#summary) +- [Motivation](#motivation) + * [Goals](#goals) + * [Non-Goals](#non-goals) +- [Proposal](#proposal) + * [Use cases](#use-cases) + * [Diagrams](#diagrams) + * [Design Details](#design-details) diff --git a/userspace/falco/CMakeLists.txt b/userspace/falco/CMakeLists.txt index afe3a339..e45f1e43 100644 --- a/userspace/falco/CMakeLists.txt +++ b/userspace/falco/CMakeLists.txt @@ -19,23 +19,24 @@ add_custom_command( ${CMAKE_CURRENT_BINARY_DIR}/version.grpc.pb.h ${CMAKE_CURRENT_BINARY_DIR}/version.pb.cc ${CMAKE_CURRENT_BINARY_DIR}/version.pb.h - ${CMAKE_CURRENT_BINARY_DIR}/output.grpc.pb.cc - ${CMAKE_CURRENT_BINARY_DIR}/output.grpc.pb.h - ${CMAKE_CURRENT_BINARY_DIR}/output.pb.cc - ${CMAKE_CURRENT_BINARY_DIR}/output.pb.h + ${CMAKE_CURRENT_BINARY_DIR}/outputs.grpc.pb.cc + ${CMAKE_CURRENT_BINARY_DIR}/outputs.grpc.pb.h + ${CMAKE_CURRENT_BINARY_DIR}/outputs.pb.cc + ${CMAKE_CURRENT_BINARY_DIR}/outputs.pb.h ${CMAKE_CURRENT_BINARY_DIR}/schema.pb.cc ${CMAKE_CURRENT_BINARY_DIR}/schema.pb.h - COMMENT "Generate gRPC version API" + COMMENT "Generate gRPC API" + # Falco gRPC Version API DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/version.proto COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/version.proto COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --grpc_out=. --plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN} ${CMAKE_CURRENT_SOURCE_DIR}/version.proto - COMMENT "Generate gRPC outputs API" - DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/output.proto - COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/output.proto + # Falco gRPC Outputs API + DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/outputs.proto + COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/outputs.proto ${CMAKE_CURRENT_SOURCE_DIR}/schema.proto COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --grpc_out=. --plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN} - ${CMAKE_CURRENT_SOURCE_DIR}/output.proto + ${CMAKE_CURRENT_SOURCE_DIR}/outputs.proto WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) add_executable( @@ -54,8 +55,8 @@ add_executable( grpc_server.cpp ${CMAKE_CURRENT_BINARY_DIR}/version.grpc.pb.cc ${CMAKE_CURRENT_BINARY_DIR}/version.pb.cc - ${CMAKE_CURRENT_BINARY_DIR}/output.grpc.pb.cc - ${CMAKE_CURRENT_BINARY_DIR}/output.pb.cc + ${CMAKE_CURRENT_BINARY_DIR}/outputs.grpc.pb.cc + ${CMAKE_CURRENT_BINARY_DIR}/outputs.pb.cc ${CMAKE_CURRENT_BINARY_DIR}/schema.pb.cc) add_dependencies(falco civetweb string-view-lite) diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index 36bbb27d..a7434a94 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -22,11 +22,10 @@ limitations under the License. #include "formats.h" #include "logger.h" -#include "falco_output_queue.h" +#include "falco_outputs_queue.h" #include "banned.h" // This raises a compilation error when certain functions are used using namespace std; -using namespace falco::output; const static struct luaL_reg ll_falco_outputs [] = { @@ -316,7 +315,7 @@ int falco_outputs::handle_grpc(lua_State *ls) lua_error(ls); } - response grpc_res = response(); + auto grpc_res = falco::outputs::response(); // time gen_event *evt = (gen_event *)lua_topointer(ls, 1); @@ -366,7 +365,7 @@ int falco_outputs::handle_grpc(lua_State *ls) auto host = grpc_res.mutable_hostname(); *host = (char *)lua_tostring(ls, 7); - falco::output::queue::get().push(grpc_res); + falco::outputs::queue::get().push(grpc_res); return 1; } diff --git a/userspace/falco/falco_output_queue.h b/userspace/falco/falco_outputs_queue.h similarity index 96% rename from userspace/falco/falco_output_queue.h rename to userspace/falco/falco_outputs_queue.h index 2e57001a..26c1e041 100644 --- a/userspace/falco/falco_output_queue.h +++ b/userspace/falco/falco_outputs_queue.h @@ -16,12 +16,12 @@ limitations under the License. #pragma once -#include "output.pb.h" +#include "outputs.pb.h" #include "tbb/concurrent_queue.h" namespace falco { -namespace output +namespace outputs { typedef tbb::concurrent_queue response_cq; diff --git a/userspace/falco/grpc_request_context.cpp b/userspace/falco/grpc_request_context.cpp index 2a8b21f1..194fb442 100644 --- a/userspace/falco/grpc_request_context.cpp +++ b/userspace/falco/grpc_request_context.cpp @@ -24,12 +24,12 @@ namespace grpc { template<> -void request_stream_context::start(server* srv) +void request_stream_context::start(server* srv) { m_state = request_context_base::REQUEST; m_srv_ctx.reset(new ::grpc::ServerContext); auto srvctx = m_srv_ctx.get(); - m_res_writer.reset(new ::grpc::ServerAsyncWriter(srvctx)); + m_res_writer.reset(new ::grpc::ServerAsyncWriter(srvctx)); m_stream_ctx.reset(); m_req.Clear(); auto cq = srv->m_completion_queue.get(); @@ -38,7 +38,7 @@ void request_stream_context -void request_stream_context::process(server* srv) +void request_stream_context::process(server* srv) { // When it is the 1st process call if(m_state == request_context_base::REQUEST) @@ -48,7 +48,7 @@ void request_stream_context*m_process_func)(*m_stream_ctx, m_req, res); // get() if(!m_stream_ctx->m_is_running) @@ -75,7 +75,7 @@ void request_stream_context -void request_stream_context::end(server* srv, bool error) +void request_stream_context::end(server* srv, bool error) { if(m_stream_ctx) { @@ -86,7 +86,7 @@ void request_stream_contextm_status = error ? stream_context::ERROR : stream_context::SUCCESS; // Complete the processing - output::response res; + outputs::response res; (srv->*m_process_func)(*m_stream_ctx, m_req, res); // get() } else @@ -104,7 +104,7 @@ void request_stream_context -void falco::grpc::request_context::start(server* srv) +void request_context::start(server* srv) { m_state = request_context_base::REQUEST; m_srv_ctx.reset(new ::grpc::ServerContext); @@ -119,7 +119,7 @@ void falco::grpc::request_context -void falco::grpc::request_context::process(server* srv) +void request_context::process(server* srv) { version::response res; (srv->*m_process_func)(m_srv_ctx.get(), m_req, res); @@ -131,7 +131,7 @@ void falco::grpc::request_context -void falco::grpc::request_context::end(server* srv, bool error) +void request_context::end(server* srv, bool error) { // todo(leodido) > handle processing errors here @@ -140,12 +140,12 @@ void falco::grpc::request_context -void request_bidi_context::start(server* srv) +void request_bidi_context::start(server* srv) { m_state = request_context_base::REQUEST; m_srv_ctx.reset(new ::grpc::ServerContext); auto srvctx = m_srv_ctx.get(); - m_reader_writer.reset(new ::grpc::ServerAsyncReaderWriter(srvctx)); + m_reader_writer.reset(new ::grpc::ServerAsyncReaderWriter(srvctx)); m_req.Clear(); auto cq = srv->m_completion_queue.get(); // Request to start processing given requests. @@ -155,7 +155,7 @@ void request_bidi_context -void request_bidi_context::process(server* srv) +void request_bidi_context::process(server* srv) { switch(m_state) { @@ -168,7 +168,7 @@ void request_bidi_context*m_process_func)(*m_bidi_ctx, m_req, res); // sub() if(!m_bidi_ctx->m_is_running) @@ -196,14 +196,14 @@ void request_bidi_context -void request_bidi_context::end(server* srv, bool error) +void request_bidi_context::end(server* srv, bool error) { if(m_bidi_ctx) { m_bidi_ctx->m_status = error ? bidi_context::ERROR : bidi_context::SUCCESS; // Complete the processing - output::response res; + outputs::response res; (srv->*m_process_func)(*m_bidi_ctx, m_req, res); // sub() } diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index 3c9123db..21690347 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -208,8 +208,8 @@ void falco::grpc::server::run() // todo(leodido) > take a look at thread_stress_test.cc into grpc repository REGISTER_UNARY(version::request, version::response, version::service, version, version, context_num) - REGISTER_STREAM(output::request, output::response, output::service, get, get, context_num) - REGISTER_BIDI(output::request, output::response, output::service, sub, sub, context_num) + REGISTER_STREAM(outputs::request, outputs::response, outputs::service, get, get, context_num) + REGISTER_BIDI(outputs::request, outputs::response, outputs::service, sub, sub, context_num) m_threads.resize(m_threadiness); int thread_idx = 0; diff --git a/userspace/falco/grpc_server.h b/userspace/falco/grpc_server.h index 9f24b651..ced4020f 100644 --- a/userspace/falco/grpc_server.h +++ b/userspace/falco/grpc_server.h @@ -44,7 +44,7 @@ public: void run(); void stop(); - output::service::AsyncService m_output_svc; + outputs::service::AsyncService m_output_svc; version::service::AsyncService m_version_svc; std::unique_ptr<::grpc::ServerCompletionQueue> m_completion_queue; diff --git a/userspace/falco/grpc_server_impl.cpp b/userspace/falco/grpc_server_impl.cpp index 2a3d62e1..bda1656a 100644 --- a/userspace/falco/grpc_server_impl.cpp +++ b/userspace/falco/grpc_server_impl.cpp @@ -16,7 +16,7 @@ limitations under the License. #include "config_falco.h" #include "grpc_server_impl.h" -#include "falco_output_queue.h" +#include "falco_outputs_queue.h" #include "logger.h" #include "banned.h" // This raises a compilation error when certain functions are used @@ -29,7 +29,7 @@ bool falco::grpc::server_impl::is_running() return true; } -void falco::grpc::server_impl::get(const stream_context& ctx, const output::request& req, output::response& res) +void falco::grpc::server_impl::get(const stream_context& ctx, const outputs::request& req, outputs::response& res) { if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR) { @@ -44,10 +44,10 @@ void falco::grpc::server_impl::get(const stream_context& ctx, const output::requ // m_status == stream_context::STREAMING? // todo(leodido) > set m_stream - ctx.m_has_more = output::queue::get().try_pop(res); + ctx.m_has_more = outputs::queue::get().try_pop(res); } -void falco::grpc::server_impl::sub(const bidi_context& ctx, const output::request& req, output::response& res) +void falco::grpc::server_impl::sub(const bidi_context& ctx, const outputs::request& req, outputs::response& res) { if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR) { @@ -61,7 +61,7 @@ void falco::grpc::server_impl::sub(const bidi_context& ctx, const output::reques // m_status == stream_context::STREAMING? // todo(leodido) > set m_stream - ctx.m_has_more = output::queue::get().try_pop(res); + ctx.m_has_more = outputs::queue::get().try_pop(res); } void falco::grpc::server_impl::version(const context& ctx, const version::request&, version::response& res) diff --git a/userspace/falco/grpc_server_impl.h b/userspace/falco/grpc_server_impl.h index 4bf93976..f6ac1dce 100644 --- a/userspace/falco/grpc_server_impl.h +++ b/userspace/falco/grpc_server_impl.h @@ -17,7 +17,7 @@ limitations under the License. #pragma once #include -#include "output.grpc.pb.h" +#include "outputs.grpc.pb.h" #include "version.grpc.pb.h" #include "grpc_context.h" @@ -37,8 +37,8 @@ protected: bool is_running(); // Outputs - void get(const stream_context& ctx, const output::request& req, output::response& res); - void sub(const bidi_context& ctx, const output::request& req, output::response& res); + void get(const stream_context& ctx, const outputs::request& req, outputs::response& res); + void sub(const bidi_context& ctx, const outputs::request& req, outputs::response& res); // Version void version(const context& ctx, const version::request& req, version::response& res);