From 0565ce2f506f2aadb40825bcbc7cc170f3915bfc Mon Sep 17 00:00:00 2001 From: Lorenzo Fontana Date: Thu, 19 Sep 2019 16:05:39 +0200 Subject: [PATCH] fix(userspace/falco): grpc server implementation subscribe handle output queue stop Signed-off-by: Lorenzo Fontana --- userspace/falco/grpc_server.cpp | 46 +++++++++------------------- userspace/falco/grpc_server.h | 2 -- userspace/falco/grpc_server_impl.cpp | 6 ++-- userspace/falco/grpc_server_impl.h | 2 +- 4 files changed, 18 insertions(+), 38 deletions(-) diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index c1d46ef7..d8078c9f 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -21,7 +21,6 @@ limitations under the License. #else #include #endif -#include // pthread_sigmask #include "logger.h" #include "grpc_server.h" @@ -36,7 +35,6 @@ void request_stream_context::start(falco_grpc_server* srv) m_res_writer.reset(new grpc::ServerAsyncWriter(srvctx)); m_stream_ctx.reset(); m_req.Clear(); - auto cq = srv->m_completion_queue.get(); (srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this); } @@ -71,40 +69,22 @@ void request_stream_context::process(falco_grpc_server* srv) } template<> -void request_stream_context::end(falco_grpc_server* srv, bool isError) +void request_stream_context::end(falco_grpc_server* srv, bool errored) { if(m_stream_ctx) { - m_stream_ctx->m_status = stream_context::SUCCESS; - if(isError) - { - m_stream_ctx->m_status = stream_context::ERROR; - // todo > log error - } + m_stream_ctx->m_status = errored ? stream_context::ERROR : stream_context::SUCCESS; // Complete the processing response res; (srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() } - else - { - // Handle the edge case when `m_request_func` event failed - // which means `m_stream_ctx` was not set - // todo > log error - } start(srv); } void falco_grpc_server::thread_process(int thread_index) { - // Tell pthread to not handle termination signals in the current thread - sigset_t set; - sigemptyset(&set); - sigaddset(&set, SIGTERM); - // sigaddset(&set, SIGHUP); // todo > SIGHUP should restart Falco, what to do? - sigaddset(&set, SIGINT); - pthread_sigmask(SIG_BLOCK, &set, nullptr); void* tag = nullptr; bool event_read_success = false; @@ -143,15 +123,13 @@ void falco_grpc_server::thread_process(int thread_index) case request_context_base::FINISH: // Completion of ServerAsyncWriter::Finish() ctx->end(this, false); - + break; default: // todo > log "unkown completion queue event" // todo > abort? break; } } - - // todo > log "thread `thread_index` complete" } // @@ -240,32 +218,36 @@ void falco_grpc_server::run() thread = std::thread(&falco_grpc_server::thread_process, this, thread_idx++); } - while(is_running()) + while(falco_grpc_server_impl::is_running()) { + sleep(1); } - stop(); } void falco_grpc_server::stop() { + falco_logger::log(LOG_INFO, "Shutting down gRPC server. Waiting until external connections are closed by clients\n"); m_server->Shutdown(); m_completion_queue->Shutdown(); - // todo > log "waiting for the server threads to complete" - + falco_logger::log(LOG_INFO, "Waiting for the gRPC threads to complete\n"); for(std::thread& t : m_threads) { - t.join(); + if(t.joinable()) + { + t.join(); + } } m_threads.clear(); - // todo > log "all server threads complete" - + falco_logger::log(LOG_INFO, "Ignoring all the remaining gRPC events\n"); // Ignore remaining events void* ignore_tag = nullptr; bool ignore_ok = false; while(m_completion_queue->Next(&ignore_tag, &ignore_ok)) { } + + falco_logger::log(LOG_INFO, "gRPC shutdown is now complete\n"); } diff --git a/userspace/falco/grpc_server.h b/userspace/falco/grpc_server.h index ed8822a9..672aec20 100644 --- a/userspace/falco/grpc_server.h +++ b/userspace/falco/grpc_server.h @@ -21,8 +21,6 @@ limitations under the License. #include #include -#include - #include "grpc_server_impl.h" class falco_grpc_server : public falco_grpc_server_impl diff --git a/userspace/falco/grpc_server_impl.cpp b/userspace/falco/grpc_server_impl.cpp index 22c6cb46..32cb99f7 100644 --- a/userspace/falco/grpc_server_impl.cpp +++ b/userspace/falco/grpc_server_impl.cpp @@ -44,15 +44,15 @@ void falco_grpc_server_impl::subscribe(const stream_context& ctx, const request& ctx.m_has_more = true; return; } - while(!falco_output_queue::get().try_pop(res) && req.keepalive()) + while(is_running() && !falco_output_queue::get().try_pop(res) && req.keepalive()) { } - ctx.m_has_more = req.keepalive(); + ctx.m_has_more = !is_running() ? false : req.keepalive(); } } void falco_grpc_server_impl::shutdown() { m_stop = true; -} \ No newline at end of file +} diff --git a/userspace/falco/grpc_server_impl.h b/userspace/falco/grpc_server_impl.h index 451e5ba2..54277a22 100644 --- a/userspace/falco/grpc_server_impl.h +++ b/userspace/falco/grpc_server_impl.h @@ -41,4 +41,4 @@ protected: private: std::atomic m_stop{false}; -}; \ No newline at end of file +};