mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-29 12:14:48 +00:00
runtime: pass span context to agent in ttRPC client
Pass span context through ttRPC metadata, that agent can get parent from the context to create new sub-spans. Fixes: #1968 Signed-off-by: bin <bin@hyper.sh>
This commit is contained in:
parent
a57118d03a
commit
ae46e7bf97
@ -2014,18 +2014,18 @@ func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
|
||||
}
|
||||
}
|
||||
|
||||
func (k *kataAgent) getReqContext(reqName string) (ctx context.Context, cancel context.CancelFunc) {
|
||||
ctx = context.Background()
|
||||
func (k *kataAgent) getReqContext(ctx context.Context, reqName string) (newCtx context.Context, cancel context.CancelFunc) {
|
||||
newCtx = ctx
|
||||
switch reqName {
|
||||
case grpcWaitProcessRequest, grpcGetOOMEventRequest:
|
||||
// Wait and GetOOMEvent have no timeout
|
||||
case grpcCheckRequest:
|
||||
ctx, cancel = context.WithTimeout(ctx, checkRequestTimeout)
|
||||
newCtx, cancel = context.WithTimeout(ctx, checkRequestTimeout)
|
||||
default:
|
||||
ctx, cancel = context.WithTimeout(ctx, defaultRequestTimeout)
|
||||
newCtx, cancel = context.WithTimeout(ctx, defaultRequestTimeout)
|
||||
}
|
||||
|
||||
return ctx, cancel
|
||||
return newCtx, cancel
|
||||
}
|
||||
|
||||
func (k *kataAgent) sendReq(spanCtx context.Context, request interface{}) (interface{}, error) {
|
||||
@ -2044,7 +2044,7 @@ func (k *kataAgent) sendReq(spanCtx context.Context, request interface{}) (inter
|
||||
return nil, errors.New("Invalid request type")
|
||||
}
|
||||
message := request.(proto.Message)
|
||||
ctx, cancel := k.getReqContext(msgName)
|
||||
ctx, cancel := k.getReqContext(spanCtx, msgName)
|
||||
if cancel != nil {
|
||||
defer cancel()
|
||||
}
|
||||
|
@ -20,7 +20,11 @@ import (
|
||||
|
||||
"github.com/mdlayher/vsock"
|
||||
"github.com/sirupsen/logrus"
|
||||
"go.opentelemetry.io/otel"
|
||||
otelLabel "go.opentelemetry.io/otel/label"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
grpcStatus "google.golang.org/grpc/status"
|
||||
|
||||
"github.com/containerd/ttrpc"
|
||||
@ -80,32 +84,8 @@ func NewAgentClient(ctx context.Context, sock string, timeout uint32) (*AgentCli
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
/*
|
||||
dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()}
|
||||
dialOpts = append(dialOpts, grpc.WithDialer(agentDialer(parsedAddr, enableYamux)))
|
||||
|
||||
var tracer opentracing.Tracer
|
||||
|
||||
span := opentracing.SpanFromContext(ctx)
|
||||
|
||||
// If the context contains a trace span, trace all client comms
|
||||
if span != nil {
|
||||
tracer = span.Tracer()
|
||||
|
||||
dialOpts = append(dialOpts,
|
||||
grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer)))
|
||||
dialOpts = append(dialOpts,
|
||||
grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer)))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
|
||||
defer cancel()
|
||||
conn, err := grpc.DialContext(ctx, grpcAddr, dialOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
*/
|
||||
client := ttrpc.NewClient(conn)
|
||||
client := ttrpc.NewClient(conn, ttrpc.WithUnaryClientInterceptor(TraceUnaryClientInterceptor()))
|
||||
|
||||
return &AgentClient{
|
||||
AgentServiceClient: agentgrpc.NewAgentServiceClient(client),
|
||||
@ -119,6 +99,89 @@ func (c *AgentClient) Close() error {
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
func TraceUnaryClientInterceptor() ttrpc.UnaryClientInterceptor {
|
||||
return func(
|
||||
ctx context.Context,
|
||||
req *ttrpc.Request,
|
||||
resp *ttrpc.Response,
|
||||
ci *ttrpc.UnaryClientInfo,
|
||||
invoker ttrpc.Invoker,
|
||||
) error {
|
||||
requestMetadata := make(ttrpc.MD)
|
||||
|
||||
tracer := otel.Tracer("kata")
|
||||
var span trace.Span
|
||||
ctx, span = tracer.Start(
|
||||
ctx,
|
||||
fmt.Sprintf("ttrpc.%s", req.Method),
|
||||
trace.WithSpanKind(trace.SpanKindClient),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
inject(ctx, &requestMetadata)
|
||||
ctx = ttrpc.WithMetadata(ctx, requestMetadata)
|
||||
setRequest(req, &requestMetadata)
|
||||
|
||||
err := invoker(ctx, req, resp)
|
||||
|
||||
if err != nil {
|
||||
span.SetAttributes(otelLabel.Key("RPC_ERROR").Bool(true))
|
||||
}
|
||||
// err can be nil, that will return an OK response code
|
||||
if status, _ := status.FromError(err); status != nil {
|
||||
span.SetAttributes(otelLabel.Key("RPC_CODE").Uint((uint)(status.Code())))
|
||||
span.SetAttributes(otelLabel.Key("RPC_MESSAGE").String(status.Message()))
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
type metadataSupplier struct {
|
||||
metadata *ttrpc.MD
|
||||
}
|
||||
|
||||
func (s *metadataSupplier) Get(key string) string {
|
||||
values, ok := s.metadata.Get(key)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
return values[0]
|
||||
}
|
||||
|
||||
func (s *metadataSupplier) Set(key string, value string) {
|
||||
s.metadata.Set(key, value)
|
||||
}
|
||||
|
||||
func inject(ctx context.Context, metadata *ttrpc.MD) {
|
||||
otel.GetTextMapPropagator().Inject(ctx, &metadataSupplier{
|
||||
metadata: metadata,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func setRequest(req *ttrpc.Request, md *ttrpc.MD) {
|
||||
newMD := make([]*ttrpc.KeyValue, 0)
|
||||
for _, kv := range req.Metadata {
|
||||
// not found in md, means that we can copy old kv
|
||||
// otherwise, we will use the values in md to overwrite it
|
||||
if _, found := md.Get(kv.Key); !found {
|
||||
newMD = append(newMD, kv)
|
||||
}
|
||||
}
|
||||
|
||||
req.Metadata = newMD
|
||||
|
||||
for k, values := range *md {
|
||||
for _, v := range values {
|
||||
req.Metadata = append(req.Metadata, &ttrpc.KeyValue{
|
||||
Key: k,
|
||||
Value: v,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// vsock scheme is self-defined to be kept from being parsed by grpc.
|
||||
// Any format starting with "scheme://" will be parsed by grpc and we lose
|
||||
// all address information because vsock scheme is not supported by grpc.
|
||||
|
Loading…
Reference in New Issue
Block a user