diff --git a/test/integration/scheduler_perf/scheduler_bench_test.go b/test/integration/scheduler_perf/scheduler_bench_test.go
index ef781443ee2..8a12044e273 100644
--- a/test/integration/scheduler_perf/scheduler_bench_test.go
+++ b/test/integration/scheduler_perf/scheduler_bench_test.go
@@ -128,6 +128,25 @@ func BenchmarkSchedulingInTreePVs(b *testing.B) {
 	}
 }
 
+// BenchmarkSchedulingWaitForFirstConsumerPVs benchmarks the scheduling rate
+// of pods with volumes with VolumeBindingMode set to WaitForFirstConsumer.
+func BenchmarkSchedulingWaitForFirstConsumerPVs(b *testing.B) {
+	tests := []struct{ nodes, existingPods, minPods int }{
+		{nodes: 500, existingPods: 500, minPods: 1000},
+		// the default of 5000 existingPods is way too much for now
+	}
+	basePod := makeBasePod()
+	testStrategy := testutils.NewCreatePodWithPersistentVolumeWithFirstConsumerStrategy(gceVolumeFactory, basePod)
+	nodeStrategy := testutils.NewLabelNodePrepareStrategy(v1.LabelZoneFailureDomain, "zone1")
+	for _, test := range tests {
+		name := fmt.Sprintf("%vNodes/%vPods", test.nodes, test.existingPods)
+		b.Run(name, func(b *testing.B) {
+			nodeStrategies := []testutils.CountToStrategy{{Count: test.nodes, Strategy: nodeStrategy}}
+			benchmarkScheduling(test.existingPods, test.minPods, nodeStrategies, testStrategy, b)
+		})
+	}
+}
+
 // BenchmarkSchedulingMigratedInTreePVs benchmarks the scheduling rate of pods with
 // in-tree volumes (used via PV/PVC) that are migrated to CSI. CSINode instances exist
 // for all nodes and have proper annotation that AWS is migrated.
@@ -557,6 +576,42 @@ func awsVolumeFactory(id int) *v1.PersistentVolume {
 	}
 }
 
+func gceVolumeFactory(id int) *v1.PersistentVolume {
+	return &v1.PersistentVolume{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: fmt.Sprintf("vol-%d", id),
+		},
+		Spec: v1.PersistentVolumeSpec{
+			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
+			Capacity: v1.ResourceList{
+				v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
+			},
+			PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimRetain,
+			PersistentVolumeSource: v1.PersistentVolumeSource{
+				GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
+					FSType: "ext4",
+					PDName: fmt.Sprintf("vol-%d-pvc", id),
+				},
+			},
+			NodeAffinity: &v1.VolumeNodeAffinity{
+				Required: &v1.NodeSelector{
+					NodeSelectorTerms: []v1.NodeSelectorTerm{
+						{
+							MatchExpressions: []v1.NodeSelectorRequirement{
+								{
+									Key:      v1.LabelZoneFailureDomain,
+									Operator: v1.NodeSelectorOpIn,
+									Values:   []string{"zone1"},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
 func csiVolumeFactory(id int) *v1.PersistentVolume {
 	return &v1.PersistentVolume{
 		ObjectMeta: metav1.ObjectMeta{
diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go
index b099b73caa5..6779ee18539 100644
--- a/test/integration/scheduler_perf/util.go
+++ b/test/integration/scheduler_perf/util.go
@@ -61,8 +61,10 @@ func mustSetupScheduler() (util.ShutdownFunc, coreinformers.PodInformer, clients
 		Burst: 5000,
 	})
 	_, podInformer, schedulerShutdown := util.StartScheduler(clientSet)
+	fakePVControllerShutdown := util.StartFakePVController(clientSet)
 
 	shutdownFunc := func() {
+		fakePVControllerShutdown()
 		schedulerShutdown()
 		apiShutdown()
 	}
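
The scheduler only triggers delayed binding; something still has to complete it, which is why mustSetupScheduler now also starts the fake PV controller and folds its shutdown into the combined teardown. A minimal sketch of the same composition, assuming the helpers from test/integration/util shown below (illustrative only, not part of the patch):

// startSchedulerWithFakePVController is a hypothetical helper mirroring what
// mustSetupScheduler does; it is not added by this change.
func startSchedulerWithFakePVController(client clientset.Interface) util.ShutdownFunc {
	_, _, schedulerShutdown := util.StartScheduler(client)
	pvControllerShutdown := util.StartFakePVController(client)
	return func() {
		// Stop the fake PV controller first, then the scheduler.
		pvControllerShutdown()
		schedulerShutdown()
	}
}
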
"//pkg/controller/volume/persistentvolume/util:go_default_library", "//pkg/scheduler:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/events:go_default_library", "//staging/src/k8s.io/legacy-cloud-providers/gce:go_default_library", "//test/integration/framework:go_default_library", diff --git a/test/integration/util/util.go b/test/integration/util/util.go index c06b0cdbe48..d535d96afd5 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -22,12 +22,15 @@ import ( "net/http/httptest" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" "k8s.io/klog" "k8s.io/kubernetes/pkg/api/legacyscheme" + pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util" "k8s.io/kubernetes/pkg/scheduler" "k8s.io/kubernetes/test/integration/framework" ) @@ -87,6 +90,48 @@ func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, corein return sched, podInformer, shutdownFunc } +// StartFakePVController is a simplified pv controller logic that sets PVC VolumeName and annotation for each PV binding. +// TODO(mborsz): Use a real PV controller here. +func StartFakePVController(clientSet clientset.Interface) ShutdownFunc { + ctx, cancel := context.WithCancel(context.Background()) + + informerFactory := informers.NewSharedInformerFactory(clientSet, 0) + pvInformer := informerFactory.Core().V1().PersistentVolumes() + + syncPV := func(obj *v1.PersistentVolume) { + if obj.Spec.ClaimRef != nil { + claimRef := obj.Spec.ClaimRef + pvc, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Get(ctx, claimRef.Name, metav1.GetOptions{}) + if err != nil { + klog.Errorf("error while getting %v/%v: %v", claimRef.Namespace, claimRef.Name, err) + return + } + + if pvc.Spec.VolumeName == "" { + pvc.Spec.VolumeName = obj.Name + metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "yes") + _, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Update(ctx, pvc, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("error while getting %v/%v: %v", claimRef.Namespace, claimRef.Name, err) + return + } + } + } + } + + pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + syncPV(obj.(*v1.PersistentVolume)) + }, + UpdateFunc: func(_, obj interface{}) { + syncPV(obj.(*v1.PersistentVolume)) + }, + }) + + informerFactory.Start(ctx.Done()) + return ShutdownFunc(cancel) +} + // createScheduler create a scheduler with given informer factory and default name. 
diff --git a/test/utils/BUILD b/test/utils/BUILD
index cd04e9f89a2..7578811008a 100644
--- a/test/utils/BUILD
+++ b/test/utils/BUILD
@@ -39,6 +39,7 @@ go_library(
         "//staging/src/k8s.io/api/auditregistration/v1alpha1:go_default_library",
         "//staging/src/k8s.io/api/batch/v1:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/api/storage/v1:go_default_library",
         "//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
diff --git a/test/utils/create_resources.go b/test/utils/create_resources.go
index 51b92ace88f..661ff864208 100644
--- a/test/utils/create_resources.go
+++ b/test/utils/create_resources.go
@@ -25,6 +25,8 @@ import (
 
 	apps "k8s.io/api/apps/v1"
 	batch "k8s.io/api/batch/v1"
+	storage "k8s.io/api/storage/v1"
+
 	"k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -218,6 +220,23 @@ func CreateServiceWithRetries(c clientset.Interface, namespace string, obj *v1.S
 	return RetryWithExponentialBackOff(createFunc)
 }
 
+func CreateStorageClassWithRetries(c clientset.Interface, obj *storage.StorageClass) error {
+	if obj == nil {
+		return fmt.Errorf("Object provided to create is empty")
+	}
+	createFunc := func() (bool, error) {
+		_, err := c.StorageV1().StorageClasses().Create(context.TODO(), obj, metav1.CreateOptions{})
+		if err == nil || apierrors.IsAlreadyExists(err) {
+			return true, nil
+		}
+		if IsRetryableAPIError(err) {
+			return false, nil
+		}
+		return false, fmt.Errorf("Failed to create object with non-retriable error: %v", err)
+	}
+	return RetryWithExponentialBackOff(createFunc)
+}
+
 func CreateResourceQuotaWithRetries(c clientset.Interface, namespace string, obj *v1.ResourceQuota) error {
 	if obj == nil {
 		return fmt.Errorf("Object provided to create is empty")
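
A quick usage sketch of the new helper (the class name is a placeholder, not taken from the patch); like the other *WithRetries helpers it treats AlreadyExists as success, so repeated calls are safe. The pod-creation strategy added in runners.go below does essentially this before generating pods:

// ensureWaitForFirstConsumerClass is a hypothetical caller of CreateStorageClassWithRetries.
func ensureWaitForFirstConsumerClass(c clientset.Interface) error {
	mode := storage.VolumeBindingWaitForFirstConsumer
	sc := &storage.StorageClass{
		ObjectMeta:        metav1.ObjectMeta{Name: "example-wffc"},
		Provisioner:       "kubernetes.io/gce-pd",
		VolumeBindingMode: &mode,
	}
	return CreateStorageClassWithRetries(c, sc)
}
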
diff --git a/test/utils/runners.go b/test/utils/runners.go
index 164b985651c..6c4218f5f50 100644
--- a/test/utils/runners.go
+++ b/test/utils/runners.go
@@ -28,6 +28,7 @@ import (
 	apps "k8s.io/api/apps/v1"
 	batch "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
+	storage "k8s.io/api/storage/v1"
 	storagev1beta1 "k8s.io/api/storage/v1beta1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -1349,35 +1350,50 @@ func CreatePod(client clientset.Interface, namespace string, podCount int, podTe
 	return createError
 }
 
-func CreatePodWithPersistentVolume(client clientset.Interface, namespace string, claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod, count int) error {
+func CreatePodWithPersistentVolume(client clientset.Interface, namespace string, claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod, count int, bindVolume bool) error {
 	var createError error
 	lock := sync.Mutex{}
 	createPodFunc := func(i int) {
 		pvcName := fmt.Sprintf("pvc-%d", i)
-
+		// pvc
+		pvc := claimTemplate.DeepCopy()
+		pvc.Name = pvcName
 		// pv
 		pv := factory(i)
-		// bind to "pvc-$i"
-		pv.Spec.ClaimRef = &v1.ObjectReference{
-			Kind:       "PersistentVolumeClaim",
-			Namespace:  namespace,
-			Name:       pvcName,
-			APIVersion: "v1",
+		// PVs are cluster-wide resources.
+		// Prepend the namespace to make the name globally unique.
+		pv.Name = fmt.Sprintf("%s-%s", namespace, pv.Name)
+		if bindVolume {
+			// bind pv to "pvc-$i"
+			pv.Spec.ClaimRef = &v1.ObjectReference{
+				Kind:       "PersistentVolumeClaim",
+				Namespace:  namespace,
+				Name:       pvcName,
+				APIVersion: "v1",
+			}
+			pv.Status.Phase = v1.VolumeBound
+
+			// bind pvc to "pv-$i"
+			// pvc.Spec.VolumeName = pv.Name
+			pvc.Status.Phase = v1.ClaimBound
+		} else {
+			pv.Status.Phase = v1.VolumeAvailable
 		}
-		pv.Status.Phase = v1.VolumeBound
 		if err := CreatePersistentVolumeWithRetries(client, pv); err != nil {
 			lock.Lock()
 			defer lock.Unlock()
			createError = fmt.Errorf("error creating PV: %s", err)
 			return
 		}
+		// We need to update the status separately, as creating a PersistentVolume resets its status
+		// to the default one (so Status.Phase would no longer hold the value set above).
+		if _, err := client.CoreV1().PersistentVolumes().UpdateStatus(context.TODO(), pv, metav1.UpdateOptions{}); err != nil {
+			lock.Lock()
+			defer lock.Unlock()
+			createError = fmt.Errorf("error updating PV status: %s", err)
+			return
+		}
 
-		// pvc
-		pvc := claimTemplate.DeepCopy()
-		pvc.Name = pvcName
-		// bind to "pv-$i"
-		pvc.Spec.VolumeName = pv.Name
-		pvc.Status.Phase = v1.ClaimBound
 		if err := CreatePersistentVolumeClaimWithRetries(client, namespace, pvc); err != nil {
 			lock.Lock()
 			defer lock.Unlock()
@@ -1446,9 +1462,50 @@ type volumeFactory func(uniqueID int) *v1.PersistentVolume
 
 func NewCreatePodWithPersistentVolumeStrategy(claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
 	return func(client clientset.Interface, namespace string, podCount int) error {
-		return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factory, podTemplate, podCount)
+		return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factory, podTemplate, podCount, true /* bindVolume */)
 	}
 }
+
+func makeUnboundPersistentVolumeClaim(storageClass string) *v1.PersistentVolumeClaim {
+	return &v1.PersistentVolumeClaim{
+		Spec: v1.PersistentVolumeClaimSpec{
+			AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
+			StorageClassName: &storageClass,
+			Resources: v1.ResourceRequirements{
+				Requests: v1.ResourceList{
+					v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
+				},
+			},
+		},
+	}
+}
+
+func NewCreatePodWithPersistentVolumeWithFirstConsumerStrategy(factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
+	return func(client clientset.Interface, namespace string, podCount int) error {
+		volumeBindingMode := storage.VolumeBindingWaitForFirstConsumer
+		storageClass := &storage.StorageClass{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "storage-class-1",
+			},
+			Provisioner:       "kubernetes.io/gce-pd",
+			VolumeBindingMode: &volumeBindingMode,
+		}
+		claimTemplate := makeUnboundPersistentVolumeClaim(storageClass.Name)
+
+		if err := CreateStorageClassWithRetries(client, storageClass); err != nil {
+			return fmt.Errorf("failed to create storage class: %v", err)
+		}
+
+		factoryWithStorageClass := func(i int) *v1.PersistentVolume {
+			pv := factory(i)
+			pv.Spec.StorageClassName = storageClass.Name
+			return pv
+		}
+
+		return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factoryWithStorageClass, podTemplate, podCount, false /* bindVolume */)
+	}
+}
+
 func NewSimpleCreatePodStrategy() TestPodCreateStrategy {
 	basePod := &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{