mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
collapse patch conflict retry onto GuaranteedUpdate
builds on #62868 1. When the incoming patch specified a resourceVersion that failed as a precondition, the patch handler would retry uselessly 5 times. This PR collapses onto GuaranteedUpdate, which immediately stops retrying in that case. 2. When the incoming patch did not specify a resourceVersion, and persisting to etcd contended with other etcd updates, the retry would try to detect patch conflicts with deltas from the first 'current object' retrieved from etcd and fail with a conflict error in that case. Given that the user did not provide any information about the starting version they expected their patch to apply to, this does not make sense, and results in arbitrary conflict errors, depending on when the patch was submitted relative to other changes made to the resource. This PR changes the patch application to be performed on the object retrieved from etcd identically on every attempt. fixes #58017 SMP is no longer computed for CRD objects fixes #42644 No special state is retained on the first attempt, so the patch handler correctly handles the cached storage optimistically trying with a cached object first
This commit is contained in:
parent
6251402266
commit
fbd6f38084
@ -582,17 +582,6 @@ func TestPatch(t *testing.T) {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// this call waits for the resourceVersion to be reached in the cache before returning.
|
||||
// We need to do this because the patch gets its initial object from the storage, and the cache serves that.
|
||||
// If it is out of date, then our initial patch is applied to an old resource version, which conflicts
|
||||
// and then the updated object shows a conflicting diff, which permanently fails the patch.
|
||||
// This gives expected stability in the patch without retrying on an known number of conflicts below in the test.
|
||||
// See https://issue.k8s.io/42644
|
||||
_, err = noxuNamespacedResourceClient.Get("foo", metav1.GetOptions{ResourceVersion: createdNoxuInstance.GetResourceVersion()})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// a patch with no change
|
||||
createdNoxuInstance, err = noxuNamespacedResourceClient.Patch("foo", types.MergePatchType, patch)
|
||||
if err != nil {
|
||||
|
@ -24,10 +24,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/evanphx/json-patch"
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@ -210,13 +208,6 @@ type patcher struct {
|
||||
namespace string
|
||||
updatedObjectInfo rest.UpdatedObjectInfo
|
||||
mechanism patchMechanism
|
||||
|
||||
// Set on first iteration, currently only used to construct error messages
|
||||
originalResourceVersion string
|
||||
|
||||
// Modified each iteration
|
||||
iterationCount int
|
||||
lastConflictErr error
|
||||
}
|
||||
|
||||
func (p *patcher) toUnversioned(versionedObj runtime.Object) (runtime.Object, error) {
|
||||
@ -225,44 +216,29 @@ func (p *patcher) toUnversioned(versionedObj runtime.Object) (runtime.Object, er
|
||||
}
|
||||
|
||||
type patchMechanism interface {
|
||||
firstPatchAttempt(currentObject runtime.Object, currentResourceVersion string) (runtime.Object, error)
|
||||
subsequentPatchAttempt(currentObject runtime.Object, currentResourceVersion string) (runtime.Object, error)
|
||||
applyPatchToCurrentObject(currentObject runtime.Object) (runtime.Object, error)
|
||||
}
|
||||
|
||||
type jsonPatcher struct {
|
||||
*patcher
|
||||
|
||||
// set by firstPatchAttempt
|
||||
originalObjJS []byte
|
||||
originalPatchedObjJS []byte
|
||||
|
||||
// State for originalStrategicMergePatch
|
||||
originalPatchBytes []byte
|
||||
}
|
||||
|
||||
func (p *jsonPatcher) firstPatchAttempt(currentObject runtime.Object, currentResourceVersion string) (runtime.Object, error) {
|
||||
func (p *jsonPatcher) applyPatchToCurrentObject(currentObject runtime.Object) (runtime.Object, error) {
|
||||
// Encode will convert & return a versioned object in JSON.
|
||||
// Store this JS for future use.
|
||||
originalObjJS, err := runtime.Encode(p.codec, currentObject)
|
||||
currentObjJS, err := runtime.Encode(p.codec, currentObject)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Apply the patch. Store patched result for future use.
|
||||
originalPatchedObjJS, err := p.applyJSPatch(originalObjJS)
|
||||
// Apply the patch.
|
||||
patchedObjJS, err := p.applyJSPatch(currentObjJS)
|
||||
if err != nil {
|
||||
return nil, interpretPatchError(err)
|
||||
}
|
||||
|
||||
// Since both succeeded, store the results. (This shouldn't be
|
||||
// necessary since neither of the above items can return conflict
|
||||
// errors, but it also doesn't hurt.)
|
||||
p.originalObjJS = originalObjJS
|
||||
p.originalPatchedObjJS = originalPatchedObjJS
|
||||
|
||||
// Construct the resulting typed, unversioned object.
|
||||
objToUpdate := p.restPatcher.New()
|
||||
if err := runtime.DecodeInto(p.codec, p.originalPatchedObjJS, objToUpdate); err != nil {
|
||||
if err := runtime.DecodeInto(p.codec, patchedObjJS, objToUpdate); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -287,71 +263,11 @@ func (p *jsonPatcher) applyJSPatch(versionedJS []byte) (patchedJS []byte, retErr
|
||||
}
|
||||
}
|
||||
|
||||
func (p *jsonPatcher) subsequentPatchAttempt(currentObject runtime.Object, currentResourceVersion string) (runtime.Object, error) {
|
||||
if len(p.originalObjJS) == 0 || len(p.originalPatchedObjJS) == 0 {
|
||||
return nil, errors.NewInternalError(fmt.Errorf("unexpected error on patch retry: this indicates a bug in the server; remaking the patch against the most recent version of the object might succeed"))
|
||||
}
|
||||
return subsequentPatchLogic(p.patcher, p, currentObject, currentResourceVersion)
|
||||
}
|
||||
|
||||
// Return a fresh strategic patch map if needed for conflict retries. We have
|
||||
// to rebuild it each time we need it, because the map gets mutated when being
|
||||
// applied.
|
||||
func (p *jsonPatcher) originalStrategicMergePatch() (map[string]interface{}, error) {
|
||||
if p.originalPatchBytes == nil {
|
||||
// Compute once. Compute here instead of in the first patch
|
||||
// attempt because this isn't needed unless there's actually a
|
||||
// conflict.
|
||||
var err error
|
||||
p.originalPatchBytes, err = strategicpatch.CreateTwoWayMergePatch(p.originalObjJS, p.originalPatchedObjJS, p.schemaReferenceObj)
|
||||
if err != nil {
|
||||
return nil, interpretPatchError(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Return a fresh map every time
|
||||
originalPatchMap := make(map[string]interface{})
|
||||
if err := json.Unmarshal(p.originalPatchBytes, &originalPatchMap); err != nil {
|
||||
return nil, errors.NewBadRequest(err.Error())
|
||||
}
|
||||
return originalPatchMap, nil
|
||||
}
|
||||
|
||||
// TODO: this will totally fail for CR types today, as no schema is available.
|
||||
// The interface should be changed.
|
||||
func (p *jsonPatcher) computeStrategicMergePatch(currentObject runtime.Object, _ map[string]interface{}) (map[string]interface{}, error) {
|
||||
// Compute current patch.
|
||||
currentObjJS, err := runtime.Encode(p.codec, currentObject)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
currentPatch, err := strategicpatch.CreateTwoWayMergePatch(p.originalObjJS, currentObjJS, p.schemaReferenceObj)
|
||||
if err != nil {
|
||||
return nil, interpretPatchError(err)
|
||||
}
|
||||
currentPatchMap := make(map[string]interface{})
|
||||
if err := json.Unmarshal(currentPatch, ¤tPatchMap); err != nil {
|
||||
return nil, errors.NewBadRequest(err.Error())
|
||||
}
|
||||
|
||||
return currentPatchMap, nil
|
||||
}
|
||||
|
||||
type smpPatcher struct {
|
||||
*patcher
|
||||
originalObjMap map[string]interface{}
|
||||
}
|
||||
|
||||
func (p *smpPatcher) firstPatchAttempt(currentObject runtime.Object, currentResourceVersion string) (runtime.Object, error) {
|
||||
// first time through,
|
||||
// 1. apply the patch
|
||||
// 2. save the original and patched to detect whether there were conflicting changes on retries
|
||||
|
||||
// For performance reasons, in case of strategicpatch, we avoid json
|
||||
// marshaling and unmarshaling and operate just on map[string]interface{}.
|
||||
// In case of other patch types, we still have to operate on JSON
|
||||
// representations.
|
||||
|
||||
func (p *smpPatcher) applyPatchToCurrentObject(currentObject runtime.Object) (runtime.Object, error) {
|
||||
// Since the patch is applied on versioned objects, we need to convert the
|
||||
// current object to versioned representation first.
|
||||
currentVersionedObject, err := p.unsafeConvertor.ConvertToVersion(currentObject, p.kind.GroupVersion())
|
||||
@ -362,15 +278,6 @@ func (p *smpPatcher) firstPatchAttempt(currentObject runtime.Object, currentReso
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Capture the original object map and patch for possible retries.
|
||||
originalObjMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(currentVersionedObject)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Store only after success. (This shouldn't be necessary since neither
|
||||
// of the above items can return conflict errors, but it also doesn't
|
||||
// hurt.)
|
||||
p.originalObjMap = originalObjMap
|
||||
if err := strategicPatchObject(p.codec, p.defaulter, currentVersionedObject, p.patchJS, versionedObjToUpdate, p.schemaReferenceObj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -412,116 +319,6 @@ func strategicPatchObject(
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *smpPatcher) subsequentPatchAttempt(currentObject runtime.Object, currentResourceVersion string) (runtime.Object, error) {
|
||||
if p.originalObjMap == nil {
|
||||
return nil, errors.NewInternalError(fmt.Errorf("unexpected error on (SMP) patch retry: this indicates a bug in the server; remaking the patch against the most recent version of the object might succeed"))
|
||||
}
|
||||
return subsequentPatchLogic(p.patcher, p, currentObject, currentResourceVersion)
|
||||
}
|
||||
|
||||
// Return a fresh strategic patch map if needed for conflict retries. We have
|
||||
// to rebuild it each time we need it, because the map gets mutated when being
|
||||
// applied.
|
||||
func (p *smpPatcher) originalStrategicMergePatch() (map[string]interface{}, error) {
|
||||
patchMap := make(map[string]interface{})
|
||||
if err := json.Unmarshal(p.patchJS, &patchMap); err != nil {
|
||||
return nil, errors.NewBadRequest(err.Error())
|
||||
}
|
||||
return patchMap, nil
|
||||
}
|
||||
|
||||
func (p *smpPatcher) computeStrategicMergePatch(_ runtime.Object, currentObjMap map[string]interface{}) (map[string]interface{}, error) {
|
||||
o, err := strategicpatch.CreateTwoWayMergeMapPatch(p.originalObjMap, currentObjMap, p.schemaReferenceObj)
|
||||
if err != nil {
|
||||
return nil, interpretPatchError(err)
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
// patchSource lets you get two SMPs, an original and a current. These can be
|
||||
// compared for conflicts.
|
||||
//
|
||||
// TODO: Instead of computing two 2-way merges and comparing them for
|
||||
// conflicts, we could do one 3-way merge, which can detect the same
|
||||
// conflicts. This would likely be more readable and more efficient,
|
||||
// and should be logically exactly the same operation.
|
||||
//
|
||||
// TODO: Currently, the user gets this behavior whether or not they
|
||||
// specified a RV. I believe we can stop doing this if the user did not
|
||||
// specify an RV, and that would not be a breaking change.
|
||||
//
|
||||
type patchSource interface {
|
||||
// originalStrategicMergePatch must reconstruct this map each time,
|
||||
// because it is consumed when it is used.
|
||||
originalStrategicMergePatch() (map[string]interface{}, error)
|
||||
computeStrategicMergePatch(unversionedObject runtime.Object, currentVersionedObjMap map[string]interface{}) (map[string]interface{}, error)
|
||||
}
|
||||
|
||||
func subsequentPatchLogic(p *patcher, ps patchSource, currentObject runtime.Object, currentResourceVersion string) (runtime.Object, error) {
|
||||
// on a conflict (which is the only reason to have more than one attempt),
|
||||
// 1. build a strategic merge patch from originalJS and the patchedJS. Different patch types can
|
||||
// be specified, but a strategic merge patch should be expressive enough handle them. Build the
|
||||
// patch with this type to handle those cases.
|
||||
// 2. build a strategic merge patch from originalJS and the currentJS
|
||||
// 3. ensure no conflicts between the two patches
|
||||
// 4. apply the #1 patch to the currentJS object
|
||||
|
||||
// Since the patch is applied on versioned objects, we need to convert the
|
||||
// current object to versioned representation first.
|
||||
currentVersionedObject, err := p.unsafeConvertor.ConvertToVersion(currentObject, p.kind.GroupVersion())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
currentObjMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(currentVersionedObject)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
currentPatchMap, err := ps.computeStrategicMergePatch(currentObject, currentObjMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get a fresh copy of the original strategic patch each time through, since applying it mutates the map
|
||||
originalPatchMap, err := ps.originalStrategicMergePatch()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
patchMetaFromStruct, err := strategicpatch.NewPatchMetaFromStruct(p.schemaReferenceObj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hasConflicts, err := strategicpatch.MergingMapsHaveConflicts(originalPatchMap, currentPatchMap, patchMetaFromStruct)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if hasConflicts {
|
||||
diff1, _ := json.Marshal(currentPatchMap)
|
||||
diff2, _ := json.Marshal(originalPatchMap)
|
||||
patchDiffErr := fmt.Errorf("there is a meaningful conflict (firstResourceVersion: %q, currentResourceVersion: %q):\n diff1=%v\n, diff2=%v\n", p.originalResourceVersion, currentResourceVersion, string(diff1), string(diff2))
|
||||
glog.V(4).Infof("patchResource failed for resource %s, because there is a meaningful conflict(firstResourceVersion: %q, currentResourceVersion: %q):\n diff1=%v\n, diff2=%v\n", p.name, p.originalResourceVersion, currentResourceVersion, string(diff1), string(diff2))
|
||||
|
||||
// Return the last conflict error we got if we have one
|
||||
if p.lastConflictErr != nil {
|
||||
return nil, p.lastConflictErr
|
||||
}
|
||||
// Otherwise manufacture one of our own
|
||||
return nil, errors.NewConflict(p.resource.GroupResource(), p.name, patchDiffErr)
|
||||
}
|
||||
|
||||
versionedObjToUpdate, err := p.creater.New(p.kind)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := applyPatchToObject(p.codec, p.defaulter, currentObjMap, originalPatchMap, versionedObjToUpdate, p.schemaReferenceObj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Convert the object back to unversioned.
|
||||
return p.toUnversioned(versionedObjToUpdate)
|
||||
}
|
||||
|
||||
// applyPatch is called every time GuaranteedUpdate asks for the updated object,
|
||||
// and is given the currently persisted object as input.
|
||||
func (p *patcher) applyPatch(_ context.Context, _, currentObject runtime.Object) (runtime.Object, error) {
|
||||
@ -533,25 +330,14 @@ func (p *patcher) applyPatch(_ context.Context, _, currentObject runtime.Object)
|
||||
return nil, errors.NewNotFound(p.resource.GroupResource(), p.name)
|
||||
}
|
||||
|
||||
currentResourceVersion := ""
|
||||
if currentMetadata, err := meta.Accessor(currentObject); err == nil {
|
||||
currentResourceVersion = currentMetadata.GetResourceVersion()
|
||||
objToUpdate, err := p.mechanism.applyPatchToCurrentObject(currentObject)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.iterationCount++
|
||||
|
||||
if p.iterationCount == 1 {
|
||||
p.originalResourceVersion = currentResourceVersion
|
||||
objToUpdate, err := p.mechanism.firstPatchAttempt(currentObject, currentResourceVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := checkName(objToUpdate, p.name, p.namespace, p.namer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return objToUpdate, nil
|
||||
if err := checkName(objToUpdate, p.name, p.namespace, p.namer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.mechanism.subsequentPatchAttempt(currentObject, currentResourceVersion)
|
||||
return objToUpdate, nil
|
||||
}
|
||||
|
||||
// applyAdmission is called every time GuaranteedUpdate asks for the updated object,
|
||||
@ -561,19 +347,6 @@ func (p *patcher) applyAdmission(ctx context.Context, patchedObject runtime.Obje
|
||||
return patchedObject, p.admissionCheck(patchedObject, currentObject)
|
||||
}
|
||||
|
||||
func (p *patcher) requestLoop(ctx context.Context) func() (runtime.Object, error) {
|
||||
// return a function to catch ctx in a closure, until finishRequest
|
||||
// starts handling the context.
|
||||
return func() (runtime.Object, error) {
|
||||
updateObject, _, updateErr := p.restPatcher.Update(ctx, p.name, p.updatedObjectInfo, p.createValidation, p.updateValidation)
|
||||
for i := 0; i < MaxRetryWhenPatchConflicts && (errors.IsConflict(updateErr)); i++ {
|
||||
p.lastConflictErr = updateErr
|
||||
updateObject, _, updateErr = p.restPatcher.Update(ctx, p.name, p.updatedObjectInfo, p.createValidation, p.updateValidation)
|
||||
}
|
||||
return updateObject, updateErr
|
||||
}
|
||||
}
|
||||
|
||||
// patchResource divides PatchResource for easier unit testing
|
||||
func (p *patcher) patchResource(ctx context.Context) (runtime.Object, error) {
|
||||
p.namespace = request.NamespaceValue(ctx)
|
||||
@ -586,7 +359,10 @@ func (p *patcher) patchResource(ctx context.Context) (runtime.Object, error) {
|
||||
return nil, fmt.Errorf("%v: unimplemented patch type", p.patchType)
|
||||
}
|
||||
p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission)
|
||||
return finishRequest(p.timeout, p.requestLoop(ctx))
|
||||
return finishRequest(p.timeout, func() (runtime.Object, error) {
|
||||
updateObject, _, updateErr := p.restPatcher.Update(ctx, p.name, p.updatedObjectInfo, p.createValidation, p.updateValidation)
|
||||
return updateObject, updateErr
|
||||
})
|
||||
}
|
||||
|
||||
// applyPatchToObject applies a strategic merge patch of <patchMap> to
|
||||
|
@ -90,9 +90,6 @@ func (scope *RequestScope) AllowsStreamSchema(s string) bool {
|
||||
return s == "watch"
|
||||
}
|
||||
|
||||
// MaxRetryWhenPatchConflicts is the maximum number of conflicts retry during a patch operation before returning failure
|
||||
const MaxRetryWhenPatchConflicts = 5
|
||||
|
||||
// ConnectResource returns a function that handles a connect request on a rest.Storage object.
|
||||
func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admission.Interface, restPath string, isSubresource bool) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, req *http.Request) {
|
||||
|
@ -173,32 +173,46 @@ func (p *testPatcher) New() runtime.Object {
|
||||
}
|
||||
|
||||
func (p *testPatcher) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc) (runtime.Object, bool, error) {
|
||||
currentPod := p.startingPod
|
||||
if p.numUpdates > 0 {
|
||||
currentPod = p.updatePod
|
||||
}
|
||||
p.numUpdates++
|
||||
// Simulate GuaranteedUpdate behavior (retries internally on etcd changes if the incoming resource doesn't pin resourceVersion)
|
||||
for {
|
||||
currentPod := p.startingPod
|
||||
if p.numUpdates > 0 {
|
||||
currentPod = p.updatePod
|
||||
}
|
||||
p.numUpdates++
|
||||
|
||||
obj, err := objInfo.UpdatedObject(ctx, currentPod)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
inPod := obj.(*example.Pod)
|
||||
if inPod.ResourceVersion != p.updatePod.ResourceVersion {
|
||||
return nil, false, apierrors.NewConflict(example.Resource("pods"), inPod.Name, fmt.Errorf("existing %v, new %v", p.updatePod.ResourceVersion, inPod.ResourceVersion))
|
||||
}
|
||||
// Remember the current resource version
|
||||
currentResourceVersion := currentPod.ResourceVersion
|
||||
|
||||
if currentPod == nil {
|
||||
if err := createValidation(currentPod); err != nil {
|
||||
obj, err := objInfo.UpdatedObject(ctx, currentPod)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
} else {
|
||||
if err := updateValidation(currentPod, inPod); err != nil {
|
||||
return nil, false, err
|
||||
inPod := obj.(*example.Pod)
|
||||
if inPod.ResourceVersion == "" || inPod.ResourceVersion == "0" {
|
||||
inPod.ResourceVersion = p.updatePod.ResourceVersion
|
||||
}
|
||||
if inPod.ResourceVersion != p.updatePod.ResourceVersion {
|
||||
// If the patch didn't have an opinion on the resource version, retry like GuaranteedUpdate does
|
||||
if inPod.ResourceVersion == currentResourceVersion {
|
||||
continue
|
||||
}
|
||||
// If the patch changed the resource version and it mismatches, conflict
|
||||
return nil, false, apierrors.NewConflict(example.Resource("pods"), inPod.Name, fmt.Errorf("existing %v, new %v", p.updatePod.ResourceVersion, inPod.ResourceVersion))
|
||||
}
|
||||
}
|
||||
|
||||
return inPod, false, nil
|
||||
if currentPod == nil {
|
||||
if err := createValidation(currentPod); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
} else {
|
||||
if err := updateValidation(currentPod, inPod); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
}
|
||||
|
||||
return inPod, false, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (p *testPatcher) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
|
||||
@ -549,8 +563,7 @@ func TestPatchResourceWithConflict(t *testing.T) {
|
||||
startingPod: &example.Pod{},
|
||||
changedPod: &example.Pod{},
|
||||
updatePod: &example.Pod{},
|
||||
|
||||
expectedError: `Operation cannot be fulfilled on pods.example.apiserver.k8s.io "foo": existing 2, new 1`,
|
||||
expectedPod: &example.Pod{},
|
||||
}
|
||||
|
||||
// See issue #63104 for discussion of how much sense this makes.
|
||||
@ -576,6 +589,13 @@ func TestPatchResourceWithConflict(t *testing.T) {
|
||||
tc.updatePod.APIVersion = examplev1.SchemeGroupVersion.String()
|
||||
tc.updatePod.Spec.NodeName = "anywhere"
|
||||
|
||||
tc.expectedPod.Name = name
|
||||
tc.expectedPod.Namespace = namespace
|
||||
tc.expectedPod.UID = uid
|
||||
tc.expectedPod.ResourceVersion = "2"
|
||||
tc.expectedPod.APIVersion = examplev1.SchemeGroupVersion.String()
|
||||
tc.expectedPod.Spec.NodeName = "there"
|
||||
|
||||
tc.Run(t)
|
||||
}
|
||||
|
||||
|
@ -43,7 +43,6 @@ go_test(
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/features:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
|
@ -29,11 +29,10 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
|
||||
// Tests that the apiserver retries non-overlapping conflicts on patches
|
||||
// Tests that the apiserver retries patches
|
||||
func TestPatchConflicts(t *testing.T) {
|
||||
s, clientSet, closeFn := setup(t)
|
||||
defer closeFn()
|
||||
@ -41,7 +40,7 @@ func TestPatchConflicts(t *testing.T) {
|
||||
ns := framework.CreateTestingNamespace("status-code", s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
|
||||
numOfConcurrentPatches := 2 * handlers.MaxRetryWhenPatchConflicts
|
||||
numOfConcurrentPatches := 100
|
||||
|
||||
UIDs := make([]types.UID, numOfConcurrentPatches)
|
||||
ownerRefs := []metav1.OwnerReference{}
|
||||
@ -69,10 +68,8 @@ func TestPatchConflicts(t *testing.T) {
|
||||
|
||||
successes := int32(0)
|
||||
|
||||
// Run a lot of simultaneous patch operations to exercise internal API server retry of patch application.
|
||||
// Internally, a patch API call retries up to MaxRetryWhenPatchConflicts times if the resource version of the object has changed.
|
||||
// If the resource version of the object changed between attempts, that means another one of our patch requests succeeded.
|
||||
// That means if we run 2*MaxRetryWhenPatchConflicts patch attempts, we should see at least MaxRetryWhenPatchConflicts succeed.
|
||||
// Run a lot of simultaneous patch operations to exercise internal API server retry of application of patches that do not specify resourceVersion.
|
||||
// They should all succeed.
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < numOfConcurrentPatches; i++ {
|
||||
wg.Add(1)
|
||||
@ -123,8 +120,8 @@ func TestPatchConflicts(t *testing.T) {
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if successes < handlers.MaxRetryWhenPatchConflicts {
|
||||
t.Errorf("Expected at least %d successful patches for %s, got %d", handlers.MaxRetryWhenPatchConflicts, "secrets", successes)
|
||||
if successes < int32(numOfConcurrentPatches) {
|
||||
t.Errorf("Expected at least %d successful patches for %s, got %d", numOfConcurrentPatches, "secrets", successes)
|
||||
} else {
|
||||
t.Logf("Got %d successful patches for %s", successes, "secrets")
|
||||
}
|
||||
|
@ -22,7 +22,6 @@ go_test(
|
||||
"//test/utils/image:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
|
||||
|
@ -27,7 +27,6 @@ import (
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
@ -286,23 +285,6 @@ func TestPatch(t *testing.T) {
|
||||
t.Logf("%v", string(jsonObj))
|
||||
}
|
||||
|
||||
obj, err := result.Get()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
metadata, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// this call waits for the resourceVersion to be reached in the cache before returning. We need to do this because
|
||||
// the patch gets its initial object from the storage, and the cache serves that. If it is out of date,
|
||||
// then our initial patch is applied to an old resource version, which conflicts and then the updated object shows
|
||||
// a conflicting diff, which permanently fails the patch. This gives expected stability in the patch without
|
||||
// retrying on an known number of conflicts below in the test.
|
||||
if _, err := c.Core().Pods(ns.Name).Get(name, metav1.GetOptions{ResourceVersion: metadata.GetResourceVersion()}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user