wip(userspace/falco): bidirectional gRPC outputs logic (initial)

Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
This commit is contained in:
Leonardo Di Donato 2020-05-28 00:17:40 +00:00 committed by poiana
parent 01ae8701d9
commit 2ebc55f897
3 changed files with 102 additions and 22 deletions

View File

@ -36,7 +36,7 @@ class context
{
public:
context(::grpc::ServerContext* ctx);
~context() = default;
virtual ~context() = default;
void get_metadata(std::string key, std::string& val);
@ -63,23 +63,33 @@ public:
mutable bool m_has_more = false;
};
class bidi_context : public context
// class bidi_context : public context
// {
// public:
// bidi_context(::grpc::ServerContext* ctx):
// context(ctx){};
// ~bidi_context() = default;
// enum : char
// {
// WAIT_CONNECT = 1,
// READY_TO_WRITE,
// WAIT_WRITE_DONE,
// FINISHED,
// } m_status = WAIT_CONNECT;
// mutable void* m_stream = nullptr; // todo(fntlnz, leodido) > useful in the future
// mutable bool m_has_more = false; // fixme > needed?
// };
class bidi_context : public stream_context
{
public:
bidi_context(::grpc::ServerContext* ctx):
context(ctx){};
stream_context(ctx){};
~bidi_context() = default;
enum : char
{
WAIT_CONNECT = 1,
READY_TO_WRITE,
WAIT_WRITE_DONE,
FINISHED,
} m_status = WAIT_CONNECT;
mutable void* m_stream = nullptr; // todo(fntlnz, leodido) > useful in the future
mutable bool m_has_more = false; // fixme > needed?
mutable bool m_wait_write_done = false;
};
} // namespace grpc

View File

@ -51,7 +51,7 @@ void request_stream_context<falco::output::service, falco::output::request, falc
// Processing
output::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res);
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // get()
// When there are still more responses to stream
if(m_stream_ctx->m_has_more)
@ -83,7 +83,7 @@ void request_stream_context<falco::output::service, falco::output::request, falc
// Complete the processing
output::response res;
(srv->*m_process_func)(*m_stream_ctx, m_req, res);
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // get()
}
else
{
@ -153,31 +153,60 @@ void request_bidi_context<falco::output::service, falco::output::request, falco:
template<>
void request_bidi_context<falco::output::service, falco::output::request, falco::output::response>::process(server* srv)
{
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS\n");
switch(m_state)
{
case request_context_base::REQUEST:
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - REQUEST\n");
// m_bidi_ctx.reset(new bidi_context(m_srv_ctx.get()));
// m_bidi_ctx->m_status = bidi_context::READY_TO_WRITE; // stream_context::STREAMING
// m_reader_writer->Read(&m_req, this);
// m_state = request_context_base::WRITE;
m_bidi_ctx.reset(new bidi_context(m_srv_ctx.get()));
m_bidi_ctx->m_status = bidi_context::READY_TO_WRITE; // stream_context::STREAMING
m_bidi_ctx->m_status = bidi_context::STREAMING;
m_reader_writer->Read(&m_req, this); // todo > do we need this?
m_state = request_context_base::WRITE;
m_reader_writer->Read(&m_req, this);
return;
case request_context_base::WRITE:
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - WRITE\n");
m_state = request_context_base::WRITE;
// m_state = request_context_base::WRITE;
// m_reader_writer->Read(&m_req, this);
// {
// // Processing
// output::response res;
// (srv->*m_process_func)(*m_bidi_ctx, m_req, res);
// }
m_reader_writer->Read(&m_req, this);
// fixme > to debug: print the address of m_bidi_ctx - is it the same?
if(!m_bidi_ctx->m_wait_write_done)
{
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - WRITE - WAIT\n");
// Processing
output::response res;
(srv->*m_process_func)(*m_bidi_ctx, m_req, res);
(srv->*m_process_func)(*m_bidi_ctx, m_req, res); // sub()
}
// else
// {
// if(m_bidi_ctx->m_has_more)
// {
// falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - WRITE - NO WAIT/HAS MORE => WRITE OUT\n");
// // m_reader_writer->Write(res, this); // fixme > how to get the processed res?
// m_bidi_ctx->m_wait_write_done = true;
// }
// else
// {
// m_bidi_ctx->m_status = bidi_context::STREAMING;
// m_bidi_ctx->m_wait_write_done = false;
// }
// }
return;
case request_context_base::FINISH:
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - FINISH\n");
//
return;
default:
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::PROCESS - UNKNOWN\n");
return;
}
};
@ -186,6 +215,23 @@ template<>
void request_bidi_context<falco::output::service, falco::output::request, falco::output::response>::end(server* srv, bool error)
{
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::END\n");
// todo > error management? status management?
// todo > when to call .Finish?
if(m_bidi_ctx->m_has_more && m_bidi_ctx->m_wait_write_done)
{
falco_logger::log(LOG_INFO, "REQUEST_BIDI_CONTEXT::END - HAS MORE && WAIT_WRITE_DONE\n");
falco_logger::log(LOG_INFO, "WRITE(OUT)\n");
// m_reader_writer->Write(res, this); // fixme > how to get the processed res?
m_bidi_ctx->m_wait_write_done = false;
}
else // todo > do we need to remove this else?
{
// m_bidi_ctx->m_status = bidi_context::STREAMING;
// m_bidi_ctx->m_wait_write_done = true;
start(srv);
}
};
} // namespace grpc

View File

@ -51,9 +51,33 @@ void falco::grpc::server_impl::get(const stream_context& ctx, const output::requ
void falco::grpc::server_impl::sub(const bidi_context& ctx, const output::request& req, output::response& res)
{
// todo
falco_logger::log(LOG_INFO, "SUB!\n");
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
{
return;
}
falco_logger::log(LOG_INFO, "SUB\n");
ctx.m_has_more = output::queue::get().unsafe_size() > 0;
if(ctx.m_has_more)
{
falco_logger::log(LOG_INFO, "SUB - HAS MORE? TRUE\n");
}
else
{
falco_logger::log(LOG_INFO, "SUB - HAS MORE? FALSE\n");
}
if(output::queue::get().try_pop(res))
{
falco_logger::log(LOG_INFO, "SUB - WAIT WRITE DONE: TRUE\n");
ctx.m_wait_write_done = true;
}
else
{
falco_logger::log(LOG_INFO, "SUB - WAIT WRITE DONE: FALSE\n");
ctx.m_wait_write_done = false;
}
}
void falco::grpc::server_impl::version(const context& ctx, const version::request&, version::response& res)