Merge pull request #127260 from carlory/fix-124136

Fix TestPersistentVolumeProvisionMultiPVCs
This commit is contained in:
Kubernetes Prow Robot 2024-10-04 15:02:50 +01:00 committed by GitHub
commit 7478a30fdc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 39 additions and 105 deletions

View File

@ -29,12 +29,14 @@ import (
storage "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
ref "k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/retry"
featuregatetesting "k8s.io/component-base/featuregate/testing"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/api/legacyscheme"
@ -284,7 +286,9 @@ func TestPersistentVolumeBindRace(t *testing.T) {
waitForPersistentVolumePhase(testClient, pv.Name, watchPV, v1.VolumeBound)
klog.V(2).Infof("TestPersistentVolumeBindRace pv bound")
waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound)
if err := waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound); err != nil {
t.Fatalf("Unexpected error waiting for any pvc to be bound: %v", err)
}
klog.V(2).Infof("TestPersistentVolumeBindRace pvc bound")
pv, err = testClient.CoreV1().PersistentVolumes().Get(context.TODO(), pv.Name, metav1.GetOptions{})
@ -871,10 +875,11 @@ func TestPersistentVolumeMultiPVsPVCs(t *testing.T) {
}()
// wait until the binder pairs all claims
for i := 0; i < objCount; i++ {
waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound)
klog.V(1).Infof("%d claims bound", i+1)
err := waitForSomePersistentVolumeClaimPhase(tCtx, testClient, namespaceName, objCount, watchPVC, v1.ClaimBound)
if err != nil {
t.Fatalf("Failed to wait for all claims to be bound: %v", err)
}
// wait until the binder pairs all volumes
for i := 0; i < objCount; i++ {
waitForPersistentVolumePhase(testClient, pvs[i].Name, watchPV, v1.VolumeBound)
@ -952,7 +957,9 @@ func TestPersistentVolumeControllerStartup(t *testing.T) {
// Drain watchPVC with all events generated by the PVC until it's bound
// We don't want to catch "PVC created with Status.Phase == Pending"
// later in this test.
waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound)
if err := waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound); err != nil {
t.Fatalf("Unexpected error waiting for any pvc to be bound: %v", err)
}
pv := createPV(pvName, "/tmp/foo"+strconv.Itoa(i), "1G",
[]v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, v1.PersistentVolumeReclaimRetain)
@ -1058,34 +1065,6 @@ func TestPersistentVolumeProvisionMultiPVCs(t *testing.T) {
defer testClient.CoreV1().PersistentVolumes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
defer testClient.StorageV1().StorageClasses().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
// Watch all events in the namespace, and save them to artifacts for debugging.
// TODO: This is a temporary solution to debug flaky tests `panic: test timed out after 10m0s`.
// We should remove this once https://github.com/kubernetes/kubernetes/issues/124136 is fixed.
go func() {
w, err := testClient.EventsV1().Events(ns.Name).Watch(tCtx, metav1.ListOptions{})
if err != nil {
return
}
for {
select {
case event, ok := <-w.ResultChan():
if !ok {
klog.Info("Event watch channel closed")
w, err = testClient.EventsV1().Events(ns.Name).Watch(tCtx, metav1.ListOptions{})
if err != nil {
klog.ErrorS(err, "Failed to restart event watch")
return
}
continue
}
reportToArtifacts(t.Name()+"-events.text", event.Object)
case <-tCtx.Done():
w.Stop()
return
}
}
}()
storageClass := storage.StorageClass{
TypeMeta: metav1.TypeMeta{
Kind: "StorageClass",
@ -1117,9 +1096,9 @@ func TestPersistentVolumeProvisionMultiPVCs(t *testing.T) {
}()
// Wait until the controller provisions and binds all of them
for i := 0; i < objCount; i++ {
waitForAnyPersistentVolumeClaimPhaseAndReportIt(t, watchPVC, v1.ClaimBound)
klog.V(1).Infof("%d claims bound", i+1)
err := waitForSomePersistentVolumeClaimPhase(tCtx, testClient, namespaceName, objCount, watchPVC, v1.ClaimBound)
if err != nil {
t.Fatalf("Failed to wait for all claims to be bound: %v", err)
}
klog.V(2).Infof("TestPersistentVolumeProvisionMultiPVCs: claims are bound")
@ -1488,31 +1467,40 @@ func waitForAnyPersistentVolumePhase(w watch.Interface, phase v1.PersistentVolum
}
}
func waitForAnyPersistentVolumeClaimPhase(w watch.Interface, phase v1.PersistentVolumeClaimPhase) {
for {
event := <-w.ResultChan()
claim, ok := event.Object.(*v1.PersistentVolumeClaim)
if !ok {
continue
func waitForSomePersistentVolumeClaimPhase(ctx context.Context, testClient clientset.Interface, namespace string, objCount int, watchPVC watch.Interface, phase v1.PersistentVolumeClaimPhase) error {
return wait.ExponentialBackoffWithContext(ctx, retry.DefaultBackoff, func(ctx context.Context) (bool, error) {
for i := 0; i < objCount; i++ {
waitErr := waitForAnyPersistentVolumeClaimPhase(watchPVC, v1.ClaimBound)
if waitErr != nil {
klog.Errorf("Failed to wait for a claim (%d/%d) to be bound: %v", i+1, objCount, waitErr)
klog.Info("Recreating a watch for claims and re-checking the count of bound claims")
newWatchPVC, err := testClient.CoreV1().PersistentVolumeClaims(namespace).Watch(ctx, metav1.ListOptions{})
if err != nil {
return false, err
}
watchPVC.Stop()
watchPVC = newWatchPVC
return false, waitErr
}
klog.V(1).Infof("%d claims bound", i+1)
}
if claim.Status.Phase == phase {
klog.V(2).Infof("claim %q is %s", claim.Name, phase)
break
}
}
return true, nil
})
}
func waitForAnyPersistentVolumeClaimPhaseAndReportIt(t *testing.T, w watch.Interface, phase v1.PersistentVolumeClaimPhase) {
func waitForAnyPersistentVolumeClaimPhase(w watch.Interface, phase v1.PersistentVolumeClaimPhase) error {
for {
event := <-w.ResultChan()
reportToArtifacts(t.Name()+"-watched-pvcs.text", event)
event, ok := <-w.ResultChan()
if !ok {
return fmt.Errorf("watch closed")
}
claim, ok := event.Object.(*v1.PersistentVolumeClaim)
if !ok {
continue
}
if claim.Status.Phase == phase {
klog.V(2).Infof("claim %q is %s", claim.Name, phase)
break
return nil
}
}
}

View File

@ -1,54 +0,0 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volume
import (
"encoding/json"
"os"
"path/filepath"
"k8s.io/klog/v2"
)
func reportToArtifacts(filename string, obj any) {
if os.Getenv("ARTIFACTS") == "" {
return
}
path := filepath.Join(os.Getenv("ARTIFACTS"), filename)
file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
klog.Error("Error opening file:", err)
return
}
defer func() {
if err := file.Close(); err != nil {
klog.Error("Error closing file:", err)
}
}()
content, err := json.Marshal(obj)
if err != nil {
klog.Error("Error marshalling to json:", err)
return
}
content = append(content, '\n')
_, err = file.Write(content)
if err != nil {
klog.Error("Error writing to file:", err)
return
}
}