diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index 138ab893..3e4071bd 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -51,9 +51,11 @@ falco_outputs::~falco_outputs() { if(m_initialized) { - for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it) + this->cleanup_outputs(); + this->stop_worker(); + if(m_worker_thread.joinable()) { - (*it)->cleanup(); + m_worker_thread.join(); } } } @@ -77,9 +79,12 @@ void falco_outputs::init(bool json_output, m_time_format_iso_8601 = time_format_iso_8601; m_hostname = hostname; + m_worker_thread = std::thread(&falco_outputs::worker, this); + m_initialized = true; } +// todo(leogr): when the worker has started, adding an outputs is not thread-safe void falco_outputs::add_output(falco::outputs::config oc) { @@ -129,6 +134,12 @@ void falco_outputs::handle_event(gen_event *evt, string &rule, string &source, return; } + falco_outputs::ctrl_msg cmsg = {}; + cmsg.ts = evt->get_ts(); + cmsg.priority = priority; + cmsg.source = source; + cmsg.rule = rule; + string sformat; if(source == "syscall") { @@ -163,35 +174,38 @@ void falco_outputs::handle_event(gen_event *evt, string &rule, string &source, sformat += " " + format; } - string msg; - msg = falco_formats::format_event(evt, rule, source, falco_common::priority_names[priority], sformat); + cmsg.msg = falco_formats::format_event(evt, rule, source, falco_common::priority_names[priority], sformat); + cmsg.fields = falco_formats::resolve_tokens(evt, source, sformat); - for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it) - { - (*it)->output_event(evt, rule, source, priority, sformat, msg); - } + cmsg.type = ctrl_msg_type::CTRL_MSG_OUTPUT; + m_queue.push(cmsg); } -void falco_outputs::handle_msg(uint64_t now, +void falco_outputs::handle_msg(uint64_t ts, falco_common::priority_type priority, std::string &msg, std::string &rule, std::map &output_fields) { - std::string full_msg; + falco_outputs::ctrl_msg cmsg = {}; + cmsg.ts = ts; + cmsg.priority = priority; + cmsg.source = ""; + cmsg.rule = rule; + cmsg.fields = output_fields; if(m_json_output) { nlohmann::json jmsg; // Convert the time-as-nanoseconds to a more json-friendly ISO8601. - time_t evttime = now / 1000000000; + time_t evttime = ts / 1000000000; char time_sec[20]; // sizeof "YYYY-MM-DDTHH:MM:SS" char time_ns[12]; // sizeof ".sssssssssZ" string iso8601evttime; strftime(time_sec, sizeof(time_sec), "%FT%T", gmtime(&evttime)); - snprintf(time_ns, sizeof(time_ns), ".%09luZ", now % 1000000000); + snprintf(time_ns, sizeof(time_ns), ".%09luZ", ts % 1000000000); iso8601evttime = time_sec; iso8601evttime += time_ns; @@ -201,15 +215,15 @@ void falco_outputs::handle_msg(uint64_t now, jmsg["time"] = iso8601evttime; jmsg["output_fields"] = output_fields; - full_msg = jmsg.dump(); + cmsg.msg = jmsg.dump(); } else { std::string timestr; bool first = true; - sinsp_utils::ts_to_string(now, ×tr, false, true); - full_msg = timestr + ": " + falco_common::priority_names[priority] + " " + msg + " ("; + sinsp_utils::ts_to_string(ts, ×tr, false, true); + cmsg.msg = timestr + ": " + falco_common::priority_names[priority] + " " + msg + " ("; for(auto &pair : output_fields) { if(first) @@ -218,23 +232,71 @@ void falco_outputs::handle_msg(uint64_t now, } else { - full_msg += " "; + cmsg.msg += " "; } - full_msg += pair.first + "=" + pair.second; + cmsg.msg += pair.first + "=" + pair.second; } - full_msg += ")"; + cmsg.msg += ")"; } - for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it) - { - (*it)->output_msg(priority, full_msg); - } + cmsg.type = ctrl_msg_type::CTRL_MSG_OUTPUT; + m_queue.push(cmsg); +} + +void falco_outputs::cleanup_outputs() +{ + this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_CLEANUP); } void falco_outputs::reopen_outputs() { - for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it) + this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_REOPEN); +} + +void falco_outputs::stop_worker() +{ + this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_STOP); +} + +inline void falco_outputs::push(ctrl_msg_type cmt) +{ + falco_outputs::ctrl_msg cmsg = {}; + cmsg.type = cmt; + m_queue.push(cmsg); +} + +void falco_outputs::worker() +{ + falco_outputs::ctrl_msg cmsg; + while(true) { - (*it)->reopen(); + // Block until a message becomes available. + m_queue.pop(cmsg); + switch(cmsg.type) + { + case ctrl_msg_type::CTRL_MSG_OUTPUT: + for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it) + { + (*it)->output(&cmsg); + } + break; + case ctrl_msg_type::CTRL_MSG_CLEANUP: + for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it) + { + (*it)->cleanup(); + } + break; + case ctrl_msg_type::CTRL_MSG_REOPEN: + for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it) + { + (*it)->reopen(); + } + break; + case ctrl_msg_type::CTRL_MSG_STOP: + return; + + default: + break; + } } } diff --git a/userspace/falco/falco_outputs.h b/userspace/falco/falco_outputs.h index 222a8fbd..70e44195 100644 --- a/userspace/falco/falco_outputs.h +++ b/userspace/falco/falco_outputs.h @@ -25,6 +25,7 @@ limitations under the License. #include "token_bucket.h" #include "falco_engine.h" #include "outputs.h" +#include "tbb/concurrent_queue.h" // // This class acts as the primary interface between a program and the @@ -44,20 +45,19 @@ public: void add_output(falco::outputs::config oc); - // - // evt is an event that has matched some rule. Pass the event - // to all configured outputs. - // + // Format then send the event to all configured outputs (`evt` is an event that has matched some rule). void handle_event(gen_event *evt, std::string &rule, std::string &source, falco_common::priority_type priority, std::string &format); - // Send a generic message to all outputs. Not necessarily associated with any event. + // Format then send a generic message to all outputs. Not necessarily associated with any event. void handle_msg(uint64_t now, falco_common::priority_type priority, std::string &msg, std::string &rule, std::map &output_fields); + void cleanup_outputs(); + void reopen_outputs(); private: @@ -72,4 +72,26 @@ private: bool m_json_output; bool m_time_format_iso_8601; std::string m_hostname; + + enum ctrl_msg_type + { + CTRL_MSG_STOP = 0, + CTRL_MSG_OUTPUT = 1, + CTRL_MSG_CLEANUP = 2, + CTRL_MSG_REOPEN = 3, + }; + + struct ctrl_msg : falco::outputs::message + { + ctrl_msg_type type; + }; + + typedef tbb::concurrent_bounded_queue falco_outputs_cbq; + + falco_outputs_cbq m_queue; + + std::thread m_worker_thread; + inline void push(ctrl_msg_type cmt); + void worker(); + void stop_worker(); };