diff --git a/test/e2e/cluster_size_autoscaling.go b/test/e2e/cluster_size_autoscaling.go index 3558925508d..30718171a5c 100644 --- a/test/e2e/cluster_size_autoscaling.go +++ b/test/e2e/cluster_size_autoscaling.go @@ -28,7 +28,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/apimachinery/pkg/util/wait" + policy "k8s.io/client-go/pkg/apis/policy/v1beta1" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/test/e2e/framework" @@ -45,9 +49,13 @@ const ( resizeTimeout = 5 * time.Minute scaleUpTimeout = 5 * time.Minute scaleDownTimeout = 15 * time.Minute + podTimeout = 2 * time.Minute gkeEndpoint = "https://test-container.sandbox.googleapis.com" gkeUpdateTimeout = 15 * time.Minute + + disabledTaint = "DisabledForAutoscalingTest" + newNodesForScaledownTests = 2 ) var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() { @@ -95,6 +103,11 @@ var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() { By(fmt.Sprintf("Restoring initial size of the cluster")) setMigSizes(originalSizes) framework.ExpectNoError(framework.WaitForClusterSize(c, nodeCount, scaleDownTimeout)) + nodes, err := c.Core().Nodes().List(metav1.ListOptions{}) + framework.ExpectNoError(err) + for _, n := range nodes.Items { + framework.ExpectNoError(makeNodeSchedulable(c, &n)) + } }) It("shouldn't increase cluster size if pending pod is too large [Feature:ClusterSizeAutoscalingScaleUp]", func() { @@ -279,17 +292,7 @@ var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() { }) It("should correctly scale down after a node is not needed [Feature:ClusterSizeAutoscalingScaleDown]", func() { - By("Manually increase cluster size") - increasedSize := 0 - newSizes := make(map[string]int) - for key, val := range originalSizes { - newSizes[key] = val + 2 - increasedSize += val + 2 - } - setMigSizes(newSizes) - framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet, - func(size int) bool { return size >= increasedSize }, scaleUpTimeout)) - + increasedSize := manuallyIncreaseClusterSize(f, originalSizes) By("Some node should be removed") framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet, func(size int) bool { return size < increasedSize }, scaleDownTimeout)) @@ -298,16 +301,7 @@ var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() { It("should correctly scale down after a node is not needed when there is non autoscaled pool[Feature:ClusterSizeAutoscalingScaleDown]", func() { framework.SkipUnlessProviderIs("gke") - By("Manually increase cluster size") - increasedSize := 0 - newSizes := make(map[string]int) - for key, val := range originalSizes { - newSizes[key] = val + 2 - increasedSize += val + 2 - } - setMigSizes(newSizes) - framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet, - func(size int) bool { return size >= increasedSize }, scaleUpTimeout)) + increasedSize := manuallyIncreaseClusterSize(f, originalSizes) const extraPoolName = "extra-pool" addNodePool(extraPoolName, "n1-standard-1", 3) @@ -324,8 +318,62 @@ var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() { framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet, func(size int) bool { return size < increasedSize+3 }, scaleDownTimeout+10*time.Minute)) }) + + It("should be able to scale down when rescheduling a pod is required and pdb allows for it[Feature:ClusterSizeAutoscalingScaleDown]", func() { + runDrainTest(f, originalSizes, 1, func(increasedSize int) { + By("Some node should be removed") + framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet, + func(size int) bool { return size < increasedSize }, scaleDownTimeout)) + }) + }) + + It("shouldn't be able to scale down when rescheduling a pod is required, but pdb doesn't allow drain[Feature:ClusterSizeAutoscalingScaleDown]", func() { + runDrainTest(f, originalSizes, 0, func(increasedSize int) { + By("No nodes should be removed") + time.Sleep(scaleDownTimeout) + nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet) + Expect(len(nodes.Items)).Should(Equal(increasedSize)) + }) + }) + }) +func runDrainTest(f *framework.Framework, migSizes map[string]int, pdbSize int, verifyFunction func(int)) { + increasedSize := manuallyIncreaseClusterSize(f, migSizes) + + nodes, err := f.ClientSet.Core().Nodes().List(metav1.ListOptions{FieldSelector: fields.Set{ + "spec.unschedulable": "false", + }.AsSelector().String()}) + framework.ExpectNoError(err) + namespace := f.Namespace.Name + numPods := len(nodes.Items) + testId := string(uuid.NewUUID()) // So that we can label and find pods + labelMap := map[string]string{"test_id": testId} + framework.ExpectNoError(runReplicatedPodOnEachNode(f, nodes.Items, "reschedulable-pods", labelMap)) + + defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "reschedulable-pods") + + By("Create a PodDisruptionBudget") + pdb := &policy.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test_pdb", + Namespace: namespace, + }, + Spec: policy.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{MatchLabels: labelMap}, + MinAvailable: intstr.FromInt(numPods - pdbSize), + }, + } + _, err = f.StagingClient.Policy().PodDisruptionBudgets(namespace).Create(pdb) + + defer func() { + f.StagingClient.Policy().PodDisruptionBudgets(namespace).Delete(pdb.Name, &metav1.DeleteOptions{}) + }() + + framework.ExpectNoError(err) + verifyFunction(increasedSize) +} + func getGKEClusterUrl() string { out, err := exec.Command("gcloud", "auth", "print-access-token").Output() framework.ExpectNoError(err) @@ -605,3 +653,128 @@ func setMigSizes(sizes map[string]int) { } } } + +func makeNodeUnschedulable(c clientset.Interface, node *v1.Node) error { + By(fmt.Sprintf("Taint node %s", node.Name)) + freshNode, err := c.Core().Nodes().Get(node.Name, metav1.GetOptions{}) + if err != nil { + return err + } + for _, taint := range freshNode.Spec.Taints { + if taint.Key == disabledTaint { + return nil + } + } + freshNode.Spec.Taints = append(freshNode.Spec.Taints, v1.Taint{ + Key: disabledTaint, + Value: "DisabledForTest", + Effect: v1.TaintEffectNoSchedule, + }) + _, err = c.Core().Nodes().Update(freshNode) + return err +} + +func makeNodeSchedulable(c clientset.Interface, node *v1.Node) error { + By(fmt.Sprintf("Remove taint from node %s", node.Name)) + freshNode, err := c.Core().Nodes().Get(node.Name, metav1.GetOptions{}) + if err != nil { + return err + } + newTaints := make([]v1.Taint, 0) + for _, taint := range freshNode.Spec.Taints { + if taint.Key != disabledTaint { + newTaints = append(newTaints, taint) + } + } + + if len(newTaints) != len(freshNode.Spec.Taints) { + freshNode.Spec.Taints = newTaints + _, err = c.Core().Nodes().Update(freshNode) + return err + } + return nil +} + +// Creat an RC running a single pod on each node without adding any constraint forcing such +// pod distribution. This is meant to create a bunch of underutilized (but not unused) nodes +// with pods that can be rescheduled on different nodes. +// This is achieved using the following method: +// 1. disable scheduling on each node +// 2. create an empty RC +// 3. for each node: +// 3a. enable scheduling on that node +// 3b. increase number of replicas in RC by 1 +func runReplicatedPodOnEachNode(f *framework.Framework, nodes []v1.Node, id string, labels map[string]string) error { + By("Run a pod on each node") + for _, node := range nodes { + err := makeNodeUnschedulable(f.ClientSet, &node) + + defer func(n v1.Node) { + makeNodeSchedulable(f.ClientSet, &n) + }(node) + + if err != nil { + return err + } + } + config := &testutils.RCConfig{ + Client: f.ClientSet, + InternalClient: f.InternalClientset, + Name: id, + Namespace: f.Namespace.Name, + Timeout: defaultTimeout, + Image: framework.GetPauseImageName(f.ClientSet), + Replicas: 0, + Labels: labels, + } + err := framework.RunRC(*config) + if err != nil { + return err + } + rc, err := f.ClientSet.Core().ReplicationControllers(f.Namespace.Name).Get(id, metav1.GetOptions{}) + if err != nil { + return err + } + for i, node := range nodes { + err = makeNodeSchedulable(f.ClientSet, &node) + if err != nil { + return err + } + *rc.Spec.Replicas = int32(i + 1) + rc, err = f.ClientSet.Core().ReplicationControllers(f.Namespace.Name).Update(rc) + if err != nil { + return err + } + err = wait.PollImmediate(5*time.Second, podTimeout, func() (bool, error) { + rc, err = f.ClientSet.Core().ReplicationControllers(f.Namespace.Name).Get(id, metav1.GetOptions{}) + if err != nil || rc.Status.ReadyReplicas < int32(i+1) { + return false, nil + } + return true, nil + }) + if err != nil { + return fmt.Errorf("failed to coerce RC into spawning a pod on node %s within timeout", node.Name) + } + err = makeNodeUnschedulable(f.ClientSet, &node) + if err != nil { + return err + } + } + return nil +} + +// Increase cluster size by newNodesForScaledownTests to create some unused nodes +// that can be later removed by cluster autoscaler. +func manuallyIncreaseClusterSize(f *framework.Framework, originalSizes map[string]int) int { + By("Manually increase cluster size") + increasedSize := 0 + newSizes := make(map[string]int) + for key, val := range originalSizes { + newSizes[key] = val + newNodesForScaledownTests + increasedSize += val + newNodesForScaledownTests + } + setMigSizes(newSizes) + framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet, + func(size int) bool { return size >= increasedSize }, scaleUpTimeout)) + return increasedSize +}