mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-04 23:17:50 +00:00
Change rest storage Update interface to retrieve updated object
Add OldObject to admission attributes Update resthandler Patch/Update admission plumbing
This commit is contained in:
@@ -77,6 +77,7 @@ type RequestScope struct {
|
||||
|
||||
Creater runtime.ObjectCreater
|
||||
Convertor runtime.ObjectConvertor
|
||||
Copier runtime.ObjectCopier
|
||||
|
||||
Resource unversioned.GroupVersionResource
|
||||
Kind unversioned.GroupVersionKind
|
||||
@@ -200,7 +201,7 @@ func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admissi
|
||||
}
|
||||
userInfo, _ := api.UserFrom(ctx)
|
||||
|
||||
err = admit.Admit(admission.NewAttributesRecord(connectRequest, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Connect, userInfo))
|
||||
err = admit.Admit(admission.NewAttributesRecord(connectRequest, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Connect, userInfo))
|
||||
if err != nil {
|
||||
scope.err(err, res.ResponseWriter, req.Request)
|
||||
return
|
||||
@@ -391,7 +392,7 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object
|
||||
if admit != nil && admit.Handles(admission.Create) {
|
||||
userInfo, _ := api.UserFrom(ctx)
|
||||
|
||||
err = admit.Admit(admission.NewAttributesRecord(obj, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, userInfo))
|
||||
err = admit.Admit(admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, userInfo))
|
||||
if err != nil {
|
||||
scope.err(err, res.ResponseWriter, req.Request)
|
||||
return
|
||||
@@ -491,16 +492,16 @@ func PatchResource(r rest.Patcher, scope RequestScope, typer runtime.ObjectTyper
|
||||
scope.Serializer.DecoderToVersion(s, unversioned.GroupVersion{Group: gv.Group, Version: runtime.APIVersionInternal}),
|
||||
)
|
||||
|
||||
updateAdmit := func(updatedObject runtime.Object) error {
|
||||
updateAdmit := func(updatedObject runtime.Object, currentObject runtime.Object) error {
|
||||
if admit != nil && admit.Handles(admission.Update) {
|
||||
userInfo, _ := api.UserFrom(ctx)
|
||||
return admit.Admit(admission.NewAttributesRecord(updatedObject, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, userInfo))
|
||||
return admit.Admit(admission.NewAttributesRecord(updatedObject, currentObject, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, userInfo))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
result, err := patchResource(ctx, updateAdmit, timeout, versionedObj, r, name, patchType, patchJS, scope.Namer, codec)
|
||||
result, err := patchResource(ctx, updateAdmit, timeout, versionedObj, r, name, patchType, patchJS, scope.Namer, scope.Copier, scope.Resource, codec)
|
||||
if err != nil {
|
||||
scope.err(err, res.ResponseWriter, req.Request)
|
||||
return
|
||||
@@ -516,43 +517,69 @@ func PatchResource(r rest.Patcher, scope RequestScope, typer runtime.ObjectTyper
|
||||
|
||||
}
|
||||
|
||||
type updateAdmissionFunc func(updatedObject runtime.Object) error
|
||||
type updateAdmissionFunc func(updatedObject runtime.Object, currentObject runtime.Object) error
|
||||
|
||||
// patchResource divides PatchResource for easier unit testing
|
||||
func patchResource(ctx api.Context, admit updateAdmissionFunc, timeout time.Duration, versionedObj runtime.Object, patcher rest.Patcher, name string, patchType api.PatchType, patchJS []byte, namer ScopeNamer, codec runtime.Codec) (runtime.Object, error) {
|
||||
func patchResource(
|
||||
ctx api.Context,
|
||||
admit updateAdmissionFunc,
|
||||
timeout time.Duration,
|
||||
versionedObj runtime.Object,
|
||||
patcher rest.Patcher,
|
||||
name string,
|
||||
patchType api.PatchType,
|
||||
patchJS []byte,
|
||||
namer ScopeNamer,
|
||||
copier runtime.ObjectCopier,
|
||||
resource unversioned.GroupVersionResource,
|
||||
codec runtime.Codec,
|
||||
) (runtime.Object, error) {
|
||||
|
||||
namespace := api.NamespaceValue(ctx)
|
||||
|
||||
original, err := patcher.Get(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var (
|
||||
originalObjJS []byte
|
||||
originalPatchedObjJS []byte
|
||||
lastConflictErr error
|
||||
)
|
||||
|
||||
originalObjJS, err := runtime.Encode(codec, original)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
originalPatchedObjJS, err := getPatchedJS(patchType, originalObjJS, patchJS, versionedObj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objToUpdate := patcher.New()
|
||||
if err := runtime.DecodeInto(codec, originalPatchedObjJS, objToUpdate); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := checkName(objToUpdate, name, namespace, namer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return finishRequest(timeout, func() (runtime.Object, error) {
|
||||
if err := admit(objToUpdate); err != nil {
|
||||
// applyPatch is called every time GuaranteedUpdate asks for the updated object,
|
||||
// and is given the currently persisted object as input.
|
||||
applyPatch := func(_ api.Context, _, currentObject runtime.Object) (runtime.Object, error) {
|
||||
// Make sure we actually have a persisted currentObject
|
||||
if hasUID, err := hasUID(currentObject); err != nil {
|
||||
return nil, err
|
||||
} else if !hasUID {
|
||||
return nil, errors.NewNotFound(resource.GroupResource(), name)
|
||||
}
|
||||
|
||||
// update should never create as previous get would fail
|
||||
updateObject, _, updateErr := patcher.Update(ctx, objToUpdate)
|
||||
for i := 0; i < MaxPatchConflicts && (errors.IsConflict(updateErr)); i++ {
|
||||
switch {
|
||||
case len(originalObjJS) == 0 || len(originalPatchedObjJS) == 0:
|
||||
// first time through,
|
||||
// 1. apply the patch
|
||||
// 2. save the originalJS and patchedJS to detect whether there were conflicting changes on retries
|
||||
if js, err := runtime.Encode(codec, currentObject); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
originalObjJS = js
|
||||
}
|
||||
|
||||
if js, err := getPatchedJS(patchType, originalObjJS, patchJS, versionedObj); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
originalPatchedObjJS = js
|
||||
}
|
||||
|
||||
objToUpdate := patcher.New()
|
||||
if err := runtime.DecodeInto(codec, originalPatchedObjJS, objToUpdate); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := checkName(objToUpdate, name, namespace, namer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return objToUpdate, nil
|
||||
|
||||
default:
|
||||
// on a conflict,
|
||||
// 1. build a strategic merge patch from originalJS and the patchedJS. Different patch types can
|
||||
// be specified, but a strategic merge patch should be expressive enough handle them. Build the
|
||||
@@ -560,16 +587,10 @@ func patchResource(ctx api.Context, admit updateAdmissionFunc, timeout time.Dura
|
||||
// 2. build a strategic merge patch from originalJS and the currentJS
|
||||
// 3. ensure no conflicts between the two patches
|
||||
// 4. apply the #1 patch to the currentJS object
|
||||
// 5. retry the update
|
||||
currentObject, err := patcher.Get(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
currentObjectJS, err := runtime.Encode(codec, currentObject)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
currentPatch, err := strategicpatch.CreateStrategicMergePatch(originalObjJS, currentObjectJS, versionedObj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -592,25 +613,41 @@ func patchResource(ctx api.Context, admit updateAdmissionFunc, timeout time.Dura
|
||||
return nil, err
|
||||
}
|
||||
if hasConflicts {
|
||||
glog.V(4).Infof("patchResource failed for resource %s, becauase there is a meaningful conflict.\n diff1=%v\n, diff2=%v\n", name, diff1, diff2)
|
||||
return updateObject, updateErr
|
||||
glog.V(4).Infof("patchResource failed for resource %s, because there is a meaningful conflict.\n diff1=%v\n, diff2=%v\n", name, diff1, diff2)
|
||||
// Return the last conflict error we got if we have one
|
||||
if lastConflictErr != nil {
|
||||
return nil, lastConflictErr
|
||||
}
|
||||
// Otherwise manufacture one of our own
|
||||
return nil, errors.NewConflict(resource.GroupResource(), name, nil)
|
||||
}
|
||||
|
||||
newlyPatchedObjJS, err := getPatchedJS(api.StrategicMergePatchType, currentObjectJS, originalPatch, versionedObj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
objToUpdate := patcher.New()
|
||||
if err := runtime.DecodeInto(codec, newlyPatchedObjJS, objToUpdate); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := admit(objToUpdate); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
updateObject, _, updateErr = patcher.Update(ctx, objToUpdate)
|
||||
return objToUpdate, nil
|
||||
}
|
||||
}
|
||||
|
||||
// applyAdmission is called every time GuaranteedUpdate asks for the updated object,
|
||||
// and is given the currently persisted object and the patched object as input.
|
||||
applyAdmission := func(ctx api.Context, patchedObject runtime.Object, currentObject runtime.Object) (runtime.Object, error) {
|
||||
return patchedObject, admit(patchedObject, currentObject)
|
||||
}
|
||||
|
||||
updatedObjectInfo := rest.DefaultUpdatedObjectInfo(nil, copier, applyPatch, applyAdmission)
|
||||
|
||||
return finishRequest(timeout, func() (runtime.Object, error) {
|
||||
updateObject, _, updateErr := patcher.Update(ctx, name, updatedObjectInfo)
|
||||
for i := 0; i < MaxPatchConflicts && (errors.IsConflict(updateErr)); i++ {
|
||||
lastConflictErr = updateErr
|
||||
updateObject, _, updateErr = patcher.Update(ctx, name, updatedObjectInfo)
|
||||
}
|
||||
return updateObject, updateErr
|
||||
})
|
||||
}
|
||||
@@ -667,20 +704,18 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType
|
||||
return
|
||||
}
|
||||
|
||||
var transformers []rest.TransformFunc
|
||||
if admit != nil && admit.Handles(admission.Update) {
|
||||
userInfo, _ := api.UserFrom(ctx)
|
||||
|
||||
err = admit.Admit(admission.NewAttributesRecord(obj, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, userInfo))
|
||||
if err != nil {
|
||||
scope.err(err, res.ResponseWriter, req.Request)
|
||||
return
|
||||
}
|
||||
transformers = append(transformers, func(ctx api.Context, newObj, oldObj runtime.Object) (runtime.Object, error) {
|
||||
userInfo, _ := api.UserFrom(ctx)
|
||||
return newObj, admit.Admit(admission.NewAttributesRecord(newObj, oldObj, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, userInfo))
|
||||
})
|
||||
}
|
||||
|
||||
trace.Step("About to store object in database")
|
||||
wasCreated := false
|
||||
result, err := finishRequest(timeout, func() (runtime.Object, error) {
|
||||
obj, created, err := r.Update(ctx, obj)
|
||||
obj, created, err := r.Update(ctx, name, rest.DefaultUpdatedObjectInfo(obj, scope.Copier, transformers...))
|
||||
wasCreated = created
|
||||
return obj, err
|
||||
})
|
||||
@@ -753,7 +788,7 @@ func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope,
|
||||
if admit != nil && admit.Handles(admission.Delete) {
|
||||
userInfo, _ := api.UserFrom(ctx)
|
||||
|
||||
err = admit.Admit(admission.NewAttributesRecord(nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Delete, userInfo))
|
||||
err = admit.Admit(admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Delete, userInfo))
|
||||
if err != nil {
|
||||
scope.err(err, res.ResponseWriter, req.Request)
|
||||
return
|
||||
@@ -814,7 +849,7 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco
|
||||
if admit != nil && admit.Handles(admission.Delete) {
|
||||
userInfo, _ := api.UserFrom(ctx)
|
||||
|
||||
err = admit.Admit(admission.NewAttributesRecord(nil, scope.Kind, namespace, "", scope.Resource, scope.Subresource, admission.Delete, userInfo))
|
||||
err = admit.Admit(admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, "", scope.Resource, scope.Subresource, admission.Delete, userInfo))
|
||||
if err != nil {
|
||||
scope.err(err, res.ResponseWriter, req.Request)
|
||||
return
|
||||
@@ -969,6 +1004,20 @@ func setSelfLink(obj runtime.Object, req *restful.Request, namer ScopeNamer) err
|
||||
return namer.SetSelfLink(obj, newURL.String())
|
||||
}
|
||||
|
||||
func hasUID(obj runtime.Object) (bool, error) {
|
||||
if obj == nil {
|
||||
return false, nil
|
||||
}
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return false, errors.NewInternalError(err)
|
||||
}
|
||||
if len(accessor.GetUID()) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// checkName checks the provided name against the request
|
||||
func checkName(obj runtime.Object, name, namespace string, namer ScopeNamer) error {
|
||||
if objNamespace, objName, err := namer.ObjectName(obj); err == nil {
|
||||
|
||||
Reference in New Issue
Block a user