fix: grpc service must be registered and grpc context state must be handled for threads

Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
This commit is contained in:
Leonardo Di Donato 2019-09-03 18:20:16 +00:00 committed by Leo Di Donato
parent 6072b7a201
commit 87fed11f16
2 changed files with 55 additions and 12 deletions

View File

@ -16,16 +16,14 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
#include <iostream>
#ifdef GRPC_INCLUDE_IS_GRPCPP #ifdef GRPC_INCLUDE_IS_GRPCPP
#include <grpcpp/grpcpp.h> #include <grpcpp/grpcpp.h>
#else #else
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#endif #endif
#include <unistd.h> // sleep #include <unistd.h> // sleep
#include "logger.h"
#include "grpc_server.h" #include "grpc_server.h"
#include "grpc_context.h" #include "grpc_context.h"
@ -100,7 +98,10 @@ void request_stream_context<falco_output_request, falco_output_response>::end(fa
bool falco_grpc_server_impl::is_running() bool falco_grpc_server_impl::is_running()
{ {
// TODO: this must act as a switch to shut down the server if(m_stop)
{
return false;
}
return true; return true;
} }
@ -115,15 +116,14 @@ void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_ou
else else
{ {
// Start (or continue) streaming // Start (or continue) streaming
// ctx.m_status == stream_context::STREAMING // ctx.m_status == stream_context::STREAMING
// todo > do we want batching? // todo > do we want batching?
sleep(15);
res.set_source(source::SYSCALL); res.set_source(source::SYSCALL);
res.set_rule("regola 1"); res.set_rule("rule X");
ctx.m_has_more = false; ctx.m_has_more = true;
} }
// todo > print/store statistics // todo > print/store statistics
} }
@ -144,10 +144,46 @@ void falco_grpc_server::thread_process(int thread_index)
{ {
if(tag == nullptr) if(tag == nullptr)
{ {
// TODO: empty tag returned, log, what to do? // TODO: empty tag returned, log "completion queue with empty tag"
continue; continue;
} }
// Obtain the context for a given tag
request_context_base* ctx = static_cast<request_context_base*>(tag);
// When event has not been read successfully
if(!event_read_success)
{
if(ctx->m_state != request_context_base::REQUEST)
{
// todo > log "server completion queue failed to read event for tag `tag`"
// End the context with error
ctx->end(this, true);
} }
continue;
}
// Process the event
switch(ctx->m_state)
{
case request_context_base::REQUEST:
// Completion of m_request_func
case request_context_base::WRITE:
// Completion of ServerAsyncWriter::Write()
ctx->process(this);
break;
case request_context_base::FINISH:
// Completion of ServerAsyncWriter::Finish()
ctx->end(this, false);
default:
// todo > log "unkown completion queue event"
// todo > abort?
break;
}
}
// todo > log "thread `thread_index` complete"
} }
// //
@ -164,16 +200,19 @@ void falco_grpc_server::thread_process(int thread_index)
void falco_grpc_server::run() void falco_grpc_server::run()
{ {
// Setup server
grpc::ServerBuilder builder; grpc::ServerBuilder builder;
// Listen on the given address without any authentication mechanism. // Listen on the given address without any authentication mechanism.
builder.AddListeningPort(m_server_addr, grpc::InsecureServerCredentials()); builder.AddListeningPort(m_server_addr, grpc::InsecureServerCredentials());
// builder.RegisterService(&falco_output_svc); // TODO: enable this when we do the impl builder.RegisterService(&m_svc);
// builder.SetMaxSendMessageSize(GRPC_MAX_MESSAGE_SIZE); // testing max message size?
// builder.SetMaxReceiveMessageSize(GRPC_MAX_MESSAGE_SIZE); // testing max message size?
m_completion_queue = builder.AddCompletionQueue(); m_completion_queue = builder.AddCompletionQueue();
m_server = builder.BuildAndStart(); m_server = builder.BuildAndStart();
std::cout << "Server listening on " << m_server_addr << std::endl; falco_logger::log(LOG_INFO, "Starting gRPC webserver at " + m_server_addr + "\n");
int context_count = m_threadiness * 10; int context_count = m_threadiness * 1; // todo > 10 or 100?
PROCESS_STREAM(falco_output_request, falco_output_response, subscribe, subscribe, context_count) PROCESS_STREAM(falco_output_request, falco_output_response, subscribe, subscribe, context_count)
m_threads.resize(m_threadiness); m_threads.resize(m_threadiness);

View File

@ -20,6 +20,7 @@ limitations under the License.
#include <thread> #include <thread>
#include <string> #include <string>
#include <atomic>
#include "falco_output.grpc.pb.h" #include "falco_output.grpc.pb.h"
#include "falco_output.pb.h" #include "falco_output.pb.h"
@ -35,6 +36,9 @@ protected:
bool is_running(); bool is_running();
void subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res); void subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res);
private:
std::atomic<bool> m_stop{false};
}; };
class falco_grpc_server : public falco_grpc_server_impl class falco_grpc_server : public falco_grpc_server_impl