diff --git a/userspace/falco/application.h b/userspace/falco/application.h index b1978bcf..d41c4d8a 100644 --- a/userspace/falco/application.h +++ b/userspace/falco/application.h @@ -16,6 +16,7 @@ limitations under the License. #pragma once +#include "semaphore.h" #include "configuration.h" #include "stats_writer.h" #ifndef MINIMAL_BUILD @@ -198,6 +199,67 @@ private: bool proceed; }; + // used to synchronize different event source running in parallel + class source_sync_context + { + public: + source_sync_context(falco::semaphore& s) + : m_finished(false), m_joined(false), m_semaphore(s) { } + source_sync_context(source_sync_context&&) = default; + source_sync_context& operator = (source_sync_context&&) = default; + source_sync_context(const source_sync_context&) = delete; + source_sync_context& operator = (const source_sync_context&) = delete; + + inline void finish() + { + bool v = false; + while (!m_finished.compare_exchange_weak( + v, true, + std::memory_order_seq_cst, + std::memory_order_seq_cst)) + { + if (v) + { + throw falco_exception("source_sync_context has been finished twice"); + } + } + m_semaphore.release(); + } + + inline void join() + { + bool v = false; + while (!m_joined.compare_exchange_weak( + v, true, + std::memory_order_seq_cst, + std::memory_order_seq_cst)) + { + if (v) + { + throw falco_exception("source_sync_context has been joined twice"); + } + } + } + + inline bool joined() + { + return m_joined.load(std::memory_order_seq_cst); + } + + inline bool finished() + { + return m_finished.load(std::memory_order_seq_cst); + } + + private: + // set to true when the event processing loop finishes + std::atomic m_finished; + // set to true when the result has been collected after finishing + std::atomic m_joined; + // used to notify the waiting thread when finished gets set to true + falco::semaphore& m_semaphore; + }; + // Convenience method. Read a sequence of filenames and fill // in a vector of rules contents. // Also fill in the provided rules_contents_t with a mapping from