Merge pull request #285 from draios/add-unbuffered-output

Add ability to make outputs unbuffered
This commit is contained in:
Mark Stemm 2017-10-06 21:51:53 -07:00 committed by GitHub
commit 3a60caa9ed
7 changed files with 85 additions and 15 deletions

View File

@ -31,6 +31,10 @@ log_level: info
# "info", "debug". # "info", "debug".
priority: debug priority: debug
# Whether or not output to any of the output channels below is
# buffered. Defaults to true
buffered_outputs: true
# A throttling mechanism implemented as a token bucket limits the # A throttling mechanism implemented as a token bucket limits the
# rate of falco notifications. This throttling is controlled by the following configuration # rate of falco notifications. This throttling is controlled by the following configuration
# options: # options:

View File

@ -22,7 +22,8 @@ along with falco. If not, see <http://www.gnu.org/licenses/>.
using namespace std; using namespace std;
falco_configuration::falco_configuration() falco_configuration::falco_configuration()
: m_config(NULL) : m_buffered_outputs(true),
m_config(NULL)
{ {
} }
@ -142,6 +143,8 @@ void falco_configuration::init(string conf_filename, list<string> &cmdline_optio
} }
m_min_priority = (falco_common::priority_type) (it - falco_common::priority_names.begin()); m_min_priority = (falco_common::priority_type) (it - falco_common::priority_names.begin());
m_buffered_outputs = m_config->get_scalar<bool>("buffered_outputs", true);
falco_logger::log_stderr = m_config->get_scalar<bool>("log_stderr", false); falco_logger::log_stderr = m_config->get_scalar<bool>("log_stderr", false);
falco_logger::log_syslog = m_config->get_scalar<bool>("log_syslog", true); falco_logger::log_syslog = m_config->get_scalar<bool>("log_syslog", true);
} }

View File

@ -172,6 +172,8 @@ class falco_configuration
uint32_t m_notifications_max_burst; uint32_t m_notifications_max_burst;
falco_common::priority_type m_min_priority; falco_common::priority_type m_min_priority;
bool m_buffered_outputs;
private: private:
void init_cmdline_options(std::list<std::string> &cmdline_options); void init_cmdline_options(std::list<std::string> &cmdline_options);

View File

