fix(userspace/falco): grpc server implementation subscribe handle output queue stop

Signed-off-by: Lorenzo Fontana <lo@linux.com>
This commit is contained in:
Lorenzo Fontana 2019-09-19 16:05:39 +02:00 committed by Leo Di Donato
parent d35971e1bc
commit 0565ce2f50
4 changed files with 18 additions and 38 deletions

View File

@ -21,7 +21,6 @@ limitations under the License.
#else
#include <grpc++/grpc++.h>
#endif
#include <signal.h> // pthread_sigmask
#include "logger.h"
#include "grpc_server.h"
@ -36,7 +35,6 @@ void request_stream_context<request, response>::start(falco_grpc_server* srv)
m_res_writer.reset(new grpc::ServerAsyncWriter<response>(srvctx));
m_stream_ctx.reset();
m_req.Clear();
auto cq = srv->m_completion_queue.get();
(srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
}
@ -71,40 +69,22 @@ void request_stream_context<request, response>::process(falco_grpc_server* srv)
}
template<>
void request_stream_context<request, response>::end(falco_grpc_server* srv, bool isError)
void request_stream_context<request, response>::end(falco_grpc_server* srv, bool errored)
{
if(m_stream_ctx)
{
m_stream_ctx->m_status = stream_context::SUCCESS;
if(isError)
{
m_stream_ctx->m_status = stream_context::ERROR;
// todo > log error
}
m_stream_ctx->m_status = errored ? stream_context::ERROR : stream_context::SUCCESS;
// Complete the processing
response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe()
}
else
{
// Handle the edge case when `m_request_func` event failed
// which means `m_stream_ctx` was not set
// todo > log error
}
start(srv);
}
void falco_grpc_server::thread_process(int thread_index)
{
// Tell pthread to not handle termination signals in the current thread
sigset_t set;
sigemptyset(&set);
sigaddset(&set, SIGTERM);
// sigaddset(&set, SIGHUP); // todo > SIGHUP should restart Falco, what to do?
sigaddset(&set, SIGINT);
pthread_sigmask(SIG_BLOCK, &set, nullptr);
void* tag = nullptr;
bool event_read_success = false;
@ -143,15 +123,13 @@ void falco_grpc_server::thread_process(int thread_index)
case request_context_base::FINISH:
// Completion of ServerAsyncWriter::Finish()
ctx->end(this, false);
break;
default:
// todo > log "unkown completion queue event"
// todo > abort?
break;
}
}
// todo > log "thread `thread_index` complete"
}
//
@ -240,32 +218,36 @@ void falco_grpc_server::run()
thread = std::thread(&falco_grpc_server::thread_process, this, thread_idx++);
}
while(is_running())
while(falco_grpc_server_impl::is_running())
{
sleep(1);
}
stop();
}
void falco_grpc_server::stop()
{
falco_logger::log(LOG_INFO, "Shutting down gRPC server. Waiting until external connections are closed by clients\n");
m_server->Shutdown();
m_completion_queue->Shutdown();
// todo > log "waiting for the server threads to complete"
falco_logger::log(LOG_INFO, "Waiting for the gRPC threads to complete\n");
for(std::thread& t : m_threads)
{
t.join();
if(t.joinable())
{
t.join();
}
}
m_threads.clear();
// todo > log "all server threads complete"
falco_logger::log(LOG_INFO, "Ignoring all the remaining gRPC events\n");
// Ignore remaining events
void* ignore_tag = nullptr;
bool ignore_ok = false;
while(m_completion_queue->Next(&ignore_tag, &ignore_ok))
{
}
falco_logger::log(LOG_INFO, "gRPC shutdown is now complete\n");
}

View File

@ -21,8 +21,6 @@ limitations under the License.
#include <thread>
#include <string>
#include <queue>
#include "grpc_server_impl.h"
class falco_grpc_server : public falco_grpc_server_impl

View File

@ -44,15 +44,15 @@ void falco_grpc_server_impl::subscribe(const stream_context& ctx, const request&
ctx.m_has_more = true;
return;
}
while(!falco_output_queue::get().try_pop(res) && req.keepalive())
while(is_running() && !falco_output_queue::get().try_pop(res) && req.keepalive())
{
}
ctx.m_has_more = req.keepalive();
ctx.m_has_more = !is_running() ? false : req.keepalive();
}
}
void falco_grpc_server_impl::shutdown()
{
m_stop = true;
}
}

View File

@ -41,4 +41,4 @@ protected:
private:
std::atomic<bool> m_stop{false};
};
};