diff --git a/src/runtime/virtcontainers/kata_agent.go b/src/runtime/virtcontainers/kata_agent.go
index 365ac7e013..3c8bb07c0a 100644
--- a/src/runtime/virtcontainers/kata_agent.go
+++ b/src/runtime/virtcontainers/kata_agent.go
@@ -2014,18 +2014,22 @@ func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
 	}
 }
 
-func (k *kataAgent) getReqContext(reqName string) (ctx context.Context, cancel context.CancelFunc) {
-	ctx = context.Background()
+// getReqContext derives the context for a single agent request from ctx.
+// Wait and GetOOMEvent requests reuse ctx as-is and get a nil cancel function;
+// every other request gets a timeout, and the caller must call the returned
+// cancel function when it is non-nil.
+func (k *kataAgent) getReqContext(ctx context.Context, reqName string) (newCtx context.Context, cancel context.CancelFunc) {
+	newCtx = ctx
 	switch reqName {
 	case grpcWaitProcessRequest, grpcGetOOMEventRequest:
 		// Wait and GetOOMEvent have no timeout
 	case grpcCheckRequest:
-		ctx, cancel = context.WithTimeout(ctx, checkRequestTimeout)
+		newCtx, cancel = context.WithTimeout(ctx, checkRequestTimeout)
 	default:
-		ctx, cancel = context.WithTimeout(ctx, defaultRequestTimeout)
+		newCtx, cancel = context.WithTimeout(ctx, defaultRequestTimeout)
 	}
 
-	return ctx, cancel
+	return newCtx, cancel
 }
 
 func (k *kataAgent) sendReq(spanCtx context.Context, request interface{}) (interface{}, error) {
@@ -2044,7 +2048,7 @@ func (k *kataAgent) sendReq(spanCtx context.Context, request interface{}) (inter
 		return nil, errors.New("Invalid request type")
 	}
 	message := request.(proto.Message)
-	ctx, cancel := k.getReqContext(msgName)
+	ctx, cancel := k.getReqContext(spanCtx, msgName)
 	if cancel != nil {
 		defer cancel()
 	}
diff --git a/src/runtime/virtcontainers/pkg/agent/protocols/client/client.go b/src/runtime/virtcontainers/pkg/agent/protocols/client/client.go
index 50e4b9f8ba..362dca0ad2 100644
--- a/src/runtime/virtcontainers/pkg/agent/protocols/client/client.go
+++ b/src/runtime/virtcontainers/pkg/agent/protocols/client/client.go
@@ -20,7 +20,11 @@ import (
 
 	"github.com/mdlayher/vsock"
 	"github.com/sirupsen/logrus"
+	"go.opentelemetry.io/otel"
+	otelLabel "go.opentelemetry.io/otel/label"
+	"go.opentelemetry.io/otel/trace"
 	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	grpcStatus "google.golang.org/grpc/status"
 
 	"github.com/containerd/ttrpc"
@@ -80,32 +84,8 @@ func NewAgentClient(ctx context.Context, sock string, timeout uint32) (*AgentCli
 	if err != nil {
 		return nil, err
 	}
-	/*
-		dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()}
-		dialOpts = append(dialOpts, grpc.WithDialer(agentDialer(parsedAddr, enableYamux)))
-		var tracer opentracing.Tracer
-
-		span := opentracing.SpanFromContext(ctx)
-
-		// If the context contains a trace span, trace all client comms
-		if span != nil {
-			tracer = span.Tracer()
-
-			dialOpts = append(dialOpts,
-				grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer)))
-			dialOpts = append(dialOpts,
-				grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer)))
-		}
-
-		ctx, cancel := context.WithTimeout(ctx, dialTimeout)
-		defer cancel()
-		conn, err := grpc.DialContext(ctx, grpcAddr, dialOpts...)
-		if err != nil {
-			return nil, err
-		}
-	*/
 
-	client := ttrpc.NewClient(conn)
+	client := ttrpc.NewClient(conn, ttrpc.WithUnaryClientInterceptor(TraceUnaryClientInterceptor()))
 
 	return &AgentClient{
 		AgentServiceClient: agentgrpc.NewAgentServiceClient(client),
@@ -119,6 +99,103 @@ func (c *AgentClient) Close() error {
 	return c.conn.Close()
 }
 
+// TraceUnaryClientInterceptor returns a ttrpc unary client interceptor that
+// starts an OpenTelemetry client span named "ttrpc.<method>" around every
+// request and forwards the span context through the request metadata.
+func TraceUnaryClientInterceptor() ttrpc.UnaryClientInterceptor {
+	return func(
+		ctx context.Context,
+		req *ttrpc.Request,
+		resp *ttrpc.Response,
+		ci *ttrpc.UnaryClientInfo,
+		invoker ttrpc.Invoker,
+	) error {
+		requestMetadata := make(ttrpc.MD)
+
+		tracer := otel.Tracer("kata")
+		var span trace.Span
+		ctx, span = tracer.Start(
+			ctx,
+			fmt.Sprintf("ttrpc.%s", req.Method),
+			trace.WithSpanKind(trace.SpanKindClient),
+		)
+		defer span.End()
+
+		// Serialize the span context into the metadata, then attach the
+		// metadata to both the context and the outgoing request.
+		inject(ctx, &requestMetadata)
+		ctx = ttrpc.WithMetadata(ctx, requestMetadata)
+		setRequest(req, &requestMetadata)
+
+		err := invoker(ctx, req, resp)
+
+		if err != nil {
+			span.SetAttributes(otelLabel.Key("RPC_ERROR").Bool(true))
+		}
+		// err can be nil, in which case FromError reports an OK status.
+		// The result is named st so that it does not shadow the status package.
+		if st, _ := status.FromError(err); st != nil {
+			span.SetAttributes(otelLabel.Key("RPC_CODE").Uint(uint(st.Code())))
+			span.SetAttributes(otelLabel.Key("RPC_MESSAGE").String(st.Message()))
+		}
+
+		return err
+	}
+}
+
+// metadataSupplier wraps a ttrpc.MD so that it can be handed to the
+// OpenTelemetry text-map propagator as a carrier.
+type metadataSupplier struct {
+	metadata *ttrpc.MD
+}
+
+// Get returns the first value stored under key, or "" when the key is absent
+// or has no values.
+func (s *metadataSupplier) Get(key string) string {
+	values, ok := s.metadata.Get(key)
+	if !ok || len(values) == 0 {
+		return ""
+	}
+	return values[0]
+}
+
+// Set stores value under key in the wrapped metadata.
+func (s *metadataSupplier) Set(key string, value string) {
+	s.metadata.Set(key, value)
+}
+
+// inject writes the span context carried by ctx into metadata using the
+// globally registered text-map propagator.
+func inject(ctx context.Context, metadata *ttrpc.MD) {
+	otel.GetTextMapPropagator().Inject(ctx, &metadataSupplier{
+		metadata: metadata,
+	})
+}
+
+// setRequest merges md into req.Metadata: existing pairs whose key is absent
+// from md are kept, then every value in md is appended, so md wins on conflict.
+func setRequest(req *ttrpc.Request, md *ttrpc.MD) {
+	newMD := make([]*ttrpc.KeyValue, 0, len(req.Metadata))
+	for _, kv := range req.Metadata {
+		// not found in md, means that we can copy old kv
+		// otherwise, we will use the values in md to overwrite it
+		if _, found := md.Get(kv.Key); !found {
+			newMD = append(newMD, kv)
+		}
+	}
+
+	req.Metadata = newMD
+
+	for k, values := range *md {
+		for _, v := range values {
+			req.Metadata = append(req.Metadata, &ttrpc.KeyValue{
+				Key:   k,
+				Value: v,
+			})
+		}
+	}
+}
+
 // vsock scheme is self-defined to be kept from being parsed by grpc.
 // Any format starting with "scheme://" will be parsed by grpc and we lose
 // all address information because vsock scheme is not supported by grpc.