migrate apiserver utiltrace usage to component-base/tracing

This commit is contained in:
David Ashpole 2022-10-20 18:15:38 +00:00
parent 6e31c6531f
commit de26b9023f
No known key found for this signature in database
GPG Key ID: 563A85007BFA1BA2
19 changed files with 276 additions and 231 deletions

View File

@ -17,6 +17,7 @@ require (
github.com/stretchr/testify v1.8.0
go.etcd.io/etcd/client/pkg/v3 v3.5.5
go.etcd.io/etcd/client/v3 v3.5.5
go.opentelemetry.io/otel v1.10.0
go.opentelemetry.io/otel/trace v1.10.0
google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21
google.golang.org/grpc v1.49.0
@ -95,7 +96,6 @@ require (
go.etcd.io/etcd/server/v3 v3.5.5 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.35.0 // indirect
go.opentelemetry.io/otel v1.10.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.10.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0 // indirect

View File

@ -22,6 +22,8 @@ import (
"fmt"
"time"
"go.opentelemetry.io/otel/attribute"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apivalidation "k8s.io/apimachinery/pkg/api/validation"
@ -35,7 +37,7 @@ import (
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/client-go/rest"
utiltrace "k8s.io/utils/trace"
"k8s.io/component-base/tracing"
)
type webhookConverterFactory struct {
@ -226,6 +228,7 @@ func getConvertedObjectsFromResponse(expectedUID types.UID, response runtime.Obj
}
func (c *webhookConverter) Convert(in runtime.Object, toGV schema.GroupVersion) (runtime.Object, error) {
ctx := context.TODO()
// In general, the webhook should not do any defaulting or validation. A special case of that is an empty object
// conversion that must result an empty object and practically is the same as nopConverter.
// A smoke test in API machinery calls the converter on empty objects. As this case happens consistently
@ -258,24 +261,23 @@ func (c *webhookConverter) Convert(in runtime.Object, toGV schema.GroupVersion)
return out, nil
}
trace := utiltrace.New("Call conversion webhook",
utiltrace.Field{"custom-resource-definition", c.name},
utiltrace.Field{"desired-api-version", desiredAPIVersion},
utiltrace.Field{"object-count", objCount},
utiltrace.Field{"UID", requestUID})
ctx, span := tracing.Start(ctx, "Call conversion webhook",
attribute.String("custom-resource-definition", c.name),
attribute.String("desired-api-version", desiredAPIVersion),
attribute.Int("object-count", objCount),
attribute.String("UID", string(requestUID)))
// Only log conversion webhook traces that exceed a 8ms per object limit plus a 50ms request overhead allowance.
// The per object limit uses the SLO for conversion webhooks (~4ms per object) plus time to serialize/deserialize
// the conversion request on the apiserver side (~4ms per object).
defer trace.LogIfLong(time.Duration(50+8*objCount) * time.Millisecond)
defer span.End(time.Duration(50+8*objCount) * time.Millisecond)
// TODO: Figure out if adding one second timeout make sense here.
ctx := context.TODO()
r := c.restClient.Post().Body(request).Do(ctx)
if err := r.Into(response); err != nil {
// TODO: Return a webhook specific error to be able to convert it to meta.Status
return nil, fmt.Errorf("conversion webhook for %v failed: %v", in.GetObjectKind().GroupVersionKind(), err)
}
trace.Step("Request completed")
span.AddEvent("Request completed")
convertedObjects, err := getConvertedObjectsFromResponse(requestUID, response)
if err != nil {

View File

@ -24,6 +24,7 @@ import (
"time"
jsonpatch "github.com/evanphx/json-patch"
"go.opentelemetry.io/otel/attribute"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/klog/v2"
@ -46,7 +47,7 @@ import (
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
webhookutil "k8s.io/apiserver/pkg/util/webhook"
"k8s.io/apiserver/pkg/warning"
utiltrace "k8s.io/utils/trace"
"k8s.io/component-base/tracing"
)
const (
@ -233,14 +234,14 @@ func (a *mutatingDispatcher) callAttrMutatingHook(ctx context.Context, h *admiss
if err != nil {
return false, &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: fmt.Errorf("could not get REST client: %w", err), Status: apierrors.NewBadRequest("error getting REST client")}
}
trace := utiltrace.New("Call mutating webhook",
utiltrace.Field{"configuration", configurationName},
utiltrace.Field{"webhook", h.Name},
utiltrace.Field{"resource", attr.GetResource()},
utiltrace.Field{"subresource", attr.GetSubresource()},
utiltrace.Field{"operation", attr.GetOperation()},
utiltrace.Field{"UID", uid})
defer trace.LogIfLong(500 * time.Millisecond)
ctx, span := tracing.Start(ctx, "Call mutating webhook",
attribute.String("configuration", configurationName),
attribute.String("webhook", h.Name),
attribute.Stringer("resource", attr.GetResource()),
attribute.String("subresource", attr.GetSubresource()),
attribute.String("operation", string(attr.GetOperation())),
attribute.String("UID", string(uid)))
defer span.End(500 * time.Millisecond)
// if the webhook has a specific timeout, wrap the context to apply it
if h.TimeoutSeconds != nil {
@ -279,7 +280,7 @@ func (a *mutatingDispatcher) callAttrMutatingHook(ctx context.Context, h *admiss
}
return false, &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: fmt.Errorf("failed to call webhook: %w", err), Status: status}
}
trace.Step("Request completed")
span.AddEvent("Request completed")
result, err := webhookrequest.VerifyAdmissionResponse(uid, true, response)
if err != nil {
@ -353,7 +354,7 @@ func (a *mutatingDispatcher) callAttrMutatingHook(ctx context.Context, h *admiss
}
changed = !apiequality.Semantic.DeepEqual(attr.VersionedObject, newVersionedObject)
trace.Step("Patch applied")
span.AddEvent("Patch applied")
annotator.addPatchAnnotation(patchObj, result.PatchType)
attr.Dirty = true
attr.VersionedObject = newVersionedObject

View File

@ -22,6 +22,8 @@ import (
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
v1 "k8s.io/api/admissionregistration/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -35,8 +37,8 @@ import (
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
webhookutil "k8s.io/apiserver/pkg/util/webhook"
"k8s.io/apiserver/pkg/warning"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
)
const (
@ -232,14 +234,14 @@ func (d *validatingDispatcher) callHook(ctx context.Context, h *v1.ValidatingWeb
if err != nil {
return &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: fmt.Errorf("could not get REST client: %w", err), Status: apierrors.NewBadRequest("error getting REST client")}
}
trace := utiltrace.New("Call validating webhook",
utiltrace.Field{"configuration", invocation.Webhook.GetConfigurationName()},
utiltrace.Field{"webhook", h.Name},
utiltrace.Field{"resource", attr.GetResource()},
utiltrace.Field{"subresource", attr.GetSubresource()},
utiltrace.Field{"operation", attr.GetOperation()},
utiltrace.Field{"UID", uid})
defer trace.LogIfLong(500 * time.Millisecond)
ctx, span := tracing.Start(ctx, "Call validating webhook",
attribute.String("configuration", invocation.Webhook.GetConfigurationName()),
attribute.String("webhook", h.Name),
attribute.Stringer("resource", attr.GetResource()),
attribute.String("subresource", attr.GetSubresource()),
attribute.String("operation", string(attr.GetOperation())),
attribute.String("UID", string(uid)))
defer span.End(500 * time.Millisecond)
// if the webhook has a specific timeout, wrap the context to apply it
if h.TimeoutSeconds != nil {
@ -278,7 +280,7 @@ func (d *validatingDispatcher) callHook(ctx context.Context, h *v1.ValidatingWeb
}
return &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: fmt.Errorf("failed to call webhook: %w", err), Status: status}
}
trace.Step("Request completed")
span.AddEvent("Request completed")
result, err := webhookrequest.VerifyAdmissionResponse(uid, false, response)
if err != nil {

View File

@ -26,6 +26,8 @@ import (
"unicode"
"unicode/utf8"
"go.opentelemetry.io/otel/attribute"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme"
@ -42,17 +44,18 @@ import (
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
)
var namespaceGVR = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"}
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
// For performance tracking purposes.
trace := utiltrace.New("Create", traceFields(req)...)
defer trace.LogIfLong(500 * time.Millisecond)
ctx, span := tracing.Start(ctx, "Create", traceFields(req)...)
defer span.End(500 * time.Millisecond)
namespace, name, err := scope.Namer.Name(req)
if err != nil {
@ -72,7 +75,7 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
// enforce a timeout of at most requestTimeoutUpperBound (34s) or less if the user-provided
// timeout inside the parent context is lower than requestTimeoutUpperBound.
ctx, cancel := context.WithTimeout(req.Context(), requestTimeoutUpperBound)
ctx, cancel := context.WithTimeout(ctx, requestTimeoutUpperBound)
defer cancel()
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
if err != nil {
@ -88,11 +91,12 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
}
body, err := limitedReadBodyWithRecordMetric(ctx, req, scope.MaxRequestBodyBytes, scope.Resource.GroupResource().String(), requestmetrics.Create)
trace.Step("limitedReadBody done", utiltrace.Field{"len", len(body)}, utiltrace.Field{"err", err})
if err != nil {
span.AddEvent("limitedReadBody failed", attribute.Int("len", len(body)), attribute.String("err", err.Error()))
scope.err(err, w, req)
return
}
span.AddEvent("limitedReadBody succeeded", attribute.Int("len", len(body)))
options := &metav1.CreateOptions{}
values := req.URL.Query()
@ -118,7 +122,7 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
}
decoder := scope.Serializer.DecoderToVersion(decodeSerializer, scope.HubGroupVersion)
trace.Step("About to convert to expected version")
span.AddEvent("About to convert to expected version")
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
if err != nil {
strictError, isStrictError := runtime.AsStrictDecodingError(err)
@ -141,7 +145,7 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
scope.err(err, w, req)
return
}
trace.Step("Conversion done")
span.AddEvent("Conversion done")
// On create, get name from new object if unset
if len(name) == 0 {
@ -168,7 +172,7 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
}
}
trace.Step("About to store object in database")
span.AddEvent("About to store object in database")
admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
requestFunc := func() (runtime.Object, error) {
return r.Create(
@ -208,11 +212,12 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
}
return result, err
})
trace.Step("Write to database call finished", utiltrace.Field{"len", len(body)}, utiltrace.Field{"err", err})
if err != nil {
span.AddEvent("Write to database call failed", attribute.Int("len", len(body)), attribute.String("err", err.Error()))
scope.err(err, w, req)
return
}
span.AddEvent("Write to database call succeeded", attribute.Int("len", len(body)))
code := http.StatusCreated
status, ok := result.(*metav1.Status)
@ -220,9 +225,9 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
status.Code = int32(code)
}
trace.Step("About to write a response")
defer trace.Step("Writing http response done")
transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
span.AddEvent("About to write a response")
defer span.AddEvent("Writing http response done")
transformResponseObject(ctx, scope, req, w, code, outputMediaType, result)
}
}

View File

@ -22,6 +22,8 @@ import (
"net/http"
"time"
"go.opentelemetry.io/otel/attribute"
"k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme"
@ -38,16 +40,17 @@ import (
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun"
utiltrace "k8s.io/utils/trace"
"k8s.io/component-base/tracing"
)
// DeleteResource returns a function that will handle a resource deletion
// TODO admission here becomes solely validating admission
func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestScope, admit admission.Interface) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
// For performance tracking purposes.
trace := utiltrace.New("Delete", traceFields(req)...)
defer trace.LogIfLong(500 * time.Millisecond)
ctx, span := tracing.Start(ctx, "Delete", traceFields(req)...)
defer span.End(500 * time.Millisecond)
namespace, name, err := scope.Namer.Name(req)
if err != nil {
@ -57,7 +60,7 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc
// enforce a timeout of at most requestTimeoutUpperBound (34s) or less if the user-provided
// timeout inside the parent context is lower than requestTimeoutUpperBound.
ctx, cancel := context.WithTimeout(req.Context(), requestTimeoutUpperBound)
ctx, cancel := context.WithTimeout(ctx, requestTimeoutUpperBound)
defer cancel()
ctx = request.WithNamespace(ctx, namespace)
@ -72,11 +75,12 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc
options := &metav1.DeleteOptions{}
if allowsOptions {
body, err := limitedReadBodyWithRecordMetric(ctx, req, scope.MaxRequestBodyBytes, scope.Resource.GroupResource().String(), requestmetrics.Delete)
trace.Step("limitedReadBody done", utiltrace.Field{"len", len(body)}, utiltrace.Field{"err", err})
if err != nil {
span.AddEvent("limitedReadBody failed", attribute.Int("len", len(body)), attribute.String("err", err.Error()))
scope.err(err, w, req)
return
}
span.AddEvent("limitedReadBody succeeded", attribute.Int("len", len(body)))
if len(body) > 0 {
s, err := negotiation.NegotiateInputSerializer(req, false, metainternalversionscheme.Codecs)
if err != nil {
@ -95,11 +99,11 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc
scope.err(fmt.Errorf("decoded object cannot be converted to DeleteOptions"), w, req)
return
}
trace.Step("Decoded delete options")
span.AddEvent("Decoded delete options")
objGV := gvk.GroupVersion()
audit.LogRequestObject(req.Context(), obj, objGV, scope.Resource, scope.Subresource, metainternalversionscheme.Codecs)
trace.Step("Recorded the audit event")
span.AddEvent("Recorded the audit event")
} else {
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, options); err != nil {
err = errors.NewBadRequest(err.Error())
@ -115,7 +119,7 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc
}
options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("DeleteOptions"))
trace.Step("About to delete object from database")
span.AddEvent("About to delete object from database")
wasDeleted := true
userInfo, _ := request.UserFrom(ctx)
staticAdmissionAttrs := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Delete, options, dryrun.IsDryRun(options.DryRun), userInfo)
@ -128,7 +132,7 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc
scope.err(err, w, req)
return
}
trace.Step("Object deleted from database")
span.AddEvent("Object deleted from database")
status := http.StatusOK
// Return http.StatusAccepted if the resource was not deleted immediately and
@ -155,17 +159,18 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc
}
}
trace.Step("About to write a response")
defer trace.Step("Writing http response done")
transformResponseObject(ctx, scope, trace, req, w, status, outputMediaType, result)
span.AddEvent("About to write a response")
defer span.AddEvent("Writing http response done")
transformResponseObject(ctx, scope, req, w, status, outputMediaType, result)
}
}
// DeleteCollection returns a function that will handle a collection deletion
func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope *RequestScope, admit admission.Interface) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
trace := utiltrace.New("Delete", traceFields(req)...)
defer trace.LogIfLong(500 * time.Millisecond)
ctx := req.Context()
ctx, span := tracing.Start(ctx, "Delete", traceFields(req)...)
defer span.End(500 * time.Millisecond)
namespace, err := scope.Namer.Namespace(req)
if err != nil {
@ -175,7 +180,7 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope *RequestSc
// enforce a timeout of at most requestTimeoutUpperBound (34s) or less if the user-provided
// timeout inside the parent context is lower than requestTimeoutUpperBound.
ctx, cancel := context.WithTimeout(req.Context(), requestTimeoutUpperBound)
ctx, cancel := context.WithTimeout(ctx, requestTimeoutUpperBound)
defer cancel()
ctx = request.WithNamespace(ctx, namespace)
@ -216,11 +221,12 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope *RequestSc
options := &metav1.DeleteOptions{}
if checkBody {
body, err := limitedReadBodyWithRecordMetric(ctx, req, scope.MaxRequestBodyBytes, scope.Resource.GroupResource().String(), requestmetrics.DeleteCollection)
trace.Step("limitedReadBody done", utiltrace.Field{"len", len(body)}, utiltrace.Field{"err", err})
if err != nil {
span.AddEvent("limitedReadBody failed", attribute.Int("len", len(body)), attribute.String("err", err.Error()))
scope.err(err, w, req)
return
}
span.AddEvent("limitedReadBody succeeded", attribute.Int("len", len(body)))
if len(body) > 0 {
s, err := negotiation.NegotiateInputSerializer(req, false, metainternalversionscheme.Codecs)
if err != nil {
@ -280,8 +286,8 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope *RequestSc
}
}
trace.Step("About to write a response")
defer trace.Step("Writing http response done")
transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
span.AddEvent("About to write a response")
defer span.AddEvent("Writing http response done")
transformResponseObject(ctx, scope, req, w, http.StatusOK, outputMediaType, result)
}
}

