fix(outputs): expose queue_capacity_outputs config for memory control

Signed-off-by: Melissa Kilby <melissa.kilby.oss@gmail.com>
This commit is contained in:
Melissa Kilby 2023-08-01 05:02:45 +00:00 committed by poiana
parent d1b932d2e9
commit b55b209edf
8 changed files with 76 additions and 3 deletions

View File

@ -39,6 +39,7 @@
# json_include_tags_property
# buffered_outputs
# outputs (throttling)
# queue_capacity_outputs
# Falco outputs channels
# stdout_output
# syslog_output
@ -319,6 +320,27 @@ outputs:
# deploying it in production.
rule_matching: first
# [Experimental] `queue_capacity_outputs`
#
# Falco utilizes tbb::concurrent_bounded_queue for the outputs, and this parameter
# allows you to customize the capacity. Refer to the official documentation:
# https://oneapi-src.github.io/oneTBB/main/tbb_userguide/Concurrent_Queue_Classes.html.
# On a healthy system with tuned Falco rules, the queue should not fill up.
# If it does, it most likely happens if the entire event flow is too slow. This
# could indicate that the server is under heavy load.
#
# Lowering the number of items can prevent steadily increasing memory until the OOM
# killer stops the Falco process. We expose recovery actions to self-limit or self
# OOM kill earlier similar to how we expose the kernel buffer size as parameter.
# However, it will not address the root cause of the event pipe not holding up.
queue_capacity_outputs:
# number of max items in queue
items: 1000000
# continue: 0 (default)
# exit: 1
# empty queue then continue: 2
recovery: 0
##########################
# Falco outputs channels #

View File

@ -63,6 +63,8 @@ falco::app::run_result falco::app::actions::init_outputs(falco::app::state& s)
s.config->m_json_include_tags_property,
s.config->m_output_timeout,
s.config->m_buffered_outputs,
s.config->m_queue_capacity_outputs_items,
s.config->m_queue_capacity_outputs_recovery,
s.config->m_time_format_iso_8601,
hostname));

View File

@ -28,6 +28,7 @@ limitations under the License.
#include "falco_utils.h"
#include "configuration.h"
#include "configuration_aux.h"
#include "logger.h"
#include "banned.h" // This raises a compilation error when certain functions are used
@ -40,6 +41,8 @@ falco_configuration::falco_configuration():
m_rule_matching(falco_common::rule_matching::FIRST),
m_watch_config_files(true),
m_buffered_outputs(false),
m_queue_capacity_outputs_items(DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS),
m_queue_capacity_outputs_recovery(RECOVERY_DROP_CURRENT),
m_time_format_iso_8601(false),
m_output_timeout(2000),
m_grpc_enabled(false),
@ -281,6 +284,8 @@ void falco_configuration::load_yaml(const std::string& config_name, const yaml_h
}
m_buffered_outputs = config.get_scalar<bool>("buffered_outputs", false);
m_queue_capacity_outputs_items = config.get_scalar<size_t>("queue_capacity_outputs.items", DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS);
m_queue_capacity_outputs_recovery = config.get_scalar<uint32_t>("queue_capacity_outputs.recovery", RECOVERY_DROP_CURRENT);
m_time_format_iso_8601 = config.get_scalar<bool>("time_format_iso_8601", false);
m_webserver_enabled = config.get_scalar<bool>("webserver.enabled", false);

View File

@ -72,6 +72,8 @@ public:
bool m_watch_config_files;
bool m_buffered_outputs;
size_t m_queue_capacity_outputs_items;
uint32_t m_queue_capacity_outputs_recovery;
bool m_time_format_iso_8601;
uint32_t m_output_timeout;

View File

@ -0,0 +1,22 @@
/*
Copyright (C) 2023 The Falco Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#pragma once
#define DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS 1000000UL
enum outputs_recovery_code {
RECOVERY_DROP_CURRENT = 0, /* queue_capacity_outputs recovery strategy of continuing on. */
RECOVERY_EXIT = 1, /* queue_capacity_outputs recovery strategy of exiting, self OOM kill. */
RECOVERY_EMPTY = 2, /* queue_capacity_outputs recovery strategy of emptying queue then continuing. */
};

View File

@ -19,8 +19,8 @@ limitations under the License.
#endif
#include "falco_outputs.h"
#include "config_falco.h"
#include "configuration_aux.h"
#include "formats.h"
#include "logger.h"
@ -47,6 +47,8 @@ falco_outputs::falco_outputs(
bool json_include_tags_property,
uint32_t timeout,
bool buffered,
size_t queue_capacity_outputs_items,
uint32_t queue_capacity_outputs_recovery,
bool time_format_iso_8601,
const std::string& hostname)
{
@ -66,6 +68,8 @@ falco_outputs::falco_outputs(
}
#ifndef __EMSCRIPTEN__
m_worker_thread = std::thread(&falco_outputs::worker, this);
m_queue.set_capacity(queue_capacity_outputs_items);
m_recovery = queue_capacity_outputs_recovery;
#endif
}
@ -282,8 +286,18 @@ inline void falco_outputs::push(const ctrl_msg& cmsg)
#ifndef __EMSCRIPTEN__
if (!m_queue.try_push(cmsg))
{
fprintf(stderr, "Fatal error: Output queue reached maximum capacity. Exiting.\n");
exit(EXIT_FAILURE);
switch (m_recovery)
{
case RECOVERY_EXIT:
fprintf(stderr, "Fatal error: Output queue reached maximum capacity. Exiting ... \n");
exit(EXIT_FAILURE);
case RECOVERY_EMPTY:
fprintf(stderr, "Output queue reached maximum capacity. Empty queue and continue ... \n");
m_queue.empty();
default:
fprintf(stderr, "Output queue reached maximum capacity. Continue on ... \n");
break;
}
}
#else
for (auto o : m_outputs)

View File

@ -48,6 +48,8 @@ public:
bool json_include_tags_property,
uint32_t timeout,
bool buffered,
size_t queue_capacity_outputs_items,
uint32_t queue_capacity_outputs_recovery,
bool time_format_iso_8601,
const std::string& hostname);
@ -110,6 +112,7 @@ private:
#ifndef __EMSCRIPTEN__
typedef tbb::concurrent_bounded_queue<ctrl_msg> falco_outputs_cbq;
falco_outputs_cbq m_queue;
uint32_t m_recovery;
#endif
std::thread m_worker_thread;

View File

@ -89,6 +89,9 @@ stats_writer::stats_writer(
: m_initialized(false), m_total_samples(0)
{
m_config = config;
// capacity and controls should not be relevant for stats outputs, adopt capacity
// for completeness, but do not implement config recovery strategies.
m_queue.set_capacity(config->m_queue_capacity_outputs_items);
if (config->m_metrics_enabled)
{
if (!config->m_metrics_output_file.empty())