feat(userspace): remove experimental outputs queue recovery strategies

Signed-off-by: Melissa Kilby <melissa.kilby.oss@gmail.com>
This commit is contained in:
Melissa Kilby 2023-10-10 16:39:42 +00:00 committed by poiana
parent 56401340c3
commit dd807b19c8
9 changed files with 12 additions and 76 deletions

View File

@ -303,24 +303,17 @@ rule_matching: first
# If it does, it is most likely happening due to the entire event flow being too slow, # If it does, it is most likely happening due to the entire event flow being too slow,
# indicating that the server is under heavy load. # indicating that the server is under heavy load.
# #
# Lowering the number of items can prevent memory from steadily increasing until the OOM
# killer stops the Falco process. We provide recovery actions to self-limit or self-kill
# in order to handle this situation earlier, similar to how we expose the kernel buffer size
# as a parameter. However, it will not address the root cause of the event pipe not keeping up.
#
# `capacity`: the maximum number of items allowed in the queue is determined by this value. # `capacity`: the maximum number of items allowed in the queue is determined by this value.
# Setting the value to 0 (which is the default) is equivalent to keeping the queue unbounded. # Setting the value to 0 (which is the default) is equivalent to keeping the queue unbounded.
# In other words, when this configuration is set to 0, the number of allowed items is effectively # In other words, when this configuration is set to 0, the number of allowed items is
# set to the largest possible long value, disabling this setting. # effectively set to the largest possible long value, disabling this setting.
# #
# `recovery`: strategy to follow when the queue becomes filled up. It applies only when the # In the case of an unbounded queue, if the available memory on the system is consumed,
# queue is bounded and there is still available system memory. In the case of an unbounded # the Falco process would be OOM killed. When using this option and setting the capacity,
# queue, if the available memory on the system is consumed, the Falco process would be # the current event would be dropped, and the event loop would continue. This behavior mirrors
# OOM killed. The value `exit` is the default, `continue` does nothing special and `empty` # kernel-side event drops when the buffer between kernel space and user space is full.
# empties the queue and then continues.
outputs_queue: outputs_queue:
capacity: 0 capacity: 0
recovery: exit
########################## ##########################

View File

@ -33,12 +33,6 @@ static std::vector<std::string> rule_matching_names = {
"all" "all"
}; };
static std::vector<std::string> outputs_queue_recovery_names = {
"continue",
"exit",
"empty",
};
bool falco_common::parse_priority(std::string v, priority_type& out) bool falco_common::parse_priority(std::string v, priority_type& out)
{ {
for (size_t i = 0; i < priority_names.size(); i++) for (size_t i = 0; i < priority_names.size(); i++)
@ -66,19 +60,6 @@ falco_common::priority_type falco_common::parse_priority(std::string v)
return out; return out;
} }
bool falco_common::parse_queue_recovery(std::string v, outputs_queue_recovery_type& out)
{
for (size_t i = 0; i < outputs_queue_recovery_names.size(); i++)
{
if (!strcasecmp(v.c_str(), outputs_queue_recovery_names[i].c_str()))
{
out = (outputs_queue_recovery_type) i;
return true;
}
}
return false;
}
bool falco_common::format_priority(priority_type v, std::string& out, bool shortfmt) bool falco_common::format_priority(priority_type v, std::string& out, bool shortfmt)
{ {
if ((size_t) v < priority_names.size()) if ((size_t) v < priority_names.size())

View File

@ -60,12 +60,6 @@ struct falco_exception : std::exception
namespace falco_common namespace falco_common
{ {
enum outputs_queue_recovery_type {
RECOVERY_CONTINUE = 0, /* outputs_queue_capacity recovery strategy of continuing on. */
RECOVERY_EXIT = 1, /* outputs_queue_capacity recovery strategy of exiting, self OOM kill. */
RECOVERY_EMPTY = 2, /* outputs_queue_capacity recovery strategy of emptying queue then continuing. */
};
const std::string syscall_source = sinsp_syscall_event_source_name; const std::string syscall_source = sinsp_syscall_event_source_name;
// Same as numbers/indices into the above vector // Same as numbers/indices into the above vector
@ -83,7 +77,6 @@ namespace falco_common
bool parse_priority(std::string v, priority_type& out); bool parse_priority(std::string v, priority_type& out);
priority_type parse_priority(std::string v); priority_type parse_priority(std::string v);
bool parse_queue_recovery(std::string v, outputs_queue_recovery_type& out);
bool format_priority(priority_type v, std::string& out, bool shortfmt=false); bool format_priority(priority_type v, std::string& out, bool shortfmt=false);
std::string format_priority(priority_type v, bool shortfmt=false); std::string format_priority(priority_type v, bool shortfmt=false);

View File

@ -65,7 +65,6 @@ falco::app::run_result falco::app::actions::init_outputs(falco::app::state& s)
s.config->m_output_timeout, s.config->m_output_timeout,
s.config->m_buffered_outputs, s.config->m_buffered_outputs,
s.config->m_outputs_queue_capacity, s.config->m_outputs_queue_capacity,
s.config->m_outputs_queue_recovery,
s.config->m_time_format_iso_8601, s.config->m_time_format_iso_8601,
hostname)); hostname));

View File

