Merge pull request #75111 from tnozicka/fix-e2e-watches

Fix watches in e2e tests
This commit is contained in:
Kubernetes Prow Robot 2020-04-23 19:20:07 -07:00 committed by GitHub
commit a62cfe8451
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 179 additions and 80 deletions

View File

@ -25,6 +25,8 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
@ -1024,9 +1026,12 @@ func watchRecreateDeployment(c clientset.Interface, d *appsv1.Deployment) error
return fmt.Errorf("deployment %q does not use a Recreate strategy: %s", d.Name, d.Spec.Strategy.Type)
}
w, err := c.AppsV1().Deployments(d.Namespace).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: d.Name, ResourceVersion: d.ResourceVersion}))
if err != nil {
return err
fieldSelector := fields.OneTermEqualSelector("metadata.name", d.Name).String()
w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.FieldSelector = fieldSelector
return c.AppsV1().Deployments(d.Namespace).Watch(context.TODO(), options)
},
}
status := d.Status
@ -1053,7 +1058,7 @@ func watchRecreateDeployment(c clientset.Interface, d *appsv1.Deployment) error
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, w, condition)
_, err := watchtools.Until(ctx, d.ResourceVersion, w, condition)
if err == wait.ErrWaitTimeout {
err = fmt.Errorf("deployment %q never completed: %#v", d.Name, status)
}

View File

