diff --git a/test/integration/volume/persistent_volumes_test.go b/test/integration/volume/persistent_volumes_test.go index 5669301aaf5..eed2c652628 100644 --- a/test/integration/volume/persistent_volumes_test.go +++ b/test/integration/volume/persistent_volumes_test.go @@ -29,12 +29,14 @@ import ( storage "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" ref "k8s.io/client-go/tools/reference" + "k8s.io/client-go/util/retry" featuregatetesting "k8s.io/component-base/featuregate/testing" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/pkg/api/legacyscheme" @@ -284,7 +286,9 @@ func TestPersistentVolumeBindRace(t *testing.T) { waitForPersistentVolumePhase(testClient, pv.Name, watchPV, v1.VolumeBound) klog.V(2).Infof("TestPersistentVolumeBindRace pv bound") - waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound) + if err := waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound); err != nil { + t.Fatalf("Unexpected error waiting for any pvc to be bound: %v", err) + } klog.V(2).Infof("TestPersistentVolumeBindRace pvc bound") pv, err = testClient.CoreV1().PersistentVolumes().Get(context.TODO(), pv.Name, metav1.GetOptions{}) @@ -871,10 +875,11 @@ func TestPersistentVolumeMultiPVsPVCs(t *testing.T) { }() // wait until the binder pairs all claims - for i := 0; i < objCount; i++ { - waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound) - klog.V(1).Infof("%d claims bound", i+1) + err := waitForSomePersistentVolumeClaimPhase(tCtx, testClient, namespaceName, objCount, watchPVC, v1.ClaimBound) + if err != nil { + t.Fatalf("Failed to wait for all claims to be bound: %v", err) } + // wait until the binder pairs all volumes for i := 0; i < objCount; i++ { waitForPersistentVolumePhase(testClient, pvs[i].Name, watchPV, v1.VolumeBound) @@ -952,7 +957,9 @@ func TestPersistentVolumeControllerStartup(t *testing.T) { // Drain watchPVC with all events generated by the PVC until it's bound // We don't want to catch "PVC created with Status.Phase == Pending" // later in this test. - waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound) + if err := waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound); err != nil { + t.Fatalf("Unexpected error waiting for any pvc to be bound: %v", err) + } pv := createPV(pvName, "/tmp/foo"+strconv.Itoa(i), "1G", []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, v1.PersistentVolumeReclaimRetain) @@ -1058,34 +1065,6 @@ func TestPersistentVolumeProvisionMultiPVCs(t *testing.T) { defer testClient.CoreV1().PersistentVolumes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{}) defer testClient.StorageV1().StorageClasses().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{}) - // Watch all events in the namespace, and save them to artifacts for debugging. - // TODO: This is a temporary solution to debug flaky tests `panic: test timed out after 10m0s`. - // We should remove this once https://github.com/kubernetes/kubernetes/issues/124136 is fixed. - go func() { - w, err := testClient.EventsV1().Events(ns.Name).Watch(tCtx, metav1.ListOptions{}) - if err != nil { - return - } - for { - select { - case event, ok := <-w.ResultChan(): - if !ok { - klog.Info("Event watch channel closed") - w, err = testClient.EventsV1().Events(ns.Name).Watch(tCtx, metav1.ListOptions{}) - if err != nil { - klog.ErrorS(err, "Failed to restart event watch") - return - } - continue - } - reportToArtifacts(t.Name()+"-events.text", event.Object) - case <-tCtx.Done(): - w.Stop() - return - } - } - }() - storageClass := storage.StorageClass{ TypeMeta: metav1.TypeMeta{ Kind: "StorageClass", @@ -1117,9 +1096,9 @@ func TestPersistentVolumeProvisionMultiPVCs(t *testing.T) { }() // Wait until the controller provisions and binds all of them - for i := 0; i < objCount; i++ { - waitForAnyPersistentVolumeClaimPhaseAndReportIt(t, watchPVC, v1.ClaimBound) - klog.V(1).Infof("%d claims bound", i+1) + err := waitForSomePersistentVolumeClaimPhase(tCtx, testClient, namespaceName, objCount, watchPVC, v1.ClaimBound) + if err != nil { + t.Fatalf("Failed to wait for all claims to be bound: %v", err) } klog.V(2).Infof("TestPersistentVolumeProvisionMultiPVCs: claims are bound") @@ -1488,31 +1467,40 @@ func waitForAnyPersistentVolumePhase(w watch.Interface, phase v1.PersistentVolum } } -func waitForAnyPersistentVolumeClaimPhase(w watch.Interface, phase v1.PersistentVolumeClaimPhase) { - for { - event := <-w.ResultChan() - claim, ok := event.Object.(*v1.PersistentVolumeClaim) - if !ok { - continue +func waitForSomePersistentVolumeClaimPhase(ctx context.Context, testClient clientset.Interface, namespace string, objCount int, watchPVC watch.Interface, phase v1.PersistentVolumeClaimPhase) error { + return wait.ExponentialBackoffWithContext(ctx, retry.DefaultBackoff, func(ctx context.Context) (bool, error) { + for i := 0; i < objCount; i++ { + waitErr := waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound) + if waitErr != nil { + klog.Errorf("Failed to wait for a claim (%d/%d) to be bound: %v", i+1, objCount, waitErr) + klog.Info("Recreating a watch for claims and re-checking the count of bound claims") + newWatchPVC, err := testClient.CoreV1().PersistentVolumeClaims(namespace).Watch(ctx, metav1.ListOptions{}) + if err != nil { + return false, err + } + watchPVC.Stop() + watchPVC = newWatchPVC + return false, waitErr + } + klog.V(1).Infof("%d claims bound", i+1) } - if claim.Status.Phase == phase { - klog.V(2).Infof("claim %q is %s", claim.Name, phase) - break - } - } + return true, nil + }) } -func waitForAnyPersistentVolumeClaimPhaseAndReportIt(t *testing.T, w watch.Interface, phase v1.PersistentVolumeClaimPhase) { +func waitForAnyPersistentVolumeClaimPhase(w watch.Interface, phase v1.PersistentVolumeClaimPhase) error { for { - event := <-w.ResultChan() - reportToArtifacts(t.Name()+"-watched-pvcs.text", event) + event, ok := <-w.ResultChan() + if !ok { + return fmt.Errorf("watch closed") + } claim, ok := event.Object.(*v1.PersistentVolumeClaim) if !ok { continue } if claim.Status.Phase == phase { klog.V(2).Infof("claim %q is %s", claim.Name, phase) - break + return nil } } } diff --git a/test/integration/volume/util_test.go b/test/integration/volume/util_test.go deleted file mode 100644 index a6bd6457603..00000000000 --- a/test/integration/volume/util_test.go +++ /dev/null @@ -1,54 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package volume - -import ( - "encoding/json" - "os" - "path/filepath" - - "k8s.io/klog/v2" -) - -func reportToArtifacts(filename string, obj any) { - if os.Getenv("ARTIFACTS") == "" { - return - } - path := filepath.Join(os.Getenv("ARTIFACTS"), filename) - file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - klog.Error("Error opening file:", err) - return - } - defer func() { - if err := file.Close(); err != nil { - klog.Error("Error closing file:", err) - } - }() - - content, err := json.Marshal(obj) - if err != nil { - klog.Error("Error marshalling to json:", err) - return - } - content = append(content, '\n') - _, err = file.Write(content) - if err != nil { - klog.Error("Error writing to file:", err) - return - } -}