Fix conflicts patching scale subresources

This commit is contained in:
Jordan Liggitt 2020-04-21 15:05:07 -04:00
parent 5655350b2b
commit dab2a830da
9 changed files with 753 additions and 132 deletions

View File

@ -279,39 +279,10 @@ func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOpt
// Update alters scale subset of Deployment object.
func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
obj, err := r.store.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
return nil, false, errors.NewNotFound(apps.Resource("deployments/scale"), name)
}
deployment := obj.(*apps.Deployment)
oldScale, err := scaleFromDeployment(deployment)
if err != nil {
return nil, false, err
}
obj, err = objInfo.UpdatedObject(ctx, oldScale)
if err != nil {
return nil, false, err
}
if obj == nil {
return nil, false, errors.NewBadRequest(fmt.Sprintf("nil update passed to Scale"))
}
scale, ok := obj.(*autoscaling.Scale)
if !ok {
return nil, false, errors.NewBadRequest(fmt.Sprintf("expected input object type to be Scale, but %T", obj))
}
if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 {
return nil, false, errors.NewInvalid(autoscaling.Kind("Scale"), name, errs)
}
deployment.Spec.Replicas = scale.Spec.Replicas
deployment.ResourceVersion = scale.ResourceVersion
obj, _, err = r.store.Update(
obj, _, err := r.store.Update(
ctx,
deployment.Name,
rest.DefaultUpdatedObjectInfo(deployment),
name,
&scaleUpdatedObjectInfo{name, objInfo},
toScaleCreateValidation(createValidation),
toScaleUpdateValidation(updateValidation),
false,
@ -320,7 +291,7 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
if err != nil {
return nil, false, err
}
deployment = obj.(*apps.Deployment)
deployment := obj.(*apps.Deployment)
newScale, err := scaleFromDeployment(deployment)
if err != nil {
return nil, false, errors.NewBadRequest(fmt.Sprintf("%v", err))
@ -376,3 +347,62 @@ func scaleFromDeployment(deployment *apps.Deployment) (*autoscaling.Scale, error
},
}, nil
}
// scaleUpdatedObjectInfo transforms existing deployment -> existing scale -> new scale -> new deployment
type scaleUpdatedObjectInfo struct {
name string
reqObjInfo rest.UpdatedObjectInfo
}
func (i *scaleUpdatedObjectInfo) Preconditions() *metav1.Preconditions {
return i.reqObjInfo.Preconditions()
}
func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) {
deployment, ok := oldObj.DeepCopyObject().(*apps.Deployment)
if !ok {
return nil, errors.NewBadRequest(fmt.Sprintf("expected existing object type to be Deployment, got %T", deployment))
}
// if zero-value, the existing object does not exist
if len(deployment.ResourceVersion) == 0 {
return nil, errors.NewNotFound(apps.Resource("deployments/scale"), i.name)
}
// deployment -> old scale
oldScale, err := scaleFromDeployment(deployment)
if err != nil {
return nil, err
}
// old scale -> new scale
newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
if err != nil {
return nil, err
}
if newScaleObj == nil {
return nil, errors.NewBadRequest("nil update passed to Scale")
}
scale, ok := newScaleObj.(*autoscaling.Scale)
if !ok {
return nil, errors.NewBadRequest(fmt.Sprintf("expected input object type to be Scale, but %T", newScaleObj))
}
// validate
if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 {
return nil, errors.NewInvalid(autoscaling.Kind("Scale"), deployment.Name, errs)
}
// validate precondition if specified (resourceVersion matching is handled by storage)
if len(scale.UID) > 0 && scale.UID != deployment.UID {
return nil, errors.NewConflict(
apps.Resource("deployments/scale"),
deployment.Name,
fmt.Errorf("Precondition failed: UID in precondition: %v, UID in object meta: %v", scale.UID, deployment.UID),
)
}
// move replicas/resourceVersion fields to object and return
deployment.Spec.Replicas = scale.Spec.Replicas
deployment.ResourceVersion = scale.ResourceVersion
return deployment, nil
}

View File

