cleanup(outputs): adopt different style for outputs_queue params encodings

Co-authored-by: Leonardo Grasso <me@leonardograsso.com>
Signed-off-by: Melissa Kilby <melissa.kilby.oss@gmail.com>
This commit is contained in:
Melissa Kilby
2023-08-03 03:04:15 +00:00
committed by poiana
parent 03a557725b
commit 85883b7200
10 changed files with 66 additions and 54 deletions

View File

@@ -39,7 +39,8 @@
# json_include_tags_property # json_include_tags_property
# buffered_outputs # buffered_outputs
# outputs (throttling) # outputs (throttling)
# queue_capacity_outputs # rule_matching
# outputs_queue
# Falco outputs channels # Falco outputs channels
# stdout_output # stdout_output
# syslog_output # syslog_output
@@ -320,7 +321,7 @@ outputs:
# deploying it in production. # deploying it in production.
rule_matching: first rule_matching: first
# [Experimental] `queue_capacity_outputs` # [Experimental] `outputs_queue`
# #
# Falco utilizes tbb::concurrent_bounded_queue for handling outputs, and this parameter # Falco utilizes tbb::concurrent_bounded_queue for handling outputs, and this parameter
# allows you to customize the queue capacity. Please refer to the official documentation: # allows you to customize the queue capacity. Please refer to the official documentation:
@@ -335,18 +336,17 @@ rule_matching: first
# as a parameter. # as a parameter.
# However, it will not address the root cause of the event pipe not keeping up. # However, it will not address the root cause of the event pipe not keeping up.
# #
# `items`: the maximum number of items allowed in the queue, defaulting to 0. This means that # `capacity`: the maximum number of items allowed in the queue, defaulting to 0. This means that
# the queue is unbounded. # the queue remains unbounded aka this setting is disabled.
# You can experiment with values greater or smaller than the anchor value 1000000. # You can experiment with values greater or smaller than the anchor value 1000000.
# #
# `recovery`: the strategy to follow when the queue becomes filled up. This also applies when # `recovery`: the strategy to follow when the queue becomes filled up. This also applies when
# the queue is unbounded, and all available memory on the system is consumed. # the queue is unbounded, and all available memory on the system is consumed.
# recovery: 0 means continue. # `exit` is default, `continue` does nothing special and `empty` empties the queue and then
# recovery: 1 means simply exit (default behavior). # continues.
# recovery: 2 means empty the queue and then continue. outputs_queue:
queue_capacity_outputs: capacity: 0
items: 0 recovery: exit
recovery: 1
########################## ##########################

View File

@@ -27,10 +27,18 @@ static std::vector<std::string> priority_names = {
"Debug" "Debug"
}; };
<<<<<<< HEAD
static std::vector<std::string> rule_matching_names = { static std::vector<std::string> rule_matching_names = {
"first", "first",
"all" "all"
}; };
=======
static std::vector<std::string> outputs_recovery_names = {
"continue",
"exit",
"empty",
};
>>>>>>> 92bd5767 (cleanup(outputs): adopt different style for outputs_queue params encodings)
bool falco_common::parse_priority(std::string v, priority_type& out) bool falco_common::parse_priority(std::string v, priority_type& out)
{ {
@@ -59,6 +67,19 @@ falco_common::priority_type falco_common::parse_priority(std::string v)
return out; return out;
} }
bool falco_common::parse_recovery(std::string v, outputs_recovery_type& out)
{
for (size_t i = 0; i < outputs_recovery_names.size(); i++)
{
if (!strcasecmp(v.c_str(), outputs_recovery_names[i].c_str()))
{
out = (outputs_recovery_type) i;
return true;
}
}
return false;
}
bool falco_common::format_priority(priority_type v, std::string& out, bool shortfmt) bool falco_common::format_priority(priority_type v, std::string& out, bool shortfmt)
{ {
if ((size_t) v < priority_names.size()) if ((size_t) v < priority_names.size())

View File

@@ -21,6 +21,8 @@ limitations under the License.
#include <mutex> #include <mutex>
#include <sinsp.h> #include <sinsp.h>
#define DEFAULT_OUTPUTS_QUEUE_CAPACITY 0
// //
// Most falco_* classes can throw exceptions. Unless directly related // Most falco_* classes can throw exceptions. Unless directly related
// to low-level failures like inability to open file, etc, they will // to low-level failures like inability to open file, etc, they will
@@ -52,6 +54,13 @@ struct falco_exception : std::exception
namespace falco_common namespace falco_common
{ {
enum outputs_recovery_type {
RECOVERY_CONTINUE = 0, /* queue_capacity_outputs recovery strategy of continuing on. */
RECOVERY_EXIT = 1, /* queue_capacity_outputs recovery strategy of exiting, self OOM kill. */
RECOVERY_EMPTY = 2, /* queue_capacity_outputs recovery strategy of emptying queue then continuing. */
};
const std::string syscall_source = sinsp_syscall_event_source_name; const std::string syscall_source = sinsp_syscall_event_source_name;
// Same as numbers/indices into the above vector // Same as numbers/indices into the above vector
@@ -69,6 +78,7 @@ namespace falco_common
bool parse_priority(std::string v, priority_type& out); bool parse_priority(std::string v, priority_type& out);
priority_type parse_priority(std::string v); priority_type parse_priority(std::string v);
bool parse_recovery(std::string v, outputs_recovery_type& out);
bool format_priority(priority_type v, std::string& out, bool shortfmt=false); bool format_priority(priority_type v, std::string& out, bool shortfmt=false);
std::string format_priority(priority_type v, bool shortfmt=false); std::string format_priority(priority_type v, bool shortfmt=false);

View File

@@ -63,8 +63,8 @@ falco::app::run_result falco::app::actions::init_outputs(falco::app::state& s)
s.config->m_json_include_tags_property, s.config->m_json_include_tags_property,
s.config->m_output_timeout, s.config->m_output_timeout,
s.config->m_buffered_outputs, s.config->m_buffered_outputs,
s.config->m_queue_capacity_outputs_items, s.config->m_outputs_queue_capacity,
s.config->m_queue_capacity_outputs_recovery, s.config->m_outputs_queue_recovery,
s.config->m_time_format_iso_8601, s.config->m_time_format_iso_8601,
hostname)); hostname));

