feat(userspace/falco): non-blocking outputs

Signed-off-by: Leonardo Grasso <me@leonardograsso.com>
This commit is contained in:
Leonardo Grasso
2020-10-19 14:55:24 +02:00
committed by poiana
parent 8eb7d83ee8
commit 90d71a8e92
2 changed files with 113 additions and 29 deletions

View File

@@ -51,9 +51,11 @@ falco_outputs::~falco_outputs()
{ {
if(m_initialized) if(m_initialized)
{ {
for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it) this->cleanup_outputs();
this->stop_worker();
if(m_worker_thread.joinable())
{ {
(*it)->cleanup(); m_worker_thread.join();
} }
} }
} }
@@ -77,9 +79,12 @@ void falco_outputs::init(bool json_output,
m_time_format_iso_8601 = time_format_iso_8601; m_time_format_iso_8601 = time_format_iso_8601;
m_hostname = hostname; m_hostname = hostname;
m_worker_thread = std::thread(&falco_outputs::worker, this);
m_initialized = true; m_initialized = true;
} }
// todo(leogr): when the worker has started, adding an outputs is not thread-safe
void falco_outputs::add_output(falco::outputs::config oc) void falco_outputs::add_output(falco::outputs::config oc)
{ {
@@ -129,6 +134,12 @@ void falco_outputs::handle_event(gen_event *evt, string &rule, string &source,
return; return;
} }
falco_outputs::ctrl_msg cmsg = {};
cmsg.ts = evt->get_ts();
cmsg.priority = priority;
cmsg.source = source;
cmsg.rule = rule;
string sformat; string sformat;
if(source == "syscall") if(source == "syscall")
{ {
@@ -163,35 +174,38 @@ void falco_outputs::handle_event(gen_event *evt, string &rule, string &source,
sformat += " " + format; sformat += " " + format;
} }
string msg; cmsg.msg = falco_formats::format_event(evt, rule, source, falco_common::priority_names[priority], sformat);
msg = falco_formats::format_event(evt, rule, source, falco_common::priority_names[priority], sformat); cmsg.fields = falco_formats::resolve_tokens(evt, source, sformat);
for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it) cmsg.type = ctrl_msg_type::CTRL_MSG_OUTPUT;
{ m_queue.push(cmsg);
(*it)->output_event(evt, rule, source, priority, sformat, msg);
}
} }
void falco_outputs::handle_msg(uint64_t now, void falco_outputs::handle_msg(uint64_t ts,
falco_common::priority_type priority, falco_common::priority_type priority,
std::string &msg, std::string &msg,
std::string &rule, std::string &rule,
std::map<std::string, std::string> &output_fields) std::map<std::string, std::string> &output_fields)
{ {
std::string full_msg; falco_outputs::ctrl_msg cmsg = {};
cmsg.ts = ts;
cmsg.priority = priority;
cmsg.source = "";
cmsg.rule = rule;
cmsg.fields = output_fields;
if(m_json_output) if(m_json_output)
{ {
nlohmann::json jmsg; nlohmann::json jmsg;
// Convert the time-as-nanoseconds to a more json-friendly ISO8601. // Convert the time-as-nanoseconds to a more json-friendly ISO8601.
time_t evttime = now / 1000000000; time_t evttime = ts / 1000000000;
char time_sec[20]; // sizeof "YYYY-MM-DDTHH:MM:SS" char time_sec[20]; // sizeof "YYYY-MM-DDTHH:MM:SS"
char time_ns[12]; // sizeof ".sssssssssZ" char time_ns[12]; // sizeof ".sssssssssZ"
string iso8601evttime; string iso8601evttime;
strftime(time_sec, sizeof(time_sec), "%FT%T", gmtime(&evttime)); strftime(time_sec, sizeof(time_sec), "%FT%T", gmtime(&evttime));
snprintf(time_ns, sizeof(time_ns), ".%09luZ", now % 1000000000); snprintf(time_ns, sizeof(time_ns), ".%09luZ", ts % 1000000000);
iso8601evttime = time_sec; iso8601evttime = time_sec;
iso8601evttime += time_ns; iso8601evttime += time_ns;
@@ -201,15 +215,15 @@ void falco_outputs::handle_msg(uint64_t now,
jmsg["time"] = iso8601evttime; jmsg["time"] = iso8601evttime;
jmsg["output_fields"] = output_fields; jmsg["output_fields"] = output_fields;
full_msg = jmsg.dump(); cmsg.msg = jmsg.dump();
} }
else else
{ {
std::string timestr; std::string timestr;
bool first = true; bool first = true;
sinsp_utils::ts_to_string(now, &timestr, false, true); sinsp_utils::ts_to_string(ts, &timestr, false, true);
full_msg = timestr + ": " + falco_common::priority_names[priority] + " " + msg + " ("; cmsg.msg = timestr + ": " + falco_common::priority_names[priority] + " " + msg + " (";
for(auto &pair : output_fields) for(auto &pair : output_fields)
{ {
if(first) if(first)
@@ -218,23 +232,71 @@ void falco_outputs::handle_msg(uint64_t now,
} }
else else
{ {
full_msg += " "; cmsg.msg += " ";
} }
full_msg += pair.first + "=" + pair.second; cmsg.msg += pair.first + "=" + pair.second;
} }
full_msg += ")"; cmsg.msg += ")";
} }
for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it) cmsg.type = ctrl_msg_type::CTRL_MSG_OUTPUT;
{ m_queue.push(cmsg);
(*it)->output_msg(priority, full_msg); }
}
void falco_outputs::cleanup_outputs()
{
this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_CLEANUP);
} }
void falco_outputs::reopen_outputs() void falco_outputs::reopen_outputs()
{ {
for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it) this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_REOPEN);
}
void falco_outputs::stop_worker()
{
this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_STOP);
}
inline void falco_outputs::push(ctrl_msg_type cmt)
{
falco_outputs::ctrl_msg cmsg = {};
cmsg.type = cmt;
m_queue.push(cmsg);
}
void falco_outputs::worker()
{
falco_outputs::ctrl_msg cmsg;
while(true)
{ {
(*it)->reopen(); // Block until a message becomes available.
m_queue.pop(cmsg);
switch(cmsg.type)
{
case ctrl_msg_type::CTRL_MSG_OUTPUT:
for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
{
(*it)->output(&cmsg);
}
break;
case ctrl_msg_type::CTRL_MSG_CLEANUP:
for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
{
(*it)->cleanup();
}
break;
case ctrl_msg_type::CTRL_MSG_REOPEN:
for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
{
(*it)->reopen();
}
break;
case ctrl_msg_type::CTRL_MSG_STOP:
return;
default:
break;
}
} }
} }

