From 59ba2f9aab3e77df3cb81e55f78e3c9636abb804 Mon Sep 17 00:00:00 2001 From: Jason Dellaluce Date: Thu, 6 Oct 2022 14:21:03 +0000 Subject: [PATCH] fix(userspace/falco): properly terminate threads Signed-off-by: Jason Dellaluce --- .../falco/app_actions/process_events.cpp | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/userspace/falco/app_actions/process_events.cpp b/userspace/falco/app_actions/process_events.cpp index 22c20687..d2d12703 100644 --- a/userspace/falco/app_actions/process_events.cpp +++ b/userspace/falco/app_actions/process_events.cpp @@ -284,6 +284,7 @@ static std::shared_ptr init_stats_writer(const cmdline_options& op application::run_result application::process_events() { application::run_result res = run_result::ok(); + bool termination_forced = false; // Notify engine that we finished loading and enabling all rules m_state->engine->complete_rule_loading(); @@ -313,49 +314,62 @@ application::run_result application::process_events() } else { - typedef struct + struct live_context { + live_context() = default; + live_context(live_context&&) = default; + live_context& operator = (live_context&&) = default; + live_context(const live_context&) = default; + live_context& operator = (const live_context&) = default; + // the name of the source of which events are processed std::string source; // the result of the event processing loop application::run_result res; // if non-null, the thread on which events are processed std::unique_ptr thread; - } live_context; + }; // start event processing for all enabled sources std::vector ctxs; ctxs.reserve(m_state->enabled_sources.size()); - for (auto source : m_state->enabled_sources) + for (const auto& source : m_state->enabled_sources) { - auto src_info = m_state->source_infos.at(source); - auto ctx_idx = ctxs.size(); ctxs.emplace_back(); - ctxs[ctx_idx].source = source; + auto& ctx = ctxs[ctxs.size() - 1]; + ctx.source = source; + auto src_info = m_state->source_infos.at(source); + try { falco_logger::log(LOG_DEBUG, "Opening event source '" + source + "'\n"); res = open_live_inspector(src_info->inspector, source); if (!res.success) { - return res; + // note: we don't return here because we need to reach + // the thread termination loop below to make sure all + // already-spawned threads get terminated gracefully + break; } if (m_state->enabled_sources.size() == 1) { // optimization: with only one source we don't spawn additional threads - process_inspector_events(src_info->inspector, statsw, source, &ctxs[ctx_idx].res); + process_inspector_events(src_info->inspector, statsw, source, &ctx.res); } else { - ctxs[ctx_idx].thread.reset(new std::thread( + ctx.thread.reset(new std::thread( &application::process_inspector_events, - this, src_info->inspector, statsw, source, &ctxs[ctx_idx].res)); + this, src_info->inspector, statsw, source, &ctx.res)); } } catch (std::exception &e) { - ctxs[ctx_idx].res = run_result::fatal(e.what()); + // note: we don't return here because we need to reach + // the thread termination loop below to make sure all + // already-spawned threads get terminated gracefully + ctx.res = run_result::fatal(e.what()); break; } } @@ -365,13 +379,12 @@ application::run_result application::process_events() // to force all other event streams to termiante too. // We accomulate the errors in a single run_result. size_t closed_count = 0; - bool forced_termination = false; while (closed_count < ctxs.size()) { - if (!res.success && !forced_termination) + if (!res.success && !termination_forced) { terminate(); - forced_termination = true; + termination_forced = true; } for (auto &ctx : ctxs) {