From f2aba88a6c905c4a27f095fb3531bb4f03086091 Mon Sep 17 00:00:00 2001 From: Jason Dellaluce Date: Thu, 23 Jun 2022 11:08:43 +0000 Subject: [PATCH] refactor(userspace/falco): ensure falco outputs are non-blocking and define exiting condition Signed-off-by: Jason Dellaluce --- userspace/falco/falco_outputs.cpp | 25 +++++++++++++++++-------- userspace/falco/falco_outputs.h | 3 ++- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index 2c69e6a4..99b358c6 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -156,7 +156,7 @@ void falco_outputs::handle_event(gen_event *evt, string &rule, string &source, cmsg.tags.insert(tags.begin(), tags.end()); cmsg.type = ctrl_msg_type::CTRL_MSG_OUTPUT; - m_queue.push(cmsg); + this->push(cmsg); } void falco_outputs::handle_msg(uint64_t ts, @@ -219,17 +219,17 @@ void falco_outputs::handle_msg(uint64_t ts, } cmsg.type = ctrl_msg_type::CTRL_MSG_OUTPUT; - m_queue.push(cmsg); + this->push(cmsg); } void falco_outputs::cleanup_outputs() { - this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_CLEANUP); + this->push_ctrl(falco_outputs::ctrl_msg_type::CTRL_MSG_CLEANUP); } void falco_outputs::reopen_outputs() { - this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_REOPEN); + this->push_ctrl(falco_outputs::ctrl_msg_type::CTRL_MSG_REOPEN); } void falco_outputs::stop_worker() @@ -238,22 +238,31 @@ void falco_outputs::stop_worker() wd.start([&](void *) -> void { falco_logger::log(LOG_NOTICE, "output channels still blocked, discarding all remaining notifications\n"); m_queue.clear(); - this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_STOP); + this->push_ctrl(falco_outputs::ctrl_msg_type::CTRL_MSG_STOP); }); wd.set_timeout(m_timeout, nullptr); - this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_STOP); + this->push_ctrl(falco_outputs::ctrl_msg_type::CTRL_MSG_STOP); if(m_worker_thread.joinable()) { m_worker_thread.join(); } } -inline void falco_outputs::push(ctrl_msg_type cmt) +inline void falco_outputs::push_ctrl(ctrl_msg_type cmt) { falco_outputs::ctrl_msg cmsg = {}; cmsg.type = cmt; - m_queue.push(cmsg); + this->push(cmsg); +} + +inline void falco_outputs::push(const ctrl_msg& cmsg) +{ + if (!m_queue.try_push(cmsg)) + { + fprintf(stderr, "Fatal error: Output queue reached maximum capacity. Exiting.\n"); + exit(EXIT_FAILURE); + } } // todo(leogr,leodido): this function is not supposed to throw exceptions, and with "noexcept", diff --git a/userspace/falco/falco_outputs.h b/userspace/falco/falco_outputs.h index 17ade49f..0da2267b 100644 --- a/userspace/falco/falco_outputs.h +++ b/userspace/falco/falco_outputs.h @@ -110,7 +110,8 @@ private: falco_outputs_cbq m_queue; std::thread m_worker_thread; - inline void push(ctrl_msg_type cmt); + inline void push(const ctrl_msg& cmsg); + inline void push_ctrl(ctrl_msg_type cmt); void worker() noexcept; void stop_worker(); void add_output(falco::outputs::config oc);