update(userspace): refactor metrics data flow and fix bugs

Signed-off-by: Jason Dellaluce <jasondellaluce@gmail.com>
This commit is contained in:
Jason Dellaluce
2023-05-19 15:45:12 +00:00
committed by poiana
parent f0ac327f98
commit f117d5273c
9 changed files with 172 additions and 135 deletions

View File

@@ -50,7 +50,7 @@ static re2::RE2 s_rgx_prometheus_unit(RGX_PROMETHEUS_UNIT_PATTERN, re2::RE2::POS
// Conversion factors layered on top of MINUTE_TO_MS (defined outside this
// span). Every replacement list is parenthesized so the macro behaves as a
// single operand: without the parentheses an expression such as
// `x / HOUR_TO_MS` would expand to `x / MINUTE_TO_MS * 60` and silently
// compute the wrong value.
#define HOUR_TO_MS (MINUTE_TO_MS * 60)
#define DAY_TO_MS (HOUR_TO_MS * 24)
#define WEEK_TO_MS (DAY_TO_MS * 7)
#define YEAR_TO_MS WEEK_TO_MS * 365
#define YEAR_TO_MS DAY_TO_MS * 365
namespace falco
{

View File

@@ -15,10 +15,32 @@ limitations under the License.
*/
#include "actions.h"
#include "falco_metrics.h"
using namespace falco::app;
using namespace falco::app::actions;
// Maps the legacy/in-deprecation command-line options onto the current config.
//
// opts: parsed command-line options; only stats_output_file and
//       stats_interval are read here.
// cfg:  configuration object updated in place.
//
// A deprecation warning is logged as soon as either option is present. The
// config is only modified when a stats output file was given; an interval
// passed without an output file changes nothing.
static void apply_deprecated_options(
		const falco::app::options& opts,
		const std::shared_ptr<falco_configuration>& cfg)
{
	const bool has_stats_file = !opts.stats_output_file.empty();
	const bool has_stats_interval = !opts.stats_interval.empty();

	// Neither legacy option was used: leave the config untouched.
	if (!has_stats_file && !has_stats_interval)
	{
		return;
	}

	falco_logger::log(LOG_WARNING, "Options '-s' and '--stats-interval' are deprecated, metrics must be configured in the config file");

	// The interval is only honored together with an output file.
	if (!has_stats_file)
	{
		return;
	}

	cfg->m_metrics_enabled = true;
	cfg->m_metrics_output_file = opts.stats_output_file;

	if (has_stats_interval)
	{
		cfg->m_metrics_interval_str = opts.stats_interval;
		cfg->m_metrics_interval = falco::metrics::parse_metrics_interval(cfg->m_metrics_interval_str);
	}
}
falco::app::run_result falco::app::actions::load_config(falco::app::state& s)
{
try
@@ -51,6 +73,8 @@ falco::app::run_result falco::app::actions::load_config(falco::app::state& s)
s.config->m_buffered_outputs = !s.options.unbuffered_outputs;
apply_deprecated_options(s.options, s.config);
return run_result::ok();
}

View File

@@ -397,67 +397,53 @@ static void process_inspector_events(
}
}
// NOTE(review): two variants of init_stats_writer appear interleaved in this
// span: one taking (opts, outputs, config) and returning a stats_writer, and
// one taking (sw, config) and returning a run_result. Every statement is kept
// verbatim; which variant each statement belongs to should be confirmed
// against the real source file.
//
// Reading of the run_result-returning variant, as far as the statements
// below show: it returns ok() when metrics are disabled, returns a fatal
// result when the configured interval is below 100, logs a warning when the
// writer reports no output, and finally starts the ticker.
static std::shared_ptr<stats_writer> init_stats_writer(const options& opts, std::shared_ptr<falco_outputs> outputs, std::shared_ptr<falco_configuration> config)
static falco::app::run_result init_stats_writer(
const std::shared_ptr<const stats_writer>& sw,
const std::shared_ptr<const falco_configuration>& config)
{
auto statsw = std::make_shared<stats_writer>(outputs, config);
std::string err;
uint64_t interval = 0;
/* Continue cmd args support and old defaults for backward compatibility, scheduled for deprecation. */
if(!config->m_metrics_enabled && opts.stats_interval > 0)
if (!config->m_metrics_enabled)
{
interval = opts.stats_interval;
}
/* New metrics and configs over falco.yaml. */
else if(config->m_metrics_enabled && config->m_metrics_interval > 0)
{
interval = config->m_metrics_interval;
return falco::app::run_result::ok();
}
/* Enforce minimum bound of 100ms. */
if(interval > 100)
if(config->m_metrics_interval < 100)
{
if(!stats_writer::init_ticker(interval, err))
{
throw falco_exception(err);
}
/* Only support new info message for new metrics and configs over falco.yaml. */
if(config->m_metrics_enabled)
{
falco_logger::log(LOG_INFO, "Setting metrics interval to " + config->m_metrics_interval_str + ", equivalent to " + std::to_string(interval) + " (ms)\n");
}
// An interval under the 100ms bound is reported as a fatal result.
return falco::app::run_result::fatal("Metrics interval must have a minimum value of 100ms");
}
/* Continue cmd args support for backward compatibility, scheduled for deprecation. */
if(!config->m_metrics_enabled && !opts.output_file.empty())
if (config->m_metrics_enabled && !sw->has_output())
{
statsw.reset(new stats_writer(opts.output_file, outputs, config));
// Enabled metrics without any writer output only produce a warning.
falco_logger::log(LOG_WARNING, "Metrics are enabled with no output configured, no snapshot will be collected");
}
/* New metrics and configs over falco.yaml. */
else if(config->m_metrics_enabled && !config->m_metrics_output_file.empty())
{
statsw.reset(new stats_writer(config->m_metrics_output_file, outputs, config));
}
return statsw;
// Start the ticker: its boolean outcome is stored in both `success` and
// `proceed`, and any error text is written into `errstr`.
auto res = falco::app::run_result::ok();
res.success = stats_writer::init_ticker(config->m_metrics_interval, res.errstr);
res.proceed = res.success;
return res;
}
falco::app::run_result falco::app::actions::process_events(falco::app::state& s)
{
run_result res = run_result::ok();
bool termination_forced = false;
// Notify engine that we finished loading and enabling all rules
s.engine->complete_rule_loading();
// Initialize stats writer
auto statsw = init_stats_writer(s.options, s.outputs, s.config);
if (s.options.dry_run)
{
falco_logger::log(LOG_DEBUG, "Skipping event processing in dry-run\n");
return run_result::ok();
}
// Initialize stats writer
auto statsw = std::make_shared<stats_writer>(s.outputs, s.config);
auto res = init_stats_writer(statsw, s.config);
if (!res.success)
{
return res;
}
// Start processing events
bool termination_forced = false;
if(s.is_capture_mode())
{
res = open_offline_inspector(s);

View File

@@ -219,8 +219,8 @@ void options::define(cxxopts::Options& opts)
("p,print", "Add additional information to each falco notification's output.\nWith -pc or -pcontainer will use a container-friendly format.\nWith -pk or -pkubernetes will use a kubernetes-friendly format.\nAdditionally, specifying -pc/-pk will change the interpretation of %container.info in rule output fields.", cxxopts::value(print_additional), "<output_format>")
("P,pidfile", "When run as a daemon, write pid to specified file", cxxopts::value(pidfilename)->default_value("/var/run/falco.pid"), "<pid_file>")
("r", "Rules file/directory (defaults to value set in configuration file, or /etc/falco_rules.yaml). This option can be passed multiple times to read from multiple files/directories.", cxxopts::value<std::vector<std::string>>(), "<rules_file>")
("s", "If specified, append statistics related to Falco's reading/processing of events to this file (only useful in live mode).", cxxopts::value(output_file), "<stats_file>")
("stats-interval", "When using -s <stats_file>, write statistics every <msec> ms. This uses signals, so don't recommend intervals below 200 ms. Defaults to 5000 (5 seconds).", cxxopts::value(stats_interval)->default_value("5000"), "<msec>")
("s", "If specified, append statistics related to Falco's reading/processing of events to this file (only useful in live mode).", cxxopts::value(stats_output_file), "<stats_file>")
("stats-interval", "When using -s <stats_file>, write statistics every <msec> ms. This uses signals, and has a minimum threshold of 100 ms. Defaults to 5000 (5 seconds).", cxxopts::value(stats_interval), "<msec>")
("S,snaplen", "Capture the first <len> bytes of each I/O buffer. By default, the first 80 bytes are captured. Use this option with caution, it can generate huge trace files.", cxxopts::value(snaplen)->default_value("0"), "<len>")
("support", "Print support information including version, rules files used, etc. and exit.", cxxopts::value(print_support)->default_value("false"))
("T", "Disable any rules with a tag=<tag>. This option can be passed multiple times. Can not be mized with -t", cxxopts::value<std::vector<std::string>>(), "<tag>")

View File

@@ -70,8 +70,8 @@ public:
std::string pidfilename;
// Rules list as passed by the user, via cmdline option '-r'
std::list<std::string> rules_filenames;
std::string output_file;
uint64_t stats_interval;
std::string stats_output_file;
std::string stats_interval;
uint64_t snaplen;
bool print_support;
std::set<std::string> disabled_rule_tags;

View File

@@ -61,7 +61,15 @@ falco_configuration::falco_configuration():
m_cpus_for_each_syscall_buffer(2),
m_syscall_drop_failed_exit(false),
m_base_syscalls_repair(false),
m_metrics_enabled(false)
m_metrics_enabled(false),
m_metrics_interval_str("5000"),
m_metrics_interval(5000),
m_metrics_stats_rule_enabled(false),
m_metrics_output_file(""),
m_metrics_resource_utilization_enabled(true),
m_metrics_kernel_event_counters_enabled(true),
m_metrics_libbpf_stats_enabled(true),
m_metrics_convert_memory_to_mb(true)
{
init({});
}
@@ -341,9 +349,9 @@ void falco_configuration::load_yaml(const std::string& config_name, const yaml_h
m_base_syscalls_repair = config.get_scalar<bool>("base_syscalls.repair", false);
m_metrics_enabled = config.get_scalar<bool>("metrics.enabled", false);
m_metrics_interval_str = config.get_scalar<std::string>("metrics.interval", "0");
m_metrics_interval_str = config.get_scalar<std::string>("metrics.interval", "5000");
m_metrics_interval = falco::metrics::parse_metrics_interval(m_metrics_interval_str);
m_metrics_stats_rule_enabled = config.get_scalar<bool>("metrics.stats_rule_enabled", true);
m_metrics_stats_rule_enabled = config.get_scalar<bool>("metrics.output_rule", false);
m_metrics_output_file = config.get_scalar<std::string>("metrics.output_file", "");
m_metrics_resource_utilization_enabled = config.get_scalar<bool>("metrics.resource_utilization_enabled", true);
m_metrics_kernel_event_counters_enabled = config.get_scalar<bool>("metrics.kernel_event_counters_enabled", true);

View File

@@ -191,6 +191,7 @@ void falco_outputs::handle_msg(uint64_t ts,
jmsg["time"] = iso8601evttime;
jmsg["output_fields"] = output_fields;
jmsg["hostname"] = m_hostname;
jmsg["source"] = s_internal_source;
cmsg.msg = jmsg.dump();
}

View File

@@ -68,21 +68,32 @@ stats_writer::ticker_t stats_writer::get_ticker()
return s_timer.load(std::memory_order_relaxed);
}
// NOTE(review): this span interleaves a two-argument constructor with
// statements from a filename-taking constructor (the ones using `m_output`
// and `filename`). Every statement is kept verbatim; confirm statement
// ownership against the real source file.
//
// Reading of the two-argument form, as far as the statements below show:
// m_initialized starts out false and becomes true when metrics are enabled
// and either an output file is configured (opened in append mode, with
// failbit/badbit exceptions turned on for the stream) or
// m_metrics_stats_rule_enabled is set. The worker thread is started only
// when m_initialized ended up true.
stats_writer::stats_writer(std::shared_ptr<falco_outputs> outputs, std::shared_ptr<falco_configuration> config)
stats_writer::stats_writer(
const std::shared_ptr<falco_outputs>& outputs,
const std::shared_ptr<const falco_configuration>& config)
: m_initialized(false), m_total_samples(0)
{
m_outputs = outputs;
m_config = config;
}
if (config->m_metrics_enabled)
{
if (!config->m_metrics_output_file.empty())
{
// With these exception flags set, a failing open() throws instead of
// only setting the stream state.
m_file_output.exceptions(std::ofstream::failbit | std::ofstream::badbit);
m_file_output.open(config->m_metrics_output_file, std::ios_base::app);
m_initialized = true;
}
stats_writer::stats_writer(const std::string &filename, std::shared_ptr<falco_outputs> outputs, std::shared_ptr<falco_configuration> config)
: m_initialized(true), m_total_samples(0)
{
m_output.exceptions(std::ofstream::failbit | std::ofstream::badbit);
m_output.open(filename, std::ios_base::app);
m_worker = std::thread(&stats_writer::worker, this);
m_outputs = outputs;
m_config = config;
if (config->m_metrics_stats_rule_enabled)
{
// The outputs handle is only retained when the stats rule is enabled.
m_outputs = outputs;
m_initialized = true;
}
}
if (m_initialized)
{
m_worker = std::thread(&stats_writer::worker, this);
}
}
stats_writer::~stats_writer()
@@ -90,15 +101,13 @@ stats_writer::~stats_writer()
if (m_initialized)
{
stop_worker();
m_output.close();
if (!m_config->m_metrics_output_file.empty())
{
m_file_output.close();
}
}
}
// Reports whether the writer was initialized: returns the m_initialized flag
// unchanged.
bool stats_writer::has_output() const
{
return m_initialized;
}
void stats_writer::stop_worker()
{
stats_writer::msg msg;
@@ -123,8 +132,11 @@ void stats_writer::worker() noexcept
{
stats_writer::msg m;
nlohmann::json jmsg;
bool use_outputs = m_config->m_metrics_stats_rule_enabled;
bool use_file = !m_config->m_metrics_output_file.empty();
auto tick = stats_writer::get_ticker();
auto last_tick = tick;
auto first_tick = tick;
while(true)
{
@@ -135,15 +147,33 @@ void stats_writer::worker() noexcept
return;
}
// this helps waiting for the first tick
tick = stats_writer::get_ticker();
if (last_tick != tick)
if (first_tick != tick)
{
m_total_samples++;
if (last_tick != tick)
{
m_total_samples++;
}
last_tick = tick;
try
{
jmsg["sample"] = m_total_samples;
jmsg["output_fields"] = m.output_fields;
m_output << jmsg.dump() << std::endl;
if (use_outputs)
{
std::string rule = "Falco internal: metrics snapshot";
std::string msg = "Falco metrics snapshot";
uint64_t ts = 0; // todo: pass timestamp in message
std::map<std::string,std::string> fields = {m.output_fields.begin(), m.output_fields.end()};
m_outputs->handle_msg(ts, falco_common::PRIORITY_INFORMATIONAL, msg, rule, fields);
}
if (use_file)
{
jmsg["sample"] = m_total_samples;
jmsg[m.source] = m.output_fields;
m_file_output << jmsg.dump() << std::endl;
}
}
catch(const std::exception &e)
{
@@ -153,14 +183,20 @@ void stats_writer::worker() noexcept
}
}
// Builds a collector bound to the given writer; the tick, sample and
// "last seen" counters all start at zero and the body does no further work.
// NOTE(review): two signature/initializer-list variants appear back to back
// here (writer taken by value vs. by const reference); both are kept
// verbatim — confirm against the real source file which one is current.
stats_writer::collector::collector(std::shared_ptr<stats_writer> writer)
: m_writer(writer), m_last_tick(0), m_samples(0), m_last_now(0), m_last_n_evts(0), m_last_n_drops(0), m_last_num_evts(0)
stats_writer::collector::collector(const std::shared_ptr<stats_writer>& writer)
: m_writer(writer), m_last_tick(0), m_samples(0),
m_last_now(0), m_last_n_evts(0), m_last_n_drops(0), m_last_num_evts(0)
{
}
std::map<std::string, std::string> stats_writer::collector::get_metrics_output_fields_wrapper(std::shared_ptr<sinsp> inspector, uint64_t now, std::string src, uint64_t num_evts, double stats_snapshot_time_delta_sec)
void stats_writer::collector::get_metrics_output_fields_wrapper(
std::unordered_map<std::string, std::string>& output_fields,
const std::shared_ptr<sinsp>& inspector, uint64_t now,
const std::string& src, uint64_t num_evts, double stats_snapshot_time_delta_sec)
{
std::map<std::string, std::string> output_fields;
static const char* all_driver_engines[] = {
BPF_ENGINE, KMOD_ENGINE, MODERN_BPF_ENGINE,
SOURCE_PLUGIN_ENGINE, NODRIVER_ENGINE, UDIG_ENGINE, GVISOR_ENGINE };
const scap_agent_info* agent_info = inspector->get_agent_info();
const scap_machine_info* machine_info = inspector->get_machine_info();
@@ -173,23 +209,16 @@ std::map<std::string, std::string> stats_writer::collector::get_metrics_output_f
output_fields["host_boot_ts"] = std::to_string(machine_info->boot_ts_epoch);
output_fields["hostname"] = machine_info->hostname; /* Explicitly add hostname to log msg in case hostname rule output field is disabled. */
output_fields["host_num_cpus"] = std::to_string(machine_info->num_cpus);
if (inspector->check_current_engine(BPF_ENGINE))
{
output_fields["driver"] = "bpf";
}
else if (inspector->check_current_engine(MODERN_BPF_ENGINE))
{
output_fields["driver"] = "modern_bpf";
}
else if (inspector->check_current_engine(KMOD_ENGINE))
{
output_fields["driver"] = "kmod";
}
else
{
output_fields["driver"] = "no_driver";
}
output_fields["src"] = src;
for (size_t i = 0; i < sizeof(all_driver_engines) / sizeof(const char*); i++)
{
if (inspector->check_current_engine(all_driver_engines[i]))
{
output_fields["driver"] = all_driver_engines[i];
break;
}
}
/* Falco userspace event counters. Always enabled. */
if (m_last_num_evts != 0 && stats_snapshot_time_delta_sec > 0)
@@ -200,12 +229,12 @@ std::map<std::string, std::string> stats_writer::collector::get_metrics_output_f
output_fields["falco_num_evts"] = std::to_string(num_evts);
output_fields["falco_num_evts_prev"] = std::to_string(m_last_num_evts);
m_last_num_evts = num_evts;
return output_fields;
}
std::map<std::string, std::string> stats_writer::collector::get_metrics_output_fields_additional(std::shared_ptr<sinsp> inspector, std::map<std::string, std::string> output_fields, double stats_snapshot_time_delta_sec, std::string src)
void stats_writer::collector::get_metrics_output_fields_additional(
std::unordered_map<std::string, std::string>& output_fields,
const std::shared_ptr<sinsp>& inspector,
double stats_snapshot_time_delta_sec, const std::string& src)
{
const scap_agent_info* agent_info = inspector->get_agent_info();
const scap_machine_info* machine_info = inspector->get_machine_info();
@@ -258,7 +287,7 @@ std::map<std::string, std::string> stats_writer::collector::get_metrics_output_f
if (src != falco_common::syscall_source)
{
return output_fields;
return;
}
/* Kernel side stats counters and libbpf stats if applicable. */
@@ -312,19 +341,19 @@ std::map<std::string, std::string> stats_writer::collector::get_metrics_output_f
}
}
#endif
return output_fields;
}
void stats_writer::collector::collect(std::shared_ptr<sinsp> inspector, const std::string &src, uint64_t num_evts)
void stats_writer::collector::collect(const std::shared_ptr<sinsp>& inspector, const std::string &src, uint64_t num_evts)
{
if (m_writer->m_config->m_metrics_enabled || m_writer->has_output())
if (m_writer->has_output())
{
/* Collect stats / metrics once per ticker period. */
auto tick = stats_writer::get_ticker();
if (tick != m_last_tick)
{
auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
m_last_tick = tick;
auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
uint64_t stats_snapshot_time_delta = 0;
if (m_last_now != 0)
{
@@ -334,23 +363,15 @@ void stats_writer::collector::collect(std::shared_ptr<sinsp> inspector, const st
double stats_snapshot_time_delta_sec = (stats_snapshot_time_delta / (double)ONE_SECOND_IN_NS);
/* Get respective metrics output_fields. */
std::map<std::string, std::string> output_fields = stats_writer::collector::get_metrics_output_fields_wrapper(inspector, now, src, num_evts, stats_snapshot_time_delta_sec);
output_fields = stats_writer::collector::get_metrics_output_fields_additional(inspector, output_fields, stats_snapshot_time_delta_sec, src);
std::unordered_map<std::string, std::string> output_fields;
get_metrics_output_fields_wrapper(output_fields, inspector, now, src, num_evts, stats_snapshot_time_delta_sec);
get_metrics_output_fields_additional(output_fields, inspector, stats_snapshot_time_delta_sec, src);
/* Pipe to respective output. */
if (m_writer->m_config->m_metrics_enabled && m_writer->m_config->m_metrics_stats_rule_enabled && m_writer->m_outputs)
{
std::string rule = "Falco internal: resource utilization stats metrics";
std::string msg = "";
m_writer->m_outputs->handle_msg(now, falco_common::PRIORITY_INFORMATIONAL, msg, rule, output_fields);
}
if (m_writer->has_output())
{
stats_writer::msg msg;
msg.output_fields = output_fields;
m_writer->push(msg);
}
m_last_tick = tick;
/* Send message in the queue */
stats_writer::msg msg;
msg.source = src;
msg.output_fields = std::move(output_fields);
m_writer->push(msg);
}
}
}

View File

@@ -18,7 +18,7 @@ limitations under the License.
#include <fstream>
#include <string>
#include <map>
#include <unordered_map>
#include <sinsp.h>
@@ -54,25 +54,26 @@ public:
/*!
\brief Initializes the collector with the given writer
*/
explicit collector(std::shared_ptr<stats_writer> writer);
explicit collector(const std::shared_ptr<stats_writer>& writer);
/*!
\brief Collects one stats sample from an inspector
and for the given event source name
*/
void collect(std::shared_ptr<sinsp> inspector, const std::string& src, uint64_t num_evts);
void collect(const std::shared_ptr<sinsp>& inspector, const std::string& src, uint64_t num_evts);
private:
/*!
\brief Collect snapshot metrics wrapper fields as internal rule formatted output fields.
*/
std::map<std::string, std::string> get_metrics_output_fields_wrapper(std::shared_ptr<sinsp> inspector, uint64_t now, std::string src, uint64_t num_evts, double stats_snapshot_time_delta_sec);
void get_metrics_output_fields_wrapper(std::unordered_map<std::string, std::string>& output_fields, const std::shared_ptr<sinsp>& inspector, uint64_t now, const std::string& src, uint64_t num_evts, double stats_snapshot_time_delta_sec);
/*!
\brief Collect snapshot metrics syscalls related metrics as internal rule formatted output fields.
*/
std::map<std::string, std::string> get_metrics_output_fields_additional(std::shared_ptr<sinsp> inspector, std::map<std::string, std::string> output_fields, double stats_snapshot_time_delta_sec, std::string src);
void get_metrics_output_fields_additional(std::unordered_map<std::string, std::string>& output_fields, const std::shared_ptr<sinsp>& inspector, double stats_snapshot_time_delta_sec, const std::string& src);
private:
std::shared_ptr<stats_writer> m_writer;
stats_writer::ticker_t m_last_tick;
uint64_t m_samples;
@@ -94,20 +95,18 @@ public:
~stats_writer();
/*!
\brief Initializes a writer without file output.
\brief Initializes a writer.
*/
stats_writer(std::shared_ptr<falco_outputs> outputs, std::shared_ptr<falco_configuration> config);
stats_writer(const std::shared_ptr<falco_outputs>& outputs,
const std::shared_ptr<const falco_configuration>& config);
/*!
\brief Initializes a writer that can print to a file at the given filename.
With this constructor, has_output() always returns true
\brief Returns true if the writer is configured with a valid output.
*/
explicit stats_writer(const std::string &filename, std::shared_ptr<falco_outputs> outputs, std::shared_ptr<falco_configuration> config);
/*!
\brief Returns true if the writer is configured with a valid file output
*/
inline bool has_output() const;
// Defined inline in the class body: exposes the m_initialized flag as-is.
inline bool has_output() const
{
return m_initialized;
}
/*!
\brief Initializes the ticker with a given interval period defined
@@ -133,10 +132,8 @@ private:
msg& operator = (const msg&) = default;
bool stop;
scap_stats delta;
scap_stats stats;
std::string source;
std::map<std::string, std::string> output_fields;
std::unordered_map<std::string, std::string> output_fields;
};
void worker() noexcept;
@@ -146,10 +143,10 @@ private:
bool m_initialized;
uint64_t m_total_samples;
std::thread m_worker;
std::ofstream m_output;
std::ofstream m_file_output;
tbb::concurrent_bounded_queue<stats_writer::msg> m_queue;
std::shared_ptr<falco_outputs> m_outputs;
std::shared_ptr<falco_configuration> m_config;
std::shared_ptr<const falco_configuration> m_config;
// note: in this way, only collectors can push into the queue
friend class stats_writer::collector;