From 7681a1bdff0552e0008c805fe56bbbfec85d0624 Mon Sep 17 00:00:00 2001 From: gmarek Date: Wed, 12 Oct 2016 13:37:37 +0200 Subject: [PATCH] Move RunRC-like functions to test/utils --- test/e2e/autoscaling_utils.go | 10 +- test/e2e/cluster_size_autoscaling.go | 9 +- test/e2e/common/container_probe.go | 5 +- test/e2e/daemon_restart.go | 7 +- test/e2e/dashboard.go | 3 +- test/e2e/density.go | 17 +- test/e2e/etcd_failure.go | 3 +- test/e2e/examples.go | 21 +- test/e2e/framework/framework.go | 3 +- test/e2e/framework/util.go | 719 ++------------------------- test/e2e/ingress_utils.go | 3 +- test/e2e/kubectl.go | 5 +- test/e2e/kubelet.go | 3 +- test/e2e/kubelet_perf.go | 3 +- test/e2e/load.go | 27 +- test/e2e/proxy.go | 3 +- test/e2e/reboot.go | 5 +- test/e2e/rescheduler.go | 3 +- test/e2e/resize_nodes.go | 7 +- test/e2e/restart.go | 7 +- test/e2e/scheduler_predicates.go | 5 +- test/e2e/service.go | 3 +- test/e2e/service_latency.go | 3 +- test/e2e/ubernetes_lite.go | 3 +- test/utils/conditions.go | 104 ++++ test/utils/pod_store.go | 67 +++ test/utils/runners.go | 603 ++++++++++++++++++++++ 27 files changed, 895 insertions(+), 756 deletions(-) create mode 100644 test/utils/conditions.go create mode 100644 test/utils/pod_store.go create mode 100644 test/utils/runners.go diff --git a/test/e2e/autoscaling_utils.go b/test/e2e/autoscaling_utils.go index 2d64991b9e1..b35b520bef9 100644 --- a/test/e2e/autoscaling_utils.go +++ b/test/e2e/autoscaling_utils.go @@ -25,6 +25,7 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" ) @@ -327,7 +328,7 @@ func runServiceAndWorkloadForResourceConsumer(c *client.Client, ns, name, kind s }) framework.ExpectNoError(err) - rcConfig := framework.RCConfig{ + rcConfig := testutils.RCConfig{ Client: c, Image: resourceConsumerImage, Name: name, @@ -345,15 +346,16 @@ func runServiceAndWorkloadForResourceConsumer(c *client.Client, ns, name, kind s framework.ExpectNoError(framework.RunRC(rcConfig)) break case kindDeployment: - dpConfig := framework.DeploymentConfig{ + dpConfig := testutils.DeploymentConfig{ RCConfig: rcConfig, } framework.ExpectNoError(framework.RunDeployment(dpConfig)) break case kindReplicaSet: - rsConfig := framework.ReplicaSetConfig{ + rsConfig := testutils.ReplicaSetConfig{ RCConfig: rcConfig, } + By(fmt.Sprintf("creating replicaset %s in namespace %s", rsConfig.Name, rsConfig.Namespace)) framework.ExpectNoError(framework.RunReplicaSet(rsConfig)) break default: @@ -380,7 +382,7 @@ func runServiceAndWorkloadForResourceConsumer(c *client.Client, ns, name, kind s framework.ExpectNoError(err) dnsClusterFirst := api.DNSClusterFirst - controllerRcConfig := framework.RCConfig{ + controllerRcConfig := testutils.RCConfig{ Client: c, Image: resourceConsumerControllerImage, Name: controllerName, diff --git a/test/e2e/cluster_size_autoscaling.go b/test/e2e/cluster_size_autoscaling.go index af784574bae..d0478340402 100644 --- a/test/e2e/cluster_size_autoscaling.go +++ b/test/e2e/cluster_size_autoscaling.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" "github.com/golang/glog" . "github.com/onsi/ginkgo" @@ -456,7 +457,7 @@ func doPut(url, content string) (string, error) { func CreateNodeSelectorPods(f *framework.Framework, id string, replicas int, nodeSelector map[string]string, expectRunning bool) { By(fmt.Sprintf("Running RC which reserves host port and defines node selector")) - config := &framework.RCConfig{ + config := &testutils.RCConfig{ Client: f.Client, Name: "node-selector", Namespace: f.Namespace.Name, @@ -474,7 +475,7 @@ func CreateNodeSelectorPods(f *framework.Framework, id string, replicas int, nod func CreateHostPortPods(f *framework.Framework, id string, replicas int, expectRunning bool) { By(fmt.Sprintf("Running RC which reserves host port")) - config := &framework.RCConfig{ + config := &testutils.RCConfig{ Client: f.Client, Name: id, Namespace: f.Namespace.Name, @@ -492,7 +493,7 @@ func CreateHostPortPods(f *framework.Framework, id string, replicas int, expectR func ReserveCpu(f *framework.Framework, id string, replicas, millicores int) { By(fmt.Sprintf("Running RC which reserves %v millicores", millicores)) request := int64(millicores / replicas) - config := &framework.RCConfig{ + config := &testutils.RCConfig{ Client: f.Client, Name: id, Namespace: f.Namespace.Name, @@ -507,7 +508,7 @@ func ReserveCpu(f *framework.Framework, id string, replicas, millicores int) { func ReserveMemory(f *framework.Framework, id string, replicas, megabytes int, expectRunning bool) { By(fmt.Sprintf("Running RC which reserves %v MB of memory", megabytes)) request := int64(1024 * 1024 * megabytes / replicas) - config := &framework.RCConfig{ + config := &testutils.RCConfig{ Client: f.Client, Name: id, Namespace: f.Namespace.Name, diff --git a/test/e2e/common/container_probe.go b/test/e2e/common/container_probe.go index fef1627a19f..292f1da5748 100644 --- a/test/e2e/common/container_probe.go +++ b/test/e2e/common/container_probe.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -51,7 +52,7 @@ var _ = framework.KubeDescribe("Probing container", func() { p, err := podClient.Get(p.Name) framework.ExpectNoError(err) - isReady, err := framework.PodRunningReady(p) + isReady, err := testutils.PodRunningReady(p) framework.ExpectNoError(err) Expect(isReady).To(BeTrue(), "pod should be ready") @@ -85,7 +86,7 @@ var _ = framework.KubeDescribe("Probing container", func() { p, err := podClient.Get(p.Name) framework.ExpectNoError(err) - isReady, err := framework.PodRunningReady(p) + isReady, err := testutils.PodRunningReady(p) Expect(isReady).NotTo(BeTrue(), "pod should be not ready") restartCount := getRestartCount(p) diff --git a/test/e2e/daemon_restart.go b/test/e2e/daemon_restart.go index 4fad69c8e14..ee3bcb13ce1 100644 --- a/test/e2e/daemon_restart.go +++ b/test/e2e/daemon_restart.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -175,7 +176,7 @@ func getContainerRestarts(c *client.Client, ns string, labelSelector labels.Sele failedContainers := 0 containerRestartNodes := sets.NewString() for _, p := range pods.Items { - for _, v := range framework.FailedContainers(&p) { + for _, v := range testutils.FailedContainers(&p) { failedContainers = failedContainers + v.Restarts containerRestartNodes.Insert(p.Spec.NodeName) } @@ -190,7 +191,7 @@ var _ = framework.KubeDescribe("DaemonRestart [Disruptive]", func() { labelSelector := labels.Set(map[string]string{"name": rcName}).AsSelector() existingPods := cache.NewStore(cache.MetaNamespaceKeyFunc) var ns string - var config framework.RCConfig + var config testutils.RCConfig var controller *cache.Controller var newPods cache.Store var stopCh chan struct{} @@ -203,7 +204,7 @@ var _ = framework.KubeDescribe("DaemonRestart [Disruptive]", func() { // All the restart tests need an rc and a watch on pods of the rc. // Additionally some of them might scale the rc during the test. - config = framework.RCConfig{ + config = testutils.RCConfig{ Client: f.Client, Name: rcName, Namespace: ns, diff --git a/test/e2e/dashboard.go b/test/e2e/dashboard.go index fecad0d8ccd..71b0f0da4e3 100644 --- a/test/e2e/dashboard.go +++ b/test/e2e/dashboard.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -47,7 +48,7 @@ var _ = framework.KubeDescribe("Kubernetes Dashboard", func() { By("Checking to make sure the kubernetes-dashboard pods are running") selector := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": uiAppName})) - err = framework.WaitForPodsWithLabelRunning(f.Client, uiNamespace, selector) + err = testutils.WaitForPodsWithLabelRunning(f.Client, uiNamespace, selector) Expect(err).NotTo(HaveOccurred()) By("Checking to make sure we get a response from the kubernetes-dashboard.") diff --git a/test/e2e/density.go b/test/e2e/density.go index fb2421d62c6..894eba20ba9 100644 --- a/test/e2e/density.go +++ b/test/e2e/density.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -53,7 +54,7 @@ const ( var MaxContainerFailures = 0 type DensityTestConfig struct { - Configs []framework.RCConfig + Configs []testutils.RCConfig Client *client.Client ClientSet internalclientset.Interface Namespace string @@ -163,7 +164,7 @@ func density30AddonResourceVerifier(numNodes int) map[string]framework.ResourceC func logPodStartupStatus(c *client.Client, expectedPods int, ns string, observedLabels map[string]string, period time.Duration, stopCh chan struct{}) { label := labels.SelectorFromSet(labels.Set(observedLabels)) - podStore := framework.NewPodStore(c, ns, label, fields.Everything()) + podStore := testutils.NewPodStore(c, ns, label, fields.Everything()) defer podStore.Stop() ticker := time.NewTicker(period) defer ticker.Stop() @@ -171,11 +172,11 @@ func logPodStartupStatus(c *client.Client, expectedPods int, ns string, observed select { case <-ticker.C: pods := podStore.List() - startupStatus := framework.ComputeRCStartupStatus(pods, expectedPods) + startupStatus := testutils.ComputeRCStartupStatus(pods, expectedPods) framework.Logf(startupStatus.String("Density")) case <-stopCh: pods := podStore.List() - startupStatus := framework.ComputeRCStartupStatus(pods, expectedPods) + startupStatus := testutils.ComputeRCStartupStatus(pods, expectedPods) framework.Logf(startupStatus.String("Density")) return } @@ -471,10 +472,10 @@ var _ = framework.KubeDescribe("Density", func() { // TODO: loop to podsPerNode instead of 1 when we're ready. numberOrRCs := 1 - RCConfigs := make([]framework.RCConfig, numberOrRCs) + RCConfigs := make([]testutils.RCConfig, numberOrRCs) for i := 0; i < numberOrRCs; i++ { RCName := "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid - RCConfigs[i] = framework.RCConfig{Client: c, + RCConfigs[i] = testutils.RCConfig{Client: c, Image: framework.GetPauseImageName(f.Client), Name: RCName, Namespace: ns, @@ -690,14 +691,14 @@ var _ = framework.KubeDescribe("Density", func() { framework.ExpectNoError(err) defer fileHndl.Close() rcCnt := 1 - RCConfigs := make([]framework.RCConfig, rcCnt) + RCConfigs := make([]testutils.RCConfig, rcCnt) podsPerRC := int(totalPods / rcCnt) for i := 0; i < rcCnt; i++ { if i == rcCnt-1 { podsPerRC += int(math.Mod(float64(totalPods), float64(rcCnt))) } RCName = "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid - RCConfigs[i] = framework.RCConfig{Client: c, + RCConfigs[i] = testutils.RCConfig{Client: c, Image: framework.GetPauseImageName(f.Client), Name: RCName, Namespace: ns, diff --git a/test/e2e/etcd_failure.go b/test/e2e/etcd_failure.go index bec502ea3ca..b574401e93f 100644 --- a/test/e2e/etcd_failure.go +++ b/test/e2e/etcd_failure.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -40,7 +41,7 @@ var _ = framework.KubeDescribe("Etcd failure [Disruptive]", func() { // providers that provide those capabilities. framework.SkipUnlessProviderIs("gce") - Expect(framework.RunRC(framework.RCConfig{ + Expect(framework.RunRC(testutils.RCConfig{ Client: f.Client, Name: "baz", Namespace: f.Namespace.Name, diff --git a/test/e2e/examples.go b/test/e2e/examples.go index a5f9a644aa7..58b339dfb24 100644 --- a/test/e2e/examples.go +++ b/test/e2e/examples.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -94,10 +95,10 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { framework.RunKubectlOrDie("create", "-f", sentinelControllerYaml, nsFlag) framework.RunKubectlOrDie("create", "-f", controllerYaml, nsFlag) label := labels.SelectorFromSet(labels.Set(map[string]string{sentinelRC: "true"})) - err = framework.WaitForPodsWithLabelRunning(c, ns, label) + err = testutils.WaitForPodsWithLabelRunning(c, ns, label) Expect(err).NotTo(HaveOccurred()) label = labels.SelectorFromSet(labels.Set(map[string]string{"name": redisRC})) - err = framework.WaitForPodsWithLabelRunning(c, ns, label) + err = testutils.WaitForPodsWithLabelRunning(c, ns, label) Expect(err).NotTo(HaveOccurred()) By("scaling up the deployment") @@ -110,7 +111,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { checkAllLogs := func() { selectorKey, selectorValue := "name", redisRC label := labels.SelectorFromSet(labels.Set(map[string]string{selectorKey: selectorValue})) - err = framework.WaitForPodsWithLabelRunning(c, ns, label) + err = testutils.WaitForPodsWithLabelRunning(c, ns, label) Expect(err).NotTo(HaveOccurred()) forEachPod(selectorKey, selectorValue, func(pod api.Pod) { if pod.Name != bootstrapPodName { @@ -120,7 +121,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { }) selectorKey, selectorValue = sentinelRC, "true" label = labels.SelectorFromSet(labels.Set(map[string]string{selectorKey: selectorValue})) - err = framework.WaitForPodsWithLabelRunning(c, ns, label) + err = testutils.WaitForPodsWithLabelRunning(c, ns, label) Expect(err).NotTo(HaveOccurred()) forEachPod(selectorKey, selectorValue, func(pod api.Pod) { if pod.Name != bootstrapPodName { @@ -158,7 +159,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { framework.RunKubectlOrDie("create", "-f", masterYaml, nsFlag) selectorKey, selectorValue := "component", "spark-master" label := labels.SelectorFromSet(labels.Set(map[string]string{selectorKey: selectorValue})) - err := framework.WaitForPodsWithLabelRunning(c, ns, label) + err := testutils.WaitForPodsWithLabelRunning(c, ns, label) Expect(err).NotTo(HaveOccurred()) framework.Logf("Now polling for Master startup...") @@ -185,7 +186,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { framework.RunKubectlOrDie("create", "-f", workerControllerYaml, nsFlag) selectorKey, selectorValue := "component", "spark-worker" label := labels.SelectorFromSet(labels.Set(map[string]string{selectorKey: selectorValue})) - err := framework.WaitForPodsWithLabelRunning(c, ns, label) + err := testutils.WaitForPodsWithLabelRunning(c, ns, label) Expect(err).NotTo(HaveOccurred()) // For now, scaling is orthogonal to the core test. @@ -223,7 +224,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { By("Creating a Cassandra RC") framework.RunKubectlOrDie("create", "-f", controllerYaml, nsFlag) label := labels.SelectorFromSet(labels.Set(map[string]string{"app": "cassandra"})) - err = framework.WaitForPodsWithLabelRunning(c, ns, label) + err = testutils.WaitForPodsWithLabelRunning(c, ns, label) Expect(err).NotTo(HaveOccurred()) forEachPod("app", "cassandra", func(pod api.Pod) { framework.Logf("Verifying pod %v ", pod.Name) @@ -354,7 +355,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { By("starting workers") framework.RunKubectlOrDie("create", "-f", workerControllerJson, nsFlag) label := labels.SelectorFromSet(labels.Set(map[string]string{"name": "storm-worker"})) - err = framework.WaitForPodsWithLabelRunning(c, ns, label) + err = testutils.WaitForPodsWithLabelRunning(c, ns, label) Expect(err).NotTo(HaveOccurred()) forEachPod("name", "storm-worker", func(pod api.Pod) { //do nothing, just wait for the pod to be running @@ -490,7 +491,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { framework.RunKubectlOrDie("create", "-f", driverServiceYaml, nsFlag) framework.RunKubectlOrDie("create", "-f", rethinkDbControllerYaml, nsFlag) label := labels.SelectorFromSet(labels.Set(map[string]string{"db": "rethinkdb"})) - err := framework.WaitForPodsWithLabelRunning(c, ns, label) + err := testutils.WaitForPodsWithLabelRunning(c, ns, label) Expect(err).NotTo(HaveOccurred()) checkDbInstances := func() { forEachPod("db", "rethinkdb", func(pod api.Pod) { @@ -533,7 +534,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { framework.RunKubectlOrDie("create", "-f", serviceYaml, nsFlag) framework.RunKubectlOrDie("create", "-f", controllerYaml, nsFlag) label := labels.SelectorFromSet(labels.Set(map[string]string{"name": "hazelcast"})) - err := framework.WaitForPodsWithLabelRunning(c, ns, label) + err := testutils.WaitForPodsWithLabelRunning(c, ns, label) Expect(err).NotTo(HaveOccurred()) forEachPod("name", "hazelcast", func(pod api.Pod) { _, err := framework.LookForStringInLog(ns, pod.Name, "hazelcast", "Members [1]", serverStartTimeout) diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index 3f20465cacd..07deb4d0956 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -45,6 +45,7 @@ import ( "k8s.io/kubernetes/pkg/metrics" "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/wait" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -612,7 +613,7 @@ func (f *Framework) CreateServiceForSimpleAppWithPods(contPort int, svcPort int, theService := f.CreateServiceForSimpleApp(contPort, svcPort, appName) f.CreatePodsPerNodeForSimpleApp(appName, podSpec, count) if block { - err = WaitForPodsWithLabelRunning(f.Client, f.Namespace.Name, labels.SelectorFromSet(labels.Set(theService.Spec.Selector))) + err = testutils.WaitForPodsWithLabelRunning(f.Client, f.Namespace.Name, labels.SelectorFromSet(labels.Set(theService.Spec.Selector))) } return err, theService } diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 293632a72de..62092e352fb 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -23,7 +23,6 @@ import ( "fmt" "io" "io/ioutil" - "math" "math/rand" "net" "net/http" @@ -46,11 +45,9 @@ import ( "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5" "k8s.io/kubernetes/pkg/api" apierrs "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/apis/extensions" - "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" "k8s.io/kubernetes/pkg/client/restclient" @@ -79,7 +76,7 @@ import ( utilyaml "k8s.io/kubernetes/pkg/util/yaml" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/watch" - testutil "k8s.io/kubernetes/test/utils" + testutils "k8s.io/kubernetes/test/utils" "github.com/blang/semver" "golang.org/x/crypto/ssh" @@ -112,9 +109,6 @@ const ( // How long to wait for a service endpoint to be resolvable. ServiceStartTimeout = 1 * time.Minute - // String used to mark pod deletion - nonExist = "NonExist" - // How often to Poll pods, nodes and claims. Poll = 2 * time.Second @@ -250,110 +244,6 @@ func GetMasterHost() string { return masterUrl.Host } -// Convenient wrapper around cache.Store that returns list of api.Pod instead of interface{}. -type PodStore struct { - cache.Store - stopCh chan struct{} - reflector *cache.Reflector -} - -func NewPodStore(c *client.Client, namespace string, label labels.Selector, field fields.Selector) *PodStore { - lw := &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - options.LabelSelector = label - options.FieldSelector = field - return c.Pods(namespace).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - options.LabelSelector = label - options.FieldSelector = field - return c.Pods(namespace).Watch(options) - }, - } - store := cache.NewStore(cache.MetaNamespaceKeyFunc) - stopCh := make(chan struct{}) - reflector := cache.NewReflector(lw, &api.Pod{}, store, 0) - reflector.RunUntil(stopCh) - return &PodStore{store, stopCh, reflector} -} - -func (s *PodStore) List() []*api.Pod { - objects := s.Store.List() - pods := make([]*api.Pod, 0) - for _, o := range objects { - pods = append(pods, o.(*api.Pod)) - } - return pods -} - -func (s *PodStore) Stop() { - close(s.stopCh) -} - -type RCConfig struct { - Client *client.Client - Image string - Command []string - Name string - Namespace string - PollInterval time.Duration - Timeout time.Duration - PodStatusFile *os.File - Replicas int - CpuRequest int64 // millicores - CpuLimit int64 // millicores - MemRequest int64 // bytes - MemLimit int64 // bytes - ReadinessProbe *api.Probe - DNSPolicy *api.DNSPolicy - - // Env vars, set the same for every pod. - Env map[string]string - - // Extra labels added to every pod. - Labels map[string]string - - // Node selector for pods in the RC. - NodeSelector map[string]string - - // Ports to declare in the container (map of name to containerPort). - Ports map[string]int - // Ports to declare in the container as host and container ports. - HostPorts map[string]int - - Volumes []api.Volume - VolumeMounts []api.VolumeMount - - // Pointer to a list of pods; if non-nil, will be set to a list of pods - // created by this RC by RunRC. - CreatedPods *[]*api.Pod - - // Maximum allowable container failures. If exceeded, RunRC returns an error. - // Defaults to replicas*0.1 if unspecified. - MaxContainerFailures *int - - // If set to false starting RC will print progress, otherwise only errors will be printed. - Silent bool - - // If set this function will be used to print log lines instead of glog. - LogFunc func(fmt string, args ...interface{}) -} - -func (rc *RCConfig) RCConfigLog(fmt string, args ...interface{}) { - if rc.LogFunc != nil { - rc.LogFunc(fmt, args...) - } - glog.Infof(fmt, args...) -} - -type DeploymentConfig struct { - RCConfig -} - -type ReplicaSetConfig struct { - RCConfig -} - func nowStamp() string { return time.Now().Format(time.StampMilli) } @@ -470,17 +360,6 @@ var providersWithMasterSSH = []string{"gce", "gke", "kubemark", "aws"} type podCondition func(pod *api.Pod) (bool, error) -// podReady returns whether pod has a condition of Ready with a status of true. -// TODO: should be replaced with api.IsPodReady -func podReady(pod *api.Pod) bool { - for _, cond := range pod.Status.Conditions { - if cond.Type == api.PodReady && cond.Status == api.ConditionTrue { - return true - } - } - return false -} - // logPodStates logs basic info of provided pods for debugging. func logPodStates(pods []api.Pod) { // Find maximum widths for pod, node, and phase strings for column printing. @@ -541,40 +420,6 @@ func errorBadPodsStates(badPods []api.Pod, desiredPods int, ns, desiredState str return errStr + buf.String() } -// PodRunningReady checks whether pod p's phase is running and it has a ready -// condition of status true. -func PodRunningReady(p *api.Pod) (bool, error) { - // Check the phase is running. - if p.Status.Phase != api.PodRunning { - return false, fmt.Errorf("want pod '%s' on '%s' to be '%v' but was '%v'", - p.ObjectMeta.Name, p.Spec.NodeName, api.PodRunning, p.Status.Phase) - } - // Check the ready condition is true. - if !podReady(p) { - return false, fmt.Errorf("pod '%s' on '%s' didn't have condition {%v %v}; conditions: %v", - p.ObjectMeta.Name, p.Spec.NodeName, api.PodReady, api.ConditionTrue, p.Status.Conditions) - } - return true, nil -} - -func PodRunningReadyOrSucceeded(p *api.Pod) (bool, error) { - // Check if the phase is succeeded. - if p.Status.Phase == api.PodSucceeded { - return true, nil - } - return PodRunningReady(p) -} - -// PodNotReady checks whether pod p's has a ready condition of status false. -func PodNotReady(p *api.Pod) (bool, error) { - // Check the ready condition is false. - if podReady(p) { - return false, fmt.Errorf("pod '%s' on '%s' didn't have condition {%v %v}; conditions: %v", - p.ObjectMeta.Name, p.Spec.NodeName, api.PodReady, api.ConditionFalse, p.Status.Conditions) - } - return true, nil -} - // check if a Pod is controlled by a Replication Controller in the List func hasReplicationControllersForPod(rcs *api.ReplicationControllerList, pod api.Pod) bool { for _, rc := range rcs.Items { @@ -681,7 +526,7 @@ func WaitForPodsRunningReady(c *client.Client, ns string, minPods int32, timeout Logf("%v in state %v, ignoring", pod.Name, pod.Status.Phase) continue } - if res, err := PodRunningReady(&pod); res && err == nil { + if res, err := testutils.PodRunningReady(&pod); res && err == nil { nOk++ if hasReplicationControllersForPod(rcList, pod) { replicaOk++ @@ -751,7 +596,7 @@ func RunKubernetesServiceTestContainer(c *client.Client, ns string) { } }() timeout := 5 * time.Minute - if err := waitForPodCondition(c, ns, p.Name, "clusterapi-tester", timeout, PodRunningReady); err != nil { + if err := waitForPodCondition(c, ns, p.Name, "clusterapi-tester", timeout, testutils.PodRunningReady); err != nil { Logf("Pod %v took longer than %v to enter running/ready: %v", p.Name, timeout, err) return } @@ -788,7 +633,7 @@ func LogFailedContainers(c *client.Client, ns string, logFunc func(ftm string, a } logFunc("Running kubectl logs on non-ready containers in %v", ns) for _, pod := range podList.Items { - if res, err := PodRunningReady(&pod); !res || err != nil { + if res, err := testutils.PodRunningReady(&pod); !res || err != nil { kubectlLogPod(c, pod, "", Logf) } } @@ -911,7 +756,7 @@ func waitForPodCondition(c *client.Client, ns, podName, desc string, timeout tim } Logf("Waiting for pod %[1]s in namespace '%[2]s' status to be '%[3]s'"+ "(found phase: %[4]q, readiness: %[5]t) (%[6]v elapsed)", - podName, ns, desc, pod.Status.Phase, podReady(pod), time.Since(start)) + podName, ns, desc, pod.Status.Phase, testutils.PodReady(pod), time.Since(start)) } return fmt.Errorf("gave up waiting for pod '%s' to be '%s' after %v", podName, desc, timeout) } @@ -2344,471 +2189,25 @@ func (f *Framework) MatchContainerOutput( return nil } -// podInfo contains pod information useful for debugging e2e tests. -type podInfo struct { - oldHostname string - oldPhase string - hostname string - phase string -} - -// PodDiff is a map of pod name to podInfos -type PodDiff map[string]*podInfo - -// Print formats and prints the give PodDiff. -func (p PodDiff) String(ignorePhases sets.String) string { - ret := "" - for name, info := range p { - if ignorePhases.Has(info.phase) { - continue - } - if info.phase == nonExist { - ret += fmt.Sprintf("Pod %v was deleted, had phase %v and host %v\n", name, info.oldPhase, info.oldHostname) - continue - } - phaseChange, hostChange := false, false - msg := fmt.Sprintf("Pod %v ", name) - if info.oldPhase != info.phase { - phaseChange = true - if info.oldPhase == nonExist { - msg += fmt.Sprintf("in phase %v ", info.phase) - } else { - msg += fmt.Sprintf("went from phase: %v -> %v ", info.oldPhase, info.phase) - } - } - if info.oldHostname != info.hostname { - hostChange = true - if info.oldHostname == nonExist || info.oldHostname == "" { - msg += fmt.Sprintf("assigned host %v ", info.hostname) - } else { - msg += fmt.Sprintf("went from host: %v -> %v ", info.oldHostname, info.hostname) - } - } - if phaseChange || hostChange { - ret += msg + "\n" - } - } - return ret -} - -// Diff computes a PodDiff given 2 lists of pods. -func Diff(oldPods []*api.Pod, curPods []*api.Pod) PodDiff { - podInfoMap := PodDiff{} - - // New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist. - for _, pod := range curPods { - podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.NodeName, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist} - } - - // Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist. - for _, pod := range oldPods { - if info, ok := podInfoMap[pod.Name]; ok { - info.oldHostname, info.oldPhase = pod.Spec.NodeName, string(pod.Status.Phase) - } else { - podInfoMap[pod.Name] = &podInfo{hostname: nonExist, phase: nonExist, oldHostname: pod.Spec.NodeName, oldPhase: string(pod.Status.Phase)} - } - } - return podInfoMap -} - -// RunDeployment Launches (and verifies correctness) of a Deployment -// and will wait for all pods it spawns to become "Running". -// It's the caller's responsibility to clean up externally (i.e. use the -// namespace lifecycle for handling Cleanup). -func RunDeployment(config DeploymentConfig) error { - err := config.create() - if err != nil { - return err - } - return config.start() -} - -func (config *DeploymentConfig) create() error { +func RunDeployment(config testutils.DeploymentConfig) error { By(fmt.Sprintf("creating deployment %s in namespace %s", config.Name, config.Namespace)) - deployment := &extensions.Deployment{ - ObjectMeta: api.ObjectMeta{ - Name: config.Name, - }, - Spec: extensions.DeploymentSpec{ - Replicas: int32(config.Replicas), - Selector: &unversioned.LabelSelector{ - MatchLabels: map[string]string{ - "name": config.Name, - }, - }, - Template: api.PodTemplateSpec{ - ObjectMeta: api.ObjectMeta{ - Labels: map[string]string{"name": config.Name}, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: config.Name, - Image: config.Image, - Command: config.Command, - Ports: []api.ContainerPort{{ContainerPort: 80}}, - }, - }, - }, - }, - }, - } - - config.applyTo(&deployment.Spec.Template) - - _, err := config.Client.Deployments(config.Namespace).Create(deployment) - if err != nil { - return fmt.Errorf("Error creating deployment: %v", err) - } - config.RCConfigLog("Created deployment with name: %v, namespace: %v, replica count: %v", deployment.Name, config.Namespace, deployment.Spec.Replicas) - return nil + config.NodeDumpFunc = DumpNodeDebugInfo + config.ContainerDumpFunc = LogFailedContainers + return testutils.RunDeployment(config) } -// RunReplicaSet launches (and verifies correctness) of a ReplicaSet -// and waits until all the pods it launches to reach the "Running" state. -// It's the caller's responsibility to clean up externally (i.e. use the -// namespace lifecycle for handling Cleanup). -func RunReplicaSet(config ReplicaSetConfig) error { - err := config.create() - if err != nil { - return err - } - return config.start() -} - -func (config *ReplicaSetConfig) create() error { +func RunReplicaSet(config testutils.ReplicaSetConfig) error { By(fmt.Sprintf("creating replicaset %s in namespace %s", config.Name, config.Namespace)) - rs := &extensions.ReplicaSet{ - ObjectMeta: api.ObjectMeta{ - Name: config.Name, - }, - Spec: extensions.ReplicaSetSpec{ - Replicas: int32(config.Replicas), - Selector: &unversioned.LabelSelector{ - MatchLabels: map[string]string{ - "name": config.Name, - }, - }, - Template: api.PodTemplateSpec{ - ObjectMeta: api.ObjectMeta{ - Labels: map[string]string{"name": config.Name}, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: config.Name, - Image: config.Image, - Command: config.Command, - Ports: []api.ContainerPort{{ContainerPort: 80}}, - }, - }, - }, - }, - }, - } - - config.applyTo(&rs.Spec.Template) - - _, err := config.Client.ReplicaSets(config.Namespace).Create(rs) - if err != nil { - return fmt.Errorf("Error creating replica set: %v", err) - } - config.RCConfigLog("Created replica set with name: %v, namespace: %v, replica count: %v", rs.Name, config.Namespace, rs.Spec.Replicas) - return nil + config.NodeDumpFunc = DumpNodeDebugInfo + config.ContainerDumpFunc = LogFailedContainers + return testutils.RunReplicaSet(config) } -// RunRC Launches (and verifies correctness) of a Replication Controller -// and will wait for all pods it spawns to become "Running". -// It's the caller's responsibility to clean up externally (i.e. use the -// namespace lifecycle for handling Cleanup). -func RunRC(config RCConfig) error { - err := config.create() - if err != nil { - return err - } - return config.start() -} - -func (config *RCConfig) create() error { +func RunRC(config testutils.RCConfig) error { By(fmt.Sprintf("creating replication controller %s in namespace %s", config.Name, config.Namespace)) - dnsDefault := api.DNSDefault - if config.DNSPolicy == nil { - config.DNSPolicy = &dnsDefault - } - rc := &api.ReplicationController{ - ObjectMeta: api.ObjectMeta{ - Name: config.Name, - }, - Spec: api.ReplicationControllerSpec{ - Replicas: int32(config.Replicas), - Selector: map[string]string{ - "name": config.Name, - }, - Template: &api.PodTemplateSpec{ - ObjectMeta: api.ObjectMeta{ - Labels: map[string]string{"name": config.Name}, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: config.Name, - Image: config.Image, - Command: config.Command, - Ports: []api.ContainerPort{{ContainerPort: 80}}, - ReadinessProbe: config.ReadinessProbe, - }, - }, - DNSPolicy: *config.DNSPolicy, - NodeSelector: config.NodeSelector, - }, - }, - }, - } - - config.applyTo(rc.Spec.Template) - - _, err := config.Client.ReplicationControllers(config.Namespace).Create(rc) - if err != nil { - return fmt.Errorf("Error creating replication controller: %v", err) - } - config.RCConfigLog("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, config.Namespace, rc.Spec.Replicas) - return nil -} - -func (config *RCConfig) applyTo(template *api.PodTemplateSpec) { - if config.Env != nil { - for k, v := range config.Env { - c := &template.Spec.Containers[0] - c.Env = append(c.Env, api.EnvVar{Name: k, Value: v}) - } - } - if config.Labels != nil { - for k, v := range config.Labels { - template.ObjectMeta.Labels[k] = v - } - } - if config.NodeSelector != nil { - template.Spec.NodeSelector = make(map[string]string) - for k, v := range config.NodeSelector { - template.Spec.NodeSelector[k] = v - } - } - if config.Ports != nil { - for k, v := range config.Ports { - c := &template.Spec.Containers[0] - c.Ports = append(c.Ports, api.ContainerPort{Name: k, ContainerPort: int32(v)}) - } - } - if config.HostPorts != nil { - for k, v := range config.HostPorts { - c := &template.Spec.Containers[0] - c.Ports = append(c.Ports, api.ContainerPort{Name: k, ContainerPort: int32(v), HostPort: int32(v)}) - } - } - if config.CpuLimit > 0 || config.MemLimit > 0 { - template.Spec.Containers[0].Resources.Limits = api.ResourceList{} - } - if config.CpuLimit > 0 { - template.Spec.Containers[0].Resources.Limits[api.ResourceCPU] = *resource.NewMilliQuantity(config.CpuLimit, resource.DecimalSI) - } - if config.MemLimit > 0 { - template.Spec.Containers[0].Resources.Limits[api.ResourceMemory] = *resource.NewQuantity(config.MemLimit, resource.DecimalSI) - } - if config.CpuRequest > 0 || config.MemRequest > 0 { - template.Spec.Containers[0].Resources.Requests = api.ResourceList{} - } - if config.CpuRequest > 0 { - template.Spec.Containers[0].Resources.Requests[api.ResourceCPU] = *resource.NewMilliQuantity(config.CpuRequest, resource.DecimalSI) - } - if config.MemRequest > 0 { - template.Spec.Containers[0].Resources.Requests[api.ResourceMemory] = *resource.NewQuantity(config.MemRequest, resource.DecimalSI) - } - if len(config.Volumes) > 0 { - template.Spec.Volumes = config.Volumes - } - if len(config.VolumeMounts) > 0 { - template.Spec.Containers[0].VolumeMounts = config.VolumeMounts - } -} - -type RCStartupStatus struct { - Expected int - Terminating int - Running int - RunningButNotReady int - Waiting int - Pending int - Unknown int - Inactive int - FailedContainers int - Created []*api.Pod - ContainerRestartNodes sets.String -} - -func (s *RCStartupStatus) String(name string) string { - return fmt.Sprintf("%v Pods: %d out of %d created, %d running, %d pending, %d waiting, %d inactive, %d terminating, %d unknown, %d runningButNotReady ", - name, len(s.Created), s.Expected, s.Running, s.Pending, s.Waiting, s.Inactive, s.Terminating, s.Unknown, s.RunningButNotReady) -} - -func ComputeRCStartupStatus(pods []*api.Pod, expected int) RCStartupStatus { - startupStatus := RCStartupStatus{ - Expected: expected, - Created: make([]*api.Pod, 0, expected), - ContainerRestartNodes: sets.NewString(), - } - for _, p := range pods { - if p.DeletionTimestamp != nil { - startupStatus.Terminating++ - continue - } - startupStatus.Created = append(startupStatus.Created, p) - if p.Status.Phase == api.PodRunning { - ready := false - for _, c := range p.Status.Conditions { - if c.Type == api.PodReady && c.Status == api.ConditionTrue { - ready = true - break - } - } - if ready { - // Only count a pod is running when it is also ready. - startupStatus.Running++ - } else { - startupStatus.RunningButNotReady++ - } - for _, v := range FailedContainers(p) { - startupStatus.FailedContainers = startupStatus.FailedContainers + v.Restarts - startupStatus.ContainerRestartNodes.Insert(p.Spec.NodeName) - } - } else if p.Status.Phase == api.PodPending { - if p.Spec.NodeName == "" { - startupStatus.Waiting++ - } else { - startupStatus.Pending++ - } - } else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed { - startupStatus.Inactive++ - } else if p.Status.Phase == api.PodUnknown { - startupStatus.Unknown++ - } - } - return startupStatus -} - -func (config *RCConfig) start() error { - // Don't force tests to fail if they don't care about containers restarting. - var maxContainerFailures int - if config.MaxContainerFailures == nil { - maxContainerFailures = int(math.Max(1.0, float64(config.Replicas)*.01)) - } else { - maxContainerFailures = *config.MaxContainerFailures - } - - label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name})) - - PodStore := NewPodStore(config.Client, config.Namespace, label, fields.Everything()) - defer PodStore.Stop() - - interval := config.PollInterval - if interval <= 0 { - interval = 10 * time.Second - } - timeout := config.Timeout - if timeout <= 0 { - timeout = 5 * time.Minute - } - oldPods := make([]*api.Pod, 0) - oldRunning := 0 - lastChange := time.Now() - for oldRunning != config.Replicas { - time.Sleep(interval) - - pods := PodStore.List() - startupStatus := ComputeRCStartupStatus(pods, config.Replicas) - - pods = startupStatus.Created - if config.CreatedPods != nil { - *config.CreatedPods = pods - } - if !config.Silent { - config.RCConfigLog(startupStatus.String(config.Name)) - } - - if config.PodStatusFile != nil { - fmt.Fprintf(config.PodStatusFile, "%d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown, %d, runningButNotReady\n", startupStatus.Running, startupStatus.Pending, startupStatus.Waiting, startupStatus.Inactive, startupStatus.Unknown, startupStatus.RunningButNotReady) - } - - if startupStatus.FailedContainers > maxContainerFailures { - DumpNodeDebugInfo(config.Client, startupStatus.ContainerRestartNodes.List(), config.RCConfigLog) - // Get the logs from the failed containers to help diagnose what caused them to fail - LogFailedContainers(config.Client, config.Namespace, config.RCConfigLog) - return fmt.Errorf("%d containers failed which is more than allowed %d", startupStatus.FailedContainers, maxContainerFailures) - } - if len(pods) < len(oldPods) || len(pods) > config.Replicas { - // This failure mode includes: - // kubelet is dead, so node controller deleted pods and rc creates more - // - diagnose by noting the pod diff below. - // pod is unhealthy, so replication controller creates another to take its place - // - diagnose by comparing the previous "2 Pod states" lines for inactive pods - errorStr := fmt.Sprintf("Number of reported pods for %s changed: %d vs %d", config.Name, len(pods), len(oldPods)) - config.RCConfigLog("%v, pods that changed since the last iteration:", errorStr) - config.RCConfigLog(Diff(oldPods, pods).String(sets.NewString())) - return fmt.Errorf(errorStr) - } - - if len(pods) > len(oldPods) || startupStatus.Running > oldRunning { - lastChange = time.Now() - } - oldPods = pods - oldRunning = startupStatus.Running - - if time.Since(lastChange) > timeout { - break - } - } - - if oldRunning != config.Replicas { - // List only pods from a given replication controller. - options := api.ListOptions{LabelSelector: label} - if pods, err := config.Client.Pods(api.NamespaceAll).List(options); err == nil { - - for _, pod := range pods.Items { - config.RCConfigLog("Pod %s\t%s\t%s\t%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, pod.DeletionTimestamp) - } - } else { - config.RCConfigLog("Can't list pod debug info: %v", err) - } - return fmt.Errorf("Only %d pods started out of %d", oldRunning, config.Replicas) - } - return nil -} - -// Simplified version of RunRC, that does not create RC, but creates plain Pods. -// Optionally waits for pods to start running (if waitForRunning == true). -// The number of replicas must be non-zero. -func StartPods(c *client.Client, replicas int, namespace string, podNamePrefix string, - pod api.Pod, waitForRunning bool, logFunc func(fmt string, args ...interface{})) error { - // no pod to start - if replicas < 1 { - panic("StartPods: number of replicas must be non-zero") - } - startPodsID := string(uuid.NewUUID()) // So that we can label and find them - for i := 0; i < replicas; i++ { - podName := fmt.Sprintf("%v-%v", podNamePrefix, i) - pod.ObjectMeta.Name = podName - pod.ObjectMeta.Labels["name"] = podName - pod.ObjectMeta.Labels["startPodsID"] = startPodsID - pod.Spec.Containers[0].Name = podName - _, err := c.Pods(namespace).Create(&pod) - return err - } - logFunc("Waiting for running...") - if waitForRunning { - label := labels.SelectorFromSet(labels.Set(map[string]string{"startPodsID": startPodsID})) - err := WaitForPodsWithLabelRunning(c, namespace, label) - return fmt.Errorf("Error waiting for %d pods to be running - probably a timeout: %v", replicas, err) - } - return nil + config.NodeDumpFunc = DumpNodeDebugInfo + config.ContainerDumpFunc = LogFailedContainers + return testutils.RunRC(config) } type EventsLister func(opts v1.ListOptions, ns string) (*v1.EventList, error) @@ -3020,7 +2419,7 @@ func WaitForAllNodesSchedulable(c *client.Client) error { } func AddOrUpdateLabelOnNode(c clientset.Interface, nodeName string, labelKey, labelValue string) { - ExpectNoError(testutil.AddLabelsToNode(c, nodeName, map[string]string{labelKey: labelValue})) + ExpectNoError(testutils.AddLabelsToNode(c, nodeName, map[string]string{labelKey: labelValue})) } func ExpectNodeHasLabel(c clientset.Interface, nodeName string, labelKey string, labelValue string) { @@ -3034,10 +2433,10 @@ func ExpectNodeHasLabel(c clientset.Interface, nodeName string, labelKey string, // won't fail if target label doesn't exist or has been removed. func RemoveLabelOffNode(c clientset.Interface, nodeName string, labelKey string) { By("removing the label " + labelKey + " off the node " + nodeName) - ExpectNoError(testutil.RemoveLabelOffNode(c, nodeName, []string{labelKey})) + ExpectNoError(testutils.RemoveLabelOffNode(c, nodeName, []string{labelKey})) By("verifying the node doesn't have the label " + labelKey) - ExpectNoError(testutil.VerifyLabelsRemoved(c, nodeName, []string{labelKey})) + ExpectNoError(testutils.VerifyLabelsRemoved(c, nodeName, []string{labelKey})) } func AddOrUpdateTaintOnNode(c *client.Client, nodeName string, taint api.Taint) { @@ -3195,42 +2594,16 @@ func WaitForRCPodsRunning(c *client.Client, ns, rcName string) error { return err } selector := labels.SelectorFromSet(labels.Set(rc.Spec.Selector)) - err = WaitForPodsWithLabelRunning(c, ns, selector) + err = testutils.WaitForPodsWithLabelRunning(c, ns, selector) if err != nil { return fmt.Errorf("Error while waiting for replication controller %s pods to be running: %v", rcName, err) } return nil } -// Wait up to 10 minutes for all matching pods to become Running and at least one -// matching pod exists. -func WaitForPodsWithLabelRunning(c *client.Client, ns string, label labels.Selector) error { - running := false - PodStore := NewPodStore(c, ns, label, fields.Everything()) - defer PodStore.Stop() -waitLoop: - for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) { - pods := PodStore.List() - if len(pods) == 0 { - continue waitLoop - } - for _, p := range pods { - if p.Status.Phase != api.PodRunning { - continue waitLoop - } - } - running = true - break - } - if !running { - return fmt.Errorf("Timeout while waiting for pods with labels %q to be running", label.String()) - } - return nil -} - // Returns true if all the specified pods are scheduled, else returns false. func podsWithLabelScheduled(c *client.Client, ns string, label labels.Selector) (bool, error) { - PodStore := NewPodStore(c, ns, label, fields.Everything()) + PodStore := testutils.NewPodStore(c, ns, label, fields.Everything()) defer PodStore.Stop() pods := PodStore.List() if len(pods) == 0 { @@ -3389,11 +2762,11 @@ func DeleteRCAndWaitForGC(c *client.Client, ns, name string) error { // podStoreForRC creates a PodStore that monitors pods belong to the rc. It // waits until the reflector does a List() before returning. -func podStoreForRC(c *client.Client, rc *api.ReplicationController) (*PodStore, error) { +func podStoreForRC(c *client.Client, rc *api.ReplicationController) (*testutils.PodStore, error) { labels := labels.SelectorFromSet(rc.Spec.Selector) - ps := NewPodStore(c, rc.Namespace, labels, fields.Everything()) + ps := testutils.NewPodStore(c, rc.Namespace, labels, fields.Everything()) err := wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) { - if len(ps.reflector.LastSyncResourceVersion()) != 0 { + if len(ps.Reflector.LastSyncResourceVersion()) != 0 { return true, nil } return false, nil @@ -3405,7 +2778,7 @@ func podStoreForRC(c *client.Client, rc *api.ReplicationController) (*PodStore, // This is to make a fair comparison of deletion time between DeleteRCAndPods // and DeleteRCAndWaitForGC, because the RC controller decreases status.replicas // when the pod is inactvie. -func waitForPodsInactive(ps *PodStore, interval, timeout time.Duration) error { +func waitForPodsInactive(ps *testutils.PodStore, interval, timeout time.Duration) error { return wait.PollImmediate(interval, timeout, func() (bool, error) { pods := ps.List() for _, pod := range pods { @@ -3418,7 +2791,7 @@ func waitForPodsInactive(ps *PodStore, interval, timeout time.Duration) error { } // waitForPodsGone waits until there are no pods left in the PodStore. -func waitForPodsGone(ps *PodStore, interval, timeout time.Duration) error { +func waitForPodsGone(ps *testutils.PodStore, interval, timeout time.Duration) error { return wait.PollImmediate(interval, timeout, func() (bool, error) { if pods := ps.List(); len(pods) == 0 { return true, nil @@ -3839,38 +3212,6 @@ func UpdateDeploymentWithRetries(c clientset.Interface, namespace, name string, return deployment, err } -// FailedContainers inspects all containers in a pod and returns failure -// information for containers that have failed or been restarted. -// A map is returned where the key is the containerID and the value is a -// struct containing the restart and failure information -func FailedContainers(pod *api.Pod) map[string]ContainerFailures { - var state ContainerFailures - states := make(map[string]ContainerFailures) - - statuses := pod.Status.ContainerStatuses - if len(statuses) == 0 { - return nil - } else { - for _, status := range statuses { - if status.State.Terminated != nil { - states[status.ContainerID] = ContainerFailures{status: status.State.Terminated} - } else if status.LastTerminationState.Terminated != nil { - states[status.ContainerID] = ContainerFailures{status: status.LastTerminationState.Terminated} - } - if status.RestartCount > 0 { - var ok bool - if state, ok = states[status.ContainerID]; !ok { - state = ContainerFailures{} - } - state.Restarts = int(status.RestartCount) - states[status.ContainerID] = state - } - } - } - - return states -} - // Prints the histogram of the events and returns the number of bad events. func BadEvents(events []*api.Event) int { type histogramKey struct { @@ -4091,14 +3432,14 @@ func GetSigner(provider string) (ssh.Signer, error) { // podNames in namespace ns are running and ready, using c and waiting at most // timeout. func CheckPodsRunningReady(c *client.Client, ns string, podNames []string, timeout time.Duration) bool { - return CheckPodsCondition(c, ns, podNames, timeout, PodRunningReady, "running and ready") + return CheckPodsCondition(c, ns, podNames, timeout, testutils.PodRunningReady, "running and ready") } // CheckPodsRunningReadyOrSucceeded returns whether all pods whose names are // listed in podNames in namespace ns are running and ready, or succeeded; use // c and waiting at most timeout. func CheckPodsRunningReadyOrSucceeded(c *client.Client, ns string, podNames []string, timeout time.Duration) bool { - return CheckPodsCondition(c, ns, podNames, timeout, PodRunningReadyOrSucceeded, "running and ready, or succeeded") + return CheckPodsCondition(c, ns, podNames, timeout, testutils.PodRunningReadyOrSucceeded, "running and ready, or succeeded") } // CheckPodsCondition returns whether all pods whose names are listed in podNames @@ -4698,7 +4039,7 @@ func ScaleRCByLabels(client *client.Client, clientset clientset.Interface, ns st return fmt.Errorf("error while waiting for pods gone %s: %v", name, err) } } else { - if err := WaitForPodsWithLabelRunning( + if err := testutils.WaitForPodsWithLabelRunning( client, ns, labels.SelectorFromSet(labels.Set(rc.Spec.Selector))); err != nil { return err } diff --git a/test/e2e/ingress_utils.go b/test/e2e/ingress_utils.go index 5d08a93f8ae..1da02baf905 100644 --- a/test/e2e/ingress_utils.go +++ b/test/e2e/ingress_utils.go @@ -53,6 +53,7 @@ import ( "k8s.io/kubernetes/pkg/util/wait" utilyaml "k8s.io/kubernetes/pkg/util/yaml" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -859,7 +860,7 @@ func (cont *NginxIngressController) init() { framework.Logf("waiting for pods with label %v", rc.Spec.Selector) sel := labels.SelectorFromSet(labels.Set(rc.Spec.Selector)) - ExpectNoError(framework.WaitForPodsWithLabelRunning(cont.c, cont.ns, sel)) + ExpectNoError(testutils.WaitForPodsWithLabelRunning(cont.c, cont.ns, sel)) pods, err := cont.c.Pods(cont.ns).List(api.ListOptions{LabelSelector: sel}) ExpectNoError(err) if len(pods.Items) == 0 { diff --git a/test/e2e/kubectl.go b/test/e2e/kubectl.go index 232c2014847..89bd934f80e 100644 --- a/test/e2e/kubectl.go +++ b/test/e2e/kubectl.go @@ -57,6 +57,7 @@ import ( "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -1172,7 +1173,7 @@ var _ = framework.KubeDescribe("Kubectl client", func() { By("verifying the pod " + podName + " is running") label := labels.SelectorFromSet(labels.Set(map[string]string{"run": podName})) - err := framework.WaitForPodsWithLabelRunning(c, ns, label) + err := testutils.WaitForPodsWithLabelRunning(c, ns, label) if err != nil { framework.Failf("Failed getting pod %s: %v", podName, err) } @@ -1519,7 +1520,7 @@ func curl(url string) (string, error) { func validateGuestbookApp(c *client.Client, ns string) { framework.Logf("Waiting for all frontend pods to be Running.") label := labels.SelectorFromSet(labels.Set(map[string]string{"tier": "frontend", "app": "guestbook"})) - err := framework.WaitForPodsWithLabelRunning(c, ns, label) + err := testutils.WaitForPodsWithLabelRunning(c, ns, label) Expect(err).NotTo(HaveOccurred()) framework.Logf("Waiting for frontend to serve content.") if !waitForGuestbookResponse(c, "get", "", `{"data": ""}`, guestbookStartupTimeout, ns) { diff --git a/test/e2e/kubelet.go b/test/e2e/kubelet.go index dc9ee7cb0ab..75dd5044afa 100644 --- a/test/e2e/kubelet.go +++ b/test/e2e/kubelet.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -186,7 +187,7 @@ var _ = framework.KubeDescribe("kubelet", func() { By(fmt.Sprintf("Creating a RC of %d pods and wait until all pods of this RC are running", totalPods)) rcName := fmt.Sprintf("cleanup%d-%s", totalPods, string(uuid.NewUUID())) - Expect(framework.RunRC(framework.RCConfig{ + Expect(framework.RunRC(testutils.RCConfig{ Client: f.Client, Name: rcName, Namespace: f.Namespace.Name, diff --git a/test/e2e/kubelet_perf.go b/test/e2e/kubelet_perf.go index d7cb3cf340a..151bc1eb027 100644 --- a/test/e2e/kubelet_perf.go +++ b/test/e2e/kubelet_perf.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -68,7 +69,7 @@ func runResourceTrackingTest(f *framework.Framework, podsPerNode int, nodeNames rcName := fmt.Sprintf("resource%d-%s", totalPods, string(uuid.NewUUID())) // TODO: Use a more realistic workload - Expect(framework.RunRC(framework.RCConfig{ + Expect(framework.RunRC(testutils.RCConfig{ Client: f.Client, Name: rcName, Namespace: f.Namespace.Name, diff --git a/test/e2e/load.go b/test/e2e/load.go index cb588562f01..779764da487 100644 --- a/test/e2e/load.go +++ b/test/e2e/load.go @@ -37,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/util/intstr" utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -66,7 +67,7 @@ var _ = framework.KubeDescribe("Load capacity", func() { var c *client.Client var nodeCount int var ns string - var configs []*framework.RCConfig + var configs []*testutils.RCConfig var namespaces []*api.Namespace // Gathers metrics before teardown @@ -283,8 +284,8 @@ func computeRCCounts(total int) (int, int, int) { return smallRCCount, mediumRCCount, bigRCCount } -func generateRCConfigs(totalPods int, image string, command []string, nss []*api.Namespace) []*framework.RCConfig { - configs := make([]*framework.RCConfig, 0) +func generateRCConfigs(totalPods int, image string, command []string, nss []*api.Namespace) []*testutils.RCConfig { + configs := make([]*testutils.RCConfig, 0) smallRCCount, mediumRCCount, bigRCCount := computeRCCounts(totalPods) configs = append(configs, generateRCConfigsForGroup(nss, smallRCGroupName, smallRCSize, smallRCCount, image, command)...) @@ -305,10 +306,10 @@ func generateRCConfigs(totalPods int, image string, command []string, nss []*api } func generateRCConfigsForGroup( - nss []*api.Namespace, groupName string, size, count int, image string, command []string) []*framework.RCConfig { - configs := make([]*framework.RCConfig, 0, count) + nss []*api.Namespace, groupName string, size, count int, image string, command []string) []*testutils.RCConfig { + configs := make([]*testutils.RCConfig, 0, count) for i := 1; i <= count; i++ { - config := &framework.RCConfig{ + config := &testutils.RCConfig{ Client: nil, // this will be overwritten later Name: groupName + "-" + strconv.Itoa(i), Namespace: nss[i%len(nss)].Name, @@ -324,7 +325,7 @@ func generateRCConfigsForGroup( return configs } -func generateServicesForConfigs(configs []*framework.RCConfig) []*api.Service { +func generateServicesForConfigs(configs []*testutils.RCConfig) []*api.Service { services := make([]*api.Service, 0, len(configs)) for _, config := range configs { serviceName := config.Name + "-svc" @@ -351,7 +352,7 @@ func sleepUpTo(d time.Duration) { time.Sleep(time.Duration(rand.Int63n(d.Nanoseconds()))) } -func createAllRC(configs []*framework.RCConfig, creatingTime time.Duration) { +func createAllRC(configs []*testutils.RCConfig, creatingTime time.Duration) { var wg sync.WaitGroup wg.Add(len(configs)) for _, config := range configs { @@ -360,7 +361,7 @@ func createAllRC(configs []*framework.RCConfig, creatingTime time.Duration) { wg.Wait() } -func createRC(wg *sync.WaitGroup, config *framework.RCConfig, creatingTime time.Duration) { +func createRC(wg *sync.WaitGroup, config *testutils.RCConfig, creatingTime time.Duration) { defer GinkgoRecover() defer wg.Done() @@ -368,7 +369,7 @@ func createRC(wg *sync.WaitGroup, config *framework.RCConfig, creatingTime time. framework.ExpectNoError(framework.RunRC(*config), fmt.Sprintf("creating rc %s", config.Name)) } -func scaleAllRC(configs []*framework.RCConfig, scalingTime time.Duration) { +func scaleAllRC(configs []*testutils.RCConfig, scalingTime time.Duration) { var wg sync.WaitGroup wg.Add(len(configs)) for _, config := range configs { @@ -379,7 +380,7 @@ func scaleAllRC(configs []*framework.RCConfig, scalingTime time.Duration) { // Scales RC to a random size within [0.5*size, 1.5*size] and lists all the pods afterwards. // Scaling happens always based on original size, not the current size. -func scaleRC(wg *sync.WaitGroup, config *framework.RCConfig, scalingTime time.Duration) { +func scaleRC(wg *sync.WaitGroup, config *testutils.RCConfig, scalingTime time.Duration) { defer GinkgoRecover() defer wg.Done() @@ -396,7 +397,7 @@ func scaleRC(wg *sync.WaitGroup, config *framework.RCConfig, scalingTime time.Du framework.ExpectNoError(err, fmt.Sprintf("listing pods from rc %v", config.Name)) } -func deleteAllRC(configs []*framework.RCConfig, deletingTime time.Duration) { +func deleteAllRC(configs []*testutils.RCConfig, deletingTime time.Duration) { var wg sync.WaitGroup wg.Add(len(configs)) for _, config := range configs { @@ -405,7 +406,7 @@ func deleteAllRC(configs []*framework.RCConfig, deletingTime time.Duration) { wg.Wait() } -func deleteRC(wg *sync.WaitGroup, config *framework.RCConfig, deletingTime time.Duration) { +func deleteRC(wg *sync.WaitGroup, config *testutils.RCConfig, deletingTime time.Duration) { defer GinkgoRecover() defer wg.Done() diff --git a/test/e2e/proxy.go b/test/e2e/proxy.go index d6f09060ba8..7929096d99d 100644 --- a/test/e2e/proxy.go +++ b/test/e2e/proxy.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -116,7 +117,7 @@ func proxyContext(version string) { // environmental variables below. By("starting an echo server on multiple ports") pods := []*api.Pod{} - cfg := framework.RCConfig{ + cfg := testutils.RCConfig{ Client: f.Client, Image: "gcr.io/google_containers/porter:cd5cb5791ebaa8641955f0e8c2a9bed669b1eaab", Name: service.Name, diff --git a/test/e2e/reboot.go b/test/e2e/reboot.go index 26b5bbdefd4..32758e7ae76 100644 --- a/test/e2e/reboot.go +++ b/test/e2e/reboot.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -179,7 +180,7 @@ func printStatusAndLogsForNotReadyPods(c *client.Client, ns string, podNames []s if !podNameSet.Has(p.Name) { continue } - if ok, _ := framework.PodRunningReady(p); ok { + if ok, _ := testutils.PodRunningReady(p); ok { continue } framework.Logf("Status for not ready pod %s/%s: %+v", p.Namespace, p.Name, p.Status) @@ -209,7 +210,7 @@ func printStatusAndLogsForNotReadyPods(c *client.Client, ns string, podNames []s func rebootNode(c *client.Client, provider, name, rebootCmd string) bool { // Setup ns := api.NamespaceSystem - ps := framework.NewPodStore(c, ns, labels.Everything(), fields.OneTermEqualSelector(api.PodHostField, name)) + ps := testutils.NewPodStore(c, ns, labels.Everything(), fields.OneTermEqualSelector(api.PodHostField, name)) defer ps.Stop() // Get the node initially. diff --git a/test/e2e/rescheduler.go b/test/e2e/rescheduler.go index 5ea934a21a1..cbd294bffdc 100644 --- a/test/e2e/rescheduler.go +++ b/test/e2e/rescheduler.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -103,6 +104,6 @@ func podRunningOrUnschedulable(pod *api.Pod) bool { if cond != nil && cond.Status == api.ConditionFalse && cond.Reason == "Unschedulable" { return true } - running, _ := framework.PodRunningReady(pod) + running, _ := testutils.PodRunningReady(pod) return running } diff --git a/test/e2e/resize_nodes.go b/test/e2e/resize_nodes.go index a07054681db..0bae5a4164a 100644 --- a/test/e2e/resize_nodes.go +++ b/test/e2e/resize_nodes.go @@ -40,6 +40,7 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/watch" + testutils "k8s.io/kubernetes/test/utils" ) const ( @@ -571,7 +572,7 @@ var _ = framework.KubeDescribe("Nodes [Disruptive]", func() { } node := nodes.Items[0] podOpts = api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, node.Name)} - if err = framework.WaitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, framework.PodRunningReady); err != nil { + if err = framework.WaitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, testutils.PodRunningReady); err != nil { framework.Failf("Pods on node %s are not ready and running within %v: %v", node.Name, podReadyTimeout, err) } @@ -623,7 +624,7 @@ var _ = framework.KubeDescribe("Nodes [Disruptive]", func() { By("Expect to observe node and pod status change from NotReady to Ready after network connectivity recovers") expectNodeReadiness(true, newNode) - if err = framework.WaitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, framework.PodRunningReady); err != nil { + if err = framework.WaitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, testutils.PodRunningReady); err != nil { framework.Failf("Pods on node %s did not become ready and running within %v: %v", node.Name, podReadyTimeout, err) } }() @@ -632,7 +633,7 @@ var _ = framework.KubeDescribe("Nodes [Disruptive]", func() { By("Expect to observe node and pod status change from Ready to NotReady after network partition") expectNodeReadiness(false, newNode) - if err = framework.WaitForMatchPodsCondition(c, podOpts, "NotReady", podNotReadyTimeout, framework.PodNotReady); err != nil { + if err = framework.WaitForMatchPodsCondition(c, podOpts, "NotReady", podNotReadyTimeout, testutils.PodNotReady); err != nil { framework.Failf("Pods on node %s did not become NotReady within %v: %v", node.Name, podNotReadyTimeout, err) } }) diff --git a/test/e2e/restart.go b/test/e2e/restart.go index 32f917738ec..23292c15f26 100644 --- a/test/e2e/restart.go +++ b/test/e2e/restart.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -32,14 +33,14 @@ import ( var _ = framework.KubeDescribe("Restart [Disruptive]", func() { f := framework.NewDefaultFramework("restart") - var ps *framework.PodStore + var ps *testutils.PodStore BeforeEach(func() { // This test requires the ability to restart all nodes, so the provider // check must be identical to that call. framework.SkipUnlessProviderIs("gce", "gke") - ps = framework.NewPodStore(f.Client, api.NamespaceSystem, labels.Everything(), fields.Everything()) + ps = testutils.NewPodStore(f.Client, api.NamespaceSystem, labels.Everything(), fields.Everything()) }) AfterEach(func() { @@ -100,7 +101,7 @@ var _ = framework.KubeDescribe("Restart [Disruptive]", func() { // waitForNPods tries to list pods using c until it finds expect of them, // returning their names if it can do so before timeout. -func waitForNPods(ps *framework.PodStore, expect int, timeout time.Duration) ([]string, error) { +func waitForNPods(ps *testutils.PodStore, expect int, timeout time.Duration) ([]string, error) { // Loop until we find expect pods or timeout is passed. var pods []*api.Pod var errLast error diff --git a/test/e2e/scheduler_predicates.go b/test/e2e/scheduler_predicates.go index 3c23fecbb73..53b89a1af7c 100644 --- a/test/e2e/scheduler_predicates.go +++ b/test/e2e/scheduler_predicates.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -126,7 +127,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { // and there is no need to create additional pods. // StartPods requires at least one pod to replicate. if podsNeededForSaturation > 0 { - framework.ExpectNoError(framework.StartPods(c, podsNeededForSaturation, ns, "maxp", + framework.ExpectNoError(testutils.StartPods(c, podsNeededForSaturation, ns, "maxp", *initPausePod(f, pausePodConfig{ Name: "", Labels: map[string]string{"name": ""}, @@ -187,7 +188,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { // and there is no need to create additional pods. // StartPods requires at least one pod to replicate. if podsNeededForSaturation > 0 { - framework.ExpectNoError(framework.StartPods(c, podsNeededForSaturation, ns, "overcommit", + framework.ExpectNoError(testutils.StartPods(c, podsNeededForSaturation, ns, "overcommit", *initPausePod(f, pausePodConfig{ Name: "", Labels: map[string]string{"name": ""}, diff --git a/test/e2e/service.go b/test/e2e/service.go index 281a79d6419..dacd437ab19 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -44,6 +44,7 @@ import ( "k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" ) const ( @@ -1604,7 +1605,7 @@ func startServeHostnameService(c *client.Client, ns, name string, port, replicas var createdPods []*api.Pod maxContainerFailures := 0 - config := framework.RCConfig{ + config := testutils.RCConfig{ Client: c, Image: "gcr.io/google_containers/serve_hostname:v1.4", Name: name, diff --git a/test/e2e/service_latency.go b/test/e2e/service_latency.go index 2203fda9d56..db7d2dba0c6 100644 --- a/test/e2e/service_latency.go +++ b/test/e2e/service_latency.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" . "github.com/onsi/ginkgo" ) @@ -115,7 +116,7 @@ var _ = framework.KubeDescribe("Service endpoints latency", func() { }) func runServiceLatencies(f *framework.Framework, inParallel, total int) (output []time.Duration, err error) { - cfg := framework.RCConfig{ + cfg := testutils.RCConfig{ Client: f.Client, Image: framework.GetPauseImageName(f.Client), Name: "svc-latency-rc", diff --git a/test/e2e/ubernetes_lite.go b/test/e2e/ubernetes_lite.go index 37c34930e42..f027a85d667 100644 --- a/test/e2e/ubernetes_lite.go +++ b/test/e2e/ubernetes_lite.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" ) var _ = framework.KubeDescribe("Multi-AZ Clusters", func() { @@ -98,7 +99,7 @@ func SpreadServiceOrFail(f *framework.Framework, replicaCount int, image string) // Based on the callers, replicas is always positive number: zoneCount >= 0 implies (2*zoneCount)+1 > 0. // Thus, no need to test for it. Once the precondition changes to zero number of replicas, // test for replicaCount > 0. Otherwise, StartPods panics. - framework.ExpectNoError(framework.StartPods(f.Client, replicaCount, f.Namespace.Name, serviceName, *podSpec, false, framework.Logf)) + framework.ExpectNoError(testutils.StartPods(f.Client, replicaCount, f.Namespace.Name, serviceName, *podSpec, false, framework.Logf)) // Wait for all of them to be scheduled selector := labels.SelectorFromSet(labels.Set(map[string]string{"service": serviceName})) diff --git a/test/utils/conditions.go b/test/utils/conditions.go new file mode 100644 index 00000000000..04f04b2db70 --- /dev/null +++ b/test/utils/conditions.go @@ -0,0 +1,104 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "fmt" + "k8s.io/kubernetes/pkg/api" +) + +type ContainerFailures struct { + status *api.ContainerStateTerminated + Restarts int +} + +// PodRunningReady checks whether pod p's phase is running and it has a ready +// condition of status true. +func PodRunningReady(p *api.Pod) (bool, error) { + // Check the phase is running. + if p.Status.Phase != api.PodRunning { + return false, fmt.Errorf("want pod '%s' on '%s' to be '%v' but was '%v'", + p.ObjectMeta.Name, p.Spec.NodeName, api.PodRunning, p.Status.Phase) + } + // Check the ready condition is true. + if !PodReady(p) { + return false, fmt.Errorf("pod '%s' on '%s' didn't have condition {%v %v}; conditions: %v", + p.ObjectMeta.Name, p.Spec.NodeName, api.PodReady, api.ConditionTrue, p.Status.Conditions) + } + return true, nil +} + +func PodRunningReadyOrSucceeded(p *api.Pod) (bool, error) { + // Check if the phase is succeeded. + if p.Status.Phase == api.PodSucceeded { + return true, nil + } + return PodRunningReady(p) +} + +// FailedContainers inspects all containers in a pod and returns failure +// information for containers that have failed or been restarted. +// A map is returned where the key is the containerID and the value is a +// struct containing the restart and failure information +func FailedContainers(pod *api.Pod) map[string]ContainerFailures { + var state ContainerFailures + states := make(map[string]ContainerFailures) + + statuses := pod.Status.ContainerStatuses + if len(statuses) == 0 { + return nil + } else { + for _, status := range statuses { + if status.State.Terminated != nil { + states[status.ContainerID] = ContainerFailures{status: status.State.Terminated} + } else if status.LastTerminationState.Terminated != nil { + states[status.ContainerID] = ContainerFailures{status: status.LastTerminationState.Terminated} + } + if status.RestartCount > 0 { + var ok bool + if state, ok = states[status.ContainerID]; !ok { + state = ContainerFailures{} + } + state.Restarts = int(status.RestartCount) + states[status.ContainerID] = state + } + } + } + + return states +} + +// PodNotReady checks whether pod p's has a ready condition of status false. +func PodNotReady(p *api.Pod) (bool, error) { + // Check the ready condition is false. + if PodReady(p) { + return false, fmt.Errorf("pod '%s' on '%s' didn't have condition {%v %v}; conditions: %v", + p.ObjectMeta.Name, p.Spec.NodeName, api.PodReady, api.ConditionFalse, p.Status.Conditions) + } + return true, nil +} + +// podReady returns whether pod has a condition of Ready with a status of true. +// TODO: should be replaced with api.IsPodReady +func PodReady(pod *api.Pod) bool { + for _, cond := range pod.Status.Conditions { + if cond.Type == api.PodReady && cond.Status == api.ConditionTrue { + return true + } + } + return false +} diff --git a/test/utils/pod_store.go b/test/utils/pod_store.go new file mode 100644 index 00000000000..e6e044f4fe8 --- /dev/null +++ b/test/utils/pod_store.go @@ -0,0 +1,67 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" +) + +// Convenient wrapper around cache.Store that returns list of api.Pod instead of interface{}. +type PodStore struct { + cache.Store + stopCh chan struct{} + Reflector *cache.Reflector +} + +func NewPodStore(c *client.Client, namespace string, label labels.Selector, field fields.Selector) *PodStore { + lw := &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + options.LabelSelector = label + options.FieldSelector = field + return c.Pods(namespace).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + options.LabelSelector = label + options.FieldSelector = field + return c.Pods(namespace).Watch(options) + }, + } + store := cache.NewStore(cache.MetaNamespaceKeyFunc) + stopCh := make(chan struct{}) + reflector := cache.NewReflector(lw, &api.Pod{}, store, 0) + reflector.RunUntil(stopCh) + return &PodStore{Store: store, stopCh: stopCh, Reflector: reflector} +} + +func (s *PodStore) List() []*api.Pod { + objects := s.Store.List() + pods := make([]*api.Pod, 0) + for _, o := range objects { + pods = append(pods, o.(*api.Pod)) + } + return pods +} + +func (s *PodStore) Stop() { + close(s.stopCh) +} diff --git a/test/utils/runners.go b/test/utils/runners.go new file mode 100644 index 00000000000..160969938a2 --- /dev/null +++ b/test/utils/runners.go @@ -0,0 +1,603 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "fmt" + "math" + "os" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/extensions" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/uuid" + + "github.com/golang/glog" +) + +const ( + // String used to mark pod deletion + nonExist = "NonExist" +) + +type RCConfig struct { + Client *client.Client + Image string + Command []string + Name string + Namespace string + PollInterval time.Duration + Timeout time.Duration + PodStatusFile *os.File + Replicas int + CpuRequest int64 // millicores + CpuLimit int64 // millicores + MemRequest int64 // bytes + MemLimit int64 // bytes + ReadinessProbe *api.Probe + DNSPolicy *api.DNSPolicy + + // Env vars, set the same for every pod. + Env map[string]string + + // Extra labels added to every pod. + Labels map[string]string + + // Node selector for pods in the RC. + NodeSelector map[string]string + + // Ports to declare in the container (map of name to containerPort). + Ports map[string]int + // Ports to declare in the container as host and container ports. + HostPorts map[string]int + + Volumes []api.Volume + VolumeMounts []api.VolumeMount + + // Pointer to a list of pods; if non-nil, will be set to a list of pods + // created by this RC by RunRC. + CreatedPods *[]*api.Pod + + // Maximum allowable container failures. If exceeded, RunRC returns an error. + // Defaults to replicas*0.1 if unspecified. + MaxContainerFailures *int + + // If set to false starting RC will print progress, otherwise only errors will be printed. + Silent bool + + // If set this function will be used to print log lines instead of glog. + LogFunc func(fmt string, args ...interface{}) + // If set those functions will be used to gather data from Nodes - in integration tests where no + // kubelets are running those variables should be nil. + NodeDumpFunc func(c *client.Client, nodeNames []string, logFunc func(fmt string, args ...interface{})) + ContainerDumpFunc func(c *client.Client, ns string, logFunc func(ftm string, args ...interface{})) +} + +func (rc *RCConfig) RCConfigLog(fmt string, args ...interface{}) { + if rc.LogFunc != nil { + rc.LogFunc(fmt, args...) + } + glog.Infof(fmt, args...) +} + +type DeploymentConfig struct { + RCConfig +} + +type ReplicaSetConfig struct { + RCConfig +} + +// podInfo contains pod information useful for debugging e2e tests. +type podInfo struct { + oldHostname string + oldPhase string + hostname string + phase string +} + +// PodDiff is a map of pod name to podInfos +type PodDiff map[string]*podInfo + +// Print formats and prints the give PodDiff. +func (p PodDiff) String(ignorePhases sets.String) string { + ret := "" + for name, info := range p { + if ignorePhases.Has(info.phase) { + continue + } + if info.phase == nonExist { + ret += fmt.Sprintf("Pod %v was deleted, had phase %v and host %v\n", name, info.oldPhase, info.oldHostname) + continue + } + phaseChange, hostChange := false, false + msg := fmt.Sprintf("Pod %v ", name) + if info.oldPhase != info.phase { + phaseChange = true + if info.oldPhase == nonExist { + msg += fmt.Sprintf("in phase %v ", info.phase) + } else { + msg += fmt.Sprintf("went from phase: %v -> %v ", info.oldPhase, info.phase) + } + } + if info.oldHostname != info.hostname { + hostChange = true + if info.oldHostname == nonExist || info.oldHostname == "" { + msg += fmt.Sprintf("assigned host %v ", info.hostname) + } else { + msg += fmt.Sprintf("went from host: %v -> %v ", info.oldHostname, info.hostname) + } + } + if phaseChange || hostChange { + ret += msg + "\n" + } + } + return ret +} + +// Diff computes a PodDiff given 2 lists of pods. +func Diff(oldPods []*api.Pod, curPods []*api.Pod) PodDiff { + podInfoMap := PodDiff{} + + // New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist. + for _, pod := range curPods { + podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.NodeName, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist} + } + + // Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist. + for _, pod := range oldPods { + if info, ok := podInfoMap[pod.Name]; ok { + info.oldHostname, info.oldPhase = pod.Spec.NodeName, string(pod.Status.Phase) + } else { + podInfoMap[pod.Name] = &podInfo{hostname: nonExist, phase: nonExist, oldHostname: pod.Spec.NodeName, oldPhase: string(pod.Status.Phase)} + } + } + return podInfoMap +} + +// RunDeployment Launches (and verifies correctness) of a Deployment +// and will wait for all pods it spawns to become "Running". +// It's the caller's responsibility to clean up externally (i.e. use the +// namespace lifecycle for handling Cleanup). +func RunDeployment(config DeploymentConfig) error { + err := config.create() + if err != nil { + return err + } + return config.start() +} + +func (config *DeploymentConfig) create() error { + deployment := &extensions.Deployment{ + ObjectMeta: api.ObjectMeta{ + Name: config.Name, + }, + Spec: extensions.DeploymentSpec{ + Replicas: int32(config.Replicas), + Selector: &unversioned.LabelSelector{ + MatchLabels: map[string]string{ + "name": config.Name, + }, + }, + Template: api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{"name": config.Name}, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: config.Name, + Image: config.Image, + Command: config.Command, + Ports: []api.ContainerPort{{ContainerPort: 80}}, + }, + }, + }, + }, + }, + } + + config.applyTo(&deployment.Spec.Template) + + _, err := config.Client.Deployments(config.Namespace).Create(deployment) + if err != nil { + return fmt.Errorf("Error creating deployment: %v", err) + } + config.RCConfigLog("Created deployment with name: %v, namespace: %v, replica count: %v", deployment.Name, config.Namespace, deployment.Spec.Replicas) + return nil +} + +// RunReplicaSet launches (and verifies correctness) of a ReplicaSet +// and waits until all the pods it launches to reach the "Running" state. +// It's the caller's responsibility to clean up externally (i.e. use the +// namespace lifecycle for handling Cleanup). +func RunReplicaSet(config ReplicaSetConfig) error { + err := config.create() + if err != nil { + return err + } + return config.start() +} + +func (config *ReplicaSetConfig) create() error { + rs := &extensions.ReplicaSet{ + ObjectMeta: api.ObjectMeta{ + Name: config.Name, + }, + Spec: extensions.ReplicaSetSpec{ + Replicas: int32(config.Replicas), + Selector: &unversioned.LabelSelector{ + MatchLabels: map[string]string{ + "name": config.Name, + }, + }, + Template: api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{"name": config.Name}, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: config.Name, + Image: config.Image, + Command: config.Command, + Ports: []api.ContainerPort{{ContainerPort: 80}}, + }, + }, + }, + }, + }, + } + + config.applyTo(&rs.Spec.Template) + + _, err := config.Client.ReplicaSets(config.Namespace).Create(rs) + if err != nil { + return fmt.Errorf("Error creating replica set: %v", err) + } + config.RCConfigLog("Created replica set with name: %v, namespace: %v, replica count: %v", rs.Name, config.Namespace, rs.Spec.Replicas) + return nil +} + +// RunRC Launches (and verifies correctness) of a Replication Controller +// and will wait for all pods it spawns to become "Running". +// It's the caller's responsibility to clean up externally (i.e. use the +// namespace lifecycle for handling Cleanup). +func RunRC(config RCConfig) error { + err := config.create() + if err != nil { + return err + } + return config.start() +} + +func (config *RCConfig) create() error { + dnsDefault := api.DNSDefault + if config.DNSPolicy == nil { + config.DNSPolicy = &dnsDefault + } + rc := &api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Name: config.Name, + }, + Spec: api.ReplicationControllerSpec{ + Replicas: int32(config.Replicas), + Selector: map[string]string{ + "name": config.Name, + }, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{"name": config.Name}, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: config.Name, + Image: config.Image, + Command: config.Command, + Ports: []api.ContainerPort{{ContainerPort: 80}}, + ReadinessProbe: config.ReadinessProbe, + }, + }, + DNSPolicy: *config.DNSPolicy, + NodeSelector: config.NodeSelector, + }, + }, + }, + } + + config.applyTo(rc.Spec.Template) + + _, err := config.Client.ReplicationControllers(config.Namespace).Create(rc) + if err != nil { + return fmt.Errorf("Error creating replication controller: %v", err) + } + config.RCConfigLog("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, config.Namespace, rc.Spec.Replicas) + return nil +} + +func (config *RCConfig) applyTo(template *api.PodTemplateSpec) { + if config.Env != nil { + for k, v := range config.Env { + c := &template.Spec.Containers[0] + c.Env = append(c.Env, api.EnvVar{Name: k, Value: v}) + } + } + if config.Labels != nil { + for k, v := range config.Labels { + template.ObjectMeta.Labels[k] = v + } + } + if config.NodeSelector != nil { + template.Spec.NodeSelector = make(map[string]string) + for k, v := range config.NodeSelector { + template.Spec.NodeSelector[k] = v + } + } + if config.Ports != nil { + for k, v := range config.Ports { + c := &template.Spec.Containers[0] + c.Ports = append(c.Ports, api.ContainerPort{Name: k, ContainerPort: int32(v)}) + } + } + if config.HostPorts != nil { + for k, v := range config.HostPorts { + c := &template.Spec.Containers[0] + c.Ports = append(c.Ports, api.ContainerPort{Name: k, ContainerPort: int32(v), HostPort: int32(v)}) + } + } + if config.CpuLimit > 0 || config.MemLimit > 0 { + template.Spec.Containers[0].Resources.Limits = api.ResourceList{} + } + if config.CpuLimit > 0 { + template.Spec.Containers[0].Resources.Limits[api.ResourceCPU] = *resource.NewMilliQuantity(config.CpuLimit, resource.DecimalSI) + } + if config.MemLimit > 0 { + template.Spec.Containers[0].Resources.Limits[api.ResourceMemory] = *resource.NewQuantity(config.MemLimit, resource.DecimalSI) + } + if config.CpuRequest > 0 || config.MemRequest > 0 { + template.Spec.Containers[0].Resources.Requests = api.ResourceList{} + } + if config.CpuRequest > 0 { + template.Spec.Containers[0].Resources.Requests[api.ResourceCPU] = *resource.NewMilliQuantity(config.CpuRequest, resource.DecimalSI) + } + if config.MemRequest > 0 { + template.Spec.Containers[0].Resources.Requests[api.ResourceMemory] = *resource.NewQuantity(config.MemRequest, resource.DecimalSI) + } + if len(config.Volumes) > 0 { + template.Spec.Volumes = config.Volumes + } + if len(config.VolumeMounts) > 0 { + template.Spec.Containers[0].VolumeMounts = config.VolumeMounts + } +} + +type RCStartupStatus struct { + Expected int + Terminating int + Running int + RunningButNotReady int + Waiting int + Pending int + Unknown int + Inactive int + FailedContainers int + Created []*api.Pod + ContainerRestartNodes sets.String +} + +func (s *RCStartupStatus) String(name string) string { + return fmt.Sprintf("%v Pods: %d out of %d created, %d running, %d pending, %d waiting, %d inactive, %d terminating, %d unknown, %d runningButNotReady ", + name, len(s.Created), s.Expected, s.Running, s.Pending, s.Waiting, s.Inactive, s.Terminating, s.Unknown, s.RunningButNotReady) +} + +func ComputeRCStartupStatus(pods []*api.Pod, expected int) RCStartupStatus { + startupStatus := RCStartupStatus{ + Expected: expected, + Created: make([]*api.Pod, 0, expected), + ContainerRestartNodes: sets.NewString(), + } + for _, p := range pods { + if p.DeletionTimestamp != nil { + startupStatus.Terminating++ + continue + } + startupStatus.Created = append(startupStatus.Created, p) + if p.Status.Phase == api.PodRunning { + ready := false + for _, c := range p.Status.Conditions { + if c.Type == api.PodReady && c.Status == api.ConditionTrue { + ready = true + break + } + } + if ready { + // Only count a pod is running when it is also ready. + startupStatus.Running++ + } else { + startupStatus.RunningButNotReady++ + } + for _, v := range FailedContainers(p) { + startupStatus.FailedContainers = startupStatus.FailedContainers + v.Restarts + startupStatus.ContainerRestartNodes.Insert(p.Spec.NodeName) + } + } else if p.Status.Phase == api.PodPending { + if p.Spec.NodeName == "" { + startupStatus.Waiting++ + } else { + startupStatus.Pending++ + } + } else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed { + startupStatus.Inactive++ + } else if p.Status.Phase == api.PodUnknown { + startupStatus.Unknown++ + } + } + return startupStatus +} + +func (config *RCConfig) start() error { + // Don't force tests to fail if they don't care about containers restarting. + var maxContainerFailures int + if config.MaxContainerFailures == nil { + maxContainerFailures = int(math.Max(1.0, float64(config.Replicas)*.01)) + } else { + maxContainerFailures = *config.MaxContainerFailures + } + + label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name})) + + PodStore := NewPodStore(config.Client, config.Namespace, label, fields.Everything()) + defer PodStore.Stop() + + interval := config.PollInterval + if interval <= 0 { + interval = 10 * time.Second + } + timeout := config.Timeout + if timeout <= 0 { + timeout = 5 * time.Minute + } + oldPods := make([]*api.Pod, 0) + oldRunning := 0 + lastChange := time.Now() + for oldRunning != config.Replicas { + time.Sleep(interval) + + pods := PodStore.List() + startupStatus := ComputeRCStartupStatus(pods, config.Replicas) + + pods = startupStatus.Created + if config.CreatedPods != nil { + *config.CreatedPods = pods + } + if !config.Silent { + config.RCConfigLog(startupStatus.String(config.Name)) + } + + if config.PodStatusFile != nil { + fmt.Fprintf(config.PodStatusFile, "%d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown, %d, runningButNotReady\n", startupStatus.Running, startupStatus.Pending, startupStatus.Waiting, startupStatus.Inactive, startupStatus.Unknown, startupStatus.RunningButNotReady) + } + + if startupStatus.FailedContainers > maxContainerFailures { + if config.NodeDumpFunc != nil { + config.NodeDumpFunc(config.Client, startupStatus.ContainerRestartNodes.List(), config.RCConfigLog) + } + if config.ContainerDumpFunc != nil { + // Get the logs from the failed containers to help diagnose what caused them to fail + config.ContainerDumpFunc(config.Client, config.Namespace, config.RCConfigLog) + } + return fmt.Errorf("%d containers failed which is more than allowed %d", startupStatus.FailedContainers, maxContainerFailures) + } + if len(pods) < len(oldPods) || len(pods) > config.Replicas { + // This failure mode includes: + // kubelet is dead, so node controller deleted pods and rc creates more + // - diagnose by noting the pod diff below. + // pod is unhealthy, so replication controller creates another to take its place + // - diagnose by comparing the previous "2 Pod states" lines for inactive pods + errorStr := fmt.Sprintf("Number of reported pods for %s changed: %d vs %d", config.Name, len(pods), len(oldPods)) + config.RCConfigLog("%v, pods that changed since the last iteration:", errorStr) + config.RCConfigLog(Diff(oldPods, pods).String(sets.NewString())) + return fmt.Errorf(errorStr) + } + + if len(pods) > len(oldPods) || startupStatus.Running > oldRunning { + lastChange = time.Now() + } + oldPods = pods + oldRunning = startupStatus.Running + + if time.Since(lastChange) > timeout { + break + } + } + + if oldRunning != config.Replicas { + // List only pods from a given replication controller. + options := api.ListOptions{LabelSelector: label} + if pods, err := config.Client.Pods(api.NamespaceAll).List(options); err == nil { + + for _, pod := range pods.Items { + config.RCConfigLog("Pod %s\t%s\t%s\t%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, pod.DeletionTimestamp) + } + } else { + config.RCConfigLog("Can't list pod debug info: %v", err) + } + return fmt.Errorf("Only %d pods started out of %d", oldRunning, config.Replicas) + } + return nil +} + +// Simplified version of RunRC, that does not create RC, but creates plain Pods. +// Optionally waits for pods to start running (if waitForRunning == true). +// The number of replicas must be non-zero. +func StartPods(c *client.Client, replicas int, namespace string, podNamePrefix string, + pod api.Pod, waitForRunning bool, logFunc func(fmt string, args ...interface{})) error { + // no pod to start + if replicas < 1 { + panic("StartPods: number of replicas must be non-zero") + } + startPodsID := string(uuid.NewUUID()) // So that we can label and find them + for i := 0; i < replicas; i++ { + podName := fmt.Sprintf("%v-%v", podNamePrefix, i) + pod.ObjectMeta.Name = podName + pod.ObjectMeta.Labels["name"] = podName + pod.ObjectMeta.Labels["startPodsID"] = startPodsID + pod.Spec.Containers[0].Name = podName + _, err := c.Pods(namespace).Create(&pod) + return err + } + logFunc("Waiting for running...") + if waitForRunning { + label := labels.SelectorFromSet(labels.Set(map[string]string{"startPodsID": startPodsID})) + err := WaitForPodsWithLabelRunning(c, namespace, label) + return fmt.Errorf("Error waiting for %d pods to be running - probably a timeout: %v", replicas, err) + } + return nil +} + +// Wait up to 10 minutes for all matching pods to become Running and at least one +// matching pod exists. +func WaitForPodsWithLabelRunning(c *client.Client, ns string, label labels.Selector) error { + running := false + PodStore := NewPodStore(c, ns, label, fields.Everything()) + defer PodStore.Stop() +waitLoop: + for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) { + pods := PodStore.List() + if len(pods) == 0 { + continue waitLoop + } + for _, p := range pods { + if p.Status.Phase != api.PodRunning { + continue waitLoop + } + } + running = true + break + } + if !running { + return fmt.Errorf("Timeout while waiting for pods with labels %q to be running", label.String()) + } + return nil +}