chore(userspace/falco): use the falco grpc output namespace

Co-authored-by: Lorenzo Fontana <lo@linux.com>
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
This commit is contained in:
Leonardo Di Donato 2019-09-13 11:47:57 +00:00 committed by Leo Di Donato
parent c40b797f33
commit d6efb17f88
5 changed files with 19 additions and 17 deletions

View File

@ -21,7 +21,9 @@ limitations under the License.
#include "falco_output.pb.h" #include "falco_output.pb.h"
#include "tbb/concurrent_queue.h" #include "tbb/concurrent_queue.h"
typedef tbb::concurrent_queue<falco_output_response> falco_output_response_cq; using namespace falco::output;
typedef tbb::concurrent_queue<response> falco_output_response_cq;
class falco_output_queue class falco_output_queue
{ {
@ -32,12 +34,12 @@ public:
return instance; return instance;
} }
bool try_pop(falco_output_response& res) bool try_pop(response& res)
{ {
return m_queue.try_pop(res); return m_queue.try_pop(res);
} }
void push(falco_output_response& res) void push(response& res)
{ {
m_queue.push(res); m_queue.push(res);
} }

View File

@ -28,12 +28,12 @@ limitations under the License.
#include "grpc_context.h" #include "grpc_context.h"
template<> template<>
void request_stream_context<falco_output_request, falco_output_response>::start(falco_grpc_server* srv) void request_stream_context<request, response>::start(falco_grpc_server* srv)
{ {
m_state = request_context_base::REQUEST; m_state = request_context_base::REQUEST;
m_srv_ctx.reset(new grpc::ServerContext); m_srv_ctx.reset(new grpc::ServerContext);
auto srvctx = m_srv_ctx.get(); auto srvctx = m_srv_ctx.get();
m_res_writer.reset(new grpc::ServerAsyncWriter<falco_output_response>(srvctx)); m_res_writer.reset(new grpc::ServerAsyncWriter<response>(srvctx));
m_stream_ctx.reset(); m_stream_ctx.reset();
m_req.Clear(); m_req.Clear();
@ -42,7 +42,7 @@ void request_stream_context<falco_output_request, falco_output_response>::start(
} }
template<> template<>
void request_stream_context<falco_output_request, falco_output_response>::process(falco_grpc_server* srv) void request_stream_context<request, response>::process(falco_grpc_server* srv)
{ {
// When it is the 1st process call // When it is the 1st process call
if(m_state == request_context_base::REQUEST) if(m_state == request_context_base::REQUEST)
@ -52,7 +52,7 @@ void request_stream_context<falco_output_request, falco_output_response>::proces
} }
// Processing // Processing
falco_output_response res; response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); (srv->*m_process_func)(*m_stream_ctx, m_req, res);
// When there still are more responses to stream // When there still are more responses to stream
@ -71,7 +71,7 @@ void request_stream_context<falco_output_request, falco_output_response>::proces
} }
template<> template<>
void request_stream_context<falco_output_request, falco_output_response>::end(falco_grpc_server* srv, bool isError) void request_stream_context<request, response>::end(falco_grpc_server* srv, bool isError)
{ {
if(m_stream_ctx) if(m_stream_ctx)
{ {
@ -83,7 +83,7 @@ void request_stream_context<falco_output_request, falco_output_response>::end(fa
} }
// Complete the processing // Complete the processing
falco_output_response res; response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() (srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe()
} }
else else
@ -96,8 +96,6 @@ void request_stream_context<falco_output_request, falco_output_response>::end(fa
start(srv); start(srv);
} }
void falco_grpc_server::thread_process(int thread_index) void falco_grpc_server::thread_process(int thread_index)
{ {
// TODO: is this right? That's what we want? // TODO: is this right? That's what we want?
@ -164,7 +162,7 @@ void falco_grpc_server::thread_process(int thread_index)
for(request_stream_context<REQ, RESP> & ctx : RPC##_contexts) \ for(request_stream_context<REQ, RESP> & ctx : RPC##_contexts) \
{ \ { \
ctx.m_process_func = &falco_grpc_server::IMPL; \ ctx.m_process_func = &falco_grpc_server::IMPL; \
ctx.m_request_func = &falco_output_service::AsyncService::Request##RPC; \ ctx.m_request_func = &service::AsyncService::Request##RPC; \
ctx.start(this); \ ctx.start(this); \
} }
@ -183,7 +181,7 @@ void falco_grpc_server::run()
falco_logger::log(LOG_INFO, "Starting gRPC webserver at " + m_server_addr + "\n"); falco_logger::log(LOG_INFO, "Starting gRPC webserver at " + m_server_addr + "\n");
int context_count = m_threadiness * 1; // todo > 10 or 100? int context_count = m_threadiness * 1; // todo > 10 or 100?
PROCESS_STREAM(falco_output_request, falco_output_response, subscribe, subscribe, context_count) PROCESS_STREAM(request, response, subscribe, subscribe, context_count)
m_threads.resize(m_threadiness); m_threads.resize(m_threadiness);
int thread_idx = 0; int thread_idx = 0;

View File

@ -39,7 +39,7 @@ public:
void run(); void run();
void stop(); void stop();
falco_output_service::AsyncService m_svc; service::AsyncService m_svc;
std::unique_ptr<grpc::ServerCompletionQueue> m_completion_queue; std::unique_ptr<grpc::ServerCompletionQueue> m_completion_queue;
private: private:
@ -86,7 +86,7 @@ public:
void (falco_grpc_server::*m_process_func)(const stream_context&, const Request&, Response&); void (falco_grpc_server::*m_process_func)(const stream_context&, const Request&, Response&);
// Pointer to function that requests the system to start processing given requests // Pointer to function that requests the system to start processing given requests
void (falco_output_service::AsyncService::*m_request_func)(grpc::ServerContext*, Request*, grpc::ServerAsyncWriter<Response>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); void (service::AsyncService::*m_request_func)(grpc::ServerContext*, Request*, grpc::ServerAsyncWriter<Response>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*);
void start(falco_grpc_server* srv); void start(falco_grpc_server* srv);
void process(falco_grpc_server* srv); void process(falco_grpc_server* srv);

View File

@ -27,7 +27,7 @@ bool falco_grpc_server_impl::is_running()
return true; return true;
} }
void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res) void falco_grpc_server_impl::subscribe(const stream_context& ctx, const request& req, response& res)
{ {
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR) if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
{ {

View File

@ -24,6 +24,8 @@ limitations under the License.
#include "falco_output.grpc.pb.h" #include "falco_output.grpc.pb.h"
#include "grpc_context.h" #include "grpc_context.h"
using namespace falco::output;
class falco_grpc_server_impl class falco_grpc_server_impl
{ {
public: public:
@ -33,7 +35,7 @@ public:
protected: protected:
bool is_running(); bool is_running();
void subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res); void subscribe(const stream_context& ctx, const request& req, response& res);
private: private:
std::atomic<bool> m_stop{false}; std::atomic<bool> m_stop{false};