mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-30 04:34:27 +00:00
runtime: pass span context to agent in ttRPC client
Pass span context through ttRPC metadata, that agent can get parent from the context to create new sub-spans. Fixes: #1968 Signed-off-by: bin <bin@hyper.sh>
This commit is contained in:
parent
a57118d03a
commit
ae46e7bf97
@ -2014,18 +2014,18 @@ func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *kataAgent) getReqContext(reqName string) (ctx context.Context, cancel context.CancelFunc) {
|
func (k *kataAgent) getReqContext(ctx context.Context, reqName string) (newCtx context.Context, cancel context.CancelFunc) {
|
||||||
ctx = context.Background()
|
newCtx = ctx
|
||||||
switch reqName {
|
switch reqName {
|
||||||
case grpcWaitProcessRequest, grpcGetOOMEventRequest:
|
case grpcWaitProcessRequest, grpcGetOOMEventRequest:
|
||||||
// Wait and GetOOMEvent have no timeout
|
// Wait and GetOOMEvent have no timeout
|
||||||
case grpcCheckRequest:
|
case grpcCheckRequest:
|
||||||
ctx, cancel = context.WithTimeout(ctx, checkRequestTimeout)
|
newCtx, cancel = context.WithTimeout(ctx, checkRequestTimeout)
|
||||||
default:
|
default:
|
||||||
ctx, cancel = context.WithTimeout(ctx, defaultRequestTimeout)
|
newCtx, cancel = context.WithTimeout(ctx, defaultRequestTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
return ctx, cancel
|
return newCtx, cancel
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *kataAgent) sendReq(spanCtx context.Context, request interface{}) (interface{}, error) {
|
func (k *kataAgent) sendReq(spanCtx context.Context, request interface{}) (interface{}, error) {
|
||||||
@ -2044,7 +2044,7 @@ func (k *kataAgent) sendReq(spanCtx context.Context, request interface{}) (inter
|
|||||||
return nil, errors.New("Invalid request type")
|
return nil, errors.New("Invalid request type")
|
||||||
}
|
}
|
||||||
message := request.(proto.Message)
|
message := request.(proto.Message)
|
||||||
ctx, cancel := k.getReqContext(msgName)
|
ctx, cancel := k.getReqContext(spanCtx, msgName)
|
||||||
if cancel != nil {
|
if cancel != nil {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,11 @@ import (
|
|||||||
|
|
||||||
"github.com/mdlayher/vsock"
|
"github.com/mdlayher/vsock"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
otelLabel "go.opentelemetry.io/otel/label"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
grpcStatus "google.golang.org/grpc/status"
|
grpcStatus "google.golang.org/grpc/status"
|
||||||
|
|
||||||
"github.com/containerd/ttrpc"
|
"github.com/containerd/ttrpc"
|
||||||
@ -80,32 +84,8 @@ func NewAgentClient(ctx context.Context, sock string, timeout uint32) (*AgentCli
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()}
|
|
||||||
dialOpts = append(dialOpts, grpc.WithDialer(agentDialer(parsedAddr, enableYamux)))
|
|
||||||
|
|
||||||
var tracer opentracing.Tracer
|
client := ttrpc.NewClient(conn, ttrpc.WithUnaryClientInterceptor(TraceUnaryClientInterceptor()))
|
||||||
|
|
||||||
span := opentracing.SpanFromContext(ctx)
|
|
||||||
|
|
||||||
// If the context contains a trace span, trace all client comms
|
|
||||||
if span != nil {
|
|
||||||
tracer = span.Tracer()
|
|
||||||
|
|
||||||
dialOpts = append(dialOpts,
|
|
||||||
grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer)))
|
|
||||||
dialOpts = append(dialOpts,
|
|
||||||
grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer)))
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
|
|
||||||
defer cancel()
|
|
||||||
conn, err := grpc.DialContext(ctx, grpcAddr, dialOpts...)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
client := ttrpc.NewClient(conn)
|
|
||||||
|
|
||||||
return &AgentClient{
|
return &AgentClient{
|
||||||
AgentServiceClient: agentgrpc.NewAgentServiceClient(client),
|
AgentServiceClient: agentgrpc.NewAgentServiceClient(client),
|
||||||
@ -119,6 +99,89 @@ func (c *AgentClient) Close() error {
|
|||||||
return c.conn.Close()
|
return c.conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TraceUnaryClientInterceptor() ttrpc.UnaryClientInterceptor {
|
||||||
|
return func(
|
||||||
|
ctx context.Context,
|
||||||
|
req *ttrpc.Request,
|
||||||
|
resp *ttrpc.Response,
|
||||||
|
ci *ttrpc.UnaryClientInfo,
|
||||||
|
invoker ttrpc.Invoker,
|
||||||
|
) error {
|
||||||
|
requestMetadata := make(ttrpc.MD)
|
||||||
|
|
||||||
|
tracer := otel.Tracer("kata")
|
||||||
|
var span trace.Span
|
||||||
|
ctx, span = tracer.Start(
|
||||||
|
ctx,
|
||||||
|
fmt.Sprintf("ttrpc.%s", req.Method),
|
||||||
|
trace.WithSpanKind(trace.SpanKindClient),
|
||||||
|
)
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
inject(ctx, &requestMetadata)
|
||||||
|
ctx = ttrpc.WithMetadata(ctx, requestMetadata)
|
||||||
|
setRequest(req, &requestMetadata)
|
||||||
|
|
||||||
|
err := invoker(ctx, req, resp)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
span.SetAttributes(otelLabel.Key("RPC_ERROR").Bool(true))
|
||||||
|
}
|
||||||
|
// err can be nil, that will return an OK response code
|
||||||
|
if status, _ := status.FromError(err); status != nil {
|
||||||
|
span.SetAttributes(otelLabel.Key("RPC_CODE").Uint((uint)(status.Code())))
|
||||||
|
span.SetAttributes(otelLabel.Key("RPC_MESSAGE").String(status.Message()))
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type metadataSupplier struct {
|
||||||
|
metadata *ttrpc.MD
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *metadataSupplier) Get(key string) string {
|
||||||
|
values, ok := s.metadata.Get(key)
|
||||||
|
if !ok {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return values[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *metadataSupplier) Set(key string, value string) {
|
||||||
|
s.metadata.Set(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func inject(ctx context.Context, metadata *ttrpc.MD) {
|
||||||
|
otel.GetTextMapPropagator().Inject(ctx, &metadataSupplier{
|
||||||
|
metadata: metadata,
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func setRequest(req *ttrpc.Request, md *ttrpc.MD) {
|
||||||
|
newMD := make([]*ttrpc.KeyValue, 0)
|
||||||
|
for _, kv := range req.Metadata {
|
||||||
|
// not found in md, means that we can copy old kv
|
||||||
|
// otherwise, we will use the values in md to overwrite it
|
||||||
|
if _, found := md.Get(kv.Key); !found {
|
||||||
|
newMD = append(newMD, kv)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Metadata = newMD
|
||||||
|
|
||||||
|
for k, values := range *md {
|
||||||
|
for _, v := range values {
|
||||||
|
req.Metadata = append(req.Metadata, &ttrpc.KeyValue{
|
||||||
|
Key: k,
|
||||||
|
Value: v,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// vsock scheme is self-defined to be kept from being parsed by grpc.
|
// vsock scheme is self-defined to be kept from being parsed by grpc.
|
||||||
// Any format starting with "scheme://" will be parsed by grpc and we lose
|
// Any format starting with "scheme://" will be parsed by grpc and we lose
|
||||||
// all address information because vsock scheme is not supported by grpc.
|
// all address information because vsock scheme is not supported by grpc.
|
||||||
|
Loading…
Reference in New Issue
Block a user