cleanup(falco): consolidate falco::grpc::server in one class

Signed-off-by: Luca Guerra <luca@guerra.sh>
This commit is contained in:
Luca Guerra 2024-03-27 17:32:03 +00:00 committed by poiana
parent a8018a2894
commit 13c8e37a41
5 changed files with 96 additions and 153 deletions

View File

@ -94,11 +94,9 @@ if(CMAKE_SYSTEM_NAME MATCHES "Linux" AND NOT MINIMAL_BUILD)
outputs_http.cpp
webserver.cpp
grpc_context.cpp
grpc_server_impl.cpp
grpc_request_context.cpp
grpc_server.cpp
grpc_context.cpp
grpc_server_impl.cpp
${CMAKE_CURRENT_BINARY_DIR}/version.grpc.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/version.pb.cc
${CMAKE_CURRENT_BINARY_DIR}/outputs.grpc.pb.cc

View File

@ -21,8 +21,12 @@ limitations under the License.
#include <grpc++/grpc++.h>
#endif
#include "config_falco.h"
#include "falco_engine.h"
#include "falco_engine_version.h"
#include "logger.h"
#include "grpc_server.h"
#include "grpc_queue.h"
#include "grpc_request_context.h"
#include "falco_utils.h"
@ -252,3 +256,76 @@ void falco::grpc::server::stop()
falco_logger::log(falco_logger::level::INFO, "Shutting down gRPC server complete\n");
}
bool falco::grpc::server::is_running()
{
if(m_stop)
{
return false;
}
return true;
}
void falco::grpc::server::get(const stream_context& ctx, const outputs::request& req, outputs::response& res)
{
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
{
// todo(leodido) > log "status=ctx->m_status, stream=ctx->m_stream"
ctx.m_stream = nullptr;
return;
}
ctx.m_is_running = is_running();
// Start or continue streaming
// m_status == stream_context::STREAMING?
// todo(leodido) > set m_stream
ctx.m_has_more = queue::get().try_pop(res);
}
void falco::grpc::server::sub(const bidi_context& ctx, const outputs::request& req, outputs::response& res)
{
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
{
ctx.m_stream = nullptr;
return;
}
ctx.m_is_running = is_running();
// Start or continue streaming
// m_status == stream_context::STREAMING?
// todo(leodido) > set m_stream
ctx.m_has_more = queue::get().try_pop(res);
}
void falco::grpc::server::version(const context& ctx, const version::request&, version::response& res)
{
auto& build = *res.mutable_build();
build = FALCO_VERSION_BUILD;
auto& prerelease = *res.mutable_prerelease();
prerelease = FALCO_VERSION_PRERELEASE;
auto& version = *res.mutable_version();
version = FALCO_VERSION;
res.set_engine_version(FALCO_ENGINE_VERSION);
res.set_engine_fields_checksum(FALCO_ENGINE_CHECKSUM);
auto engine_version = falco_engine::engine_version();
res.set_engine_major(engine_version.major());
res.set_engine_minor(engine_version.minor());
res.set_engine_patch(engine_version.patch());
res.set_major(FALCO_VERSION_MAJOR);
res.set_minor(FALCO_VERSION_MINOR);
res.set_patch(FALCO_VERSION_PATCH);
}
void falco::grpc::server::shutdown()
{
m_stop = true;
m_server->Shutdown();
}

View File

@ -19,15 +19,18 @@ limitations under the License.
#include <thread>
#include <string>
#include <atomic>
#include "grpc_server_impl.h"
#include "outputs.grpc.pb.h"
#include "version.grpc.pb.h"
#include "grpc_context.h"
namespace falco
{
namespace grpc
{
class server : public server_impl
class server
{
public:
server() = default;
@ -44,6 +47,7 @@ public:
void thread_process(int thread_index);
void run();
void stop();
void shutdown();
outputs::service::AsyncService m_output_svc;
version::service::AsyncService m_version_svc;
@ -61,6 +65,19 @@ private:
::grpc::ServerBuilder m_server_builder;
void init_mtls_server_builder();
void init_unix_server_builder();
bool is_running();
// Outputs
void get(const stream_context& ctx, const outputs::request& req, outputs::response& res);
void sub(const bidi_context& ctx, const outputs::request& req, outputs::response& res);
// Version
void version(const context& ctx, const version::request& req, version::response& res);
std::unique_ptr<::grpc::Server> m_server;
std::atomic<bool> m_stop{false};
};
} // namespace grpc

View File

@ -1,96 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
/*
Copyright (C) 2023 The Falco Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "config_falco.h"
#include "falco_engine.h"
#include "falco_engine_version.h"
#include "grpc_server_impl.h"
#include "grpc_queue.h"
#include "logger.h"
bool falco::grpc::server_impl::is_running()
{
if(m_stop)
{
return false;
}
return true;
}
void falco::grpc::server_impl::get(const stream_context& ctx, const outputs::request& req, outputs::response& res)
{
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
{
// todo(leodido) > log "status=ctx->m_status, stream=ctx->m_stream"
ctx.m_stream = nullptr;
return;
}
ctx.m_is_running = is_running();
// Start or continue streaming
// m_status == stream_context::STREAMING?
// todo(leodido) > set m_stream
ctx.m_has_more = queue::get().try_pop(res);
}
void falco::grpc::server_impl::sub(const bidi_context& ctx, const outputs::request& req, outputs::response& res)
{
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
{
ctx.m_stream = nullptr;
return;
}
ctx.m_is_running = is_running();
// Start or continue streaming
// m_status == stream_context::STREAMING?
// todo(leodido) > set m_stream
ctx.m_has_more = queue::get().try_pop(res);
}
void falco::grpc::server_impl::version(const context& ctx, const version::request&, version::response& res)
{
auto& build = *res.mutable_build();
build = FALCO_VERSION_BUILD;
auto& prerelease = *res.mutable_prerelease();
prerelease = FALCO_VERSION_PRERELEASE;
auto& version = *res.mutable_version();
version = FALCO_VERSION;
res.set_engine_version(FALCO_ENGINE_VERSION);
res.set_engine_fields_checksum(FALCO_ENGINE_CHECKSUM);
auto engine_version = falco_engine::engine_version();
res.set_engine_major(engine_version.major());
res.set_engine_minor(engine_version.minor());
res.set_engine_patch(engine_version.patch());
res.set_major(FALCO_VERSION_MAJOR);
res.set_minor(FALCO_VERSION_MINOR);
res.set_patch(FALCO_VERSION_PATCH);
}
void falco::grpc::server_impl::shutdown()
{
m_stop = true;
m_server->Shutdown();
}

View File

@ -1,53 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
/*
Copyright (C) 2023 The Falco Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#pragma once
#include <atomic>
#include "outputs.grpc.pb.h"
#include "version.grpc.pb.h"
#include "grpc_context.h"
namespace falco
{
namespace grpc
{
class server_impl
{
public:
server_impl() = default;
~server_impl() = default;
void shutdown();
protected:
bool is_running();
// Outputs
void get(const stream_context& ctx, const outputs::request& req, outputs::response& res);
void sub(const bidi_context& ctx, const outputs::request& req, outputs::response& res);
// Version
void version(const context& ctx, const version::request& req, version::response& res);
std::unique_ptr<::grpc::Server> m_server;
private:
std::atomic<bool> m_stop{false};
};
} // namespace grpc
} // namespace falco