diff --git a/staging/src/k8s.io/apimachinery/pkg/util/strategicpatch/patch.go b/staging/src/k8s.io/apimachinery/pkg/util/strategicpatch/patch.go index 5878da4f53f..9b1e325fcbf 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/strategicpatch/patch.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/strategicpatch/patch.go @@ -553,6 +553,7 @@ func handleUnmarshal(j []byte) (map[string]interface{}, error) { // StrategicMergePatch applies a strategic merge patch. The original and patch documents // must be JSONMap. A patch can be created from an original and modified document by // calling CreateTwoWayMergeMapPatch. +// Warning: the original and patch JSONMap objects are mutated by this function and should not be reused. func StrategicMergeMapPatch(original, patch JSONMap, dataStruct interface{}) (JSONMap, error) { t, err := getTagStructType(dataStruct) if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go index c8b8890703c..48c20e5e437 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go @@ -86,21 +86,21 @@ func strategicPatchObject( patchJS []byte, objToUpdate runtime.Object, versionedObj runtime.Object, -) (originalObjMap map[string]interface{}, patchMap map[string]interface{}, retErr error) { - originalObjMap = make(map[string]interface{}) +) error { + originalObjMap := make(map[string]interface{}) if err := unstructured.DefaultConverter.ToUnstructured(originalObject, &originalObjMap); err != nil { - return nil, nil, err + return err } - patchMap = make(map[string]interface{}) + patchMap := make(map[string]interface{}) if err := json.Unmarshal(patchJS, &patchMap); err != nil { - return nil, nil, err + return err } if err := applyPatchToObject(codec, defaulter, originalObjMap, patchMap, objToUpdate, versionedObj); err != nil { - return nil, nil, err + return err } - return + return nil } // applyPatchToObject applies a strategic merge patch of to diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go index 276183d3499..0d509374211 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go @@ -570,7 +570,7 @@ func patchResource( originalObjJS []byte originalPatchedObjJS []byte originalObjMap map[string]interface{} - originalPatchMap map[string]interface{} + getOriginalPatchMap func() (map[string]interface{}, error) lastConflictErr error originalResourceVersion string ) @@ -610,6 +610,26 @@ func patchResource( return nil, err } originalObjJS, originalPatchedObjJS = originalJS, patchedJS + + // Make a getter that can return a fresh strategic patch map if needed for conflict retries + // We have to rebuild it each time we need it, because the map gets mutated when being applied + var originalPatchBytes []byte + getOriginalPatchMap = func() (map[string]interface{}, error) { + if originalPatchBytes == nil { + // Compute once + originalPatchBytes, err = strategicpatch.CreateTwoWayMergePatch(originalObjJS, originalPatchedObjJS, versionedObj) + if err != nil { + return nil, err + } + } + // Return a fresh map every time + originalPatchMap := make(map[string]interface{}) + if err := json.Unmarshal(originalPatchBytes, &originalPatchMap); err != nil { + return nil, err + } + return originalPatchMap, nil + } + case types.StrategicMergePatchType: // Since the patch is applied on versioned objects, we need to convert the // current object to versioned representation first. @@ -621,8 +641,12 @@ func patchResource( if err != nil { return nil, err } - originalMap, patchMap, err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj) - if err != nil { + // Capture the original object map and patch for possible retries. + originalMap := make(map[string]interface{}) + if err := unstructured.DefaultConverter.ToUnstructured(currentVersionedObject, &originalMap); err != nil { + return nil, err + } + if err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj); err != nil { return nil, err } // Convert the object back to unversioned. @@ -632,8 +656,17 @@ func patchResource( return nil, err } objToUpdate = unversionedObjToUpdate - // Store unstructured representations for possible retries. - originalObjMap, originalPatchMap = originalMap, patchMap + // Store unstructured representation for possible retries. + originalObjMap = originalMap + // Make a getter that can return a fresh strategic patch map if needed for conflict retries + // We have to rebuild it each time we need it, because the map gets mutated when being applied + getOriginalPatchMap = func() (map[string]interface{}, error) { + patchMap := make(map[string]interface{}) + if err := json.Unmarshal(patchJS, &patchMap); err != nil { + return nil, err + } + return patchMap, nil + } } if err := checkName(objToUpdate, name, namespace, namer); err != nil { return nil, err @@ -669,17 +702,6 @@ func patchResource( return nil, err } } else { - if originalPatchMap == nil { - // Compute original patch, if we already didn't do this in previous retries. - originalPatch, err := strategicpatch.CreateTwoWayMergePatch(originalObjJS, originalPatchedObjJS, versionedObj) - if err != nil { - return nil, err - } - originalPatchMap = make(map[string]interface{}) - if err := json.Unmarshal(originalPatch, &originalPatchMap); err != nil { - return nil, err - } - } // Compute current patch. currentObjJS, err := runtime.Encode(codec, currentObject) if err != nil { @@ -695,6 +717,12 @@ func patchResource( } } + // Get a fresh copy of the original strategic patch each time through, since applying it mutates the map + originalPatchMap, err := getOriginalPatchMap() + if err != nil { + return nil, err + } + hasConflicts, err := mergepatch.HasConflicts(originalPatchMap, currentPatchMap) if err != nil { return nil, err diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go index bc0d2c3d9eb..06c295719e7 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go @@ -74,7 +74,7 @@ func TestPatchAnonymousField(t *testing.T) { } actual := &testPatchType{} - _, _, err := strategicPatchObject(codec, defaulter, original, []byte(patch), actual, &testPatchType{}) + err := strategicPatchObject(codec, defaulter, original, []byte(patch), actual, &testPatchType{}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -314,7 +314,7 @@ func TestNumberConversion(t *testing.T) { patchJS := []byte(`{"spec":{"ports":[{"port":80,"nodePort":31789}]}}`) - _, _, err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj) + err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj) if err != nil { t.Fatal(err) } diff --git a/test/integration/apiserver/patch_test.go b/test/integration/apiserver/patch_test.go new file mode 100644 index 00000000000..8899c7df4e7 --- /dev/null +++ b/test/integration/apiserver/patch_test.go @@ -0,0 +1,116 @@ +// +build integration,!no-etcd + +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + + "github.com/pborman/uuid" + + "reflect" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apiserver/pkg/endpoints/handlers" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/test/integration/framework" +) + +// Tests that the apiserver retries non-overlapping conflicts on patches +func TestPatchConflicts(t *testing.T) { + s, clientSet := setup(t) + defer s.Close() + + ns := framework.CreateTestingNamespace("status-code", s, t) + defer framework.DeleteTestingNamespace(ns, s, t) + + // Create the object we're going to conflict on + clientSet.Core().Secrets(ns.Name).Create(&v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + // Populate annotations so the strategic patch descends, compares, and notices the $patch directive + Annotations: map[string]string{"initial": "value"}, + }, + }) + client := clientSet.Core().RESTClient() + + successes := int32(0) + + // Run a lot of simultaneous patch operations to exercise internal API server retry of patch application. + // Internally, a patch API call retries up to MaxRetryWhenPatchConflicts times if the resource version of the object has changed. + // If the resource version of the object changed between attempts, that means another one of our patch requests succeeded. + // That means if we run 2*MaxRetryWhenPatchConflicts patch attempts, we should see at least MaxRetryWhenPatchConflicts succeed. + wg := sync.WaitGroup{} + for i := 0; i < (2 * handlers.MaxRetryWhenPatchConflicts); i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + annotationName := fmt.Sprintf("annotation-%d", i) + labelName := fmt.Sprintf("label-%d", i) + value := uuid.NewRandom().String() + + obj, err := client.Patch(types.StrategicMergePatchType). + Namespace(ns.Name). + Resource("secrets"). + Name("test"). + Body([]byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}, "annotations":{"$patch":"replace","%s":"%s"}}}`, labelName, value, annotationName, value))). + Do(). + Get() + + if errors.IsConflict(err) { + t.Logf("tolerated conflict error patching %s: %v", "secrets", err) + return + } + if err != nil { + t.Errorf("error patching %s: %v", "secrets", err) + return + } + + accessor, err := meta.Accessor(obj) + if err != nil { + t.Errorf("error getting object from %s: %v", "secrets", err) + return + } + // make sure the label we wanted was effective + if accessor.GetLabels()[labelName] != value { + t.Errorf("patch of %s was ineffective, expected %s=%s, got labels %#v", "secrets", labelName, value, accessor.GetLabels()) + return + } + // make sure the patch directive didn't get lost, and that the entire annotation map was replaced + if !reflect.DeepEqual(accessor.GetAnnotations(), map[string]string{annotationName: value}) { + t.Errorf("patch of %s with $patch directive was ineffective, didn't replace entire annotations map: %#v", "secrets", accessor.GetAnnotations()) + } + + atomic.AddInt32(&successes, 1) + }(i) + } + wg.Wait() + + if successes < handlers.MaxRetryWhenPatchConflicts { + t.Errorf("Expected at least %d successful patches for %s, got %d", handlers.MaxRetryWhenPatchConflicts, "secrets", successes) + } else { + t.Logf("Got %d successful patches for %s", successes, "secrets") + } + +}