diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go
index 8cfa7657841..850fc204dfc 100644
--- a/pkg/volume/testing/testing.go
+++ b/pkg/volume/testing/testing.go
@@ -239,6 +239,7 @@ type FakeVolumePlugin struct {
 	VolumeLimits          map[string]int64
 	VolumeLimitsError     error
 	LimitKey              string
+	ProvisionDelaySeconds int
 
 	Mounters   []*FakeVolume
 	Unmounters []*FakeVolume
@@ -437,7 +438,7 @@ func (plugin *FakeVolumePlugin) NewProvisioner(options VolumeOptions) (Provision
 	plugin.Lock()
 	defer plugin.Unlock()
 	plugin.LastProvisionerOptions = options
-	return &FakeProvisioner{options, plugin.Host}, nil
+	return &FakeProvisioner{options, plugin.Host, plugin.ProvisionDelaySeconds}, nil
 }
 
 func (plugin *FakeVolumePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
@@ -779,8 +780,9 @@ func (fd *FakeDeleter) GetPath() string {
 }
 
 type FakeProvisioner struct {
-	Options VolumeOptions
-	Host    VolumeHost
+	Options               VolumeOptions
+	Host                  VolumeHost
+	ProvisionDelaySeconds int
 }
 
 func (fc *FakeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) {
@@ -807,6 +809,10 @@ func (fc *FakeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []
 		},
 	}
 
+	if fc.ProvisionDelaySeconds > 0 {
+		time.Sleep(time.Duration(fc.ProvisionDelaySeconds) * time.Second)
+	}
+
 	return pv, nil
 }
 
diff --git a/test/e2e/storage/persistent_volumes-local.go b/test/e2e/storage/persistent_volumes-local.go
index 7340f9ab527..52744cdcbad 100644
--- a/test/e2e/storage/persistent_volumes-local.go
+++ b/test/e2e/storage/persistent_volumes-local.go
@@ -566,13 +566,28 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
 				framework.Skipf("Runs only when number of nodes >= %v", ssReplicas)
 			}
 			By("Creating a StatefulSet with pod anti-affinity on nodes")
-			ss := createStatefulSet(config, ssReplicas, volsPerNode, true)
+			ss := createStatefulSet(config, ssReplicas, volsPerNode, true, false)
 			validateStatefulSet(config, ss, true)
 		})
 
 		It("should use volumes on one node when pod has affinity", func() {
 			By("Creating a StatefulSet with pod affinity on nodes")
-			ss := createStatefulSet(config, ssReplicas, volsPerNode/ssReplicas, false)
+			ss := createStatefulSet(config, ssReplicas, volsPerNode/ssReplicas, false, false)
+			validateStatefulSet(config, ss, false)
+		})
+
+		It("should use volumes spread across nodes when pod management is parallel and pod has anti-affinity", func() {
+			if len(config.nodes) < ssReplicas {
+				framework.Skipf("Runs only when number of nodes >= %v", ssReplicas)
+			}
+			By("Creating a StatefulSet with pod anti-affinity on nodes")
+			ss := createStatefulSet(config, ssReplicas, 1, true, true)
+			validateStatefulSet(config, ss, true)
+		})
+
+		It("should use volumes on one node when pod management is parallel and pod has affinity", func() {
+			By("Creating a StatefulSet with pod affinity on nodes")
+			ss := createStatefulSet(config, ssReplicas, 1, false, true)
 			validateStatefulSet(config, ss, false)
 		})
 	})
@@ -1830,7 +1845,7 @@ func findLocalPersistentVolume(c clientset.Interface, volumePath string) (*v1.Pe
 	return nil, nil
 }
 
-func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount int, anti bool) *appsv1.StatefulSet {
+func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount int, anti, parallel bool) *appsv1.StatefulSet {
 	mounts := []v1.VolumeMount{}
 	claims := []v1.PersistentVolumeClaim{}
 	for i := 0; i < volumeCount; i++ {
@@ -1897,6 +1912,10 @@ func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount in
 		},
 	}
 
