From 13cd40d718b04e054cb6bf241654ec56df3baf76 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 6 Oct 2025 17:35:59 +0200 Subject: [PATCH] E2E volume: fix restarting of watch Presumably the code discussed in https://github.com/kubernetes/kubernetes/pull/127260/files#r2405215911 was meant to continue polling after a watch was closed by the apiserver, which is something that can happen under load. However, returning the error from the poll callback has the effect that polling stops. This shows up as test failures when testing with race detection enabled: persistent_volumes_test.go:1101: Failed to wait for all claims to be bound: watch closed --- .../volume/persistent_volumes_test.go | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/test/integration/volume/persistent_volumes_test.go b/test/integration/volume/persistent_volumes_test.go index eed2c652628..d3a9e6c3463 100644 --- a/test/integration/volume/persistent_volumes_test.go +++ b/test/integration/volume/persistent_volumes_test.go @@ -1468,19 +1468,30 @@ func waitForAnyPersistentVolumePhase(w watch.Interface, phase v1.PersistentVolum } func waitForSomePersistentVolumeClaimPhase(ctx context.Context, testClient clientset.Interface, namespace string, objCount int, watchPVC watch.Interface, phase v1.PersistentVolumeClaimPhase) error { + // The caller doesn't know about new watches, so we have to stop them ourselves. 
+ var newWatchPVC watch.Interface + defer func() { + if newWatchPVC != nil { + newWatchPVC.Stop() + } + }() + return wait.ExponentialBackoffWithContext(ctx, retry.DefaultBackoff, func(ctx context.Context) (bool, error) { for i := 0; i < objCount; i++ { - waitErr := waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound) - if waitErr != nil { - klog.Errorf("Failed to wait for a claim (%d/%d) to be bound: %v", i+1, objCount, waitErr) + err := waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound) + if err != nil { + klog.Errorf("Failed to wait for a claim (%d/%d) to be bound: %v", i+1, objCount, err) klog.Info("Recreating a watch for claims and re-checking the count of bound claims") - newWatchPVC, err := testClient.CoreV1().PersistentVolumeClaims(namespace).Watch(ctx, metav1.ListOptions{}) + newWatchPVC, err = testClient.CoreV1().PersistentVolumeClaims(namespace).Watch(ctx, metav1.ListOptions{}) if err != nil { return false, err } watchPVC.Stop() watchPVC = newWatchPVC - return false, waitErr + // The error from waitForAnyPersistentVolumeClaimPhase is assumed to be harmless + // and/or to be fixed by recreating the watch, so here we continue polling + // via ExponentialBackoffWithContext. + return false, nil } klog.V(1).Infof("%d claims bound", i+1) }