diff --git a/test/e2e/autoscaling/BUILD b/test/e2e/autoscaling/BUILD index 2f13fcb1a85..ce21fb6b6a0 100644 --- a/test/e2e/autoscaling/BUILD +++ b/test/e2e/autoscaling/BUILD @@ -11,6 +11,7 @@ go_library( name = "go_default_library", srcs = [ "autoscaling_timer.go", + "cluster_autoscaler_scalability.go", "cluster_size_autoscaling.go", "dns_autoscaling.go", "framework.go", @@ -33,9 +34,11 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", diff --git a/test/e2e/autoscaling/cluster_autoscaler_scalability.go b/test/e2e/autoscaling/cluster_autoscaler_scalability.go new file mode 100644 index 00000000000..d3577c2ca85 --- /dev/null +++ b/test/e2e/autoscaling/cluster_autoscaler_scalability.go @@ -0,0 +1,458 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package autoscaling + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" + + "github.com/golang/glog" + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" +) + +const ( + largeResizeTimeout = 10 * time.Minute + largeScaleUpTimeout = 10 * time.Minute + largeScaleDownTimeout = 20 * time.Minute + minute = 1 * time.Minute + + maxNodes = 1000 +) + +type clusterPredicates struct { + nodes int +} + +type scaleUpTestConfig struct { + initialNodes int + initialPods int + extraPods *testutils.RCConfig + expectedResult *clusterPredicates +} + +var _ = framework.KubeDescribe("Cluster size autoscaler scalability [Slow]", func() { + f := framework.NewDefaultFramework("autoscaling") + var c clientset.Interface + var nodeCount int + var coresPerNode int + var memCapacityMb int + var originalSizes map[string]int + var sum int + + BeforeEach(func() { + framework.SkipUnlessProviderIs("gce", "gke", "kubemark") + + c = f.ClientSet + if originalSizes == nil { + originalSizes = make(map[string]int) + sum = 0 + for _, mig := range strings.Split(framework.TestContext.CloudConfig.NodeInstanceGroup, ",") { + size, err := framework.GroupSize(mig) + framework.ExpectNoError(err) + By(fmt.Sprintf("Initial size of %s: %d", mig, size)) + originalSizes[mig] = size + sum += size + } + } + + framework.ExpectNoError(framework.WaitForClusterSize(c, sum, scaleUpTimeout)) + + nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet) + nodeCount = len(nodes.Items) + Expect(nodeCount).NotTo(BeZero()) + cpu := nodes.Items[0].Status.Capacity[v1.ResourceCPU] + mem := nodes.Items[0].Status.Capacity[v1.ResourceMemory] + coresPerNode = int((&cpu).MilliValue() / 1000) + memCapacityMb = int((&mem).Value() / 1024 / 1024) + + Expect(nodeCount).Should(Equal(sum)) + + if framework.ProviderIs("gke") { + val, err := isAutoscalerEnabled(3) + framework.ExpectNoError(err) + if !val { + err = enableAutoscaler("default-pool", 3, 5) + framework.ExpectNoError(err) + } + } + }) + + AfterEach(func() { + By(fmt.Sprintf("Restoring initial size of the cluster")) + setMigSizes(originalSizes) + framework.ExpectNoError(framework.WaitForClusterSize(c, nodeCount, scaleDownTimeout)) + nodes, err := c.Core().Nodes().List(metav1.ListOptions{}) + framework.ExpectNoError(err) + s := time.Now() + makeSchedulableLoop: + for start := time.Now(); time.Since(start) < makeSchedulableTimeout; time.Sleep(makeSchedulableDelay) { + for _, n := range nodes.Items { + err = makeNodeSchedulable(c, &n, true) + switch err.(type) { + case CriticalAddonsOnlyError: + continue makeSchedulableLoop + default: + framework.ExpectNoError(err) + } + } + break + } + glog.Infof("Made nodes schedulable again in %v", time.Now().Sub(s).String()) + }) + + It("should scale up at all [Feature:ClusterAutoscalerScalability1]", func() { + perNodeReservation := int(float64(memCapacityMb) * 0.95) + replicasPerNode := 10 + + additionalNodes := maxNodes - nodeCount + replicas := additionalNodes * replicasPerNode + additionalReservation := additionalNodes * perNodeReservation + + // saturate cluster + reservationCleanup := ReserveMemory(f, "some-pod", nodeCount*2, nodeCount*perNodeReservation, true, scaleUpTimeout) + defer reservationCleanup() + framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c)) + + // configure pending pods & expected scale up + rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas, additionalReservation, largeScaleUpTimeout) + expectedResult := createClusterPredicates(nodeCount + additionalNodes) + config := createScaleUpTestConfig(nodeCount, nodeCount, rcConfig, expectedResult) + + // run test + testCleanup := simpleScaleUpTest(f, config) + defer testCleanup() + }) + + It("should scale up 
twice [Feature:ClusterAutoscalerScalability2]", func() { + perNodeReservation := int(float64(memCapacityMb) * 0.95) + replicasPerNode := 10 + additionalNodes1 := int(0.7 * maxNodes) + additionalNodes2 := int(0.25 * maxNodes) + + replicas1 := additionalNodes1 * replicasPerNode + replicas2 := additionalNodes2 * replicasPerNode + + glog.Infof("cores per node: %v", coresPerNode) + + // saturate cluster + reservationCleanup := ReserveMemory(f, "some-pod", nodeCount, nodeCount*perNodeReservation, true, scaleUpTimeout) + defer reservationCleanup() + framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c)) + + glog.Infof("Reserved successfully") + + // configure pending pods & expected scale up #1 + rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas1, additionalNodes1*perNodeReservation, largeScaleUpTimeout) + expectedResult := createClusterPredicates(nodeCount + additionalNodes1) + config := createScaleUpTestConfig(nodeCount, nodeCount, rcConfig, expectedResult) + + epsilon := 0.05 + + // run test #1 + testCleanup1 := simpleScaleUpTestWithEpsilon(f, config, epsilon) + defer testCleanup1() + + glog.Infof("Scaled up once") + + // configure pending pods & expected scale up #2 + rcConfig2 := reserveMemoryRCConfig(f, "extra-pod-2", replicas2, additionalNodes2*perNodeReservation, largeScaleUpTimeout) + expectedResult2 := createClusterPredicates(nodeCount + additionalNodes1 + additionalNodes2) + config2 := createScaleUpTestConfig(nodeCount+additionalNodes1, nodeCount+additionalNodes2, rcConfig2, expectedResult2) + + // run test #2 + testCleanup2 := simpleScaleUpTestWithEpsilon(f, config2, epsilon) + defer testCleanup2() + + glog.Infof("Scaled up twice") + }) + + It("should scale down empty nodes [Feature:ClusterAutoscalerScalability3]", func() { + perNodeReservation := int(float64(memCapacityMb) * 0.7) + replicas := int(float64(maxNodes) * 0.7) + totalNodes := maxNodes + + // resize cluster to totalNodes + newSizes := map[string]int{ + anyKey(originalSizes): totalNodes, + } + setMigSizes(newSizes) + framework.ExpectNoError(framework.WaitForClusterSize(f.ClientSet, totalNodes, largeResizeTimeout)) + + // run replicas + rcConfig := reserveMemoryRCConfig(f, "some-pod", replicas, replicas*perNodeReservation, largeScaleUpTimeout) + expectedResult := createClusterPredicates(totalNodes) + config := createScaleUpTestConfig(totalNodes, totalNodes, rcConfig, expectedResult) + testCleanup := simpleScaleUpTestWithEpsilon(f, config, 0.1) + defer testCleanup() + + // check if empty nodes are scaled down + framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet, + func(size int) bool { + return size <= replicas+3 // leaving space for non-evictable kube-system pods + }, scaleDownTimeout)) + }) + + It("should scale down underutilized nodes [Feature:ClusterAutoscalerScalability4]", func() { + underutilizedReservation := int64(float64(memCapacityMb) * 0.01) + fullReservation := int64(float64(memCapacityMb) * 0.8) + perNodeReplicas := 10 + totalNodes := maxNodes + + // resize cluster to totalNodes + newSizes := map[string]int{ + anyKey(originalSizes): totalNodes, + } + setMigSizes(newSizes) + framework.ExpectNoError(framework.WaitForClusterSize(f.ClientSet, totalNodes, largeResizeTimeout)) + + // annotate all nodes with no-scale-down + ScaleDownDisabledKey := "cluster-autoscaler.kubernetes.io/scale-down-disabled" + + nodes, err := f.ClientSet.Core().Nodes().List(metav1.ListOptions{ + FieldSelector: fields.Set{ + "spec.unschedulable": "false", + }.AsSelector().String(), + }) + + 
framework.ExpectNoError(err) +
framework.ExpectNoError(addAnnotation(f, nodes.Items, ScaleDownDisabledKey, "true")) + + // distribute pods (using taints) + divider := int(float64(len(nodes.Items)) * 0.7) + + fullNodes := nodes.Items[:divider] + underutilizedNodes := nodes.Items[divider:] + + framework.ExpectNoError(makeUnschedulable(f, underutilizedNodes)) + + testId2 := "full" + labels2 := map[string]string{"test_id": testId2} + cleanup2, err := runReplicatedPodOnEachNodeWithCleanup(f, fullNodes, f.Namespace.Name, 1, "filling-pod", labels2, fullReservation) + defer cleanup2() + framework.ExpectNoError(err) + + framework.ExpectNoError(makeUnschedulable(f, fullNodes)) + + testId := "underutilized" + labels := map[string]string{"test_id": testId} + cleanup, err := runReplicatedPodOnEachNodeWithCleanup(f, underutilizedNodes, f.Namespace.Name, perNodeReplicas, "underutilizing-pod", labels, underutilizedReservation) + defer cleanup() + framework.ExpectNoError(err) + + framework.ExpectNoError(makeSchedulable(f, nodes.Items)) + framework.ExpectNoError(addAnnotation(f, nodes.Items, ScaleDownDisabledKey, "false")) + + // wait for scale down + expectedSize := int(float64(totalNodes) * 0.85) + nodesToScaleDownCount := totalNodes - expectedSize + timeout := time.Duration(nodesToScaleDownCount)*time.Minute + scaleDownTimeout + framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet, func(size int) bool { + return size <= expectedSize + }, timeout)) + }) + + It("shouldn't scale down with underutilized nodes due to host port conflicts [Feature:ClusterAutoscalerScalability5]", func() { + fullReservation := int(float64(memCapacityMb) * 0.9) + hostPortPodReservation := int(float64(memCapacityMb) * 0.3) + totalNodes := maxNodes + reservedPort := 4321 + + // resize cluster to totalNodes + newSizes := map[string]int{ + anyKey(originalSizes): totalNodes, + } + setMigSizes(newSizes) + framework.ExpectNoError(framework.WaitForClusterSize(f.ClientSet, totalNodes, largeResizeTimeout)) + divider := int(float64(totalNodes) * 0.7) + fullNodesCount := divider + underutilizedNodesCount := totalNodes - fullNodesCount + + By("Reserving full nodes") + // run RC1 w/o host port + cleanup := ReserveMemory(f, "filling-pod", fullNodesCount, fullNodesCount*fullReservation, true, largeScaleUpTimeout*2) + defer cleanup() + + By("Reserving host ports on remaining nodes") + // run RC2 w/ host port + cleanup2 := createHostPortPodsWithMemory(f, "underutilizing-host-port-pod", underutilizedNodesCount, reservedPort, underutilizedNodesCount*hostPortPodReservation, largeScaleUpTimeout) + defer cleanup2() + + waitForAllCaPodsReadyInNamespace(f, c) + // wait and check scale down doesn't occur + By(fmt.Sprintf("Sleeping %v minutes...", scaleDownTimeout.Minutes())) + time.Sleep(scaleDownTimeout) + + By("Checking if the number of nodes is as expected") + nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet) + glog.Infof("Nodes: %v, expected: %v", len(nodes.Items), totalNodes) + Expect(len(nodes.Items)).Should(Equal(totalNodes)) + }) + +}) + +func makeUnschedulable(f *framework.Framework, nodes []v1.Node) error { + for _, node := range nodes { + err := makeNodeUnschedulable(f.ClientSet, &node) + if err != nil { + return err + } + } + return nil +} + +func makeSchedulable(f *framework.Framework, nodes []v1.Node) error { + for _, node := range nodes { + err := makeNodeSchedulable(f.ClientSet, &node, false) + if err != nil { + return err + } + } + return nil +} + +func anyKey(input map[string]int) string { + for k := range input { + return k + } + return "" +} + 
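+// simpleScaleUpTestWithEpsilon runs the RC described by config and waits until the cluster reaches the expected node count, tolerating an epsilon fraction of missing nodes and not-ready pods when 0 < epsilon < 1. It returns a cleanup function that deletes the RC.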
+func simpleScaleUpTestWithEpsilon(f *framework.Framework, config *scaleUpTestConfig, epsilon float64) func() error { + // resize cluster to start size + // run rc based on config + By(fmt.Sprintf("Running RC %v from config", config.extraPods.Name)) + start := time.Now() + framework.ExpectNoError(framework.RunRC(*config.extraPods)) + // check results + if epsilon > 0 && epsilon < 1 { + // Tolerate some number of nodes not to be created. + minExpectedNodeCount := int(float64(config.expectedResult.nodes) - epsilon*float64(config.expectedResult.nodes)) + framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet, + func(size int) bool { return size >= minExpectedNodeCount }, scaleUpTimeout)) + } else { + framework.ExpectNoError(framework.WaitForClusterSize(f.ClientSet, config.expectedResult.nodes, scaleUpTimeout)) + } + glog.Infof("cluster is increased") + if epsilon > 0 && epsilon < 1 { + framework.ExpectNoError(waitForCaPodsReadyInNamespace(f, f.ClientSet, int(epsilon*float64(config.extraPods.Replicas)+1))) + } else { + framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, f.ClientSet)) + } + timeTrack(start, fmt.Sprintf("Scale up to %v", config.expectedResult.nodes)) + return func() error { + return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, config.extraPods.Name) + } +} + +func simpleScaleUpTest(f *framework.Framework, config *scaleUpTestConfig) func() error { + return simpleScaleUpTestWithEpsilon(f, config, 0) +} + +func reserveMemoryRCConfig(f *framework.Framework, id string, replicas, megabytes int, timeout time.Duration) *testutils.RCConfig { + return &testutils.RCConfig{ + Client: f.ClientSet, + InternalClient: f.InternalClientset, + Name: id, + Namespace: f.Namespace.Name, + Timeout: timeout, + Image: framework.GetPauseImageName(f.ClientSet), + Replicas: replicas, + MemRequest: int64(1024 * 1024 * megabytes / replicas), + } +} + +func createScaleUpTestConfig(nodes, pods int, extraPods *testutils.RCConfig, expectedResult *clusterPredicates) *scaleUpTestConfig { + return &scaleUpTestConfig{ + initialNodes: nodes, + initialPods: pods, + extraPods: extraPods, + expectedResult: expectedResult, + } +} + +func createClusterPredicates(nodes int) *clusterPredicates { + return &clusterPredicates{ + nodes: nodes, + } +} + +func addAnnotation(f *framework.Framework, nodes []v1.Node, key, value string) error { + for _, node := range nodes { + oldData, err := json.Marshal(node) + if err != nil { + return err + } + + if node.Annotations == nil { + node.Annotations = make(map[string]string) + } + node.Annotations[key] = value + + newData, err := json.Marshal(node) + if err != nil { + return err + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{}) + if err != nil { + return err + } + + _, err = f.ClientSet.Core().Nodes().Patch(string(node.Name), types.StrategicMergePatchType, patchBytes) + if err != nil { + return err + } + } + return nil +} + +func createHostPortPodsWithMemory(f *framework.Framework, id string, replicas, port, megabytes int, timeout time.Duration) func() error { + By(fmt.Sprintf("Running RC which reserves host port and memory")) + request := int64(1024 * 1024 * megabytes / replicas) + config := &testutils.RCConfig{ + Client: f.ClientSet, + InternalClient: f.InternalClientset, + Name: id, + Namespace: f.Namespace.Name, + Timeout: timeout, + Image: framework.GetPauseImageName(f.ClientSet), + Replicas: replicas, + HostPorts: map[string]int{"port1": port}, + MemRequest: request, + } + err := 
framework.RunRC(*config) + framework.ExpectNoError(err) + return func() error { + return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, id) + } +} + +func timeTrack(start time.Time, name string) { + elapsed := time.Since(start) + glog.Infof("%s took %s", name, elapsed) +} diff --git a/test/e2e/autoscaling/cluster_size_autoscaling.go b/test/e2e/autoscaling/cluster_size_autoscaling.go index d2b75fc828d..463d5f1e7c9 100644 --- a/test/e2e/autoscaling/cluster_size_autoscaling.go +++ b/test/e2e/autoscaling/cluster_size_autoscaling.go @@ -199,7 +199,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() { framework.ExpectNoError(err) unmanagedNodes := nodeCount - status.ready - By("Schedule more pods than can fit and wait for claster to scale-up") + By("Schedule more pods than can fit and wait for cluster to scale-up") ReserveMemory(f, "memory-reservation", 100, nodeCount*memCapacityMb, false, 1*time.Second) defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "memory-reservation") @@ -719,7 +719,7 @@ func runDrainTest(f *framework.Framework, migSizes map[string]int, namespace str numPods := len(nodes.Items) * podsPerNode testId := string(uuid.NewUUID()) // So that we can label and find pods labelMap := map[string]string{"test_id": testId} - framework.ExpectNoError(runReplicatedPodOnEachNode(f, nodes.Items, namespace, podsPerNode, "reschedulable-pods", labelMap)) + framework.ExpectNoError(runReplicatedPodOnEachNode(f, nodes.Items, namespace, podsPerNode, "reschedulable-pods", labelMap, 0)) defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, namespace, "reschedulable-pods") @@ -907,7 +907,7 @@ func doPut(url, content string) (string, error) { return strBody, nil } -func ReserveMemory(f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration) { +func ReserveMemory(f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration) func() error { By(fmt.Sprintf("Running RC which reserves %v MB of memory", megabytes)) request := int64(1024 * 1024 * megabytes / replicas) config := &testutils.RCConfig{ @@ -929,9 +929,12 @@ func ReserveMemory(f *framework.Framework, id string, replicas, megabytes int, e if expectRunning { framework.ExpectNoError(err) } - return + return func() error { + return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, id) + } } framework.Failf("Failed to reserve memory within timeout") + return nil } // WaitForClusterSize waits until the cluster size matches the given function. @@ -961,7 +964,7 @@ func WaitForClusterSizeFuncWithUnready(c clientset.Interface, sizeFunc func(int) glog.Infof("Cluster has reached the desired size") return nil } - glog.Infof("Waiting for cluster, current size %d, not ready nodes %d", numNodes, numNodes-numReady) + glog.Infof("Waiting for cluster with func, current size %d, not ready nodes %d", numNodes, numNodes-numReady) } return fmt.Errorf("timeout waiting %v for appropriate cluster size", timeout) } @@ -1201,7 +1204,7 @@ func buildAntiAffinity(labels map[string]string) *v1.Affinity { // 3. for each node: // 3a. enable scheduling on that node // 3b. 
increase number of replicas in RC by podsPerNode -func runReplicatedPodOnEachNode(f *framework.Framework, nodes []v1.Node, namespace string, podsPerNode int, id string, labels map[string]string) error { +func runReplicatedPodOnEachNode(f *framework.Framework, nodes []v1.Node, namespace string, podsPerNode int, id string, labels map[string]string, memRequest int64) error { By("Run a pod on each node") for _, node := range nodes { err := makeNodeUnschedulable(f.ClientSet, &node) @@ -1223,6 +1226,7 @@ func runReplicatedPodOnEachNode(f *framework.Framework, nodes []v1.Node, namespa Image: framework.GetPauseImageName(f.ClientSet), Replicas: 0, Labels: labels, + MemRequest: memRequest, } err := framework.RunRC(*config) if err != nil { @@ -1274,6 +1278,14 @@ func runReplicatedPodOnEachNode(f *framework.Framework, nodes []v1.Node, namespa return nil } +// wrap runReplicatedPodOnEachNode to return cleanup +func runReplicatedPodOnEachNodeWithCleanup(f *framework.Framework, nodes []v1.Node, namespace string, podsPerNode int, id string, labels map[string]string, memRequest int64) (func(), error) { + err := runReplicatedPodOnEachNode(f, nodes, namespace, podsPerNode, id, labels, memRequest) + return func() { + framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, namespace, id) + }, err +} + // Increase cluster size by newNodesForScaledownTests to create some unused nodes // that can be later removed by cluster autoscaler. func manuallyIncreaseClusterSize(f *framework.Framework, originalSizes map[string]int) int {