refactor(userspace/falco): falco_outputs

Signed-off-by: Leonardo Grasso <me@leonardograsso.com>
This commit is contained in:
Leonardo Grasso
2020-09-22 17:46:39 +02:00
committed by poiana
parent 99d4a7d5c5
commit 78fa43708b
4 changed files with 102 additions and 249 deletions

View File

@@ -47,6 +47,10 @@ add_executable(
configuration.cpp configuration.cpp
logger.cpp logger.cpp
falco_outputs.cpp falco_outputs.cpp
falco_outputs_file.cpp
falco_outputs_program.cpp
falco_outputs_stdout.cpp
falco_outputs_syslog.cpp
event_drops.cpp event_drops.cpp
statsfilewriter.cpp statsfilewriter.cpp
falco.cpp falco.cpp
@@ -57,6 +61,12 @@ else()
configuration.cpp configuration.cpp
logger.cpp logger.cpp
falco_outputs.cpp falco_outputs.cpp
falco_outputs_file.cpp
falco_outputs_grpc.cpp
falco_outputs_http.cpp
falco_outputs_program.cpp
falco_outputs_stdout.cpp
falco_outputs_syslog.cpp
event_drops.cpp event_drops.cpp
statsfilewriter.cpp statsfilewriter.cpp
falco.cpp falco.cpp

View File

@@ -24,22 +24,20 @@ limitations under the License.
#include "formats.h" #include "formats.h"
#include "logger.h" #include "logger.h"
#include "falco_outputs_file.h"
#include "falco_outputs_program.h"
#include "falco_outputs_stdout.h"
#include "falco_outputs_syslog.h"
#ifndef MINIMAL_BUILD #ifndef MINIMAL_BUILD
#include "falco_outputs_queue.h" #include "falco_outputs_http.h"
#include "falco_outputs_grpc.h"
#endif #endif
#include "banned.h" // This raises a compilation error when certain functions are used #include "banned.h" // This raises a compilation error when certain functions are used
using namespace std; using namespace std;
const static struct luaL_reg ll_falco_outputs [] =
{
#ifndef MINIMAL_BUILD
{"handle_http", &falco_outputs::handle_http},
{"handle_grpc", &falco_outputs::handle_grpc},
#endif
{NULL, NULL}
};
falco_outputs::falco_outputs(falco_engine *engine): falco_outputs::falco_outputs(falco_engine *engine):
m_falco_engine(engine), m_falco_engine(engine),
m_initialized(false), m_initialized(false),
@@ -52,25 +50,12 @@ falco_outputs::falco_outputs(falco_engine *engine):
falco_outputs::~falco_outputs() falco_outputs::~falco_outputs()
{ {
// Note: The assert()s in this destructor were previously places where
// exceptions were thrown. C++11 doesn't allow destructors to
// emit exceptions; if they're thrown, they'll trigger a call
// to 'terminate()'. To maintain similar behavior, the exceptions
// were replace with calls to 'assert()'
if(m_initialized) if(m_initialized)
{ {
lua_getglobal(m_ls, m_lua_output_cleanup.c_str()); // falco_formats::free_formatters(); // todo(leogr): find a better place to free formatters
if(!lua_isfunction(m_ls, -1)) for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
{ {
falco_logger::log(LOG_ERR, std::string("No function ") + m_lua_output_cleanup + " found. "); (*it)->cleanup();
assert(nullptr == "Missing lua cleanup function in ~falco_outputs");
}
if(lua_pcall(m_ls, 0, 0, 0) != 0)
{
const char *lerr = lua_tostring(m_ls, -1);
falco_logger::log(LOG_ERR, std::string("lua_pcall failed, err: ") + lerr);
assert(nullptr == "lua_pcall failed in ~falco_outputs");
} }
} }
} }
@@ -79,7 +64,7 @@ void falco_outputs::init(bool json_output,
bool json_include_output_property, bool json_include_output_property,
uint32_t rate, uint32_t max_burst, bool buffered, uint32_t rate, uint32_t max_burst, bool buffered,
bool time_format_iso_8601, string hostname, bool time_format_iso_8601, string hostname,
const string& alternate_lua_dir) const string &alternate_lua_dir)
{ {
// The engine must have been given an inspector by now. // The engine must have been given an inspector by now.
if(!m_inspector) if(!m_inspector)
@@ -89,16 +74,9 @@ void falco_outputs::init(bool json_output,
m_json_output = json_output; m_json_output = json_output;
falco_common::init(m_lua_main_filename.c_str(), alternate_lua_dir.c_str()); // todo(leogr): explain why falco_formats does not need to be initialized here
falco_formats::s_json_output = json_output;
// Note that falco_formats is added to both the lua state used falco_formats::s_json_include_output_property = json_include_output_property;
// by the falco engine as well as the separate lua state used
// by falco outputs.
falco_formats::init(m_inspector, m_falco_engine, m_ls, json_output, json_include_output_property);
falco_logger::init(m_ls);
luaL_openlib(m_ls, "c_outputs", ll_falco_outputs, 0);
m_notifications_tb.init(rate, max_burst); m_notifications_tb.init(rate, max_burst);
@@ -109,40 +87,47 @@ void falco_outputs::init(bool json_output,
m_initialized = true; m_initialized = true;
} }
void falco_outputs::add_output(output_config oc) void falco_outputs::add_output(falco::outputs::output::config oc)
{ {
uint8_t nargs = 3;
lua_getglobal(m_ls, m_lua_add_output.c_str());
if(!lua_isfunction(m_ls, -1)) falco::outputs::output *oo;
if(oc.name == "file")
{ {
throw falco_exception("No function " + m_lua_add_output + " found. "); oo = new falco::outputs::output_file();
} }
lua_pushstring(m_ls, oc.name.c_str()); else if(oc.name == "program")
lua_pushnumber(m_ls, (m_buffered ? 1 : 0));
lua_pushnumber(m_ls, (m_time_format_iso_8601 ? 1 : 0));
// If we have options, build up a lua table containing them
if(oc.options.size())
{ {
nargs = 4; oo = new falco::outputs::output_program();
lua_createtable(m_ls, 0, oc.options.size());
for(auto it = oc.options.cbegin(); it != oc.options.cend(); ++it)
{
lua_pushstring(m_ls, (*it).second.c_str());
lua_setfield(m_ls, -2, (*it).first.c_str());
} }
else if(oc.name == "stdout")
{
oo = new falco::outputs::output_stdout();
}
else if(oc.name == "syslog")
{
oo = new falco::outputs::output_syslog();
}
#ifndef MINIMAL_BUILD
else if(oc.name == "http")
{
oo = new falco::outputs::output_http();
}
else if(oc.name == "grpc")
{
oo = new falco::outputs::output_grpc();
}
#endif
else
{
throw falco_exception("Output not supported: " + oc.name);
} }
if(lua_pcall(m_ls, nargs, 0, 0) != 0) oo->init(oc, m_buffered, m_time_format_iso_8601, m_hostname);
{ m_outputs.push_back(oo);
const char *lerr = lua_tostring(m_ls, -1);
throw falco_exception(string(lerr));
}
} }
void falco_outputs::handle_event(gen_event *ev, string &rule, string &source, void falco_outputs::handle_event(gen_event *evt, string &rule, string &source,
falco_common::priority_type priority, string &format) falco_common::priority_type priority, string &format)
{ {
if(!m_notifications_tb.claim()) if(!m_notifications_tb.claim())
@@ -151,29 +136,46 @@ void falco_outputs::handle_event(gen_event *ev, string &rule, string &source,
return; return;
} }
std::lock_guard<std::mutex> guard(m_ls_semaphore); string sformat;
lua_getglobal(m_ls, m_lua_output_event.c_str()); if(source == "syscall")
if(lua_isfunction(m_ls, -1))
{ {
lua_pushlightuserdata(m_ls, ev); if(m_time_format_iso_8601)
lua_pushstring(m_ls, rule.c_str());
lua_pushstring(m_ls, source.c_str());
lua_pushstring(m_ls, falco_common::priority_names[priority].c_str());
lua_pushnumber(m_ls, priority);
lua_pushstring(m_ls, format.c_str());
lua_pushstring(m_ls, m_hostname.c_str());
if(lua_pcall(m_ls, 7, 0, 0) != 0)
{ {
const char *lerr = lua_tostring(m_ls, -1); sformat = "*%evt.time.iso8601: " + falco_common::priority_names[priority];
string err = "Error invoking function output: " + string(lerr); }
throw falco_exception(err); else
{
sformat = "*%evt.time: " + falco_common::priority_names[priority];
} }
} }
else else
{ {
throw falco_exception("No function " + m_lua_output_event + " found in lua compiler module"); if(m_time_format_iso_8601)
{
sformat = "*%jevt.time.iso8601: " + falco_common::priority_names[priority];
}
else
{
sformat = "*%jevt.time: " + falco_common::priority_names[priority];
}
}
// if format starts with a *, remove it, as we added our own prefix
if(format[0] == '*')
{
sformat += " " + format.substr(1, format.length() - 1);
}
else
{
sformat += " " + format;
}
string msg;
msg = falco_formats::format_event(evt, rule, source, falco_common::priority_names[priority], sformat);
for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
{
(*it)->output_event(evt, rule, source, priority, sformat, msg);
} }
} }
@@ -230,151 +232,16 @@ void falco_outputs::handle_msg(uint64_t now,
full_msg += ")"; full_msg += ")";
} }
std::lock_guard<std::mutex> guard(m_ls_semaphore); for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
lua_getglobal(m_ls, m_lua_output_msg.c_str());
if(lua_isfunction(m_ls, -1))
{ {
lua_pushstring(m_ls, full_msg.c_str()); (*it)->output_msg(priority, full_msg);
lua_pushstring(m_ls, falco_common::priority_names[priority].c_str());
lua_pushnumber(m_ls, priority);
if(lua_pcall(m_ls, 3, 0, 0) != 0)
{
const char *lerr = lua_tostring(m_ls, -1);
string err = "Error invoking function output: " + string(lerr);
throw falco_exception(err);
}
}
else
{
throw falco_exception("No function " + m_lua_output_msg + " found in lua compiler module");
} }
} }
void falco_outputs::reopen_outputs() void falco_outputs::reopen_outputs()
{ {
lua_getglobal(m_ls, m_lua_output_reopen.c_str()); for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
if(!lua_isfunction(m_ls, -1))
{ {
throw falco_exception("No function " + m_lua_output_reopen + " found. "); (*it)->reopen();
}
if(lua_pcall(m_ls, 0, 0, 0) != 0)
{
const char *lerr = lua_tostring(m_ls, -1);
throw falco_exception(string(lerr));
} }
} }
#ifndef MINIMAL_BUILD
int falco_outputs::handle_http(lua_State *ls)
{
CURL *curl = NULL;
CURLcode res = CURLE_FAILED_INIT;
struct curl_slist *slist1;
slist1 = NULL;
if(!lua_isstring(ls, -1) ||
!lua_isstring(ls, -2))
{
lua_pushstring(ls, "Invalid arguments passed to handle_http()");
lua_error(ls);
}
string url = (char *)lua_tostring(ls, 1);
string msg = (char *)lua_tostring(ls, 2);
curl = curl_easy_init();
if(curl)
{
slist1 = curl_slist_append(slist1, "Content-Type: application/json");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, slist1);
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, msg.c_str());
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, -1L);
res = curl_easy_perform(curl);
if(res != CURLE_OK)
{
falco_logger::log(LOG_ERR, "libcurl error: " + string(curl_easy_strerror(res)));
}
curl_easy_cleanup(curl);
curl = NULL;
curl_slist_free_all(slist1);
slist1 = NULL;
}
return 1;
}
int falco_outputs::handle_grpc(lua_State *ls)
{
// check parameters
if(!lua_islightuserdata(ls, -8) ||
!lua_isstring(ls, -7) ||
!lua_isstring(ls, -6) ||
!lua_isstring(ls, -5) ||
!lua_isstring(ls, -4) ||
!lua_istable(ls, -3) ||
!lua_isstring(ls, -2) ||
!lua_istable(ls, -1))
{
lua_pushstring(ls, "Invalid arguments passed to handle_grpc()");
lua_error(ls);
}
falco::outputs::response grpc_res;
// time
gen_event *evt = (gen_event *)lua_topointer(ls, 1);
auto timestamp = grpc_res.mutable_time();
*timestamp = google::protobuf::util::TimeUtil::NanosecondsToTimestamp(evt->get_ts());
// rule
auto rule = grpc_res.mutable_rule();
*rule = (char *)lua_tostring(ls, 2);
// source
falco::schema::source s = falco::schema::source::SYSCALL;
string sstr = (char *)lua_tostring(ls, 3);
if(!falco::schema::source_Parse(sstr, &s))
{
lua_pushstring(ls, "Unknown source passed to to handle_grpc()");
lua_error(ls);
}
grpc_res.set_source(s);
// priority
falco::schema::priority p = falco::schema::priority::EMERGENCY;
string pstr = (char *)lua_tostring(ls, 4);
if(!falco::schema::priority_Parse(pstr, &p))
{
lua_pushstring(ls, "Unknown priority passed to to handle_grpc()");
lua_error(ls);
}
grpc_res.set_priority(p);
// output
auto output = grpc_res.mutable_output();
*output = (char *)lua_tostring(ls, 5);
// output fields
auto &fields = *grpc_res.mutable_output_fields();
lua_pushnil(ls); // so that lua_next removes it from stack and puts (k, v) on it
while(lua_next(ls, 6) != 0)
{
fields[lua_tostring(ls, -2)] = lua_tostring(ls, -1);
lua_pop(ls, 1); // remove value, keep key for lua_next
}
lua_pop(ls, 1); // pop table
// hostname
auto host = grpc_res.mutable_hostname();
*host = (char *)lua_tostring(ls, 7);
falco::outputs::queue::get().push(grpc_res);
return 1;
}
#endif

