diff --git a/userspace/engine/falco_common.h b/userspace/engine/falco_common.h index 63af03f0..b62c24ff 100644 --- a/userspace/engine/falco_common.h +++ b/userspace/engine/falco_common.h @@ -60,9 +60,9 @@ namespace falco_common { enum outputs_queue_recovery_type { - RECOVERY_CONTINUE = 0, /* queue_capacity_outputs recovery strategy of continuing on. */ - RECOVERY_EXIT = 1, /* queue_capacity_outputs recovery strategy of exiting, self OOM kill. */ - RECOVERY_EMPTY = 2, /* queue_capacity_outputs recovery strategy of emptying queue then continuing. */ + RECOVERY_CONTINUE = 0, /* outputs_queue_capacity recovery strategy of continuing on. */ + RECOVERY_EXIT = 1, /* outputs_queue_capacity recovery strategy of exiting, self OOM kill. */ + RECOVERY_EMPTY = 2, /* outputs_queue_capacity recovery strategy of emptying queue then continuing. */ }; const std::string syscall_source = sinsp_syscall_event_source_name; diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index a94fc87d..f9ca7b2a 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -65,11 +65,11 @@ falco_outputs::falco_outputs( { add_output(output); } -#ifndef __EMSCRIPTEN__ - m_worker_thread = std::thread(&falco_outputs::worker, this); - m_queue.set_capacity(outputs_queue_capacity); - m_recovery = outputs_queue_recovery; m_outputs_queue_num_drops = 0UL; + m_outputs_queue_recovery = outputs_queue_recovery; +#ifndef __EMSCRIPTEN__ + m_queue.set_capacity(outputs_queue_capacity); + m_worker_thread = std::thread(&falco_outputs::worker, this); #endif } @@ -286,19 +286,23 @@ inline void falco_outputs::push(const ctrl_msg& cmsg) #ifndef __EMSCRIPTEN__ if (!m_queue.try_push(cmsg)) { - switch (m_recovery) + switch (m_outputs_queue_recovery) { case falco_common::RECOVERY_EXIT: - fprintf(stderr, "Fatal error: Output queue out of memory. Exiting ... \n"); + falco_logger::log(LOG_ERR, "Fatal error: Output queue out of memory. Exiting ..."); exit(EXIT_FAILURE); case falco_common::RECOVERY_EMPTY: m_outputs_queue_num_drops += m_queue.size(); - fprintf(stderr, "Output queue out of memory. Empty queue and continue ... \n"); + falco_logger::log(LOG_ERR, "Output queue out of memory. Dropping events in queue due to emptying the queue and continue on ..."); m_queue.empty(); break; - default: + case falco_common::RECOVERY_CONTINUE: m_outputs_queue_num_drops++; - fprintf(stderr, "Output queue out of memory. Continue on ... \n"); + falco_logger::log(LOG_ERR, "Output queue out of memory. Drop event and continue on ..."); + break; + default: + falco_logger::log(LOG_ERR, "Fatal error: strategy unknown. Exiting ..."); + exit(EXIT_FAILURE); break; } } diff --git a/userspace/falco/falco_outputs.h b/userspace/falco/falco_outputs.h index 3fbf2169..55492085 100644 --- a/userspace/falco/falco_outputs.h +++ b/userspace/falco/falco_outputs.h @@ -118,10 +118,10 @@ private: #ifndef __EMSCRIPTEN__ typedef tbb::concurrent_bounded_queue falco_outputs_cbq; falco_outputs_cbq m_queue; - falco_common::outputs_queue_recovery_type m_recovery; - uint64_t m_outputs_queue_num_drops; #endif + falco_common::outputs_queue_recovery_type m_outputs_queue_recovery; + uint64_t m_outputs_queue_num_drops; std::thread m_worker_thread; inline void push(const ctrl_msg& cmsg); inline void push_ctrl(ctrl_msg_type cmt); diff --git a/userspace/falco/stats_writer.cpp b/userspace/falco/stats_writer.cpp index 700335f3..486570e1 100644 --- a/userspace/falco/stats_writer.cpp +++ b/userspace/falco/stats_writer.cpp @@ -89,9 +89,6 @@ stats_writer::stats_writer( : m_initialized(false), m_total_samples(0) { m_config = config; - // capacity and controls should not be relevant for stats outputs, adopt capacity - // for completeness, but do not implement config recovery strategies. - m_queue.set_capacity(config->m_outputs_queue_capacity); if (config->m_metrics_enabled) { if (!config->m_metrics_output_file.empty()) @@ -111,6 +108,9 @@ stats_writer::stats_writer( if (m_initialized) { #ifndef __EMSCRIPTEN__ + // capacity and controls should not be relevant for stats outputs, adopt capacity + // for completeness, but do not implement config recovery strategies. + m_queue.set_capacity(config->m_outputs_queue_capacity); m_worker = std::thread(&stats_writer::worker, this); #endif }