From 13f5a76b97ac0ba828252065b9915cb6378e75e5 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Tue, 3 Sep 2019 14:52:54 +0000 Subject: [PATCH] new(usperspace/falco): request stream context specialization and process stream macro Co-authored-by: Lorenzo Fontana Signed-off-by: Leonardo Di Donato --- userspace/falco/grpc_server.cpp | 73 +++++++++++++++++++++++++++------ 1 file changed, 61 insertions(+), 12 deletions(-) diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index 5530bb4b..1b904293 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -27,13 +27,54 @@ limitations under the License. #include "grpc_server.h" #include "grpc_context.h" -bool grpc_server_impl::is_running() +template<> +void request_stream_context::start(falco_grpc_server* srv) +{ + m_state = request_context_base::REQUEST; + m_srv_ctx.reset(new grpc::ServerContext); + auto srvctx = m_srv_ctx.get(); + m_res_writer.reset(new grpc::ServerAsyncWriter(srvctx)); + m_stream_ctx.reset(); + m_req.Clear(); + + auto cq = srv->m_completion_queue.get(); + (srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this); +} + +template<> +void request_stream_context::process(falco_grpc_server* srv) +{ +} + +template<> +void request_stream_context::end(falco_grpc_server* srv, bool isError) +{ +} + +bool falco_grpc_server_impl::is_running() { // TODO: this must act as a switch to shut down the server return true; } -void grpc_server::thread_process(int thread_index) +void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res) +{ + if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR) + { + // todo > logic + + ctx.m_stream = nullptr; + } + else + { + // Start (or continue) streaming + // ctx.m_status == stream_context::STREAMING + } + + // todo > print/store statistics +} + +void falco_grpc_server::thread_process(int thread_index) { // TODO: is this right? That's what we want? // Tell pthread to not handle termination signals in the current thread @@ -55,7 +96,19 @@ void grpc_server::thread_process(int thread_index) } } -void grpc_server::run() +// +// Create array of contexts and start processing streaming RPC request. +// +#define PROCESS_STREAM(REQ, RESP, RPC, IMPL, CONTEXT_COUNT) \ + std::vector> RPC##_contexts(CONTEXT_COUNT); \ + for(request_stream_context & ctx : RPC##_contexts) \ + { \ + ctx.m_process_func = &falco_grpc_server::IMPL; \ + ctx.m_request_func = &falco_output_service::AsyncService::Request##RPC; \ + ctx.start(this); \ + } + +void falco_grpc_server::run() { grpc::ServerBuilder builder; // Listen on the given address without any authentication mechanism. @@ -66,14 +119,14 @@ void grpc_server::run() m_server = builder.BuildAndStart(); std::cout << "Server listening on " << m_server_addr << std::endl; - // int context_count = threadiness * 10; + int context_count = m_threadiness * 10; + PROCESS_STREAM(falco_output_request, falco_output_response, subscribe, subscribe, context_count) m_threads.resize(m_threadiness); - int thread_idx = 0; for(std::thread& thread : m_threads) { - thread = std::thread(&grpc_server::thread_process, this, thread_idx++); + thread = std::thread(&falco_grpc_server::thread_process, this, thread_idx++); } while(is_running()) @@ -81,12 +134,8 @@ void grpc_server::run() } } -void grpc_server::subscribe_handler(const stream_context& ctx, falco_output_request req, falco_output_response res) -{ -} - void start_grpc_server(std::string server_address, int threadiness) { - grpc_server srv(server_address, threadiness); + falco_grpc_server srv(server_address, threadiness); srv.run(); -} +} \ No newline at end of file