Revert "Added metrics/debug gathering methods to utils and used them in density ..."

Author: Filip Grzadkowski
Date:   2015-05-22 13:00:46 -07:00
Parent: a823f6d1d5
Commit: 70500a64a7
5 changed files with 106 additions and 331 deletions

View File

@@ -19,7 +19,6 @@ package e2e
 import (
 	"fmt"
 	"math"
-	"os"
 	"strconv"
 	"time"
@@ -46,7 +45,6 @@ var _ = Describe("Density", func() {
 	var minionCount int
 	var RCName string
 	var ns string
-	var uuid string
 	BeforeEach(func() {
 		var err error
@@ -59,9 +57,6 @@ var _ = Describe("Density", func() {
 		nsForTesting, err := createTestingNS("density", c)
 		ns = nsForTesting.Name
 		expectNoError(err)
-		uuid = string(util.NewUUID())
-		expectNoError(os.Mkdir(uuid, 0777))
-		expectNoError(writePerfData(c, uuid, "before"))
 	})
 	AfterEach(func() {
@@ -81,8 +76,6 @@ var _ = Describe("Density", func() {
 			Failf("Couldn't delete ns %s", err)
 		}
-		expectNoError(writePerfData(c, uuid, "after"))
 		// Verify latency metrics
 		// TODO: Update threshold to 1s once we reach this goal
 		// TODO: We should reset metrics before the test. Currently previous tests influence latency metrics.
@@ -96,18 +89,16 @@ var _ = Describe("Density", func() {
 	type Density struct {
 		skip bool
 		podsPerMinion int
-		/* Controls how often the apiserver is polled for pods */
-		interval int
 	}
 	densityTests := []Density{
 		// This test should always run, even if larger densities are skipped.
-		{podsPerMinion: 3, skip: false, interval: 10},
-		{podsPerMinion: 30, skip: false, interval: 10},
+		{podsPerMinion: 3, skip: false},
+		{podsPerMinion: 30, skip: false},
 		// More than 30 pods per node is outside our v1.0 goals.
 		// We might want to enable those tests in the future.
-		{podsPerMinion: 50, skip: true, interval: 10},
-		{podsPerMinion: 100, skip: true, interval: 1},
+		{podsPerMinion: 50, skip: true},
+		{podsPerMinion: 100, skip: true},
 	}
 	for _, testArg := range densityTests {
@@ -121,19 +112,8 @@ var _ = Describe("Density", func() {
 		itArg := testArg
 		It(name, func() {
 			totalPods := itArg.podsPerMinion * minionCount
-			RCName = "density" + strconv.Itoa(totalPods) + "-" + uuid
-			fileHndl, err := os.Create(fmt.Sprintf("%s/pod_states.csv", uuid))
-			expectNoError(err)
-			defer fileHndl.Close()
-			config := RCConfig{Client: c,
-				Image: "gcr.io/google_containers/pause:go",
-				Name: RCName,
-				Namespace: ns,
-				PollInterval: itArg.interval,
-				PodStatusFile: fileHndl,
-				Replicas: totalPods,
-			}
+			nameStr := strconv.Itoa(totalPods) + "-" + string(util.NewUUID())
+			RCName = "my-hostname-density" + nameStr
 			// Create a listener for events.
 			events := make([](*api.Event), 0)
@@ -159,7 +139,7 @@ var _ = Describe("Density", func() {
 			// Start the replication controller.
 			startTime := time.Now()
-			expectNoError(RunRC(config))
+			expectNoError(RunRC(c, RCName, ns, "gcr.io/google_containers/pause:go", totalPods))
 			e2eStartupTime := time.Now().Sub(startTime)
 			Logf("E2E startup time for %d pods: %v", totalPods, e2eStartupTime)

View File

@@ -1,75 +0,0 @@
-/*
-Copyright 2014 The Kubernetes Authors All rights reserved.
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-package e2e
-import (
-	"sync"
-	"time"
-)
-type QueueItem struct {
-	createTime string
-	value interface{}
-}
-type QueueItems struct {
-	pos int
-	mutex *sync.Mutex
-	list []QueueItem
-}
-type FifoQueue QueueItems
-func (fq *FifoQueue) Push(elem interface{}) {
-	fq.mutex.Lock()
-	fq.list = append(fq.list, QueueItem{time.Now().String(), elem})
-	fq.mutex.Unlock()
-}
-func (fq *FifoQueue) Pop() QueueItem {
-	fq.mutex.Lock()
-	var val QueueItem
-	if len(fq.list)-1 >= fq.pos {
-		val = fq.list[fq.pos]
-		fq.pos++
-	}
-	fq.mutex.Unlock()
-	return val
-}
-func (fq FifoQueue) Len() int {
-	return len(fq.list[fq.pos:])
-}
-func (fq *FifoQueue) First() QueueItem {
-	return fq.list[fq.pos]
-}
-func (fq *FifoQueue) Last() QueueItem {
-	return fq.list[len(fq.list)-1]
-}
-func (fq *FifoQueue) Reset() {
-	fq.pos = 0
-}
-func newFifoQueue() *FifoQueue {
-	tmp := new(FifoQueue)
-	tmp.mutex = &sync.Mutex{}
-	tmp.pos = 0
-	return tmp
-}

View File

@@ -120,13 +120,7 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int)
 	// Once every 1-2 minutes perform resize of RC.
 	for start := time.Now(); time.Since(start) < simulationTime; time.Sleep(time.Duration(60+rand.Intn(60)) * time.Second) {
 		if !rcExist {
-			config := RCConfig{Client: c,
-				Name: name,
-				Namespace: ns,
-				Image: image,
-				Replicas: size,
-			}
-			expectNoError(RunRC(config), fmt.Sprintf("creating rc %s in namespace %s", name, ns))
+			expectNoError(RunRC(c, name, ns, image, size), fmt.Sprintf("creating rc %s in namespace %s", name, ns))
 			rcExist = true
 		}
 		// Resize RC to a random size between 0.5x and 1.5x of the original size.

View File

@@ -107,15 +107,7 @@ var _ = Describe("Scale", func() {
 			for i := 0; i < itArg.rcsPerThread; i++ {
 				name := "my-short-lived-pod" + string(util.NewUUID())
 				n := itArg.podsPerMinion * minionCount
-				config := RCConfig{Client: c,
-					Name: name,
-					Namespace: ns,
-					Image: "gcr.io/google_containers/pause:go",
-					Replicas: n,
-				}
-				expectNoError(RunRC(config))
+				expectNoError(RunRC(c, name, ns, "gcr.io/google_containers/pause:go", n))
 				podsLaunched += n
 				Logf("Launched %v pods so far...", podsLaunched)
 				err := DeleteRC(c, ns, name)
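
Note: the density, load, and scale hunks above all make the same mechanical change, from the RCConfig options struct introduced by the original commit back to positional arguments. A toy, self-contained sketch of the two call shapes follows; only the RunRC/RCConfig names and the pause image come from the diff, every other identifier is hypothetical.

package main

import "fmt"

// Stand-in for the e2e client type; illustrative only.
type Client struct{ host string }

// Mirrors the fields of the reverted RCConfig struct, minus the polling knobs.
type RCConfig struct {
	Client    *Client
	Image     string
	Name      string
	Namespace string
	Replicas  int
}

// Options-struct form (the shape removed by this revert).
func runRCWithConfig(config RCConfig) error {
	fmt.Printf("creating rc %s/%s: %d replicas of %s via %s\n",
		config.Namespace, config.Name, config.Replicas, config.Image, config.Client.host)
	return nil
}

// Positional form (the shape restored by this revert).
func runRC(c *Client, name, ns, image string, replicas int) error {
	fmt.Printf("creating rc %s/%s: %d replicas of %s via %s\n", ns, name, replicas, image, c.host)
	return nil
}

func main() {
	c := &Client{host: "https://apiserver:6443"}
	_ = runRCWithConfig(RCConfig{Client: c, Image: "gcr.io/google_containers/pause:go", Name: "density30", Namespace: "e2e-density", Replicas: 30})
	_ = runRC(c, "density30", "e2e-density", "gcr.io/google_containers/pause:go", 30)
}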

View File

@@ -22,7 +22,6 @@ import (
 	"io/ioutil"
 	"math"
 	"math/rand"
-	"net/http"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -87,16 +86,6 @@ type ContainerFailures struct {
 	restarts int
 }
-type RCConfig struct {
-	Client *client.Client
-	Image string
-	Name string
-	Namespace string
-	PollInterval int
-	PodStatusFile *os.File
-	Replicas int
-}
 func Logf(format string, a ...interface{}) {
 	fmt.Fprintf(GinkgoWriter, "INFO: "+format+"\n", a...)
 }
@@ -596,16 +585,16 @@ func (p PodDiff) Print(ignorePhases util.StringSet) {
 }
 // Diff computes a PodDiff given 2 lists of pods.
-func Diff(oldPods []api.Pod, curPods []api.Pod) PodDiff {
+func Diff(oldPods *api.PodList, curPods *api.PodList) PodDiff {
 	podInfoMap := PodDiff{}
 	// New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist.
-	for _, pod := range curPods {
+	for _, pod := range curPods.Items {
 		podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.Host, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist}
 	}
 	// Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist.
-	for _, pod := range oldPods {
+	for _, pod := range oldPods.Items {
 		if info, ok := podInfoMap[pod.Name]; ok {
 			info.oldHostname, info.oldPhase = pod.Spec.Host, string(pod.Status.Phase)
 		} else {
@@ -619,20 +608,12 @@ func Diff(oldPods []api.Pod, curPods []api.Pod) PodDiff {
 // It will waits for all pods it spawns to become "Running".
 // It's the caller's responsibility to clean up externally (i.e. use the
 // namespace lifecycle for handling cleanup).
-func RunRC(config RCConfig) error {
+func RunRC(c *client.Client, name string, ns, image string, replicas int) error {
 	var last int
-	c := config.Client
-	name := config.Name
-	ns := config.Namespace
-	image := config.Image
-	replicas := config.Replicas
-	interval := config.PollInterval
 	maxContainerFailures := int(math.Max(1.0, float64(replicas)*.01))
 	current := 0
 	same := 0
-	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
-	podLists := newFifoQueue()
 	By(fmt.Sprintf("Creating replication controller %s", name))
 	rc := &api.ReplicationController{
@@ -666,52 +647,35 @@ func RunRC(config RCConfig) error {
 	}
 	Logf("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, ns, rc.Spec.Replicas)
-	// Create a routine to query for the list of pods
-	stop := make(chan struct{})
-	go func(stop <-chan struct{}, n string, ns string, l labels.Selector, i int) {
-		for {
-			select {
-			case <-stop:
-				return
-			default:
-				p, err := c.Pods(ns).List(l, fields.Everything())
-				if err != nil {
-					Logf("Warning: Failed to get pod list: %v", err)
-				} else {
-					podLists.Push(p.Items)
-				}
-				time.Sleep(time.Duration(i) * time.Second)
-			}
-		}
-	}(stop, name, ns, label, interval)
-	defer close(stop)
 	By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
-	failCount := int(25 / interval)
+	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
+	pods, err := listPods(c, ns, label, fields.Everything())
+	if err != nil {
+		return fmt.Errorf("Error listing pods: %v", err)
+	}
+	current = len(pods.Items)
+	failCount := 5
 	for same < failCount && current < replicas {
-		time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
-		// Greedily read all existing entries in the queue until
-		// all pods are found submitted or the queue is empty
-		for podLists.Len() > 0 && current < replicas {
-			item := podLists.Pop()
-			pods := item.value.([]api.Pod)
-			current = len(pods)
-			Logf("Controller %s: Found %d pods out of %d", name, current, replicas)
-			if last < current {
-				same = 0
-			} else if last == current {
-				same++
-			} else if current < last {
-				return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
-			}
-			if same >= failCount {
-				return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount)
-			}
-			last = current
-		}
+		Logf("Controller %s: Found %d pods out of %d", name, current, replicas)
+		if last < current {
+			same = 0
+		} else if last == current {
+			same++
+		} else if current < last {
+			return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
+		}
+		if same >= failCount {
+			return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount)
+		}
+		last = current
+		time.Sleep(5 * time.Second)
+		pods, err = listPods(c, ns, label, fields.Everything())
+		if err != nil {
+			return fmt.Errorf("Error listing pods: %v", err)
+		}
+		current = len(pods.Items)
 	}
 	if current != replicas {
 		return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas)
@@ -721,91 +685,82 @@ func RunRC(config RCConfig) error {
 	By(fmt.Sprintf("Waiting for all %d replicas to be running with a max container failures of %d", replicas, maxContainerFailures))
 	same = 0
 	last = 0
-	failCount = int(100 / interval)
+	failCount = 10
 	current = 0
-	var oldPods []api.Pod
-	podLists.Reset()
-	foundAllPods := false
+	oldPods := &api.PodList{}
 	for same < failCount && current < replicas {
-		time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
-		// Greedily read all existing entries in the queue until
-		// either all pods are running or the queue is empty
-		for podLists.Len() > 0 && current < replicas {
-			item := podLists.Pop()
-			current = 0
-			waiting := 0
-			pending := 0
-			unknown := 0
-			inactive := 0
-			failedContainers := 0
-			currentPods := item.value.([]api.Pod)
-			for _, p := range currentPods {
-				if p.Status.Phase == api.PodRunning {
-					current++
-					for _, v := range FailedContainers(p) {
-						failedContainers = failedContainers + v.restarts
-					}
-				} else if p.Status.Phase == api.PodPending {
-					if p.Spec.Host == "" {
-						waiting++
-					} else {
-						pending++
-					}
-				} else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
-					inactive++
-				} else if p.Status.Phase == api.PodUnknown {
-					unknown++
-				}
-			}
-			Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
-			if config.PodStatusFile != nil {
-				fmt.Fprintf(config.PodStatusFile, "%s, %d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown\n", item.createTime, current, pending, waiting, inactive, unknown)
-			}
-			if foundAllPods && len(currentPods) != len(oldPods) {
+		current = 0
+		waiting := 0
+		pending := 0
+		unknown := 0
+		inactive := 0
+		failedContainers := 0
+		time.Sleep(10 * time.Second)
+		// TODO: Use a reflector both to put less strain on the cluster and
+		// for more clarity.
+		currentPods, err := listPods(c, ns, label, fields.Everything())
+		if err != nil {
+			return fmt.Errorf("Error listing pods: %v", err)
+		}
+		for _, p := range currentPods.Items {
+			if p.Status.Phase == api.PodRunning {
+				current++
+				for _, v := range FailedContainers(p) {
+					failedContainers = failedContainers + v.restarts
+				}
+			} else if p.Status.Phase == api.PodPending {
+				if p.Spec.Host == "" {
+					waiting++
+				} else {
+					pending++
+				}
+			} else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
+				inactive++
+			} else if p.Status.Phase == api.PodUnknown {
+				unknown++
+			}
+		}
+		Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
+		if len(currentPods.Items) != len(pods.Items) {
 			// This failure mode includes:
 			// kubelet is dead, so node controller deleted pods and rc creates more
 			// - diagnose by noting the pod diff below.
 			// pod is unhealthy, so replication controller creates another to take its place
 			// - diagnose by comparing the previous "2 Pod states" lines for inactive pods
-			errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods), len(oldPods))
+			errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods.Items), len(pods.Items))
 			Logf("%v, pods that changed since the last iteration:", errorStr)
 			Diff(oldPods, currentPods).Print(util.NewStringSet())
 			return fmt.Errorf(errorStr)
 		}
 		if last < current {
 			same = 0
 		} else if last == current {
 			same++
 		} else if current < last {
 			// The pod failed or succeeded, or was somehow pushed out of running by the kubelet.
 			errorStr := fmt.Sprintf("Number of running pods dropped from %d to %d", last, current)
 			Logf("%v, pods that changed since the last iteration:", errorStr)
 			Diff(oldPods, currentPods).Print(util.NewStringSet())
 			return fmt.Errorf(errorStr)
 		}
 		if same >= failCount {
 			// Most times this happens because a few nodes have kubelet problems, and their pods are
 			// stuck in pending.
 			errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount)
 			Logf("%v, pods currently in pending:", errorStr)
-			Diff(currentPods, make([]api.Pod, 0)).Print(util.NewStringSet(string(api.PodRunning)))
+			Diff(currentPods, &api.PodList{}).Print(util.NewStringSet(string(api.PodRunning)))
 			return fmt.Errorf(errorStr)
 		}
-		if !foundAllPods {
-			foundAllPods = len(currentPods) == replicas
-		}
-		last = current
-		oldPods = currentPods
-		if failedContainers > maxContainerFailures {
-			return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures)
-		}
-		}
+		last = current
+		oldPods = currentPods
+		if failedContainers > maxContainerFailures {
+			return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures)
+		}
 	}
 	if current != replicas {
@@ -1059,7 +1014,7 @@ type LatencyMetric struct {
 }
 func ReadLatencyMetrics(c *client.Client) ([]LatencyMetric, error) {
-	body, err := getMetrics(c)
+	body, err := c.Get().AbsPath("/metrics").DoRaw()
 	if err != nil {
 		return nil, err
 	}
@@ -1115,74 +1070,3 @@ func HighLatencyRequests(c *client.Client, threshold time.Duration, ignoredResou
 	return len(badMetrics), nil
 }
-// Retrieve metrics information
-func getMetrics(c *client.Client) (string, error) {
-	body, err := c.Get().AbsPath("/metrics").DoRaw()
-	if err != nil {
-		return "", err
-	}
-	return string(body), nil
-}
-// Retrieve debug information
-func getDebugInfo(c *client.Client) (map[string]string, error) {
-	data := make(map[string]string)
-	for _, key := range []string{"block", "goroutine", "heap", "threadcreate"} {
-		resp, err := http.Get(c.Get().AbsPath(fmt.Sprintf("debug/pprof/%s", key)).URL().String() + "?debug=2")
-		body, err := ioutil.ReadAll(resp.Body)
-		resp.Body.Close()
-		if err != nil {
-			Logf("Warning: Error trying to fetch %s debug data: %v", key, err)
-		}
-		data[key] = string(body)
-	}
-	return data, nil
-}
-func writePerfData(c *client.Client, dirName string, postfix string) error {
-	fname := fmt.Sprintf("%s/metrics_%s.txt", dirName, postfix)
-	handler, err := os.Create(fname)
-	if err != nil {
-		return fmt.Errorf("Error creating file '%s': %v", fname, err)
-	}
-	metrics, err := getMetrics(c)
-	if err != nil {
-		return fmt.Errorf("Error retrieving metrics: %v", err)
-	}
-	_, err = handler.WriteString(metrics)
-	if err != nil {
-		return fmt.Errorf("Error writing metrics: %v", err)
-	}
-	err = handler.Close()
-	if err != nil {
-		return fmt.Errorf("Error closing '%s': %v", fname, err)
-	}
-	debug, err := getDebugInfo(c)
-	if err != nil {
-		return fmt.Errorf("Error retrieving debug information: %v", err)
-	}
-	for key, value := range debug {
-		fname := fmt.Sprintf("%s/%s_%s.txt", dirName, key, postfix)
-		handler, err = os.Create(fname)
-		if err != nil {
-			return fmt.Errorf("Error creating file '%s': %v", fname, err)
-		}
-		_, err = handler.WriteString(value)
-		if err != nil {
-			return fmt.Errorf("Error writing %s: %v", key, err)
-		}
-		err = handler.Close()
-		if err != nil {
-			return fmt.Errorf("Error closing '%s': %v", fname, err)
-		}
-	}
-	return nil
-}
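
Incidentally, the removed getDebugInfo above only inspects err after calling ioutil.ReadAll, so a failed http.Get (which returns a nil response) would panic before anything is logged. A standalone sketch of the same pprof-scraping idea with the HTTP error handled first; the base URL, function names, and main wrapper here are illustrative and not part of the reverted code.

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

// fetchPprofProfiles pulls the text form of several pprof profiles from a
// debug endpoint (for example an apiserver reachable at baseURL). It mirrors
// the shape of the reverted getDebugInfo helper, but checks the http.Get
// error before touching the response body.
func fetchPprofProfiles(baseURL string) (map[string]string, error) {
	profiles := map[string]string{}
	for _, key := range []string{"block", "goroutine", "heap", "threadcreate"} {
		resp, err := http.Get(fmt.Sprintf("%s/debug/pprof/%s?debug=2", baseURL, key))
		if err != nil {
			return nil, fmt.Errorf("fetching %s profile: %v", key, err)
		}
		body, err := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			return nil, fmt.Errorf("reading %s profile: %v", key, err)
		}
		profiles[key] = string(body)
	}
	return profiles, nil
}

func main() {
	data, err := fetchPprofProfiles("http://localhost:8080")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for key, value := range data {
		fmt.Printf("%s: %d bytes\n", key, len(value))
	}
}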