Move functions from e2e/framework/util.go Part-4

This is the last PR that moves functions out of e2e/framework/util.go.

- WaitForServiceWithSelector: Moved to e2e/cloud/gcp
- WaitForStatefulSetReplicasReady: Moved to e2e/storage
- WaitForRCToStabilize: Moved to e2e/kubectl
- CheckInvariants: Moved to e2e/common
- ContainerInitInvariant: Moved to e2e/common
- DumpEventsInNamespace: Renamed to local function
- WaitForDaemonSets: Moved to e2e/e2e.go
Kenichi Omichi 2019-11-12 18:38:00 +00:00
parent 402e551ca2
commit 74f68dfbce
9 changed files with 242 additions and 232 deletions
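
Every move in this series follows the same mechanical pattern: copy the helper into the package that uses it, unexport it, switch the call sites, and delete the framework copy. A minimal sketch of the resulting shape, assuming a hypothetical waitForFoo helper in a consuming test package (the name and the poll condition are illustrative, not part of this commit):

package kubectl // hypothetical consuming package

import (
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    clientset "k8s.io/client-go/kubernetes"

    "k8s.io/kubernetes/test/e2e/framework"
)

// waitForFoo mirrors the shape shared by the moved helpers: an unexported
// poll loop that retries until a condition holds or the timeout expires.
func waitForFoo(c clientset.Interface, ns string, timeout time.Duration) error {
    return wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
        pods, err := c.CoreV1().Pods(ns).List(metav1.ListOptions{})
        if err != nil {
            framework.Logf("listing pods in %q failed, retrying: %v", ns, err)
            return false, nil // treat every error as retryable in this sketch
        }
        return len(pods.Items) > 0, nil
    })
}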

@@ -57,6 +57,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/plugin/pkg/client/auth:go_default_library",

@@ -31,6 +31,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
testutils "k8s.io/kubernetes/test/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
"github.com/onsi/ginkgo"
@@ -350,7 +351,7 @@ func waitForReplicationControllerInAddonTest(c clientset.Interface, addonNamespa
}
func waitForServicewithSelectorInAddonTest(c clientset.Interface, addonNamespace string, exist bool, selector labels.Selector) {
- framework.ExpectNoError(framework.WaitForServiceWithSelector(c, addonNamespace, selector, exist, addonTestPollInterval, addonTestPollTimeout))
+ framework.ExpectNoError(waitForServiceWithSelector(c, addonNamespace, selector, exist, addonTestPollInterval, addonTestPollTimeout))
}
func waitForReplicationControllerwithSelectorInAddonTest(c clientset.Interface, addonNamespace string, exist bool, selector labels.Selector) {
@@ -376,6 +377,33 @@ func waitForReplicationController(c clientset.Interface, namespace, name string,
return nil
}
// waitForServiceWithSelector waits until any service with given selector appears (exist == true), or disappears (exist == false)
func waitForServiceWithSelector(c clientset.Interface, namespace string, selector labels.Selector, exist bool, interval,
timeout time.Duration) error {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
services, err := c.CoreV1().Services(namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
switch {
// The error must be checked before the len() cases: one of those two
// always matches, which would make the error handling unreachable.
case err != nil && !testutils.IsRetryableAPIError(err):
framework.Logf("Non-retryable failure while listing services.")
return false, err
case err != nil:
framework.Logf("List services with %s in namespace %s failed: %v", selector.String(), namespace, err)
return false, nil
case len(services.Items) != 0:
framework.Logf("Service with %s in namespace %s found.", selector.String(), namespace)
return exist, nil
default:
framework.Logf("Service with %s in namespace %s disappeared.", selector.String(), namespace)
return !exist, nil
}
})
})
if err != nil {
stateMsg := map[bool]string{true: "to appear", false: "to disappear"}
return fmt.Errorf("error waiting for service with %s in namespace %s %s: %v", selector.String(), namespace, stateMsg[exist], err)
}
return nil
}
// waitForReplicationControllerWithSelector waits until any RC with given selector appears (exist == true), or disappears (exist == false)
func waitForReplicationControllerWithSelector(c clientset.Interface, namespace string, selector labels.Selector, exist bool, interval,
timeout time.Duration) error {
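
A subtlety in waitForServiceWithSelector above: when matching services exist the condition returns exist, and when none exist it returns !exist, so one function serves both polarities, finishing as soon as the observed state equals the expected one and polling otherwise. A hedged usage sketch (the namespace and label values are stand-ins, not taken from this commit):

// Wait for any service labeled k8s-app=addon-test to appear.
sel := labels.SelectorFromSet(labels.Set{"k8s-app": "addon-test"})
err := waitForServiceWithSelector(c, "addon-test-ns", sel, true, addonTestPollInterval, addonTestPollTimeout)
framework.ExpectNoError(err)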

@@ -20,11 +20,14 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/watch"
watchtools "k8s.io/client-go/tools/watch"
@@ -37,6 +40,113 @@ import (
"github.com/onsi/gomega"
)
// invariantFunc checks an invariant between an older and a newer object.
type invariantFunc func(older, newer runtime.Object) error
// checkInvariants applies every invariantFunc to each pair of consecutive events.
func checkInvariants(events []watch.Event, fns ...invariantFunc) error {
errs := sets.NewString()
for i := range events {
j := i + 1
if j >= len(events) {
continue
}
for _, fn := range fns {
if err := fn(events[i].Object, events[j].Object); err != nil {
errs.Insert(err.Error())
}
}
}
if errs.Len() > 0 {
return fmt.Errorf("invariants violated:\n* %s", strings.Join(errs.List(), "\n* "))
}
return nil
}
// containerInitInvariant checks that init container state is consistent between the older and newer pod objects.
func containerInitInvariant(older, newer runtime.Object) error {
oldPod := older.(*v1.Pod)
newPod := newer.(*v1.Pod)
if len(oldPod.Spec.InitContainers) == 0 {
return nil
}
if len(oldPod.Spec.InitContainers) != len(newPod.Spec.InitContainers) {
return fmt.Errorf("init container list changed")
}
if oldPod.UID != newPod.UID {
return fmt.Errorf("two different pods exist in the condition: %s vs %s", oldPod.UID, newPod.UID)
}
if err := initContainersInvariants(oldPod); err != nil {
return err
}
if err := initContainersInvariants(newPod); err != nil {
return err
}
oldInit, _, _ := initialized(oldPod)
newInit, _, _ := initialized(newPod)
if oldInit && !newInit {
// TODO: we may in the future enable resetting initialized = false if the kubelet needs to restart it
// from scratch
return fmt.Errorf("pod cannot be initialized and then regress to not being initialized")
}
return nil
}
// initialized checks the state of all init containers in the pod.
func initialized(pod *v1.Pod) (ok bool, failed bool, err error) {
allInit := true
initFailed := false
for _, s := range pod.Status.InitContainerStatuses {
switch {
case initFailed && s.State.Waiting == nil:
return allInit, initFailed, fmt.Errorf("container %s is after a failed container but isn't waiting", s.Name)
case !allInit && s.State.Waiting == nil:
return allInit, initFailed, fmt.Errorf("container %s is after an initializing container but isn't waiting", s.Name)
case s.State.Terminated == nil:
allInit = false
case s.State.Terminated.ExitCode != 0:
allInit = false
initFailed = true
case !s.Ready:
return allInit, initFailed, fmt.Errorf("container %s initialized but isn't marked as ready", s.Name)
}
}
return allInit, initFailed, nil
}
func initContainersInvariants(pod *v1.Pod) error {
allInit, initFailed, err := initialized(pod)
if err != nil {
return err
}
if !allInit || initFailed {
for _, s := range pod.Status.ContainerStatuses {
if s.State.Waiting == nil || s.RestartCount != 0 {
return fmt.Errorf("container %s is not waiting but initialization not complete", s.Name)
}
if s.State.Waiting.Reason != "PodInitializing" {
return fmt.Errorf("container %s should have reason PodInitializing: %s", s.Name, s.State.Waiting.Reason)
}
}
}
_, c := podutil.GetPodCondition(&pod.Status, v1.PodInitialized)
if c == nil {
return fmt.Errorf("pod does not have initialized condition")
}
if c.LastTransitionTime.IsZero() {
return fmt.Errorf("PodInitialized condition should always have a transition time")
}
switch {
case c.Status == v1.ConditionUnknown:
return fmt.Errorf("PodInitialized condition should never be Unknown")
case c.Status == v1.ConditionTrue && (initFailed || !allInit):
return fmt.Errorf("PodInitialized condition was True but all not all containers initialized")
case c.Status == v1.ConditionFalse && (!initFailed && allInit):
return fmt.Errorf("PodInitialized condition was False but all containers initialized")
}
return nil
}
var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
f := framework.NewDefaultFramework("init-container")
var podClient *framework.PodClient
@@ -96,7 +206,7 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
defer cancel()
event, err := watchtools.UntilWithoutRetry(ctx, wr, conditions.PodCompleted)
gomega.Expect(err).To(gomega.BeNil())
- framework.CheckInvariants(wr.Events(), framework.ContainerInitInvariant)
+ checkInvariants(wr.Events(), containerInitInvariant)
endPod := event.Object.(*v1.Pod)
framework.ExpectEqual(endPod.Status.Phase, v1.PodSucceeded)
_, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized)
@@ -167,7 +277,7 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
defer cancel()
event, err := watchtools.UntilWithoutRetry(ctx, wr, conditions.PodRunning)
gomega.Expect(err).To(gomega.BeNil())
- framework.CheckInvariants(wr.Events(), framework.ContainerInitInvariant)
+ checkInvariants(wr.Events(), containerInitInvariant)
endPod := event.Object.(*v1.Pod)
framework.ExpectEqual(endPod.Status.Phase, v1.PodRunning)
_, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized)
@@ -289,7 +399,7 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
},
)
gomega.Expect(err).To(gomega.BeNil())
- framework.CheckInvariants(wr.Events(), framework.ContainerInitInvariant)
+ checkInvariants(wr.Events(), containerInitInvariant)
endPod := event.Object.(*v1.Pod)
framework.ExpectEqual(endPod.Status.Phase, v1.PodPending)
_, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized)
@@ -398,7 +508,7 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
conditions.PodCompleted,
)
gomega.Expect(err).To(gomega.BeNil())
- framework.CheckInvariants(wr.Events(), framework.ContainerInitInvariant)
+ checkInvariants(wr.Events(), containerInitInvariant)
endPod := event.Object.(*v1.Pod)
framework.ExpectEqual(endPod.Status.Phase, v1.PodFailed)
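
checkInvariants applies every invariantFunc to each adjacent pair of watch events, so extra invariants compose with containerInitInvariant. Purely as an illustration (not part of this commit), a hypothetical invariant asserting that a pod never reports Pending after having been Running could be written and combined like this:

// phaseNeverRegresses is a hypothetical invariantFunc: once a pod has been
// observed Running, a later event must not report it as Pending again.
func phaseNeverRegresses(older, newer runtime.Object) error {
    oldPod, newPod := older.(*v1.Pod), newer.(*v1.Pod)
    if oldPod.Status.Phase == v1.PodRunning && newPod.Status.Phase == v1.PodPending {
        return fmt.Errorf("pod %s regressed from Running to Pending", newPod.Name)
    }
    return nil
}

// Combined with the invariant moved in this commit:
//   checkInvariants(wr.Events(), containerInitInvariant, phaseNeverRegresses)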

@@ -33,6 +33,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtimeutils "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/component-base/logs"
"k8s.io/component-base/version"
commontest "k8s.io/kubernetes/test/e2e/common"
@@ -160,6 +161,40 @@ func getDefaultClusterIPFamily(c clientset.Interface) string {
return "ipv4"
}
// waitForDaemonSets waits for all daemonsets in the given namespace to be ready
// (defined as all but 'allowedNotReadyNodes' pods associated with that
// daemonset are ready).
func waitForDaemonSets(c clientset.Interface, ns string, allowedNotReadyNodes int32, timeout time.Duration) error {
start := time.Now()
framework.Logf("Waiting up to %v for all daemonsets in namespace '%s' to start",
timeout, ns)
return wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
dsList, err := c.AppsV1().DaemonSets(ns).List(metav1.ListOptions{})
if err != nil {
framework.Logf("Error getting daemonsets in namespace: '%s': %v", ns, err)
if testutils.IsRetryableAPIError(err) {
return false, nil
}
return false, err
}
var notReadyDaemonSets []string
for _, ds := range dsList.Items {
framework.Logf("%d / %d pods ready in namespace '%s' in daemonset '%s' (%d seconds elapsed)", ds.Status.NumberReady, ds.Status.DesiredNumberScheduled, ns, ds.ObjectMeta.Name, int(time.Since(start).Seconds()))
if ds.Status.DesiredNumberScheduled-ds.Status.NumberReady > allowedNotReadyNodes {
notReadyDaemonSets = append(notReadyDaemonSets, ds.ObjectMeta.Name)
}
}
if len(notReadyDaemonSets) > 0 {
framework.Logf("there are not ready daemonsets: %v", notReadyDaemonSets)
return false, nil
}
return true, nil
})
}
// setupSuite is the boilerplate that can be used to set up ginkgo test suites, on the SynchronizedBeforeSuite step.
// There are certain operations we only want to run once per overall test invocation
// (such as deleting old namespaces, or verifying that all system pods are running).
@@ -229,7 +264,7 @@ func setupSuite() {
framework.Failf("Error waiting for all pods to be running and ready: %v", err)
}
- if err := framework.WaitForDaemonSets(c, metav1.NamespaceSystem, int32(framework.TestContext.AllowedNotReadyNodes), framework.TestContext.SystemDaemonsetStartupTimeout); err != nil {
+ if err := waitForDaemonSets(c, metav1.NamespaceSystem, int32(framework.TestContext.AllowedNotReadyNodes), framework.TestContext.SystemDaemonsetStartupTimeout); err != nil {
framework.Logf("WARNING: Waiting for all daemonsets to be ready failed: %v", err)
}

@@ -57,7 +57,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/yaml:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/discovery:go_default_library",

@@ -57,7 +57,6 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
utilyaml "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
@@ -252,40 +251,6 @@ func NodeOSDistroIs(supportedNodeOsDistros ...string) bool {
return false
}
// WaitForDaemonSets for all daemonsets in the given namespace to be ready
// (defined as all but 'allowedNotReadyNodes' pods associated with that
// daemonset are ready).
func WaitForDaemonSets(c clientset.Interface, ns string, allowedNotReadyNodes int32, timeout time.Duration) error {
start := time.Now()
Logf("Waiting up to %v for all daemonsets in namespace '%s' to start",
timeout, ns)
return wait.PollImmediate(Poll, timeout, func() (bool, error) {
dsList, err := c.AppsV1().DaemonSets(ns).List(metav1.ListOptions{})
if err != nil {
Logf("Error getting daemonsets in namespace: '%s': %v", ns, err)
if testutils.IsRetryableAPIError(err) {
return false, nil
}
return false, err
}
var notReadyDaemonSets []string
for _, ds := range dsList.Items {
Logf("%d / %d pods ready in namespace '%s' in daemonset '%s' (%d seconds elapsed)", ds.Status.NumberReady, ds.Status.DesiredNumberScheduled, ns, ds.ObjectMeta.Name, int(time.Since(start).Seconds()))
if ds.Status.DesiredNumberScheduled-ds.Status.NumberReady > allowedNotReadyNodes {
notReadyDaemonSets = append(notReadyDaemonSets, ds.ObjectMeta.Name)
}
}
if len(notReadyDaemonSets) > 0 {
Logf("there are not ready daemonsets: %v", notReadyDaemonSets)
return false, nil
}
return true, nil
})
}
func kubectlLogPod(c clientset.Interface, pod v1.Pod, containerNameSubstr string, logFunc func(ftm string, args ...interface{})) {
for _, container := range pod.Spec.Containers {
if strings.Contains(container.Name, containerNameSubstr) {
@@ -402,24 +367,6 @@ func WaitForDefaultServiceAccountInNamespace(c clientset.Interface, namespace st
return waitForServiceAccountInNamespace(c, namespace, "default", ServiceAccountProvisionTimeout)
}
// WaitForStatefulSetReplicasReady waits for all replicas of a StatefulSet to become ready or until timeout occurs, whichever comes first.
func WaitForStatefulSetReplicasReady(statefulSetName, ns string, c clientset.Interface, Poll, timeout time.Duration) error {
Logf("Waiting up to %v for StatefulSet %s to have all replicas ready", timeout, statefulSetName)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) {
sts, err := c.AppsV1().StatefulSets(ns).Get(statefulSetName, metav1.GetOptions{})
if err != nil {
Logf("Get StatefulSet %s failed, ignoring for %v: %v", statefulSetName, Poll, err)
continue
}
if sts.Status.ReadyReplicas == *sts.Spec.Replicas {
Logf("All %d replicas of StatefulSet %s are ready. (%v)", sts.Status.ReadyReplicas, statefulSetName, time.Since(start))
return nil
}
Logf("StatefulSet %s found but there are %d ready replicas and %d total replicas.", statefulSetName, sts.Status.ReadyReplicas, *sts.Spec.Replicas)
}
return fmt.Errorf("StatefulSet %s still has unready pods within %v", statefulSetName, timeout)
}
// WaitForPersistentVolumeDeleted waits for a PersistentVolume to get deleted or until timeout occurs, whichever comes first.
func WaitForPersistentVolumeDeleted(c clientset.Interface, pvName string, Poll, timeout time.Duration) error {
Logf("Waiting up to %v for PersistentVolume %s to get deleted", timeout, pvName)
@@ -545,123 +492,6 @@ func CheckTestingNSDeletedExcept(c clientset.Interface, skip string) error {
return fmt.Errorf("Waiting for terminating namespaces to be deleted timed out")
}
// ContainerInitInvariant checks for an init containers are initialized and invariant on both older and newer.
func ContainerInitInvariant(older, newer runtime.Object) error {
oldPod := older.(*v1.Pod)
newPod := newer.(*v1.Pod)
if len(oldPod.Spec.InitContainers) == 0 {
return nil
}
if len(oldPod.Spec.InitContainers) != len(newPod.Spec.InitContainers) {
return fmt.Errorf("init container list changed")
}
if oldPod.UID != newPod.UID {
return fmt.Errorf("two different pods exist in the condition: %s vs %s", oldPod.UID, newPod.UID)
}
if err := initContainersInvariants(oldPod); err != nil {
return err
}
if err := initContainersInvariants(newPod); err != nil {
return err
}
oldInit, _, _ := initialized(oldPod)
newInit, _, _ := initialized(newPod)
if oldInit && !newInit {
// TODO: we may in the future enable resetting initialized = false if the kubelet needs to restart it
// from scratch
return fmt.Errorf("pod cannot be initialized and then regress to not being initialized")
}
return nil
}
func initContainersInvariants(pod *v1.Pod) error {
allInit, initFailed, err := initialized(pod)
if err != nil {
return err
}
if !allInit || initFailed {
for _, s := range pod.Status.ContainerStatuses {
if s.State.Waiting == nil || s.RestartCount != 0 {
return fmt.Errorf("container %s is not waiting but initialization not complete", s.Name)
}
if s.State.Waiting.Reason != "PodInitializing" {
return fmt.Errorf("container %s should have reason PodInitializing: %s", s.Name, s.State.Waiting.Reason)
}
}
}
_, c := podutil.GetPodCondition(&pod.Status, v1.PodInitialized)
if c == nil {
return fmt.Errorf("pod does not have initialized condition")
}
if c.LastTransitionTime.IsZero() {
return fmt.Errorf("PodInitialized condition should always have a transition time")
}
switch {
case c.Status == v1.ConditionUnknown:
return fmt.Errorf("PodInitialized condition should never be Unknown")
case c.Status == v1.ConditionTrue && (initFailed || !allInit):
return fmt.Errorf("PodInitialized condition was True but all not all containers initialized")
case c.Status == v1.ConditionFalse && (!initFailed && allInit):
return fmt.Errorf("PodInitialized condition was False but all containers initialized")
}
return nil
}
// InvariantFunc is a func that checks for invariant.
type InvariantFunc func(older, newer runtime.Object) error
// CheckInvariants checks for invariant of the each events.
func CheckInvariants(events []watch.Event, fns ...InvariantFunc) error {
errs := sets.NewString()
for i := range events {
j := i + 1
if j >= len(events) {
continue
}
for _, fn := range fns {
if err := fn(events[i].Object, events[j].Object); err != nil {
errs.Insert(err.Error())
}
}
}
if errs.Len() > 0 {
return fmt.Errorf("invariants violated:\n* %s", strings.Join(errs.List(), "\n* "))
}
return nil
}
// WaitForRCToStabilize waits till the RC has a matching generation/replica count between spec and status.
func WaitForRCToStabilize(c clientset.Interface, ns, name string, timeout time.Duration) error {
options := metav1.ListOptions{FieldSelector: fields.Set{
"metadata.name": name,
"metadata.namespace": ns,
}.AsSelector().String()}
w, err := c.CoreV1().ReplicationControllers(ns).Watch(options)
if err != nil {
return err
}
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
return false, apierrs.NewNotFound(schema.GroupResource{Resource: "replicationcontrollers"}, "")
}
switch rc := event.Object.(type) {
case *v1.ReplicationController:
if rc.Name == name && rc.Namespace == ns &&
rc.Generation <= rc.Status.ObservedGeneration &&
*(rc.Spec.Replicas) == rc.Status.Replicas {
return true, nil
}
Logf("Waiting for rc %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d",
name, rc.Generation, rc.Status.ObservedGeneration, *(rc.Spec.Replicas), rc.Status.Replicas)
}
return false, nil
})
return err
}
// WaitForService waits until the service appears (exist == true), or disappears (exist == false)
func WaitForService(c clientset.Interface, namespace, name string, exist bool, interval, timeout time.Duration) error {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
@@ -688,33 +518,6 @@ func WaitForService(c clientset.Interface, namespace, name string, exist bool, i
return nil
}
// WaitForServiceWithSelector waits until any service with given selector appears (exist == true), or disappears (exist == false)
func WaitForServiceWithSelector(c clientset.Interface, namespace string, selector labels.Selector, exist bool, interval,
timeout time.Duration) error {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
services, err := c.CoreV1().Services(namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
switch {
case len(services.Items) != 0:
Logf("Service with %s in namespace %s found.", selector.String(), namespace)
return exist, nil
case len(services.Items) == 0:
Logf("Service with %s in namespace %s disappeared.", selector.String(), namespace)
return !exist, nil
case !testutils.IsRetryableAPIError(err):
Logf("Non-retryable failure while listing service.")
return false, err
default:
Logf("List service with %s in namespace %s failed: %v", selector.String(), namespace, err)
return false, nil
}
})
if err != nil {
stateMsg := map[bool]string{true: "to appear", false: "to disappear"}
return fmt.Errorf("error waiting for service with %s in namespace %s %s: %v", selector.String(), namespace, stateMsg[exist], err)
}
return nil
}
// WaitForServiceEndpointsNum waits until the number of endpoints that implement the service reaches expectNum.
func WaitForServiceEndpointsNum(c clientset.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error {
return wait.Poll(interval, timeout, func() (bool, error) {
@@ -1159,8 +962,8 @@ func (f *Framework) MatchContainerOutput(
// EventsLister is a func that lists events.
type EventsLister func(opts metav1.ListOptions, ns string) (*v1.EventList, error)
- // DumpEventsInNamespace dumps events in the given namespace.
- func DumpEventsInNamespace(eventsLister EventsLister, namespace string) {
+ // dumpEventsInNamespace dumps events in the given namespace.
+ func dumpEventsInNamespace(eventsLister EventsLister, namespace string) {
ginkgo.By(fmt.Sprintf("Collecting events from namespace %q.", namespace))
events, err := eventsLister(metav1.ListOptions{}, namespace)
ExpectNoError(err, "failed to list events in namespace %q", namespace)
@@ -1181,7 +984,7 @@ func DumpEventsInNamespace(eventsLister EventsLister, namespace string) {
// DumpAllNamespaceInfo dumps events, pods and nodes information in the given namespace.
func DumpAllNamespaceInfo(c clientset.Interface, namespace string) {
- DumpEventsInNamespace(func(opts metav1.ListOptions, ns string) (*v1.EventList, error) {
+ dumpEventsInNamespace(func(opts metav1.ListOptions, ns string) (*v1.EventList, error) {
return c.CoreV1().Events(ns).List(opts)
}, namespace)
@@ -2223,25 +2026,3 @@ func PrettyPrintJSON(metrics interface{}) string {
}
return string(formatted.Bytes())
}
// initialized checks the state of all init containers in the pod.
func initialized(pod *v1.Pod) (ok bool, failed bool, err error) {
allInit := true
initFailed := false
for _, s := range pod.Status.InitContainerStatuses {
switch {
case initFailed && s.State.Waiting == nil:
return allInit, initFailed, fmt.Errorf("container %s is after a failed container but isn't waiting", s.Name)
case allInit && s.State.Waiting == nil:
return allInit, initFailed, fmt.Errorf("container %s is after an initializing container but isn't waiting", s.Name)
case s.State.Terminated == nil:
allInit = false
case s.State.Terminated.ExitCode != 0:
allInit = false
initFailed = true
case !s.Ready:
return allInit, initFailed, fmt.Errorf("container %s initialized but isn't marked as ready", s.Name)
}
}
return allInit, initFailed, nil
}

@@ -23,16 +23,19 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/watch:go_default_library",
"//staging/src/k8s.io/kubectl/pkg/polymorphichelpers:go_default_library",
"//test/e2e/common:go_default_library",
"//test/e2e/framework:go_default_library",

@@ -50,16 +50,19 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/authentication/serviceaccount"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubectl/pkg/polymorphichelpers"
"k8s.io/kubernetes/pkg/controller"
commonutils "k8s.io/kubernetes/test/e2e/common"
@@ -1690,7 +1693,7 @@ metadata:
if checkContainersImage(containers, httpdImage) {
framework.Failf("Failed creating rc %s for 1 pod with expected image %s", rcName, httpdImage)
}
- framework.WaitForRCToStabilize(c, ns, rcName, framework.PodStartTimeout)
+ waitForRCToStabilize(c, ns, rcName, framework.PodStartTimeout)
ginkgo.By("rolling-update to same image controller")
@@ -2606,3 +2609,35 @@ func createObjValidateOutputAndCleanup(client dynamic.ResourceInterface, obj *un
framework.ExpectNotEqual(fields, []string{"NAME", "AGE"}, fmt.Sprintf("expected non-default fields for resource: %s", resource.Name))
}
}
// waitForRCToStabilize waits till the RC has a matching generation/replica count between spec and status.
func waitForRCToStabilize(c clientset.Interface, ns, name string, timeout time.Duration) error {
options := metav1.ListOptions{FieldSelector: fields.Set{
"metadata.name": name,
"metadata.namespace": ns,
}.AsSelector().String()}
w, err := c.CoreV1().ReplicationControllers(ns).Watch(options)
if err != nil {
return err
}
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
return false, apierrs.NewNotFound(schema.GroupResource{Resource: "replicationcontrollers"}, "")
}
switch rc := event.Object.(type) {
case *v1.ReplicationController:
if rc.Name == name && rc.Namespace == ns &&
rc.Generation <= rc.Status.ObservedGeneration &&
*(rc.Spec.Replicas) == rc.Status.Replicas {
return true, nil
}
framework.Logf("Waiting for rc %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d",
name, rc.Generation, rc.Status.ObservedGeneration, *(rc.Spec.Replicas), rc.Status.Replicas)
}
return false, nil
})
return err
}
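
waitForRCToStabilize considers the RC stable once status.observedGeneration has caught up with metadata.generation (the controller has processed the latest spec) and status.replicas equals spec.replicas (the scale has converged); a Deleted watch event aborts the wait with a NotFound error. A hedged usage sketch with an illustrative controller name:

// Wait for the "httpd" replication controller to report a fully observed
// spec and a matching replica count before driving a rolling update.
framework.ExpectNoError(waitForRCToStabilize(c, ns, "httpd", framework.PodStartTimeout))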

@@ -214,7 +214,7 @@ func testZonalFailover(c clientset.Interface, ns string) {
}
}()
- err = framework.WaitForStatefulSetReplicasReady(statefulSet.Name, ns, c, framework.Poll, statefulSetReadyTimeout)
+ err = waitForStatefulSetReplicasReady(statefulSet.Name, ns, c, framework.Poll, statefulSetReadyTimeout)
if err != nil {
pod := getPod(c, ns, regionalPDLabels)
gomega.Expect(podutil.IsPodReadyConditionTrue(pod.Status)).To(gomega.BeTrue(),
@@ -266,7 +266,7 @@ func testZonalFailover(c clientset.Interface, ns string) {
})
framework.ExpectNoError(err, "Error waiting for pod to be scheduled in a different zone (%q): %v", otherZone, err)
- err = framework.WaitForStatefulSetReplicasReady(statefulSet.Name, ns, c, 3*time.Second, framework.RestartPodReadyAgainTimeout)
+ err = waitForStatefulSetReplicasReady(statefulSet.Name, ns, c, 3*time.Second, framework.RestartPodReadyAgainTimeout)
if err != nil {
pod := getPod(c, ns, regionalPDLabels)
gomega.Expect(podutil.IsPodReadyConditionTrue(pod.Status)).To(gomega.BeTrue(),
@@ -629,3 +629,21 @@ func checkZonesFromLabelAndAffinity(pv *v1.PersistentVolume, zones sets.String,
}
}
}
// waitForStatefulSetReplicasReady waits for all replicas of a StatefulSet to become ready or until timeout occurs, whichever comes first.
func waitForStatefulSetReplicasReady(statefulSetName, ns string, c clientset.Interface, Poll, timeout time.Duration) error {
framework.Logf("Waiting up to %v for StatefulSet %s to have all replicas ready", timeout, statefulSetName)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) {
sts, err := c.AppsV1().StatefulSets(ns).Get(statefulSetName, metav1.GetOptions{})
if err != nil {
framework.Logf("Get StatefulSet %s failed, ignoring for %v: %v", statefulSetName, Poll, err)
continue
}
if sts.Status.ReadyReplicas == *sts.Spec.Replicas {
framework.Logf("All %d replicas of StatefulSet %s are ready. (%v)", sts.Status.ReadyReplicas, statefulSetName, time.Since(start))
return nil
}
framework.Logf("StatefulSet %s found but there are %d ready replicas and %d total replicas.", statefulSetName, sts.Status.ReadyReplicas, *sts.Spec.Replicas)
}
return fmt.Errorf("StatefulSet %s still has unready pods within %v", statefulSetName, timeout)
}
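
Unlike the watch-based RC helper, waitForStatefulSetReplicasReady is a plain get-and-sleep loop: transient Get errors are logged and retried, and success requires status.readyReplicas to equal *spec.replicas. A hedged usage sketch (the statefulset name is illustrative; the failover test above passes the created object's actual name):

// Poll every three seconds, up to the pod-restart timeout, for every
// replica of the statefulset to become ready.
err := waitForStatefulSetReplicasReady("regional-pd-sts", ns, c, 3*time.Second, framework.RestartPodReadyAgainTimeout)
framework.ExpectNoError(err)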