diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go index db9f325a43b..6b9346955f5 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go @@ -43,8 +43,7 @@ import ( utiltrace "k8s.io/apiserver/pkg/util/trace" ) -// PatchResource returns a function that will handle a resource patch -// TODO: Eventually PatchResource should just use GuaranteedUpdate and this routine should be a bit cleaner +// PatchResource returns a function that will handle a resource patch. func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface, converter runtime.ObjectConvertor, patchTypes []string) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { // For performance tracking purposes. @@ -80,7 +79,14 @@ func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface ctx := req.Context() ctx = request.WithNamespace(ctx, namespace) - versionedObj, err := converter.ConvertToVersion(r.New(), scope.Kind.GroupVersion()) + // TODO: this is NOT using the scope's convertor [sic]. Figure + // out if this is intentional or not. Perhaps it matters on + // subresources? Rename this parameter if this is purposful and + // delete it (using scope.Convertor instead) otherwise. + // + // Already some tests set this converter but apparently not the + // scope's unsafeConvertor. + schemaReferenceObj, err := converter.ConvertToVersion(r.New(), scope.Kind.GroupVersion()) if err != nil { scope.err(err, w, req) return @@ -109,24 +115,40 @@ func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface userInfo, _ := request.UserFrom(ctx) staticAdmissionAttributes := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, userInfo) - updateMutation := func(updatedObject runtime.Object, currentObject runtime.Object) error { + admissionCheck := func(updatedObject runtime.Object, currentObject runtime.Object) error { if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && admit.Handles(admission.Update) { return mutatingAdmission.Admit(admission.NewAttributesRecord(updatedObject, currentObject, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, userInfo)) } return nil } - result, err := patchResource( - ctx, - updateMutation, - rest.AdmissionToValidateObjectFunc(admit, staticAdmissionAttributes), - rest.AdmissionToValidateObjectUpdateFunc(admit, staticAdmissionAttributes), - timeout, versionedObj, - r, - name, - patchType, - patchJS, - scope.Namer, scope.Creater, scope.Defaulter, scope.UnsafeConvertor, scope.Kind, scope.Resource, codec, trace) + p := patcher{ + namer: scope.Namer, + creater: scope.Creater, + defaulter: scope.Defaulter, + unsafeConvertor: scope.UnsafeConvertor, + kind: scope.Kind, + resource: scope.Resource, + + createValidation: rest.AdmissionToValidateObjectFunc(admit, staticAdmissionAttributes), + updateValidation: rest.AdmissionToValidateObjectUpdateFunc(admit, staticAdmissionAttributes), + admissionCheck: admissionCheck, + + codec: codec, + + timeout: timeout, + + schemaReferenceObj: schemaReferenceObj, + + restPatcher: r, + name: name, + patchType: patchType, + patchJS: patchJS, + + trace: trace, + } + + result, err := p.patchResource(ctx) if err != nil { scope.err(err, w, req) return @@ -150,290 +172,215 @@ func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface type mutateObjectUpdateFunc func(obj, old runtime.Object) error -// patchResource divides PatchResource for easier unit testing -func patchResource( - ctx context.Context, - updateMutation mutateObjectUpdateFunc, - createValidation rest.ValidateObjectFunc, - updateValidation rest.ValidateObjectUpdateFunc, - timeout time.Duration, - versionedObj runtime.Object, - patcher rest.Patcher, - name string, - patchType types.PatchType, - patchJS []byte, - namer ScopeNamer, - creater runtime.ObjectCreater, - defaulter runtime.ObjectDefaulter, - unsafeConvertor runtime.ObjectConvertor, - kind schema.GroupVersionKind, - resource schema.GroupVersionResource, - codec runtime.Codec, - trace *utiltrace.Trace, -) (runtime.Object, error) { +// patcher breaks the process of patch application and retries into smaller +// pieces of functionality. +// TODO: Use builder pattern to construct this object? +// TODO: As part of that effort, some aspects of PatchResource above could be +// moved into this type. +type patcher struct { + // Pieces of RequestScope + namer ScopeNamer + creater runtime.ObjectCreater + defaulter runtime.ObjectDefaulter + unsafeConvertor runtime.ObjectConvertor + resource schema.GroupVersionResource + kind schema.GroupVersionKind - namespace := request.NamespaceValue(ctx) + // Validation functions + createValidation rest.ValidateObjectFunc + updateValidation rest.ValidateObjectUpdateFunc + admissionCheck mutateObjectUpdateFunc - var ( - originalObjJS []byte - originalPatchedObjJS []byte - originalObjMap map[string]interface{} - getOriginalPatchMap func() (map[string]interface{}, error) - lastConflictErr error - originalResourceVersion string - ) + codec runtime.Codec - // applyPatch is called every time GuaranteedUpdate asks for the updated object, - // and is given the currently persisted object as input. - applyPatch := func(_ context.Context, _, currentObject runtime.Object) (runtime.Object, error) { - // Make sure we actually have a persisted currentObject - trace.Step("About to apply patch") - if hasUID, err := hasUID(currentObject); err != nil { - return nil, err - } else if !hasUID { - return nil, errors.NewNotFound(resource.GroupResource(), name) - } + timeout time.Duration - currentResourceVersion := "" - if currentMetadata, err := meta.Accessor(currentObject); err == nil { - currentResourceVersion = currentMetadata.GetResourceVersion() - } + // Schema + schemaReferenceObj runtime.Object - switch { - case originalObjJS == nil && originalObjMap == nil: - // first time through, - // 1. apply the patch - // 2. save the original and patched to detect whether there were conflicting changes on retries + // Operation information + restPatcher rest.Patcher + name string + patchType types.PatchType + patchJS []byte - originalResourceVersion = currentResourceVersion - objToUpdate := patcher.New() + trace *utiltrace.Trace - // For performance reasons, in case of strategicpatch, we avoid json - // marshaling and unmarshaling and operate just on map[string]interface{}. - // In case of other patch types, we still have to operate on JSON - // representations. - switch patchType { - case types.JSONPatchType, types.MergePatchType: - originalJS, patchedJS, err := patchObjectJSON(patchType, codec, currentObject, patchJS, objToUpdate, versionedObj) - if err != nil { - return nil, interpretPatchError(err) - } - originalObjJS, originalPatchedObjJS = originalJS, patchedJS + // Set at invocation-time (by applyPatch) and immutable thereafter + namespace string + updatedObjectInfo rest.UpdatedObjectInfo + mechanism patchMechanism - // Make a getter that can return a fresh strategic patch map if needed for conflict retries - // We have to rebuild it each time we need it, because the map gets mutated when being applied - var originalPatchBytes []byte - getOriginalPatchMap = func() (map[string]interface{}, error) { - if originalPatchBytes == nil { - // Compute once - originalPatchBytes, err = strategicpatch.CreateTwoWayMergePatch(originalObjJS, originalPatchedObjJS, versionedObj) - if err != nil { - return nil, interpretPatchError(err) - } - } - // Return a fresh map every time - originalPatchMap := make(map[string]interface{}) - if err := json.Unmarshal(originalPatchBytes, &originalPatchMap); err != nil { - return nil, errors.NewBadRequest(err.Error()) - } - return originalPatchMap, nil - } + // Set on first iteration, currently only used to construct error messages + originalResourceVersion string - case types.StrategicMergePatchType: - // Since the patch is applied on versioned objects, we need to convert the - // current object to versioned representation first. - currentVersionedObject, err := unsafeConvertor.ConvertToVersion(currentObject, kind.GroupVersion()) - if err != nil { - return nil, err - } - versionedObjToUpdate, err := creater.New(kind) - if err != nil { - return nil, err - } - // Capture the original object map and patch for possible retries. - originalMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(currentVersionedObject) - if err != nil { - return nil, err - } - if err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj); err != nil { - return nil, err - } - // Convert the object back to unversioned. - gvk := kind.GroupKind().WithVersion(runtime.APIVersionInternal) - unversionedObjToUpdate, err := unsafeConvertor.ConvertToVersion(versionedObjToUpdate, gvk.GroupVersion()) - if err != nil { - return nil, err - } - objToUpdate = unversionedObjToUpdate - // Store unstructured representation for possible retries. - originalObjMap = originalMap - // Make a getter that can return a fresh strategic patch map if needed for conflict retries - // We have to rebuild it each time we need it, because the map gets mutated when being applied - getOriginalPatchMap = func() (map[string]interface{}, error) { - patchMap := make(map[string]interface{}) - if err := json.Unmarshal(patchJS, &patchMap); err != nil { - return nil, errors.NewBadRequest(err.Error()) - } - return patchMap, nil - } - } - if err := checkName(objToUpdate, name, namespace, namer); err != nil { - return nil, err - } - return objToUpdate, nil - - default: - // on a conflict, - // 1. build a strategic merge patch from originalJS and the patchedJS. Different patch types can - // be specified, but a strategic merge patch should be expressive enough handle them. Build the - // patch with this type to handle those cases. - // 2. build a strategic merge patch from originalJS and the currentJS - // 3. ensure no conflicts between the two patches - // 4. apply the #1 patch to the currentJS object - - // Since the patch is applied on versioned objects, we need to convert the - // current object to versioned representation first. - currentVersionedObject, err := unsafeConvertor.ConvertToVersion(currentObject, kind.GroupVersion()) - if err != nil { - return nil, err - } - currentObjMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(currentVersionedObject) - if err != nil { - return nil, err - } - - var currentPatchMap map[string]interface{} - if originalObjMap != nil { - var err error - currentPatchMap, err = strategicpatch.CreateTwoWayMergeMapPatch(originalObjMap, currentObjMap, versionedObj) - if err != nil { - return nil, interpretPatchError(err) - } - } else { - // Compute current patch. - currentObjJS, err := runtime.Encode(codec, currentObject) - if err != nil { - return nil, err - } - currentPatch, err := strategicpatch.CreateTwoWayMergePatch(originalObjJS, currentObjJS, versionedObj) - if err != nil { - return nil, interpretPatchError(err) - } - currentPatchMap = make(map[string]interface{}) - if err := json.Unmarshal(currentPatch, ¤tPatchMap); err != nil { - return nil, errors.NewBadRequest(err.Error()) - } - } - - // Get a fresh copy of the original strategic patch each time through, since applying it mutates the map - originalPatchMap, err := getOriginalPatchMap() - if err != nil { - return nil, err - } - - patchMetaFromStruct, err := strategicpatch.NewPatchMetaFromStruct(versionedObj) - if err != nil { - return nil, err - } - hasConflicts, err := strategicpatch.MergingMapsHaveConflicts(originalPatchMap, currentPatchMap, patchMetaFromStruct) - if err != nil { - return nil, err - } - - if hasConflicts { - diff1, _ := json.Marshal(currentPatchMap) - diff2, _ := json.Marshal(originalPatchMap) - patchDiffErr := fmt.Errorf("there is a meaningful conflict (firstResourceVersion: %q, currentResourceVersion: %q):\n diff1=%v\n, diff2=%v\n", originalResourceVersion, currentResourceVersion, string(diff1), string(diff2)) - glog.V(4).Infof("patchResource failed for resource %s, because there is a meaningful conflict(firstResourceVersion: %q, currentResourceVersion: %q):\n diff1=%v\n, diff2=%v\n", name, originalResourceVersion, currentResourceVersion, string(diff1), string(diff2)) - - // Return the last conflict error we got if we have one - if lastConflictErr != nil { - return nil, lastConflictErr - } - // Otherwise manufacture one of our own - return nil, errors.NewConflict(resource.GroupResource(), name, patchDiffErr) - } - - versionedObjToUpdate, err := creater.New(kind) - if err != nil { - return nil, err - } - if err := applyPatchToObject(codec, defaulter, currentObjMap, originalPatchMap, versionedObjToUpdate, versionedObj); err != nil { - return nil, err - } - // Convert the object back to unversioned. - gvk := kind.GroupKind().WithVersion(runtime.APIVersionInternal) - objToUpdate, err := unsafeConvertor.ConvertToVersion(versionedObjToUpdate, gvk.GroupVersion()) - if err != nil { - return nil, err - } - - return objToUpdate, nil - } - } - - // applyAdmission is called every time GuaranteedUpdate asks for the updated object, - // and is given the currently persisted object and the patched object as input. - applyAdmission := func(ctx context.Context, patchedObject runtime.Object, currentObject runtime.Object) (runtime.Object, error) { - trace.Step("About to check admission control") - return patchedObject, updateMutation(patchedObject, currentObject) - } - updatedObjectInfo := rest.DefaultUpdatedObjectInfo(nil, applyPatch, applyAdmission) - - return finishRequest(timeout, func() (runtime.Object, error) { - updateObject, _, updateErr := patcher.Update(ctx, name, updatedObjectInfo, createValidation, updateValidation) - for i := 0; i < MaxRetryWhenPatchConflicts && (errors.IsConflict(updateErr)); i++ { - lastConflictErr = updateErr - updateObject, _, updateErr = patcher.Update(ctx, name, updatedObjectInfo, createValidation, updateValidation) - } - return updateObject, updateErr - }) + // Modified each iteration + iterationCount int + lastConflictErr error } -// patchObjectJSON patches the with and stores -// the result in . -// Currently it also returns the original and patched objects serialized to -// JSONs (this may not be needed once we can apply patches at the -// map[string]interface{} level). -func patchObjectJSON( - patchType types.PatchType, - codec runtime.Codec, - originalObject runtime.Object, - patchJS []byte, - objToUpdate runtime.Object, - versionedObj runtime.Object, -) (originalObjJS []byte, patchedObjJS []byte, retErr error) { - js, err := runtime.Encode(codec, originalObject) - if err != nil { - return nil, nil, err - } - originalObjJS = js +func (p *patcher) toUnversioned(versionedObj runtime.Object) (runtime.Object, error) { + gvk := p.kind.GroupKind().WithVersion(runtime.APIVersionInternal) + return p.unsafeConvertor.ConvertToVersion(versionedObj, gvk.GroupVersion()) +} - switch patchType { +type patchMechanism interface { + firstPatchAttempt(currentObject runtime.Object, currentResourceVersion string) (runtime.Object, error) + subsequentPatchAttempt(currentObject runtime.Object, currentResourceVersion string) (runtime.Object, error) +} + +type jsonPatcher struct { + *patcher + + // set by firstPatchAttempt + originalObjJS []byte + originalPatchedObjJS []byte + + // State for originalStrategicMergePatch + originalPatchBytes []byte +} + +func (p *jsonPatcher) firstPatchAttempt(currentObject runtime.Object, currentResourceVersion string) (runtime.Object, error) { + // Encode will convert & return a versioned object in JSON. + // Store this JS for future use. + originalObjJS, err := runtime.Encode(p.codec, currentObject) + if err != nil { + return nil, err + } + + // Apply the patch. Store patched result for future use. + originalPatchedObjJS, err := p.applyJSPatch(originalObjJS) + if err != nil { + return nil, interpretPatchError(err) + } + + // Since both succeeded, store the results. (This shouldn't be + // necessary since neither of the above items can return conflict + // errors, but it also doesn't hurt.) + p.originalObjJS = originalObjJS + p.originalPatchedObjJS = originalPatchedObjJS + + // Construct the resulting typed, unversioned object. + objToUpdate := p.restPatcher.New() + if err := runtime.DecodeInto(p.codec, p.originalPatchedObjJS, objToUpdate); err != nil { + return nil, err + } + + return objToUpdate, nil +} + +// patchJS applies the patch. Input and output objects must both have +// the external version, since that is what the patch must have been constructed against. +func (p *jsonPatcher) applyJSPatch(versionedJS []byte) (patchedJS []byte, retErr error) { + switch p.patchType { case types.JSONPatchType: - patchObj, err := jsonpatch.DecodePatch(patchJS) + patchObj, err := jsonpatch.DecodePatch(p.patchJS) if err != nil { - return nil, nil, err - } - if patchedObjJS, err = patchObj.Apply(originalObjJS); err != nil { - return nil, nil, err + return nil, err } + return patchObj.Apply(versionedJS) case types.MergePatchType: - if patchedObjJS, err = jsonpatch.MergePatch(originalObjJS, patchJS); err != nil { - return nil, nil, err - } - case types.StrategicMergePatchType: - if patchedObjJS, err = strategicpatch.StrategicMergePatch(originalObjJS, patchJS, versionedObj); err != nil { - return nil, nil, err - } + return jsonpatch.MergePatch(versionedJS, p.patchJS) default: // only here as a safety net - go-restful filters content-type - return nil, nil, fmt.Errorf("unknown Content-Type header for patch: %v", patchType) + return nil, fmt.Errorf("unknown Content-Type header for patch: %v", p.patchType) } - if err := runtime.DecodeInto(codec, patchedObjJS, objToUpdate); err != nil { - return nil, nil, err +} + +func (p *jsonPatcher) subsequentPatchAttempt(currentObject runtime.Object, currentResourceVersion string) (runtime.Object, error) { + if len(p.originalObjJS) == 0 || len(p.originalPatchedObjJS) == 0 { + return nil, errors.NewInternalError(fmt.Errorf("unexpected error on patch retry: this indicates a bug in the server; remaking the patch against the most recent version of the object might succeed")) } - return + return subsequentPatchLogic(p.patcher, p, currentObject, currentResourceVersion) +} + +// Return a fresh strategic patch map if needed for conflict retries. We have +// to rebuild it each time we need it, because the map gets mutated when being +// applied. +func (p *jsonPatcher) originalStrategicMergePatch() (map[string]interface{}, error) { + if p.originalPatchBytes == nil { + // Compute once. Compute here instead of in the first patch + // attempt because this isn't needed unless there's actually a + // conflict. + var err error + p.originalPatchBytes, err = strategicpatch.CreateTwoWayMergePatch(p.originalObjJS, p.originalPatchedObjJS, p.schemaReferenceObj) + if err != nil { + return nil, interpretPatchError(err) + } + } + + // Return a fresh map every time + originalPatchMap := make(map[string]interface{}) + if err := json.Unmarshal(p.originalPatchBytes, &originalPatchMap); err != nil { + return nil, errors.NewBadRequest(err.Error()) + } + return originalPatchMap, nil +} + +// TODO: this will totally fail for CR types today, as no schema is available. +// The interface should be changed. +func (p *jsonPatcher) computeStrategicMergePatch(currentObject runtime.Object, _ map[string]interface{}) (map[string]interface{}, error) { + // Compute current patch. + currentObjJS, err := runtime.Encode(p.codec, currentObject) + if err != nil { + return nil, err + } + currentPatch, err := strategicpatch.CreateTwoWayMergePatch(p.originalObjJS, currentObjJS, p.schemaReferenceObj) + if err != nil { + return nil, interpretPatchError(err) + } + currentPatchMap := make(map[string]interface{}) + if err := json.Unmarshal(currentPatch, ¤tPatchMap); err != nil { + return nil, errors.NewBadRequest(err.Error()) + } + + return currentPatchMap, nil +} + +type smpPatcher struct { + *patcher + originalObjMap map[string]interface{} +} + +func (p *smpPatcher) firstPatchAttempt(currentObject runtime.Object, currentResourceVersion string) (runtime.Object, error) { + // first time through, + // 1. apply the patch + // 2. save the original and patched to detect whether there were conflicting changes on retries + + // For performance reasons, in case of strategicpatch, we avoid json + // marshaling and unmarshaling and operate just on map[string]interface{}. + // In case of other patch types, we still have to operate on JSON + // representations. + + // Since the patch is applied on versioned objects, we need to convert the + // current object to versioned representation first. + currentVersionedObject, err := p.unsafeConvertor.ConvertToVersion(currentObject, p.kind.GroupVersion()) + if err != nil { + return nil, err + } + versionedObjToUpdate, err := p.creater.New(p.kind) + if err != nil { + return nil, err + } + // Capture the original object map and patch for possible retries. + originalObjMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(currentVersionedObject) + if err != nil { + return nil, err + } + // Store only after success. (This shouldn't be necessary since neither + // of the above items can return conflict errors, but it also doesn't + // hurt.) + p.originalObjMap = originalObjMap + if err := strategicPatchObject(p.codec, p.defaulter, currentVersionedObject, p.patchJS, versionedObjToUpdate, p.schemaReferenceObj); err != nil { + return nil, err + } + // Convert the object back to unversioned (aka internal version). + unversionedObjToUpdate, err := p.toUnversioned(versionedObjToUpdate) + if err != nil { + return nil, err + } + + return unversionedObjToUpdate, nil } // strategicPatchObject applies a strategic merge patch of to @@ -447,7 +394,7 @@ func strategicPatchObject( originalObject runtime.Object, patchJS []byte, objToUpdate runtime.Object, - versionedObj runtime.Object, + schemaReferenceObj runtime.Object, ) error { originalObjMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(originalObject) if err != nil { @@ -459,12 +406,189 @@ func strategicPatchObject( return errors.NewBadRequest(err.Error()) } - if err := applyPatchToObject(codec, defaulter, originalObjMap, patchMap, objToUpdate, versionedObj); err != nil { + if err := applyPatchToObject(codec, defaulter, originalObjMap, patchMap, objToUpdate, schemaReferenceObj); err != nil { return err } return nil } +func (p *smpPatcher) subsequentPatchAttempt(currentObject runtime.Object, currentResourceVersion string) (runtime.Object, error) { + if p.originalObjMap == nil { + return nil, errors.NewInternalError(fmt.Errorf("unexpected error on (SMP) patch retry: this indicates a bug in the server; remaking the patch against the most recent version of the object might succeed")) + } + return subsequentPatchLogic(p.patcher, p, currentObject, currentResourceVersion) +} + +// Return a fresh strategic patch map if needed for conflict retries. We have +// to rebuild it each time we need it, because the map gets mutated when being +// applied. +func (p *smpPatcher) originalStrategicMergePatch() (map[string]interface{}, error) { + patchMap := make(map[string]interface{}) + if err := json.Unmarshal(p.patchJS, &patchMap); err != nil { + return nil, errors.NewBadRequest(err.Error()) + } + return patchMap, nil +} + +func (p *smpPatcher) computeStrategicMergePatch(_ runtime.Object, currentObjMap map[string]interface{}) (map[string]interface{}, error) { + o, err := strategicpatch.CreateTwoWayMergeMapPatch(p.originalObjMap, currentObjMap, p.schemaReferenceObj) + if err != nil { + return nil, interpretPatchError(err) + } + return o, nil +} + +// patchSource lets you get two SMPs, an original and a current. These can be +// compared for conflicts. +// +// TODO: Instead of computing two 2-way merges and comparing them for +// conflicts, we could do one 3-way merge, which can detect the same +// conflicts. This would likely be more readable and more efficient, +// and should be logically exactly the same operation. +// +// TODO: Currently, the user gets this behavior whether or not they +// specified a RV. I believe we can stop doing this if the user did not +// specify an RV, and that would not be a breaking change. +// +type patchSource interface { + // originalStrategicMergePatch must reconstruct this map each time, + // because it is consumed when it is used. + originalStrategicMergePatch() (map[string]interface{}, error) + computeStrategicMergePatch(unversionedObject runtime.Object, currentVersionedObjMap map[string]interface{}) (map[string]interface{}, error) +} + +func subsequentPatchLogic(p *patcher, ps patchSource, currentObject runtime.Object, currentResourceVersion string) (runtime.Object, error) { + // on a conflict (which is the only reason to have more than one attempt), + // 1. build a strategic merge patch from originalJS and the patchedJS. Different patch types can + // be specified, but a strategic merge patch should be expressive enough handle them. Build the + // patch with this type to handle those cases. + // 2. build a strategic merge patch from originalJS and the currentJS + // 3. ensure no conflicts between the two patches + // 4. apply the #1 patch to the currentJS object + + // Since the patch is applied on versioned objects, we need to convert the + // current object to versioned representation first. + currentVersionedObject, err := p.unsafeConvertor.ConvertToVersion(currentObject, p.kind.GroupVersion()) + if err != nil { + return nil, err + } + currentObjMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(currentVersionedObject) + if err != nil { + return nil, err + } + + currentPatchMap, err := ps.computeStrategicMergePatch(currentObject, currentObjMap) + if err != nil { + return nil, err + } + + // Get a fresh copy of the original strategic patch each time through, since applying it mutates the map + originalPatchMap, err := ps.originalStrategicMergePatch() + if err != nil { + return nil, err + } + + patchMetaFromStruct, err := strategicpatch.NewPatchMetaFromStruct(p.schemaReferenceObj) + if err != nil { + return nil, err + } + hasConflicts, err := strategicpatch.MergingMapsHaveConflicts(originalPatchMap, currentPatchMap, patchMetaFromStruct) + if err != nil { + return nil, err + } + + if hasConflicts { + diff1, _ := json.Marshal(currentPatchMap) + diff2, _ := json.Marshal(originalPatchMap) + patchDiffErr := fmt.Errorf("there is a meaningful conflict (firstResourceVersion: %q, currentResourceVersion: %q):\n diff1=%v\n, diff2=%v\n", p.originalResourceVersion, currentResourceVersion, string(diff1), string(diff2)) + glog.V(4).Infof("patchResource failed for resource %s, because there is a meaningful conflict(firstResourceVersion: %q, currentResourceVersion: %q):\n diff1=%v\n, diff2=%v\n", p.name, p.originalResourceVersion, currentResourceVersion, string(diff1), string(diff2)) + + // Return the last conflict error we got if we have one + if p.lastConflictErr != nil { + return nil, p.lastConflictErr + } + // Otherwise manufacture one of our own + return nil, errors.NewConflict(p.resource.GroupResource(), p.name, patchDiffErr) + } + + versionedObjToUpdate, err := p.creater.New(p.kind) + if err != nil { + return nil, err + } + if err := applyPatchToObject(p.codec, p.defaulter, currentObjMap, originalPatchMap, versionedObjToUpdate, p.schemaReferenceObj); err != nil { + return nil, err + } + // Convert the object back to unversioned. + return p.toUnversioned(versionedObjToUpdate) +} + +// applyPatch is called every time GuaranteedUpdate asks for the updated object, +// and is given the currently persisted object as input. +func (p *patcher) applyPatch(_ context.Context, _, currentObject runtime.Object) (runtime.Object, error) { + // Make sure we actually have a persisted currentObject + p.trace.Step("About to apply patch") + if hasUID, err := hasUID(currentObject); err != nil { + return nil, err + } else if !hasUID { + return nil, errors.NewNotFound(p.resource.GroupResource(), p.name) + } + + currentResourceVersion := "" + if currentMetadata, err := meta.Accessor(currentObject); err == nil { + currentResourceVersion = currentMetadata.GetResourceVersion() + } + + p.iterationCount++ + + if p.iterationCount == 1 { + p.originalResourceVersion = currentResourceVersion + objToUpdate, err := p.mechanism.firstPatchAttempt(currentObject, currentResourceVersion) + if err != nil { + return nil, err + } + if err := checkName(objToUpdate, p.name, p.namespace, p.namer); err != nil { + return nil, err + } + return objToUpdate, nil + } + return p.mechanism.subsequentPatchAttempt(currentObject, currentResourceVersion) +} + +// applyAdmission is called every time GuaranteedUpdate asks for the updated object, +// and is given the currently persisted object and the patched object as input. +func (p *patcher) applyAdmission(ctx context.Context, patchedObject runtime.Object, currentObject runtime.Object) (runtime.Object, error) { + p.trace.Step("About to check admission control") + return patchedObject, p.admissionCheck(patchedObject, currentObject) +} + +func (p *patcher) requestLoop(ctx context.Context) func() (runtime.Object, error) { + // return a function to catch ctx in a closure, until finishRequest + // starts handling the context. + return func() (runtime.Object, error) { + updateObject, _, updateErr := p.restPatcher.Update(ctx, p.name, p.updatedObjectInfo, p.createValidation, p.updateValidation) + for i := 0; i < MaxRetryWhenPatchConflicts && (errors.IsConflict(updateErr)); i++ { + p.lastConflictErr = updateErr + updateObject, _, updateErr = p.restPatcher.Update(ctx, p.name, p.updatedObjectInfo, p.createValidation, p.updateValidation) + } + return updateObject, updateErr + } +} + +// patchResource divides PatchResource for easier unit testing +func (p *patcher) patchResource(ctx context.Context) (runtime.Object, error) { + p.namespace = request.NamespaceValue(ctx) + switch p.patchType { + case types.JSONPatchType, types.MergePatchType: + p.mechanism = &jsonPatcher{patcher: p} + case types.StrategicMergePatchType: + p.mechanism = &smpPatcher{patcher: p} + default: + return nil, fmt.Errorf("%v: unimplemented patch type", p.patchType) + } + p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission) + return finishRequest(p.timeout, p.requestLoop(ctx)) +} + // applyPatchToObject applies a strategic merge patch of to // and stores the result in . // NOTE: must be a versioned object. @@ -474,9 +598,9 @@ func applyPatchToObject( originalMap map[string]interface{}, patchMap map[string]interface{}, objToUpdate runtime.Object, - versionedObj runtime.Object, + schemaReferenceObj runtime.Object, ) error { - patchedObjMap, err := strategicpatch.StrategicMergeMapPatch(originalMap, patchMap, versionedObj) + patchedObjMap, err := strategicpatch.StrategicMergeMapPatch(originalMap, patchMap, schemaReferenceObj) if err != nil { return interpretPatchError(err) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go index 1db9b740480..64df8ebef7d 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go @@ -293,7 +293,7 @@ func (tc *patchTestCase) Run(t *testing.T) { convertor := runtime.UnsafeObjectConvertor(scheme) kind := examplev1.SchemeGroupVersion.WithKind("Pod") resource := examplev1.SchemeGroupVersion.WithResource("pods") - versionedObj := &examplev1.Pod{} + schemaReferenceObj := &examplev1.Pod{} for _, patchType := range []types.PatchType{types.JSONPatchType, types.MergePatchType, types.StrategicMergePatchType} { // This needs to be reset on each iteration. @@ -323,7 +323,7 @@ func (tc *patchTestCase) Run(t *testing.T) { patch := []byte{} switch patchType { case types.StrategicMergePatchType: - patch, err = strategicpatch.CreateTwoWayMergePatch(originalObjJS, changedJS, versionedObj) + patch, err = strategicpatch.CreateTwoWayMergePatch(originalObjJS, changedJS, schemaReferenceObj) if err != nil { t.Errorf("%s: unexpected error: %v", tc.name, err) continue @@ -338,18 +338,33 @@ func (tc *patchTestCase) Run(t *testing.T) { } - resultObj, err := patchResource( - ctx, - admissionMutation, - rest.ValidateAllObjectFunc, - admissionValidation, - 1*time.Second, - versionedObj, - testPatcher, - name, - patchType, - patch, - namer, creater, defaulter, convertor, kind, resource, codec, utiltrace.New("Patch"+name)) + p := patcher{ + namer: namer, + creater: creater, + defaulter: defaulter, + unsafeConvertor: convertor, + kind: kind, + resource: resource, + + createValidation: rest.ValidateAllObjectFunc, + updateValidation: admissionValidation, + admissionCheck: admissionMutation, + + codec: codec, + + timeout: 1 * time.Second, + + schemaReferenceObj: schemaReferenceObj, + + restPatcher: testPatcher, + name: name, + patchType: patchType, + patchJS: patch, + + trace: utiltrace.New("Patch" + name), + } + + resultObj, err := p.patchResource(ctx) if len(tc.expectedError) != 0 { if err == nil || err.Error() != tc.expectedError { t.Errorf("%s: expected error %v, but got %v", tc.name, tc.expectedError, err) @@ -407,11 +422,11 @@ func TestNumberConversion(t *testing.T) { }, } versionedObjToUpdate := &examplev1.Pod{} - versionedObj := &examplev1.Pod{} + schemaReferenceObj := &examplev1.Pod{} patchJS := []byte(`{"spec":{"terminationGracePeriodSeconds":42,"activeDeadlineSeconds":120}}`) - err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj) + err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, schemaReferenceObj) if err != nil { t.Fatal(err) } @@ -538,6 +553,8 @@ func TestPatchResourceWithConflict(t *testing.T) { expectedError: `Operation cannot be fulfilled on pods.example.apiserver.k8s.io "foo": existing 2, new 1`, } + // See issue #63104 for discussion of how much sense this makes. + tc.startingPod.Name = name tc.startingPod.Namespace = namespace tc.startingPod.UID = uid