Compare commits

...

28 Commits

Author SHA1 Message Date
Leonardo Di Donato
beafd2b868 update(userspace/falco): access peer from context method
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-21 13:37:55 +00:00
Leonardo Di Donato
31bb5c5070 build: refinements to comments (in CMakeLists.txt files and in output to stdout)
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-21 13:37:29 +00:00
Leonardo Di Donato
195b475204 new(userspace/falco): push back header metadata (session and request ID) when received from clients
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-21 13:36:49 +00:00
Leonardo Di Donato
4da9cd3764 wip(userspace/falco): evaluate usage of any protobuf type
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-17 18:48:36 +00:00
Leonardo Di Donato
b7e4913de1 build(userspace/falco): compile inputs proto
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-17 18:47:56 +00:00
Leonardo Di Donato
356188542c chore: output the build type during the build
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-17 15:59:20 +00:00
Leonardo Di Donato
fd7731cf09 new(userspace/falco): initial inputs service implementation
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-17 15:58:52 +00:00
Leonardo Di Donato
b1d33ddf08 new(userspace/falco): initial inputs.input RPC endpoint (unary)
Initial implementation of the start, process, end methods for the unary
version of the Inputs API.

Infact, in some use cases we do not want a streaming API but an unary
one.
Also, having a unary API that accepts repeated events can prove to be
more performant than a streaming one. But this needs to be proven by
numbers.

Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-17 15:56:34 +00:00
Leonardo Di Donato
f7c66cbbdc wip(userspace/falco): initial input and event proto files
Atm, these protos try to mimic sinps_event structure. It's very likely,
for performances reasons, decoding reasons, copying reasons, we do not
want them to be so big.

Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-17 15:53:58 +00:00
toc-me[bot]
d30df38e4b update(proposals): toc for 20190826-grpc-outputs.md
Co-authored-by: Leonardo Di Donato <leodidonato@gmail.com>
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-11 12:57:23 +00:00
Leonardo Di Donato
74d1a1f18f update(userspace/falco): use falco::outputs
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-11 12:54:54 +00:00
Leonardo Di Donato
cc847f53bb build: using newer outputs.proto
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-11 12:51:20 +00:00
Leonardo Di Donato
051a1a6f74 chore(userspace/falco): renaming output.proto, packages, and RPC name to plural
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-11 12:50:51 +00:00
Leonardo Di Donato
9c112890d4 update(proposals): naming of Outputs API
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-11 12:49:29 +00:00
Leonardo Di Donato
8ecf208901 update(userspace/falco): use internal protobuf API for gRPC stream contexts and request contexts
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-11 01:59:45 +00:00
Leonardo Di Donato
bd3c2ce8e8 build: compile internal protobuf API for gRPC
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-11 01:56:25 +00:00
Leonardo Di Donato
f49014bbe4 new(userspace/falco): introducing internal protobuf API for gRPC
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-11 01:55:43 +00:00
Leonardo Di Donato
e4fe9104f3 update(userspace/falco): reuse falco protobuf schema for grpc logging level, too
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-11 00:45:00 +00:00
Leonardo Di Donato
03df81af23 update(userspace/falco): set gRPC logging severity using Falco logging level (config)
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-11 00:27:02 +00:00
Leonardo Di Donato
fcb33d32cf fix(userspace/falco): fixing logs without new line
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-11 00:23:15 +00:00
Leonardo Di Donato
cb1cb5b12c fix(userspace/falco): make log level a project-wide config
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-11 00:22:36 +00:00
Leonardo Di Donato
467f33c5ff update(userspace/falco): log (debug + error) info about gRPC events per thread
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-10 19:51:12 +00:00
Leonardo Di Donato
4e916a7a58 chore(userspace/falco): print debug info for gRPC service implementations
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-10 19:50:10 +00:00
Leonardo Di Donato
325357c465 update(userspace/falco): store a representation of grpc meta into the context
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-10 19:48:59 +00:00
Leonardo Di Donato
0f81e9b95a chore(userspace/falco): log request's context info like tag, state, stream (grpc)
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-10 18:30:18 +00:00
Leonardo Di Donato
8b167bb1d9 chore(userspace/falco): log grpc debug info like session_id, request_id, context status, ...
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-10 18:29:16 +00:00
Leonardo Di Donato
8dba2485e2 update(userspace/falco): make grpc context accessible
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-10 18:28:29 +00:00
Leonardo Di Donato
85cd219682 chore(userspace/falco): enable grpc debug logging verbosity
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
2020-02-10 18:27:33 +00:00
22 changed files with 546 additions and 144 deletions

View File

