Use ScaleHandler for all scalable resources

This commit is contained in:
Antoine Pelisse 2021-03-11 11:05:05 -08:00 committed by Andrea Nodari
parent a9ea98b3b9
commit 816e80206c
13 changed files with 647 additions and 326 deletions

View File

@ -26,6 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
@ -50,6 +51,11 @@ type ReplicaSetStorage struct {
Scale *ScaleREST
}
// maps a group version to the replicas path in a replicaset object
var replicasPathInReplicaSet = fieldmanager.ResourcePathMappings{
schema.GroupVersion{Group: "apps", Version: "v1"}.String(): fieldpath.MakePathOrDie("spec", "replicas"),
}
// NewStorage returns new instance of ReplicaSetStorage.
func NewStorage(optsGetter generic.RESTOptionsGetter) (ReplicaSetStorage, error) {
replicaSetRest, replicaSetStatusRest, err := NewREST(optsGetter)
@ -279,12 +285,24 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
return nil, errors.NewNotFound(apps.Resource("replicasets/scale"), i.name)
}
managedFieldsHandler := fieldmanager.NewScaleHandler(
replicaset.ManagedFields,
schema.GroupVersion{Group: "apps", Version: "v1"},
replicasPathInReplicaSet,
)
// replicaset -> old scale
oldScale, err := scaleFromReplicaSet(replicaset)
if err != nil {
return nil, err
}
scaleManagedFields, err := managedFieldsHandler.ToSubresource()
if err != nil {
return nil, err
}
oldScale.ManagedFields = scaleManagedFields
// old scale -> new scale
newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
if err != nil {
@ -315,5 +333,12 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
// move replicas/resourceVersion fields to object and return
replicaset.Spec.Replicas = scale.Spec.Replicas
replicaset.ResourceVersion = scale.ResourceVersion
updatedEntries, err := managedFieldsHandler.ToParent(scale.ManagedFields)
if err != nil {
return nil, err
}
replicaset.ManagedFields = updatedEntries
return replicaset, nil
}

View File

@ -24,6 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
@ -47,6 +48,11 @@ type StatefulSetStorage struct {
Scale *ScaleREST
}
// maps a group version to the replicas path in a statefulset object
var replicasPathInStatefulSet = fieldmanager.ResourcePathMappings{
schema.GroupVersion{Group: "apps", Version: "v1"}.String(): fieldpath.MakePathOrDie("spec", "replicas"),
}
// NewStorage returns new instance of StatefulSetStorage.
func NewStorage(optsGetter generic.RESTOptionsGetter) (StatefulSetStorage, error) {
statefulSetRest, statefulSetStatusRest, err := NewREST(optsGetter)
@ -265,11 +271,22 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
return nil, errors.NewNotFound(apps.Resource("statefulsets/scale"), i.name)
}
managedFieldsHandler := fieldmanager.NewScaleHandler(
statefulset.ManagedFields,
schema.GroupVersion{Group: "apps", Version: "v1"},
replicasPathInStatefulSet,
)
// statefulset -> old scale
oldScale, err := scaleFromStatefulSet(statefulset)
if err != nil {
return nil, err
}
scaleManagedFields, err := managedFieldsHandler.ToSubresource()
if err != nil {
return nil, err
}
oldScale.ManagedFields = scaleManagedFields
// old scale -> new scale
newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
@ -301,5 +318,12 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
// move replicas/resourceVersion fields to object and return
statefulset.Spec.Replicas = scale.Spec.Replicas
statefulset.ResourceVersion = scale.ResourceVersion
updatedEntries, err := managedFieldsHandler.ToParent(scale.ManagedFields)
if err != nil {
return nil, err
}
statefulset.ManagedFields = updatedEntries
return statefulset, nil
}

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
@ -49,6 +50,11 @@ type ControllerStorage struct {
Scale *ScaleREST
}
// maps a group version to the replicas path in a deployment object
var replicasPathInReplicationController = fieldmanager.ResourcePathMappings{
schema.GroupVersion{Group: "", Version: "v1"}.String(): fieldpath.MakePathOrDie("spec", "replicas"),
}
func NewStorage(optsGetter generic.RESTOptionsGetter) (ControllerStorage, error) {
controllerREST, statusREST, err := NewREST(optsGetter)
if err != nil {
@ -239,8 +245,19 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
return nil, errors.NewNotFound(api.Resource("replicationcontrollers/scale"), i.name)
}
managedFieldsHandler := fieldmanager.NewScaleHandler(
replicationcontroller.ManagedFields,
schema.GroupVersion{Group: "", Version: "v1"},
replicasPathInReplicationController,
)
// replicationcontroller -> old scale
oldScale := scaleFromRC(replicationcontroller)
scaleManagedFields, err := managedFieldsHandler.ToSubresource()
if err != nil {
return nil, err
}
oldScale.ManagedFields = scaleManagedFields
// old scale -> new scale
newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
@ -272,5 +289,12 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
// move replicas/resourceVersion fields to object and return
replicationcontroller.Spec.Replicas = scale.Spec.Replicas
replicationcontroller.ResourceVersion = scale.ResourceVersion
updatedEntries, err := managedFieldsHandler.ToParent(scale.ManagedFields)
if err != nil {
return nil, err
}
replicationcontroller.ManagedFields = updatedEntries
return replicationcontroller, nil
}

