refactor(userspace/falco): re-implement stats writer

Signed-off-by: Jason Dellaluce <jasondellaluce@gmail.com>
This commit is contained in:
Jason Dellaluce
2022-07-01 10:26:47 +00:00
committed by poiana
parent c5442ccb41
commit 605dd2816d
2 changed files with 99 additions and 68 deletions

View File

@@ -17,44 +17,33 @@ limitations under the License.
#include <sys/time.h>
#include <signal.h>
#include <nlohmann/json.hpp>
#include <atomic>
#include "statsfilewriter.h"
#include "logger.h"
#include "banned.h" // This raises a compilation error when certain functions are used
#include "logger.h"
using namespace std;
// note: uint16_t is enough because we don't care about
// overflows here. Threads calling stats_writer::handle() will just
// check that this value changed since their last observation.
static std::atomic<uint16_t> s_last_tick((uint16_t) 0);
static bool g_save_stats = false;
static void timer_handler (int signum)
static void timer_handler(int signum)
{
g_save_stats = true;
s_last_tick.fetch_add(1, std::memory_order_relaxed);
}
StatsFileWriter::StatsFileWriter()
: m_num_stats(0)
{
}
StatsFileWriter::~StatsFileWriter()
{
m_output.close();
}
bool StatsFileWriter::init(std::shared_ptr<sinsp> inspector, string &filename, uint32_t interval_msec, string &errstr)
bool stats_writer::set_timer(uint32_t interval_msec, string &err)
{
struct itimerval timer;
struct sigaction handler;
m_inspector = inspector;
m_output.exceptions ( ofstream::failbit | ofstream::badbit );
m_output.open(filename, ios_base::app);
memset (&handler, 0, sizeof (handler));
handler.sa_handler = &timer_handler;
if (sigaction(SIGALRM, &handler, NULL) == -1)
{
errstr = string("Could not set up signal handler for periodic timer: ") + strerror(errno);
err = string("Could not set up signal handler for periodic timer: ") + strerror(errno);
return false;
}
@@ -63,34 +52,99 @@ bool StatsFileWriter::init(std::shared_ptr<sinsp> inspector, string &filename, u
timer.it_interval = timer.it_value;
if (setitimer(ITIMER_REAL, &timer, NULL) == -1)
{
errstr = string("Could not set up periodic timer: ") + strerror(errno);
err = string("Could not set up periodic timer: ") + strerror(errno);
return false;
}
return true;
}
void StatsFileWriter::handle()
stats_writer::stats_writer()
: m_initialized(false), m_total_samples(0)
{
if (g_save_stats)
// the stats writter do nothing
}
stats_writer::stats_writer(const std::string &filename)
: m_initialized(true), m_total_samples(0)
{
m_output.exceptions(ofstream::failbit | ofstream::badbit);
m_output.open(filename, ios_base::app);
m_worker = std::thread(&stats_writer::worker, this);
}
stats_writer::~stats_writer()
{
if (m_initialized)
{
scap_stats cstats;
scap_stats delta;
nlohmann::json jmsg;
stop_worker();
m_output.close();
}
}
g_save_stats = false;
m_num_stats++;
m_inspector->get_capture_stats(&cstats);
if(m_num_stats == 1)
void stats_writer::handle(const std::shared_ptr<sinsp>& inspector, stats_writer::state& s)
{
if (m_initialized)
{
auto tick = s_last_tick.load(std::memory_order_relaxed);
if (tick != s.last_tick)
{
delta = cstats;
// gather stats sample and fill-up message
stats_writer::msg msg;
msg.stop = false;
inspector->get_capture_stats(&msg.stats);
if(s.samples == 1)
{
msg.delta = msg.stats;
}
else
{
msg.delta.n_evts = msg.stats.n_evts - s.last_stats.n_evts;
msg.delta.n_drops = msg.stats.n_drops - s.last_stats.n_drops;
msg.delta.n_preemptions = msg.stats.n_preemptions - s.last_stats.n_preemptions;
}
// update state
s.samples++;
s.last_tick = tick;
s.last_stats = msg.stats;
// push message into the queue
push(msg);
}
else
}
}
void stats_writer::stop_worker()
{
stats_writer::msg msg;
msg.stop = true;
push(msg);
if(m_worker.joinable())
{
m_worker.join();
}
}
inline void stats_writer::push(const stats_writer::msg& m)
{
if (!m_queue.try_push(m))
{
fprintf(stderr, "Fatal error: Stats queue reached maximum capacity. Exiting.\n");
exit(EXIT_FAILURE);
}
}
void stats_writer::worker() noexcept
{
stats_writer::msg m;
while(true)
{
// Block until a message becomes available.
m_queue.pop(m);
if (m.stop)
{
delta.n_evts = cstats.n_evts - m_last_stats.n_evts;
delta.n_drops = cstats.n_drops - m_last_stats.n_drops;
delta.n_preemptions = cstats.n_preemptions - m_last_stats.n_preemptions;
return;
}
try
@@ -105,12 +159,10 @@ void StatsFileWriter::handle()
jmsg["delta"]["preemptions"] = delta.n_preemptions;
jmsg["delta"]["drop_pct"] = (delta.n_evts == 0 ? 0 : (100.0*delta.n_drops/delta.n_evts));
m_output << jmsg.dump() << endl;
}
}
catch(const exception &e)
{
falco_logger::log(LOG_ERR, "StatsFileWriter (handle): " + string(e.what()) + "\n");
falco_logger::log(LOG_ERR, "stats_writer (worker): " + string(e.what()) + "\n");
}
m_last_stats = cstats;
}
}

View File

@@ -27,27 +27,6 @@ limitations under the License.
// Periodically collects scap stats files and writes them to a file as
// json.
class StatsFileWriter {
public:
StatsFileWriter();
virtual ~StatsFileWriter();
// Returns success as bool. On false fills in errstr.
bool init(std::shared_ptr<sinsp> inspector, std::string &filename,
uint32_t interval_msec,
string &errstr);
// Should be called often (like for each event in a sinsp
// loop).
void handle();
protected:
uint32_t m_num_stats;
std::shared_ptr<sinsp> m_inspector;
std::ofstream m_output;
scap_stats m_last_stats;
};
class stats_writer
{
public:
@@ -63,13 +42,13 @@ public:
stats_writer();
explicit stats_writer(const std::string &filename);
~stats_writer();
void handle(const std::shared_ptr<sinsp>& inspector, stats_writer::state& s);
static bool set_timer(uint32_t interval_msec, std::string &err);
void handle(const std::shared_ptr<sinsp>& inspector, state& s);
private:
struct worker_msg
struct msg
{
bool stop;
scap_stats delta;
@@ -78,11 +57,11 @@ private:
void worker() noexcept;
void stop_worker();
inline void push(const worker_msg& m);
inline void push(const stats_writer::msg& m);
bool m_initialized;
uint64_t m_total_samples;
std::thread m_worker;
std::ofstream m_output;
tbb::concurrent_bounded_queue<worker_msg> m_queue;
tbb::concurrent_bounded_queue<stats_writer::msg> m_queue;
};