+	if parallel {
+		spec.Spec.PodManagementPolicy = appsv1.ParallelPodManagement
+	}
+
 	ss, err := config.client.AppsV1().StatefulSets(config.ns).Create(spec)
 	Expect(err).NotTo(HaveOccurred())
 
diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD
index ba2c51e2b38..4943e0eb83f 100644
--- a/test/integration/scheduler/BUILD
+++ b/test/integration/scheduler/BUILD
@@ -34,6 +34,7 @@ go_test(
         "//pkg/kubeapiserver/admission:go_default_library",
         "//pkg/scheduler:go_default_library",
         "//pkg/scheduler/algorithm:go_default_library",
+        "//pkg/scheduler/algorithm/predicates:go_default_library",
        "//pkg/scheduler/algorithmprovider:go_default_library",
        "//pkg/scheduler/api:go_default_library",
        "//pkg/scheduler/apis/config:go_default_library",
diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go
index d801caa4a01..833e3a8a6d5 100644
--- a/test/integration/scheduler/util.go
+++ b/test/integration/scheduler/util.go
@@ -143,7 +143,7 @@ func initTestScheduler(
 ) *TestContext {
 	// Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority
 	// feature gate is enabled at the same time.
-	return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false, time.Second)
+	return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false, false, time.Second)
 }
 
 // initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
@@ -155,13 +155,15 @@ func initTestSchedulerWithOptions(
 	setPodInformer bool,
 	policy *schedulerapi.Policy,
 	disablePreemption bool,
+	disableEquivalenceCache bool,
 	resyncPeriod time.Duration,
 ) *TestContext {
-	// Enable EnableEquivalenceClassCache for all integration tests.
-	defer utilfeaturetesting.SetFeatureGateDuringTest(
-		t,
-		utilfeature.DefaultFeatureGate,
-		features.EnableEquivalenceClassCache, true)()
+	if !disableEquivalenceCache {
+		defer utilfeaturetesting.SetFeatureGateDuringTest(
+			t,
+			utilfeature.DefaultFeatureGate,
+			features.EnableEquivalenceClassCache, true)()
+	}
 
 	// 1. Create scheduler
 	context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, resyncPeriod)
@@ -256,7 +258,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
 // configuration but with pod preemption disabled.
 func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
 	return initTestSchedulerWithOptions(
-		t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true, time.Second)
+		t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true, false, time.Second)
 }
 
 // cleanupTest deletes the scheduler and the test namespace. It should be called
