From 87fed11f16e1480c06966b3cc063bba2c29e88b3 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Tue, 3 Sep 2019 18:20:16 +0000 Subject: [PATCH] fix: grpc service must be registered and grpc context state must be handled for threads Signed-off-by: Leonardo Di Donato --- userspace/falco/grpc_server.cpp | 63 ++++++++++++++++++++++++++------- userspace/falco/grpc_server.h | 4 +++ 2 files changed, 55 insertions(+), 12 deletions(-) diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index 5b546179..0848cf92 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -16,16 +16,14 @@ See the License for the specific language governing permissions and limitations under the License. */ -#include - #ifdef GRPC_INCLUDE_IS_GRPCPP #include #else #include #endif - #include // sleep +#include "logger.h" #include "grpc_server.h" #include "grpc_context.h" @@ -100,7 +98,10 @@ void request_stream_context::end(fa bool falco_grpc_server_impl::is_running() { - // TODO: this must act as a switch to shut down the server + if(m_stop) + { + return false; + } return true; } @@ -115,15 +116,14 @@ void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_ou else { // Start (or continue) streaming - // ctx.m_status == stream_context::STREAMING // todo > do we want batching? - + sleep(15); res.set_source(source::SYSCALL); - res.set_rule("regola 1"); + res.set_rule("rule X"); - ctx.m_has_more = false; + ctx.m_has_more = true; } // todo > print/store statistics } @@ -144,10 +144,46 @@ void falco_grpc_server::thread_process(int thread_index) { if(tag == nullptr) { - // TODO: empty tag returned, log, what to do? + // TODO: empty tag returned, log "completion queue with empty tag" continue; } + + // Obtain the context for a given tag + request_context_base* ctx = static_cast(tag); + + // When event has not been read successfully + if(!event_read_success) + { + if(ctx->m_state != request_context_base::REQUEST) + { + // todo > log "server completion queue failed to read event for tag `tag`" + // End the context with error + ctx->end(this, true); + } + continue; + } + + // Process the event + switch(ctx->m_state) + { + case request_context_base::REQUEST: + // Completion of m_request_func + case request_context_base::WRITE: + // Completion of ServerAsyncWriter::Write() + ctx->process(this); + break; + case request_context_base::FINISH: + // Completion of ServerAsyncWriter::Finish() + ctx->end(this, false); + + default: + // todo > log "unkown completion queue event" + // todo > abort? + break; + } } + + // todo > log "thread `thread_index` complete" } // @@ -164,16 +200,19 @@ void falco_grpc_server::thread_process(int thread_index) void falco_grpc_server::run() { + // Setup server grpc::ServerBuilder builder; // Listen on the given address without any authentication mechanism. builder.AddListeningPort(m_server_addr, grpc::InsecureServerCredentials()); - // builder.RegisterService(&falco_output_svc); // TODO: enable this when we do the impl + builder.RegisterService(&m_svc); + // builder.SetMaxSendMessageSize(GRPC_MAX_MESSAGE_SIZE); // testing max message size? + // builder.SetMaxReceiveMessageSize(GRPC_MAX_MESSAGE_SIZE); // testing max message size? m_completion_queue = builder.AddCompletionQueue(); m_server = builder.BuildAndStart(); - std::cout << "Server listening on " << m_server_addr << std::endl; + falco_logger::log(LOG_INFO, "Starting gRPC webserver at " + m_server_addr + "\n"); - int context_count = m_threadiness * 10; + int context_count = m_threadiness * 1; // todo > 10 or 100? PROCESS_STREAM(falco_output_request, falco_output_response, subscribe, subscribe, context_count) m_threads.resize(m_threadiness); diff --git a/userspace/falco/grpc_server.h b/userspace/falco/grpc_server.h index cf5445f8..26254a83 100644 --- a/userspace/falco/grpc_server.h +++ b/userspace/falco/grpc_server.h @@ -20,6 +20,7 @@ limitations under the License. #include #include +#include #include "falco_output.grpc.pb.h" #include "falco_output.pb.h" @@ -35,6 +36,9 @@ protected: bool is_running(); void subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res); + +private: + std::atomic m_stop{false}; }; class falco_grpc_server : public falco_grpc_server_impl