diff --git a/test/e2e/common/init_container.go b/test/e2e/common/init_container.go index 0680c412123..95302a4c696 100644 --- a/test/e2e/common/init_container.go +++ b/test/e2e/common/init_container.go @@ -23,23 +23,32 @@ import ( "strings" "time" + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" watchtools "k8s.io/client-go/tools/watch" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/client/conditions" "k8s.io/kubernetes/test/e2e/framework" imageutils "k8s.io/kubernetes/test/utils/image" - - "github.com/onsi/ginkgo" - "github.com/onsi/gomega" ) +func recordEvents(events []watch.Event, f func(watch.Event) (bool, error)) func(watch.Event) (bool, error) { + return func(e watch.Event) (bool, error) { + events = append(events, e) + return f(e) + } +} + // invariantFunc is a func that checks for invariant. type invariantFunc func(older, newer runtime.Object) error @@ -199,14 +208,23 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() { } framework.Logf("PodSpec: initContainers in spec.initContainers") startedPod := podClient.Create(pod) - w, err := podClient.Watch(context.TODO(), metav1.SingleObject(startedPod.ObjectMeta)) - framework.ExpectNoError(err, "error watching a pod") - wr := watch.NewRecorder(w) + + fieldSelector := fields.OneTermEqualSelector("metadata.name", startedPod.Name).String() + w := &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) { + options.FieldSelector = fieldSelector + return podClient.Watch(context.TODO(), options) + }, + } + var events []watch.Event ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout) defer cancel() - event, err := watchtools.UntilWithoutRetry(ctx, wr, conditions.PodCompleted) + event, err := watchtools.Until(ctx, startedPod.ResourceVersion, w, + recordEvents(events, conditions.PodCompleted), + ) gomega.Expect(err).To(gomega.BeNil()) - checkInvariants(wr.Events(), containerInitInvariant) + + checkInvariants(events, containerInitInvariant) endPod := event.Object.(*v1.Pod) framework.ExpectEqual(endPod.Status.Phase, v1.PodSucceeded) _, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized) @@ -269,14 +287,21 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() { } framework.Logf("PodSpec: initContainers in spec.initContainers") startedPod := podClient.Create(pod) - w, err := podClient.Watch(context.TODO(), metav1.SingleObject(startedPod.ObjectMeta)) - framework.ExpectNoError(err, "error watching a pod") - wr := watch.NewRecorder(w) + + fieldSelector := fields.OneTermEqualSelector("metadata.name", startedPod.Name).String() + w := &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) { + options.FieldSelector = fieldSelector + return podClient.Watch(context.TODO(), options) + }, + } + var events []watch.Event ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout) defer cancel() - event, err := watchtools.UntilWithoutRetry(ctx, wr, conditions.PodRunning) + event, err := watchtools.Until(ctx, startedPod.ResourceVersion, w, recordEvents(events, conditions.PodRunning)) gomega.Expect(err).To(gomega.BeNil()) - checkInvariants(wr.Events(), containerInitInvariant) + + checkInvariants(events, containerInitInvariant) endPod := event.Object.(*v1.Pod) framework.ExpectEqual(endPod.Status.Phase, v1.PodRunning) _, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized) @@ -340,14 +365,22 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() { } framework.Logf("PodSpec: initContainers in spec.initContainers") startedPod := podClient.Create(pod) - w, err := podClient.Watch(context.TODO(), metav1.SingleObject(startedPod.ObjectMeta)) - framework.ExpectNoError(err, "error watching a pod") - wr := watch.NewRecorder(w) + fieldSelector := fields.OneTermEqualSelector("metadata.name", startedPod.Name).String() + w := &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) { + options.FieldSelector = fieldSelector + return podClient.Watch(context.TODO(), options) + }, + } + + var events []watch.Event ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout) defer cancel() - event, err := watchtools.UntilWithoutRetry( - ctx, wr, + event, err := watchtools.Until( + ctx, + startedPod.ResourceVersion, + w, // check for the first container to fail at least once func(evt watch.Event) (bool, error) { switch t := evt.Object.(type) { @@ -397,7 +430,8 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() { }, ) gomega.Expect(err).To(gomega.BeNil()) - checkInvariants(wr.Events(), containerInitInvariant) + + checkInvariants(events, containerInitInvariant) endPod := event.Object.(*v1.Pod) framework.ExpectEqual(endPod.Status.Phase, v1.PodPending) _, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized) @@ -457,55 +491,62 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() { framework.Logf("PodSpec: initContainers in spec.initContainers") startedPod := podClient.Create(pod) - w, err := podClient.Watch(context.TODO(), metav1.SingleObject(startedPod.ObjectMeta)) - framework.ExpectNoError(err, "error watching a pod") + fieldSelector := fields.OneTermEqualSelector("metadata.name", startedPod.Name).String() + w := &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) { + options.FieldSelector = fieldSelector + return podClient.Watch(context.TODO(), options) + }, + } - wr := watch.NewRecorder(w) + var events []watch.Event ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout) defer cancel() - event, err := watchtools.UntilWithoutRetry( - ctx, wr, - // check for the second container to fail at least once - func(evt watch.Event) (bool, error) { - switch t := evt.Object.(type) { - case *v1.Pod: - for _, status := range t.Status.ContainerStatuses { - if status.State.Waiting == nil { - return false, fmt.Errorf("container %q should not be out of waiting: %#v", status.Name, status) + event, err := watchtools.Until( + ctx, startedPod.ResourceVersion, w, + recordEvents(events, + // check for the second container to fail at least once + func(evt watch.Event) (bool, error) { + switch t := evt.Object.(type) { + case *v1.Pod: + for _, status := range t.Status.ContainerStatuses { + if status.State.Waiting == nil { + return false, fmt.Errorf("container %q should not be out of waiting: %#v", status.Name, status) + } + if status.State.Waiting.Reason != "PodInitializing" { + return false, fmt.Errorf("container %q should have reason PodInitializing: %#v", status.Name, status) + } } - if status.State.Waiting.Reason != "PodInitializing" { - return false, fmt.Errorf("container %q should have reason PodInitializing: %#v", status.Name, status) + if len(t.Status.InitContainerStatuses) != 2 { + return false, nil } - } - if len(t.Status.InitContainerStatuses) != 2 { - return false, nil - } - status := t.Status.InitContainerStatuses[0] - if status.State.Terminated == nil { - if status.State.Waiting != nil && status.State.Waiting.Reason != "PodInitializing" { - return false, fmt.Errorf("second init container should have reason PodInitializing: %#v", status) + status := t.Status.InitContainerStatuses[0] + if status.State.Terminated == nil { + if status.State.Waiting != nil && status.State.Waiting.Reason != "PodInitializing" { + return false, fmt.Errorf("second init container should have reason PodInitializing: %#v", status) + } + return false, nil } - return false, nil + if status.State.Terminated != nil && status.State.Terminated.ExitCode != 0 { + return false, fmt.Errorf("first init container should have exitCode != 0: %#v", status) + } + status = t.Status.InitContainerStatuses[1] + if status.State.Terminated == nil { + return false, nil + } + if status.State.Terminated.ExitCode == 0 { + return false, fmt.Errorf("second init container should have failed: %#v", status) + } + return true, nil + default: + return false, fmt.Errorf("unexpected object: %#v", t) } - if status.State.Terminated != nil && status.State.Terminated.ExitCode != 0 { - return false, fmt.Errorf("first init container should have exitCode != 0: %#v", status) - } - status = t.Status.InitContainerStatuses[1] - if status.State.Terminated == nil { - return false, nil - } - if status.State.Terminated.ExitCode == 0 { - return false, fmt.Errorf("second init container should have failed: %#v", status) - } - return true, nil - default: - return false, fmt.Errorf("unexpected object: %#v", t) - } - }, - conditions.PodCompleted, + }), + recordEvents(events, conditions.PodCompleted), ) gomega.Expect(err).To(gomega.BeNil()) - checkInvariants(wr.Events(), containerInitInvariant) + + checkInvariants(events, containerInitInvariant) endPod := event.Object.(*v1.Pod) framework.ExpectEqual(endPod.Status.Phase, v1.PodFailed)