refactor(userspace/falco): improve design and docs of stats writer

Signed-off-by: Jason Dellaluce <jasondellaluce@gmail.com>
This commit is contained in:
Jason Dellaluce 2022-07-01 12:49:27 +00:00 committed by poiana
parent 28ff6ad3bd
commit 2d8efee73e
3 changed files with 128 additions and 52 deletions

View File

@ -45,7 +45,7 @@ application::run_result application::do_inspect(syscall_evt_drop_mgr &sdropmgr,
{ {
int32_t rc; int32_t rc;
sinsp_evt* ev; sinsp_evt* ev;
stats_writer::state stats_state; stats_writer::collector stats_collector(statsw);
uint64_t duration_start = 0; uint64_t duration_start = 0;
uint32_t timeouts_since_last_success_or_msg = 0; uint32_t timeouts_since_last_success_or_msg = 0;
std::size_t source_idx; std::size_t source_idx;
@ -79,7 +79,7 @@ application::run_result application::do_inspect(syscall_evt_drop_mgr &sdropmgr,
rc = m_state->inspector->next(&ev); rc = m_state->inspector->next(&ev);
statsw->handle(m_state->inspector, stats_state); stats_collector.collect(m_state->inspector);
if(m_state->terminate.load(std::memory_order_acquire) if(m_state->terminate.load(std::memory_order_acquire)
|| m_state->restart.load(std::memory_order_acquire)) || m_state->restart.load(std::memory_order_acquire))
@ -204,7 +204,7 @@ application::run_result application::process_events()
if (!m_options.stats_filename.empty()) if (!m_options.stats_filename.empty())
{ {
std::string err; std::string err;
if (!stats_writer::set_timer(m_options.stats_interval, err)) if (!stats_writer::init_ticker(m_options.stats_interval, err))
{ {
return run_result::fatal(err); return run_result::fatal(err);
} }

View File

@ -24,17 +24,17 @@ limitations under the License.
#include "banned.h" // This raises a compilation error when certain functions are used #include "banned.h" // This raises a compilation error when certain functions are used
#include "logger.h" #include "logger.h"
// note: uint16_t is enough because we don't care about // note: ticker_t is an uint16_t, which is enough because we don't care about
// overflows here. Threads calling stats_writer::handle() will just // overflows here. Threads calling stats_writer::handle() will just
// check that this value changed since their last observation. // check that this value changed since their last observation.
static std::atomic<uint16_t> s_last_tick((uint16_t) 0); static std::atomic<stats_writer::ticker_t> s_timer((stats_writer::ticker_t) 0);
static void timer_handler(int signum) static void timer_handler(int signum)
{ {
s_last_tick.fetch_add(1, std::memory_order_relaxed); s_timer.fetch_add(1, std::memory_order_relaxed);
} }
bool stats_writer::set_timer(uint32_t interval_msec, string &err) bool stats_writer::init_ticker(uint32_t interval_msec, string &err)
{ {
struct itimerval timer; struct itimerval timer;
struct sigaction handler; struct sigaction handler;
@ -59,10 +59,15 @@ bool stats_writer::set_timer(uint32_t interval_msec, string &err)
return true; return true;
} }
stats_writer::ticker_t stats_writer::get_ticker()
{
return s_timer.load(std::memory_order_relaxed);
}
stats_writer::stats_writer() stats_writer::stats_writer()
: m_initialized(false), m_total_samples(0) : m_initialized(false), m_total_samples(0)
{ {
// the stats writter do nothing
} }
stats_writer::stats_writer(const std::string &filename) stats_writer::stats_writer(const std::string &filename)
@ -82,37 +87,9 @@ stats_writer::~stats_writer()
} }
} }
void stats_writer::handle(const std::shared_ptr<sinsp>& inspector, stats_writer::state& s) bool stats_writer::has_output() const
{ {
if (m_initialized) return m_initialized;
{
auto tick = s_last_tick.load(std::memory_order_relaxed);
if (tick != s.last_tick)
{
// gather stats sample and fill-up message
stats_writer::msg msg;
msg.stop = false;
inspector->get_capture_stats(&msg.stats);
if(s.samples == 1)
{
msg.delta = msg.stats;
}
else
{
msg.delta.n_evts = msg.stats.n_evts - s.last_stats.n_evts;
msg.delta.n_drops = msg.stats.n_drops - s.last_stats.n_drops;
msg.delta.n_preemptions = msg.stats.n_preemptions - s.last_stats.n_preemptions;
}
// update state
s.samples++;
s.last_tick = tick;
s.last_stats = msg.stats;
// push message into the queue
push(msg);
}
}
} }
void stats_writer::stop_worker() void stats_writer::stop_worker()
@ -140,13 +117,14 @@ void stats_writer::worker() noexcept
stats_writer::msg m; stats_writer::msg m;
while(true) while(true)
{ {
// Block until a message becomes available. // blocks until a message becomes availables
m_queue.pop(m); m_queue.pop(m);
if (m.stop) if (m.stop)
{ {
return; return;
} }
m_total_samples++;
try try
{ {
jmsg["sample"] = m_num_stats; jmsg["sample"] = m_num_stats;
@ -166,3 +144,40 @@ void stats_writer::worker() noexcept
} }
} }
} }
stats_writer::collector::collector(std::shared_ptr<stats_writer> writer)
: m_writer(writer), m_last_tick(0), m_samples(0)
{
}
void stats_writer::collector::collect(std::shared_ptr<sinsp> inspector)
{
// just skip if no output is configured
if (m_writer->has_output())
{
// collect stats once per each ticker period
auto tick = stats_writer::get_ticker();
if (tick != m_last_tick)
{
stats_writer::msg msg;
msg.stop = false;
inspector->get_capture_stats(&msg.stats);
m_samples++;
if(m_samples == 1)
{
msg.delta = msg.stats;
}
else
{
msg.delta.n_evts = msg.stats.n_evts - m_last_stats.n_evts;
msg.delta.n_drops = msg.stats.n_drops - m_last_stats.n_drops;
msg.delta.n_preemptions = msg.stats.n_preemptions - m_last_stats.n_preemptions;
}
m_last_tick = tick;
m_last_stats = msg.stats;
m_writer->push(msg);
}
}
}

