From cf31712fad6a4e984a1121deaf7a08818c9a5e33 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Wed, 27 May 2020 19:59:35 +0000 Subject: [PATCH] update(userspace/falco): context class for bidirectional gRPC services Co-authored-by: Lorenzo Fontana Signed-off-by: Leonardo Di Donato --- userspace/falco/grpc_request_context.cpp | 71 +++++++++++++++++++++--- userspace/falco/grpc_request_context.h | 34 ++++++++++-- 2 files changed, 93 insertions(+), 12 deletions(-) diff --git a/userspace/falco/grpc_request_context.cpp b/userspace/falco/grpc_request_context.cpp index 50970345..dbb66294 100644 --- a/userspace/falco/grpc_request_context.cpp +++ b/userspace/falco/grpc_request_context.cpp @@ -16,6 +16,7 @@ See the License for the specific language governing permissions and limitations under the License. */ +#include "logger.h" #include "grpc_request_context.h" namespace falco @@ -40,6 +41,7 @@ void request_stream_context void request_stream_context::process(server* srv) { + falco_logger::log(LOG_INFO, "process\n"); // When it is the 1st process call if(m_state == request_context_base::REQUEST) { @@ -49,7 +51,7 @@ void request_stream_context*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() + (srv->*m_process_func)(*m_stream_ctx, m_req, res); // When there are still more responses to stream if(m_stream_ctx->m_has_more) @@ -69,19 +71,19 @@ void request_stream_context -void request_stream_context::end(server* srv, bool errored) +void request_stream_context::end(server* srv, bool error) { if(m_stream_ctx) { - if(errored) + if(error) { // todo(leodido) > log error "error streaming: tag=this, state=m_state, stream=m_stream_ctx->m_stream" } - m_stream_ctx->m_status = errored ? stream_context::ERROR : stream_context::SUCCESS; + m_stream_ctx->m_status = error ? stream_context::ERROR : stream_context::SUCCESS; // Complete the processing output::response res; - (srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() + (srv->*m_process_func)(*m_stream_ctx, m_req, res); } else { @@ -125,13 +127,66 @@ void falco::grpc::request_context -void falco::grpc::request_context::end(server* srv, bool errored) +void falco::grpc::request_context::end(server* srv, bool error) { // todo(leodido) > handle processing errors here - + // Ask to start processing requests start(srv); } +template<> +void request_bidi_context::start(server* srv) +{ + falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::START\n"); + m_state = request_context_base::REQUEST; + m_srv_ctx.reset(new ::grpc::ServerContext); + auto srvctx = m_srv_ctx.get(); + m_reader_writer.reset(new ::grpc::ServerAsyncReaderWriter(srvctx)); + auto cq = srv->m_completion_queue.get(); + // Request to start processing given requests. + // Using "this" - ie., the memory address of this context - as the tag that uniquely identifies the request. + // In this way, different contexts can serve different requests concurrently. + (srv->m_output_svc.*m_request_func)(srvctx, m_reader_writer.get(), cq, cq, this); +}; + +template<> +void request_bidi_context::process(server* srv) +{ + falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS\n"); + switch(m_state) + { + case request_context_base::REQUEST: + falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - REQUEST\n"); + m_bidi_ctx.reset(new bidi_context(m_srv_ctx.get())); + m_bidi_ctx->m_status = bidi_context::READY_TO_WRITE; // stream_context::STREAMING + m_state = request_context_base::WRITE; + m_reader_writer->Read(&m_req, this); + return; + case request_context_base::WRITE: + falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - WRITE\n"); + m_state = request_context_base::WRITE; + m_reader_writer->Read(&m_req, this); + { + // Processing + output::response res; + (srv->*m_process_func)(*m_bidi_ctx, m_req, res); + } + return; + case request_context_base::FINISH: + falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - FINISH\n"); + // + return; + default: + return; + } +}; + +template<> +void request_bidi_context::end(server* srv, bool error) +{ + falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::END\n"); +}; + } // namespace grpc -} // namespace falco \ No newline at end of file +} // namespace falco diff --git a/userspace/falco/grpc_request_context.h b/userspace/falco/grpc_request_context.h index 4a9ec3e3..1b4878ec 100644 --- a/userspace/falco/grpc_request_context.h +++ b/userspace/falco/grpc_request_context.h @@ -40,7 +40,7 @@ public: WRITE, FINISH } m_state = UNKNOWN; - + virtual void start(server* srv) = 0; virtual void process(server* srv) = 0; virtual void end(server* srv, bool isError) = 0; @@ -65,7 +65,7 @@ public: void start(server* srv); void process(server* srv); - void end(server* srv, bool isError); + void end(server* srv, bool error); private: std::unique_ptr<::grpc::ServerAsyncWriter> m_res_writer; @@ -92,11 +92,37 @@ public: void start(server* srv); void process(server* srv); - void end(server* srv, bool isError); + void end(server* srv, bool error); private: std::unique_ptr<::grpc::ServerAsyncResponseWriter> m_res_writer; Request m_req; }; + +template +class request_bidi_context : public request_context_base +{ +public: + request_bidi_context(): + m_process_func(nullptr), + m_request_func(nullptr){}; + ~request_bidi_context() = default; + + // Pointer to function that does actual processing + void (server::*m_process_func)(const bidi_context&, const Request&, Response&); + + // Pointer to function that requests the system to start processing given requests + void (Service::AsyncService::*m_request_func)(::grpc::ServerContext*, ::grpc::ServerAsyncReaderWriter*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*); + + void start(server* srv); + void process(server* srv); + void end(server* srv, bool error); + +private: + std::unique_ptr<::grpc::ServerAsyncReaderWriter> m_reader_writer; + std::unique_ptr m_bidi_ctx; + Request m_req; +}; + } // namespace grpc -} // namespace falco \ No newline at end of file +} // namespace falco