View File

@ -25,42 +25,42 @@ import (
"strings"
"time"
metainternalversionvalidation "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
"go.opentelemetry.io/otel/attribute"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme"
metainternalversionvalidation "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
utiltrace "k8s.io/utils/trace"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
)
// getterFunc performs a get request with the given context and object name. The request
// may be used to deserialize an options object to pass to the getter.
type getterFunc func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error)
type getterFunc func(ctx context.Context, name string, req *http.Request) (runtime.Object, error)
// getResourceHandler is an HTTP handler function for get requests. It delegates to the
// passed-in getterFunc to perform the actual get.
func getResourceHandler(scope *RequestScope, getter getterFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
trace := utiltrace.New("Get", traceFields(req)...)
defer trace.LogIfLong(500 * time.Millisecond)
ctx := req.Context()
ctx, span := tracing.Start(ctx, "Get", traceFields(req)...)
defer span.End(500 * time.Millisecond)
namespace, name, err := scope.Namer.Name(req)
if err != nil {
scope.err(err, w, req)
return
}
ctx := req.Context()
ctx = request.WithNamespace(ctx, namespace)
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
@ -69,22 +69,22 @@ func getResourceHandler(scope *RequestScope, getter getterFunc) http.HandlerFunc
return
}
result, err := getter(ctx, name, req, trace)
result, err := getter(ctx, name, req)
if err != nil {
scope.err(err, w, req)
return
}
trace.Step("About to write a response")
defer trace.Step("Writing http response done")
transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
span.AddEvent("About to write a response")
defer span.AddEvent("Writing http response done")
transformResponseObject(ctx, scope, req, w, http.StatusOK, outputMediaType, result)
}
}
// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
func GetResource(r rest.Getter, scope *RequestScope) http.HandlerFunc {
return getResourceHandler(scope,
func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
func(ctx context.Context, name string, req *http.Request) (runtime.Object, error) {
// check for export
options := metav1.GetOptions{}
if values := req.URL.Query(); len(values) > 0 {
@ -104,9 +104,7 @@ func GetResource(r rest.Getter, scope *RequestScope) http.HandlerFunc {
return nil, err
}
}
if trace != nil {
trace.Step("About to Get from storage")
}
tracing.SpanFromContext(ctx).AddEvent("About to Get from storage")
return r.Get(ctx, name, &options)
})
}
@ -114,16 +112,15 @@ func GetResource(r rest.Getter, scope *RequestScope) http.HandlerFunc {
// GetResourceWithOptions returns a function that handles retrieving a single resource from a rest.Storage object.
func GetResourceWithOptions(r rest.GetterWithOptions, scope *RequestScope, isSubresource bool) http.HandlerFunc {
return getResourceHandler(scope,
func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
func(ctx context.Context, name string, req *http.Request) (runtime.Object, error) {
opts, subpath, subpathKey := r.NewGetOptions()
trace.Step("About to process Get options")
span := tracing.SpanFromContext(ctx)
span.AddEvent("About to process Get options")
if err := getRequestOptions(req, scope, opts, subpath, subpathKey, isSubresource); err != nil {
err = errors.NewBadRequest(err.Error())
return nil, err
}
if trace != nil {
trace.Step("About to Get from storage")
}
span.AddEvent("About to Get from storage")
return r.Get(ctx, name, opts)
})
}
@ -168,8 +165,9 @@ func getRequestOptions(req *http.Request, scope *RequestScope, into runtime.Obje
func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
// For performance tracking purposes.
trace := utiltrace.New("List", traceFields(req)...)
ctx, span := tracing.Start(ctx, "List", traceFields(req)...)
namespace, err := scope.Namer.Namespace(req)
if err != nil {
@ -185,7 +183,6 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
hasName = false
}
ctx := req.Context()
ctx = request.WithNamespace(ctx, namespace)
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
@ -273,15 +270,15 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
}
// Log only long List requests (ignore Watch).
defer trace.LogIfLong(500 * time.Millisecond)
trace.Step("About to List from storage")
defer span.End(500 * time.Millisecond)
span.AddEvent("About to List from storage")
result, err := r.List(ctx, &opts)
if err != nil {
scope.err(err, w, req)
return
}
trace.Step("Listing from storage done")
defer trace.Step("Writing http response done", utiltrace.Field{"count", meta.LenList(result)})
transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
span.AddEvent("Listing from storage done")
defer span.AddEvent("Writing http response done", attribute.Int("count", meta.LenList(result)))
transformResponseObject(ctx, scope, req, w, http.StatusOK, outputMediaType, result)
}
}

