mirror of
https://github.com/falcosecurity/falco.git
synced 2025-07-07 19:59:25 +00:00
new(usperspace/falco): request stream context specialization and process stream macro
Co-authored-by: Lorenzo Fontana <lo@linux.com> Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
This commit is contained in:
parent
2f917c578d
commit
13f5a76b97
@ -27,13 +27,54 @@ limitations under the License.
|
|||||||
#include "grpc_server.h"
|
#include "grpc_server.h"
|
||||||
#include "grpc_context.h"
|
#include "grpc_context.h"
|
||||||
|
|
||||||
bool grpc_server_impl::is_running()
|
template<>
|
||||||
|
void request_stream_context<falco_output_request, falco_output_response>::start(falco_grpc_server* srv)
|
||||||
|
{
|
||||||
|
m_state = request_context_base::REQUEST;
|
||||||
|
m_srv_ctx.reset(new grpc::ServerContext);
|
||||||
|
auto srvctx = m_srv_ctx.get();
|
||||||
|
m_res_writer.reset(new grpc::ServerAsyncWriter<falco_output_response>(srvctx));
|
||||||
|
m_stream_ctx.reset();
|
||||||
|
m_req.Clear();
|
||||||
|
|
||||||
|
auto cq = srv->m_completion_queue.get();
|
||||||
|
(srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
template<>
|
||||||
|
void request_stream_context<falco_output_request, falco_output_response>::process(falco_grpc_server* srv)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
template<>
|
||||||
|
void request_stream_context<falco_output_request, falco_output_response>::end(falco_grpc_server* srv, bool isError)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
bool falco_grpc_server_impl::is_running()
|
||||||
{
|
{
|
||||||
// TODO: this must act as a switch to shut down the server
|
// TODO: this must act as a switch to shut down the server
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void grpc_server::thread_process(int thread_index)
|
void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res)
|
||||||
|
{
|
||||||
|
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
|
||||||
|
{
|
||||||
|
// todo > logic
|
||||||
|
|
||||||
|
ctx.m_stream = nullptr;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Start (or continue) streaming
|
||||||
|
// ctx.m_status == stream_context::STREAMING
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo > print/store statistics
|
||||||
|
}
|
||||||
|
|
||||||
|
void falco_grpc_server::thread_process(int thread_index)
|
||||||
{
|
{
|
||||||
// TODO: is this right? That's what we want?
|
// TODO: is this right? That's what we want?
|
||||||
// Tell pthread to not handle termination signals in the current thread
|
// Tell pthread to not handle termination signals in the current thread
|
||||||
@ -55,7 +96,19 @@ void grpc_server::thread_process(int thread_index)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void grpc_server::run()
|
//
|
||||||
|
// Create array of contexts and start processing streaming RPC request.
|
||||||
|
//
|
||||||
|
#define PROCESS_STREAM(REQ, RESP, RPC, IMPL, CONTEXT_COUNT) \
|
||||||
|
std::vector<request_stream_context<REQ, RESP>> RPC##_contexts(CONTEXT_COUNT); \
|
||||||
|
for(request_stream_context<REQ, RESP> & ctx : RPC##_contexts) \
|
||||||
|
{ \
|
||||||
|
ctx.m_process_func = &falco_grpc_server::IMPL; \
|
||||||
|
ctx.m_request_func = &falco_output_service::AsyncService::Request##RPC; \
|
||||||
|
ctx.start(this); \
|
||||||
|
}
|
||||||
|
|
||||||
|
void falco_grpc_server::run()
|
||||||
{
|
{
|
||||||
grpc::ServerBuilder builder;
|
grpc::ServerBuilder builder;
|
||||||
// Listen on the given address without any authentication mechanism.
|
// Listen on the given address without any authentication mechanism.
|
||||||
@ -66,14 +119,14 @@ void grpc_server::run()
|
|||||||
m_server = builder.BuildAndStart();
|
m_server = builder.BuildAndStart();
|
||||||
std::cout << "Server listening on " << m_server_addr << std::endl;
|
std::cout << "Server listening on " << m_server_addr << std::endl;
|
||||||
|
|
||||||
// int context_count = threadiness * 10;
|
int context_count = m_threadiness * 10;
|
||||||
|
PROCESS_STREAM(falco_output_request, falco_output_response, subscribe, subscribe, context_count)
|
||||||
|
|
||||||
m_threads.resize(m_threadiness);
|
m_threads.resize(m_threadiness);
|
||||||
|
|
||||||
int thread_idx = 0;
|
int thread_idx = 0;
|
||||||
for(std::thread& thread : m_threads)
|
for(std::thread& thread : m_threads)
|
||||||
{
|
{
|
||||||
thread = std::thread(&grpc_server::thread_process, this, thread_idx++);
|
thread = std::thread(&falco_grpc_server::thread_process, this, thread_idx++);
|
||||||
}
|
}
|
||||||
|
|
||||||
while(is_running())
|
while(is_running())
|
||||||
@ -81,12 +134,8 @@ void grpc_server::run()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void grpc_server::subscribe_handler(const stream_context& ctx, falco_output_request req, falco_output_response res)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void start_grpc_server(std::string server_address, int threadiness)
|
void start_grpc_server(std::string server_address, int threadiness)
|
||||||
{
|
{
|
||||||
grpc_server srv(server_address, threadiness);
|
falco_grpc_server srv(server_address, threadiness);
|
||||||
srv.run();
|
srv.run();
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user