From dab2a830da2f6966946b7ab4292f224650c9d4ad Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Tue, 21 Apr 2020 15:05:07 -0400 Subject: [PATCH] Fix conflicts patching scale subresources --- .../apps/deployment/storage/storage.go | 96 ++++++++----- .../apps/deployment/storage/storage_test.go | 122 +++++++++++++++++ .../apps/replicaset/storage/storage.go | 99 +++++++++----- .../apps/replicaset/storage/storage_test.go | 128 +++++++++++++++++ pkg/registry/apps/statefulset/storage/BUILD | 1 + .../apps/statefulset/storage/storage.go | 96 ++++++++----- .../apps/statefulset/storage/storage_test.go | 129 ++++++++++++++++++ .../replicationcontroller/storage/storage.go | 91 +++++++----- .../storage/storage_test.go | 123 +++++++++++++++++ 9 files changed, 753 insertions(+), 132 deletions(-) diff --git a/pkg/registry/apps/deployment/storage/storage.go b/pkg/registry/apps/deployment/storage/storage.go index 45d5eb11bd4..c310d3b6bb1 100644 --- a/pkg/registry/apps/deployment/storage/storage.go +++ b/pkg/registry/apps/deployment/storage/storage.go @@ -279,39 +279,10 @@ func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOpt // Update alters scale subset of Deployment object. func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { - obj, err := r.store.Get(ctx, name, &metav1.GetOptions{}) - if err != nil { - return nil, false, errors.NewNotFound(apps.Resource("deployments/scale"), name) - } - deployment := obj.(*apps.Deployment) - - oldScale, err := scaleFromDeployment(deployment) - if err != nil { - return nil, false, err - } - - obj, err = objInfo.UpdatedObject(ctx, oldScale) - if err != nil { - return nil, false, err - } - if obj == nil { - return nil, false, errors.NewBadRequest(fmt.Sprintf("nil update passed to Scale")) - } - scale, ok := obj.(*autoscaling.Scale) - if !ok { - return nil, false, errors.NewBadRequest(fmt.Sprintf("expected input object type to be Scale, but %T", obj)) - } - - if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 { - return nil, false, errors.NewInvalid(autoscaling.Kind("Scale"), name, errs) - } - - deployment.Spec.Replicas = scale.Spec.Replicas - deployment.ResourceVersion = scale.ResourceVersion - obj, _, err = r.store.Update( + obj, _, err := r.store.Update( ctx, - deployment.Name, - rest.DefaultUpdatedObjectInfo(deployment), + name, + &scaleUpdatedObjectInfo{name, objInfo}, toScaleCreateValidation(createValidation), toScaleUpdateValidation(updateValidation), false, @@ -320,7 +291,7 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update if err != nil { return nil, false, err } - deployment = obj.(*apps.Deployment) + deployment := obj.(*apps.Deployment) newScale, err := scaleFromDeployment(deployment) if err != nil { return nil, false, errors.NewBadRequest(fmt.Sprintf("%v", err)) @@ -376,3 +347,62 @@ func scaleFromDeployment(deployment *apps.Deployment) (*autoscaling.Scale, error }, }, nil } + +// scaleUpdatedObjectInfo transforms existing deployment -> existing scale -> new scale -> new deployment +type scaleUpdatedObjectInfo struct { + name string + reqObjInfo rest.UpdatedObjectInfo +} + +func (i *scaleUpdatedObjectInfo) Preconditions() *metav1.Preconditions { + return i.reqObjInfo.Preconditions() +} + +func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) { + deployment, ok := oldObj.DeepCopyObject().(*apps.Deployment) + if !ok { + return nil, errors.NewBadRequest(fmt.Sprintf("expected existing object type to be Deployment, got %T", deployment)) + } + // if zero-value, the existing object does not exist + if len(deployment.ResourceVersion) == 0 { + return nil, errors.NewNotFound(apps.Resource("deployments/scale"), i.name) + } + + // deployment -> old scale + oldScale, err := scaleFromDeployment(deployment) + if err != nil { + return nil, err + } + + // old scale -> new scale + newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale) + if err != nil { + return nil, err + } + if newScaleObj == nil { + return nil, errors.NewBadRequest("nil update passed to Scale") + } + scale, ok := newScaleObj.(*autoscaling.Scale) + if !ok { + return nil, errors.NewBadRequest(fmt.Sprintf("expected input object type to be Scale, but %T", newScaleObj)) + } + + // validate + if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 { + return nil, errors.NewInvalid(autoscaling.Kind("Scale"), deployment.Name, errs) + } + + // validate precondition if specified (resourceVersion matching is handled by storage) + if len(scale.UID) > 0 && scale.UID != deployment.UID { + return nil, errors.NewConflict( + apps.Resource("deployments/scale"), + deployment.Name, + fmt.Errorf("Precondition failed: UID in precondition: %v, UID in object meta: %v", scale.UID, deployment.UID), + ) + } + + // move replicas/resourceVersion fields to object and return + deployment.Spec.Replicas = scale.Spec.Replicas + deployment.ResourceVersion = scale.ResourceVersion + return deployment, nil +} diff --git a/pkg/registry/apps/deployment/storage/storage_test.go b/pkg/registry/apps/deployment/storage/storage_test.go index 590c9f8af58..b47ee52939a 100644 --- a/pkg/registry/apps/deployment/storage/storage_test.go +++ b/pkg/registry/apps/deployment/storage/storage_test.go @@ -21,10 +21,12 @@ import ( "fmt" "net/http" "reflect" + "sync" "testing" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -451,3 +453,123 @@ func TestCategories(t *testing.T) { expected := []string{"all"} registrytest.AssertCategories(t, storage.Deployment, expected) } + +func TestScalePatchErrors(t *testing.T) { + storage, server := newStorage(t) + defer server.Terminate(t) + validObj := validNewDeployment() + resourceStore := storage.Deployment.Store + scaleStore := storage.Scale + + defer resourceStore.DestroyFunc() + ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace) + + { + applyNotFoundPatch := func() rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + t.Errorf("notfound patch called") + return currentObject, nil + } + } + _, _, err := scaleStore.Update(ctx, "bad-name", rest.DefaultUpdatedObjectInfo(nil, applyNotFoundPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if !apierrors.IsNotFound(err) { + t.Errorf("expected notfound, got %v", err) + } + } + + if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + { + applyBadUIDPatch := func() rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(*autoscaling.Scale).UID = "123" + return currentObject, nil + } + } + _, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadUIDPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if !apierrors.IsConflict(err) { + t.Errorf("expected conflict, got %v", err) + } + } + + { + applyBadResourceVersionPatch := func() rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(*autoscaling.Scale).ResourceVersion = "123" + return currentObject, nil + } + } + _, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadResourceVersionPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if !apierrors.IsConflict(err) { + t.Errorf("expected conflict, got %v", err) + } + } +} + +func TestScalePatchConflicts(t *testing.T) { + storage, server := newStorage(t) + defer server.Terminate(t) + validObj := validNewDeployment() + resourceStore := storage.Deployment.Store + scaleStore := storage.Scale + + defer resourceStore.DestroyFunc() + ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace) + if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + applyLabelPatch := func(labelName, labelValue string) rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(metav1.Object).SetLabels(map[string]string{labelName: labelValue}) + return currentObject, nil + } + } + stopCh := make(chan struct{}) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + // continuously submits a patch that updates a label and verifies the label update was effective + labelName := "timestamp" + for i := 0; ; i++ { + select { + case <-stopCh: + return + default: + expectedLabelValue := fmt.Sprint(i) + updated, _, err := resourceStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyLabelPatch(labelName, fmt.Sprint(i))), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if err != nil { + t.Errorf("error patching main resource: %v", err) + return + } + gotLabelValue := updated.(metav1.Object).GetLabels()[labelName] + if gotLabelValue != expectedLabelValue { + t.Errorf("wrong label value: expected: %s, got: %s", expectedLabelValue, gotLabelValue) + return + } + } + } + }() + + // continuously submits a scale patch of replicas for a monotonically increasing replica value + applyReplicaPatch := func(replicas int) rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(*autoscaling.Scale).Spec.Replicas = int32(replicas) + return currentObject, nil + } + } + for i := 0; i < 100; i++ { + result, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyReplicaPatch(i)), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("error patching scale: %v", err) + } + scale := result.(*autoscaling.Scale) + if scale.Spec.Replicas != int32(i) { + t.Errorf("wrong replicas count: expected: %d got: %d", i, scale.Spec.Replicas) + } + } + close(stopCh) + wg.Wait() +} diff --git a/pkg/registry/apps/replicaset/storage/storage.go b/pkg/registry/apps/replicaset/storage/storage.go index e50719dbb3b..7a751ef0964 100644 --- a/pkg/registry/apps/replicaset/storage/storage.go +++ b/pkg/registry/apps/replicaset/storage/storage.go @@ -138,7 +138,7 @@ func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.Updat return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options) } -// ScaleREST implements a Scale for Deployment. +// ScaleREST implements a Scale for ReplicaSet. type ScaleREST struct { store *genericregistry.Store } @@ -182,40 +182,10 @@ func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOpt // Update alters scale subset of ReplicaSet object. func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { - obj, err := r.store.Get(ctx, name, &metav1.GetOptions{}) - if err != nil { - return nil, false, errors.NewNotFound(apps.Resource("replicasets/scale"), name) - } - rs := obj.(*apps.ReplicaSet) - - oldScale, err := scaleFromReplicaSet(rs) - if err != nil { - return nil, false, err - } - - // TODO: should this pass admission? - obj, err = objInfo.UpdatedObject(ctx, oldScale) - if err != nil { - return nil, false, err - } - if obj == nil { - return nil, false, errors.NewBadRequest(fmt.Sprintf("nil update passed to Scale")) - } - scale, ok := obj.(*autoscaling.Scale) - if !ok { - return nil, false, errors.NewBadRequest(fmt.Sprintf("wrong object passed to Scale update: %v", obj)) - } - - if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 { - return nil, false, errors.NewInvalid(autoscaling.Kind("Scale"), scale.Name, errs) - } - - rs.Spec.Replicas = scale.Spec.Replicas - rs.ResourceVersion = scale.ResourceVersion - obj, _, err = r.store.Update( + obj, _, err := r.store.Update( ctx, - rs.Name, - rest.DefaultUpdatedObjectInfo(rs), + name, + &scaleUpdatedObjectInfo{name, objInfo}, toScaleCreateValidation(createValidation), toScaleUpdateValidation(updateValidation), false, @@ -224,7 +194,7 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update if err != nil { return nil, false, err } - rs = obj.(*apps.ReplicaSet) + rs := obj.(*apps.ReplicaSet) newScale, err := scaleFromReplicaSet(rs) if err != nil { return nil, false, errors.NewBadRequest(fmt.Sprintf("%v", err)) @@ -280,3 +250,62 @@ func scaleFromReplicaSet(rs *apps.ReplicaSet) (*autoscaling.Scale, error) { }, }, nil } + +// scaleUpdatedObjectInfo transforms existing replicaset -> existing scale -> new scale -> new replicaset +type scaleUpdatedObjectInfo struct { + name string + reqObjInfo rest.UpdatedObjectInfo +} + +func (i *scaleUpdatedObjectInfo) Preconditions() *metav1.Preconditions { + return i.reqObjInfo.Preconditions() +} + +func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) { + replicaset, ok := oldObj.DeepCopyObject().(*apps.ReplicaSet) + if !ok { + return nil, errors.NewBadRequest(fmt.Sprintf("expected existing object type to be ReplicaSet, got %T", replicaset)) + } + // if zero-value, the existing object does not exist + if len(replicaset.ResourceVersion) == 0 { + return nil, errors.NewNotFound(apps.Resource("replicasets/scale"), i.name) + } + + // replicaset -> old scale + oldScale, err := scaleFromReplicaSet(replicaset) + if err != nil { + return nil, err + } + + // old scale -> new scale + newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale) + if err != nil { + return nil, err + } + if newScaleObj == nil { + return nil, errors.NewBadRequest("nil update passed to Scale") + } + scale, ok := newScaleObj.(*autoscaling.Scale) + if !ok { + return nil, errors.NewBadRequest(fmt.Sprintf("expected input object type to be Scale, but %T", newScaleObj)) + } + + // validate + if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 { + return nil, errors.NewInvalid(autoscaling.Kind("Scale"), replicaset.Name, errs) + } + + // validate precondition if specified (resourceVersion matching is handled by storage) + if len(scale.UID) > 0 && scale.UID != replicaset.UID { + return nil, errors.NewConflict( + apps.Resource("replicasets/scale"), + replicaset.Name, + fmt.Errorf("Precondition failed: UID in precondition: %v, UID in object meta: %v", scale.UID, replicaset.UID), + ) + } + + // move replicas/resourceVersion fields to object and return + replicaset.Spec.Replicas = scale.Spec.Replicas + replicaset.ResourceVersion = scale.ResourceVersion + return replicaset, nil +} diff --git a/pkg/registry/apps/replicaset/storage/storage_test.go b/pkg/registry/apps/replicaset/storage/storage_test.go index ca4b10c2542..bd4d739a360 100644 --- a/pkg/registry/apps/replicaset/storage/storage_test.go +++ b/pkg/registry/apps/replicaset/storage/storage_test.go @@ -17,10 +17,14 @@ limitations under the License. package storage import ( + "context" + "fmt" + "sync" "testing" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -395,3 +399,127 @@ func TestCategories(t *testing.T) { expected := []string{"all"} registrytest.AssertCategories(t, storage.ReplicaSet, expected) } + +func TestScalePatchErrors(t *testing.T) { + storage, server := newStorage(t) + defer server.Terminate(t) + validObj := &validReplicaSet + namespace := validObj.Namespace + name := validObj.Name + resourceStore := storage.ReplicaSet.Store + scaleStore := storage.Scale + + defer resourceStore.DestroyFunc() + ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace) + + { + applyNotFoundPatch := func() rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + t.Errorf("notfound patch called") + return currentObject, nil + } + } + _, _, err := scaleStore.Update(ctx, "bad-name", rest.DefaultUpdatedObjectInfo(nil, applyNotFoundPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if !apierrors.IsNotFound(err) { + t.Errorf("expected notfound, got %v", err) + } + } + + if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + { + applyBadUIDPatch := func() rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(*autoscaling.Scale).UID = "123" + return currentObject, nil + } + } + _, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadUIDPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if !apierrors.IsConflict(err) { + t.Errorf("expected conflict, got %v", err) + } + } + + { + applyBadResourceVersionPatch := func() rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(*autoscaling.Scale).ResourceVersion = "123" + return currentObject, nil + } + } + _, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadResourceVersionPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if !apierrors.IsConflict(err) { + t.Errorf("expected conflict, got %v", err) + } + } +} + +func TestScalePatchConflicts(t *testing.T) { + storage, server := newStorage(t) + defer server.Terminate(t) + validObj := &validReplicaSet + namespace := validObj.Namespace + name := validObj.Name + resourceStore := storage.ReplicaSet.Store + scaleStore := storage.Scale + + defer resourceStore.DestroyFunc() + ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace) + if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + applyLabelPatch := func(labelName, labelValue string) rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(metav1.Object).SetLabels(map[string]string{labelName: labelValue}) + return currentObject, nil + } + } + stopCh := make(chan struct{}) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + // continuously submits a patch that updates a label and verifies the label update was effective + labelName := "timestamp" + for i := 0; ; i++ { + select { + case <-stopCh: + return + default: + expectedLabelValue := fmt.Sprint(i) + updated, _, err := resourceStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyLabelPatch(labelName, fmt.Sprint(i))), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if err != nil { + t.Errorf("error patching main resource: %v", err) + return + } + gotLabelValue := updated.(metav1.Object).GetLabels()[labelName] + if gotLabelValue != expectedLabelValue { + t.Errorf("wrong label value: expected: %s, got: %s", expectedLabelValue, gotLabelValue) + return + } + } + } + }() + + // continuously submits a scale patch of replicas for a monotonically increasing replica value + applyReplicaPatch := func(replicas int) rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(*autoscaling.Scale).Spec.Replicas = int32(replicas) + return currentObject, nil + } + } + for i := 0; i < 100; i++ { + result, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyReplicaPatch(i)), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("error patching scale: %v", err) + } + scale := result.(*autoscaling.Scale) + if scale.Spec.Replicas != int32(i) { + t.Errorf("wrong replicas count: expected: %d got: %d", i, scale.Spec.Replicas) + } + } + close(stopCh) + wg.Wait() +} diff --git a/pkg/registry/apps/statefulset/storage/BUILD b/pkg/registry/apps/statefulset/storage/BUILD index e4369dcfc2c..ac35776facf 100644 --- a/pkg/registry/apps/statefulset/storage/BUILD +++ b/pkg/registry/apps/statefulset/storage/BUILD @@ -20,6 +20,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library", diff --git a/pkg/registry/apps/statefulset/storage/storage.go b/pkg/registry/apps/statefulset/storage/storage.go index e20bcc421ba..34e31723f12 100644 --- a/pkg/registry/apps/statefulset/storage/storage.go +++ b/pkg/registry/apps/statefulset/storage/storage.go @@ -168,39 +168,10 @@ func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOpt // Update alters scale subset of StatefulSet object. func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { - obj, err := r.store.Get(ctx, name, &metav1.GetOptions{}) - if err != nil { - return nil, false, err - } - ss := obj.(*apps.StatefulSet) - - oldScale, err := scaleFromStatefulSet(ss) - if err != nil { - return nil, false, err - } - - obj, err = objInfo.UpdatedObject(ctx, oldScale) - if err != nil { - return nil, false, err - } - if obj == nil { - return nil, false, errors.NewBadRequest(fmt.Sprintf("nil update passed to Scale")) - } - scale, ok := obj.(*autoscaling.Scale) - if !ok { - return nil, false, errors.NewBadRequest(fmt.Sprintf("wrong object passed to Scale update: %v", obj)) - } - - if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 { - return nil, false, errors.NewInvalid(autoscaling.Kind("Scale"), scale.Name, errs) - } - - ss.Spec.Replicas = scale.Spec.Replicas - ss.ResourceVersion = scale.ResourceVersion - obj, _, err = r.store.Update( + obj, _, err := r.store.Update( ctx, - ss.Name, - rest.DefaultUpdatedObjectInfo(ss), + name, + &scaleUpdatedObjectInfo{name, objInfo}, toScaleCreateValidation(createValidation), toScaleUpdateValidation(updateValidation), false, @@ -209,7 +180,7 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update if err != nil { return nil, false, err } - ss = obj.(*apps.StatefulSet) + ss := obj.(*apps.StatefulSet) newScale, err := scaleFromStatefulSet(ss) if err != nil { return nil, false, errors.NewBadRequest(fmt.Sprintf("%v", err)) @@ -265,3 +236,62 @@ func scaleFromStatefulSet(ss *apps.StatefulSet) (*autoscaling.Scale, error) { }, }, nil } + +// scaleUpdatedObjectInfo transforms existing statefulset -> existing scale -> new scale -> new statefulset +type scaleUpdatedObjectInfo struct { + name string + reqObjInfo rest.UpdatedObjectInfo +} + +func (i *scaleUpdatedObjectInfo) Preconditions() *metav1.Preconditions { + return i.reqObjInfo.Preconditions() +} + +func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) { + statefulset, ok := oldObj.DeepCopyObject().(*apps.StatefulSet) + if !ok { + return nil, errors.NewBadRequest(fmt.Sprintf("expected existing object type to be StatefulSet, got %T", statefulset)) + } + // if zero-value, the existing object does not exist + if len(statefulset.ResourceVersion) == 0 { + return nil, errors.NewNotFound(apps.Resource("statefulsets/scale"), i.name) + } + + // statefulset -> old scale + oldScale, err := scaleFromStatefulSet(statefulset) + if err != nil { + return nil, err + } + + // old scale -> new scale + newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale) + if err != nil { + return nil, err + } + if newScaleObj == nil { + return nil, errors.NewBadRequest("nil update passed to Scale") + } + scale, ok := newScaleObj.(*autoscaling.Scale) + if !ok { + return nil, errors.NewBadRequest(fmt.Sprintf("expected input object type to be Scale, but %T", newScaleObj)) + } + + // validate + if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 { + return nil, errors.NewInvalid(autoscaling.Kind("Scale"), statefulset.Name, errs) + } + + // validate precondition if specified (resourceVersion matching is handled by storage) + if len(scale.UID) > 0 && scale.UID != statefulset.UID { + return nil, errors.NewConflict( + apps.Resource("statefulsets/scale"), + statefulset.Name, + fmt.Errorf("Precondition failed: UID in precondition: %v, UID in object meta: %v", scale.UID, statefulset.UID), + ) + } + + // move replicas/resourceVersion fields to object and return + statefulset.Spec.Replicas = scale.Spec.Replicas + statefulset.ResourceVersion = scale.ResourceVersion + return statefulset, nil +} diff --git a/pkg/registry/apps/statefulset/storage/storage_test.go b/pkg/registry/apps/statefulset/storage/storage_test.go index 4e392f41913..82505925dd4 100644 --- a/pkg/registry/apps/statefulset/storage/storage_test.go +++ b/pkg/registry/apps/statefulset/storage/storage_test.go @@ -17,13 +17,18 @@ limitations under the License. package storage import ( + "context" + "fmt" + "sync" "testing" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/diff" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" @@ -293,3 +298,127 @@ func TestScaleUpdate(t *testing.T) { } // TODO: Test generation number. + +func TestScalePatchErrors(t *testing.T) { + storage, server := newStorage(t) + defer server.Terminate(t) + validObj := &validStatefulSet + namespace := validObj.Namespace + name := validObj.Name + resourceStore := storage.StatefulSet.Store + scaleStore := storage.Scale + + defer resourceStore.DestroyFunc() + ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace) + + { + applyNotFoundPatch := func() rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + t.Errorf("notfound patch called") + return currentObject, nil + } + } + _, _, err := scaleStore.Update(ctx, "bad-name", rest.DefaultUpdatedObjectInfo(nil, applyNotFoundPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if !apierrors.IsNotFound(err) { + t.Errorf("expected notfound, got %v", err) + } + } + + if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + { + applyBadUIDPatch := func() rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(*autoscaling.Scale).UID = "123" + return currentObject, nil + } + } + _, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadUIDPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if !apierrors.IsConflict(err) { + t.Errorf("expected conflict, got %v", err) + } + } + + { + applyBadResourceVersionPatch := func() rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(*autoscaling.Scale).ResourceVersion = "123" + return currentObject, nil + } + } + _, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadResourceVersionPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if !apierrors.IsConflict(err) { + t.Errorf("expected conflict, got %v", err) + } + } +} + +func TestScalePatchConflicts(t *testing.T) { + storage, server := newStorage(t) + defer server.Terminate(t) + validObj := &validStatefulSet + namespace := validObj.Namespace + name := validObj.Name + resourceStore := storage.StatefulSet.Store + scaleStore := storage.Scale + + defer resourceStore.DestroyFunc() + ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace) + if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + applyLabelPatch := func(labelName, labelValue string) rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(metav1.Object).SetLabels(map[string]string{labelName: labelValue}) + return currentObject, nil + } + } + stopCh := make(chan struct{}) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + // continuously submits a patch that updates a label and verifies the label update was effective + labelName := "timestamp" + for i := 0; ; i++ { + select { + case <-stopCh: + return + default: + expectedLabelValue := fmt.Sprint(i) + updated, _, err := resourceStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyLabelPatch(labelName, fmt.Sprint(i))), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if err != nil { + t.Errorf("error patching main resource: %v", err) + return + } + gotLabelValue := updated.(metav1.Object).GetLabels()[labelName] + if gotLabelValue != expectedLabelValue { + t.Errorf("wrong label value: expected: %s, got: %s", expectedLabelValue, gotLabelValue) + return + } + } + } + }() + + // continuously submits a scale patch of replicas for a monotonically increasing replica value + applyReplicaPatch := func(replicas int) rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(*autoscaling.Scale).Spec.Replicas = int32(replicas) + return currentObject, nil + } + } + for i := 0; i < 100; i++ { + result, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyReplicaPatch(i)), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("error patching scale: %v", err) + } + scale := result.(*autoscaling.Scale) + if scale.Spec.Replicas != int32(i) { + t.Errorf("wrong replicas count: expected: %d got: %d", i, scale.Spec.Replicas) + } + } + close(stopCh) + wg.Wait() +} diff --git a/pkg/registry/core/replicationcontroller/storage/storage.go b/pkg/registry/core/replicationcontroller/storage/storage.go index db5c2d1a7ae..1eac4333c3d 100644 --- a/pkg/registry/core/replicationcontroller/storage/storage.go +++ b/pkg/registry/core/replicationcontroller/storage/storage.go @@ -159,37 +159,10 @@ func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOpt } func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { - obj, err := r.store.Get(ctx, name, &metav1.GetOptions{}) - if err != nil { - return nil, false, errors.NewNotFound(autoscaling.Resource("replicationcontrollers/scale"), name) - } - rc := obj.(*api.ReplicationController) - - oldScale := scaleFromRC(rc) - // TODO: should this pass validation? - obj, err = objInfo.UpdatedObject(ctx, oldScale) - if err != nil { - return nil, false, err - } - - if obj == nil { - return nil, false, errors.NewBadRequest("nil update passed to Scale") - } - scale, ok := obj.(*autoscaling.Scale) - if !ok { - return nil, false, errors.NewBadRequest(fmt.Sprintf("wrong object passed to Scale update: %v", obj)) - } - - if errs := validation.ValidateScale(scale); len(errs) > 0 { - return nil, false, errors.NewInvalid(autoscaling.Kind("Scale"), scale.Name, errs) - } - - rc.Spec.Replicas = scale.Spec.Replicas - rc.ResourceVersion = scale.ResourceVersion - obj, _, err = r.store.Update( + obj, _, err := r.store.Update( ctx, - rc.Name, - rest.DefaultUpdatedObjectInfo(rc), + name, + &scaleUpdatedObjectInfo{name, objInfo}, toScaleCreateValidation(createValidation), toScaleUpdateValidation(updateValidation), false, @@ -198,7 +171,7 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update if err != nil { return nil, false, err } - rc = obj.(*api.ReplicationController) + rc := obj.(*api.ReplicationController) return scaleFromRC(rc), false, nil } @@ -237,3 +210,59 @@ func scaleFromRC(rc *api.ReplicationController) *autoscaling.Scale { }, } } + +// scaleUpdatedObjectInfo transforms existing replication controller -> existing scale -> new scale -> new replication controller +type scaleUpdatedObjectInfo struct { + name string + reqObjInfo rest.UpdatedObjectInfo +} + +func (i *scaleUpdatedObjectInfo) Preconditions() *metav1.Preconditions { + return i.reqObjInfo.Preconditions() +} + +func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) { + replicationcontroller, ok := oldObj.DeepCopyObject().(*api.ReplicationController) + if !ok { + return nil, errors.NewBadRequest(fmt.Sprintf("expected existing object type to be ReplicationController, got %T", replicationcontroller)) + } + // if zero-value, the existing object does not exist + if len(replicationcontroller.ResourceVersion) == 0 { + return nil, errors.NewNotFound(api.Resource("replicationcontrollers/scale"), i.name) + } + + // replicationcontroller -> old scale + oldScale := scaleFromRC(replicationcontroller) + + // old scale -> new scale + newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale) + if err != nil { + return nil, err + } + if newScaleObj == nil { + return nil, errors.NewBadRequest("nil update passed to Scale") + } + scale, ok := newScaleObj.(*autoscaling.Scale) + if !ok { + return nil, errors.NewBadRequest(fmt.Sprintf("expected input object type to be Scale, but %T", newScaleObj)) + } + + // validate + if errs := validation.ValidateScale(scale); len(errs) > 0 { + return nil, errors.NewInvalid(autoscaling.Kind("Scale"), replicationcontroller.Name, errs) + } + + // validate precondition if specified (resourceVersion matching is handled by storage) + if len(scale.UID) > 0 && scale.UID != replicationcontroller.UID { + return nil, errors.NewConflict( + api.Resource("replicationcontrollers/scale"), + replicationcontroller.Name, + fmt.Errorf("Precondition failed: UID in precondition: %v, UID in object meta: %v", scale.UID, replicationcontroller.UID), + ) + } + + // move replicas/resourceVersion fields to object and return + replicationcontroller.Spec.Replicas = scale.Spec.Replicas + replicationcontroller.ResourceVersion = scale.ResourceVersion + return replicationcontroller, nil +} diff --git a/pkg/registry/core/replicationcontroller/storage/storage_test.go b/pkg/registry/core/replicationcontroller/storage/storage_test.go index 3bd23598452..009fed33b1a 100644 --- a/pkg/registry/core/replicationcontroller/storage/storage_test.go +++ b/pkg/registry/core/replicationcontroller/storage/storage_test.go @@ -17,6 +17,9 @@ limitations under the License. package storage import ( + "context" + "fmt" + "sync" "testing" apiequality "k8s.io/apimachinery/pkg/api/equality" @@ -348,3 +351,123 @@ func TestCategories(t *testing.T) { expected := []string{"all"} registrytest.AssertCategories(t, storage.Controller, expected) } + +func TestScalePatchErrors(t *testing.T) { + storage, server := newStorage(t) + defer server.Terminate(t) + validObj := validController + resourceStore := storage.Controller.Store + scaleStore := storage.Scale + + defer resourceStore.DestroyFunc() + ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace) + + { + applyNotFoundPatch := func() rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + t.Errorf("notfound patch called") + return currentObject, nil + } + } + _, _, err := scaleStore.Update(ctx, "bad-name", rest.DefaultUpdatedObjectInfo(nil, applyNotFoundPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if !errors.IsNotFound(err) { + t.Errorf("expected notfound, got %v", err) + } + } + + if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + { + applyBadUIDPatch := func() rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(*autoscaling.Scale).UID = "123" + return currentObject, nil + } + } + _, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadUIDPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if !errors.IsConflict(err) { + t.Errorf("expected conflict, got %v", err) + } + } + + { + applyBadResourceVersionPatch := func() rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(*autoscaling.Scale).ResourceVersion = "123" + return currentObject, nil + } + } + _, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadResourceVersionPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if !errors.IsConflict(err) { + t.Errorf("expected conflict, got %v", err) + } + } +} + +func TestScalePatchConflicts(t *testing.T) { + storage, server := newStorage(t) + defer server.Terminate(t) + validObj := validController + resourceStore := storage.Controller.Store + scaleStore := storage.Scale + + defer resourceStore.DestroyFunc() + ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace) + if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + applyLabelPatch := func(labelName, labelValue string) rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(metav1.Object).SetLabels(map[string]string{labelName: labelValue}) + return currentObject, nil + } + } + stopCh := make(chan struct{}) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + // continuously submits a patch that updates a label and verifies the label update was effective + labelName := "timestamp" + for i := 0; ; i++ { + select { + case <-stopCh: + return + default: + expectedLabelValue := fmt.Sprint(i) + updated, _, err := resourceStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyLabelPatch(labelName, fmt.Sprint(i))), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if err != nil { + t.Errorf("error patching main resource: %v", err) + return + } + gotLabelValue := updated.(metav1.Object).GetLabels()[labelName] + if gotLabelValue != expectedLabelValue { + t.Errorf("wrong label value: expected: %s, got: %s", expectedLabelValue, gotLabelValue) + return + } + } + } + }() + + // continuously submits a scale patch of replicas for a monotonically increasing replica value + applyReplicaPatch := func(replicas int) rest.TransformFunc { + return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) { + currentObject.(*autoscaling.Scale).Spec.Replicas = int32(replicas) + return currentObject, nil + } + } + for i := 0; i < 100; i++ { + result, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyReplicaPatch(i)), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("error patching scale: %v", err) + } + scale := result.(*autoscaling.Scale) + if scale.Spec.Replicas != int32(i) { + t.Errorf("wrong replicas count: expected: %d got: %d", i, scale.Spec.Replicas) + } + } + close(stopCh) + wg.Wait() +}