diff --git a/userspace/falco/falco.cpp b/userspace/falco/falco.cpp index 46315466..617c986a 100644 --- a/userspace/falco/falco.cpp +++ b/userspace/falco/falco.cpp @@ -34,6 +34,7 @@ limitations under the License. #include #include + #include #include "logger.h" @@ -221,15 +222,16 @@ static std::string read_file(std::string filename) // Event processing loop // uint64_t do_inspect(falco_engine *engine, - falco_outputs *outputs, - sinsp* inspector, - falco_configuration &config, - syscall_evt_drop_mgr &sdropmgr, - uint64_t duration_to_tot_ns, - string &stats_filename, - uint64_t stats_interval, - bool all_events, - int &result) + falco_outputs *outputs, + sinsp* inspector, + falco_configuration &config, + syscall_evt_drop_mgr &sdropmgr, + uint64_t duration_to_tot_ns, + string &stats_filename, + uint64_t stats_interval, + bool all_events, + falco_output_response_cq &output_event_queue, + int &result) { uint64_t num_evts = 0; int32_t rc; @@ -324,6 +326,11 @@ uint64_t do_inspect(falco_engine *engine, if(res) { outputs->handle_event(res->evt, res->rule, res->source, res->priority_num, res->format); + + // TODO(fntlnz): integrate this with the output handle event logic, for now we just want to test the threads + falco_output_response grpcres = falco_output_response(); + grpcres.set_rule(res->rule); + output_event_queue.push(grpcres); } num_evts++; @@ -1164,8 +1171,10 @@ int falco_init(int argc, char **argv) } // grpc server - // TODO: this is blocking now, not what we want, falco must go on. Just an experiment for now. - start_grpc_server("0.0.0.0:5060", 1); + falco_output_response_cq output_event_queue; + + // TODO(fntlnz): do any handling, make sure we handle signals in the GRPC server and we clean it gracefully + std::thread grpc_server_thread (start_grpc_server, "0.0.0.0:5060", 1, std::ref(output_event_queue)); if(!trace_filename.empty() && !trace_is_scap) { @@ -1186,6 +1195,7 @@ int falco_init(int argc, char **argv) stats_filename, stats_interval, all_events, + output_event_queue, result); duration = ((double)clock()) / CLOCKS_PER_SEC - duration; diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index 39efea78..8672dfbe 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -119,19 +119,6 @@ void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_ou // ctx.m_status == stream_context::STREAMING // todo > do we want batching? -<<<<<<< HEAD - std::stringstream ss; - int c = 0; - int i = 9; - while(c < i) - { - ss << std::to_string(c); - c++; - } - res.set_source(source::SYSCALL); - res.set_rule(ss.str()); -======= ->>>>>>> new(userspace/falco): grpc server event bus queue if(!m_event_queue.try_pop(res)) {