From 83f1212e0bb3e7b6d1bd6769583bb45681da7e38 Mon Sep 17 00:00:00 2001
From: Prashanth Balasubramanian
Date: Tue, 23 Jun 2015 16:20:04 -0700
Subject: [PATCH] Confirms daemons restart and do sane things in an e2e test

---
 hack/jenkins/e2e.sh        |   4 +-
 test/e2e/daemon_restart.go | 275 +++++++++++++++++++++++++++++++++++++
 test/e2e/examples.go       |   8 +-
 test/e2e/load.go           |   2 +-
 test/e2e/util.go           |   5 +-
 5 files changed, 287 insertions(+), 7 deletions(-)
 create mode 100644 test/e2e/daemon_restart.go

diff --git a/hack/jenkins/e2e.sh b/hack/jenkins/e2e.sh
index 5e703805b95..cace0069062 100755
--- a/hack/jenkins/e2e.sh
+++ b/hack/jenkins/e2e.sh
@@ -97,6 +97,7 @@ GCE_DEFAULT_SKIP_TESTS=(
 # -flaky- build variants.
 GCE_FLAKY_TESTS=(
     "Autoscaling"
+    "DaemonRestart"
     "ResourceUsage"
 )
 
@@ -117,6 +118,7 @@ GCE_PARALLEL_SKIP_TESTS=(
 # Tests which are known to be flaky when run in parallel.
 GCE_PARALLEL_FLAKY_TESTS=(
+    "DaemonRestart"
     "Elasticsearch"
     "PD"
     "ServiceAccounts"
 )
@@ -133,7 +135,7 @@ GCE_SOAK_CONTINUOUS_SKIP_TESTS=(
     "Density.*30\spods"
     "Elasticsearch"
     "Etcd.*SIGKILL"
-    "external\sload\sbalancer"
+    "external\sload\sbalancer"
     "identically\snamed\sservices"
     "network\spartition"
     "Reboot"
diff --git a/test/e2e/daemon_restart.go b/test/e2e/daemon_restart.go
new file mode 100644
index 00000000000..a7a8c19e5c0
--- /dev/null
+++ b/test/e2e/daemon_restart.go
@@ -0,0 +1,275 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"fmt"
+	"strconv"
+	"time"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/client"
+	"k8s.io/kubernetes/pkg/client/cache"
+	controllerFramework "k8s.io/kubernetes/pkg/controller/framework"
+	"k8s.io/kubernetes/pkg/fields"
+	"k8s.io/kubernetes/pkg/labels"
+	"k8s.io/kubernetes/pkg/master/ports"
+	"k8s.io/kubernetes/pkg/runtime"
+	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/wait"
+	"k8s.io/kubernetes/pkg/watch"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+// This test primarily checks 2 things:
+// 1. Daemons restart automatically within some sane time (10m).
+// 2. They don't take abnormal actions when restarted in the steady state.
+//    - Controller manager shouldn't overshoot replicas
+//    - Kubelet shouldn't restart containers
+//    - Scheduler should continue assigning hosts to new pods
+
+const (
+	restartPollInterval = 5 * time.Second
+	restartTimeout      = 10 * time.Minute
+	numPods             = 10
+	sshPort             = 22
+)
+
+// nodeExec execs the given cmd on the node via SSH. Note that nodeName is an SSH-able name,
+// e.g. the name returned by getMasterHost(). This is also not guaranteed to work across
+// cloud providers, since it involves SSH.
+func nodeExec(nodeName, cmd string) (string, string, int, error) {
+	stdout, stderr, code, err := SSH(cmd, fmt.Sprintf("%v:%v", nodeName, sshPort), testContext.Provider)
+	Expect(err).NotTo(HaveOccurred())
+	return stdout, stderr, code, err
+}
+
+// restartDaemonConfig is a config to restart a running daemon on a node, and wait till
+// it comes back up. It uses SSH to send a SIGTERM to the daemon.
+type restartDaemonConfig struct {
+	nodeName     string
+	daemonName   string
+	healthzPort  int
+	pollInterval time.Duration
+	pollTimeout  time.Duration
+}
+
+// NewRestartConfig creates a restartDaemonConfig for the given node and daemon.
+func NewRestartConfig(nodeName, daemonName string, healthzPort int, pollInterval, pollTimeout time.Duration) *restartDaemonConfig {
+	if !providerIs("gce") {
+		Logf("WARNING: SSH through the restart config might not work on %s", testContext.Provider)
+	}
+	return &restartDaemonConfig{
+		nodeName:     nodeName,
+		daemonName:   daemonName,
+		healthzPort:  healthzPort,
+		pollInterval: pollInterval,
+		pollTimeout:  pollTimeout,
+	}
+}
+
+func (r *restartDaemonConfig) String() string {
+	return fmt.Sprintf("Daemon %v on node %v", r.daemonName, r.nodeName)
+}
+
+// waitUp polls healthz of the daemon till it returns "ok" or the polling hits the pollTimeout.
+func (r *restartDaemonConfig) waitUp() {
+	Logf("Checking if %v is up by polling for a 200 on its /healthz endpoint", r)
+	healthzCheck := fmt.Sprintf(
+		"curl -s -o /dev/null -I -w \"%%{http_code}\" http://localhost:%v/healthz", r.healthzPort)
+
+	err := wait.Poll(r.pollInterval, r.pollTimeout, func() (bool, error) {
+		stdout, stderr, code, err := nodeExec(r.nodeName, healthzCheck)
+		expectNoError(err)
+		if code == 0 {
+			httpCode, err := strconv.Atoi(stdout)
+			if err != nil {
+				Logf("Unable to parse healthz http return code: %v", err)
+			} else if httpCode == 200 {
+				return true, nil
+			}
+		}
+		Logf("node %v exec command '%v' failed with exit code %v:\n\tstdout: %v\n\tstderr: %v",
+			r.nodeName, healthzCheck, code, stdout, stderr)
+		return false, nil
+	})
+	expectNoError(err, "%v did not respond with a 200 via %v within %v", r, healthzCheck, r.pollTimeout)
+}
+
+// kill sends a SIGTERM to the daemon.
+func (r *restartDaemonConfig) kill() {
+	Logf("Killing %v", r)
+	nodeExec(r.nodeName, fmt.Sprintf("pgrep %v | xargs -I {} sudo kill {}", r.daemonName))
+}
+
+// restart checks if the daemon is up, kills it, and waits till it comes back up.
+func (r *restartDaemonConfig) restart() {
+	r.waitUp()
+	r.kill()
+	r.waitUp()
+}
+
+// replacePods replaces the content of the store with the given pods.
+func replacePods(pods []*api.Pod, store cache.Store) {
+	found := make([]interface{}, 0, len(pods))
+	for i := range pods {
+		found = append(found, pods[i])
+	}
+	expectNoError(store.Replace(found))
+}
+
+// getContainerRestarts returns the count of container restarts across all pods matching the given labelSelector,
+// and a list of node names across which these containers restarted.
+func getContainerRestarts(c *client.Client, ns string, labelSelector labels.Selector) (int, []string) {
+	pods, err := c.Pods(ns).List(labelSelector, fields.Everything())
+	expectNoError(err)
+	failedContainers := 0
+	containerRestartNodes := util.NewStringSet()
+	for _, p := range pods.Items {
+		for _, v := range FailedContainers(&p) {
+			failedContainers = failedContainers + v.restarts
+			containerRestartNodes.Insert(p.Spec.NodeName)
+		}
+	}
+	return failedContainers, containerRestartNodes.List()
+}
+
+var _ = Describe("DaemonRestart", func() {
+
+	framework := Framework{BaseName: "daemonrestart"}
+	rcName := "daemonrestart" + strconv.Itoa(numPods) + "-" + string(util.NewUUID())
+	labelSelector := labels.Set(map[string]string{"name": rcName}).AsSelector()
+	existingPods := cache.NewStore(cache.MetaNamespaceKeyFunc)
+	var ns string
+	var config RCConfig
+	var controller *controllerFramework.Controller
+	var newPods cache.Store
+	var stopCh chan struct{}
+
+	BeforeEach(func() {
+
+		// These tests require SSH.
+		// TODO: Enable on gke after testing (#11834)
+		if !providerIs("gce") {
+			By(fmt.Sprintf("Skipping test, which is not implemented for %s", testContext.Provider))
+			return
+		}
+		framework.beforeEach()
+		ns = framework.Namespace.Name
+
+		// All the restart tests need an rc and a watch on pods of the rc.
+		// Additionally, some of them might scale the rc during the test.
+		config = RCConfig{
+			Client:      framework.Client,
+			Name:        rcName,
+			Namespace:   ns,
+			Image:       "kubernetes/pause",
+			Replicas:    numPods,
+			CreatedPods: &[]*api.Pod{},
+		}
+		Expect(RunRC(config)).NotTo(HaveOccurred())
+		replacePods(*config.CreatedPods, existingPods)
+
+		stopCh = make(chan struct{})
+		newPods, controller = controllerFramework.NewInformer(
+			&cache.ListWatch{
+				ListFunc: func() (runtime.Object, error) {
+					return framework.Client.Pods(ns).List(labelSelector, fields.Everything())
+				},
+				WatchFunc: func(rv string) (watch.Interface, error) {
+					return framework.Client.Pods(ns).Watch(labelSelector, fields.Everything(), rv)
+				},
+			},
+			&api.Pod{},
+			0,
+			controllerFramework.ResourceEventHandlerFuncs{},
+		)
+		go controller.Run(stopCh)
+	})
+
+	AfterEach(func() {
+		close(stopCh)
+		expectNoError(DeleteRC(framework.Client, ns, rcName))
+		framework.afterEach()
+	})
+
+	It("Controller Manager should not create/delete replicas across restart", func() {
+
+		restarter := NewRestartConfig(
+			getMasterHost(), "kube-controller", ports.ControllerManagerPort, restartPollInterval, restartTimeout)
+		restarter.restart()
+
+		// The intent is to ensure the replication controller manager has observed and reported status of
+		// the replication controller at least once since the manager restarted, so that we can determine
+		// that it had the opportunity to create/delete pods, if it were going to do so. Scaling the RC
+		// to the same size achieves this, because the scale operation advances the RC's sequence number
+		// and awaits it to be observed and reported back in the RC's status.
+		ScaleRC(framework.Client, ns, rcName, numPods, true)
+
+		// Only check the keys; the pods can differ if the kubelet updated them.
+		// TODO: Can it really?
+		existingKeys := util.NewStringSet()
+		newKeys := util.NewStringSet()
+		for _, k := range existingPods.ListKeys() {
+			existingKeys.Insert(k)
+		}
+		for _, k := range newPods.ListKeys() {
+			newKeys.Insert(k)
+		}
+		if len(newKeys.List()) != len(existingKeys.List()) ||
+			!newKeys.IsSuperset(existingKeys) {
+			Failf("RcManager created/deleted pods after restart")
+		}
+	})
+
+	It("Scheduler should continue assigning pods to nodes across restart", func() {
+
+		restarter := NewRestartConfig(
+			getMasterHost(), "kube-scheduler", ports.SchedulerPort, restartPollInterval, restartTimeout)
+
+		// Create pods while the scheduler is down and make sure the scheduler picks them up by
+		// scaling the rc to the same size.
+		restarter.waitUp()
+		restarter.kill()
+		// This is best effort to try and create pods while the scheduler is down,
+		// since we don't know exactly when it is restarted after the kill signal.
+		expectNoError(ScaleRC(framework.Client, ns, rcName, numPods+5, false))
+		restarter.waitUp()
+		expectNoError(ScaleRC(framework.Client, ns, rcName, numPods+5, true))
+	})
+
+	It("Kubelet should not restart containers across restart", func() {
+		nodeIPs, err := getMinionPublicIps(framework.Client)
+		expectNoError(err)
+		preRestarts, badNodes := getContainerRestarts(framework.Client, ns, labelSelector)
+		if preRestarts != 0 {
+			Logf("WARNING: Non-zero container restart count: %d across nodes %v", preRestarts, badNodes)
+		}
+		for _, ip := range nodeIPs {
+			restarter := NewRestartConfig(
+				ip, "kubelet", ports.KubeletReadOnlyPort, restartPollInterval, restartTimeout)
+			restarter.restart()
+		}
+		postRestarts, badNodes := getContainerRestarts(framework.Client, ns, labelSelector)
+		if postRestarts != preRestarts {
+			dumpNodeDebugInfo(framework.Client, badNodes)
+			Failf("Net container restart count went from %v -> %v after kubelet restart on nodes %v", preRestarts, postRestarts, badNodes)
+		}
+	})
+})
diff --git a/test/e2e/examples.go b/test/e2e/examples.go
index b645f7e9ef6..440e9417039 100644
--- a/test/e2e/examples.go
+++ b/test/e2e/examples.go
@@ -194,7 +194,7 @@ var _ = Describe("Examples e2e", func() {
 
 		By("starting workers")
 		runKubectl("create", "-f", workerControllerJson, nsFlag)
-		ScaleRC(c, ns, "spark-worker-controller", 2)
+		ScaleRC(c, ns, "spark-worker-controller", 2, true)
 		forEachPod(c, ns, "name", "spark-worker", func(pod api.Pod) {
 			_, err := lookForStringInLog(ns, pod.Name, "spark-worker", "Successfully registered with master", serverStartTimeout)
 			Expect(err).NotTo(HaveOccurred())
@@ -223,7 +223,7 @@ var _ = Describe("Examples e2e", func() {
 
 		By("create and scale rc")
 		runKubectl("create", "-f", controllerYaml, nsFlag)
-		err = ScaleRC(c, ns, "cassandra", 2)
+		err = ScaleRC(c, ns, "cassandra", 2, true)
 		Expect(err).NotTo(HaveOccurred())
 		forEachPod(c, ns, "name", "cassandra", func(pod api.Pod) {
 			_, err = lookForStringInLog(ns, pod.Name, "cassandra", "Listening for thrift clients", serverStartTimeout)
@@ -384,7 +384,7 @@ var _ = Describe("Examples e2e", func() {
 		checkDbInstances()
 
 		By("scaling rethinkdb")
-		ScaleRC(c, ns, "rethinkdb-rc", 2)
+		ScaleRC(c, ns, "rethinkdb-rc", 2, true)
 		checkDbInstances()
 
 		By("starting admin")
@@ -421,7 +421,7 @@ var _ = Describe("Examples e2e", func() {
 		})
 
 		By("scaling hazelcast")
-		ScaleRC(c, ns, "hazelcast", 2)
+		ScaleRC(c, ns, "hazelcast", 2, true)
 		forEachPod(c, ns, "name", "hazelcast", func(pod api.Pod) {
 			_, err := lookForStringInLog(ns, pod.Name, "hazelcast", "Members [2]", serverStartTimeout)
 			Expect(err).NotTo(HaveOccurred())
diff --git a/test/e2e/load.go b/test/e2e/load.go
index 3cf9d2ac77b..c041d8424a2 100644
--- a/test/e2e/load.go
+++ b/test/e2e/load.go
@@ -213,7 +213,7 @@ func scaleRC(wg *sync.WaitGroup, config *RCConfig) {
 
 	sleepUpTo(resizingTime)
 	newSize := uint(rand.Intn(config.Replicas) + config.Replicas/2)
-	expectNoError(ScaleRC(config.Client, config.Namespace, config.Name, newSize),
+	expectNoError(ScaleRC(config.Client, config.Namespace, config.Name, newSize, true),
 		fmt.Sprintf("scaling rc %s for the first time", config.Name))
 	selector := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
 	_, err := config.Client.Pods(config.Namespace).List(selector, fields.Everything())
diff --git a/test/e2e/util.go b/test/e2e/util.go
index af69273ce72..7209e271df2 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -1376,7 +1376,7 @@ func getNodeEvents(c *client.Client, nodeName string) []api.Event {
 	return events.Items
 }
 
-func ScaleRC(c *client.Client, ns, name string, size uint) error {
+func ScaleRC(c *client.Client, ns, name string, size uint, wait bool) error {
 	By(fmt.Sprintf("%v Scaling replication controller %s in namespace %s to %d", time.Now(), name, ns, size))
 	scaler, err := kubectl.ScalerFor("ReplicationController", kubectl.NewScalerClient(c))
 	if err != nil {
@@ -1387,6 +1387,9 @@ func ScaleRC(c *client.Client, ns, name string, size uint) error {
 	if err = scaler.Scale(ns, name, size, nil, waitForScale, waitForReplicas); err != nil {
 		return err
 	}
+	if !wait {
+		return nil
+	}
 	return waitForRCPodsRunning(c, ns, name)
 }