From a89121cb708363b4047759f7c53bd13d8685a108 Mon Sep 17 00:00:00 2001 From: Robert Rati Date: Thu, 30 Apr 2015 11:32:46 -0400 Subject: [PATCH 1/9] Added metrics/debug gathering methods to utils and used them in density #7572 --- test/e2e/density.go | 35 ++++++++++++++++++++++++++++++++--- test/e2e/load.go | 2 +- test/e2e/scale.go | 2 +- test/e2e/util.go | 42 ++++++++++++++++++++++++++++++++++++++---- 4 files changed, 72 insertions(+), 9 deletions(-) diff --git a/test/e2e/density.go b/test/e2e/density.go index 6a5fe3e67b8..e32fdae024d 100644 --- a/test/e2e/density.go +++ b/test/e2e/density.go @@ -19,6 +19,7 @@ package e2e import ( "fmt" "math" + "os" "strconv" "time" @@ -36,6 +37,26 @@ import ( . "github.com/onsi/gomega" ) +func writePerfData(c *client.Client, dirName string, postfix string) { + defer GinkgoRecover() + + hdnl, err := os.Create(fmt.Sprintf("%s/metrics_%s.txt", dirName, postfix)) + expectNoError(err) + metrics, err := GetMetrics(c) + expectNoError(err) + _, err = hdnl.WriteString(metrics) + expectNoError(err) + expectNoError(hdnl.Close()) + debug, err := GetDebugInfo(c) + for key, value := range debug { + hdnl, err = os.Create(fmt.Sprintf("%s/%s_%s.txt", dirName, key, postfix)) + expectNoError(err) + _, err = hdnl.WriteString(value) + expectNoError(err) + expectNoError(hdnl.Close()) + } +} + // This test suite can take a long time to run, so by default it is added to // the ginkgo.skip list (see driver.go). // To run this suite you must explicitly ask for it by setting the @@ -45,6 +66,7 @@ var _ = Describe("Density", func() { var minionCount int var RCName string var ns string + var uuid string BeforeEach(func() { var err error @@ -57,6 +79,9 @@ var _ = Describe("Density", func() { nsForTesting, err := createTestingNS("density", c) ns = nsForTesting.Name expectNoError(err) + uuid = string(util.NewUUID()) + expectNoError(os.Mkdir(uuid, 0777)) + writePerfData(c, uuid, "before") }) AfterEach(func() { @@ -82,6 +107,7 @@ var _ = Describe("Density", func() { highLatencyRequests, err := HighLatencyRequests(c, 10*time.Second, util.NewStringSet("events")) expectNoError(err) Expect(highLatencyRequests).NotTo(BeNumerically(">", 0)) + writePerfData(c, uuid, "after") }) // Tests with "Skipped" substring in their name will be skipped when running @@ -112,8 +138,10 @@ var _ = Describe("Density", func() { itArg := testArg It(name, func() { totalPods := itArg.podsPerMinion * minionCount - nameStr := strconv.Itoa(totalPods) + "-" + string(util.NewUUID()) - RCName = "my-hostname-density" + nameStr + RCName = "density" + strconv.Itoa(totalPods) + "-" + uuid + fileHndl, err := os.Create(fmt.Sprintf("%s/pod_states.txt", uuid)) + expectNoError(err) + defer fileHndl.Close() // Create a listener for events. events := make([](*api.Event), 0) @@ -139,9 +167,10 @@ var _ = Describe("Density", func() { // Start the replication controller. 
startTime := time.Now() - expectNoError(RunRC(c, RCName, ns, "gcr.io/google_containers/pause:go", totalPods)) + expectNoError(RunRC(c, RCName, ns, "gcr.io/google_containers/pause:go", totalPods, fileHndl)) e2eStartupTime := time.Now().Sub(startTime) Logf("E2E startup time for %d pods: %v", totalPods, e2eStartupTime) + fmt.Fprintf(fileHndl, "E2E startup time for %d pods: %v\n", totalPods, e2eStartupTime) By("Waiting for all events to be recorded") last := -1 diff --git a/test/e2e/load.go b/test/e2e/load.go index 8e3dad52431..29ca0297960 100644 --- a/test/e2e/load.go +++ b/test/e2e/load.go @@ -120,7 +120,7 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int) // Once every 1-2 minutes perform resize of RC. for start := time.Now(); time.Since(start) < simulationTime; time.Sleep(time.Duration(60+rand.Intn(60)) * time.Second) { if !rcExist { - expectNoError(RunRC(c, name, ns, image, size), fmt.Sprintf("creating rc %s in namespace %s", name, ns)) + expectNoError(RunRC(c, name, ns, image, size, nil), fmt.Sprintf("creating rc %s in namespace %s", name, ns)) rcExist = true } // Resize RC to a random size between 0.5x and 1.5x of the original size. diff --git a/test/e2e/scale.go b/test/e2e/scale.go index a787c28f731..d7e43dddc3c 100644 --- a/test/e2e/scale.go +++ b/test/e2e/scale.go @@ -107,7 +107,7 @@ var _ = Describe("Scale", func() { for i := 0; i < itArg.rcsPerThread; i++ { name := "my-short-lived-pod" + string(util.NewUUID()) n := itArg.podsPerMinion * minionCount - expectNoError(RunRC(c, name, ns, "gcr.io/google_containers/pause:go", n)) + expectNoError(RunRC(c, name, ns, "gcr.io/google_containers/pause:go", n, nil)) podsLaunched += n Logf("Launched %v pods so far...", podsLaunched) err := DeleteRC(c, ns, name) diff --git a/test/e2e/util.go b/test/e2e/util.go index a05b538655c..df118c643d5 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -474,7 +474,7 @@ func Diff(oldPods *api.PodList, curPods *api.PodList) PodDiff { // It will waits for all pods it spawns to become "Running". // It's the caller's responsibility to clean up externally (i.e. use the // namespace lifecycle for handling cleanup). 
-func RunRC(c *client.Client, name string, ns, image string, replicas int) error { +func RunRC(c *client.Client, name string, ns, image string, replicas int, podStatusFile *os.File) error { var last int maxContainerFailures := int(math.Max(1.0, float64(replicas)*.01)) @@ -522,7 +522,11 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error current = len(pods.Items) failCount := 5 for same < failCount && current < replicas { - Logf("Controller %s: Found %d pods out of %d", name, current, replicas) + msg := fmt.Sprintf("Controller %s: Found %d pods out of %d", name, current, replicas) + Logf(msg) + if podStatusFile != nil { + fmt.Fprintf(podStatusFile, "%s: %s\n", time.Now().String(), msg) + } if last < current { same = 0 } else if last == current { @@ -546,7 +550,11 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error if current != replicas { return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas) } - Logf("Controller %s in ns %s: Found %d pods out of %d", name, ns, current, replicas) + msg := fmt.Sprintf("Controller %s in ns %s: Found %d pods out of %d", name, ns, current, replicas) + Logf(msg) + if podStatusFile != nil { + fmt.Fprintf(podStatusFile, "%s: %s\n", time.Now().String(), msg) + } By(fmt.Sprintf("Waiting for all %d replicas to be running with a max container failures of %d", replicas, maxContainerFailures)) same = 0 @@ -587,7 +595,11 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error unknown++ } } - Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown) + msg := fmt.Sprintf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown) + Logf(msg) + if podStatusFile != nil { + fmt.Fprintf(podStatusFile, "%s: %s\n", time.Now().String(), msg) + } if len(currentPods.Items) != len(pods.Items) { @@ -936,3 +948,25 @@ func HighLatencyRequests(c *client.Client, threshold time.Duration, ignoredResou return len(badMetrics), nil } + +// Retrieve metrics information +func GetMetrics(c *client.Client) (string, error) { + body, err := c.Get().AbsPath("/metrics").DoRaw() + if err != nil { + return "", err + } + return string(body), nil +} + +// Retrieve debug information +func GetDebugInfo(c *client.Client) (map[string]string, error) { + data := make(map[string]string) + for _, key := range []string{"block", "goroutine", "heap", "threadcreate"} { + body, err := c.Get().AbsPath(fmt.Sprintf("/debug/pprof/%s", key)).DoRaw() + if err != nil { + Logf("Warning: Error trying to fetch %s debug data: %v", key, err) + } + data[key] = string(body) + } + return data, nil +} From bd3306c84568c68ade05cc97d4b7644e7e25ac93 Mon Sep 17 00:00:00 2001 From: Robert Rati Date: Fri, 1 May 2015 09:24:16 -0400 Subject: [PATCH 2/9] Moved writePerfData to utils. #7572 --- test/e2e/density.go | 24 ++------------------- test/e2e/util.go | 51 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 51 insertions(+), 24 deletions(-) diff --git a/test/e2e/density.go b/test/e2e/density.go index e32fdae024d..0878ac833a8 100644 --- a/test/e2e/density.go +++ b/test/e2e/density.go @@ -37,26 +37,6 @@ import ( . 
"github.com/onsi/gomega" ) -func writePerfData(c *client.Client, dirName string, postfix string) { - defer GinkgoRecover() - - hdnl, err := os.Create(fmt.Sprintf("%s/metrics_%s.txt", dirName, postfix)) - expectNoError(err) - metrics, err := GetMetrics(c) - expectNoError(err) - _, err = hdnl.WriteString(metrics) - expectNoError(err) - expectNoError(hdnl.Close()) - debug, err := GetDebugInfo(c) - for key, value := range debug { - hdnl, err = os.Create(fmt.Sprintf("%s/%s_%s.txt", dirName, key, postfix)) - expectNoError(err) - _, err = hdnl.WriteString(value) - expectNoError(err) - expectNoError(hdnl.Close()) - } -} - // This test suite can take a long time to run, so by default it is added to // the ginkgo.skip list (see driver.go). // To run this suite you must explicitly ask for it by setting the @@ -81,7 +61,7 @@ var _ = Describe("Density", func() { expectNoError(err) uuid = string(util.NewUUID()) expectNoError(os.Mkdir(uuid, 0777)) - writePerfData(c, uuid, "before") + expectNoError(writePerfData(c, uuid, "before")) }) AfterEach(func() { @@ -107,7 +87,7 @@ var _ = Describe("Density", func() { highLatencyRequests, err := HighLatencyRequests(c, 10*time.Second, util.NewStringSet("events")) expectNoError(err) Expect(highLatencyRequests).NotTo(BeNumerically(">", 0)) - writePerfData(c, uuid, "after") + expectNoError(writePerfData(c, uuid, "after")) }) // Tests with "Skipped" substring in their name will be skipped when running diff --git a/test/e2e/util.go b/test/e2e/util.go index df118c643d5..6139ff05d9c 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -950,7 +950,7 @@ func HighLatencyRequests(c *client.Client, threshold time.Duration, ignoredResou } // Retrieve metrics information -func GetMetrics(c *client.Client) (string, error) { +func getMetrics(c *client.Client) (string, error) { body, err := c.Get().AbsPath("/metrics").DoRaw() if err != nil { return "", err @@ -959,7 +959,7 @@ func GetMetrics(c *client.Client) (string, error) { } // Retrieve debug information -func GetDebugInfo(c *client.Client) (map[string]string, error) { +func getDebugInfo(c *client.Client) (map[string]string, error) { data := make(map[string]string) for _, key := range []string{"block", "goroutine", "heap", "threadcreate"} { body, err := c.Get().AbsPath(fmt.Sprintf("/debug/pprof/%s", key)).DoRaw() @@ -970,3 +970,50 @@ func GetDebugInfo(c *client.Client) (map[string]string, error) { } return data, nil } + +func writePerfData(c *client.Client, dirName string, postfix string) error { + fname := fmt.Sprintf("%s/metrics_%s.txt", dirName, postfix) + + hdnl, err := os.Create(fname) + if err != nil { + return fmt.Errorf("Error creating file '%s': %v", fname, err) + } + + metrics, err := getMetrics(c) + if err != nil { + return fmt.Errorf("Error retrieving metrics: %v", err) + } + + _, err = hdnl.WriteString(metrics) + if err != nil { + return fmt.Errorf("Error writing metrics: %v", err) + } + + err = hdnl.Close() + if err != nil { + return fmt.Errorf("Error closing '%s': %v", fname, err) + } + + debug, err := getDebugInfo(c) + if err != nil { + return fmt.Errorf("Error retrieving debug information: %v", err) + } + + for key, value := range debug { + fname := fmt.Sprintf("%s/%s_%s.txt", dirName, key, postfix) + hdnl, err = os.Create(fname) + if err != nil { + return fmt.Errorf("Error creating file '%s': %v", fname, err) + } + _, err = hdnl.WriteString(value) + if err != nil { + return fmt.Errorf("Error writing %s: %v", key, err) + } + + err = hdnl.Close() + if err != nil { + return fmt.Errorf("Error closing '%s': %v", 
fname, err) + } + } + return nil +} From ec5e7093b6cae65ee31c4ff22db45c71d6936891 Mon Sep 17 00:00:00 2001 From: Robert Rati Date: Wed, 6 May 2015 12:53:54 -0400 Subject: [PATCH 3/9] Created a fifo queue containing results from listing pods and checks from from the queue to generate finer granularity #7572 --- test/e2e/util.go | 281 +++++++++++++++++++++++++++++------------------ 1 file changed, 176 insertions(+), 105 deletions(-) diff --git a/test/e2e/util.go b/test/e2e/util.go index 6139ff05d9c..79fcca78478 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -27,6 +27,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "time" "code.google.com/p/go-uuid/uuid" @@ -74,6 +75,59 @@ type ContainerFailures struct { restarts int } +type QueueItem struct { + createTime string + value interface{} +} + +type QueueItems struct { + pos int + mutex *sync.Mutex + list []QueueItem +} + +type FifoQueue QueueItems + +func (fq *FifoQueue) Push(elem interface{}) { + fq.mutex.Lock() + fq.list = append(fq.list, QueueItem{time.Now().String(), elem}) + fq.mutex.Unlock() +} + +func (fq *FifoQueue) Pop() QueueItem { + fq.mutex.Lock() + var val QueueItem + if len(fq.list) >= fq.pos { + val = fq.list[fq.pos] + fq.pos++ + } + fq.mutex.Unlock() + return val +} + +func (fq FifoQueue) Len() int { + return len(fq.list[fq.pos:]) +} + +func (fq *FifoQueue) First() QueueItem { + return fq.list[fq.pos] +} + +func (fq *FifoQueue) Last() QueueItem { + return fq.list[len(fq.list)] +} + +func (fq *FifoQueue) Reset() { + fq.pos = 0 +} + +func newFifoQueue() *FifoQueue { + tmp := new(FifoQueue) + tmp.mutex = &sync.Mutex{} + tmp.pos = 0 + return tmp +} + func Logf(format string, a ...interface{}) { fmt.Fprintf(GinkgoWriter, "INFO: "+format+"\n", a...) } @@ -451,16 +505,16 @@ func (p PodDiff) Print(ignorePhases util.StringSet) { } // Diff computes a PodDiff given 2 lists of pods. -func Diff(oldPods *api.PodList, curPods *api.PodList) PodDiff { +func Diff(oldPods []api.Pod, curPods []api.Pod) PodDiff { podInfoMap := PodDiff{} // New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist. - for _, pod := range curPods.Items { + for _, pod := range curPods { podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.Host, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist} } // Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist. 
- for _, pod := range oldPods.Items { + for _, pod := range oldPods { if info, ok := podInfoMap[pod.Name]; ok { info.oldHostname, info.oldPhase = pod.Spec.Host, string(pod.Status.Phase) } else { @@ -480,6 +534,8 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int, podSta maxContainerFailures := int(math.Max(1.0, float64(replicas)*.01)) current := 0 same := 0 + label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name})) + podLists := newFifoQueue() By(fmt.Sprintf("Creating replication controller %s", name)) rc := &api.ReplicationController{ @@ -513,39 +569,51 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int, podSta } Logf("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, ns, rc.Spec.Replicas) + // Create a routine to query for the list of pods + stop := make(chan struct{}) + go func(stop <-chan struct{}, n string, ns string, l labels.Selector) { + for { + select { + case <-stop: + return + default: + p, err := c.Pods(ns).List(l, fields.Everything()) + if err != nil { + Logf("Warning: Failed to get pod list: %v", err) + } else { + podLists.Push(p.Items) + } + time.Sleep(1 * time.Second) + } + } + }(stop, name, ns, label) + By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns)) - label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name})) - pods, err := listPods(c, ns, label, fields.Everything()) - if err != nil { - return fmt.Errorf("Error listing pods: %v", err) - } - current = len(pods.Items) failCount := 5 for same < failCount && current < replicas { - msg := fmt.Sprintf("Controller %s: Found %d pods out of %d", name, current, replicas) - Logf(msg) - if podStatusFile != nil { - fmt.Fprintf(podStatusFile, "%s: %s\n", time.Now().String(), msg) - } - if last < current { - same = 0 - } else if last == current { - same++ - } else if current < last { - return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current) - } + time.Sleep(2 * time.Second) + for item := podLists.Pop(); podLists.Len() > 0 && current < replicas; item = podLists.Pop() { + pods := item.value.([]api.Pod) + current = len(pods) + msg := fmt.Sprintf("Controller %s: Found %d pods out of %d", name, current, replicas) + Logf(msg) + if podStatusFile != nil { + fmt.Fprintf(podStatusFile, "%s: %s\n", item.createTime, msg) + } + if last < current { + same = 0 + } else if last == current { + same++ + } else if current < last { + return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current) + } - if same >= failCount { - return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount) - } + if same >= failCount { + return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount) + } - last = current - time.Sleep(5 * time.Second) - pods, err = listPods(c, ns, label, fields.Everything()) - if err != nil { - return fmt.Errorf("Error listing pods: %v", err) + last = current } - current = len(pods.Items) } if current != replicas { return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas) @@ -561,86 +629,89 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int, podSta last = 0 failCount = 10 current = 0 - oldPods := &api.PodList{} + var oldPods []api.Pod + podLists.Reset() + foundAllPods := false for same < failCount && current < replicas { - current = 0 - waiting := 0 - pending := 
0 - unknown := 0 - inactive := 0 - failedContainers := 0 - time.Sleep(10 * time.Second) + time.Sleep(2 * time.Second) + for item := podLists.Pop(); podLists.Len() > 0 && current < replicas; item = podLists.Pop() { + current = 0 + waiting := 0 + pending := 0 + unknown := 0 + inactive := 0 + failedContainers := 0 + currentPods := item.value.([]api.Pod) + for _, p := range currentPods { + if p.Status.Phase == api.PodRunning { + current++ + for _, v := range FailedContainers(p) { + failedContainers = failedContainers + v.restarts + } + } else if p.Status.Phase == api.PodPending { + if p.Spec.Host == "" { + waiting++ + } else { + pending++ + } + } else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed { + inactive++ + } else if p.Status.Phase == api.PodUnknown { + unknown++ + } + } + msg := fmt.Sprintf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown) + Logf(msg) + if podStatusFile != nil { + fmt.Fprintf(podStatusFile, "%s: %s\n", item.createTime, msg) + } - // TODO: Use a reflector both to put less strain on the cluster and - // for more clarity. - currentPods, err := listPods(c, ns, label, fields.Everything()) - if err != nil { - return fmt.Errorf("Error listing pods: %v", err) - } - for _, p := range currentPods.Items { - if p.Status.Phase == api.PodRunning { - current++ - for _, v := range FailedContainers(p) { - failedContainers = failedContainers + v.restarts - } - } else if p.Status.Phase == api.PodPending { - if p.Spec.Host == "" { - waiting++ - } else { - pending++ - } - } else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed { - inactive++ - } else if p.Status.Phase == api.PodUnknown { - unknown++ + if foundAllPods && len(currentPods) != len(oldPods) { + + // This failure mode includes: + // kubelet is dead, so node controller deleted pods and rc creates more + // - diagnose by noting the pod diff below. + // pod is unhealthy, so replication controller creates another to take its place + // - diagnose by comparing the previous "2 Pod states" lines for inactive pods + errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods), len(oldPods)) + Logf("%v, pods that changed since the last iteration:", errorStr) + Diff(oldPods, currentPods).Print(util.NewStringSet()) + return fmt.Errorf(errorStr) + } + if last < current { + same = 0 + } else if last == current { + same++ + } else if current < last { + + // The pod failed or succeeded, or was somehow pushed out of running by the kubelet. + errorStr := fmt.Sprintf("Number of running pods dropped from %d to %d", last, current) + Logf("%v, pods that changed since the last iteration:", errorStr) + Diff(oldPods, currentPods).Print(util.NewStringSet()) + return fmt.Errorf(errorStr) + } + if same >= failCount { + + // Most times this happens because a few nodes have kubelet problems, and their pods are + // stuck in pending. 
+ errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount) + Logf("%v, pods currently in pending:", errorStr) + Diff(currentPods, make([]api.Pod, 0)).Print(util.NewStringSet(string(api.PodRunning))) + return fmt.Errorf(errorStr) + } + + if !foundAllPods { + foundAllPods = len(currentPods) == replicas + } + last = current + oldPods = currentPods + + if failedContainers > maxContainerFailures { + return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures) } } - msg := fmt.Sprintf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown) - Logf(msg) - if podStatusFile != nil { - fmt.Fprintf(podStatusFile, "%s: %s\n", time.Now().String(), msg) - } - - if len(currentPods.Items) != len(pods.Items) { - - // This failure mode includes: - // kubelet is dead, so node controller deleted pods and rc creates more - // - diagnose by noting the pod diff below. - // pod is unhealthy, so replication controller creates another to take its place - // - diagnose by comparing the previous "2 Pod states" lines for inactive pods - errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods.Items), len(pods.Items)) - Logf("%v, pods that changed since the last iteration:", errorStr) - Diff(oldPods, currentPods).Print(util.NewStringSet()) - return fmt.Errorf(errorStr) - } - if last < current { - same = 0 - } else if last == current { - same++ - } else if current < last { - - // The pod failed or succeeded, or was somehow pushed out of running by the kubelet. - errorStr := fmt.Sprintf("Number of running pods dropped from %d to %d", last, current) - Logf("%v, pods that changed since the last iteration:", errorStr) - Diff(oldPods, currentPods).Print(util.NewStringSet()) - return fmt.Errorf(errorStr) - } - if same >= failCount { - - // Most times this happens because a few nodes have kubelet problems, and their pods are - // stuck in pending. - errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount) - Logf("%v, pods currently in pending:", errorStr) - Diff(currentPods, &api.PodList{}).Print(util.NewStringSet(string(api.PodRunning))) - return fmt.Errorf(errorStr) - } - last = current - oldPods = currentPods - - if failedContainers > maxContainerFailures { - return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures) - } } + close(stop) if current != replicas { return fmt.Errorf("Only %d pods started out of %d", current, replicas) } From ac282bd50a030e54b6753eac926bb0e40822cb00 Mon Sep 17 00:00:00 2001 From: Robert Rati Date: Thu, 7 May 2015 11:41:52 -0400 Subject: [PATCH 4/9] Cleaned up the output files. 
#7572 --- test/e2e/density.go | 3 +-- test/e2e/util.go | 32 ++++++++++++++------------------ 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/test/e2e/density.go b/test/e2e/density.go index 0878ac833a8..7581e263e07 100644 --- a/test/e2e/density.go +++ b/test/e2e/density.go @@ -119,7 +119,7 @@ var _ = Describe("Density", func() { It(name, func() { totalPods := itArg.podsPerMinion * minionCount RCName = "density" + strconv.Itoa(totalPods) + "-" + uuid - fileHndl, err := os.Create(fmt.Sprintf("%s/pod_states.txt", uuid)) + fileHndl, err := os.Create(fmt.Sprintf("%s/pod_states.csv", uuid)) expectNoError(err) defer fileHndl.Close() @@ -150,7 +150,6 @@ var _ = Describe("Density", func() { expectNoError(RunRC(c, RCName, ns, "gcr.io/google_containers/pause:go", totalPods, fileHndl)) e2eStartupTime := time.Now().Sub(startTime) Logf("E2E startup time for %d pods: %v", totalPods, e2eStartupTime) - fmt.Fprintf(fileHndl, "E2E startup time for %d pods: %v\n", totalPods, e2eStartupTime) By("Waiting for all events to be recorded") last := -1 diff --git a/test/e2e/util.go b/test/e2e/util.go index 79fcca78478..9af6577749e 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "math" "math/rand" + "net/http" "os" "os/exec" "path/filepath" @@ -97,7 +98,7 @@ func (fq *FifoQueue) Push(elem interface{}) { func (fq *FifoQueue) Pop() QueueItem { fq.mutex.Lock() var val QueueItem - if len(fq.list) >= fq.pos { + if len(fq.list)-1 >= fq.pos { val = fq.list[fq.pos] fq.pos++ } @@ -114,7 +115,7 @@ func (fq *FifoQueue) First() QueueItem { } func (fq *FifoQueue) Last() QueueItem { - return fq.list[len(fq.list)] + return fq.list[len(fq.list)-1] } func (fq *FifoQueue) Reset() { @@ -592,14 +593,11 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int, podSta failCount := 5 for same < failCount && current < replicas { time.Sleep(2 * time.Second) - for item := podLists.Pop(); podLists.Len() > 0 && current < replicas; item = podLists.Pop() { + for podLists.Len() > 0 && current < replicas { + item := podLists.Pop() pods := item.value.([]api.Pod) current = len(pods) - msg := fmt.Sprintf("Controller %s: Found %d pods out of %d", name, current, replicas) - Logf(msg) - if podStatusFile != nil { - fmt.Fprintf(podStatusFile, "%s: %s\n", item.createTime, msg) - } + Logf("Controller %s: Found %d pods out of %d", name, current, replicas) if last < current { same = 0 } else if last == current { @@ -618,11 +616,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int, podSta if current != replicas { return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas) } - msg := fmt.Sprintf("Controller %s in ns %s: Found %d pods out of %d", name, ns, current, replicas) - Logf(msg) - if podStatusFile != nil { - fmt.Fprintf(podStatusFile, "%s: %s\n", time.Now().String(), msg) - } + Logf("Controller %s in ns %s: Found %d pods out of %d", name, ns, current, replicas) By(fmt.Sprintf("Waiting for all %d replicas to be running with a max container failures of %d", replicas, maxContainerFailures)) same = 0 @@ -634,7 +628,8 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int, podSta foundAllPods := false for same < failCount && current < replicas { time.Sleep(2 * time.Second) - for item := podLists.Pop(); podLists.Len() > 0 && current < replicas; item = podLists.Pop() { + for podLists.Len() > 0 && current < replicas { + item := podLists.Pop() current = 0 waiting := 0 pending := 0 @@ -660,10 +655,9 @@ func 
RunRC(c *client.Client, name string, ns, image string, replicas int, podSta unknown++ } } - msg := fmt.Sprintf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown) - Logf(msg) + Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown) if podStatusFile != nil { - fmt.Fprintf(podStatusFile, "%s: %s\n", item.createTime, msg) + fmt.Fprintf(podStatusFile, "%s, %d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown\n", item.createTime, current, pending, waiting, inactive, unknown) } if foundAllPods && len(currentPods) != len(oldPods) { @@ -1033,7 +1027,9 @@ func getMetrics(c *client.Client) (string, error) { func getDebugInfo(c *client.Client) (map[string]string, error) { data := make(map[string]string) for _, key := range []string{"block", "goroutine", "heap", "threadcreate"} { - body, err := c.Get().AbsPath(fmt.Sprintf("/debug/pprof/%s", key)).DoRaw() + resp, err := http.Get(c.Get().AbsPath(fmt.Sprintf("debug/pprof/%s", key)).URL().String() + "?debug=2") + body, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() if err != nil { Logf("Warning: Error trying to fetch %s debug data: %v", key, err) } From b78576386f0f25ea2401e8efa829dcf618c555ad Mon Sep 17 00:00:00 2001 From: Robert Rati Date: Thu, 7 May 2015 13:22:57 -0400 Subject: [PATCH 5/9] Use getMetrics in ReadLatencyMetrics. #7572 --- test/e2e/util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/util.go b/test/e2e/util.go index 9af6577749e..1ed3650de73 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -957,7 +957,7 @@ type LatencyMetric struct { } func ReadLatencyMetrics(c *client.Client) ([]LatencyMetric, error) { - body, err := c.Get().AbsPath("/metrics").DoRaw() + body, err := getMetrics(c) if err != nil { return nil, err } From 9caee9ad1681f0241228c2141beba42397870255 Mon Sep 17 00:00:00 2001 From: Robert Rati Date: Fri, 8 May 2015 10:33:16 -0400 Subject: [PATCH 6/9] Moved FifoQueue to separate file and minor cleanup. #7572 --- test/e2e/fifo_queue.go | 75 ++++++++++++++++++++++++++++++++++++++++++ test/e2e/util.go | 66 ++++--------------------------------- 2 files changed, 81 insertions(+), 60 deletions(-) create mode 100644 test/e2e/fifo_queue.go diff --git a/test/e2e/fifo_queue.go b/test/e2e/fifo_queue.go new file mode 100644 index 00000000000..0942c37df72 --- /dev/null +++ b/test/e2e/fifo_queue.go @@ -0,0 +1,75 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package e2e + +import ( + "sync" + "time" +) + +type QueueItem struct { + createTime string + value interface{} +} + +type QueueItems struct { + pos int + mutex *sync.Mutex + list []QueueItem +} + +type FifoQueue QueueItems + +func (fq *FifoQueue) Push(elem interface{}) { + fq.mutex.Lock() + fq.list = append(fq.list, QueueItem{time.Now().String(), elem}) + fq.mutex.Unlock() +} + +func (fq *FifoQueue) Pop() QueueItem { + fq.mutex.Lock() + var val QueueItem + if len(fq.list)-1 >= fq.pos { + val = fq.list[fq.pos] + fq.pos++ + } + fq.mutex.Unlock() + return val +} + +func (fq FifoQueue) Len() int { + return len(fq.list[fq.pos:]) +} + +func (fq *FifoQueue) First() QueueItem { + return fq.list[fq.pos] +} + +func (fq *FifoQueue) Last() QueueItem { + return fq.list[len(fq.list)-1] +} + +func (fq *FifoQueue) Reset() { + fq.pos = 0 +} + +func newFifoQueue() *FifoQueue { + tmp := new(FifoQueue) + tmp.mutex = &sync.Mutex{} + tmp.pos = 0 + return tmp +} diff --git a/test/e2e/util.go b/test/e2e/util.go index 1ed3650de73..ba2f3021cb2 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -28,7 +28,6 @@ import ( "path/filepath" "strconv" "strings" - "sync" "time" "code.google.com/p/go-uuid/uuid" @@ -76,59 +75,6 @@ type ContainerFailures struct { restarts int } -type QueueItem struct { - createTime string - value interface{} -} - -type QueueItems struct { - pos int - mutex *sync.Mutex - list []QueueItem -} - -type FifoQueue QueueItems - -func (fq *FifoQueue) Push(elem interface{}) { - fq.mutex.Lock() - fq.list = append(fq.list, QueueItem{time.Now().String(), elem}) - fq.mutex.Unlock() -} - -func (fq *FifoQueue) Pop() QueueItem { - fq.mutex.Lock() - var val QueueItem - if len(fq.list)-1 >= fq.pos { - val = fq.list[fq.pos] - fq.pos++ - } - fq.mutex.Unlock() - return val -} - -func (fq FifoQueue) Len() int { - return len(fq.list[fq.pos:]) -} - -func (fq *FifoQueue) First() QueueItem { - return fq.list[fq.pos] -} - -func (fq *FifoQueue) Last() QueueItem { - return fq.list[len(fq.list)-1] -} - -func (fq *FifoQueue) Reset() { - fq.pos = 0 -} - -func newFifoQueue() *FifoQueue { - tmp := new(FifoQueue) - tmp.mutex = &sync.Mutex{} - tmp.pos = 0 - return tmp -} - func Logf(format string, a ...interface{}) { fmt.Fprintf(GinkgoWriter, "INFO: "+format+"\n", a...) 
} @@ -1041,7 +987,7 @@ func getDebugInfo(c *client.Client) (map[string]string, error) { func writePerfData(c *client.Client, dirName string, postfix string) error { fname := fmt.Sprintf("%s/metrics_%s.txt", dirName, postfix) - hdnl, err := os.Create(fname) + handler, err := os.Create(fname) if err != nil { return fmt.Errorf("Error creating file '%s': %v", fname, err) } @@ -1051,12 +997,12 @@ func writePerfData(c *client.Client, dirName string, postfix string) error { return fmt.Errorf("Error retrieving metrics: %v", err) } - _, err = hdnl.WriteString(metrics) + _, err = handler.WriteString(metrics) if err != nil { return fmt.Errorf("Error writing metrics: %v", err) } - err = hdnl.Close() + err = handler.Close() if err != nil { return fmt.Errorf("Error closing '%s': %v", fname, err) } @@ -1068,16 +1014,16 @@ func writePerfData(c *client.Client, dirName string, postfix string) error { for key, value := range debug { fname := fmt.Sprintf("%s/%s_%s.txt", dirName, key, postfix) - hdnl, err = os.Create(fname) + handler, err = os.Create(fname) if err != nil { return fmt.Errorf("Error creating file '%s': %v", fname, err) } - _, err = hdnl.WriteString(value) + _, err = handler.WriteString(value) if err != nil { return fmt.Errorf("Error writing %s: %v", key, err) } - err = hdnl.Close() + err = handler.Close() if err != nil { return fmt.Errorf("Error closing '%s': %v", fname, err) } From 9e06132ed3255ca0aa68f67075eee28f53117310 Mon Sep 17 00:00:00 2001 From: Robert Rati Date: Thu, 14 May 2015 14:54:22 -0400 Subject: [PATCH 7/9] Minor cleanup. #7572 --- test/e2e/density.go | 3 ++- test/e2e/util.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/test/e2e/density.go b/test/e2e/density.go index 7581e263e07..2d269ff0451 100644 --- a/test/e2e/density.go +++ b/test/e2e/density.go @@ -81,13 +81,14 @@ var _ = Describe("Density", func() { Failf("Couldn't delete ns %s", err) } + expectNoError(writePerfData(c, uuid, "after")) + // Verify latency metrics // TODO: Update threshold to 1s once we reach this goal // TODO: We should reset metrics before the test. Currently previous tests influence latency metrics. highLatencyRequests, err := HighLatencyRequests(c, 10*time.Second, util.NewStringSet("events")) expectNoError(err) Expect(highLatencyRequests).NotTo(BeNumerically(">", 0)) - expectNoError(writePerfData(c, uuid, "after")) }) // Tests with "Skipped" substring in their name will be skipped when running diff --git a/test/e2e/util.go b/test/e2e/util.go index ba2f3021cb2..67b1cfe051e 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -534,6 +534,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int, podSta } } }(stop, name, ns, label) + defer close(stop) By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns)) failCount := 5 From 7361f751a644036a788b78a6f4b029efc6a958ca Mon Sep 17 00:00:00 2001 From: Robert Rati Date: Fri, 15 May 2015 14:34:34 -0400 Subject: [PATCH 8/9] Create a config struct for RunRC and allow polling interval to be configurable. 
#7572 --- test/e2e/density.go | 21 ++++++++++++++++----- test/e2e/load.go | 8 +++++++- test/e2e/scale.go | 10 +++++++++- test/e2e/util.go | 43 ++++++++++++++++++++++++++++++++----------- 4 files changed, 64 insertions(+), 18 deletions(-) diff --git a/test/e2e/density.go b/test/e2e/density.go index 2d269ff0451..90c02056977 100644 --- a/test/e2e/density.go +++ b/test/e2e/density.go @@ -96,16 +96,18 @@ var _ = Describe("Density", func() { type Density struct { skip bool podsPerMinion int + /* Controls how often the apiserver is polled for pods */ + interval int } densityTests := []Density{ // This test should always run, even if larger densities are skipped. - {podsPerMinion: 3, skip: false}, - {podsPerMinion: 30, skip: false}, + {podsPerMinion: 3, skip: false, interval: 10}, + {podsPerMinion: 30, skip: false, interval: 10}, // More than 30 pods per node is outside our v1.0 goals. // We might want to enable those tests in the future. - {podsPerMinion: 50, skip: true}, - {podsPerMinion: 100, skip: true}, + {podsPerMinion: 50, skip: true, interval: 10}, + {podsPerMinion: 100, skip: true, interval: 1}, } for _, testArg := range densityTests { @@ -124,6 +126,15 @@ var _ = Describe("Density", func() { expectNoError(err) defer fileHndl.Close() + config := RCConfig{Client: c, + Image: "gcr.io/google_containers/pause:go", + Name: RCName, + Namespace: ns, + PollInterval: itArg.interval, + PodStatusFile: fileHndl, + Replicas: totalPods, + } + // Create a listener for events. events := make([](*api.Event), 0) _, controller := framework.NewInformer( @@ -148,7 +159,7 @@ var _ = Describe("Density", func() { // Start the replication controller. startTime := time.Now() - expectNoError(RunRC(c, RCName, ns, "gcr.io/google_containers/pause:go", totalPods, fileHndl)) + expectNoError(RunRC(config)) e2eStartupTime := time.Now().Sub(startTime) Logf("E2E startup time for %d pods: %v", totalPods, e2eStartupTime) diff --git a/test/e2e/load.go b/test/e2e/load.go index 29ca0297960..482382f0b5d 100644 --- a/test/e2e/load.go +++ b/test/e2e/load.go @@ -120,7 +120,13 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int) // Once every 1-2 minutes perform resize of RC. for start := time.Now(); time.Since(start) < simulationTime; time.Sleep(time.Duration(60+rand.Intn(60)) * time.Second) { if !rcExist { - expectNoError(RunRC(c, name, ns, image, size, nil), fmt.Sprintf("creating rc %s in namespace %s", name, ns)) + config := RCConfig{Client: c, + Name: name, + Namespace: ns, + Image: image, + Replicas: size, + } + expectNoError(RunRC(config), fmt.Sprintf("creating rc %s in namespace %s", name, ns)) rcExist = true } // Resize RC to a random size between 0.5x and 1.5x of the original size. 
diff --git a/test/e2e/scale.go b/test/e2e/scale.go index d7e43dddc3c..52f1496b6fb 100644 --- a/test/e2e/scale.go +++ b/test/e2e/scale.go @@ -107,7 +107,15 @@ var _ = Describe("Scale", func() { for i := 0; i < itArg.rcsPerThread; i++ { name := "my-short-lived-pod" + string(util.NewUUID()) n := itArg.podsPerMinion * minionCount - expectNoError(RunRC(c, name, ns, "gcr.io/google_containers/pause:go", n, nil)) + + config := RCConfig{Client: c, + Name: name, + Namespace: ns, + Image: "gcr.io/google_containers/pause:go", + Replicas: n, + } + + expectNoError(RunRC(config)) podsLaunched += n Logf("Launched %v pods so far...", podsLaunched) err := DeleteRC(c, ns, name) diff --git a/test/e2e/util.go b/test/e2e/util.go index 67b1cfe051e..fca94ce5c53 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -75,6 +75,16 @@ type ContainerFailures struct { restarts int } +type RCConfig struct { + Client *client.Client + Image string + Name string + Namespace string + PollInterval int + PodStatusFile *os.File + Replicas int +} + func Logf(format string, a ...interface{}) { fmt.Fprintf(GinkgoWriter, "INFO: "+format+"\n", a...) } @@ -475,8 +485,14 @@ func Diff(oldPods []api.Pod, curPods []api.Pod) PodDiff { // It will waits for all pods it spawns to become "Running". // It's the caller's responsibility to clean up externally (i.e. use the // namespace lifecycle for handling cleanup). -func RunRC(c *client.Client, name string, ns, image string, replicas int, podStatusFile *os.File) error { +func RunRC(config RCConfig) error { var last int + c := config.Client + name := config.Name + ns := config.Namespace + image := config.Image + replicas := config.Replicas + interval := config.PollInterval maxContainerFailures := int(math.Max(1.0, float64(replicas)*.01)) current := 0 @@ -518,7 +534,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int, podSta // Create a routine to query for the list of pods stop := make(chan struct{}) - go func(stop <-chan struct{}, n string, ns string, l labels.Selector) { + go func(stop <-chan struct{}, n string, ns string, l labels.Selector, i int) { for { select { case <-stop: @@ -530,16 +546,19 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int, podSta } else { podLists.Push(p.Items) } - time.Sleep(1 * time.Second) + time.Sleep(time.Duration(i) * time.Second) } } - }(stop, name, ns, label) + }(stop, name, ns, label, interval) defer close(stop) By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns)) - failCount := 5 + failCount := int(25 / interval) for same < failCount && current < replicas { - time.Sleep(2 * time.Second) + time.Sleep(time.Duration(interval*2) * time.Second) + + // Greedily read all existing entries in the queue until + // all pods are found submitted or the queue is empty for podLists.Len() > 0 && current < replicas { item := podLists.Pop() pods := item.value.([]api.Pod) @@ -568,13 +587,16 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int, podSta By(fmt.Sprintf("Waiting for all %d replicas to be running with a max container failures of %d", replicas, maxContainerFailures)) same = 0 last = 0 - failCount = 10 + failCount = int(100 / interval) current = 0 var oldPods []api.Pod podLists.Reset() foundAllPods := false for same < failCount && current < replicas { - time.Sleep(2 * time.Second) + time.Sleep(time.Duration(interval*2) * time.Second) + + // Greedily read all existing entries in the queue until + // either all pods are running or the queue is empty 
for podLists.Len() > 0 && current < replicas { item := podLists.Pop() current = 0 @@ -603,8 +625,8 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int, podSta } } Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown) - if podStatusFile != nil { - fmt.Fprintf(podStatusFile, "%s, %d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown\n", item.createTime, current, pending, waiting, inactive, unknown) + if config.PodStatusFile != nil { + fmt.Fprintf(config.PodStatusFile, "%s, %d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown\n", item.createTime, current, pending, waiting, inactive, unknown) } if foundAllPods && len(currentPods) != len(oldPods) { @@ -652,7 +674,6 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int, podSta } } } - close(stop) if current != replicas { return fmt.Errorf("Only %d pods started out of %d", current, replicas) } From 3191b26bc6ca241fba85d0b183868f109e078213 Mon Sep 17 00:00:00 2001 From: Robert Rati Date: Tue, 19 May 2015 18:40:21 -0400 Subject: [PATCH 9/9] Only sleep 1.1*interval. #7572 --- test/e2e/util.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/e2e/util.go b/test/e2e/util.go index fca94ce5c53..77dcd8d6a34 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -555,7 +555,7 @@ func RunRC(config RCConfig) error { By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns)) failCount := int(25 / interval) for same < failCount && current < replicas { - time.Sleep(time.Duration(interval*2) * time.Second) + time.Sleep(time.Duration(float32(interval)*1.1) * time.Second) // Greedily read all existing entries in the queue until // all pods are found submitted or the queue is empty @@ -593,7 +593,7 @@ func RunRC(config RCConfig) error { podLists.Reset() foundAllPods := false for same < failCount && current < replicas { - time.Sleep(time.Duration(interval*2) * time.Second) + time.Sleep(time.Duration(float32(interval)*1.1) * time.Second) // Greedily read all existing entries in the queue until // either all pods are running or the queue is empty
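
For readers following the series, the core pattern that patches 3 through 9 converge on is a producer/consumer split: a background goroutine periodically lists pods and pushes timestamped snapshots onto a FIFO queue, while RunRC sleeps for roughly one poll interval and then greedily drains whatever snapshots accumulated. The sketch below is a minimal, self-contained approximation of that pattern outside the e2e framework, for illustration only: the pod-list payload is replaced with a string, the names (queueItem, fifoQueue, podSnapshot-style messages) are lowercase stand-ins rather than the patch's exported types, and this Pop reports emptiness through a second return value instead of returning a zero-valued QueueItem as the patch's FifoQueue does.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // queueItem pairs a timestamp with a payload, mirroring the QueueItem
    // type introduced in test/e2e/fifo_queue.go.
    type queueItem struct {
        createTime time.Time
        value      interface{}
    }

    // fifoQueue is a mutex-guarded slice with a read position instead of
    // true removal, in the spirit of the patch's FifoQueue.
    type fifoQueue struct {
        mu   sync.Mutex
        pos  int
        list []queueItem
    }

    func (q *fifoQueue) Push(v interface{}) {
        q.mu.Lock()
        defer q.mu.Unlock()
        q.list = append(q.list, queueItem{time.Now(), v})
    }

    // Pop returns the next unread item; ok is false when nothing is left.
    func (q *fifoQueue) Pop() (item queueItem, ok bool) {
        q.mu.Lock()
        defer q.mu.Unlock()
        if q.pos > len(q.list)-1 {
            return queueItem{}, false
        }
        item = q.list[q.pos]
        q.pos++
        return item, true
    }

    // Len reports how many items have been pushed but not yet popped.
    func (q *fifoQueue) Len() int {
        q.mu.Lock()
        defer q.mu.Unlock()
        return len(q.list) - q.pos
    }

    func main() {
        q := &fifoQueue{}
        stop := make(chan struct{})

        // Producer: stands in for the goroutine in RunRC that lists pods
        // every PollInterval seconds and pushes the result onto the queue.
        go func() {
            for i := 0; ; i++ {
                select {
                case <-stop:
                    return
                default:
                    q.Push(fmt.Sprintf("pod-list snapshot %d", i))
                    time.Sleep(100 * time.Millisecond)
                }
            }
        }()

        // Consumer: stands in for the RunRC wait loops that sleep, then
        // greedily drain whatever snapshots accumulated since the last pass.
        for pass := 0; pass < 3; pass++ {
            time.Sleep(300 * time.Millisecond)
            for q.Len() > 0 {
                item, ok := q.Pop()
                if !ok {
                    break
                }
                fmt.Printf("%s: %v\n", item.createTime.Format(time.RFC3339Nano), item.value)
            }
        }
        close(stop)
    }

Keeping a read position rather than re-slicing is what lets the patches' Reset() rewind the queue before the second wait loop: the "running" phase can re-read every recorded snapshot from the start instead of seeing only snapshots taken after the "exists" phase finished.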