mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Merge pull request #80572 from knight42/fix/scale-cr
Fix missing resource version when updating the scale subresource of custom resource
This commit is contained in:
commit
a6f51da500
@ -237,48 +237,17 @@ func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOpt
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
|
func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
|
||||||
obj, err := r.store.Get(ctx, name, &metav1.GetOptions{})
|
scaleObjInfo := &scaleUpdatedObjectInfo{
|
||||||
if err != nil {
|
reqObjInfo: objInfo,
|
||||||
return nil, false, err
|
specReplicasPath: r.specReplicasPath,
|
||||||
}
|
labelSelectorPath: r.labelSelectorPath,
|
||||||
cr := obj.(*unstructured.Unstructured)
|
statusReplicasPath: r.statusReplicasPath,
|
||||||
|
|
||||||
const invalidSpecReplicas = -2147483648 // smallest int32
|
|
||||||
oldScale, replicasFound, err := scaleFromCustomResource(cr, r.specReplicasPath, r.statusReplicasPath, r.labelSelectorPath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, false, err
|
|
||||||
}
|
|
||||||
if !replicasFound {
|
|
||||||
oldScale.Spec.Replicas = invalidSpecReplicas // signal that this was not set before
|
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err = objInfo.UpdatedObject(ctx, oldScale)
|
obj, _, err := r.store.Update(
|
||||||
if err != nil {
|
|
||||||
return nil, false, err
|
|
||||||
}
|
|
||||||
if obj == nil {
|
|
||||||
return nil, false, apierrors.NewBadRequest(fmt.Sprintf("nil update passed to Scale"))
|
|
||||||
}
|
|
||||||
|
|
||||||
scale, ok := obj.(*autoscalingv1.Scale)
|
|
||||||
if !ok {
|
|
||||||
return nil, false, apierrors.NewBadRequest(fmt.Sprintf("wrong object passed to Scale update: %v", obj))
|
|
||||||
}
|
|
||||||
|
|
||||||
if scale.Spec.Replicas == invalidSpecReplicas {
|
|
||||||
return nil, false, apierrors.NewBadRequest(fmt.Sprintf("the spec replicas field %q cannot be empty", r.specReplicasPath))
|
|
||||||
}
|
|
||||||
|
|
||||||
specReplicasPath := strings.TrimPrefix(r.specReplicasPath, ".") // ignore leading period
|
|
||||||
if err = unstructured.SetNestedField(cr.Object, int64(scale.Spec.Replicas), strings.Split(specReplicasPath, ".")...); err != nil {
|
|
||||||
return nil, false, err
|
|
||||||
}
|
|
||||||
cr.SetResourceVersion(scale.ResourceVersion)
|
|
||||||
|
|
||||||
obj, _, err = r.store.Update(
|
|
||||||
ctx,
|
ctx,
|
||||||
cr.GetName(),
|
name,
|
||||||
rest.DefaultUpdatedObjectInfo(cr),
|
scaleObjInfo,
|
||||||
toScaleCreateValidation(createValidation, r.specReplicasPath, r.statusReplicasPath, r.labelSelectorPath),
|
toScaleCreateValidation(createValidation, r.specReplicasPath, r.statusReplicasPath, r.labelSelectorPath),
|
||||||
toScaleUpdateValidation(updateValidation, r.specReplicasPath, r.statusReplicasPath, r.labelSelectorPath),
|
toScaleUpdateValidation(updateValidation, r.specReplicasPath, r.statusReplicasPath, r.labelSelectorPath),
|
||||||
false,
|
false,
|
||||||
@ -287,12 +256,13 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
cr = obj.(*unstructured.Unstructured)
|
cr := obj.(*unstructured.Unstructured)
|
||||||
|
|
||||||
newScale, _, err := scaleFromCustomResource(cr, r.specReplicasPath, r.statusReplicasPath, r.labelSelectorPath)
|
newScale, _, err := scaleFromCustomResource(cr, r.specReplicasPath, r.statusReplicasPath, r.labelSelectorPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, apierrors.NewBadRequest(err.Error())
|
return nil, false, apierrors.NewBadRequest(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
return newScale, false, err
|
return newScale, false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -372,3 +342,55 @@ func scaleFromCustomResource(cr *unstructured.Unstructured, specReplicasPath, st
|
|||||||
|
|
||||||
return scale, foundSpecReplicas, nil
|
return scale, foundSpecReplicas, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type scaleUpdatedObjectInfo struct {
|
||||||
|
reqObjInfo rest.UpdatedObjectInfo
|
||||||
|
specReplicasPath string
|
||||||
|
statusReplicasPath string
|
||||||
|
labelSelectorPath string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *scaleUpdatedObjectInfo) Preconditions() *metav1.Preconditions {
|
||||||
|
return i.reqObjInfo.Preconditions()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) {
|
||||||
|
cr := oldObj.DeepCopyObject().(*unstructured.Unstructured)
|
||||||
|
const invalidSpecReplicas = -2147483648 // smallest int32
|
||||||
|
oldScale, replicasFound, err := scaleFromCustomResource(cr, i.specReplicasPath, i.statusReplicasPath, i.labelSelectorPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !replicasFound {
|
||||||
|
oldScale.Spec.Replicas = invalidSpecReplicas // signal that this was not set before
|
||||||
|
}
|
||||||
|
|
||||||
|
obj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if obj == nil {
|
||||||
|
return nil, apierrors.NewBadRequest(fmt.Sprintf("nil update passed to Scale"))
|
||||||
|
}
|
||||||
|
|
||||||
|
scale, ok := obj.(*autoscalingv1.Scale)
|
||||||
|
if !ok {
|
||||||
|
return nil, apierrors.NewBadRequest(fmt.Sprintf("wrong object passed to Scale update: %v", obj))
|
||||||
|
}
|
||||||
|
|
||||||
|
if scale.Spec.Replicas == invalidSpecReplicas {
|
||||||
|
return nil, apierrors.NewBadRequest(fmt.Sprintf("the spec replicas field %q cannot be empty", i.specReplicasPath))
|
||||||
|
}
|
||||||
|
|
||||||
|
specReplicasPath := strings.TrimPrefix(i.specReplicasPath, ".") // ignore leading period
|
||||||
|
|
||||||
|
if err := unstructured.SetNestedField(cr.Object, int64(scale.Spec.Replicas), strings.Split(specReplicasPath, ".")...); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(scale.ResourceVersion) != 0 {
|
||||||
|
// The client provided a resourceVersion precondition.
|
||||||
|
// Set that precondition and return any conflict errors to the client.
|
||||||
|
cr.SetResourceVersion(scale.ResourceVersion)
|
||||||
|
}
|
||||||
|
return cr, nil
|
||||||
|
}
|
||||||
|
@ -17,6 +17,8 @@ limitations under the License.
|
|||||||
package customresource_test
|
package customresource_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
@ -526,6 +528,184 @@ func TestScaleUpdateWithoutSpecReplicas(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestScaleUpdateWithoutResourceVersion(t *testing.T) {
|
||||||
|
storage, server := newStorage(t)
|
||||||
|
defer server.Terminate(t)
|
||||||
|
defer storage.CustomResource.Store.DestroyFunc()
|
||||||
|
|
||||||
|
name := "foo"
|
||||||
|
|
||||||
|
var cr unstructured.Unstructured
|
||||||
|
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), metav1.NamespaceDefault)
|
||||||
|
key := "/noxus/" + metav1.NamespaceDefault + "/" + name
|
||||||
|
if err := storage.CustomResource.Storage.Create(ctx, key, &validCustomResource, &cr, 0, false); err != nil {
|
||||||
|
t.Fatalf("error setting new custom resource (key: %s) %v: %v", key, validCustomResource, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
replicas := int32(8)
|
||||||
|
update := autoscalingv1.Scale{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
},
|
||||||
|
Spec: autoscalingv1.ScaleSpec{
|
||||||
|
Replicas: replicas,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, _, err := storage.Scale.Update(ctx, update.Name, rest.DefaultUpdatedObjectInfo(&update), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
|
||||||
|
t.Fatalf("error updating scale %v: %v", update, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
obj, err := storage.Scale.Get(ctx, name, &metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error fetching scale for %s: %v", name, err)
|
||||||
|
}
|
||||||
|
scale := obj.(*autoscalingv1.Scale)
|
||||||
|
if scale.Spec.Replicas != replicas {
|
||||||
|
t.Errorf("wrong replicas count: expected: %d got: %d", replicas, scale.Spec.Replicas)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestScaleUpdateWithoutResourceVersionWithConflicts(t *testing.T) {
|
||||||
|
storage, server := newStorage(t)
|
||||||
|
defer server.Terminate(t)
|
||||||
|
defer storage.CustomResource.Store.DestroyFunc()
|
||||||
|
|
||||||
|
name := "foo"
|
||||||
|
|
||||||
|
var cr unstructured.Unstructured
|
||||||
|
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), metav1.NamespaceDefault)
|
||||||
|
key := "/noxus/" + metav1.NamespaceDefault + "/" + name
|
||||||
|
if err := storage.CustomResource.Storage.Create(ctx, key, &validCustomResource, &cr, 0, false); err != nil {
|
||||||
|
t.Fatalf("error setting new custom resource (key: %s) %v: %v", key, validCustomResource, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fetchObject := func(name string) (*unstructured.Unstructured, error) {
|
||||||
|
gotObj, err := storage.CustomResource.Get(ctx, name, &metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error fetching custom resource %s: %v", name, err)
|
||||||
|
}
|
||||||
|
return gotObj.(*unstructured.Unstructured), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
applyPatch := func(labelName, labelValue string) rest.TransformFunc {
|
||||||
|
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
|
||||||
|
o := currentObject.(metav1.Object)
|
||||||
|
o.SetLabels(map[string]string{
|
||||||
|
labelName: labelValue,
|
||||||
|
})
|
||||||
|
return currentObject, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
errs := make(chan error, 1)
|
||||||
|
rounds := 100
|
||||||
|
go func() {
|
||||||
|
// continuously submits a patch that updates a label and verifies the label update was effective
|
||||||
|
labelName := "timestamp"
|
||||||
|
for i := 0; i < rounds; i++ {
|
||||||
|
expectedLabelValue := fmt.Sprint(i)
|
||||||
|
update, err := fetchObject(name)
|
||||||
|
if err != nil {
|
||||||
|
errs <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
setNestedField(update, expectedLabelValue, "metadata", "labels", labelName)
|
||||||
|
if _, _, err := storage.CustomResource.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyPatch(labelName, fmt.Sprint(i))), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
|
||||||
|
|
||||||
|
errs <- fmt.Errorf("error updating custom resource label: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
gotObj, err := fetchObject(name)
|
||||||
|
if err != nil {
|
||||||
|
errs <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
gotLabelValue, _, err := unstructured.NestedString(gotObj.Object, "metadata", "labels", labelName)
|
||||||
|
if err != nil {
|
||||||
|
errs <- fmt.Errorf("error getting label %s of custom resource %s: %v", labelName, name, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if gotLabelValue != expectedLabelValue {
|
||||||
|
errs <- fmt.Errorf("wrong label value: expected: %s, got: %s", expectedLabelValue, gotLabelValue)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
replicas := int32(0)
|
||||||
|
update := autoscalingv1.Scale{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
// continuously submits a scale update without a resourceVersion for a monotonically increasing replica value
|
||||||
|
// and verifies the scale update was effective
|
||||||
|
for i := 0; i < rounds; i++ {
|
||||||
|
select {
|
||||||
|
case err := <-errs:
|
||||||
|
t.Fatal(err)
|
||||||
|
default:
|
||||||
|
replicas++
|
||||||
|
update.Spec.Replicas = replicas
|
||||||
|
if _, _, err := storage.Scale.Update(ctx, update.Name, rest.DefaultUpdatedObjectInfo(&update), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
|
||||||
|
t.Fatalf("error updating scale %v: %v", update, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
obj, err := storage.Scale.Get(ctx, name, &metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error fetching scale for %s: %v", name, err)
|
||||||
|
}
|
||||||
|
scale := obj.(*autoscalingv1.Scale)
|
||||||
|
if scale.Spec.Replicas != replicas {
|
||||||
|
t.Errorf("wrong replicas count: expected: %d got: %d", replicas, scale.Spec.Replicas)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestScaleUpdateWithResourceVersionWithConflicts(t *testing.T) {
|
||||||
|
storage, server := newStorage(t)
|
||||||
|
defer server.Terminate(t)
|
||||||
|
defer storage.CustomResource.Store.DestroyFunc()
|
||||||
|
|
||||||
|
name := "foo"
|
||||||
|
|
||||||
|
var cr unstructured.Unstructured
|
||||||
|
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), metav1.NamespaceDefault)
|
||||||
|
key := "/noxus/" + metav1.NamespaceDefault + "/" + name
|
||||||
|
if err := storage.CustomResource.Storage.Create(ctx, key, &validCustomResource, &cr, 0, false); err != nil {
|
||||||
|
t.Fatalf("error setting new custom resource (key: %s) %v: %v", key, validCustomResource, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
obj, err := storage.Scale.Get(ctx, name, &metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error fetching scale for %s: %v", name, err)
|
||||||
|
}
|
||||||
|
scale, ok := obj.(*autoscalingv1.Scale)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("%v is not of the type autoscalingv1.Scale", scale)
|
||||||
|
}
|
||||||
|
|
||||||
|
replicas := int32(12)
|
||||||
|
update := autoscalingv1.Scale{
|
||||||
|
ObjectMeta: scale.ObjectMeta,
|
||||||
|
Spec: autoscalingv1.ScaleSpec{
|
||||||
|
Replicas: replicas,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
update.ResourceVersion = "1"
|
||||||
|
|
||||||
|
_, _, err = storage.Scale.Update(ctx, update.Name, rest.DefaultUpdatedObjectInfo(&update), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expecting an update conflict error")
|
||||||
|
}
|
||||||
|
if !errors.IsConflict(err) {
|
||||||
|
t.Fatalf("unexpected error, expecting an update conflict but got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func setSpecReplicas(u *unstructured.Unstructured, replicas int64) {
|
func setSpecReplicas(u *unstructured.Unstructured, replicas int64) {
|
||||||
setNestedField(u, replicas, "spec", "replicas")
|
setNestedField(u, replicas, "spec", "replicas")
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user