diff --git a/test/e2e/util.go b/test/e2e/util.go index 5250f59d57f..78c7cc47f15 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -31,13 +31,16 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd" clientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "code.google.com/p/go-uuid/uuid" "github.com/davecgh/go-spew/spew" @@ -95,6 +98,40 @@ type ContainerFailures struct { restarts int } +// Convenient wrapper around cache.Store that returns list of api.Pod instead of interface{}. +type podStore struct { + cache.Store + stopCh chan struct{} +} + +func newPodStore(c *client.Client, namespace string, label labels.Selector, field fields.Selector) *podStore { + lw := &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return c.Pods(namespace).List(label, field) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return c.Pods(namespace).Watch(label, field, rv) + }, + } + store := cache.NewStore(cache.MetaNamespaceKeyFunc) + stopCh := make(chan struct{}) + cache.NewReflector(lw, &api.Pod{}, store, 0).RunUntil(stopCh) + return &podStore{store, stopCh} +} + +func (s *podStore) List() []*api.Pod { + objects := s.Store.List() + pods := make([]*api.Pod, 0) + for _, o := range objects { + pods = append(pods, o.(*api.Pod)) + } + return pods +} + +func (s *podStore) Stop() { + close(s.stopCh) +} + func Logf(format string, a ...interface{}) { fmt.Fprintf(GinkgoWriter, "INFO: "+format+"\n", a...) } @@ -643,16 +680,16 @@ func (p PodDiff) Print(ignorePhases util.StringSet) { } // Diff computes a PodDiff given 2 lists of pods. -func Diff(oldPods *api.PodList, curPods *api.PodList) PodDiff { +func Diff(oldPods []*api.Pod, curPods []*api.Pod) PodDiff { podInfoMap := PodDiff{} // New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist. - for _, pod := range curPods.Items { + for _, pod := range curPods { podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.NodeName, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist} } // Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist. - for _, pod := range oldPods.Items { + for _, pod := range oldPods { if info, ok := podInfoMap[pod.Name]; ok { info.oldHostname, info.oldPhase = pod.Spec.NodeName, string(pod.Status.Phase) } else { @@ -707,11 +744,10 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns)) label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name})) - pods, err := listPods(c, ns, label, fields.Everything()) - if err != nil { - return fmt.Errorf("Error listing pods: %v", err) - } - current = len(pods.Items) + podStore := newPodStore(c, ns, label, fields.Everything()) + defer podStore.Stop() + pods := podStore.List() + current = len(pods) failCount := 5 for same < failCount && current < replicas { Logf("%v Controller %s: Found %d pods out of %d", time.Now(), name, current, replicas) @@ -729,11 +765,8 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error last = current time.Sleep(5 * time.Second) - pods, err = listPods(c, ns, label, fields.Everything()) - if err != nil { - return fmt.Errorf("Error listing pods: %v", err) - } - current = len(pods.Items) + pods = podStore.List() + current = len(pods) } if current != replicas { return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas) @@ -745,7 +778,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error last = 0 failCount = 10 current = 0 - oldPods := &api.PodList{} + oldPods := make([]*api.Pod, 0) for same < failCount && current < replicas { current = 0 waiting := 0 @@ -753,18 +786,13 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error unknown := 0 inactive := 0 failedContainers := 0 - time.Sleep(10 * time.Second) + time.Sleep(5 * time.Second) - // TODO: Use a reflector both to put less strain on the cluster and - // for more clarity. - currentPods, err := listPods(c, ns, label, fields.Everything()) - if err != nil { - return fmt.Errorf("Error listing pods: %v", err) - } - for _, p := range currentPods.Items { + currentPods := podStore.List() + for _, p := range currentPods { if p.Status.Phase == api.PodRunning { current++ - for _, v := range FailedContainers(p) { + for _, v := range FailedContainers(*p) { failedContainers = failedContainers + v.restarts } } else if p.Status.Phase == api.PodPending { @@ -781,14 +809,14 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error } Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown) - if len(currentPods.Items) != len(pods.Items) { + if len(currentPods) != len(pods) { // This failure mode includes: // kubelet is dead, so node controller deleted pods and rc creates more // - diagnose by noting the pod diff below. // pod is unhealthy, so replication controller creates another to take its place // - diagnose by comparing the previous "2 Pod states" lines for inactive pods - errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods.Items), len(pods.Items)) + errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods), len(pods)) Logf("%v, pods that changed since the last iteration:", errorStr) Diff(oldPods, currentPods).Print(util.NewStringSet()) return fmt.Errorf(errorStr) @@ -811,7 +839,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error // stuck in pending. errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount) Logf("%v, pods currently in pending:", errorStr) - Diff(currentPods, &api.PodList{}).Print(util.NewStringSet(string(api.PodRunning))) + Diff(currentPods, make([]*api.Pod, 0)).Print(util.NewStringSet(string(api.PodRunning))) return fmt.Errorf(errorStr) } last = current @@ -844,13 +872,11 @@ func ScaleRC(c *client.Client, ns, name string, size uint) error { func waitForRCPodsRunning(c *client.Client, ns, rcName string) error { running := false label := labels.SelectorFromSet(labels.Set(map[string]string{"name": rcName})) + podStore := newPodStore(c, ns, label, fields.Everything()) + defer podStore.Stop() for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) { - pods, err := listPods(c, ns, label, fields.Everything()) - if err != nil { - Logf("Error listing pods: %v", err) - continue - } - for _, p := range pods.Items { + pods := podStore.List() + for _, p := range pods { if p.Status.Phase != api.PodRunning { continue } @@ -878,19 +904,6 @@ func DeleteRC(c *client.Client, ns, name string) error { return err } -// Convenient wrapper around listing pods supporting retries. -func listPods(c *client.Client, namespace string, label labels.Selector, field fields.Selector) (*api.PodList, error) { - maxRetries := 4 - pods, err := c.Pods(namespace).List(label, field) - for i := 0; i < maxRetries; i++ { - if err == nil { - return pods, nil - } - pods, err = c.Pods(namespace).List(label, field) - } - return pods, err -} - // FailedContainers inspects all containers in a pod and returns failure // information for containers that have failed or been restarted. // A map is returned where the key is the containerID and the value is a