From 2a91289ee456c746c0a23b5c24cf6ce93fdd76a0 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Fri, 4 Oct 2019 11:35:39 +0000 Subject: [PATCH] update(userspace/falco): request context and request stream context templatize the service too now Signed-off-by: Leonardo Di Donato --- userspace/falco/grpc_request_context.cpp | 115 +++++++++++++++++++++++ userspace/falco/grpc_request_context.h | 100 ++++++++++++++++++++ userspace/falco/grpc_server.cpp | 108 ++------------------- userspace/falco/grpc_server.h | 80 +--------------- 4 files changed, 226 insertions(+), 177 deletions(-) create mode 100644 userspace/falco/grpc_request_context.cpp create mode 100644 userspace/falco/grpc_request_context.h diff --git a/userspace/falco/grpc_request_context.cpp b/userspace/falco/grpc_request_context.cpp new file mode 100644 index 00000000..2b219098 --- /dev/null +++ b/userspace/falco/grpc_request_context.cpp @@ -0,0 +1,115 @@ +/* +Copyright (C) 2016-2019 The Falco Authors + +This file is part of falco. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include "grpc_request_context.h" + +namespace falco +{ +namespace grpc +{ + +template<> +void request_stream_context::start(server* srv) +{ + m_state = request_context_base::REQUEST; + m_srv_ctx.reset(new ::grpc::ServerContext); + auto srvctx = m_srv_ctx.get(); + m_res_writer.reset(new ::grpc::ServerAsyncWriter(srvctx)); + m_stream_ctx.reset(); + m_req.Clear(); + auto cq = srv->m_completion_queue.get(); + (srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this); +} + +template<> +void request_stream_context::process(server* srv) +{ + // When it is the 1st process call + if(m_state == request_context_base::REQUEST) + { + m_state = request_context_base::WRITE; + m_stream_ctx.reset(new stream_context(m_srv_ctx.get())); + } + + // Processing + output::response res; + (srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() + + // When there still are more responses to stream + if(m_stream_ctx->m_has_more) + { + m_res_writer->Write(res, this); + } + // No more responses to stream + else + { + // Communicate to the gRPC runtime that we have finished. + // The memory address of `this` instance uniquely identifies the event. + m_state = request_context_base::FINISH; + m_res_writer->Finish(::grpc::Status::OK, this); + } +} + +template<> +void request_stream_context::end(server* srv, bool errored) +{ + if(m_stream_ctx) + { + m_stream_ctx->m_status = errored ? stream_context::ERROR : stream_context::SUCCESS; + + // Complete the processing + output::response res; + (srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() + } + + start(srv); +} + +template<> +void falco::grpc::request_context::start(server* srv) +{ + m_state = request_context_base::REQUEST; + m_srv_ctx.reset(new ::grpc::ServerContext); + auto srvctx = m_srv_ctx.get(); + m_res_writer.reset(new ::grpc::ServerAsyncWriter(srvctx)); + m_req.Clear(); + // auto cq = srv->m_completion_queue.get(); + // fixme(leodido) > m_svc is output::service not version::service + // (srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this); +} + +template<> +void falco::grpc::request_context::process(server* srv) +{ + version::response res; + (srv->*m_process_func)(m_srv_ctx.get(), m_req, res); + // Done + m_state = request_context_base::FINISH; + m_res_writer->Write(res, this); + m_res_writer->Finish(::grpc::Status::OK, this); +} + +template<> +void falco::grpc::request_context::end(server* srv, bool errored) +{ + // todo(leodido) > what to do when errored is true? + start(srv); +} + +} // namespace grpc +} // namespace falco \ No newline at end of file diff --git a/userspace/falco/grpc_request_context.h b/userspace/falco/grpc_request_context.h new file mode 100644 index 00000000..2f76ea06 --- /dev/null +++ b/userspace/falco/grpc_request_context.h @@ -0,0 +1,100 @@ +/* +Copyright (C) 2016-2019 The Falco Authors + +This file is part of falco. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#pragma once + +#include "grpc_server.h" + +namespace falco +{ +namespace grpc +{ + +class request_context_base +{ +public: + request_context_base() = default; + ~request_context_base() = default; + + std::unique_ptr<::grpc::ServerContext> m_srv_ctx; + enum : char + { + UNKNOWN = 0, + REQUEST, + WRITE, + FINISH + } m_state = UNKNOWN; + virtual void start(server* srv) = 0; + virtual void process(server* srv) = 0; + virtual void end(server* srv, bool isError) = 0; +}; + +// The responsibility of `request_stream_context` template class +// is to handle streaming responses. +template +class request_stream_context : public request_context_base +{ +public: + request_stream_context(): + m_process_func(nullptr), + m_request_func(nullptr){}; + ~request_stream_context() = default; + + // Pointer to function that does actual processing + void (server::*m_process_func)(const stream_context&, const Request&, Response&); + + // Pointer to function that requests the system to start processing given requests + void (Service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*); + + void start(server* srv); + void process(server* srv); + void end(server* srv, bool isError); + +private: + std::unique_ptr<::grpc::ServerAsyncWriter> m_res_writer; + std::unique_ptr m_stream_ctx; + Request m_req; +}; + +// The responsibility of `request_context` template class +// is to handle unary responses. +template +class request_context : public request_context_base +{ +public: + request_context(): + m_process_func(nullptr), + m_request_func(nullptr){}; + ~request_context() = default; + + // Pointer to function that does actual processing + void (server::*m_process_func)(const context&, const Request&, Response&); + + // Pointer to function that requests the system to start processing given requests + void (Service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncResponseWriter*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*); + + void start(server* srv); + void process(server* srv); + void end(server* srv, bool isError); + +private: + std::unique_ptr<::grpc::ServerAsyncWriter> m_res_writer; + Request m_req; +}; +} // namespace grpc +} // namespace falco \ No newline at end of file diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index f7bf836e..596594d8 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -22,15 +22,14 @@ limitations under the License. #include "logger.h" #include "grpc_server.h" +#include "grpc_request_context.h" #include "grpc_context.h" #include "utils.h" #include "banned.h" -// todo(leodido) > remove this macro doing only one REGISTER macro (only the context typeschange and they inherit from a base class) - #define REGISTER_STREAM(req, res, svc, rpc, impl, num) \ - std::vector> rpc##_contexts(num); \ - for(request_stream_context & c : rpc##_contexts) \ + std::vector> rpc##_contexts(num); \ + for(request_stream_context & c : rpc##_contexts) \ { \ c.m_process_func = &server::impl; \ c.m_request_func = &svc::AsyncService::Request##rpc; \ @@ -38,109 +37,14 @@ limitations under the License. } #define REGISTER_UNARY(req, res, svc, rpc, impl, num) \ - std::vector> rpc##_contexts(num); \ - for(request_context & c : rpc##_contexts) \ + std::vector> rpc##_contexts(num); \ + for(request_context & c : rpc##_contexts) \ { \ c.m_process_func = &server::impl; \ c.m_request_func = &svc::AsyncService::Request##rpc; \ c.start(this); \ } -namespace falco -{ -namespace grpc -{ - -template<> -void request_stream_context::start(server* srv) -{ - m_state = request_context_base::REQUEST; - m_srv_ctx.reset(new ::grpc::ServerContext); - auto srvctx = m_srv_ctx.get(); - m_res_writer.reset(new ::grpc::ServerAsyncWriter(srvctx)); - m_stream_ctx.reset(); - m_req.Clear(); - auto cq = srv->m_completion_queue.get(); - (srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this); -} - -template<> -void request_stream_context::process(server* srv) -{ - // When it is the 1st process call - if(m_state == request_context_base::REQUEST) - { - m_state = request_context_base::WRITE; - m_stream_ctx.reset(new stream_context(m_srv_ctx.get())); - } - - // Processing - output::response res; - (srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() - - // When there still are more responses to stream - if(m_stream_ctx->m_has_more) - { - m_res_writer->Write(res, this); - } - // No more responses to stream - else - { - // Communicate to the gRPC runtime that we have finished. - // The memory address of `this` instance uniquely identifies the event. - m_state = request_context_base::FINISH; - m_res_writer->Finish(::grpc::Status::OK, this); - } -} - -template<> -void request_stream_context::end(server* srv, bool errored) -{ - if(m_stream_ctx) - { - m_stream_ctx->m_status = errored ? stream_context::ERROR : stream_context::SUCCESS; - - // Complete the processing - output::response res; - (srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() - } - - start(srv); -} -} // namespace grpc -} // namespace falco - -template<> -void falco::grpc::request_context::start(server* srv) -{ - m_state = request_context_base::REQUEST; - m_srv_ctx.reset(new ::grpc::ServerContext); - auto srvctx = m_srv_ctx.get(); - m_res_writer.reset(new ::grpc::ServerAsyncWriter(srvctx)); - m_req.Clear(); - // auto cq = srv->m_completion_queue.get(); - // fixme(leodido) > m_svc is output::service not version::service - // (srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this); -} - -template<> -void falco::grpc::request_context::process(server* srv) -{ - version::response res; - (srv->*m_process_func)(m_srv_ctx.get(), m_req, res); - // Done - m_state = request_context_base::FINISH; - m_res_writer->Write(res, this); - m_res_writer->Finish(::grpc::Status::OK, this); -} - -template<> -void falco::grpc::request_context::end(server* srv, bool errored) -{ - // todo(leodido) > what to do when errored is true? - start(srv); -} - void falco::grpc::server::thread_process(int thread_index) { void* tag = nullptr; @@ -225,7 +129,7 @@ void falco::grpc::server::run() // For this approach to be sufficient server::IMPL have to be fast int context_num = m_threadiness * 10; - // REGISTER_UNARY(version::request, version::response, version::service, version, version, context_num) + //REGISTER_UNARY(version::request, version::response, version::service, version, version, context_num) REGISTER_STREAM(output::request, output::response, output::service, subscribe, subscribe, context_num) m_threads.resize(m_threadiness); diff --git a/userspace/falco/grpc_server.h b/userspace/falco/grpc_server.h index 9bbc604f..2ecc58b2 100644 --- a/userspace/falco/grpc_server.h +++ b/userspace/falco/grpc_server.h @@ -25,6 +25,7 @@ namespace falco { namespace grpc { + class server : public server_impl { public: @@ -50,6 +51,9 @@ public: output::service::AsyncService m_svc; std::unique_ptr<::grpc::ServerCompletionQueue> m_completion_queue; + // version::service::AsyncService m_version_svc; + // std::unique_ptr<::grpc::ServerCompletionQueue> m_version_completion_queue; + private: std::string m_server_addr; int m_threadiness; @@ -61,79 +65,5 @@ private: std::vector m_threads; }; -class request_context_base -{ -public: - request_context_base() = default; - ~request_context_base() = default; - - std::unique_ptr<::grpc::ServerContext> m_srv_ctx; - enum : char - { - UNKNOWN = 0, - REQUEST, - WRITE, - FINISH - } m_state = UNKNOWN; - virtual void start(server* srv) = 0; - virtual void process(server* srv) = 0; - virtual void end(server* srv, bool isError) = 0; -}; - -// The responsibility of `request_stream_context` template class -// is to handle streaming responses. -template -class request_stream_context : public request_context_base -{ -public: - request_stream_context(): - m_process_func(nullptr), - m_request_func(nullptr){}; - ~request_stream_context() = default; - - // Pointer to function that does actual processing - void (server::*m_process_func)(const stream_context&, const Request&, Response&); - - // fixme(leodido) > why output::service:: ... ? - // Pointer to function that requests the system to start processing given requests - void (output::service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*); - - void start(server* srv); - void process(server* srv); - void end(server* srv, bool isError); - -private: - std::unique_ptr<::grpc::ServerAsyncWriter> m_res_writer; - std::unique_ptr m_stream_ctx; - Request m_req; -}; - -// The responsibility of `request_context` template class -// is to handle unary responses. -template -class request_context : public request_context_base -{ -public: - request_context(): - m_process_func(nullptr), - m_request_func(nullptr){}; - ~request_context() = default; - - // Pointer to function that does actual processing - void (server::*m_process_func)(const context&, const Request&, Response&); - - // fixme(leodido) > why output::service:: ... ? - // Pointer to function that requests the system to start processing given requests - void (Service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*); - - void start(server* srv); - void process(server* srv); - void end(server* srv, bool isError); - -private: - // todo(leodido) > factorize these two into tbe base class? - std::unique_ptr<::grpc::ServerAsyncWriter> m_res_writer; - Request m_req; -}; } // namespace grpc -} // namespace falco +} // namespace falco \ No newline at end of file