diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index a23cdc62..39efea78 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -119,6 +119,7 @@ void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_ou // ctx.m_status == stream_context::STREAMING // todo > do we want batching? +<<<<<<< HEAD std::stringstream ss; int c = 0; int i = 9; @@ -129,7 +130,13 @@ void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_ou } res.set_source(source::SYSCALL); res.set_rule(ss.str()); +======= +>>>>>>> new(userspace/falco): grpc server event bus queue + if(!m_event_queue.try_pop(res)) + { + // TODO: log that we've not been able to pop? + } ctx.m_has_more = true; } // todo > print/store statistics @@ -260,8 +267,9 @@ void falco_grpc_server::stop() } } -void start_grpc_server(std::string server_address, int threadiness) +bool start_grpc_server(std::string server_address, int threadiness, falco_output_response_cq& output_event_queue) { - falco_grpc_server srv(server_address, threadiness); + falco_grpc_server srv(server_address, threadiness, output_event_queue); srv.run(); -} \ No newline at end of file + return true; +} diff --git a/userspace/falco/grpc_server.h b/userspace/falco/grpc_server.h index 26254a83..37f5c3c3 100644 --- a/userspace/falco/grpc_server.h +++ b/userspace/falco/grpc_server.h @@ -21,17 +21,31 @@ limitations under the License. #include #include #include +#include + +#include "tbb/concurrent_queue.h" #include "falco_output.grpc.pb.h" #include "falco_output.pb.h" #include "grpc_context.h" +using namespace tbb; + +typedef concurrent_queue falco_output_response_cq; + class falco_grpc_server_impl { public: falco_grpc_server_impl() = default; ~falco_grpc_server_impl() = default; + falco_output_response_cq& m_event_queue; + + falco_grpc_server_impl(falco_output_response_cq& event_queue): + m_event_queue(event_queue) + { + } + protected: bool is_running(); @@ -44,11 +58,13 @@ private: class falco_grpc_server : public falco_grpc_server_impl { public: - falco_grpc_server(std::string server_addr, int threadiness): + falco_grpc_server(std::string server_addr, int threadiness, falco_output_response_cq& m_event_queue): + falco_grpc_server_impl(m_event_queue), m_server_addr(server_addr), m_threadiness(threadiness) { } + virtual ~falco_grpc_server() = default; void thread_process(int thread_index); @@ -65,7 +81,7 @@ private: std::vector m_threads; }; -void start_grpc_server(std::string server_address, int threadiness); +bool start_grpc_server(std::string server_address, int threadiness, falco_output_response_cq& output_event_queue); class request_context_base { @@ -112,4 +128,4 @@ private: std::unique_ptr> m_res_writer; std::unique_ptr m_stream_ctx; Request m_req; -}; \ No newline at end of file +};