Adding scale error retries

This commit is contained in:
Krzysztof Siedlecki 2018-06-07 10:57:43 +02:00
parent f73101066a
commit 8a3c2dcc6d
4 changed files with 44 additions and 17 deletions

View File

@ -149,14 +149,7 @@ func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, prec
return err return err
} }
if waitForReplicas != nil { if waitForReplicas != nil {
err := wait.PollImmediate( return WaitForScaleHasDesiredReplicas(s.scaleNamespacer, gr, resourceName, namespace, newSize, waitForReplicas)
waitForReplicas.Interval,
waitForReplicas.Timeout,
scaleHasDesiredReplicas(s.scaleNamespacer, gr, resourceName, namespace, int32(newSize)))
if err == wait.ErrWaitTimeout {
return fmt.Errorf("timed out waiting for %q to be synced", resourceName)
}
return err
} }
return nil return nil
} }
@ -177,3 +170,19 @@ func scaleHasDesiredReplicas(sClient scaleclient.ScalesGetter, gr schema.GroupRe
desiredReplicas == actualScale.Status.Replicas, nil desiredReplicas == actualScale.Status.Replicas, nil
} }
} }
// WaitForScaleHasDesiredReplicas waits until condition scaleHasDesiredReplicas is satisfied
// or returns error when timeout happens
func WaitForScaleHasDesiredReplicas(sClient scaleclient.ScalesGetter, gr schema.GroupResource, resourceName string, namespace string, newSize uint, waitForReplicas *RetryParams) error {
if waitForReplicas == nil {
return fmt.Errorf("waitForReplicas parameter cannot be nil!")
}
err := wait.PollImmediate(
waitForReplicas.Interval,
waitForReplicas.Timeout,
scaleHasDesiredReplicas(sClient, gr, resourceName, namespace, int32(newSize)))
if err == wait.ErrWaitTimeout {
return fmt.Errorf("timed out waiting for %q to be synced", resourceName)
}
return err
}

View File

@ -60,7 +60,6 @@ go_library(
"//pkg/controller/deployment/util:go_default_library", "//pkg/controller/deployment/util:go_default_library",
"//pkg/controller/nodelifecycle:go_default_library", "//pkg/controller/nodelifecycle:go_default_library",
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//pkg/kubectl:go_default_library",
"//pkg/kubelet/apis:go_default_library", "//pkg/kubelet/apis:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/stats/v1alpha1:go_default_library", "//pkg/kubelet/apis/stats/v1alpha1:go_default_library",

View File

@ -88,7 +88,6 @@ import (
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
nodectlr "k8s.io/kubernetes/pkg/controller/nodelifecycle" nodectlr "k8s.io/kubernetes/pkg/controller/nodelifecycle"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubectl"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/master/ports" "k8s.io/kubernetes/pkg/master/ports"
@ -2847,8 +2846,7 @@ func ScaleResource(
gr schema.GroupResource, gr schema.GroupResource,
) error { ) error {
By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size)) By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size))
scaler := kubectl.NewScaler(scalesGetter) if err := testutils.ScaleResourceWithRetries(scalesGetter, ns, name, size, gr); err != nil {
if err := testutils.ScaleResourceWithRetries(scaler, ns, name, size, gr); err != nil {
return fmt.Errorf("error while scaling RC %s to %d replicas: %v", name, size, err) return fmt.Errorf("error while scaling RC %s to %d replicas: %v", name, size, err)
} }
if !wait { if !wait {

View File

@ -21,6 +21,8 @@ import (
"time" "time"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/scale"
"k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl"
) )
@ -30,13 +32,32 @@ const (
updateRetryInterval = 5 * time.Second updateRetryInterval = 5 * time.Second
updateRetryTimeout = 1 * time.Minute updateRetryTimeout = 1 * time.Minute
waitRetryInterval = 5 * time.Second waitRetryInterval = 5 * time.Second
waitRetryTImeout = 5 * time.Minute waitRetryTimeout = 5 * time.Minute
) )
func ScaleResourceWithRetries(scaler kubectl.Scaler, namespace, name string, size uint, gr schema.GroupResource) error { func RetryErrorCondition(condition wait.ConditionFunc) wait.ConditionFunc {
waitForScale := kubectl.NewRetryParams(updateRetryInterval, updateRetryTimeout) return func() (bool, error) {
waitForReplicas := kubectl.NewRetryParams(waitRetryInterval, waitRetryTImeout) done, err := condition()
if err := scaler.Scale(namespace, name, size, nil, waitForScale, waitForReplicas, gr); err != nil { if err != nil && IsRetryableAPIError(err) {
return false, nil
}
return done, err
}
}
func ScaleResourceWithRetries(scalesGetter scale.ScalesGetter, namespace, name string, size uint, gr schema.GroupResource) error {
scaler := kubectl.NewScaler(scalesGetter)
preconditions := &kubectl.ScalePrecondition{
Size: -1,
ResourceVersion: "",
}
waitForReplicas := kubectl.NewRetryParams(waitRetryInterval, waitRetryTimeout)
cond := RetryErrorCondition(kubectl.ScaleCondition(scaler, preconditions, namespace, name, size, nil, gr))
err := wait.PollImmediate(updateRetryInterval, updateRetryTimeout, cond)
if err == nil {
err = kubectl.WaitForScaleHasDesiredReplicas(scalesGetter, gr, name, namespace, size, waitForReplicas)
}
if err != nil {
return fmt.Errorf("Error while scaling %s to %d replicas: %v", name, size, err) return fmt.Errorf("Error while scaling %s to %d replicas: %v", name, size, err)
} }
return nil return nil