Integration and e2e tests

This commit is contained in:
Michelle Au 2018-08-17 17:46:11 -07:00
parent 01d83fa104
commit 8091c7975b
5 changed files with 226 additions and 31 deletions

View File

@ -239,6 +239,7 @@ type FakeVolumePlugin struct {
VolumeLimits map[string]int64
VolumeLimitsError error
LimitKey string
ProvisionDelaySeconds int
Mounters []*FakeVolume
Unmounters []*FakeVolume
@ -437,7 +438,7 @@ func (plugin *FakeVolumePlugin) NewProvisioner(options VolumeOptions) (Provision
plugin.Lock()
defer plugin.Unlock()
plugin.LastProvisionerOptions = options
return &FakeProvisioner{options, plugin.Host}, nil
return &FakeProvisioner{options, plugin.Host, plugin.ProvisionDelaySeconds}, nil
}
func (plugin *FakeVolumePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
@ -779,8 +780,9 @@ func (fd *FakeDeleter) GetPath() string {
}
type FakeProvisioner struct {
Options VolumeOptions
Host VolumeHost
Options VolumeOptions
Host VolumeHost
ProvisionDelaySeconds int
}
func (fc *FakeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) {
@ -807,6 +809,10 @@ func (fc *FakeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []
},
}
if fc.ProvisionDelaySeconds > 0 {
time.Sleep(time.Duration(fc.ProvisionDelaySeconds) * time.Second)
}
return pv, nil
}

View File

@ -566,13 +566,28 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
framework.Skipf("Runs only when number of nodes >= %v", ssReplicas)
}
By("Creating a StatefulSet with pod anti-affinity on nodes")
ss := createStatefulSet(config, ssReplicas, volsPerNode, true)
ss := createStatefulSet(config, ssReplicas, volsPerNode, true, false)
validateStatefulSet(config, ss, true)
})
It("should use volumes on one node when pod has affinity", func() {
By("Creating a StatefulSet with pod affinity on nodes")
ss := createStatefulSet(config, ssReplicas, volsPerNode/ssReplicas, false)
ss := createStatefulSet(config, ssReplicas, volsPerNode/ssReplicas, false, false)
validateStatefulSet(config, ss, false)
})
It("should use volumes spread across nodes when pod management is parallel and pod has anti-affinity", func() {
if len(config.nodes) < ssReplicas {
framework.Skipf("Runs only when number of nodes >= %v", ssReplicas)
}
By("Creating a StatefulSet with pod anti-affinity on nodes")
ss := createStatefulSet(config, ssReplicas, 1, true, true)
validateStatefulSet(config, ss, true)
})
It("should use volumes on one node when pod management is parallel and pod has affinity", func() {
By("Creating a StatefulSet with pod affinity on nodes")
ss := createStatefulSet(config, ssReplicas, 1, false, true)
validateStatefulSet(config, ss, false)
})
})
@ -1830,7 +1845,7 @@ func findLocalPersistentVolume(c clientset.Interface, volumePath string) (*v1.Pe
return nil, nil
}
func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount int, anti bool) *appsv1.StatefulSet {
func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount int, anti, parallel bool) *appsv1.StatefulSet {
mounts := []v1.VolumeMount{}
claims := []v1.PersistentVolumeClaim{}
for i := 0; i < volumeCount; i++ {
@ -1897,6 +1912,10 @@ func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount in
},
}
if parallel {
spec.Spec.PodManagementPolicy = appsv1.ParallelPodManagement
}
ss, err := config.client.AppsV1().StatefulSets(config.ns).Create(spec)
Expect(err).NotTo(HaveOccurred())

View File

