diff --git a/pkg/kubectl/BUILD b/pkg/kubectl/BUILD index 28787c947e5..f6cd4a553f9 100644 --- a/pkg/kubectl/BUILD +++ b/pkg/kubectl/BUILD @@ -45,7 +45,6 @@ go_test( "//pkg/apis/extensions:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", - "//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library", "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library", "//pkg/kubectl/util:go_default_library", "//pkg/util/pointer:go_default_library", @@ -137,6 +136,7 @@ go_library( "//pkg/controller/statefulset:go_default_library", "//pkg/credentialprovider:go_default_library", "//pkg/kubectl/apps:go_default_library", + "//pkg/kubectl/cmd/scalejob:go_default_library", "//pkg/kubectl/resource:go_default_library", "//pkg/kubectl/util:go_default_library", "//pkg/kubectl/util/hash:go_default_library", diff --git a/pkg/kubectl/cmd/BUILD b/pkg/kubectl/cmd/BUILD index ab1708b29b7..c77ee065709 100644 --- a/pkg/kubectl/cmd/BUILD +++ b/pkg/kubectl/cmd/BUILD @@ -75,6 +75,7 @@ go_library( "//pkg/apis/core:go_default_library", "//pkg/apis/core/validation:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library", "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library", "//pkg/kubectl:go_default_library", "//pkg/kubectl/apply/parse:go_default_library", @@ -83,6 +84,7 @@ go_library( "//pkg/kubectl/cmd/config:go_default_library", "//pkg/kubectl/cmd/resource:go_default_library", "//pkg/kubectl/cmd/rollout:go_default_library", + "//pkg/kubectl/cmd/scalejob:go_default_library", "//pkg/kubectl/cmd/set:go_default_library", "//pkg/kubectl/cmd/templates:go_default_library", "//pkg/kubectl/cmd/util:go_default_library", @@ -287,6 +289,7 @@ filegroup( "//pkg/kubectl/cmd/config:all-srcs", "//pkg/kubectl/cmd/resource:all-srcs", "//pkg/kubectl/cmd/rollout:all-srcs", + "//pkg/kubectl/cmd/scalejob:all-srcs", "//pkg/kubectl/cmd/set:all-srcs", "//pkg/kubectl/cmd/templates:all-srcs", "//pkg/kubectl/cmd/testdata/edit:all-srcs", diff --git a/pkg/kubectl/cmd/scale.go b/pkg/kubectl/cmd/scale.go index c7d42f022a3..c8cd0dcf343 100644 --- a/pkg/kubectl/cmd/scale.go +++ b/pkg/kubectl/cmd/scale.go @@ -22,7 +22,9 @@ import ( "github.com/spf13/cobra" + batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion" "k8s.io/kubernetes/pkg/kubectl" + "k8s.io/kubernetes/pkg/kubectl/cmd/scalejob" "k8s.io/kubernetes/pkg/kubectl/cmd/templates" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/resource" @@ -60,7 +62,7 @@ var ( func NewCmdScale(f cmdutil.Factory, out, errOut io.Writer) *cobra.Command { options := &resource.FilenameOptions{} - validArgs := []string{"deployment", "replicaset", "replicationcontroller", "job", "statefulset"} + validArgs := []string{"deployment", "replicaset", "replicationcontroller", "statefulset"} argAliases := kubectl.ResourceAliases(validArgs) cmd := &cobra.Command{ @@ -139,6 +141,15 @@ func RunScale(f cmdutil.Factory, out, errOut io.Writer, cmd *cobra.Command, args return fmt.Errorf("cannot use --resource-version with multiple resources") } + currentSize := cmdutil.GetFlagInt(cmd, "current-replicas") + precondition := &kubectl.ScalePrecondition{Size: currentSize, ResourceVersion: resourceVersion} + retry := kubectl.NewRetryParams(kubectl.Interval, kubectl.Timeout) + + var waitForReplicas *kubectl.RetryParams + if timeout := cmdutil.GetFlagDuration(cmd, "timeout"); timeout != 0 { + waitForReplicas = kubectl.NewRetryParams(kubectl.Interval, timeout) + } + counter := 0 err = r.Visit(func(info *resource.Info, err error) error { if err != nil { @@ -147,26 +158,29 @@ func RunScale(f cmdutil.Factory, out, errOut io.Writer, cmd *cobra.Command, args mapping := info.ResourceMapping() if mapping.Resource == "jobs" { + // go down the legacy jobs path. This can be removed in 3.14 For now, contain it. fmt.Fprintf(errOut, "%s scale job is DEPRECATED and will be removed in a future version.\n", cmd.Parent().Name()) + + clientset, err := f.ClientSet() + if err != nil { + return err + } + if err := ScaleJob(info, clientset.Batch(), uint(count), precondition, retry, waitForReplicas); err != nil { + return err + } + + } else { + scaler, err := f.Scaler() + if err != nil { + return err + } + + gvk := mapping.GroupVersionKind.GroupVersion().WithResource(mapping.Resource) + if err := scaler.Scale(info.Namespace, info.Name, uint(count), precondition, retry, waitForReplicas, gvk.GroupResource()); err != nil { + return err + } } - scaler, err := f.Scaler(mapping) - if err != nil { - return err - } - - currentSize := cmdutil.GetFlagInt(cmd, "current-replicas") - precondition := &kubectl.ScalePrecondition{Size: currentSize, ResourceVersion: resourceVersion} - retry := kubectl.NewRetryParams(kubectl.Interval, kubectl.Timeout) - - var waitForReplicas *kubectl.RetryParams - if timeout := cmdutil.GetFlagDuration(cmd, "timeout"); timeout != 0 { - waitForReplicas = kubectl.NewRetryParams(kubectl.Interval, timeout) - } - - if err := scaler.Scale(info.Namespace, info.Name, uint(count), precondition, retry, waitForReplicas); err != nil { - return err - } if cmdutil.ShouldRecord(cmd, info) { patchBytes, patchType, err := cmdutil.ChangeResourcePatch(info, f.Command(cmd, true)) if err != nil { @@ -195,3 +209,23 @@ func RunScale(f cmdutil.Factory, out, errOut io.Writer, cmd *cobra.Command, args } return nil } + +func ScaleJob(info *resource.Info, jobsClient batchclient.JobsGetter, count uint, preconditions *kubectl.ScalePrecondition, retry, waitForReplicas *kubectl.RetryParams) error { + scaler := scalejob.JobPsuedoScaler{ + JobsClient: jobsClient, + } + var jobPreconditions *scalejob.ScalePrecondition + if preconditions != nil { + jobPreconditions = &scalejob.ScalePrecondition{Size: preconditions.Size, ResourceVersion: preconditions.ResourceVersion} + } + var jobRetry *scalejob.RetryParams + if retry != nil { + jobRetry = &scalejob.RetryParams{Interval: retry.Interval, Timeout: retry.Timeout} + } + var jobWaitForReplicas *scalejob.RetryParams + if waitForReplicas != nil { + jobWaitForReplicas = &scalejob.RetryParams{Interval: waitForReplicas.Interval, Timeout: waitForReplicas.Timeout} + } + + return scaler.Scale(info.Namespace, info.Name, count, jobPreconditions, jobRetry, jobWaitForReplicas) +} diff --git a/pkg/kubectl/cmd/scalejob/BUILD b/pkg/kubectl/cmd/scalejob/BUILD new file mode 100644 index 00000000000..f95519c7805 --- /dev/null +++ b/pkg/kubectl/cmd/scalejob/BUILD @@ -0,0 +1,47 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "doc.go", + "scalejob.go", + ], + importpath = "k8s.io/kubernetes/pkg/kubectl/cmd/scalejob", + visibility = ["//visibility:public"], + deps = [ + "//pkg/apis/batch:go_default_library", + "//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["scalejob_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/apis/batch:go_default_library", + "//pkg/apis/core:go_default_library", + "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", + "//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/client-go/testing:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubectl/cmd/scalejob/doc.go b/pkg/kubectl/cmd/scalejob/doc.go new file mode 100644 index 00000000000..589fa1a64ad --- /dev/null +++ b/pkg/kubectl/cmd/scalejob/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package scalejob is deprecated This package contains deprecated functions used to "scale" jobs in a way inconsistent with normal scaling rules +package scalejob diff --git a/pkg/kubectl/cmd/scalejob/scalejob.go b/pkg/kubectl/cmd/scalejob/scalejob.go new file mode 100644 index 00000000000..70264cd9fa3 --- /dev/null +++ b/pkg/kubectl/cmd/scalejob/scalejob.go @@ -0,0 +1,162 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scalejob + +import ( + "fmt" + "strconv" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubernetes/pkg/apis/batch" + + batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion" +) + +// ScalePrecondition is a deprecated precondition +type ScalePrecondition struct { + Size int + ResourceVersion string +} + +// RetryParams is a deprecated retry struct +type RetryParams struct { + Interval, Timeout time.Duration +} + +// PreconditionError is a deprecated error +type PreconditionError struct { + Precondition string + ExpectedValue string + ActualValue string +} + +func (pe PreconditionError) Error() string { + return fmt.Sprintf("Expected %s to be %s, was %s", pe.Precondition, pe.ExpectedValue, pe.ActualValue) +} + +// ScaleCondition is a closure around Scale that facilitates retries via util.wait +func scaleCondition(r *JobPsuedoScaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string) wait.ConditionFunc { + return func() (bool, error) { + rv, err := r.ScaleSimple(namespace, name, precondition, count) + if updatedResourceVersion != nil { + *updatedResourceVersion = rv + } + // Retry only on update conflicts. + if errors.IsConflict(err) { + return false, nil + } + if err != nil { + return false, err + } + return true, nil + } +} + +// JobPsuedoScaler is a deprecated scale-similar thing that doesn't obey scale semantics +type JobPsuedoScaler struct { + JobsClient batchclient.JobsGetter +} + +// ScaleSimple is responsible for updating job's parallelism. It returns the +// resourceVersion of the job if the update is successful. +func (scaler *JobPsuedoScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) { + job, err := scaler.JobsClient.Jobs(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return "", err + } + if preconditions != nil { + if err := validateJob(job, preconditions); err != nil { + return "", err + } + } + parallelism := int32(newSize) + job.Spec.Parallelism = ¶llelism + updatedJob, err := scaler.JobsClient.Jobs(namespace).Update(job) + if err != nil { + return "", err + } + return updatedJob.ObjectMeta.ResourceVersion, nil +} + +// Scale updates a Job to a new size, with optional precondition check (if preconditions is not nil), +// optional retries (if retry is not nil), and then optionally waits for parallelism to reach desired +// number, which can be less than requested based on job's current progress. +func (scaler *JobPsuedoScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error { + if preconditions == nil { + preconditions = &ScalePrecondition{-1, ""} + } + if retry == nil { + // Make it try only once, immediately + retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} + } + cond := scaleCondition(scaler, preconditions, namespace, name, newSize, nil) + if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil { + return err + } + if waitForReplicas != nil { + job, err := scaler.JobsClient.Jobs(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return err + } + err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, jobHasDesiredParallelism(scaler.JobsClient, job)) + if err == wait.ErrWaitTimeout { + return fmt.Errorf("timed out waiting for %q to be synced", name) + } + return err + } + return nil +} + +// JobHasDesiredParallelism returns a condition that will be true if the desired parallelism count +// for a job equals the current active counts or is less by an appropriate successful/unsuccessful count. +func jobHasDesiredParallelism(jobClient batchclient.JobsGetter, job *batch.Job) wait.ConditionFunc { + return func() (bool, error) { + job, err := jobClient.Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + + // desired parallelism can be either the exact number, in which case return immediately + if job.Status.Active == *job.Spec.Parallelism { + return true, nil + } + if job.Spec.Completions == nil { + // A job without specified completions needs to wait for Active to reach Parallelism. + return false, nil + } + + // otherwise count successful + progress := *job.Spec.Completions - job.Status.Active - job.Status.Succeeded + return progress == 0, nil + } +} + +func validateJob(job *batch.Job, precondition *ScalePrecondition) error { + if precondition.Size != -1 && job.Spec.Parallelism == nil { + return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), "nil"} + } + if precondition.Size != -1 && int(*job.Spec.Parallelism) != precondition.Size { + return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), strconv.Itoa(int(*job.Spec.Parallelism))} + } + if len(precondition.ResourceVersion) != 0 && job.ResourceVersion != precondition.ResourceVersion { + return PreconditionError{"resource version", precondition.ResourceVersion, job.ResourceVersion} + } + return nil +} diff --git a/pkg/kubectl/cmd/scalejob/scalejob_test.go b/pkg/kubectl/cmd/scalejob/scalejob_test.go new file mode 100644 index 00000000000..d8a22f88efc --- /dev/null +++ b/pkg/kubectl/cmd/scalejob/scalejob_test.go @@ -0,0 +1,292 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scalejob + +import ( + "errors" + "testing" + + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testcore "k8s.io/client-go/testing" + "k8s.io/kubernetes/pkg/apis/batch" + api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion" +) + +type errorJobs struct { + batchclient.JobInterface + conflict bool + invalid bool +} + +func (c *errorJobs) Update(job *batch.Job) (*batch.Job, error) { + switch { + case c.invalid: + return nil, kerrors.NewInvalid(api.Kind(job.Kind), job.Name, nil) + case c.conflict: + return nil, kerrors.NewConflict(api.Resource(job.Kind), job.Name, nil) + } + return nil, errors.New("Job update failure") +} + +func (c *errorJobs) Get(name string, options metav1.GetOptions) (*batch.Job, error) { + zero := int32(0) + return &batch.Job{ + Spec: batch.JobSpec{ + Parallelism: &zero, + }, + }, nil +} + +type errorJobClient struct { + batchclient.JobsGetter + conflict bool + invalid bool +} + +func (c *errorJobClient) Jobs(namespace string) batchclient.JobInterface { + return &errorJobs{ + JobInterface: c.JobsGetter.Jobs(namespace), + conflict: c.conflict, + invalid: c.invalid, + } +} + +func TestJobScaleRetry(t *testing.T) { + fake := &errorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), conflict: true} + scaler := &JobPsuedoScaler{JobsClient: fake} + preconditions := ScalePrecondition{-1, ""} + count := uint(3) + name := "foo" + namespace := "default" + + scaleFunc := scaleCondition(scaler, &preconditions, namespace, name, count, nil) + pass, err := scaleFunc() + if pass != false { + t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) + } + if err != nil { + t.Errorf("Did not expect an error on update failure, got %v", err) + } + preconditions = ScalePrecondition{3, ""} + scaleFunc = scaleCondition(scaler, &preconditions, namespace, name, count, nil) + pass, err = scaleFunc() + if err == nil { + t.Error("Expected error on precondition failure") + } +} + +func job() *batch.Job { + return &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "foo", + }, + } +} + +func TestJobScale(t *testing.T) { + fakeClientset := fake.NewSimpleClientset(job()) + scaler := &JobPsuedoScaler{JobsClient: fakeClientset.Batch()} + preconditions := ScalePrecondition{-1, ""} + count := uint(3) + name := "foo" + scaler.Scale("default", name, count, &preconditions, nil, nil) + + actions := fakeClientset.Actions() + if len(actions) != 2 { + t.Errorf("unexpected actions: %v, expected 2 actions (get, update)", actions) + } + if action, ok := actions[0].(testcore.GetAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || action.GetName() != name { + t.Errorf("unexpected action: %v, expected get-job %s", actions[0], name) + } + if action, ok := actions[1].(testcore.UpdateAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || *action.GetObject().(*batch.Job).Spec.Parallelism != int32(count) { + t.Errorf("unexpected action %v, expected update-job with parallelism = %d", actions[1], count) + } +} + +func TestJobScaleInvalid(t *testing.T) { + fake := &errorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), invalid: true} + scaler := &JobPsuedoScaler{JobsClient: fake} + preconditions := ScalePrecondition{-1, ""} + count := uint(3) + name := "foo" + namespace := "default" + + scaleFunc := scaleCondition(scaler, &preconditions, namespace, name, count, nil) + pass, err := scaleFunc() + if pass { + t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) + } + if err == nil { + t.Errorf("Expected error on invalid update failure, got %v", err) + } +} + +func TestJobScaleFailsPreconditions(t *testing.T) { + ten := int32(10) + fake := fake.NewSimpleClientset(&batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "foo", + }, + Spec: batch.JobSpec{ + Parallelism: &ten, + }, + }) + scaler := &JobPsuedoScaler{JobsClient: fake.Batch()} + preconditions := ScalePrecondition{2, ""} + count := uint(3) + name := "foo" + scaler.Scale("default", name, count, &preconditions, nil, nil) + + actions := fake.Actions() + if len(actions) != 1 { + t.Errorf("unexpected actions: %v, expected 1 actions (get)", actions) + } + if action, ok := actions[0].(testcore.GetAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || action.GetName() != name { + t.Errorf("unexpected action: %v, expected get-job %s", actions[0], name) + } +} + +func TestValidateJob(t *testing.T) { + zero, ten, twenty := int32(0), int32(10), int32(20) + tests := []struct { + preconditions ScalePrecondition + job batch.Job + expectError bool + test string + }{ + { + preconditions: ScalePrecondition{-1, ""}, + expectError: false, + test: "defaults", + }, + { + preconditions: ScalePrecondition{-1, ""}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "foo", + }, + Spec: batch.JobSpec{ + Parallelism: &ten, + }, + }, + expectError: false, + test: "defaults 2", + }, + { + preconditions: ScalePrecondition{0, ""}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "foo", + }, + Spec: batch.JobSpec{ + Parallelism: &zero, + }, + }, + expectError: false, + test: "size matches", + }, + { + preconditions: ScalePrecondition{-1, "foo"}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "foo", + }, + Spec: batch.JobSpec{ + Parallelism: &ten, + }, + }, + expectError: false, + test: "resource version matches", + }, + { + preconditions: ScalePrecondition{10, "foo"}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "foo", + }, + Spec: batch.JobSpec{ + Parallelism: &ten, + }, + }, + expectError: false, + test: "both match", + }, + { + preconditions: ScalePrecondition{10, "foo"}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "foo", + }, + Spec: batch.JobSpec{ + Parallelism: &twenty, + }, + }, + expectError: true, + test: "size different", + }, + { + preconditions: ScalePrecondition{10, "foo"}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "foo", + }, + }, + expectError: true, + test: "parallelism nil", + }, + { + preconditions: ScalePrecondition{10, "foo"}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "bar", + }, + Spec: batch.JobSpec{ + Parallelism: &ten, + }, + }, + expectError: true, + test: "version different", + }, + { + preconditions: ScalePrecondition{10, "foo"}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "bar", + }, + Spec: batch.JobSpec{ + Parallelism: &twenty, + }, + }, + expectError: true, + test: "both different", + }, + } + for _, test := range tests { + err := validateJob(&test.job, &test.preconditions) + if err != nil && !test.expectError { + t.Errorf("unexpected error: %v (%s)", err, test.test) + } + if err == nil && test.expectError { + t.Errorf("expected an error: %v (%s)", err, test.test) + } + } +} diff --git a/pkg/kubectl/cmd/util/factory.go b/pkg/kubectl/cmd/util/factory.go index 90bdd717502..7d0c4350eb3 100644 --- a/pkg/kubectl/cmd/util/factory.go +++ b/pkg/kubectl/cmd/util/factory.go @@ -211,7 +211,7 @@ type BuilderFactory interface { // PluginRunner provides the implementation to be used to run cli plugins. PluginRunner() plugins.PluginRunner // Returns a Scaler for changing the size of the specified RESTMapping type or an error - Scaler(mapping *meta.RESTMapping) (kubectl.Scaler, error) + Scaler() (kubectl.Scaler, error) // ScaleClient gives you back scale getter ScaleClient() (scaleclient.ScalesGetter, error) // Returns a Reaper for gracefully shutting down resources. diff --git a/pkg/kubectl/cmd/util/factory_builder.go b/pkg/kubectl/cmd/util/factory_builder.go index 080e39fc4b2..1d2edd9c506 100644 --- a/pkg/kubectl/cmd/util/factory_builder.go +++ b/pkg/kubectl/cmd/util/factory_builder.go @@ -103,19 +103,13 @@ func (f *ring2Factory) ScaleClient() (scaleclient.ScalesGetter, error) { return scaleclient.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver), nil } -func (f *ring2Factory) Scaler(mapping *meta.RESTMapping) (kubectl.Scaler, error) { - clientset, err := f.clientAccessFactory.ClientSet() - if err != nil { - return nil, err - } - +func (f *ring2Factory) Scaler() (kubectl.Scaler, error) { scalesGetter, err := f.ScaleClient() if err != nil { return nil, err } - gvk := mapping.GroupVersionKind.GroupVersion().WithResource(mapping.Resource) - return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), clientset.Batch(), scalesGetter, gvk.GroupResource()), nil + return kubectl.NewScaler(scalesGetter), nil } func (f *ring2Factory) Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, error) { diff --git a/pkg/kubectl/conditions.go b/pkg/kubectl/conditions.go index 666b248f416..771ccbc4fb8 100644 --- a/pkg/kubectl/conditions.go +++ b/pkg/kubectl/conditions.go @@ -26,11 +26,9 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/kubernetes/pkg/api/pod" "k8s.io/kubernetes/pkg/apis/apps" - "k8s.io/kubernetes/pkg/apis/batch" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/extensions" appsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/internalversion" - batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion" coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion" ) @@ -99,30 +97,6 @@ func StatefulSetHasDesiredReplicas(ssClient appsclient.StatefulSetsGetter, ss *a } } -// JobHasDesiredParallelism returns a condition that will be true if the desired parallelism count -// for a job equals the current active counts or is less by an appropriate successful/unsuccessful count. -func JobHasDesiredParallelism(jobClient batchclient.JobsGetter, job *batch.Job) wait.ConditionFunc { - return func() (bool, error) { - job, err := jobClient.Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{}) - if err != nil { - return false, err - } - - // desired parallelism can be either the exact number, in which case return immediately - if job.Status.Active == *job.Spec.Parallelism { - return true, nil - } - if job.Spec.Completions == nil { - // A job without specified completions needs to wait for Active to reach Parallelism. - return false, nil - } - - // otherwise count successful - progress := *job.Spec.Completions - job.Status.Active - job.Status.Succeeded - return progress == 0, nil - } -} - // DeploymentHasDesiredReplicas returns a condition that will be true if and only if // the desired replica count for a deployment equals its updated replicas count. // (non-terminated pods that have the desired template spec). diff --git a/pkg/kubectl/delete.go b/pkg/kubectl/delete.go index 310ed8bb029..0d1f8a46ea6 100644 --- a/pkg/kubectl/delete.go +++ b/pkg/kubectl/delete.go @@ -39,6 +39,7 @@ import ( coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" + "k8s.io/kubernetes/pkg/kubectl/cmd/scalejob" ) const ( @@ -155,7 +156,7 @@ func getOverlappingControllers(rcClient coreclient.ReplicationControllerInterfac func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error { rc := reaper.client.ReplicationControllers(namespace) - scaler := NewScaler(reaper.scaleClient, schema.GroupResource{Resource: "replicationcontrollers"}) + scaler := NewScaler(reaper.scaleClient) ctrl, err := rc.Get(name, metav1.GetOptions{}) if err != nil { return err @@ -206,7 +207,7 @@ func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout // No overlapping controllers. retry := NewRetryParams(reaper.pollInterval, reaper.timeout) waitForReplicas := NewRetryParams(reaper.pollInterval, timeout) - if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil && !errors.IsNotFound(err) { + if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas, schema.GroupResource{Resource: "replicationcontrollers"}); err != nil && !errors.IsNotFound(err) { return err } } @@ -224,7 +225,7 @@ func getOverlappingReplicaSets(c extensionsclient.ReplicaSetInterface, rs *exten func (reaper *ReplicaSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error { rsc := reaper.client.ReplicaSets(namespace) - scaler := NewScaler(reaper.scaleClient, reaper.gr) + scaler := NewScaler(reaper.scaleClient) rs, err := rsc.Get(name, metav1.GetOptions{}) if err != nil { return err @@ -276,7 +277,7 @@ func (reaper *ReplicaSetReaper) Stop(namespace, name string, timeout time.Durati // No overlapping ReplicaSets. retry := NewRetryParams(reaper.pollInterval, reaper.timeout) waitForReplicas := NewRetryParams(reaper.pollInterval, timeout) - if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil && !errors.IsNotFound(err) { + if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas, reaper.gr); err != nil && !errors.IsNotFound(err) { return err } } @@ -325,7 +326,7 @@ func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duratio func (reaper *StatefulSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error { statefulsets := reaper.client.StatefulSets(namespace) - scaler := NewScaler(reaper.scaleClient, apps.Resource("statefulsets")) + scaler := NewScaler(reaper.scaleClient) ss, err := statefulsets.Get(name, metav1.GetOptions{}) if err != nil { return err @@ -340,7 +341,7 @@ func (reaper *StatefulSetReaper) Stop(namespace, name string, timeout time.Durat retry := NewRetryParams(reaper.pollInterval, reaper.timeout) waitForStatefulSet := NewRetryParams(reaper.pollInterval, timeout) - if err = scaler.Scale(namespace, name, 0, nil, retry, waitForStatefulSet); err != nil && !errors.IsNotFound(err) { + if err = scaler.Scale(namespace, name, 0, nil, retry, waitForStatefulSet, apps.Resource("statefulsets")); err != nil && !errors.IsNotFound(err) { return err } @@ -354,7 +355,9 @@ func (reaper *StatefulSetReaper) Stop(namespace, name string, timeout time.Durat func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error { jobs := reaper.client.Jobs(namespace) pods := reaper.podClient.Pods(namespace) - scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, reaper.client, nil, schema.GroupResource{}) + scaler := &scalejob.JobPsuedoScaler{ + JobsClient: reaper.client, + } job, err := jobs.Get(name, metav1.GetOptions{}) if err != nil { return err @@ -366,8 +369,8 @@ func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gra } // TODO: handle overlapping jobs - retry := NewRetryParams(reaper.pollInterval, reaper.timeout) - waitForJobs := NewRetryParams(reaper.pollInterval, timeout) + retry := &scalejob.RetryParams{Interval: reaper.pollInterval, Timeout: reaper.timeout} + waitForJobs := &scalejob.RetryParams{Interval: reaper.pollInterval, Timeout: reaper.timeout} if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil && !errors.IsNotFound(err) { return err } diff --git a/pkg/kubectl/rolling_updater.go b/pkg/kubectl/rolling_updater.go index e1ee2bc10a5..084dce74d49 100644 --- a/pkg/kubectl/rolling_updater.go +++ b/pkg/kubectl/rolling_updater.go @@ -400,8 +400,8 @@ func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desi // scalerScaleAndWait scales a controller using a Scaler and a real client. func (r *RollingUpdater) scaleAndWaitWithScaler(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) { - scaler := NewScaler(r.scaleClient, schema.GroupResource{Resource: "replicationcontrollers"}) - if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait); err != nil { + scaler := NewScaler(r.scaleClient) + if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait, schema.GroupResource{Resource: "replicationcontrollers"}); err != nil { return nil, err } return r.rcClient.ReplicationControllers(rc.Namespace).Get(rc.Name, metav1.GetOptions{}) diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go index bc2eb178aa5..590b600fd49 100644 --- a/pkg/kubectl/scale.go +++ b/pkg/kubectl/scale.go @@ -23,13 +23,10 @@ import ( autoscalingapi "k8s.io/api/autoscaling/v1" "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/kubernetes/pkg/apis/batch" scaleclient "k8s.io/client-go/scale" - batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion" ) // TODO: Figure out if we should be waiting on initializers in the Scale() functions below. @@ -40,30 +37,15 @@ type Scaler interface { // retries in the event of resource version mismatch (if retry is not nil), // and optionally waits until the status of the resource matches newSize (if wait is not nil) // TODO: Make the implementation of this watch-based (#56075) once #31345 is fixed. - Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, wait *RetryParams) error + Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, wait *RetryParams, gr schema.GroupResource) error // ScaleSimple does a simple one-shot attempt at scaling - not useful on its own, but // a necessary building block for Scale - ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error) -} - -// ScalerFor gets a scaler for a given resource -func ScalerFor(kind schema.GroupKind, jobsClient batchclient.JobsGetter, scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) Scaler { - // it seems like jobs dont't follow "normal" scale semantics. - // For example it is not clear whether HPA could make use of it or not. - // For more details see: https://github.com/kubernetes/kubernetes/pull/58468 - switch kind { - case batch.Kind("Job"): - return &jobScaler{jobsClient} // Either kind of job can be scaled with Batch interface. - default: - return NewScaler(scalesGetter, gr) - } + ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint, gr schema.GroupResource) (updatedResourceVersion string, err error) } // NewScaler get a scaler for a given resource -// Note that if you are trying to crate create a scaler for "job" then stop and use ScalerFor instead. -// When scaling jobs is dead, we'll remove ScalerFor method. -func NewScaler(scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) Scaler { - return &genericScaler{scalesGetter, gr} +func NewScaler(scalesGetter scaleclient.ScalesGetter) Scaler { + return &genericScaler{scalesGetter} } // ScalePrecondition describes a condition that must be true for the scale to take place @@ -97,9 +79,9 @@ func NewRetryParams(interval, timeout time.Duration) *RetryParams { } // ScaleCondition is a closure around Scale that facilitates retries via util.wait -func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string) wait.ConditionFunc { +func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string, gr schema.GroupResource) wait.ConditionFunc { return func() (bool, error) { - rv, err := r.ScaleSimple(namespace, name, precondition, count) + rv, err := r.ScaleSimple(namespace, name, precondition, count, gr) if updatedResourceVersion != nil { *updatedResourceVersion = rv } @@ -114,74 +96,6 @@ func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name s } } -// ValidateJob ensures that the preconditions match. Returns nil if they are valid, an error otherwise. -func (precondition *ScalePrecondition) ValidateJob(job *batch.Job) error { - if precondition.Size != -1 && job.Spec.Parallelism == nil { - return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), "nil"} - } - if precondition.Size != -1 && int(*job.Spec.Parallelism) != precondition.Size { - return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), strconv.Itoa(int(*job.Spec.Parallelism))} - } - if len(precondition.ResourceVersion) != 0 && job.ResourceVersion != precondition.ResourceVersion { - return PreconditionError{"resource version", precondition.ResourceVersion, job.ResourceVersion} - } - return nil -} - -type jobScaler struct { - c batchclient.JobsGetter -} - -// ScaleSimple is responsible for updating job's parallelism. It returns the -// resourceVersion of the job if the update is successful. -func (scaler *jobScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) { - job, err := scaler.c.Jobs(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return "", err - } - if preconditions != nil { - if err := preconditions.ValidateJob(job); err != nil { - return "", err - } - } - parallelism := int32(newSize) - job.Spec.Parallelism = ¶llelism - updatedJob, err := scaler.c.Jobs(namespace).Update(job) - if err != nil { - return "", err - } - return updatedJob.ObjectMeta.ResourceVersion, nil -} - -// Scale updates a Job to a new size, with optional precondition check (if preconditions is not nil), -// optional retries (if retry is not nil), and then optionally waits for parallelism to reach desired -// number, which can be less than requested based on job's current progress. -func (scaler *jobScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error { - if preconditions == nil { - preconditions = &ScalePrecondition{-1, ""} - } - if retry == nil { - // Make it try only once, immediately - retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} - } - cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil) - if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil { - return err - } - if waitForReplicas != nil { - job, err := scaler.c.Jobs(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return err - } - err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, JobHasDesiredParallelism(scaler.c, job)) - if err == wait.ErrWaitTimeout { - return fmt.Errorf("timed out waiting for %q to be synced", name) - } - return err - } - return nil -} - // validateGeneric ensures that the preconditions match. Returns nil if they are valid, otherwise an error func (precondition *ScalePrecondition) validate(scale *autoscalingapi.Scale) error { if precondition.Size != -1 && int(scale.Spec.Replicas) != precondition.Size { @@ -196,14 +110,13 @@ func (precondition *ScalePrecondition) validate(scale *autoscalingapi.Scale) err // genericScaler can update scales for resources in a particular namespace type genericScaler struct { scaleNamespacer scaleclient.ScalesGetter - targetGR schema.GroupResource } var _ Scaler = &genericScaler{} // ScaleSimple updates a scale of a given resource. It returns the resourceVersion of the scale if the update was successful. -func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error) { - scale, err := s.scaleNamespacer.Scales(namespace).Get(s.targetGR, name) +func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint, gr schema.GroupResource) (updatedResourceVersion string, err error) { + scale, err := s.scaleNamespacer.Scales(namespace).Get(gr, name) if err != nil { return "", err } @@ -214,7 +127,7 @@ func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *Scale } scale.Spec.Replicas = int32(newSize) - updatedScale, err := s.scaleNamespacer.Scales(namespace).Update(s.targetGR, scale) + updatedScale, err := s.scaleNamespacer.Scales(namespace).Update(gr, scale) if err != nil { return "", err } @@ -223,7 +136,7 @@ func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *Scale // Scale updates a scale of a given resource to a new size, with optional precondition check (if preconditions is not nil), // optional retries (if retry is not nil), and then optionally waits for the status to reach desired count. -func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error { +func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams, gr schema.GroupResource) error { if preconditions == nil { preconditions = &ScalePrecondition{-1, ""} } @@ -231,7 +144,7 @@ func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, prec // make it try only once, immediately retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} } - cond := ScaleCondition(s, preconditions, namespace, resourceName, newSize, nil) + cond := ScaleCondition(s, preconditions, namespace, resourceName, newSize, nil, gr) if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil { return err } @@ -239,7 +152,7 @@ func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, prec err := wait.PollImmediate( waitForReplicas.Interval, waitForReplicas.Timeout, - scaleHasDesiredReplicas(s.scaleNamespacer, s.targetGR, resourceName, namespace, int32(newSize))) + scaleHasDesiredReplicas(s.scaleNamespacer, gr, resourceName, namespace, int32(newSize))) if err == wait.ErrWaitTimeout { return fmt.Errorf("timed out waiting for %q to be synced", resourceName) } diff --git a/pkg/kubectl/scale_test.go b/pkg/kubectl/scale_test.go index d75b9ac15a1..e6a5106d56c 100644 --- a/pkg/kubectl/scale_test.go +++ b/pkg/kubectl/scale_test.go @@ -17,7 +17,6 @@ limitations under the License. package kubectl import ( - "errors" "fmt" "testing" "time" @@ -26,11 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/scale" - testcore "k8s.io/client-go/testing" - "k8s.io/kubernetes/pkg/apis/batch" api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" - batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion" ) func TestReplicationControllerScaleRetry(t *testing.T) { @@ -39,13 +34,13 @@ func TestReplicationControllerScaleRetry(t *testing.T) { } scaleClientExpectedAction := []string{"get", "update", "get"} scaleClient := createFakeScaleClient("replicationcontrollers", "foo-v1", 2, verbsOnError) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "", Resource: "replicationcontrollers"}) + scaler := NewScaler(scaleClient) preconditions := ScalePrecondition{-1, ""} count := uint(3) name := "foo-v1" namespace := metav1.NamespaceDefault - scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil) + scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil, schema.GroupResource{Group: "", Resource: "replicationcontrollers"}) pass, err := scaleFunc() if pass { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) @@ -54,7 +49,7 @@ func TestReplicationControllerScaleRetry(t *testing.T) { t.Errorf("Did not expect an error on update conflict failure, got %v", err) } preconditions = ScalePrecondition{3, ""} - scaleFunc = ScaleCondition(scaler, &preconditions, namespace, name, count, nil) + scaleFunc = ScaleCondition(scaler, &preconditions, namespace, name, count, nil, schema.GroupResource{Group: "", Resource: "replicationcontrollers"}) pass, err = scaleFunc() if err == nil { t.Errorf("Expected error on precondition failure") @@ -76,13 +71,13 @@ func TestReplicationControllerScaleInvalid(t *testing.T) { } scaleClientExpectedAction := []string{"get", "update"} scaleClient := createFakeScaleClient("replicationcontrollers", "foo-v1", 1, verbsOnError) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "", Resource: "replicationcontrollers"}) + scaler := NewScaler(scaleClient) preconditions := ScalePrecondition{-1, ""} count := uint(3) name := "foo-v1" namespace := "default" - scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil) + scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil, schema.GroupResource{Group: "", Resource: "replicationcontrollers"}) pass, err := scaleFunc() if pass { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) @@ -104,11 +99,11 @@ func TestReplicationControllerScaleInvalid(t *testing.T) { func TestReplicationControllerScale(t *testing.T) { scaleClientExpectedAction := []string{"get", "update"} scaleClient := createFakeScaleClient("replicationcontrollers", "foo-v1", 2, nil) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "", Resource: "replicationcontrollers"}) + scaler := NewScaler(scaleClient) preconditions := ScalePrecondition{-1, ""} count := uint(3) name := "foo-v1" - err := scaler.Scale("default", name, count, &preconditions, nil, nil) + err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "", Resource: "replicationcontrollers"}) if err != nil { t.Fatalf("unexpected error occurred = %v while scaling the resource", err) @@ -127,11 +122,11 @@ func TestReplicationControllerScale(t *testing.T) { func TestReplicationControllerScaleFailsPreconditions(t *testing.T) { scaleClientExpectedAction := []string{"get"} scaleClient := createFakeScaleClient("replicationcontrollers", "foo", 10, nil) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "", Resource: "replicationcontrollers"}) + scaler := NewScaler(scaleClient) preconditions := ScalePrecondition{2, ""} count := uint(3) name := "foo" - err := scaler.Scale("default", name, count, &preconditions, nil, nil) + err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "", Resource: "replicationcontrollers"}) if err == nil { t.Fatal("expected to get an error but none was returned") } @@ -146,281 +141,19 @@ func TestReplicationControllerScaleFailsPreconditions(t *testing.T) { } } -type errorJobs struct { - batchclient.JobInterface - conflict bool - invalid bool -} - -func (c *errorJobs) Update(job *batch.Job) (*batch.Job, error) { - switch { - case c.invalid: - return nil, kerrors.NewInvalid(api.Kind(job.Kind), job.Name, nil) - case c.conflict: - return nil, kerrors.NewConflict(api.Resource(job.Kind), job.Name, nil) - } - return nil, errors.New("Job update failure") -} - -func (c *errorJobs) Get(name string, options metav1.GetOptions) (*batch.Job, error) { - zero := int32(0) - return &batch.Job{ - Spec: batch.JobSpec{ - Parallelism: &zero, - }, - }, nil -} - -type errorJobClient struct { - batchclient.JobsGetter - conflict bool - invalid bool -} - -func (c *errorJobClient) Jobs(namespace string) batchclient.JobInterface { - return &errorJobs{ - JobInterface: c.JobsGetter.Jobs(namespace), - conflict: c.conflict, - invalid: c.invalid, - } -} - -func TestJobScaleRetry(t *testing.T) { - fake := &errorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), conflict: true} - scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, fake, nil, schema.GroupResource{}) - preconditions := ScalePrecondition{-1, ""} - count := uint(3) - name := "foo" - namespace := "default" - - scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil) - pass, err := scaleFunc() - if pass != false { - t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) - } - if err != nil { - t.Errorf("Did not expect an error on update failure, got %v", err) - } - preconditions = ScalePrecondition{3, ""} - scaleFunc = ScaleCondition(scaler, &preconditions, namespace, name, count, nil) - pass, err = scaleFunc() - if err == nil { - t.Error("Expected error on precondition failure") - } -} - -func job() *batch.Job { - return &batch.Job{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: metav1.NamespaceDefault, - Name: "foo", - }, - } -} - -func TestJobScale(t *testing.T) { - fakeClientset := fake.NewSimpleClientset(job()) - scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, fakeClientset.Batch(), nil, schema.GroupResource{}) - preconditions := ScalePrecondition{-1, ""} - count := uint(3) - name := "foo" - scaler.Scale("default", name, count, &preconditions, nil, nil) - - actions := fakeClientset.Actions() - if len(actions) != 2 { - t.Errorf("unexpected actions: %v, expected 2 actions (get, update)", actions) - } - if action, ok := actions[0].(testcore.GetAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || action.GetName() != name { - t.Errorf("unexpected action: %v, expected get-job %s", actions[0], name) - } - if action, ok := actions[1].(testcore.UpdateAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || *action.GetObject().(*batch.Job).Spec.Parallelism != int32(count) { - t.Errorf("unexpected action %v, expected update-job with parallelism = %d", actions[1], count) - } -} - -func TestJobScaleInvalid(t *testing.T) { - fake := &errorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), invalid: true} - scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, fake, nil, schema.GroupResource{}) - preconditions := ScalePrecondition{-1, ""} - count := uint(3) - name := "foo" - namespace := "default" - - scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil) - pass, err := scaleFunc() - if pass { - t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) - } - if err == nil { - t.Errorf("Expected error on invalid update failure, got %v", err) - } -} - -func TestJobScaleFailsPreconditions(t *testing.T) { - ten := int32(10) - fake := fake.NewSimpleClientset(&batch.Job{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: metav1.NamespaceDefault, - Name: "foo", - }, - Spec: batch.JobSpec{ - Parallelism: &ten, - }, - }) - scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, fake.Batch(), nil, schema.GroupResource{}) - preconditions := ScalePrecondition{2, ""} - count := uint(3) - name := "foo" - scaler.Scale("default", name, count, &preconditions, nil, nil) - - actions := fake.Actions() - if len(actions) != 1 { - t.Errorf("unexpected actions: %v, expected 1 actions (get)", actions) - } - if action, ok := actions[0].(testcore.GetAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || action.GetName() != name { - t.Errorf("unexpected action: %v, expected get-job %s", actions[0], name) - } -} - -func TestValidateJob(t *testing.T) { - zero, ten, twenty := int32(0), int32(10), int32(20) - tests := []struct { - preconditions ScalePrecondition - job batch.Job - expectError bool - test string - }{ - { - preconditions: ScalePrecondition{-1, ""}, - expectError: false, - test: "defaults", - }, - { - preconditions: ScalePrecondition{-1, ""}, - job: batch.Job{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "foo", - }, - Spec: batch.JobSpec{ - Parallelism: &ten, - }, - }, - expectError: false, - test: "defaults 2", - }, - { - preconditions: ScalePrecondition{0, ""}, - job: batch.Job{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "foo", - }, - Spec: batch.JobSpec{ - Parallelism: &zero, - }, - }, - expectError: false, - test: "size matches", - }, - { - preconditions: ScalePrecondition{-1, "foo"}, - job: batch.Job{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "foo", - }, - Spec: batch.JobSpec{ - Parallelism: &ten, - }, - }, - expectError: false, - test: "resource version matches", - }, - { - preconditions: ScalePrecondition{10, "foo"}, - job: batch.Job{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "foo", - }, - Spec: batch.JobSpec{ - Parallelism: &ten, - }, - }, - expectError: false, - test: "both match", - }, - { - preconditions: ScalePrecondition{10, "foo"}, - job: batch.Job{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "foo", - }, - Spec: batch.JobSpec{ - Parallelism: &twenty, - }, - }, - expectError: true, - test: "size different", - }, - { - preconditions: ScalePrecondition{10, "foo"}, - job: batch.Job{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "foo", - }, - }, - expectError: true, - test: "parallelism nil", - }, - { - preconditions: ScalePrecondition{10, "foo"}, - job: batch.Job{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "bar", - }, - Spec: batch.JobSpec{ - Parallelism: &ten, - }, - }, - expectError: true, - test: "version different", - }, - { - preconditions: ScalePrecondition{10, "foo"}, - job: batch.Job{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "bar", - }, - Spec: batch.JobSpec{ - Parallelism: &twenty, - }, - }, - expectError: true, - test: "both different", - }, - } - for _, test := range tests { - err := test.preconditions.ValidateJob(&test.job) - if err != nil && !test.expectError { - t.Errorf("unexpected error: %v (%s)", err, test.test) - } - if err == nil && test.expectError { - t.Errorf("expected an error: %v (%s)", err, test.test) - } - } -} - func TestDeploymentScaleRetry(t *testing.T) { verbsOnError := map[string]*kerrors.StatusError{ "update": kerrors.NewConflict(api.Resource("Status"), "foo", nil), } scaleClientExpectedAction := []string{"get", "update", "get"} scaleClient := createFakeScaleClient("deployments", "foo", 2, verbsOnError) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "deployments"}) + scaler := NewScaler(scaleClient) preconditions := &ScalePrecondition{-1, ""} count := uint(3) name := "foo" namespace := "default" - scaleFunc := ScaleCondition(scaler, preconditions, namespace, name, count, nil) + scaleFunc := ScaleCondition(scaler, preconditions, namespace, name, count, nil, schema.GroupResource{Group: "apps", Resource: "deployments"}) pass, err := scaleFunc() if pass != false { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) @@ -429,7 +162,7 @@ func TestDeploymentScaleRetry(t *testing.T) { t.Errorf("Did not expect an error on update failure, got %v", err) } preconditions = &ScalePrecondition{3, ""} - scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil) + scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil, schema.GroupResource{Group: "apps", Resource: "deployments"}) pass, err = scaleFunc() if err == nil { t.Error("Expected error on precondition failure") @@ -448,11 +181,11 @@ func TestDeploymentScaleRetry(t *testing.T) { func TestDeploymentScale(t *testing.T) { scaleClientExpectedAction := []string{"get", "update"} scaleClient := createFakeScaleClient("deployments", "foo", 2, nil) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "deployments"}) + scaler := NewScaler(scaleClient) preconditions := ScalePrecondition{-1, ""} count := uint(3) name := "foo" - err := scaler.Scale("default", name, count, &preconditions, nil, nil) + err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "apps", Resource: "deployments"}) if err != nil { t.Fatal(err) } @@ -473,13 +206,13 @@ func TestDeploymentScaleInvalid(t *testing.T) { "update": kerrors.NewInvalid(api.Kind("Status"), "foo", nil), } scaleClient := createFakeScaleClient("deployments", "foo", 2, verbsOnError) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "deployments"}) + scaler := NewScaler(scaleClient) preconditions := ScalePrecondition{-1, ""} count := uint(3) name := "foo" namespace := "default" - scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil) + scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil, schema.GroupResource{Group: "apps", Resource: "deployments"}) pass, err := scaleFunc() if pass { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) @@ -501,11 +234,11 @@ func TestDeploymentScaleInvalid(t *testing.T) { func TestDeploymentScaleFailsPreconditions(t *testing.T) { scaleClientExpectedAction := []string{"get"} scaleClient := createFakeScaleClient("deployments", "foo", 10, nil) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "deployments"}) + scaler := NewScaler(scaleClient) preconditions := ScalePrecondition{2, ""} count := uint(3) name := "foo" - err := scaler.Scale("default", name, count, &preconditions, nil, nil) + err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "apps", Resource: "deployments"}) if err == nil { t.Fatal("exptected to get an error but none was returned") } @@ -523,11 +256,11 @@ func TestDeploymentScaleFailsPreconditions(t *testing.T) { func TestStatefulSetScale(t *testing.T) { scaleClientExpectedAction := []string{"get", "update"} scaleClient := createFakeScaleClient("statefulsets", "foo", 2, nil) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "statefullset"}) + scaler := NewScaler(scaleClient) preconditions := ScalePrecondition{-1, ""} count := uint(3) name := "foo" - err := scaler.Scale("default", name, count, &preconditions, nil, nil) + err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "apps", Resource: "statefullset"}) if err != nil { t.Fatal(err) } @@ -548,13 +281,13 @@ func TestStatefulSetScaleRetry(t *testing.T) { "update": kerrors.NewConflict(api.Resource("Status"), "foo", nil), } scaleClient := createFakeScaleClient("statefulsets", "foo", 2, verbsOnError) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "statefulsets"}) + scaler := NewScaler(scaleClient) preconditions := &ScalePrecondition{-1, ""} count := uint(3) name := "foo" namespace := "default" - scaleFunc := ScaleCondition(scaler, preconditions, namespace, name, count, nil) + scaleFunc := ScaleCondition(scaler, preconditions, namespace, name, count, nil, schema.GroupResource{Group: "apps", Resource: "statefulsets"}) pass, err := scaleFunc() if pass != false { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) @@ -563,7 +296,7 @@ func TestStatefulSetScaleRetry(t *testing.T) { t.Errorf("Did not expect an error on update failure, got %v", err) } preconditions = &ScalePrecondition{3, ""} - scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil) + scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil, schema.GroupResource{Group: "apps", Resource: "statefulsets"}) pass, err = scaleFunc() if err == nil { t.Error("Expected error on precondition failure") @@ -585,13 +318,13 @@ func TestStatefulSetScaleInvalid(t *testing.T) { "update": kerrors.NewInvalid(api.Kind("Status"), "foo", nil), } scaleClient := createFakeScaleClient("statefulsets", "foo", 2, verbsOnError) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "statefulsets"}) + scaler := NewScaler(scaleClient) preconditions := ScalePrecondition{-1, ""} count := uint(3) name := "foo" namespace := "default" - scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil) + scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil, schema.GroupResource{Group: "apps", Resource: "statefulsets"}) pass, err := scaleFunc() if pass { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) @@ -613,11 +346,11 @@ func TestStatefulSetScaleInvalid(t *testing.T) { func TestStatefulSetScaleFailsPreconditions(t *testing.T) { scaleClientExpectedAction := []string{"get"} scaleClient := createFakeScaleClient("statefulsets", "foo", 10, nil) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "statefulsets"}) + scaler := NewScaler(scaleClient) preconditions := ScalePrecondition{2, ""} count := uint(3) name := "foo" - err := scaler.Scale("default", name, count, &preconditions, nil, nil) + err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "apps", Resource: "statefulsets"}) if err == nil { t.Fatal("expected to get an error but none was returned") } @@ -635,11 +368,11 @@ func TestStatefulSetScaleFailsPreconditions(t *testing.T) { func TestReplicaSetScale(t *testing.T) { scaleClientExpectedAction := []string{"get", "update"} scaleClient := createFakeScaleClient("replicasets", "foo", 10, nil) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "extensions", Resource: "replicasets"}) + scaler := NewScaler(scaleClient) preconditions := ScalePrecondition{-1, ""} count := uint(3) name := "foo" - err := scaler.Scale("default", name, count, &preconditions, nil, nil) + err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "extensions", Resource: "replicasets"}) if err != nil { t.Fatal(err) } @@ -660,13 +393,13 @@ func TestReplicaSetScaleRetry(t *testing.T) { } scaleClientExpectedAction := []string{"get", "update", "get"} scaleClient := createFakeScaleClient("replicasets", "foo", 2, verbsOnError) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "extensions", Resource: "replicasets"}) + scaler := NewScaler(scaleClient) preconditions := &ScalePrecondition{-1, ""} count := uint(3) name := "foo" namespace := "default" - scaleFunc := ScaleCondition(scaler, preconditions, namespace, name, count, nil) + scaleFunc := ScaleCondition(scaler, preconditions, namespace, name, count, nil, schema.GroupResource{Group: "extensions", Resource: "replicasets"}) pass, err := scaleFunc() if pass != false { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) @@ -675,7 +408,7 @@ func TestReplicaSetScaleRetry(t *testing.T) { t.Errorf("Did not expect an error on update failure, got %v", err) } preconditions = &ScalePrecondition{3, ""} - scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil) + scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil, schema.GroupResource{Group: "extensions", Resource: "replicasets"}) pass, err = scaleFunc() if err == nil { t.Error("Expected error on precondition failure") @@ -697,13 +430,13 @@ func TestReplicaSetScaleInvalid(t *testing.T) { } scaleClientExpectedAction := []string{"get", "update"} scaleClient := createFakeScaleClient("replicasets", "foo", 2, verbsOnError) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "extensions", Resource: "replicasets"}) + scaler := NewScaler(scaleClient) preconditions := ScalePrecondition{-1, ""} count := uint(3) name := "foo" namespace := "default" - scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil) + scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil, schema.GroupResource{Group: "extensions", Resource: "replicasets"}) pass, err := scaleFunc() if pass { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) @@ -725,11 +458,11 @@ func TestReplicaSetScaleInvalid(t *testing.T) { func TestReplicaSetsGetterFailsPreconditions(t *testing.T) { scaleClientExpectedAction := []string{"get"} scaleClient := createFakeScaleClient("replicasets", "foo", 10, nil) - scaler := NewScaler(scaleClient, schema.GroupResource{Group: "extensions", Resource: "replicasets"}) + scaler := NewScaler(scaleClient) preconditions := ScalePrecondition{2, ""} count := uint(3) name := "foo" - err := scaler.Scale("default", name, count, &preconditions, nil, nil) + err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "extensions", Resource: "replicasets"}) if err == nil { t.Fatal("expected to get an error but non was returned") } @@ -812,9 +545,9 @@ func TestGenericScaleSimple(t *testing.T) { // act for index, scenario := range scenarios { t.Run(fmt.Sprintf("running scenario %d: %s", index+1, scenario.name), func(t *testing.T) { - target := NewScaler(scenario.scaleGetter, scenario.targetGR) + target := NewScaler(scenario.scaleGetter) - resVersion, err := target.ScaleSimple("default", scenario.resName, &scenario.precondition, uint(scenario.newSize)) + resVersion, err := target.ScaleSimple("default", scenario.resName, &scenario.precondition, uint(scenario.newSize), scenario.targetGR) if scenario.expectError && err == nil { t.Fatal("expected an error but was not returned") @@ -880,9 +613,9 @@ func TestGenericScale(t *testing.T) { // act for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { - target := NewScaler(scenario.scaleGetter, scenario.targetGR) + target := NewScaler(scenario.scaleGetter) - err := target.Scale("default", scenario.resName, uint(scenario.newSize), &scenario.precondition, nil, scenario.waitForReplicas) + err := target.Scale("default", scenario.resName, uint(scenario.newSize), &scenario.precondition, nil, scenario.waitForReplicas, scenario.targetGR) if scenario.expectError && err == nil { t.Fatal("expected an error but was not returned") diff --git a/test/e2e/apps/daemon_restart.go b/test/e2e/apps/daemon_restart.go index c70556918e1..dfb2c6a1a2c 100644 --- a/test/e2e/apps/daemon_restart.go +++ b/test/e2e/apps/daemon_restart.go @@ -258,7 +258,7 @@ var _ = SIGDescribe("DaemonRestart [Disruptive]", func() { // that it had the opportunity to create/delete pods, if it were going to do so. Scaling the RC // to the same size achieves this, because the scale operation advances the RC's sequence number // and awaits it to be observed and reported back in the RC's status. - framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods, true) + framework.ScaleRC(f.ClientSet, f.ScalesGetter, ns, rcName, numPods, true) // Only check the keys, the pods can be different if the kubelet updated it. // TODO: Can it really? @@ -289,9 +289,9 @@ var _ = SIGDescribe("DaemonRestart [Disruptive]", func() { restarter.kill() // This is best effort to try and create pods while the scheduler is down, // since we don't know exactly when it is restarted after the kill signal. - framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods+5, false)) + framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.ScalesGetter, ns, rcName, numPods+5, false)) restarter.waitUp() - framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods+5, true)) + framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.ScalesGetter, ns, rcName, numPods+5, true)) }) It("Kubelet should not restart containers across restart", func() { diff --git a/test/e2e/examples.go b/test/e2e/examples.go index 6fa937c89f6..bb7427f5a46 100644 --- a/test/e2e/examples.go +++ b/test/e2e/examples.go @@ -521,7 +521,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { Expect(err).NotTo(HaveOccurred()) By("scaling rethinkdb") - framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "rethinkdb-rc", 2, true) + framework.ScaleRC(f.ClientSet, f.ScalesGetter, ns, "rethinkdb-rc", 2, true) checkDbInstances() By("starting admin") @@ -564,7 +564,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { Expect(err).NotTo(HaveOccurred()) By("scaling hazelcast") - framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "hazelcast", 2, true) + framework.ScaleRC(f.ClientSet, f.ScalesGetter, ns, "hazelcast", 2, true) forEachPod("name", "hazelcast", func(pod v1.Pod) { _, err := framework.LookForStringInLog(ns, pod.Name, "hazelcast", "Members [2]", serverStartTimeout) Expect(err).NotTo(HaveOccurred()) diff --git a/test/e2e/framework/deployment_util.go b/test/e2e/framework/deployment_util.go index fe7dfab0f50..bd9b50c1bb9 100644 --- a/test/e2e/framework/deployment_util.go +++ b/test/e2e/framework/deployment_util.go @@ -31,7 +31,6 @@ import ( clientset "k8s.io/client-go/kubernetes" scaleclient "k8s.io/client-go/scale" extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions" - "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" testutils "k8s.io/kubernetes/test/utils" ) @@ -179,8 +178,8 @@ func WatchRecreateDeployment(c clientset.Interface, d *extensions.Deployment) er return err } -func ScaleDeployment(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error { - return ScaleResource(clientset, internalClientset, scalesGetter, ns, name, size, wait, extensionsinternal.Kind("Deployment"), extensionsinternal.Resource("deployments")) +func ScaleDeployment(clientset clientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error { + return ScaleResource(clientset, scalesGetter, ns, name, size, wait, extensionsinternal.Kind("Deployment"), extensionsinternal.Resource("deployments")) } func RunDeployment(config testutils.DeploymentConfig) error { diff --git a/test/e2e/framework/rc_util.go b/test/e2e/framework/rc_util.go index 7e3fedbc76f..ced976c48e9 100644 --- a/test/e2e/framework/rc_util.go +++ b/test/e2e/framework/rc_util.go @@ -85,9 +85,7 @@ func RcByNameContainer(name string, replicas int32, image string, labels map[str // ScaleRCByLabels scales an RC via ns/label lookup. If replicas == 0 it waits till // none are running, otherwise it does what a synchronous scale operation would do. -//TODO(p0lyn0mial): remove internalClientset. -//TODO(p0lyn0mial): update the callers. -func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns string, l map[string]string, replicas uint) error { +func ScaleRCByLabels(clientset clientset.Interface, scalesGetter scaleclient.ScalesGetter, ns string, l map[string]string, replicas uint) error { listOpts := metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(l)).String()} rcs, err := clientset.CoreV1().ReplicationControllers(ns).List(listOpts) if err != nil { @@ -99,7 +97,7 @@ func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalcl Logf("Scaling %v RCs with labels %v in ns %v to %v replicas.", len(rcs.Items), l, ns, replicas) for _, labelRC := range rcs.Items { name := labelRC.Name - if err := ScaleRC(clientset, internalClientset, scalesGetter, ns, name, replicas, false); err != nil { + if err := ScaleRC(clientset, scalesGetter, ns, name, replicas, false); err != nil { return err } rc, err := clientset.CoreV1().ReplicationControllers(ns).Get(name, metav1.GetOptions{}) @@ -159,8 +157,8 @@ func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalcl return DeleteResourceAndPods(clientset, internalClientset, scaleClient, api.Kind("ReplicationController"), ns, name) } -func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error { - return ScaleResource(clientset, internalClientset, scalesGetter, ns, name, size, wait, api.Kind("ReplicationController"), api.Resource("replicationcontrollers")) +func ScaleRC(clientset clientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error { + return ScaleResource(clientset, scalesGetter, ns, name, size, wait, api.Kind("ReplicationController"), api.Resource("replicationcontrollers")) } func RunRC(config testutils.RCConfig) error { diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 3384cd06ac0..442a1198f9d 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -2803,7 +2803,6 @@ func RemoveAvoidPodsOffNode(c clientset.Interface, nodeName string) { func ScaleResource( clientset clientset.Interface, - internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, @@ -2812,8 +2811,8 @@ func ScaleResource( gr schema.GroupResource, ) error { By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size)) - scaler := kubectl.ScalerFor(kind, internalClientset.Batch(), scalesGetter, gr) - if err := testutils.ScaleResourceWithRetries(scaler, ns, name, size); err != nil { + scaler := kubectl.NewScaler(scalesGetter) + if err := testutils.ScaleResourceWithRetries(scaler, ns, name, size, gr); err != nil { return fmt.Errorf("error while scaling RC %s to %d replicas: %v", name, size, err) } if !wait { diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 4ee76c2a7c5..3524a47e3c8 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -1291,7 +1291,7 @@ var _ = SIGDescribe("Services", func() { } By("Scaling down replication controller to zero") - framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, t.Namespace, rcSpec.Name, 0, false) + framework.ScaleRC(f.ClientSet, f.ScalesGetter, t.Namespace, rcSpec.Name, 0, false) By("Update service to not tolerate unready services") _, err = framework.UpdateService(f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) { diff --git a/test/e2e/scalability/load.go b/test/e2e/scalability/load.go index e0a935dcfab..1fb4cb48cf7 100644 --- a/test/e2e/scalability/load.go +++ b/test/e2e/scalability/load.go @@ -649,7 +649,6 @@ func scaleResource(wg *sync.WaitGroup, config testutils.RunObjectConfig, scaling newSize := uint(rand.Intn(config.GetReplicas()) + config.GetReplicas()/2) framework.ExpectNoError(framework.ScaleResource( config.GetClient(), - config.GetInternalClient(), config.GetScalesGetter(), config.GetNamespace(), config.GetName(), diff --git a/test/e2e/scheduling/equivalence_cache_predicates.go b/test/e2e/scheduling/equivalence_cache_predicates.go index 3bde9d2bcc1..7c7e9345780 100644 --- a/test/e2e/scheduling/equivalence_cache_predicates.go +++ b/test/e2e/scheduling/equivalence_cache_predicates.go @@ -156,7 +156,7 @@ var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() { By("Trying to schedule another equivalent Pod should fail due to node label has been removed.") // use scale to create another equivalent pod and wait for failure event WaitForSchedulerAfterAction(f, func() error { - err := framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, affinityRCName, uint(replica+1), false) + err := framework.ScaleRC(f.ClientSet, f.ScalesGetter, ns, affinityRCName, uint(replica+1), false) return err }, ns, affinityRCName, false) // and this new pod should be rejected since node label has been updated diff --git a/test/e2e/scheduling/priorities.go b/test/e2e/scheduling/priorities.go index 26032d6140e..7a552adbedc 100644 --- a/test/e2e/scheduling/priorities.go +++ b/test/e2e/scheduling/priorities.go @@ -195,7 +195,7 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() { By(fmt.Sprintf("Scale the RC: %s to len(nodeList.Item)-1 : %v.", rc.Name, len(nodeList.Items)-1)) - framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rc.Name, uint(len(nodeList.Items)-1), true) + framework.ScaleRC(f.ClientSet, f.ScalesGetter, ns, rc.Name, uint(len(nodeList.Items)-1), true) testPods, err := cs.CoreV1().Pods(ns).List(metav1.ListOptions{ LabelSelector: "name=scheduler-priority-avoid-pod", }) diff --git a/test/e2e/scheduling/rescheduler.go b/test/e2e/scheduling/rescheduler.go index 8224bc98bfb..5452f119244 100644 --- a/test/e2e/scheduling/rescheduler.go +++ b/test/e2e/scheduling/rescheduler.go @@ -69,8 +69,8 @@ var _ = SIGDescribe("Rescheduler [Serial]", func() { deployment := deployments.Items[0] replicas := uint(*(deployment.Spec.Replicas)) - err = framework.ScaleDeployment(f.ClientSet, f.InternalClientset, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas+1, true) - defer framework.ExpectNoError(framework.ScaleDeployment(f.ClientSet, f.InternalClientset, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas, true)) + err = framework.ScaleDeployment(f.ClientSet, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas+1, true) + defer framework.ExpectNoError(framework.ScaleDeployment(f.ClientSet, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas, true)) framework.ExpectNoError(err) }) @@ -81,7 +81,7 @@ func reserveAllCpu(f *framework.Framework, id string, millicores int) error { replicas := millicores / 100 reserveCpu(f, id, 1, 100) - framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, id, uint(replicas), false)) + framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.ScalesGetter, f.Namespace.Name, id, uint(replicas), false)) for start := time.Now(); time.Since(start) < timeout; time.Sleep(10 * time.Second) { pods, err := framework.GetPodsInNamespace(f.ClientSet, f.Namespace.Name, framework.ImagePullerLabels) diff --git a/test/utils/update_resources.go b/test/utils/update_resources.go index df435295d2b..b666454e54a 100644 --- a/test/utils/update_resources.go +++ b/test/utils/update_resources.go @@ -20,6 +20,7 @@ import ( "fmt" "time" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/kubernetes/pkg/kubectl" ) @@ -32,10 +33,10 @@ const ( waitRetryTImeout = 5 * time.Minute ) -func ScaleResourceWithRetries(scaler kubectl.Scaler, namespace, name string, size uint) error { +func ScaleResourceWithRetries(scaler kubectl.Scaler, namespace, name string, size uint, gr schema.GroupResource) error { waitForScale := kubectl.NewRetryParams(updateRetryInterval, updateRetryTimeout) waitForReplicas := kubectl.NewRetryParams(waitRetryInterval, waitRetryTImeout) - if err := scaler.Scale(namespace, name, size, nil, waitForScale, waitForReplicas); err != nil { + if err := scaler.Scale(namespace, name, size, nil, waitForScale, waitForReplicas, gr); err != nil { return fmt.Errorf("Error while scaling %s to %d replicas: %v", name, size, err) } return nil