@ -21,10 +21,12 @@ import (
"fmt"
"net/http"
"reflect"
"sync"
"testing"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
@ -451,3 +453,123 @@ func TestCategories(t *testing.T) {
expected := []string{"all"}
registrytest.AssertCategories(t, storage.Deployment, expected)
}
func TestScalePatchErrors(t *testing.T) {
storage, server := newStorage(t)
defer server.Terminate(t)
validObj := validNewDeployment()
resourceStore := storage.Deployment.Store
scaleStore := storage.Scale
defer resourceStore.DestroyFunc()
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
{
applyNotFoundPatch := func() rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
t.Errorf("notfound patch called")
return currentObject, nil
}
}
_, _, err := scaleStore.Update(ctx, "bad-name", rest.DefaultUpdatedObjectInfo(nil, applyNotFoundPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if !apierrors.IsNotFound(err) {
t.Errorf("expected notfound, got %v", err)
}
}
if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
t.Errorf("Unexpected error: %v", err)
}
{
applyBadUIDPatch := func() rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(*autoscaling.Scale).UID = "123"
return currentObject, nil
}
}
_, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadUIDPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if !apierrors.IsConflict(err) {
t.Errorf("expected conflict, got %v", err)
}
}
{
applyBadResourceVersionPatch := func() rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(*autoscaling.Scale).ResourceVersion = "123"
return currentObject, nil
}
}
_, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadResourceVersionPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if !apierrors.IsConflict(err) {
t.Errorf("expected conflict, got %v", err)
}
}
}
func TestScalePatchConflicts(t *testing.T) {
storage, server := newStorage(t)
defer server.Terminate(t)
validObj := validNewDeployment()
resourceStore := storage.Deployment.Store
scaleStore := storage.Scale
defer resourceStore.DestroyFunc()
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
applyLabelPatch := func(labelName, labelValue string) rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(metav1.Object).SetLabels(map[string]string{labelName: labelValue})
return currentObject, nil
}
}
stopCh := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
// continuously submits a patch that updates a label and verifies the label update was effective
labelName := "timestamp"
for i := 0; ; i++ {
select {
case <-stopCh:
return
default:
expectedLabelValue := fmt.Sprint(i)
updated, _, err := resourceStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyLabelPatch(labelName, fmt.Sprint(i))), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if err != nil {
t.Errorf("error patching main resource: %v", err)
return
}
gotLabelValue := updated.(metav1.Object).GetLabels()[labelName]
if gotLabelValue != expectedLabelValue {
t.Errorf("wrong label value: expected: %s, got: %s", expectedLabelValue, gotLabelValue)
return
}
}
}
}()
// continuously submits a scale patch of replicas for a monotonically increasing replica value
applyReplicaPatch := func(replicas int) rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(*autoscaling.Scale).Spec.Replicas = int32(replicas)
return currentObject, nil
}
}
for i := 0; i < 100; i++ {
result, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyReplicaPatch(i)), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if err != nil {
t.Fatalf("error patching scale: %v", err)
}
scale := result.(*autoscaling.Scale)
if scale.Spec.Replicas != int32(i) {
t.Errorf("wrong replicas count: expected: %d got: %d", i, scale.Spec.Replicas)
}
}
close(stopCh)
wg.Wait()
}

View File

