Fix watches in init container e2e tests

This commit is contained in:
Tomas Nozicka 2019-03-08 14:58:19 +01:00
parent 2256991c93
commit c0ddbb0e97

View File

@ -23,23 +23,32 @@ import (
"strings" "strings"
"time" "time"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch" watchtools "k8s.io/client-go/tools/watch"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/client/conditions" "k8s.io/kubernetes/pkg/client/conditions"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
imageutils "k8s.io/kubernetes/test/utils/image" imageutils "k8s.io/kubernetes/test/utils/image"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
) )
func recordEvents(events []watch.Event, f func(watch.Event) (bool, error)) func(watch.Event) (bool, error) {
return func(e watch.Event) (bool, error) {
events = append(events, e)
return f(e)
}
}
// invariantFunc is a func that checks for invariant. // invariantFunc is a func that checks for invariant.
type invariantFunc func(older, newer runtime.Object) error type invariantFunc func(older, newer runtime.Object) error
@ -199,14 +208,23 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
} }
framework.Logf("PodSpec: initContainers in spec.initContainers") framework.Logf("PodSpec: initContainers in spec.initContainers")
startedPod := podClient.Create(pod) startedPod := podClient.Create(pod)
w, err := podClient.Watch(context.TODO(), metav1.SingleObject(startedPod.ObjectMeta))
framework.ExpectNoError(err, "error watching a pod") fieldSelector := fields.OneTermEqualSelector("metadata.name", startedPod.Name).String()
wr := watch.NewRecorder(w) w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.FieldSelector = fieldSelector
return podClient.Watch(context.TODO(), options)
},
}
var events []watch.Event
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout) ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout)
defer cancel() defer cancel()
event, err := watchtools.UntilWithoutRetry(ctx, wr, conditions.PodCompleted) event, err := watchtools.Until(ctx, startedPod.ResourceVersion, w,
recordEvents(events, conditions.PodCompleted),
)
gomega.Expect(err).To(gomega.BeNil()) gomega.Expect(err).To(gomega.BeNil())
checkInvariants(wr.Events(), containerInitInvariant)
checkInvariants(events, containerInitInvariant)
endPod := event.Object.(*v1.Pod) endPod := event.Object.(*v1.Pod)
framework.ExpectEqual(endPod.Status.Phase, v1.PodSucceeded) framework.ExpectEqual(endPod.Status.Phase, v1.PodSucceeded)
_, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized) _, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized)
@ -269,14 +287,21 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
} }
framework.Logf("PodSpec: initContainers in spec.initContainers") framework.Logf("PodSpec: initContainers in spec.initContainers")
startedPod := podClient.Create(pod) startedPod := podClient.Create(pod)
w, err := podClient.Watch(context.TODO(), metav1.SingleObject(startedPod.ObjectMeta))
framework.ExpectNoError(err, "error watching a pod") fieldSelector := fields.OneTermEqualSelector("metadata.name", startedPod.Name).String()
wr := watch.NewRecorder(w) w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.FieldSelector = fieldSelector
return podClient.Watch(context.TODO(), options)
},
}
var events []watch.Event
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout) ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout)
defer cancel() defer cancel()
event, err := watchtools.UntilWithoutRetry(ctx, wr, conditions.PodRunning) event, err := watchtools.Until(ctx, startedPod.ResourceVersion, w, recordEvents(events, conditions.PodRunning))
gomega.Expect(err).To(gomega.BeNil()) gomega.Expect(err).To(gomega.BeNil())
checkInvariants(wr.Events(), containerInitInvariant)
checkInvariants(events, containerInitInvariant)
endPod := event.Object.(*v1.Pod) endPod := event.Object.(*v1.Pod)
framework.ExpectEqual(endPod.Status.Phase, v1.PodRunning) framework.ExpectEqual(endPod.Status.Phase, v1.PodRunning)
_, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized) _, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized)
@ -340,14 +365,22 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
} }
framework.Logf("PodSpec: initContainers in spec.initContainers") framework.Logf("PodSpec: initContainers in spec.initContainers")
startedPod := podClient.Create(pod) startedPod := podClient.Create(pod)
w, err := podClient.Watch(context.TODO(), metav1.SingleObject(startedPod.ObjectMeta))
framework.ExpectNoError(err, "error watching a pod")
wr := watch.NewRecorder(w) fieldSelector := fields.OneTermEqualSelector("metadata.name", startedPod.Name).String()
w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.FieldSelector = fieldSelector
return podClient.Watch(context.TODO(), options)
},
}
var events []watch.Event
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout) ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout)
defer cancel() defer cancel()
event, err := watchtools.UntilWithoutRetry( event, err := watchtools.Until(
ctx, wr, ctx,
startedPod.ResourceVersion,
w,
// check for the first container to fail at least once // check for the first container to fail at least once
func(evt watch.Event) (bool, error) { func(evt watch.Event) (bool, error) {
switch t := evt.Object.(type) { switch t := evt.Object.(type) {
@ -397,7 +430,8 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
}, },
) )
gomega.Expect(err).To(gomega.BeNil()) gomega.Expect(err).To(gomega.BeNil())
checkInvariants(wr.Events(), containerInitInvariant)
checkInvariants(events, containerInitInvariant)
endPod := event.Object.(*v1.Pod) endPod := event.Object.(*v1.Pod)
framework.ExpectEqual(endPod.Status.Phase, v1.PodPending) framework.ExpectEqual(endPod.Status.Phase, v1.PodPending)
_, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized) _, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized)
@ -457,14 +491,20 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
framework.Logf("PodSpec: initContainers in spec.initContainers") framework.Logf("PodSpec: initContainers in spec.initContainers")
startedPod := podClient.Create(pod) startedPod := podClient.Create(pod)
w, err := podClient.Watch(context.TODO(), metav1.SingleObject(startedPod.ObjectMeta)) fieldSelector := fields.OneTermEqualSelector("metadata.name", startedPod.Name).String()
framework.ExpectNoError(err, "error watching a pod") w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.FieldSelector = fieldSelector
return podClient.Watch(context.TODO(), options)
},
}
wr := watch.NewRecorder(w) var events []watch.Event
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout) ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout)
defer cancel() defer cancel()
event, err := watchtools.UntilWithoutRetry( event, err := watchtools.Until(
ctx, wr, ctx, startedPod.ResourceVersion, w,
recordEvents(events,
// check for the second container to fail at least once // check for the second container to fail at least once
func(evt watch.Event) (bool, error) { func(evt watch.Event) (bool, error) {
switch t := evt.Object.(type) { switch t := evt.Object.(type) {
@ -501,11 +541,12 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
default: default:
return false, fmt.Errorf("unexpected object: %#v", t) return false, fmt.Errorf("unexpected object: %#v", t)
} }
}, }),
conditions.PodCompleted, recordEvents(events, conditions.PodCompleted),
) )
gomega.Expect(err).To(gomega.BeNil()) gomega.Expect(err).To(gomega.BeNil())
checkInvariants(wr.Events(), containerInitInvariant)
checkInvariants(events, containerInitInvariant)
endPod := event.Object.(*v1.Pod) endPod := event.Object.(*v1.Pod)
framework.ExpectEqual(endPod.Status.Phase, v1.PodFailed) framework.ExpectEqual(endPod.Status.Phase, v1.PodFailed)