diff --git a/test/e2e/resize_nodes.go b/test/e2e/resize_nodes.go
index eb51b19a981..3bcf5336952 100644
--- a/test/e2e/resize_nodes.go
+++ b/test/e2e/resize_nodes.go
@@ -33,13 +33,21 @@ import (
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 
+	"k8s.io/kubernetes/pkg/client/cache"
 	awscloud "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
+	controllerframework "k8s.io/kubernetes/pkg/controller/framework"
+	"k8s.io/kubernetes/pkg/fields"
+	"k8s.io/kubernetes/pkg/runtime"
+	"k8s.io/kubernetes/pkg/watch"
 )
 
 const (
 	serveHostnameImage        = "gcr.io/google_containers/serve_hostname:1.1"
 	resizeNodeReadyTimeout    = 2 * time.Minute
 	resizeNodeNotReadyTimeout = 2 * time.Minute
+	nodeReadinessTimeout      = 3 * time.Minute
+	podNotReadyTimeout        = 1 * time.Minute
+	podReadyTimeout           = 2 * time.Minute
 	testPort                  = 9376
 )
 
@@ -287,14 +295,69 @@ func verifyPods(c *client.Client, ns, name string, wantName bool, replicas int)
 	return nil
 }
 
-// Blocks outgoing network traffic on 'node'. Then verifies that 'podNameToDisappear',
-// that belongs to replication controller 'rcName', really disappeared.
-// Finally, it checks that the replication controller recreates the
-// pods on another node and that now the number of replicas is equal 'replicas'.
-// At the end (even in case of errors), the network traffic is brought back to normal.
-// This function executes commands on a node so it will work only for some
-// environments.
-func performTemporaryNetworkFailure(c *client.Client, ns, rcName string, replicas int, podNameToDisappear string, node *api.Node) {
+func blockNetwork(from string, to string) {
+	Logf("Block network traffic from %s to %s", from, to)
+	iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
+	dropCmd := fmt.Sprintf("sudo iptables --insert %s", iptablesRule)
+	if result, err := SSH(dropCmd, from, testContext.Provider); result.Code != 0 || err != nil {
+		LogSSHResult(result)
+		Failf("Unexpected error: %v", err)
+	}
+}
+
+func unblockNetwork(from string, to string) {
+	Logf("Unblock network traffic from %s to %s", from, to)
+	iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
+	undropCmd := fmt.Sprintf("sudo iptables --delete %s", iptablesRule)
+	// Undrop command may fail if the rule has never been created.
+	// In such a case we just lose 30 seconds, but the cluster is healthy.
+	// But if the rule had been created and removing it failed, the node is broken and
+	// not coming back. Subsequent tests will run on fewer nodes (some of the tests
+	// may fail). Manual intervention is required in such a case (recreating the
+	// cluster solves the problem too).
+	err := wait.Poll(time.Millisecond*100, time.Second*30, func() (bool, error) {
+		result, err := SSH(undropCmd, from, testContext.Provider)
+		if result.Code == 0 && err == nil {
+			return true, nil
+		}
+		LogSSHResult(result)
+		if err != nil {
+			Logf("Unexpected error: %v", err)
+		}
+		return false, nil
+	})
+	if err != nil {
+		Failf("Failed to remove the iptables REJECT rule. Manual intervention is "+
+			"required on host %s: remove rule %s, if exists", from, iptablesRule)
+	}
+}
+
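+// getMaster returns the master address that node traffic is blocked to or
+// unblocked from: on GCE the IP of the "kubernetes" service endpoint in the
+// default namespace, on GKE the API server host from testContext.Host, and on
+// AWS a hardcoded master address.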
+func getMaster(c *client.Client) string {
+	master := ""
+	switch testContext.Provider {
+	case "gce":
+		eps, err := c.Endpoints(api.NamespaceDefault).Get("kubernetes")
+		if err != nil {
+			Failf("Failed to get kubernetes endpoints: %v", err)
+		}
+		if len(eps.Subsets) != 1 || len(eps.Subsets[0].Addresses) != 1 {
+			Failf("There are more than 1 endpoints for kubernetes service: %+v", eps)
+		}
+		master = eps.Subsets[0].Addresses[0].IP
+	case "gke":
+		master = strings.TrimPrefix(testContext.Host, "https://")
+	case "aws":
+		// TODO(justinsb): Avoid hardcoding this.
+		master = "172.20.0.9"
+	default:
+		Failf("This test is not supported for provider %s and should be disabled", testContext.Provider)
+	}
+	return master
+}
+
+// Returns the node's external IP concatenated with port 22 for SSH,
+// e.g. 1.2.3.4:22
+func getNodeExternalIP(node *api.Node) string {
 	Logf("Getting external IP address for %s", node.Name)
 	host := ""
 	for _, a := range node.Status.Addresses {
@@ -306,71 +369,34 @@ func performTemporaryNetworkFailure(c *client.Client, ns, rcName string, replica
 	if host == "" {
 		Failf("Couldn't get the external IP of host %s with addresses %v", node.Name, node.Status.Addresses)
 	}
+	return host
+}
+
+// Blocks outgoing network traffic on 'node'. Then verifies that 'podNameToDisappear',
+// that belongs to replication controller 'rcName', really disappeared.
+// Finally, it checks that the replication controller recreates the
+// pods on another node and that now the number of replicas is equal 'replicas'.
+// At the end (even in case of errors), the network traffic is brought back to normal.
+// This function executes commands on a node so it will work only for some
+// environments.
+func performTemporaryNetworkFailure(c *client.Client, ns, rcName string, replicas int, podNameToDisappear string, node *api.Node) {
+	host := getNodeExternalIP(node)
+	master := getMaster(c)
 	By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
-	master := ""
-	switch testContext.Provider {
-	case "gce":
-		// TODO(#10085): The use of MasterName will cause iptables to do a DNS
-		// lookup to resolve the name to an IP address, which will slow down the
-		// test and cause it to fail if DNS is absent or broken. Use the
-		// internal IP address instead (i.e. NOT the one in testContext.Host).
-		master = testContext.CloudConfig.MasterName
-	case "gke":
-		master = strings.TrimPrefix(testContext.Host, "https://")
-	case "aws":
-		// TODO(justinsb): Avoid hardcoding this.
-		master = "172.20.0.9"
-	default:
-		Failf("This test is not supported for provider %s and should be disabled", testContext.Provider)
-	}
-	iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", master)
 	defer func() {
 		// This code will execute even if setting the iptables rule failed.
 		// It is on purpose because we may have an error even if the new rule
 		// had been inserted. (yes, we could look at the error code and ssh error
 		// separately, but I prefer to stay on the safe side).
-		By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name))
-		undropCmd := fmt.Sprintf("sudo iptables --delete %s", iptablesRule)
-		// Undrop command may fail if the rule has never been created.
-		// In such case we just lose 30 seconds, but the cluster is healthy.
-		// But if the rule had been created and removing it failed, the node is broken and
-		// not coming back. Subsequent tests will run or fewer nodes (some of the tests
-		// may fail). Manual intervention is required in such case (recreating the
-		// cluster solves the problem too).
-		err := wait.Poll(time.Millisecond*100, time.Second*30, func() (bool, error) {
-			result, err := SSH(undropCmd, host, testContext.Provider)
-			if result.Code == 0 && err == nil {
-				return true, nil
-			}
-			LogSSHResult(result)
-			if err != nil {
-				Logf("Unexpected error: %v", err)
-			}
-			return false, nil
-		})
-		if err != nil {
-			Failf("Failed to remove the iptable REJECT rule. Manual intervention is "+
-				"required on node %s: remove rule %s, if exists", node.Name, iptablesRule)
-		}
+		unblockNetwork(host, master)
 	}()
 
 	Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
 	if !waitForNodeToBe(c, node.Name, api.NodeReady, true, resizeNodeReadyTimeout) {
 		Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
 	}
-
-	// The command will block all outgoing network traffic from the node to the master
-	// When multi-master is implemented, this test will have to be improved to block
-	// network traffic to all masters.
-	// We could also block network traffic from the master(s) to this node,
-	// but blocking it one way is sufficient for this test.
-	dropCmd := fmt.Sprintf("sudo iptables --insert %s", iptablesRule)
-	if result, err := SSH(dropCmd, host, testContext.Provider); result.Code != 0 || err != nil {
-		LogSSHResult(result)
-		Failf("Unexpected error: %v", err)
-	}
+	blockNetwork(host, master)
 
 	Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
 	if !waitForNodeToBe(c, node.Name, api.NodeReady, false, resizeNodeNotReadyTimeout) {
@@ -388,12 +414,32 @@ func performTemporaryNetworkFailure(c *client.Client, ns, rcName string, replica
 	// network traffic is unblocked in a deferred function
 }
 
+func expectNodeReadiness(isReady bool, newNode chan *api.Node) {
+	timeout := false
+	expected := false
+	timer := time.After(nodeReadinessTimeout)
+	for !expected && !timeout {
+		select {
+		case n := <-newNode:
+			if isNodeConditionSetAsExpected(n, api.NodeReady, isReady) {
+				expected = true
+			} else {
+				Logf("Observed node ready status is NOT %v as expected", isReady)
+			}
+		case <-timer:
+			timeout = true
+		}
+	}
+	if !expected {
+		Failf("Failed to observe node ready status change to %v", isReady)
+	}
+}
+
 var _ = Describe("Nodes [Disruptive]", func() {
 	framework := NewFramework("resize-nodes")
 	var systemPodsNo int
 	var c *client.Client
 	var ns string
-
 	BeforeEach(func() {
 		c = framework.Client
 		ns = framework.Namespace.Name
@@ -560,6 +606,101 @@ var _ = Describe("Nodes [Disruptive]", func() {
 					}
 				}
 			})
+
+			// What happens in this test:
+			// Network traffic from a node to the master is cut off to simulate a network partition.
+			// Expect to observe:
+			// 1. Node is marked NotReady after timeout by the node controller (40 seconds)
+			// 2. All pods on the node are marked NotReady shortly after #1
+			// 3. Node and pods return to Ready after connectivity recovers
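+			// Note: like performTemporaryNetworkFailure above, this test drives iptables over SSH
+			// on the chosen node (see blockNetwork/unblockNetwork), so it only works in environments
+			// where SSH access to the nodes is available.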
+			It("All pods on the unreachable node should be marked as NotReady upon the node turning NotReady "+
+				"AND all pods should be marked back to Ready when the node gets back to Ready before pod eviction timeout", func() {
+				By("Choose a node - we will block all network traffic on this node")
+				var podOpts api.ListOptions
+				nodeOpts := api.ListOptions{}
+				nodes, err := c.Nodes().List(nodeOpts)
+				Expect(err).NotTo(HaveOccurred())
+				filterNodes(nodes, func(node api.Node) bool {
+					if !isNodeConditionSetAsExpected(&node, api.NodeReady, true) {
+						return false
+					}
+					podOpts = api.ListOptions{FieldSelector: fields.OneTermEqualSelector(client.PodHost, node.Name)}
+					pods, err := c.Pods(api.NamespaceAll).List(podOpts)
+					if err != nil || len(pods.Items) <= 0 {
+						return false
+					}
+					return true
+				})
+				if len(nodes.Items) <= 0 {
+					Failf("No eligible nodes were found: %d", len(nodes.Items))
+				}
+				node := nodes.Items[0]
+				podOpts = api.ListOptions{FieldSelector: fields.OneTermEqualSelector(client.PodHost, node.Name)}
+				if err = waitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, podRunningReady); err != nil {
+					Failf("Pods on node %s did not become ready and running within %v: %v", node.Name, podReadyTimeout, err)
+				}
+
+				By("Set up watch on node status")
+				nodeSelector := fields.OneTermEqualSelector("metadata.name", node.Name)
+				stopCh := make(chan struct{})
+				newNode := make(chan *api.Node)
+				var controller *controllerframework.Controller
+				_, controller = controllerframework.NewInformer(
+					&cache.ListWatch{
+						ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+							options.FieldSelector = nodeSelector
+							return framework.Client.Nodes().List(options)
+						},
+						WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+							options.FieldSelector = nodeSelector
+							return framework.Client.Nodes().Watch(options)
+						},
+					},
+					&api.Node{},
+					0,
+					controllerframework.ResourceEventHandlerFuncs{
+						UpdateFunc: func(oldObj, newObj interface{}) {
+							n, ok := newObj.(*api.Node)
+							Expect(ok).To(Equal(true))
+							newNode <- n
+
+						},
+					},
+				)
+
+				defer func() {
+					// Will not explicitly close the newNode channel here due to
+					// the race condition where stopCh and newNode are closed but informer onUpdate still executes.
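+					// (Sending on a closed channel would panic, so newNode is simply left open
+					// and becomes garbage once the informer goroutine exits.)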
+					close(stopCh)
+				}()
+				go controller.Run(stopCh)
+
+				By(fmt.Sprintf("Block traffic from node %s to the master", node.Name))
+				host := getNodeExternalIP(&node)
+				master := getMaster(c)
+				defer func() {
+					By(fmt.Sprintf("Unblock traffic from node %s to the master", node.Name))
+					unblockNetwork(host, master)
+
+					if CurrentGinkgoTestDescription().Failed {
+						return
+					}
+
+					By("Expect to observe node and pod status change from NotReady to Ready after network connectivity recovers")
+					expectNodeReadiness(true, newNode)
+					if err = waitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, podRunningReady); err != nil {
+						Failf("Pods on node %s did not become ready and running within %v: %v", node.Name, podReadyTimeout, err)
+					}
+				}()
+
+				blockNetwork(host, master)
+
+				By("Expect to observe node and pod status change from Ready to NotReady after network partition")
+				expectNodeReadiness(false, newNode)
+				if err = waitForMatchPodsCondition(c, podOpts, "NotReady", podNotReadyTimeout, podNotReady); err != nil {
+					Failf("Pods on node %s did not become NotReady within %v: %v", node.Name, podNotReadyTimeout, err)
+				}
+			})
 		})
 	})
 })
diff --git a/test/e2e/util.go b/test/e2e/util.go
index b35f62254b5..2fd955a3fd6 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -46,6 +46,7 @@ import (
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/kubectl"
+	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
@@ -354,6 +355,16 @@ func podRunningReady(p *api.Pod) (bool, error) {
 	return true, nil
 }
 
+// podNotReady checks whether pod p has a ready condition of status false.
+func podNotReady(p *api.Pod) (bool, error) {
+	// Check the ready condition is false.
+	if podReady(p) {
+		return false, fmt.Errorf("pod '%s' on '%s' didn't have condition {%v %v}; conditions: %v",
+			p.ObjectMeta.Name, p.Spec.NodeName, api.PodReady, api.ConditionFalse, p.Status.Conditions)
+	}
+	return true, nil
+}
+
 // check if a Pod is controlled by a Replication Controller in the List
 func hasReplicationControllersForPod(rcs *api.ReplicationControllerList, pod api.Pod) bool {
 	for _, rc := range rcs.Items {
@@ -545,6 +556,33 @@ func waitForPodCondition(c *client.Client, ns, podName, desc string, timeout tim
 	return fmt.Errorf("gave up waiting for pod '%s' to be '%s' after %v", podName, desc, timeout)
 }
 
+// waitForMatchPodsCondition finds the pods matching the given ListOptions, then
+// waits until all of them satisfy the given podCondition.
+func waitForMatchPodsCondition(c *client.Client, opts api.ListOptions, desc string, timeout time.Duration, condition podCondition) error {
+	Logf("Waiting up to %v for matching pods' status to be %s", timeout, desc)
+	for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
+		pods, err := c.Pods(api.NamespaceAll).List(opts)
+		if err != nil {
+			return err
+		}
+		conditionNotMatch := []string{}
+		for _, pod := range pods.Items {
+			done, err := condition(&pod)
+			if done && err != nil {
+				return fmt.Errorf("Unexpected error: %v", err)
+			}
+			if !done {
+				conditionNotMatch = append(conditionNotMatch, format.Pod(&pod))
+			}
+		}
+		if len(conditionNotMatch) <= 0 {
+			return err
+		}
+		Logf("%d pods are not %s", len(conditionNotMatch), desc)
+	}
+	return fmt.Errorf("gave up waiting for matching pods to be '%s' after %v", desc, timeout)
+}
+
 // waitForDefaultServiceAccountInNamespace waits for the default service account to be provisioned
 // the default service account is what is associated with pods when they do not specify a service account
 // as a result, pods are not able to be provisioned in a namespace until the service account is provisioned