Mirror of https://github.com/k3s-io/kubernetes.git
Synced 2025-07-23 03:41:45 +00:00

Merge pull request #11339 from bprashanth/restart_test

Confirms daemons restart and do sane things in an e2e test

Commit: 7158f8a5df
@@ -97,6 +97,7 @@ GCE_DEFAULT_SKIP_TESTS=(
 # -flaky- build variants.
 GCE_FLAKY_TESTS=(
     "Autoscaling"
+    "DaemonRestart"
     "ResourceUsage"
 )

@@ -117,6 +118,7 @@ GCE_PARALLEL_SKIP_TESTS=(

 # Tests which are known to be flaky when run in parallel.
 GCE_PARALLEL_FLAKY_TESTS=(
+    "DaemonRestart"
     "Elasticsearch"
     "PD"
     "ServiceAccounts"
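Listing "DaemonRestart" in both flaky arrays quarantines the new suite: per the comments above, these names are routed to the separate -flaky- build variants and skipped by the stable serial and parallel GCE jobs until the suite proves reliable.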
test/e2e/daemon_restart.go (new file, 275 lines)
@@ -0,0 +1,275 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
    "fmt"
    "strconv"
    "time"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/client"
    "k8s.io/kubernetes/pkg/client/cache"
    controllerFramework "k8s.io/kubernetes/pkg/controller/framework"
    "k8s.io/kubernetes/pkg/fields"
    "k8s.io/kubernetes/pkg/labels"
    "k8s.io/kubernetes/pkg/master/ports"
    "k8s.io/kubernetes/pkg/runtime"
    "k8s.io/kubernetes/pkg/util"
    "k8s.io/kubernetes/pkg/util/wait"
    "k8s.io/kubernetes/pkg/watch"

    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"
)

// This test primarily checks 2 things:
// 1. Daemons restart automatically within some sane time (10m).
// 2. They don't take abnormal actions when restarted in the steady state.
//    - Controller manager shouldn't overshoot replicas
//    - Kubelet shouldn't restart containers
//    - Scheduler should continue assigning hosts to new pods

const (
    restartPollInterval = 5 * time.Second
    restartTimeout      = 10 * time.Minute
    numPods             = 10
    sshPort             = 22
)

// nodeExec execs the given cmd on node via SSH. Note that the nodeName is an sshable name,
// eg: the name returned by getMasterHost(). This is also not guaranteed to work across
// cloud providers since it involves ssh.
func nodeExec(nodeName, cmd string) (string, string, int, error) {
    stdout, stderr, code, err := SSH(cmd, fmt.Sprintf("%v:%v", nodeName, sshPort), testContext.Provider)
    Expect(err).NotTo(HaveOccurred())
    return stdout, stderr, code, err
}

// restartDaemonConfig is a config to restart a running daemon on a node, and wait till
// it comes back up. It uses ssh to send a SIGTERM to the daemon.
type restartDaemonConfig struct {
    nodeName     string
    daemonName   string
    healthzPort  int
    pollInterval time.Duration
    pollTimeout  time.Duration
}

// NewRestartConfig creates a restartDaemonConfig for the given node and daemon.
func NewRestartConfig(nodeName, daemonName string, healthzPort int, pollInterval, pollTimeout time.Duration) *restartDaemonConfig {
    if !providerIs("gce") {
        Logf("WARNING: SSH through the restart config might not work on %s", testContext.Provider)
    }
    return &restartDaemonConfig{
        nodeName:     nodeName,
        daemonName:   daemonName,
        healthzPort:  healthzPort,
        pollInterval: pollInterval,
        pollTimeout:  pollTimeout,
    }
}

func (r *restartDaemonConfig) String() string {
    return fmt.Sprintf("Daemon %v on node %v", r.daemonName, r.nodeName)
}

// waitUp polls healthz of the daemon till it returns "ok" or the polling hits the pollTimeout
func (r *restartDaemonConfig) waitUp() {
    Logf("Checking if %v is up by polling for a 200 on its /healthz endpoint", r)
    healthzCheck := fmt.Sprintf(
        "curl -s -o /dev/null -I -w \"%%{http_code}\" http://localhost:%v/healthz", r.healthzPort)

    err := wait.Poll(r.pollInterval, r.pollTimeout, func() (bool, error) {
        stdout, stderr, code, err := nodeExec(r.nodeName, healthzCheck)
        expectNoError(err)
        if code == 0 {
            httpCode, err := strconv.Atoi(stdout)
            if err != nil {
                Logf("Unable to parse healthz http return code: %v", err)
            } else if httpCode == 200 {
                return true, nil
            }
        }
        Logf("node %v exec command, '%v' failed with exitcode %v: \n\tstdout: %v\n\tstderr: %v",
            r.nodeName, healthzCheck, code, stdout, stderr)
        return false, nil
    })
    expectNoError(err, "%v did not respond with a 200 via %v within %v", r, healthzCheck, r.pollTimeout)
}

// kill sends a SIGTERM to the daemon
func (r *restartDaemonConfig) kill() {
    Logf("Killing %v", r)
    nodeExec(r.nodeName, fmt.Sprintf("pgrep %v | xargs -I {} sudo kill {}", r.daemonName))
}

// Restart checks if the daemon is up, kills it, and waits till it comes back up
func (r *restartDaemonConfig) restart() {
    r.waitUp()
    r.kill()
    r.waitUp()
}
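
// A typical use, mirroring the specs later in this file (illustrative
// only; every identifier here is defined in this file):
//
//	restarter := NewRestartConfig(getMasterHost(), "kube-scheduler",
//		ports.SchedulerPort, restartPollInterval, restartTimeout)
//	restarter.restart() // waitUp -> kill -> waitUp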

// replacePods replaces the content of the store with the given pods.
func replacePods(pods []*api.Pod, store cache.Store) {
    found := make([]interface{}, 0, len(pods))
    for i := range pods {
        found = append(found, pods[i])
    }
    expectNoError(store.Replace(found))
}

// getContainerRestarts returns the count of container restarts across all pods matching the given labelSelector,
// and a list of nodenames across which these containers restarted.
func getContainerRestarts(c *client.Client, ns string, labelSelector labels.Selector) (int, []string) {
    pods, err := c.Pods(ns).List(labelSelector, fields.Everything())
    expectNoError(err)
    failedContainers := 0
    containerRestartNodes := util.NewStringSet()
    for _, p := range pods.Items {
        for _, v := range FailedContainers(&p) {
            failedContainers = failedContainers + v.restarts
            containerRestartNodes.Insert(p.Spec.NodeName)
        }
    }
    return failedContainers, containerRestartNodes.List()
}

var _ = Describe("DaemonRestart", func() {

    framework := Framework{BaseName: "daemonrestart"}
    rcName := "daemonrestart" + strconv.Itoa(numPods) + "-" + string(util.NewUUID())
    labelSelector := labels.Set(map[string]string{"name": rcName}).AsSelector()
    existingPods := cache.NewStore(cache.MetaNamespaceKeyFunc)
    var ns string
    var config RCConfig
    var controller *controllerFramework.Controller
    var newPods cache.Store
    var stopCh chan struct{}

    BeforeEach(func() {

        // These tests require SSH
        // TODO: Enable on gke after testing (#11834)
        if !providerIs("gce") {
            By(fmt.Sprintf("Skipping test, which is not implemented for %s", testContext.Provider))
            return
        }
        framework.beforeEach()
        ns = framework.Namespace.Name

        // All the restart tests need an rc and a watch on pods of the rc.
        // Additionally some of them might scale the rc during the test.
        config = RCConfig{
            Client:      framework.Client,
            Name:        rcName,
            Namespace:   ns,
            Image:       "kubernetes/pause",
            Replicas:    numPods,
            CreatedPods: &[]*api.Pod{},
        }
        Expect(RunRC(config)).NotTo(HaveOccurred())
        replacePods(*config.CreatedPods, existingPods)

        stopCh = make(chan struct{})
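        // Start an informer on the rc's pods so each spec can compare the
        // set of pods observed before and after its daemon restart.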
        newPods, controller = controllerFramework.NewInformer(
            &cache.ListWatch{
                ListFunc: func() (runtime.Object, error) {
                    return framework.Client.Pods(ns).List(labelSelector, fields.Everything())
                },
                WatchFunc: func(rv string) (watch.Interface, error) {
                    return framework.Client.Pods(ns).Watch(labelSelector, fields.Everything(), rv)
                },
            },
            &api.Pod{},
            0,
            controllerFramework.ResourceEventHandlerFuncs{},
        )
        go controller.Run(stopCh)
    })

    AfterEach(func() {
        close(stopCh)
        expectNoError(DeleteRC(framework.Client, ns, rcName))
        framework.afterEach()
    })

    It("Controller Manager should not create/delete replicas across restart", func() {

        restarter := NewRestartConfig(
            getMasterHost(), "kube-controller", ports.ControllerManagerPort, restartPollInterval, restartTimeout)
        restarter.restart()

        // The intent is to ensure the replication controller manager has observed and reported status of
        // the replication controller at least once since the manager restarted, so that we can determine
        // that it had the opportunity to create/delete pods, if it were going to do so. Scaling the RC
        // to the same size achieves this, because the scale operation advances the RC's sequence number
        // and awaits it to be observed and reported back in the RC's status.
        ScaleRC(framework.Client, ns, rcName, numPods, true)

        // Only check the keys, the pods can be different if the kubelet updated it.
        // TODO: Can it really?
        existingKeys := util.NewStringSet()
        newKeys := util.NewStringSet()
        for _, k := range existingPods.ListKeys() {
            existingKeys.Insert(k)
        }
        for _, k := range newPods.ListKeys() {
            newKeys.Insert(k)
        }
        if len(newKeys.List()) != len(existingKeys.List()) ||
            !newKeys.IsSuperset(existingKeys) {
            Failf("RcManager created/deleted pods after restart")
        }
    })

    It("Scheduler should continue assigning pods to nodes across restart", func() {

        restarter := NewRestartConfig(
            getMasterHost(), "kube-scheduler", ports.SchedulerPort, restartPollInterval, restartTimeout)

        // Create pods while the scheduler is down and make sure the scheduler picks them up by
        // scaling the rc to the same size.
        restarter.waitUp()
        restarter.kill()
        // This is best effort to try and create pods while the scheduler is down,
        // since we don't know exactly when it is restarted after the kill signal.
        expectNoError(ScaleRC(framework.Client, ns, rcName, numPods+5, false))
        restarter.waitUp()
        expectNoError(ScaleRC(framework.Client, ns, rcName, numPods+5, true))
    })

    It("Kubelet should not restart containers across restart", func() {
        nodeIPs, err := getMinionPublicIps(framework.Client)
        expectNoError(err)
        preRestarts, badNodes := getContainerRestarts(framework.Client, ns, labelSelector)
        if preRestarts != 0 {
            Logf("WARNING: Non-zero container restart count: %d across nodes %v", preRestarts, badNodes)
        }
        for _, ip := range nodeIPs {
            restarter := NewRestartConfig(
                ip, "kubelet", ports.KubeletReadOnlyPort, restartPollInterval, restartTimeout)
            restarter.restart()
        }
        postRestarts, badNodes := getContainerRestarts(framework.Client, ns, labelSelector)
        if postRestarts != preRestarts {
            dumpNodeDebugInfo(framework.Client, badNodes)
            Failf("Net container restart count went from %v -> %v after kubelet restart on nodes %v", preRestarts, postRestarts, badNodes)
        }
    })
})
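
The remaining hunks are mechanical follow-ups to the new test: ScaleRC grows a wait parameter (see the final two hunks below), and every existing caller passes true to keep the old blocking behavior.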

@@ -194,7 +194,7 @@ var _ = Describe("Examples e2e", func() {

             By("starting workers")
             runKubectl("create", "-f", workerControllerJson, nsFlag)
-            ScaleRC(c, ns, "spark-worker-controller", 2)
+            ScaleRC(c, ns, "spark-worker-controller", 2, true)
             forEachPod(c, ns, "name", "spark-worker", func(pod api.Pod) {
                 _, err := lookForStringInLog(ns, pod.Name, "spark-worker", "Successfully registered with master", serverStartTimeout)
                 Expect(err).NotTo(HaveOccurred())
@@ -223,7 +223,7 @@ var _ = Describe("Examples e2e", func() {

             By("create and scale rc")
             runKubectl("create", "-f", controllerYaml, nsFlag)
-            err = ScaleRC(c, ns, "cassandra", 2)
+            err = ScaleRC(c, ns, "cassandra", 2, true)
             Expect(err).NotTo(HaveOccurred())
             forEachPod(c, ns, "name", "cassandra", func(pod api.Pod) {
                 _, err = lookForStringInLog(ns, pod.Name, "cassandra", "Listening for thrift clients", serverStartTimeout)
@@ -384,7 +384,7 @@ var _ = Describe("Examples e2e", func() {
             checkDbInstances()

             By("scaling rethinkdb")
-            ScaleRC(c, ns, "rethinkdb-rc", 2)
+            ScaleRC(c, ns, "rethinkdb-rc", 2, true)
             checkDbInstances()

             By("starting admin")
@@ -421,7 +421,7 @@ var _ = Describe("Examples e2e", func() {
             })

             By("scaling hazelcast")
-            ScaleRC(c, ns, "hazelcast", 2)
+            ScaleRC(c, ns, "hazelcast", 2, true)
             forEachPod(c, ns, "name", "hazelcast", func(pod api.Pod) {
                 _, err := lookForStringInLog(ns, pod.Name, "hazelcast", "Members [2]", serverStartTimeout)
                 Expect(err).NotTo(HaveOccurred())
@@ -213,7 +213,7 @@ func scaleRC(wg *sync.WaitGroup, config *RCConfig) {

     sleepUpTo(resizingTime)
     newSize := uint(rand.Intn(config.Replicas) + config.Replicas/2)
-    expectNoError(ScaleRC(config.Client, config.Namespace, config.Name, newSize),
+    expectNoError(ScaleRC(config.Client, config.Namespace, config.Name, newSize, true),
         fmt.Sprintf("scaling rc %s for the first time", config.Name))
     selector := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
     _, err := config.Client.Pods(config.Namespace).List(selector, fields.Everything())
@@ -1376,7 +1376,7 @@ func getNodeEvents(c *client.Client, nodeName string) []api.Event {
     return events.Items
 }

-func ScaleRC(c *client.Client, ns, name string, size uint) error {
+func ScaleRC(c *client.Client, ns, name string, size uint, wait bool) error {
     By(fmt.Sprintf("%v Scaling replication controller %s in namespace %s to %d", time.Now(), name, ns, size))
     scaler, err := kubectl.ScalerFor("ReplicationController", kubectl.NewScalerClient(c))
     if err != nil {
@@ -1387,6 +1387,9 @@ func ScaleRC(c *client.Client, ns, name string, size uint) error {
     if err = scaler.Scale(ns, name, size, nil, waitForScale, waitForReplicas); err != nil {
         return err
     }
+    if !wait {
+        return nil
+    }
     return waitForRCPodsRunning(c, ns, name)
 }
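
The wait flag exists because a caller may need to issue a scale while a daemon is down and defer the blocking wait, exactly as the scheduler spec above does. A sketch of that calling pattern, reusing identifiers from this diff:

    // Best-effort scale while the scheduler is down; don't block waiting
    // for pods to run, since nothing can schedule them yet.
    expectNoError(ScaleRC(framework.Client, ns, rcName, numPods+5, false))
    restarter.waitUp()
    // Scale to the same size with wait=true; this blocks until the rc's
    // pods are all Running, proving the revived scheduler assigned them.
    expectNoError(ScaleRC(framework.Client, ns, rcName, numPods+5, true))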