@ -24,15 +24,19 @@ import (
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@ -572,8 +576,14 @@ var _ = SIGDescribe("StatefulSet", func() {
*/
framework.ConformanceIt("Scaling should happen in predictable order and halt if any stateful pod is unhealthy [Slow]", func() {
psLabels := klabels.Set(labels)
w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.LabelSelector = psLabels.AsSelector().String()
return f.ClientSet.CoreV1().Pods(ns).Watch(context.TODO(), options)
},
}
ginkgo.By("Initializing watcher for selector " + psLabels.String())
watcher, err := f.ClientSet.CoreV1().Pods(ns).Watch(context.TODO(), metav1.ListOptions{
pl, err := f.ClientSet.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{
LabelSelector: psLabels.AsSelector().String(),
})
framework.ExpectNoError(err)
@ -602,7 +612,7 @@ var _ = SIGDescribe("StatefulSet", func() {
expectedOrder := []string{ssName + "-0", ssName + "-1", ssName + "-2"}
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), statefulSetTimeout)
defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) {
_, err = watchtools.Until(ctx, pl.ResourceVersion, w, func(event watch.Event) (bool, error) {
if event.Type != watch.Added {
return false, nil
}
@ -616,7 +626,7 @@ var _ = SIGDescribe("StatefulSet", func() {
framework.ExpectNoError(err)
ginkgo.By("Scale down will halt with unhealthy stateful pod")
watcher, err = f.ClientSet.CoreV1().Pods(ns).Watch(context.TODO(), metav1.ListOptions{
pl, err = f.ClientSet.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{
LabelSelector: psLabels.AsSelector().String(),
})
framework.ExpectNoError(err)
@ -635,7 +645,7 @@ var _ = SIGDescribe("StatefulSet", func() {
expectedOrder = []string{ssName + "-2", ssName + "-1", ssName + "-0"}
ctx, cancel = watchtools.ContextWithOptionalTimeout(context.Background(), statefulSetTimeout)
defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) {
_, err = watchtools.Until(ctx, pl.ResourceVersion, w, func(event watch.Event) (bool, error) {
if event.Type != watch.Deleted {
return false, nil
}
@ -738,12 +748,21 @@ var _ = SIGDescribe("StatefulSet", func() {
var initialStatefulPodUID types.UID
ginkgo.By("Waiting until stateful pod " + statefulPodName + " will be recreated and deleted at least once in namespace " + f.Namespace.Name)
w, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: statefulPodName}))
framework.ExpectNoError(err)
fieldSelector := fields.OneTermEqualSelector("metadata.name", statefulPodName).String()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
options.FieldSelector = fieldSelector
return f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.FieldSelector = fieldSelector
return f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(context.TODO(), options)
},
}
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), statefulPodTimeout)
defer cancel()
// we need to get UID from pod in any state and wait until stateful set controller will remove pod at least once
_, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) {
_, err = watchtools.ListWatchUntil(ctx, lw, func(event watch.Event) (bool, error) {
pod := event.Object.(*v1.Pod)
switch event.Type {
case watch.Deleted:

View File

@ -23,23 +23,32 @@ import (
"strings"
"time"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/client/conditions"
"k8s.io/kubernetes/test/e2e/framework"
imageutils "k8s.io/kubernetes/test/utils/image"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
)
func recordEvents(events []watch.Event, f func(watch.Event) (bool, error)) func(watch.Event) (bool, error) {
return func(e watch.Event) (bool, error) {
events = append(events, e)
return f(e)
}
}
// invariantFunc is a func that checks for invariant.
type invariantFunc func(older, newer runtime.Object) error
@ -199,14 +208,23 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
}
framework.Logf("PodSpec: initContainers in spec.initContainers")
startedPod := podClient.Create(pod)
w, err := podClient.Watch(context.TODO(), metav1.SingleObject(startedPod.ObjectMeta))
framework.ExpectNoError(err, "error watching a pod")
wr := watch.NewRecorder(w)
fieldSelector := fields.OneTermEqualSelector("metadata.name", startedPod.Name).String()
w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.FieldSelector = fieldSelector
return podClient.Watch(context.TODO(), options)
},
}
var events []watch.Event
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout)
defer cancel()
event, err := watchtools.UntilWithoutRetry(ctx, wr, conditions.PodCompleted)
event, err := watchtools.Until(ctx, startedPod.ResourceVersion, w,
recordEvents(events, conditions.PodCompleted),
)
gomega.Expect(err).To(gomega.BeNil())
checkInvariants(wr.Events(), containerInitInvariant)
checkInvariants(events, containerInitInvariant)
endPod := event.Object.(*v1.Pod)
framework.ExpectEqual(endPod.Status.Phase, v1.PodSucceeded)
_, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized)
@ -269,14 +287,21 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
}
framework.Logf("PodSpec: initContainers in spec.initContainers")
startedPod := podClient.Create(pod)
w, err := podClient.Watch(context.TODO(), metav1.SingleObject(startedPod.ObjectMeta))
framework.ExpectNoError(err, "error watching a pod")
wr := watch.NewRecorder(w)
fieldSelector := fields.OneTermEqualSelector("metadata.name", startedPod.Name).String()
w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.FieldSelector = fieldSelector
return podClient.Watch(context.TODO(), options)
},
}
var events []watch.Event
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout)
defer cancel()
event, err := watchtools.UntilWithoutRetry(ctx, wr, conditions.PodRunning)
event, err := watchtools.Until(ctx, startedPod.ResourceVersion, w, recordEvents(events, conditions.PodRunning))
gomega.Expect(err).To(gomega.BeNil())
checkInvariants(wr.Events(), containerInitInvariant)
checkInvariants(events, containerInitInvariant)
endPod := event.Object.(*v1.Pod)
framework.ExpectEqual(endPod.Status.Phase, v1.PodRunning)
_, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized)
@ -340,14 +365,22 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
}
framework.Logf("PodSpec: initContainers in spec.initContainers")
startedPod := podClient.Create(pod)
w, err := podClient.Watch(context.TODO(), metav1.SingleObject(startedPod.ObjectMeta))
framework.ExpectNoError(err, "error watching a pod")
wr := watch.NewRecorder(w)
fieldSelector := fields.OneTermEqualSelector("metadata.name", startedPod.Name).String()
w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.FieldSelector = fieldSelector
return podClient.Watch(context.TODO(), options)
},
}
var events []watch.Event
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout)
defer cancel()
event, err := watchtools.UntilWithoutRetry(
ctx, wr,
event, err := watchtools.Until(
ctx,
startedPod.ResourceVersion,
w,
// check for the first container to fail at least once
func(evt watch.Event) (bool, error) {
switch t := evt.Object.(type) {
@ -397,7 +430,8 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
},
)
gomega.Expect(err).To(gomega.BeNil())
checkInvariants(wr.Events(), containerInitInvariant)
checkInvariants(events, containerInitInvariant)
endPod := event.Object.(*v1.Pod)
framework.ExpectEqual(endPod.Status.Phase, v1.PodPending)
_, init := podutil.GetPodCondition(&endPod.Status, v1.PodInitialized)
@ -457,55 +491,62 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
framework.Logf("PodSpec: initContainers in spec.initContainers")
startedPod := podClient.Create(pod)
w, err := podClient.Watch(context.TODO(), metav1.SingleObject(startedPod.ObjectMeta))
framework.ExpectNoError(err, "error watching a pod")
fieldSelector := fields.OneTermEqualSelector("metadata.name", startedPod.Name).String()
w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.FieldSelector = fieldSelector
return podClient.Watch(context.TODO(), options)
},
}
wr := watch.NewRecorder(w)
var events []watch.Event
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout)
defer cancel()
event, err := watchtools.UntilWithoutRetry(
ctx, wr,
// check for the second container to fail at least once
func(evt watch.Event) (bool, error) {
switch t := evt.Object.(type) {
case *v1.Pod:
for _, status := range t.Status.ContainerStatuses {
if status.State.Waiting == nil {
return false, fmt.Errorf("container %q should not be out of waiting: %#v", status.Name, status)
event, err := watchtools.Until(
ctx, startedPod.ResourceVersion, w,
recordEvents(events,
// check for the second container to fail at least once
func(evt watch.Event) (bool, error) {
switch t := evt.Object.(type) {
case *v1.Pod:
for _, status := range t.Status.ContainerStatuses {
if status.State.Waiting == nil {
return false, fmt.Errorf("container %q should not be out of waiting: %#v", status.Name, status)
}
if status.State.Waiting.Reason != "PodInitializing" {
return false, fmt.Errorf("container %q should have reason PodInitializing: %#v", status.Name, status)
}
}
if status.State.Waiting.Reason != "PodInitializing" {
return false, fmt.Errorf("container %q should have reason PodInitializing: %#v", status.Name, status)
if len(t.Status.InitContainerStatuses) != 2 {
return false, nil
}
}
if len(t.Status.InitContainerStatuses) != 2 {
return false, nil
}
status := t.Status.InitContainerStatuses[0]
if status.State.Terminated == nil {
if status.State.Waiting != nil && status.State.Waiting.Reason != "PodInitializing" {
return false, fmt.Errorf("second init container should have reason PodInitializing: %#v", status)
status := t.Status.InitContainerStatuses[0]
if status.State.Terminated == nil {
if status.State.Waiting != nil && status.State.Waiting.Reason != "PodInitializing" {
return false, fmt.Errorf("second init container should have reason PodInitializing: %#v", status)
}
return false, nil
}
return false, nil
if status.State.Terminated != nil && status.State.Terminated.ExitCode != 0 {
return false, fmt.Errorf("first init container should have exitCode != 0: %#v", status)
}
status = t.Status.InitContainerStatuses[1]
if status.State.Terminated == nil {
return false, nil
}
if status.State.Terminated.ExitCode == 0 {
return false, fmt.Errorf("second init container should have failed: %#v", status)
}
return true, nil
default:
return false, fmt.Errorf("unexpected object: %#v", t)
}
if status.State.Terminated != nil && status.State.Terminated.ExitCode != 0 {
return false, fmt.Errorf("first init container should have exitCode != 0: %#v", status)
}
status = t.Status.InitContainerStatuses[1]
if status.State.Terminated == nil {
return false, nil
}
if status.State.Terminated.ExitCode == 0 {
return false, fmt.Errorf("second init container should have failed: %#v", status)
}
return true, nil
default:
return false, fmt.Errorf("unexpected object: %#v", t)
}
},
conditions.PodCompleted,
}),
recordEvents(events, conditions.PodCompleted),
)
gomega.Expect(err).To(gomega.BeNil())
checkInvariants(wr.Events(), containerInitInvariant)
checkInvariants(events, containerInitInvariant)
endPod := event.Object.(*v1.Pod)
framework.ExpectEqual(endPod.Status.Phase, v1.PodFailed)

View File

@ -51,6 +51,7 @@ go_library(
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/restmapper:go_default_library",
"//staging/src/k8s.io/client-go/scale:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd/api:go_default_library",
"//staging/src/k8s.io/client-go/tools/remotecommand:go_default_library",

View File

@ -44,6 +44,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
@ -52,6 +53,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
watchtools "k8s.io/client-go/tools/watch"
@ -260,13 +262,20 @@ func WaitForNamespacesDeleted(c clientset.Interface, namespaces []string, timeou
}
func waitForServiceAccountInNamespace(c clientset.Interface, ns, serviceAccountName string, timeout time.Duration) error {
w, err := c.CoreV1().ServiceAccounts(ns).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: serviceAccountName}))
if err != nil {
return err
fieldSelector := fields.OneTermEqualSelector("metadata.name", serviceAccountName).String()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
options.FieldSelector = fieldSelector
return c.CoreV1().ServiceAccounts(ns).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.FieldSelector = fieldSelector
return c.CoreV1().ServiceAccounts(ns).Watch(context.TODO(), options)
},
}
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, w, serviceAccountHasSecrets)
_, err := watchtools.UntilWithSync(ctx, lw, &v1.ServiceAccount{}, nil, serviceAccountHasSecrets)
return err
}

View File

@ -190,6 +190,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/watch:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
"//staging/src/k8s.io/cri-api/pkg/apis:go_default_library",
@ -225,7 +226,6 @@ go_test(
"//pkg/kubelet/stats/pidlimit:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/yaml:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",
"//test/e2e/framework/config:go_default_library",
"//test/e2e/framework/kubelet:go_default_library",
@ -240,7 +240,6 @@ go_test(
"//pkg/kubelet/stats/pidlimit:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/yaml:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",
"//test/e2e/framework/config:go_default_library",
"//test/e2e/framework/kubelet:go_default_library",

View File

@ -30,9 +30,13 @@ import (
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/security/apparmor"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@ -40,7 +44,6 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
"k8s.io/klog"
)
var _ = framework.KubeDescribe("AppArmor [Feature:AppArmor][NodeFeature:AppArmor]", func() {
@ -151,11 +154,33 @@ func runAppArmorTest(f *framework.Framework, shouldRun bool, profile string) v1.
f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout))
} else {
// Pod should remain in the pending state. Wait for the Reason to be set to "AppArmor".
w, err := f.PodClient().Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: pod.Name}))
framework.ExpectNoError(err)
fieldSelector := fields.OneTermEqualSelector("metadata.name", pod.Name).String()
w := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
return f.PodClient().List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector
return f.PodClient().Watch(context.TODO(), options)
},
}
preconditionFunc := func(store cache.Store) (bool, error) {
_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name})
if err != nil {
return true, err
}
if !exists {
// We need to make sure we see the object in the cache before we start waiting for events
// or we would be waiting for the timeout if such object didn't exist.
return true, apierrors.NewNotFound(v1.Resource("pods"), pod.Name)
}
return false, nil
}
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout)
defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
_, err := watchtools.UntilWithSync(ctx, w, &v1.Pod{}, preconditionFunc, func(e watch.Event) (bool, error) {
switch e.Type {
case watch.Deleted:
return false, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, pod.Name)