@@ -18,7 +18,8 @@ option(USE_BUNDLED_DEPS "Bundle hard to find dependencies into the Falco binary"
option(BUILD_WARNINGS_AS_ERRORS "Enable building with -Wextra -Werror flags" OFF)
# Elapsed time
# set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_COMMAND} -E time") # TODO(fntlnz, leodido): add a flag to enable this
# TODO(fntlnz, leodido): add a flag to enable this
# set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_COMMAND} -E time")
# Make flag for parallel processing
include(ProcessorCount)
@@ -48,6 +49,7 @@ else()
set(CMAKE_BUILD_TYPE "release")
set(KBUILD_FLAGS "${DRAIOS_FEATURE_FLAGS}")
endif()
message(STATUS "Build type: ${CMAKE_BUILD_TYPE}")
set(CMAKE_COMMON_FLAGS "-Wall -ggdb ${DRAIOS_FEATURE_FLAGS}")

View File

@@ -16,7 +16,7 @@ set(SYSDIG_CMAKE_WORKING_DIR "${CMAKE_BINARY_DIR}/sysdig-repo")
# this needs to be here at the top
if(USE_BUNDLED_DEPS)
# explicitly force this dependency to use the system OpenSSL
# explicitly force this dependency to use the bundled OpenSSL
set(USE_BUNDLED_OPENSSL ON)
endif()

View File

@@ -1,4 +1,4 @@
# gRPC Falco Output
# gRPC Falco Outputs
<!-- toc -->
@@ -77,18 +77,18 @@ syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "schema.proto";
package falco.output;
package falco.outputs;
option go_package = "github.com/falcosecurity/client-go/pkg/api/output";
option go_package = "github.com/falcosecurity/client-go/pkg/api/outputs";
// The `subscribe` service defines the RPC call
// The `outputs` service defines a server-streaming RPC call
// to perform an output `request` which will lead to obtain an output `response`.
service service {
rpc subscribe(request) returns (stream response);
rpc outputs(request) returns (stream response);
}
// The `request` message is the logical representation of the request model.
// It is the input of the `subscribe` service.
// It is the input of the `outputs` service.
// It is used to configure the kind of subscription to the gRPC streaming server.
message request {
bool keepalive = 1;

View File

@@ -19,24 +19,43 @@ add_custom_command(
${CMAKE_CURRENT_BINARY_DIR}/version.grpc.pb.h
${CMAKE_CURRENT_BINARY_DIR}/version.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/version.pb.h
${CMAKE_CURRENT_BINARY_DIR}/output.grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/output.grpc.pb.h
${CMAKE_CURRENT_BINARY_DIR}/output.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/output.pb.h
${CMAKE_CURRENT_BINARY_DIR}/outputs.grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/outputs.grpc.pb.h
${CMAKE_CURRENT_BINARY_DIR}/outputs.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/outputs.pb.h
${CMAKE_CURRENT_BINARY_DIR}/inputs.grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/inputs.grpc.pb.h
${CMAKE_CURRENT_BINARY_DIR}/inputs.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/inputs.pb.h
${CMAKE_CURRENT_BINARY_DIR}/event.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/event.pb.h
${CMAKE_CURRENT_BINARY_DIR}/schema.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/schema.pb.h
COMMENT "Generate gRPC version API"
${CMAKE_CURRENT_BINARY_DIR}/grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/grpc.pb.h
COMMENT "Generate gRPC API"
# version API
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/version.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/version.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --grpc_out=. --plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN}
${CMAKE_CURRENT_SOURCE_DIR}/version.proto
COMMENT "Generate gRPC outputs API"
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/output.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/output.proto
# outputs API
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/outputs.proto ${CMAKE_CURRENT_SOURCE_DIR}/schema.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/outputs.proto
${CMAKE_CURRENT_SOURCE_DIR}/schema.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --grpc_out=. --plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN}
${CMAKE_CURRENT_SOURCE_DIR}/output.proto
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
${CMAKE_CURRENT_SOURCE_DIR}/outputs.proto
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}
# inputs API
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/inputs.proto ${CMAKE_CURRENT_SOURCE_DIR}/event.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/inputs.proto
${CMAKE_CURRENT_SOURCE_DIR}/event.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --grpc_out=. --plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN}
${CMAKE_CURRENT_SOURCE_DIR}/inputs.proto
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}
# context API
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/grpc.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/grpc.proto)
add_executable(
falco
@@ -55,9 +74,13 @@ add_executable(
utils.cpp
${CMAKE_CURRENT_BINARY_DIR}/version.grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/version.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/output.grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/output.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/schema.pb.cc)
${CMAKE_CURRENT_BINARY_DIR}/outputs.grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/outputs.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/inputs.grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/inputs.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/schema.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/event.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/grpc.pb.cc)
add_dependencies(falco civetweb)

View File

@@ -170,9 +170,9 @@ void falco_configuration::init(string conf_filename, list<string> &cmdline_optio
throw logic_error("Error reading config file (" + m_config_file + "): No outputs configured. Please configure at least one output file output enabled but no filename in configuration block");
}
string log_level = m_config->get_scalar<string>("log_level", "info");
m_log_level = m_config->get_scalar<string>("log_level", "info");
falco_logger::set_level(log_level);
falco_logger::set_level(m_log_level);
m_notifications_rate = m_config->get_scalar<uint32_t>("outputs", "rate", 1);
m_notifications_max_burst = m_config->get_scalar<uint32_t>("outputs", "max_burst", 1000);

View File

@@ -195,6 +195,7 @@ public:
std::list<std::string> m_rules_filenames;
bool m_json_output;
bool m_json_include_output_property;
std::string m_log_level;
std::vector<falco_outputs::output_config> m_outputs;
uint32_t m_notifications_rate;
uint32_t m_notifications_max_burst;

159
userspace/falco/event.proto Normal file
View File

