mirror of
https://github.com/falcosecurity/falco.git
synced 2026-03-20 11:42:06 +00:00
Compare commits
28 Commits
test/ci
...
feature/in
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
beafd2b868 | ||
|
|
31bb5c5070 | ||
|
|
195b475204 | ||
|
|
4da9cd3764 | ||
|
|
b7e4913de1 | ||
|
|
356188542c | ||
|
|
fd7731cf09 | ||
|
|
b1d33ddf08 | ||
|
|
f7c66cbbdc | ||
|
|
d30df38e4b | ||
|
|
74d1a1f18f | ||
|
|
cc847f53bb | ||
|
|
051a1a6f74 | ||
|
|
9c112890d4 | ||
|
|
8ecf208901 | ||
|
|
bd3c2ce8e8 | ||
|
|
f49014bbe4 | ||
|
|
e4fe9104f3 | ||
|
|
03df81af23 | ||
|
|
fcb33d32cf | ||
|
|
cb1cb5b12c | ||
|
|
467f33c5ff | ||
|
|
4e916a7a58 | ||
|
|
325357c465 | ||
|
|
0f81e9b95a | ||
|
|
8b167bb1d9 | ||
|
|
8dba2485e2 | ||
|
|
85cd219682 |
@@ -18,7 +18,8 @@ option(USE_BUNDLED_DEPS "Bundle hard to find dependencies into the Falco binary"
|
||||
option(BUILD_WARNINGS_AS_ERRORS "Enable building with -Wextra -Werror flags" OFF)
|
||||
|
||||
# Elapsed time
|
||||
# set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_COMMAND} -E time") # TODO(fntlnz, leodido): add a flag to enable this
|
||||
# TODO(fntlnz, leodido): add a flag to enable this
|
||||
# set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_COMMAND} -E time")
|
||||
|
||||
# Make flag for parallel processing
|
||||
include(ProcessorCount)
|
||||
@@ -48,6 +49,7 @@ else()
|
||||
set(CMAKE_BUILD_TYPE "release")
|
||||
set(KBUILD_FLAGS "${DRAIOS_FEATURE_FLAGS}")
|
||||
endif()
|
||||
message(STATUS "Build type: ${CMAKE_BUILD_TYPE}")
|
||||
|
||||
set(CMAKE_COMMON_FLAGS "-Wall -ggdb ${DRAIOS_FEATURE_FLAGS}")
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@ set(SYSDIG_CMAKE_WORKING_DIR "${CMAKE_BINARY_DIR}/sysdig-repo")
|
||||
|
||||
# this needs to be here at the top
|
||||
if(USE_BUNDLED_DEPS)
|
||||
# explicitly force this dependency to use the system OpenSSL
|
||||
# explicitly force this dependency to use the bundled OpenSSL
|
||||
set(USE_BUNDLED_OPENSSL ON)
|
||||
endif()
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# gRPC Falco Output
|
||||
# gRPC Falco Outputs
|
||||
|
||||
<!-- toc -->
|
||||
|
||||
@@ -77,18 +77,18 @@ syntax = "proto3";
|
||||
import "google/protobuf/timestamp.proto";
|
||||
import "schema.proto";
|
||||
|
||||
package falco.output;
|
||||
package falco.outputs;
|
||||
|
||||
option go_package = "github.com/falcosecurity/client-go/pkg/api/output";
|
||||
option go_package = "github.com/falcosecurity/client-go/pkg/api/outputs";
|
||||
|
||||
// The `subscribe` service defines the RPC call
|
||||
// The `outputs` service defines a server-streaming RPC call
|
||||
// to perform an output `request` which will lead to obtain an output `response`.
|
||||
service service {
|
||||
rpc subscribe(request) returns (stream response);
|
||||
rpc outputs(request) returns (stream response);
|
||||
}
|
||||
|
||||
// The `request` message is the logical representation of the request model.
|
||||
// It is the input of the `subscribe` service.
|
||||
// It is the input of the `outputs` service.
|
||||
// It is used to configure the kind of subscription to the gRPC streaming server.
|
||||
message request {
|
||||
bool keepalive = 1;
|
||||
|
||||
@@ -19,24 +19,43 @@ add_custom_command(
|
||||
${CMAKE_CURRENT_BINARY_DIR}/version.grpc.pb.h
|
||||
${CMAKE_CURRENT_BINARY_DIR}/version.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/version.pb.h
|
||||
${CMAKE_CURRENT_BINARY_DIR}/output.grpc.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/output.grpc.pb.h
|
||||
${CMAKE_CURRENT_BINARY_DIR}/output.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/output.pb.h
|
||||
${CMAKE_CURRENT_BINARY_DIR}/outputs.grpc.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/outputs.grpc.pb.h
|
||||
${CMAKE_CURRENT_BINARY_DIR}/outputs.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/outputs.pb.h
|
||||
${CMAKE_CURRENT_BINARY_DIR}/inputs.grpc.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/inputs.grpc.pb.h
|
||||
${CMAKE_CURRENT_BINARY_DIR}/inputs.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/inputs.pb.h
|
||||
${CMAKE_CURRENT_BINARY_DIR}/event.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/event.pb.h
|
||||
${CMAKE_CURRENT_BINARY_DIR}/schema.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/schema.pb.h
|
||||
COMMENT "Generate gRPC version API"
|
||||
${CMAKE_CURRENT_BINARY_DIR}/grpc.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/grpc.pb.h
|
||||
COMMENT "Generate gRPC API"
|
||||
# version API
|
||||
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/version.proto
|
||||
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/version.proto
|
||||
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --grpc_out=. --plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN}
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/version.proto
|
||||
COMMENT "Generate gRPC outputs API"
|
||||
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/output.proto
|
||||
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/output.proto
|
||||
# outputs API
|
||||
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/outputs.proto ${CMAKE_CURRENT_SOURCE_DIR}/schema.proto
|
||||
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/outputs.proto
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/schema.proto
|
||||
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --grpc_out=. --plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN}
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/output.proto
|
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/outputs.proto
|
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}
|
||||
# inputs API
|
||||
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/inputs.proto ${CMAKE_CURRENT_SOURCE_DIR}/event.proto
|
||||
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/inputs.proto
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/event.proto
|
||||
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --grpc_out=. --plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN}
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/inputs.proto
|
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}
|
||||
# context API
|
||||
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/grpc.proto
|
||||
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/grpc.proto)
|
||||
|
||||
add_executable(
|
||||
falco
|
||||
@@ -55,9 +74,13 @@ add_executable(
|
||||
utils.cpp
|
||||
${CMAKE_CURRENT_BINARY_DIR}/version.grpc.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/version.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/output.grpc.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/output.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/schema.pb.cc)
|
||||
${CMAKE_CURRENT_BINARY_DIR}/outputs.grpc.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/outputs.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/inputs.grpc.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/inputs.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/schema.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/event.pb.cc
|
||||
${CMAKE_CURRENT_BINARY_DIR}/grpc.pb.cc)
|
||||
|
||||
add_dependencies(falco civetweb)
|
||||
|
||||
|
||||
@@ -170,9 +170,9 @@ void falco_configuration::init(string conf_filename, list<string> &cmdline_optio
|
||||
throw logic_error("Error reading config file (" + m_config_file + "): No outputs configured. Please configure at least one output file output enabled but no filename in configuration block");
|
||||
}
|
||||
|
||||
string log_level = m_config->get_scalar<string>("log_level", "info");
|
||||
m_log_level = m_config->get_scalar<string>("log_level", "info");
|
||||
|
||||
falco_logger::set_level(log_level);
|
||||
falco_logger::set_level(m_log_level);
|
||||
|
||||
m_notifications_rate = m_config->get_scalar<uint32_t>("outputs", "rate", 1);
|
||||
m_notifications_max_burst = m_config->get_scalar<uint32_t>("outputs", "max_burst", 1000);
|
||||
|
||||
@@ -195,6 +195,7 @@ public:
|
||||
std::list<std::string> m_rules_filenames;
|
||||
bool m_json_output;
|
||||
bool m_json_include_output_property;
|
||||
std::string m_log_level;
|
||||
std::vector<falco_outputs::output_config> m_outputs;
|
||||
uint32_t m_notifications_rate;
|
||||
uint32_t m_notifications_max_burst;
|
||||
|
||||
159
userspace/falco/event.proto
Normal file
159
userspace/falco/event.proto
Normal file
@@ -0,0 +1,159 @@
|
||||
syntax = "proto3";
|
||||
|
||||
import "google/protobuf/timestamp.proto";
|
||||
|
||||
package falco.event;
|
||||
|
||||
// note > from ppm_events_public.h
|
||||
// (ppm_param_type)
|
||||
enum param_type {
|
||||
PT_NONE = 0;
|
||||
PT_INT8 = 1;
|
||||
PT_INT16 = 2;
|
||||
PT_INT32 = 3;
|
||||
PT_INT64 = 4;
|
||||
PT_UINT8 = 5;
|
||||
PT_UINT16 = 6;
|
||||
PT_UINT32 = 7;
|
||||
PT_UINT64 = 8;
|
||||
PT_CHARBUF = 9; // A printable buffer of bytes, NULL terminated
|
||||
PT_BYTEBUF = 10; // A raw buffer of bytes not suitable for printing
|
||||
PT_ERRNO = 11; // This is an INT4; but will be interpreted as an error code
|
||||
PT_SOCKADDR = 12; // A sockaddr structure, 1byte family + data
|
||||
PT_SOCKTUPLE = 13; // A sockaddr tuple,1byte family + 12byte data + 12byte data
|
||||
PT_FD = 14; // An fd, 64bit
|
||||
PT_PID = 15; // A pid/tid, 64bit
|
||||
PT_FDLIST = 16; // A list of fds, 16bit count + count * (64bit fd + 16bit flags)
|
||||
PT_FSPATH = 17; // A string containing a relative or absolute file system path, null terminated
|
||||
PT_SYSCALLID = 18; // A 16bit system call ID that can be used as a key for the g_syscall_info_table table
|
||||
PT_SIGTYPE = 19; // An 8bit signal number
|
||||
PT_RELTIME = 20; // A relative time. Seconds * 10^9 + nanoseconds, 64bit
|
||||
PT_ABSTIME = 21; // An absolute time interval. Seconds from epoch * 10^9 + nanoseconds, 64bit
|
||||
PT_PORT = 22; // A TCP/UDP port, 2 bytes
|
||||
PT_L4PROTO = 23; // A 1 byte IP protocol type
|
||||
PT_SOCKFAMILY = 24; // A 1 byte socket family
|
||||
PT_BOOL = 25; // A boolean value, 4 bytes
|
||||
PT_IPV4ADDR = 26; // A 4 byte raw IPv4 address
|
||||
PT_DYN = 27; // Type can vary depending on the context (used for filter fields like evt.rawarg)
|
||||
PT_FLAGS8 = 28; // This is an UINT8; but will be interpreted as 8 bit flags
|
||||
PT_FLAGS16 = 29; // This is an UINT6; but will be interpreted as 16 bit flags
|
||||
PT_FLAGS32 = 30; // This is an UINT2; but will be interpreted as 32 bit flags
|
||||
PT_UID = 31; // This is an UINT2; MAX_UINT32 will be interpreted as no value
|
||||
PT_GID = 32; // This is an UINT2; MAX_UINT32 will be interpreted as no value
|
||||
PT_DOUBLE = 33; // This is a double precision floating point number
|
||||
PT_SIGSET = 34; // sigset_t (only the lower UINT32 of it)
|
||||
PT_CHARBUFARRAY = 35; // Pointer to an array of strings exported by the user events decoder, 64bit (internal use only)
|
||||
PT_CHARBUF_PAIR_ARRAY = 36; // Pointer to an array of string pairs, exported by the user events decoder, 64bit (internal use only)
|
||||
PT_IPV4NET = 37; // An IPv4 network
|
||||
PT_IPV6ADDR = 38; // A 16 byte raw IPv6 address
|
||||
PT_IPV6NET = 39; // An IPv6 network
|
||||
PT_IPADDR = 40; // Either an IPv4 or IPv6 address; the length indicateswhich one it is
|
||||
PT_IPNET = 41; // Either an IPv4 or IPv6 network; the length indicates which one it is
|
||||
PT_MODE = 42; // A 32 bit bitmask to represent file modes
|
||||
PT_MAX = 43; // Array size
|
||||
};
|
||||
|
||||
// note > ppm_events_public.h
|
||||
// (ppm_event_flags)
|
||||
enum event_flags {
|
||||
EF_NONE = 0;
|
||||
EF_CREATES_FD = 1; // this event creates an FD (e.g. open)
|
||||
EF_DESTROYS_FD = 2; // this event destroys an FD (e.g. close)
|
||||
EF_USES_FD = 4; // this event operates on an FD
|
||||
EF_READS_FROM_FD = 8; // this event reads data from an FD
|
||||
EF_WRITES_TO_FD = 16; // this event writes data to an FD
|
||||
EF_MODIFIES_STATE = 32; // this event causes the machine state to change and should not be dropped by the filtering engine
|
||||
EF_UNUSED = 64; // this event is not used
|
||||
EF_WAITS = 128; // this event reads data from an FD
|
||||
EF_SKIPPARSERESET = 256; // this event shouldn't pollute the parser lastevent state tracker
|
||||
EF_OLD_VERSION = 512; // this event is kept for backward compatibility
|
||||
EF_DROP_SIMPLE_CONS = 1024; // this event can be skipped by consumers that privilege low overhead to full event capture
|
||||
}
|
||||
|
||||
// todo(leodido) > complete
|
||||
// https://github.com/draios/sysdig/blob/master/driver/ppm_events_public.h
|
||||
// (ppm_event_type)
|
||||
enum event_type {
|
||||
PPME_GENERIC_E = 0;
|
||||
PPME_GENERIC_X = 1;
|
||||
}
|
||||
|
||||
// todo(leodido) > complete
|
||||
// https://github.com/draios/sysdig/blob/master/driver/ppm_events_public.h
|
||||
// (ppm_event_category)
|
||||
enum event_category {
|
||||
EC_UNKNOWN = 0;
|
||||
EC_OTHER = 1;
|
||||
EC_FILE = 2;
|
||||
}
|
||||
|
||||
message parameter {
|
||||
string name = 1;
|
||||
uint32 value = 2;
|
||||
}
|
||||
|
||||
message parameter_info {
|
||||
string name = 1; // parameter name, e.g. 'size'
|
||||
param_type type = 2; // parameter type, e.g. 'uint16', 'string'
|
||||
// print_format fmt = 3;
|
||||
// ? info = 4;
|
||||
}
|
||||
|
||||
message ppm_event {
|
||||
string name = 1;
|
||||
event_category category = 2; // event category, e.g. 'file, 'net'
|
||||
uint32 flags = 3;
|
||||
uint32 nparams = 4; // number of parameters in the parameters array
|
||||
repeated parameter_info params = 5;
|
||||
}
|
||||
|
||||
message scap_event {
|
||||
google.protobuf.Timestamp ts = 1;
|
||||
uint64 tid = 2;
|
||||
event_type type = 3;
|
||||
uint32 nparams = 4;
|
||||
}
|
||||
|
||||
enum command_category {
|
||||
CAT_NONE = 0;
|
||||
CAT_CONTAINER = 1;
|
||||
CAT_HEALTHCHECK = 2;
|
||||
CAT_LIVENESS_PROBE = 3;
|
||||
CAT_READINESS_PROBE = 4;
|
||||
}
|
||||
|
||||
// note > threadinfo.h
|
||||
message thread_info {
|
||||
uint64 tid = 1; // id of this thread
|
||||
uint64 pid = 2; // id of the process containing this thread
|
||||
uint64 ptid = 3; // id of the process that started this thread
|
||||
uint64 sid = 4; // session id of the process containing this thread
|
||||
string comm = 5; // name of the process containing this thread, e.g. "top"
|
||||
string exe = 6; // name of the process containing this thread from argv[0], e.g. "/bin/top"
|
||||
string exepath = 7; // full executable path of the process containing this thread, e.g. "/bin/top"
|
||||
string cwd = 8; // working directory of the process containing this thread
|
||||
repeated string env = 9; // values of all environment variables for the process containing this thread
|
||||
repeated string args = 10; // command line arguments, e.g., -d1
|
||||
// string container_id = 11;
|
||||
// ...
|
||||
command_category category = 12;
|
||||
}
|
||||
|
||||
// note > event.h
|
||||
message event {
|
||||
scap_event evt = 1;
|
||||
uint32 cpuid = 2;
|
||||
event_flags flags = 3; // fixme(leodido) > should this be a uint32?
|
||||
ppm_event info = 4;
|
||||
string params = 5;
|
||||
thread_info tinfo = 6;
|
||||
// fdinfo = 7
|
||||
uint32 iosize = 8;
|
||||
|
||||
|
||||
// bool fdinfo_name_changed = 9;
|
||||
// int64 fd_num = 10;
|
||||
// uint32 num_params = 11;
|
||||
// map<uint32, string> param_name = 12;
|
||||
// map<uint32, string> param_value = 13;
|
||||
}
|
||||
@@ -76,7 +76,7 @@ bool syscall_evt_drop_mgr::process_event(sinsp *inspector, sinsp_evt *evt)
|
||||
|
||||
if(m_simulate_drops)
|
||||
{
|
||||
falco_logger::log(LOG_INFO, "Simulating syscall event drop");
|
||||
falco_logger::log(LOG_INFO, "Simulating syscall event drop\n");
|
||||
delta.n_drops++;
|
||||
}
|
||||
|
||||
@@ -94,7 +94,7 @@ bool syscall_evt_drop_mgr::process_event(sinsp *inspector, sinsp_evt *evt)
|
||||
}
|
||||
else
|
||||
{
|
||||
falco_logger::log(LOG_DEBUG, "Syscall event drop but token bucket depleted, skipping actions");
|
||||
falco_logger::log(LOG_DEBUG, "Syscall event drop but token bucket depleted, skipping actions\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -156,7 +156,7 @@ bool syscall_evt_drop_mgr::perform_actions(uint64_t now, scap_stats &delta, bool
|
||||
if(should_exit)
|
||||
{
|
||||
falco_logger::log(LOG_CRIT, msg);
|
||||
falco_logger::log(LOG_CRIT, "Exiting.");
|
||||
falco_logger::log(LOG_CRIT, "Exiting.\n");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@@ -1192,7 +1192,7 @@ int falco_init(int argc, char **argv)
|
||||
{
|
||||
// TODO(fntlnz,leodido): when we want to spawn multiple threads we need to have a queue per thread, or implement
|
||||
// different queuing mechanisms, round robin, fanout? What we want to achieve?
|
||||
grpc_server.init(config.m_grpc_bind_address, config.m_grpc_threadiness, config.m_grpc_private_key, config.m_grpc_cert_chain, config.m_grpc_root_certs);
|
||||
grpc_server.init(config.m_grpc_bind_address, config.m_grpc_private_key, config.m_grpc_cert_chain, config.m_grpc_root_certs, config.m_grpc_threadiness, config.m_log_level);
|
||||
grpc_server_thread = std::thread([&grpc_server] {
|
||||
grpc_server.run();
|
||||
});
|
||||
|
||||
@@ -16,12 +16,12 @@ limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "output.pb.h"
|
||||
#include "outputs.pb.h"
|
||||
#include "tbb/concurrent_queue.h"
|
||||
|
||||
namespace falco
|
||||
{
|
||||
namespace output
|
||||
namespace outputs
|
||||
{
|
||||
typedef tbb::concurrent_queue<response> response_cq;
|
||||
|
||||
|
||||
@@ -26,9 +26,8 @@ limitations under the License.
|
||||
#include "banned.h"
|
||||
|
||||
using namespace std;
|
||||
using namespace falco::output;
|
||||
|
||||
const static struct luaL_reg ll_falco_outputs [] =
|
||||
const static struct luaL_reg ll_falco_outputs[] =
|
||||
{
|
||||
{"handle_http", &falco_outputs::handle_http},
|
||||
{"handle_grpc", &falco_outputs::handle_grpc},
|
||||
@@ -57,7 +56,7 @@ falco_outputs::~falco_outputs()
|
||||
lua_getglobal(m_ls, m_lua_output_cleanup.c_str());
|
||||
if(!lua_isfunction(m_ls, -1))
|
||||
{
|
||||
falco_logger::log(LOG_ERR, std::string("No function ") + m_lua_output_cleanup + " found. ");
|
||||
falco_logger::log(LOG_ERR, std::string("No function ") + m_lua_output_cleanup + " found.\n");
|
||||
assert(nullptr == "Missing lua cleanup function in ~falco_outputs");
|
||||
}
|
||||
|
||||
@@ -316,7 +315,7 @@ int falco_outputs::handle_grpc(lua_State *ls)
|
||||
lua_error(ls);
|
||||
}
|
||||
|
||||
response grpc_res = response();
|
||||
falco::outputs::response grpc_res = falco::outputs::response();
|
||||
|
||||
// time
|
||||
gen_event *evt = (gen_event *)lua_topointer(ls, 1);
|
||||
@@ -366,7 +365,7 @@ int falco_outputs::handle_grpc(lua_State *ls)
|
||||
auto host = grpc_res.mutable_hostname();
|
||||
*host = (char *)lua_tostring(ls, 7);
|
||||
|
||||
falco::output::queue::get().push(grpc_res);
|
||||
falco::outputs::queue::get().push(grpc_res);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
16
userspace/falco/grpc.proto
Normal file
16
userspace/falco/grpc.proto
Normal file
@@ -0,0 +1,16 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package falco.grpc;
|
||||
|
||||
enum stream_status {
|
||||
STREAMING = 0;
|
||||
SUCCESS = 1;
|
||||
ERROR = 2;
|
||||
}
|
||||
|
||||
enum request_state {
|
||||
UNKNOWN = 0;
|
||||
REQUEST = 1;
|
||||
WRITE = 2;
|
||||
FINISH = 3;
|
||||
}
|
||||
@@ -22,32 +22,29 @@ limitations under the License.
|
||||
falco::grpc::context::context(::grpc::ServerContext* ctx):
|
||||
m_ctx(ctx)
|
||||
{
|
||||
std::string session_id;
|
||||
std::string request_id;
|
||||
get_metadata(meta_session, m_session_id);
|
||||
get_metadata(meta_request, m_request_id);
|
||||
|
||||
get_metadata(meta_session, session_id);
|
||||
get_metadata(meta_request, request_id);
|
||||
|
||||
bool has_meta = false;
|
||||
std::stringstream meta;
|
||||
if(!session_id.empty())
|
||||
if(!m_session_id.empty())
|
||||
{
|
||||
meta << "[sid=" << session_id << "]";
|
||||
has_meta = true;
|
||||
ctx->AddInitialMetadata(meta_session, m_session_id);
|
||||
meta << "sid=" << m_session_id << "";
|
||||
}
|
||||
if(!request_id.empty())
|
||||
if(!m_request_id.empty())
|
||||
{
|
||||
meta << "[rid=" << request_id << "]";
|
||||
has_meta = true;
|
||||
}
|
||||
if(has_meta)
|
||||
{
|
||||
meta << " ";
|
||||
ctx->AddInitialMetadata(meta_request, m_request_id);
|
||||
meta << ", rid=" << m_request_id << "";
|
||||
}
|
||||
m_prefix = meta.str();
|
||||
}
|
||||
|
||||
void falco::grpc::context::context::get_metadata(std::string key, std::string& val)
|
||||
std::string falco::grpc::context::peer() const
|
||||
{
|
||||
return m_ctx->peer();
|
||||
}
|
||||
|
||||
void falco::grpc::context::get_metadata(std::string key, std::string& val)
|
||||
{
|
||||
const std::multimap<::grpc::string_ref, ::grpc::string_ref>& client_metadata = m_ctx->client_metadata();
|
||||
auto it = client_metadata.find(key);
|
||||
@@ -55,4 +52,4 @@ void falco::grpc::context::context::get_metadata(std::string key, std::string& v
|
||||
{
|
||||
val.assign(it->second.data(), it->second.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -24,6 +24,8 @@ limitations under the License.
|
||||
#include <grpc++/grpc++.h>
|
||||
#endif
|
||||
|
||||
#include "grpc.pb.h"
|
||||
|
||||
namespace falco
|
||||
{
|
||||
namespace grpc
|
||||
@@ -39,10 +41,14 @@ public:
|
||||
~context() = default;
|
||||
|
||||
void get_metadata(std::string key, std::string& val);
|
||||
std::string peer() const;
|
||||
|
||||
std::string m_prefix; // todo(leodido) > making this read only?
|
||||
|
||||
private:
|
||||
std::string m_session_id;
|
||||
std::string m_request_id;
|
||||
::grpc::ServerContext* m_ctx = nullptr;
|
||||
std::string m_prefix;
|
||||
};
|
||||
|
||||
class stream_context : public context
|
||||
@@ -52,14 +58,9 @@ public:
|
||||
context(ctx){};
|
||||
~stream_context() = default;
|
||||
|
||||
enum : char
|
||||
{
|
||||
STREAMING = 1,
|
||||
SUCCESS,
|
||||
ERROR
|
||||
} m_status = STREAMING;
|
||||
stream_status m_status = stream_status::STREAMING;
|
||||
|
||||
mutable void* m_stream = nullptr; // todo(fntlnz, leodido) > useful in the future
|
||||
mutable void* m_stream = nullptr; // todo(fntlnz, leodido) > useful in the future (request-specific stream data)
|
||||
mutable bool m_has_more = false;
|
||||
};
|
||||
|
||||
|
||||
@@ -24,37 +24,55 @@ namespace grpc
|
||||
{
|
||||
|
||||
template<>
|
||||
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::start(server* srv)
|
||||
void request_stream_context<falco::outputs::service, falco::outputs::request, falco::outputs::response>::start(server* srv)
|
||||
{
|
||||
m_state = request_context_base::REQUEST;
|
||||
m_state = request_state::REQUEST;
|
||||
m_srv_ctx.reset(new ::grpc::ServerContext);
|
||||
auto srvctx = m_srv_ctx.get();
|
||||
m_res_writer.reset(new ::grpc::ServerAsyncWriter<output::response>(srvctx));
|
||||
m_res_writer.reset(new ::grpc::ServerAsyncWriter<outputs::response>(srvctx));
|
||||
m_stream_ctx.reset();
|
||||
m_req.Clear();
|
||||
auto cq = srv->m_completion_queue.get();
|
||||
// todo(leodido) > log "calling m_request_func: tag=this, state=m_state"
|
||||
(srv->m_output_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
|
||||
// m_stream_ctx->m_stream = this; // todo(leodido) > save the tag - ie., this - into the stream?
|
||||
gpr_log(
|
||||
GPR_DEBUG,
|
||||
"request_stream_context<outputs>::%s -> m_request_func: tag=%p, state=%s",
|
||||
__func__,
|
||||
this,
|
||||
request_state_Name(m_state).c_str());
|
||||
(srv->m_outputs_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
|
||||
}
|
||||
|
||||
template<>
|
||||
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::process(server* srv)
|
||||
void request_stream_context<falco::outputs::service, falco::outputs::request, falco::outputs::response>::process(server* srv)
|
||||
{
|
||||
// When it is the 1st process call
|
||||
if(m_state == request_context_base::REQUEST)
|
||||
if(m_state == request_state::REQUEST)
|
||||
{
|
||||
m_state = request_context_base::WRITE;
|
||||
m_state = request_state::WRITE;
|
||||
m_stream_ctx.reset(new stream_context(m_srv_ctx.get()));
|
||||
}
|
||||
|
||||
// Processing
|
||||
output::response res;
|
||||
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe()
|
||||
gpr_log(
|
||||
GPR_DEBUG,
|
||||
"request_stream_context<outputs>::%s -> m_process_func: tag=%p, state=%s",
|
||||
__func__,
|
||||
this,
|
||||
request_state_Name(m_state).c_str());
|
||||
|
||||
outputs::response res;
|
||||
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // outputs_impl()
|
||||
|
||||
// When there are still more responses to stream
|
||||
if(m_stream_ctx->m_has_more)
|
||||
{
|
||||
// todo(leodido) > log "write: tag=this, state=m_state"
|
||||
gpr_log(
|
||||
GPR_DEBUG,
|
||||
"request_stream_context<outputs>::%s -> write: tag=%p, state=%s",
|
||||
__func__,
|
||||
this,
|
||||
request_state_Name(m_state).c_str());
|
||||
m_res_writer->Write(res, this);
|
||||
}
|
||||
// No more responses to stream
|
||||
@@ -62,26 +80,37 @@ void request_stream_context<falco::output::service, falco::output::request, falc
|
||||
{
|
||||
// Communicate to the gRPC runtime that we have finished.
|
||||
// The memory address of "this" instance uniquely identifies the event.
|
||||
m_state = request_context_base::FINISH;
|
||||
// todo(leodido) > log "finish: tag=this, state=m_state"
|
||||
m_state = request_state::FINISH;
|
||||
gpr_log(
|
||||
GPR_DEBUG,
|
||||
"request_stream_context<outputs>::%s -> finish: tag=%p, state=finish",
|
||||
__func__,
|
||||
this);
|
||||
|
||||
m_res_writer->Finish(::grpc::Status::OK, this);
|
||||
}
|
||||
}
|
||||
|
||||
template<>
|
||||
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::end(server* srv, bool errored)
|
||||
void request_stream_context<falco::outputs::service, falco::outputs::request, falco::outputs::response>::end(server* srv, bool errored)
|
||||
{
|
||||
if(m_stream_ctx)
|
||||
{
|
||||
if(errored)
|
||||
{
|
||||
// todo(leodido) > log error "error streaming: tag=this, state=m_state, stream=m_stream_ctx->m_stream"
|
||||
gpr_log(
|
||||
GPR_ERROR,
|
||||
"request_stream_context<outputs>::%s -> error streaming: tag=%p, state=%s, stream=%p",
|
||||
__func__,
|
||||
this,
|
||||
request_state_Name(m_state).c_str(),
|
||||
m_stream_ctx->m_stream);
|
||||
}
|
||||
m_stream_ctx->m_status = errored ? stream_context::ERROR : stream_context::SUCCESS;
|
||||
m_stream_ctx->m_status = errored ? stream_status::ERROR : stream_status::SUCCESS;
|
||||
|
||||
// Complete the processing
|
||||
output::response res;
|
||||
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe()
|
||||
outputs::response res;
|
||||
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // outputs()
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -90,7 +119,12 @@ void request_stream_context<falco::output::service, falco::output::request, falc
|
||||
// So, `m_stream_ctx` is null because it is set into the `process()` function.
|
||||
// The stream haven't started.
|
||||
|
||||
// todo(leodido) > log error "ending streaming: tag=this, state=m_state, stream=null"
|
||||
gpr_log(
|
||||
GPR_ERROR,
|
||||
"%s -> ending streaming: tag=%p, state=%s, stream=never started",
|
||||
__func__,
|
||||
this,
|
||||
request_state_Name(m_state).c_str());
|
||||
}
|
||||
|
||||
// Ask to start processing requests
|
||||
@@ -100,7 +134,7 @@ void request_stream_context<falco::output::service, falco::output::request, falc
|
||||
template<>
|
||||
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::start(server* srv)
|
||||
{
|
||||
m_state = request_context_base::REQUEST;
|
||||
m_state = request_state::REQUEST;
|
||||
m_srv_ctx.reset(new ::grpc::ServerContext);
|
||||
auto srvctx = m_srv_ctx.get();
|
||||
m_res_writer.reset(new ::grpc::ServerAsyncResponseWriter<version::response>(srvctx));
|
||||
@@ -109,17 +143,32 @@ void falco::grpc::request_context<falco::version::service, falco::version::reque
|
||||
// Request to start processing given requests.
|
||||
// Using "this" - ie., the memory address of this context - as the tag that uniquely identifies the request.
|
||||
// In this way, different contexts can serve different requests concurrently.
|
||||
gpr_log(
|
||||
GPR_DEBUG,
|
||||
"request_context<version>::%s -> m_request_func: tag=%p, state=%s",
|
||||
__func__,
|
||||
this,
|
||||
request_state_Name(m_state).c_str());
|
||||
(srv->m_version_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
|
||||
}
|
||||
|
||||
template<>
|
||||
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::process(server* srv)
|
||||
{
|
||||
gpr_log(
|
||||
GPR_DEBUG,
|
||||
"request_context<version>::%s -> m_process_func: tag=%p, state=%s",
|
||||
__func__,
|
||||
this,
|
||||
request_state_Name(m_state).c_str());
|
||||
|
||||
// Create empty response
|
||||
version::response res;
|
||||
// Call version service implementation
|
||||
(srv->*m_process_func)(m_srv_ctx.get(), m_req, res);
|
||||
|
||||
// Notify the gRPC runtime that this processing is done
|
||||
m_state = request_context_base::FINISH;
|
||||
m_state = request_state::FINISH;
|
||||
// Using "this"- ie., the memory address of this context - to uniquely identify the event.
|
||||
m_res_writer->Finish(res, ::grpc::Status::OK, this);
|
||||
}
|
||||
@@ -127,8 +176,73 @@ void falco::grpc::request_context<falco::version::service, falco::version::reque
|
||||
template<>
|
||||
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::end(server* srv, bool errored)
|
||||
{
|
||||
// todo(leodido) > handle processing errors here
|
||||
|
||||
if(errored)
|
||||
{
|
||||
gpr_log(
|
||||
GPR_ERROR,
|
||||
"request_context<version>::%s -> error replying: tag=%p, state=%s",
|
||||
__func__,
|
||||
this,
|
||||
request_state_Name(m_state).c_str());
|
||||
}
|
||||
|
||||
// Ask to start processing requests
|
||||
start(srv);
|
||||
}
|
||||
|
||||
template<>
|
||||
void falco::grpc::request_context<falco::inputs::service, falco::inputs::request, falco::inputs::response>::start(server* srv)
|
||||
{
|
||||
m_state = request_state::REQUEST;
|
||||
m_srv_ctx.reset(new ::grpc::ServerContext);
|
||||
auto srvctx = m_srv_ctx.get();
|
||||
m_res_writer.reset(new ::grpc::ServerAsyncResponseWriter<inputs::response>(srvctx));
|
||||
m_req.Clear();
|
||||
auto cq = srv->m_completion_queue.get();
|
||||
// Request to start processing given requests.
|
||||
// Using "this" - ie., the memory address of this context - as the tag that uniquely identifies the request.
|
||||
// In this way, different contexts can serve different requests concurrently.
|
||||
gpr_log(
|
||||
GPR_DEBUG,
|
||||
"request_context<inputs>::%s -> m_request_func: tag=%p, state=%s",
|
||||
__func__,
|
||||
this,
|
||||
request_state_Name(m_state).c_str());
|
||||
(srv->m_inputs_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
|
||||
}
|
||||
|
||||
template<>
|
||||
void falco::grpc::request_context<falco::inputs::service, falco::inputs::request, falco::inputs::response>::process(server* srv)
|
||||
{
|
||||
gpr_log(
|
||||
GPR_DEBUG,
|
||||
"request_context<inputs>::%s -> m_process_func: tag=%p, state=%s",
|
||||
__func__,
|
||||
this,
|
||||
request_state_Name(m_state).c_str());
|
||||
|
||||
inputs::response res;
|
||||
(srv->*m_process_func)(m_srv_ctx.get(), m_req, res);
|
||||
|
||||
// Notify the gRPC runtime that this processing is done
|
||||
m_state = request_state::FINISH;
|
||||
// Using "this"- ie., the memory address of this context - to uniquely identify the event.
|
||||
m_res_writer->Finish(res, ::grpc::Status::OK, this);
|
||||
}
|
||||
|
||||
template<>
|
||||
void falco::grpc::request_context<falco::inputs::service, falco::inputs::request, falco::inputs::response>::end(server* srv, bool errored)
|
||||
{
|
||||
if(errored)
|
||||
{
|
||||
gpr_log(
|
||||
GPR_ERROR,
|
||||
"request_context<inputs>::%s -> error replying: tag=%p, state=%s",
|
||||
__func__,
|
||||
this,
|
||||
request_state_Name(m_state).c_str());
|
||||
}
|
||||
|
||||
// Ask to start processing requests
|
||||
start(srv);
|
||||
}
|
||||
|
||||
@@ -32,13 +32,8 @@ public:
|
||||
~request_context_base() = default;
|
||||
|
||||
std::unique_ptr<::grpc::ServerContext> m_srv_ctx;
|
||||
enum : char
|
||||
{
|
||||
UNKNOWN = 0,
|
||||
REQUEST,
|
||||
WRITE,
|
||||
FINISH
|
||||
} m_state = UNKNOWN;
|
||||
request_state m_state = request_state::UNKNOWN;
|
||||
|
||||
virtual void start(server* srv) = 0;
|
||||
virtual void process(server* srv) = 0;
|
||||
virtual void end(server* srv, bool isError) = 0;
|
||||
|
||||
@@ -14,13 +14,14 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#ifdef GRPC_INCLUDE_IS_GRPCPP
|
||||
#include <grpcpp/grpcpp.h>
|
||||
#else
|
||||
#include <grpc++/grpc++.h>
|
||||
#endif
|
||||
|
||||
#include "logger.h"
|
||||
#include "grpc_server.h"
|
||||
#include "grpc_request_context.h"
|
||||
#include "utils.h"
|
||||
@@ -52,21 +53,34 @@ void falco::grpc::server::thread_process(int thread_index)
|
||||
{
|
||||
if(tag == nullptr)
|
||||
{
|
||||
// todo(leodido) > log error "server completion queue error: empty tag"
|
||||
gpr_log(
|
||||
GPR_ERROR,
|
||||
"server::%s -> server completion queue error: tag=(empty)",
|
||||
__func__);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Obtain the context for a given tag
|
||||
request_context_base* ctx = static_cast<request_context_base*>(tag);
|
||||
|
||||
// todo(leodido) > log "next event: tag=tag, read_success=event_read_success, state=ctx->m_state"
|
||||
gpr_log(
|
||||
GPR_DEBUG,
|
||||
"server::%s -> next event: tag=%p, read success=%s, state=%s",
|
||||
__func__,
|
||||
tag,
|
||||
event_read_success ? "true" : "false",
|
||||
request_state_Name(ctx->m_state).c_str());
|
||||
|
||||
// When event has not been read successfully
|
||||
if(!event_read_success)
|
||||
{
|
||||
if(ctx->m_state != request_context_base::REQUEST)
|
||||
if(ctx->m_state != request_state::REQUEST)
|
||||
{
|
||||
// todo(leodido) > log error "server completion queue failing to read: tag=tag"
|
||||
gpr_log(
|
||||
GPR_ERROR,
|
||||
"server::%s -> server completion queue failing to read: tag=%p",
|
||||
__func__,
|
||||
tag);
|
||||
|
||||
// End the context with error
|
||||
ctx->end(this, true);
|
||||
@@ -77,39 +91,71 @@ void falco::grpc::server::thread_process(int thread_index)
|
||||
// Process the event
|
||||
switch(ctx->m_state)
|
||||
{
|
||||
case request_context_base::REQUEST:
|
||||
case request_state::REQUEST:
|
||||
// Completion of m_request_func
|
||||
case request_context_base::WRITE:
|
||||
case request_state::WRITE:
|
||||
// Completion of Write()
|
||||
ctx->process(this);
|
||||
break;
|
||||
case request_context_base::FINISH:
|
||||
case request_state::FINISH:
|
||||
// Completion of Finish()
|
||||
ctx->end(this, false);
|
||||
break;
|
||||
default:
|
||||
// todo(leodido) > log error "unkown completion queue event: tag=tag, state=ctx->m_state"
|
||||
gpr_log(
|
||||
GPR_ERROR,
|
||||
"server::%s -> unkown completion queue event: tag=%p, state=%s",
|
||||
__func__,
|
||||
tag,
|
||||
request_state_Name(ctx->m_state).c_str());
|
||||
break;
|
||||
}
|
||||
|
||||
// todo(leodido) > log "thread completed: index=thread_index"
|
||||
gpr_log(
|
||||
GPR_DEBUG,
|
||||
"server::%s -> thread completed: tag=%p, index=%d",
|
||||
__func__,
|
||||
tag,
|
||||
thread_index);
|
||||
}
|
||||
}
|
||||
|
||||
void falco::grpc::server::init(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs)
|
||||
void falco::grpc::server::init(std::string server_addr, std::string private_key, std::string cert_chain, std::string root_certs, int threadiness, std::string log_level)
|
||||
{
|
||||
m_server_addr = server_addr;
|
||||
m_threadiness = threadiness;
|
||||
m_private_key = private_key;
|
||||
m_cert_chain = cert_chain;
|
||||
m_root_certs = root_certs;
|
||||
|
||||
falco::schema::priority logging_level = falco::schema::INFORMATIONAL;
|
||||
falco::schema::priority_Parse(log_level, &logging_level);
|
||||
switch(logging_level)
|
||||
{
|
||||
case falco::schema::ERROR:
|
||||
gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR);
|
||||
break;
|
||||
case falco::schema::DEBUG:
|
||||
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
|
||||
break;
|
||||
case falco::schema::INFORMATIONAL:
|
||||
default:
|
||||
// note > info will always enter here since it is != from "informational"
|
||||
gpr_set_log_verbosity(GPR_LOG_SEVERITY_INFO);
|
||||
break;
|
||||
}
|
||||
|
||||
// gpr_set_log_function(custom_log);
|
||||
gpr_log_verbosity_init();
|
||||
}
|
||||
|
||||
// static void custom_log(gpr_log_func_args* args){};
|
||||
|
||||
void falco::grpc::server::run()
|
||||
{
|
||||
string private_key;
|
||||
string cert_chain;
|
||||
string root_certs;
|
||||
std::string private_key;
|
||||
std::string cert_chain;
|
||||
std::string root_certs;
|
||||
|
||||
falco::utils::read(m_cert_chain, cert_chain);
|
||||
falco::utils::read(m_private_key, private_key);
|
||||
@@ -123,12 +169,13 @@ void falco::grpc::server::run()
|
||||
|
||||
::grpc::ServerBuilder builder;
|
||||
builder.AddListeningPort(m_server_addr, ::grpc::SslServerCredentials(ssl_opts));
|
||||
builder.RegisterService(&m_output_svc);
|
||||
builder.RegisterService(&m_outputs_svc);
|
||||
builder.RegisterService(&m_version_svc);
|
||||
builder.RegisterService(&m_inputs_svc);
|
||||
|
||||
m_completion_queue = builder.AddCompletionQueue();
|
||||
m_server = builder.BuildAndStart();
|
||||
falco_logger::log(LOG_INFO, "Starting gRPC server at " + m_server_addr + "\n");
|
||||
gpr_log(GPR_INFO, "gRPC server starting: address=%s", m_server_addr.c_str());
|
||||
|
||||
// The number of contexts is multiple of the number of threads
|
||||
// This defines the number of simultaneous completion queue requests of the same type (service::AsyncService::Request##RPC)
|
||||
@@ -136,8 +183,9 @@ void falco::grpc::server::run()
|
||||
int context_num = m_threadiness * 10;
|
||||
// todo(leodido) > take a look at thread_stress_test.cc into grpc repository
|
||||
|
||||
REGISTER_UNARY(version::request, version::response, version::service, version, version, context_num)
|
||||
REGISTER_STREAM(output::request, output::response, output::service, subscribe, subscribe, context_num)
|
||||
REGISTER_UNARY(version::request, version::response, version::service, version, version_impl, context_num)
|
||||
REGISTER_UNARY(inputs::request, inputs::response, inputs::service, input, input_impl, context_num)
|
||||
REGISTER_STREAM(outputs::request, outputs::response, outputs::service, outputs, outputs_impl, context_num)
|
||||
|
||||
m_threads.resize(m_threadiness);
|
||||
int thread_idx = 0;
|
||||
@@ -145,23 +193,23 @@ void falco::grpc::server::run()
|
||||
{
|
||||
thread = std::thread(&server::thread_process, this, thread_idx++);
|
||||
}
|
||||
// todo(leodido) > log "gRPC server running: threadiness=m_threads.size()"
|
||||
gpr_log(GPR_INFO, "gRPC server running: threadiness=%zu", m_threads.size());
|
||||
|
||||
while(server_impl::is_running())
|
||||
{
|
||||
sleep(1);
|
||||
}
|
||||
// todo(leodido) > log "stopping gRPC server"
|
||||
gpr_log(GPR_INFO, "gRPC server stopping");
|
||||
stop();
|
||||
}
|
||||
|
||||
void falco::grpc::server::stop()
|
||||
{
|
||||
falco_logger::log(LOG_INFO, "Shutting down gRPC server. Waiting until external connections are closed by clients\n");
|
||||
gpr_log(GPR_INFO, "gRPC server shutting down");
|
||||
m_server->Shutdown();
|
||||
m_completion_queue->Shutdown();
|
||||
|
||||
falco_logger::log(LOG_INFO, "Waiting for the gRPC threads to complete\n");
|
||||
gpr_log(GPR_DEBUG, "gRPC server shutting down: waiting for the gRPC threads to complete");
|
||||
for(std::thread& t : m_threads)
|
||||
{
|
||||
if(t.joinable())
|
||||
@@ -171,7 +219,7 @@ void falco::grpc::server::stop()
|
||||
}
|
||||
m_threads.clear();
|
||||
|
||||
falco_logger::log(LOG_INFO, "Draining all the remaining gRPC events\n");
|
||||
gpr_log(GPR_DEBUG, "gRPC server shutting down: draining all the remaining gRPC events");
|
||||
// Ignore remaining events
|
||||
void* ignore_tag = nullptr;
|
||||
bool ignore_ok = false;
|
||||
@@ -179,5 +227,5 @@ void falco::grpc::server::stop()
|
||||
{
|
||||
}
|
||||
|
||||
falco_logger::log(LOG_INFO, "Shutting down gRPC server complete\n");
|
||||
gpr_log(GPR_INFO, "gRPC server shutting down: done");
|
||||
}
|
||||
|
||||
@@ -29,26 +29,17 @@ namespace grpc
|
||||
class server : public server_impl
|
||||
{
|
||||
public:
|
||||
server()
|
||||
{
|
||||
}
|
||||
server(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs):
|
||||
m_server_addr(server_addr),
|
||||
m_threadiness(threadiness),
|
||||
m_private_key(private_key),
|
||||
m_cert_chain(cert_chain),
|
||||
m_root_certs(root_certs)
|
||||
{
|
||||
}
|
||||
server() = default;
|
||||
virtual ~server() = default;
|
||||
|
||||
void init(std::string server_addr, int threadiness, std::string private_key, std::string cert_chain, std::string root_certs);
|
||||
void init(std::string server_addr, std::string private_key, std::string cert_chain, std::string root_certs, int threadiness, std::string log_level);
|
||||
void thread_process(int thread_index);
|
||||
void run();
|
||||
void stop();
|
||||
|
||||
output::service::AsyncService m_output_svc;
|
||||
outputs::service::AsyncService m_outputs_svc;
|
||||
version::service::AsyncService m_version_svc;
|
||||
inputs::service::AsyncService m_inputs_svc;
|
||||
|
||||
std::unique_ptr<::grpc::ServerCompletionQueue> m_completion_queue;
|
||||
|
||||
|
||||
@@ -28,24 +28,42 @@ bool falco::grpc::server_impl::is_running()
|
||||
return true;
|
||||
}
|
||||
|
||||
void falco::grpc::server_impl::subscribe(const stream_context& ctx, const output::request& req, output::response& res)
|
||||
void falco::grpc::server_impl::outputs_impl(const stream_context& ctx, const outputs::request& req, outputs::response& res)
|
||||
{
|
||||
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
|
||||
std::string client = ctx.peer();
|
||||
if(ctx.m_status == stream_status::SUCCESS || ctx.m_status == stream_status::ERROR)
|
||||
{
|
||||
// todo(leodido) > log "status=ctx->m_status, stream=ctx->m_stream"
|
||||
// Entering here when the streaming completed (request_context_base::FINISH)
|
||||
// context m_status == stream_context::SUCCESS when the gRPC server shutdown the context
|
||||
// context m_status == stream_context::ERROR when the gRPC client shutdown the context
|
||||
gpr_log(
|
||||
GPR_DEBUG,
|
||||
"server_impl::%s -> streaming done: %s, client=%s, status=%s, stream=%p",
|
||||
__func__,
|
||||
ctx.m_prefix.c_str(),
|
||||
client.c_str(),
|
||||
stream_status_Name(ctx.m_status).c_str(),
|
||||
ctx.m_stream);
|
||||
ctx.m_stream = nullptr;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Start or continue streaming
|
||||
// todo(leodido) > check for m_status == stream_context::STREAMING?
|
||||
// todo(leodido) > set m_stream
|
||||
if(output::queue::get().try_pop(res) && !req.keepalive())
|
||||
// Start or continue streaming (m_status == stream_context::STREAMING)
|
||||
gpr_log(
|
||||
GPR_DEBUG,
|
||||
"server_impl::%s -> start or continue streaming: %s, client=%s, status=%s, stream=%p",
|
||||
__func__,
|
||||
ctx.m_prefix.c_str(),
|
||||
client.c_str(),
|
||||
stream_status_Name(ctx.m_status).c_str(),
|
||||
ctx.m_stream);
|
||||
// note(leodido) > set request-specific data on m_stream here, in case it is needed
|
||||
if(outputs::queue::get().try_pop(res) && !req.keepalive())
|
||||
{
|
||||
ctx.m_has_more = true;
|
||||
return;
|
||||
}
|
||||
while(is_running() && !output::queue::get().try_pop(res) && req.keepalive())
|
||||
while(is_running() && !outputs::queue::get().try_pop(res) && req.keepalive())
|
||||
{
|
||||
}
|
||||
|
||||
@@ -53,8 +71,10 @@ void falco::grpc::server_impl::subscribe(const stream_context& ctx, const output
|
||||
}
|
||||
}
|
||||
|
||||
void falco::grpc::server_impl::version(const context& ctx, const version::request&, version::response& res)
|
||||
void falco::grpc::server_impl::version_impl(const context& ctx, const version::request& req, version::response& res)
|
||||
{
|
||||
gpr_log(GPR_DEBUG, "server_impl::%s -> replying: %s, client=%s", __func__, ctx.m_prefix.c_str(), ctx.peer().c_str());
|
||||
|
||||
auto& build = *res.mutable_build();
|
||||
build = FALCO_VERSION_BUILD;
|
||||
|
||||
@@ -69,6 +89,18 @@ void falco::grpc::server_impl::version(const context& ctx, const version::reques
|
||||
res.set_patch(FALCO_VERSION_PATCH);
|
||||
}
|
||||
|
||||
void falco::grpc::server_impl::input_impl(const context& ctx, const inputs::request& req, inputs::response& res)
|
||||
{
|
||||
std::string client = ctx.peer();
|
||||
gpr_log(GPR_DEBUG, "server_impl::%s -> replying: %s, client=%s", __func__, ctx.m_prefix.c_str(), client.c_str());
|
||||
// todo(leodido) > implement
|
||||
// retrieve metadata
|
||||
// if type = K8S_AUDIT
|
||||
// ...
|
||||
// if type = SYSCALL
|
||||
// ...
|
||||
}
|
||||
|
||||
void falco::grpc::server_impl::shutdown()
|
||||
{
|
||||
m_stop = true;
|
||||
|
||||
@@ -17,8 +17,9 @@ limitations under the License.
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include "output.grpc.pb.h"
|
||||
#include "outputs.grpc.pb.h"
|
||||
#include "version.grpc.pb.h"
|
||||
#include "inputs.grpc.pb.h"
|
||||
#include "grpc_context.h"
|
||||
|
||||
namespace falco
|
||||
@@ -36,9 +37,11 @@ public:
|
||||
protected:
|
||||
bool is_running();
|
||||
|
||||
void subscribe(const stream_context& ctx, const output::request& req, output::response& res);
|
||||
void outputs_impl(const stream_context& ctx, const outputs::request& req, outputs::response& res);
|
||||
|
||||
void version(const context& ctx, const version::request& req, version::response& res);
|
||||
void version_impl(const context& ctx, const version::request& req, version::response& res);
|
||||
|
||||
void input_impl(const context& ctx, const inputs::request& req, inputs::response& res);
|
||||
|
||||
private:
|
||||
std::atomic<bool> m_stop{false};
|
||||
|
||||
21
userspace/falco/inputs.proto
Normal file
21
userspace/falco/inputs.proto
Normal file
@@ -0,0 +1,21 @@
|
||||
syntax = "proto3";
|
||||
|
||||
// import "event.proto";
|
||||
|
||||
import "google/protobuf/any.proto";
|
||||
|
||||
package falco.inputs;
|
||||
|
||||
option go_package = "github.com/falcosecurity/client-go/pkg/api/inputs";
|
||||
|
||||
// service service { rpc input(request) returns (response); }
|
||||
|
||||
// message request { repeated falco.event.event data = 1; }
|
||||
|
||||
// message response {};
|
||||
|
||||
service service { rpc input(request) returns (response); }
|
||||
|
||||
message request { repeated google.protobuf.Any events = 1; }
|
||||
|
||||
message response {};
|
||||
@@ -3,18 +3,18 @@ syntax = "proto3";
|
||||
import "google/protobuf/timestamp.proto";
|
||||
import "schema.proto";
|
||||
|
||||
package falco.output;
|
||||
package falco.outputs;
|
||||
|
||||
option go_package = "github.com/falcosecurity/client-go/pkg/api/output";
|
||||
option go_package = "github.com/falcosecurity/client-go/pkg/api/outputs";
|
||||
|
||||
// The `subscribe` service defines the RPC call
|
||||
// The `outputs` service defines a server-streaming RPC call
|
||||
// to perform an output `request` which will lead to obtain an output `response`.
|
||||
service service {
|
||||
rpc subscribe(request) returns (stream response);
|
||||
rpc outputs(request) returns (stream response);
|
||||
}
|
||||
|
||||
// The `request` message is the logical representation of the request model.
|
||||
// It is the input of the `subscribe` service.
|
||||
// It is the input of the `outputs` service.
|
||||
// It is used to configure the kind of subscription to the gRPC streaming server.
|
||||
//
|
||||
// By default the request asks to the server to only receive the accumulated events.
|
||||
Reference in New Issue
Block a user