diff --git a/userspace/falco/CMakeLists.txt b/userspace/falco/CMakeLists.txt index a3e78ae5..369c1440 100644 --- a/userspace/falco/CMakeLists.txt +++ b/userspace/falco/CMakeLists.txt @@ -50,6 +50,7 @@ add_executable( webserver.cpp grpc_context.cpp grpc_server_impl.cpp + grpc_request_context.cpp grpc_server.cpp utils.cpp ${CMAKE_CURRENT_BINARY_DIR}/version.grpc.pb.cc @@ -58,6 +59,8 @@ add_executable( ${CMAKE_CURRENT_BINARY_DIR}/output.pb.cc ${CMAKE_CURRENT_BINARY_DIR}/schema.pb.cc) +add_dependencies(falco civetweb) + if(USE_BUNDLED_DEPS) add_dependencies(falco yamlcpp) endif() diff --git a/userspace/falco/grpc_request_context.cpp b/userspace/falco/grpc_request_context.cpp index 2b219098..a5a50c63 100644 --- a/userspace/falco/grpc_request_context.cpp +++ b/userspace/falco/grpc_request_context.cpp @@ -33,7 +33,7 @@ void request_stream_contextm_completion_queue.get(); - (srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this); + (srv->m_output_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this); } template<> @@ -86,22 +86,20 @@ void falco::grpc::request_context(srvctx)); + m_res_writer.reset(new ::grpc::ServerAsyncResponseWriter(srvctx)); m_req.Clear(); - // auto cq = srv->m_completion_queue.get(); - // fixme(leodido) > m_svc is output::service not version::service - // (srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this); + auto cq = srv->m_completion_queue.get(); + (srv->m_version_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this); } template<> void falco::grpc::request_context::process(server* srv) { version::response res; - (srv->*m_process_func)(m_srv_ctx.get(), m_req, res); + (srv->*m_process_func)(m_srv_ctx.get(), m_req, res); // version() // Done m_state = request_context_base::FINISH; - m_res_writer->Write(res, this); - m_res_writer->Finish(::grpc::Status::OK, this); + m_res_writer->Finish(res, ::grpc::Status::OK, this); } template<> diff --git a/userspace/falco/grpc_request_context.h b/userspace/falco/grpc_request_context.h index 2f76ea06..8a0174e4 100644 --- a/userspace/falco/grpc_request_context.h +++ b/userspace/falco/grpc_request_context.h @@ -93,7 +93,7 @@ public: void end(server* srv, bool isError); private: - std::unique_ptr<::grpc::ServerAsyncWriter> m_res_writer; + std::unique_ptr<::grpc::ServerAsyncResponseWriter> m_res_writer; Request m_req; }; } // namespace grpc diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index 596594d8..4f62d600 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -117,8 +117,8 @@ void falco::grpc::server::run() ::grpc::ServerBuilder builder; builder.AddListeningPort(m_server_addr, ::grpc::SslServerCredentials(ssl_opts)); - builder.RegisterService(&m_svc); - // fixme(leodido) > register various services ... + builder.RegisterService(&m_output_svc); + builder.RegisterService(&m_version_svc); m_completion_queue = builder.AddCompletionQueue(); m_server = builder.BuildAndStart(); @@ -129,9 +129,10 @@ void falco::grpc::server::run() // For this approach to be sufficient server::IMPL have to be fast int context_num = m_threadiness * 10; - //REGISTER_UNARY(version::request, version::response, version::service, version, version, context_num) + REGISTER_UNARY(version::request, version::response, version::service, version, version, context_num) REGISTER_STREAM(output::request, output::response, output::service, subscribe, subscribe, context_num) + // todo(leodido, fntlnz) > do we need to size thrediness to context_num * number of registered services here? eg., context_num * 2 m_threads.resize(m_threadiness); int thread_idx = 0; for(std::thread& thread : m_threads) diff --git a/userspace/falco/grpc_server.h b/userspace/falco/grpc_server.h index 2ecc58b2..abdbb08f 100644 --- a/userspace/falco/grpc_server.h +++ b/userspace/falco/grpc_server.h @@ -47,12 +47,10 @@ public: void run(); void stop(); - // fixme(leodido) > wny the output::service:: ..? - output::service::AsyncService m_svc; - std::unique_ptr<::grpc::ServerCompletionQueue> m_completion_queue; + output::service::AsyncService m_output_svc; + version::service::AsyncService m_version_svc; - // version::service::AsyncService m_version_svc; - // std::unique_ptr<::grpc::ServerCompletionQueue> m_version_completion_queue; + std::unique_ptr<::grpc::ServerCompletionQueue> m_completion_queue; private: std::string m_server_addr;