mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Merge pull request #58298 from p0lyn0mial/generic_scaler_scalerfor_continued
Automatic merge from submit-queue (batch tested with PRs 58955, 58968, 58971, 58963, 58298). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. removes the remainder from ScalerFor method **What this PR does / why we need it**: this PR removes existing scalers from `ScalerFor` method **Release note**: ```release-note NONE ```
This commit is contained in:
commit
235714e7f3
@ -307,7 +307,7 @@ func (f *ring1Factory) Scaler(mapping *meta.RESTMapping) (kubectl.Scaler, error)
|
||||
scalesGetter := scaleclient.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver)
|
||||
gvk := mapping.GroupVersionKind.GroupVersion().WithResource(mapping.Resource)
|
||||
|
||||
return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), clientset, scalesGetter, gvk.GroupResource())
|
||||
return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), clientset.Batch(), scalesGetter, gvk.GroupResource()), nil
|
||||
}
|
||||
|
||||
func (f *ring1Factory) Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, error) {
|
||||
|
@ -347,7 +347,7 @@ func (reaper *StatefulSetReaper) Stop(namespace, name string, timeout time.Durat
|
||||
func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
|
||||
jobs := reaper.client.Jobs(namespace)
|
||||
pods := reaper.podClient.Pods(namespace)
|
||||
scaler := &JobScaler{reaper.client}
|
||||
scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, reaper.client, nil, schema.GroupResource{})
|
||||
job, err := jobs.Get(name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -34,7 +34,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
|
||||
scaleclient "k8s.io/client-go/scale"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
appsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/internalversion"
|
||||
batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
|
||||
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
|
||||
@ -54,20 +53,15 @@ type Scaler interface {
|
||||
}
|
||||
|
||||
// ScalerFor gets a scaler for a given resource
|
||||
// TODO(p0lyn0mial): remove kind and internalclientset
|
||||
// TODO(p0lyn0mial): once we have only one scaler, there is no need to return an error anymore.
|
||||
func ScalerFor(kind schema.GroupKind, c internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) (Scaler, error) {
|
||||
func ScalerFor(kind schema.GroupKind, jobsClient batchclient.JobsGetter, scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) Scaler {
|
||||
// it seems like jobs dont't follow "normal" scale semantics.
|
||||
// For example it is not clear whether HPA could make use of it or not.
|
||||
// For more details see: https://github.com/kubernetes/kubernetes/pull/58468
|
||||
switch kind {
|
||||
case api.Kind("ReplicationController"):
|
||||
return &ReplicationControllerScaler{c.Core()}, nil
|
||||
case extensions.Kind("ReplicaSet"), apps.Kind("ReplicaSet"):
|
||||
return &ReplicaSetScaler{c.Extensions()}, nil
|
||||
case batch.Kind("Job"):
|
||||
return &JobScaler{c.Batch()}, nil // Either kind of job can be scaled with Batch interface.
|
||||
case apps.Kind("StatefulSet"):
|
||||
return &StatefulSetScaler{c.Apps()}, nil
|
||||
return &jobScaler{jobsClient} // Either kind of job can be scaled with Batch interface.
|
||||
default:
|
||||
return &GenericScaler{scalesGetter, gr}, nil
|
||||
return &genericScaler{scalesGetter, gr}
|
||||
}
|
||||
}
|
||||
|
||||
@ -167,6 +161,7 @@ func (precondition *ScalePrecondition) ValidateReplicationController(controller
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO(p0lyn0mial): remove ReplicationControllerScaler
|
||||
type ReplicationControllerScaler struct {
|
||||
c coreclient.ReplicationControllersGetter
|
||||
}
|
||||
@ -265,6 +260,7 @@ func (precondition *ScalePrecondition) ValidateReplicaSet(replicaSet *extensions
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO(p0lyn0mial): remove ReplicaSetScaler
|
||||
type ReplicaSetScaler struct {
|
||||
c extensionsclient.ReplicaSetsGetter
|
||||
}
|
||||
@ -339,6 +335,7 @@ func (precondition *ScalePrecondition) ValidateJob(job *batch.Job) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO(p0lyn0mial): remove StatefulSetsGetter
|
||||
type StatefulSetScaler struct {
|
||||
c appsclient.StatefulSetsGetter
|
||||
}
|
||||
@ -395,13 +392,13 @@ func (scaler *StatefulSetScaler) Scale(namespace, name string, newSize uint, pre
|
||||
return nil
|
||||
}
|
||||
|
||||
type JobScaler struct {
|
||||
type jobScaler struct {
|
||||
c batchclient.JobsGetter
|
||||
}
|
||||
|
||||
// ScaleSimple is responsible for updating job's parallelism. It returns the
|
||||
// resourceVersion of the job if the update is successful.
|
||||
func (scaler *JobScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
|
||||
func (scaler *jobScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
|
||||
job, err := scaler.c.Jobs(namespace).Get(name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return "", ScaleError{ScaleGetFailure, "", err}
|
||||
@ -426,7 +423,7 @@ func (scaler *JobScaler) ScaleSimple(namespace, name string, preconditions *Scal
|
||||
// Scale updates a Job to a new size, with optional precondition check (if preconditions is not nil),
|
||||
// optional retries (if retry is not nil), and then optionally waits for parallelism to reach desired
|
||||
// number, which can be less than requested based on job's current progress.
|
||||
func (scaler *JobScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
|
||||
func (scaler *jobScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
|
||||
if preconditions == nil {
|
||||
preconditions = &ScalePrecondition{-1, ""}
|
||||
}
|
||||
@ -463,6 +460,7 @@ func (precondition *ScalePrecondition) ValidateDeployment(deployment *extensions
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO(p0lyn0mial): remove DeploymentScaler
|
||||
type DeploymentScaler struct {
|
||||
c extensionsclient.DeploymentsGetter
|
||||
}
|
||||
@ -534,19 +532,16 @@ func (precondition *ScalePrecondition) validateGeneric(scale *autoscalingapi.Sca
|
||||
return nil
|
||||
}
|
||||
|
||||
// GenericScaler can update scales for resources in a particular namespace
|
||||
// TODO(po0lyn0mial): when the work on GenericScaler is done, don't
|
||||
// export the GenericScaler. Instead use ScalerFor method for getting the Scaler
|
||||
// also update the UTs
|
||||
type GenericScaler struct {
|
||||
// genericScaler can update scales for resources in a particular namespace
|
||||
type genericScaler struct {
|
||||
scaleNamespacer scaleclient.ScalesGetter
|
||||
targetGR schema.GroupResource
|
||||
}
|
||||
|
||||
var _ Scaler = &GenericScaler{}
|
||||
var _ Scaler = &genericScaler{}
|
||||
|
||||
// ScaleSimple updates a scale of a given resource. It returns the resourceVersion of the scale if the update was successful.
|
||||
func (s *GenericScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error) {
|
||||
func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error) {
|
||||
scale, err := s.scaleNamespacer.Scales(namespace).Get(s.targetGR, name)
|
||||
if err != nil {
|
||||
return "", ScaleError{ScaleGetFailure, "", err}
|
||||
@ -570,7 +565,7 @@ func (s *GenericScaler) ScaleSimple(namespace, name string, preconditions *Scale
|
||||
|
||||
// Scale updates a scale of a given resource to a new size, with optional precondition check (if preconditions is not nil),
|
||||
// optional retries (if retry is not nil), and then optionally waits for the status to reach desired count.
|
||||
func (s *GenericScaler) Scale(namespace, resourceName string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
|
||||
func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
|
||||
if preconditions == nil {
|
||||
preconditions = &ScalePrecondition{-1, ""}
|
||||
}
|
||||
|
@ -281,13 +281,13 @@ func TestValidateReplicationController(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
type ErrorJobs struct {
|
||||
type errorJobs struct {
|
||||
batchclient.JobInterface
|
||||
conflict bool
|
||||
invalid bool
|
||||
}
|
||||
|
||||
func (c *ErrorJobs) Update(job *batch.Job) (*batch.Job, error) {
|
||||
func (c *errorJobs) Update(job *batch.Job) (*batch.Job, error) {
|
||||
switch {
|
||||
case c.invalid:
|
||||
return nil, kerrors.NewInvalid(api.Kind(job.Kind), job.Name, nil)
|
||||
@ -297,7 +297,7 @@ func (c *ErrorJobs) Update(job *batch.Job) (*batch.Job, error) {
|
||||
return nil, errors.New("Job update failure")
|
||||
}
|
||||
|
||||
func (c *ErrorJobs) Get(name string, options metav1.GetOptions) (*batch.Job, error) {
|
||||
func (c *errorJobs) Get(name string, options metav1.GetOptions) (*batch.Job, error) {
|
||||
zero := int32(0)
|
||||
return &batch.Job{
|
||||
Spec: batch.JobSpec{
|
||||
@ -306,14 +306,14 @@ func (c *ErrorJobs) Get(name string, options metav1.GetOptions) (*batch.Job, err
|
||||
}, nil
|
||||
}
|
||||
|
||||
type ErrorJobClient struct {
|
||||
type errorJobClient struct {
|
||||
batchclient.JobsGetter
|
||||
conflict bool
|
||||
invalid bool
|
||||
}
|
||||
|
||||
func (c *ErrorJobClient) Jobs(namespace string) batchclient.JobInterface {
|
||||
return &ErrorJobs{
|
||||
func (c *errorJobClient) Jobs(namespace string) batchclient.JobInterface {
|
||||
return &errorJobs{
|
||||
JobInterface: c.JobsGetter.Jobs(namespace),
|
||||
conflict: c.conflict,
|
||||
invalid: c.invalid,
|
||||
@ -321,14 +321,14 @@ func (c *ErrorJobClient) Jobs(namespace string) batchclient.JobInterface {
|
||||
}
|
||||
|
||||
func TestJobScaleRetry(t *testing.T) {
|
||||
fake := &ErrorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), conflict: true}
|
||||
scaler := JobScaler{fake}
|
||||
fake := &errorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), conflict: true}
|
||||
scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, fake, nil, schema.GroupResource{})
|
||||
preconditions := ScalePrecondition{-1, ""}
|
||||
count := uint(3)
|
||||
name := "foo"
|
||||
namespace := "default"
|
||||
|
||||
scaleFunc := ScaleCondition(&scaler, &preconditions, namespace, name, count, nil)
|
||||
scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil)
|
||||
pass, err := scaleFunc()
|
||||
if pass != false {
|
||||
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
|
||||
@ -337,7 +337,7 @@ func TestJobScaleRetry(t *testing.T) {
|
||||
t.Errorf("Did not expect an error on update failure, got %v", err)
|
||||
}
|
||||
preconditions = ScalePrecondition{3, ""}
|
||||
scaleFunc = ScaleCondition(&scaler, &preconditions, namespace, name, count, nil)
|
||||
scaleFunc = ScaleCondition(scaler, &preconditions, namespace, name, count, nil)
|
||||
pass, err = scaleFunc()
|
||||
if err == nil {
|
||||
t.Error("Expected error on precondition failure")
|
||||
@ -355,7 +355,7 @@ func job() *batch.Job {
|
||||
|
||||
func TestJobScale(t *testing.T) {
|
||||
fakeClientset := fake.NewSimpleClientset(job())
|
||||
scaler := JobScaler{fakeClientset.Batch()}
|
||||
scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, fakeClientset.Batch(), nil, schema.GroupResource{})
|
||||
preconditions := ScalePrecondition{-1, ""}
|
||||
count := uint(3)
|
||||
name := "foo"
|
||||
@ -374,14 +374,14 @@ func TestJobScale(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestJobScaleInvalid(t *testing.T) {
|
||||
fake := &ErrorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), invalid: true}
|
||||
scaler := JobScaler{fake}
|
||||
fake := &errorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), invalid: true}
|
||||
scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, fake, nil, schema.GroupResource{})
|
||||
preconditions := ScalePrecondition{-1, ""}
|
||||
count := uint(3)
|
||||
name := "foo"
|
||||
namespace := "default"
|
||||
|
||||
scaleFunc := ScaleCondition(&scaler, &preconditions, namespace, name, count, nil)
|
||||
scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil)
|
||||
pass, err := scaleFunc()
|
||||
if pass {
|
||||
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
|
||||
@ -403,7 +403,7 @@ func TestJobScaleFailsPreconditions(t *testing.T) {
|
||||
Parallelism: &ten,
|
||||
},
|
||||
})
|
||||
scaler := JobScaler{fake.Batch()}
|
||||
scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, fake.Batch(), nil, schema.GroupResource{})
|
||||
preconditions := ScalePrecondition{2, ""}
|
||||
count := uint(3)
|
||||
name := "foo"
|
||||
@ -1425,7 +1425,7 @@ func TestGenericScaleSimple(t *testing.T) {
|
||||
// act
|
||||
for index, scenario := range scenarios {
|
||||
t.Run(fmt.Sprintf("running scenario %d: %s", index+1, scenario.name), func(t *testing.T) {
|
||||
target := GenericScaler{scenario.scaleGetter, scenario.targetGR}
|
||||
target := ScalerFor(schema.GroupKind{}, nil, scenario.scaleGetter, scenario.targetGR)
|
||||
|
||||
resVersion, err := target.ScaleSimple("default", scenario.resName, &scenario.precondition, uint(scenario.newSize))
|
||||
|
||||
@ -1522,7 +1522,7 @@ func TestGenericScale(t *testing.T) {
|
||||
// act
|
||||
for index, scenario := range scenarios {
|
||||
t.Run(fmt.Sprintf("running scenario %d: %s", index+1, scenario.name), func(t *testing.T) {
|
||||
target := GenericScaler{scenario.scaleGetter, scenario.targetGR}
|
||||
target := ScalerFor(schema.GroupKind{}, nil, scenario.scaleGetter, scenario.targetGR)
|
||||
|
||||
err := target.Scale("default", scenario.resName, uint(scenario.newSize), &scenario.precondition, nil, scenario.waitForReplicas)
|
||||
|
||||
|
@ -179,8 +179,6 @@ func WatchRecreateDeployment(c clientset.Interface, d *extensions.Deployment) er
|
||||
return err
|
||||
}
|
||||
|
||||
//TODO(p0lyn0mial): remove internalClientset and kind.
|
||||
//TODO(p0lyn0mial): update the callers.
|
||||
func ScaleDeployment(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error {
|
||||
return ScaleResource(clientset, internalClientset, scalesGetter, ns, name, size, wait, extensionsinternal.Kind("Deployment"), extensionsinternal.Resource("deployments"))
|
||||
}
|
||||
|
@ -159,8 +159,6 @@ func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalcl
|
||||
return DeleteResourceAndPods(clientset, internalClientset, api.Kind("ReplicationController"), ns, name)
|
||||
}
|
||||
|
||||
//TODO(p0lyn0mial): remove internalClientset.
|
||||
//TODO(p0lyn0mial): update the callers.
|
||||
func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error {
|
||||
return ScaleResource(clientset, internalClientset, scalesGetter, ns, name, size, wait, api.Kind("ReplicationController"), api.Resource("replicationcontrollers"))
|
||||
}
|
||||
|
@ -2687,13 +2687,6 @@ func RemoveAvoidPodsOffNode(c clientset.Interface, nodeName string) {
|
||||
ExpectNoError(err)
|
||||
}
|
||||
|
||||
//TODO(p0lyn0mial): remove internalClientset and kind
|
||||
func getScalerForKind(internalClientset internalclientset.Interface, kind schema.GroupKind, scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) (kubectl.Scaler, error) {
|
||||
return kubectl.ScalerFor(kind, internalClientset, scalesGetter, gr)
|
||||
}
|
||||
|
||||
//TODO(p0lyn0mial): remove internalClientset and kind.
|
||||
//TODO(p0lyn0mial): update the callers.
|
||||
func ScaleResource(
|
||||
clientset clientset.Interface,
|
||||
internalClientset internalclientset.Interface,
|
||||
@ -2705,13 +2698,10 @@ func ScaleResource(
|
||||
gr schema.GroupResource,
|
||||
) error {
|
||||
By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size))
|
||||
scaler, err := getScalerForKind(internalClientset, kind, scalesGetter, gr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
scaler := kubectl.ScalerFor(kind, internalClientset.Batch(), scalesGetter, gr)
|
||||
waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute)
|
||||
waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
|
||||
if err = scaler.Scale(ns, name, size, nil, waitForScale, waitForReplicas); err != nil {
|
||||
if err := scaler.Scale(ns, name, size, nil, waitForScale, waitForReplicas); err != nil {
|
||||
return fmt.Errorf("error while scaling RC %s to %d replicas: %v", name, size, err)
|
||||
}
|
||||
if !wait {
|
||||
|
Loading…
Reference in New Issue
Block a user