users of watchtools.NewIndexerInformerWatcher should wait for the informer to sync

Previously, users of this method relied on the fact that a call to LIST had been made. Instead, users should use the dedicated `HasSynced` method.
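
As a rough sketch of the pattern this change moves callers to (illustrative only: the package and helper names are placeholders, the Pod object type is just an example, and error handling is minimal):

package watchutil

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

// newSyncedWatcher builds an informer-backed watcher from the given ListWatch
// and blocks until the informer has completed its initial LIST, bounded by stop.
func newSyncedWatcher(stop <-chan struct{}, lw *cache.ListWatch) (watch.Interface, error) {
	_, informer, w, _ := watchtools.NewIndexerInformerWatcher(lw, &v1.Pod{})
	if !cache.WaitForCacheSync(stop, informer.HasSynced) {
		w.Stop()
		return nil, fmt.Errorf("timed out waiting for the informer to sync")
	}
	return w, nil
}

Once `WaitForCacheSync` returns true, every subsequent change surfaces as its own event on the watcher's result channel, which is what the tests below rely on.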
Lukasz Szaszkiewicz 2022-05-04 17:18:54 +02:00
parent 2fd498f620
commit 4a7845b485
3 changed files with 29 additions and 42 deletions


@@ -249,20 +249,10 @@ var _ = SIGDescribe("Pods", func() {
 		framework.ExpectNoError(err, "failed to query for pods")
 		framework.ExpectEqual(len(pods.Items), 0)
-		listCompleted := make(chan bool, 1)
 		lw := &cache.ListWatch{
 			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 				options.LabelSelector = selector.String()
 				podList, err := podClient.List(context.TODO(), options)
-				if err == nil {
-					select {
-					case listCompleted <- true:
-						framework.Logf("observed the pod list")
-						return podList, err
-					default:
-						framework.Logf("channel blocked")
-					}
-				}
 				return podList, err
 			},
 			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
@@ -270,9 +260,15 @@ var _ = SIGDescribe("Pods", func() {
 				return podClient.Watch(context.TODO(), options)
 			},
 		}
-		_, _, w, _ := watchtools.NewIndexerInformerWatcher(lw, &v1.Pod{})
+		_, informer, w, _ := watchtools.NewIndexerInformerWatcher(lw, &v1.Pod{})
 		defer w.Stop()
+		ctx, cancelCtx := context.WithTimeout(context.TODO(), wait.ForeverTestTimeout)
+		defer cancelCtx()
+		if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
+			framework.Failf("Timeout while waiting for the Pod informer to sync")
+		}
 		ginkgo.By("submitting the pod to kubernetes")
 		podClient.Create(pod)
@@ -285,17 +281,12 @@ var _ = SIGDescribe("Pods", func() {
 		ginkgo.By("verifying pod creation was observed")
 		select {
-		case <-listCompleted:
-			select {
-			case event := <-w.ResultChan():
-				if event.Type != watch.Added {
-					framework.Failf("Failed to observe pod creation: %v", event)
-				}
-			case <-time.After(framework.PodStartTimeout):
-				framework.Failf("Timeout while waiting for pod creation")
-			}
-		case <-time.After(10 * time.Second):
-			framework.Failf("Timeout while waiting to observe pod list")
+		case event := <-w.ResultChan():
+			if event.Type != watch.Added {
+				framework.Failf("Failed to observe pod creation: %v", event)
+			}
+		case <-time.After(framework.PodStartTimeout):
+			framework.Failf("Timeout while waiting for pod creation")
 		}
 		// We need to wait for the pod to be running, otherwise the deletion


@@ -75,20 +75,10 @@ var _ = SIGDescribe("LimitRange", func() {
 		framework.ExpectNoError(err, "failed to query for limitRanges")
 		framework.ExpectEqual(len(limitRanges.Items), 0)
-		listCompleted := make(chan bool, 1)
 		lw := &cache.ListWatch{
 			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 				options.LabelSelector = selector.String()
 				limitRanges, err := f.ClientSet.CoreV1().LimitRanges(f.Namespace.Name).List(context.TODO(), options)
-				if err == nil {
-					select {
-					case listCompleted <- true:
-						framework.Logf("observed the limitRanges list")
-						return limitRanges, err
-					default:
-						framework.Logf("channel blocked")
-					}
-				}
 				return limitRanges, err
 			},
 			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
@@ -96,26 +86,27 @@ var _ = SIGDescribe("LimitRange", func() {
 				return f.ClientSet.CoreV1().LimitRanges(f.Namespace.Name).Watch(context.TODO(), options)
 			},
 		}
-		_, _, w, _ := watchtools.NewIndexerInformerWatcher(lw, &v1.LimitRange{})
+		_, informer, w, _ := watchtools.NewIndexerInformerWatcher(lw, &v1.LimitRange{})
 		defer w.Stop()
+		ctx, cancelCtx := context.WithTimeout(context.TODO(), wait.ForeverTestTimeout)
+		defer cancelCtx()
+		if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
+			framework.Failf("Timeout while waiting for LimitRange informer to sync")
+		}
 		ginkgo.By("Submitting a LimitRange")
 		limitRange, err = f.ClientSet.CoreV1().LimitRanges(f.Namespace.Name).Create(context.TODO(), limitRange, metav1.CreateOptions{})
 		framework.ExpectNoError(err)
 		ginkgo.By("Verifying LimitRange creation was observed")
 		select {
-		case <-listCompleted:
-			select {
-			case event, _ := <-w.ResultChan():
-				if event.Type != watch.Added {
-					framework.Failf("Failed to observe limitRange creation : %v", event)
-				}
-			case <-time.After(e2eservice.RespondingTimeout):
-				framework.Failf("Timeout while waiting for LimitRange creation")
-			}
-		case <-time.After(e2eservice.RespondingTimeout):
-			framework.Failf("Timeout while waiting for LimitRange list complete")
+		case event, _ := <-w.ResultChan():
+			if event.Type != watch.Added {
+				framework.Failf("Failed to observe limitRange creation : %v", event)
+			}
+		case <-time.After(e2eservice.RespondingTimeout):
+			framework.Failf("Timeout while waiting for LimitRange creation")
 		}
 		ginkgo.By("Fetching the LimitRange to ensure it has proper values")


@@ -198,7 +198,12 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) {
 					return getWatchFunc(c, secret)(options)
 				},
 			}
-			_, _, w, done := watchtools.NewIndexerInformerWatcher(lw, &corev1.Secret{})
+			// There is an inherent race between the producer (generateEvents) and the consumer (the watcher) that needs to be solved here.
+			// Since the watcher is driven by an informer, it is crucial to start producing only after the informer has synced.
+			// Otherwise we might not get all expected events, because the informer LISTs (or watchlists) first and only then WATCHes:
+			// all events received during the initial LIST (or watchlist) are collapsed into a single event (for the most recent version of an obj).
+			_, informer, w, done := watchtools.NewIndexerInformerWatcher(lw, &corev1.Secret{})
+			cache.WaitForCacheSync(context.TODO().Done(), informer.HasSynced)
 			return w, nil, func() { <-done }
 		},
 		normalizeOutputFunc: normalizeInformerOutputFunc(initialCount),
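
One caveat on the last hunk: `context.TODO().Done()` returns a nil channel, so that `WaitForCacheSync` call has no deadline and would block indefinitely if the informer never synced. A bounded variant, in the spirit of the e2e hunks above, might look like this (a sketch: it assumes a `testing.T` is reachable at this point, which the table-driven setup may not provide; returning an error works just as well):

	// Bound the sync wait so a broken informer fails the test instead of hanging it.
	ctx, cancel := context.WithTimeout(context.TODO(), wait.ForeverTestTimeout)
	defer cancel()
	if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
		t.Fatal("timed out waiting for the Secret informer to sync")
	}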