refactor(userspace/falco): ensure falco outputs are non-blocking and define exiting condition

Signed-off-by: Jason Dellaluce <jasondellaluce@gmail.com>
This commit is contained in:
Jason Dellaluce 2022-06-23 11:08:43 +00:00 committed by poiana
parent bc765f1b7d
commit f2aba88a6c
2 changed files with 19 additions and 9 deletions

View File

@ -156,7 +156,7 @@ void falco_outputs::handle_event(gen_event *evt, string &rule, string &source,
cmsg.tags.insert(tags.begin(), tags.end()); cmsg.tags.insert(tags.begin(), tags.end());
cmsg.type = ctrl_msg_type::CTRL_MSG_OUTPUT; cmsg.type = ctrl_msg_type::CTRL_MSG_OUTPUT;
m_queue.push(cmsg); this->push(cmsg);
} }
void falco_outputs::handle_msg(uint64_t ts, void falco_outputs::handle_msg(uint64_t ts,
@ -219,17 +219,17 @@ void falco_outputs::handle_msg(uint64_t ts,
} }
cmsg.type = ctrl_msg_type::CTRL_MSG_OUTPUT; cmsg.type = ctrl_msg_type::CTRL_MSG_OUTPUT;
m_queue.push(cmsg); this->push(cmsg);
} }
void falco_outputs::cleanup_outputs() void falco_outputs::cleanup_outputs()
{ {
this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_CLEANUP); this->push_ctrl(falco_outputs::ctrl_msg_type::CTRL_MSG_CLEANUP);
} }
void falco_outputs::reopen_outputs() void falco_outputs::reopen_outputs()
{ {
this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_REOPEN); this->push_ctrl(falco_outputs::ctrl_msg_type::CTRL_MSG_REOPEN);
} }
void falco_outputs::stop_worker() void falco_outputs::stop_worker()
@ -238,22 +238,31 @@ void falco_outputs::stop_worker()
wd.start([&](void *) -> void { wd.start([&](void *) -> void {
falco_logger::log(LOG_NOTICE, "output channels still blocked, discarding all remaining notifications\n"); falco_logger::log(LOG_NOTICE, "output channels still blocked, discarding all remaining notifications\n");
m_queue.clear(); m_queue.clear();
this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_STOP); this->push_ctrl(falco_outputs::ctrl_msg_type::CTRL_MSG_STOP);
}); });
wd.set_timeout(m_timeout, nullptr); wd.set_timeout(m_timeout, nullptr);
this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_STOP); this->push_ctrl(falco_outputs::ctrl_msg_type::CTRL_MSG_STOP);
if(m_worker_thread.joinable()) if(m_worker_thread.joinable())
{ {
m_worker_thread.join(); m_worker_thread.join();
} }
} }
inline void falco_outputs::push(ctrl_msg_type cmt) inline void falco_outputs::push_ctrl(ctrl_msg_type cmt)
{ {
falco_outputs::ctrl_msg cmsg = {}; falco_outputs::ctrl_msg cmsg = {};
cmsg.type = cmt; cmsg.type = cmt;
m_queue.push(cmsg); this->push(cmsg);
}
inline void falco_outputs::push(const ctrl_msg& cmsg)
{
if (!m_queue.try_push(cmsg))
{
fprintf(stderr, "Fatal error: Output queue reached maximum capacity. Exiting.\n");
exit(EXIT_FAILURE);
}
} }
// todo(leogr,leodido): this function is not supposed to throw exceptions, and with "noexcept", // todo(leogr,leodido): this function is not supposed to throw exceptions, and with "noexcept",

View File

@ -110,7 +110,8 @@ private:
falco_outputs_cbq m_queue; falco_outputs_cbq m_queue;
std::thread m_worker_thread; std::thread m_worker_thread;
inline void push(ctrl_msg_type cmt); inline void push(const ctrl_msg& cmsg);
inline void push_ctrl(ctrl_msg_type cmt);
void worker() noexcept; void worker() noexcept;
void stop_worker(); void stop_worker();
void add_output(falco::outputs::config oc); void add_output(falco::outputs::config oc);