update(userspace/falco): context class for bidirectional gRPC services

Co-authored-by: Lorenzo Fontana <lo@linux.com>
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
This commit is contained in:
Leonardo Di Donato 2020-05-27 19:59:35 +00:00 committed by poiana
parent a568c42adb
commit cf31712fad
2 changed files with 93 additions and 12 deletions

View File

@ -16,6 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
#include "logger.h"
#include "grpc_request_context.h"
namespace falco
@ -40,6 +41,7 @@ void request_stream_context<falco::output::service, falco::output::request, falc
template<>
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::process(server* srv)
{
falco_logger::log(LOG_INFO, "process\n");
// When it is the 1st process call
if(m_state == request_context_base::REQUEST)
{
@ -49,7 +51,7 @@ void request_stream_context<falco::output::service, falco::output::request, falc
// Processing
output::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe()
(srv->*m_process_func)(*m_stream_ctx, m_req, res);
// When there are still more responses to stream
if(m_stream_ctx->m_has_more)
@ -69,19 +71,19 @@ void request_stream_context<falco::output::service, falco::output::request, falc
}
template<>
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::end(server* srv, bool errored)
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::end(server* srv, bool error)
{
if(m_stream_ctx)
{
if(errored)
if(error)
{
// todo(leodido) > log error "error streaming: tag=this, state=m_state, stream=m_stream_ctx->m_stream"
}
m_stream_ctx->m_status = errored ? stream_context::ERROR : stream_context::SUCCESS;
m_stream_ctx->m_status = error ? stream_context::ERROR : stream_context::SUCCESS;
// Complete the processing
output::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe()
(srv->*m_process_func)(*m_stream_ctx, m_req, res);
}
else
{
@ -125,13 +127,66 @@ void falco::grpc::request_context<falco::version::service, falco::version::reque
}
template<>
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::end(server* srv, bool errored)
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::end(server* srv, bool error)
{
// todo(leodido) > handle processing errors here
// Ask to start processing requests
start(srv);
}
template<>
void request_bidi_context<falco::output::service, falco::output::request, falco::output::response>::start(server* srv)
{
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::START\n");
m_state = request_context_base::REQUEST;
m_srv_ctx.reset(new ::grpc::ServerContext);
auto srvctx = m_srv_ctx.get();
m_reader_writer.reset(new ::grpc::ServerAsyncReaderWriter<output::response, output::request>(srvctx));
auto cq = srv->m_completion_queue.get();
// Request to start processing given requests.
// Using "this" - ie., the memory address of this context - as the tag that uniquely identifies the request.
// In this way, different contexts can serve different requests concurrently.
(srv->m_output_svc.*m_request_func)(srvctx, m_reader_writer.get(), cq, cq, this);
};
template<>
void request_bidi_context<falco::output::service, falco::output::request, falco::output::response>::process(server* srv)
{
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS\n");
switch(m_state)
{
case request_context_base::REQUEST:
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - REQUEST\n");
m_bidi_ctx.reset(new bidi_context(m_srv_ctx.get()));
m_bidi_ctx->m_status = bidi_context::READY_TO_WRITE; // stream_context::STREAMING
m_state = request_context_base::WRITE;
m_reader_writer->Read(&m_req, this);
return;
case request_context_base::WRITE:
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - WRITE\n");
m_state = request_context_base::WRITE;
m_reader_writer->Read(&m_req, this);
{
// Processing
output::response res;
(srv->*m_process_func)(*m_bidi_ctx, m_req, res);
}
return;
case request_context_base::FINISH:
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - FINISH\n");
//
return;
default:
return;
}
};
template<>
void request_bidi_context<falco::output::service, falco::output::request, falco::output::response>::end(server* srv, bool error)
{
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::END\n");
};
} // namespace grpc
} // namespace falco
} // namespace falco

View File

@ -40,7 +40,7 @@ public:
WRITE,
FINISH
} m_state = UNKNOWN;
virtual void start(server* srv) = 0;
virtual void process(server* srv) = 0;
virtual void end(server* srv, bool isError) = 0;
@ -65,7 +65,7 @@ public:
void start(server* srv);
void process(server* srv);
void end(server* srv, bool isError);
void end(server* srv, bool error);
private:
std::unique_ptr<::grpc::ServerAsyncWriter<Response>> m_res_writer;
@ -92,11 +92,37 @@ public:
void start(server* srv);
void process(server* srv);
void end(server* srv, bool isError);
void end(server* srv, bool error);
private:
std::unique_ptr<::grpc::ServerAsyncResponseWriter<Response>> m_res_writer;
Request m_req;
};
template<class Service, class Request, class Response>
class request_bidi_context : public request_context_base
{
public:
request_bidi_context():
m_process_func(nullptr),
m_request_func(nullptr){};
~request_bidi_context() = default;
// Pointer to function that does actual processing
void (server::*m_process_func)(const bidi_context&, const Request&, Response&);
// Pointer to function that requests the system to start processing given requests
void (Service::AsyncService::*m_request_func)(::grpc::ServerContext*, ::grpc::ServerAsyncReaderWriter<Response, Request>*, ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*);
void start(server* srv);
void process(server* srv);
void end(server* srv, bool error);
private:
std::unique_ptr<::grpc::ServerAsyncReaderWriter<Response, Request>> m_reader_writer;
std::unique_ptr<bidi_context> m_bidi_ctx;
Request m_req;
};
} // namespace grpc
} // namespace falco
} // namespace falco