chore(userspace/falco): better organization of schema and grpc server

Co-Authored-By: Leonardo Di Donato <leodidonato@gmail.com>
Signed-off-by: Lorenzo Fontana <lo@linux.com>
This commit is contained in:
Lorenzo Fontana
2019-09-24 17:31:22 +02:00
committed by Leo Di Donato
parent 6cf2ccf857
commit eb8248fe04
10 changed files with 91 additions and 85 deletions

View File

@@ -20,13 +20,16 @@ if(NOT SYSDIG_DIR)
endif()
configure_file("${SYSDIG_DIR}/userspace/sysdig/config_sysdig.h.in" config_sysdig.h)
add_custom_command(OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/falco_output.grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/falco_output.grpc.pb.h
${CMAKE_CURRENT_BINARY_DIR}/falco_output.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/falco_output.pb.h
${CMAKE_CURRENT_BINARY_DIR}/schema.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/schema.pb.h
COMMENT "Generate gRPC code for falco_output"
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/falco_output.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/falco_output.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/falco_output.proto ${CMAKE_CURRENT_SOURCE_DIR}/schema.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --grpc_out=. --plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN} ${CMAKE_CURRENT_SOURCE_DIR}/falco_output.proto
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
@@ -44,7 +47,8 @@ add_executable(falco
grpc_server.cpp
utils.cpp
${CMAKE_CURRENT_BINARY_DIR}/falco_output.grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/falco_output.pb.cc)
${CMAKE_CURRENT_BINARY_DIR}/falco_output.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/schema.pb.cc)
target_include_directories(falco PUBLIC
"${SYSDIG_DIR}/userspace/sysdig"

View File

@@ -48,7 +48,6 @@ limitations under the License.
#include "statsfilewriter.h"
#include "webserver.h"
#include "grpc_server.h"
#include "falco_output_queue.h"
typedef function<void(sinsp* inspector)> open_t;

View File

@@ -1,6 +1,7 @@
syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "schema.proto";
package falco.output;
@@ -9,54 +10,17 @@ service service {
}
message request {
string duration = 1; // TODO(leodido, fntlnz): not handled yet but keeping for reference.
bool keepalive = 2;
}
enum priority {
option allow_alias = true;
EMERGENCY = 0;
emergency = 0;
Emergency = 0;
ALERT = 1;
alert = 1;
Alert = 1;
CRITICAL = 2;
critical = 2;
Critical = 2;
ERROR = 3;
error = 3;
Error = 3;
WARNING = 4;
warning = 4;
Warning = 4;
NOTICE = 5;
notice = 5;
Notice = 5;
INFORMATIONAL = 6;
informational = 6;
Informational = 6;
DEBUG = 7;
debug = 7;
Debug = 7;
}
enum source {
option allow_alias = true;
SYSCALL = 0;
syscall = 0;
Syscall = 0;
K8S_AUDIT = 1;
k8s_audit = 1;
K8s_audit = 1;
K8S_audit = 1;
bool keepalive = 1;
// string duration = 2; // TODO(leodido, fntlnz): not handled yet but keeping for reference.
// repeated string tags = 3; // TODO(leodido, fntlnz): not handled yet but keeping for reference.
}
message response {
google.protobuf.Timestamp time = 1;
priority priority = 2;
source source = 3;
falco.schema.priority priority = 2;
falco.schema.source source = 3;
string rule = 4;
string output = 5;
map<string, string> output_fields = 6;
repeated string tags = 7;
}

View File

@@ -21,16 +21,18 @@ limitations under the License.
#include "falco_output.pb.h"
#include "tbb/concurrent_queue.h"
using namespace falco::output;
namespace falco
{
namespace output
{
typedef tbb::concurrent_queue<response> response_cq;
typedef tbb::concurrent_queue<response> falco_output_response_cq;
class falco_output_queue
class queue
{
public:
static falco_output_queue& get()
static queue& get()
{
static falco_output_queue instance;
static queue instance;
return instance;
}
@@ -45,14 +47,16 @@ public:
}
private:
falco_output_queue()
queue()
{
}
falco_output_response_cq m_queue;
response_cq m_queue;
// We can use the better technique of deleting the methods we don't want.
public:
falco_output_queue(falco_output_queue const&) = delete;
void operator=(falco_output_queue const&) = delete;
queue(queue const&) = delete;
void operator=(queue const&) = delete;
};
} // namespace output
} // namespace falco

View File

@@ -329,9 +329,9 @@ int falco_outputs::handle_grpc(lua_State *ls)
grpc_res.set_rule((char *)lua_tostring(ls, 2));
// source
source s = source::SYSCALL;
falco::schema::source s = falco::schema::source::SYSCALL;
string sstr = (char *)lua_tostring(ls, 3);
if(!source_Parse(sstr, &s))
if(!falco::schema::source_Parse(sstr, &s))
{
lua_pushstring(ls, "Unknown source passed to to handle_grpc()");
lua_error(ls);
@@ -339,9 +339,9 @@ int falco_outputs::handle_grpc(lua_State *ls)
grpc_res.set_source(s);
// priority
priority p = priority::EMERGENCY;
falco::schema::priority p = falco::schema::priority::EMERGENCY;
string pstr = (char *)lua_tostring(ls, 4);
if(!priority_Parse(pstr, &p))
if(!falco::schema::priority_Parse(pstr, &p))
{
lua_pushstring(ls, "Unknown priority passed to to handle_grpc()");
lua_error(ls);
@@ -361,7 +361,7 @@ int falco_outputs::handle_grpc(lua_State *ls)
}
lua_pop(ls, 1); // pop table
falco_output_queue::get().push(grpc_res);
falco::output::queue::get().push(grpc_res);
return 1;
}

View File

@@ -33,7 +33,7 @@ void falco::grpc::request_stream_context<falco::output::request, falco::output::
m_state = request_context_base::REQUEST;
m_srv_ctx.reset(new ::grpc::ServerContext);
auto srvctx = m_srv_ctx.get();
m_res_writer.reset(new ::grpc::ServerAsyncWriter<response>(srvctx));
m_res_writer.reset(new ::grpc::ServerAsyncWriter<output::response>(srvctx));
m_stream_ctx.reset();
m_req.Clear();
auto cq = srv->m_completion_queue.get();
@@ -51,7 +51,7 @@ void falco::grpc::request_stream_context<falco::output::request, falco::output::
}
// Processing
response res;
output::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res);
// When there still are more responses to stream
@@ -77,7 +77,7 @@ void falco::grpc::request_stream_context<falco::output::request, falco::output::
m_stream_ctx->m_status = errored ? stream_context::ERROR : stream_context::SUCCESS;
// Complete the processing
response res;
output::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe()
}
@@ -136,13 +136,13 @@ void falco::grpc::server::thread_process(int thread_index)
//
// Create array of contexts and start processing streaming RPC request.
//
#define PROCESS_STREAM(REQ, RESP, RPC, IMPL, CONTEXT_COUNT) \
std::vector<request_stream_context<REQ, RESP>> RPC##_contexts(CONTEXT_COUNT); \
for(request_stream_context<REQ, RESP> & ctx : RPC##_contexts) \
{ \
ctx.m_process_func = &server::IMPL; \
ctx.m_request_func = &service::AsyncService::Request##RPC; \
ctx.start(this); \
#define REGISTER_STREAM(REQ, RESP, RPC, IMPL, CONTEXT_NUM) \
std::vector<request_stream_context<REQ, RESP>> RPC##_contexts(CONTEXT_NUM); \
for(request_stream_context<REQ, RESP> & ctx : RPC##_contexts) \
{ \
ctx.m_process_func = &server::IMPL; \
ctx.m_request_func = &output::service::AsyncService::Request##RPC; \
ctx.start(this); \
}
void falco::grpc::server::init(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs)
@@ -170,25 +170,19 @@ void falco::grpc::server::run()
ssl_opts.pem_root_certs = root_certs;
ssl_opts.pem_key_cert_pairs.push_back(cert_pair);
// Setup server
::grpc::ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(m_server_addr, ::grpc::SslServerCredentials(ssl_opts));
builder.RegisterService(&m_svc);
// builder.SetMaxSendMessageSize(GRPC_MAX_MESSAGE_SIZE); // testing max message size?
// builder.SetMaxReceiveMessageSize(GRPC_MAX_MESSAGE_SIZE); // testing max message size?
m_completion_queue = builder.AddCompletionQueue();
m_server = builder.BuildAndStart();
falco_logger::log(LOG_INFO, "Starting gRPC server at " + m_server_addr + "\n");
// Create context for server threads
// The number of contexts is multiple of the number of threads
// This defines the number of simultaneous completion queue requests of the same type (service::AsyncService::Request##RPC)
// For this approach to be sufficient server::IMPL have to be fast
int context_count = m_threadiness * 10;
PROCESS_STREAM(request, response, subscribe, subscribe, context_count)
int context_num = m_threadiness * 10;
REGISTER_STREAM(output::request, output::response, subscribe, subscribe, context_num)
m_threads.resize(m_threadiness);
int thread_idx = 0;

View File

@@ -48,7 +48,7 @@ public:
void run();
void stop();
service::AsyncService m_svc;
output::service::AsyncService m_svc;
std::unique_ptr<::grpc::ServerCompletionQueue> m_completion_queue;
private:
@@ -97,7 +97,7 @@ public:
void (server::*m_process_func)(const stream_context&, const Request&, Response&);
// Pointer to function that requests the system to start processing given requests
void (service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter<Response>*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*);
void (output::service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter<Response>*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*);
void start(server* srv);
void process(server* srv);

View File

@@ -17,6 +17,7 @@ limitations under the License.
*/
#include "grpc_server_impl.h"
#include "falco_output_queue.h"
bool falco::grpc::server_impl::is_running()
{
@@ -39,12 +40,12 @@ void falco::grpc::server_impl::subscribe(const stream_context& ctx, const output
{
// Start (or continue) streaming
// ctx.m_status == stream_context::STREAMING
if(falco_output_queue::get().try_pop(res) && !req.keepalive())
if(output::queue::get().try_pop(res) && !req.keepalive())
{
ctx.m_has_more = true;
return;
}
while(is_running() && !falco_output_queue::get().try_pop(res) && req.keepalive())
while(is_running() && !output::queue::get().try_pop(res) && req.keepalive())
{
}

View File

@@ -19,8 +19,6 @@ limitations under the License.
#pragma once
#include <atomic>
#include "falco_output_queue.h"
#include "falco_output.grpc.pb.h"
#include "grpc_context.h"

View File

@@ -0,0 +1,42 @@
syntax = "proto3";
package falco.schema;
enum priority {
option allow_alias = true;
EMERGENCY = 0;
emergency = 0;
Emergency = 0;
ALERT = 1;
alert = 1;
Alert = 1;
CRITICAL = 2;
critical = 2;
Critical = 2;
ERROR = 3;
error = 3;
Error = 3;
WARNING = 4;
warning = 4;
Warning = 4;
NOTICE = 5;
notice = 5;
Notice = 5;
INFORMATIONAL = 6;
informational = 6;
Informational = 6;
DEBUG = 7;
debug = 7;
Debug = 7;
}
enum source {
option allow_alias = true;
SYSCALL = 0;
syscall = 0;
Syscall = 0;
K8S_AUDIT = 1;
k8s_audit = 1;
K8s_audit = 1;
K8S_audit = 1;
}