diff --git a/test/integration/scheduler/volume_binding_test.go b/test/integration/scheduler/volume_binding_test.go
index 484ce93dc98..9bb20f5adf7 100644
--- a/test/integration/scheduler/volume_binding_test.go
+++ b/test/integration/scheduler/volume_binding_test.go
@@ -20,6 +20,7 @@ package scheduler
 
 import (
 	"fmt"
+	"os"
 	"strconv"
 	"strings"
 	"testing"
@@ -32,12 +33,14 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/rand"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
 	persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
+	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
 	"k8s.io/kubernetes/pkg/volume"
 	volumetest "k8s.io/kubernetes/pkg/volume/testing"
 	imageutils "k8s.io/kubernetes/test/utils/image"
@@ -60,6 +63,7 @@ var (
 
 	classWait      = "wait"
 	classImmediate = "immediate"
+	classDynamic   = "dynamic"
 
 	sharedClasses = map[storagev1.VolumeBindingMode]*storagev1.StorageClass{
 		modeImmediate: makeStorageClass(classImmediate, &modeImmediate),
@@ -94,7 +98,7 @@ func TestVolumeBinding(t *testing.T) {
 		"VolumeScheduling":       true,
 		"PersistentLocalVolumes": true,
 	}
-	config := setupCluster(t, "volume-scheduling", 2, features, 0)
+	config := setupCluster(t, "volume-scheduling-", 2, features, 0, 0, false)
 	defer config.teardown()
 
 	cases := map[string]struct {
@@ -267,7 +271,7 @@ func TestVolumeBindingRescheduling(t *testing.T) {
 		"VolumeScheduling":       true,
 		"PersistentLocalVolumes": true,
 	}
-	config := setupCluster(t, "volume-scheduling", 2, features, 0)
+	config := setupCluster(t, "volume-scheduling-", 2, features, 0, 0, false)
 	defer config.teardown()
 
 	storageClassName := "local-storage"
@@ -385,8 +389,9 @@ func TestVolumeBindingRescheduling(t *testing.T) {
 }
 
 // TestVolumeBindingStress creates pods, each with unbound PVCs.
+// PVs are precreated.
 func TestVolumeBindingStress(t *testing.T) {
-	testVolumeBindingStress(t, 0)
+	testVolumeBindingStress(t, 0, false, 0)
 }
 
 // Like TestVolumeBindingStress but with scheduler resync. In real cluster,
@@ -394,32 +399,60 @@ func TestVolumeBindingStress(t *testing.T) {
 // service/node update events.
 // This is useful to detect possible race conditions.
 func TestVolumeBindingStressWithSchedulerResync(t *testing.T) {
-	testVolumeBindingStress(t, time.Second)
+	testVolumeBindingStress(t, time.Second, false, 0)
 }
 
-func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration) {
+// Like TestVolumeBindingStress but with fast dynamic provisioning
+func TestVolumeBindingDynamicStressFast(t *testing.T) {
+	testVolumeBindingStress(t, 0, true, 0)
+}
+
+// Like TestVolumeBindingStress but with slow dynamic provisioning
+func TestVolumeBindingDynamicStressSlow(t *testing.T) {
+	testVolumeBindingStress(t, 0, true, 30)
+}
+
+func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration, dynamic bool, provisionDelaySeconds int) {
 	features := map[string]bool{
 		"VolumeScheduling":       true,
 		"PersistentLocalVolumes": true,
 	}
-	config := setupCluster(t, "volume-binding-stress", 1, features, schedulerResyncPeriod)
+	config := setupCluster(t, "volume-binding-stress-", 1, features, schedulerResyncPeriod, provisionDelaySeconds, false)
 	defer config.teardown()
 
+	// Set max volume limit to the number of PVCs the test will create
+	// TODO: remove when max volume limit allows setting through storageclass
+	if err := os.Setenv(predicates.KubeMaxPDVols, fmt.Sprintf("%v", podLimit*volsPerPod)); err != nil {
+		t.Fatalf("failed to set max pd limit: %v", err)
+	}
+	defer os.Unsetenv(predicates.KubeMaxPDVols)
+
+	scName := &classWait
+	if dynamic {
+		scName = &classDynamic
+		sc := makeDynamicProvisionerStorageClass(*scName, &modeWait)
+		if _, err := config.client.StorageV1().StorageClasses().Create(sc); err != nil {
+			t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
+		}
+	}
+
 	// Create enough PVs and PVCs for all the pods
 	pvs := []*v1.PersistentVolume{}
 	pvcs := []*v1.PersistentVolumeClaim{}
 	for i := 0; i < podLimit*volsPerPod; i++ {
-		pv := makePV(fmt.Sprintf("pv-stress-%v", i), classWait, "", "", node1)
-		pvc := makePVC(fmt.Sprintf("pvc-stress-%v", i), config.ns, &classWait, "")
-
-		if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil {
-			t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
+		// Don't create pvs for dynamic provisioning test
+		if !dynamic {
+			pv := makePV(fmt.Sprintf("pv-stress-%v", i), *scName, "", "", node1)
+			if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil {
+				t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
+			}
+			pvs = append(pvs, pv)
 		}
+
+		pvc := makePVC(fmt.Sprintf("pvc-stress-%v", i), config.ns, scName, "")
 		if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(pvc); err != nil {
 			t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
 		}
-
-		pvs = append(pvs, pv)
 		pvcs = append(pvcs, pvc)
 	}
 
@@ -431,7 +464,7 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration)
 			podPvcs = append(podPvcs, pvcs[j].Name)
 		}
 
-		pod := makePod(fmt.Sprintf("pod%v", i), config.ns, podPvcs)
+		pod := makePod(fmt.Sprintf("pod%03d", i), config.ns, podPvcs)
 		if pod, err := config.client.CoreV1().Pods(config.ns).Create(pod); err != nil {
 			t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
 		}
@@ -442,7 +475,7 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration)
 	for _, pod := range pods {
 		// Use increased timeout for stress test because there is a higher chance of
 		// PV sync error
-		if err := waitForPodToScheduleWithTimeout(config.client, pod, 60*time.Second); err != nil {
+		if err := waitForPodToScheduleWithTimeout(config.client, pod, 2*time.Minute); err != nil {
 			t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err)
 		}
 	}
@@ -456,12 +489,142 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration)
 	}
 }
 