View File

@@ -19,17 +19,12 @@ limitations under the License.
#include <memory> #include <memory>
#include <map> #include <map>
extern "C" {
#include "lua.h"
#include "lualib.h"
#include "lauxlib.h"
}
#include "gen_filter.h" #include "gen_filter.h"
#include "json_evt.h" #include "json_evt.h"
#include "falco_common.h" #include "falco_common.h"
#include "token_bucket.h" #include "token_bucket.h"
#include "falco_engine.h" #include "falco_engine.h"
#include "falco_output.h"
// //
// This class acts as the primary interface between a program and the // This class acts as the primary interface between a program and the
@@ -43,27 +38,19 @@ public:
falco_outputs(falco_engine *engine); falco_outputs(falco_engine *engine);
virtual ~falco_outputs(); virtual ~falco_outputs();
// The way to refer to an output (file, syslog, stdout, etc.)
// An output has a name and set of options.
struct output_config
{
std::string name;
std::map<std::string, std::string> options;
};
void init(bool json_output, void init(bool json_output,
bool json_include_output_property, bool json_include_output_property,
uint32_t rate, uint32_t max_burst, bool buffered, uint32_t rate, uint32_t max_burst, bool buffered,
bool time_format_iso_8601, std::string hostname, bool time_format_iso_8601, std::string hostname,
const std::string& alternate_lua_dir); const std::string& alternate_lua_dir);
void add_output(output_config oc); void add_output(falco::outputs::output::config oc);
// //
// ev is an event that has matched some rule. Pass the event // ev is an event that has matched some rule. Pass the event
// to all configured outputs. // to all configured outputs.
// //
void handle_event(gen_event *ev, std::string &rule, std::string &source, void handle_event(gen_event *evt, std::string &rule, std::string &source,
falco_common::priority_type priority, std::string &format); falco_common::priority_type priority, std::string &format);
// Send a generic message to all outputs. Not necessarily associated with any event. // Send a generic message to all outputs. Not necessarily associated with any event.
@@ -71,21 +58,17 @@ public:
falco_common::priority_type priority, falco_common::priority_type priority,
std::string &msg, std::string &msg,
std::string &rule, std::string &rule,
std::map<std::string,std::string> &output_fields); std::map<std::string, std::string> &output_fields);
void reopen_outputs(); void reopen_outputs();
#ifndef MINIMAL_BUILD
static int handle_http(lua_State *ls);
static int handle_grpc(lua_State *ls);
#endif
private: private:
falco_engine *m_falco_engine; falco_engine *m_falco_engine;
bool m_initialized; bool m_initialized;
std::vector<falco::outputs::output *> m_outputs;
// Rate limits notifications // Rate limits notifications
token_bucket m_notifications_tb; token_bucket m_notifications_tb;
@@ -93,11 +76,4 @@ private:
bool m_json_output; bool m_json_output;
bool m_time_format_iso_8601; bool m_time_format_iso_8601;
std::string m_hostname; std::string m_hostname;
std::string m_lua_add_output = "add_output";
std::string m_lua_output_event = "output_event";
std::string m_lua_output_msg = "output_msg";
std::string m_lua_output_cleanup = "output_cleanup";
std::string m_lua_output_reopen = "output_reopen";
std::string m_lua_main_filename = "output.lua";
}; };

View File

@@ -56,5 +56,5 @@ public:
queue(queue const&) = delete; queue(queue const&) = delete;
void operator=(queue const&) = delete; void operator=(queue const&) = delete;
}; };
} // namespace output } // namespace outputs
} // namespace falco } // namespace falco