diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go
index 5a97c7d47f5..2f3197b2c01 100644
--- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go
+++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go
@@ -154,6 +154,35 @@ func evaluatePolicyAndCreateAuditEvent(req *http.Request, policy audit.PolicyRul
 	}, nil
 }
 
+// writeLatencyToAnnotation writes the latency incurred in different
+// layers of the apiserver to the annotations of the audit object.
+// It should be invoked after ev.StageTimestamp has been set appropriately.
+func writeLatencyToAnnotation(ctx context.Context, ev *auditinternal.Event) {
+	// we will track latency in annotations only when the total latency
+	// of the given request exceeds 500ms; this is in keeping with the
+	// traces in rest/handlers for create, delete, update,
+	// get, list, and deletecollection.
+	const threshold = 500 * time.Millisecond
+	latency := ev.StageTimestamp.Time.Sub(ev.RequestReceivedTimestamp.Time)
+	if latency <= threshold {
+		return
+	}
+
+	// if we are tracking latency incurred inside different layers within
+	// the apiserver, add these as annotations to the audit event object.
+	layerLatencies := request.AuditAnnotationsFromLatencyTrackers(ctx)
+	if len(layerLatencies) == 0 {
+		// latency tracking is not enabled for this request
+		return
+	}
+
+	// record the total latency for this request, for convenience.
+	audit.LogAnnotation(ev, "apiserver.latency.k8s.io/total", latency.String())
+	for k, v := range layerLatencies {
+		audit.LogAnnotation(ev, k, v)
+	}
+}
+
 func processAuditEvent(ctx context.Context, sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool {
 	for _, stage := range omitStages {
 		if ev.Stage == stage {
@@ -161,11 +190,16 @@ func processAuditEvent(ctx context.Context, sink audit.Sink, ev *auditinternal.E
 		}
 	}
 
-	if ev.Stage == auditinternal.StageRequestReceived {
+	switch {
+	case ev.Stage == auditinternal.StageRequestReceived:
 		ev.StageTimestamp = metav1.NewMicroTime(ev.RequestReceivedTimestamp.Time)
-	} else {
+	case ev.Stage == auditinternal.StageResponseComplete:
+		ev.StageTimestamp = metav1.NewMicroTime(time.Now())
+		writeLatencyToAnnotation(ctx, ev)
+	default:
 		ev.StageTimestamp = metav1.NewMicroTime(time.Now())
 	}
+
 	audit.ObserveEvent(ctx)
 	return sink.ProcessEvents(ev)
 }
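Note: for a request that crosses the 500ms threshold, the ResponseComplete audit event ends up carrying one annotation per tracked layer plus the total. A quick illustration of the resulting shape — the durations below are invented, but the keys are the ones introduced by this patch:

```go
package main

import "fmt"

func main() {
	// Hypothetical annotation set for a slow LIST request; on a real
	// event only layers that recorded a non-zero latency appear.
	annotations := map[string]string{
		"apiserver.latency.k8s.io/total":                     "1.338s",
		"apiserver.latency.k8s.io/etcd":                      "1.2s",
		"apiserver.latency.k8s.io/transform-response-object": "90ms",
		"apiserver.latency.k8s.io/serialize-response-object": "25ms",
		"apiserver.latency.k8s.io/response-write":            "10ms",
	}
	for k, v := range annotations {
		fmt.Printf("%s=%s\n", k, v)
	}
}
```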
diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/webhook_duration.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/webhook_duration.go
index 39747a9996f..8ac47422a4e 100644
--- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/webhook_duration.go
+++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/webhook_duration.go
@@ -17,9 +17,18 @@ limitations under the License.
 package filters
 
 import (
+	"context"
+	"fmt"
 	"net/http"
+	"time"
 
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apiserver/pkg/endpoints/request"
+	"k8s.io/apiserver/pkg/endpoints/responsewriter"
+)
+
+var (
+	watchVerbs = sets.NewString("watch")
 )
 
 // WithLatencyTrackers adds a LatencyTrackers instance to the
@@ -28,7 +37,44 @@ import (
 func WithLatencyTrackers(handler http.Handler) http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
 		ctx := req.Context()
+		requestInfo, ok := request.RequestInfoFrom(ctx)
+		if !ok {
+			handleError(w, req, http.StatusInternalServerError, fmt.Errorf("no RequestInfo found in context, handler chain must be wrong"))
+			return
+		}
+
+		if watchVerbs.Has(requestInfo.Verb) {
+			handler.ServeHTTP(w, req)
+			return
+		}
+
 		req = req.WithContext(request.WithLatencyTrackers(ctx))
+		w = responsewriter.WrapForHTTP1Or2(&writeLatencyTracker{
+			ResponseWriter: w,
+			ctx:            req.Context(),
+		})
+
 		handler.ServeHTTP(w, req)
 	})
 }
+
+var _ http.ResponseWriter = &writeLatencyTracker{}
+var _ responsewriter.UserProvidedDecorator = &writeLatencyTracker{}
+
+type writeLatencyTracker struct {
+	http.ResponseWriter
+	ctx context.Context
+}
+
+func (wt *writeLatencyTracker) Unwrap() http.ResponseWriter {
+	return wt.ResponseWriter
+}
+
+func (wt *writeLatencyTracker) Write(bs []byte) (int, error) {
+	startedAt := time.Now()
+	defer func() {
+		request.TrackResponseWriteLatency(wt.ctx, time.Since(startedAt))
+	}()
+
+	return wt.ResponseWriter.Write(bs)
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go
index 55b83af0c3b..f5f311020ee 100644
--- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go
+++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go
@@ -31,6 +31,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
 	"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
+	endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
 	utiltrace "k8s.io/utils/trace"
 )
 
@@ -134,7 +135,13 @@ func transformResponseObject(ctx context.Context, scope *RequestScope, trace *ut
 		scope.err(err, w, req)
 		return
 	}
-	obj, err := transformObject(ctx, result, options, mediaType, scope, req)
+
+	var obj runtime.Object
+	do := func() {
+		obj, err = transformObject(ctx, result, options, mediaType, scope, req)
+	}
+	endpointsrequest.TrackTransformResponseObjectLatency(ctx, do)
+
 	if err != nil {
 		scope.err(err, w, req)
 		return
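Note: the writeLatencyTracker above accumulates time across every Write call on the wrapped ResponseWriter. A minimal, self-contained sketch of the same decorator idea — timedWriter and its sum aggregation are illustrative stand-ins, not the real LatencyTrackers plumbing:

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
	"time"
)

// timedWriter wraps an http.ResponseWriter and sums the time spent
// in each Write call, mirroring what writeLatencyTracker does.
type timedWriter struct {
	http.ResponseWriter
	mu    sync.Mutex
	total time.Duration
}

func (t *timedWriter) Write(p []byte) (int, error) {
	startedAt := time.Now()
	defer func() {
		t.mu.Lock()
		t.total += time.Since(startedAt)
		t.mu.Unlock()
	}()
	return t.ResponseWriter.Write(p)
}

func main() {
	rec := httptest.NewRecorder()
	w := &timedWriter{ResponseWriter: rec}
	w.Write([]byte("hello, "))
	w.Write([]byte("world")) // latencies from both calls are summed
	fmt.Printf("wrote %q in %v\n", rec.Body.String(), w.total)
}
```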
diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go
index 0e4b98b8397..87ab7b4dfe4 100644
--- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go
+++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go
@@ -36,6 +36,7 @@ import (
 	"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
 	"k8s.io/apiserver/pkg/endpoints/metrics"
 	"k8s.io/apiserver/pkg/endpoints/request"
+	endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
 	"k8s.io/apiserver/pkg/registry/rest"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/apiserver/pkg/util/flushwriter"
@@ -271,7 +272,9 @@ func WriteObjectNegotiated(s runtime.NegotiatedSerializer, restrictions negotiat
 	audit.LogResponseObject(req.Context(), object, gv, s)
 
 	encoder := s.EncoderForVersion(serializer.Serializer, gv)
-	SerializeObject(serializer.MediaType, encoder, w, req, statusCode, object)
+	endpointsrequest.TrackSerializeResponseObjectLatency(req.Context(), func() {
+		SerializeObject(serializer.MediaType, encoder, w, req, statusCode, object)
+	})
 }
 
 // ErrorNegotiated renders an error to the response. Returns the HTTP status code of the error.
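Note: both this call site and the transformResponseObject change use the same closure pattern — the code to be timed is handed to a Track-style helper so the measurement lives in one place and the call site stays a one-liner. A stripped-down sketch of that pattern (trackLatency and the variable names are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// trackLatency times f and reports the elapsed duration to observe.
func trackLatency(observe func(time.Duration), f func()) {
	startedAt := time.Now()
	defer func() {
		observe(time.Since(startedAt))
	}()
	f()
}

func main() {
	var serialization time.Duration
	trackLatency(func(d time.Duration) { serialization += d }, func() {
		time.Sleep(5 * time.Millisecond) // stand-in for SerializeObject(...)
	})
	fmt.Println("serialization took:", serialization)
}
```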
diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/request/webhook_duration.go b/staging/src/k8s.io/apiserver/pkg/endpoints/request/webhook_duration.go
index 641e8c7671b..120bc46bf8b 100644
--- a/staging/src/k8s.io/apiserver/pkg/endpoints/request/webhook_duration.go
+++ b/staging/src/k8s.io/apiserver/pkg/endpoints/request/webhook_duration.go
@@ -35,9 +35,20 @@ func maxDuration(d1 time.Duration, d2 time.Duration) time.Duration {
 	return d2
 }
 
-// DurationTracker is a simple interface for tracking functions duration
+// DurationTracker is a simple interface for tracking a function's duration;
+// it is safe for concurrent use by multiple goroutines.
 type DurationTracker interface {
-	Track(func())
+	// Track measures the time spent in the given function f and
+	// aggregates the measured duration using aggregateFunction.
+	// If Track is invoked with f from multiple goroutines concurrently,
+	// then f must be safe to be invoked concurrently by multiple goroutines.
+	Track(f func())
+
+	// TrackDuration tracks latency from the specified duration
+	// and aggregates it using aggregateFunction.
+	TrackDuration(time.Duration)
+
+	// GetLatency returns the total latency incurred so far.
 	GetLatency() time.Duration
 }
 
@@ -64,6 +75,14 @@ func (t *durationTracker) Track(f func()) {
 	f()
 }
 
+// TrackDuration tracks latency from the given duration
+// using aggregateFunction.
+func (t *durationTracker) TrackDuration(d time.Duration) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.latency = t.aggregateFunction(t.latency, d)
+}
+
 // GetLatency returns aggregated latency tracked by a tracker
 func (t *durationTracker) GetLatency() time.Duration {
 	t.mu.Lock()
@@ -96,6 +115,39 @@ type LatencyTrackers struct {
 	// ValidatingWebhookTracker tracks the latency incurred in validating webhook(s).
 	// Validate webhooks are done in parallel, so max function is used.
 	ValidatingWebhookTracker DurationTracker
+
+	// StorageTracker tracks the latency incurred inside the storage layer;
+	// it accounts for the time it takes to send data to the underlying
+	// storage layer (etcd) and get the complete response back.
+	// If a request involves N (N>=1) round trips to the underlying
+	// storage layer, the latency will account for the total duration
+	// from these N round trips.
+	// It does not include the time incurred in admission or validation.
+	StorageTracker DurationTracker
+
+	// TransformTracker tracks the latency incurred in transforming the
+	// response object(s) returned from the underlying storage layer.
+	// This includes transforming the object to the user's desired form
+	// (e.g. as a Table), and also setting appropriate API level fields.
+	// This does not include the latency incurred in serialization
+	// (json or protobuf) of the response object or writing
+	// of it to the http ResponseWriter object.
+	TransformTracker DurationTracker
+
+	// SerializationTracker tracks the latency incurred in serialization
+	// (json or protobuf) of the response object.
+	// NOTE: serialization and writing of the serialized raw bytes to the
+	// associated http ResponseWriter object are interleaved, and hence
+	// the latency measured here will include the time spent writing the
+	// serialized raw bytes to the http ResponseWriter object.
+	SerializationTracker DurationTracker
+
+	// ResponseWriteTracker tracks the latency incurred in writing the
+	// serialized raw bytes to the http ResponseWriter object (via the
+	// Write method) associated with the request.
+	// The Write method can be invoked multiple times, so we use a
+	// latency tracker that sums up the duration from each call.
+	ResponseWriteTracker DurationTracker
 }
 
 type latencyTrackersKeyType int
@@ -116,6 +168,10 @@ func WithLatencyTrackersAndCustomClock(parent context.Context, c clock.Clock) co
 	return WithValue(parent, latencyTrackersKey, &LatencyTrackers{
 		MutatingWebhookTracker:   newSumLatencyTracker(c),
 		ValidatingWebhookTracker: newMaxLatencyTracker(c),
+		StorageTracker:           newSumLatencyTracker(c),
+		TransformTracker:         newSumLatencyTracker(c),
+		SerializationTracker:     newSumLatencyTracker(c),
+		ResponseWriteTracker:     newSumLatencyTracker(c),
 	})
 }
 
@@ -125,3 +181,92 @@ func LatencyTrackersFrom(ctx context.Context) (*LatencyTrackers, bool) {
 	wd, ok := ctx.Value(latencyTrackersKey).(*LatencyTrackers)
 	return wd, ok && wd != nil
 }
+
+// TrackTransformResponseObjectLatency is used to track latency incurred
+// inside the function that takes an object returned from the underlying
+// storage layer (etcd) and performs any necessary transformations
+// of the response object. This does not include the latency incurred in
+// serialization (json or protobuf) of the response object or writing of
+// it to the http ResponseWriter object.
+// When called multiple times, the latency incurred inside the
+// transform func each time will be summed up.
+func TrackTransformResponseObjectLatency(ctx context.Context, transform func()) {
+	if tracker, ok := LatencyTrackersFrom(ctx); ok {
+		tracker.TransformTracker.Track(transform)
+		return
+	}
+
+	transform()
+}
+
+// TrackStorageLatency is used to track latency incurred
+// inside the underlying storage layer.
+// When called multiple times, the latency provided will be summed up.
+func TrackStorageLatency(ctx context.Context, d time.Duration) {
+	if tracker, ok := LatencyTrackersFrom(ctx); ok {
+		tracker.StorageTracker.TrackDuration(d)
+	}
+}
+
+// TrackSerializeResponseObjectLatency is used to track latency incurred in
+// serialization (json or protobuf) of the response object.
+// When called multiple times, the latency provided will be summed up.
+func TrackSerializeResponseObjectLatency(ctx context.Context, f func()) {
+	if tracker, ok := LatencyTrackersFrom(ctx); ok {
+		tracker.SerializationTracker.Track(f)
+		return
+	}
+
+	f()
+}
+
+// TrackResponseWriteLatency is used to track latency incurred in writing
+// the serialized raw bytes to the http ResponseWriter object (via the
+// Write method) associated with the request.
+// When called multiple times, the latency provided will be summed up.
+func TrackResponseWriteLatency(ctx context.Context, d time.Duration) {
+	if tracker, ok := LatencyTrackersFrom(ctx); ok {
+		tracker.ResponseWriteTracker.TrackDuration(d)
+	}
+}
+
+// AuditAnnotationsFromLatencyTrackers will inspect each latency tracker
+// associated with the request context and return a set of audit
+// annotations that can be added to the API audit entry.
+func AuditAnnotationsFromLatencyTrackers(ctx context.Context) map[string]string {
+	const (
+		transformLatencyKey         = "apiserver.latency.k8s.io/transform-response-object"
+		storageLatencyKey           = "apiserver.latency.k8s.io/etcd"
+		serializationLatencyKey     = "apiserver.latency.k8s.io/serialize-response-object"
+		responseWriteLatencyKey     = "apiserver.latency.k8s.io/response-write"
+		mutatingWebhookLatencyKey   = "apiserver.latency.k8s.io/mutating-webhook"
+		validatingWebhookLatencyKey = "apiserver.latency.k8s.io/validating-webhook"
+	)
+
+	tracker, ok := LatencyTrackersFrom(ctx)
+	if !ok {
+		return nil
+	}
+
+	annotations := map[string]string{}
+	if latency := tracker.TransformTracker.GetLatency(); latency != 0 {
+		annotations[transformLatencyKey] = latency.String()
+	}
+	if latency := tracker.StorageTracker.GetLatency(); latency != 0 {
+		annotations[storageLatencyKey] = latency.String()
+	}
+	if latency := tracker.SerializationTracker.GetLatency(); latency != 0 {
+		annotations[serializationLatencyKey] = latency.String()
+	}
+	if latency := tracker.ResponseWriteTracker.GetLatency(); latency != 0 {
+		annotations[responseWriteLatencyKey] = latency.String()
+	}
+	if latency := tracker.MutatingWebhookTracker.GetLatency(); latency != 0 {
+		annotations[mutatingWebhookLatencyKey] = latency.String()
+	}
+	if latency := tracker.ValidatingWebhookTracker.GetLatency(); latency != 0 {
+		annotations[validatingWebhookLatencyKey] = latency.String()
+	}
+
+	return annotations
+}
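Note: the trackers differ only in their aggregate function — sequential work (etcd round trips, serialization, response writes, mutating webhooks) is summed, while validating webhooks run in parallel, so only the slowest one counts. A sketch of the two aggregation behaviors, assuming the sum/max semantics described in the comments above (sumDur/maxDur are illustrative names):

```go
package main

import (
	"fmt"
	"time"
)

func sumDur(a, b time.Duration) time.Duration { return a + b }

func maxDur(a, b time.Duration) time.Duration {
	if a >= b {
		return a
	}
	return b
}

func main() {
	samples := []time.Duration{120 * time.Millisecond, 80 * time.Millisecond, 200 * time.Millisecond}

	var summed, maxed time.Duration
	for _, d := range samples {
		summed = sumDur(summed, d) // newSumLatencyTracker: total across calls
		maxed = maxDur(maxed, d)   // newMaxLatencyTracker: slowest parallel call
	}
	fmt.Println(summed, maxed) // 400ms 200ms
}
```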
diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go
index c16407606f3..5c72800a098 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/config.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/config.go
@@ -765,8 +765,7 @@ func BuildHandlerChainWithStorageVersionPrecondition(apiHandler http.Handler, c
 }
 
 func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
-	handler := genericapifilters.WithLatencyTrackers(apiHandler)
-	handler = filterlatency.TrackCompleted(handler)
+	handler := filterlatency.TrackCompleted(apiHandler)
 	handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
 	handler = filterlatency.TrackStarted(handler, "authorization")
 
@@ -818,6 +817,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
 	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
 		handler = genericapifilters.WithTracing(handler, c.TracerProvider)
 	}
+	handler = genericapifilters.WithLatencyTrackers(handler)
 	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
 	handler = genericapifilters.WithRequestReceivedTimestamp(handler)
 	handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled())
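Note: the filter moves because handler chains compose inside-out — the filter added last wraps everything added before it and therefore runs first per request. WithLatencyTrackers needs RequestInfo, so it is now added just before WithRequestInfo. The sketch below shows the execution order this produces (filter names are illustrative):

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

func with(name string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Println("enter", name)
		next.ServeHTTP(w, r)
	})
}

func main() {
	var handler http.Handler = http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
		fmt.Println("api handler")
	})
	// mirrors DefaultBuildHandlerChain: filters added later run earlier,
	// so request-info populates the context before latency-trackers reads it.
	handler = with("latency-trackers", handler)
	handler = with("request-info", handler)

	req := httptest.NewRequest(http.MethodGet, "/api/v1/pods", nil)
	handler.ServeHTTP(httptest.NewRecorder(), req)
	// Output:
	// enter request-info
	// enter latency-trackers
	// api handler
}
```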
diff != "" { - t.Errorf("event has unexpected annotations (-want +got): %s", diff) + + for wantK, wantV := range want { + gotV, ok := event.Annotations[wantK] + if !ok { + t.Errorf("expected to find annotation key %q in %#v", wantK, event.Annotations) + continue + } + if wantV != gotV { + t.Errorf("expected the annotation value to match, key: %q, want: %q got: %q", wantK, wantV, gotV) + } } } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/latency_tracker.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/latency_tracker.go new file mode 100644 index 00000000000..a387b79cc59 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/latency_tracker.go @@ -0,0 +1,107 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "context" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + endpointsrequest "k8s.io/apiserver/pkg/endpoints/request" +) + +// NewETCDLatencyTracker returns an implementation of +// clientv3.KV that times the calls from the specified +// 'delegate' KV instance in order to track latency incurred. +func NewETCDLatencyTracker(delegate clientv3.KV) clientv3.KV { + return &clientV3KVLatencyTracker{KV: delegate} +} + +// clientV3KVLatencyTracker decorates a clientv3.KV instance and times +// each call so we can track the latency an API request incurs in etcd +// round trips (the time it takes to send data to etcd and get the +// complete response back) +// +// If an API request involves N (N>=1) round trips to etcd, then we will sum +// up the latenciy incurred in each roundtrip. + +// It uses the context associated with the request in flight, so there +// are no states shared among the requests in flight, and so there is no +// concurrency overhead. +// If the goroutine executing the request handler makes concurrent calls +// to the underlying storage layer, that is protected since the latency +// tracking function TrackStorageLatency is thread safe. +// +// NOTE: Compact is an asynchronous process and is not associated with +// any request, so we will not be tracking its latency. +type clientV3KVLatencyTracker struct { + clientv3.KV +} + +func (c *clientV3KVLatencyTracker) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) { + startedAt := time.Now() + defer func() { + endpointsrequest.TrackStorageLatency(ctx, time.Since(startedAt)) + }() + + return c.KV.Put(ctx, key, val, opts...) +} + +func (c *clientV3KVLatencyTracker) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + startedAt := time.Now() + defer func() { + endpointsrequest.TrackStorageLatency(ctx, time.Since(startedAt)) + }() + + return c.KV.Get(ctx, key, opts...) 
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
index 63e18699d6b..f54c9421f20 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
@@ -261,6 +261,9 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.
 		return nil, nil, err
 	}
 
+	// decorate the KV instance so we can track etcd latency per request.
+	client.KV = etcd3.NewETCDLatencyTracker(client.KV)
+
 	stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
 	if err != nil {
 		return nil, nil, err
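Note: with the KV client decorated here, any slow request that touches etcd surfaces in the audit log with the per-layer breakdown. A hedged sketch of consuming those annotations from an audit event — the JSON is a trimmed, invented example; only the annotations field and the apiserver.latency.k8s.io/ key prefix come from this patch:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// A made-up ResponseComplete audit event; real events carry many more
// fields, and the durations here are invented.
const event = `{
  "kind": "Event",
  "apiVersion": "audit.k8s.io/v1",
  "stage": "ResponseComplete",
  "annotations": {
    "apiserver.latency.k8s.io/total": "1.338s",
    "apiserver.latency.k8s.io/etcd": "1.2s",
    "apiserver.latency.k8s.io/serialize-response-object": "25ms"
  }
}`

func main() {
	var ev struct {
		Annotations map[string]string `json:"annotations"`
	}
	if err := json.Unmarshal([]byte(event), &ev); err != nil {
		panic(err)
	}
	for k, v := range ev.Annotations {
		if strings.HasPrefix(k, "apiserver.latency.k8s.io/") {
			fmt.Printf("%s = %s\n", k, v)
		}
	}
}
```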