diff --git a/test/integration/volume/persistent_volumes_test.go b/test/integration/volume/persistent_volumes_test.go index eed2c652628..d3a9e6c3463 100644 --- a/test/integration/volume/persistent_volumes_test.go +++ b/test/integration/volume/persistent_volumes_test.go @@ -1468,19 +1468,30 @@ func waitForAnyPersistentVolumePhase(w watch.Interface, phase v1.PersistentVolum } func waitForSomePersistentVolumeClaimPhase(ctx context.Context, testClient clientset.Interface, namespace string, objCount int, watchPVC watch.Interface, phase v1.PersistentVolumeClaimPhase) error { + // The caller doesn't know about new watches, so we have to stop them ourselves. + var newWatchPVC watch.Interface + defer func() { + if newWatchPVC != nil { + newWatchPVC.Stop() + } + }() + return wait.ExponentialBackoffWithContext(ctx, retry.DefaultBackoff, func(ctx context.Context) (bool, error) { for i := 0; i < objCount; i++ { - waitErr := waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound) - if waitErr != nil { - klog.Errorf("Failed to wait for a claim (%d/%d) to be bound: %v", i+1, objCount, waitErr) + err := waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound) + if err != nil { + klog.Errorf("Failed to wait for a claim (%d/%d) to be bound: %v", i+1, objCount, err) klog.Info("Recreating a watch for claims and re-checking the count of bound claims") - newWatchPVC, err := testClient.CoreV1().PersistentVolumeClaims(namespace).Watch(ctx, metav1.ListOptions{}) + newWatchPVC, err = testClient.CoreV1().PersistentVolumeClaims(namespace).Watch(ctx, metav1.ListOptions{}) if err != nil { return false, err } watchPVC.Stop() watchPVC = newWatchPVC - return false, waitErr + // The error from waitForAnyPersistentVolumeClaimPhase is assumed to be harmless + // and/or to be fixed by recreating the watch, so here we continue polling + // via ExponentialBackoffWithContext. + return false, nil } klog.V(1).Infof("%d claims bound", i+1) }