From fbe4e34a5721c07b5f6f897696328cfeb3b9f35b Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Tue, 3 Sep 2019 16:19:51 +0000 Subject: [PATCH] new(userspace/falco): request stream context process and end handling Co-authored-by: Lorenzo Fontana Signed-off-by: Leonardo Di Donato --- userspace/falco/grpc_server.cpp | 45 +++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index 1b904293..529ab712 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -44,11 +44,56 @@ void request_stream_context::start( template<> void request_stream_context::process(falco_grpc_server* srv) { + // When it is the 1st process call + if(m_state == request_context_base::REQUEST) + { + m_state = request_context_base::WRITE; + m_stream_ctx.reset(new stream_context(m_srv_ctx.get())); + } + + // Processing + falco_output_response res; + (srv->*m_process_func)(*m_stream_ctx, m_req, res); + + // When there still are more responses to stream + if(m_stream_ctx->m_has_more) + { + m_res_writer->Write(res, this); + } + // No more responses to stream + else + { + // Communicate to the gRPC runtime that we have finished. + // The memory address of `this` instance uniquely identifies the event. + m_state = request_context_base::FINISH; + m_res_writer->Finish(grpc::Status::OK, this); + } } template<> void request_stream_context::end(falco_grpc_server* srv, bool isError) { + if(m_stream_ctx) + { + m_stream_ctx->m_status = stream_context::SUCCESS; + if(isError) + { + m_stream_ctx->m_status = stream_context::ERROR; + // todo > log error + } + + // Complete the processing + falco_output_response res; + (srv->*m_process_func)(*m_stream_ctx, m_req, res); + } + else + { + // Handle the edge case when `m_request_func` event failed + // which means `m_stream_ctx` was not set + // todo > log error + } + + start(srv); } bool falco_grpc_server_impl::is_running()