diff --git a/test/e2e/common/node/pods.go b/test/e2e/common/node/pods.go index 785eb4cc0e8..5e5b1f4f9e1 100644 --- a/test/e2e/common/node/pods.go +++ b/test/e2e/common/node/pods.go @@ -249,20 +249,10 @@ var _ = SIGDescribe("Pods", func() { framework.ExpectNoError(err, "failed to query for pods") framework.ExpectEqual(len(pods.Items), 0) - listCompleted := make(chan bool, 1) lw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { options.LabelSelector = selector.String() podList, err := podClient.List(context.TODO(), options) - if err == nil { - select { - case listCompleted <- true: - framework.Logf("observed the pod list") - return podList, err - default: - framework.Logf("channel blocked") - } - } return podList, err }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { @@ -270,9 +260,15 @@ var _ = SIGDescribe("Pods", func() { return podClient.Watch(context.TODO(), options) }, } - _, _, w, _ := watchtools.NewIndexerInformerWatcher(lw, &v1.Pod{}) + _, informer, w, _ := watchtools.NewIndexerInformerWatcher(lw, &v1.Pod{}) defer w.Stop() + ctx, cancelCtx := context.WithTimeout(context.TODO(), wait.ForeverTestTimeout) + defer cancelCtx() + if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { + framework.Failf("Timeout while waiting to Pod informer to sync") + } + ginkgo.By("submitting the pod to kubernetes") podClient.Create(pod) @@ -285,17 +281,12 @@ var _ = SIGDescribe("Pods", func() { ginkgo.By("verifying pod creation was observed") select { - case <-listCompleted: - select { - case event := <-w.ResultChan(): - if event.Type != watch.Added { - framework.Failf("Failed to observe pod creation: %v", event) - } - case <-time.After(framework.PodStartTimeout): - framework.Failf("Timeout while waiting for pod creation") + case event := <-w.ResultChan(): + if event.Type != watch.Added { + framework.Failf("Failed to observe pod creation: %v", event) } - case <-time.After(10 * time.Second): - framework.Failf("Timeout while waiting to observe pod list") + case <-time.After(framework.PodStartTimeout): + framework.Failf("Timeout while waiting for pod creation") } // We need to wait for the pod to be running, otherwise the deletion diff --git a/test/e2e/scheduling/limit_range.go b/test/e2e/scheduling/limit_range.go index 7bcb20db76b..b57c38c1330 100644 --- a/test/e2e/scheduling/limit_range.go +++ b/test/e2e/scheduling/limit_range.go @@ -75,20 +75,10 @@ var _ = SIGDescribe("LimitRange", func() { framework.ExpectNoError(err, "failed to query for limitRanges") framework.ExpectEqual(len(limitRanges.Items), 0) - listCompleted := make(chan bool, 1) lw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { options.LabelSelector = selector.String() limitRanges, err := f.ClientSet.CoreV1().LimitRanges(f.Namespace.Name).List(context.TODO(), options) - if err == nil { - select { - case listCompleted <- true: - framework.Logf("observed the limitRanges list") - return limitRanges, err - default: - framework.Logf("channel blocked") - } - } return limitRanges, err }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { @@ -96,26 +86,27 @@ var _ = SIGDescribe("LimitRange", func() { return f.ClientSet.CoreV1().LimitRanges(f.Namespace.Name).Watch(context.TODO(), options) }, } - _, _, w, _ := watchtools.NewIndexerInformerWatcher(lw, &v1.LimitRange{}) + _, informer, w, _ := watchtools.NewIndexerInformerWatcher(lw, &v1.LimitRange{}) defer w.Stop() + ctx, cancelCtx := context.WithTimeout(context.TODO(), wait.ForeverTestTimeout) + defer cancelCtx() + if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { + framework.Failf("Timeout while waiting for LimitRange informer to sync") + } + ginkgo.By("Submitting a LimitRange") limitRange, err = f.ClientSet.CoreV1().LimitRanges(f.Namespace.Name).Create(context.TODO(), limitRange, metav1.CreateOptions{}) framework.ExpectNoError(err) ginkgo.By("Verifying LimitRange creation was observed") select { - case <-listCompleted: - select { - case event, _ := <-w.ResultChan(): - if event.Type != watch.Added { - framework.Failf("Failed to observe limitRange creation : %v", event) - } - case <-time.After(e2eservice.RespondingTimeout): - framework.Failf("Timeout while waiting for LimitRange creation") + case event, _ := <-w.ResultChan(): + if event.Type != watch.Added { + framework.Failf("Failed to observe limitRange creation : %v", event) } case <-time.After(e2eservice.RespondingTimeout): - framework.Failf("Timeout while waiting for LimitRange list complete") + framework.Failf("Timeout while waiting for LimitRange creation") } ginkgo.By("Fetching the LimitRange to ensure it has proper values") diff --git a/test/integration/apimachinery/watch_restart_test.go b/test/integration/apimachinery/watch_restart_test.go index fe599a3c152..e562620dc83 100644 --- a/test/integration/apimachinery/watch_restart_test.go +++ b/test/integration/apimachinery/watch_restart_test.go @@ -198,7 +198,12 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) { return getWatchFunc(c, secret)(options) }, } - _, _, w, done := watchtools.NewIndexerInformerWatcher(lw, &corev1.Secret{}) + // there is an inherent race between a producer (generateEvents) and a consumer (the watcher) that needs to be solved here + // since the watcher is driven by an informer it is crucial to start producing only after the informer has synced + // otherwise we might not get all expected events since the informer LIST (or watchelist) and only then WATCHES + // all events received during the initial LIST (or watchlist) will be seen as a single event (to most recent version of an obj) + _, informer, w, done := watchtools.NewIndexerInformerWatcher(lw, &corev1.Secret{}) + cache.WaitForCacheSync(context.TODO().Done(), informer.HasSynced) return w, nil, func() { <-done } }, normalizeOutputFunc: normalizeInformerOutputFunc(initialCount),