@@ -0,0 +1,159 @@
syntax = "proto3";
import "google/protobuf/timestamp.proto";
package falco.event;
// note > from ppm_events_public.h
// (ppm_param_type)
enum param_type {
PT_NONE = 0;
PT_INT8 = 1;
PT_INT16 = 2;
PT_INT32 = 3;
PT_INT64 = 4;
PT_UINT8 = 5;
PT_UINT16 = 6;
PT_UINT32 = 7;
PT_UINT64 = 8;
PT_CHARBUF = 9; // A printable buffer of bytes, NULL terminated
PT_BYTEBUF = 10; // A raw buffer of bytes not suitable for printing
PT_ERRNO = 11; // This is an INT4; but will be interpreted as an error code
PT_SOCKADDR = 12; // A sockaddr structure, 1byte family + data
PT_SOCKTUPLE = 13; // A sockaddr tuple,1byte family + 12byte data + 12byte data
PT_FD = 14; // An fd, 64bit
PT_PID = 15; // A pid/tid, 64bit
PT_FDLIST = 16; // A list of fds, 16bit count + count * (64bit fd + 16bit flags)
PT_FSPATH = 17; // A string containing a relative or absolute file system path, null terminated
PT_SYSCALLID = 18; // A 16bit system call ID that can be used as a key for the g_syscall_info_table table
PT_SIGTYPE = 19; // An 8bit signal number
PT_RELTIME = 20; // A relative time. Seconds * 10^9 + nanoseconds, 64bit
PT_ABSTIME = 21; // An absolute time interval. Seconds from epoch * 10^9 + nanoseconds, 64bit
PT_PORT = 22; // A TCP/UDP port, 2 bytes
PT_L4PROTO = 23; // A 1 byte IP protocol type
PT_SOCKFAMILY = 24; // A 1 byte socket family
PT_BOOL = 25; // A boolean value, 4 bytes
PT_IPV4ADDR = 26; // A 4 byte raw IPv4 address
PT_DYN = 27; // Type can vary depending on the context (used for filter fields like evt.rawarg)
PT_FLAGS8 = 28; // This is an UINT8; but will be interpreted as 8 bit flags
PT_FLAGS16 = 29; // This is an UINT6; but will be interpreted as 16 bit flags
PT_FLAGS32 = 30; // This is an UINT2; but will be interpreted as 32 bit flags
PT_UID = 31; // This is an UINT2; MAX_UINT32 will be interpreted as no value
PT_GID = 32; // This is an UINT2; MAX_UINT32 will be interpreted as no value
PT_DOUBLE = 33; // This is a double precision floating point number
PT_SIGSET = 34; // sigset_t (only the lower UINT32 of it)
PT_CHARBUFARRAY = 35; // Pointer to an array of strings exported by the user events decoder, 64bit (internal use only)
PT_CHARBUF_PAIR_ARRAY = 36; // Pointer to an array of string pairs, exported by the user events decoder, 64bit (internal use only)
PT_IPV4NET = 37; // An IPv4 network
PT_IPV6ADDR = 38; // A 16 byte raw IPv6 address
PT_IPV6NET = 39; // An IPv6 network
PT_IPADDR = 40; // Either an IPv4 or IPv6 address; the length indicateswhich one it is
PT_IPNET = 41; // Either an IPv4 or IPv6 network; the length indicates which one it is
PT_MODE = 42; // A 32 bit bitmask to represent file modes
PT_MAX = 43; // Array size
};
// note > ppm_events_public.h
// (ppm_event_flags)
enum event_flags {
EF_NONE = 0;
EF_CREATES_FD = 1; // this event creates an FD (e.g. open)
EF_DESTROYS_FD = 2; // this event destroys an FD (e.g. close)
EF_USES_FD = 4; // this event operates on an FD
EF_READS_FROM_FD = 8; // this event reads data from an FD
EF_WRITES_TO_FD = 16; // this event writes data to an FD
EF_MODIFIES_STATE = 32; // this event causes the machine state to change and should not be dropped by the filtering engine
EF_UNUSED = 64; // this event is not used
EF_WAITS = 128; // this event reads data from an FD
EF_SKIPPARSERESET = 256; // this event shouldn't pollute the parser lastevent state tracker
EF_OLD_VERSION = 512; // this event is kept for backward compatibility
EF_DROP_SIMPLE_CONS = 1024; // this event can be skipped by consumers that privilege low overhead to full event capture
}
// todo(leodido) > complete
// https://github.com/draios/sysdig/blob/master/driver/ppm_events_public.h
// (ppm_event_type)
enum event_type {
PPME_GENERIC_E = 0;
PPME_GENERIC_X = 1;
}
// todo(leodido) > complete
// https://github.com/draios/sysdig/blob/master/driver/ppm_events_public.h
// (ppm_event_category)
enum event_category {
EC_UNKNOWN = 0;
EC_OTHER = 1;
EC_FILE = 2;
}
message parameter {
string name = 1;
uint32 value = 2;
}
message parameter_info {
string name = 1; // parameter name, e.g. 'size'
param_type type = 2; // parameter type, e.g. 'uint16', 'string'
// print_format fmt = 3;
// ? info = 4;
}
message ppm_event {
string name = 1;
event_category category = 2; // event category, e.g. 'file, 'net'
uint32 flags = 3;
uint32 nparams = 4; // number of parameters in the parameters array
repeated parameter_info params = 5;
}
message scap_event {
google.protobuf.Timestamp ts = 1;
uint64 tid = 2;
event_type type = 3;
uint32 nparams = 4;
}
enum command_category {
CAT_NONE = 0;
CAT_CONTAINER = 1;
CAT_HEALTHCHECK = 2;
CAT_LIVENESS_PROBE = 3;
CAT_READINESS_PROBE = 4;
}
// note > threadinfo.h
message thread_info {
uint64 tid = 1; // id of this thread
uint64 pid = 2; // id of the process containing this thread
uint64 ptid = 3; // id of the process that started this thread
uint64 sid = 4; // session id of the process containing this thread
string comm = 5; // name of the process containing this thread, e.g. "top"
string exe = 6; // name of the process containing this thread from argv[0], e.g. "/bin/top"
string exepath = 7; // full executable path of the process containing this thread, e.g. "/bin/top"
string cwd = 8; // working directory of the process containing this thread
repeated string env = 9; // values of all environment variables for the process containing this thread
repeated string args = 10; // command line arguments, e.g., -d1
// string container_id = 11;
// ...
command_category category = 12;
}
// note > event.h
message event {
scap_event evt = 1;
uint32 cpuid = 2;
event_flags flags = 3; // fixme(leodido) > should this be a uint32?
ppm_event info = 4;
string params = 5;
thread_info tinfo = 6;
// fdinfo = 7
uint32 iosize = 8;
// bool fdinfo_name_changed = 9;
// int64 fd_num = 10;
// uint32 num_params = 11;
// map<uint32, string> param_name = 12;
// map<uint32, string> param_value = 13;
}

View File

@@ -76,7 +76,7 @@ bool syscall_evt_drop_mgr::process_event(sinsp *inspector, sinsp_evt *evt)
if(m_simulate_drops)
{
falco_logger::log(LOG_INFO, "Simulating syscall event drop");
falco_logger::log(LOG_INFO, "Simulating syscall event drop\n");
delta.n_drops++;
}
@@ -94,7 +94,7 @@ bool syscall_evt_drop_mgr::process_event(sinsp *inspector, sinsp_evt *evt)
}
else
{
falco_logger::log(LOG_DEBUG, "Syscall event drop but token bucket depleted, skipping actions");
falco_logger::log(LOG_DEBUG, "Syscall event drop but token bucket depleted, skipping actions\n");
}
}
}
@@ -156,7 +156,7 @@ bool syscall_evt_drop_mgr::perform_actions(uint64_t now, scap_stats &delta, bool
if(should_exit)
{
falco_logger::log(LOG_CRIT, msg);
falco_logger::log(LOG_CRIT, "Exiting.");
falco_logger::log(LOG_CRIT, "Exiting.\n");
return false;
}

View File

@@ -1192,7 +1192,7 @@ int falco_init(int argc, char **argv)
{
// TODO(fntlnz,leodido): when we want to spawn multiple threads we need to have a queue per thread, or implement
// different queuing mechanisms, round robin, fanout? What we want to achieve?
grpc_server.init(config.m_grpc_bind_address, config.m_grpc_threadiness, config.m_grpc_private_key, config.m_grpc_cert_chain, config.m_grpc_root_certs);
grpc_server.init(config.m_grpc_bind_address, config.m_grpc_private_key, config.m_grpc_cert_chain, config.m_grpc_root_certs, config.m_grpc_threadiness, config.m_log_level);
grpc_server_thread = std::thread([&grpc_server] {
grpc_server.run();
});

View File

@@ -16,12 +16,12 @@ limitations under the License.
#pragma once
#include "output.pb.h"
#include "outputs.pb.h"
#include "tbb/concurrent_queue.h"
namespace falco
{
namespace output
namespace outputs
{
typedef tbb::concurrent_queue<response> response_cq;

View File

@@ -26,9 +26,8 @@ limitations under the License.
#include "banned.h"
using namespace std;
using namespace falco::output;
const static struct luaL_reg ll_falco_outputs [] =
const static struct luaL_reg ll_falco_outputs[] =
{
{"handle_http", &falco_outputs::handle_http},
{"handle_grpc", &falco_outputs::handle_grpc},
@@ -57,7 +56,7 @@ falco_outputs::~falco_outputs()
lua_getglobal(m_ls, m_lua_output_cleanup.c_str());
if(!lua_isfunction(m_ls, -1))
{
falco_logger::log(LOG_ERR, std::string("No function ") + m_lua_output_cleanup + " found. ");
falco_logger::log(LOG_ERR, std::string("No function ") + m_lua_output_cleanup + " found.\n");
assert(nullptr == "Missing lua cleanup function in ~falco_outputs");
}
@@ -316,7 +315,7 @@ int falco_outputs::handle_grpc(lua_State *ls)
lua_error(ls);
}
response grpc_res = response();
falco::outputs::response grpc_res = falco::outputs::response();
// time
gen_event *evt = (gen_event *)lua_topointer(ls, 1);
@@ -366,7 +365,7 @@ int falco_outputs::handle_grpc(lua_State *ls)
auto host = grpc_res.mutable_hostname();
*host = (char *)lua_tostring(ls, 7);
falco::output::queue::get().push(grpc_res);
falco::outputs::queue::get().push(grpc_res);
return 1;
}

View File

@@ -0,0 +1,16 @@
syntax = "proto3";
package falco.grpc;
enum stream_status {
STREAMING = 0;
SUCCESS = 1;
ERROR = 2;
}
enum request_state {
UNKNOWN = 0;
REQUEST = 1;
WRITE = 2;
FINISH = 3;
}

View File

@@ -22,32 +22,29 @@ limitations under the License.
falco::grpc::context::context(::grpc::ServerContext* ctx):
m_ctx(ctx)
{
std::string session_id;
std::string request_id;
get_metadata(meta_session, m_session_id);
get_metadata(meta_request, m_request_id);
get_metadata(meta_session, session_id);
get_metadata(meta_request, request_id);
bool has_meta = false;
std::stringstream meta;
if(!session_id.empty())
if(!m_session_id.empty())
{
meta << "[sid=" << session_id << "]";
has_meta = true;
ctx->AddInitialMetadata(meta_session, m_session_id);
meta << "sid=" << m_session_id << "";
}
if(!request_id.empty())
if(!m_request_id.empty())
{
meta << "[rid=" << request_id << "]";
has_meta = true;
}
if(has_meta)
{
meta << " ";
ctx->AddInitialMetadata(meta_request, m_request_id);
meta << ", rid=" << m_request_id << "";
}
m_prefix = meta.str();
}
void falco::grpc::context::context::get_metadata(std::string key, std::string& val)
std::string falco::grpc::context::peer() const
{
return m_ctx->peer();
}
void falco::grpc::context::get_metadata(std::string key, std::string& val)
{
const std::multimap<::grpc::string_ref, ::grpc::string_ref>& client_metadata = m_ctx->client_metadata();
auto it = client_metadata.find(key);
@@ -55,4 +52,4 @@ void falco::grpc::context::context::get_metadata(std::string key, std::string& v
{
val.assign(it->second.data(), it->second.size());
}
}
}

View File

@@ -24,6 +24,8 @@ limitations under the License.
#include <grpc++/grpc++.h>
#endif
#include "grpc.pb.h"
namespace falco
{
namespace grpc
@@ -39,10 +41,14 @@ public:
~context() = default;
void get_metadata(std::string key, std::string& val);
std::string peer() const;
std::string m_prefix; // todo(leodido) > making this read only?
private:
std::string m_session_id;
std::string m_request_id;
::grpc::ServerContext* m_ctx = nullptr;
std::string m_prefix;
};
class stream_context : public context
@@ -52,14 +58,9 @@ public:
context(ctx){};
~stream_context() = default;
enum : char
{
STREAMING = 1,
SUCCESS,
ERROR
} m_status = STREAMING;
stream_status m_status = stream_status::STREAMING;
mutable void* m_stream = nullptr; // todo(fntlnz, leodido) > useful in the future
mutable void* m_stream = nullptr; // todo(fntlnz, leodido) > useful in the future (request-specific stream data)
mutable bool m_has_more = false;
};

View File

@@ -24,37 +24,55 @@ namespace grpc
{
template<>
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::start(server* srv)
void request_stream_context<falco::outputs::service, falco::outputs::request, falco::outputs::response>::start(server* srv)
{
m_state = request_context_base::REQUEST;
m_state = request_state::REQUEST;
m_srv_ctx.reset(new ::grpc::ServerContext);
auto srvctx = m_srv_ctx.get();
m_res_writer.reset(new ::grpc::ServerAsyncWriter<output::response>(srvctx));
m_res_writer.reset(new ::grpc::ServerAsyncWriter<outputs::response>(srvctx));
m_stream_ctx.reset();
m_req.Clear();
auto cq = srv->m_completion_queue.get();
// todo(leodido) > log "calling m_request_func: tag=this, state=m_state"
(srv->m_output_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
// m_stream_ctx->m_stream = this; // todo(leodido) > save the tag - ie., this - into the stream?
gpr_log(
GPR_DEBUG,
"request_stream_context<outputs>::%s -> m_request_func: tag=%p, state=%s",
__func__,
this,
request_state_Name(m_state).c_str());
(srv->m_outputs_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
}
template<>
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::process(server* srv)
void request_stream_context<falco::outputs::service, falco::outputs::request, falco::outputs::response>::process(server* srv)
{
// When it is the 1st process call
if(m_state == request_context_base::REQUEST)
if(m_state == request_state::REQUEST)
{
m_state = request_context_base::WRITE;
m_state = request_state::WRITE;
m_stream_ctx.reset(new stream_context(m_srv_ctx.get()));
}
// Processing
output::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe()
gpr_log(
GPR_DEBUG,
"request_stream_context<outputs>::%s -> m_process_func: tag=%p, state=%s",
__func__,
this,
request_state_Name(m_state).c_str());
outputs::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // outputs_impl()
// When there are still more responses to stream
if(m_stream_ctx->m_has_more)
{
// todo(leodido) > log "write: tag=this, state=m_state"
gpr_log(
GPR_DEBUG,
"request_stream_context<outputs>::%s -> write: tag=%p, state=%s",
__func__,
this,
request_state_Name(m_state).c_str());
m_res_writer->Write(res, this);
}
// No more responses to stream
@@ -62,26 +80,37 @@ void request_stream_context<falco::output::service, falco::output::request, falc
{
// Communicate to the gRPC runtime that we have finished.
// The memory address of "this" instance uniquely identifies the event.
m_state = request_context_base::FINISH;
// todo(leodido) > log "finish: tag=this, state=m_state"
m_state = request_state::FINISH;
gpr_log(
GPR_DEBUG,
"request_stream_context<outputs>::%s -> finish: tag=%p, state=finish",
__func__,
this);
m_res_writer->Finish(::grpc::Status::OK, this);
}
}
template<>
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::end(server* srv, bool errored)
void request_stream_context<falco::outputs::service, falco::outputs::request, falco::outputs::response>::end(server* srv, bool errored)
{
if(m_stream_ctx)
{
if(errored)
{
// todo(leodido) > log error "error streaming: tag=this, state=m_state, stream=m_stream_ctx->m_stream"
gpr_log(
GPR_ERROR,
"request_stream_context<outputs>::%s -> error streaming: tag=%p, state=%s, stream=%p",
__func__,
this,
request_state_Name(m_state).c_str(),
m_stream_ctx->m_stream);
}
m_stream_ctx->m_status = errored ? stream_context::ERROR : stream_context::SUCCESS;
m_stream_ctx->m_status = errored ? stream_status::ERROR : stream_status::SUCCESS;
// Complete the processing
output::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe()
outputs::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // outputs()
}
else
{
@@ -90,7 +119,12 @@ void request_stream_context<falco::output::service, falco::output::request, falc
// So, `m_stream_ctx` is null because it is set into the `process()` function.
// The stream haven't started.
// todo(leodido) > log error "ending streaming: tag=this, state=m_state, stream=null"
gpr_log(
GPR_ERROR,
"%s -> ending streaming: tag=%p, state=%s, stream=never started",
__func__,
this,
request_state_Name(m_state).c_str());
}
// Ask to start processing requests
@@ -100,7 +134,7 @@ void request_stream_context<falco::output::service, falco::output::request, falc
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::start(server* srv)
{
m_state = request_context_base::REQUEST;
m_state = request_state::REQUEST;
m_srv_ctx.reset(new ::grpc::ServerContext);
auto srvctx = m_srv_ctx.get();
m_res_writer.reset(new ::grpc::ServerAsyncResponseWriter<version::response>(srvctx));
@@ -109,17 +143,32 @@ void falco::grpc::request_context<falco::version::service, falco::version::reque
// Request to start processing given requests.
// Using "this" - ie., the memory address of this context - as the tag that uniquely identifies the request.
// In this way, different contexts can serve different requests concurrently.
gpr_log(
GPR_DEBUG,
"request_context<version>::%s -> m_request_func: tag=%p, state=%s",
__func__,
this,
request_state_Name(m_state).c_str());
(srv->m_version_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
}
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::process(server* srv)
{
gpr_log(
GPR_DEBUG,
"request_context<version>::%s -> m_process_func: tag=%p, state=%s",
__func__,
this,
request_state_Name(m_state).c_str());
// Create empty response
version::response res;
// Call version service implementation
(srv->*m_process_func)(m_srv_ctx.get(), m_req, res);
// Notify the gRPC runtime that this processing is done
m_state = request_context_base::FINISH;
m_state = request_state::FINISH;
// Using "this"- ie., the memory address of this context - to uniquely identify the event.
m_res_writer->Finish(res, ::grpc::Status::OK, this);
}
@@ -127,8 +176,73 @@ void falco::grpc::request_context<falco::version::service, falco::version::reque
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::end(server* srv, bool errored)
{
// todo(leodido) > handle processing errors here
if(errored)
{
gpr_log(
GPR_ERROR,
"request_context<version>::%s -> error replying: tag=%p, state=%s",
__func__,
this,
request_state_Name(m_state).c_str());
}
// Ask to start processing requests
start(srv);
}
template<>
void falco::grpc::request_context<falco::inputs::service, falco::inputs::request, falco::inputs::response>::start(server* srv)
{
m_state = request_state::REQUEST;
m_srv_ctx.reset(new ::grpc::ServerContext);
auto srvctx = m_srv_ctx.get();
m_res_writer.reset(new ::grpc::ServerAsyncResponseWriter<inputs::response>(srvctx));
m_req.Clear();
auto cq = srv->m_completion_queue.get();
// Request to start processing given requests.
// Using "this" - ie., the memory address of this context - as the tag that uniquely identifies the request.
// In this way, different contexts can serve different requests concurrently.
gpr_log(
GPR_DEBUG,
"request_context<inputs>::%s -> m_request_func: tag=%p, state=%s",
__func__,
this,
request_state_Name(m_state).c_str());
(srv->m_inputs_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
}
template<>
void falco::grpc::request_context<falco::inputs::service, falco::inputs::request, falco::inputs::response>::process(server* srv)
{
gpr_log(
GPR_DEBUG,
"request_context<inputs>::%s -> m_process_func: tag=%p, state=%s",
__func__,
this,
request_state_Name(m_state).c_str());
inputs::response res;
(srv->*m_process_func)(m_srv_ctx.get(), m_req, res);
// Notify the gRPC runtime that this processing is done
m_state = request_state::FINISH;
// Using "this"- ie., the memory address of this context - to uniquely identify the event.
m_res_writer->Finish(res, ::grpc::Status::OK, this);
}
template<>
void falco::grpc::request_context<falco::inputs::service, falco::inputs::request, falco::inputs::response>::end(server* srv, bool errored)
{
if(errored)
{
gpr_log(
GPR_ERROR,
"request_context<inputs>::%s -> error replying: tag=%p, state=%s",
__func__,
this,
request_state_Name(m_state).c_str());
}
// Ask to start processing requests
start(srv);
}

View File

@@ -32,13 +32,8 @@ public:
~request_context_base() = default;
std::unique_ptr<::grpc::ServerContext> m_srv_ctx;
enum : char
{
UNKNOWN = 0,
REQUEST,
WRITE,
FINISH
} m_state = UNKNOWN;
request_state m_state = request_state::UNKNOWN;
virtual void start(server* srv) = 0;
virtual void process(server* srv) = 0;
virtual void end(server* srv, bool isError) = 0;

View File

@@ -14,13 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
#include <unistd.h>
#ifdef GRPC_INCLUDE_IS_GRPCPP
#include <grpcpp/grpcpp.h>
#else
#include <grpc++/grpc++.h>
#endif
#include "logger.h"
#include "grpc_server.h"
#include "grpc_request_context.h"
#include "utils.h"
@@ -52,21 +53,34 @@ void falco::grpc::server::thread_process(int thread_index)
{
if(tag == nullptr)
{
// todo(leodido) > log error "server completion queue error: empty tag"
gpr_log(
GPR_ERROR,
"server::%s -> server completion queue error: tag=(empty)",
__func__);
continue;
}
// Obtain the context for a given tag
request_context_base* ctx = static_cast<request_context_base*>(tag);
// todo(leodido) > log "next event: tag=tag, read_success=event_read_success, state=ctx->m_state"
gpr_log(
GPR_DEBUG,
"server::%s -> next event: tag=%p, read success=%s, state=%s",
__func__,
tag,
event_read_success ? "true" : "false",
request_state_Name(ctx->m_state).c_str());
// When event has not been read successfully
if(!event_read_success)
{
if(ctx->m_state != request_context_base::REQUEST)
if(ctx->m_state != request_state::REQUEST)
{
// todo(leodido) > log error "server completion queue failing to read: tag=tag"
gpr_log(
GPR_ERROR,
"server::%s -> server completion queue failing to read: tag=%p",
__func__,
tag);
// End the context with error
ctx->end(this, true);
@@ -77,39 +91,71 @@ void falco::grpc::server::thread_process(int thread_index)
// Process the event
switch(ctx->m_state)
{
case request_context_base::REQUEST:
case request_state::REQUEST:
// Completion of m_request_func
case request_context_base::WRITE:
case request_state::WRITE:
// Completion of Write()
ctx->process(this);
break;
case request_context_base::FINISH:
case request_state::FINISH:
// Completion of Finish()
ctx->end(this, false);
break;
default:
// todo(leodido) > log error "unkown completion queue event: tag=tag, state=ctx->m_state"
gpr_log(
GPR_ERROR,
"server::%s -> unkown completion queue event: tag=%p, state=%s",
__func__,
tag,
request_state_Name(ctx->m_state).c_str());
break;
}
// todo(leodido) > log "thread completed: index=thread_index"
gpr_log(
GPR_DEBUG,
"server::%s -> thread completed: tag=%p, index=%d",
__func__,
tag,
thread_index);
}
}
void falco::grpc::server::init(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs)
void falco::grpc::server::init(std::string server_addr, std::string private_key, std::string cert_chain, std::string root_certs, int threadiness, std::string log_level)
{
m_server_addr = server_addr;
m_threadiness = threadiness;
m_private_key = private_key;
m_cert_chain = cert_chain;
m_root_certs = root_certs;
falco::schema::priority logging_level = falco::schema::INFORMATIONAL;
falco::schema::priority_Parse(log_level, &logging_level);
switch(logging_level)
{
case falco::schema::ERROR:
gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR);
break;
case falco::schema::DEBUG:
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
break;
case falco::schema::INFORMATIONAL:
default:
// note > info will always enter here since it is != from "informational"
gpr_set_log_verbosity(GPR_LOG_SEVERITY_INFO);
break;
}
// gpr_set_log_function(custom_log);
gpr_log_verbosity_init();
}
// static void custom_log(gpr_log_func_args* args){};
void falco::grpc::server::run()
{
string private_key;
string cert_chain;
string root_certs;
std::string private_key;
std::string cert_chain;
std::string root_certs;
falco::utils::read(m_cert_chain, cert_chain);
falco::utils::read(m_private_key, private_key);
@@ -123,12 +169,13 @@ void falco::grpc::server::run()
::grpc::ServerBuilder builder;
builder.AddListeningPort(m_server_addr, ::grpc::SslServerCredentials(ssl_opts));
builder.RegisterService(&m_output_svc);
builder.RegisterService(&m_outputs_svc);
builder.RegisterService(&m_version_svc);
builder.RegisterService(&m_inputs_svc);
m_completion_queue = builder.AddCompletionQueue();
m_server = builder.BuildAndStart();
falco_logger::log(LOG_INFO, "Starting gRPC server at " + m_server_addr + "\n");
gpr_log(GPR_INFO, "gRPC server starting: address=%s", m_server_addr.c_str());
// The number of contexts is multiple of the number of threads
// This defines the number of simultaneous completion queue requests of the same type (service::AsyncService::Request##RPC)
@@ -136,8 +183,9 @@ void falco::grpc::server::run()
int context_num = m_threadiness * 10;
// todo(leodido) > take a look at thread_stress_test.cc into grpc repository
REGISTER_UNARY(version::request, version::response, version::service, version, version, context_num)
REGISTER_STREAM(output::request, output::response, output::service, subscribe, subscribe, context_num)
REGISTER_UNARY(version::request, version::response, version::service, version, version_impl, context_num)
REGISTER_UNARY(inputs::request, inputs::response, inputs::service, input, input_impl, context_num)
REGISTER_STREAM(outputs::request, outputs::response, outputs::service, outputs, outputs_impl, context_num)
m_threads.resize(m_threadiness);
int thread_idx = 0;
@@ -145,23 +193,23 @@ void falco::grpc::server::run()
{
thread = std::thread(&server::thread_process, this, thread_idx++);
}
// todo(leodido) > log "gRPC server running: threadiness=m_threads.size()"
gpr_log(GPR_INFO, "gRPC server running: threadiness=%zu", m_threads.size());
while(server_impl::is_running())
{
sleep(1);
}
// todo(leodido) > log "stopping gRPC server"
gpr_log(GPR_INFO, "gRPC server stopping");
stop();
}
void falco::grpc::server::stop()
{
falco_logger::log(LOG_INFO, "Shutting down gRPC server. Waiting until external connections are closed by clients\n");
gpr_log(GPR_INFO, "gRPC server shutting down");
m_server->Shutdown();
m_completion_queue->Shutdown();
falco_logger::log(LOG_INFO, "Waiting for the gRPC threads to complete\n");
gpr_log(GPR_DEBUG, "gRPC server shutting down: waiting for the gRPC threads to complete");
for(std::thread& t : m_threads)
{
if(t.joinable())
@@ -171,7 +219,7 @@ void falco::grpc::server::stop()
}
m_threads.clear();
falco_logger::log(LOG_INFO, "Draining all the remaining gRPC events\n");
gpr_log(GPR_DEBUG, "gRPC server shutting down: draining all the remaining gRPC events");
// Ignore remaining events
void* ignore_tag = nullptr;
bool ignore_ok = false;
@@ -179,5 +227,5 @@ void falco::grpc::server::stop()
{
}
falco_logger::log(LOG_INFO, "Shutting down gRPC server complete\n");
gpr_log(GPR_INFO, "gRPC server shutting down: done");
}

View File

@@ -29,26 +29,17 @@ namespace grpc
class server : public server_impl
{
public:
server()
{
}
server(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs):
m_server_addr(server_addr),
m_threadiness(threadiness),
m_private_key(private_key),
m_cert_chain(cert_chain),
m_root_certs(root_certs)
{
}
server() = default;
virtual ~server() = default;
void init(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs);
void init(std::string server_addr, std::string private_key, std::string cert_chain, std::string root_certs, int threadiness, std::string log_level);
void thread_process(int thread_index);
void run();
void stop();
output::service::AsyncService m_output_svc;
outputs::service::AsyncService m_outputs_svc;
version::service::AsyncService m_version_svc;
inputs::service::AsyncService m_inputs_svc;
std::unique_ptr<::grpc::ServerCompletionQueue> m_completion_queue;

View File

@@ -28,24 +28,42 @@ bool falco::grpc::server_impl::is_running()
return true;
}
void falco::grpc::server_impl::subscribe(const stream_context& ctx, const output::request& req, output::response& res)
void falco::grpc::server_impl::outputs_impl(const stream_context& ctx, const outputs::request& req, outputs::response& res)
{
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
std::string client = ctx.peer();
if(ctx.m_status == stream_status::SUCCESS || ctx.m_status == stream_status::ERROR)
{
// todo(leodido) > log "status=ctx->m_status, stream=ctx->m_stream"
// Entering here when the streaming completed (request_context_base::FINISH)
// context m_status == stream_context::SUCCESS when the gRPC server shutdown the context
// context m_status == stream_context::ERROR when the gRPC client shutdown the context
gpr_log(
GPR_DEBUG,
"server_impl::%s -> streaming done: %s, client=%s, status=%s, stream=%p",
__func__,
ctx.m_prefix.c_str(),
client.c_str(),
stream_status_Name(ctx.m_status).c_str(),
ctx.m_stream);
ctx.m_stream = nullptr;
}
else
{
// Start or continue streaming
// todo(leodido) > check for m_status == stream_context::STREAMING?
// todo(leodido) > set m_stream
if(output::queue::get().try_pop(res) && !req.keepalive())
// Start or continue streaming (m_status == stream_context::STREAMING)
gpr_log(
GPR_DEBUG,
"server_impl::%s -> start or continue streaming: %s, client=%s, status=%s, stream=%p",
__func__,
ctx.m_prefix.c_str(),
client.c_str(),
stream_status_Name(ctx.m_status).c_str(),
ctx.m_stream);
// note(leodido) > set request-specific data on m_stream here, in case it is needed
if(outputs::queue::get().try_pop(res) && !req.keepalive())
{
ctx.m_has_more = true;
return;
}
while(is_running() && !output::queue::get().try_pop(res) && req.keepalive())
while(is_running() && !outputs::queue::get().try_pop(res) && req.keepalive())
{
}
@@ -53,8 +71,10 @@ void falco::grpc::server_impl::subscribe(const stream_context& ctx, const output
}
}
void falco::grpc::server_impl::version(const context& ctx, const version::request&, version::response& res)
void falco::grpc::server_impl::version_impl(const context& ctx, const version::request& req, version::response& res)
{
gpr_log(GPR_DEBUG, "server_impl::%s -> replying: %s, client=%s", __func__, ctx.m_prefix.c_str(), ctx.peer().c_str());
auto& build = *res.mutable_build();
build = FALCO_VERSION_BUILD;
@@ -69,6 +89,18 @@ void falco::grpc::server_impl::version(const context& ctx, const version::reques
res.set_patch(FALCO_VERSION_PATCH);
}
void falco::grpc::server_impl::input_impl(const context& ctx, const inputs::request& req, inputs::response& res)
{
std::string client = ctx.peer();
gpr_log(GPR_DEBUG, "server_impl::%s -> replying: %s, client=%s", __func__, ctx.m_prefix.c_str(), client.c_str());
// todo(leodido) > implement
// retrieve metadata
// if type = K8S_AUDIT
// ...
// if type = SYSCALL
// ...
}
void falco::grpc::server_impl::shutdown()
{
m_stop = true;

View File

@@ -17,8 +17,9 @@ limitations under the License.
#pragma once
#include <atomic>
#include "output.grpc.pb.h"
#include "outputs.grpc.pb.h"
#include "version.grpc.pb.h"
#include "inputs.grpc.pb.h"
#include "grpc_context.h"
namespace falco
@@ -36,9 +37,11 @@ public:
protected:
bool is_running();
void subscribe(const stream_context& ctx, const output::request& req, output::response& res);
void outputs_impl(const stream_context& ctx, const outputs::request& req, outputs::response& res);
void version(const context& ctx, const version::request& req, version::response& res);
void version_impl(const context& ctx, const version::request& req, version::response& res);
void input_impl(const context& ctx, const inputs::request& req, inputs::response& res);
private:
std::atomic<bool> m_stop{false};

View File

@@ -0,0 +1,21 @@
syntax = "proto3";
// import "event.proto";
import "google/protobuf/any.proto";
package falco.inputs;
option go_package = "github.com/falcosecurity/client-go/pkg/api/inputs";
// service service { rpc input(request) returns (response); }
// message request { repeated falco.event.event data = 1; }
// message response {};
service service { rpc input(request) returns (response); }
message request { repeated google.protobuf.Any events = 1; }
message response {};

View File

@@ -3,18 +3,18 @@ syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "schema.proto";
package falco.output;
package falco.outputs;
option go_package = "github.com/falcosecurity/client-go/pkg/api/output";
option go_package = "github.com/falcosecurity/client-go/pkg/api/outputs";
// The `subscribe` service defines the RPC call
// The `outputs` service defines a server-streaming RPC call
// to perform an output `request` which will lead to obtain an output `response`.
service service {
rpc subscribe(request) returns (stream response);
rpc outputs(request) returns (stream response);
}
// The `request` message is the logical representation of the request model.
// It is the input of the `subscribe` service.
// It is the input of the `outputs` service.
// It is used to configure the kind of subscription to the gRPC streaming server.
//
// By default the request asks to the server to only receive the accumulated events.