View File

@ -24,28 +24,86 @@ limitations under the License.
#include "tbb/concurrent_queue.h" #include "tbb/concurrent_queue.h"
// Periodically collects scap stats files and writes them to a file as /*!
// json. \brief Writes stats samples collected from inspectors into a given output.
Users must use a stats_writer::collector in order to collect and write stats
into a given stats_writer. This class is thread-safe, and can be shared
across multiple stats_writer::collector instances from different threads.
*/
class stats_writer class stats_writer
{ {
public: public:
struct state /*!
{ \brief Value of a ticker that dictates when stats are collected
inline state(): samples(0) { } */
typedef uint16_t ticker_t;
uint64_t samples; /*!
uint16_t last_tick; \brief Collects stats samples from an inspector and uses a writer
scap_stats last_stats; to print them in a given output. Stats are collected periodically every
time the value of stats_writer::get_ticker() changes.
This class is not thread-safe.
*/
class collector
{
public:
/*!
\brief Initializes the collector with the given writer
*/
explicit collector(std::shared_ptr<stats_writer> writer);
/*!
\brief Collects one stats sample from an inspector
*/
void collect(std::shared_ptr<sinsp> inspector);
private:
std::shared_ptr<stats_writer> m_writer;
stats_writer::ticker_t m_last_tick;
uint64_t m_samples;
scap_stats m_last_stats;
}; };
stats_writer(); stats_writer(const stats_writer&) = delete;
explicit stats_writer(const std::string &filename);
stats_writer(stats_writer&&) = delete;
stats_writer& operator=(const stats_writer&) = delete;
stats_writer& operator=(stats_writer&&) = delete;
~stats_writer(); ~stats_writer();
void handle(const std::shared_ptr<sinsp>& inspector, stats_writer::state& s); /*!
\brief Initializes a writer without any output.
With this contructor, has_output() always returns false
*/
stats_writer();
static bool set_timer(uint32_t interval_msec, std::string &err); /*!
\brief Initializes a writer that prints to a file at the given filename.
With this contructor, has_output() always returns true
*/
explicit stats_writer(const std::string &filename);
/*!
\brief Returns true if the writer is configured with a valid output
*/
inline bool has_output() const;
/*!
\brief Initializes the ticker with a given interval period defined
in milliseconds. Subsequent calls to init_ticker will dismiss the
previously-initialized ticker. Internally, this uses a timer
signal handler.
*/
static bool init_ticker(uint32_t interval_msec, std::string &err);
/*!
\brief Returns the current value of the ticker.
This function is thread-safe.
*/
inline static ticker_t get_ticker();
private: private:
struct msg struct msg
@ -64,4 +122,7 @@ private:
std::thread m_worker; std::thread m_worker;
std::ofstream m_output; std::ofstream m_output;
tbb::concurrent_bounded_queue<stats_writer::msg> m_queue; tbb::concurrent_bounded_queue<stats_writer::msg> m_queue;
// note: in this way, only collectors can push into the queue
friend class stats_writer::collector;
}; };