@ -138,7 +138,7 @@ func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.Updat
return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
}
// ScaleREST implements a Scale for Deployment.
// ScaleREST implements a Scale for ReplicaSet.
type ScaleREST struct {
store *genericregistry.Store
}
@ -182,40 +182,10 @@ func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOpt
// Update alters scale subset of ReplicaSet object.
func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
obj, err := r.store.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
return nil, false, errors.NewNotFound(apps.Resource("replicasets/scale"), name)
}
rs := obj.(*apps.ReplicaSet)
oldScale, err := scaleFromReplicaSet(rs)
if err != nil {
return nil, false, err
}
// TODO: should this pass admission?
obj, err = objInfo.UpdatedObject(ctx, oldScale)
if err != nil {
return nil, false, err
}
if obj == nil {
return nil, false, errors.NewBadRequest(fmt.Sprintf("nil update passed to Scale"))
}
scale, ok := obj.(*autoscaling.Scale)
if !ok {
return nil, false, errors.NewBadRequest(fmt.Sprintf("wrong object passed to Scale update: %v", obj))
}
if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 {
return nil, false, errors.NewInvalid(autoscaling.Kind("Scale"), scale.Name, errs)
}
rs.Spec.Replicas = scale.Spec.Replicas
rs.ResourceVersion = scale.ResourceVersion
obj, _, err = r.store.Update(
obj, _, err := r.store.Update(
ctx,
rs.Name,
rest.DefaultUpdatedObjectInfo(rs),
name,
&scaleUpdatedObjectInfo{name, objInfo},
toScaleCreateValidation(createValidation),
toScaleUpdateValidation(updateValidation),
false,
@ -224,7 +194,7 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
if err != nil {
return nil, false, err
}
rs = obj.(*apps.ReplicaSet)
rs := obj.(*apps.ReplicaSet)
newScale, err := scaleFromReplicaSet(rs)
if err != nil {
return nil, false, errors.NewBadRequest(fmt.Sprintf("%v", err))
@ -280,3 +250,62 @@ func scaleFromReplicaSet(rs *apps.ReplicaSet) (*autoscaling.Scale, error) {
},
}, nil
}
// scaleUpdatedObjectInfo transforms existing replicaset -> existing scale -> new scale -> new replicaset
type scaleUpdatedObjectInfo struct {
name string
reqObjInfo rest.UpdatedObjectInfo
}
func (i *scaleUpdatedObjectInfo) Preconditions() *metav1.Preconditions {
return i.reqObjInfo.Preconditions()
}
func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) {
replicaset, ok := oldObj.DeepCopyObject().(*apps.ReplicaSet)
if !ok {
return nil, errors.NewBadRequest(fmt.Sprintf("expected existing object type to be ReplicaSet, got %T", replicaset))
}
// if zero-value, the existing object does not exist
if len(replicaset.ResourceVersion) == 0 {
return nil, errors.NewNotFound(apps.Resource("replicasets/scale"), i.name)
}
// replicaset -> old scale
oldScale, err := scaleFromReplicaSet(replicaset)
if err != nil {
return nil, err
}
// old scale -> new scale
newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
if err != nil {
return nil, err
}
if newScaleObj == nil {
return nil, errors.NewBadRequest("nil update passed to Scale")
}
scale, ok := newScaleObj.(*autoscaling.Scale)
if !ok {
return nil, errors.NewBadRequest(fmt.Sprintf("expected input object type to be Scale, but %T", newScaleObj))
}
// validate
if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 {
return nil, errors.NewInvalid(autoscaling.Kind("Scale"), replicaset.Name, errs)
}
// validate precondition if specified (resourceVersion matching is handled by storage)
if len(scale.UID) > 0 && scale.UID != replicaset.UID {
return nil, errors.NewConflict(
apps.Resource("replicasets/scale"),
replicaset.Name,
fmt.Errorf("Precondition failed: UID in precondition: %v, UID in object meta: %v", scale.UID, replicaset.UID),
)
}
// move replicas/resourceVersion fields to object and return
replicaset.Spec.Replicas = scale.Spec.Replicas
replicaset.ResourceVersion = scale.ResourceVersion
return replicaset, nil
}

View File

