From 738d757b08d32542c1927dcfc0c8f86c4be4bdf8 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Thu, 6 Feb 2020 22:03:03 +0000 Subject: [PATCH] docs(userspace/falco): document gRPC errors and actions Signed-off-by: Leonardo Di Donato --- userspace/falco/grpc_request_context.cpp | 34 ++++++++++++++++++++---- userspace/falco/grpc_server.cpp | 19 +++++++++---- 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/userspace/falco/grpc_request_context.cpp b/userspace/falco/grpc_request_context.cpp index a5a50c63..50970345 100644 --- a/userspace/falco/grpc_request_context.cpp +++ b/userspace/falco/grpc_request_context.cpp @@ -33,6 +33,7 @@ void request_stream_contextm_completion_queue.get(); + // todo(leodido) > log "calling m_request_func: tag=this, state=m_state" (srv->m_output_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this); } @@ -50,17 +51,19 @@ void request_stream_context*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() - // When there still are more responses to stream + // When there are still more responses to stream if(m_stream_ctx->m_has_more) { + // todo(leodido) > log "write: tag=this, state=m_state" m_res_writer->Write(res, this); } // No more responses to stream else { // Communicate to the gRPC runtime that we have finished. - // The memory address of `this` instance uniquely identifies the event. + // The memory address of "this" instance uniquely identifies the event. m_state = request_context_base::FINISH; + // todo(leodido) > log "finish: tag=this, state=m_state" m_res_writer->Finish(::grpc::Status::OK, this); } } @@ -70,13 +73,27 @@ void request_stream_context log error "error streaming: tag=this, state=m_state, stream=m_stream_ctx->m_stream" + } m_stream_ctx->m_status = errored ? stream_context::ERROR : stream_context::SUCCESS; // Complete the processing output::response res; (srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() } + else + { + // Flow enters here when the processing of "m_request_func" fails. + // Since this happens into the `start()` function, the processing does not advance to the `process()` function. + // So, `m_stream_ctx` is null because it is set into the `process()` function. + // The stream haven't started. + // todo(leodido) > log error "ending streaming: tag=this, state=m_state, stream=null" + } + + // Ask to start processing requests start(srv); } @@ -89,6 +106,9 @@ void falco::grpc::request_context(srvctx)); m_req.Clear(); auto cq = srv->m_completion_queue.get(); + // Request to start processing given requests. + // Using "this" - ie., the memory address of this context - as the tag that uniquely identifies the request. + // In this way, different contexts can serve different requests concurrently. (srv->m_version_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this); } @@ -96,16 +116,20 @@ template<> void falco::grpc::request_context::process(server* srv) { version::response res; - (srv->*m_process_func)(m_srv_ctx.get(), m_req, res); // version() - // Done + (srv->*m_process_func)(m_srv_ctx.get(), m_req, res); + + // Notify the gRPC runtime that this processing is done m_state = request_context_base::FINISH; + // Using "this"- ie., the memory address of this context - to uniquely identify the event. m_res_writer->Finish(res, ::grpc::Status::OK, this); } template<> void falco::grpc::request_context::end(server* srv, bool errored) { - // todo(leodido) > what to do when errored is true? + // todo(leodido) > handle processing errors here + + // Ask to start processing requests start(srv); } diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index 19670b1b..0bd53781 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -52,17 +52,22 @@ void falco::grpc::server::thread_process(int thread_index) { if(tag == nullptr) { + // todo(leodido) > log error "server completion queue error: empty tag" continue; } // Obtain the context for a given tag request_context_base* ctx = static_cast(tag); + // todo(leodido) > log "next event: tag=tag, read_success=event_read_success, state=ctx->m_state" + // When event has not been read successfully if(!event_read_success) { if(ctx->m_state != request_context_base::REQUEST) { + // todo(leodido) > log error "server completion queue failing to read: tag=tag" + // End the context with error ctx->end(this, true); } @@ -75,17 +80,19 @@ void falco::grpc::server::thread_process(int thread_index) case request_context_base::REQUEST: // Completion of m_request_func case request_context_base::WRITE: - // Completion of ServerAsyncWriter::Write() + // Completion of Write() ctx->process(this); break; case request_context_base::FINISH: - // Completion of ServerAsyncWriter::Finish() + // Completion of Finish() ctx->end(this, false); break; default: - // todo > log "unkown completion queue event" + // todo(leodido) > log error "unkown completion queue event: tag=tag, state=ctx->m_state" break; } + + // todo(leodido) > log "thread completed: index=thread_index" } } @@ -127,22 +134,24 @@ void falco::grpc::server::run() // This defines the number of simultaneous completion queue requests of the same type (service::AsyncService::Request##RPC) // For this approach to be sufficient server::IMPL have to be fast int context_num = m_threadiness * 10; + // todo(leodido) > take a look at thread_stress_test.cc into grpc repository REGISTER_UNARY(version::request, version::response, version::service, version, version, context_num) REGISTER_STREAM(output::request, output::response, output::service, subscribe, subscribe, context_num) - // todo(leodido, fntlnz) > do we need to size thrediness to context_num * number of registered services here? eg., context_num * 2 m_threads.resize(m_threadiness); int thread_idx = 0; for(std::thread& thread : m_threads) { thread = std::thread(&server::thread_process, this, thread_idx++); } + // todo(leodido) > log "gRPC server running: threadiness=m_threads.size()" while(server_impl::is_running()) { sleep(1); } + // todo(leodido) > log "stopping gRPC server" stop(); } @@ -162,7 +171,7 @@ void falco::grpc::server::stop() } m_threads.clear(); - falco_logger::log(LOG_INFO, "Ignoring all the remaining gRPC events\n"); + falco_logger::log(LOG_INFO, "Draining all the remaining gRPC events\n"); // Ignore remaining events void* ignore_tag = nullptr; bool ignore_ok = false;