new(metrics): add falco.outputs_queue_num_drops metrics + plus fix rebase leftovers

Signed-off-by: Melissa Kilby <melissa.kilby.oss@gmail.com>
This commit is contained in:
Melissa Kilby 2023-08-25 17:43:55 +00:00 committed by poiana
parent 85883b7200
commit 1e94598eca
6 changed files with 26 additions and 11 deletions

View File

@ -27,18 +27,16 @@ static std::vector<std::string> priority_names = {
"Debug" "Debug"
}; };
<<<<<<< HEAD
static std::vector<std::string> rule_matching_names = { static std::vector<std::string> rule_matching_names = {
"first", "first",
"all" "all"
}; };
=======
static std::vector<std::string> outputs_recovery_names = { static std::vector<std::string> outputs_recovery_names = {
"continue", "continue",
"exit", "exit",
"empty", "empty",
}; };
>>>>>>> 92bd5767 (cleanup(outputs): adopt different style for outputs_queue params encodings)
bool falco_common::parse_priority(std::string v, priority_type& out) bool falco_common::parse_priority(std::string v, priority_type& out)
{ {

View File

@ -281,7 +281,7 @@ static falco::app::run_result do_inspect(
} }
// for capture mode, the source name can change at every event // for capture mode, the source name can change at every event
stats_collector.collect(inspector, inspector->event_sources()[source_engine_idx], num_evts); stats_collector.collect(inspector, inspector->event_sources()[source_engine_idx], s.outputs, num_evts);
} }
else else
{ {
@ -300,7 +300,7 @@ static falco::app::run_result do_inspect(
} }
// for live mode, the source name is constant // for live mode, the source name is constant
stats_collector.collect(inspector, source, num_evts); stats_collector.collect(inspector, source, s.outputs, num_evts);
} }
// Reset the timeouts counter, Falco successfully got an event to process // Reset the timeouts counter, Falco successfully got an event to process

View File

@ -72,6 +72,7 @@ falco_outputs::falco_outputs(
m_queue.set_capacity(outputs_queue_capacity); m_queue.set_capacity(outputs_queue_capacity);
} }
m_recovery = outputs_queue_recovery; m_recovery = outputs_queue_recovery;
m_outputs_queue_num_drops = 0UL;
#endif #endif
} }
@ -294,10 +295,12 @@ inline void falco_outputs::push(const ctrl_msg& cmsg)
fprintf(stderr, "Fatal error: Output queue out of memory. Exiting ... \n"); fprintf(stderr, "Fatal error: Output queue out of memory. Exiting ... \n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
case falco_common::RECOVERY_EMPTY: case falco_common::RECOVERY_EMPTY:
m_outputs_queue_num_drops += m_queue.size();
fprintf(stderr, "Output queue out of memory. Empty queue and continue ... \n"); fprintf(stderr, "Output queue out of memory. Empty queue and continue ... \n");
m_queue.empty(); m_queue.empty();
break; break;
default: default:
m_outputs_queue_num_drops++;
fprintf(stderr, "Output queue out of memory. Continue on ... \n"); fprintf(stderr, "Output queue out of memory. Continue on ... \n");
break; break;
} }
@ -364,3 +367,8 @@ inline void falco_outputs::process_msg(falco::outputs::abstract_output* o, const
falco_logger::log(LOG_DEBUG, "Outputs worker received an unknown message type\n"); falco_logger::log(LOG_DEBUG, "Outputs worker received an unknown message type\n");
} }
} }
uint64_t falco_outputs::get_outputs_queue_num_drops()
{
return m_outputs_queue_num_drops;
}

View File

@ -85,6 +85,12 @@ public:
*/ */
void reopen_outputs(); void reopen_outputs();
/*!
\brief Return the number of currently dropped events as a result of failed push attempts
into the outputs queue when using `continue` or `empty` recovery strategies.
*/
uint64_t get_outputs_queue_num_drops();
private: private:
std::unique_ptr<falco_formats> m_formats; std::unique_ptr<falco_formats> m_formats;
@ -113,6 +119,7 @@ private:
typedef tbb::concurrent_bounded_queue<ctrl_msg> falco_outputs_cbq; typedef tbb::concurrent_bounded_queue<ctrl_msg> falco_outputs_cbq;
falco_outputs_cbq m_queue; falco_outputs_cbq m_queue;
uint32_t m_recovery; uint32_t m_recovery;
uint64_t m_outputs_queue_num_drops;
#endif #endif
std::thread m_worker_thread; std::thread m_worker_thread;

View File