View File

@ -165,6 +165,13 @@ func (c *crConverter) ConvertToVersion(in runtime.Object, target runtime.GroupVe
// TODO: should this be a typed error?
return nil, fmt.Errorf("%v is unstructured and is not suitable for converting to %q", fromGVK.String(), target)
}
// Special-case typed scale conversion if this custom resource supports a scale endpoint
if c.convertScale {
if _, isInScale := in.(*autoscalingv1.Scale); isInScale {
return typedscheme.Scheme.ConvertToVersion(in, target)
}
}
if !c.validVersions[toGVK.GroupVersion()] {
return nil, fmt.Errorf("request to convert CR to an invalid group/version: %s", toGVK.GroupVersion().String())
}

View File

@ -699,6 +699,27 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
if err != nil {
return nil, err
}
// Create replicasPathInCustomResource
replicasPathInCustomResource := fieldmanager.ResourcePathMappings{}
for _, v := range crd.Spec.Versions {
subresources, err := apiextensionshelpers.GetSubresourcesForVersion(crd, v.Name)
if err != nil {
utilruntime.HandleError(err)
return nil, fmt.Errorf("the server could not properly serve the CR subresources")
}
if subresources == nil || subresources.Scale == nil {
continue
}
path := fieldpath.Path{}
splitReplicasPath := strings.Split(strings.TrimPrefix(subresources.Scale.SpecReplicasPath, "."), ".")
for _, element := range splitReplicasPath {
s := element
path = append(path, fieldpath.PathElement{FieldName: &s})
}
replicasPathInCustomResource[schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name}.String()] = path
}
for _, v := range crd.Spec.Versions {
// In addition to Unstructured objects (Custom Resources), we also may sometimes need to
// decode unversioned Options objects, so we delegate to parameterScheme for such types.
@ -803,6 +824,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
},
crd.Status.AcceptedNames.Categories,
table,
replicasPathInCustomResource,
)
selfLinkPrefix := ""
@ -892,8 +914,19 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
SelfLinkPathPrefix: selfLinkPrefix,
SelfLinkPathSuffix: "/scale",
}
// TODO(issues.k8s.io/82046): We can't effectively track ownership on scale requests yet.
scaleScope.FieldManager = nil
if utilfeature.DefaultFeatureGate.Enabled(features.ServerSideApply) && subresources != nil && subresources.Scale != nil {
scaleScope, err = scopeWithFieldManager(
typeConverter,
scaleScope,
nil,
"scale",
)
if err != nil {
return nil, err
}
}
scaleScopes[v.Name] = &scaleScope
// override status subresource values

View File

