From c224633454f8f7e333be98f5924e0c23bd6a0614 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Fri, 4 Oct 2019 10:26:54 +0000 Subject: [PATCH] new(userspace/falco): initial work for version gRPC svc registration Signed-off-by: Leonardo Di Donato --- userspace/falco/grpc_server.cpp | 61 ++++++++++++++++++++++++++++----- userspace/falco/grpc_server.h | 35 +++++++++++++++++-- 2 files changed, 84 insertions(+), 12 deletions(-) diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index 36eec4d5..f7bf836e 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -26,13 +26,24 @@ limitations under the License. #include "utils.h" #include "banned.h" -#define REGISTER_STREAM(req, res, svc, rpc, impl, num) \ - std::vector> rpc##_contexts(num); \ - for(request_stream_context & ctx : rpc##_contexts) \ - { \ - ctx.m_process_func = &server::impl; \ - ctx.m_request_func = &svc::AsyncService::Request##rpc; \ - ctx.start(this); \ +// todo(leodido) > remove this macro doing only one REGISTER macro (only the context typeschange and they inherit from a base class) + +#define REGISTER_STREAM(req, res, svc, rpc, impl, num) \ + std::vector> rpc##_contexts(num); \ + for(request_stream_context & c : rpc##_contexts) \ + { \ + c.m_process_func = &server::impl; \ + c.m_request_func = &svc::AsyncService::Request##rpc; \ + c.start(this); \ + } + +#define REGISTER_UNARY(req, res, svc, rpc, impl, num) \ + std::vector> rpc##_contexts(num); \ + for(request_context & c : rpc##_contexts) \ + { \ + c.m_process_func = &server::impl; \ + c.m_request_func = &svc::AsyncService::Request##rpc; \ + c.start(this); \ } namespace falco @@ -99,6 +110,37 @@ void request_stream_context::en } // namespace grpc } // namespace falco +template<> +void falco::grpc::request_context::start(server* srv) +{ + m_state = request_context_base::REQUEST; + m_srv_ctx.reset(new ::grpc::ServerContext); + auto srvctx = m_srv_ctx.get(); + m_res_writer.reset(new ::grpc::ServerAsyncWriter(srvctx)); + m_req.Clear(); + // auto cq = srv->m_completion_queue.get(); + // fixme(leodido) > m_svc is output::service not version::service + // (srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this); +} + +template<> +void falco::grpc::request_context::process(server* srv) +{ + version::response res; + (srv->*m_process_func)(m_srv_ctx.get(), m_req, res); + // Done + m_state = request_context_base::FINISH; + m_res_writer->Write(res, this); + m_res_writer->Finish(::grpc::Status::OK, this); +} + +template<> +void falco::grpc::request_context::end(server* srv, bool errored) +{ + // todo(leodido) > what to do when errored is true? + start(srv); +} + void falco::grpc::server::thread_process(int thread_index) { void* tag = nullptr; @@ -172,6 +214,7 @@ void falco::grpc::server::run() ::grpc::ServerBuilder builder; builder.AddListeningPort(m_server_addr, ::grpc::SslServerCredentials(ssl_opts)); builder.RegisterService(&m_svc); + // fixme(leodido) > register various services ... m_completion_queue = builder.AddCompletionQueue(); m_server = builder.BuildAndStart(); @@ -181,9 +224,9 @@ void falco::grpc::server::run() // This defines the number of simultaneous completion queue requests of the same type (service::AsyncService::Request##RPC) // For this approach to be sufficient server::IMPL have to be fast int context_num = m_threadiness * 10; - REGISTER_STREAM(output::request, output::response, output::service, subscribe, subscribe, context_num) - // register_stream(subscribe, context_num) + // REGISTER_UNARY(version::request, version::response, version::service, version, version, context_num) + REGISTER_STREAM(output::request, output::response, output::service, subscribe, subscribe, context_num) m_threads.resize(m_threadiness); int thread_idx = 0; diff --git a/userspace/falco/grpc_server.h b/userspace/falco/grpc_server.h index 9b868322..9bbc604f 100644 --- a/userspace/falco/grpc_server.h +++ b/userspace/falco/grpc_server.h @@ -46,6 +46,7 @@ public: void run(); void stop(); + // fixme(leodido) > wny the output::service:: ..? output::service::AsyncService m_svc; std::unique_ptr<::grpc::ServerCompletionQueue> m_completion_queue; @@ -79,9 +80,8 @@ public: virtual void end(server* srv, bool isError) = 0; }; -// -// Template class to handle streaming responses -// +// The responsibility of `request_stream_context` template class +// is to handle streaming responses. template class request_stream_context : public request_context_base { @@ -94,6 +94,7 @@ public: // Pointer to function that does actual processing void (server::*m_process_func)(const stream_context&, const Request&, Response&); + // fixme(leodido) > why output::service:: ... ? // Pointer to function that requests the system to start processing given requests void (output::service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*); @@ -106,5 +107,33 @@ private: std::unique_ptr m_stream_ctx; Request m_req; }; + +// The responsibility of `request_context` template class +// is to handle unary responses. +template +class request_context : public request_context_base +{ +public: + request_context(): + m_process_func(nullptr), + m_request_func(nullptr){}; + ~request_context() = default; + + // Pointer to function that does actual processing + void (server::*m_process_func)(const context&, const Request&, Response&); + + // fixme(leodido) > why output::service:: ... ? + // Pointer to function that requests the system to start processing given requests + void (Service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*); + + void start(server* srv); + void process(server* srv); + void end(server* srv, bool isError); + +private: + // todo(leodido) > factorize these two into tbe base class? + std::unique_ptr<::grpc::ServerAsyncWriter> m_res_writer; + Request m_req; +}; } // namespace grpc } // namespace falco