View File

@ -24,6 +24,7 @@ import (
"time"
jsonpatch "github.com/evanphx/json-patch"
"go.opentelemetry.io/otel/attribute"
kjson "sigs.k8s.io/json"
"k8s.io/apimachinery/pkg/api/errors"
@ -50,7 +51,7 @@ import (
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun"
utiltrace "k8s.io/utils/trace"
"k8s.io/component-base/tracing"
)
const (
@ -61,9 +62,10 @@ const (
// PatchResource returns a function that will handle a resource patch.
func PatchResource(r rest.Patcher, scope *RequestScope, admit admission.Interface, patchTypes []string) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
// For performance tracking purposes.
trace := utiltrace.New("Patch", traceFields(req)...)
defer trace.LogIfLong(500 * time.Millisecond)
ctx, span := tracing.Start(ctx, "Patch", traceFields(req)...)
defer span.End(500 * time.Millisecond)
// Do this first, otherwise name extraction can fail for unrecognized content types
// TODO: handle this in negotiation
@ -88,7 +90,7 @@ func PatchResource(r rest.Patcher, scope *RequestScope, admit admission.Interfac
// enforce a timeout of at most requestTimeoutUpperBound (34s) or less if the user-provided
// timeout inside the parent context is lower than requestTimeoutUpperBound.
ctx, cancel := context.WithTimeout(req.Context(), requestTimeoutUpperBound)
ctx, cancel := context.WithTimeout(ctx, requestTimeoutUpperBound)
defer cancel()
ctx = request.WithNamespace(ctx, namespace)
@ -100,11 +102,12 @@ func PatchResource(r rest.Patcher, scope *RequestScope, admit admission.Interfac
}
patchBytes, err := limitedReadBodyWithRecordMetric(ctx, req, scope.MaxRequestBodyBytes, scope.Resource.GroupResource().String(), requestmetrics.Patch)
trace.Step("limitedReadBody done", utiltrace.Field{"len", len(patchBytes)}, utiltrace.Field{"err", err})
if err != nil {
span.AddEvent("limitedReadBody failed", attribute.Int("len", len(patchBytes)), attribute.String("err", err.Error()))
scope.err(err, w, req)
return
}
span.AddEvent("limitedReadBody succeeded", attribute.Int("len", len(patchBytes)))
options := &metav1.PatchOptions{}
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, options); err != nil {
@ -122,7 +125,7 @@ func PatchResource(r rest.Patcher, scope *RequestScope, admit admission.Interfac
admit = admission.WithAudit(admit)
audit.LogRequestPatch(req.Context(), patchBytes)
trace.Step("Recorded the audit event")
span.AddEvent("Recorded the audit event")
baseContentType := runtime.ContentTypeJSON
if patchType == types.ApplyPatchType {
@ -219,8 +222,6 @@ func PatchResource(r rest.Patcher, scope *RequestScope, admit admission.Interfac
patchType: patchType,
patchBytes: patchBytes,
userAgent: req.UserAgent(),
trace: trace,
}
result, wasCreated, err := p.patchResource(ctx, scope)
@ -228,16 +229,16 @@ func PatchResource(r rest.Patcher, scope *RequestScope, admit admission.Interfac
scope.err(err, w, req)
return
}
trace.Step("Object stored in database")
span.AddEvent("Object stored in database")
status := http.StatusOK
if wasCreated {
status = http.StatusCreated
}
trace.Step("About to write a response")
defer trace.Step("Writing http response done")
transformResponseObject(ctx, scope, trace, req, w, status, outputMediaType, result)
span.AddEvent("About to write a response")
defer span.AddEvent("Writing http response done")
transformResponseObject(ctx, scope, req, w, status, outputMediaType, result)
}
}
@ -281,8 +282,6 @@ type patcher struct {
patchBytes []byte
userAgent string
trace *utiltrace.Trace
// Set at invocation-time (by applyPatch) and immutable thereafter
namespace string
updatedObjectInfo rest.UpdatedObjectInfo
@ -544,7 +543,7 @@ func strategicPatchObject(
// TODO: rename this function because the name implies it is related to applyPatcher
func (p *patcher) applyPatch(ctx context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
// Make sure we actually have a persisted currentObject
p.trace.Step("About to apply patch")
tracing.SpanFromContext(ctx).AddEvent("About to apply patch")
currentObjectHasUID, err := hasUID(currentObject)
if err != nil {
return nil, err
@ -593,7 +592,7 @@ func (p *patcher) admissionAttributes(ctx context.Context, updatedObject runtime
// and is given the currently persisted object and the patched object as input.
// TODO: rename this function because the name implies it is related to applyPatcher
func (p *patcher) applyAdmission(ctx context.Context, patchedObject runtime.Object, currentObject runtime.Object) (runtime.Object, error) {
p.trace.Step("About to check admission control")
tracing.SpanFromContext(ctx).AddEvent("About to check admission control")
var operation admission.Operation
var options runtime.Object
if hasUID, err := hasUID(currentObject); err != nil {

View File

@ -32,7 +32,6 @@ import (
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
utiltrace "k8s.io/utils/trace"
)
// transformObject takes the object as returned by storage and ensures it is in
@ -129,7 +128,7 @@ func targetEncodingForTransform(scope *RequestScope, mediaType negotiation.Media
// transformResponseObject takes an object loaded from storage and performs any necessary transformations.
// Will write the complete response object.
func transformResponseObject(ctx context.Context, scope *RequestScope, trace *utiltrace.Trace, req *http.Request, w http.ResponseWriter, statusCode int, mediaType negotiation.MediaTypeOptions, result runtime.Object) {
func transformResponseObject(ctx context.Context, scope *RequestScope, req *http.Request, w http.ResponseWriter, statusCode int, mediaType negotiation.MediaTypeOptions, result runtime.Object) {
options, err := optionsForTransform(mediaType, req)
if err != nil {
scope.err(err, w, req)

View File

@ -18,6 +18,7 @@ package responsewriters
import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
@ -27,6 +28,8 @@ import (
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"k8s.io/apiserver/pkg/features"
"k8s.io/apimachinery/pkg/runtime"
@ -40,7 +43,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/flushwriter"
"k8s.io/apiserver/pkg/util/wsstream"
utiltrace "k8s.io/utils/trace"
"k8s.io/component-base/tracing"
)
// StreamObject performs input stream negotiation from a ResourceStreamer and writes that to the response.
@ -87,21 +90,22 @@ func StreamObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSe
// The context is optional and can be nil. This method will perform optional content compression if requested by
// a client and the feature gate for APIResponseCompression is enabled.
func SerializeObject(mediaType string, encoder runtime.Encoder, hw http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
trace := utiltrace.New("SerializeObject",
utiltrace.Field{"audit-id", audit.GetAuditIDTruncated(req.Context())},
utiltrace.Field{"method", req.Method},
utiltrace.Field{"url", req.URL.Path},
utiltrace.Field{"protocol", req.Proto},
utiltrace.Field{"mediaType", mediaType},
utiltrace.Field{"encoder", encoder.Identifier()})
defer trace.LogIfLong(5 * time.Second)
ctx := req.Context()
ctx, span := tracing.Start(ctx, "SerializeObject",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.String("method", req.Method),
attribute.String("url", req.URL.Path),
attribute.String("protocol", req.Proto),
attribute.String("mediaType", mediaType),
attribute.String("encoder", string(encoder.Identifier())))
defer span.End(5 * time.Second)
w := &deferredResponseWriter{
mediaType: mediaType,
statusCode: statusCode,
contentEncoding: negotiateContentEncoding(req),
hw: hw,
trace: trace,
ctx: ctx,
}
err := encoder.Encode(object, w)
@ -191,23 +195,30 @@ type deferredResponseWriter struct {
hw http.ResponseWriter
w io.Writer
trace *utiltrace.Trace
ctx context.Context
}
func (w *deferredResponseWriter) Write(p []byte) (n int, err error) {
if w.trace != nil {
// This Step usually wraps in-memory object serialization.
w.trace.Step("About to start writing response", utiltrace.Field{"size", len(p)})
ctx := w.ctx
span := tracing.SpanFromContext(ctx)
// This Step usually wraps in-memory object serialization.
span.AddEvent("About to start writing response", attribute.Int("size", len(p)))
firstWrite := !w.hasWritten
defer func() {
w.trace.Step("Write call finished",
utiltrace.Field{"writer", fmt.Sprintf("%T", w.w)},
utiltrace.Field{"size", len(p)},
utiltrace.Field{"firstWrite", firstWrite},
utiltrace.Field{"err", err})
}()
}
firstWrite := !w.hasWritten
defer func() {
if err != nil {
span.AddEvent("Write call failed",
attribute.String("writer", fmt.Sprintf("%T", w.w)),
attribute.Int("size", len(p)),
attribute.Bool("firstWrite", firstWrite),
attribute.String("err", err.Error()))
} else {
span.AddEvent("Write call succeeded",
attribute.String("writer", fmt.Sprintf("%T", w.w)),
attribute.Int("size", len(p)),
attribute.Bool("firstWrite", firstWrite))
}
}()
if w.hasWritten {
return w.w.Write(p)
}

View File

@ -52,7 +52,6 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
utiltrace "k8s.io/utils/trace"
)
var (
@ -537,8 +536,6 @@ func (tc *patchTestCase) Run(t *testing.T) {
name: name,
patchType: patchType,
patchBytes: patch,
trace: utiltrace.New("Patch", utiltrace.Field{"name", name}),
}
ctx, cancel := context.WithTimeout(ctx, time.Second)

View File

@ -19,15 +19,16 @@ package handlers
import (
"net/http"
utiltrace "k8s.io/utils/trace"
"go.opentelemetry.io/otel/attribute"
)
func traceFields(req *http.Request) []utiltrace.Field {
return []utiltrace.Field{
{Key: "url", Value: req.URL.Path},
{Key: "user-agent", Value: &lazyTruncatedUserAgent{req: req}},
{Key: "audit-id", Value: &lazyAuditID{req: req}},
{Key: "client", Value: &lazyClientIP{req: req}},
{Key: "accept", Value: &lazyAccept{req: req}},
{Key: "protocol", Value: req.Proto}}
func traceFields(req *http.Request) []attribute.KeyValue {
return []attribute.KeyValue{
attribute.String("url", req.URL.Path),
attribute.Stringer("user-agent", &lazyTruncatedUserAgent{req: req}),
attribute.Stringer("audit-id", &lazyAuditID{req: req}),
attribute.Stringer("client", &lazyClientIP{req: req}),
attribute.Stringer("accept", &lazyAccept{req: req}),
attribute.String("protocol", req.Proto),
}
}

View File

@ -23,6 +23,8 @@ import (
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme"
@ -40,16 +42,17 @@ import (
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
)
// UpdateResource returns a function that will handle a resource update
func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interface) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
// For performance tracking purposes.
trace := utiltrace.New("Update", traceFields(req)...)
defer trace.LogIfLong(500 * time.Millisecond)
ctx, span := tracing.Start(ctx, "Update", traceFields(req)...)
defer span.End(500 * time.Millisecond)
namespace, name, err := scope.Namer.Name(req)
if err != nil {
@ -59,7 +62,7 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
// enforce a timeout of at most requestTimeoutUpperBound (34s) or less if the user-provided
// timeout inside the parent context is lower than requestTimeoutUpperBound.
ctx, cancel := context.WithTimeout(req.Context(), requestTimeoutUpperBound)
ctx, cancel := context.WithTimeout(ctx, requestTimeoutUpperBound)
defer cancel()
ctx = request.WithNamespace(ctx, namespace)
@ -71,11 +74,12 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
}
body, err := limitedReadBodyWithRecordMetric(ctx, req, scope.MaxRequestBodyBytes, scope.Resource.GroupResource().String(), requestmetrics.Update)
trace.Step("limitedReadBody done", utiltrace.Field{"len", len(body)}, utiltrace.Field{"err", err})
if err != nil {
span.AddEvent("limitedReadBody failed", attribute.Int("len", len(body)), attribute.String("err", err.Error()))
scope.err(err, w, req)
return
}
span.AddEvent("limitedReadBody succeeded", attribute.Int("len", len(body)))
options := &metav1.UpdateOptions{}
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, options); err != nil {
@ -105,7 +109,7 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
}
decoder := scope.Serializer.DecoderToVersion(decodeSerializer, scope.HubGroupVersion)
trace.Step("About to convert to expected version")
span.AddEvent("About to convert to expected version")
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
if err != nil {
strictError, isStrictError := runtime.AsStrictDecodingError(err)
@ -128,7 +132,7 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
scope.err(err, w, req)
return
}
trace.Step("Conversion done")
span.AddEvent("Conversion done")
audit.LogRequestObject(req.Context(), obj, objGV, scope.Resource, scope.Subresource, scope.Serializer)
admit = admission.WithAudit(admit)
@ -207,7 +211,7 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
Name: name,
}
trace.Step("About to store object in database")
span.AddEvent("About to store object in database")
wasCreated := false
requestFunc := func() (runtime.Object, error) {
obj, created, err := r.Update(
@ -242,20 +246,21 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
}
return result, err
})
trace.Step("Write to database call finished", utiltrace.Field{"len", len(body)}, utiltrace.Field{"err", err})
if err != nil {
span.AddEvent("Write to database call failed", attribute.Int("len", len(body)), attribute.String("err", err.Error()))
scope.err(err, w, req)
return
}
span.AddEvent("Write to database call succeeded", attribute.Int("len", len(body)))
status := http.StatusOK
if wasCreated {
status = http.StatusCreated
}
trace.Step("About to write a response")
defer trace.Step("Writing http response done")
transformResponseObject(ctx, scope, trace, req, w, status, outputMediaType, result)
span.AddEvent("About to write a response")
defer span.AddEvent("Writing http response done")
transformResponseObject(ctx, scope, req, w, status, outputMediaType, result)
}
}

View File

@ -29,14 +29,15 @@ import (
"strings"
"time"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/apis/apiserver"
egressmetrics "k8s.io/apiserver/pkg/server/egressselector/metrics"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
)
@ -239,8 +240,8 @@ func (d *dialerCreator) createDialer() utilnet.DialFunc {
return directDialer
}
return func(ctx context.Context, network, addr string) (net.Conn, error) {
trace := utiltrace.New(fmt.Sprintf("Proxy via %s protocol over %s", d.options.protocol, d.options.transport), utiltrace.Field{Key: "address", Value: addr})
defer trace.LogIfLong(500 * time.Millisecond)
ctx, span := tracing.Start(ctx, fmt.Sprintf("Proxy via %s protocol over %s", d.options.protocol, d.options.transport), attribute.String("address", addr))
defer span.End(500 * time.Millisecond)
start := egressmetrics.Metrics.Clock().Now()
proxier, err := d.connector.connect(ctx)
if err != nil {

View File

@ -24,6 +24,8 @@ import (
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -42,9 +44,9 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
utiltrace "k8s.io/utils/trace"
)
var (
@ -593,7 +595,7 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
return err
}
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil)
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, getRV, key)
if err != nil {
return err
}
@ -632,9 +634,9 @@ func shouldDelegateList(opts storage.ListOptions) bool {
return resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact
}
func (c *Cacher) listItems(listRV uint64, key string, pred storage.SelectionPredicate, trace *utiltrace.Trace, recursive bool) ([]interface{}, uint64, string, error) {
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) ([]interface{}, uint64, string, error) {
if !recursive {
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key)
if err != nil {
return nil, 0, "", err
}
@ -643,7 +645,7 @@ func (c *Cacher) listItems(listRV uint64, key string, pred storage.SelectionPred
}
return nil, readResourceVersion, "", nil
}
return c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, pred.MatcherIndex())
}
// GetList implements storage.Interface
@ -669,15 +671,15 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
return c.storage.GetList(ctx, key, opts, listObj)
}
trace := utiltrace.New("cacher list",
utiltrace.Field{Key: "audit-id", Value: audit.GetAuditIDTruncated(ctx)},
utiltrace.Field{Key: "type", Value: c.groupResource.String()})
defer trace.LogIfLong(500 * time.Millisecond)
ctx, span := tracing.Start(ctx, "cacher list",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.Stringer("type", c.groupResource))
defer span.End(500 * time.Millisecond)
if err := c.ready.wait(); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
trace.Step("Ready")
span.AddEvent("Ready")
// List elements with at least 'listRV' from cache.
listPtr, err := meta.GetItemsPtr(listObj)
@ -693,16 +695,16 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
}
filter := filterWithAttrsFunction(key, pred)
objs, readResourceVersion, indexUsed, err := c.listItems(listRV, key, pred, trace, recursive)
objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, key, pred, recursive)
if err != nil {
return err
}
trace.Step("Listed items from cache", utiltrace.Field{Key: "count", Value: len(objs)})
span.AddEvent("Listed items from cache", attribute.Int("count", len(objs)))
if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
// Resize the slice appropriately, since we already know that none
// of the elements will be filtered out.
listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
trace.Step("Resized result")
span.AddEvent("Resized result")
}
for _, obj := range objs {
elem, ok := obj.(*storeElement)
@ -713,7 +715,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
}
}
trace.Step("Filtered items", utiltrace.Field{Key: "count", Value: listVal.Len()})
span.AddEvent("Filtered items", attribute.Int("count", listVal.Len()))
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
return err

View File

@ -17,6 +17,7 @@ limitations under the License.
package cacher
import (
"context"
"fmt"
"math"
"sort"
@ -32,9 +33,9 @@ import (
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
utiltrace "k8s.io/utils/trace"
)
const (
@ -424,7 +425,7 @@ func (w *watchCache) List() []interface{} {
// waitUntilFreshAndBlock waits until cache is at least as fresh as given <resourceVersion>.
// NOTE: This function acquired lock and doesn't release it.
// You HAVE TO explicitly call w.RUnlock() after this function.
func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utiltrace.Trace) error {
func (w *watchCache) waitUntilFreshAndBlock(ctx context.Context, resourceVersion uint64) error {
startTime := w.clock.Now()
// In case resourceVersion is 0, we accept arbitrarily stale result.
@ -449,9 +450,8 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utilt
}
w.RLock()
if trace != nil {
trace.Step("watchCache locked acquired")
}
span := tracing.SpanFromContext(ctx)
span.AddEvent("watchCache locked acquired")
for w.resourceVersion < resourceVersion {
if w.clock.Since(startTime) >= blockTimeout {
// Request that the client retry after 'resourceVersionTooHighRetrySeconds' seconds.
@ -459,16 +459,14 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utilt
}
w.cond.Wait()
}
if trace != nil {
trace.Step("watchCache fresh enough")
}
span.AddEvent("watchCache fresh enough")
return nil
}
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
// with their ResourceVersion and the name of the index, if any, that was used.
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues []storage.MatchValue, trace *utiltrace.Trace) ([]interface{}, uint64, string, error) {
err := w.waitUntilFreshAndBlock(resourceVersion, trace)
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) {
err := w.waitUntilFreshAndBlock(ctx, resourceVersion)
defer w.RUnlock()
if err != nil {
return nil, 0, "", err
@ -487,8 +485,8 @@ func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues [
}
// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string, trace *utiltrace.Trace) (interface{}, bool, uint64, error) {
err := w.waitUntilFreshAndBlock(resourceVersion, trace)
func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
err := w.waitUntilFreshAndBlock(ctx, resourceVersion)
defer w.RUnlock()
if err != nil {
return nil, false, 0, err

View File

@ -17,6 +17,7 @@ limitations under the License.
package cacher
import (
"context"
"fmt"
"strconv"
"strings"
@ -359,6 +360,7 @@ func TestMarker(t *testing.T) {
}
func TestWaitUntilFreshAndList(t *testing.T) {
ctx := context.Background()
store := newTestWatchCache(3, &cache.Indexers{
"l:label": func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
@ -387,7 +389,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
}()
// list by empty MatchValues.
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(5, nil, nil)
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -406,7 +408,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
{IndexName: "l:label", Value: "value1"},
{IndexName: "f:spec.nodeName", Value: "node2"},
}
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(5, matchValues, nil)
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -425,7 +427,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
{IndexName: "l:not-exist-label", Value: "whatever"},
{IndexName: "f:spec.nodeName", Value: "node2"},
}
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(5, matchValues, nil)
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -443,7 +445,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
matchValues = []storage.MatchValue{
{IndexName: "l:not-exist-label", Value: "whatever"},
}
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(5, matchValues, nil)
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -459,6 +461,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
}
func TestWaitUntilFreshAndGet(t *testing.T) {
ctx := context.Background()
store := newTestWatchCache(3, &cache.Indexers{})
// In background, update the store.
@ -467,7 +470,7 @@ func TestWaitUntilFreshAndGet(t *testing.T) {
store.Add(makeTestPod("bar", 5))
}()
obj, exists, resourceVersion, err := store.WaitUntilFreshAndGet(5, "prefix/ns/bar", nil)
obj, exists, resourceVersion, err := store.WaitUntilFreshAndGet(ctx, 5, "prefix/ns/bar")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -484,6 +487,7 @@ func TestWaitUntilFreshAndGet(t *testing.T) {
}
func TestWaitUntilFreshAndListTimeout(t *testing.T) {
ctx := context.Background()
store := newTestWatchCache(3, &cache.Indexers{})
fc := store.clock.(*testingclock.FakeClock)
@ -501,7 +505,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
store.Add(makeTestPod("bar", 5))
}()
_, _, _, err := store.WaitUntilFreshAndList(5, nil, nil)
_, _, _, err := store.WaitUntilFreshAndList(ctx, 5, nil)
if !errors.IsTimeout(err) {
t.Errorf("expected timeout error but got: %v", err)
}
@ -523,10 +527,11 @@ func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
}
func TestReflectorForWatchCache(t *testing.T) {
ctx := context.Background()
store := newTestWatchCache(5, &cache.Indexers{})
{
_, version, _, err := store.WaitUntilFreshAndList(0, nil, nil)
_, version, _, err := store.WaitUntilFreshAndList(ctx, 0, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -549,7 +554,7 @@ func TestReflectorForWatchCache(t *testing.T) {
r.ListAndWatch(wait.NeverStop)
{
_, version, _, err := store.WaitUntilFreshAndList(10, nil, nil)
_, version, _, err := store.WaitUntilFreshAndList(ctx, 10, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

View File

@ -27,6 +27,7 @@ import (
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/otel/attribute"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@ -43,8 +44,8 @@ import (
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
)
const (
@ -152,25 +153,26 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
// Create implements storage.Interface.Create.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
trace := utiltrace.New("Create etcd3",
utiltrace.Field{Key: "audit-id", Value: audit.GetAuditIDTruncated(ctx)},
utiltrace.Field{Key: "key", Value: key},
utiltrace.Field{Key: "type", Value: getTypeName(obj)},
utiltrace.Field{Key: "resource", Value: s.groupResourceString},
ctx, span := tracing.Start(ctx, "Create etcd3",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.String("key", key),
attribute.String("type", getTypeName(obj)),
attribute.String("resource", s.groupResourceString),
)
defer trace.LogIfLong(500 * time.Millisecond)
defer span.End(500 * time.Millisecond)
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion should not be set on objects to be created")
}
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
}
trace.Step("About to Encode")
span.AddEvent("About to Encode")
data, err := runtime.Encode(s.codec, obj)
trace.Step("Encode finished", utiltrace.Field{Key: "len", Value: len(data)}, utiltrace.Field{Key: "err", Value: err})
if err != nil {
span.AddEvent("Encode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
return err
}
span.AddEvent("Encode succeeded", attribute.Int("len", len(data)))
key = path.Join(s.pathPrefix, key)
opts, err := s.ttlOpts(ctx, int64(ttl))
@ -179,10 +181,11 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
}
newData, err := s.transformer.TransformToStorage(ctx, data, authenticatedDataString(key))
trace.Step("TransformToStorage finished", utiltrace.Field{Key: "err", Value: err})
if err != nil {
span.AddEvent("TransformToStorage failed", attribute.String("err", err.Error()))
return storage.NewInternalError(err.Error())
}
span.AddEvent("TransformToStorage succeeded")
startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
@ -191,10 +194,11 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
clientv3.OpPut(key, string(newData), opts...),
).Commit()
metrics.RecordEtcdRequestLatency("create", s.groupResourceString, startTime)
trace.Step("Txn call finished", utiltrace.Field{Key: "err", Value: err})
if err != nil {
span.AddEvent("Txn call failed", attribute.String("err", err.Error()))
return err
}
span.AddEvent("Txn call succeeded")
if !txnResp.Succeeded {
return storage.NewKeyExistsError(key, 0)
@ -203,8 +207,11 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
if out != nil {
putResp := txnResp.Responses[0].GetResponsePut()
err = decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
trace.Step("decode finished", utiltrace.Field{Key: "len", Value: len(data)}, utiltrace.Field{Key: "err", Value: err})
return err
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
return err
}
span.AddEvent("decode succeeded", attribute.Int("len", len(data)))
}
return nil
}
@ -331,12 +338,12 @@ func (s *store) conditionalDelete(
func (s *store) GuaranteedUpdate(
ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error {
trace := utiltrace.New("GuaranteedUpdate etcd3",
utiltrace.Field{Key: "audit-id", Value: audit.GetAuditIDTruncated(ctx)},
utiltrace.Field{Key: "key", Value: key},
utiltrace.Field{Key: "type", Value: getTypeName(destination)},
utiltrace.Field{Key: "resource", Value: s.groupResourceString})
defer trace.LogIfLong(500 * time.Millisecond)
ctx, span := tracing.Start(ctx, "GuaranteedUpdate etcd3",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.String("key", key),
attribute.String("type", getTypeName(destination)),
attribute.String("resource", s.groupResourceString))
defer span.End(500 * time.Millisecond)
v, err := conversion.EnforcePtr(destination)
if err != nil {
@ -365,7 +372,7 @@ func (s *store) GuaranteedUpdate(
if err != nil {
return err
}
trace.Step("initial value restored")
span.AddEvent("initial value restored")
transformContext := authenticatedDataString(key)
for {
@ -414,12 +421,13 @@ func (s *store) GuaranteedUpdate(
continue
}
trace.Step("About to Encode")
span.AddEvent("About to Encode")
data, err := runtime.Encode(s.codec, ret)
trace.Step("Encode finished", utiltrace.Field{Key: "len", Value: len(data)}, utiltrace.Field{Key: "err", Value: err})
if err != nil {
span.AddEvent("Encode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
return err
}
span.AddEvent("Encode succeeded", attribute.Int("len", len(data)))
if !origState.stale && bytes.Equal(data, origState.data) {
// if we skipped the original Get in this loop, we must refresh from
// etcd in order to be sure the data in the store is equivalent to
@ -442,16 +450,17 @@ func (s *store) GuaranteedUpdate(
}
newData, err := s.transformer.TransformToStorage(ctx, data, transformContext)
trace.Step("TransformToStorage finished", utiltrace.Field{Key: "err", Value: err})
if err != nil {
span.AddEvent("TransformToStorage failed", attribute.String("err", err.Error()))
return storage.NewInternalError(err.Error())
}
span.AddEvent("TransformToStorage succeeded")
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
trace.Step("Transaction prepared")
span.AddEvent("Transaction prepared")
startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
@ -462,11 +471,12 @@ func (s *store) GuaranteedUpdate(
clientv3.OpGet(key),
).Commit()
metrics.RecordEtcdRequestLatency("update", s.groupResourceString, startTime)
trace.Step("Txn call finished", utiltrace.Field{Key: "err", Value: err})
if err != nil {
span.AddEvent("Txn call failed", attribute.String("err", err.Error()))
return err
}
trace.Step("Transaction committed")
span.AddEvent("Txn call completed")
span.AddEvent("Transaction committed")
if !txnResp.Succeeded {
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
@ -474,15 +484,19 @@ func (s *store) GuaranteedUpdate(
if err != nil {
return err
}
trace.Step("Retry value restored")
span.AddEvent("Retry value restored")
origStateIsCurrent = true
continue
}
putResp := txnResp.Responses[0].GetResponsePut()
err = decode(s.codec, s.versioner, data, destination, putResp.Header.Revision)
trace.Step("decode finished", utiltrace.Field{Key: "len", Value: len(data)}, utiltrace.Field{Key: "err", Value: err})
return err
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
return err
}
span.AddEvent("decode succeeded", attribute.Int("len", len(data)))
return nil
}
}
@ -528,14 +542,14 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
resourceVersion := opts.ResourceVersion
match := opts.ResourceVersionMatch
pred := opts.Predicate
trace := utiltrace.New(fmt.Sprintf("List(recursive=%v) etcd3", recursive),
utiltrace.Field{Key: "audit-id", Value: audit.GetAuditIDTruncated(ctx)},
utiltrace.Field{Key: "key", Value: key},
utiltrace.Field{Key: "resourceVersion", Value: resourceVersion},
utiltrace.Field{Key: "resourceVersionMatch", Value: match},
utiltrace.Field{Key: "limit", Value: pred.Limit},
utiltrace.Field{Key: "continue", Value: pred.Continue})
defer trace.LogIfLong(500 * time.Millisecond)
ctx, span := tracing.Start(ctx, fmt.Sprintf("List(recursive=%v) etcd3", recursive),
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.String("key", key),
attribute.String("resourceVersion", resourceVersion),
attribute.String("resourceVersionMatch", string(match)),
attribute.Int("limit", int(pred.Limit)),
attribute.String("continue", pred.Continue))
defer span.End(500 * time.Millisecond)
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err

View File

@ -134,7 +134,7 @@ func (b *backend) processEvents(ev ...*auditinternal.Event) error {
// allow enough time for the serialization/deserialization of audit events, which
// contain nested request and response objects plus additional event fields.
defer trace.LogIfLong(time.Duration(50+25*len(list.Items)) * time.Millisecond)
return b.w.RestClient.Post().Body(&list).Do(context.TODO())
return b.w.RestClient.Post().Body(&list).Do(context.Background())
}).Error()
}