fix(userspace/falco): properly terminate threads

Signed-off-by: Jason Dellaluce <jasondellaluce@gmail.com>
This commit is contained in:
Jason Dellaluce 2022-10-06 14:21:03 +00:00 committed by poiana
parent 32ec3240b4
commit 59ba2f9aab

View File

@ -284,6 +284,7 @@ static std::shared_ptr<stats_writer> init_stats_writer(const cmdline_options& op
application::run_result application::process_events() application::run_result application::process_events()
{ {
application::run_result res = run_result::ok(); application::run_result res = run_result::ok();
bool termination_forced = false;
// Notify engine that we finished loading and enabling all rules // Notify engine that we finished loading and enabling all rules
m_state->engine->complete_rule_loading(); m_state->engine->complete_rule_loading();
@ -313,49 +314,62 @@ application::run_result application::process_events()
} }
else else
{ {
typedef struct struct live_context
{ {
live_context() = default;
live_context(live_context&&) = default;
live_context& operator = (live_context&&) = default;
live_context(const live_context&) = default;
live_context& operator = (const live_context&) = default;
// the name of the source of which events are processed // the name of the source of which events are processed
std::string source; std::string source;
// the result of the event processing loop // the result of the event processing loop
application::run_result res; application::run_result res;
// if non-null, the thread on which events are processed // if non-null, the thread on which events are processed
std::unique_ptr<std::thread> thread; std::unique_ptr<std::thread> thread;
} live_context; };
// start event processing for all enabled sources // start event processing for all enabled sources
std::vector<live_context> ctxs; std::vector<live_context> ctxs;
ctxs.reserve(m_state->enabled_sources.size()); ctxs.reserve(m_state->enabled_sources.size());
for (auto source : m_state->enabled_sources) for (const auto& source : m_state->enabled_sources)
{ {
auto src_info = m_state->source_infos.at(source);
auto ctx_idx = ctxs.size();
ctxs.emplace_back(); ctxs.emplace_back();
ctxs[ctx_idx].source = source; auto& ctx = ctxs[ctxs.size() - 1];
ctx.source = source;
auto src_info = m_state->source_infos.at(source);
try try
{ {
falco_logger::log(LOG_DEBUG, "Opening event source '" + source + "'\n"); falco_logger::log(LOG_DEBUG, "Opening event source '" + source + "'\n");
res = open_live_inspector(src_info->inspector, source); res = open_live_inspector(src_info->inspector, source);
if (!res.success) if (!res.success)
{ {
return res; // note: we don't return here because we need to reach
// the thread termination loop below to make sure all
// already-spawned threads get terminated gracefully
break;
} }
if (m_state->enabled_sources.size() == 1) if (m_state->enabled_sources.size() == 1)
{ {
// optimization: with only one source we don't spawn additional threads // optimization: with only one source we don't spawn additional threads
process_inspector_events(src_info->inspector, statsw, source, &ctxs[ctx_idx].res); process_inspector_events(src_info->inspector, statsw, source, &ctx.res);
} }
else else
{ {
ctxs[ctx_idx].thread.reset(new std::thread( ctx.thread.reset(new std::thread(
&application::process_inspector_events, &application::process_inspector_events,
this, src_info->inspector, statsw, source, &ctxs[ctx_idx].res)); this, src_info->inspector, statsw, source, &ctx.res));
} }
} }
catch (std::exception &e) catch (std::exception &e)
{ {
ctxs[ctx_idx].res = run_result::fatal(e.what()); // note: we don't return here because we need to reach
// the thread termination loop below to make sure all
// already-spawned threads get terminated gracefully
ctx.res = run_result::fatal(e.what());
break; break;
} }
} }
@ -365,13 +379,12 @@ application::run_result application::process_events()
// to force all other event streams to termiante too. // to force all other event streams to termiante too.
// We accomulate the errors in a single run_result. // We accomulate the errors in a single run_result.
size_t closed_count = 0; size_t closed_count = 0;
bool forced_termination = false;
while (closed_count < ctxs.size()) while (closed_count < ctxs.size())
{ {
if (!res.success && !forced_termination) if (!res.success && !termination_forced)
{ {
terminate(); terminate();
forced_termination = true; termination_forced = true;
} }
for (auto &ctx : ctxs) for (auto &ctx : ctxs)
{ {