docs(userspace/falco): document gRPC errors and actions

Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
This commit is contained in:
Leonardo Di Donato 2020-02-06 22:03:03 +00:00 committed by poiana
parent 5663d4d02b
commit 738d757b08
2 changed files with 43 additions and 10 deletions

View File

@ -33,6 +33,7 @@ void request_stream_context<falco::output::service, falco::output::request, falc
m_stream_ctx.reset(); m_stream_ctx.reset();
m_req.Clear(); m_req.Clear();
auto cq = srv->m_completion_queue.get(); auto cq = srv->m_completion_queue.get();
// todo(leodido) > log "calling m_request_func: tag=this, state=m_state"
(srv->m_output_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this); (srv->m_output_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
} }
@ -50,17 +51,19 @@ void request_stream_context<falco::output::service, falco::output::request, falc
output::response res; output::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() (srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe()
// When there still are more responses to stream // When there are still more responses to stream
if(m_stream_ctx->m_has_more) if(m_stream_ctx->m_has_more)
{ {
// todo(leodido) > log "write: tag=this, state=m_state"
m_res_writer->Write(res, this); m_res_writer->Write(res, this);
} }
// No more responses to stream // No more responses to stream
else else
{ {
// Communicate to the gRPC runtime that we have finished. // Communicate to the gRPC runtime that we have finished.
// The memory address of `this` instance uniquely identifies the event. // The memory address of "this" instance uniquely identifies the event.
m_state = request_context_base::FINISH; m_state = request_context_base::FINISH;
// todo(leodido) > log "finish: tag=this, state=m_state"
m_res_writer->Finish(::grpc::Status::OK, this); m_res_writer->Finish(::grpc::Status::OK, this);
} }
} }
@ -70,13 +73,27 @@ void request_stream_context<falco::output::service, falco::output::request, falc
{ {
if(m_stream_ctx) if(m_stream_ctx)
{ {
if(errored)
{
// todo(leodido) > log error "error streaming: tag=this, state=m_state, stream=m_stream_ctx->m_stream"
}
m_stream_ctx->m_status = errored ? stream_context::ERROR : stream_context::SUCCESS; m_stream_ctx->m_status = errored ? stream_context::ERROR : stream_context::SUCCESS;
// Complete the processing // Complete the processing
output::response res; output::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() (srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe()
} }
else
{
// Flow enters here when the processing of "m_request_func" fails.
// Since this happens into the `start()` function, the processing does not advance to the `process()` function.
// So, `m_stream_ctx` is null because it is set into the `process()` function.
// The stream haven't started.
// todo(leodido) > log error "ending streaming: tag=this, state=m_state, stream=null"
}
// Ask to start processing requests
start(srv); start(srv);
} }
@ -89,6 +106,9 @@ void falco::grpc::request_context<falco::version::service, falco::version::reque
m_res_writer.reset(new ::grpc::ServerAsyncResponseWriter<version::response>(srvctx)); m_res_writer.reset(new ::grpc::ServerAsyncResponseWriter<version::response>(srvctx));
m_req.Clear(); m_req.Clear();
auto cq = srv->m_completion_queue.get(); auto cq = srv->m_completion_queue.get();
// Request to start processing given requests.
// Using "this" - ie., the memory address of this context - as the tag that uniquely identifies the request.
// In this way, different contexts can serve different requests concurrently.
(srv->m_version_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this); (srv->m_version_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
} }
@ -96,16 +116,20 @@ template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::process(server* srv) void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::process(server* srv)
{ {
version::response res; version::response res;
(srv->*m_process_func)(m_srv_ctx.get(), m_req, res); // version() (srv->*m_process_func)(m_srv_ctx.get(), m_req, res);
// Done
// Notify the gRPC runtime that this processing is done
m_state = request_context_base::FINISH; m_state = request_context_base::FINISH;
// Using "this"- ie., the memory address of this context - to uniquely identify the event.
m_res_writer->Finish(res, ::grpc::Status::OK, this); m_res_writer->Finish(res, ::grpc::Status::OK, this);
} }
template<> template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::end(server* srv, bool errored) void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::end(server* srv, bool errored)
{ {
// todo(leodido) > what to do when errored is true? // todo(leodido) > handle processing errors here
// Ask to start processing requests
start(srv); start(srv);
} }

