diff --git a/test/integration/volumescheduling/BUILD b/test/integration/volumescheduling/BUILD index 36687ea9b58..cb5b316f393 100644 --- a/test/integration/volumescheduling/BUILD +++ b/test/integration/volumescheduling/BUILD @@ -32,6 +32,7 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//test/integration/framework:go_default_library", "//test/utils/image:go_default_library", diff --git a/test/integration/volumescheduling/volume_binding_test.go b/test/integration/volumescheduling/volume_binding_test.go index 531f76aa2f5..26934663de4 100644 --- a/test/integration/volumescheduling/volume_binding_test.go +++ b/test/integration/volumescheduling/volume_binding_test.go @@ -19,6 +19,7 @@ package volumescheduling // This file tests the VolumeScheduling feature. import ( + "context" "fmt" "os" "strconv" @@ -38,6 +39,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/workqueue" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options" @@ -432,9 +434,10 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration, klog.Infof("Start creating PVs and PVCs") // Create enough PVs and PVCs for all the pods - pvs := []*v1.PersistentVolume{} - pvcs := []*v1.PersistentVolumeClaim{} - for i := 0; i < podLimit*volsPerPod; i++ { + podVolumesCount := podLimit * volsPerPod + pvs := make([]*v1.PersistentVolume, podVolumesCount) + pvcs := make([]*v1.PersistentVolumeClaim, podVolumesCount) + workqueue.ParallelizeUntil(context.TODO(), 16, podVolumesCount, func(i int) { var ( pv *v1.PersistentVolume pvc *v1.PersistentVolumeClaim @@ -453,7 +456,7 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration, if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil { t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err) } - pvs = append(pvs, pv) + pvs[i] = pv } if pv != nil && pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Name == pvcName { pvc = makePVC(pvcName, config.ns, &classImmediate, pv.Name) @@ -463,12 +466,12 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration, if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(pvc); err != nil { t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err) } - pvcs = append(pvcs, pvc) - } + pvcs[i] = pvc + }) klog.Infof("Start creating Pods") - pods := []*v1.Pod{} - for i := 0; i < podLimit; i++ { + pods := make([]*v1.Pod, podLimit) + workqueue.ParallelizeUntil(context.TODO(), 16, podLimit, func(i int) { // Generate string of all the PVCs for the pod podPvcs := []string{} for j := i * volsPerPod; j < (i+1)*volsPerPod; j++ { @@ -479,27 +482,32 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration, if pod, err := config.client.CoreV1().Pods(config.ns).Create(pod); err != nil { t.Fatalf("Failed to create Pod %q: %v", pod.Name, err) } - pods = append(pods, pod) - } + pods[i] = pod + }) klog.Infof("Start validating pod scheduled") // Validate Pods scheduled - for _, pod := range pods { + workqueue.ParallelizeUntil(context.TODO(), 16, len(pods), func(i int) { + pod := pods[i] // Use increased timeout for stress test because there is a higher chance of // PV sync error if err := waitForPodToScheduleWithTimeout(config.client, pod, 2*time.Minute); err != nil { t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err) } - } + }) klog.Infof("Start validating PVCs scheduled") // Validate PVC/PV binding - for _, pvc := range pvcs { - validatePVCPhase(t, config.client, pvc.Name, config.ns, v1.ClaimBound, dynamic) - } - klog.Infof("Start validating PVs scheduled") - for _, pv := range pvs { - validatePVPhase(t, config.client, pv.Name, v1.VolumeBound) + workqueue.ParallelizeUntil(context.TODO(), 16, len(pvcs), func(i int) { + validatePVCPhase(t, config.client, pvcs[i].Name, config.ns, v1.ClaimBound, dynamic) + }) + + // Don't validate pv for dynamic provisioning test + if !dynamic { + klog.Infof("Start validating PVs scheduled") + workqueue.ParallelizeUntil(context.TODO(), 16, len(pvs), func(i int) { + validatePVPhase(t, config.client, pvs[i].Name, v1.VolumeBound) + }) } } @@ -921,6 +929,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod t ns: ns, stop: context.stopCh, teardown: func() { + klog.Infof("test cluster %q start to tear down", ns) deleteTestObjects(clientset, ns, nil) cleanupTest(t, context) },