diff --git a/test/e2e/density.go b/test/e2e/density.go
index 6a5fe3e67b8..90c02056977 100644
--- a/test/e2e/density.go
+++ b/test/e2e/density.go
@@ -19,6 +19,7 @@ package e2e
 import (
 	"fmt"
 	"math"
+	"os"
 	"strconv"
 	"time"
 
@@ -45,6 +46,7 @@ var _ = Describe("Density", func() {
 	var minionCount int
 	var RCName string
 	var ns string
+	var uuid string
 
 	BeforeEach(func() {
 		var err error
@@ -57,6 +59,9 @@ var _ = Describe("Density", func() {
 		nsForTesting, err := createTestingNS("density", c)
 		ns = nsForTesting.Name
 		expectNoError(err)
+		uuid = string(util.NewUUID())
+		expectNoError(os.Mkdir(uuid, 0777))
+		expectNoError(writePerfData(c, uuid, "before"))
 	})
 
 	AfterEach(func() {
@@ -76,6 +81,8 @@ var _ = Describe("Density", func() {
 			Failf("Couldn't delete ns %s", err)
 		}
 
+		expectNoError(writePerfData(c, uuid, "after"))
+
 		// Verify latency metrics
 		// TODO: Update threshold to 1s once we reach this goal
 		// TODO: We should reset metrics before the test. Currently previous tests influence latency metrics.
@@ -89,16 +96,18 @@ var _ = Describe("Density", func() {
 	type Density struct {
 		skip          bool
 		podsPerMinion int
+		/* Controls how often the apiserver is polled for pods */
+		interval int
 	}
 
 	densityTests := []Density{
 		// This test should always run, even if larger densities are skipped.
-		{podsPerMinion: 3, skip: false},
-		{podsPerMinion: 30, skip: false},
+		{podsPerMinion: 3, skip: false, interval: 10},
+		{podsPerMinion: 30, skip: false, interval: 10},
 		// More than 30 pods per node is outside our v1.0 goals.
 		// We might want to enable those tests in the future.
-		{podsPerMinion: 50, skip: true},
-		{podsPerMinion: 100, skip: true},
+		{podsPerMinion: 50, skip: true, interval: 10},
+		{podsPerMinion: 100, skip: true, interval: 1},
 	}
 
 	for _, testArg := range densityTests {
@@ -112,8 +121,19 @@ var _ = Describe("Density", func() {
 		itArg := testArg
 		It(name, func() {
 			totalPods := itArg.podsPerMinion * minionCount
-			nameStr := strconv.Itoa(totalPods) + "-" + string(util.NewUUID())
-			RCName = "my-hostname-density" + nameStr
+			RCName = "density" + strconv.Itoa(totalPods) + "-" + uuid
+			fileHndl, err := os.Create(fmt.Sprintf("%s/pod_states.csv", uuid))
+			expectNoError(err)
+			defer fileHndl.Close()
+
+			config := RCConfig{Client: c,
+				Image:         "gcr.io/google_containers/pause:go",
+				Name:          RCName,
+				Namespace:     ns,
+				PollInterval:  itArg.interval,
+				PodStatusFile: fileHndl,
+				Replicas:      totalPods,
+			}
 
 			// Create a listener for events.
 			events := make([](*api.Event), 0)
@@ -139,7 +159,7 @@ var _ = Describe("Density", func() {
 
 			// Start the replication controller.
 			startTime := time.Now()
-			expectNoError(RunRC(c, RCName, ns, "gcr.io/google_containers/pause:go", totalPods))
+			expectNoError(RunRC(config))
 			e2eStartupTime := time.Now().Sub(startTime)
 			Logf("E2E startup time for %d pods: %v", totalPods, e2eStartupTime)
 
diff --git a/test/e2e/fifo_queue.go b/test/e2e/fifo_queue.go
new file mode 100644
index 00000000000..0942c37df72
--- /dev/null
+++ b/test/e2e/fifo_queue.go
@@ -0,0 +1,75 @@
+/*
+Copyright 2014 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"sync"
+	"time"
+)
+
+type QueueItem struct {
+	createTime string
+	value      interface{}
+}
+
+type QueueItems struct {
+	pos   int
+	mutex *sync.Mutex
+	list  []QueueItem
+}
+
+type FifoQueue QueueItems
+
+func (fq *FifoQueue) Push(elem interface{}) {
+	fq.mutex.Lock()
+	fq.list = append(fq.list, QueueItem{time.Now().String(), elem})
+	fq.mutex.Unlock()
+}
+
+func (fq *FifoQueue) Pop() QueueItem {
+	fq.mutex.Lock()
+	var val QueueItem
+	if len(fq.list)-1 >= fq.pos {
+		val = fq.list[fq.pos]
+		fq.pos++
+	}
+	fq.mutex.Unlock()
+	return val
+}
+
+func (fq FifoQueue) Len() int {
+	return len(fq.list[fq.pos:])
+}
+
+func (fq *FifoQueue) First() QueueItem {
+	return fq.list[fq.pos]
+}
+
+func (fq *FifoQueue) Last() QueueItem {
+	return fq.list[len(fq.list)-1]
+}
+
+func (fq *FifoQueue) Reset() {
+	fq.pos = 0
+}
+
+func newFifoQueue() *FifoQueue {
+	tmp := new(FifoQueue)
+	tmp.mutex = &sync.Mutex{}
+	tmp.pos = 0
+	return tmp
+}
diff --git a/test/e2e/load.go b/test/e2e/load.go
index 8e3dad52431..482382f0b5d 100644
--- a/test/e2e/load.go
+++ b/test/e2e/load.go
@@ -120,7 +120,13 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int)
 	// Once every 1-2 minutes perform resize of RC.
 	for start := time.Now(); time.Since(start) < simulationTime; time.Sleep(time.Duration(60+rand.Intn(60)) * time.Second) {
 		if !rcExist {
-			expectNoError(RunRC(c, name, ns, image, size), fmt.Sprintf("creating rc %s in namespace %s", name, ns))
+			config := RCConfig{Client: c,
+				Name:      name,
+				Namespace: ns,
+				Image:     image,
+				Replicas:  size,
+			}
+			expectNoError(RunRC(config), fmt.Sprintf("creating rc %s in namespace %s", name, ns))
 			rcExist = true
 		}
 		// Resize RC to a random size between 0.5x and 1.5x of the original size.
diff --git a/test/e2e/scale.go b/test/e2e/scale.go
index a787c28f731..52f1496b6fb 100644
--- a/test/e2e/scale.go
+++ b/test/e2e/scale.go
@@ -107,7 +107,15 @@ var _ = Describe("Scale", func() {
 				for i := 0; i < itArg.rcsPerThread; i++ {
 					name := "my-short-lived-pod" + string(util.NewUUID())
 					n := itArg.podsPerMinion * minionCount
-					expectNoError(RunRC(c, name, ns, "gcr.io/google_containers/pause:go", n))
+
+					config := RCConfig{Client: c,
+						Name:      name,
+						Namespace: ns,
+						Image:     "gcr.io/google_containers/pause:go",
+						Replicas:  n,
+					}
+
+					expectNoError(RunRC(config))
 					podsLaunched += n
 					Logf("Launched %v pods so far...", podsLaunched)
 					err := DeleteRC(c, ns, name)
diff --git a/test/e2e/util.go b/test/e2e/util.go
index d45dbaf0eb3..ea3386422e7 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -22,6 +22,7 @@ import (
 	"io/ioutil"
 	"math"
 	"math/rand"
+	"net/http"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -86,6 +87,16 @@ type ContainerFailures struct {
 	restarts int
 }
 
+type RCConfig struct {
+	Client        *client.Client
+	Image         string
+	Name          string
+	Namespace     string
+	PollInterval  int
+	PodStatusFile *os.File
+	Replicas      int
+}
+
 func Logf(format string, a ...interface{}) {
 	fmt.Fprintf(GinkgoWriter, "INFO: "+format+"\n", a...)
 }
@@ -585,16 +596,16 @@ func (p PodDiff) Print(ignorePhases util.StringSet) {
 }
 
 // Diff computes a PodDiff given 2 lists of pods.
-func Diff(oldPods *api.PodList, curPods *api.PodList) PodDiff {
+func Diff(oldPods []api.Pod, curPods []api.Pod) PodDiff {
 	podInfoMap := PodDiff{}
 
 	// New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist.
-	for _, pod := range curPods.Items {
+	for _, pod := range curPods {
 		podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.Host, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist}
 	}
 
 	// Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist.
-	for _, pod := range oldPods.Items {
+	for _, pod := range oldPods {
 		if info, ok := podInfoMap[pod.Name]; ok {
 			info.oldHostname, info.oldPhase = pod.Spec.Host, string(pod.Status.Phase)
 		} else {
@@ -608,12 +619,20 @@ func Diff(oldPods *api.PodList, curPods *api.PodList) PodDiff {
 // It will waits for all pods it spawns to become "Running".
 // It's the caller's responsibility to clean up externally (i.e. use the
 // namespace lifecycle for handling cleanup).
-func RunRC(c *client.Client, name string, ns, image string, replicas int) error {
+func RunRC(config RCConfig) error {
 	var last int
+	c := config.Client
+	name := config.Name
+	ns := config.Namespace
+	image := config.Image
+	replicas := config.Replicas
+	interval := config.PollInterval
 
 	maxContainerFailures := int(math.Max(1.0, float64(replicas)*.01))
 	current := 0
 	same := 0
+	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
+	podLists := newFifoQueue()
 
 	By(fmt.Sprintf("Creating replication controller %s", name))
 	rc := &api.ReplicationController{
@@ -647,35 +666,52 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
 	}
 	Logf("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, ns, rc.Spec.Replicas)
 
+	// Create a routine to query for the list of pods
+	stop := make(chan struct{})
+	go func(stop <-chan struct{}, n string, ns string, l labels.Selector, i int) {
+		for {
+			select {
+			case <-stop:
+				return
+			default:
+				p, err := c.Pods(ns).List(l, fields.Everything())
+				if err != nil {
+					Logf("Warning: Failed to get pod list: %v", err)
+				} else {
+					podLists.Push(p.Items)
+				}
+				time.Sleep(time.Duration(i) * time.Second)
+			}
+		}
+	}(stop, name, ns, label, interval)
+	defer close(stop)
+
 	By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
-	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
-	pods, err := listPods(c, ns, label, fields.Everything())
-	if err != nil {
-		return fmt.Errorf("Error listing pods: %v", err)
-	}
-	current = len(pods.Items)
-	failCount := 5
+	failCount := int(25 / interval)
 	for same < failCount && current < replicas {
-		Logf("Controller %s: Found %d pods out of %d", name, current, replicas)
-		if last < current {
-			same = 0
-		} else if last == current {
-			same++
-		} else if current < last {
-			return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
-		}
+		time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
 
-		if same >= failCount {
-			return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount)
-		}
+		// Greedily read all existing entries in the queue until
+		// all pods are found submitted or the queue is empty
+		for podLists.Len() > 0 && current < replicas {
+			item := podLists.Pop()
+			pods := item.value.([]api.Pod)
+			current = len(pods)
+			Logf("Controller %s: Found %d pods out of %d", name, current, replicas)
+			if last < current {
+				same = 0
+			} else if last == current {
+				same++
+			} else if current < last {
+				return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
+			}
 
-		last = current
-		time.Sleep(5 * time.Second)
-		pods, err = listPods(c, ns, label, fields.Everything())
-		if err != nil {
-			return fmt.Errorf("Error listing pods: %v", err)
+			if same >= failCount {
+				return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount)
+			}
+
+			last = current
 		}
-		current = len(pods.Items)
 	}
 	if current != replicas {
 		return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas)
@@ -685,82 +721,91 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
 	By(fmt.Sprintf("Waiting for all %d replicas to be running with a max container failures of %d", replicas, maxContainerFailures))
 	same = 0
 	last = 0
-	failCount = 10
+	failCount = int(100 / interval)
 	current = 0
-	oldPods := &api.PodList{}
+	var oldPods []api.Pod
+	podLists.Reset()
+	foundAllPods := false
 	for same < failCount && current < replicas {
-		current = 0
-		waiting := 0
-		pending := 0
-		unknown := 0
-		inactive := 0
-		failedContainers := 0
-		time.Sleep(10 * time.Second)
+		time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
 
-		// TODO: Use a reflector both to put less strain on the cluster and
-		// for more clarity.
-		currentPods, err := listPods(c, ns, label, fields.Everything())
-		if err != nil {
-			return fmt.Errorf("Error listing pods: %v", err)
-		}
-		for _, p := range currentPods.Items {
-			if p.Status.Phase == api.PodRunning {
-				current++
-				for _, v := range FailedContainers(p) {
-					failedContainers = failedContainers + v.restarts
+		// Greedily read all existing entries in the queue until
+		// either all pods are running or the queue is empty
+		for podLists.Len() > 0 && current < replicas {
+			item := podLists.Pop()
+			current = 0
+			waiting := 0
+			pending := 0
+			unknown := 0
+			inactive := 0
+			failedContainers := 0
+			currentPods := item.value.([]api.Pod)
+			for _, p := range currentPods {
+				if p.Status.Phase == api.PodRunning {
+					current++
+					for _, v := range FailedContainers(p) {
+						failedContainers = failedContainers + v.restarts
+					}
+				} else if p.Status.Phase == api.PodPending {
+					if p.Spec.Host == "" {
+						waiting++
+					} else {
+						pending++
+					}
+				} else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
+					inactive++
+				} else if p.Status.Phase == api.PodUnknown {
+					unknown++
 				}
-			} else if p.Status.Phase == api.PodPending {
-				if p.Spec.Host == "" {
-					waiting++
-				} else {
-					pending++
-				}
-			} else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
-				inactive++
-			} else if p.Status.Phase == api.PodUnknown {
-				unknown++
 			}
-		}
-		Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
+			Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
+			if config.PodStatusFile != nil {
+				fmt.Fprintf(config.PodStatusFile, "%s, %d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown\n", item.createTime, current, pending, waiting, inactive, unknown)
+			}
 
-		if len(currentPods.Items) != len(pods.Items) {
+			if foundAllPods && len(currentPods) != len(oldPods) {
-			// This failure mode includes:
-			// kubelet is dead, so node controller deleted pods and rc creates more
-			// - diagnose by noting the pod diff below.
-			// pod is unhealthy, so replication controller creates another to take its place
-			// - diagnose by comparing the previous "2 Pod states" lines for inactive pods
-			errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods.Items), len(pods.Items))
-			Logf("%v, pods that changed since the last iteration:", errorStr)
-			Diff(oldPods, currentPods).Print(util.NewStringSet())
-			return fmt.Errorf(errorStr)
-		}
-		if last < current {
-			same = 0
-		} else if last == current {
-			same++
-		} else if current < last {
+				// This failure mode includes:
+				// kubelet is dead, so node controller deleted pods and rc creates more
+				// - diagnose by noting the pod diff below.
+				// pod is unhealthy, so replication controller creates another to take its place
+				// - diagnose by comparing the previous "2 Pod states" lines for inactive pods
+				errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods), len(oldPods))
+				Logf("%v, pods that changed since the last iteration:", errorStr)
+				Diff(oldPods, currentPods).Print(util.NewStringSet())
+				return fmt.Errorf(errorStr)
+			}
+			if last < current {
+				same = 0
+			} else if last == current {
+				same++
+			} else if current < last {
-			// The pod failed or succeeded, or was somehow pushed out of running by the kubelet.
-			errorStr := fmt.Sprintf("Number of running pods dropped from %d to %d", last, current)
-			Logf("%v, pods that changed since the last iteration:", errorStr)
-			Diff(oldPods, currentPods).Print(util.NewStringSet())
-			return fmt.Errorf(errorStr)
-		}
-		if same >= failCount {
+				// The pod failed or succeeded, or was somehow pushed out of running by the kubelet.
+				errorStr := fmt.Sprintf("Number of running pods dropped from %d to %d", last, current)
+				Logf("%v, pods that changed since the last iteration:", errorStr)
+				Diff(oldPods, currentPods).Print(util.NewStringSet())
+				return fmt.Errorf(errorStr)
+			}
+			if same >= failCount {
-			// Most times this happens because a few nodes have kubelet problems, and their pods are
-			// stuck in pending.
-			errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount)
-			Logf("%v, pods currently in pending:", errorStr)
-			Diff(currentPods, &api.PodList{}).Print(util.NewStringSet(string(api.PodRunning)))
-			return fmt.Errorf(errorStr)
-		}
-		last = current
-		oldPods = currentPods
+				// Most times this happens because a few nodes have kubelet problems, and their pods are
+				// stuck in pending.
+				errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount)
+				Logf("%v, pods currently in pending:", errorStr)
+				Diff(currentPods, make([]api.Pod, 0)).Print(util.NewStringSet(string(api.PodRunning)))
+				return fmt.Errorf(errorStr)
+			}
 
-		if failedContainers > maxContainerFailures {
-			return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures)
+			if !foundAllPods {
+				foundAllPods = len(currentPods) == replicas
+			}
+			last = current
+			oldPods = currentPods
+
+			if failedContainers > maxContainerFailures {
+				return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures)
+			}
 		}
 	}
 	if current != replicas {
@@ -1014,7 +1059,7 @@ type LatencyMetric struct {
 }
 
 func ReadLatencyMetrics(c *client.Client) ([]LatencyMetric, error) {
-	body, err := c.Get().AbsPath("/metrics").DoRaw()
+	body, err := getMetrics(c)
 	if err != nil {
 		return nil, err
 	}
@@ -1070,3 +1115,74 @@ func HighLatencyRequests(c *client.Client, threshold time.Duration, ignoredResou
 
 	return len(badMetrics), nil
 }
+
+// Retrieve metrics information
+func getMetrics(c *client.Client) (string, error) {
+	body, err := c.Get().AbsPath("/metrics").DoRaw()
+	if err != nil {
+		return "", err
+	}
+	return string(body), nil
+}
+
+// Retrieve debug information
+func getDebugInfo(c *client.Client) (map[string]string, error) {
+	data := make(map[string]string)
+	for _, key := range []string{"block", "goroutine", "heap", "threadcreate"} {
+		resp, err := http.Get(c.Get().AbsPath(fmt.Sprintf("debug/pprof/%s", key)).URL().String() + "?debug=2")
+		body, err := ioutil.ReadAll(resp.Body)
+		resp.Body.Close()
+		if err != nil {
+			Logf("Warning: Error trying to fetch %s debug data: %v", key, err)
+		}
+		data[key] = string(body)
+	}
+	return data, nil
+}
+
+func writePerfData(c *client.Client, dirName string, postfix string) error {
+	fname := fmt.Sprintf("%s/metrics_%s.txt", dirName, postfix)
+
+	handler, err := os.Create(fname)
+	if err != nil {
+		return fmt.Errorf("Error creating file '%s': %v", fname, err)
+	}
+
+	metrics, err := getMetrics(c)
+	if err != nil {
+		return fmt.Errorf("Error retrieving metrics: %v", err)
+	}
+
+	_, err = handler.WriteString(metrics)
+	if err != nil {
+		return fmt.Errorf("Error writing metrics: %v", err)
+	}
+
+	err = handler.Close()
+	if err != nil {
+		return fmt.Errorf("Error closing '%s': %v", fname, err)
+	}
+
+	debug, err := getDebugInfo(c)
+	if err != nil {
+		return fmt.Errorf("Error retrieving debug information: %v", err)
+	}
+
+	for key, value := range debug {
+		fname := fmt.Sprintf("%s/%s_%s.txt", dirName, key, postfix)
+		handler, err = os.Create(fname)
+		if err != nil {
+			return fmt.Errorf("Error creating file '%s': %v", fname, err)
+		}
+		_, err = handler.WriteString(value)
+		if err != nil {
+			return fmt.Errorf("Error writing %s: %v", key, err)
+		}
+
+		err = handler.Close()
+		if err != nil {
+			return fmt.Errorf("Error closing '%s': %v", fname, err)
+		}
+	}
+	return nil
+}