@ -34,6 +34,7 @@ go_test(
"//pkg/kubeapiserver/admission:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",

View File

@ -143,7 +143,7 @@ func initTestScheduler(
) *TestContext {
// Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority
// feature gate is enabled at the same time.
return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false, time.Second)
return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false, false, time.Second)
}
// initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
@ -155,13 +155,15 @@ func initTestSchedulerWithOptions(
setPodInformer bool,
policy *schedulerapi.Policy,
disablePreemption bool,
disableEquivalenceCache bool,
resyncPeriod time.Duration,
) *TestContext {
// Enable EnableEquivalenceClassCache for all integration tests.
defer utilfeaturetesting.SetFeatureGateDuringTest(
t,
utilfeature.DefaultFeatureGate,
features.EnableEquivalenceClassCache, true)()
if !disableEquivalenceCache {
defer utilfeaturetesting.SetFeatureGateDuringTest(
t,
utilfeature.DefaultFeatureGate,
features.EnableEquivalenceClassCache, true)()
}
// 1. Create scheduler
context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, resyncPeriod)
@ -256,7 +258,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
// configuration but with pod preemption disabled.
func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
return initTestSchedulerWithOptions(
t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true, time.Second)
t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true, false, time.Second)
}
// cleanupTest deletes the scheduler and the test namespace. It should be called

View File

