cleanup: apply more reviewers suggestions

Co-authored-by: Andrea Terzolo <andreaterzolo3@gmail.com>
Co-authored-by: Leonardo Grasso <me@leonardograsso.com>
Signed-off-by: Melissa Kilby <melissa.kilby.oss@gmail.com>
This commit is contained in:
Melissa Kilby 2023-08-25 21:43:05 +00:00 committed by poiana
parent 016fdae93b
commit 0eff98aa8e
4 changed files with 21 additions and 17 deletions

View File

@ -60,9 +60,9 @@ namespace falco_common
{ {
enum outputs_queue_recovery_type { enum outputs_queue_recovery_type {
RECOVERY_CONTINUE = 0, /* queue_capacity_outputs recovery strategy of continuing on. */ RECOVERY_CONTINUE = 0, /* outputs_queue_capacity recovery strategy of continuing on. */
RECOVERY_EXIT = 1, /* queue_capacity_outputs recovery strategy of exiting, self OOM kill. */ RECOVERY_EXIT = 1, /* outputs_queue_capacity recovery strategy of exiting, self OOM kill. */
RECOVERY_EMPTY = 2, /* queue_capacity_outputs recovery strategy of emptying queue then continuing. */ RECOVERY_EMPTY = 2, /* outputs_queue_capacity recovery strategy of emptying queue then continuing. */
}; };
const std::string syscall_source = sinsp_syscall_event_source_name; const std::string syscall_source = sinsp_syscall_event_source_name;

View File

@ -65,11 +65,11 @@ falco_outputs::falco_outputs(
{ {
add_output(output); add_output(output);
} }
#ifndef __EMSCRIPTEN__
m_worker_thread = std::thread(&falco_outputs::worker, this);
m_queue.set_capacity(outputs_queue_capacity);
m_recovery = outputs_queue_recovery;
m_outputs_queue_num_drops = 0UL; m_outputs_queue_num_drops = 0UL;
m_outputs_queue_recovery = outputs_queue_recovery;
#ifndef __EMSCRIPTEN__
m_queue.set_capacity(outputs_queue_capacity);
m_worker_thread = std::thread(&falco_outputs::worker, this);
#endif #endif
} }
@ -286,19 +286,23 @@ inline void falco_outputs::push(const ctrl_msg& cmsg)
#ifndef __EMSCRIPTEN__ #ifndef __EMSCRIPTEN__
if (!m_queue.try_push(cmsg)) if (!m_queue.try_push(cmsg))
{ {
switch (m_recovery) switch (m_outputs_queue_recovery)
{ {
case falco_common::RECOVERY_EXIT: case falco_common::RECOVERY_EXIT:
fprintf(stderr, "Fatal error: Output queue out of memory. Exiting ... \n"); falco_logger::log(LOG_ERR, "Fatal error: Output queue out of memory. Exiting ...");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
case falco_common::RECOVERY_EMPTY: case falco_common::RECOVERY_EMPTY:
m_outputs_queue_num_drops += m_queue.size(); m_outputs_queue_num_drops += m_queue.size();
fprintf(stderr, "Output queue out of memory. Empty queue and continue ... \n"); falco_logger::log(LOG_ERR, "Output queue out of memory. Dropping events in queue due to emptying the queue and continue on ...");
m_queue.empty(); m_queue.empty();
break; break;
default: case falco_common::RECOVERY_CONTINUE:
m_outputs_queue_num_drops++; m_outputs_queue_num_drops++;
fprintf(stderr, "Output queue out of memory. Continue on ... \n"); falco_logger::log(LOG_ERR, "Output queue out of memory. Drop event and continue on ...");
break;
default:
falco_logger::log(LOG_ERR, "Fatal error: strategy unknown. Exiting ...");
exit(EXIT_FAILURE);
break; break;
} }
} }

View File

@ -118,10 +118,10 @@ private:
#ifndef __EMSCRIPTEN__ #ifndef __EMSCRIPTEN__
typedef tbb::concurrent_bounded_queue<ctrl_msg> falco_outputs_cbq; typedef tbb::concurrent_bounded_queue<ctrl_msg> falco_outputs_cbq;
falco_outputs_cbq m_queue; falco_outputs_cbq m_queue;
falco_common::outputs_queue_recovery_type m_recovery;
uint64_t m_outputs_queue_num_drops;
#endif #endif
falco_common::outputs_queue_recovery_type m_outputs_queue_recovery;
uint64_t m_outputs_queue_num_drops;
std::thread m_worker_thread; std::thread m_worker_thread;
inline void push(const ctrl_msg& cmsg); inline void push(const ctrl_msg& cmsg);
inline void push_ctrl(ctrl_msg_type cmt); inline void push_ctrl(ctrl_msg_type cmt);

View File

@ -89,9 +89,6 @@ stats_writer::stats_writer(
: m_initialized(false), m_total_samples(0) : m_initialized(false), m_total_samples(0)
{ {
m_config = config; m_config = config;
// capacity and controls should not be relevant for stats outputs, adopt capacity
// for completeness, but do not implement config recovery strategies.
m_queue.set_capacity(config->m_outputs_queue_capacity);
if (config->m_metrics_enabled) if (config->m_metrics_enabled)
{ {
if (!config->m_metrics_output_file.empty()) if (!config->m_metrics_output_file.empty())
@ -111,6 +108,9 @@ stats_writer::stats_writer(
if (m_initialized) if (m_initialized)
{ {
#ifndef __EMSCRIPTEN__ #ifndef __EMSCRIPTEN__
// capacity and controls should not be relevant for stats outputs, adopt capacity
// for completeness, but do not implement config recovery strategies.
m_queue.set_capacity(config->m_outputs_queue_capacity);
m_worker = std::thread(&stats_writer::worker, this); m_worker = std::thread(&stats_writer::worker, this);
#endif #endif
} }