runtime: pass span context to agent in ttRPC client

Pass span context through ttRPC metadata, that
agent can get parent from the context to create
new sub-spans.

Fixes: #1968

Signed-off-by: bin <bin@hyper.sh>
This commit is contained in:
bin 2021-03-12 14:59:27 +08:00
parent a57118d03a
commit ae46e7bf97
2 changed files with 94 additions and 31 deletions

View File

@ -2014,18 +2014,18 @@ func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
}
}
func (k *kataAgent) getReqContext(reqName string) (ctx context.Context, cancel context.CancelFunc) {
ctx = context.Background()
func (k *kataAgent) getReqContext(ctx context.Context, reqName string) (newCtx context.Context, cancel context.CancelFunc) {
newCtx = ctx
switch reqName {
case grpcWaitProcessRequest, grpcGetOOMEventRequest:
// Wait and GetOOMEvent have no timeout
case grpcCheckRequest:
ctx, cancel = context.WithTimeout(ctx, checkRequestTimeout)
newCtx, cancel = context.WithTimeout(ctx, checkRequestTimeout)
default:
ctx, cancel = context.WithTimeout(ctx, defaultRequestTimeout)
newCtx, cancel = context.WithTimeout(ctx, defaultRequestTimeout)
}
return ctx, cancel
return newCtx, cancel
}
func (k *kataAgent) sendReq(spanCtx context.Context, request interface{}) (interface{}, error) {
@ -2044,7 +2044,7 @@ func (k *kataAgent) sendReq(spanCtx context.Context, request interface{}) (inter
return nil, errors.New("Invalid request type")
}
message := request.(proto.Message)
ctx, cancel := k.getReqContext(msgName)
ctx, cancel := k.getReqContext(spanCtx, msgName)
if cancel != nil {
defer cancel()
}

View File

@ -20,7 +20,11 @@ import (
"github.com/mdlayher/vsock"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
otelLabel "go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
grpcStatus "google.golang.org/grpc/status"
"github.com/containerd/ttrpc"
@ -80,32 +84,8 @@ func NewAgentClient(ctx context.Context, sock string, timeout uint32) (*AgentCli
if err != nil {
return nil, err
}
/*
dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()}
dialOpts = append(dialOpts, grpc.WithDialer(agentDialer(parsedAddr, enableYamux)))
var tracer opentracing.Tracer
span := opentracing.SpanFromContext(ctx)
// If the context contains a trace span, trace all client comms
if span != nil {
tracer = span.Tracer()
dialOpts = append(dialOpts,
grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer)))
dialOpts = append(dialOpts,
grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer)))
}
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, grpcAddr, dialOpts...)
if err != nil {
return nil, err
}
*/
client := ttrpc.NewClient(conn)
client := ttrpc.NewClient(conn, ttrpc.WithUnaryClientInterceptor(TraceUnaryClientInterceptor()))
return &AgentClient{
AgentServiceClient: agentgrpc.NewAgentServiceClient(client),
@ -119,6 +99,89 @@ func (c *AgentClient) Close() error {
return c.conn.Close()
}
func TraceUnaryClientInterceptor() ttrpc.UnaryClientInterceptor {
return func(
ctx context.Context,
req *ttrpc.Request,
resp *ttrpc.Response,
ci *ttrpc.UnaryClientInfo,
invoker ttrpc.Invoker,
) error {
requestMetadata := make(ttrpc.MD)
tracer := otel.Tracer("kata")
var span trace.Span
ctx, span = tracer.Start(
ctx,
fmt.Sprintf("ttrpc.%s", req.Method),
trace.WithSpanKind(trace.SpanKindClient),
)
defer span.End()
inject(ctx, &requestMetadata)
ctx = ttrpc.WithMetadata(ctx, requestMetadata)
setRequest(req, &requestMetadata)
err := invoker(ctx, req, resp)
if err != nil {
span.SetAttributes(otelLabel.Key("RPC_ERROR").Bool(true))
}
// err can be nil, that will return an OK response code
if status, _ := status.FromError(err); status != nil {
span.SetAttributes(otelLabel.Key("RPC_CODE").Uint((uint)(status.Code())))
span.SetAttributes(otelLabel.Key("RPC_MESSAGE").String(status.Message()))
}
return err
}
}
type metadataSupplier struct {
metadata *ttrpc.MD
}
func (s *metadataSupplier) Get(key string) string {
values, ok := s.metadata.Get(key)
if !ok {
return ""
}
return values[0]
}
func (s *metadataSupplier) Set(key string, value string) {
s.metadata.Set(key, value)
}
func inject(ctx context.Context, metadata *ttrpc.MD) {
otel.GetTextMapPropagator().Inject(ctx, &metadataSupplier{
metadata: metadata,
})
}
func setRequest(req *ttrpc.Request, md *ttrpc.MD) {
newMD := make([]*ttrpc.KeyValue, 0)
for _, kv := range req.Metadata {
// not found in md, means that we can copy old kv
// otherwise, we will use the values in md to overwrite it
if _, found := md.Get(kv.Key); !found {
newMD = append(newMD, kv)
}
}
req.Metadata = newMD
for k, values := range *md {
for _, v := range values {
req.Metadata = append(req.Metadata, &ttrpc.KeyValue{
Key: k,
Value: v,
})
}
}
}
// vsock scheme is self-defined to be kept from being parsed by grpc.
// Any format starting with "scheme://" will be parsed by grpc and we lose
// all address information because vsock scheme is not supported by grpc.