@ -17,10 +17,14 @@ limitations under the License.
package storage
import (
"context"
"fmt"
"sync"
"testing"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
@ -395,3 +399,127 @@ func TestCategories(t *testing.T) {
expected := []string{"all"}
registrytest.AssertCategories(t, storage.ReplicaSet, expected)
}
func TestScalePatchErrors(t *testing.T) {
storage, server := newStorage(t)
defer server.Terminate(t)
validObj := &validReplicaSet
namespace := validObj.Namespace
name := validObj.Name
resourceStore := storage.ReplicaSet.Store
scaleStore := storage.Scale
defer resourceStore.DestroyFunc()
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
{
applyNotFoundPatch := func() rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
t.Errorf("notfound patch called")
return currentObject, nil
}
}
_, _, err := scaleStore.Update(ctx, "bad-name", rest.DefaultUpdatedObjectInfo(nil, applyNotFoundPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if !apierrors.IsNotFound(err) {
t.Errorf("expected notfound, got %v", err)
}
}
if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
t.Errorf("Unexpected error: %v", err)
}
{
applyBadUIDPatch := func() rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(*autoscaling.Scale).UID = "123"
return currentObject, nil
}
}
_, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadUIDPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if !apierrors.IsConflict(err) {
t.Errorf("expected conflict, got %v", err)
}
}
{
applyBadResourceVersionPatch := func() rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(*autoscaling.Scale).ResourceVersion = "123"
return currentObject, nil
}
}
_, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadResourceVersionPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if !apierrors.IsConflict(err) {
t.Errorf("expected conflict, got %v", err)
}
}
}
func TestScalePatchConflicts(t *testing.T) {
storage, server := newStorage(t)
defer server.Terminate(t)
validObj := &validReplicaSet
namespace := validObj.Namespace
name := validObj.Name
resourceStore := storage.ReplicaSet.Store
scaleStore := storage.Scale
defer resourceStore.DestroyFunc()
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
applyLabelPatch := func(labelName, labelValue string) rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(metav1.Object).SetLabels(map[string]string{labelName: labelValue})
return currentObject, nil
}
}
stopCh := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
// continuously submits a patch that updates a label and verifies the label update was effective
labelName := "timestamp"
for i := 0; ; i++ {
select {
case <-stopCh:
return
default:
expectedLabelValue := fmt.Sprint(i)
updated, _, err := resourceStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyLabelPatch(labelName, fmt.Sprint(i))), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if err != nil {
t.Errorf("error patching main resource: %v", err)
return
}
gotLabelValue := updated.(metav1.Object).GetLabels()[labelName]
if gotLabelValue != expectedLabelValue {
t.Errorf("wrong label value: expected: %s, got: %s", expectedLabelValue, gotLabelValue)
return
}
}
}
}()
// continuously submits a scale patch of replicas for a monotonically increasing replica value
applyReplicaPatch := func(replicas int) rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(*autoscaling.Scale).Spec.Replicas = int32(replicas)
return currentObject, nil
}
}
for i := 0; i < 100; i++ {
result, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyReplicaPatch(i)), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if err != nil {
t.Fatalf("error patching scale: %v", err)
}
scale := result.(*autoscaling.Scale)
if scale.Spec.Replicas != int32(i) {
t.Errorf("wrong replicas count: expected: %d got: %d", i, scale.Spec.Replicas)
}
}
close(stopCh)
wg.Wait()
}

View File

@ -20,6 +20,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",

View File