View File

@ -52,17 +52,22 @@ void falco::grpc::server::thread_process(int thread_index)
{ {
if(tag == nullptr) if(tag == nullptr)
{ {
// todo(leodido) > log error "server completion queue error: empty tag"
continue; continue;
} }
// Obtain the context for a given tag // Obtain the context for a given tag
request_context_base* ctx = static_cast<request_context_base*>(tag); request_context_base* ctx = static_cast<request_context_base*>(tag);
// todo(leodido) > log "next event: tag=tag, read_success=event_read_success, state=ctx->m_state"
// When event has not been read successfully // When event has not been read successfully
if(!event_read_success) if(!event_read_success)
{ {
if(ctx->m_state != request_context_base::REQUEST) if(ctx->m_state != request_context_base::REQUEST)
{ {
// todo(leodido) > log error "server completion queue failing to read: tag=tag"
// End the context with error // End the context with error
ctx->end(this, true); ctx->end(this, true);
} }
@ -75,17 +80,19 @@ void falco::grpc::server::thread_process(int thread_index)
case request_context_base::REQUEST: case request_context_base::REQUEST:
// Completion of m_request_func // Completion of m_request_func
case request_context_base::WRITE: case request_context_base::WRITE:
// Completion of ServerAsyncWriter::Write() // Completion of Write()
ctx->process(this); ctx->process(this);
break; break;
case request_context_base::FINISH: case request_context_base::FINISH:
// Completion of ServerAsyncWriter::Finish() // Completion of Finish()
ctx->end(this, false); ctx->end(this, false);
break; break;
default: default:
// todo > log "unkown completion queue event" // todo(leodido) > log error "unkown completion queue event: tag=tag, state=ctx->m_state"
break; break;
} }
// todo(leodido) > log "thread completed: index=thread_index"
} }
} }
@ -127,22 +134,24 @@ void falco::grpc::server::run()
// This defines the number of simultaneous completion queue requests of the same type (service::AsyncService::Request##RPC) // This defines the number of simultaneous completion queue requests of the same type (service::AsyncService::Request##RPC)
// For this approach to be sufficient server::IMPL have to be fast // For this approach to be sufficient server::IMPL have to be fast
int context_num = m_threadiness * 10; int context_num = m_threadiness * 10;
// todo(leodido) > take a look at thread_stress_test.cc into grpc repository
REGISTER_UNARY(version::request, version::response, version::service, version, version, context_num) REGISTER_UNARY(version::request, version::response, version::service, version, version, context_num)
REGISTER_STREAM(output::request, output::response, output::service, subscribe, subscribe, context_num) REGISTER_STREAM(output::request, output::response, output::service, subscribe, subscribe, context_num)
// todo(leodido, fntlnz) > do we need to size thrediness to context_num * number of registered services here? eg., context_num * 2
m_threads.resize(m_threadiness); m_threads.resize(m_threadiness);
int thread_idx = 0; int thread_idx = 0;
for(std::thread& thread : m_threads) for(std::thread& thread : m_threads)
{ {
thread = std::thread(&server::thread_process, this, thread_idx++); thread = std::thread(&server::thread_process, this, thread_idx++);
} }
// todo(leodido) > log "gRPC server running: threadiness=m_threads.size()"
while(server_impl::is_running()) while(server_impl::is_running())
{ {
sleep(1); sleep(1);
} }
// todo(leodido) > log "stopping gRPC server"
stop(); stop();
} }
@ -162,7 +171,7 @@ void falco::grpc::server::stop()
} }
m_threads.clear(); m_threads.clear();
falco_logger::log(LOG_INFO, "Ignoring all the remaining gRPC events\n"); falco_logger::log(LOG_INFO, "Draining all the remaining gRPC events\n");
// Ignore remaining events // Ignore remaining events
void* ignore_tag = nullptr; void* ignore_tag = nullptr;
bool ignore_ok = false; bool ignore_ok = false;