diff --git a/test/e2e/apps/statefulset.go b/test/e2e/apps/statefulset.go index 1a3456f824b..b5e938350bb 100644 --- a/test/e2e/apps/statefulset.go +++ b/test/e2e/apps/statefulset.go @@ -24,15 +24,19 @@ import ( "github.com/onsi/ginkgo" "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" klabels "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" @@ -572,8 +576,14 @@ var _ = SIGDescribe("StatefulSet", func() { */ framework.ConformanceIt("Scaling should happen in predictable order and halt if any stateful pod is unhealthy [Slow]", func() { psLabels := klabels.Set(labels) + w := &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) { + options.LabelSelector = psLabels.AsSelector().String() + return f.ClientSet.CoreV1().Pods(ns).Watch(context.TODO(), options) + }, + } ginkgo.By("Initializing watcher for selector " + psLabels.String()) - watcher, err := f.ClientSet.CoreV1().Pods(ns).Watch(context.TODO(), metav1.ListOptions{ + pl, err := f.ClientSet.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{ LabelSelector: psLabels.AsSelector().String(), }) framework.ExpectNoError(err) @@ -602,7 +612,7 @@ var _ = SIGDescribe("StatefulSet", func() { expectedOrder := []string{ssName + "-0", ssName + "-1", ssName + "-2"} ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), statefulSetTimeout) defer cancel() - _, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) { + _, err = watchtools.Until(ctx, pl.ResourceVersion, w, func(event watch.Event) (bool, error) { if event.Type != watch.Added { return false, nil } @@ -616,7 +626,7 @@ var _ = SIGDescribe("StatefulSet", func() { framework.ExpectNoError(err) ginkgo.By("Scale down will halt with unhealthy stateful pod") - watcher, err = f.ClientSet.CoreV1().Pods(ns).Watch(context.TODO(), metav1.ListOptions{ + pl, err = f.ClientSet.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{ LabelSelector: psLabels.AsSelector().String(), }) framework.ExpectNoError(err) @@ -635,7 +645,7 @@ var _ = SIGDescribe("StatefulSet", func() { expectedOrder = []string{ssName + "-2", ssName + "-1", ssName + "-0"} ctx, cancel = watchtools.ContextWithOptionalTimeout(context.Background(), statefulSetTimeout) defer cancel() - _, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) { + _, err = watchtools.Until(ctx, pl.ResourceVersion, w, func(event watch.Event) (bool, error) { if event.Type != watch.Deleted { return false, nil } @@ -738,12 +748,21 @@ var _ = SIGDescribe("StatefulSet", func() { var initialStatefulPodUID types.UID ginkgo.By("Waiting until stateful pod " + statefulPodName + " will be recreated and deleted at least once in namespace " + f.Namespace.Name) - w, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: statefulPodName})) - framework.ExpectNoError(err) + fieldSelector := fields.OneTermEqualSelector("metadata.name", statefulPodName).String() + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) { + options.FieldSelector = fieldSelector + return f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) { + options.FieldSelector = fieldSelector + return f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(context.TODO(), options) + }, + } ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), statefulPodTimeout) defer cancel() // we need to get UID from pod in any state and wait until stateful set controller will remove pod at least once - _, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) { + _, err = watchtools.ListWatchUntil(ctx, lw, func(event watch.Event) (bool, error) { pod := event.Object.(*v1.Pod) switch event.Type { case watch.Deleted: