diff --git a/userspace/falco/grpc_server.cpp b/userspace/falco/grpc_server.cpp index 56f2c7d7..bdaec631 100644 --- a/userspace/falco/grpc_server.cpp +++ b/userspace/falco/grpc_server.cpp @@ -96,39 +96,7 @@ void request_stream_context::end(fa start(srv); } -bool falco_grpc_server_impl::is_running() -{ - if(m_stop) - { - return false; - } - return true; -} -void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res) -{ - if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR) - { - // todo > logic - - ctx.m_stream = nullptr; - } - else - { - // Start (or continue) streaming - // ctx.m_status == stream_context::STREAMING - if(m_event_queue.try_pop(res) && !req.keepalive()) - { - ctx.m_has_more = true; - return; - } - while(!m_event_queue.try_pop(res) && req.keepalive()) - { - } - - ctx.m_has_more = req.keepalive(); - } -} void falco_grpc_server::thread_process(int thread_index) { diff --git a/userspace/falco/grpc_server.h b/userspace/falco/grpc_server.h index 37f5c3c3..2c89c0e9 100644 --- a/userspace/falco/grpc_server.h +++ b/userspace/falco/grpc_server.h @@ -20,40 +20,10 @@ limitations under the License. #include #include -#include + #include -#include "tbb/concurrent_queue.h" - -#include "falco_output.grpc.pb.h" -#include "falco_output.pb.h" -#include "grpc_context.h" - -using namespace tbb; - -typedef concurrent_queue falco_output_response_cq; - -class falco_grpc_server_impl -{ -public: - falco_grpc_server_impl() = default; - ~falco_grpc_server_impl() = default; - - falco_output_response_cq& m_event_queue; - - falco_grpc_server_impl(falco_output_response_cq& event_queue): - m_event_queue(event_queue) - { - } - -protected: - bool is_running(); - - void subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res); - -private: - std::atomic m_stop{false}; -}; +#include "grpc_server_impl.h" class falco_grpc_server : public falco_grpc_server_impl { @@ -64,7 +34,6 @@ public: m_threadiness(threadiness) { } - virtual ~falco_grpc_server() = default; void thread_process(int thread_index); diff --git a/userspace/falco/grpc_server_impl.cpp b/userspace/falco/grpc_server_impl.cpp new file mode 100644 index 00000000..f89082a6 --- /dev/null +++ b/userspace/falco/grpc_server_impl.cpp @@ -0,0 +1,53 @@ +/* +Copyright (C) 2016-2019 The Falco Authors + +This file is part of falco. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include "grpc_server_impl.h" + +bool falco_grpc_server_impl::is_running() +{ + if(m_stop) + { + return false; + } + return true; +} + +void falco_grpc_server_impl::subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res) +{ + if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR) + { + // todo > logic + + ctx.m_stream = nullptr; + } + else + { + // Start (or continue) streaming + // ctx.m_status == stream_context::STREAMING + if(m_event_queue.try_pop(res) && !req.keepalive()) + { + ctx.m_has_more = true; + return; + } + while(!m_event_queue.try_pop(res) && req.keepalive()) + { + } + + ctx.m_has_more = req.keepalive(); + } +} \ No newline at end of file diff --git a/userspace/falco/grpc_server_impl.h b/userspace/falco/grpc_server_impl.h new file mode 100644 index 00000000..7b169b53 --- /dev/null +++ b/userspace/falco/grpc_server_impl.h @@ -0,0 +1,50 @@ +/* +Copyright (C) 2016-2019 The Falco Authors + +This file is part of falco. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#pragma once + +#include + +#include "tbb/concurrent_queue.h" +#include "falco_output.grpc.pb.h" +#include "falco_output.pb.h" +#include "grpc_context.h" + +typedef tbb::concurrent_queue falco_output_response_cq; + +class falco_grpc_server_impl +{ +public: + falco_grpc_server_impl() = default; + ~falco_grpc_server_impl() = default; + + falco_output_response_cq& m_event_queue; + + falco_grpc_server_impl(falco_output_response_cq& event_queue): + m_event_queue(event_queue) + { + } + +protected: + bool is_running(); + + void subscribe(const stream_context& ctx, const falco_output_request& req, falco_output_response& res); + +private: + std::atomic m_stop{false}; +}; \ No newline at end of file