new(userspace/falco): concrete initial implementation of the subscribe gRPC service

Co-authored-by: Lorenzo Fontana <lo@linux.com>
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
This commit is contained in:
Leonardo Di Donato 2020-05-27 20:04:33 +00:00 committed by poiana
parent be6c4b273d
commit 01ae8701d9
2 changed files with 18 additions and 12 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
#include "config_falco.h"
#include "grpc_server_impl.h"
#include "falco_output_queue.h"
#include "logger.h"
#include "banned.h" // This raises a compilation error when certain functions are used
bool falco::grpc::server_impl::is_running()
@ -28,8 +29,9 @@ bool falco::grpc::server_impl::is_running()
return true;
}
void falco::grpc::server_impl::subscribe(const stream_context& ctx, const output::request& req, output::response& res)
void falco::grpc::server_impl::get(const stream_context& ctx, const output::request& req, output::response& res)
{
falco_logger::log(LOG_INFO, "get\n");
if(ctx.m_status == stream_context::SUCCESS || ctx.m_status == stream_context::ERROR)
{
// todo(leodido) > log "status=ctx->m_status, stream=ctx->m_stream"
@ -38,19 +40,20 @@ void falco::grpc::server_impl::subscribe(const stream_context& ctx, const output
else
{
// Start or continue streaming
// todo(leodido) > check for m_status == stream_context::STREAMING?
// m_status == stream_context::STREAMING?
// todo(leodido) > set m_stream
if(output::queue::get().try_pop(res) && !req.keepalive())
{
ctx.m_has_more = true;
return;
falco_logger::log(LOG_INFO, "get - else\n");
ctx.m_has_more = output::queue::get().unsafe_size() > 1;
output::queue::get().try_pop(res);
}
while(is_running() && !output::queue::get().try_pop(res) && req.keepalive())
{
}
ctx.m_has_more = !is_running() ? false : req.keepalive();
}
void falco::grpc::server_impl::sub(const bidi_context& ctx, const output::request& req, output::response& res)
{
// todo
falco_logger::log(LOG_INFO, "SUB!\n");
}
void falco::grpc::server_impl::version(const context& ctx, const version::request&, version::response& res)

View File

@ -36,8 +36,11 @@ public:
protected:
bool is_running();
void subscribe(const stream_context& ctx, const output::request& req, output::response& res);
// Outputs
void get(const stream_context& ctx, const output::request& req, output::response& res);
void sub(const bidi_context& ctx, const output::request& req, output::response& res);
// Version
void version(const context& ctx, const version::request& req, version::response& res);
private: