diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/equality.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/equality.go index 74cc27fd149..ddc0ec8c965 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/equality.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/equality.go @@ -20,12 +20,14 @@ import ( "context" "fmt" "reflect" + "time" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/endpoints/metrics" ) var ignoreTimestampEqualities = func() conversion.Equalities { @@ -54,7 +56,20 @@ func IgnoreManagedFieldsTimestampsTransformer( _ context.Context, newObj runtime.Object, oldObj runtime.Object, -) (runtime.Object, error) { +) (res runtime.Object, err error) { + outcome := "unequal_objects_fast" + start := time.Now() + err = nil + res = nil + + defer func() { + if err != nil { + outcome = "error" + } + + metrics.RecordTimestampComparisonLatency(outcome, time.Since(start)) + }() + // If managedFields modulo timestamps are unchanged // and // rest of object is unchanged @@ -85,12 +100,41 @@ func IgnoreManagedFieldsTimestampsTransformer( oldManagedFields := oldAccessor.GetManagedFields() newManagedFields := accessor.GetManagedFields() + if len(oldManagedFields) != len(newManagedFields) { + // Return early if any managed fields entry was added/removed. + // We want to retain user expectation that even if they write to a field + // whose value did not change, they will still result as the field + // manager at the end. + return newObj, nil + } else if len(newManagedFields) == 0 { + // This transformation only makes sense when managedFields are + // non-empty + return newObj, nil + } + + // This transformation only makes sense if the managed fields has at least one + // changed timestamp; and are otherwise equal. Return early if there are no + // changed timestamps. + allTimesUnchanged := true + for i, e := range newManagedFields { + if !e.Time.Equal(oldManagedFields[i].Time) { + allTimesUnchanged = false + break + } + } + + if allTimesUnchanged { + return newObj, nil + } + // This condition ensures the managed fields are always compared first. If // this check fails, the if statement will short circuit. If the check // succeeds the slow path is taken which compares entire objects. - if ignoreTimestampEqualities.DeepEqualWithNilDifferentFromEmpty(oldManagedFields, newManagedFields) && - ignoreTimestampEqualities.DeepEqualWithNilDifferentFromEmpty(newObj, oldObj) { + if !ignoreTimestampEqualities.DeepEqualWithNilDifferentFromEmpty(oldManagedFields, newManagedFields) { + return newObj, nil + } + if ignoreTimestampEqualities.DeepEqualWithNilDifferentFromEmpty(newObj, oldObj) { // Remove any changed timestamps, so that timestamp is not the only // change seen by etcd. // @@ -103,7 +147,10 @@ func IgnoreManagedFieldsTimestampsTransformer( } accessor.SetManagedFields(newManagedFields) + outcome = "equal_objects" return newObj, nil } + + outcome = "unequal_objects_slow" return newObj, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go index 4f851bb4a3b..ee754812881 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go @@ -238,6 +238,18 @@ var ( []string{"source", "status"}, ) + requestTimestampComparisonDuration = compbasemetrics.NewHistogramVec( + &compbasemetrics.HistogramOpts{ + Name: "apiserver_request_timestamp_comparison_time", + Help: "Time taken for comparison of old vs new objects in UPDATE or PATCH requests", + Buckets: []float64{0.0001, 0.0003, 0.001, 0.003, 0.01, 0.03, 0.1, 0.3, 1.0, 5.0}, + StabilityLevel: compbasemetrics.ALPHA, + }, + // Path the code takes to reach a conclusion: + // i.e. unequalObjectsFast, unequalObjectsSlow, equalObjectsSlow + []string{"code_path"}, + ) + metrics = []resettableCollector{ deprecatedRequestGauge, requestCounter, @@ -256,6 +268,7 @@ var ( requestFilterDuration, requestAbortsTotal, requestPostTimeoutTotal, + requestTimestampComparisonDuration, } // these are the valid request methods which we report in our metrics. Any other request methods @@ -366,6 +379,10 @@ func RecordFilterLatency(ctx context.Context, name string, elapsed time.Duration requestFilterDuration.WithContext(ctx).WithLabelValues(name).Observe(elapsed.Seconds()) } +func RecordTimestampComparisonLatency(codePath string, elapsed time.Duration) { + requestTimestampComparisonDuration.WithLabelValues(codePath).Observe(elapsed.Seconds()) +} + func RecordRequestPostTimeout(source string, status string) { requestPostTimeoutTotal.WithLabelValues(source, status).Inc() } diff --git a/test/integration/apiserver/timestamp_transformer_test.go b/test/integration/apiserver/timestamp_transformer_test.go new file mode 100644 index 00000000000..bdc564e7da8 --- /dev/null +++ b/test/integration/apiserver/timestamp_transformer_test.go @@ -0,0 +1,149 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "context" + "encoding/json" + "math/rand" + "testing" + + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + k8sfuzz "k8s.io/apimachinery/pkg/api/apitesting/fuzzer" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager" + k8stest "k8s.io/kubernetes/pkg/api/testing" +) + +func convertToUnstructured(b *testing.B, obj runtime.Object) runtime.Object { + converter := fieldmanager.DeducedTypeConverter{} + typed, err := converter.ObjectToTyped(obj) + require.NoError(b, err) + res, err := converter.TypedToObject(typed) + require.NoError(b, err) + return res +} + +func doBench(b *testing.B, useUnstructured bool, shortCircuit bool) { + var ( + expectedLarge runtime.Object + actualLarge runtime.Object + expectedSmall runtime.Object + actualSmall runtime.Object + ) + + scheme := runtime.NewScheme() + codecs := serializer.NewCodecFactory(scheme) + seed := rand.Int63() + fuzzer := k8sfuzz.FuzzerFor(k8stest.FuzzerFuncs, rand.NewSource(seed), codecs) + fuzzer.NilChance(0) + + fuzzer.MaxDepth(1000).NilChance(0.2).NumElements(2, 15) + pod := &v1.Pod{} + fuzzer.Fuzz(pod) + + fuzzer.NilChance(0.2).NumElements(10, 100).MaxDepth(10) + deployment := &v1.Endpoints{} + fuzzer.Fuzz(deployment) + + bts, err := json.Marshal(deployment) + require.NoError(b, err) + b.Logf("Small (Deployment): %v bytes", len(bts)) + bts, err = json.Marshal(pod) + require.NoError(b, err) + b.Logf("Large (Pod): %v bytes", len(bts)) + + expectedLarge = deployment + expectedSmall = pod + + if useUnstructured { + expectedSmall = convertToUnstructured(b, expectedSmall) + expectedLarge = convertToUnstructured(b, expectedLarge) + } + + actualLarge = expectedLarge.DeepCopyObject() + actualSmall = expectedSmall.DeepCopyObject() + + if shortCircuit { + // Modify managed fields of the compared objects to induce a short circuit + now := metav1.Now() + extraEntry := &metav1.ManagedFieldsEntry{ + Manager: "sidecar_controller", + Operation: metav1.ManagedFieldsOperationApply, + APIVersion: "apps/v1", + Time: &now, + FieldsType: "FieldsV1", + FieldsV1: &metav1.FieldsV1{ + Raw: []byte(`{"f:metadata":{"f:labels":{"f:sidecar_version":{}}},"f:spec":{"f:template":{"f:spec":{"f:containers":{"k:{\"name\":\"sidecar\"}":{".":{},"f:image":{},"f:name":{}}}}}}}`), + }, + } + + largeMeta, err := meta.Accessor(actualLarge) + require.NoError(b, err) + largeMeta.SetManagedFields(append(largeMeta.GetManagedFields(), *extraEntry)) + + smallMeta, err := meta.Accessor(actualSmall) + require.NoError(b, err) + smallMeta.SetManagedFields(append(smallMeta.GetManagedFields(), *extraEntry)) + } + + b.ResetTimer() + + b.Run("Large", func(b2 *testing.B) { + for i := 0; i < b2.N; i++ { + if _, err := fieldmanager.IgnoreManagedFieldsTimestampsTransformer( + context.TODO(), + actualLarge, + expectedLarge, + ); err != nil { + b2.Fatal(err) + } + } + }) + + b.Run("Small", func(b2 *testing.B) { + for i := 0; i < b2.N; i++ { + if _, err := fieldmanager.IgnoreManagedFieldsTimestampsTransformer( + context.TODO(), + actualSmall, + expectedSmall, + ); err != nil { + b2.Fatal(err) + } + } + }) +} + +func BenchmarkIgnoreManagedFieldsTimestampTransformerStructuredShortCircuit(b *testing.B) { + doBench(b, false, true) +} + +func BenchmarkIgnoreManagedFieldsTimestampTransformerStructuredWorstCase(b *testing.B) { + doBench(b, false, false) +} + +func BenchmarkIgnoreManagedFieldsTimestampTransformerUnstructuredShortCircuit(b *testing.B) { + doBench(b, true, true) +} + +func BenchmarkIgnoreManagedFieldsTimestampTransformerUnstructuredWorstCase(b *testing.B) { + doBench(b, true, false) +}