Merge pull request #7573 from rrati/performance-gathering-7572

Added metrics/debug gathering methods to utils and used them in density ...
This commit is contained in:
Dawn Chen 2015-05-22 10:17:15 -07:00
commit 7c80f3d985
5 changed files with 329 additions and 104 deletions

View File

@ -19,6 +19,7 @@ package e2e
import (
"fmt"
"math"
"os"
"strconv"
"time"
@ -45,6 +46,7 @@ var _ = Describe("Density", func() {
var minionCount int
var RCName string
var ns string
var uuid string
BeforeEach(func() {
var err error
@ -57,6 +59,9 @@ var _ = Describe("Density", func() {
nsForTesting, err := createTestingNS("density", c)
ns = nsForTesting.Name
expectNoError(err)
uuid = string(util.NewUUID())
expectNoError(os.Mkdir(uuid, 0777))
expectNoError(writePerfData(c, uuid, "before"))
})
AfterEach(func() {
@ -76,6 +81,8 @@ var _ = Describe("Density", func() {
Failf("Couldn't delete ns %s", err)
}
expectNoError(writePerfData(c, uuid, "after"))
// Verify latency metrics
// TODO: Update threshold to 1s once we reach this goal
// TODO: We should reset metrics before the test. Currently previous tests influence latency metrics.
@ -89,16 +96,18 @@ var _ = Describe("Density", func() {
type Density struct {
skip bool
podsPerMinion int
/* Controls how often the apiserver is polled for pods */
interval int
}
densityTests := []Density{
// This test should always run, even if larger densities are skipped.
{podsPerMinion: 3, skip: false},
{podsPerMinion: 30, skip: false},
{podsPerMinion: 3, skip: false, interval: 10},
{podsPerMinion: 30, skip: false, interval: 10},
// More than 30 pods per node is outside our v1.0 goals.
// We might want to enable those tests in the future.
{podsPerMinion: 50, skip: true},
{podsPerMinion: 100, skip: true},
{podsPerMinion: 50, skip: true, interval: 10},
{podsPerMinion: 100, skip: true, interval: 1},
}
for _, testArg := range densityTests {
@ -112,8 +121,19 @@ var _ = Describe("Density", func() {
itArg := testArg
It(name, func() {
totalPods := itArg.podsPerMinion * minionCount
nameStr := strconv.Itoa(totalPods) + "-" + string(util.NewUUID())
RCName = "my-hostname-density" + nameStr
RCName = "density" + strconv.Itoa(totalPods) + "-" + uuid
fileHndl, err := os.Create(fmt.Sprintf("%s/pod_states.csv", uuid))
expectNoError(err)
defer fileHndl.Close()
config := RCConfig{Client: c,
Image: "gcr.io/google_containers/pause:go",
Name: RCName,
Namespace: ns,
PollInterval: itArg.interval,
PodStatusFile: fileHndl,
Replicas: totalPods,
}
// Create a listener for events.
events := make([](*api.Event), 0)
@ -139,7 +159,7 @@ var _ = Describe("Density", func() {
// Start the replication controller.
startTime := time.Now()
expectNoError(RunRC(c, RCName, ns, "gcr.io/google_containers/pause:go", totalPods))
expectNoError(RunRC(config))
e2eStartupTime := time.Now().Sub(startTime)
Logf("E2E startup time for %d pods: %v", totalPods, e2eStartupTime)

75
test/e2e/fifo_queue.go Normal file
View File

@ -0,0 +1,75 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"sync"
"time"
)
type QueueItem struct {
createTime string
value interface{}
}
type QueueItems struct {
pos int
mutex *sync.Mutex
list []QueueItem
}
type FifoQueue QueueItems
func (fq *FifoQueue) Push(elem interface{}) {
fq.mutex.Lock()
fq.list = append(fq.list, QueueItem{time.Now().String(), elem})
fq.mutex.Unlock()
}
func (fq *FifoQueue) Pop() QueueItem {
fq.mutex.Lock()
var val QueueItem
if len(fq.list)-1 >= fq.pos {
val = fq.list[fq.pos]
fq.pos++
}
fq.mutex.Unlock()
return val
}
func (fq FifoQueue) Len() int {
return len(fq.list[fq.pos:])
}
func (fq *FifoQueue) First() QueueItem {
return fq.list[fq.pos]
}
func (fq *FifoQueue) Last() QueueItem {
return fq.list[len(fq.list)-1]
}
func (fq *FifoQueue) Reset() {
fq.pos = 0
}
func newFifoQueue() *FifoQueue {
tmp := new(FifoQueue)
tmp.mutex = &sync.Mutex{}
tmp.pos = 0
return tmp
}

View File

@ -120,7 +120,13 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int)
// Once every 1-2 minutes perform resize of RC.
for start := time.Now(); time.Since(start) < simulationTime; time.Sleep(time.Duration(60+rand.Intn(60)) * time.Second) {
if !rcExist {
expectNoError(RunRC(c, name, ns, image, size), fmt.Sprintf("creating rc %s in namespace %s", name, ns))
config := RCConfig{Client: c,
Name: name,
Namespace: ns,
Image: image,
Replicas: size,
}
expectNoError(RunRC(config), fmt.Sprintf("creating rc %s in namespace %s", name, ns))
rcExist = true
}
// Resize RC to a random size between 0.5x and 1.5x of the original size.

View File

@ -107,7 +107,15 @@ var _ = Describe("Scale", func() {
for i := 0; i < itArg.rcsPerThread; i++ {
name := "my-short-lived-pod" + string(util.NewUUID())
n := itArg.podsPerMinion * minionCount
expectNoError(RunRC(c, name, ns, "gcr.io/google_containers/pause:go", n))
config := RCConfig{Client: c,
Name: name,
Namespace: ns,
Image: "gcr.io/google_containers/pause:go",
Replicas: n,
}
expectNoError(RunRC(config))
podsLaunched += n
Logf("Launched %v pods so far...", podsLaunched)
err := DeleteRC(c, ns, name)

View File

@ -22,6 +22,7 @@ import (
"io/ioutil"
"math"
"math/rand"
"net/http"
"os"
"os/exec"
"path/filepath"
@ -86,6 +87,16 @@ type ContainerFailures struct {
restarts int
}
type RCConfig struct {
Client *client.Client
Image string
Name string
Namespace string
PollInterval int
PodStatusFile *os.File
Replicas int
}
func Logf(format string, a ...interface{}) {
fmt.Fprintf(GinkgoWriter, "INFO: "+format+"\n", a...)
}
@ -585,16 +596,16 @@ func (p PodDiff) Print(ignorePhases util.StringSet) {
}
// Diff computes a PodDiff given 2 lists of pods.
func Diff(oldPods *api.PodList, curPods *api.PodList) PodDiff {
func Diff(oldPods []api.Pod, curPods []api.Pod) PodDiff {
podInfoMap := PodDiff{}
// New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist.
for _, pod := range curPods.Items {
for _, pod := range curPods {
podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.Host, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist}
}
// Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist.
for _, pod := range oldPods.Items {
for _, pod := range oldPods {
if info, ok := podInfoMap[pod.Name]; ok {
info.oldHostname, info.oldPhase = pod.Spec.Host, string(pod.Status.Phase)
} else {
@ -608,12 +619,20 @@ func Diff(oldPods *api.PodList, curPods *api.PodList) PodDiff {
// It will waits for all pods it spawns to become "Running".
// It's the caller's responsibility to clean up externally (i.e. use the
// namespace lifecycle for handling cleanup).
func RunRC(c *client.Client, name string, ns, image string, replicas int) error {
func RunRC(config RCConfig) error {
var last int
c := config.Client
name := config.Name
ns := config.Namespace
image := config.Image
replicas := config.Replicas
interval := config.PollInterval
maxContainerFailures := int(math.Max(1.0, float64(replicas)*.01))
current := 0
same := 0
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
podLists := newFifoQueue()
By(fmt.Sprintf("Creating replication controller %s", name))
rc := &api.ReplicationController{
@ -647,15 +666,37 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
}
Logf("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, ns, rc.Spec.Replicas)
By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
pods, err := listPods(c, ns, label, fields.Everything())
// Create a routine to query for the list of pods
stop := make(chan struct{})
go func(stop <-chan struct{}, n string, ns string, l labels.Selector, i int) {
for {
select {
case <-stop:
return
default:
p, err := c.Pods(ns).List(l, fields.Everything())
if err != nil {
return fmt.Errorf("Error listing pods: %v", err)
Logf("Warning: Failed to get pod list: %v", err)
} else {
podLists.Push(p.Items)
}
current = len(pods.Items)
failCount := 5
time.Sleep(time.Duration(i) * time.Second)
}
}
}(stop, name, ns, label, interval)
defer close(stop)
By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
failCount := int(25 / interval)
for same < failCount && current < replicas {
time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
// Greedily read all existing entries in the queue until
// all pods are found submitted or the queue is empty
for podLists.Len() > 0 && current < replicas {
item := podLists.Pop()
pods := item.value.([]api.Pod)
current = len(pods)
Logf("Controller %s: Found %d pods out of %d", name, current, replicas)
if last < current {
same = 0
@ -670,12 +711,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
}
last = current
time.Sleep(5 * time.Second)
pods, err = listPods(c, ns, label, fields.Everything())
if err != nil {
return fmt.Errorf("Error listing pods: %v", err)
}
current = len(pods.Items)
}
if current != replicas {
return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas)
@ -685,25 +721,26 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
By(fmt.Sprintf("Waiting for all %d replicas to be running with a max container failures of %d", replicas, maxContainerFailures))
same = 0
last = 0
failCount = 10
failCount = int(100 / interval)
current = 0
oldPods := &api.PodList{}
var oldPods []api.Pod
podLists.Reset()
foundAllPods := false
for same < failCount && current < replicas {
time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
// Greedily read all existing entries in the queue until
// either all pods are running or the queue is empty
for podLists.Len() > 0 && current < replicas {
item := podLists.Pop()
current = 0
waiting := 0
pending := 0
unknown := 0
inactive := 0
failedContainers := 0
time.Sleep(10 * time.Second)
// TODO: Use a reflector both to put less strain on the cluster and
// for more clarity.
currentPods, err := listPods(c, ns, label, fields.Everything())
if err != nil {
return fmt.Errorf("Error listing pods: %v", err)
}
for _, p := range currentPods.Items {
currentPods := item.value.([]api.Pod)
for _, p := range currentPods {
if p.Status.Phase == api.PodRunning {
current++
for _, v := range FailedContainers(p) {
@ -722,15 +759,18 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
}
}
Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
if config.PodStatusFile != nil {
fmt.Fprintf(config.PodStatusFile, "%s, %d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown\n", item.createTime, current, pending, waiting, inactive, unknown)
}
if len(currentPods.Items) != len(pods.Items) {
if foundAllPods && len(currentPods) != len(oldPods) {
// This failure mode includes:
// kubelet is dead, so node controller deleted pods and rc creates more
// - diagnose by noting the pod diff below.
// pod is unhealthy, so replication controller creates another to take its place
// - diagnose by comparing the previous "2 Pod states" lines for inactive pods
errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods.Items), len(pods.Items))
errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods), len(oldPods))
Logf("%v, pods that changed since the last iteration:", errorStr)
Diff(oldPods, currentPods).Print(util.NewStringSet())
return fmt.Errorf(errorStr)
@ -753,9 +793,13 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
// stuck in pending.
errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount)
Logf("%v, pods currently in pending:", errorStr)
Diff(currentPods, &api.PodList{}).Print(util.NewStringSet(string(api.PodRunning)))
Diff(currentPods, make([]api.Pod, 0)).Print(util.NewStringSet(string(api.PodRunning)))
return fmt.Errorf(errorStr)
}
if !foundAllPods {
foundAllPods = len(currentPods) == replicas
}
last = current
oldPods = currentPods
@ -763,6 +807,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures)
}
}
}
if current != replicas {
return fmt.Errorf("Only %d pods started out of %d", current, replicas)
}
@ -1014,7 +1059,7 @@ type LatencyMetric struct {
}
func ReadLatencyMetrics(c *client.Client) ([]LatencyMetric, error) {
body, err := c.Get().AbsPath("/metrics").DoRaw()
body, err := getMetrics(c)
if err != nil {
return nil, err
}
@ -1070,3 +1115,74 @@ func HighLatencyRequests(c *client.Client, threshold time.Duration, ignoredResou
return len(badMetrics), nil
}
// Retrieve metrics information
func getMetrics(c *client.Client) (string, error) {
body, err := c.Get().AbsPath("/metrics").DoRaw()
if err != nil {
return "", err
}
return string(body), nil
}
// Retrieve debug information
func getDebugInfo(c *client.Client) (map[string]string, error) {
data := make(map[string]string)
for _, key := range []string{"block", "goroutine", "heap", "threadcreate"} {
resp, err := http.Get(c.Get().AbsPath(fmt.Sprintf("debug/pprof/%s", key)).URL().String() + "?debug=2")
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
Logf("Warning: Error trying to fetch %s debug data: %v", key, err)
}
data[key] = string(body)
}
return data, nil
}
func writePerfData(c *client.Client, dirName string, postfix string) error {
fname := fmt.Sprintf("%s/metrics_%s.txt", dirName, postfix)
handler, err := os.Create(fname)
if err != nil {
return fmt.Errorf("Error creating file '%s': %v", fname, err)
}
metrics, err := getMetrics(c)
if err != nil {
return fmt.Errorf("Error retrieving metrics: %v", err)
}
_, err = handler.WriteString(metrics)
if err != nil {
return fmt.Errorf("Error writing metrics: %v", err)
}
err = handler.Close()
if err != nil {
return fmt.Errorf("Error closing '%s': %v", fname, err)
}
debug, err := getDebugInfo(c)
if err != nil {
return fmt.Errorf("Error retrieving debug information: %v", err)
}
for key, value := range debug {
fname := fmt.Sprintf("%s/%s_%s.txt", dirName, key, postfix)
handler, err = os.Create(fname)
if err != nil {
return fmt.Errorf("Error creating file '%s': %v", fname, err)
}
_, err = handler.WriteString(value)
if err != nil {
return fmt.Errorf("Error writing %s: %v", key, err)
}
err = handler.Close()
if err != nil {
return fmt.Errorf("Error closing '%s': %v", fname, err)
}
}
return nil
}