chore(usperspace/falco): move grpc server impl

Co-authored-by: Lorenzo Fontana <lo@linux.com>
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
This commit is contained in:
Leonardo Di Donato 2019-09-06 09:18:44 +00:00 committed by Leo Di Donato
parent c3abccb27b
commit a53e22d2d5
4 changed files with 105 additions and 65 deletions

View File

@ -96,39 +96,7 @@ void request_stream_context<falco_output_request, falco_output_response>::end(fa
start(srv);
}
bool falco_grpc_server_impl::is_running()
{
if(m_stop)
{
return false;
}
return true;
}
void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res)
{
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
{
// todo > logic
ctx.m_stream = nullptr;
}
else
{
// Start (or continue) streaming
// ctx.m_status == stream_context::STREAMING
if(m_event_queue.try_pop(res) && !req.keepalive())
{
ctx.m_has_more = true;
return;
}
while(!m_event_queue.try_pop(res) && req.keepalive())
{
}
ctx.m_has_more = req.keepalive();
}
}
void falco_grpc_server::thread_process(int thread_index)
{

View File

@ -20,40 +20,10 @@ limitations under the License.
#include <thread>
#include <string>
#include <atomic>
#include <queue>
#include "tbb/concurrent_queue.h"
#include "falco_output.grpc.pb.h"
#include "falco_output.pb.h"
#include "grpc_context.h"
using namespace tbb;
typedef concurrent_queue<falco_output_response> falco_output_response_cq;
class falco_grpc_server_impl
{
public:
falco_grpc_server_impl() = default;
~falco_grpc_server_impl() = default;
falco_output_response_cq& m_event_queue;
falco_grpc_server_impl(falco_output_response_cq& event_queue):
m_event_queue(event_queue)
{
}
protected:
bool is_running();
void subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res);
private:
std::atomic<bool> m_stop{false};
};
#include "grpc_server_impl.h"
class falco_grpc_server : public falco_grpc_server_impl
{
@ -64,7 +34,6 @@ public:
m_threadiness(threadiness)
{
}
virtual ~falco_grpc_server() = default;
void thread_process(int thread_index);

View File

@ -0,0 +1,53 @@
/*
Copyright (C) 2016-2019 The Falco Authors
This file is part of falco.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "grpc_server_impl.h"
bool falco_grpc_server_impl::is_running()
{
if(m_stop)
{
return false;
}
return true;
}
void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res)
{
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
{
// todo > logic
ctx.m_stream = nullptr;
}
else
{
// Start (or continue) streaming
// ctx.m_status == stream_context::STREAMING
if(m_event_queue.try_pop(res) && !req.keepalive())
{
ctx.m_has_more = true;
return;
}
while(!m_event_queue.try_pop(res) && req.keepalive())
{
}
ctx.m_has_more = req.keepalive();
}
}

View File

@ -0,0 +1,50 @@
/*
Copyright (C) 2016-2019 The Falco Authors
This file is part of falco.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#pragma once
#include <atomic>
#include "tbb/concurrent_queue.h"
#include "falco_output.grpc.pb.h"
#include "falco_output.pb.h"
#include "grpc_context.h"
typedef tbb::concurrent_queue<falco_output_response> falco_output_response_cq;
class falco_grpc_server_impl
{
public:
falco_grpc_server_impl() = default;
~falco_grpc_server_impl() = default;
falco_output_response_cq& m_event_queue;
falco_grpc_server_impl(falco_output_response_cq& event_queue):
m_event_queue(event_queue)
{
}
protected:
bool is_running();
void subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res);
private:
std::atomic<bool> m_stop{false};
};