add e2e test for network partition

Minhan Xia 2016-01-08 16:25:59 -08:00
parent 7f095c1f8e
commit 9611986509
2 changed files with 239 additions and 60 deletions

View File

@@ -33,13 +33,21 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/kubernetes/pkg/client/cache"
awscloud "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
controllerframework "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)
const (
serveHostnameImage = "gcr.io/google_containers/serve_hostname:1.1"
resizeNodeReadyTimeout = 2 * time.Minute
resizeNodeNotReadyTimeout = 2 * time.Minute
nodeReadinessTimeout = 3 * time.Minute
podNotReadyTimeout = 1 * time.Minute
podReadyTimeout = 2 * time.Minute
testPort = 9376
)
@@ -287,14 +295,69 @@ func verifyPods(c *client.Client, ns, name string, wantName bool, replicas int)
return nil
}
// Blocks outgoing network traffic on 'node'. Then verifies that 'podNameToDisappear',
// which belongs to replication controller 'rcName', really disappeared.
// Finally, it checks that the replication controller recreates the
// pods on another node and that now the number of replicas is equal to 'replicas'.
// At the end (even in case of errors), the network traffic is brought back to normal.
// This function executes commands on a node so it will work only for some
// environments.
func performTemporaryNetworkFailure(c *client.Client, ns, rcName string, replicas int, podNameToDisappear string, node *api.Node) {
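// blockNetwork inserts an iptables rule on 'from' (over SSH) that rejects all outgoing traffic destined for 'to'.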
func blockNetwork(from string, to string) {
Logf("block network traffic from %s to %s", from, to)
iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
dropCmd := fmt.Sprintf("sudo iptables --insert %s", iptablesRule)
if result, err := SSH(dropCmd, from, testContext.Provider); result.Code != 0 || err != nil {
LogSSHResult(result)
Failf("Unexpected error: %v", err)
}
}
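// unblockNetwork removes the iptables rule installed by blockNetwork, retrying over SSH for up to 30 seconds.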
func unblockNetwork(from string, to string) {
Logf("Unblock network traffic from %s to %s", from, to)
iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
undropCmd := fmt.Sprintf("sudo iptables --delete %s", iptablesRule)
// The undrop command may fail if the rule has never been created.
// In such a case we just lose 30 seconds, but the cluster is healthy.
// But if the rule had been created and removing it failed, the node is broken and
// not coming back. Subsequent tests will run on fewer nodes (some of the tests
// may fail). Manual intervention is required in such a case (recreating the
// cluster solves the problem too).
err := wait.Poll(time.Millisecond*100, time.Second*30, func() (bool, error) {
result, err := SSH(undropCmd, from, testContext.Provider)
if result.Code == 0 && err == nil {
return true, nil
}
LogSSHResult(result)
if err != nil {
Logf("Unexpected error: %v", err)
}
return false, nil
})
if err != nil {
Failf("Failed to remove the iptable REJECT rule. Manual intervention is "+
"required on host %s: remove rule %s, if exists", from, iptablesRule)
}
}
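As a usage illustration only (a sketch, not part of this commit), blockNetwork and unblockNetwork are meant to be paired so that connectivity is restored even when a test step fails; the node and master addresses below are hypothetical placeholders:

// Sketch: partition a node from the master for the duration of a test step.
host := "10.240.0.5:22" // hypothetical node external IP plus SSH port
master := "10.240.0.2"  // hypothetical master IP
defer unblockNetwork(host, master) // runs even if an assertion below fails the test
blockNetwork(host, master)
// ... assertions that require the node to be partitioned from the master go here ...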
func getMaster(c *client.Client) string {
master := ""
switch testContext.Provider {
case "gce":
eps, err := c.Endpoints(api.NamespaceDefault).Get("kubernetes")
if err != nil {
Failf("Fail to get kubernetes endpoinds: %v", err)
}
if len(eps.Subsets) != 1 || len(eps.Subsets[0].Addresses) != 1 {
Failf("There are more than 1 endpoints for kubernetes service: %+v", eps)
}
master = eps.Subsets[0].Addresses[0].IP
case "gke":
master = strings.TrimPrefix(testContext.Host, "https://")
case "aws":
// TODO(justinsb): Avoid hardcoding this.
master = "172.20.0.9"
default:
Failf("This test is not supported for provider %s and should be disabled", testContext.Provider)
}
return master
}
// getNodeExternalIP returns the node's external IP concatenated with port 22 for SSH,
// e.g. 1.2.3.4:22
func getNodeExternalIP(node *api.Node) string {
Logf("Getting external IP address for %s", node.Name)
host := ""
for _, a := range node.Status.Addresses {
@@ -306,71 +369,34 @@ func performTemporaryNetworkFailure(c *client.Client, ns, rcName string, replica
if host == "" {
Failf("Couldn't get the external IP of host %s with addresses %v", node.Name, node.Status.Addresses)
}
return host
}
// Blocks outgoing network traffic on 'node'. Then verifies that 'podNameToDisappear',
// which belongs to replication controller 'rcName', really disappeared.
// Finally, it checks that the replication controller recreates the
// pods on another node and that now the number of replicas is equal to 'replicas'.
// At the end (even in case of errors), the network traffic is brought back to normal.
// This function executes commands on a node so it will work only for some
// environments.
func performTemporaryNetworkFailure(c *client.Client, ns, rcName string, replicas int, podNameToDisappear string, node *api.Node) {
host := getNodeExternalIP(node)
master := getMaster(c)
By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
master := ""
switch testContext.Provider {
case "gce":
// TODO(#10085): The use of MasterName will cause iptables to do a DNS
// lookup to resolve the name to an IP address, which will slow down the
// test and cause it to fail if DNS is absent or broken. Use the
// internal IP address instead (i.e. NOT the one in testContext.Host).
master = testContext.CloudConfig.MasterName
case "gke":
master = strings.TrimPrefix(testContext.Host, "https://")
case "aws":
// TODO(justinsb): Avoid hardcoding this.
master = "172.20.0.9"
default:
Failf("This test is not supported for provider %s and should be disabled", testContext.Provider)
}
iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", master)
defer func() {
// This code will execute even if setting the iptables rule failed.
// This is intentional: we may get an error even if the new rule
// was inserted (yes, we could check the exit code and the SSH error
// separately, but it is safer to always attempt the cleanup).
By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name))
undropCmd := fmt.Sprintf("sudo iptables --delete %s", iptablesRule)
// The undrop command may fail if the rule has never been created.
// In such a case we just lose 30 seconds, but the cluster is healthy.
// But if the rule had been created and removing it failed, the node is broken and
// not coming back. Subsequent tests will run on fewer nodes (some of the tests
// may fail). Manual intervention is required in such a case (recreating the
// cluster solves the problem too).
err := wait.Poll(time.Millisecond*100, time.Second*30, func() (bool, error) {
result, err := SSH(undropCmd, host, testContext.Provider)
if result.Code == 0 && err == nil {
return true, nil
}
LogSSHResult(result)
if err != nil {
Logf("Unexpected error: %v", err)
}
return false, nil
})
if err != nil {
Failf("Failed to remove the iptable REJECT rule. Manual intervention is "+
"required on node %s: remove rule %s, if exists", node.Name, iptablesRule)
}
unblockNetwork(host, master)
}()
Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
if !waitForNodeToBe(c, node.Name, api.NodeReady, true, resizeNodeReadyTimeout) {
Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
}
// The command will block all outgoing network traffic from the node to the master
// When multi-master is implemented, this test will have to be improved to block
// network traffic to all masters.
// We could also block network traffic from the master(s) to this node,
// but blocking it one way is sufficient for this test.
dropCmd := fmt.Sprintf("sudo iptables --insert %s", iptablesRule)
if result, err := SSH(dropCmd, host, testContext.Provider); result.Code != 0 || err != nil {
LogSSHResult(result)
Failf("Unexpected error: %v", err)
}
blockNetwork(host, master)
Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
if !waitForNodeToBe(c, node.Name, api.NodeReady, false, resizeNodeNotReadyTimeout) {
@@ -388,12 +414,32 @@ func performTemporaryNetworkFailure(c *client.Client, ns, rcName string, replica
// network traffic is unblocked in a deferred function
}
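// expectNodeReadiness consumes node updates from the newNode channel and fails the test
// if the NodeReady condition does not match isReady within nodeReadinessTimeout.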
func expectNodeReadiness(isReady bool, newNode chan *api.Node) {
timeout := false
expected := false
timer := time.After(nodeReadinessTimeout)
for !expected && !timeout {
select {
case n := <-newNode:
if isNodeConditionSetAsExpected(n, api.NodeReady, isReady) {
expected = true
} else {
Logf("Observed node ready status is NOT %v as expected", isReady)
}
case <-timer:
timeout = true
}
}
if !expected {
Failf("Failed to observe node ready status change to %v", isReady)
}
}
var _ = Describe("Nodes [Disruptive]", func() {
framework := NewFramework("resize-nodes")
var systemPodsNo int
var c *client.Client
var ns string
BeforeEach(func() {
c = framework.Client
ns = framework.Namespace.Name
@@ -560,6 +606,101 @@ var _ = Describe("Nodes [Disruptive]", func() {
}
}
})
// What happens in this test:
// Network traffic from a node to the master is cut off to simulate a network partition.
// Expect to observe:
// 1. The node is marked NotReady by the node controller after a timeout (40 seconds)
// 2. All pods on the node are marked NotReady shortly after #1
// 3. The node and its pods return to Ready after connectivity recovers
It("All pods on the unreachable node should be marked as NotReady when the node turns NotReady "+
"AND all pods should be marked back to Ready when the node gets back to Ready before the pod eviction timeout", func() {
By("choose a node - we will block all network traffic on this node")
var podOpts api.ListOptions
nodeOpts := api.ListOptions{}
nodes, err := c.Nodes().List(nodeOpts)
Expect(err).NotTo(HaveOccurred())
filterNodes(nodes, func(node api.Node) bool {
if !isNodeConditionSetAsExpected(&node, api.NodeReady, true) {
return false
}
podOpts = api.ListOptions{FieldSelector: fields.OneTermEqualSelector(client.PodHost, node.Name)}
pods, err := c.Pods(api.NamespaceAll).List(podOpts)
if err != nil || len(pods.Items) <= 0 {
return false
}
return true
})
if len(nodes.Items) <= 0 {
Failf("No eligible node were found: %d", len(nodes.Items))
}
node := nodes.Items[0]
podOpts = api.ListOptions{FieldSelector: fields.OneTermEqualSelector(client.PodHost, node.Name)}
if err = waitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, podRunningReady); err != nil {
Failf("Pods on node %s are not ready and running within %v: %v", node.Name, podReadyTimeout, err)
}
By("Set up watch on node status")
nodeSelector := fields.OneTermEqualSelector("metadata.name", node.Name)
stopCh := make(chan struct{})
newNode := make(chan *api.Node)
var controller *controllerframework.Controller
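// Watch only the chosen node and forward every status update to the newNode channel.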
_, controller = controllerframework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
options.FieldSelector = nodeSelector
return framework.Client.Nodes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
options.FieldSelector = nodeSelector
return framework.Client.Nodes().Watch(options)
},
},
&api.Node{},
0,
controllerframework.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
n, ok := newObj.(*api.Node)
Expect(ok).To(Equal(true))
newNode <- n
},
},
)
defer func() {
// Do not explicitly close the newNode channel here, to avoid a race
// condition in which stopCh and newNode are closed while the informer's UpdateFunc is still executing.
close(stopCh)
}()
go controller.Run(stopCh)
By(fmt.Sprintf("Block traffic from node %s to the master", node.Name))
host := getNodeExternalIP(&node)
master := getMaster(c)
defer func() {
By(fmt.Sprintf("Unblock traffic from node %s to the master", node.Name))
unblockNetwork(host, master)
if CurrentGinkgoTestDescription().Failed {
return
}
By("Expect to observe node and pod status change from NotReady to Ready after network connectivity recovers")
expectNodeReadiness(true, newNode)
if err = waitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, podRunningReady); err != nil {
Failf("Pods on node %s did not become ready and running within %v: %v", node.Name, podReadyTimeout, err)
}
}()
blockNetwork(host, master)
By("Expect to observe node and pod status change from Ready to NotReady after network partition")
expectNodeReadiness(false, newNode)
if err = waitForMatchPodsCondition(c, podOpts, "NotReady", podNotReadyTimeout, podNotReady); err != nil {
Failf("Pods on node %s did not become NotReady within %v: %v", node.Name, podNotReadyTimeout, err)
}
})
})
})
})

View File

@@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
@@ -354,6 +355,16 @@ func podRunningReady(p *api.Pod) (bool, error) {
return true, nil
}
// podNotReady checks whether pod p has a ready condition of status false.
func podNotReady(p *api.Pod) (bool, error) {
// Check the ready condition is false.
if podReady(p) {
return false, fmt.Errorf("pod '%s' on '%s' didn't have condition {%v %v}; conditions: %v",
p.ObjectMeta.Name, p.Spec.NodeName, api.PodReady, api.ConditionFalse, p.Status.Conditions)
}
return true, nil
}
// check if a Pod is controlled by a Replication Controller in the List
func hasReplicationControllersForPod(rcs *api.ReplicationControllerList, pod api.Pod) bool {
for _, rc := range rcs.Items {
@@ -545,6 +556,33 @@ func waitForPodCondition(c *client.Client, ns, podName, desc string, timeout tim
return fmt.Errorf("gave up waiting for pod '%s' to be '%s' after %v", podName, desc, timeout)
}
// waitForMatchPodsCondition finds matching pods based on the input ListOptions,
// then waits and checks whether all matching pods satisfy the given podCondition.
func waitForMatchPodsCondition(c *client.Client, opts api.ListOptions, desc string, timeout time.Duration, condition podCondition) error {
Logf("Waiting up to %v for matching pods' status to be %s", timeout, desc)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
pods, err := c.Pods(api.NamespaceAll).List(opts)
if err != nil {
return err
}
conditionNotMatch := []string{}
for _, pod := range pods.Items {
done, err := condition(&pod)
if done && err != nil {
return fmt.Errorf("Unexpected error: %v", err)
}
if !done {
conditionNotMatch = append(conditionNotMatch, format.Pod(&pod))
}
}
if len(conditionNotMatch) <= 0 {
return err
}
Logf("%d pods are not %s", len(conditionNotMatch), desc)
}
return fmt.Errorf("gave up waiting for matching pods to be '%s' after %v", desc, timeout)
}
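For illustration (a sketch, not part of the diff; the node name is hypothetical and c is assumed to be the test's *client.Client), the network-partition test above combines waitForMatchPodsCondition with a pod condition such as podNotReady roughly like this:

// Sketch: wait for every pod scheduled on a hypothetical node "node-1" to report NotReady.
opts := api.ListOptions{FieldSelector: fields.OneTermEqualSelector(client.PodHost, "node-1")}
if err := waitForMatchPodsCondition(c, opts, "NotReady", podNotReadyTimeout, podNotReady); err != nil {
Failf("Pods on the partitioned node did not become NotReady within %v: %v", podNotReadyTimeout, err)
}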
// waitForDefaultServiceAccountInNamespace waits for the default service account to be provisioned
// the default service account is what is associated with pods when they do not specify a service account
// as a result, pods are not able to be provisioned in a namespace until the service account is provisioned