@ -20,6 +20,7 @@ package scheduler
import (
"fmt"
"os"
"strconv"
"strings"
"testing"
@ -32,12 +33,14 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -60,6 +63,7 @@ var (
classWait = "wait"
classImmediate = "immediate"
classDynamic = "dynamic"
sharedClasses = map[storagev1.VolumeBindingMode]*storagev1.StorageClass{
modeImmediate: makeStorageClass(classImmediate, &modeImmediate),
@ -94,7 +98,7 @@ func TestVolumeBinding(t *testing.T) {
"VolumeScheduling": true,
"PersistentLocalVolumes": true,
}
config := setupCluster(t, "volume-scheduling", 2, features, 0)
config := setupCluster(t, "volume-scheduling-", 2, features, 0, 0, false)
defer config.teardown()
cases := map[string]struct {
@ -267,7 +271,7 @@ func TestVolumeBindingRescheduling(t *testing.T) {
"VolumeScheduling": true,
"PersistentLocalVolumes": true,
}
config := setupCluster(t, "volume-scheduling", 2, features, 0)
config := setupCluster(t, "volume-scheduling-", 2, features, 0, 0, false)
defer config.teardown()
storageClassName := "local-storage"
@ -385,8 +389,9 @@ func TestVolumeBindingRescheduling(t *testing.T) {
}
// TestVolumeBindingStress creates <podLimit> pods, each with <volsPerPod> unbound PVCs.
// PVs are precreated.
func TestVolumeBindingStress(t *testing.T) {
testVolumeBindingStress(t, 0)
testVolumeBindingStress(t, 0, false, 0)
}
// Like TestVolumeBindingStress but with scheduler resync. In real cluster,
@ -394,32 +399,60 @@ func TestVolumeBindingStress(t *testing.T) {
// service/node update events.
// This is useful to detect possible race conditions.
func TestVolumeBindingStressWithSchedulerResync(t *testing.T) {
testVolumeBindingStress(t, time.Second)
testVolumeBindingStress(t, time.Second, false, 0)
}
func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration) {
// Like TestVolumeBindingStress but with fast dynamic provisioning
func TestVolumeBindingDynamicStressFast(t *testing.T) {
testVolumeBindingStress(t, 0, true, 0)
}
// Like TestVolumeBindingStress but with slow dynamic provisioning
func TestVolumeBindingDynamicStressSlow(t *testing.T) {
testVolumeBindingStress(t, 0, true, 30)
}
func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration, dynamic bool, provisionDelaySeconds int) {
features := map[string]bool{
"VolumeScheduling": true,
"PersistentLocalVolumes": true,
}
config := setupCluster(t, "volume-binding-stress", 1, features, schedulerResyncPeriod)
config := setupCluster(t, "volume-binding-stress-", 1, features, schedulerResyncPeriod, provisionDelaySeconds, false)
defer config.teardown()
// Set max volume limit to the number of PVCs the test will create
// TODO: remove when max volume limit allows setting through storageclass
if err := os.Setenv(predicates.KubeMaxPDVols, fmt.Sprintf("%v", podLimit*volsPerPod)); err != nil {
t.Fatalf("failed to set max pd limit: %v", err)
}
defer os.Unsetenv(predicates.KubeMaxPDVols)
scName := &classWait
if dynamic {
scName = &classDynamic
sc := makeDynamicProvisionerStorageClass(*scName, &modeWait)
if _, err := config.client.StorageV1().StorageClasses().Create(sc); err != nil {
t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
}
}
// Create enough PVs and PVCs for all the pods
pvs := []*v1.PersistentVolume{}
pvcs := []*v1.PersistentVolumeClaim{}
for i := 0; i < podLimit*volsPerPod; i++ {
pv := makePV(fmt.Sprintf("pv-stress-%v", i), classWait, "", "", node1)
pvc := makePVC(fmt.Sprintf("pvc-stress-%v", i), config.ns, &classWait, "")
if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil {
t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
// Don't create pvs for dynamic provisioning test
if !dynamic {
pv := makePV(fmt.Sprintf("pv-stress-%v", i), *scName, "", "", node1)
if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil {
t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
}
pvs = append(pvs, pv)
}
pvc := makePVC(fmt.Sprintf("pvc-stress-%v", i), config.ns, scName, "")
if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(pvc); err != nil {
t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
}
pvs = append(pvs, pv)
pvcs = append(pvcs, pvc)
}
@ -431,7 +464,7 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration)
podPvcs = append(podPvcs, pvcs[j].Name)
}
pod := makePod(fmt.Sprintf("pod%v", i), config.ns, podPvcs)
pod := makePod(fmt.Sprintf("pod%03d", i), config.ns, podPvcs)
if pod, err := config.client.CoreV1().Pods(config.ns).Create(pod); err != nil {
t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
}
@ -442,7 +475,7 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration)
for _, pod := range pods {
// Use increased timeout for stress test because there is a higher chance of
// PV sync error
if err := waitForPodToScheduleWithTimeout(config.client, pod, 60*time.Second); err != nil {
if err := waitForPodToScheduleWithTimeout(config.client, pod, 2*time.Minute); err != nil {
t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err)
}
}
@ -456,12 +489,142 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration)
}
}
func testVolumeBindingWithAffinity(t *testing.T, anti bool, numNodes, numPods, numPVsFirstNode int) {
features := map[string]bool{
"VolumeScheduling": true,
"PersistentLocalVolumes": true,
}
// TODO: disable equivalence cache until kubernetes/kubernetes#67680 is fixed
config := setupCluster(t, "volume-pod-affinity-", numNodes, features, 0, 0, true)
defer config.teardown()
pods := []*v1.Pod{}
pvcs := []*v1.PersistentVolumeClaim{}
pvs := []*v1.PersistentVolume{}
// Create PVs for the first node
for i := 0; i < numPVsFirstNode; i++ {
pv := makePV(fmt.Sprintf("pv-node1-%v", i), classWait, "", "", node1)
if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil {
t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
}
pvs = append(pvs, pv)
}
// Create 1 PV per Node for the remaining nodes
for i := 2; i <= numNodes; i++ {
pv := makePV(fmt.Sprintf("pv-node%v-0", i), classWait, "", "", fmt.Sprintf("node-%v", i))
if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil {
t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
}
pvs = append(pvs, pv)
}
// Create pods
for i := 0; i < numPods; i++ {
// Create one pvc per pod
pvc := makePVC(fmt.Sprintf("pvc-%v", i), config.ns, &classWait, "")
if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(pvc); err != nil {
t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
}
pvcs = append(pvcs, pvc)
// Create pod with pod affinity
pod := makePod(fmt.Sprintf("pod%03d", i), config.ns, []string{pvc.Name})
pod.Spec.Affinity = &v1.Affinity{}
affinityTerms := []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "app",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"volume-binding-test"},
},
},
},
TopologyKey: nodeAffinityLabelKey,
},
}
if anti {
pod.Spec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: affinityTerms,
}
} else {
pod.Spec.Affinity.PodAffinity = &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: affinityTerms,
}
}
if pod, err := config.client.CoreV1().Pods(config.ns).Create(pod); err != nil {
t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
}
pods = append(pods, pod)
}
// Validate Pods scheduled
scheduledNodes := sets.NewString()
for _, pod := range pods {
if err := waitForPodToSchedule(config.client, pod); err != nil {
t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err)
} else {
// Keep track of all the nodes that the Pods were scheduled on
pod, err = config.client.CoreV1().Pods(config.ns).Get(pod.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get Pod %q: %v", pod.Name, err)
}
if pod.Spec.NodeName == "" {
t.Fatalf("Pod %q node name unset after scheduling", pod.Name)
}
scheduledNodes.Insert(pod.Spec.NodeName)
}
}
// Validate the affinity policy
if anti {
// The pods should have been spread across different nodes
if scheduledNodes.Len() != numPods {
t.Errorf("Pods were scheduled across %v nodes instead of %v", scheduledNodes.Len(), numPods)
}
} else {
// The pods should have been scheduled on 1 node
if scheduledNodes.Len() != 1 {
t.Errorf("Pods were scheduled across %v nodes instead of %v", scheduledNodes.Len(), 1)
}
}
// Validate PVC binding
for _, pvc := range pvcs {
validatePVCPhase(t, config.client, pvc.Name, config.ns, v1.ClaimBound)
}
}
func TestVolumeBindingWithAntiAffinity(t *testing.T) {
numNodes := 10
// Create as many pods as number of nodes
numPods := numNodes
// Create many more PVs on node1 to increase chance of selecting node1
numPVsFirstNode := 10 * numNodes
testVolumeBindingWithAffinity(t, true, numNodes, numPods, numPVsFirstNode)
}
func TestVolumeBindingWithAffinity(t *testing.T) {
numPods := 10
// Create many more nodes to increase chance of selecting a PV on a different node than node1
numNodes := 10 * numPods
// Create numPods PVs on the first node
numPVsFirstNode := numPods
testVolumeBindingWithAffinity(t, true, numNodes, numPods, numPVsFirstNode)
}
func TestPVAffinityConflict(t *testing.T) {
features := map[string]bool{
"VolumeScheduling": true,
"PersistentLocalVolumes": true,
}
config := setupCluster(t, "volume-scheduling", 3, features, 0)
config := setupCluster(t, "volume-scheduling-", 3, features, 0, 0, false)
defer config.teardown()
pv := makePV("local-pv", classImmediate, "", "", node1)
@ -519,7 +682,7 @@ func TestPVAffinityConflict(t *testing.T) {
}
}
func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[string]bool, resyncPeriod time.Duration) *testConfig {
func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[string]bool, resyncPeriod time.Duration, provisionDelaySeconds int, disableEquivalenceCache bool) *testConfig {
oldFeatures := make(map[string]bool, len(features))
for feature := range features {
oldFeatures[feature] = utilfeature.DefaultFeatureGate.Enabled(utilfeature.Feature(feature))
@ -529,7 +692,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[s
controllerCh := make(chan struct{})
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), controllerCh, false, nil, false, resyncPeriod)
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), controllerCh, false, nil, false, disableEquivalenceCache, resyncPeriod)
clientset := context.clientSet
ns := context.ns.Name
@ -543,6 +706,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[s
Host: host,
Config: volume.VolumeConfig{},
LastProvisionerOptions: volume.VolumeOptions{},
ProvisionDelaySeconds: provisionDelaySeconds,
NewAttacherCallCount: 0,
NewDetacherCallCount: 0,
Mounters: nil,
@ -732,6 +896,9 @@ func makePod(name, ns string, pvcs []string) *v1.Pod {
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
Labels: map[string]string{
"app": "volume-binding-test",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{