@ -168,39 +168,10 @@ func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOpt
// Update alters scale subset of StatefulSet object.
func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
obj, err := r.store.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
return nil, false, err
}
ss := obj.(*apps.StatefulSet)
oldScale, err := scaleFromStatefulSet(ss)
if err != nil {
return nil, false, err
}
obj, err = objInfo.UpdatedObject(ctx, oldScale)
if err != nil {
return nil, false, err
}
if obj == nil {
return nil, false, errors.NewBadRequest(fmt.Sprintf("nil update passed to Scale"))
}
scale, ok := obj.(*autoscaling.Scale)
if !ok {
return nil, false, errors.NewBadRequest(fmt.Sprintf("wrong object passed to Scale update: %v", obj))
}
if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 {
return nil, false, errors.NewInvalid(autoscaling.Kind("Scale"), scale.Name, errs)
}
ss.Spec.Replicas = scale.Spec.Replicas
ss.ResourceVersion = scale.ResourceVersion
obj, _, err = r.store.Update(
obj, _, err := r.store.Update(
ctx,
ss.Name,
rest.DefaultUpdatedObjectInfo(ss),
name,
&scaleUpdatedObjectInfo{name, objInfo},
toScaleCreateValidation(createValidation),
toScaleUpdateValidation(updateValidation),
false,
@ -209,7 +180,7 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
if err != nil {
return nil, false, err
}
ss = obj.(*apps.StatefulSet)
ss := obj.(*apps.StatefulSet)
newScale, err := scaleFromStatefulSet(ss)
if err != nil {
return nil, false, errors.NewBadRequest(fmt.Sprintf("%v", err))
@ -265,3 +236,62 @@ func scaleFromStatefulSet(ss *apps.StatefulSet) (*autoscaling.Scale, error) {
},
}, nil
}
// scaleUpdatedObjectInfo transforms existing statefulset -> existing scale -> new scale -> new statefulset
type scaleUpdatedObjectInfo struct {
name string
reqObjInfo rest.UpdatedObjectInfo
}
func (i *scaleUpdatedObjectInfo) Preconditions() *metav1.Preconditions {
return i.reqObjInfo.Preconditions()
}
func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) {
statefulset, ok := oldObj.DeepCopyObject().(*apps.StatefulSet)
if !ok {
return nil, errors.NewBadRequest(fmt.Sprintf("expected existing object type to be StatefulSet, got %T", statefulset))
}
// if zero-value, the existing object does not exist
if len(statefulset.ResourceVersion) == 0 {
return nil, errors.NewNotFound(apps.Resource("statefulsets/scale"), i.name)
}
// statefulset -> old scale
oldScale, err := scaleFromStatefulSet(statefulset)
if err != nil {
return nil, err
}
// old scale -> new scale
newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
if err != nil {
return nil, err
}
if newScaleObj == nil {
return nil, errors.NewBadRequest("nil update passed to Scale")
}
scale, ok := newScaleObj.(*autoscaling.Scale)
if !ok {
return nil, errors.NewBadRequest(fmt.Sprintf("expected input object type to be Scale, but %T", newScaleObj))
}
// validate
if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 {
return nil, errors.NewInvalid(autoscaling.Kind("Scale"), statefulset.Name, errs)
}
// validate precondition if specified (resourceVersion matching is handled by storage)
if len(scale.UID) > 0 && scale.UID != statefulset.UID {
return nil, errors.NewConflict(
apps.Resource("statefulsets/scale"),
statefulset.Name,
fmt.Errorf("Precondition failed: UID in precondition: %v, UID in object meta: %v", scale.UID, statefulset.UID),
)
}
// move replicas/resourceVersion fields to object and return
statefulset.Spec.Replicas = scale.Spec.Replicas
statefulset.ResourceVersion = scale.ResourceVersion
return statefulset, nil
}

View File

@ -17,13 +17,18 @@ limitations under the License.
package storage
import (
"context"
"fmt"
"sync"
"testing"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
@ -293,3 +298,127 @@ func TestScaleUpdate(t *testing.T) {
}
// TODO: Test generation number.
func TestScalePatchErrors(t *testing.T) {
storage, server := newStorage(t)
defer server.Terminate(t)
validObj := &validStatefulSet
namespace := validObj.Namespace
name := validObj.Name
resourceStore := storage.StatefulSet.Store
scaleStore := storage.Scale
defer resourceStore.DestroyFunc()
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
{
applyNotFoundPatch := func() rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
t.Errorf("notfound patch called")
return currentObject, nil
}
}
_, _, err := scaleStore.Update(ctx, "bad-name", rest.DefaultUpdatedObjectInfo(nil, applyNotFoundPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if !apierrors.IsNotFound(err) {
t.Errorf("expected notfound, got %v", err)
}
}
if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
t.Errorf("Unexpected error: %v", err)
}
{
applyBadUIDPatch := func() rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(*autoscaling.Scale).UID = "123"
return currentObject, nil
}
}
_, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadUIDPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if !apierrors.IsConflict(err) {
t.Errorf("expected conflict, got %v", err)
}
}
{
applyBadResourceVersionPatch := func() rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(*autoscaling.Scale).ResourceVersion = "123"
return currentObject, nil
}
}
_, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadResourceVersionPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if !apierrors.IsConflict(err) {
t.Errorf("expected conflict, got %v", err)
}
}
}
func TestScalePatchConflicts(t *testing.T) {
storage, server := newStorage(t)
defer server.Terminate(t)
validObj := &validStatefulSet
namespace := validObj.Namespace
name := validObj.Name
resourceStore := storage.StatefulSet.Store
scaleStore := storage.Scale
defer resourceStore.DestroyFunc()
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
applyLabelPatch := func(labelName, labelValue string) rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(metav1.Object).SetLabels(map[string]string{labelName: labelValue})
return currentObject, nil
}
}
stopCh := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
// continuously submits a patch that updates a label and verifies the label update was effective
labelName := "timestamp"
for i := 0; ; i++ {
select {
case <-stopCh:
return
default:
expectedLabelValue := fmt.Sprint(i)
updated, _, err := resourceStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyLabelPatch(labelName, fmt.Sprint(i))), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if err != nil {
t.Errorf("error patching main resource: %v", err)
return
}
gotLabelValue := updated.(metav1.Object).GetLabels()[labelName]
if gotLabelValue != expectedLabelValue {
t.Errorf("wrong label value: expected: %s, got: %s", expectedLabelValue, gotLabelValue)
return
}
}
}
}()
// continuously submits a scale patch of replicas for a monotonically increasing replica value
applyReplicaPatch := func(replicas int) rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(*autoscaling.Scale).Spec.Replicas = int32(replicas)
return currentObject, nil
}
}
for i := 0; i < 100; i++ {
result, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyReplicaPatch(i)), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if err != nil {
t.Fatalf("error patching scale: %v", err)
}
scale := result.(*autoscaling.Scale)
if scale.Spec.Replicas != int32(i) {
t.Errorf("wrong replicas count: expected: %d got: %d", i, scale.Spec.Replicas)
}
}
close(stopCh)
wg.Wait()
}