View File

@@ -28,7 +28,6 @@ limitations under the License.
#include "falco_utils.h" #include "falco_utils.h"
#include "configuration.h" #include "configuration.h"
#include "configuration_aux.h"
#include "logger.h" #include "logger.h"
#include "banned.h" // This raises a compilation error when certain functions are used #include "banned.h" // This raises a compilation error when certain functions are used
@@ -41,8 +40,8 @@ falco_configuration::falco_configuration():
m_rule_matching(falco_common::rule_matching::FIRST), m_rule_matching(falco_common::rule_matching::FIRST),
m_watch_config_files(true), m_watch_config_files(true),
m_buffered_outputs(false), m_buffered_outputs(false),
m_queue_capacity_outputs_items(DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS), m_outputs_queue_capacity(DEFAULT_OUTPUTS_QUEUE_CAPACITY),
m_queue_capacity_outputs_recovery(RECOVERY_EXIT), m_outputs_queue_recovery(falco_common::RECOVERY_EXIT),
m_time_format_iso_8601(false), m_time_format_iso_8601(false),
m_output_timeout(2000), m_output_timeout(2000),
m_grpc_enabled(false), m_grpc_enabled(false),
@@ -284,8 +283,13 @@ void falco_configuration::load_yaml(const std::string& config_name, const yaml_h
} }
m_buffered_outputs = config.get_scalar<bool>("buffered_outputs", false); m_buffered_outputs = config.get_scalar<bool>("buffered_outputs", false);
m_queue_capacity_outputs_items = config.get_scalar<size_t>("queue_capacity_outputs.items", DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS); m_outputs_queue_capacity = config.get_scalar<size_t>("outputs_queue.capacity", DEFAULT_OUTPUTS_QUEUE_CAPACITY);
m_queue_capacity_outputs_recovery = config.get_scalar<uint32_t>("queue_capacity_outputs.recovery", RECOVERY_EXIT); std::string recovery = config.get_scalar<std::string>("outputs_queue.recovery", "exit");
if (!falco_common::parse_recovery(recovery, m_outputs_queue_recovery))
{
throw std::logic_error("Unknown recovery \"" + recovery + "\"--must be one of exit, continue, empty");
}
m_time_format_iso_8601 = config.get_scalar<bool>("time_format_iso_8601", false); m_time_format_iso_8601 = config.get_scalar<bool>("time_format_iso_8601", false);
m_webserver_enabled = config.get_scalar<bool>("webserver.enabled", false); m_webserver_enabled = config.get_scalar<bool>("webserver.enabled", false);

View File

