new(userspace/falco): grpc server event bus queue

Co-Authored-By: Leonardo Di Donato <leodidonato@gmail.com>
Signed-off-by: Lorenzo Fontana <lo@linux.com>
This commit is contained in:
Lorenzo Fontana 2019-09-04 18:34:05 +02:00 committed by Leo Di Donato
parent 36fb0f6751
commit 25f5fcacae
2 changed files with 30 additions and 6 deletions

View File

@ -119,6 +119,7 @@ void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_ou
// ctx.m_status == stream_context::STREAMING // ctx.m_status == stream_context::STREAMING
// todo > do we want batching? // todo > do we want batching?
<<<<<<< HEAD
std::stringstream ss; std::stringstream ss;
int c = 0; int c = 0;
int i = 9; int i = 9;
@ -129,7 +130,13 @@ void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_ou
} }
res.set_source(source::SYSCALL); res.set_source(source::SYSCALL);
res.set_rule(ss.str()); res.set_rule(ss.str());
=======
>>>>>>> new(userspace/falco): grpc server event bus queue
if(!m_event_queue.try_pop(res))
{
// TODO: log that we've not been able to pop?
}
ctx.m_has_more = true; ctx.m_has_more = true;
} }
// todo > print/store statistics // todo > print/store statistics
@ -260,8 +267,9 @@ void falco_grpc_server::stop()
} }
} }
void start_grpc_server(std::string server_address, int threadiness) bool start_grpc_server(std::string server_address, int threadiness, falco_output_response_cq& output_event_queue)
{ {
falco_grpc_server srv(server_address, threadiness); falco_grpc_server srv(server_address, threadiness, output_event_queue);
srv.run(); srv.run();
} return true;
}

View File

@ -21,17 +21,31 @@ limitations under the License.
#include <thread> #include <thread>
#include <string> #include <string>
#include <atomic> #include <atomic>
#include <queue>
#include "tbb/concurrent_queue.h"
#include "falco_output.grpc.pb.h" #include "falco_output.grpc.pb.h"
#include "falco_output.pb.h" #include "falco_output.pb.h"
#include "grpc_context.h" #include "grpc_context.h"
using namespace tbb;
typedef concurrent_queue<falco_output_response> falco_output_response_cq;
class falco_grpc_server_impl class falco_grpc_server_impl
{ {
public: public:
falco_grpc_server_impl() = default; falco_grpc_server_impl() = default;
~falco_grpc_server_impl() = default; ~falco_grpc_server_impl() = default;
falco_output_response_cq& m_event_queue;
falco_grpc_server_impl(falco_output_response_cq& event_queue):
m_event_queue(event_queue)
{
}
protected: protected:
bool is_running(); bool is_running();
@ -44,11 +58,13 @@ private:
class falco_grpc_server : public falco_grpc_server_impl class falco_grpc_server : public falco_grpc_server_impl
{ {
public: public:
falco_grpc_server(std::string server_addr, int threadiness): falco_grpc_server(std::string server_addr, int threadiness, falco_output_response_cq& m_event_queue):
falco_grpc_server_impl(m_event_queue),
m_server_addr(server_addr), m_server_addr(server_addr),
m_threadiness(threadiness) m_threadiness(threadiness)
{ {
} }
virtual ~falco_grpc_server() = default; virtual ~falco_grpc_server() = default;
void thread_process(int thread_index); void thread_process(int thread_index);
@ -65,7 +81,7 @@ private:
std::vector<std::thread> m_threads; std::vector<std::thread> m_threads;
}; };
void start_grpc_server(std::string server_address, int threadiness); bool start_grpc_server(std::string server_address, int threadiness, falco_output_response_cq& output_event_queue);
class request_context_base class request_context_base
{ {
@ -112,4 +128,4 @@ private:
std::unique_ptr<grpc::ServerAsyncWriter<Response>> m_res_writer; std::unique_ptr<grpc::ServerAsyncWriter<Response>> m_res_writer;
std::unique_ptr<stream_context> m_stream_ctx; std::unique_ptr<stream_context> m_stream_ctx;
Request m_req; Request m_req;
}; };