Merge pull request #88318 from mborsz/bench

Add BenchmarkSchedulingWaitForFirstConsumerPVs benchmark
This commit is contained in:
Kubernetes Prow Robot 2020-02-25 07:52:49 -08:00 committed by GitHub
commit fe9073b8c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 198 additions and 16 deletions

View File

@ -128,6 +128,25 @@ func BenchmarkSchedulingInTreePVs(b *testing.B) {
}
}
// BenchmarkSchedulingWaitForFirstConsumerPVs benchmarks the scheduling rate
// of pods with volumes with VolumeBindingMode set to WaitForFirstConsumer.
func BenchmarkSchedulingWaitForFirstConsumerPVs(b *testing.B) {
tests := []struct{ nodes, existingPods, minPods int }{
{nodes: 500, existingPods: 500, minPods: 1000},
// default 5000 existingPods is a way too much for now
}
basePod := makeBasePod()
testStrategy := testutils.NewCreatePodWithPersistentVolumeWithFirstConsumerStrategy(gceVolumeFactory, basePod)
nodeStrategy := testutils.NewLabelNodePrepareStrategy(v1.LabelZoneFailureDomain, "zone1")
for _, test := range tests {
name := fmt.Sprintf("%vNodes/%vPods", test.nodes, test.existingPods)
b.Run(name, func(b *testing.B) {
nodeStrategies := []testutils.CountToStrategy{{Count: test.nodes, Strategy: nodeStrategy}}
benchmarkScheduling(test.existingPods, test.minPods, nodeStrategies, testStrategy, b)
})
}
}
// BenchmarkSchedulingMigratedInTreePVs benchmarks the scheduling rate of pods with
// in-tree volumes (used via PV/PVC) that are migrated to CSI. CSINode instances exist
// for all nodes and have proper annotation that AWS is migrated.
@ -557,6 +576,42 @@ func awsVolumeFactory(id int) *v1.PersistentVolume {
}
}
func gceVolumeFactory(id int) *v1.PersistentVolume {
return &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("vol-%d", id),
},
Spec: v1.PersistentVolumeSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
},
PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimRetain,
PersistentVolumeSource: v1.PersistentVolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
FSType: "ext4",
PDName: fmt.Sprintf("vol-%d-pvc", id),
},
},
NodeAffinity: &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: v1.LabelZoneFailureDomain,
Operator: v1.NodeSelectorOpIn,
Values: []string{"zone1"},
},
},
},
},
},
},
},
}
}
func csiVolumeFactory(id int) *v1.PersistentVolume {
return &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{

View File

@ -61,8 +61,10 @@ func mustSetupScheduler() (util.ShutdownFunc, coreinformers.PodInformer, clients
Burst: 5000,
})
_, podInformer, schedulerShutdown := util.StartScheduler(clientSet)
fakePVControllerShutdown := util.StartFakePVController(clientSet)
shutdownFunc := func() {
fakePVControllerShutdown()
schedulerShutdown()
apiShutdown()
}

View File

@ -14,11 +14,14 @@ go_library(
importpath = "k8s.io/kubernetes/test/integration/util",
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/controller/volume/persistentvolume/util:go_default_library",
"//pkg/scheduler:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/gce:go_default_library",
"//test/integration/framework:go_default_library",

View File

@ -22,12 +22,15 @@ import (
"net/http/httptest"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/api/legacyscheme"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/test/integration/framework"
)
@ -87,6 +90,48 @@ func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, corein
return sched, podInformer, shutdownFunc
}
// StartFakePVController is a simplified pv controller logic that sets PVC VolumeName and annotation for each PV binding.
// TODO(mborsz): Use a real PV controller here.
func StartFakePVController(clientSet clientset.Interface) ShutdownFunc {
ctx, cancel := context.WithCancel(context.Background())
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
pvInformer := informerFactory.Core().V1().PersistentVolumes()
syncPV := func(obj *v1.PersistentVolume) {
if obj.Spec.ClaimRef != nil {
claimRef := obj.Spec.ClaimRef
pvc, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Get(ctx, claimRef.Name, metav1.GetOptions{})
if err != nil {
klog.Errorf("error while getting %v/%v: %v", claimRef.Namespace, claimRef.Name, err)
return
}
if pvc.Spec.VolumeName == "" {
pvc.Spec.VolumeName = obj.Name
metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "yes")
_, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Update(ctx, pvc, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("error while getting %v/%v: %v", claimRef.Namespace, claimRef.Name, err)
return
}
}
}
}
pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
syncPV(obj.(*v1.PersistentVolume))
},
UpdateFunc: func(_, obj interface{}) {
syncPV(obj.(*v1.PersistentVolume))
},
})
informerFactory.Start(ctx.Done())
return ShutdownFunc(cancel)
}
// createScheduler create a scheduler with given informer factory and default name.
func createScheduler(
clientSet clientset.Interface,

View File

@ -39,6 +39,7 @@ go_library(
"//staging/src/k8s.io/api/auditregistration/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/batch/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",

View File

@ -25,6 +25,8 @@ import (
apps "k8s.io/api/apps/v1"
batch "k8s.io/api/batch/v1"
storage "k8s.io/api/storage/v1"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -218,6 +220,23 @@ func CreateServiceWithRetries(c clientset.Interface, namespace string, obj *v1.S
return RetryWithExponentialBackOff(createFunc)
}
func CreateStorageClassWithRetries(c clientset.Interface, obj *storage.StorageClass) error {
if obj == nil {
return fmt.Errorf("Object provided to create is empty")
}
createFunc := func() (bool, error) {
_, err := c.StorageV1().StorageClasses().Create(context.TODO(), obj, metav1.CreateOptions{})
if err == nil || apierrors.IsAlreadyExists(err) {
return true, nil
}
if IsRetryableAPIError(err) {
return false, nil
}
return false, fmt.Errorf("Failed to create object with non-retriable error: %v", err)
}
return RetryWithExponentialBackOff(createFunc)
}
func CreateResourceQuotaWithRetries(c clientset.Interface, namespace string, obj *v1.ResourceQuota) error {
if obj == nil {
return fmt.Errorf("Object provided to create is empty")

View File

@ -28,6 +28,7 @@ import (
apps "k8s.io/api/apps/v1"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -1349,35 +1350,50 @@ func CreatePod(client clientset.Interface, namespace string, podCount int, podTe
return createError
}
func CreatePodWithPersistentVolume(client clientset.Interface, namespace string, claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod, count int) error {
func CreatePodWithPersistentVolume(client clientset.Interface, namespace string, claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod, count int, bindVolume bool) error {
var createError error
lock := sync.Mutex{}
createPodFunc := func(i int) {
pvcName := fmt.Sprintf("pvc-%d", i)
// pvc
pvc := claimTemplate.DeepCopy()
pvc.Name = pvcName
// pv
pv := factory(i)
// bind to "pvc-$i"
pv.Spec.ClaimRef = &v1.ObjectReference{
Kind: "PersistentVolumeClaim",
Namespace: namespace,
Name: pvcName,
APIVersion: "v1",
// PVs are cluster-wide resources.
// Prepend a namespace to make the name globally unique.
pv.Name = fmt.Sprintf("%s-%s", namespace, pv.Name)
if bindVolume {
// bind pv to "pvc-$i"
pv.Spec.ClaimRef = &v1.ObjectReference{
Kind: "PersistentVolumeClaim",
Namespace: namespace,
Name: pvcName,
APIVersion: "v1",
}
pv.Status.Phase = v1.VolumeBound
// bind pvc to "pv-$i"
// pvc.Spec.VolumeName = pv.Name
pvc.Status.Phase = v1.ClaimBound
} else {
pv.Status.Phase = v1.VolumeAvailable
}
pv.Status.Phase = v1.VolumeBound
if err := CreatePersistentVolumeWithRetries(client, pv); err != nil {
lock.Lock()
defer lock.Unlock()
createError = fmt.Errorf("error creating PV: %s", err)
return
}
// We need to update status separately, as creating persistentvolumes resets status to the default one
// (so with Status.Phase will be equal to PersistentVolumePhase).
if _, err := client.CoreV1().PersistentVolumes().UpdateStatus(context.TODO(), pv, metav1.UpdateOptions{}); err != nil {
lock.Lock()
defer lock.Unlock()
createError = fmt.Errorf("error creating PV: %s", err)
return
}
// pvc
pvc := claimTemplate.DeepCopy()
pvc.Name = pvcName
// bind to "pv-$i"
pvc.Spec.VolumeName = pv.Name
pvc.Status.Phase = v1.ClaimBound
if err := CreatePersistentVolumeClaimWithRetries(client, namespace, pvc); err != nil {
lock.Lock()
defer lock.Unlock()
@ -1446,9 +1462,50 @@ type volumeFactory func(uniqueID int) *v1.PersistentVolume
func NewCreatePodWithPersistentVolumeStrategy(claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
return func(client clientset.Interface, namespace string, podCount int) error {
return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factory, podTemplate, podCount)
return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factory, podTemplate, podCount, true /* bindVolume */)
}
}
func makeUnboundPersistentVolumeClaim(storageClass string) *v1.PersistentVolumeClaim {
return &v1.PersistentVolumeClaim{
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
StorageClassName: &storageClass,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
},
},
},
}
}
func NewCreatePodWithPersistentVolumeWithFirstConsumerStrategy(factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
return func(client clientset.Interface, namespace string, podCount int) error {
volumeBindingMode := storage.VolumeBindingWaitForFirstConsumer
storageClass := &storage.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: "storage-class-1",
},
Provisioner: "kubernetes.io/gce-pd",
VolumeBindingMode: &volumeBindingMode,
}
claimTemplate := makeUnboundPersistentVolumeClaim(storageClass.Name)
if err := CreateStorageClassWithRetries(client, storageClass); err != nil {
return fmt.Errorf("failed to create storage class: %v", err)
}
factoryWithStorageClass := func(i int) *v1.PersistentVolume {
pv := factory(i)
pv.Spec.StorageClassName = storageClass.Name
return pv
}
return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factoryWithStorageClass, podTemplate, podCount, false /* bindVolume */)
}
}
func NewSimpleCreatePodStrategy() TestPodCreateStrategy {
basePod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{