diff --git a/userspace/engine/falco_common.cpp b/userspace/engine/falco_common.cpp index 92fddf12..d94306aa 100644 --- a/userspace/engine/falco_common.cpp +++ b/userspace/engine/falco_common.cpp @@ -27,18 +27,16 @@ static std::vector priority_names = { "Debug" }; -<<<<<<< HEAD static std::vector rule_matching_names = { "first", "all" }; -======= + static std::vector outputs_recovery_names = { "continue", "exit", "empty", - }; ->>>>>>> 92bd5767 (cleanup(outputs): adopt different style for outputs_queue params encodings) +}; bool falco_common::parse_priority(std::string v, priority_type& out) { diff --git a/userspace/falco/app/actions/process_events.cpp b/userspace/falco/app/actions/process_events.cpp index 50ac60d7..2d92ff89 100644 --- a/userspace/falco/app/actions/process_events.cpp +++ b/userspace/falco/app/actions/process_events.cpp @@ -281,7 +281,7 @@ static falco::app::run_result do_inspect( } // for capture mode, the source name can change at every event - stats_collector.collect(inspector, inspector->event_sources()[source_engine_idx], num_evts); + stats_collector.collect(inspector, inspector->event_sources()[source_engine_idx], s.outputs, num_evts); } else { @@ -300,7 +300,7 @@ static falco::app::run_result do_inspect( } // for live mode, the source name is constant - stats_collector.collect(inspector, source, num_evts); + stats_collector.collect(inspector, source, s.outputs, num_evts); } // Reset the timeouts counter, Falco successfully got an event to process diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index f2e6d6aa..af2c642a 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -72,6 +72,7 @@ falco_outputs::falco_outputs( m_queue.set_capacity(outputs_queue_capacity); } m_recovery = outputs_queue_recovery; + m_outputs_queue_num_drops = 0UL; #endif } @@ -294,10 +295,12 @@ inline void falco_outputs::push(const ctrl_msg& cmsg) fprintf(stderr, "Fatal error: Output queue out of memory. Exiting ... \n"); exit(EXIT_FAILURE); case falco_common::RECOVERY_EMPTY: + m_outputs_queue_num_drops += m_queue.size(); fprintf(stderr, "Output queue out of memory. Empty queue and continue ... \n"); m_queue.empty(); break; default: + m_outputs_queue_num_drops++; fprintf(stderr, "Output queue out of memory. Continue on ... \n"); break; } @@ -364,3 +367,8 @@ inline void falco_outputs::process_msg(falco::outputs::abstract_output* o, const falco_logger::log(LOG_DEBUG, "Outputs worker received an unknown message type\n"); } } + +uint64_t falco_outputs::get_outputs_queue_num_drops() +{ + return m_outputs_queue_num_drops; +} diff --git a/userspace/falco/falco_outputs.h b/userspace/falco/falco_outputs.h index a60807a0..cf6efc33 100644 --- a/userspace/falco/falco_outputs.h +++ b/userspace/falco/falco_outputs.h @@ -85,6 +85,12 @@ public: */ void reopen_outputs(); + /*! + \brief Return the number of currently dropped events as a result of failed push attempts + into the outputs queue when using `continue` or `empty` recovery strategies. + */ + uint64_t get_outputs_queue_num_drops(); + private: std::unique_ptr m_formats; @@ -113,6 +119,7 @@ private: typedef tbb::concurrent_bounded_queue falco_outputs_cbq; falco_outputs_cbq m_queue; uint32_t m_recovery; + uint64_t m_outputs_queue_num_drops; #endif std::thread m_worker_thread; diff --git a/userspace/falco/stats_writer.cpp b/userspace/falco/stats_writer.cpp index 43b41ccd..0f3b73ea 100644 --- a/userspace/falco/stats_writer.cpp +++ b/userspace/falco/stats_writer.cpp @@ -223,7 +223,7 @@ stats_writer::collector::collector(const std::shared_ptr& writer) void stats_writer::collector::get_metrics_output_fields_wrapper( nlohmann::json& output_fields, const std::shared_ptr& inspector, uint64_t now, - const std::string& src, uint64_t num_evts, double stats_snapshot_time_delta_sec) + const std::string& src, uint64_t outputs_queue_num_drops, uint64_t num_evts, double stats_snapshot_time_delta_sec) { static const char* all_driver_engines[] = { BPF_ENGINE, KMOD_ENGINE, MODERN_BPF_ENGINE, @@ -240,6 +240,7 @@ void stats_writer::collector::get_metrics_output_fields_wrapper( output_fields["falco.host_boot_ts"] = machine_info->boot_ts_epoch; output_fields["falco.hostname"] = machine_info->hostname; /* Explicitly add hostname to log msg in case hostname rule output field is disabled. */ output_fields["falco.host_num_cpus"] = machine_info->num_cpus; + output_fields["falco.outputs_queue_num_drops"] = outputs_queue_num_drops; output_fields["evt.source"] = src; for (size_t i = 0; i < sizeof(all_driver_engines) / sizeof(const char*); i++) @@ -424,7 +425,7 @@ void stats_writer::collector::get_metrics_output_fields_additional( #endif } -void stats_writer::collector::collect(const std::shared_ptr& inspector, const std::string &src, uint64_t num_evts) +void stats_writer::collector::collect(const std::shared_ptr& inspector, const std::string &src, const std::shared_ptr& outputs, uint64_t num_evts) { if (m_writer->has_output()) { @@ -445,7 +446,8 @@ void stats_writer::collector::collect(const std::shared_ptr& inspector, c /* Get respective metrics output_fields. */ nlohmann::json output_fields; - get_metrics_output_fields_wrapper(output_fields, inspector, now, src, num_evts, stats_snapshot_time_delta_sec); + uint64_t outputs_queue_num_drops = outputs->get_outputs_queue_num_drops(); + get_metrics_output_fields_wrapper(output_fields, inspector, now, src, outputs_queue_num_drops, num_evts, stats_snapshot_time_delta_sec); get_metrics_output_fields_additional(output_fields, inspector, stats_snapshot_time_delta_sec, src); /* Send message in the queue */ diff --git a/userspace/falco/stats_writer.h b/userspace/falco/stats_writer.h index 42a183e7..4c68d243 100644 --- a/userspace/falco/stats_writer.h +++ b/userspace/falco/stats_writer.h @@ -60,13 +60,13 @@ public: \brief Collects one stats sample from an inspector and for the given event source name */ - void collect(const std::shared_ptr& inspector, const std::string& src, uint64_t num_evts); + void collect(const std::shared_ptr& inspector, const std::string& src, const std::shared_ptr& outputs, uint64_t num_evts); private: /*! \brief Collect snapshot metrics wrapper fields as internal rule formatted output fields. */ - void get_metrics_output_fields_wrapper(nlohmann::json& output_fields, const std::shared_ptr& inspector, uint64_t now, const std::string& src, uint64_t num_evts, double stats_snapshot_time_delta_sec); + void get_metrics_output_fields_wrapper(nlohmann::json& output_fields, const std::shared_ptr& inspector, uint64_t now, const std::string& src, uint64_t outputs_queue_num_drops, uint64_t num_evts, double stats_snapshot_time_delta_sec); /*! \brief Collect snapshot metrics syscalls related metrics as internal rule formatted output fields.