fix(userspace/falco): avoid using CPU when main thread waits for parallel event sources

Signed-off-by: Jason Dellaluce <jasondellaluce@gmail.com>
This commit is contained in:
Jason Dellaluce 2022-10-14 10:22:13 +00:00 committed by poiana
parent 3d7677ce5b
commit 10fe9fd84b
2 changed files with 27 additions and 20 deletions

View File

@ -228,7 +228,7 @@ void application::process_inspector_events(
std::shared_ptr<sinsp> inspector, std::shared_ptr<sinsp> inspector,
std::shared_ptr<stats_writer> statsw, std::shared_ptr<stats_writer> statsw,
std::string source, // an empty source represents capture mode std::string source, // an empty source represents capture mode
std::atomic<bool>* finished, application::source_sync_context* sync,
application::run_result* res) noexcept application::run_result* res) noexcept
{ {
try try
@ -277,7 +277,10 @@ void application::process_inspector_events(
*res = run_result::fatal(e.what()); *res = run_result::fatal(e.what());
} }
finished->store(true, std::memory_order_seq_cst); if (sync)
{
sync->finish();
}
} }
static std::shared_ptr<stats_writer> init_stats_writer(const cmdline_options& opts) static std::shared_ptr<stats_writer> init_stats_writer(const cmdline_options& opts)
@ -315,8 +318,7 @@ application::run_result application::process_events()
return res; return res;
} }
std::atomic<bool> finished; process_inspector_events(m_state->offline_inspector, statsw, "", nullptr, &res);
process_inspector_events(m_state->offline_inspector, statsw, "", &finished, &res);
m_state->offline_inspector->close(); m_state->offline_inspector->close();
// Honor -M also when using a trace file. // Honor -M also when using a trace file.
@ -339,19 +341,18 @@ application::run_result application::process_events()
// the name of the source of which events are processed // the name of the source of which events are processed
std::string source; std::string source;
// set to true when the event processing loop finishes
std::unique_ptr<std::atomic<bool>> finished;
// set to true when the result has been collected after finishing
std::unique_ptr<std::atomic<bool>> joined;
// the result of the event processing loop // the result of the event processing loop
application::run_result res; application::run_result res;
// if non-null, the thread on which events are processed // if non-null, the thread on which events are processed
std::unique_ptr<std::thread> thread; std::unique_ptr<std::thread> thread;
// used for thread synchronization purposes
std::unique_ptr<application::source_sync_context> sync;
}; };
print_enabled_event_sources(); print_enabled_event_sources();
// start event processing for all enabled sources // start event processing for all enabled sources
falco::semaphore termination_sem(m_state->enabled_sources.size());
std::vector<live_context> ctxs; std::vector<live_context> ctxs;
ctxs.reserve(m_state->enabled_sources.size()); ctxs.reserve(m_state->enabled_sources.size());
for (const auto& source : m_state->enabled_sources) for (const auto& source : m_state->enabled_sources)
@ -359,35 +360,33 @@ application::run_result application::process_events()
ctxs.emplace_back(); ctxs.emplace_back();
auto& ctx = ctxs[ctxs.size() - 1]; auto& ctx = ctxs[ctxs.size() - 1];
ctx.source = source; ctx.source = source;
ctx.finished.reset(new std::atomic<bool>()); ctx.sync.reset(new application::source_sync_context(termination_sem));
ctx.joined.reset(new std::atomic<bool>());
ctx.finished->store(false, std::memory_order_seq_cst);
ctx.joined->store(false, std::memory_order_seq_cst);
auto src_info = m_state->source_infos.at(source); auto src_info = m_state->source_infos.at(source);
try try
{ {
falco_logger::log(LOG_DEBUG, "Opening event source '" + source + "'\n"); falco_logger::log(LOG_DEBUG, "Opening event source '" + source + "'\n");
termination_sem.acquire();
res = open_live_inspector(src_info->inspector, source); res = open_live_inspector(src_info->inspector, source);
if (!res.success) if (!res.success)
{ {
// note: we don't return here because we need to reach // note: we don't return here because we need to reach
// the thread termination loop below to make sure all // the thread termination loop below to make sure all
// already-spawned threads get terminated gracefully // already-spawned threads get terminated gracefully
ctx.finished->store(true, std::memory_order_seq_cst); ctx.sync->finish();
break; break;
} }
if (m_state->enabled_sources.size() == 1) if (m_state->enabled_sources.size() == 1)
{ {
// optimization: with only one source we don't spawn additional threads // optimization: with only one source we don't spawn additional threads
process_inspector_events(src_info->inspector, statsw, source, ctx.finished.get(), &ctx.res); process_inspector_events(src_info->inspector, statsw, source, ctx.sync.get(), &ctx.res);
} }
else else
{ {
ctx.thread.reset(new std::thread( ctx.thread.reset(new std::thread(
&application::process_inspector_events, &application::process_inspector_events,
this, src_info->inspector, statsw, source, ctx.finished.get(), &ctx.res)); this, src_info->inspector, statsw, source, ctx.sync.get(), &ctx.res));
} }
} }
catch (std::exception &e) catch (std::exception &e)
@ -396,7 +395,7 @@ application::run_result application::process_events()
// the thread termination loop below to make sure all // the thread termination loop below to make sure all
// already-spawned threads get terminated gracefully // already-spawned threads get terminated gracefully
ctx.res = run_result::fatal(e.what()); ctx.res = run_result::fatal(e.what());
ctx.finished->store(true, std::memory_order_seq_cst); ctx.sync->finish();
break; break;
} }
} }
@ -408,16 +407,24 @@ application::run_result application::process_events()
size_t closed_count = 0; size_t closed_count = 0;
while (closed_count < ctxs.size()) while (closed_count < ctxs.size())
{ {
// This is shared across all running event source threads an
// keeps the main thread sleepy until one of the parallel
// threads terminates and invokes release(). At that point,
// we know that at least one thread finished running and we can
// attempt joining it. Not that this also works when only one
// event source is enabled, in which we have no additional threads.
termination_sem.acquire();
if (!res.success && !termination_forced) if (!res.success && !termination_forced)
{ {
falco_logger::log(LOG_INFO, "An error occurred in an event source, forcing termination...\n"); falco_logger::log(LOG_INFO, "An error occurred in an event source, forcing termination...\n");
terminate(false); terminate(false);
termination_forced = true; termination_forced = true;
} }
for (auto &ctx : ctxs) for (auto &ctx : ctxs)
{ {
if (ctx.finished->load(std::memory_order_seq_cst) if (ctx.sync->finished() && !ctx.sync->joined())
&& !ctx.joined->load(std::memory_order_seq_cst))
{ {
if (ctx.thread) if (ctx.thread)
{ {
@ -436,7 +443,7 @@ application::run_result application::process_events()
m_state->source_infos.at(ctx.source)->inspector->close(); m_state->source_infos.at(ctx.source)->inspector->close();
res = run_result::merge(res, ctx.res); res = run_result::merge(res, ctx.res);
ctx.joined->store(true, std::memory_order_seq_cst); ctx.sync->join();
closed_count++; closed_count++;
} }
} }

View File

@ -369,7 +369,7 @@ private:
std::shared_ptr<sinsp> inspector, std::shared_ptr<sinsp> inspector,
std::shared_ptr<stats_writer> statsw, std::shared_ptr<stats_writer> statsw,
std::string source, // an empty source represents capture mode std::string source, // an empty source represents capture mode
std::atomic<bool>* finished, application::source_sync_context* sync,
run_result* res) noexcept; run_result* res) noexcept;
/* Returns true if we are in capture mode. */ /* Returns true if we are in capture mode. */