mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Merge pull request #43871 from liggitt/patch-mutation
Automatic merge from submit-queue (batch tested with PRs 43871, 44053) Fix original object mutation on patch retry When applying a patch, the original state of the object and patch are captured so that retries can determine if an overlapping update was made to the object, and the patch API call should abort with a conflict. However, the `strategicPatchObject -> applyPatchToObject -> StrategicMergeMapPatch -> mergeMap` call mutates both the original object map and the patch map, making them unsuitable for reusing for subsequent calls. We saw this in a [downstream test](https://github.com/openshift/origin/blob/master/test/integration/patch_test.go) that exercises patch conflict retries, where the data being submitted in a patch was showing up in the original object data map. Since mergeMap *also* mutates the patch map passed in (deletes patch directives as it encounters them), we *also* cannot reuse the patch map in `applyPatchToObject` once it has been used. This PR: * Builds `originalObjMap` separate from the initial patch application for later use in conflict detection * Changes the `originalPatchMap` to a helper that returns a fresh map from original sources * Adds an integration test that exercises retry of non-overlapping patches with patches containing `$patch` directives
This commit is contained in:
commit
64973188dd
@ -553,6 +553,7 @@ func handleUnmarshal(j []byte) (map[string]interface{}, error) {
|
||||
// StrategicMergePatch applies a strategic merge patch. The original and patch documents
|
||||
// must be JSONMap. A patch can be created from an original and modified document by
|
||||
// calling CreateTwoWayMergeMapPatch.
|
||||
// Warning: the original and patch JSONMap objects are mutated by this function and should not be reused.
|
||||
func StrategicMergeMapPatch(original, patch JSONMap, dataStruct interface{}) (JSONMap, error) {
|
||||
t, err := getTagStructType(dataStruct)
|
||||
if err != nil {
|
||||
|
@ -86,21 +86,21 @@ func strategicPatchObject(
|
||||
patchJS []byte,
|
||||
objToUpdate runtime.Object,
|
||||
versionedObj runtime.Object,
|
||||
) (originalObjMap map[string]interface{}, patchMap map[string]interface{}, retErr error) {
|
||||
originalObjMap = make(map[string]interface{})
|
||||
) error {
|
||||
originalObjMap := make(map[string]interface{})
|
||||
if err := unstructured.DefaultConverter.ToUnstructured(originalObject, &originalObjMap); err != nil {
|
||||
return nil, nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
patchMap = make(map[string]interface{})
|
||||
patchMap := make(map[string]interface{})
|
||||
if err := json.Unmarshal(patchJS, &patchMap); err != nil {
|
||||
return nil, nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
if err := applyPatchToObject(codec, defaulter, originalObjMap, patchMap, objToUpdate, versionedObj); err != nil {
|
||||
return nil, nil, err
|
||||
return err
|
||||
}
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// applyPatchToObject applies a strategic merge patch of <patchMap> to
|
||||
|
@ -570,7 +570,7 @@ func patchResource(
|
||||
originalObjJS []byte
|
||||
originalPatchedObjJS []byte
|
||||
originalObjMap map[string]interface{}
|
||||
originalPatchMap map[string]interface{}
|
||||
getOriginalPatchMap func() (map[string]interface{}, error)
|
||||
lastConflictErr error
|
||||
originalResourceVersion string
|
||||
)
|
||||
@ -610,6 +610,26 @@ func patchResource(
|
||||
return nil, err
|
||||
}
|
||||
originalObjJS, originalPatchedObjJS = originalJS, patchedJS
|
||||
|
||||
// Make a getter that can return a fresh strategic patch map if needed for conflict retries
|
||||
// We have to rebuild it each time we need it, because the map gets mutated when being applied
|
||||
var originalPatchBytes []byte
|
||||
getOriginalPatchMap = func() (map[string]interface{}, error) {
|
||||
if originalPatchBytes == nil {
|
||||
// Compute once
|
||||
originalPatchBytes, err = strategicpatch.CreateTwoWayMergePatch(originalObjJS, originalPatchedObjJS, versionedObj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
// Return a fresh map every time
|
||||
originalPatchMap := make(map[string]interface{})
|
||||
if err := json.Unmarshal(originalPatchBytes, &originalPatchMap); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return originalPatchMap, nil
|
||||
}
|
||||
|
||||
case types.StrategicMergePatchType:
|
||||
// Since the patch is applied on versioned objects, we need to convert the
|
||||
// current object to versioned representation first.
|
||||
@ -621,8 +641,12 @@ func patchResource(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
originalMap, patchMap, err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj)
|
||||
if err != nil {
|
||||
// Capture the original object map and patch for possible retries.
|
||||
originalMap := make(map[string]interface{})
|
||||
if err := unstructured.DefaultConverter.ToUnstructured(currentVersionedObject, &originalMap); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Convert the object back to unversioned.
|
||||
@ -632,8 +656,17 @@ func patchResource(
|
||||
return nil, err
|
||||
}
|
||||
objToUpdate = unversionedObjToUpdate
|
||||
// Store unstructured representations for possible retries.
|
||||
originalObjMap, originalPatchMap = originalMap, patchMap
|
||||
// Store unstructured representation for possible retries.
|
||||
originalObjMap = originalMap
|
||||
// Make a getter that can return a fresh strategic patch map if needed for conflict retries
|
||||
// We have to rebuild it each time we need it, because the map gets mutated when being applied
|
||||
getOriginalPatchMap = func() (map[string]interface{}, error) {
|
||||
patchMap := make(map[string]interface{})
|
||||
if err := json.Unmarshal(patchJS, &patchMap); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return patchMap, nil
|
||||
}
|
||||
}
|
||||
if err := checkName(objToUpdate, name, namespace, namer); err != nil {
|
||||
return nil, err
|
||||
@ -669,17 +702,6 @@ func patchResource(
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
if originalPatchMap == nil {
|
||||
// Compute original patch, if we already didn't do this in previous retries.
|
||||
originalPatch, err := strategicpatch.CreateTwoWayMergePatch(originalObjJS, originalPatchedObjJS, versionedObj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
originalPatchMap = make(map[string]interface{})
|
||||
if err := json.Unmarshal(originalPatch, &originalPatchMap); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
// Compute current patch.
|
||||
currentObjJS, err := runtime.Encode(codec, currentObject)
|
||||
if err != nil {
|
||||
@ -695,6 +717,12 @@ func patchResource(
|
||||
}
|
||||
}
|
||||
|
||||
// Get a fresh copy of the original strategic patch each time through, since applying it mutates the map
|
||||
originalPatchMap, err := getOriginalPatchMap()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hasConflicts, err := mergepatch.HasConflicts(originalPatchMap, currentPatchMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -74,7 +74,7 @@ func TestPatchAnonymousField(t *testing.T) {
|
||||
}
|
||||
|
||||
actual := &testPatchType{}
|
||||
_, _, err := strategicPatchObject(codec, defaulter, original, []byte(patch), actual, &testPatchType{})
|
||||
err := strategicPatchObject(codec, defaulter, original, []byte(patch), actual, &testPatchType{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
@ -314,7 +314,7 @@ func TestNumberConversion(t *testing.T) {
|
||||
|
||||
patchJS := []byte(`{"spec":{"ports":[{"port":80,"nodePort":31789}]}}`)
|
||||
|
||||
_, _, err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj)
|
||||
err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
116
test/integration/apiserver/patch_test.go
Normal file
116
test/integration/apiserver/patch_test.go
Normal file
@ -0,0 +1,116 @@
|
||||
// +build integration,!no-etcd
|
||||
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/pborman/uuid"
|
||||
|
||||
"reflect"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
|
||||
// Tests that the apiserver retries non-overlapping conflicts on patches
|
||||
func TestPatchConflicts(t *testing.T) {
|
||||
s, clientSet := setup(t)
|
||||
defer s.Close()
|
||||
|
||||
ns := framework.CreateTestingNamespace("status-code", s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
|
||||
// Create the object we're going to conflict on
|
||||
clientSet.Core().Secrets(ns.Name).Create(&v1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test",
|
||||
// Populate annotations so the strategic patch descends, compares, and notices the $patch directive
|
||||
Annotations: map[string]string{"initial": "value"},
|
||||
},
|
||||
})
|
||||
client := clientSet.Core().RESTClient()
|
||||
|
||||
successes := int32(0)
|
||||
|
||||
// Run a lot of simultaneous patch operations to exercise internal API server retry of patch application.
|
||||
// Internally, a patch API call retries up to MaxRetryWhenPatchConflicts times if the resource version of the object has changed.
|
||||
// If the resource version of the object changed between attempts, that means another one of our patch requests succeeded.
|
||||
// That means if we run 2*MaxRetryWhenPatchConflicts patch attempts, we should see at least MaxRetryWhenPatchConflicts succeed.
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < (2 * handlers.MaxRetryWhenPatchConflicts); i++ {
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
annotationName := fmt.Sprintf("annotation-%d", i)
|
||||
labelName := fmt.Sprintf("label-%d", i)
|
||||
value := uuid.NewRandom().String()
|
||||
|
||||
obj, err := client.Patch(types.StrategicMergePatchType).
|
||||
Namespace(ns.Name).
|
||||
Resource("secrets").
|
||||
Name("test").
|
||||
Body([]byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}, "annotations":{"$patch":"replace","%s":"%s"}}}`, labelName, value, annotationName, value))).
|
||||
Do().
|
||||
Get()
|
||||
|
||||
if errors.IsConflict(err) {
|
||||
t.Logf("tolerated conflict error patching %s: %v", "secrets", err)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("error patching %s: %v", "secrets", err)
|
||||
return
|
||||
}
|
||||
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
t.Errorf("error getting object from %s: %v", "secrets", err)
|
||||
return
|
||||
}
|
||||
// make sure the label we wanted was effective
|
||||
if accessor.GetLabels()[labelName] != value {
|
||||
t.Errorf("patch of %s was ineffective, expected %s=%s, got labels %#v", "secrets", labelName, value, accessor.GetLabels())
|
||||
return
|
||||
}
|
||||
// make sure the patch directive didn't get lost, and that the entire annotation map was replaced
|
||||
if !reflect.DeepEqual(accessor.GetAnnotations(), map[string]string{annotationName: value}) {
|
||||
t.Errorf("patch of %s with $patch directive was ineffective, didn't replace entire annotations map: %#v", "secrets", accessor.GetAnnotations())
|
||||
}
|
||||
|
||||
atomic.AddInt32(&successes, 1)
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if successes < handlers.MaxRetryWhenPatchConflicts {
|
||||
t.Errorf("Expected at least %d successful patches for %s, got %d", handlers.MaxRetryWhenPatchConflicts, "secrets", successes)
|
||||
} else {
|
||||
t.Logf("Got %d successful patches for %s", successes, "secrets")
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user