diff --git a/userspace/falco/CMakeLists.txt b/userspace/falco/CMakeLists.txt index adcd6168..6b8d0f9a 100644 --- a/userspace/falco/CMakeLists.txt +++ b/userspace/falco/CMakeLists.txt @@ -20,13 +20,16 @@ if(NOT SYSDIG_DIR) endif() configure_file("${SYSDIG_DIR}/userspace/sysdig/config_sysdig.h.in" config_sysdig.h) + add_custom_command(OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/falco_output.grpc.pb.cc ${CMAKE_CURRENT_BINARY_DIR}/falco_output.grpc.pb.h ${CMAKE_CURRENT_BINARY_DIR}/falco_output.pb.cc ${CMAKE_CURRENT_BINARY_DIR}/falco_output.pb.h + ${CMAKE_CURRENT_BINARY_DIR}/schema.pb.cc + ${CMAKE_CURRENT_BINARY_DIR}/schema.pb.h COMMENT "Generate gRPC code for falco_output" DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/falco_output.proto - COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/falco_output.proto + COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/falco_output.proto ${CMAKE_CURRENT_SOURCE_DIR}/schema.proto COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --grpc_out=. --plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN} ${CMAKE_CURRENT_SOURCE_DIR}/falco_output.proto WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) @@ -44,7 +47,8 @@ add_executable(falco grpc_server.cpp utils.cpp ${CMAKE_CURRENT_BINARY_DIR}/falco_output.grpc.pb.cc - ${CMAKE_CURRENT_BINARY_DIR}/falco_output.pb.cc) + ${CMAKE_CURRENT_BINARY_DIR}/falco_output.pb.cc + ${CMAKE_CURRENT_BINARY_DIR}/schema.pb.cc) target_include_directories(falco PUBLIC "${SYSDIG_DIR}/userspace/sysdig" diff --git a/userspace/falco/falco.cpp b/userspace/falco/falco.cpp index 7b255562..469f51e2 100644 --- a/userspace/falco/falco.cpp +++ b/userspace/falco/falco.cpp @@ -48,7 +48,6 @@ limitations under the License. #include "statsfilewriter.h" #include "webserver.h" #include "grpc_server.h" -#include "falco_output_queue.h" typedef function open_t; diff --git a/userspace/falco/falco_output.proto b/userspace/falco/falco_output.proto index 9bf6732a..44cbf72c 100644 --- a/userspace/falco/falco_output.proto +++ b/userspace/falco/falco_output.proto @@ -1,6 +1,7 @@ syntax = "proto3"; import "google/protobuf/timestamp.proto"; +import "schema.proto"; package falco.output; @@ -9,54 +10,17 @@ service service { } message request { - string duration = 1; // TODO(leodido, fntlnz): not handled yet but keeping for reference. - bool keepalive = 2; -} - -enum priority { - option allow_alias = true; - EMERGENCY = 0; - emergency = 0; - Emergency = 0; - ALERT = 1; - alert = 1; - Alert = 1; - CRITICAL = 2; - critical = 2; - Critical = 2; - ERROR = 3; - error = 3; - Error = 3; - WARNING = 4; - warning = 4; - Warning = 4; - NOTICE = 5; - notice = 5; - Notice = 5; - INFORMATIONAL = 6; - informational = 6; - Informational = 6; - DEBUG = 7; - debug = 7; - Debug = 7; -} - -enum source { - option allow_alias = true; - SYSCALL = 0; - syscall = 0; - Syscall = 0; - K8S_AUDIT = 1; - k8s_audit = 1; - K8s_audit = 1; - K8S_audit = 1; + bool keepalive = 1; + // string duration = 2; // TODO(leodido, fntlnz): not handled yet but keeping for reference. + // repeated string tags = 3; // TODO(leodido, fntlnz): not handled yet but keeping for reference. } message response { google.protobuf.Timestamp time = 1; - priority priority = 2; - source source = 3; + falco.schema.priority priority = 2; + falco.schema.source source = 3; string rule = 4; string output = 5; map output_fields = 6; + repeated string tags = 7; } \ No newline at end of file diff --git a/userspace/falco/falco_output_queue.h b/userspace/falco/falco_output_queue.h index bc41d24e..639ba952 100644 --- a/userspace/falco/falco_output_queue.h +++ b/userspace/falco/falco_output_queue.h @@ -21,16 +21,18 @@ limitations under the License. #include "falco_output.pb.h" #include "tbb/concurrent_queue.h" -using namespace falco::output; +namespace falco +{ +namespace output +{ +typedef tbb::concurrent_queue response_cq; -typedef tbb::concurrent_queue falco_output_response_cq; - -class falco_output_queue +class queue { public: - static falco_output_queue& get() + static queue& get() { - static falco_output_queue instance; + static queue instance; return instance; } @@ -45,14 +47,16 @@ public: } private: - falco_output_queue() + queue() { } - falco_output_response_cq m_queue; + response_cq m_queue; // We can use the better technique of deleting the methods we don't want. public: - falco_output_queue(falco_output_queue const&) = delete; - void operator=(falco_output_queue const&) = delete; + queue(queue const&) = delete; + void operator=(queue const&) = delete; }; +} // namespace output +} // namespace falco diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index d248840b..581ec587 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -329,9 +329,9 @@ int falco_outputs::handle_grpc(lua_State *ls) grpc_res.set_rule((char *)lua_tostring(ls, 2)); // source - source s = source::SYSCALL; + falco::schema::source s = falco::schema::source::SYSCALL; string sstr = (char *)lua_tostring(ls, 3); - if(!source_Parse(sstr, &s)) + if(!falco::schema::source_Parse(sstr, &s)) { lua_pushstring(ls, "Unknown source passed to to handle_grpc()"); lua_error(ls); @@ -339,9 +339,9 @@ int falco_outputs::handle_grpc(lua_State *ls) grpc_res.set_source(s); // priority - priority p = priority::EMERGENCY; + falco::schema::priority p = falco::schema::priority::EMERGENCY; string pstr = (char *)lua_tostring(ls, 4); - if(!priority_Parse(pstr, &p)) + if(!falco::schema::priority_Parse(pstr, &p)) { lua_pushstring(ls, "Unknown priority passed to to handle_grpc()"); lua_error(ls); @@ -361,7 +361,7 @@ int falco_outputs::handle_grpc(lua_State *ls) } lua_pop(ls, 1); // pop table - falco_output_queue::get().push(grpc_res); + falco::output::queue::get().push(grpc_res); return 1; } \ No newline at end of file diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index a8e6b8be..a0a448ce 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -33,7 +33,7 @@ void falco::grpc::request_stream_context(srvctx)); + m_res_writer.reset(new ::grpc::ServerAsyncWriter(srvctx)); m_stream_ctx.reset(); m_req.Clear(); auto cq = srv->m_completion_queue.get(); @@ -51,7 +51,7 @@ void falco::grpc::request_stream_context*m_process_func)(*m_stream_ctx, m_req, res); // When there still are more responses to stream @@ -77,7 +77,7 @@ void falco::grpc::request_stream_contextm_status = errored ? stream_context::ERROR : stream_context::SUCCESS; // Complete the processing - response res; + output::response res; (srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() } @@ -136,13 +136,13 @@ void falco::grpc::server::thread_process(int thread_index) // // Create array of contexts and start processing streaming RPC request. // -#define PROCESS_STREAM(REQ, RESP, RPC, IMPL, CONTEXT_COUNT) \ - std::vector> RPC##_contexts(CONTEXT_COUNT); \ - for(request_stream_context & ctx : RPC##_contexts) \ - { \ - ctx.m_process_func = &server::IMPL; \ - ctx.m_request_func = &service::AsyncService::Request##RPC; \ - ctx.start(this); \ +#define REGISTER_STREAM(REQ, RESP, RPC, IMPL, CONTEXT_NUM) \ + std::vector> RPC##_contexts(CONTEXT_NUM); \ + for(request_stream_context & ctx : RPC##_contexts) \ + { \ + ctx.m_process_func = &server::IMPL; \ + ctx.m_request_func = &output::service::AsyncService::Request##RPC; \ + ctx.start(this); \ } void falco::grpc::server::init(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs) @@ -170,25 +170,19 @@ void falco::grpc::server::run() ssl_opts.pem_root_certs = root_certs; ssl_opts.pem_key_cert_pairs.push_back(cert_pair); - // Setup server ::grpc::ServerBuilder builder; - // Listen on the given address without any authentication mechanism. builder.AddListeningPort(m_server_addr, ::grpc::SslServerCredentials(ssl_opts)); builder.RegisterService(&m_svc); - // builder.SetMaxSendMessageSize(GRPC_MAX_MESSAGE_SIZE); // testing max message size? - // builder.SetMaxReceiveMessageSize(GRPC_MAX_MESSAGE_SIZE); // testing max message size? - m_completion_queue = builder.AddCompletionQueue(); m_server = builder.BuildAndStart(); falco_logger::log(LOG_INFO, "Starting gRPC server at " + m_server_addr + "\n"); - // Create context for server threads // The number of contexts is multiple of the number of threads // This defines the number of simultaneous completion queue requests of the same type (service::AsyncService::Request##RPC) // For this approach to be sufficient server::IMPL have to be fast - int context_count = m_threadiness * 10; - PROCESS_STREAM(request, response, subscribe, subscribe, context_count) + int context_num = m_threadiness * 10; + REGISTER_STREAM(output::request, output::response, subscribe, subscribe, context_num) m_threads.resize(m_threadiness); int thread_idx = 0; diff --git a/userspace/falco/grpc_server.h b/userspace/falco/grpc_server.h index 461a77a0..5d2c2d52 100644 --- a/userspace/falco/grpc_server.h +++ b/userspace/falco/grpc_server.h @@ -48,7 +48,7 @@ public: void run(); void stop(); - service::AsyncService m_svc; + output::service::AsyncService m_svc; std::unique_ptr<::grpc::ServerCompletionQueue> m_completion_queue; private: @@ -97,7 +97,7 @@ public: void (server::*m_process_func)(const stream_context&, const Request&, Response&); // Pointer to function that requests the system to start processing given requests - void (service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*); + void (output::service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*); void start(server* srv); void process(server* srv); diff --git a/userspace/falco/grpc_server_impl.cpp b/userspace/falco/grpc_server_impl.cpp index 5b74dbdd..f423b5af 100644 --- a/userspace/falco/grpc_server_impl.cpp +++ b/userspace/falco/grpc_server_impl.cpp @@ -17,6 +17,7 @@ limitations under the License. */ #include "grpc_server_impl.h" +#include "falco_output_queue.h" bool falco::grpc::server_impl::is_running() { @@ -39,12 +40,12 @@ void falco::grpc::server_impl::subscribe(const stream_context& ctx, const output { // Start (or continue) streaming // ctx.m_status == stream_context::STREAMING - if(falco_output_queue::get().try_pop(res) && !req.keepalive()) + if(output::queue::get().try_pop(res) && !req.keepalive()) { ctx.m_has_more = true; return; } - while(is_running() && !falco_output_queue::get().try_pop(res) && req.keepalive()) + while(is_running() && !output::queue::get().try_pop(res) && req.keepalive()) { } diff --git a/userspace/falco/grpc_server_impl.h b/userspace/falco/grpc_server_impl.h index d1ff4fe0..a430d976 100644 --- a/userspace/falco/grpc_server_impl.h +++ b/userspace/falco/grpc_server_impl.h @@ -19,8 +19,6 @@ limitations under the License. #pragma once #include - -#include "falco_output_queue.h" #include "falco_output.grpc.pb.h" #include "grpc_context.h" diff --git a/userspace/falco/schema.proto b/userspace/falco/schema.proto new file mode 100644 index 00000000..262d096c --- /dev/null +++ b/userspace/falco/schema.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; + +package falco.schema; + +enum priority { + option allow_alias = true; + EMERGENCY = 0; + emergency = 0; + Emergency = 0; + ALERT = 1; + alert = 1; + Alert = 1; + CRITICAL = 2; + critical = 2; + Critical = 2; + ERROR = 3; + error = 3; + Error = 3; + WARNING = 4; + warning = 4; + Warning = 4; + NOTICE = 5; + notice = 5; + Notice = 5; + INFORMATIONAL = 6; + informational = 6; + Informational = 6; + DEBUG = 7; + debug = 7; + Debug = 7; +} + +enum source { + option allow_alias = true; + SYSCALL = 0; + syscall = 0; + Syscall = 0; + K8S_AUDIT = 1; + k8s_audit = 1; + K8s_audit = 1; + K8S_audit = 1; +} \ No newline at end of file