View File

@ -159,37 +159,10 @@ func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOpt
}
func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
obj, err := r.store.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
return nil, false, errors.NewNotFound(autoscaling.Resource("replicationcontrollers/scale"), name)
}
rc := obj.(*api.ReplicationController)
oldScale := scaleFromRC(rc)
// TODO: should this pass validation?
obj, err = objInfo.UpdatedObject(ctx, oldScale)
if err != nil {
return nil, false, err
}
if obj == nil {
return nil, false, errors.NewBadRequest("nil update passed to Scale")
}
scale, ok := obj.(*autoscaling.Scale)
if !ok {
return nil, false, errors.NewBadRequest(fmt.Sprintf("wrong object passed to Scale update: %v", obj))
}
if errs := validation.ValidateScale(scale); len(errs) > 0 {
return nil, false, errors.NewInvalid(autoscaling.Kind("Scale"), scale.Name, errs)
}
rc.Spec.Replicas = scale.Spec.Replicas
rc.ResourceVersion = scale.ResourceVersion
obj, _, err = r.store.Update(
obj, _, err := r.store.Update(
ctx,
rc.Name,
rest.DefaultUpdatedObjectInfo(rc),
name,
&scaleUpdatedObjectInfo{name, objInfo},
toScaleCreateValidation(createValidation),
toScaleUpdateValidation(updateValidation),
false,
@ -198,7 +171,7 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
if err != nil {
return nil, false, err
}
rc = obj.(*api.ReplicationController)
rc := obj.(*api.ReplicationController)
return scaleFromRC(rc), false, nil
}
@ -237,3 +210,59 @@ func scaleFromRC(rc *api.ReplicationController) *autoscaling.Scale {
},
}
}
// scaleUpdatedObjectInfo transforms existing replication controller -> existing scale -> new scale -> new replication controller
type scaleUpdatedObjectInfo struct {
name string
reqObjInfo rest.UpdatedObjectInfo
}
func (i *scaleUpdatedObjectInfo) Preconditions() *metav1.Preconditions {
return i.reqObjInfo.Preconditions()
}
func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) {
replicationcontroller, ok := oldObj.DeepCopyObject().(*api.ReplicationController)
if !ok {
return nil, errors.NewBadRequest(fmt.Sprintf("expected existing object type to be ReplicationController, got %T", replicationcontroller))
}
// if zero-value, the existing object does not exist
if len(replicationcontroller.ResourceVersion) == 0 {
return nil, errors.NewNotFound(api.Resource("replicationcontrollers/scale"), i.name)
}
// replicationcontroller -> old scale
oldScale := scaleFromRC(replicationcontroller)
// old scale -> new scale
newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
if err != nil {
return nil, err
}
if newScaleObj == nil {
return nil, errors.NewBadRequest("nil update passed to Scale")
}
scale, ok := newScaleObj.(*autoscaling.Scale)
if !ok {
return nil, errors.NewBadRequest(fmt.Sprintf("expected input object type to be Scale, but %T", newScaleObj))
}
// validate
if errs := validation.ValidateScale(scale); len(errs) > 0 {
return nil, errors.NewInvalid(autoscaling.Kind("Scale"), replicationcontroller.Name, errs)
}
// validate precondition if specified (resourceVersion matching is handled by storage)
if len(scale.UID) > 0 && scale.UID != replicationcontroller.UID {
return nil, errors.NewConflict(
api.Resource("replicationcontrollers/scale"),
replicationcontroller.Name,
fmt.Errorf("Precondition failed: UID in precondition: %v, UID in object meta: %v", scale.UID, replicationcontroller.UID),
)
}
// move replicas/resourceVersion fields to object and return
replicationcontroller.Spec.Replicas = scale.Spec.Replicas
replicationcontroller.ResourceVersion = scale.ResourceVersion
return replicationcontroller, nil
}

