Revert "Revert "Added metrics/debug gathering methods to utils and used them in density ...""

This reverts commit 70500a64a7.
This commit is contained in:
Robert Rati 2015-05-26 10:24:46 -04:00
parent 1845ca88fc
commit 13b8d947fc
5 changed files with 333 additions and 95 deletions

View File

@ -19,6 +19,7 @@ package e2e
import ( import (
"fmt" "fmt"
"math" "math"
"os"
"strconv" "strconv"
"time" "time"
@ -45,6 +46,7 @@ var _ = Describe("Density", func() {
var minionCount int var minionCount int
var RCName string var RCName string
var ns string var ns string
var uuid string
BeforeEach(func() { BeforeEach(func() {
var err error var err error
@ -57,6 +59,9 @@ var _ = Describe("Density", func() {
nsForTesting, err := createTestingNS("density", c) nsForTesting, err := createTestingNS("density", c)
ns = nsForTesting.Name ns = nsForTesting.Name
expectNoError(err) expectNoError(err)
uuid = string(util.NewUUID())
expectNoError(os.Mkdir(uuid, 0777))
expectNoError(writePerfData(c, uuid, "before"))
}) })
AfterEach(func() { AfterEach(func() {
@ -76,6 +81,8 @@ var _ = Describe("Density", func() {
Failf("Couldn't delete ns %s", err) Failf("Couldn't delete ns %s", err)
} }
expectNoError(writePerfData(c, uuid, "after"))
// Verify latency metrics // Verify latency metrics
// TODO: Update threshold to 1s once we reach this goal // TODO: Update threshold to 1s once we reach this goal
// TODO: We should reset metrics before the test. Currently previous tests influence latency metrics. // TODO: We should reset metrics before the test. Currently previous tests influence latency metrics.
@ -89,16 +96,18 @@ var _ = Describe("Density", func() {
type Density struct { type Density struct {
skip bool skip bool
podsPerMinion int podsPerMinion int
/* Controls how often the apiserver is polled for pods */
interval int
} }
densityTests := []Density{ densityTests := []Density{
// This test should always run, even if larger densities are skipped. // This test should always run, even if larger densities are skipped.
{podsPerMinion: 3, skip: false}, {podsPerMinion: 3, skip: false, interval: 10},
{podsPerMinion: 30, skip: false}, {podsPerMinion: 30, skip: false, interval: 10},
// More than 30 pods per node is outside our v1.0 goals. // More than 30 pods per node is outside our v1.0 goals.
// We might want to enable those tests in the future. // We might want to enable those tests in the future.
{podsPerMinion: 50, skip: true}, {podsPerMinion: 50, skip: true, interval: 10},
{podsPerMinion: 100, skip: true}, {podsPerMinion: 100, skip: true, interval: 1},
} }
for _, testArg := range densityTests { for _, testArg := range densityTests {
@ -112,8 +121,19 @@ var _ = Describe("Density", func() {
itArg := testArg itArg := testArg
It(name, func() { It(name, func() {
totalPods := itArg.podsPerMinion * minionCount totalPods := itArg.podsPerMinion * minionCount
nameStr := strconv.Itoa(totalPods) + "-" + string(util.NewUUID()) RCName = "density" + strconv.Itoa(totalPods) + "-" + uuid
RCName = "my-hostname-density" + nameStr fileHndl, err := os.Create(fmt.Sprintf("%s/pod_states.csv", uuid))
expectNoError(err)
defer fileHndl.Close()
config := RCConfig{Client: c,
Image: "gcr.io/google_containers/pause:go",
Name: RCName,
Namespace: ns,
PollInterval: itArg.interval,
PodStatusFile: fileHndl,
Replicas: totalPods,
}
// Create a listener for events. // Create a listener for events.
events := make([](*api.Event), 0) events := make([](*api.Event), 0)
@ -139,7 +159,7 @@ var _ = Describe("Density", func() {
// Start the replication controller. // Start the replication controller.
startTime := time.Now() startTime := time.Now()
expectNoError(RunRC(c, RCName, ns, "gcr.io/google_containers/pause:go", totalPods)) expectNoError(RunRC(config))
e2eStartupTime := time.Now().Sub(startTime) e2eStartupTime := time.Now().Sub(startTime)
Logf("E2E startup time for %d pods: %v", totalPods, e2eStartupTime) Logf("E2E startup time for %d pods: %v", totalPods, e2eStartupTime)

75
test/e2e/fifo_queue.go Normal file
View File

@ -0,0 +1,75 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"sync"
"time"
)
type QueueItem struct {
createTime string
value interface{}
}
type QueueItems struct {
pos int
mutex *sync.Mutex
list []QueueItem
}
type FifoQueue QueueItems
func (fq *FifoQueue) Push(elem interface{}) {
fq.mutex.Lock()
fq.list = append(fq.list, QueueItem{time.Now().String(), elem})
fq.mutex.Unlock()
}
func (fq *FifoQueue) Pop() QueueItem {
fq.mutex.Lock()
var val QueueItem
if len(fq.list)-1 >= fq.pos {
val = fq.list[fq.pos]
fq.pos++
}
fq.mutex.Unlock()
return val
}
func (fq FifoQueue) Len() int {
return len(fq.list[fq.pos:])
}
func (fq *FifoQueue) First() QueueItem {
return fq.list[fq.pos]
}
func (fq *FifoQueue) Last() QueueItem {
return fq.list[len(fq.list)-1]
}
func (fq *FifoQueue) Reset() {
fq.pos = 0
}
func newFifoQueue() *FifoQueue {
tmp := new(FifoQueue)
tmp.mutex = &sync.Mutex{}
tmp.pos = 0
return tmp
}

View File

@ -149,7 +149,13 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int)
// Once every 1-2 minutes perform scale of RC. // Once every 1-2 minutes perform scale of RC.
for start := time.Now(); time.Since(start) < simulationTime; time.Sleep(time.Duration(60+rand.Intn(60)) * time.Second) { for start := time.Now(); time.Since(start) < simulationTime; time.Sleep(time.Duration(60+rand.Intn(60)) * time.Second) {
if !rcExist { if !rcExist {
expectNoError(RunRC(c, name, ns, image, size), fmt.Sprintf("creating rc %s in namespace %s", name, ns)) config := RCConfig{Client: c,
Name: name,
Namespace: ns,
Image: image,
Replicas: size,
}
expectNoError(RunRC(config), fmt.Sprintf("creating rc %s in namespace %s", name, ns))
rcExist = true rcExist = true
} }
// Scale RC to a random size between 0.5x and 1.5x of the original size. // Scale RC to a random size between 0.5x and 1.5x of the original size.
@ -187,7 +193,13 @@ func createRCGroup(c *client.Client, ns, groupName string, size, count, batchSiz
defer GinkgoRecover() defer GinkgoRecover()
defer wg.Done() defer wg.Done()
name := groupName + "-" + strconv.Itoa(i) name := groupName + "-" + strconv.Itoa(i)
expectNoError(RunRC(c, name, ns, image, size), fmt.Sprintf("creating rc %s in namespace %s for the first time", name, ns)) config := RCConfig{Client: c,
Name: name,
Namespace: ns,
Image: image,
Replicas: size,
}
expectNoError(RunRC(config), fmt.Sprintf("creating rc %s in namespace %s for the first time", name, ns))
}(i) }(i)
} }
wg.Wait() wg.Wait()

View File

@ -107,7 +107,15 @@ var _ = Describe("Scale", func() {
for i := 0; i < itArg.rcsPerThread; i++ { for i := 0; i < itArg.rcsPerThread; i++ {
name := "my-short-lived-pod" + string(util.NewUUID()) name := "my-short-lived-pod" + string(util.NewUUID())
n := itArg.podsPerMinion * minionCount n := itArg.podsPerMinion * minionCount
expectNoError(RunRC(c, name, ns, "gcr.io/google_containers/pause:go", n))
config := RCConfig{Client: c,
Name: name,
Namespace: ns,
Image: "gcr.io/google_containers/pause:go",
Replicas: n,
}
expectNoError(RunRC(config))
podsLaunched += n podsLaunched += n
Logf("Launched %v pods so far...", podsLaunched) Logf("Launched %v pods so far...", podsLaunched)
err := DeleteRC(c, ns, name) err := DeleteRC(c, ns, name)

View File

@ -19,8 +19,10 @@ package e2e
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io/ioutil"
"math" "math"
"math/rand" "math/rand"
"net/http"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
@ -132,6 +134,16 @@ func (s *podStore) Stop() {
close(s.stopCh) close(s.stopCh)
} }
type RCConfig struct {
Client *client.Client
Image string
Name string
Namespace string
PollInterval int
PodStatusFile *os.File
Replicas int
}
func Logf(format string, a ...interface{}) { func Logf(format string, a ...interface{}) {
fmt.Fprintf(GinkgoWriter, "INFO: "+format+"\n", a...) fmt.Fprintf(GinkgoWriter, "INFO: "+format+"\n", a...)
} }
@ -703,12 +715,20 @@ func Diff(oldPods []*api.Pod, curPods []*api.Pod) PodDiff {
// It will waits for all pods it spawns to become "Running". // It will waits for all pods it spawns to become "Running".
// It's the caller's responsibility to clean up externally (i.e. use the // It's the caller's responsibility to clean up externally (i.e. use the
// namespace lifecycle for handling cleanup). // namespace lifecycle for handling cleanup).
func RunRC(c *client.Client, name string, ns, image string, replicas int) error { func RunRC(config RCConfig) error {
var last int var last int
c := config.Client
name := config.Name
ns := config.Namespace
image := config.Image
replicas := config.Replicas
interval := config.PollInterval
maxContainerFailures := int(math.Max(1.0, float64(replicas)*.01)) maxContainerFailures := int(math.Max(1.0, float64(replicas)*.01))
current := 0 current := 0
same := 0 same := 0
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
podLists := newFifoQueue()
By(fmt.Sprintf("%v Creating replication controller %s", time.Now(), name)) By(fmt.Sprintf("%v Creating replication controller %s", time.Now(), name))
rc := &api.ReplicationController{ rc := &api.ReplicationController{
@ -741,32 +761,49 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
return fmt.Errorf("Error creating replication controller: %v", err) return fmt.Errorf("Error creating replication controller: %v", err)
} }
Logf("%v Created replication controller with name: %v, namespace: %v, replica count: %v", time.Now(), rc.Name, ns, rc.Spec.Replicas) Logf("%v Created replication controller with name: %v, namespace: %v, replica count: %v", time.Now(), rc.Name, ns, rc.Spec.Replicas)
By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
podStore := newPodStore(c, ns, label, fields.Everything()) podStore := newPodStore(c, ns, label, fields.Everything())
defer podStore.Stop() defer podStore.Stop()
pods := podStore.List()
current = len(pods) // Create a routine to query for the list of pods
failCount := 24 stop := make(chan struct{})
go func(stop <-chan struct{}, n string, ns string, l labels.Selector, i int) {
for {
select {
case <-stop:
return
default:
podLists.Push(podStore.List())
time.Sleep(time.Duration(i) * time.Second)
}
}
}(stop, name, ns, label, interval)
defer close(stop)
By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
failCount := int(120.0 / float64(interval))
for same < failCount && current < replicas { for same < failCount && current < replicas {
Logf("%v Controller %s: Found %d pods out of %d", time.Now(), name, current, replicas) time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
if last < current {
same = 0
} else if last == current {
same++
} else if current < last {
return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
}
if same >= failCount { // Greedily read all existing entries in the queue until
return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount) // all pods are found submitted or the queue is empty
} for podLists.Len() > 0 && current < replicas {
item := podLists.Pop()
pods := item.value.([]*api.Pod)
current = len(pods)
Logf("%v Controller %s: Found %d pods out of %d", time.Now(), name, current, replicas)
if last < current {
same = 0
} else if last == current {
same++
} else if current < last {
return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
}
last = current if same >= failCount {
time.Sleep(5 * time.Second) return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount)
pods = podStore.List() }
current = len(pods) last = current
}
} }
if current != replicas { if current != replicas {
return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas) return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas)
@ -776,77 +813,92 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
By(fmt.Sprintf("%v Waiting for all %d replicas to be running with a max container failures of %d", time.Now(), replicas, maxContainerFailures)) By(fmt.Sprintf("%v Waiting for all %d replicas to be running with a max container failures of %d", time.Now(), replicas, maxContainerFailures))
same = 0 same = 0
last = 0 last = 0
failCount = 20 failCount = int(100.0 / float64(interval))
current = 0 current = 0
oldPods := make([]*api.Pod, 0) var oldPods []*api.Pod
podLists.Reset()
foundAllPods := false
for same < failCount && current < replicas { for same < failCount && current < replicas {
current = 0 time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
waiting := 0
pending := 0
unknown := 0
inactive := 0
failedContainers := 0
time.Sleep(5 * time.Second)
currentPods := podStore.List() // Greedily read all existing entries in the queue until
for _, p := range currentPods { // either all pods are running or the queue is empty
if p.Status.Phase == api.PodRunning { for podLists.Len() > 0 && current < replicas {
current++ item := podLists.Pop()
for _, v := range FailedContainers(*p) { current = 0
failedContainers = failedContainers + v.restarts waiting := 0
pending := 0
unknown := 0
inactive := 0
failedContainers := 0
currentPods := item.value.([]*api.Pod)
for _, p := range currentPods {
if p.Status.Phase == api.PodRunning {
current++
for _, v := range FailedContainers(p) {
failedContainers = failedContainers + v.restarts
}
} else if p.Status.Phase == api.PodPending {
if p.Spec.NodeName == "" {
waiting++
} else {
pending++
}
} else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
inactive++
} else if p.Status.Phase == api.PodUnknown {
unknown++
} }
} else if p.Status.Phase == api.PodPending {
if p.Spec.NodeName == "" {
waiting++
} else {
pending++
}
} else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
inactive++
} else if p.Status.Phase == api.PodUnknown {
unknown++
} }
}
Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
if len(currentPods) != len(pods) { Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
if config.PodStatusFile != nil {
fmt.Fprintf(config.PodStatusFile, "%s, %d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown\n", item.createTime, current, pending, waiting, inactive, unknown)
}
// This failure mode includes: if foundAllPods && len(currentPods) != len(oldPods) {
// kubelet is dead, so node controller deleted pods and rc creates more
// - diagnose by noting the pod diff below.
// pod is unhealthy, so replication controller creates another to take its place
// - diagnose by comparing the previous "2 Pod states" lines for inactive pods
errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods), len(pods))
Logf("%v, pods that changed since the last iteration:", errorStr)
Diff(oldPods, currentPods).Print(util.NewStringSet())
return fmt.Errorf(errorStr)
}
if last < current {
same = 0
} else if last == current {
same++
} else if current < last {
// The pod failed or succeeded, or was somehow pushed out of running by the kubelet. // This failure mode includes:
errorStr := fmt.Sprintf("Number of running pods dropped from %d to %d", last, current) // kubelet is dead, so node controller deleted pods and rc creates more
Logf("%v, pods that changed since the last iteration:", errorStr) // - diagnose by noting the pod diff below.
Diff(oldPods, currentPods).Print(util.NewStringSet()) // pod is unhealthy, so replication controller creates another to take its place
return fmt.Errorf(errorStr) // - diagnose by comparing the previous "2 Pod states" lines for inactive pods
} errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods), len(oldPods))
if same >= failCount { Logf("%v, pods that changed since the last iteration:", errorStr)
Diff(oldPods, currentPods).Print(util.NewStringSet())
return fmt.Errorf(errorStr)
}
if last < current {
same = 0
} else if last == current {
same++
} else if current < last {
// Most times this happens because a few nodes have kubelet problems, and their pods are // The pod failed or succeeded, or was somehow pushed out of running by the kubelet.
// stuck in pending. errorStr := fmt.Sprintf("Number of running pods dropped from %d to %d", last, current)
errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount) Logf("%v, pods that changed since the last iteration:", errorStr)
Logf("%v, pods currently in pending:", errorStr) Diff(oldPods, currentPods).Print(util.NewStringSet())
Diff(currentPods, make([]*api.Pod, 0)).Print(util.NewStringSet(string(api.PodRunning))) return fmt.Errorf(errorStr)
return fmt.Errorf(errorStr) }
} if same >= failCount {
last = current
oldPods = currentPods
if failedContainers > maxContainerFailures { // Most times this happens because a few nodes have kubelet problems, and their pods are
return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures) // stuck in pending.
errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount)
Logf("%v, pods currently in pending:", errorStr)
Diff(currentPods, make([]*api.Pod, 0)).Print(util.NewStringSet(string(api.PodRunning)))
return fmt.Errorf(errorStr)
}
if !foundAllPods {
foundAllPods = len(currentPods) == replicas
}
last = current
oldPods = currentPods
if failedContainers > maxContainerFailures {
return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures)
}
} }
} }
if current != replicas { if current != replicas {
@ -908,7 +960,7 @@ func DeleteRC(c *client.Client, ns, name string) error {
// information for containers that have failed or been restarted. // information for containers that have failed or been restarted.
// A map is returned where the key is the containerID and the value is a // A map is returned where the key is the containerID and the value is a
// struct containing the restart and failure information // struct containing the restart and failure information
func FailedContainers(pod api.Pod) map[string]ContainerFailures { func FailedContainers(pod *api.Pod) map[string]ContainerFailures {
var state ContainerFailures var state ContainerFailures
states := make(map[string]ContainerFailures) states := make(map[string]ContainerFailures)
@ -1048,7 +1100,7 @@ func (a LatencyMetricByLatency) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a LatencyMetricByLatency) Less(i, j int) bool { return a[i].Latency < a[j].Latency } func (a LatencyMetricByLatency) Less(i, j int) bool { return a[i].Latency < a[j].Latency }
func ReadLatencyMetrics(c *client.Client) ([]LatencyMetric, error) { func ReadLatencyMetrics(c *client.Client) ([]LatencyMetric, error) {
body, err := c.Get().AbsPath("/metrics").DoRaw() body, err := getMetrics(c)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1118,3 +1170,74 @@ func HighLatencyRequests(c *client.Client, threshold time.Duration, ignoredResou
return len(badMetrics), nil return len(badMetrics), nil
} }
// Retrieve metrics information
func getMetrics(c *client.Client) (string, error) {
body, err := c.Get().AbsPath("/metrics").DoRaw()
if err != nil {
return "", err
}
return string(body), nil
}
// Retrieve debug information
func getDebugInfo(c *client.Client) (map[string]string, error) {
data := make(map[string]string)
for _, key := range []string{"block", "goroutine", "heap", "threadcreate"} {
resp, err := http.Get(c.Get().AbsPath(fmt.Sprintf("debug/pprof/%s", key)).URL().String() + "?debug=2")
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
Logf("Warning: Error trying to fetch %s debug data: %v", key, err)
}
data[key] = string(body)
}
return data, nil
}
func writePerfData(c *client.Client, dirName string, postfix string) error {
fname := fmt.Sprintf("%s/metrics_%s.txt", dirName, postfix)
handler, err := os.Create(fname)
if err != nil {
return fmt.Errorf("Error creating file '%s': %v", fname, err)
}
metrics, err := getMetrics(c)
if err != nil {
return fmt.Errorf("Error retrieving metrics: %v", err)
}
_, err = handler.WriteString(metrics)
if err != nil {
return fmt.Errorf("Error writing metrics: %v", err)
}
err = handler.Close()
if err != nil {
return fmt.Errorf("Error closing '%s': %v", fname, err)
}
debug, err := getDebugInfo(c)
if err != nil {
return fmt.Errorf("Error retrieving debug information: %v", err)
}
for key, value := range debug {
fname := fmt.Sprintf("%s/%s_%s.txt", dirName, key, postfix)
handler, err = os.Create(fname)
if err != nil {
return fmt.Errorf("Error creating file '%s': %v", fname, err)
}
_, err = handler.WriteString(value)
if err != nil {
return fmt.Errorf("Error writing %s: %v", key, err)
}
err = handler.Close()
if err != nil {
return fmt.Errorf("Error closing '%s': %v", fname, err)
}
}
return nil
}