From ec5e7093b6cae65ee31c4ff22db45c71d6936891 Mon Sep 17 00:00:00 2001
From: Robert Rati
Date: Wed, 6 May 2015 12:53:54 -0400
Subject: [PATCH] Created a fifo queue containing results from listing pods
 and checks from the queue to generate finer granularity #7572

---
 test/e2e/util.go | 281 +++++++++++++++++++++++++++++------------------
 1 file changed, 176 insertions(+), 105 deletions(-)

diff --git a/test/e2e/util.go b/test/e2e/util.go
index 6139ff05d9c..79fcca78478 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -27,6 +27,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"code.google.com/p/go-uuid/uuid"
@@ -74,6 +75,59 @@ type ContainerFailures struct {
 	restarts int
 }
 
+type QueueItem struct {
+	createTime string
+	value      interface{}
+}
+
+type QueueItems struct {
+	pos   int
+	mutex *sync.Mutex
+	list  []QueueItem
+}
+
+type FifoQueue QueueItems
+
+func (fq *FifoQueue) Push(elem interface{}) {
+	fq.mutex.Lock()
+	fq.list = append(fq.list, QueueItem{time.Now().String(), elem})
+	fq.mutex.Unlock()
+}
+
+func (fq *FifoQueue) Pop() QueueItem {
+	fq.mutex.Lock()
+	var val QueueItem
+	if len(fq.list) > fq.pos {
+		val = fq.list[fq.pos]
+		fq.pos++
+	}
+	fq.mutex.Unlock()
+	return val
+}
+
+func (fq FifoQueue) Len() int {
+	return len(fq.list[fq.pos:])
+}
+
+func (fq *FifoQueue) First() QueueItem {
+	return fq.list[fq.pos]
+}
+
+func (fq *FifoQueue) Last() QueueItem {
+	return fq.list[len(fq.list)-1]
+}
+
+func (fq *FifoQueue) Reset() {
+	fq.pos = 0
+}
+
+func newFifoQueue() *FifoQueue {
+	tmp := new(FifoQueue)
+	tmp.mutex = &sync.Mutex{}
+	tmp.pos = 0
+	return tmp
+}
+
 func Logf(format string, a ...interface{}) {
 	fmt.Fprintf(GinkgoWriter, "INFO: "+format+"\n", a...)
 }
@@ -451,16 +505,16 @@ func (p PodDiff) Print(ignorePhases util.StringSet) {
 }
 
 // Diff computes a PodDiff given 2 lists of pods.
-func Diff(oldPods *api.PodList, curPods *api.PodList) PodDiff {
+func Diff(oldPods []api.Pod, curPods []api.Pod) PodDiff {
 	podInfoMap := PodDiff{}
 
 	// New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist.
-	for _, pod := range curPods.Items {
+	for _, pod := range curPods {
 		podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.Host, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist}
 	}
 
 	// Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist.
-	for _, pod := range oldPods.Items {
+	for _, pod := range oldPods {
 		if info, ok := podInfoMap[pod.Name]; ok {
 			info.oldHostname, info.oldPhase = pod.Spec.Host, string(pod.Status.Phase)
 		} else {
@@ -480,6 +534,8 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int, podSta
 	maxContainerFailures := int(math.Max(1.0, float64(replicas)*.01))
 	current := 0
 	same := 0
+	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
+	podLists := newFifoQueue()
 
 	By(fmt.Sprintf("Creating replication controller %s", name))
 	rc := &api.ReplicationController{
@@ -513,39 +569,51 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int, podSta
 	}
 	Logf("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, ns, rc.Spec.Replicas)
+
+	// Create a routine to query for the list of pods
+	stop := make(chan struct{})
+	go func(stop <-chan struct{}, n string, ns string, l labels.Selector) {
+		for {
+			select {
+			case <-stop:
+				return
+			default:
+				p, err := c.Pods(ns).List(l, fields.Everything())
+				if err != nil {
+					Logf("Warning: Failed to get pod list: %v", err)
+				} else {
+					podLists.Push(p.Items)
+				}
+				time.Sleep(1 * time.Second)
+			}
+		}
+	}(stop, name, ns, label)
+
 	By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
-	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
-	pods, err := listPods(c, ns, label, fields.Everything())
-	if err != nil {
-		return fmt.Errorf("Error listing pods: %v", err)
-	}
-	current = len(pods.Items)
 	failCount := 5
 	for same < failCount && current < replicas {
-		msg := fmt.Sprintf("Controller %s: Found %d pods out of %d", name, current, replicas)
-		Logf(msg)
-		if podStatusFile != nil {
-			fmt.Fprintf(podStatusFile, "%s: %s\n", time.Now().String(), msg)
-		}
-		if last < current {
-			same = 0
-		} else if last == current {
-			same++
-		} else if current < last {
-			return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
-		}
+		time.Sleep(2 * time.Second)
+		// Drain every queued snapshot; Pop yields an empty item once the queue is exhausted.
+		for item := podLists.Pop(); item.value != nil && current < replicas; item = podLists.Pop() {
+			pods := item.value.([]api.Pod)
+			current = len(pods)
+			msg := fmt.Sprintf("Controller %s: Found %d pods out of %d", name, current, replicas)
+			Logf(msg)
+			if podStatusFile != nil {
+				fmt.Fprintf(podStatusFile, "%s: %s\n", item.createTime, msg)
+			}
+			if last < current {
+				same = 0
+			} else if last == current {
+				same++
+			} else if current < last {
+				return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
+			}
 
-		if same >= failCount {
-			return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount)
-		}
+			if same >= failCount {
+				return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount)
+			}
 
-		last = current
-		time.Sleep(5 * time.Second)
-		pods, err = listPods(c, ns, label, fields.Everything())
-		if err != nil {
-			return fmt.Errorf("Error listing pods: %v", err)
+			last = current
 		}
-		current = len(pods.Items)
 	}
 	if current != replicas {
 		return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas)
 	}
@@ -561,86 +629,89 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int, podSta
 	last = 0
 	failCount = 10
 	current = 0
-	oldPods := &api.PodList{}
+	var oldPods []api.Pod
+	podLists.Reset()
+	foundAllPods := false
 	for same < failCount && current < replicas {
-		current = 0
-		waiting := 0
-		pending := 0
-		unknown := 0
-		inactive := 0
-		failedContainers := 0
-		time.Sleep(10 * time.Second)
+		time.Sleep(2 * time.Second)
+		// Work through every snapshot the polling routine has queued up.
+		for item := podLists.Pop(); item.value != nil && current < replicas; item = podLists.Pop() {
+			current = 0
+			waiting := 0
+			pending := 0
+			unknown := 0
+			inactive := 0
+			failedContainers := 0
+			currentPods := item.value.([]api.Pod)
+			for _, p := range currentPods {
+				if p.Status.Phase == api.PodRunning {
+					current++
+					for _, v := range FailedContainers(p) {
+						failedContainers = failedContainers + v.restarts
+					}
+				} else if p.Status.Phase == api.PodPending {
+					if p.Spec.Host == "" {
+						waiting++
+					} else {
+						pending++
+					}
+				} else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
+					inactive++
+				} else if p.Status.Phase == api.PodUnknown {
+					unknown++
+				}
+			}
+			msg := fmt.Sprintf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
+			Logf(msg)
+			if podStatusFile != nil {
+				fmt.Fprintf(podStatusFile, "%s: %s\n", item.createTime, msg)
+			}
 
-		// TODO: Use a reflector both to put less strain on the cluster and
-		// for more clarity.
-		currentPods, err := listPods(c, ns, label, fields.Everything())
-		if err != nil {
-			return fmt.Errorf("Error listing pods: %v", err)
-		}
-		for _, p := range currentPods.Items {
-			if p.Status.Phase == api.PodRunning {
-				current++
-				for _, v := range FailedContainers(p) {
-					failedContainers = failedContainers + v.restarts
-				}
-			} else if p.Status.Phase == api.PodPending {
-				if p.Spec.Host == "" {
-					waiting++
-				} else {
-					pending++
-				}
-			} else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
-				inactive++
-			} else if p.Status.Phase == api.PodUnknown {
-				unknown++
-			}
-		}
+			if foundAllPods && len(currentPods) != len(oldPods) {
+
+				// This failure mode includes:
+				// kubelet is dead, so node controller deleted pods and rc creates more
+				//	- diagnose by noting the pod diff below.
+				// pod is unhealthy, so replication controller creates another to take its place
+				//	- diagnose by comparing the previous "2 Pod states" lines for inactive pods
+				errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods), len(oldPods))
+				Logf("%v, pods that changed since the last iteration:", errorStr)
+				Diff(oldPods, currentPods).Print(util.NewStringSet())
+				return fmt.Errorf(errorStr)
+			}
+			if last < current {
+				same = 0
+			} else if last == current {
+				same++
+			} else if current < last {
+
+				// The pod failed or succeeded, or was somehow pushed out of running by the kubelet.
+				errorStr := fmt.Sprintf("Number of running pods dropped from %d to %d", last, current)
+				Logf("%v, pods that changed since the last iteration:", errorStr)
+				Diff(oldPods, currentPods).Print(util.NewStringSet())
+				return fmt.Errorf(errorStr)
+			}
+			if same >= failCount {
+
+				// Most times this happens because a few nodes have kubelet problems, and their pods are
+				// stuck in pending.
+				errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount)
+				Logf("%v, pods currently in pending:", errorStr)
+				Diff(currentPods, make([]api.Pod, 0)).Print(util.NewStringSet(string(api.PodRunning)))
+				return fmt.Errorf(errorStr)
+			}
+
+			if !foundAllPods {
+				foundAllPods = len(currentPods) == replicas
+			}
+			last = current
+			oldPods = currentPods
+
+			if failedContainers > maxContainerFailures {
+				return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures)
+			}
+		}
-		msg := fmt.Sprintf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
-		Logf(msg)
-		if podStatusFile != nil {
-			fmt.Fprintf(podStatusFile, "%s: %s\n", time.Now().String(), msg)
-		}
-
-		if len(currentPods.Items) != len(pods.Items) {
-
-			// This failure mode includes:
-			// kubelet is dead, so node controller deleted pods and rc creates more
-			//	- diagnose by noting the pod diff below.
-			// pod is unhealthy, so replication controller creates another to take its place
-			//	- diagnose by comparing the previous "2 Pod states" lines for inactive pods
-			errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods.Items), len(pods.Items))
-			Logf("%v, pods that changed since the last iteration:", errorStr)
-			Diff(oldPods, currentPods).Print(util.NewStringSet())
-			return fmt.Errorf(errorStr)
-		}
-		if last < current {
-			same = 0
-		} else if last == current {
-			same++
-		} else if current < last {
-
-			// The pod failed or succeeded, or was somehow pushed out of running by the kubelet.
-			errorStr := fmt.Sprintf("Number of running pods dropped from %d to %d", last, current)
-			Logf("%v, pods that changed since the last iteration:", errorStr)
-			Diff(oldPods, currentPods).Print(util.NewStringSet())
-			return fmt.Errorf(errorStr)
-		}
-		if same >= failCount {
-
-			// Most times this happens because a few nodes have kubelet problems, and their pods are
-			// stuck in pending.
-			errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount)
-			Logf("%v, pods currently in pending:", errorStr)
-			Diff(currentPods, &api.PodList{}).Print(util.NewStringSet(string(api.PodRunning)))
-			return fmt.Errorf(errorStr)
-		}
-		last = current
-		oldPods = currentPods
-
-		if failedContainers > maxContainerFailures {
-			return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures)
-		}
 	}
+	close(stop)
 	if current != replicas {
 		return fmt.Errorf("Only %d pods started out of %d", current, replicas)
 	}
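
Reviewer note (not part of the patch): the FifoQueue above is a grow-only
slice with a read cursor rather than a classic ring buffer. Pop never frees
consumed items, and Reset simply rewinds the cursor so the second phase of
RunRC can replay the queued history from the start. Below is a trimmed,
self-contained sketch of those semantics and of the drain idiom the consumer
loops rely on; the lowercased names mirror the patch's QueueItem/FifoQueue,
and the []string payload stands in for a []api.Pod snapshot.

package main

import (
	"fmt"
	"sync"
	"time"
)

// queueItem/fifoQueue mirror QueueItem/FifoQueue from the patch, trimmed
// to what this example needs.
type queueItem struct {
	createTime string
	value      interface{}
}

type fifoQueue struct {
	pos   int
	mutex sync.Mutex
	list  []queueItem
}

func (fq *fifoQueue) push(v interface{}) {
	fq.mutex.Lock()
	defer fq.mutex.Unlock()
	fq.list = append(fq.list, queueItem{time.Now().String(), v})
}

// pop returns the zero queueItem (value == nil) once every queued element
// has been consumed.
func (fq *fifoQueue) pop() queueItem {
	fq.mutex.Lock()
	defer fq.mutex.Unlock()
	var it queueItem
	if fq.pos < len(fq.list) {
		it = fq.list[fq.pos]
		fq.pos++
	}
	return it
}

// reset rewinds the read cursor so the queued history can be replayed.
func (fq *fifoQueue) reset() {
	fq.mutex.Lock()
	defer fq.mutex.Unlock()
	fq.pos = 0
}

func main() {
	var fq fifoQueue
	fq.push([]string{"pod-a"})
	fq.push([]string{"pod-a", "pod-b"})

	// Drain until pop hands back an empty item; testing value != nil
	// guarantees the final snapshot is processed rather than dropped.
	for it := fq.pop(); it.value != nil; it = fq.pop() {
		pods := it.value.([]string)
		fmt.Printf("%s: saw %d pods\n", it.createTime, len(pods))
	}

	// The second RunRC phase calls Reset and replays from the beginning.
	fq.reset()
}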
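
The producer side follows the usual stop-channel pattern: a goroutine lists
pods once a second and queues each snapshot until close(stop) is observed.
A minimal sketch of that pattern, with a buffered channel standing in for
the patch's FifoQueue and a counter standing in for the real
c.Pods(ns).List(label, fields.Everything()) call (both are illustrative
stand-ins, not APIs from this patch):

package main

import (
	"fmt"
	"time"
)

// pollUntilStopped mimics the polling goroutine added to RunRC: take a
// snapshot once a second until the stop channel is closed. snapshot is a
// hypothetical stand-in for the pod-list call.
func pollUntilStopped(stop <-chan struct{}, snapshot func() int, out chan<- int) {
	for {
		select {
		case <-stop:
			return
		default:
			out <- snapshot()
			time.Sleep(1 * time.Second)
		}
	}
}

func main() {
	stop := make(chan struct{})
	out := make(chan int, 16) // buffered so the producer rarely blocks
	n := 0
	go pollUntilStopped(stop, func() int { n++; return n }, out)

	// Consume a few snapshots at the caller's own pace, then stop polling,
	// just as RunRC calls close(stop) once both check phases are done.
	for i := 0; i < 3; i++ {
		fmt.Println("snapshot:", <-out)
	}
	close(stop)
}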