mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-05 07:27:21 +00:00
make patch handle conflicts gracefully
This commit is contained in:
@@ -17,10 +17,12 @@ limitations under the License.
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
gpath "path"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -83,6 +85,9 @@ type RequestScope struct {
|
||||
// may be used to deserialize an options object to pass to the getter.
|
||||
type getterFunc func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error)
|
||||
|
||||
// MaxPatchConflicts is the maximum number of conflicts retry for during a patch operation before returning failure
|
||||
const MaxPatchConflicts = 5
|
||||
|
||||
// getResourceHandler is an HTTP handler function for get requests. It delegates to the
|
||||
// passed-in getterFunc to perform the actual get.
|
||||
func getResourceHandler(scope RequestScope, getter getterFunc) restful.RouteFunction {
|
||||
@@ -392,49 +397,26 @@ func PatchResource(r rest.Patcher, scope RequestScope, typer runtime.ObjectTyper
|
||||
}
|
||||
}
|
||||
|
||||
versionedObj, err := converter.ConvertToVersion(obj, scope.APIVersion)
|
||||
versionedObj, err := converter.ConvertToVersion(r.New(), scope.APIVersion)
|
||||
if err != nil {
|
||||
errorJSON(err, scope.Codec, w)
|
||||
return
|
||||
}
|
||||
|
||||
original, err := r.Get(ctx, name)
|
||||
if err != nil {
|
||||
errorJSON(err, scope.Codec, w)
|
||||
return
|
||||
contentType := req.HeaderParameter("Content-Type")
|
||||
// Remove "; charset=" if included in header.
|
||||
if idx := strings.Index(contentType, ";"); idx > 0 {
|
||||
contentType = contentType[:idx]
|
||||
}
|
||||
patchType := api.PatchType(contentType)
|
||||
|
||||
originalObjJS, err := scope.Codec.Encode(original)
|
||||
if err != nil {
|
||||
errorJSON(err, scope.Codec, w)
|
||||
return
|
||||
}
|
||||
patchJS, err := readBody(req.Request)
|
||||
if err != nil {
|
||||
errorJSON(err, scope.Codec, w)
|
||||
return
|
||||
}
|
||||
contentType := req.HeaderParameter("Content-Type")
|
||||
patchedObjJS, err := getPatchedJS(contentType, originalObjJS, patchJS, versionedObj)
|
||||
if err != nil {
|
||||
errorJSON(err, scope.Codec, w)
|
||||
return
|
||||
}
|
||||
|
||||
if err := scope.Codec.DecodeInto(patchedObjJS, obj); err != nil {
|
||||
errorJSON(err, scope.Codec, w)
|
||||
return
|
||||
}
|
||||
if err := checkName(obj, name, namespace, scope.Namer); err != nil {
|
||||
errorJSON(err, scope.Codec, w)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := finishRequest(timeout, func() (runtime.Object, error) {
|
||||
// update should never create as previous get would fail
|
||||
obj, _, err := r.Update(ctx, obj)
|
||||
return obj, err
|
||||
})
|
||||
result, err := patchResource(ctx, timeout, versionedObj, r, name, patchType, patchJS, scope.Namer, scope.Codec)
|
||||
if err != nil {
|
||||
errorJSON(err, scope.Codec, w)
|
||||
return
|
||||
@@ -447,6 +429,130 @@ func PatchResource(r rest.Patcher, scope RequestScope, typer runtime.ObjectTyper
|
||||
|
||||
write(http.StatusOK, scope.APIVersion, scope.Codec, result, w, req.Request)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// patchResource divides PatchResource for easier unit testing
|
||||
func patchResource(ctx api.Context, timeout time.Duration, versionedObj runtime.Object, patcher rest.Patcher, name string, patchType api.PatchType, patchJS []byte, namer ScopeNamer, codec runtime.Codec) (runtime.Object, error) {
|
||||
namespace := api.NamespaceValue(ctx)
|
||||
|
||||
original, err := patcher.Get(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
originalObjJS, err := codec.Encode(original)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
originalPatchedObjJS, err := getPatchedJS(patchType, originalObjJS, patchJS, versionedObj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objToUpdate := patcher.New()
|
||||
if err := codec.DecodeInto(originalPatchedObjJS, objToUpdate); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := checkName(objToUpdate, name, namespace, namer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return finishRequest(timeout, func() (runtime.Object, error) {
|
||||
// update should never create as previous get would fail
|
||||
updateObject, _, updateErr := patcher.Update(ctx, objToUpdate)
|
||||
for i := 0; i < MaxPatchConflicts && (errors.IsConflict(updateErr)); i++ {
|
||||
|
||||
// on a conflict,
|
||||
// 1. build a strategic merge patch from originalJS and the patchedJS. Different patch types can
|
||||
// be specified, but a strategic merge patch should be expressive enough handle them. Build the
|
||||
// patch with this type to handle those cases.
|
||||
// 2. build a strategic merge patch from originalJS and the currentJS
|
||||
// 3. ensure no conflicts between the two patches
|
||||
// 4. apply the #1 patch to the currentJS object
|
||||
// 5. retry the update
|
||||
currentObject, err := patcher.Get(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
currentObjectJS, err := codec.Encode(currentObject)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
currentPatch, err := strategicpatch.CreateStrategicMergePatch(originalObjJS, currentObjectJS, patcher.New())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
originalPatch, err := strategicpatch.CreateStrategicMergePatch(originalObjJS, originalPatchedObjJS, patcher.New())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
diff1 := make(map[string]interface{})
|
||||
if err := json.Unmarshal(originalPatch, &diff1); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
diff2 := make(map[string]interface{})
|
||||
if err := json.Unmarshal(currentPatch, &diff2); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if hasConflicts(diff1, diff2) {
|
||||
return updateObject, updateErr
|
||||
}
|
||||
|
||||
newlyPatchedObjJS, err := getPatchedJS(api.StrategicMergePatchType, currentObjectJS, originalPatch, versionedObj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := codec.DecodeInto(newlyPatchedObjJS, objToUpdate); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
updateObject, _, updateErr = patcher.Update(ctx, objToUpdate)
|
||||
}
|
||||
|
||||
return updateObject, updateErr
|
||||
})
|
||||
}
|
||||
|
||||
// hasConflicts returns true if the left and right JSON interface objects overlap with
|
||||
// different values in any key. The code will panic if an unrecognized type is passed
|
||||
// (anything not returned by a JSON decode). All keys are required to be strings.
|
||||
func hasConflicts(left, right interface{}) bool {
|
||||
switch typedLeft := left.(type) {
|
||||
case map[string]interface{}:
|
||||
switch typedRight := right.(type) {
|
||||
case map[string]interface{}:
|
||||
for key, leftValue := range typedLeft {
|
||||
if rightValue, ok := typedRight[key]; ok && hasConflicts(leftValue, rightValue) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
}
|
||||
case []interface{}:
|
||||
switch typedRight := right.(type) {
|
||||
case []interface{}:
|
||||
if len(typedLeft) != len(typedRight) {
|
||||
return true
|
||||
}
|
||||
for i := range typedLeft {
|
||||
if hasConflicts(typedLeft[i], typedRight[i]) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
}
|
||||
case string, float64, bool, int, int64, nil:
|
||||
return !reflect.DeepEqual(left, right)
|
||||
default:
|
||||
panic(fmt.Sprintf("unknown type: %v", reflect.TypeOf(left)))
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateResource returns a function that will handle a resource update
|
||||
@@ -736,12 +842,7 @@ func setListSelfLink(obj runtime.Object, req *restful.Request, namer ScopeNamer)
|
||||
|
||||
}
|
||||
|
||||
func getPatchedJS(contentType string, originalJS, patchJS []byte, obj runtime.Object) ([]byte, error) {
|
||||
// Remove "; charset=" if included in header.
|
||||
if idx := strings.Index(contentType, ";"); idx > 0 {
|
||||
contentType = contentType[:idx]
|
||||
}
|
||||
patchType := api.PatchType(contentType)
|
||||
func getPatchedJS(patchType api.PatchType, originalJS, patchJS []byte, obj runtime.Object) ([]byte, error) {
|
||||
switch patchType {
|
||||
case api.JSONPatchType:
|
||||
patchObj, err := jsonpatch.DecodePatch(patchJS)
|
||||
@@ -755,6 +856,6 @@ func getPatchedJS(contentType string, originalJS, patchJS []byte, obj runtime.Ob
|
||||
return strategicpatch.StrategicMergePatchData(originalJS, patchJS, obj)
|
||||
default:
|
||||
// only here as a safety net - go-restful filters content-type
|
||||
return nil, fmt.Errorf("unknown Content-Type header for patch: %s", contentType)
|
||||
return nil, fmt.Errorf("unknown Content-Type header for patch: %v", patchType)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user