update(userspace/falco/grpc): dealing with multiple streaming requests

Co-Authored-By: Leonardo Di Donato <leodidonato@gmail.com>
Signed-off-by: Lorenzo Fontana <lo@linux.com>
This commit is contained in:
Lorenzo Fontana 2020-05-28 15:56:06 +02:00 committed by poiana
parent 2ebc55f897
commit d9f2cda8cf

View File

@ -143,6 +143,7 @@ void request_bidi_context<falco::output::service, falco::output::request, falco:
m_srv_ctx.reset(new ::grpc::ServerContext);
auto srvctx = m_srv_ctx.get();
m_reader_writer.reset(new ::grpc::ServerAsyncReaderWriter<output::response, output::request>(srvctx));
m_req.Clear();
auto cq = srv->m_completion_queue.get();
// Request to start processing given requests.
// Using "this" - ie., the memory address of this context - as the tag that uniquely identifies the request.
@ -157,10 +158,6 @@ void request_bidi_context<falco::output::service, falco::output::request, falco:
{
case request_context_base::REQUEST:
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - REQUEST\n");
// m_bidi_ctx.reset(new bidi_context(m_srv_ctx.get()));
// m_bidi_ctx->m_status = bidi_context::READY_TO_WRITE; // stream_context::STREAMING
// m_reader_writer->Read(&m_req, this);
// m_state = request_context_base::WRITE;
m_bidi_ctx.reset(new bidi_context(m_srv_ctx.get()));
m_bidi_ctx->m_status = bidi_context::STREAMING;
@ -169,15 +166,8 @@ void request_bidi_context<falco::output::service, falco::output::request, falco:
return;
case request_context_base::WRITE:
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - WRITE\n");
// m_state = request_context_base::WRITE;
// m_reader_writer->Read(&m_req, this);
// {
// // Processing
// output::response res;
// (srv->*m_process_func)(*m_bidi_ctx, m_req, res);
// }
m_reader_writer->Read(&m_req, this);
// m_reader_writer->Read(&m_req, this);
// fixme > to debug: print the address of m_bidi_ctx - is it the same?
if(!m_bidi_ctx->m_wait_write_done)
{
@ -185,25 +175,33 @@ void request_bidi_context<falco::output::service, falco::output::request, falco:
// Processing
output::response res;
(srv->*m_process_func)(*m_bidi_ctx, m_req, res); // sub()
if(m_bidi_ctx->m_has_more && m_bidi_ctx->m_wait_write_done)
{
m_bidi_ctx->m_wait_write_done = false;
falco_logger::log(LOG_INFO, "WRITE(OUTFAKE)\n");
m_reader_writer->Write(res, this);
}
else
{
if(m_bidi_ctx->m_has_more)
{
m_bidi_ctx->m_wait_write_done = false;
falco_logger::log(LOG_INFO, "WRITE(OUTFAKEHASMORETRUEALTERNATIVEALTROCASO)\n");
m_reader_writer->Write(res, this);
}
else
{
m_bidi_ctx->m_wait_write_done = false;
falco_logger::log(LOG_INFO, "WRITE(OUTFAKEHASMORETRUEALTERNATIVEALTROCASO - WAIT TRUEEEEEEE)\n");
m_reader_writer->Read(&m_req, this);
m_state = request_context_base::WRITE;
}
}
}
// else
// {
// if(m_bidi_ctx->m_has_more)
// {
// falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - WRITE - NO WAIT/HAS MORE => WRITE OUT\n");
// // m_reader_writer->Write(res, this); // fixme > how to get the processed res?
// m_bidi_ctx->m_wait_write_done = true;
// }
// else
// {
// m_bidi_ctx->m_status = bidi_context::STREAMING;
// m_bidi_ctx->m_wait_write_done = false;
// }
// }
return;
case request_context_base::FINISH:
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - FINISH\n");
//
return;
default:
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - UNKNOWN\n");