new(userspace/falco): initial work for version gRPC svc registration

Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
This commit is contained in:
Leonardo Di Donato 2019-10-04 10:26:54 +00:00 committed by poiana
parent 714a6619ad
commit c224633454
2 changed files with 84 additions and 12 deletions

View File

@ -26,13 +26,24 @@ limitations under the License.
#include "utils.h"
#include "banned.h"
#define REGISTER_STREAM(req, res, svc, rpc, impl, num) \
std::vector<request_stream_context<req, res>> rpc##_contexts(num); \
for(request_stream_context<req, res> & ctx : rpc##_contexts) \
{ \
ctx.m_process_func = &server::impl; \
ctx.m_request_func = &svc::AsyncService::Request##rpc; \
ctx.start(this); \
// todo(leodido) > remove this macro doing only one REGISTER macro (only the context typeschange and they inherit from a base class)
#define REGISTER_STREAM(req, res, svc, rpc, impl, num) \
std::vector<request_stream_context<req, res>> rpc##_contexts(num); \
for(request_stream_context<req, res> & c : rpc##_contexts) \
{ \
c.m_process_func = &server::impl; \
c.m_request_func = &svc::AsyncService::Request##rpc; \
c.start(this); \
}
#define REGISTER_UNARY(req, res, svc, rpc, impl, num) \
std::vector<request_context<req, res>> rpc##_contexts(num); \
for(request_context<req, res> & c : rpc##_contexts) \
{ \
c.m_process_func = &server::impl; \
c.m_request_func = &svc::AsyncService::Request##rpc; \
c.start(this); \
}
namespace falco
@ -99,6 +110,37 @@ void request_stream_context<falco::output::request, falco::output::response>::en
} // namespace grpc
} // namespace falco
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::start(server* srv)
{
m_state = request_context_base::REQUEST;
m_srv_ctx.reset(new ::grpc::ServerContext);
auto srvctx = m_srv_ctx.get();
m_res_writer.reset(new ::grpc::ServerAsyncWriter<version::response>(srvctx));
m_req.Clear();
// auto cq = srv->m_completion_queue.get();
// fixme(leodido) > m_svc is output::service not version::service
// (srv->m_svc.*m_request_func)(srvctx, &m_req, m_res_writer.get(), cq, cq, this);
}
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::process(server* srv)
{
version::response res;
(srv->*m_process_func)(m_srv_ctx.get(), m_req, res);
// Done
m_state = request_context_base::FINISH;
m_res_writer->Write(res, this);
m_res_writer->Finish(::grpc::Status::OK, this);
}
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::end(server* srv, bool errored)
{
// todo(leodido) > what to do when errored is true?
start(srv);
}
void falco::grpc::server::thread_process(int thread_index)
{
void* tag = nullptr;
@ -172,6 +214,7 @@ void falco::grpc::server::run()
::grpc::ServerBuilder builder;
builder.AddListeningPort(m_server_addr, ::grpc::SslServerCredentials(ssl_opts));
builder.RegisterService(&m_svc);
// fixme(leodido) > register various services ...
m_completion_queue = builder.AddCompletionQueue();
m_server = builder.BuildAndStart();
@ -181,9 +224,9 @@ void falco::grpc::server::run()
// This defines the number of simultaneous completion queue requests of the same type (service::AsyncService::Request##RPC)
// For this approach to be sufficient server::IMPL have to be fast
int context_num = m_threadiness * 10;
REGISTER_STREAM(output::request, output::response, output::service, subscribe, subscribe, context_num)
// register_stream<output::request, output::response>(subscribe, context_num)
// REGISTER_UNARY(version::request, version::response, version::service, version, version, context_num)
REGISTER_STREAM(output::request, output::response, output::service, subscribe, subscribe, context_num)
m_threads.resize(m_threadiness);
int thread_idx = 0;

View File

@ -46,6 +46,7 @@ public:
void run();
void stop();
// fixme(leodido) > wny the output::service:: ..?
output::service::AsyncService m_svc;
std::unique_ptr<::grpc::ServerCompletionQueue> m_completion_queue;
@ -79,9 +80,8 @@ public:
virtual void end(server* srv, bool isError) = 0;
};
//
// Template class to handle streaming responses
//
// The responsibility of `request_stream_context` template class
// is to handle streaming responses.
template<class Request, class Response>
class request_stream_context : public request_context_base
{
@ -94,6 +94,7 @@ public:
// Pointer to function that does actual processing
void (server::*m_process_func)(const stream_context&, const Request&, Response&);
// fixme(leodido) > why output::service:: ... ?
// Pointer to function that requests the system to start processing given requests
void (output::service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter<Response>*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*);
@ -106,5 +107,33 @@ private:
std::unique_ptr<stream_context> m_stream_ctx;
Request m_req;
};
// The responsibility of `request_context` template class
// is to handle unary responses.
template<class Service, class Request, class Response>
class request_context : public request_context_base
{
public:
request_context():
m_process_func(nullptr),
m_request_func(nullptr){};
~request_context() = default;
// Pointer to function that does actual processing
void (server::*m_process_func)(const context&, const Request&, Response&);
// fixme(leodido) > why output::service:: ... ?
// Pointer to function that requests the system to start processing given requests
void (Service::AsyncService::*m_request_func)(::grpc::ServerContext*, Request*, ::grpc::ServerAsyncWriter<Response>*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*);
void start(server* srv);
void process(server* srv);
void end(server* srv, bool isError);
private:
// todo(leodido) > factorize these two into tbe base class?
std::unique_ptr<::grpc::ServerAsyncWriter<Response>> m_res_writer;
Request m_req;
};
} // namespace grpc
} // namespace falco