@ -40,7 +40,6 @@ falco_configuration::falco_configuration():
m_watch_config_files(true), m_watch_config_files(true),
m_buffered_outputs(false), m_buffered_outputs(false),
m_outputs_queue_capacity(DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE), m_outputs_queue_capacity(DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE),
m_outputs_queue_recovery(falco_common::RECOVERY_EXIT),
m_time_format_iso_8601(false), m_time_format_iso_8601(false),
m_output_timeout(2000), m_output_timeout(2000),
m_grpc_enabled(false), m_grpc_enabled(false),
@ -281,11 +280,6 @@ void falco_configuration::load_yaml(const std::string& config_name, const yaml_h
{ {
m_outputs_queue_capacity = DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE; m_outputs_queue_capacity = DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE;
} }
std::string recovery = config.get_scalar<std::string>("outputs_queue.recovery", "exit");
if (!falco_common::parse_queue_recovery(recovery, m_outputs_queue_recovery))
{
throw std::logic_error("Unknown recovery \"" + recovery + "\"--must be one of exit, continue, empty");
}
m_time_format_iso_8601 = config.get_scalar<bool>("time_format_iso_8601", false); m_time_format_iso_8601 = config.get_scalar<bool>("time_format_iso_8601", false);

View File

@ -72,7 +72,6 @@ public:
bool m_watch_config_files; bool m_watch_config_files;
bool m_buffered_outputs; bool m_buffered_outputs;
size_t m_outputs_queue_capacity; size_t m_outputs_queue_capacity;
falco_common::outputs_queue_recovery_type m_outputs_queue_recovery;
bool m_time_format_iso_8601; bool m_time_format_iso_8601;
uint32_t m_output_timeout; uint32_t m_output_timeout;

View File

@ -48,7 +48,6 @@ falco_outputs::falco_outputs(
uint32_t timeout, uint32_t timeout,
bool buffered, bool buffered,
size_t outputs_queue_capacity, size_t outputs_queue_capacity,
falco_common::outputs_queue_recovery_type outputs_queue_recovery,
bool time_format_iso_8601, bool time_format_iso_8601,
const std::string& hostname) const std::string& hostname)
{ {
@ -67,7 +66,6 @@ falco_outputs::falco_outputs(
add_output(output); add_output(output);
} }
m_outputs_queue_num_drops = {0}; m_outputs_queue_num_drops = {0};
m_outputs_queue_recovery = outputs_queue_recovery;
#ifndef __EMSCRIPTEN__ #ifndef __EMSCRIPTEN__
m_queue.set_capacity(outputs_queue_capacity); m_queue.set_capacity(outputs_queue_capacity);
m_worker_thread = std::thread(&falco_outputs::worker, this); m_worker_thread = std::thread(&falco_outputs::worker, this);
@ -287,29 +285,11 @@ inline void falco_outputs::push(const ctrl_msg& cmsg)
#ifndef __EMSCRIPTEN__ #ifndef __EMSCRIPTEN__
if (!m_queue.try_push(cmsg)) if (!m_queue.try_push(cmsg))
{ {
switch (m_outputs_queue_recovery) if(m_outputs_queue_num_drops.load() == 0)
{ {
case falco_common::RECOVERY_EXIT: falco_logger::log(LOG_ERR, "Outputs queue out of memory. Drop event and continue on ...");
throw falco_exception("Fatal error: Output queue out of memory. Exiting ...");
case falco_common::RECOVERY_EMPTY:
/* Print a log just the first time */
if(m_outputs_queue_num_drops.load() == 0)
{
falco_logger::log(LOG_ERR, "Output queue out of memory. Drop event plus events in queue due to emptying the queue; continue on ...");
}
m_outputs_queue_num_drops += m_queue.size() + 1;
m_queue.clear();
break;
case falco_common::RECOVERY_CONTINUE:
if(m_outputs_queue_num_drops.load() == 0)
{
falco_logger::log(LOG_ERR, "Output queue out of memory. Drop event and continue on ...");
}
m_outputs_queue_num_drops++;
break;
default:
throw falco_exception("Fatal error: strategy unknown. Exiting ...");
} }
m_outputs_queue_num_drops++;
} }
#else #else
for (auto o : m_outputs) for (auto o : m_outputs)

View File

@ -50,7 +50,6 @@ public:
uint32_t timeout, uint32_t timeout,
bool buffered, bool buffered,
size_t outputs_queue_capacity, size_t outputs_queue_capacity,
falco_common::outputs_queue_recovery_type outputs_queue_recovery,
bool time_format_iso_8601, bool time_format_iso_8601,
const std::string& hostname); const std::string& hostname);
@ -87,8 +86,8 @@ public:
void reopen_outputs(); void reopen_outputs();
/*! /*!
\brief Return the number of currently dropped events as a result of failed push attempts \brief Return the number of events currently dropped due to failed push
into the outputs queue when using `continue` or `empty` recovery strategies. attempts into the outputs queue
*/ */
uint64_t get_outputs_queue_num_drops(); uint64_t get_outputs_queue_num_drops();
@ -121,7 +120,6 @@ private:
falco_outputs_cbq m_queue; falco_outputs_cbq m_queue;
#endif #endif
falco_common::outputs_queue_recovery_type m_outputs_queue_recovery;
std::atomic<uint64_t> m_outputs_queue_num_drops; std::atomic<uint64_t> m_outputs_queue_num_drops;
std::thread m_worker_thread; std::thread m_worker_thread;
inline void push(const ctrl_msg& cmsg); inline void push(const ctrl_msg& cmsg);

View File

@ -132,8 +132,7 @@ stats_writer::stats_writer(
if (m_initialized) if (m_initialized)
{ {
#ifndef __EMSCRIPTEN__ #ifndef __EMSCRIPTEN__
// capacity and controls should not be relevant for stats outputs, adopt capacity // Adopt capacity for completeness, even if it's likely not relevant
// for completeness, but do not implement config recovery strategies.
m_queue.set_capacity(config->m_outputs_queue_capacity); m_queue.set_capacity(config->m_outputs_queue_capacity);
m_worker = std::thread(&stats_writer::worker, this); m_worker = std::thread(&stats_writer::worker, this);
#endif #endif