diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/apply/patcher.go b/staging/src/k8s.io/kubectl/pkg/cmd/apply/patcher.go index 4903beb8121..5f2f1423d3f 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/apply/patcher.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/apply/patcher.go @@ -17,6 +17,7 @@ limitations under the License. package apply import ( + "context" "encoding/json" "fmt" "io" @@ -387,7 +388,7 @@ func (p *Patcher) deleteAndCreate(original runtime.Object, modified []byte, name return modified, nil, err } // TODO: use wait - if err := wait.PollImmediate(1*time.Second, p.Timeout, func() (bool, error) { + if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, p.Timeout, true, func(ctx context.Context) (bool, error) { if _, err := p.Helper.Get(namespace, name); !apierrors.IsNotFound(err) { return false, err } diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/replace/replace.go b/staging/src/k8s.io/kubectl/pkg/cmd/replace/replace.go index b8989762a40..97c5816a3eb 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/replace/replace.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/replace/replace.go @@ -17,6 +17,7 @@ limitations under the License. package replace import ( + "context" "fmt" "net/url" "os" @@ -358,8 +359,7 @@ func (o *ReplaceOptions) forceReplace() error { if err != nil { return err } - - return wait.PollImmediate(1*time.Second, timeout, func() (bool, error) { + return wait.PollUntilContextTimeout(context.Background(), 1*time.Second, timeout, true, func(ctx context.Context) (bool, error) { if err := info.Get(); !errors.IsNotFound(err) { return false, err } diff --git a/staging/src/k8s.io/kubectl/pkg/scale/scale.go b/staging/src/k8s.io/kubectl/pkg/scale/scale.go index 04df94ff695..d2dc51fc69a 100644 --- a/staging/src/k8s.io/kubectl/pkg/scale/scale.go +++ b/staging/src/k8s.io/kubectl/pkg/scale/scale.go @@ -18,12 +18,13 @@ package scale import ( "context" + "errors" "fmt" "strconv" "time" autoscalingv1 "k8s.io/api/autoscaling/v1" - "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -80,14 +81,14 @@ func NewRetryParams(interval, timeout time.Duration) *RetryParams { } // ScaleCondition is a closure around Scale that facilitates retries via util.wait -func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string, gvr schema.GroupVersionResource, dryRun bool) wait.ConditionFunc { - return func() (bool, error) { +func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string, gvr schema.GroupVersionResource, dryRun bool) wait.ConditionWithContextFunc { + return func(context.Context) (bool, error) { rv, err := r.ScaleSimple(namespace, name, precondition, count, gvr, dryRun) if updatedResourceVersion != nil { *updatedResourceVersion = rv } // Retry only on update conflicts. - if errors.IsConflict(err) { + if apierrors.IsConflict(err) { return false, nil } if err != nil { @@ -171,7 +172,7 @@ func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, prec retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} } cond := ScaleCondition(s, preconditions, namespace, resourceName, newSize, nil, gvr, dryRun) - if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil { + if err := wait.PollUntilContextTimeout(context.Background(), retry.Interval, retry.Timeout, true, cond); err != nil { return err } if waitForReplicas != nil { @@ -182,9 +183,9 @@ func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, prec // scaleHasDesiredReplicas returns a condition that will be true if and only if the desired replica // count for a scale (Spec) equals its updated replicas count (Status) -func scaleHasDesiredReplicas(sClient scaleclient.ScalesGetter, gr schema.GroupResource, resourceName string, namespace string, desiredReplicas int32) wait.ConditionFunc { - return func() (bool, error) { - actualScale, err := sClient.Scales(namespace).Get(context.TODO(), gr, resourceName, metav1.GetOptions{}) +func scaleHasDesiredReplicas(sClient scaleclient.ScalesGetter, gr schema.GroupResource, resourceName string, namespace string, desiredReplicas int32) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { + actualScale, err := sClient.Scales(namespace).Get(ctx, gr, resourceName, metav1.GetOptions{}) if err != nil { return false, err } @@ -203,11 +204,9 @@ func WaitForScaleHasDesiredReplicas(sClient scaleclient.ScalesGetter, gr schema. if waitForReplicas == nil { return fmt.Errorf("waitForReplicas parameter cannot be nil") } - err := wait.PollImmediate( - waitForReplicas.Interval, - waitForReplicas.Timeout, - scaleHasDesiredReplicas(sClient, gr, resourceName, namespace, int32(newSize))) - if err == wait.ErrWaitTimeout { + err := wait.PollUntilContextTimeout(context.Background(), waitForReplicas.Interval, waitForReplicas.Timeout, true, scaleHasDesiredReplicas(sClient, gr, resourceName, namespace, int32(newSize))) + + if errors.Is(err, context.DeadlineExceeded) { return fmt.Errorf("timed out waiting for %q to be synced", resourceName) } return err diff --git a/staging/src/k8s.io/kubectl/pkg/scale/scale_test.go b/staging/src/k8s.io/kubectl/pkg/scale/scale_test.go index f9c9720eb8b..ab3f37f8f3f 100644 --- a/staging/src/k8s.io/kubectl/pkg/scale/scale_test.go +++ b/staging/src/k8s.io/kubectl/pkg/scale/scale_test.go @@ -17,6 +17,7 @@ limitations under the License. package scale import ( + "context" "encoding/json" "fmt" "testing" @@ -69,7 +70,7 @@ func TestReplicationControllerScaleRetry(t *testing.T) { namespace := metav1.NamespaceDefault scaleFunc := ScaleCondition(scaler, nil, namespace, name, count, nil, rcgvr, false) - pass, err := scaleFunc() + pass, err := scaleFunc(context.Background()) if pass { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) } @@ -78,7 +79,7 @@ func TestReplicationControllerScaleRetry(t *testing.T) { } preconditions := ScalePrecondition{3, ""} scaleFunc = ScaleCondition(scaler, &preconditions, namespace, name, count, nil, rcgvr, false) - _, err = scaleFunc() + _, err = scaleFunc(context.Background()) if err == nil { t.Errorf("Expected error on precondition failure") } @@ -105,7 +106,7 @@ func TestReplicationControllerScaleInvalid(t *testing.T) { namespace := "default" scaleFunc := ScaleCondition(scaler, nil, namespace, name, count, nil, rcgvr, false) - pass, err := scaleFunc() + pass, err := scaleFunc(context.Background()) if pass { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) } @@ -179,7 +180,7 @@ func TestDeploymentScaleRetry(t *testing.T) { namespace := "default" scaleFunc := ScaleCondition(scaler, nil, namespace, name, count, nil, deploygvr, false) - pass, err := scaleFunc() + pass, err := scaleFunc(context.Background()) if pass != false { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) } @@ -188,7 +189,7 @@ func TestDeploymentScaleRetry(t *testing.T) { } preconditions := &ScalePrecondition{3, ""} scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil, deploygvr, false) - _, err = scaleFunc() + _, err = scaleFunc(context.Background()) if err == nil { t.Error("Expected error on precondition failure") } @@ -236,7 +237,7 @@ func TestDeploymentScaleInvalid(t *testing.T) { namespace := "default" scaleFunc := ScaleCondition(scaler, nil, namespace, name, count, nil, deploygvr, false) - pass, err := scaleFunc() + pass, err := scaleFunc(context.Background()) if pass { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) } @@ -309,7 +310,7 @@ func TestStatefulSetScaleRetry(t *testing.T) { namespace := "default" scaleFunc := ScaleCondition(scaler, nil, namespace, name, count, nil, stsgvr, false) - pass, err := scaleFunc() + pass, err := scaleFunc(context.Background()) if pass != false { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) } @@ -318,7 +319,7 @@ func TestStatefulSetScaleRetry(t *testing.T) { } preconditions := &ScalePrecondition{3, ""} scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil, stsgvr, false) - _, err = scaleFunc() + _, err = scaleFunc(context.Background()) if err == nil { t.Error("Expected error on precondition failure") } @@ -345,7 +346,7 @@ func TestStatefulSetScaleInvalid(t *testing.T) { namespace := "default" scaleFunc := ScaleCondition(scaler, nil, namespace, name, count, nil, stsgvr, false) - pass, err := scaleFunc() + pass, err := scaleFunc(context.Background()) if pass { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) } @@ -418,7 +419,7 @@ func TestReplicaSetScaleRetry(t *testing.T) { namespace := "default" scaleFunc := ScaleCondition(scaler, nil, namespace, name, count, nil, rsgvr, false) - pass, err := scaleFunc() + pass, err := scaleFunc(context.Background()) if pass != false { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) } @@ -427,7 +428,7 @@ func TestReplicaSetScaleRetry(t *testing.T) { } preconditions := &ScalePrecondition{3, ""} scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil, rsgvr, false) - _, err = scaleFunc() + _, err = scaleFunc(context.Background()) if err == nil { t.Error("Expected error on precondition failure") } @@ -454,7 +455,7 @@ func TestReplicaSetScaleInvalid(t *testing.T) { namespace := "default" scaleFunc := ScaleCondition(scaler, nil, namespace, name, count, nil, rsgvr, false) - pass, err := scaleFunc() + pass, err := scaleFunc(context.Background()) if pass { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) } diff --git a/test/utils/update_resources.go b/test/utils/update_resources.go index 96d30800f59..af5a59fc2f8 100644 --- a/test/utils/update_resources.go +++ b/test/utils/update_resources.go @@ -17,6 +17,7 @@ limitations under the License. package utils import ( + "context" "fmt" "time" @@ -35,9 +36,9 @@ const ( waitRetryTimeout = 5 * time.Minute ) -func RetryErrorCondition(condition wait.ConditionFunc) wait.ConditionFunc { - return func() (bool, error) { - done, err := condition() +func RetryErrorCondition(condition wait.ConditionWithContextFunc) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { + done, err := condition(ctx) return done, err } } @@ -50,7 +51,7 @@ func ScaleResourceWithRetries(scalesGetter scaleclient.ScalesGetter, namespace, } waitForReplicas := scale.NewRetryParams(waitRetryInterval, waitRetryTimeout) cond := RetryErrorCondition(scale.ScaleCondition(scaler, preconditions, namespace, name, size, nil, gvr, false)) - err := wait.PollImmediate(updateRetryInterval, updateRetryTimeout, cond) + err := wait.PollUntilContextTimeout(context.Background(), updateRetryInterval, updateRetryTimeout, true, cond) if err == nil { err = scale.WaitForScaleHasDesiredReplicas(scalesGetter, gvr.GroupResource(), name, namespace, size, waitForReplicas) }