From 74f68dfbcef83754c9a586208cf81e9db78da2e3 Mon Sep 17 00:00:00 2001 From: Kenichi Omichi Date: Tue, 12 Nov 2019 18:38:00 +0000 Subject: [PATCH] Move functions from e2e/framework/util.go Part-4 This is the last PR which moves functions from e2e/framework/util.go - WaitForServiceWithSelector: Moved to e2e/cloud/gcp - WaitForStatefulSetReplicasReady: Moved to e2e/storage - WaitForRCToStabilize: Moved to e2e/kubectl - CheckInvariants: Moved to e2e/common - ContainerInitInvariant: Moved to e2e/common - DumpEventsInNamespace: Renamed to local function - WaitForDaemonSets: Moved to e2e/e2e.go --- test/e2e/BUILD | 1 + test/e2e/cloud/gcp/addon_update.go | 30 +++- test/e2e/common/init_container.go | 118 ++++++++++++++- test/e2e/e2e.go | 37 ++++- test/e2e/framework/BUILD | 1 - test/e2e/framework/util.go | 225 +---------------------------- test/e2e/kubectl/BUILD | 3 + test/e2e/kubectl/kubectl.go | 37 ++++- test/e2e/storage/regional_pd.go | 22 ++- 9 files changed, 242 insertions(+), 232 deletions(-) diff --git a/test/e2e/BUILD b/test/e2e/BUILD index e5ffc1ba5a4..fddc3b89fe8 100644 --- a/test/e2e/BUILD +++ b/test/e2e/BUILD @@ -57,6 +57,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/plugin/pkg/client/auth:go_default_library", diff --git a/test/e2e/cloud/gcp/addon_update.go b/test/e2e/cloud/gcp/addon_update.go index 18e71483edf..7be7e7f9f90 100644 --- a/test/e2e/cloud/gcp/addon_update.go +++ b/test/e2e/cloud/gcp/addon_update.go @@ -31,6 +31,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/test/e2e/framework" e2essh "k8s.io/kubernetes/test/e2e/framework/ssh" + testutils "k8s.io/kubernetes/test/utils" imageutils "k8s.io/kubernetes/test/utils/image" "github.com/onsi/ginkgo" @@ -350,7 +351,7 @@ func waitForReplicationControllerInAddonTest(c clientset.Interface, addonNamespa } func waitForServicewithSelectorInAddonTest(c clientset.Interface, addonNamespace string, exist bool, selector labels.Selector) { - framework.ExpectNoError(framework.WaitForServiceWithSelector(c, addonNamespace, selector, exist, addonTestPollInterval, addonTestPollTimeout)) + framework.ExpectNoError(waitForServiceWithSelector(c, addonNamespace, selector, exist, addonTestPollInterval, addonTestPollTimeout)) } func waitForReplicationControllerwithSelectorInAddonTest(c clientset.Interface, addonNamespace string, exist bool, selector labels.Selector) { @@ -376,6 +377,33 @@ func waitForReplicationController(c clientset.Interface, namespace, name string, return nil } +// waitForServiceWithSelector waits until any service with given selector appears (exist == true), or disappears (exist == false) +func waitForServiceWithSelector(c clientset.Interface, namespace string, selector labels.Selector, exist bool, interval, + timeout time.Duration) error { + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + services, err := c.CoreV1().Services(namespace).List(metav1.ListOptions{LabelSelector: selector.String()}) + switch { + case len(services.Items) != 0: + framework.Logf("Service with %s in namespace %s found.", selector.String(), namespace) + return exist, nil + case len(services.Items) == 0: + framework.Logf("Service with %s in namespace %s disappeared.", selector.String(), namespace) + return !exist, nil + case !testutils.IsRetryableAPIError(err): + framework.Logf("Non-retryable failure while listing service.") + return false, err + default: + framework.Logf("List service with %s in namespace %s failed: %v", selector.String(), namespace, err) + return false, nil + } + }) + if err != nil { + stateMsg := map[bool]string{true: "to appear", false: "to disappear"} + return fmt.Errorf("error waiting for service with %s in namespace %s %s: %v", selector.String(), namespace, stateMsg[exist], err) + } + return nil +} + // waitForReplicationControllerWithSelector waits until any RC with given selector appears (exist == true), or disappears (exist == false) func waitForReplicationControllerWithSelector(c clientset.Interface, namespace string, selector labels.Selector, exist bool, interval, timeout time.Duration) error { diff --git a/test/e2e/common/init_container.go b/test/e2e/common/init_container.go index eff5ba4cb3e..f6f5b26eb71 100644 --- a/test/e2e/common/init_container.go +++ b/test/e2e/common/init_container.go @@ -20,11 +20,14 @@ import ( "context" "fmt" "strconv" + "strings" "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/watch" watchtools "k8s.io/client-go/tools/watch" @@ -37,6 +40,113 @@ import ( "github.com/onsi/gomega" ) +// invariantFunc is a func that checks for invariant. +type invariantFunc func(older, newer runtime.Object) error + +// checkInvariants checks for invariant of the each events. +func checkInvariants(events []watch.Event, fns ...invariantFunc) error { + errs := sets.NewString() + for i := range events { + j := i + 1 + if j >= len(events) { + continue + } + for _, fn := range fns { + if err := fn(events[i].Object, events[j].Object); err != nil { + errs.Insert(err.Error()) + } + } + } + if errs.Len() > 0 { + return fmt.Errorf("invariants violated:\n* %s", strings.Join(errs.List(), "\n* ")) + } + return nil +} + +// containerInitInvariant checks for an init containers are initialized and invariant on both older and newer. +func containerInitInvariant(older, newer runtime.Object) error { + oldPod := older.(*v1.Pod) + newPod := newer.(*v1.Pod) + if len(oldPod.Spec.InitContainers) == 0 { + return nil + } + if len(oldPod.Spec.InitContainers) != len(newPod.Spec.InitContainers) { + return fmt.Errorf("init container list changed") + } + if oldPod.UID != newPod.UID { + return fmt.Errorf("two different pods exist in the condition: %s vs %s", oldPod.UID, newPod.UID) + } + if err := initContainersInvariants(oldPod); err != nil { + return err + } + if err := initContainersInvariants(newPod); err != nil { + return err + } + oldInit, _, _ := initialized(oldPod) + newInit, _, _ := initialized(newPod) + if oldInit && !newInit { + // TODO: we may in the future enable resetting initialized = false if the kubelet needs to restart it + // from scratch + return fmt.Errorf("pod cannot be initialized and then regress to not being initialized") + } + return nil +} + +// initialized checks the state of all init containers in the pod. +func initialized(pod *v1.Pod) (ok bool, failed bool, err error) { + allInit := true + initFailed := false + for _, s := range pod.Status.InitContainerStatuses { + switch { + case initFailed && s.State.Waiting == nil: + return allInit, initFailed, fmt.Errorf("container %s is after a failed container but isn't waiting", s.Name) + case allInit && s.State.Waiting == nil: + return allInit, initFailed, fmt.Errorf("container %s is after an initializing container but isn't waiting", s.Name) + case s.State.Terminated == nil: + allInit = false + case s.State.Terminated.ExitCode != 0: + allInit = false + initFailed = true + case !s.Ready: + return allInit, initFailed, fmt.Errorf("container %s initialized but isn't marked as ready", s.Name) + } + } + return allInit, initFailed, nil +} + +func initContainersInvariants(pod *v1.Pod) error { + allInit, initFailed, err := initialized(pod) + if err != nil { + return err + } + if !allInit || initFailed { + for _, s := range pod.Status.ContainerStatuses { + if s.State.Waiting == nil || s.RestartCount != 0 { + return fmt.Errorf("container %s is not waiting but initialization not complete", s.Name) + } + if s.State.Waiting.Reason != "PodInitializing" { + return fmt.Errorf("container %s should have reason PodInitializing: %s", s.Name, s.State.Waiting.Reason) + } + } + } + _, c := podutil.GetPodCondition(&pod.Status, v1.PodInitialized) + if c == nil { + return fmt.Errorf("pod does not have initialized condition") + } + if c.LastTransitionTime.IsZero() { + return fmt.Errorf("PodInitialized condition should always have a transition time") + } + switch { + case c.Status == v1.ConditionUnknown: + return fmt.Errorf("PodInitialized condition should never be Unknown") + case c.Status == v1.ConditionTrue && (initFailed || !allInit): + return fmt.Errorf("PodInitialized condition was True but all not all containers initialized") + case c.Status == v1.ConditionFalse && (!initFailed && allInit): + return fmt.Errorf("PodInitialized condition was False but all containers initialized") + } + return nil +} + var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() { f := framework.NewDefaultFramework("init-container") var podClient *framework.PodClient @@ -96,7 +206,7 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() { defer cancel() event, err := watchtools.UntilWithoutRetry(ctx, wr, conditions.PodCompleted) gomega.Expect(err).To(gomega.BeNil()) - framework.CheckInvariants(wr.Events(), framework.ContainerInitInvariant) + checkInvariants(wr.Events(), containerInitInvariant) endPod := event.Object.(*v1.Pod) framework.ExpectEqual(endPod.Status.Phase, v1.PodSucceeded) _, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized) @@ -167,7 +277,7 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() { defer cancel() event, err := watchtools.UntilWithoutRetry(ctx, wr, conditions.PodRunning) gomega.Expect(err).To(gomega.BeNil()) - framework.CheckInvariants(wr.Events(), framework.ContainerInitInvariant) + checkInvariants(wr.Events(), containerInitInvariant) endPod := event.Object.(*v1.Pod) framework.ExpectEqual(endPod.Status.Phase, v1.PodRunning) _, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized) @@ -289,7 +399,7 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() { }, ) gomega.Expect(err).To(gomega.BeNil()) - framework.CheckInvariants(wr.Events(), framework.ContainerInitInvariant) + checkInvariants(wr.Events(), containerInitInvariant) endPod := event.Object.(*v1.Pod) framework.ExpectEqual(endPod.Status.Phase, v1.PodPending) _, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized) @@ -398,7 +508,7 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() { conditions.PodCompleted, ) gomega.Expect(err).To(gomega.BeNil()) - framework.CheckInvariants(wr.Events(), framework.ContainerInitInvariant) + checkInvariants(wr.Events(), containerInitInvariant) endPod := event.Object.(*v1.Pod) framework.ExpectEqual(endPod.Status.Phase, v1.PodFailed) diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index bbecc6ce55e..6046ce4573e 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -33,6 +33,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtimeutils "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/component-base/logs" "k8s.io/component-base/version" commontest "k8s.io/kubernetes/test/e2e/common" @@ -160,6 +161,40 @@ func getDefaultClusterIPFamily(c clientset.Interface) string { return "ipv4" } +// waitForDaemonSets for all daemonsets in the given namespace to be ready +// (defined as all but 'allowedNotReadyNodes' pods associated with that +// daemonset are ready). +func waitForDaemonSets(c clientset.Interface, ns string, allowedNotReadyNodes int32, timeout time.Duration) error { + start := time.Now() + framework.Logf("Waiting up to %v for all daemonsets in namespace '%s' to start", + timeout, ns) + + return wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { + dsList, err := c.AppsV1().DaemonSets(ns).List(metav1.ListOptions{}) + if err != nil { + framework.Logf("Error getting daemonsets in namespace: '%s': %v", ns, err) + if testutils.IsRetryableAPIError(err) { + return false, nil + } + return false, err + } + var notReadyDaemonSets []string + for _, ds := range dsList.Items { + framework.Logf("%d / %d pods ready in namespace '%s' in daemonset '%s' (%d seconds elapsed)", ds.Status.NumberReady, ds.Status.DesiredNumberScheduled, ns, ds.ObjectMeta.Name, int(time.Since(start).Seconds())) + if ds.Status.DesiredNumberScheduled-ds.Status.NumberReady > allowedNotReadyNodes { + notReadyDaemonSets = append(notReadyDaemonSets, ds.ObjectMeta.Name) + } + } + + if len(notReadyDaemonSets) > 0 { + framework.Logf("there are not ready daemonsets: %v", notReadyDaemonSets) + return false, nil + } + + return true, nil + }) +} + // setupSuite is the boilerplate that can be used to setup ginkgo test suites, on the SynchronizedBeforeSuite step. // There are certain operations we only want to run once per overall test invocation // (such as deleting old namespaces, or verifying that all system pods are running. @@ -229,7 +264,7 @@ func setupSuite() { framework.Failf("Error waiting for all pods to be running and ready: %v", err) } - if err := framework.WaitForDaemonSets(c, metav1.NamespaceSystem, int32(framework.TestContext.AllowedNotReadyNodes), framework.TestContext.SystemDaemonsetStartupTimeout); err != nil { + if err := waitForDaemonSets(c, metav1.NamespaceSystem, int32(framework.TestContext.AllowedNotReadyNodes), framework.TestContext.SystemDaemonsetStartupTimeout); err != nil { framework.Logf("WARNING: Waiting for all daemonsets to be ready failed: %v", err) } diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD index 767dfabd131..8250bebc630 100644 --- a/test/e2e/framework/BUILD +++ b/test/e2e/framework/BUILD @@ -57,7 +57,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/yaml:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/discovery:go_default_library", diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 75ec0681007..ea07a9f0d19 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -57,7 +57,6 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" utilyaml "k8s.io/apimachinery/pkg/util/yaml" - "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -252,40 +251,6 @@ func NodeOSDistroIs(supportedNodeOsDistros ...string) bool { return false } -// WaitForDaemonSets for all daemonsets in the given namespace to be ready -// (defined as all but 'allowedNotReadyNodes' pods associated with that -// daemonset are ready). -func WaitForDaemonSets(c clientset.Interface, ns string, allowedNotReadyNodes int32, timeout time.Duration) error { - start := time.Now() - Logf("Waiting up to %v for all daemonsets in namespace '%s' to start", - timeout, ns) - - return wait.PollImmediate(Poll, timeout, func() (bool, error) { - dsList, err := c.AppsV1().DaemonSets(ns).List(metav1.ListOptions{}) - if err != nil { - Logf("Error getting daemonsets in namespace: '%s': %v", ns, err) - if testutils.IsRetryableAPIError(err) { - return false, nil - } - return false, err - } - var notReadyDaemonSets []string - for _, ds := range dsList.Items { - Logf("%d / %d pods ready in namespace '%s' in daemonset '%s' (%d seconds elapsed)", ds.Status.NumberReady, ds.Status.DesiredNumberScheduled, ns, ds.ObjectMeta.Name, int(time.Since(start).Seconds())) - if ds.Status.DesiredNumberScheduled-ds.Status.NumberReady > allowedNotReadyNodes { - notReadyDaemonSets = append(notReadyDaemonSets, ds.ObjectMeta.Name) - } - } - - if len(notReadyDaemonSets) > 0 { - Logf("there are not ready daemonsets: %v", notReadyDaemonSets) - return false, nil - } - - return true, nil - }) -} - func kubectlLogPod(c clientset.Interface, pod v1.Pod, containerNameSubstr string, logFunc func(ftm string, args ...interface{})) { for _, container := range pod.Spec.Containers { if strings.Contains(container.Name, containerNameSubstr) { @@ -402,24 +367,6 @@ func WaitForDefaultServiceAccountInNamespace(c clientset.Interface, namespace st return waitForServiceAccountInNamespace(c, namespace, "default", ServiceAccountProvisionTimeout) } -// WaitForStatefulSetReplicasReady waits for all replicas of a StatefulSet to become ready or until timeout occurs, whichever comes first. -func WaitForStatefulSetReplicasReady(statefulSetName, ns string, c clientset.Interface, Poll, timeout time.Duration) error { - Logf("Waiting up to %v for StatefulSet %s to have all replicas ready", timeout, statefulSetName) - for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) { - sts, err := c.AppsV1().StatefulSets(ns).Get(statefulSetName, metav1.GetOptions{}) - if err != nil { - Logf("Get StatefulSet %s failed, ignoring for %v: %v", statefulSetName, Poll, err) - continue - } - if sts.Status.ReadyReplicas == *sts.Spec.Replicas { - Logf("All %d replicas of StatefulSet %s are ready. (%v)", sts.Status.ReadyReplicas, statefulSetName, time.Since(start)) - return nil - } - Logf("StatefulSet %s found but there are %d ready replicas and %d total replicas.", statefulSetName, sts.Status.ReadyReplicas, *sts.Spec.Replicas) - } - return fmt.Errorf("StatefulSet %s still has unready pods within %v", statefulSetName, timeout) -} - // WaitForPersistentVolumeDeleted waits for a PersistentVolume to get deleted or until timeout occurs, whichever comes first. func WaitForPersistentVolumeDeleted(c clientset.Interface, pvName string, Poll, timeout time.Duration) error { Logf("Waiting up to %v for PersistentVolume %s to get deleted", timeout, pvName) @@ -545,123 +492,6 @@ func CheckTestingNSDeletedExcept(c clientset.Interface, skip string) error { return fmt.Errorf("Waiting for terminating namespaces to be deleted timed out") } -// ContainerInitInvariant checks for an init containers are initialized and invariant on both older and newer. -func ContainerInitInvariant(older, newer runtime.Object) error { - oldPod := older.(*v1.Pod) - newPod := newer.(*v1.Pod) - if len(oldPod.Spec.InitContainers) == 0 { - return nil - } - if len(oldPod.Spec.InitContainers) != len(newPod.Spec.InitContainers) { - return fmt.Errorf("init container list changed") - } - if oldPod.UID != newPod.UID { - return fmt.Errorf("two different pods exist in the condition: %s vs %s", oldPod.UID, newPod.UID) - } - if err := initContainersInvariants(oldPod); err != nil { - return err - } - if err := initContainersInvariants(newPod); err != nil { - return err - } - oldInit, _, _ := initialized(oldPod) - newInit, _, _ := initialized(newPod) - if oldInit && !newInit { - // TODO: we may in the future enable resetting initialized = false if the kubelet needs to restart it - // from scratch - return fmt.Errorf("pod cannot be initialized and then regress to not being initialized") - } - return nil -} - -func initContainersInvariants(pod *v1.Pod) error { - allInit, initFailed, err := initialized(pod) - if err != nil { - return err - } - if !allInit || initFailed { - for _, s := range pod.Status.ContainerStatuses { - if s.State.Waiting == nil || s.RestartCount != 0 { - return fmt.Errorf("container %s is not waiting but initialization not complete", s.Name) - } - if s.State.Waiting.Reason != "PodInitializing" { - return fmt.Errorf("container %s should have reason PodInitializing: %s", s.Name, s.State.Waiting.Reason) - } - } - } - _, c := podutil.GetPodCondition(&pod.Status, v1.PodInitialized) - if c == nil { - return fmt.Errorf("pod does not have initialized condition") - } - if c.LastTransitionTime.IsZero() { - return fmt.Errorf("PodInitialized condition should always have a transition time") - } - switch { - case c.Status == v1.ConditionUnknown: - return fmt.Errorf("PodInitialized condition should never be Unknown") - case c.Status == v1.ConditionTrue && (initFailed || !allInit): - return fmt.Errorf("PodInitialized condition was True but all not all containers initialized") - case c.Status == v1.ConditionFalse && (!initFailed && allInit): - return fmt.Errorf("PodInitialized condition was False but all containers initialized") - } - return nil -} - -// InvariantFunc is a func that checks for invariant. -type InvariantFunc func(older, newer runtime.Object) error - -// CheckInvariants checks for invariant of the each events. -func CheckInvariants(events []watch.Event, fns ...InvariantFunc) error { - errs := sets.NewString() - for i := range events { - j := i + 1 - if j >= len(events) { - continue - } - for _, fn := range fns { - if err := fn(events[i].Object, events[j].Object); err != nil { - errs.Insert(err.Error()) - } - } - } - if errs.Len() > 0 { - return fmt.Errorf("invariants violated:\n* %s", strings.Join(errs.List(), "\n* ")) - } - return nil -} - -// WaitForRCToStabilize waits till the RC has a matching generation/replica count between spec and status. -func WaitForRCToStabilize(c clientset.Interface, ns, name string, timeout time.Duration) error { - options := metav1.ListOptions{FieldSelector: fields.Set{ - "metadata.name": name, - "metadata.namespace": ns, - }.AsSelector().String()} - w, err := c.CoreV1().ReplicationControllers(ns).Watch(options) - if err != nil { - return err - } - ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) - defer cancel() - _, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) { - switch event.Type { - case watch.Deleted: - return false, apierrs.NewNotFound(schema.GroupResource{Resource: "replicationcontrollers"}, "") - } - switch rc := event.Object.(type) { - case *v1.ReplicationController: - if rc.Name == name && rc.Namespace == ns && - rc.Generation <= rc.Status.ObservedGeneration && - *(rc.Spec.Replicas) == rc.Status.Replicas { - return true, nil - } - Logf("Waiting for rc %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d", - name, rc.Generation, rc.Status.ObservedGeneration, *(rc.Spec.Replicas), rc.Status.Replicas) - } - return false, nil - }) - return err -} - // WaitForService waits until the service appears (exist == true), or disappears (exist == false) func WaitForService(c clientset.Interface, namespace, name string, exist bool, interval, timeout time.Duration) error { err := wait.PollImmediate(interval, timeout, func() (bool, error) { @@ -688,33 +518,6 @@ func WaitForService(c clientset.Interface, namespace, name string, exist bool, i return nil } -// WaitForServiceWithSelector waits until any service with given selector appears (exist == true), or disappears (exist == false) -func WaitForServiceWithSelector(c clientset.Interface, namespace string, selector labels.Selector, exist bool, interval, - timeout time.Duration) error { - err := wait.PollImmediate(interval, timeout, func() (bool, error) { - services, err := c.CoreV1().Services(namespace).List(metav1.ListOptions{LabelSelector: selector.String()}) - switch { - case len(services.Items) != 0: - Logf("Service with %s in namespace %s found.", selector.String(), namespace) - return exist, nil - case len(services.Items) == 0: - Logf("Service with %s in namespace %s disappeared.", selector.String(), namespace) - return !exist, nil - case !testutils.IsRetryableAPIError(err): - Logf("Non-retryable failure while listing service.") - return false, err - default: - Logf("List service with %s in namespace %s failed: %v", selector.String(), namespace, err) - return false, nil - } - }) - if err != nil { - stateMsg := map[bool]string{true: "to appear", false: "to disappear"} - return fmt.Errorf("error waiting for service with %s in namespace %s %s: %v", selector.String(), namespace, stateMsg[exist], err) - } - return nil -} - //WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum. func WaitForServiceEndpointsNum(c clientset.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error { return wait.Poll(interval, timeout, func() (bool, error) { @@ -1159,8 +962,8 @@ func (f *Framework) MatchContainerOutput( // EventsLister is a func that lists events. type EventsLister func(opts metav1.ListOptions, ns string) (*v1.EventList, error) -// DumpEventsInNamespace dumps events in the given namespace. -func DumpEventsInNamespace(eventsLister EventsLister, namespace string) { +// dumpEventsInNamespace dumps events in the given namespace. +func dumpEventsInNamespace(eventsLister EventsLister, namespace string) { ginkgo.By(fmt.Sprintf("Collecting events from namespace %q.", namespace)) events, err := eventsLister(metav1.ListOptions{}, namespace) ExpectNoError(err, "failed to list events in namespace %q", namespace) @@ -1181,7 +984,7 @@ func DumpEventsInNamespace(eventsLister EventsLister, namespace string) { // DumpAllNamespaceInfo dumps events, pods and nodes information in the given namespace. func DumpAllNamespaceInfo(c clientset.Interface, namespace string) { - DumpEventsInNamespace(func(opts metav1.ListOptions, ns string) (*v1.EventList, error) { + dumpEventsInNamespace(func(opts metav1.ListOptions, ns string) (*v1.EventList, error) { return c.CoreV1().Events(ns).List(opts) }, namespace) @@ -2223,25 +2026,3 @@ func PrettyPrintJSON(metrics interface{}) string { } return string(formatted.Bytes()) } - -// initialized checks the state of all init containers in the pod. -func initialized(pod *v1.Pod) (ok bool, failed bool, err error) { - allInit := true - initFailed := false - for _, s := range pod.Status.InitContainerStatuses { - switch { - case initFailed && s.State.Waiting == nil: - return allInit, initFailed, fmt.Errorf("container %s is after a failed container but isn't waiting", s.Name) - case allInit && s.State.Waiting == nil: - return allInit, initFailed, fmt.Errorf("container %s is after an initializing container but isn't waiting", s.Name) - case s.State.Terminated == nil: - allInit = false - case s.State.Terminated.ExitCode != 0: - allInit = false - initFailed = true - case !s.Ready: - return allInit, initFailed, fmt.Errorf("container %s initialized but isn't marked as ready", s.Name) - } - } - return allInit, initFailed, nil -} diff --git a/test/e2e/kubectl/BUILD b/test/e2e/kubectl/BUILD index f9c20465f7c..32717ed6a20 100644 --- a/test/e2e/kubectl/BUILD +++ b/test/e2e/kubectl/BUILD @@ -23,16 +23,19 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/tools/watch:go_default_library", "//staging/src/k8s.io/kubectl/pkg/polymorphichelpers:go_default_library", "//test/e2e/common:go_default_library", "//test/e2e/framework:go_default_library", diff --git a/test/e2e/kubectl/kubectl.go b/test/e2e/kubectl/kubectl.go index 2e89d326af1..3963f85ce86 100644 --- a/test/e2e/kubectl/kubectl.go +++ b/test/e2e/kubectl/kubectl.go @@ -50,16 +50,19 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/authentication/serviceaccount" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/client-go/dynamic" clientset "k8s.io/client-go/kubernetes" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubectl/pkg/polymorphichelpers" "k8s.io/kubernetes/pkg/controller" commonutils "k8s.io/kubernetes/test/e2e/common" @@ -1690,7 +1693,7 @@ metadata: if checkContainersImage(containers, httpdImage) { framework.Failf("Failed creating rc %s for 1 pod with expected image %s", rcName, httpdImage) } - framework.WaitForRCToStabilize(c, ns, rcName, framework.PodStartTimeout) + waitForRCToStabilize(c, ns, rcName, framework.PodStartTimeout) ginkgo.By("rolling-update to same image controller") @@ -2606,3 +2609,35 @@ func createObjValidateOutputAndCleanup(client dynamic.ResourceInterface, obj *un framework.ExpectNotEqual(fields, []string{"NAME", "AGE"}, fmt.Sprintf("expected non-default fields for resource: %s", resource.Name)) } } + +// waitForRCToStabilize waits till the RC has a matching generation/replica count between spec and status. +func waitForRCToStabilize(c clientset.Interface, ns, name string, timeout time.Duration) error { + options := metav1.ListOptions{FieldSelector: fields.Set{ + "metadata.name": name, + "metadata.namespace": ns, + }.AsSelector().String()} + w, err := c.CoreV1().ReplicationControllers(ns).Watch(options) + if err != nil { + return err + } + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) + defer cancel() + _, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + return false, apierrs.NewNotFound(schema.GroupResource{Resource: "replicationcontrollers"}, "") + } + switch rc := event.Object.(type) { + case *v1.ReplicationController: + if rc.Name == name && rc.Namespace == ns && + rc.Generation <= rc.Status.ObservedGeneration && + *(rc.Spec.Replicas) == rc.Status.Replicas { + return true, nil + } + framework.Logf("Waiting for rc %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d", + name, rc.Generation, rc.Status.ObservedGeneration, *(rc.Spec.Replicas), rc.Status.Replicas) + } + return false, nil + }) + return err +} diff --git a/test/e2e/storage/regional_pd.go b/test/e2e/storage/regional_pd.go index cce22b459b5..1d49e81d202 100644 --- a/test/e2e/storage/regional_pd.go +++ b/test/e2e/storage/regional_pd.go @@ -214,7 +214,7 @@ func testZonalFailover(c clientset.Interface, ns string) { } }() - err = framework.WaitForStatefulSetReplicasReady(statefulSet.Name, ns, c, framework.Poll, statefulSetReadyTimeout) + err = waitForStatefulSetReplicasReady(statefulSet.Name, ns, c, framework.Poll, statefulSetReadyTimeout) if err != nil { pod := getPod(c, ns, regionalPDLabels) gomega.Expect(podutil.IsPodReadyConditionTrue(pod.Status)).To(gomega.BeTrue(), @@ -266,7 +266,7 @@ func testZonalFailover(c clientset.Interface, ns string) { }) framework.ExpectNoError(err, "Error waiting for pod to be scheduled in a different zone (%q): %v", otherZone, err) - err = framework.WaitForStatefulSetReplicasReady(statefulSet.Name, ns, c, 3*time.Second, framework.RestartPodReadyAgainTimeout) + err = waitForStatefulSetReplicasReady(statefulSet.Name, ns, c, 3*time.Second, framework.RestartPodReadyAgainTimeout) if err != nil { pod := getPod(c, ns, regionalPDLabels) gomega.Expect(podutil.IsPodReadyConditionTrue(pod.Status)).To(gomega.BeTrue(), @@ -629,3 +629,21 @@ func checkZonesFromLabelAndAffinity(pv *v1.PersistentVolume, zones sets.String, } } } + +// waitForStatefulSetReplicasReady waits for all replicas of a StatefulSet to become ready or until timeout occurs, whichever comes first. +func waitForStatefulSetReplicasReady(statefulSetName, ns string, c clientset.Interface, Poll, timeout time.Duration) error { + framework.Logf("Waiting up to %v for StatefulSet %s to have all replicas ready", timeout, statefulSetName) + for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) { + sts, err := c.AppsV1().StatefulSets(ns).Get(statefulSetName, metav1.GetOptions{}) + if err != nil { + framework.Logf("Get StatefulSet %s failed, ignoring for %v: %v", statefulSetName, Poll, err) + continue + } + if sts.Status.ReadyReplicas == *sts.Spec.Replicas { + framework.Logf("All %d replicas of StatefulSet %s are ready. (%v)", sts.Status.ReadyReplicas, statefulSetName, time.Since(start)) + return nil + } + framework.Logf("StatefulSet %s found but there are %d ready replicas and %d total replicas.", statefulSetName, sts.Status.ReadyReplicas, *sts.Spec.Replicas) + } + return fmt.Errorf("StatefulSet %s still has unready pods within %v", statefulSetName, timeout) +}