fix(userspace/falco): make multi-source termination condition more stable

Signed-off-by: Jason Dellaluce <jasondellaluce@gmail.com>
This commit is contained in:
Jason Dellaluce 2022-10-10 11:02:15 +00:00 committed by poiana
parent 3f3386cfe0
commit fd4d521a5f
2 changed files with 37 additions and 12 deletions

View File

@ -20,6 +20,7 @@ limitations under the License.
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <fcntl.h> #include <fcntl.h>
#include <atomic>
#include <unordered_map> #include <unordered_map>
#include "falco_utils.h" #include "falco_utils.h"
@ -226,6 +227,7 @@ void application::process_inspector_events(
std::shared_ptr<sinsp> inspector, std::shared_ptr<sinsp> inspector,
std::shared_ptr<stats_writer> statsw, std::shared_ptr<stats_writer> statsw,
std::string source, // an empty source represents capture mode std::string source, // an empty source represents capture mode
std::atomic<bool>* finished,
application::run_result* res) noexcept application::run_result* res) noexcept
{ {
try try
@ -273,6 +275,8 @@ void application::process_inspector_events(
{ {
*res = run_result::fatal(e.what()); *res = run_result::fatal(e.what());
} }
finished->store(true, std::memory_order_seq_cst);
} }
static std::shared_ptr<stats_writer> init_stats_writer(const cmdline_options& opts) static std::shared_ptr<stats_writer> init_stats_writer(const cmdline_options& opts)
@ -310,7 +314,8 @@ application::run_result application::process_events()
return res; return res;
} }
process_inspector_events(m_state->offline_inspector, statsw, "", &res); std::atomic<bool> finished;
process_inspector_events(m_state->offline_inspector, statsw, "", &finished, &res);
m_state->offline_inspector->close(); m_state->offline_inspector->close();
// Honor -M also when using a trace file. // Honor -M also when using a trace file.
@ -333,6 +338,10 @@ application::run_result application::process_events()
// the name of the source of which events are processed // the name of the source of which events are processed
std::string source; std::string source;
// set to true when the event processing loop finishes
std::unique_ptr<std::atomic<bool>> finished;
// set to true when the result has been collected after finishing
std::unique_ptr<std::atomic<bool>> joined;
// the result of the event processing loop // the result of the event processing loop
application::run_result res; application::run_result res;
// if non-null, the thread on which events are processed // if non-null, the thread on which events are processed
@ -349,6 +358,10 @@ application::run_result application::process_events()
ctxs.emplace_back(); ctxs.emplace_back();
auto& ctx = ctxs[ctxs.size() - 1]; auto& ctx = ctxs[ctxs.size() - 1];
ctx.source = source; ctx.source = source;
ctx.finished.reset(new std::atomic<bool>());
ctx.joined.reset(new std::atomic<bool>());
ctx.finished->store(false, std::memory_order_seq_cst);
ctx.joined->store(false, std::memory_order_seq_cst);
auto src_info = m_state->source_infos.at(source); auto src_info = m_state->source_infos.at(source);
try try
@ -366,13 +379,13 @@ application::run_result application::process_events()
if (m_state->enabled_sources.size() == 1) if (m_state->enabled_sources.size() == 1)
{ {
// optimization: with only one source we don't spawn additional threads // optimization: with only one source we don't spawn additional threads
process_inspector_events(src_info->inspector, statsw, source, &ctx.res); process_inspector_events(src_info->inspector, statsw, source, ctx.finished.get(), &ctx.res);
} }
else else
{ {
ctx.thread.reset(new std::thread( ctx.thread.reset(new std::thread(
&application::process_inspector_events, &application::process_inspector_events,
this, src_info->inspector, statsw, source, &ctx.res)); this, src_info->inspector, statsw, source, ctx.finished.get(), &ctx.res));
} }
} }
catch (std::exception &e) catch (std::exception &e)
@ -394,24 +407,35 @@ application::run_result application::process_events()
{ {
if (!res.success && !termination_forced) if (!res.success && !termination_forced)
{ {
falco_logger::log(LOG_INFO, "An error occurred in one event source, forcing termination...\n");
terminate(); terminate();
termination_forced = true; termination_forced = true;
} }
for (auto &ctx : ctxs) for (auto &ctx : ctxs)
{ {
if (ctx.thread) if (ctx.finished->load(std::memory_order_seq_cst)
&& !ctx.joined->load(std::memory_order_seq_cst))
{ {
if (!ctx.thread->joinable()) if (ctx.thread)
{ {
continue; if (!ctx.thread->joinable())
{
// thread has finished executing but
// we already joined it, so we skip to the next one.
// technically, we should never get here because
// ctx.joined should already be true at this point
continue;
}
ctx.thread->join();
} }
ctx.thread->join();
ctx.thread = nullptr; falco_logger::log(LOG_DEBUG, "Closing event source '" + ctx.source + "'\n");
m_state->source_infos.at(ctx.source)->inspector->close();
res = run_result::merge(res, ctx.res);
ctx.joined->store(true, std::memory_order_seq_cst);
closed_count++;
} }
falco_logger::log(LOG_DEBUG, "Closing event source '" + ctx.source + "'\n");
m_state->source_infos.at(ctx.source)->inspector->close();
res = run_result::merge(res, ctx.res);
closed_count++;
} }
} }
} }

View File

@ -304,6 +304,7 @@ private:
std::shared_ptr<sinsp> inspector, std::shared_ptr<sinsp> inspector,
std::shared_ptr<stats_writer> statsw, std::shared_ptr<stats_writer> statsw,
std::string source, // an empty source represents capture mode std::string source, // an empty source represents capture mode
std::atomic<bool>* finished,
run_result* res) noexcept; run_result* res) noexcept;
/* Returns true if we are in capture mode. */ /* Returns true if we are in capture mode. */