diff --git a/test/e2e/autoscaling/horizontal_pod_autoscaling_behavior.go b/test/e2e/autoscaling/horizontal_pod_autoscaling_behavior.go
index 518a26604f0..d457af524c2 100644
--- a/test/e2e/autoscaling/horizontal_pod_autoscaling_behavior.go
+++ b/test/e2e/autoscaling/horizontal_pod_autoscaling_behavior.go
@@ -178,7 +178,8 @@ var _ = SIGDescribe(feature.HPA, framework.WithSerial(), framework.WithSlow(), "
 		gomega.Expect(timeWaited).To(gomega.BeNumerically(">", waitDeadline), "waited %s, wanted to wait more than %s", timeWaited, waitDeadline)
 
 		ginkgo.By("verifying number of replicas")
-		replicas := rc.GetReplicas(ctx)
+		replicas, err := rc.GetReplicas(ctx)
+		framework.ExpectNoError(err)
 		gomega.Expect(replicas).To(gomega.BeNumerically("==", initPods), "had %s replicas, still have %s replicas after time deadline", initPods, replicas)
 	})
 
@@ -214,7 +215,8 @@ var _ = SIGDescribe(feature.HPA, framework.WithSerial(), framework.WithSlow(), "
 		gomega.Expect(timeWaited).To(gomega.BeNumerically(">", waitDeadline), "waited %s, wanted to wait more than %s", timeWaited, waitDeadline)
 
 		ginkgo.By("verifying number of replicas")
-		replicas := rc.GetReplicas(ctx)
+		replicas, err := rc.GetReplicas(ctx)
+		framework.ExpectNoError(err)
 		gomega.Expect(replicas).To(gomega.BeNumerically("==", initPods), "had %s replicas, still have %s replicas after time deadline", initPods, replicas)
 	})
 
diff --git a/test/e2e/framework/autoscaling/autoscaling_utils.go b/test/e2e/framework/autoscaling/autoscaling_utils.go
index 68f884b8272..00a249547c4 100644
--- a/test/e2e/framework/autoscaling/autoscaling_utils.go
+++ b/test/e2e/framework/autoscaling/autoscaling_utils.go
@@ -29,14 +29,12 @@ import (
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	crdclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
 	"k8s.io/apiextensions-apiserver/test/integration/fixtures"
-	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/intstr"
-	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/dynamic"
 	clientset "k8s.io/client-go/kubernetes"
 	scaleclient "k8s.io/client-go/scale"
@@ -66,10 +64,6 @@ const (
 	timeoutRC                    = 120 * time.Second
 	startServiceTimeout          = time.Minute
 	startServiceInterval         = 5 * time.Second
-	rcIsNil                      = "ERROR: replicationController = nil"
-	deploymentIsNil              = "ERROR: deployment = nil"
-	rsIsNil                      = "ERROR: replicaset = nil"
-	crdIsNil                     = "ERROR: CRD = nil"
 	invalidKind                  = "ERROR: invalid workload kind for resource consumer"
 	customMetricName             = "QPS"
 	serviceInitializationTimeout = 2 * time.Minute
@@ -355,9 +349,11 @@ func (rc *ResourceConsumer) makeConsumeCustomMetric(ctx context.Context) {
 }
 
 func (rc *ResourceConsumer) sendConsumeCPURequest(ctx context.Context, millicores int) {
-	err := wait.PollUntilContextTimeout(ctx, serviceInitializationInterval, serviceInitializationTimeout, true, func(ctx context.Context) (bool, error) {
+	err := framework.Gomega().Eventually(ctx, func(ctx context.Context) error {
 		proxyRequest, err := e2eservice.GetServicesProxyRequest(rc.clientSet, rc.clientSet.CoreV1().RESTClient().Post())
-		framework.ExpectNoError(err)
+		if err != nil {
+			return err
+		}
 		req := proxyRequest.Namespace(rc.nsName).
 			Name(rc.controllerName).
 			Suffix("ConsumeCPU").
@@ -368,10 +364,10 @@ func (rc *ResourceConsumer) sendConsumeCPURequest(ctx context.Context, millicore
 		_, err = req.DoRaw(ctx)
 		if err != nil {
 			framework.Logf("ConsumeCPU failure: %v", err)
-			return false, nil
+			return err
 		}
-		return true, nil
-	})
+		return nil
+	}).WithTimeout(serviceInitializationTimeout).WithPolling(serviceInitializationInterval).Should(gomega.Succeed())
 
 	// Test has already finished (ctx got canceled), so don't fail on err from PollUntilContextTimeout
 	// which is a side-effect to context cancelling from the cleanup task.
@@ -384,9 +380,11 @@ func (rc *ResourceConsumer) sendConsumeCPURequest(ctx context.Context, millicore
 
 // sendConsumeMemRequest sends POST request for memory consumption
 func (rc *ResourceConsumer) sendConsumeMemRequest(ctx context.Context, megabytes int) {
-	err := wait.PollUntilContextTimeout(ctx, serviceInitializationInterval, serviceInitializationTimeout, true, func(ctx context.Context) (bool, error) {
+	err := framework.Gomega().Eventually(ctx, func(ctx context.Context) error {
 		proxyRequest, err := e2eservice.GetServicesProxyRequest(rc.clientSet, rc.clientSet.CoreV1().RESTClient().Post())
-		framework.ExpectNoError(err)
+		if err != nil {
+			return err
+		}
 		req := proxyRequest.Namespace(rc.nsName).
 			Name(rc.controllerName).
 			Suffix("ConsumeMem").
@@ -397,10 +395,10 @@ func (rc *ResourceConsumer) sendConsumeMemRequest(ctx context.Context, megabytes
 		_, err = req.DoRaw(ctx)
 		if err != nil {
 			framework.Logf("ConsumeMem failure: %v", err)
-			return false, nil
+			return err
 		}
-		return true, nil
-	})
+		return nil
+	}).WithTimeout(serviceInitializationTimeout).WithPolling(serviceInitializationInterval).Should(gomega.Succeed())
 
 	// Test has already finished (ctx got canceled), so don't fail on err from PollUntilContextTimeout
 	// which is a side-effect to context cancelling from the cleanup task.
@@ -413,9 +411,11 @@ func (rc *ResourceConsumer) sendConsumeMemRequest(ctx context.Context, megabytes
 
 // sendConsumeCustomMetric sends POST request for custom metric consumption
 func (rc *ResourceConsumer) sendConsumeCustomMetric(ctx context.Context, delta int) {
-	err := wait.PollUntilContextTimeout(ctx, serviceInitializationInterval, serviceInitializationTimeout, true, func(ctx context.Context) (bool, error) {
+	err := framework.Gomega().Eventually(ctx, func(ctx context.Context) error {
 		proxyRequest, err := e2eservice.GetServicesProxyRequest(rc.clientSet, rc.clientSet.CoreV1().RESTClient().Post())
-		framework.ExpectNoError(err)
+		if err != nil {
+			return err
+		}
 		req := proxyRequest.Namespace(rc.nsName).
 			Name(rc.controllerName).
 			Suffix("BumpMetric").
@@ -427,10 +427,10 @@ func (rc *ResourceConsumer) sendConsumeCustomMetric(ctx context.Context, delta i
 		_, err = req.DoRaw(ctx)
 		if err != nil {
 			framework.Logf("ConsumeCustomMetric failure: %v", err)
-			return false, nil
+			return err
 		}
-		return true, nil
-	})
+		return nil
+	}).WithTimeout(serviceInitializationTimeout).WithPolling(serviceInitializationInterval).Should(gomega.Succeed())
 
 	// Test has already finished (ctx got canceled), so don't fail on err from PollUntilContextTimeout
 	// which is a side-effect to context cancelling from the cleanup task.
@@ -442,50 +442,54 @@ func (rc *ResourceConsumer) sendConsumeCustomMetric(ctx context.Context, delta i
 }
 
 // GetReplicas get the replicas
-func (rc *ResourceConsumer) GetReplicas(ctx context.Context) int {
+func (rc *ResourceConsumer) GetReplicas(ctx context.Context) (int, error) {
 	switch rc.kind {
 	case KindRC:
 		replicationController, err := rc.clientSet.CoreV1().ReplicationControllers(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{})
-		framework.ExpectNoError(err)
-		if replicationController == nil {
-			framework.Failf(rcIsNil)
+		if err != nil {
+			return 0, err
 		}
-		return int(replicationController.Status.ReadyReplicas)
+		return int(replicationController.Status.ReadyReplicas), nil
 	case KindDeployment:
 		deployment, err := rc.clientSet.AppsV1().Deployments(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{})
-		framework.ExpectNoError(err)
-		if deployment == nil {
-			framework.Failf(deploymentIsNil)
+		if err != nil {
+			return 0, err
 		}
-		return int(deployment.Status.ReadyReplicas)
+		return int(deployment.Status.ReadyReplicas), nil
 	case KindReplicaSet:
 		rs, err := rc.clientSet.AppsV1().ReplicaSets(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{})
-		framework.ExpectNoError(err)
-		if rs == nil {
-			framework.Failf(rsIsNil)
+		if err != nil {
+			return 0, err
 		}
-		return int(rs.Status.ReadyReplicas)
+		return int(rs.Status.ReadyReplicas), nil
 	case KindCRD:
 		deployment, err := rc.clientSet.AppsV1().Deployments(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{})
-		framework.ExpectNoError(err)
-		if deployment == nil {
-			framework.Failf(deploymentIsNil)
+		if err != nil {
+			return 0, err
 		}
 		deploymentReplicas := int64(deployment.Status.ReadyReplicas)
 		scale, err := rc.scaleClient.Scales(rc.nsName).Get(ctx, schema.GroupResource{Group: crdGroup, Resource: crdNamePlural}, rc.name, metav1.GetOptions{})
-		framework.ExpectNoError(err)
+		if err != nil {
+			return 0, err
+		}
 		crdInstance, err := rc.resourceClient.Get(ctx, rc.name, metav1.GetOptions{})
-		framework.ExpectNoError(err)
+		if err != nil {
+			return 0, err
+		}
 		// Update custom resource's status.replicas with child Deployment's current number of ready replicas.
-		framework.ExpectNoError(unstructured.SetNestedField(crdInstance.Object, deploymentReplicas, "status", "replicas"))
+		err = unstructured.SetNestedField(crdInstance.Object, deploymentReplicas, "status", "replicas")
+		if err != nil {
+			return 0, err
+		}
 		_, err = rc.resourceClient.Update(ctx, crdInstance, metav1.UpdateOptions{})
-		framework.ExpectNoError(err)
-		return int(scale.Spec.Replicas)
+		if err != nil {
+			return 0, err
+		}
+		return int(scale.Spec.Replicas), nil
 	default:
-		framework.Failf(invalidKind)
+		return 0, fmt.Errorf(invalidKind)
 	}
-	return 0
 }
 
 // GetHpa get the corresponding horizontalPodAutoscaler object
@@ -496,20 +500,21 @@ func (rc *ResourceConsumer) GetHpa(ctx context.Context, name string) (*autoscali
 
 // WaitForReplicas wait for the desired replicas
 func (rc *ResourceConsumer) WaitForReplicas(ctx context.Context, desiredReplicas int, duration time.Duration) {
 	interval := 20 * time.Second
-	err := wait.PollUntilContextTimeout(ctx, interval, duration, true, func(ctx context.Context) (bool, error) {
-		replicas := rc.GetReplicas(ctx)
-		framework.Logf("waiting for %d replicas (current: %d)", desiredReplicas, replicas)
-		return replicas == desiredReplicas, nil // Expected number of replicas found. Exit.
-	})
+	err := framework.Gomega().Eventually(ctx, framework.HandleRetry(rc.GetReplicas)).
+		WithTimeout(duration).
+		WithPolling(interval).
+		Should(gomega.Equal(desiredReplicas))
+	framework.ExpectNoErrorWithOffset(1, err, "timeout waiting %v for %d replicas", duration, desiredReplicas)
 }
 
 // EnsureDesiredReplicasInRange ensure the replicas is in a desired range
 func (rc *ResourceConsumer) EnsureDesiredReplicasInRange(ctx context.Context, minDesiredReplicas, maxDesiredReplicas int, duration time.Duration, hpaName string) {
 	interval := 10 * time.Second
-	desiredReplicasErr := framework.Gomega().Consistently(ctx, func(ctx context.Context) int {
-		return rc.GetReplicas(ctx)
-	}).WithTimeout(duration).WithPolling(interval).Should(gomega.And(gomega.BeNumerically(">=", minDesiredReplicas), gomega.BeNumerically("<=", maxDesiredReplicas)))
+	desiredReplicasErr := framework.Gomega().Consistently(ctx, framework.HandleRetry(rc.GetReplicas)).
+		WithTimeout(duration).
+		WithPolling(interval).
+		Should(gomega.And(gomega.BeNumerically(">=", minDesiredReplicas), gomega.BeNumerically("<=", maxDesiredReplicas)))
 
 	// dump HPA for debugging
 	as, err := rc.GetHpa(ctx, hpaName)
@@ -593,7 +598,7 @@ func runServiceAndSidecarForResourceConsumer(ctx context.Context, c clientset.In
 	_, err := createService(ctx, c, sidecarName, ns, serviceAnnotations, serviceSelectors, port, sidecarTargetPort)
 	framework.ExpectNoError(err)
 
-	ginkgo.By(fmt.Sprintf("Running controller for sidecar"))
+	ginkgo.By("Running controller for sidecar")
 	controllerName := sidecarName + "-ctrl"
 	_, err = createService(ctx, c, controllerName, ns, map[string]string{}, map[string]string{"name": controllerName}, port, targetPort)
 	framework.ExpectNoError(err)
@@ -954,32 +959,24 @@ func CreateCustomResourceDefinition(ctx context.Context, c crdclientset.Interfac
 		crd, err = c.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, crdSchema, metav1.CreateOptions{})
 		framework.ExpectNoError(err)
 		// Wait until just created CRD appears in discovery.
-		err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 30*time.Second, true, func(ctx context.Context) (bool, error) {
-			return ExistsInDiscovery(crd, c, "v1")
-		})
+		err = framework.Gomega().Eventually(ctx, framework.RetryNotFound(framework.HandleRetry(func(ctx context.Context) (*metav1.APIResourceList, error) {
+			return c.Discovery().ServerResourcesForGroupVersion(crd.Spec.Group + "/" + "v1")
+		}))).Should(framework.MakeMatcher(func(actual *metav1.APIResourceList) (func() string, error) {
+			for _, g := range actual.APIResources {
+				if g.Name == crd.Spec.Names.Plural {
+					return nil, nil
+				}
+			}
+			return func() string {
+				return fmt.Sprintf("CRD %s not found in discovery", crd.Spec.Names.Plural)
+			}, nil
+		}))
 		framework.ExpectNoError(err)
 		ginkgo.By(fmt.Sprintf("Successfully created Custom Resource Definition: %v", crd))
 	}
 	return crd
 }
 
-func ExistsInDiscovery(crd *apiextensionsv1.CustomResourceDefinition, apiExtensionsClient crdclientset.Interface, version string) (bool, error) {
-	groupResource, err := apiExtensionsClient.Discovery().ServerResourcesForGroupVersion(crd.Spec.Group + "/" + version)
-	if err != nil {
-		// Ignore 404 errors as it means the resources doesn't exist
-		if apierrors.IsNotFound(err) {
-			return false, nil
-		}
-		return false, err
-	}
-	for _, g := range groupResource.APIResources {
-		if g.Name == crd.Spec.Names.Plural {
-			return true, nil
-		}
-	}
-	return false, nil
-}
-
 func CreateCustomSubresourceInstance(ctx context.Context, namespace, name string, client dynamic.ResourceInterface, definition *apiextensionsv1.CustomResourceDefinition) (*unstructured.Unstructured, error) {
 	instance := &unstructured.Unstructured{
 		Object: map[string]interface{}{
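
Note: the recurring pattern in this patch is an error-returning getter polled by Gomega's Eventually until it matches, instead of wait.PollUntilContextTimeout plus in-loop assertions. The sketch below illustrates that pattern outside the e2e framework; it is not part of the patch. fakeGetReplicas is a hypothetical stand-in for ResourceConsumer.GetReplicas, the 30s/1s timings are arbitrary, and plain gomega.NewGomega is used where the real tests go through framework.Gomega() and framework.HandleRetry.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/onsi/gomega"
)

// fakeGetReplicas is a hypothetical stand-in for ResourceConsumer.GetReplicas:
// it reports the current replica count or an error.
func fakeGetReplicas(ctx context.Context) (int, error) {
	// A real implementation would query the API server here.
	return 3, nil
}

func main() {
	// Gomega outside Ginkgo needs an explicit fail handler.
	g := gomega.NewGomega(func(message string, _ ...int) {
		fmt.Println("assertion failed:", message)
	})

	ctx := context.Background()
	desiredReplicas := 3

	// Poll the getter until it returns the desired count or the timeout
	// expires. A non-nil error makes this Eventually retry the attempt,
	// comparable to what wrapping the getter with framework.HandleRetry
	// arranges in the e2e tests.
	g.Eventually(ctx, fakeGetReplicas).
		WithTimeout(30 * time.Second).
		WithPolling(time.Second).
		Should(gomega.Equal(desiredReplicas))
}

The same shape covers WaitForReplicas (Eventually + Equal), EnsureDesiredReplicasInRange (Consistently + BeNumerically bounds), and the CRD discovery wait (Eventually + a custom matcher).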