View File

@@ -25,6 +25,7 @@ limitations under the License.
#include "token_bucket.h" #include "token_bucket.h"
#include "falco_engine.h" #include "falco_engine.h"
#include "outputs.h" #include "outputs.h"
#include "tbb/concurrent_queue.h"
// //
// This class acts as the primary interface between a program and the // This class acts as the primary interface between a program and the
@@ -44,20 +45,19 @@ public:
void add_output(falco::outputs::config oc); void add_output(falco::outputs::config oc);
// // Format then send the event to all configured outputs (`evt` is an event that has matched some rule).
// evt is an event that has matched some rule. Pass the event
// to all configured outputs.
//
void handle_event(gen_event *evt, std::string &rule, std::string &source, void handle_event(gen_event *evt, std::string &rule, std::string &source,
falco_common::priority_type priority, std::string &format); falco_common::priority_type priority, std::string &format);
// Send a generic message to all outputs. Not necessarily associated with any event. // Format then send a generic message to all outputs. Not necessarily associated with any event.
void handle_msg(uint64_t now, void handle_msg(uint64_t now,
falco_common::priority_type priority, falco_common::priority_type priority,
std::string &msg, std::string &msg,
std::string &rule, std::string &rule,
std::map<std::string, std::string> &output_fields); std::map<std::string, std::string> &output_fields);
void cleanup_outputs();
void reopen_outputs(); void reopen_outputs();
private: private:
@@ -72,4 +72,26 @@ private:
bool m_json_output; bool m_json_output;
bool m_time_format_iso_8601; bool m_time_format_iso_8601;
std::string m_hostname; std::string m_hostname;
enum ctrl_msg_type
{
CTRL_MSG_STOP = 0,
CTRL_MSG_OUTPUT = 1,
CTRL_MSG_CLEANUP = 2,
CTRL_MSG_REOPEN = 3,
};
struct ctrl_msg : falco::outputs::message
{
ctrl_msg_type type;
};
typedef tbb::concurrent_bounded_queue<ctrl_msg> falco_outputs_cbq;
falco_outputs_cbq m_queue;
std::thread m_worker_thread;
inline void push(ctrl_msg_type cmt);
void worker();
void stop_worker();
}; };