diff --git a/userspace/falco/app_actions/process_events.cpp b/userspace/falco/app_actions/process_events.cpp index 680f741f..01f1f76d 100644 --- a/userspace/falco/app_actions/process_events.cpp +++ b/userspace/falco/app_actions/process_events.cpp @@ -228,7 +228,7 @@ void application::process_inspector_events( std::shared_ptr inspector, std::shared_ptr statsw, std::string source, // an empty source represents capture mode - std::atomic* finished, + application::source_sync_context* sync, application::run_result* res) noexcept { try @@ -277,7 +277,10 @@ void application::process_inspector_events( *res = run_result::fatal(e.what()); } - finished->store(true, std::memory_order_seq_cst); + if (sync) + { + sync->finish(); + } } static std::shared_ptr init_stats_writer(const cmdline_options& opts) @@ -315,8 +318,7 @@ application::run_result application::process_events() return res; } - std::atomic finished; - process_inspector_events(m_state->offline_inspector, statsw, "", &finished, &res); + process_inspector_events(m_state->offline_inspector, statsw, "", nullptr, &res); m_state->offline_inspector->close(); // Honor -M also when using a trace file. @@ -339,19 +341,18 @@ application::run_result application::process_events() // the name of the source of which events are processed std::string source; - // set to true when the event processing loop finishes - std::unique_ptr> finished; - // set to true when the result has been collected after finishing - std::unique_ptr> joined; // the result of the event processing loop application::run_result res; // if non-null, the thread on which events are processed std::unique_ptr thread; + // used for thread synchronization purposes + std::unique_ptr sync; }; print_enabled_event_sources(); // start event processing for all enabled sources + falco::semaphore termination_sem(m_state->enabled_sources.size()); std::vector ctxs; ctxs.reserve(m_state->enabled_sources.size()); for (const auto& source : m_state->enabled_sources) @@ -359,35 +360,33 @@ application::run_result application::process_events() ctxs.emplace_back(); auto& ctx = ctxs[ctxs.size() - 1]; ctx.source = source; - ctx.finished.reset(new std::atomic()); - ctx.joined.reset(new std::atomic()); - ctx.finished->store(false, std::memory_order_seq_cst); - ctx.joined->store(false, std::memory_order_seq_cst); + ctx.sync.reset(new application::source_sync_context(termination_sem)); auto src_info = m_state->source_infos.at(source); try { falco_logger::log(LOG_DEBUG, "Opening event source '" + source + "'\n"); + termination_sem.acquire(); res = open_live_inspector(src_info->inspector, source); if (!res.success) { // note: we don't return here because we need to reach // the thread termination loop below to make sure all // already-spawned threads get terminated gracefully - ctx.finished->store(true, std::memory_order_seq_cst); + ctx.sync->finish(); break; } if (m_state->enabled_sources.size() == 1) { // optimization: with only one source we don't spawn additional threads - process_inspector_events(src_info->inspector, statsw, source, ctx.finished.get(), &ctx.res); + process_inspector_events(src_info->inspector, statsw, source, ctx.sync.get(), &ctx.res); } else { ctx.thread.reset(new std::thread( &application::process_inspector_events, - this, src_info->inspector, statsw, source, ctx.finished.get(), &ctx.res)); + this, src_info->inspector, statsw, source, ctx.sync.get(), &ctx.res)); } } catch (std::exception &e) @@ -396,7 +395,7 @@ application::run_result application::process_events() // the thread termination loop below to make sure all // already-spawned threads get terminated gracefully ctx.res = run_result::fatal(e.what()); - ctx.finished->store(true, std::memory_order_seq_cst); + ctx.sync->finish(); break; } } @@ -408,16 +407,24 @@ application::run_result application::process_events() size_t closed_count = 0; while (closed_count < ctxs.size()) { + // This is shared across all running event source threads an + // keeps the main thread sleepy until one of the parallel + // threads terminates and invokes release(). At that point, + // we know that at least one thread finished running and we can + // attempt joining it. Not that this also works when only one + // event source is enabled, in which we have no additional threads. + termination_sem.acquire(); + if (!res.success && !termination_forced) { falco_logger::log(LOG_INFO, "An error occurred in an event source, forcing termination...\n"); terminate(false); termination_forced = true; } + for (auto &ctx : ctxs) { - if (ctx.finished->load(std::memory_order_seq_cst) - && !ctx.joined->load(std::memory_order_seq_cst)) + if (ctx.sync->finished() && !ctx.sync->joined()) { if (ctx.thread) { @@ -436,7 +443,7 @@ application::run_result application::process_events() m_state->source_infos.at(ctx.source)->inspector->close(); res = run_result::merge(res, ctx.res); - ctx.joined->store(true, std::memory_order_seq_cst); + ctx.sync->join(); closed_count++; } } diff --git a/userspace/falco/application.h b/userspace/falco/application.h index d41c4d8a..efd368d2 100644 --- a/userspace/falco/application.h +++ b/userspace/falco/application.h @@ -369,7 +369,7 @@ private: std::shared_ptr inspector, std::shared_ptr statsw, std::string source, // an empty source represents capture mode - std::atomic* finished, + application::source_sync_context* sync, run_result* res) noexcept; /* Returns true if we are in capture mode. */