diff --git a/userspace/falco/statsfilewriter.cpp b/userspace/falco/statsfilewriter.cpp index bf4d20ef..7da3a810 100644 --- a/userspace/falco/statsfilewriter.cpp +++ b/userspace/falco/statsfilewriter.cpp @@ -17,44 +17,33 @@ limitations under the License. #include #include #include +#include #include "statsfilewriter.h" +#include "logger.h" #include "banned.h" // This raises a compilation error when certain functions are used #include "logger.h" -using namespace std; +// note: uint16_t is enough because we don't care about +// overflows here. Threads calling stats_writer::handle() will just +// check that this value changed since their last observation. +static std::atomic s_last_tick((uint16_t) 0); -static bool g_save_stats = false; -static void timer_handler (int signum) +static void timer_handler(int signum) { - g_save_stats = true; + s_last_tick.fetch_add(1, std::memory_order_relaxed); } -StatsFileWriter::StatsFileWriter() - : m_num_stats(0) -{ -} - -StatsFileWriter::~StatsFileWriter() -{ - m_output.close(); -} - -bool StatsFileWriter::init(std::shared_ptr inspector, string &filename, uint32_t interval_msec, string &errstr) +bool stats_writer::set_timer(uint32_t interval_msec, string &err) { struct itimerval timer; struct sigaction handler; - m_inspector = inspector; - - m_output.exceptions ( ofstream::failbit | ofstream::badbit ); - m_output.open(filename, ios_base::app); - memset (&handler, 0, sizeof (handler)); handler.sa_handler = &timer_handler; if (sigaction(SIGALRM, &handler, NULL) == -1) { - errstr = string("Could not set up signal handler for periodic timer: ") + strerror(errno); + err = string("Could not set up signal handler for periodic timer: ") + strerror(errno); return false; } @@ -63,34 +52,99 @@ bool StatsFileWriter::init(std::shared_ptr inspector, string &filename, u timer.it_interval = timer.it_value; if (setitimer(ITIMER_REAL, &timer, NULL) == -1) { - errstr = string("Could not set up periodic timer: ") + strerror(errno); + err = string("Could not set up periodic timer: ") + strerror(errno); return false; } return true; } -void StatsFileWriter::handle() +stats_writer::stats_writer() + : m_initialized(false), m_total_samples(0) { - if (g_save_stats) + // the stats writter do nothing +} + +stats_writer::stats_writer(const std::string &filename) + : m_initialized(true), m_total_samples(0) +{ + m_output.exceptions(ofstream::failbit | ofstream::badbit); + m_output.open(filename, ios_base::app); + m_worker = std::thread(&stats_writer::worker, this); +} + +stats_writer::~stats_writer() +{ + if (m_initialized) { - scap_stats cstats; - scap_stats delta; - nlohmann::json jmsg; + stop_worker(); + m_output.close(); + } +} - g_save_stats = false; - m_num_stats++; - m_inspector->get_capture_stats(&cstats); - - if(m_num_stats == 1) +void stats_writer::handle(const std::shared_ptr& inspector, stats_writer::state& s) +{ + if (m_initialized) + { + auto tick = s_last_tick.load(std::memory_order_relaxed); + if (tick != s.last_tick) { - delta = cstats; + // gather stats sample and fill-up message + stats_writer::msg msg; + msg.stop = false; + inspector->get_capture_stats(&msg.stats); + if(s.samples == 1) + { + msg.delta = msg.stats; + } + else + { + msg.delta.n_evts = msg.stats.n_evts - s.last_stats.n_evts; + msg.delta.n_drops = msg.stats.n_drops - s.last_stats.n_drops; + msg.delta.n_preemptions = msg.stats.n_preemptions - s.last_stats.n_preemptions; + } + + // update state + s.samples++; + s.last_tick = tick; + s.last_stats = msg.stats; + + // push message into the queue + push(msg); } - else + } +} + +void stats_writer::stop_worker() +{ + stats_writer::msg msg; + msg.stop = true; + push(msg); + if(m_worker.joinable()) + { + m_worker.join(); + } +} + +inline void stats_writer::push(const stats_writer::msg& m) +{ + if (!m_queue.try_push(m)) + { + fprintf(stderr, "Fatal error: Stats queue reached maximum capacity. Exiting.\n"); + exit(EXIT_FAILURE); + } +} + +void stats_writer::worker() noexcept +{ + stats_writer::msg m; + while(true) + { + // Block until a message becomes available. + m_queue.pop(m); + if (m.stop) { - delta.n_evts = cstats.n_evts - m_last_stats.n_evts; - delta.n_drops = cstats.n_drops - m_last_stats.n_drops; - delta.n_preemptions = cstats.n_preemptions - m_last_stats.n_preemptions; + return; } try @@ -105,12 +159,10 @@ void StatsFileWriter::handle() jmsg["delta"]["preemptions"] = delta.n_preemptions; jmsg["delta"]["drop_pct"] = (delta.n_evts == 0 ? 0 : (100.0*delta.n_drops/delta.n_evts)); m_output << jmsg.dump() << endl; - } + } catch(const exception &e) { - falco_logger::log(LOG_ERR, "StatsFileWriter (handle): " + string(e.what()) + "\n"); + falco_logger::log(LOG_ERR, "stats_writer (worker): " + string(e.what()) + "\n"); } - - m_last_stats = cstats; } } diff --git a/userspace/falco/statsfilewriter.h b/userspace/falco/statsfilewriter.h index f3a4a3c0..7eee6458 100644 --- a/userspace/falco/statsfilewriter.h +++ b/userspace/falco/statsfilewriter.h @@ -27,27 +27,6 @@ limitations under the License. // Periodically collects scap stats files and writes them to a file as // json. -class StatsFileWriter { -public: - StatsFileWriter(); - virtual ~StatsFileWriter(); - - // Returns success as bool. On false fills in errstr. - bool init(std::shared_ptr inspector, std::string &filename, - uint32_t interval_msec, - string &errstr); - - // Should be called often (like for each event in a sinsp - // loop). - void handle(); - -protected: - uint32_t m_num_stats; - std::shared_ptr m_inspector; - std::ofstream m_output; - scap_stats m_last_stats; -}; - class stats_writer { public: @@ -63,13 +42,13 @@ public: stats_writer(); explicit stats_writer(const std::string &filename); ~stats_writer(); - + + void handle(const std::shared_ptr& inspector, stats_writer::state& s); + static bool set_timer(uint32_t interval_msec, std::string &err); - void handle(const std::shared_ptr& inspector, state& s); - private: - struct worker_msg + struct msg { bool stop; scap_stats delta; @@ -78,11 +57,11 @@ private: void worker() noexcept; void stop_worker(); - inline void push(const worker_msg& m); + inline void push(const stats_writer::msg& m); bool m_initialized; uint64_t m_total_samples; std::thread m_worker; std::ofstream m_output; - tbb::concurrent_bounded_queue m_queue; + tbb::concurrent_bounded_queue m_queue; }; \ No newline at end of file