diff --git a/userspace/falco/app_actions/process_events.cpp b/userspace/falco/app_actions/process_events.cpp index 214f8bfc..49a4fc41 100644 --- a/userspace/falco/app_actions/process_events.cpp +++ b/userspace/falco/app_actions/process_events.cpp @@ -20,6 +20,7 @@ limitations under the License. #include #include #include +#include #include #include "falco_utils.h" @@ -226,6 +227,7 @@ void application::process_inspector_events( std::shared_ptr inspector, std::shared_ptr statsw, std::string source, // an empty source represents capture mode + std::atomic* finished, application::run_result* res) noexcept { try @@ -273,6 +275,8 @@ void application::process_inspector_events( { *res = run_result::fatal(e.what()); } + + finished->store(true, std::memory_order_seq_cst); } static std::shared_ptr init_stats_writer(const cmdline_options& opts) @@ -310,7 +314,8 @@ application::run_result application::process_events() return res; } - process_inspector_events(m_state->offline_inspector, statsw, "", &res); + std::atomic finished; + process_inspector_events(m_state->offline_inspector, statsw, "", &finished, &res); m_state->offline_inspector->close(); // Honor -M also when using a trace file. @@ -333,6 +338,10 @@ application::run_result application::process_events() // the name of the source of which events are processed std::string source; + // set to true when the event processing loop finishes + std::unique_ptr> finished; + // set to true when the result has been collected after finishing + std::unique_ptr> joined; // the result of the event processing loop application::run_result res; // if non-null, the thread on which events are processed @@ -349,6 +358,10 @@ application::run_result application::process_events() ctxs.emplace_back(); auto& ctx = ctxs[ctxs.size() - 1]; ctx.source = source; + ctx.finished.reset(new std::atomic()); + ctx.joined.reset(new std::atomic()); + ctx.finished->store(false, std::memory_order_seq_cst); + ctx.joined->store(false, std::memory_order_seq_cst); auto src_info = m_state->source_infos.at(source); try @@ -366,13 +379,13 @@ application::run_result application::process_events() if (m_state->enabled_sources.size() == 1) { // optimization: with only one source we don't spawn additional threads - process_inspector_events(src_info->inspector, statsw, source, &ctx.res); + process_inspector_events(src_info->inspector, statsw, source, ctx.finished.get(), &ctx.res); } else { ctx.thread.reset(new std::thread( &application::process_inspector_events, - this, src_info->inspector, statsw, source, &ctx.res)); + this, src_info->inspector, statsw, source, ctx.finished.get(), &ctx.res)); } } catch (std::exception &e) @@ -394,24 +407,35 @@ application::run_result application::process_events() { if (!res.success && !termination_forced) { + falco_logger::log(LOG_INFO, "An error occurred in one event source, forcing termination...\n"); terminate(); termination_forced = true; } for (auto &ctx : ctxs) { - if (ctx.thread) + if (ctx.finished->load(std::memory_order_seq_cst) + && !ctx.joined->load(std::memory_order_seq_cst)) { - if (!ctx.thread->joinable()) + if (ctx.thread) { - continue; + if (!ctx.thread->joinable()) + { + // thread has finished executing but + // we already joined it, so we skip to the next one. + // technically, we should never get here because + // ctx.joined should already be true at this point + continue; + } + ctx.thread->join(); } - ctx.thread->join(); - ctx.thread = nullptr; + + falco_logger::log(LOG_DEBUG, "Closing event source '" + ctx.source + "'\n"); + m_state->source_infos.at(ctx.source)->inspector->close(); + + res = run_result::merge(res, ctx.res); + ctx.joined->store(true, std::memory_order_seq_cst); + closed_count++; } - falco_logger::log(LOG_DEBUG, "Closing event source '" + ctx.source + "'\n"); - m_state->source_infos.at(ctx.source)->inspector->close(); - res = run_result::merge(res, ctx.res); - closed_count++; } } } diff --git a/userspace/falco/application.h b/userspace/falco/application.h index 9b123fa4..767f3257 100644 --- a/userspace/falco/application.h +++ b/userspace/falco/application.h @@ -304,6 +304,7 @@ private: std::shared_ptr inspector, std::shared_ptr statsw, std::string source, // an empty source represents capture mode + std::atomic* finished, run_result* res) noexcept; /* Returns true if we are in capture mode. */