@ -223,7 +223,7 @@ stats_writer::collector::collector(const std::shared_ptr<stats_writer>& writer)
void stats_writer::collector::get_metrics_output_fields_wrapper( void stats_writer::collector::get_metrics_output_fields_wrapper(
nlohmann::json& output_fields, nlohmann::json& output_fields,
const std::shared_ptr<sinsp>& inspector, uint64_t now, const std::shared_ptr<sinsp>& inspector, uint64_t now,
const std::string& src, uint64_t num_evts, double stats_snapshot_time_delta_sec) const std::string& src, uint64_t outputs_queue_num_drops, uint64_t num_evts, double stats_snapshot_time_delta_sec)
{ {
static const char* all_driver_engines[] = { static const char* all_driver_engines[] = {
BPF_ENGINE, KMOD_ENGINE, MODERN_BPF_ENGINE, BPF_ENGINE, KMOD_ENGINE, MODERN_BPF_ENGINE,
@ -240,6 +240,7 @@ void stats_writer::collector::get_metrics_output_fields_wrapper(
output_fields["falco.host_boot_ts"] = machine_info->boot_ts_epoch; output_fields["falco.host_boot_ts"] = machine_info->boot_ts_epoch;
output_fields["falco.hostname"] = machine_info->hostname; /* Explicitly add hostname to log msg in case hostname rule output field is disabled. */ output_fields["falco.hostname"] = machine_info->hostname; /* Explicitly add hostname to log msg in case hostname rule output field is disabled. */
output_fields["falco.host_num_cpus"] = machine_info->num_cpus; output_fields["falco.host_num_cpus"] = machine_info->num_cpus;
output_fields["falco.outputs_queue_num_drops"] = outputs_queue_num_drops;
output_fields["evt.source"] = src; output_fields["evt.source"] = src;
for (size_t i = 0; i < sizeof(all_driver_engines) / sizeof(const char*); i++) for (size_t i = 0; i < sizeof(all_driver_engines) / sizeof(const char*); i++)
@ -424,7 +425,7 @@ void stats_writer::collector::get_metrics_output_fields_additional(
#endif #endif
} }
void stats_writer::collector::collect(const std::shared_ptr<sinsp>& inspector, const std::string &src, uint64_t num_evts) void stats_writer::collector::collect(const std::shared_ptr<sinsp>& inspector, const std::string &src, const std::shared_ptr<falco_outputs>& outputs, uint64_t num_evts)
{ {
if (m_writer->has_output()) if (m_writer->has_output())
{ {
@ -445,7 +446,8 @@ void stats_writer::collector::collect(const std::shared_ptr<sinsp>& inspector, c
/* Get respective metrics output_fields. */ /* Get respective metrics output_fields. */
nlohmann::json output_fields; nlohmann::json output_fields;
get_metrics_output_fields_wrapper(output_fields, inspector, now, src, num_evts, stats_snapshot_time_delta_sec); uint64_t outputs_queue_num_drops = outputs->get_outputs_queue_num_drops();
get_metrics_output_fields_wrapper(output_fields, inspector, now, src, outputs_queue_num_drops, num_evts, stats_snapshot_time_delta_sec);
get_metrics_output_fields_additional(output_fields, inspector, stats_snapshot_time_delta_sec, src); get_metrics_output_fields_additional(output_fields, inspector, stats_snapshot_time_delta_sec, src);
/* Send message in the queue */ /* Send message in the queue */

View File

@ -60,13 +60,13 @@ public:
\brief Collects one stats sample from an inspector \brief Collects one stats sample from an inspector
and for the given event source name and for the given event source name
*/ */
void collect(const std::shared_ptr<sinsp>& inspector, const std::string& src, uint64_t num_evts); void collect(const std::shared_ptr<sinsp>& inspector, const std::string& src, const std::shared_ptr<falco_outputs>& outputs, uint64_t num_evts);
private: private:
/*! /*!
\brief Collect snapshot metrics wrapper fields as internal rule formatted output fields. \brief Collect snapshot metrics wrapper fields as internal rule formatted output fields.
*/ */
void get_metrics_output_fields_wrapper(nlohmann::json& output_fields, const std::shared_ptr<sinsp>& inspector, uint64_t now, const std::string& src, uint64_t num_evts, double stats_snapshot_time_delta_sec); void get_metrics_output_fields_wrapper(nlohmann::json& output_fields, const std::shared_ptr<sinsp>& inspector, uint64_t now, const std::string& src, uint64_t outputs_queue_num_drops, uint64_t num_evts, double stats_snapshot_time_delta_sec);
/*! /*!
\brief Collect snapshot metrics syscalls related metrics as internal rule formatted output fields. \brief Collect snapshot metrics syscalls related metrics as internal rule formatted output fields.