@@ -72,8 +72,8 @@ public:
bool m_watch_config_files; bool m_watch_config_files;
bool m_buffered_outputs; bool m_buffered_outputs;
size_t m_queue_capacity_outputs_items; size_t m_outputs_queue_capacity;
uint32_t m_queue_capacity_outputs_recovery; falco_common::outputs_recovery_type m_outputs_queue_recovery;
bool m_time_format_iso_8601; bool m_time_format_iso_8601;
uint32_t m_output_timeout; uint32_t m_output_timeout;

View File

@@ -1,22 +0,0 @@
/*
Copyright (C) 2023 The Falco Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#pragma once
#define DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS 0
enum outputs_recovery_code {
RECOVERY_DROP_CURRENT = 0, /* queue_capacity_outputs recovery strategy of continuing on. */
RECOVERY_EXIT = 1, /* queue_capacity_outputs recovery strategy of exiting, self OOM kill. */
RECOVERY_EMPTY = 2, /* queue_capacity_outputs recovery strategy of emptying queue then continuing. */
};

View File

@@ -20,7 +20,6 @@ limitations under the License.
#include "falco_outputs.h" #include "falco_outputs.h"
#include "config_falco.h" #include "config_falco.h"
#include "configuration_aux.h"
#include "formats.h" #include "formats.h"
#include "logger.h" #include "logger.h"
@@ -47,8 +46,8 @@ falco_outputs::falco_outputs(
bool json_include_tags_property, bool json_include_tags_property,
uint32_t timeout, uint32_t timeout,
bool buffered, bool buffered,
size_t queue_capacity_outputs_items, size_t outputs_queue_capacity,
uint32_t queue_capacity_outputs_recovery, falco_common::outputs_recovery_type outputs_queue_recovery,
bool time_format_iso_8601, bool time_format_iso_8601,
const std::string& hostname) const std::string& hostname)
{ {
@@ -68,12 +67,11 @@ falco_outputs::falco_outputs(
} }
#ifndef __EMSCRIPTEN__ #ifndef __EMSCRIPTEN__
m_worker_thread = std::thread(&falco_outputs::worker, this); m_worker_thread = std::thread(&falco_outputs::worker, this);
if (queue_capacity_outputs_items > 0) if (outputs_queue_capacity > 0)
{ {
m_queue.set_capacity(queue_capacity_outputs_items); m_queue.set_capacity(outputs_queue_capacity);
} }
m_recovery = outputs_queue_recovery;
m_recovery = queue_capacity_outputs_recovery;
#endif #endif
} }
@@ -292,12 +290,13 @@ inline void falco_outputs::push(const ctrl_msg& cmsg)
{ {
switch (m_recovery) switch (m_recovery)
{ {
case RECOVERY_EXIT: case falco_common::RECOVERY_EXIT:
fprintf(stderr, "Fatal error: Output queue out of memory. Exiting ... \n"); fprintf(stderr, "Fatal error: Output queue out of memory. Exiting ... \n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
case RECOVERY_EMPTY: case falco_common::RECOVERY_EMPTY:
fprintf(stderr, "Output queue out of memory. Empty queue and continue ... \n"); fprintf(stderr, "Output queue out of memory. Empty queue and continue ... \n");
m_queue.empty(); m_queue.empty();
break;
default: default:
fprintf(stderr, "Output queue out of memory. Continue on ... \n"); fprintf(stderr, "Output queue out of memory. Continue on ... \n");
break; break;

View File

@@ -48,8 +48,8 @@ public:
bool json_include_tags_property, bool json_include_tags_property,
uint32_t timeout, uint32_t timeout,
bool buffered, bool buffered,
size_t queue_capacity_outputs_items, size_t outputs_queue_capacity,
uint32_t queue_capacity_outputs_recovery, falco_common::outputs_recovery_type outputs_queue_recovery,
bool time_format_iso_8601, bool time_format_iso_8601,
const std::string& hostname); const std::string& hostname);

View File

@@ -91,9 +91,9 @@ stats_writer::stats_writer(
m_config = config; m_config = config;
// capacity and controls should not be relevant for stats outputs, adopt capacity // capacity and controls should not be relevant for stats outputs, adopt capacity
// for completeness, but do not implement config recovery strategies. // for completeness, but do not implement config recovery strategies.
if (config->m_queue_capacity_outputs_items > 0) if (config->m_outputs_queue_capacity > 0)
{ {
m_queue.set_capacity(config->m_queue_capacity_outputs_items); m_queue.set_capacity(config->m_outputs_queue_capacity);
} }
if (config->m_metrics_enabled) if (config->m_metrics_enabled)
{ {