mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-04 15:05:20 +00:00
removes job scaler
This commit is contained in:
@@ -23,13 +23,11 @@ import (
|
||||
|
||||
autoscalingapi "k8s.io/api/autoscaling/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/apis/batch"
|
||||
|
||||
scaleclient "k8s.io/client-go/scale"
|
||||
batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
|
||||
)
|
||||
|
||||
// TODO: Figure out if we should be waiting on initializers in the Scale() functions below.
|
||||
@@ -40,30 +38,15 @@ type Scaler interface {
|
||||
// retries in the event of resource version mismatch (if retry is not nil),
|
||||
// and optionally waits until the status of the resource matches newSize (if wait is not nil)
|
||||
// TODO: Make the implementation of this watch-based (#56075) once #31345 is fixed.
|
||||
Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, wait *RetryParams) error
|
||||
Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, wait *RetryParams, gr schema.GroupResource) error
|
||||
// ScaleSimple does a simple one-shot attempt at scaling - not useful on its own, but
|
||||
// a necessary building block for Scale
|
||||
ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error)
|
||||
}
|
||||
|
||||
// ScalerFor gets a scaler for a given resource
|
||||
func ScalerFor(kind schema.GroupKind, jobsClient batchclient.JobsGetter, scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) Scaler {
|
||||
// it seems like jobs dont't follow "normal" scale semantics.
|
||||
// For example it is not clear whether HPA could make use of it or not.
|
||||
// For more details see: https://github.com/kubernetes/kubernetes/pull/58468
|
||||
switch kind {
|
||||
case batch.Kind("Job"):
|
||||
return &jobScaler{jobsClient} // Either kind of job can be scaled with Batch interface.
|
||||
default:
|
||||
return NewScaler(scalesGetter, gr)
|
||||
}
|
||||
ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint, gr schema.GroupResource) (updatedResourceVersion string, err error)
|
||||
}
|
||||
|
||||
// NewScaler get a scaler for a given resource
|
||||
// Note that if you are trying to crate create a scaler for "job" then stop and use ScalerFor instead.
|
||||
// When scaling jobs is dead, we'll remove ScalerFor method.
|
||||
func NewScaler(scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) Scaler {
|
||||
return &genericScaler{scalesGetter, gr}
|
||||
func NewScaler(scalesGetter scaleclient.ScalesGetter) Scaler {
|
||||
return &genericScaler{scalesGetter}
|
||||
}
|
||||
|
||||
// ScalePrecondition describes a condition that must be true for the scale to take place
|
||||
@@ -97,9 +80,9 @@ func NewRetryParams(interval, timeout time.Duration) *RetryParams {
|
||||
}
|
||||
|
||||
// ScaleCondition is a closure around Scale that facilitates retries via util.wait
|
||||
func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string) wait.ConditionFunc {
|
||||
func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string, gr schema.GroupResource) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
rv, err := r.ScaleSimple(namespace, name, precondition, count)
|
||||
rv, err := r.ScaleSimple(namespace, name, precondition, count, gr)
|
||||
if updatedResourceVersion != nil {
|
||||
*updatedResourceVersion = rv
|
||||
}
|
||||
@@ -128,60 +111,6 @@ func (precondition *ScalePrecondition) ValidateJob(job *batch.Job) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type jobScaler struct {
|
||||
c batchclient.JobsGetter
|
||||
}
|
||||
|
||||
// ScaleSimple is responsible for updating job's parallelism. It returns the
|
||||
// resourceVersion of the job if the update is successful.
|
||||
func (scaler *jobScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
|
||||
job, err := scaler.c.Jobs(namespace).Get(name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if preconditions != nil {
|
||||
if err := preconditions.ValidateJob(job); err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
parallelism := int32(newSize)
|
||||
job.Spec.Parallelism = ¶llelism
|
||||
updatedJob, err := scaler.c.Jobs(namespace).Update(job)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return updatedJob.ObjectMeta.ResourceVersion, nil
|
||||
}
|
||||
|
||||
// Scale updates a Job to a new size, with optional precondition check (if preconditions is not nil),
|
||||
// optional retries (if retry is not nil), and then optionally waits for parallelism to reach desired
|
||||
// number, which can be less than requested based on job's current progress.
|
||||
func (scaler *jobScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
|
||||
if preconditions == nil {
|
||||
preconditions = &ScalePrecondition{-1, ""}
|
||||
}
|
||||
if retry == nil {
|
||||
// Make it try only once, immediately
|
||||
retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
|
||||
}
|
||||
cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil)
|
||||
if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil {
|
||||
return err
|
||||
}
|
||||
if waitForReplicas != nil {
|
||||
job, err := scaler.c.Jobs(namespace).Get(name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, JobHasDesiredParallelism(scaler.c, job))
|
||||
if err == wait.ErrWaitTimeout {
|
||||
return fmt.Errorf("timed out waiting for %q to be synced", name)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateGeneric ensures that the preconditions match. Returns nil if they are valid, otherwise an error
|
||||
func (precondition *ScalePrecondition) validate(scale *autoscalingapi.Scale) error {
|
||||
if precondition.Size != -1 && int(scale.Spec.Replicas) != precondition.Size {
|
||||
@@ -196,14 +125,13 @@ func (precondition *ScalePrecondition) validate(scale *autoscalingapi.Scale) err
|
||||
// genericScaler can update scales for resources in a particular namespace
|
||||
type genericScaler struct {
|
||||
scaleNamespacer scaleclient.ScalesGetter
|
||||
targetGR schema.GroupResource
|
||||
}
|
||||
|
||||
var _ Scaler = &genericScaler{}
|
||||
|
||||
// ScaleSimple updates a scale of a given resource. It returns the resourceVersion of the scale if the update was successful.
|
||||
func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error) {
|
||||
scale, err := s.scaleNamespacer.Scales(namespace).Get(s.targetGR, name)
|
||||
func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint, gr schema.GroupResource) (updatedResourceVersion string, err error) {
|
||||
scale, err := s.scaleNamespacer.Scales(namespace).Get(gr, name)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -214,7 +142,7 @@ func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *Scale
|
||||
}
|
||||
|
||||
scale.Spec.Replicas = int32(newSize)
|
||||
updatedScale, err := s.scaleNamespacer.Scales(namespace).Update(s.targetGR, scale)
|
||||
updatedScale, err := s.scaleNamespacer.Scales(namespace).Update(gr, scale)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -223,7 +151,7 @@ func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *Scale
|
||||
|
||||
// Scale updates a scale of a given resource to a new size, with optional precondition check (if preconditions is not nil),
|
||||
// optional retries (if retry is not nil), and then optionally waits for the status to reach desired count.
|
||||
func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
|
||||
func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams, gr schema.GroupResource) error {
|
||||
if preconditions == nil {
|
||||
preconditions = &ScalePrecondition{-1, ""}
|
||||
}
|
||||
@@ -231,7 +159,7 @@ func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, prec
|
||||
// make it try only once, immediately
|
||||
retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
|
||||
}
|
||||
cond := ScaleCondition(s, preconditions, namespace, resourceName, newSize, nil)
|
||||
cond := ScaleCondition(s, preconditions, namespace, resourceName, newSize, nil, gr)
|
||||
if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -239,7 +167,7 @@ func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, prec
|
||||
err := wait.PollImmediate(
|
||||
waitForReplicas.Interval,
|
||||
waitForReplicas.Timeout,
|
||||
scaleHasDesiredReplicas(s.scaleNamespacer, s.targetGR, resourceName, namespace, int32(newSize)))
|
||||
scaleHasDesiredReplicas(s.scaleNamespacer, gr, resourceName, namespace, int32(newSize)))
|
||||
if err == wait.ErrWaitTimeout {
|
||||
return fmt.Errorf("timed out waiting for %q to be synced", resourceName)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user