update(userspace/falco)!: adapt stats writer for multiple parallel event sources

Signed-off-by: Jason Dellaluce <jasondellaluce@gmail.com>
This commit is contained in:
Jason Dellaluce 2022-08-30 14:01:37 +00:00 committed by poiana
parent b65cc49221
commit 4bc9fc74c8
3 changed files with 57 additions and 35 deletions

View File

@ -60,6 +60,8 @@ application::run_result application::do_inspect(
bool is_capture_mode = source.empty(); bool is_capture_mode = source.empty();
bool syscall_source_engine_idx = m_state->sources.at(falco_common::syscall_source)->engine_idx; bool syscall_source_engine_idx = m_state->sources.at(falco_common::syscall_source)->engine_idx;
std::size_t source_engine_idx = 0; std::size_t source_engine_idx = 0;
std::vector<std::string> source_names = inspector->get_plugin_manager()->sources();
source_names.push_back(falco_common::syscall_source);
if (!is_capture_mode) if (!is_capture_mode)
{ {
source_engine_idx = m_state->sources.at(source)->engine_idx; source_engine_idx = m_state->sources.at(source)->engine_idx;
@ -93,10 +95,32 @@ application::run_result application::do_inspect(
// //
while(1) while(1)
{ {
rc = inspector->next(&ev); rc = inspector->next(&ev);
stats_collector.collect(inspector); // if we are in live mode, we already have the right source engine idx
if (is_capture_mode)
{
source_engine_idx = syscall_source_engine_idx;
if (ev->get_type() == PPME_PLUGINEVENT_E)
{
// note: here we can assume that the source index will be the same
// in both the falco engine and the sinsp plugin manager. See the
// comment in init_falco_engine.cpp for more details.
source_engine_idx = inspector->get_plugin_manager()->source_idx_by_plugin_id(*(int32_t *)ev->get_param(0)->m_val, source_engine_idx_found);
if (!source_engine_idx_found)
{
return run_result::fatal("Unknown plugin ID in inspector: " + std::to_string(*(int32_t *)ev->get_param(0)->m_val));
}
}
// for capture mode, the source name can change at every event
stats_collector.collect(inspector, source_names[source_engine_idx]);
}
else
{
// for live mode, the source name is constant
stats_collector.collect(inspector, source);
}
if(m_state->terminate.load(std::memory_order_acquire) if(m_state->terminate.load(std::memory_order_acquire)
|| m_state->restart.load(std::memory_order_acquire)) || m_state->restart.load(std::memory_order_acquire))
@ -170,23 +194,6 @@ application::run_result application::do_inspect(
continue; continue;
} }
// if we are in live mode, we already have the right source engine idx
if (is_capture_mode)
{
source_engine_idx = syscall_source_engine_idx;
if (ev->get_type() == PPME_PLUGINEVENT_E)
{
// note: here we can assume that the source index will be the same
// in both the falco engine and the sinsp plugin manager. See the
// comment in init_falco_engine.cpp for more details.
source_engine_idx = inspector->get_plugin_manager()->source_idx_by_plugin_id(*(int32_t *)ev->get_param(0)->m_val, source_engine_idx_found);
if (!source_engine_idx_found)
{
return run_result::fatal("Unknown plugin ID in inspector: " + std::to_string(*(int32_t *)ev->get_param(0)->m_val));
}
}
}
// As the inspector has no filter at its level, all // As the inspector has no filter at its level, all
// events are returned here. Pass them to the falco // events are returned here. Pass them to the falco
// engine, which will match the event against the set // engine, which will match the event against the set

View File

@ -21,6 +21,7 @@ limitations under the License.
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include "falco_common.h"
#include "stats_writer.h" #include "stats_writer.h"
#include "logger.h" #include "logger.h"
#include "banned.h" // This raises a compilation error when certain functions are used #include "banned.h" // This raises a compilation error when certain functions are used
@ -118,6 +119,8 @@ void stats_writer::worker() noexcept
{ {
stats_writer::msg m; stats_writer::msg m;
nlohmann::json jmsg; nlohmann::json jmsg;
auto tick = stats_writer::get_ticker();
auto last_tick = tick;
while(true) while(true)
{ {
@ -128,18 +131,26 @@ void stats_writer::worker() noexcept
return; return;
} }
// update records for this event source
jmsg[m.source]["cur"]["events"] = m.stats.n_evts;
jmsg[m.source]["delta"]["events"] = m.delta.n_evts;
if (m.source == falco_common::syscall_source)
{
jmsg[m.source]["cur"]["drops"] = m.stats.n_drops;
jmsg[m.source]["cur"]["preemptions"] = m.stats.n_preemptions;
jmsg[m.source]["cur"]["drop_pct"] = (m.stats.n_evts == 0 ? 0.0 : (100.0*m.stats.n_drops/m.stats.n_evts));
jmsg[m.source]["delta"]["drops"] = m.delta.n_drops;
jmsg[m.source]["delta"]["preemptions"] = m.delta.n_preemptions;
jmsg[m.source]["delta"]["drop_pct"] = (m.delta.n_evts == 0 ? 0.0 : (100.0*m.delta.n_drops/m.delta.n_evts));
}
tick = stats_writer::get_ticker();
if (last_tick != tick)
{
m_total_samples++; m_total_samples++;
try try
{ {
jmsg["sample"] = m_total_samples; jmsg["sample"] = m_total_samples;
jmsg["cur"]["events"] = m.stats.n_evts;
jmsg["cur"]["drops"] = m.stats.n_drops;
jmsg["cur"]["preemptions"] = m.stats.n_preemptions;
jmsg["cur"]["drop_pct"] = (m.stats.n_evts == 0 ? 0.0 : (100.0*m.stats.n_drops/m.stats.n_evts));
jmsg["delta"]["events"] = m.delta.n_evts;
jmsg["delta"]["drops"] = m.delta.n_drops;
jmsg["delta"]["preemptions"] = m.delta.n_preemptions;
jmsg["delta"]["drop_pct"] = (m.delta.n_evts == 0 ? 0.0 : (100.0*m.delta.n_drops/m.delta.n_evts));
m_output << jmsg.dump() << endl; m_output << jmsg.dump() << endl;
} }
catch(const exception &e) catch(const exception &e)
@ -147,6 +158,7 @@ void stats_writer::worker() noexcept
falco_logger::log(LOG_ERR, "stats_writer (worker): " + string(e.what()) + "\n"); falco_logger::log(LOG_ERR, "stats_writer (worker): " + string(e.what()) + "\n");
} }
} }
}
} }
stats_writer::collector::collector(std::shared_ptr<stats_writer> writer) stats_writer::collector::collector(std::shared_ptr<stats_writer> writer)
@ -155,7 +167,7 @@ stats_writer::collector::collector(std::shared_ptr<stats_writer> writer)
} }
void stats_writer::collector::collect(std::shared_ptr<sinsp> inspector) void stats_writer::collector::collect(std::shared_ptr<sinsp> inspector, const std::string& src)
{ {
// just skip if no output is configured // just skip if no output is configured
if (m_writer->has_output()) if (m_writer->has_output())
@ -166,6 +178,7 @@ void stats_writer::collector::collect(std::shared_ptr<sinsp> inspector)
{ {
stats_writer::msg msg; stats_writer::msg msg;
msg.stop = false; msg.stop = false;
msg.source = src;
inspector->get_capture_stats(&msg.stats); inspector->get_capture_stats(&msg.stats);
m_samples++; m_samples++;
if(m_samples == 1) if(m_samples == 1)

View File

@ -54,8 +54,9 @@ public:
/*! /*!
\brief Collects one stats sample from an inspector \brief Collects one stats sample from an inspector
and for the given event source name
*/ */
void collect(std::shared_ptr<sinsp> inspector); void collect(std::shared_ptr<sinsp> inspector, const std::string& src);
private: private:
std::shared_ptr<stats_writer> m_writer; std::shared_ptr<stats_writer> m_writer;
@ -111,6 +112,7 @@ private:
bool stop; bool stop;
scap_stats delta; scap_stats delta;
scap_stats stats; scap_stats stats;
std::string source;
}; };
void worker() noexcept; void worker() noexcept;