@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
@ -41,7 +42,7 @@ type CustomResourceStorage struct {
Scale *ScaleREST
}
func NewStorage(resource schema.GroupResource, kind, listKind schema.GroupVersionKind, strategy customResourceStrategy, optsGetter generic.RESTOptionsGetter, categories []string, tableConvertor rest.TableConvertor) CustomResourceStorage {
func NewStorage(resource schema.GroupResource, kind, listKind schema.GroupVersionKind, strategy customResourceStrategy, optsGetter generic.RESTOptionsGetter, categories []string, tableConvertor rest.TableConvertor, replicasPathMapping fieldmanager.ResourcePathMappings) CustomResourceStorage {
customResourceREST, customResourceStatusREST := newREST(resource, kind, listKind, strategy, optsGetter, categories, tableConvertor)
s := CustomResourceStorage{
@ -63,6 +64,8 @@ func NewStorage(resource schema.GroupResource, kind, listKind schema.GroupVersio
specReplicasPath: scale.SpecReplicasPath,
statusReplicasPath: scale.StatusReplicasPath,
labelSelectorPath: labelSelectorPath,
parentGV: kind.GroupVersion(),
replicasPathMapping: replicasPathMapping,
}
}
@ -213,6 +216,8 @@ type ScaleREST struct {
specReplicasPath string
statusReplicasPath string
labelSelectorPath string
parentGV schema.GroupVersion
replicasPathMapping fieldmanager.ResourcePathMappings
}
// ScaleREST implements Patcher
@ -251,6 +256,8 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
specReplicasPath: r.specReplicasPath,
labelSelectorPath: r.labelSelectorPath,
statusReplicasPath: r.statusReplicasPath,
parentGV: r.parentGV,
replicasPathMapping: r.replicasPathMapping,
}
obj, _, err := r.store.Update(
@ -299,19 +306,22 @@ func toScaleUpdateValidation(f rest.ValidateObjectUpdateFunc, specReplicasPath,
}
}
// Split the path per period, ignoring the leading period.
func splitReplicasPath(replicasPath string) []string {
return strings.Split(strings.TrimPrefix(replicasPath, "."), ".")
}
// scaleFromCustomResource returns a scale subresource for a customresource and a bool signalling wether
// the specReplicas value was found.
func scaleFromCustomResource(cr *unstructured.Unstructured, specReplicasPath, statusReplicasPath, labelSelectorPath string) (*autoscalingv1.Scale, bool, error) {
specReplicasPath = strings.TrimPrefix(specReplicasPath, ".") // ignore leading period
specReplicas, foundSpecReplicas, err := unstructured.NestedInt64(cr.UnstructuredContent(), strings.Split(specReplicasPath, ".")...)
specReplicas, foundSpecReplicas, err := unstructured.NestedInt64(cr.UnstructuredContent(), splitReplicasPath(specReplicasPath)...)
if err != nil {
return nil, false, err
} else if !foundSpecReplicas {
specReplicas = 0
}
statusReplicasPath = strings.TrimPrefix(statusReplicasPath, ".") // ignore leading period
statusReplicas, found, err := unstructured.NestedInt64(cr.UnstructuredContent(), strings.Split(statusReplicasPath, ".")...)
statusReplicas, found, err := unstructured.NestedInt64(cr.UnstructuredContent(), splitReplicasPath(statusReplicasPath)...)
if err != nil {
return nil, false, err
} else if !found {
@ -320,8 +330,7 @@ func scaleFromCustomResource(cr *unstructured.Unstructured, specReplicasPath, st
var labelSelector string
if len(labelSelectorPath) > 0 {
labelSelectorPath = strings.TrimPrefix(labelSelectorPath, ".") // ignore leading period
labelSelector, _, err = unstructured.NestedString(cr.UnstructuredContent(), strings.Split(labelSelectorPath, ".")...)
labelSelector, _, err = unstructured.NestedString(cr.UnstructuredContent(), splitReplicasPath(labelSelectorPath)...)
if err != nil {
return nil, false, err
}
@ -357,6 +366,8 @@ type scaleUpdatedObjectInfo struct {
specReplicasPath string
statusReplicasPath string
labelSelectorPath string
parentGV schema.GroupVersion
replicasPathMapping fieldmanager.ResourcePathMappings
}
func (i *scaleUpdatedObjectInfo) Preconditions() *metav1.Preconditions {
@ -366,6 +377,13 @@ func (i *scaleUpdatedObjectInfo) Preconditions() *metav1.Preconditions {
func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) {
cr := oldObj.DeepCopyObject().(*unstructured.Unstructured)
const invalidSpecReplicas = -2147483648 // smallest int32
managedFieldsHandler := fieldmanager.NewScaleHandler(
cr.GetManagedFields(),
i.parentGV,
i.replicasPathMapping,
)
oldScale, replicasFound, err := scaleFromCustomResource(cr, i.specReplicasPath, i.statusReplicasPath, i.labelSelectorPath)
if err != nil {
return nil, err
@ -374,6 +392,12 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
oldScale.Spec.Replicas = invalidSpecReplicas // signal that this was not set before
}
scaleManagedFields, err := managedFieldsHandler.ToSubresource()
if err != nil {
return nil, err
}
oldScale.ManagedFields = scaleManagedFields
obj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
if err != nil {
return nil, err
@ -391,9 +415,7 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
return nil, apierrors.NewBadRequest(fmt.Sprintf("the spec replicas field %q cannot be empty", i.specReplicasPath))
}
specReplicasPath := strings.TrimPrefix(i.specReplicasPath, ".") // ignore leading period
if err := unstructured.SetNestedField(cr.Object, int64(scale.Spec.Replicas), strings.Split(specReplicasPath, ".")...); err != nil {
if err := unstructured.SetNestedField(cr.Object, int64(scale.Spec.Replicas), splitReplicasPath(i.specReplicasPath)...); err != nil {
return nil, err
}
if len(scale.ResourceVersion) != 0 {
@ -401,5 +423,12 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
// Set that precondition and return any conflict errors to the client.
cr.SetResourceVersion(scale.ResourceVersion)
}
updatedEntries, err := managedFieldsHandler.ToParent(scale.ManagedFields)
if err != nil {
return nil, err
}
cr.SetManagedFields(updatedEntries)
return cr, nil
}

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
registrytest "k8s.io/apiserver/pkg/registry/generic/testing"
@ -106,6 +107,7 @@ func newStorage(t *testing.T) (customresource.CustomResourceStorage, *etcd3testi
restOptions,
[]string{"all"},
table,
fieldmanager.ResourcePathMappings{},
)
return storage, server

View File

@ -33,8 +33,10 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/features"
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
featuregatetesting "k8s.io/component-base/featuregate/testing"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
@ -114,6 +116,26 @@ func NewNoxuSubresourceInstance(namespace, name, version string) *unstructured.U
}
}
func NewNoxuSubresourceInstanceWithReplicas(namespace, name, version, replicasField string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": fmt.Sprintf("mygroup.example.com/%s", version),
"kind": "WishIHadChosenNoxu",
"metadata": map[string]interface{}{
"namespace": namespace,
"name": name,
},
"spec": map[string]interface{}{
"num": int64(10),
replicasField: int64(3),
},
"status": map[string]interface{}{
"replicas": int64(7),
},
},
}
}
func TestStatusSubresource(t *testing.T) {
tearDown, apiExtensionClient, dynamicClient, err := fixtures.StartDefaultServerWithClients(t)
if err != nil {
@ -373,6 +395,87 @@ func TestScaleSubresource(t *testing.T) {
}
}
func TestApplyScaleSubresource(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, true)()
tearDown, config, _, err := fixtures.StartDefaultServer(t)
if err != nil {
t.Fatal(err)
}
defer tearDown()
apiExtensionClient, err := clientset.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
noxuDefinition := NewNoxuSubresourcesCRDs(apiextensionsv1.NamespaceScoped)[0]
subresources, err := getSubresourcesForVersion(noxuDefinition, "v1beta1")
if err != nil {
t.Fatal(err)
}
subresources.Scale.SpecReplicasPath = ".spec.replicas[0]"
noxuDefinition, err = fixtures.CreateNewV1CustomResourceDefinition(noxuDefinition, apiExtensionClient, dynamicClient)
if err != nil {
t.Fatal(err)
}
// Create a client for it.
ns := "not-the-default"
noxuResourceClient := newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1beta1")
obj := NewNoxuSubresourceInstanceWithReplicas(ns, "foo", "v1beta1", "replicas[0]")
obj, err = noxuResourceClient.Create(context.TODO(), obj, metav1.CreateOptions{})
if err != nil {
t.Logf("%#v", obj)
t.Fatalf("Failed to create CustomResource: %v", err)
}
noxuResourceClient = newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1")
patch := `{"metadata": {"name": "foo"}, "kind": "WishIHadChosenNoxu", "apiVersion": "mygroup.example.com/v1", "spec": {"replicas": 3}}`
obj, err = noxuResourceClient.Patch(context.TODO(), "foo", types.ApplyPatchType, []byte(patch), metav1.PatchOptions{FieldManager: "applier"})
if err != nil {
t.Logf("%#v", obj)
t.Fatalf("Failed to Apply CustomResource: %v", err)
}
if got := len(obj.GetManagedFields()); got != 2 {
t.Fatalf("Expected 2 managed fields, got %v: %v", got, obj.GetManagedFields())
}
_, err = noxuResourceClient.Patch(context.TODO(), "foo", types.MergePatchType, []byte(`{"spec": {"replicas": 5}}`), metav1.PatchOptions{FieldManager: "scaler"}, "scale")
if err != nil {
t.Fatal(err)
}
obj, err = noxuResourceClient.Get(context.TODO(), "foo", metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to Get CustomResource: %v", err)
}
// Managed fields should have 3 entries: one for scale, one for spec, and one for the rest of the fields
managedFields := obj.GetManagedFields()
if len(managedFields) != 3 {
t.Fatalf("Expected 3 managed fields, got %v: %v", len(managedFields), obj.GetManagedFields())
}
specEntry := managedFields[0]
if specEntry.Manager != "applier" || specEntry.APIVersion != "mygroup.example.com/v1" || specEntry.Operation != "Apply" || string(specEntry.FieldsV1.Raw) != `{"f:spec":{}}` || specEntry.Subresource != "" {
t.Fatalf("Unexpected entry: %v", specEntry)
}
scaleEntry := managedFields[1]
if scaleEntry.Manager != "scaler" || scaleEntry.APIVersion != "mygroup.example.com/v1" || scaleEntry.Operation != "Update" || string(scaleEntry.FieldsV1.Raw) != `{"f:spec":{"f:replicas":{}}}` || scaleEntry.Subresource != "scale" {
t.Fatalf("Unexpected entry: %v", scaleEntry)
}
restEntry := managedFields[2]
if restEntry.Manager != "integration.test" || restEntry.APIVersion != "mygroup.example.com/v1beta1" {
t.Fatalf("Unexpected entry: %v", restEntry)
}
}
func TestValidationSchemaWithStatus(t *testing.T) {
tearDown, config, _, err := fixtures.StartDefaultServer(t)
if err != nil {

View File

@ -76,7 +76,7 @@ func NewCRDStructuredMergeManager(typeConverter TypeConverter, objectConverter r
func (f *structuredMergeManager) Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error) {
newObjVersioned, err := f.toVersioned(newObj)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert new object to proper version: %v", err)
return nil, nil, fmt.Errorf("failed to convert new object to proper version: %v (from %v to %v)", err, newObj.GetObjectKind().GroupVersionKind(), f.groupVersion)
}
liveObjVersioned, err := f.toVersioned(liveObj)
if err != nil {

View File

@ -3424,300 +3424,3 @@ func TestSubresourceField(t *testing.T) {
t.Fatalf(`Unexpected entry, got: %v`, managedFields[1])
}
}
func TestApplyOnScaleDeployment(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, true)()
_, client, closeFn := setup(t)
defer closeFn()
validDeployment := []byte(`{
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {
"name": "deployment"
},
"spec": {
"replicas": 1,
"selector": {
"matchLabels": {
"app": "nginx"
}
},
"template": {
"metadata": {
"labels": {
"app": "nginx"
}
},
"spec": {
"containers": [{
"name": "nginx",
"image": "nginx:latest"
}]
}
}
}
}`)
// Create deployment
_, err := client.CoreV1().RESTClient().Patch(types.ApplyPatchType).
AbsPath("/apis/apps/v1").
Namespace("default").
Resource("deployments").
Name("deployment").
Param("fieldManager", "apply_test").
Body(validDeployment).
Do(context.TODO()).
Get()
if err != nil {
t.Fatalf("Failed to create object using Apply patch: %v", err)
}
deployment, err := client.AppsV1().Deployments("default").Get(context.TODO(), "deployment", metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to retrieve object: %v", err)
}
if *deployment.Spec.Replicas != 1 {
t.Fatalf("Expected replicas to be 1, but got: %d", *deployment.Spec.Replicas)
}
// Call scale subresource to update replicas
_, err = client.CoreV1().RESTClient().
Patch(types.MergePatchType).
AbsPath("/apis/apps/v1").
Name("deployment").
Resource("deployments").
SubResource("scale").
Namespace("default").
Param("fieldManager", "scale_test").
Body([]byte(`{"spec":{"replicas": 5}}`)).
Do(context.TODO()).
Get()
if err != nil {
t.Fatalf("Error updating scale subresource: %v ", err)
}
deployment, err = client.AppsV1().Deployments("default").Get(context.TODO(), "deployment", metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to retrieve object: %v", err)
}
if *deployment.Spec.Replicas != 5 {
t.Fatalf("Expected replicas to be 5, but got: %d", *deployment.Spec.Replicas)
}
assertReplicasOwnership(t, (*deployment).GetManagedFields(), "scale_test")
// Re-apply the original object, it should fail with conflict because replicas have changed
_, err = client.CoreV1().RESTClient().Patch(types.ApplyPatchType).
AbsPath("/apis/apps/v1").
Namespace("default").
Resource("deployments").
Name("deployment").
Param("fieldManager", "apply_test").
Body(validDeployment).
Do(context.TODO()).
Get()
if !apierrors.IsConflict(err) {
t.Fatalf("Expected conflict error but got: %v", err)
}
// Re-apply forcing the changes should succeed
_, err = client.CoreV1().RESTClient().Patch(types.ApplyPatchType).
AbsPath("/apis/apps/v1").
Namespace("default").
Resource("deployments").
Name("deployment").
Param("fieldManager", "apply_test").
Param("force", "true").
Body(validDeployment).
Do(context.TODO()).
Get()
if err != nil {
t.Fatalf("Error updating deployment: %v ", err)
}
deployment, err = client.AppsV1().Deployments("default").Get(context.TODO(), "deployment", metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to retrieve object: %v", err)
}
if *deployment.Spec.Replicas != 1 {
t.Fatalf("Expected replicas to be 1, but got: %d", *deployment.Spec.Replicas)
}
assertReplicasOwnership(t, (*deployment).GetManagedFields(), "apply_test")
// Run "Apply" with a scale object with a different number of replicas. It should generate a conflict.
_, err = client.CoreV1().RESTClient().
Patch(types.ApplyPatchType).
AbsPath("/apis/apps/v1").
Namespace("default").
Resource("deployments").
SubResource("scale").
Name("deployment").
Param("fieldManager", "apply_scale").
Body([]byte(`{"kind":"Scale","apiVersion":"autoscaling/v1","metadata":{"name":"deployment","namespace":"default"},"spec":{"replicas":17}}`)).
Do(context.TODO()).
Get()
if !apierrors.IsConflict(err) {
t.Fatalf("Expected conflict error but got: %v", err)
}
if !strings.Contains(err.Error(), "apply_test") {
t.Fatalf("Expected conflict with `apply_test` manager but got: %v", err)
}
// Same as before but force. Only the new manager should own .spec.replicas
_, err = client.CoreV1().RESTClient().
Patch(types.ApplyPatchType).
AbsPath("/apis/apps/v1").
Namespace("default").
Resource("deployments").
SubResource("scale").
Name("deployment").
Param("fieldManager", "apply_scale").
Param("force", "true").
Body([]byte(`{"kind":"Scale","apiVersion":"autoscaling/v1","metadata":{"name":"deployment","namespace":"default"},"spec":{"replicas":17}}`)).
Do(context.TODO()).
Get()
if err != nil {
t.Fatalf("Error updating deployment: %v ", err)
}
deployment, err = client.AppsV1().Deployments("default").Get(context.TODO(), "deployment", metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to retrieve object: %v", err)
}
if *deployment.Spec.Replicas != 17 {
t.Fatalf("Expected to replicas to be 17, but got: %d", *deployment.Spec.Replicas)
}
assertReplicasOwnership(t, (*deployment).GetManagedFields(), "apply_scale")
// Replace scale object
_, err = client.CoreV1().RESTClient().
Put().
AbsPath("/apis/apps/v1").
Namespace("default").
Resource("deployments").
SubResource("scale").
Name("deployment").
Param("fieldManager", "replace_test").
Body([]byte(`{"kind":"Scale","apiVersion":"autoscaling/v1","metadata":{"name":"deployment","namespace":"default"},"spec":{"replicas":7}}`)).
Do(context.TODO()).
Get()
if err != nil {
t.Fatalf("Error updating deployment: %v ", err)
}
deployment, err = client.AppsV1().Deployments("default").Get(context.TODO(), "deployment", metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to retrieve object: %v", err)
}
if *deployment.Spec.Replicas != 7 {
t.Fatalf("Expected to replicas to be 7, but got: %d", *deployment.Spec.Replicas)
}
assertReplicasOwnership(t, (*deployment).GetManagedFields(), "replace_test")
// Apply the same number of replicas, both managers should own the field
_, err = client.CoreV1().RESTClient().
Patch(types.ApplyPatchType).
AbsPath("/apis/apps/v1").
Namespace("default").
Resource("deployments").
SubResource("scale").
Name("deployment").
Param("fieldManager", "co_owning_test").
Param("force", "true").
Body([]byte(`{"kind":"Scale","apiVersion":"autoscaling/v1","metadata":{"name":"deployment","namespace":"default"},"spec":{"replicas":7}}`)).
Do(context.TODO()).
Get()
if err != nil {
t.Fatalf("Error updating deployment: %v ", err)
}
deployment, err = client.AppsV1().Deployments("default").Get(context.TODO(), "deployment", metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to retrieve object: %v", err)
}
if *deployment.Spec.Replicas != 7 {
t.Fatalf("Expected to replicas to be 7, but got: %d", *deployment.Spec.Replicas)
}
assertReplicasOwnership(t, (*deployment).GetManagedFields(), "replace_test", "co_owning_test")
// Scaling again should make this manager the only owner of replicas
_, err = client.CoreV1().RESTClient().
Patch(types.MergePatchType).
AbsPath("/apis/apps/v1").
Name("deployment").
Resource("deployments").
SubResource("scale").
Namespace("default").
Param("fieldManager", "scale_test").
Body([]byte(`{"spec":{"replicas": 5}}`)).
Do(context.TODO()).
Get()
if err != nil {
t.Fatalf("Error updating scale subresource: %v ", err)
}
deployment, err = client.AppsV1().Deployments("default").Get(context.TODO(), "deployment", metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to retrieve object: %v", err)
}
if *deployment.Spec.Replicas != 5 {
t.Fatalf("Expected replicas to be 5, but got: %d", *deployment.Spec.Replicas)
}
assertReplicasOwnership(t, (*deployment).GetManagedFields(), "scale_test")
}
func assertReplicasOwnership(t *testing.T, managedFields []metav1.ManagedFieldsEntry, fieldManagers ...string) {
t.Helper()
seen := make(map[string]bool)
for _, m := range fieldManagers {
seen[m] = false
}
for _, managedField := range managedFields {
var entryJSON map[string]interface{}
if err := json.Unmarshal(managedField.FieldsV1.Raw, &entryJSON); err != nil {
t.Fatalf("failed to read into json")
}
spec, ok := entryJSON["f:spec"].(map[string]interface{})
if !ok {
// continue with the next managedField, as we this field does not hold the spec entry
continue
}
if _, ok := spec["f:replicas"]; !ok {
// continue with the next managedField, as we this field does not hold the spec.replicas entry
continue
}
// check if the manager is one of the ones we expect
if _, ok := seen[managedField.Manager]; !ok {
t.Fatalf("Unexpected field manager, found %q, expected to be in: %v", managedField.Manager, seen)
}
seen[managedField.Manager] = true
}
var missingManagers []string
for manager, managerSeen := range seen {
if !managerSeen {
missingManagers = append(missingManagers, manager)
}
}
if len(missingManagers) > 0 {
t.Fatalf("replicas fields should be owned by %v", missingManagers)
}
}

View File

@ -0,0 +1,371 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"context"
"encoding/json"
"fmt"
"path"
"strings"
"testing"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
type scaleTest struct {
kind string
resource string
path string
validObj string
}
func TestScaleAllResources(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, true)()
_, client, closeFn := setup(t)
defer closeFn()
tests := []scaleTest{
{
kind: "Deployment",
resource: "deployments",
path: "/apis/apps/v1",
validObj: validAppsV1("Deployment"),
},
{
kind: "StatefulSet",
resource: "statefulsets",
path: "/apis/apps/v1",
validObj: validAppsV1("StatefulSet"),
},
{
kind: "ReplicaSet",
resource: "replicasets",
path: "/apis/apps/v1",
validObj: validAppsV1("ReplicaSet"),
},
{
kind: "ReplicationController",
resource: "replicationcontrollers",
path: "/api/v1",
validObj: validV1ReplicationController(),
},
}
for _, test := range tests {
t.Run(test.kind, func(t *testing.T) {
validObject := []byte(test.validObj)
// Create the object
_, err := client.CoreV1().RESTClient().Patch(types.ApplyPatchType).
AbsPath(test.path).
Namespace("default").
Resource(test.resource).
Name("test").
Param("fieldManager", "apply_test").
Body(validObject).
Do(context.TODO()).Get()
if err != nil {
t.Fatalf("Failed to create object using apply: %v", err)
}
obj := retrieveObject(t, client, test)
assertReplicasValue(t, obj, 1)
assertReplicasOwnership(t, obj, "apply_test")
// Call scale subresource to update replicas
_, err = client.CoreV1().RESTClient().
Patch(types.MergePatchType).
AbsPath(test.path).
Namespace("default").
Resource(test.resource).
Name("test").
SubResource("scale").
Param("fieldManager", "scale_test").
Body([]byte(`{"spec":{"replicas": 5}}`)).
Do(context.TODO()).Get()
if err != nil {
t.Fatalf("Failed to scale object: %v", err)
}
obj = retrieveObject(t, client, test)
assertReplicasValue(t, obj, 5)
assertReplicasOwnership(t, obj, "scale_test")
// Re-apply the original object, it should fail with conflict because replicas have changed
_, err = client.CoreV1().RESTClient().Patch(types.ApplyPatchType).
AbsPath(test.path).
Namespace("default").
Resource(test.resource).
Name("test").
Param("fieldManager", "apply_test").
Body(validObject).
Do(context.TODO()).Get()
if !apierrors.IsConflict(err) {
t.Fatalf("Expected conflict when re-applying the original object, but got: %v", err)
}
// Re-apply forcing the changes should succeed
_, err = client.CoreV1().RESTClient().Patch(types.ApplyPatchType).
AbsPath(test.path).
Namespace("default").
Resource(test.resource).
Name("test").
Param("fieldManager", "apply_test").
Param("force", "true").
Body(validObject).
Do(context.TODO()).Get()
if err != nil {
t.Fatalf("Error force-updating: %v", err)
}
obj = retrieveObject(t, client, test)
assertReplicasValue(t, obj, 1)
assertReplicasOwnership(t, obj, "apply_test")
// Run "Apply" with a scale object with a different number of replicas. It should generate a conflict.
_, err = client.CoreV1().RESTClient().Patch(types.ApplyPatchType).
AbsPath(test.path).
Namespace("default").
Resource(test.resource).
SubResource("scale").
Name("test").
Param("fieldManager", "apply_scale").
Body([]byte(`{"kind":"Scale","apiVersion":"autoscaling/v1","metadata":{"name":"test","namespace":"default"},"spec":{"replicas":17}}`)).
Do(context.TODO()).Get()
if !apierrors.IsConflict(err) {
t.Fatalf("Expected conflict error but got: %v", err)
}
if !strings.Contains(err.Error(), "apply_test") {
t.Fatalf("Expected conflict with `apply_test` manager when but got: %v", err)
}
// Same as before but force. Only the new manager should own .spec.replicas
_, err = client.CoreV1().RESTClient().Patch(types.ApplyPatchType).
AbsPath(test.path).
Namespace("default").
Resource(test.resource).
SubResource("scale").
Name("test").
Param("fieldManager", "apply_scale").
Param("force", "true").
Body([]byte(`{"kind":"Scale","apiVersion":"autoscaling/v1","metadata":{"name":"test","namespace":"default"},"spec":{"replicas":17}}`)).
Do(context.TODO()).Get()
if err != nil {
t.Fatalf("Error updating object by applying scale and forcing: %v ", err)
}
obj = retrieveObject(t, client, test)
assertReplicasValue(t, obj, 17)
assertReplicasOwnership(t, obj, "apply_scale")
// Replace scale object
_, err = client.CoreV1().RESTClient().Put().
AbsPath(test.path).
Namespace("default").
Resource(test.resource).
SubResource("scale").
Name("test").
Param("fieldManager", "replace_test").
Body([]byte(`{"kind":"Scale","apiVersion":"autoscaling/v1","metadata":{"name":"test","namespace":"default"},"spec":{"replicas":7}}`)).
Do(context.TODO()).Get()
if err != nil {
t.Fatalf("Error replacing object: %v", err)
}
obj = retrieveObject(t, client, test)
assertReplicasValue(t, obj, 7)
assertReplicasOwnership(t, obj, "replace_test")
// Apply the same number of replicas, both managers should own the field
_, err = client.CoreV1().RESTClient().Patch(types.ApplyPatchType).
AbsPath(test.path).
Namespace("default").
Resource(test.resource).
SubResource("scale").
Name("test").
Param("fieldManager", "co_owning_test").
Body([]byte(`{"kind":"Scale","apiVersion":"autoscaling/v1","metadata":{"name":"test","namespace":"default"},"spec":{"replicas":7}}`)).
Do(context.TODO()).Get()
if err != nil {
t.Fatalf("Error updating object: %v", err)
}
obj = retrieveObject(t, client, test)
assertReplicasValue(t, obj, 7)
assertReplicasOwnership(t, obj, "replace_test", "co_owning_test")
// Scaling again should make this manager the only owner of replicas
_, err = client.CoreV1().RESTClient().Patch(types.MergePatchType).
AbsPath(test.path).
Namespace("default").
Resource(test.resource).
SubResource("scale").
Name("test").
Param("fieldManager", "scale_test").
Body([]byte(`{"spec":{"replicas": 5}}`)).
Do(context.TODO()).Get()
if err != nil {
t.Fatalf("Error scaling object: %v", err)
}
obj = retrieveObject(t, client, test)
assertReplicasValue(t, obj, 5)
assertReplicasOwnership(t, obj, "scale_test")
})
}
}
func validAppsV1(kind string) string {
return fmt.Sprintf(`{
"apiVersion": "apps/v1",
"kind": "%s",
"metadata": {
"name": "test"
},
"spec": {
"replicas": 1,
"selector": {
"matchLabels": {
"app": "nginx"
}
},
"template": {
"metadata": {
"labels": {
"app": "nginx"
}
},
"spec": {
"containers": [{
"name": "nginx",
"image": "nginx:latest"
}]
}
}
}
}`, kind)
}
func validV1ReplicationController() string {
return `{
"apiVersion": "v1",
"kind": "ReplicationController",
"metadata": {
"name": "test"
},
"spec": {
"replicas": 1,
"selector": {
"app": "nginx"
},
"template": {
"metadata": {
"labels": {
"app": "nginx"
}
},
"spec": {
"containers": [{
"name": "nginx",
"image": "nginx:latest"
}]
}
}
}
}`
}
func retrieveObject(t *testing.T, client clientset.Interface, test scaleTest) *unstructured.Unstructured {
t.Helper()
urlPath := path.Join(test.path, "namespaces", "default", test.resource, "test")
bytes, err := client.CoreV1().RESTClient().Get().AbsPath(urlPath).DoRaw(context.TODO())
if err != nil {
t.Fatalf("Failed to retrieve object: %v", err)
}
obj := &unstructured.Unstructured{}
if err := json.Unmarshal(bytes, obj); err != nil {
t.Fatalf("Error unmarshalling the retrieved object: %v", err)
}
return obj
}
func assertReplicasValue(t *testing.T, obj *unstructured.Unstructured, value int) {
actualValue, found, err := unstructured.NestedInt64(obj.Object, "spec", "replicas")
if err != nil {
t.Fatalf("Error when retriving replicas field: %v", err)
}
if !found {
t.Fatalf("Replicas field not found")
}
if int(actualValue) != value {
t.Fatalf("Expected replicas field value to be %d but got %d", value, actualValue)
}
}
func assertReplicasOwnership(t *testing.T, obj *unstructured.Unstructured, fieldManagers ...string) {
t.Helper()
accessor, err := meta.Accessor(obj)
if err != nil {
t.Fatalf("Failed to get meta accessor for object: %v", err)
}
seen := make(map[string]bool)
for _, m := range fieldManagers {
seen[m] = false
}
for _, managedField := range accessor.GetManagedFields() {
var entryJSON map[string]interface{}
if err := json.Unmarshal(managedField.FieldsV1.Raw, &entryJSON); err != nil {
t.Fatalf("failed to read into json")
}
spec, ok := entryJSON["f:spec"].(map[string]interface{})
if !ok {
// continue with the next managedField, as we this field does not hold the spec entry
continue
}
if _, ok := spec["f:replicas"]; !ok {
// continue with the next managedField, as we this field does not hold the spec.replicas entry
continue
}
// check if the manager is one of the ones we expect
if _, ok := seen[managedField.Manager]; !ok {
t.Fatalf("Unexpected field manager, found %q, expected to be in: %v", managedField.Manager, seen)
}
seen[managedField.Manager] = true
}
var missingManagers []string
for manager, managerSeen := range seen {
if !managerSeen {
missingManagers = append(missingManagers, manager)
}
}
if len(missingManagers) > 0 {
t.Fatalf("replicas fields should be owned by %v", missingManagers)
}
}