View File

@ -17,6 +17,9 @@ limitations under the License.
package storage
import (
"context"
"fmt"
"sync"
"testing"
apiequality "k8s.io/apimachinery/pkg/api/equality"
@ -348,3 +351,123 @@ func TestCategories(t *testing.T) {
expected := []string{"all"}
registrytest.AssertCategories(t, storage.Controller, expected)
}
func TestScalePatchErrors(t *testing.T) {
storage, server := newStorage(t)
defer server.Terminate(t)
validObj := validController
resourceStore := storage.Controller.Store
scaleStore := storage.Scale
defer resourceStore.DestroyFunc()
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
{
applyNotFoundPatch := func() rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
t.Errorf("notfound patch called")
return currentObject, nil
}
}
_, _, err := scaleStore.Update(ctx, "bad-name", rest.DefaultUpdatedObjectInfo(nil, applyNotFoundPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if !errors.IsNotFound(err) {
t.Errorf("expected notfound, got %v", err)
}
}
if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
t.Errorf("Unexpected error: %v", err)
}
{
applyBadUIDPatch := func() rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(*autoscaling.Scale).UID = "123"
return currentObject, nil
}
}
_, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadUIDPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if !errors.IsConflict(err) {
t.Errorf("expected conflict, got %v", err)
}
}
{
applyBadResourceVersionPatch := func() rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(*autoscaling.Scale).ResourceVersion = "123"
return currentObject, nil
}
}
_, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadResourceVersionPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if !errors.IsConflict(err) {
t.Errorf("expected conflict, got %v", err)
}
}
}
func TestScalePatchConflicts(t *testing.T) {
storage, server := newStorage(t)
defer server.Terminate(t)
validObj := validController
resourceStore := storage.Controller.Store
scaleStore := storage.Scale
defer resourceStore.DestroyFunc()
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
applyLabelPatch := func(labelName, labelValue string) rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(metav1.Object).SetLabels(map[string]string{labelName: labelValue})
return currentObject, nil
}
}
stopCh := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
// continuously submits a patch that updates a label and verifies the label update was effective
labelName := "timestamp"
for i := 0; ; i++ {
select {
case <-stopCh:
return
default:
expectedLabelValue := fmt.Sprint(i)
updated, _, err := resourceStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyLabelPatch(labelName, fmt.Sprint(i))), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if err != nil {
t.Errorf("error patching main resource: %v", err)
return
}
gotLabelValue := updated.(metav1.Object).GetLabels()[labelName]
if gotLabelValue != expectedLabelValue {
t.Errorf("wrong label value: expected: %s, got: %s", expectedLabelValue, gotLabelValue)
return
}
}
}
}()
// continuously submits a scale patch of replicas for a monotonically increasing replica value
applyReplicaPatch := func(replicas int) rest.TransformFunc {
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
currentObject.(*autoscaling.Scale).Spec.Replicas = int32(replicas)
return currentObject, nil
}
}
for i := 0; i < 100; i++ {
result, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyReplicaPatch(i)), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if err != nil {
t.Fatalf("error patching scale: %v", err)
}
scale := result.(*autoscaling.Scale)
if scale.Spec.Replicas != int32(i) {
t.Errorf("wrong replicas count: expected: %d got: %d", i, scale.Spec.Replicas)
}
}
close(stopCh)
wg.Wait()
}