new(userspace/falco): falco outputs grpc server stop

Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
This commit is contained in:
Leonardo Di Donato
2019-09-03 16:47:55 +00:00
committed by Leo Di Donato
parent fbe4e34a57
commit 6072b7a201
2 changed files with 38 additions and 3 deletions

View File

@@ -24,6 +24,8 @@ limitations under the License.
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#endif #endif
#include <unistd.h> // sleep
#include "grpc_server.h" #include "grpc_server.h"
#include "grpc_context.h" #include "grpc_context.h"
@@ -113,9 +115,16 @@ void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_ou
else else
{ {
// Start (or continue) streaming // Start (or continue) streaming
// ctx.m_status == stream_context::STREAMING
}
// ctx.m_status == stream_context::STREAMING
// todo > do we want batching?
res.set_source(source::SYSCALL);
res.set_rule("regola 1");
ctx.m_has_more = false;
}
// todo > print/store statistics // todo > print/store statistics
} }
@@ -175,6 +184,32 @@ void falco_grpc_server::run()
} }
while(is_running()) while(is_running())
{
sleep(1);
}
stop();
}
void falco_grpc_server::stop()
{
m_server->Shutdown();
m_completion_queue->Shutdown();
// todo > log "waiting for the server threads to complete"
for(std::thread& t : m_threads)
{
t.join();
}
m_threads.clear();
// todo > log "all server threads complete"
// Ignore remaining events
void* ignore_tag = nullptr;
bool ignore_ok = false;
while(m_completion_queue->Next(&ignore_tag, &ignore_ok))
{ {
} }
} }

View File

@@ -49,6 +49,7 @@ public:
void thread_process(int thread_index); void thread_process(int thread_index);
void run(); void run();
void stop();
falco_output_service::AsyncService m_svc; falco_output_service::AsyncService m_svc;
std::unique_ptr<grpc::ServerCompletionQueue> m_completion_queue; std::unique_ptr<grpc::ServerCompletionQueue> m_completion_queue;
@@ -58,7 +59,6 @@ private:
std::string m_server_addr; std::string m_server_addr;
int m_threadiness = 0; int m_threadiness = 0;
std::vector<std::thread> m_threads; std::vector<std::thread> m_threads;
}; };
void start_grpc_server(std::string server_address, int threadiness); void start_grpc_server(std::string server_address, int threadiness);