bc(userspace/falco): the Falco gRPC Outputs API are now "falco.outputs.service/get" and "falco.outputs.service/sub"

Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
This commit is contained in:
Leonardo Di Donato
2020-05-29 23:12:35 +00:00
committed by poiana
parent bdbdf7b830
commit b88767f558
9 changed files with 51 additions and 52 deletions

View File

@@ -2,15 +2,14 @@
<!-- toc -->
- [Falco gRPC Outputs](#falco-grpc-outputs)
- [Summary](#summary)
- [Motivation](#motivation)
- [Goals](#goals)
- [Non-Goals](#non-goals)
- [Proposal](#proposal)
- [Use cases](#use-cases)
- [Diagrams](#diagrams)
- [Design Details](#design-details)
- [Summary](#summary)
- [Motivation](#motivation)
* [Goals](#goals)
* [Non-Goals](#non-goals)
- [Proposal](#proposal)
* [Use cases](#use-cases)
* [Diagrams](#diagrams)
* [Design Details](#design-details)
<!-- tocstop -->

View File

@@ -19,23 +19,24 @@ add_custom_command(
${CMAKE_CURRENT_BINARY_DIR}/version.grpc.pb.h
${CMAKE_CURRENT_BINARY_DIR}/version.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/version.pb.h
${CMAKE_CURRENT_BINARY_DIR}/output.grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/output.grpc.pb.h
${CMAKE_CURRENT_BINARY_DIR}/output.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/output.pb.h
${CMAKE_CURRENT_BINARY_DIR}/outputs.grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/outputs.grpc.pb.h
${CMAKE_CURRENT_BINARY_DIR}/outputs.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/outputs.pb.h
${CMAKE_CURRENT_BINARY_DIR}/schema.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/schema.pb.h
COMMENT "Generate gRPC version API"
COMMENT "Generate gRPC API"
# Falco gRPC Version API
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/version.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/version.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --grpc_out=. --plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN}
${CMAKE_CURRENT_SOURCE_DIR}/version.proto
COMMENT "Generate gRPC outputs API"
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/output.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/output.proto
# Falco gRPC Outputs API
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/outputs.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=. ${CMAKE_CURRENT_SOURCE_DIR}/outputs.proto
${CMAKE_CURRENT_SOURCE_DIR}/schema.proto
COMMAND ${PROTOC} -I ${CMAKE_CURRENT_SOURCE_DIR} --grpc_out=. --plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN}
${CMAKE_CURRENT_SOURCE_DIR}/output.proto
${CMAKE_CURRENT_SOURCE_DIR}/outputs.proto
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
add_executable(
@@ -54,8 +55,8 @@ add_executable(
grpc_server.cpp
${CMAKE_CURRENT_BINARY_DIR}/version.grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/version.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/output.grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/output.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/outputs.grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/outputs.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/schema.pb.cc)
add_dependencies(falco civetweb string-view-lite)

View File

@@ -22,11 +22,10 @@ limitations under the License.
#include "formats.h"
#include "logger.h"
#include "falco_output_queue.h"
#include "falco_outputs_queue.h"
#include "banned.h" // This raises a compilation error when certain functions are used
using namespace std;
using namespace falco::output;
const static struct luaL_reg ll_falco_outputs [] =
{
@@ -316,7 +315,7 @@ int falco_outputs::handle_grpc(lua_State *ls)
lua_error(ls);
}
response grpc_res = response();
auto grpc_res = falco::outputs::response();
// time
gen_event *evt = (gen_event *)lua_topointer(ls, 1);
@@ -366,7 +365,7 @@ int falco_outputs::handle_grpc(lua_State *ls)
auto host = grpc_res.mutable_hostname();
*host = (char *)lua_tostring(ls, 7);
falco::output::queue::get().push(grpc_res);
falco::outputs::queue::get().push(grpc_res);
return 1;
}

View File

@@ -16,12 +16,12 @@ limitations under the License.
#pragma once
#include "output.pb.h"
#include "outputs.pb.h"
#include "tbb/concurrent_queue.h"
namespace falco
{
namespace output
namespace outputs
{
typedef tbb::concurrent_queue<response> response_cq;

View File

@@ -24,12 +24,12 @@ namespace grpc
{
template<>
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::start(server* srv)
void request_stream_context<outputs::service, outputs::request, outputs::response>::start(server* srv)
{
m_state = request_context_base::REQUEST;
m_srv_ctx.reset(new ::grpc::ServerContext);
auto srvctx = m_srv_ctx.get();
m_res_writer.reset(new ::grpc::ServerAsyncWriter<output::response>(srvctx));
m_res_writer.reset(new ::grpc::ServerAsyncWriter<outputs::response>(srvctx));
m_stream_ctx.reset();
m_req.Clear();
auto cq = srv->m_completion_queue.get();
@@ -38,7 +38,7 @@ void request_stream_context<falco::output::service, falco::output::request, falc
}
template<>
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::process(server* srv)
void request_stream_context<outputs::service, outputs::request, outputs::response>::process(server* srv)
{
// When it is the 1st process call
if(m_state == request_context_base::REQUEST)
@@ -48,7 +48,7 @@ void request_stream_context<falco::output::service, falco::output::request, falc
}
// Processing
output::response res;
outputs::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // get()
if(!m_stream_ctx->m_is_running)
@@ -75,7 +75,7 @@ void request_stream_context<falco::output::service, falco::output::request, falc
}
template<>
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::end(server* srv, bool error)
void request_stream_context<outputs::service, outputs::request, outputs::response>::end(server* srv, bool error)
{
if(m_stream_ctx)
{
@@ -86,7 +86,7 @@ void request_stream_context<falco::output::service, falco::output::request, falc
m_stream_ctx->m_status = error ? stream_context::ERROR : stream_context::SUCCESS;
// Complete the processing
output::response res;
outputs::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // get()
}
else
@@ -104,7 +104,7 @@ void request_stream_context<falco::output::service, falco::output::request, falc
}
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::start(server* srv)
void request_context<version::service, version::request, version::response>::start(server* srv)
{
m_state = request_context_base::REQUEST;
m_srv_ctx.reset(new ::grpc::ServerContext);
@@ -119,7 +119,7 @@ void falco::grpc::request_context<falco::version::service, falco::version::reque
}
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::process(server* srv)
void request_context<version::service, version::request, version::response>::process(server* srv)
{
version::response res;
(srv->*m_process_func)(m_srv_ctx.get(), m_req, res);
@@ -131,7 +131,7 @@ void falco::grpc::request_context<falco::version::service, falco::version::reque
}
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::end(server* srv, bool error)
void request_context<version::service, version::request, version::response>::end(server* srv, bool error)
{
// todo(leodido) > handle processing errors here
@@ -140,12 +140,12 @@ void falco::grpc::request_context<falco::version::service, falco::version::reque
}
template<>
void request_bidi_context<falco::output::service, falco::output::request, falco::output::response>::start(server* srv)
void request_bidi_context<outputs::service, outputs::request, outputs::response>::start(server* srv)
{
m_state = request_context_base::REQUEST;
m_srv_ctx.reset(new ::grpc::ServerContext);
auto srvctx = m_srv_ctx.get();
m_reader_writer.reset(new ::grpc::ServerAsyncReaderWriter<output::response, output::request>(srvctx));
m_reader_writer.reset(new ::grpc::ServerAsyncReaderWriter<outputs::response, outputs::request>(srvctx));
m_req.Clear();
auto cq = srv->m_completion_queue.get();
// Request to start processing given requests.
@@ -155,7 +155,7 @@ void request_bidi_context<falco::output::service, falco::output::request, falco:
};
template<>
void request_bidi_context<falco::output::service, falco::output::request, falco::output::response>::process(server* srv)
void request_bidi_context<outputs::service, outputs::request, outputs::response>::process(server* srv)
{
switch(m_state)
{
@@ -168,7 +168,7 @@ void request_bidi_context<falco::output::service, falco::output::request, falco:
case request_context_base::WRITE:
// Processing
{
output::response res;
outputs::response res;
(srv->*m_process_func)(*m_bidi_ctx, m_req, res); // sub()
if(!m_bidi_ctx->m_is_running)
@@ -196,14 +196,14 @@ void request_bidi_context<falco::output::service, falco::output::request, falco:
};
template<>
void request_bidi_context<falco::output::service, falco::output::request, falco::output::response>::end(server* srv, bool error)
void request_bidi_context<outputs::service, outputs::request, outputs::response>::end(server* srv, bool error)
{
if(m_bidi_ctx)
{
m_bidi_ctx->m_status = error ? bidi_context::ERROR : bidi_context::SUCCESS;
// Complete the processing
output::response res;
outputs::response res;
(srv->*m_process_func)(*m_bidi_ctx, m_req, res); // sub()
}

View File

@@ -208,8 +208,8 @@ void falco::grpc::server::run()
// todo(leodido) > take a look at thread_stress_test.cc into grpc repository
REGISTER_UNARY(version::request, version::response, version::service, version, version, context_num)
REGISTER_STREAM(output::request, output::response, output::service, get, get, context_num)
REGISTER_BIDI(output::request, output::response, output::service, sub, sub, context_num)
REGISTER_STREAM(outputs::request, outputs::response, outputs::service, get, get, context_num)
REGISTER_BIDI(outputs::request, outputs::response, outputs::service, sub, sub, context_num)
m_threads.resize(m_threadiness);
int thread_idx = 0;

View File

@@ -44,7 +44,7 @@ public:
void run();
void stop();
output::service::AsyncService m_output_svc;
outputs::service::AsyncService m_output_svc;
version::service::AsyncService m_version_svc;
std::unique_ptr<::grpc::ServerCompletionQueue> m_completion_queue;

View File

@@ -16,7 +16,7 @@ limitations under the License.
#include "config_falco.h"
#include "grpc_server_impl.h"
#include "falco_output_queue.h"
#include "falco_outputs_queue.h"
#include "logger.h"
#include "banned.h" // This raises a compilation error when certain functions are used
@@ -29,7 +29,7 @@ bool falco::grpc::server_impl::is_running()
return true;
}
void falco::grpc::server_impl::get(const stream_context& ctx, const output::request& req, output::response& res)
void falco::grpc::server_impl::get(const stream_context& ctx, const outputs::request& req, outputs::response& res)
{
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
{
@@ -44,10 +44,10 @@ void falco::grpc::server_impl::get(const stream_context& ctx, const output::requ
// m_status == stream_context::STREAMING?
// todo(leodido) > set m_stream
ctx.m_has_more = output::queue::get().try_pop(res);
ctx.m_has_more = outputs::queue::get().try_pop(res);
}
void falco::grpc::server_impl::sub(const bidi_context& ctx, const output::request& req, output::response& res)
void falco::grpc::server_impl::sub(const bidi_context& ctx, const outputs::request& req, outputs::response& res)
{
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
{
@@ -61,7 +61,7 @@ void falco::grpc::server_impl::sub(const bidi_context& ctx, const output::reques
// m_status == stream_context::STREAMING?
// todo(leodido) > set m_stream
ctx.m_has_more = output::queue::get().try_pop(res);
ctx.m_has_more = outputs::queue::get().try_pop(res);
}
void falco::grpc::server_impl::version(const context& ctx, const version::request&, version::response& res)

View File

@@ -17,7 +17,7 @@ limitations under the License.
#pragma once
#include <atomic>
#include "output.grpc.pb.h"
#include "outputs.grpc.pb.h"
#include "version.grpc.pb.h"
#include "grpc_context.h"
@@ -37,8 +37,8 @@ protected:
bool is_running();
// Outputs
void get(const stream_context& ctx, const output::request& req, output::response& res);
void sub(const bidi_context& ctx, const output::request& req, output::response& res);
void get(const stream_context& ctx, const outputs::request& req, outputs::response& res);
void sub(const bidi_context& ctx, const outputs::request& req, outputs::response& res);
// Version
void version(const context& ctx, const version::request& req, version::response& res);