@ -107,6 +107,10 @@ static void usage()
" Can not be specified with -t.\n" " Can not be specified with -t.\n"
" -t <tag> Only run those rules with a tag=<tag>. Can be specified multiple times.\n" " -t <tag> Only run those rules with a tag=<tag>. Can be specified multiple times.\n"
" Can not be specified with -T/-D.\n" " Can not be specified with -T/-D.\n"
" -U,--unbuffered Turn off output buffering to configured outputs. This causes every\n"
" single line emitted by falco to be flushed, which generates higher CPU\n"
" usage but is useful when piping those outputs into another process\n"
" or into a script.\n"
" -v Verbose output.\n" " -v Verbose output.\n"
" --version Print version number.\n" " --version Print version number.\n"
"\n" "\n"
@ -256,6 +260,8 @@ int falco_init(int argc, char **argv)
int file_limit = 0; int file_limit = 0;
unsigned long event_limit = 0L; unsigned long event_limit = 0L;
bool compress = false; bool compress = false;
bool buffered_outputs = true;
bool buffered_cmdline = false;
// Used for stats // Used for stats
uint64_t num_evts; uint64_t num_evts;
@ -272,6 +278,7 @@ int falco_init(int argc, char **argv)
{"option", required_argument, 0, 'o'}, {"option", required_argument, 0, 'o'},
{"print", required_argument, 0, 'p' }, {"print", required_argument, 0, 'p' },
{"pidfile", required_argument, 0, 'P' }, {"pidfile", required_argument, 0, 'P' },
{"unbuffered", no_argument, 0, 'U' },
{"version", no_argument, 0, 0 }, {"version", no_argument, 0, 0 },
{"writefile", required_argument, 0, 'w' }, {"writefile", required_argument, 0, 'w' },
@ -290,7 +297,7 @@ int falco_init(int argc, char **argv)
// Parse the args // Parse the args
// //
while((op = getopt_long(argc, argv, while((op = getopt_long(argc, argv,
"hc:AdD:e:k:K:Ll:m:M:o:P:p:r:s:T:t:vw:", "hc:AdD:e:k:K:Ll:m:M:o:P:p:r:s:T:t:Uvw:",
long_options, &long_index)) != -1) long_options, &long_index)) != -1)
{ {
switch(op) switch(op)
@ -378,6 +385,10 @@ int falco_init(int argc, char **argv)
case 't': case 't':
enabled_rule_tags.insert(optarg); enabled_rule_tags.insert(optarg);
break; break;
case 'U':
buffered_outputs = false;
buffered_cmdline = true;
break;
case 'v': case 'v':
verbose = true; verbose = true;
break; break;
@ -463,6 +474,11 @@ int falco_init(int argc, char **argv)
engine->set_min_priority(config.m_min_priority); engine->set_min_priority(config.m_min_priority);
if(buffered_cmdline)
{
config.m_buffered_outputs = buffered_outputs;
}
for (auto filename : config.m_rules_filenames) for (auto filename : config.m_rules_filenames)
{ {
engine->load_rules_file(filename, verbose, all_events); engine->load_rules_file(filename, verbose, all_events);
@ -503,7 +519,9 @@ int falco_init(int argc, char **argv)
engine->enable_rule_by_tag(enabled_rule_tags, true); engine->enable_rule_by_tag(enabled_rule_tags, true);
} }
outputs->init(config.m_json_output, config.m_notifications_rate, config.m_notifications_max_burst); outputs->init(config.m_json_output,
config.m_notifications_rate, config.m_notifications_max_burst,
config.m_buffered_outputs);
if(!all_events) if(!all_events)
{ {

View File

@ -27,7 +27,8 @@ along with falco. If not, see <http://www.gnu.org/licenses/>.
using namespace std; using namespace std;
falco_outputs::falco_outputs() falco_outputs::falco_outputs()
: m_initialized(false) : m_initialized(false),
m_buffered(true)
{ {
} }
@ -51,7 +52,7 @@ falco_outputs::~falco_outputs()
} }
} }
void falco_outputs::init(bool json_output, uint32_t rate, uint32_t max_burst) void falco_outputs::init(bool json_output, uint32_t rate, uint32_t max_burst, bool buffered)
{ {
// The engine must have been given an inspector by now. // The engine must have been given an inspector by now.
if(! m_inspector) if(! m_inspector)
@ -70,12 +71,14 @@ void falco_outputs::init(bool json_output, uint32_t rate, uint32_t max_burst)
m_notifications_tb.init(rate, max_burst); m_notifications_tb.init(rate, max_burst);
m_buffered = buffered;
m_initialized = true; m_initialized = true;
} }
void falco_outputs::add_output(output_config oc) void falco_outputs::add_output(output_config oc)
{ {
uint8_t nargs = 1; uint8_t nargs = 2;
lua_getglobal(m_ls, m_lua_add_output.c_str()); lua_getglobal(m_ls, m_lua_add_output.c_str());
if(!lua_isfunction(m_ls, -1)) if(!lua_isfunction(m_ls, -1))
@ -83,11 +86,12 @@ void falco_outputs::add_output(output_config oc)
throw falco_exception("No function " + m_lua_add_output + " found. "); throw falco_exception("No function " + m_lua_add_output + " found. ");
} }
lua_pushstring(m_ls, oc.name.c_str()); lua_pushstring(m_ls, oc.name.c_str());
lua_pushnumber(m_ls, (m_buffered ? 1 : 0));
// If we have options, build up a lua table containing them // If we have options, build up a lua table containing them
if (oc.options.size()) if (oc.options.size())
{ {
nargs = 2; nargs = 3;
lua_createtable(m_ls, 0, oc.options.size()); lua_createtable(m_ls, 0, oc.options.size());
for (auto it = oc.options.cbegin(); it != oc.options.cend(); ++it) for (auto it = oc.options.cbegin(); it != oc.options.cend(); ++it)

View File

@ -41,7 +41,7 @@ public:
std::map<std::string, std::string> options; std::map<std::string, std::string> options;
}; };
void init(bool json_output, uint32_t rate, uint32_t max_burst); void init(bool json_output, uint32_t rate, uint32_t max_burst, bool buffered);
void add_output(output_config oc); void add_output(output_config oc);
@ -57,6 +57,8 @@ private:
// Rate limits notifications // Rate limits notifications
token_bucket m_notifications_tb; token_bucket m_notifications_tb;
bool m_buffered;
std::string m_lua_add_output = "add_output"; std::string m_lua_add_output = "add_output";
std::string m_lua_output_event = "output_event"; std::string m_lua_output_event = "output_event";
std::string m_lua_output_cleanup = "output_cleanup"; std::string m_lua_output_cleanup = "output_cleanup";