+func testVolumeBindingWithAffinity(t *testing.T, anti bool, numNodes, numPods, numPVsFirstNode int) {
+	features := map[string]bool{
+		"VolumeScheduling":       true,
+		"PersistentLocalVolumes": true,
+	}
+	// TODO: disable equivalence cache until kubernetes/kubernetes#67680 is fixed
+	config := setupCluster(t, "volume-pod-affinity-", numNodes, features, 0, 0, true)
+	defer config.teardown()
+
+	pods := []*v1.Pod{}
+	pvcs := []*v1.PersistentVolumeClaim{}
+	pvs := []*v1.PersistentVolume{}
+
+	// Create PVs for the first node
+	for i := 0; i < numPVsFirstNode; i++ {
+		pv := makePV(fmt.Sprintf("pv-node1-%v", i), classWait, "", "", node1)
+		if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil {
+			t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
+		}
+		pvs = append(pvs, pv)
+	}
+
+	// Create 1 PV per Node for the remaining nodes
+	for i := 2; i <= numNodes; i++ {
+		pv := makePV(fmt.Sprintf("pv-node%v-0", i), classWait, "", "", fmt.Sprintf("node-%v", i))
+		if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil {
+			t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
+		}
+		pvs = append(pvs, pv)
+	}
+
+	// Create pods
+	for i := 0; i < numPods; i++ {
+		// Create one pvc per pod
+		pvc := makePVC(fmt.Sprintf("pvc-%v", i), config.ns, &classWait, "")
+		if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(pvc); err != nil {
+			t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
+		}
+		pvcs = append(pvcs, pvc)
+
+		// Create pod with pod affinity
+		pod := makePod(fmt.Sprintf("pod%03d", i), config.ns, []string{pvc.Name})
+		pod.Spec.Affinity = &v1.Affinity{}
+		affinityTerms := []v1.PodAffinityTerm{
+			{
+				LabelSelector: &metav1.LabelSelector{
+					MatchExpressions: []metav1.LabelSelectorRequirement{
+						{
+							Key:      "app",
+							Operator: metav1.LabelSelectorOpIn,
+							Values:   []string{"volume-binding-test"},
+						},
+					},
+				},
+				TopologyKey: nodeAffinityLabelKey,
+			},
+		}
+		if anti {
+			pod.Spec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{
+				RequiredDuringSchedulingIgnoredDuringExecution: affinityTerms,
+			}
+		} else {
+			pod.Spec.Affinity.PodAffinity = &v1.PodAffinity{
+				RequiredDuringSchedulingIgnoredDuringExecution: affinityTerms,
+			}
+		}
+
+		if pod, err := config.client.CoreV1().Pods(config.ns).Create(pod); err != nil {
+			t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
+		}
+		pods = append(pods, pod)
+	}
+
+	// Validate Pods scheduled
+	scheduledNodes := sets.NewString()
+	for _, pod := range pods {
+		if err := waitForPodToSchedule(config.client, pod); err != nil {
+			t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err)
+		} else {
+			// Keep track of all the nodes that the Pods were scheduled on
+			pod, err = config.client.CoreV1().Pods(config.ns).Get(pod.Name, metav1.GetOptions{})
+			if err != nil {
+				t.Fatalf("Failed to get Pod %q: %v", pod.Name, err)
+			}
+			if pod.Spec.NodeName == "" {
+				t.Fatalf("Pod %q node name unset after scheduling", pod.Name)
+			}
+			scheduledNodes.Insert(pod.Spec.NodeName)
+		}
+	}
+
+	// Validate the affinity policy
+	if anti {
+		// The pods should have been spread across different nodes
+		if scheduledNodes.Len() != numPods {
+			t.Errorf("Pods were scheduled across %v nodes instead of %v", scheduledNodes.Len(), numPods)
+		}
+	} else {
+		// The pods should have been scheduled on 1 node
+		if scheduledNodes.Len() != 1 {
+			t.Errorf("Pods were scheduled across %v nodes instead of %v", scheduledNodes.Len(), 1)
+		}
+	}
+
+	// Validate PVC binding
+	for _, pvc := range pvcs {
+		validatePVCPhase(t, config.client, pvc.Name, config.ns, v1.ClaimBound)
+	}
+}
+
+func TestVolumeBindingWithAntiAffinity(t *testing.T) {
+	numNodes := 10
+	// Create as many pods as number of nodes
+	numPods := numNodes
+	// Create many more PVs on node1 to increase chance of selecting node1
+	numPVsFirstNode := 10 * numNodes
+
+	testVolumeBindingWithAffinity(t, true, numNodes, numPods, numPVsFirstNode)
+}
+
+func TestVolumeBindingWithAffinity(t *testing.T) {
+	numPods := 10
+	// Create many more nodes to increase chance of selecting a PV on a different node than node1
+	numNodes := 10 * numPods
+	// Create numPods PVs on the first node
+	numPVsFirstNode := numPods
+
+	testVolumeBindingWithAffinity(t, false, numNodes, numPods, numPVsFirstNode)
+}
+
 func TestPVAffinityConflict(t *testing.T) {
 	features := map[string]bool{
 		"VolumeScheduling":       true,
 		"PersistentLocalVolumes": true,
 	}
-	config := setupCluster(t, "volume-scheduling", 3, features, 0)
+	config := setupCluster(t, "volume-scheduling-", 3, features, 0, 0, false)
 	defer config.teardown()
 
 	pv := makePV("local-pv", classImmediate, "", "", node1)
@@ -519,7 +682,7 @@ func TestPVAffinityConflict(t *testing.T) {
 	}
 }
 
