Merge pull request #43557 from MaciekPytel/ca_cluster_failure_e2e

Automatic merge from submit-queue

e2e for cluster-autoscaler unhealthy cluster handling

Add an e2e test for cluster-autoscaler's handling of an unhealthy cluster.

~~This only passes reliably with https://github.com/kubernetes/contrib/pull/2488 and should not be merged before it.~~ (included in current CA image)

**Release note**:
```release-note
```
Authored by Kubernetes Submit Queue on 2017-04-05 02:40:06 -07:00; committed by GitHub
commit 8667d7c4f1
3 changed files with 110 additions and 41 deletions

View File

@@ -20,8 +20,10 @@ import (
"bytes"
"fmt"
"io/ioutil"
"math"
"net/http"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
@@ -46,17 +48,19 @@ import (
)
const (
defaultTimeout = 3 * time.Minute
resizeTimeout = 5 * time.Minute
scaleUpTimeout = 5 * time.Minute
scaleDownTimeout = 15 * time.Minute
podTimeout = 2 * time.Minute
defaultTimeout = 3 * time.Minute
resizeTimeout = 5 * time.Minute
scaleUpTimeout = 5 * time.Minute
scaleDownTimeout = 15 * time.Minute
podTimeout = 2 * time.Minute
nodesRecoverTimeout = 5 * time.Minute
gkeEndpoint = "https://test-container.sandbox.googleapis.com"
gkeUpdateTimeout = 15 * time.Minute
disabledTaint = "DisabledForAutoscalingTest"
newNodesForScaledownTests = 2
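// Number of unready nodes assumed to be enough for cluster-autoscaler to report the cluster as Unhealthy.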
unhealthyClusterThreshold = 4
)
var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() {
@@ -354,6 +358,48 @@ var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() {
})
})
It("Shouldn't perform scale up operation and should list unhealthy status if most of the cluster is broken[Feature:ClusterSizeAutoscalingScaleUp]", func() {
clusterSize := nodeCount
for clusterSize < unhealthyClusterThreshold+1 {
clusterSize = manuallyIncreaseClusterSize(f, originalSizes)
}
By("Block network connectivity to some nodes to simulate unhealthy cluster")
nodesToBreakCount := int(math.Floor(math.Max(float64(unhealthyClusterThreshold), 0.5*float64(clusterSize))))
nodes, err := f.ClientSet.Core().Nodes().List(metav1.ListOptions{FieldSelector: fields.Set{
"spec.unschedulable": "false",
}.AsSelector().String()})
framework.ExpectNoError(err)
Expect(nodesToBreakCount <= len(nodes.Items)).To(BeTrue())
nodesToBreak := nodes.Items[:nodesToBreakCount]
// TestUnderTemporaryNetworkFailure only removes connectivity to a single node,
// and accepts a func() callback. The loop is unrolled into a recursive call here
// to avoid duplicating TestUnderTemporaryNetworkFailure.
var testFunction func()
testFunction = func() {
if len(nodesToBreak) > 0 {
ntb := &nodesToBreak[0]
nodesToBreak = nodesToBreak[1:]
framework.TestUnderTemporaryNetworkFailure(c, "default", ntb, testFunction)
} else {
ReserveMemory(f, "memory-reservation", 100, nodeCount*memCapacityMb, false)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "memory-reservation")
time.Sleep(scaleUpTimeout)
currentNodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
framework.Logf("Currently available nodes: %v, nodes available at the start of test: %v, disabled nodes: %v", len(currentNodes.Items), len(nodes.Items), nodesToBreakCount)
Expect(len(currentNodes.Items)).Should(Equal(len(nodes.Items) - nodesToBreakCount))
status, err := getClusterwideStatus(c)
framework.Logf("Clusterwide status: %v", status)
framework.ExpectNoError(err)
Expect(status).Should(Equal("Unhealthy"))
}
}
testFunction()
// Give nodes time to recover from network failure
framework.ExpectNoError(framework.WaitForClusterSize(c, len(nodes.Items), nodesRecoverTimeout))
})
})
func runDrainTest(f *framework.Framework, migSizes map[string]int, podsPerNode, pdbSize int, verifyFunction func(int)) {
@@ -828,3 +874,25 @@ func manuallyIncreaseClusterSize(f *framework.Framework, originalSizes map[strin
func(size int) bool { return size >= increasedSize }, scaleUpTimeout))
return increasedSize
}
// Try to get clusterwide health from CA status configmap.
// Status configmap is not parsing-friendly, so evil regexpery follows.
func getClusterwideStatus(c clientset.Interface) (string, error) {
configMap, err := c.CoreV1().ConfigMaps("kube-system").Get("cluster-autoscaler-status", metav1.GetOptions{})
if err != nil {
return "", err
}
status, ok := configMap.Data["status"]
if !ok {
return "", fmt.Errorf("Status information not found in configmap")
}
matcher, err := regexp.Compile("Cluster-wide:\\s*\n\\s*Health:\\s*([A-Za-z]+)")
if err != nil {
return "", err
}
result := matcher.FindStringSubmatch(status)
if len(result) < 2 {
return "", fmt.Errorf("Failed to parse CA status configmap")
}
return result[1], nil
}
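
For context, a minimal standalone sketch of the parsing above, run against a hypothetical excerpt of the `status` key from the kube-system/cluster-autoscaler-status configmap (the exact status layout varies between cluster-autoscaler versions; only the Cluster-wide/Health lines matter for the regexp):

```go
// Standalone sketch (not part of this change). The status text below is a
// hypothetical excerpt; real cluster-autoscaler output contains more fields,
// but only the Cluster-wide/Health lines are matched.
package main

import (
	"fmt"
	"regexp"
)

func main() {
	status := `Cluster-autoscaler status at 2017-04-05 09:40:06 +0000 UTC:
Cluster-wide:
  Health: Unhealthy (ready=2 unready=3 notStarted=0 registered=5)
  ScaleUp: NoActivity
  ScaleDown: NoCandidates`

	matcher := regexp.MustCompile(`Cluster-wide:\s*\n\s*Health:\s*([A-Za-z]+)`)
	result := matcher.FindStringSubmatch(status)
	if len(result) < 2 {
		fmt.Println("failed to parse CA status")
		return
	}
	fmt.Println(result[1]) // prints "Unhealthy"
}
```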

View File

@@ -39,6 +39,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
coreclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
)
@@ -826,3 +827,35 @@ func TestHitNodesFromOutsideWithCount(externalIP string, httpPort int32, timeout
}
return nil
}
// Blocks outgoing network traffic on 'node', then runs testFunc.
// At the end (even in case of errors), the network traffic is brought back to normal.
// This function executes commands on a node, so it will only work for some
// environments.
func TestUnderTemporaryNetworkFailure(c clientset.Interface, ns string, node *v1.Node, testFunc func()) {
host := GetNodeExternalIP(node)
master := GetMasterAddress(c)
By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
defer func() {
// This code will execute even if setting the iptables rule failed.
// It is on purpose because we may have an error even if the new rule
// had been inserted. (yes, we could look at the error code and ssh error
// separately, but I prefer to stay on the safe side).
By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name))
UnblockNetwork(host, master)
}()
Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
if !WaitForNodeToBe(c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) {
Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
}
BlockNetwork(host, master)
Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
if !WaitForNodeToBe(c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) {
Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
}
testFunc()
// network traffic is unblocked in a deferred function
}

View File

@@ -41,38 +41,6 @@ import (
. "github.com/onsi/gomega"
)
// Blocks outgoing network traffic on 'node'. Then runs testFunc and returns its status.
// At the end (even in case of errors), the network traffic is brought back to normal.
// This function executes commands on a node so it will work only for some
// environments.
func testUnderTemporaryNetworkFailure(c clientset.Interface, ns string, node *v1.Node, testFunc func()) {
host := framework.GetNodeExternalIP(node)
master := framework.GetMasterAddress(c)
By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
defer func() {
// This code will execute even if setting the iptables rule failed.
// It is on purpose because we may have an error even if the new rule
// had been inserted. (yes, we could look at the error code and ssh error
// separately, but I prefer to stay on the safe side).
By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name))
framework.UnblockNetwork(host, master)
}()
framework.Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
if !framework.WaitForNodeToBe(c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) {
framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
}
framework.BlockNetwork(host, master)
framework.Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
if !framework.WaitForNodeToBe(c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) {
framework.Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
}
testFunc()
// network traffic is unblocked in a deferred function
}
func expectNodeReadiness(isReady bool, newNode chan *v1.Node) {
timeout := false
expected := false
@@ -281,7 +249,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
// Finally, it checks that the replication controller recreates the
// pods on another node and that now the number of replicas is equal 'replicas'.
By(fmt.Sprintf("blocking network traffic from node %s", node.Name))
testUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.Logf("Waiting for pod %s to be removed", pods.Items[0].Name)
err := framework.WaitForRCPodToDisappear(c, ns, name, pods.Items[0].Name)
Expect(err).NotTo(HaveOccurred())
@@ -346,7 +314,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
// Finally, it checks that the replication controller recreates the
// pods on another node and that now the number of replicas is equal 'replicas + 1'.
By(fmt.Sprintf("blocking network traffic from node %s", node.Name))
testUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.Logf("Waiting for pod %s to be removed", pods.Items[0].Name)
err := framework.WaitForRCPodToDisappear(c, ns, name, pods.Items[0].Name)
Expect(err).To(Equal(wait.ErrWaitTimeout), "Pod was not deleted during network partition.")
@@ -421,7 +389,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
// Blocks outgoing network traffic on 'node'. Then verifies that 'podNameToDisappear',
// that belongs to StatefulSet 'statefulSetName', **does not** disappear due to forced deletion from the apiserver.
// The grace period on the stateful pods is set to a value > 0.
testUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.Logf("Checking that the NodeController does not force delete stateful pods %v", pod.Name)
err := framework.WaitTimeoutForPodNoLongerRunningInNamespace(c, pod.Name, ns, 10*time.Minute)
Expect(err).To(Equal(wait.ErrWaitTimeout), "Pod was not deleted during network partition.")
@@ -464,7 +432,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
// This creates a temporary network partition, verifies that the job has 'parallelism' number of
// running pods after the node-controller detects node unreachable.
By(fmt.Sprintf("blocking network traffic from node %s", node.Name))
testUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.Logf("Waiting for pod %s to be removed", pods.Items[0].Name)
err := framework.WaitForPodToDisappear(c, ns, pods.Items[0].Name, label, 20*time.Second, 10*time.Minute)
Expect(err).To(Equal(wait.ErrWaitTimeout), "Pod was not deleted during network partition.")