View File

@ -20,10 +20,17 @@ local mod = {}
local outputs = {} local outputs = {}
function mod.stdout(priority, priority_num, msg) function mod.stdout(priority, priority_num, buffered, msg)
if buffered == 0 then
io.stdout:setvbuf 'no'
end
print (msg) print (msg)
end end
function mod.stdout_cleanup()
io.stdout:flush()
end
function mod.file_validate(options) function mod.file_validate(options)
if (not type(options.filename) == 'string') then if (not type(options.filename) == 'string') then
error("File output needs to be configured with a valid filename") error("File output needs to be configured with a valid filename")
@ -37,10 +44,13 @@ function mod.file_validate(options)
end end
function mod.file(priority, priority_num, msg, options) function mod.file(priority, priority_num, buffered, msg, options)
if options.keep_alive == "true" then if options.keep_alive == "true" then
if file == nil then if file == nil then
file = io.open(options.filename, "a+") file = io.open(options.filename, "a+")
if buffered == 0 then
file:setvbuf 'no'
end
end end
else else
file = io.open(options.filename, "a+") file = io.open(options.filename, "a+")
@ -55,11 +65,22 @@ function mod.file(priority, priority_num, msg, options)
end end
end end
function mod.syslog(priority, priority_num, msg, options) function mod.file_cleanup()
if file ~= nil then
file:flush()
file:close()
file = nil
end
end
function mod.syslog(priority, priority_num, buffered, msg, options)
falco.syslog(priority_num, msg) falco.syslog(priority_num, msg)
end end
function mod.program(priority, priority_num, msg, options) function mod.syslog_cleanup()
end
function mod.program(priority, priority_num, buffered, msg, options)
-- XXX Ideally we'd check that the program ran -- XXX Ideally we'd check that the program ran
-- successfully. However, the luajit we're using returns true even -- successfully. However, the luajit we're using returns true even
-- when the shell can't run the program. -- when the shell can't run the program.
@ -68,6 +89,9 @@ function mod.program(priority, priority_num, msg, options)
if options.keep_alive == "true" then if options.keep_alive == "true" then
if file == nil then if file == nil then
file = io.popen(options.program, "w") file = io.popen(options.program, "w")
if buffered == 0 then
file:setvbuf 'no'
end
end end
else else
file = io.popen(options.program, "w") file = io.popen(options.program, "w")
@ -82,6 +106,14 @@ function mod.program(priority, priority_num, msg, options)
end end
end end
function mod.program_cleanup()
if file ~= nil then
file:flush()
file:close()
file = nil
end
end
function output_event(event, rule, priority, priority_num, format) function output_event(event, rule, priority, priority_num, format)
-- If format starts with a *, remove it, as we're adding our own -- If format starts with a *, remove it, as we're adding our own
-- prefix here. -- prefix here.
@ -94,15 +126,18 @@ function output_event(event, rule, priority, priority_num, format)
msg = formats.format_event(event, rule, priority, format) msg = formats.format_event(event, rule, priority, format)
for index,o in ipairs(outputs) do for index,o in ipairs(outputs) do
o.output(priority, priority_num, msg, o.config) o.output(priority, priority_num, o.buffered, msg, o.config)
end end
end end
function output_cleanup() function output_cleanup()
formats.free_formatters() formats.free_formatters()
for index,o in ipairs(outputs) do
o.cleanup()
end
end end
function add_output(output_name, config) function add_output(output_name, buffered, config)
if not (type(mod[output_name]) == 'function') then if not (type(mod[output_name]) == 'function') then
error("rule_loader.add_output(): invalid output_name: "..output_name) error("rule_loader.add_output(): invalid output_name: "..output_name)
end end
@ -113,7 +148,9 @@ function add_output(output_name, config)
mod[output_name.."_validate"](config) mod[output_name.."_validate"](config)
end end
table.insert(outputs, {output = mod[output_name], config=config}) table.insert(outputs, {output = mod[output_name],
cleanup = mod[output_name.."_cleanup"],
buffered=buffered, config=config})
end end
return mod return mod