update(userspace/falco): request context and request stream context templatize the service too now

Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
This commit is contained in:
Leonardo Di Donato 2019-10-04 11:35:39 +00:00 committed by poiana
parent c224633454
commit 2a91289ee4
4 changed files with 226 additions and 177 deletions

View File

@ -0,0 +1,115 @@
/*
Copyright (C) 2016-2019 The Falco Authors
This file is part of falco.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "grpc_request_context.h"
namespace falco
{
namespace grpc
{
template<>
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::start(server* srv)
{
m_state = request_context_base::REQUEST;
m_srv_ctx.reset(new ::grpc::ServerContext);
auto srvctx = m_srv_ctx.get();
m_res_writer.reset(new ::grpc::ServerAsyncWriter<output::response>(srvctx));
m_stream_ctx.reset();
m_req.Clear();
auto cq = srv->m_completion_queue.get();
(srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
}
template<>
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::process(server* srv)
{
// When it is the 1st process call
if(m_state == request_context_base::REQUEST)
{
m_state = request_context_base::WRITE;
m_stream_ctx.reset(new stream_context(m_srv_ctx.get()));
}
// Processing
output::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe()
// When there still are more responses to stream
if(m_stream_ctx->m_has_more)
{
m_res_writer->Write(res, this);
}
// No more responses to stream
else
{
// Communicate to the gRPC runtime that we have finished.
// The memory address of `this` instance uniquely identifies the event.
m_state = request_context_base::FINISH;
m_res_writer->Finish(::grpc::Status::OK, this);
}
}
template<>
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::end(server* srv, bool errored)
{
if(m_stream_ctx)
{
m_stream_ctx->m_status = errored ? stream_context::ERROR : stream_context::SUCCESS;
// Complete the processing
output::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe()
}
start(srv);
}
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::start(server* srv)
{
m_state = request_context_base::REQUEST;
m_srv_ctx.reset(new ::grpc::ServerContext);
auto srvctx = m_srv_ctx.get();
m_res_writer.reset(new ::grpc::ServerAsyncWriter<version::response>(srvctx));
m_req.Clear();
// auto cq = srv->m_completion_queue.get();
// fixme(leodido) > m_svc is output::service not version::service
// (srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
}
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::process(server* srv)
{
version::response res;
(srv->*m_process_func)(m_srv_ctx.get(), m_req, res);
// Done
m_state = request_context_base::FINISH;
m_res_writer->Write(res, this);
m_res_writer->Finish(::grpc::Status::OK, this);
}
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::end(server* srv, bool errored)
{
// todo(leodido) > what to do when errored is true?
start(srv);
}
} // namespace grpc
} // namespace falco

View File

@ -0,0 +1,100 @@
/*
Copyright (C) 2016-2019 The Falco Authors
This file is part of falco.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#pragma once
#include "grpc_server.h"
namespace falco
{
namespace grpc
{
class request_context_base
{
public:
request_context_base() = default;
~request_context_base() = default;
std::unique_ptr<::grpc::ServerContext> m_srv_ctx;
enum : char
{
UNKNOWN = 0,
REQUEST,
WRITE,
FINISH
} m_state = UNKNOWN;
virtual void start(server* srv) = 0;
virtual void process(server* srv) = 0;
virtual void end(server* srv, bool isError) = 0;
};
// The responsibility of `request_stream_context` template class
// is to handle streaming responses.
template<class Service, class Request, class Response>
class request_stream_context : public request_context_base
{
public:
request_stream_context():
m_process_func(nullptr),
m_request_func(nullptr){};
~request_stream_context() = default;
// Pointer to function that does actual processing
void (server::*m_process_func)(const stream_context&, const Request&, Response&);
// Pointer to function that requests the system to start processing given requests
void (Service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter<Response>*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*);
void start(server* srv);
void process(server* srv);
void end(server* srv, bool isError);
private:
std::unique_ptr<::grpc::ServerAsyncWriter<Response>> m_res_writer;
std::unique_ptr<stream_context> m_stream_ctx;
Request m_req;
};
// The responsibility of `request_context` template class
// is to handle unary responses.
template<class Service, class Request, class Response>
class request_context : public request_context_base
{
public:
request_context():
m_process_func(nullptr),
m_request_func(nullptr){};
~request_context() = default;
// Pointer to function that does actual processing
void (server::*m_process_func)(const context&, const Request&, Response&);
// Pointer to function that requests the system to start processing given requests
void (Service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncResponseWriter<Response>*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*);
void start(server* srv);
void process(server* srv);
void end(server* srv, bool isError);
private:
std::unique_ptr<::grpc::ServerAsyncWriter<Response>> m_res_writer;
Request m_req;
};
} // namespace grpc
} // namespace falco

View File

@ -22,15 +22,14 @@ limitations under the License.
#include "logger.h" #include "logger.h"
#include "grpc_server.h" #include "grpc_server.h"
#include "grpc_request_context.h"
#include "grpc_context.h" #include "grpc_context.h"
#include "utils.h" #include "utils.h"
#include "banned.h" #include "banned.h"
// todo(leodido) > remove this macro doing only one REGISTER macro (only the context typeschange and they inherit from a base class)
#define REGISTER_STREAM(req, res, svc, rpc, impl, num) \ #define REGISTER_STREAM(req, res, svc, rpc, impl, num) \
std::vector<request_stream_context<req, res>> rpc##_contexts(num); \ std::vector<request_stream_context<svc, req, res>> rpc##_contexts(num); \
for(request_stream_context<req, res> & c : rpc##_contexts) \ for(request_stream_context<svc, req, res> & c : rpc##_contexts) \
{ \ { \
c.m_process_func = &server::impl; \ c.m_process_func = &server::impl; \
c.m_request_func = &svc::AsyncService::Request##rpc; \ c.m_request_func = &svc::AsyncService::Request##rpc; \
@ -38,109 +37,14 @@ limitations under the License.
} }
#define REGISTER_UNARY(req, res, svc, rpc, impl, num) \ #define REGISTER_UNARY(req, res, svc, rpc, impl, num) \
std::vector<request_context<req, res>> rpc##_contexts(num); \ std::vector<request_context<svc, req, res>> rpc##_contexts(num); \
for(request_context<req, res> & c : rpc##_contexts) \ for(request_context<svc, req, res> & c : rpc##_contexts) \
{ \ { \
c.m_process_func = &server::impl; \ c.m_process_func = &server::impl; \
c.m_request_func = &svc::AsyncService::Request##rpc; \ c.m_request_func = &svc::AsyncService::Request##rpc; \
c.start(this); \ c.start(this); \
} }
namespace falco
{
namespace grpc
{
template<>
void request_stream_context<falco::output::request, falco::output::response>::start(server* srv)
{
m_state = request_context_base::REQUEST;
m_srv_ctx.reset(new ::grpc::ServerContext);
auto srvctx = m_srv_ctx.get();
m_res_writer.reset(new ::grpc::ServerAsyncWriter<output::response>(srvctx));
m_stream_ctx.reset();
m_req.Clear();
auto cq = srv->m_completion_queue.get();
(srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
}
template<>
void request_stream_context<falco::output::request, falco::output::response>::process(server* srv)
{
// When it is the 1st process call
if(m_state == request_context_base::REQUEST)
{
m_state = request_context_base::WRITE;
m_stream_ctx.reset(new stream_context(m_srv_ctx.get()));
}
// Processing
output::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe()
// When there still are more responses to stream
if(m_stream_ctx->m_has_more)
{
m_res_writer->Write(res, this);
}
// No more responses to stream
else
{
// Communicate to the gRPC runtime that we have finished.
// The memory address of `this` instance uniquely identifies the event.
m_state = request_context_base::FINISH;
m_res_writer->Finish(::grpc::Status::OK, this);
}
}
template<>
void request_stream_context<falco::output::request, falco::output::response>::end(server* srv, bool errored)
{
if(m_stream_ctx)
{
m_stream_ctx->m_status = errored ? stream_context::ERROR : stream_context::SUCCESS;
// Complete the processing
output::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe()
}
start(srv);
}
} // namespace grpc
} // namespace falco
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::start(server* srv)
{
m_state = request_context_base::REQUEST;
m_srv_ctx.reset(new ::grpc::ServerContext);
auto srvctx = m_srv_ctx.get();
m_res_writer.reset(new ::grpc::ServerAsyncWriter<version::response>(srvctx));
m_req.Clear();
// auto cq = srv->m_completion_queue.get();
// fixme(leodido) > m_svc is output::service not version::service
// (srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
}
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::process(server* srv)
{
version::response res;
(srv->*m_process_func)(m_srv_ctx.get(), m_req, res);
// Done
m_state = request_context_base::FINISH;
m_res_writer->Write(res, this);
m_res_writer->Finish(::grpc::Status::OK, this);
}
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::end(server* srv, bool errored)
{
// todo(leodido) > what to do when errored is true?
start(srv);
}
void falco::grpc::server::thread_process(int thread_index) void falco::grpc::server::thread_process(int thread_index)
{ {
void* tag = nullptr; void* tag = nullptr;
@ -225,7 +129,7 @@ void falco::grpc::server::run()
// For this approach to be sufficient server::IMPL have to be fast // For this approach to be sufficient server::IMPL have to be fast
int context_num = m_threadiness * 10; int context_num = m_threadiness * 10;
// REGISTER_UNARY(version::request, version::response, version::service, version, version, context_num) //REGISTER_UNARY(version::request, version::response, version::service, version, version, context_num)
REGISTER_STREAM(output::request, output::response, output::service, subscribe, subscribe, context_num) REGISTER_STREAM(output::request, output::response, output::service, subscribe, subscribe, context_num)
m_threads.resize(m_threadiness); m_threads.resize(m_threadiness);

View File

@ -25,6 +25,7 @@ namespace falco
{ {
namespace grpc namespace grpc
{ {
class server : public server_impl class server : public server_impl
{ {
public: public:
@ -50,6 +51,9 @@ public:
output::service::AsyncService m_svc; output::service::AsyncService m_svc;
std::unique_ptr<::grpc::ServerCompletionQueue> m_completion_queue; std::unique_ptr<::grpc::ServerCompletionQueue> m_completion_queue;
// version::service::AsyncService m_version_svc;
// std::unique_ptr<::grpc::ServerCompletionQueue> m_version_completion_queue;
private: private:
std::string m_server_addr; std::string m_server_addr;
int m_threadiness; int m_threadiness;
@ -61,79 +65,5 @@ private:
std::vector<std::thread> m_threads; std::vector<std::thread> m_threads;
}; };
class request_context_base
{
public:
request_context_base() = default;
~request_context_base() = default;
std::unique_ptr<::grpc::ServerContext> m_srv_ctx;
enum : char
{
UNKNOWN = 0,
REQUEST,
WRITE,
FINISH
} m_state = UNKNOWN;
virtual void start(server* srv) = 0;
virtual void process(server* srv) = 0;
virtual void end(server* srv, bool isError) = 0;
};
// The responsibility of `request_stream_context` template class
// is to handle streaming responses.
template<class Request, class Response>
class request_stream_context : public request_context_base
{
public:
request_stream_context():
m_process_func(nullptr),
m_request_func(nullptr){};
~request_stream_context() = default;
// Pointer to function that does actual processing
void (server::*m_process_func)(const stream_context&, const Request&, Response&);
// fixme(leodido) > why output::service:: ... ?
// Pointer to function that requests the system to start processing given requests
void (output::service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter<Response>*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*);
void start(server* srv);
void process(server* srv);
void end(server* srv, bool isError);
private:
std::unique_ptr<::grpc::ServerAsyncWriter<Response>> m_res_writer;
std::unique_ptr<stream_context> m_stream_ctx;
Request m_req;
};
// The responsibility of `request_context` template class
// is to handle unary responses.
template<class Service, class Request, class Response>
class request_context : public request_context_base
{
public:
request_context():
m_process_func(nullptr),
m_request_func(nullptr){};
~request_context() = default;
// Pointer to function that does actual processing
void (server::*m_process_func)(const context&, const Request&, Response&);
// fixme(leodido) > why output::service:: ... ?
// Pointer to function that requests the system to start processing given requests
void (Service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter<Response>*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*);
void start(server* srv);
void process(server* srv);
void end(server* srv, bool isError);
private:
// todo(leodido) > factorize these two into tbe base class?
std::unique_ptr<::grpc::ServerAsyncWriter<Response>> m_res_writer;
Request m_req;
};
} // namespace grpc } // namespace grpc
} // namespace falco } // namespace falco