update(userspace/falco/grpc): simpler bidirectional context state

transitions

Co-Authored-By: Leonardo Di Donato <leodidonato@gmail.com>
Signed-off-by: Lorenzo Fontana <lo@linux.com>
This commit is contained in:
Lorenzo Fontana 2020-05-29 14:27:36 +02:00 committed by poiana
parent b9e6d65e69
commit 5bd9ba0529

View File

@ -16,7 +16,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
#include "logger.h"
#include "grpc_request_context.h"
namespace falco
@ -41,7 +40,6 @@ void request_stream_context<falco::output::service, falco::output::request, falc
template<>
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::process(server* srv)
{
falco_logger::log(LOG_INFO, "process\n");
// When it is the 1st process call
if(m_state == request_context_base::REQUEST)
{
@ -53,21 +51,27 @@ void request_stream_context<falco::output::service, falco::output::request, falc
output::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // get()
if(!m_stream_ctx->m_is_running)
{
m_state = request_context_base::FINISH;
m_res_writer->Finish(::grpc::Status::OK, this);
return;
}
// When there are still more responses to stream
if(m_stream_ctx->m_has_more)
{
// todo(leodido) > log "write: tag=this, state=m_state"
m_res_writer->Write(res, this);
return;
}
// No more responses to stream
else
{
// Communicate to the gRPC runtime that we have finished.
// The memory address of "this" instance uniquely identifies the event.
m_state = request_context_base::FINISH;
// todo(leodido) > log "finish: tag=this, state=m_state"
m_res_writer->Finish(::grpc::Status::OK, this);
}
// Communicate to the gRPC runtime that we have finished.
// The memory address of "this" instance uniquely identifies the event.
m_state = request_context_base::FINISH;
// todo(leodido) > log "finish: tag=this, state=m_state"
m_res_writer->Finish(::grpc::Status::OK, this);
}
template<>
@ -138,7 +142,6 @@ void falco::grpc::request_context<falco::version::service, falco::version::reque
template<>
void request_bidi_context<falco::output::service, falco::output::request, falco::output::response>::start(server* srv)
{
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::START\n");
m_state = request_context_base::REQUEST;
m_srv_ctx.reset(new ::grpc::ServerContext);
auto srvctx = m_srv_ctx.get();
@ -157,54 +160,37 @@ void request_bidi_context<falco::output::service, falco::output::request, falco:
switch(m_state)
{
case request_context_base::REQUEST:
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - REQUEST\n");
m_bidi_ctx.reset(new bidi_context(m_srv_ctx.get()));
m_bidi_ctx->m_status = bidi_context::STREAMING;
m_reader_writer->Read(&m_req, this); // todo > do we need this?
m_state = request_context_base::WRITE;
m_reader_writer->Read(&m_req, this);
return;
case request_context_base::WRITE:
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - WRITE\n");
// m_reader_writer->Read(&m_req, this);
// fixme > to debug: print the address of m_bidi_ctx - is it the same?
if(!m_bidi_ctx->m_wait_write_done)
// Processing
{
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - WRITE - WAIT\n");
// Processing
output::response res;
(srv->*m_process_func)(*m_bidi_ctx, m_req, res); // sub()
if(m_bidi_ctx->m_has_more && m_bidi_ctx->m_wait_write_done)
if(!m_bidi_ctx->m_is_running)
{
m_bidi_ctx->m_wait_write_done = false;
falco_logger::log(LOG_INFO, "WRITE(OUTFAKE)\n");
m_state = request_context_base::FINISH;
m_reader_writer->Finish(::grpc::Status::OK, this);
return;
}
if(m_bidi_ctx->m_has_more)
{
m_state = request_context_base::WRITE;
m_reader_writer->Write(res, this);
return;
}
else
{
if(m_bidi_ctx->m_has_more)
{
m_bidi_ctx->m_wait_write_done = false;
falco_logger::log(LOG_INFO, "WRITE(OUTFAKEHASMORETRUEALTERNATIVEALTROCASO)\n");
m_reader_writer->Write(res, this);
}
else
{
m_bidi_ctx->m_wait_write_done = false;
falco_logger::log(LOG_INFO, "WRITE(OUTFAKEHASMORETRUEALTERNATIVEALTROCASO - WAIT TRUEEEEEEE)\n");
m_reader_writer->Read(&m_req, this);
m_state = request_context_base::WRITE;
}
}
m_state = request_context_base::WRITE;
m_reader_writer->Read(&m_req, this);
}
return;
case request_context_base::FINISH:
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - FINISH\n");
return;
default:
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - UNKNOWN\n");
return;
}
};
@ -212,24 +198,17 @@ void request_bidi_context<falco::output::service, falco::output::request, falco:
template<>
void request_bidi_context<falco::output::service, falco::output::request, falco::output::response>::end(server* srv, bool error)
{
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::END\n");
// todo > error management? status management?
// todo > when to call .Finish?
if(m_bidi_ctx->m_has_more && m_bidi_ctx->m_wait_write_done)
if(m_bidi_ctx)
{
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::END - HAS MORE && WAIT_WRITE_DONE\n");
falco_logger::log(LOG_INFO, "WRITE(OUT)\n");
// m_reader_writer->Write(res, this); // fixme > how to get the processed res?
m_bidi_ctx->m_wait_write_done = false;
}
else // todo > do we need to remove this else?
{
// m_bidi_ctx->m_status = bidi_context::STREAMING;
// m_bidi_ctx->m_wait_write_done = true;
start(srv);
m_bidi_ctx->m_status = error ? bidi_context::ERROR : bidi_context::SUCCESS;
// Complete the processing
output::response res;
(srv->*m_process_func)(*m_bidi_ctx, m_req, res); // sub()
}
// Ask to start processing requests
start(srv);
};
} // namespace grpc