fix: use workqueue to speed up of volume binding tests

This commit is contained in:
draveness 2019-08-03 11:52:07 +08:00
parent 0d7636a92f
commit bd3c8390c6
2 changed files with 28 additions and 18 deletions

View File

@ -32,6 +32,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//test/integration/framework:go_default_library", "//test/integration/framework:go_default_library",
"//test/utils/image:go_default_library", "//test/utils/image:go_default_library",

View File

@ -19,6 +19,7 @@ package volumescheduling
// This file tests the VolumeScheduling feature. // This file tests the VolumeScheduling feature.
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"strconv" "strconv"
@ -38,6 +39,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume" "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options" persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
@ -432,9 +434,10 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration,
klog.Infof("Start creating PVs and PVCs") klog.Infof("Start creating PVs and PVCs")
// Create enough PVs and PVCs for all the pods // Create enough PVs and PVCs for all the pods
pvs := []*v1.PersistentVolume{} podVolumesCount := podLimit * volsPerPod
pvcs := []*v1.PersistentVolumeClaim{} pvs := make([]*v1.PersistentVolume, podVolumesCount)
for i := 0; i < podLimit*volsPerPod; i++ { pvcs := make([]*v1.PersistentVolumeClaim, podVolumesCount)
workqueue.ParallelizeUntil(context.TODO(), 16, podVolumesCount, func(i int) {
var ( var (
pv *v1.PersistentVolume pv *v1.PersistentVolume
pvc *v1.PersistentVolumeClaim pvc *v1.PersistentVolumeClaim
@ -453,7 +456,7 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration,
if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil { if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil {
t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err) t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
} }
pvs = append(pvs, pv) pvs[i] = pv
} }
if pv != nil && pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Name == pvcName { if pv != nil && pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Name == pvcName {
pvc = makePVC(pvcName, config.ns, &classImmediate, pv.Name) pvc = makePVC(pvcName, config.ns, &classImmediate, pv.Name)
@ -463,12 +466,12 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration,
if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(pvc); err != nil { if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(pvc); err != nil {
t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err) t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
} }
pvcs = append(pvcs, pvc) pvcs[i] = pvc
} })
klog.Infof("Start creating Pods") klog.Infof("Start creating Pods")
pods := []*v1.Pod{} pods := make([]*v1.Pod, podLimit)
for i := 0; i < podLimit; i++ { workqueue.ParallelizeUntil(context.TODO(), 16, podLimit, func(i int) {
// Generate string of all the PVCs for the pod // Generate string of all the PVCs for the pod
podPvcs := []string{} podPvcs := []string{}
for j := i * volsPerPod; j < (i+1)*volsPerPod; j++ { for j := i * volsPerPod; j < (i+1)*volsPerPod; j++ {
@ -479,27 +482,32 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration,
if pod, err := config.client.CoreV1().Pods(config.ns).Create(pod); err != nil { if pod, err := config.client.CoreV1().Pods(config.ns).Create(pod); err != nil {
t.Fatalf("Failed to create Pod %q: %v", pod.Name, err) t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
} }
pods = append(pods, pod) pods[i] = pod
} })
klog.Infof("Start validating pod scheduled") klog.Infof("Start validating pod scheduled")
// Validate Pods scheduled // Validate Pods scheduled
for _, pod := range pods { workqueue.ParallelizeUntil(context.TODO(), 16, len(pods), func(i int) {
pod := pods[i]
// Use increased timeout for stress test because there is a higher chance of // Use increased timeout for stress test because there is a higher chance of
// PV sync error // PV sync error
if err := waitForPodToScheduleWithTimeout(config.client, pod, 2*time.Minute); err != nil { if err := waitForPodToScheduleWithTimeout(config.client, pod, 2*time.Minute); err != nil {
t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err) t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err)
} }
} })
klog.Infof("Start validating PVCs scheduled") klog.Infof("Start validating PVCs scheduled")
// Validate PVC/PV binding // Validate PVC/PV binding
for _, pvc := range pvcs { workqueue.ParallelizeUntil(context.TODO(), 16, len(pvcs), func(i int) {
validatePVCPhase(t, config.client, pvc.Name, config.ns, v1.ClaimBound, dynamic) validatePVCPhase(t, config.client, pvcs[i].Name, config.ns, v1.ClaimBound, dynamic)
} })
klog.Infof("Start validating PVs scheduled")
for _, pv := range pvs { // Don't validate pv for dynamic provisioning test
validatePVPhase(t, config.client, pv.Name, v1.VolumeBound) if !dynamic {
klog.Infof("Start validating PVs scheduled")
workqueue.ParallelizeUntil(context.TODO(), 16, len(pvs), func(i int) {
validatePVPhase(t, config.client, pvs[i].Name, v1.VolumeBound)
})
} }
} }
@ -921,6 +929,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod t
ns: ns, ns: ns,
stop: context.stopCh, stop: context.stopCh,
teardown: func() { teardown: func() {
klog.Infof("test cluster %q start to tear down", ns)
deleteTestObjects(clientset, ns, nil) deleteTestObjects(clientset, ns, nil)
cleanupTest(t, context) cleanupTest(t, context)
}, },