diff --git a/userspace/falco/falco_output_queue.h b/userspace/falco/falco_output_queue.h index f2ef9803..bc41d24e 100644 --- a/userspace/falco/falco_output_queue.h +++ b/userspace/falco/falco_output_queue.h @@ -21,7 +21,9 @@ limitations under the License. #include "falco_output.pb.h" #include "tbb/concurrent_queue.h" -typedef tbb::concurrent_queue falco_output_response_cq; +using namespace falco::output; + +typedef tbb::concurrent_queue falco_output_response_cq; class falco_output_queue { @@ -32,12 +34,12 @@ public: return instance; } - bool try_pop(falco_output_response& res) + bool try_pop(response& res) { return m_queue.try_pop(res); } - void push(falco_output_response& res) + void push(response& res) { m_queue.push(res); } diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index bf54b376..1a6c21dd 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -28,12 +28,12 @@ limitations under the License. #include "grpc_context.h" template<> -void request_stream_context::start(falco_grpc_server* srv) +void request_stream_context::start(falco_grpc_server* srv) { m_state = request_context_base::REQUEST; m_srv_ctx.reset(new grpc::ServerContext); auto srvctx = m_srv_ctx.get(); - m_res_writer.reset(new grpc::ServerAsyncWriter(srvctx)); + m_res_writer.reset(new grpc::ServerAsyncWriter(srvctx)); m_stream_ctx.reset(); m_req.Clear(); @@ -42,7 +42,7 @@ void request_stream_context::start( } template<> -void request_stream_context::process(falco_grpc_server* srv) +void request_stream_context::process(falco_grpc_server* srv) { // When it is the 1st process call if(m_state == request_context_base::REQUEST) @@ -52,7 +52,7 @@ void request_stream_context::proces } // Processing - falco_output_response res; + response res; (srv->*m_process_func)(*m_stream_ctx, m_req, res); // When there still are more responses to stream @@ -71,7 +71,7 @@ void request_stream_context::proces } template<> -void request_stream_context::end(falco_grpc_server* srv, bool isError) +void request_stream_context::end(falco_grpc_server* srv, bool isError) { if(m_stream_ctx) { @@ -83,7 +83,7 @@ void request_stream_context::end(fa } // Complete the processing - falco_output_response res; + response res; (srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() } else @@ -96,8 +96,6 @@ void request_stream_context::end(fa start(srv); } - - void falco_grpc_server::thread_process(int thread_index) { // TODO: is this right? That's what we want? @@ -164,7 +162,7 @@ void falco_grpc_server::thread_process(int thread_index) for(request_stream_context & ctx : RPC##_contexts) \ { \ ctx.m_process_func = &falco_grpc_server::IMPL; \ - ctx.m_request_func = &falco_output_service::AsyncService::Request##RPC; \ + ctx.m_request_func = &service::AsyncService::Request##RPC; \ ctx.start(this); \ } @@ -183,7 +181,7 @@ void falco_grpc_server::run() falco_logger::log(LOG_INFO, "Starting gRPC webserver at " + m_server_addr + "\n"); int context_count = m_threadiness * 1; // todo > 10 or 100? - PROCESS_STREAM(falco_output_request, falco_output_response, subscribe, subscribe, context_count) + PROCESS_STREAM(request, response, subscribe, subscribe, context_count) m_threads.resize(m_threadiness); int thread_idx = 0; diff --git a/userspace/falco/grpc_server.h b/userspace/falco/grpc_server.h index 7aec3e73..a144d5e7 100644 --- a/userspace/falco/grpc_server.h +++ b/userspace/falco/grpc_server.h @@ -39,7 +39,7 @@ public: void run(); void stop(); - falco_output_service::AsyncService m_svc; + service::AsyncService m_svc; std::unique_ptr m_completion_queue; private: @@ -86,7 +86,7 @@ public: void (falco_grpc_server::*m_process_func)(const stream_context&, const Request&, Response&); // Pointer to function that requests the system to start processing given requests - void (falco_output_service::AsyncService::*m_request_func)(grpc::ServerContext*, Request*, grpc::ServerAsyncWriter*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); + void (service::AsyncService::*m_request_func)(grpc::ServerContext*, Request*, grpc::ServerAsyncWriter*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); void start(falco_grpc_server* srv); void process(falco_grpc_server* srv); diff --git a/userspace/falco/grpc_server_impl.cpp b/userspace/falco/grpc_server_impl.cpp index 6a06a430..44f1682c 100644 --- a/userspace/falco/grpc_server_impl.cpp +++ b/userspace/falco/grpc_server_impl.cpp @@ -27,7 +27,7 @@ bool falco_grpc_server_impl::is_running() return true; } -void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res) +void falco_grpc_server_impl::subscribe(const stream_context& ctx, const request& req, response& res) { if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR) { diff --git a/userspace/falco/grpc_server_impl.h b/userspace/falco/grpc_server_impl.h index c64fbb23..fbf22b28 100644 --- a/userspace/falco/grpc_server_impl.h +++ b/userspace/falco/grpc_server_impl.h @@ -24,6 +24,8 @@ limitations under the License. #include "falco_output.grpc.pb.h" #include "grpc_context.h" +using namespace falco::output; + class falco_grpc_server_impl { public: @@ -33,7 +35,7 @@ public: protected: bool is_running(); - void subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res); + void subscribe(const stream_context& ctx, const request& req, response& res); private: std::atomic m_stop{false};