From f6430c0159fb75a40ebacd3b4dcdeb8b429c6433 Mon Sep 17 00:00:00 2001
From: Jan Safranek
Date: Wed, 26 Jun 2019 14:07:17 +0200
Subject: [PATCH] Add benchmark for scheduling of pods with PVs

---
 test/integration/scheduler_perf/BUILD        |   6 +
 .../scheduler_perf/scheduler_bench_test.go   | 245 ++++++++++++++-
 test/utils/BUILD                             |   3 +
 test/utils/create_resources.go               |  34 +++
 test/utils/runners.go                        | 279 +++++++++++++++++-
 5 files changed, 561 insertions(+), 6 deletions(-)

diff --git a/test/integration/scheduler_perf/BUILD b/test/integration/scheduler_perf/BUILD
index 462e1c0f1ae..156c3c608b0 100644
--- a/test/integration/scheduler_perf/BUILD
+++ b/test/integration/scheduler_perf/BUILD
@@ -33,11 +33,17 @@ go_test(
     embed = [":go_default_library"],
     tags = ["integration"],
     deps = [
+        "//pkg/features:go_default_library",
         "//pkg/scheduler/factory:go_default_library",
+        "//pkg/volume/util:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
+        "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
+        "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
         "//test/integration/framework:go_default_library",
         "//test/utils:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
diff --git a/test/integration/scheduler_perf/scheduler_bench_test.go b/test/integration/scheduler_perf/scheduler_bench_test.go
index 77196bc4f69..c85c0a23b62 100644
--- a/test/integration/scheduler_perf/scheduler_bench_test.go
+++ b/test/integration/scheduler_perf/scheduler_bench_test.go
@@ -22,16 +22,27 @@ import (
 	"time"
 
 	"k8s.io/api/core/v1"
+	storagev1beta1 "k8s.io/api/storage/v1beta1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	featuregatetesting "k8s.io/component-base/featuregate/testing"
+	csilibplugins "k8s.io/csi-translation-lib/plugins"
+	"k8s.io/klog"
+	"k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/volume/util"
 	"k8s.io/kubernetes/test/integration/framework"
 	testutils "k8s.io/kubernetes/test/utils"
-
-	"k8s.io/klog"
 )
 
 var (
 	defaultNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
+
+	testCSIDriver = csilibplugins.AWSEBSDriverName
+	// From the PV controller
+	annBindCompleted = "pv.kubernetes.io/bind-completed"
 )
 
 // BenchmarkScheduling benchmarks the scheduling rate when the cluster has
@@ -79,6 +90,134 @@ func BenchmarkSchedulingPodAntiAffinity(b *testing.B) {
 	}
 }
 
+// BenchmarkSchedulingSecrets benchmarks the scheduling rate of pods with
+// volumes that don't require any special handling, such as Secrets.
+// It can be used to compare scheduler efficiency with the other benchmarks
+// that use volume scheduling predicates.
+func BenchmarkSchedulingSecrets(b *testing.B) {
+	tests := []struct{ nodes, existingPods, minPods int }{
+		{nodes: 500, existingPods: 250, minPods: 250},
+		{nodes: 500, existingPods: 5000, minPods: 250},
+		{nodes: 1000, existingPods: 1000, minPods: 500},
+		{nodes: 5000, existingPods: 1000, minPods: 1000},
+	}
+	// The setup strategy creates pods with no volumes.
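+	// Keeping volumes out of the setup pods means the pre-existing pods
+	// exercise no volume predicates, so the measured phase isolates the
+	// cost of scheduling the secret-carrying test pods.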
+	setupStrategy := testutils.NewSimpleWithControllerCreatePodStrategy("setup")
+	// The test strategy creates pods with a secret.
+	testBasePod := makeBasePodWithSecret()
+	testStrategy := testutils.NewCustomCreatePodStrategy(testBasePod)
+	for _, test := range tests {
+		name := fmt.Sprintf("%vNodes/%vPods", test.nodes, test.existingPods)
+		b.Run(name, func(b *testing.B) {
+			benchmarkScheduling(test.nodes, test.existingPods, test.minPods, defaultNodeStrategy, setupStrategy, testStrategy, b)
+		})
+	}
+}
+
+// BenchmarkSchedulingInTreePVs benchmarks the scheduling rate of pods with
+// in-tree volumes (used via PV/PVC). Nodes have the default hardcoded attach
+// limits (39 for AWS EBS).
+func BenchmarkSchedulingInTreePVs(b *testing.B) {
+	tests := []struct{ nodes, existingPods, minPods int }{
+		{nodes: 500, existingPods: 250, minPods: 250},
+		{nodes: 500, existingPods: 5000, minPods: 250},
+		{nodes: 1000, existingPods: 1000, minPods: 500},
+		{nodes: 5000, existingPods: 1000, minPods: 1000},
+	}
+	// The setup strategy creates pods with no volumes.
+	setupStrategy := testutils.NewSimpleWithControllerCreatePodStrategy("setup")
+
+	// The test strategy creates pods with an AWS EBS volume used via PV.
+	baseClaim := makeBasePersistentVolumeClaim()
+	basePod := makeBasePod()
+	testStrategy := testutils.NewCreatePodWithPersistentVolumeStrategy(baseClaim, awsVolumeFactory, basePod)
+	for _, test := range tests {
+		name := fmt.Sprintf("%vNodes/%vPods", test.nodes, test.existingPods)
+		b.Run(name, func(b *testing.B) {
+			benchmarkScheduling(test.nodes, test.existingPods, test.minPods, defaultNodeStrategy, setupStrategy, testStrategy, b)
+		})
+	}
+}
+
+// BenchmarkSchedulingMigratedInTreePVs benchmarks the scheduling rate of pods
+// with in-tree volumes (used via PV/PVC) that are migrated to CSI. CSINode
+// instances exist for all nodes and are annotated to mark AWS as migrated.
+func BenchmarkSchedulingMigratedInTreePVs(b *testing.B) {
+	tests := []struct{ nodes, existingPods, minPods int }{
+		{nodes: 500, existingPods: 250, minPods: 250},
+		{nodes: 500, existingPods: 5000, minPods: 250},
+		{nodes: 1000, existingPods: 1000, minPods: 500},
+		{nodes: 5000, existingPods: 1000, minPods: 1000},
+	}
+	// The setup strategy creates pods with no volumes.
+	setupStrategy := testutils.NewSimpleWithControllerCreatePodStrategy("setup")
+
+	// The test strategy creates pods with an AWS EBS volume used via PV.
+	baseClaim := makeBasePersistentVolumeClaim()
+	basePod := makeBasePod()
+	testStrategy := testutils.NewCreatePodWithPersistentVolumeStrategy(baseClaim, awsVolumeFactory, basePod)
+
+	// Each node can use the same number of CSI volumes as the in-tree AWS
+	// volume plugin allows, so the results should be comparable with
+	// BenchmarkSchedulingInTreePVs.
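+	// GetCSIAttachLimitKey derives the allocatable resource name for the
+	// driver's attach limit; publishing the limit both there and in CSINode
+	// below should keep the scheduler's view consistent whichever source it
+	// consults.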
+	driverKey := util.GetCSIAttachLimitKey(testCSIDriver)
+	allocatable := map[v1.ResourceName]string{
+		v1.ResourceName(driverKey): fmt.Sprintf("%d", util.DefaultMaxEBSVolumes),
+	}
+	var count int32 = util.DefaultMaxEBSVolumes
+	csiAllocatable := map[string]*storagev1beta1.VolumeNodeResources{
+		testCSIDriver: {
+			Count: &count,
+		},
+	}
+	nodeStrategy := testutils.NewNodeAllocatableStrategy(allocatable, csiAllocatable, []string{csilibplugins.AWSEBSInTreePluginName})
+	for _, test := range tests {
+		name := fmt.Sprintf("%vNodes/%vPods", test.nodes, test.existingPods)
+		b.Run(name, func(b *testing.B) {
+			defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, features.CSIMigration, true)()
+			defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, true)()
+			benchmarkScheduling(test.nodes, test.existingPods, test.minPods, nodeStrategy, setupStrategy, testStrategy, b)
+		})
+	}
+}
+
+// BenchmarkSchedulingCSIPVs benchmarks the scheduling rate of pods with CSI
+// volumes (used via PV/PVC). Nodes have volume attach limits for the driver
+// set in both CSINode objects and node.status.allocatable.
+func BenchmarkSchedulingCSIPVs(b *testing.B) {
+	tests := []struct{ nodes, existingPods, minPods int }{
+		{nodes: 500, existingPods: 250, minPods: 250},
+		{nodes: 500, existingPods: 5000, minPods: 250},
+		{nodes: 1000, existingPods: 1000, minPods: 500},
+		{nodes: 5000, existingPods: 1000, minPods: 1000},
+	}
+
+	// The setup strategy creates pods with no volumes.
+	setupStrategy := testutils.NewSimpleWithControllerCreatePodStrategy("setup")
+
+	// The test strategy creates pods with a CSI volume used via PV.
+	baseClaim := makeBasePersistentVolumeClaim()
+	basePod := makeBasePod()
+	testStrategy := testutils.NewCreatePodWithPersistentVolumeStrategy(baseClaim, csiVolumeFactory, basePod)
+
+	// Each node can use the same number of CSI volumes as the in-tree AWS
+	// volume plugin allows, so the results should be comparable with
+	// BenchmarkSchedulingInTreePVs.
+	driverKey := util.GetCSIAttachLimitKey(testCSIDriver)
+	allocatable := map[v1.ResourceName]string{
+		v1.ResourceName(driverKey): fmt.Sprintf("%d", util.DefaultMaxEBSVolumes),
+	}
+	var count int32 = util.DefaultMaxEBSVolumes
+	csiAllocatable := map[string]*storagev1beta1.VolumeNodeResources{
+		testCSIDriver: {
+			Count: &count,
+		},
+	}
+	nodeStrategy := testutils.NewNodeAllocatableStrategy(allocatable, csiAllocatable, []string{})
+	for _, test := range tests {
+		name := fmt.Sprintf("%vNodes/%vPods", test.nodes, test.existingPods)
+		b.Run(name, func(b *testing.B) {
+			benchmarkScheduling(test.nodes, test.existingPods, test.minPods, nodeStrategy, setupStrategy, testStrategy, b)
+		})
+	}
+}
+
 // BenchmarkSchedulingPodAffinity benchmarks the scheduling rate of pods with
 // PodAffinity rules when the cluster has various quantities of nodes and
 // scheduled pods.
@@ -265,8 +404,110 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int,
 		if len(scheduled) >= numExistingPods+b.N {
 			break
 		}
+		// Note: this might introduce a slight deviation in the accuracy of the benchmark results.
 		// Since the total amount of time is relatively large, it might not be a concern.
 		time.Sleep(100 * time.Millisecond)
 	}
 }
+
+// makeBasePodWithSecret creates a Pod object to be used as a template.
+// The pod uses a single Secret volume.
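+// Secret volumes are resolved by the kubelet at run time and carry no
+// scheduling constraints, so no volume predicates fire for these pods.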
+func makeBasePodWithSecret() *v1.Pod {
+	basePod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			GenerateName: "secret-volume-",
+		},
+		Spec: testutils.MakePodSpec(),
+	}
+
+	volumes := []v1.Volume{
+		{
+			Name: "secret",
+			VolumeSource: v1.VolumeSource{
+				Secret: &v1.SecretVolumeSource{
+					SecretName: "secret",
+				},
+			},
+		},
+	}
+	basePod.Spec.Volumes = volumes
+	return basePod
+}
+
+// makeBasePod creates a Pod object to be used as a template.
+func makeBasePod() *v1.Pod {
+	basePod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			GenerateName: "pod-",
+		},
+		Spec: testutils.MakePodSpec(),
+	}
+	return basePod
+}
+
+func makeBasePersistentVolumeClaim() *v1.PersistentVolumeClaim {
+	return &v1.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{
+			// Name is filled in by CreatePodWithPersistentVolume.
+			Annotations: map[string]string{
+				annBindCompleted: "true",
+			},
+		},
+		Spec: v1.PersistentVolumeClaimSpec{
+			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
+			Resources: v1.ResourceRequirements{
+				Requests: v1.ResourceList{
+					v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
+				},
+			},
+		},
+	}
+}
+
+func awsVolumeFactory(id int) *v1.PersistentVolume {
+	return &v1.PersistentVolume{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: fmt.Sprintf("vol-%d", id),
+		},
+		Spec: v1.PersistentVolumeSpec{
+			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
+			Capacity: v1.ResourceList{
+				v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
+			},
+			PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimRetain,
+			PersistentVolumeSource: v1.PersistentVolumeSource{
+				AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
+					// VolumeID must be unique for each PV, so that every PV
+					// is counted as a separate volume by the
+					// MaxPDVolumeCountChecker predicate.
+					VolumeID: fmt.Sprintf("vol-%d", id),
+				},
+			},
+		},
+	}
+}
+
+func csiVolumeFactory(id int) *v1.PersistentVolume {
+	return &v1.PersistentVolume{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: fmt.Sprintf("vol-%d", id),
+		},
+		Spec: v1.PersistentVolumeSpec{
+			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
+			Capacity: v1.ResourceList{
+				v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
+			},
+			PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimRetain,
+			PersistentVolumeSource: v1.PersistentVolumeSource{
+				CSI: &v1.CSIPersistentVolumeSource{
+					// The volume handle must be unique for each PV, so that
+					// every PV is counted as a separate volume by the
+					// CSIMaxVolumeLimitChecker predicate.
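+					// Reusing a handle across PVs would make the limit
+					// checker count them as a single volume and skew the
+					// benchmark.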
+					VolumeHandle: fmt.Sprintf("vol-%d", id),
+					Driver:       testCSIDriver,
+				},
+			},
+		},
+	}
+}
diff --git a/test/utils/BUILD b/test/utils/BUILD
index 01c2113ad86..9dc20634518 100644
--- a/test/utils/BUILD
+++ b/test/utils/BUILD
@@ -38,6 +38,7 @@ go_library(
         "//staging/src/k8s.io/api/auditregistration/v1alpha1:go_default_library",
         "//staging/src/k8s.io/api/batch/v1:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
@@ -47,8 +48,10 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
diff --git a/test/utils/create_resources.go b/test/utils/create_resources.go
index 173f296c676..c40a0672264 100644
--- a/test/utils/create_resources.go
+++ b/test/utils/create_resources.go
@@ -232,3 +232,37 @@ func CreateResourceQuotaWithRetries(c clientset.Interface, namespace string, obj
 	}
 	return RetryWithExponentialBackOff(createFunc)
 }
+
+func CreatePersistentVolumeWithRetries(c clientset.Interface, obj *v1.PersistentVolume) error {
+	if obj == nil {
+		return fmt.Errorf("Object provided to create is empty")
+	}
+	createFunc := func() (bool, error) {
+		_, err := c.CoreV1().PersistentVolumes().Create(obj)
+		if err == nil || apierrs.IsAlreadyExists(err) {
+			return true, nil
+		}
+		if IsRetryableAPIError(err) {
+			return false, nil
+		}
+		return false, fmt.Errorf("Failed to create object with non-retriable error: %v", err)
+	}
+	return RetryWithExponentialBackOff(createFunc)
+}
+
+func CreatePersistentVolumeClaimWithRetries(c clientset.Interface, namespace string, obj *v1.PersistentVolumeClaim) error {
+	if obj == nil {
+		return fmt.Errorf("Object provided to create is empty")
+	}
+	createFunc := func() (bool, error) {
+		_, err := c.CoreV1().PersistentVolumeClaims(namespace).Create(obj)
+		if err == nil || apierrs.IsAlreadyExists(err) {
+			return true, nil
+		}
+		if IsRetryableAPIError(err) {
+			return false, nil
+		}
+		return false, fmt.Errorf("Failed to create object with non-retriable error: %v", err)
+	}
+	return RetryWithExponentialBackOff(createFunc)
+}
diff --git a/test/utils/runners.go b/test/utils/runners.go
index 74fad6c1fd2..c5361dd3df6 100644
--- a/test/utils/runners.go
+++ b/test/utils/runners.go
@@ -28,6 +28,7 @@ import (
 	apps "k8s.io/api/apps/v1"
 	batch "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
+	storagev1beta1 "k8s.io/api/storage/v1beta1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	apierrs "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -36,7 +37,9 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" @@ -908,12 +911,22 @@ type TestNodePreparer interface { } type PrepareNodeStrategy interface { + // Modify pre-created Node objects before the test starts. PreparePatch(node *v1.Node) []byte + // Create or modify any objects that depend on the node before the test starts. + // Caller will re-try when http.StatusConflict error is returned. + PrepareDependentObjects(node *v1.Node, client clientset.Interface) error + // Clean up any node modifications after the test finishes. CleanupNode(node *v1.Node) *v1.Node + // Clean up any objects that depend on the node after the test finishes. + // Caller will re-try when http.StatusConflict error is returned. + CleanupDependentObjects(nodeName string, client clientset.Interface) error } type TrivialNodePrepareStrategy struct{} +var _ PrepareNodeStrategy = &TrivialNodePrepareStrategy{} + func (*TrivialNodePrepareStrategy) PreparePatch(*v1.Node) []byte { return []byte{} } @@ -923,11 +936,21 @@ func (*TrivialNodePrepareStrategy) CleanupNode(node *v1.Node) *v1.Node { return &nodeCopy } +func (*TrivialNodePrepareStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error { + return nil +} + +func (*TrivialNodePrepareStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error { + return nil +} + type LabelNodePrepareStrategy struct { labelKey string labelValue string } +var _ PrepareNodeStrategy = &LabelNodePrepareStrategy{} + func NewLabelNodePrepareStrategy(labelKey string, labelValue string) *LabelNodePrepareStrategy { return &LabelNodePrepareStrategy{ labelKey: labelKey, @@ -949,6 +972,148 @@ func (s *LabelNodePrepareStrategy) CleanupNode(node *v1.Node) *v1.Node { return nodeCopy } +func (*LabelNodePrepareStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error { + return nil +} + +func (*LabelNodePrepareStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error { + return nil +} + +// NodeAllocatableStrategy fills node.status.allocatable and csiNode.spec.drivers[*].allocatable. +// csiNode is created if it does not exist. On cleanup, any csiNode.spec.drivers[*].allocatable is +// set to nil. +type NodeAllocatableStrategy struct { + // Node.status.allocatable to fill to all nodes. + nodeAllocatable map[v1.ResourceName]string + // Map -> VolumeNodeResources to fill into csiNode.spec.drivers[]. + csiNodeAllocatable map[string]*storagev1beta1.VolumeNodeResources + // List of in-tree volume plugins migrated to CSI. 
+	migratedPlugins []string
+}
+
+var _ PrepareNodeStrategy = &NodeAllocatableStrategy{}
+
+func NewNodeAllocatableStrategy(nodeAllocatable map[v1.ResourceName]string, csiNodeAllocatable map[string]*storagev1beta1.VolumeNodeResources, migratedPlugins []string) *NodeAllocatableStrategy {
+	return &NodeAllocatableStrategy{nodeAllocatable, csiNodeAllocatable, migratedPlugins}
+}
+
+func (s *NodeAllocatableStrategy) PreparePatch(node *v1.Node) []byte {
+	newNode := node.DeepCopy()
+	for name, value := range s.nodeAllocatable {
+		newNode.Status.Allocatable[name] = resource.MustParse(value)
+	}
+
+	oldJSON, err := json.Marshal(node)
+	if err != nil {
+		panic(err)
+	}
+	newJSON, err := json.Marshal(newNode)
+	if err != nil {
+		panic(err)
+	}
+
+	patch, err := strategicpatch.CreateTwoWayMergePatch(oldJSON, newJSON, v1.Node{})
+	if err != nil {
+		panic(err)
+	}
+	return patch
+}
+
+func (s *NodeAllocatableStrategy) CleanupNode(node *v1.Node) *v1.Node {
+	nodeCopy := node.DeepCopy()
+	for name := range s.nodeAllocatable {
+		delete(nodeCopy.Status.Allocatable, name)
+	}
+	return nodeCopy
+}
+
+func (s *NodeAllocatableStrategy) createCSINode(nodeName string, client clientset.Interface) error {
+	csiNode := &storagev1beta1.CSINode{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: nodeName,
+			Annotations: map[string]string{
+				v1.MigratedPluginsAnnotationKey: strings.Join(s.migratedPlugins, ","),
+			},
+		},
+		Spec: storagev1beta1.CSINodeSpec{
+			Drivers: []storagev1beta1.CSINodeDriver{},
+		},
+	}
+
+	for driver, allocatable := range s.csiNodeAllocatable {
+		d := storagev1beta1.CSINodeDriver{
+			Name:        driver,
+			Allocatable: allocatable,
+			NodeID:      nodeName,
+		}
+		csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, d)
+	}
+
+	_, err := client.StorageV1beta1().CSINodes().Create(csiNode)
+	if apierrs.IsAlreadyExists(err) {
+		// Something created the CSINode instance after we checked that it
+		// did not exist.
+		// Make the caller retry PrepareDependentObjects by returning a
+		// Conflict error.
+		err = apierrs.NewConflict(storagev1beta1.Resource("csinodes"), nodeName, err)
+	}
+	return err
+}
+
+func (s *NodeAllocatableStrategy) updateCSINode(csiNode *storagev1beta1.CSINode, client clientset.Interface) error {
+	for driverName, allocatable := range s.csiNodeAllocatable {
+		found := false
+		for i, driver := range csiNode.Spec.Drivers {
+			if driver.Name == driverName {
+				found = true
+				csiNode.Spec.Drivers[i].Allocatable = allocatable
+				break
+			}
+		}
+		if !found {
+			d := storagev1beta1.CSINodeDriver{
+				Name:        driverName,
+				Allocatable: allocatable,
+			}
+			csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, d)
+		}
+	}
+	csiNode.Annotations[v1.MigratedPluginsAnnotationKey] = strings.Join(s.migratedPlugins, ",")
+
+	_, err := client.StorageV1beta1().CSINodes().Update(csiNode)
+	return err
+}
+
+func (s *NodeAllocatableStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
+	csiNode, err := client.StorageV1beta1().CSINodes().Get(node.Name, metav1.GetOptions{})
+	if err != nil {
+		if apierrs.IsNotFound(err) {
+			return s.createCSINode(node.Name, client)
+		}
+		return err
+	}
+	return s.updateCSINode(csiNode, client)
+}
+
+func (s *NodeAllocatableStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
+	csiNode, err := client.StorageV1beta1().CSINodes().Get(nodeName, metav1.GetOptions{})
+	if err != nil {
+		if apierrs.IsNotFound(err) {
+			return nil
+		}
+		return err
+	}
+
+	for driverName := range s.csiNodeAllocatable {
+		for i, driver := range csiNode.Spec.Drivers {
+			if driver.Name == driverName {
+				csiNode.Spec.Drivers[i].Allocatable = nil
+			}
+		}
+	}
+	return s.updateCSINode(csiNode, client)
+}
+
 func DoPrepareNode(client clientset.Interface, node *v1.Node, strategy PrepareNodeStrategy) error {
 	var err error
 	patch := strategy.PreparePatch(node)
@@ -957,17 +1122,34 @@ func DoPrepareNode(client clientset.Interface, node *v1.Node, strategy PrepareNo
 	}
 	for attempt := 0; attempt < retries; attempt++ {
 		if _, err = client.CoreV1().Nodes().Patch(node.Name, types.MergePatchType, []byte(patch)); err == nil {
-			return nil
+			break
 		}
 		if !apierrs.IsConflict(err) {
 			return fmt.Errorf("Error while applying patch %v to Node %v: %v", string(patch), node.Name, err)
 		}
 		time.Sleep(100 * time.Millisecond)
 	}
-	return fmt.Errorf("To many conflicts when applying patch %v to Node %v", string(patch), node.Name)
+	if err != nil {
+		return fmt.Errorf("Too many conflicts when applying patch %v to Node %v: %s", string(patch), node.Name, err)
+	}
+
+	for attempt := 0; attempt < retries; attempt++ {
+		if err = strategy.PrepareDependentObjects(node, client); err == nil {
+			break
+		}
+		if !apierrs.IsConflict(err) {
+			return fmt.Errorf("Error while preparing objects for node %s: %s", node.Name, err)
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	if err != nil {
+		return fmt.Errorf("Too many conflicts when creating objects for node %s: %s", node.Name, err)
+	}
+	return nil
 }
 
 func DoCleanupNode(client clientset.Interface, nodeName string, strategy PrepareNodeStrategy) error {
+	var err error
 	for attempt := 0; attempt < retries; attempt++ {
-		node, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
+		var node *v1.Node
+		node, err = client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
 		if err != nil {
@@ -978,14 +1160,31 @@ func DoCleanupNode(client clientset.Interface, nodeName string, strategy Prepare
 			return nil
 		}
 		if _, err = client.CoreV1().Nodes().Update(updatedNode); err == nil {
-			return nil
+			break
 		}
 		if !apierrs.IsConflict(err) {
 			return fmt.Errorf("Error when updating Node %v: %v", nodeName, err)
 		}
 		time.Sleep(100 * time.Millisecond)
 	}
-	return fmt.Errorf("To many conflicts when trying to cleanup Node %v", nodeName)
+	if err != nil {
+		return fmt.Errorf("Too many conflicts when trying to cleanup Node %v: %s", nodeName, err)
+	}
+
+	for attempt := 0; attempt < retries; attempt++ {
+		err = strategy.CleanupDependentObjects(nodeName, client)
+		if err == nil {
+			break
+		}
+		if !apierrs.IsConflict(err) {
+			return fmt.Errorf("Error when cleaning up Node %v objects: %v", nodeName, err)
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	if err != nil {
+		return fmt.Errorf("Too many conflicts when trying to cleanup Node %v objects: %s", nodeName, err)
+	}
+	return nil
 }
 
 type TestPodCreateStrategy func(client clientset.Interface, namespace string, podCount int) error
@@ -1077,6 +1276,70 @@ func CreatePod(client clientset.Interface, namespace string, podCount int, podTe
 	return createError
 }
 
+func CreatePodWithPersistentVolume(client clientset.Interface, namespace string, claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod, count int) error {
+	var createError error
+	lock := sync.Mutex{}
+	createPodFunc := func(i int) {
+		pvcName := fmt.Sprintf("pvc-%d", i)
+
+		// PV
+		pv := factory(i)
+		// Bind the PV to "pvc-$i".
+		pv.Spec.ClaimRef = &v1.ObjectReference{
+			Kind:       "PersistentVolumeClaim",
+			Namespace:  namespace,
+			Name:       pvcName,
+			APIVersion: "v1",
+		}
+		pv.Status.Phase = v1.VolumeBound
+		if err := CreatePersistentVolumeWithRetries(client, pv); err != nil {
+			lock.Lock()
+			defer lock.Unlock()
+			createError = fmt.Errorf("error creating PV: %s", err)
+			return
+		}
+
+		// PVC
+		pvc := claimTemplate.DeepCopy()
+		pvc.Name = pvcName
+		// Bind the PVC to the PV created above.
+		pvc.Spec.VolumeName = pv.Name
+		pvc.Status.Phase = v1.ClaimBound
+		if err := CreatePersistentVolumeClaimWithRetries(client, namespace, pvc); err != nil {
+			lock.Lock()
+			defer lock.Unlock()
+			createError = fmt.Errorf("error creating PVC: %s", err)
+			return
+		}
+
+		// Pod
+		pod := podTemplate.DeepCopy()
+		pod.Spec.Volumes = []v1.Volume{
+			{
+				Name: "vol",
+				VolumeSource: v1.VolumeSource{
+					PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
+						ClaimName: pvcName,
+					},
+				},
+			},
+		}
+		if err := makeCreatePod(client, namespace, pod); err != nil {
+			lock.Lock()
+			defer lock.Unlock()
+			createError = err
+			return
+		}
+	}
+
+	if count < 30 {
+		workqueue.ParallelizeUntil(context.TODO(), count, count, createPodFunc)
+	} else {
+		workqueue.ParallelizeUntil(context.TODO(), 30, count, createPodFunc)
+	}
+	return createError
+}
+
 func createController(client clientset.Interface, controllerName, namespace string, podCount int, podTemplate *v1.Pod) error {
 	rc := &v1.ReplicationController{
 		ObjectMeta: metav1.ObjectMeta{
@@ -1105,6 +1368,14 @@ func NewCustomCreatePodStrategy(podTemplate *v1.Pod) TestPodCreateStrategy {
 	}
 }
 
+// volumeFactory creates a unique PersistentVolume for the given integer ID.
+type volumeFactory func(uniqueID int) *v1.PersistentVolume
+
+func NewCreatePodWithPersistentVolumeStrategy(claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
+	return func(client clientset.Interface, namespace string, podCount int) error {
+		return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factory, podTemplate, podCount)
+	}
+}
 func NewSimpleCreatePodStrategy() TestPodCreateStrategy {
 	basePod := &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{