-func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[string]bool, resyncPeriod time.Duration) *testConfig {
+func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[string]bool, resyncPeriod time.Duration, provisionDelaySeconds int, disableEquivalenceCache bool) *testConfig {
 	oldFeatures := make(map[string]bool, len(features))
 	for feature := range features {
 		oldFeatures[feature] = utilfeature.DefaultFeatureGate.Enabled(utilfeature.Feature(feature))
@@ -529,7 +692,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[s
 
 	controllerCh := make(chan struct{})
 
-	context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), controllerCh, false, nil, false, resyncPeriod)
+	context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), controllerCh, false, nil, false, disableEquivalenceCache, resyncPeriod)
 	clientset := context.clientSet
 	ns := context.ns.Name
 
@@ -543,6 +706,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[s
 		Host:                   host,
 		Config:                 volume.VolumeConfig{},
 		LastProvisionerOptions: volume.VolumeOptions{},
+		ProvisionDelaySeconds:  provisionDelaySeconds,
 		NewAttacherCallCount:   0,
 		NewDetacherCallCount:   0,
 		Mounters:               nil,
@@ -732,6 +896,9 @@ func makePod(name, ns string, pvcs []string) *v1.Pod {
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      name,
 			Namespace: ns,
+			Labels: map[string]string{
+				"app": "volume-binding-test",
+			},
 		},
 		Spec: v1.PodSpec{
 			Containers: []v1.Container{