Merge pull request #14827 from wojtek-t/export_metrics_for_dashboard

Auto commit by PR queue bot
k8s-merge-robot 2015-10-02 14:57:44 -07:00
commit 2c81a664f2
4 changed files with 288 additions and 198 deletions


@@ -34,7 +34,6 @@ import (
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"
. "github.com/onsi/ginkgo"
@@ -63,12 +62,17 @@ func (a latencySlice) Len() int { return len(a) }
func (a latencySlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a latencySlice) Less(i, j int) bool { return a[i].Latency < a[j].Latency }
func printLatencies(latencies []podLatencyData, header string) {
func extractLatencyMetrics(latencies []podLatencyData) LatencyMetric {
perc50 := latencies[len(latencies)/2].Latency
perc90 := latencies[(len(latencies)*9)/10].Latency
perc99 := latencies[(len(latencies)*99)/100].Latency
return LatencyMetric{Perc50: perc50, Perc90: perc90, Perc99: perc99}
}
func printLatencies(latencies []podLatencyData, header string) {
metrics := extractLatencyMetrics(latencies)
Logf("10%% %s: %v", header, latencies[(len(latencies)*9)/10:len(latencies)])
Logf("perc50: %v, perc90: %v, perc99: %v", perc50, perc90, perc99)
Logf("perc50: %v, perc90: %v, perc99: %v", metrics.Perc50, metrics.Perc90, metrics.Perc99)
}
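Note: extractLatencyMetrics picks percentiles purely by index, so callers sort the slice ascending by Latency first (via the latencySlice helpers above). A minimal sketch of the same index arithmetic on invented durations, with sort.Slice standing in for the latencySlice sort:
ms := time.Millisecond
lat := []time.Duration{700 * ms, 80 * ms, 450 * ms, 300 * ms, 90 * ms,
	200 * ms, 120 * ms, 60 * ms, 150 * ms, 1000 * ms} // hypothetical values
sort.Slice(lat, func(i, j int) bool { return lat[i] < lat[j] })
Logf("perc50: %v", lat[len(lat)/2])        // 200ms
Logf("perc90: %v", lat[(len(lat)*9)/10])   // 1s
Logf("perc99: %v", lat[(len(lat)*99)/100]) // 1s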
// This test suite can take a long time to run, so by default it is added to
@@ -139,7 +143,7 @@ var _ = Describe("Density", func() {
expectNoError(writePerfData(c, fmt.Sprintf(testContext.OutputDir+"/%s", uuid), "after"))
// Verify latency metrics
highLatencyRequests, err := HighLatencyRequests(c, 3*time.Second, sets.NewString("events"))
highLatencyRequests, err := HighLatencyRequests(c, 3*time.Second)
expectNoError(err)
Expect(highLatencyRequests).NotTo(BeNumerically(">", 0), "There should be no high-latency requests")
@@ -379,14 +383,10 @@ var _ = Describe("Density", func() {
printLatencies(e2eLag, "worst e2e total latencies")
// Test whether e2e pod startup time is acceptable.
podStartupLatency := PodStartupLatency{Latency: extractLatencyMetrics(e2eLag)}
// TODO: Switch it to 5 seconds once we are sure our tests are passing.
podStartupThreshold := 8 * time.Second
e2ePodStartupTime50perc := e2eLag[len(e2eLag)/2].Latency
e2ePodStartupTime90perc := e2eLag[len(e2eLag)*9/10].Latency
e2ePodStartupTime99perc := e2eLag[len(e2eLag)*99/100].Latency
Expect(e2ePodStartupTime50perc < podStartupThreshold).To(Equal(true), "Too high pod startup time 50th percentile")
Expect(e2ePodStartupTime90perc < podStartupThreshold).To(Equal(true), "Too high pod startup time 90th percentile")
Expect(e2ePodStartupTime99perc < podStartupThreshold).To(Equal(true), "Too high pod startup time 99th percentile")
expectNoError(VerifyPodStartupLatency(podStartupLatency, podStartupThreshold))
// Log suspicious latency metrics/docker errors from all nodes that had slow startup times
for _, l := range startupLag {


@@ -26,7 +26,6 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/sets"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -78,7 +77,7 @@ var _ = Describe("Load capacity", func() {
deleteAllRC(configs)
// Verify latency metrics
highLatencyRequests, err := HighLatencyRequests(c, 3*time.Second, sets.NewString("events"))
highLatencyRequests, err := HighLatencyRequests(c, 3*time.Second)
expectNoError(err, "Too many instances metrics above the threshold")
Expect(highLatencyRequests).NotTo(BeNumerically(">", 0))

test/e2e/metrics_util.go (new file, 277 lines added)

@@ -0,0 +1,277 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"sort"
"strconv"
"strings"
"time"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/util/sets"
"github.com/prometheus/client_golang/extraction"
"github.com/prometheus/client_golang/model"
)
// Dashboard metrics
type LatencyMetric struct {
Perc50 time.Duration `json:"Perc50"`
Perc90 time.Duration `json:"Perc90"`
Perc99 time.Duration `json:"Perc99"`
}
type PodStartupLatency struct {
Latency LatencyMetric `json:"latency"`
}
type APICall struct {
Resource string `json:"resource"`
Verb string `json:"verb"`
Latency LatencyMetric `json:"latency"`
}
type APIResponsiveness struct {
APICalls []APICall `json:"apicalls"`
}
func (a APIResponsiveness) Len() int { return len(a.APICalls) }
func (a APIResponsiveness) Swap(i, j int) { a.APICalls[i], a.APICalls[j] = a.APICalls[j], a.APICalls[i] }
func (a APIResponsiveness) Less(i, j int) bool {
return a.APICalls[i].Latency.Perc99 < a.APICalls[j].Latency.Perc99
}
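APIResponsiveness satisfies sort.Interface over its APICalls slice, ordered by 99th-percentile latency, which is what lets HighLatencyRequests below run sort.Sort(sort.Reverse(metrics)) to put the slowest calls first. A small illustration with invented values:
metrics := APIResponsiveness{APICalls: []APICall{
	{Resource: "nodes", Verb: "GET", Latency: LatencyMetric{Perc99: 40 * time.Millisecond}},
	{Resource: "pods", Verb: "LIST", Latency: LatencyMetric{Perc99: 2 * time.Second}},
}}
sort.Sort(sort.Reverse(metrics)) // pods/LIST (2s) now comes before nodes/GET (40ms)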
// Ingest implements extraction.Ingester (required by the Prometheus
// library to parse the metrics).
func (a *APIResponsiveness) Ingest(samples model.Samples) error {
ignoredResources := sets.NewString("events")
ignoredVerbs := sets.NewString("WATCHLIST", "PROXY")
for _, sample := range samples {
// Example line:
// apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908
if sample.Metric[model.MetricNameLabel] != "apiserver_request_latencies_summary" {
continue
}
resource := string(sample.Metric["resource"])
verb := string(sample.Metric["verb"])
if ignoredResources.Has(resource) || ignoredVerbs.Has(verb) {
continue
}
latency := sample.Value
quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64)
if err != nil {
return err
}
a.addMetric(resource, verb, quantile, time.Duration(int64(latency))*time.Microsecond)
}
return nil
}
// 0 <= quantile <= 1 (e.g. 0.95 is the 95th percentile, 0.5 is the median).
// Only 0.5, 0.9 and 0.99 quantiles are supported.
func (a *APIResponsiveness) addMetric(resource, verb string, quantile float64, latency time.Duration) {
for i, apicall := range a.APICalls {
if apicall.Resource == resource && apicall.Verb == verb {
a.APICalls[i] = setQuantile(apicall, quantile, latency)
return
}
}
apicall := setQuantile(APICall{Resource: resource, Verb: verb}, quantile, latency)
a.APICalls = append(a.APICalls, apicall)
}
// 0 <= quantile <= 1 (e.g. 0.95 is the 95th percentile, 0.5 is the median).
// Only 0.5, 0.9 and 0.99 quantiles are supported.
func setQuantile(apicall APICall, quantile float64, latency time.Duration) APICall {
switch quantile {
case 0.5:
apicall.Latency.Perc50 = latency
case 0.9:
apicall.Latency.Perc90 = latency
case 0.99:
apicall.Latency.Perc99 = latency
}
return apicall
}
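addMetric and setQuantile fold the three summary quantiles reported for one resource/verb pair into a single APICall entry; sketched with invented numbers:
var a APIResponsiveness
a.addMetric("pods", "LIST", 0.5, 80*time.Millisecond)
a.addMetric("pods", "LIST", 0.9, 200*time.Millisecond)
a.addMetric("pods", "LIST", 0.99, 950*time.Millisecond)
// len(a.APICalls) == 1, with Perc50, Perc90 and Perc99 all filled in.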
func readLatencyMetrics(c *client.Client) (APIResponsiveness, error) {
body, err := getMetrics(c)
if err != nil {
return APIResponsiveness{}, err
}
var ingester APIResponsiveness
err = extraction.Processor004.ProcessSingle(strings.NewReader(body), &ingester, &extraction.ProcessOptions{})
return ingester, err
}
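readLatencyMetrics pipes the raw /metrics text through the Prometheus text-format parser and lets Ingest filter and convert it. A rough sketch of the same flow against a literal payload (values invented; the summary reports latencies in microseconds):
payload := `apiserver_request_latencies_summary{resource="pods",verb="LIST",quantile="0.99"} 908000`
var a APIResponsiveness
if err := extraction.Processor004.ProcessSingle(strings.NewReader(payload), &a, &extraction.ProcessOptions{}); err != nil {
	Logf("parse error: %v", err)
}
// a.APICalls now holds a single pods/LIST entry with Perc99 = 908ms.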
// Prints summary metrics for request types with latency above the threshold
// and returns the number of such request types.
func HighLatencyRequests(c *client.Client, threshold time.Duration) (int, error) {
metrics, err := readLatencyMetrics(c)
if err != nil {
return 0, err
}
sort.Sort(sort.Reverse(metrics))
badMetrics := 0
top := 5
for _, metric := range metrics.APICalls {
isBad := false
if metric.Latency.Perc99 > threshold {
badMetrics++
isBad = true
}
if top > 0 || isBad {
top--
prefix := ""
if isBad {
prefix = "WARNING "
}
Logf("%vTop latency metric: %+v", prefix, metric)
}
}
Logf("API calls latencies: %s", prettyPrintJSON(metrics))
return badMetrics, nil
}
// Verifies whether the 50th, 90th and 99th percentiles of PodStartupLatency are
// below the given threshold (returns an error otherwise).
func VerifyPodStartupLatency(latency PodStartupLatency, podStartupThreshold time.Duration) error {
Logf("Pod startup latency: %s", prettyPrintJSON(latency))
if latency.Latency.Perc50 > podStartupThreshold {
return fmt.Errorf("too high pod startup latency 50th percentile: %v", latency.Latency.Perc50)
}
if latency.Latency.Perc90 > podStartupThreshold {
return fmt.Errorf("too high pod startup latency 90th percentile: %v", latency.Latency.Perc90)
}
if latency.Latency.Perc99 > podStartupThreshold {
return fmt.Errorf("too high pod startup latency 99th percentile: %v", latency.Latency.Perc99)
}
return nil
}
// Resets latency metrics in apiserver.
func resetMetrics(c *client.Client) error {
Logf("Resetting latency metrics in apiserver...")
body, err := c.Get().AbsPath("/resetMetrics").DoRaw()
if err != nil {
return err
}
if string(body) != "metrics reset\n" {
return fmt.Errorf("Unexpected response: %q", string(body))
}
return nil
}
// Retrieves metrics information.
func getMetrics(c *client.Client) (string, error) {
body, err := c.Get().AbsPath("/metrics").DoRaw()
if err != nil {
return "", err
}
return string(body), nil
}
func prettyPrintJSON(metrics interface{}) string {
output := &bytes.Buffer{}
if err := json.NewEncoder(output).Encode(metrics); err != nil {
return ""
}
formatted := &bytes.Buffer{}
if err := json.Indent(formatted, output.Bytes(), "", " "); err != nil {
return ""
}
return string(formatted.Bytes())
}
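Since the structs above carry JSON tags, the prettyPrintJSON output of these summaries is what lands in the test logs for the dashboard to pick up. A sketch with invented durations (time.Duration values marshal as nanosecond counts, then json.Indent pretty-prints the result):
startup := PodStartupLatency{Latency: LatencyMetric{
	Perc50: 2 * time.Second, Perc90: 4 * time.Second, Perc99: 7500 * time.Millisecond,
}}
Logf("Pod startup latency: %s", prettyPrintJSON(startup))
// Emits, roughly: {"latency":{"Perc50":2000000000,"Perc90":4000000000,"Perc99":7500000000}}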
// Retrieves debug information.
func getDebugInfo(c *client.Client) (map[string]string, error) {
data := make(map[string]string)
for _, key := range []string{"block", "goroutine", "heap", "threadcreate"} {
resp, err := http.Get(c.Get().AbsPath(fmt.Sprintf("debug/pprof/%s", key)).URL().String() + "?debug=2")
if err != nil {
Logf("Warning: Error trying to fetch %s debug data: %v", key, err)
continue
}
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
Logf("Warning: Error trying to read %s debug data: %v", key, err)
}
data[key] = string(body)
}
return data, nil
}
func writePerfData(c *client.Client, dirName string, postfix string) error {
fname := fmt.Sprintf("%s/metrics_%s.txt", dirName, postfix)
handler, err := os.Create(fname)
if err != nil {
return fmt.Errorf("Error creating file '%s': %v", fname, err)
}
metrics, err := getMetrics(c)
if err != nil {
return fmt.Errorf("Error retrieving metrics: %v", err)
}
_, err = handler.WriteString(metrics)
if err != nil {
return fmt.Errorf("Error writing metrics: %v", err)
}
err = handler.Close()
if err != nil {
return fmt.Errorf("Error closing '%s': %v", fname, err)
}
debug, err := getDebugInfo(c)
if err != nil {
return fmt.Errorf("Error retrieving debug information: %v", err)
}
for key, value := range debug {
fname := fmt.Sprintf("%s/%s_%s.txt", dirName, key, postfix)
handler, err = os.Create(fname)
if err != nil {
return fmt.Errorf("Error creating file '%s': %v", fname, err)
}
_, err = handler.WriteString(value)
if err != nil {
return fmt.Errorf("Error writing %s: %v", key, err)
}
err = handler.Close()
if err != nil {
return fmt.Errorf("Error closing '%s': %v", fname, err)
}
}
return nil
}
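For a given dirName and postfix, writePerfData produces metrics_<postfix>.txt plus one <profile>_<postfix>.txt file per pprof profile returned by getDebugInfo (block, goroutine, heap, threadcreate). A sketch of a call, with a hypothetical run directory that must already exist:
dir := testContext.OutputDir + "/run-1234" // hypothetical run id
expectNoError(writePerfData(c, dir, "after"))
// Writes metrics_after.txt, block_after.txt, goroutine_after.txt,
// heap_after.txt and threadcreate_after.txt under dir.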


@@ -20,14 +20,11 @@ import (
"bytes"
"fmt"
"io"
"io/ioutil"
"math"
"math/rand"
"net/http"
"os"
"os/exec"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
@@ -51,8 +48,6 @@ import (
"k8s.io/kubernetes/pkg/watch"
"github.com/davecgh/go-spew/spew"
"github.com/prometheus/client_golang/extraction"
"github.com/prometheus/client_golang/model"
"golang.org/x/crypto/ssh"
. "github.com/onsi/ginkgo"
@@ -1902,187 +1897,6 @@ func filterNodes(nodeList *api.NodeList, fn func(node api.Node) bool) {
nodeList.Items = l
}
// LatencyMetrics stores data about request latency at a given quantile
// broken down by verb (e.g. GET, PUT, LIST) and resource (e.g. pods, services).
type LatencyMetric struct {
Verb string
Resource string
// 0 <= quantile <=1, e.g. 0.95 is 95%tile, 0.5 is median.
Quantile float64
Latency time.Duration
}
// latencyMetricIngestor implements extraction.Ingester
type latencyMetricIngester []LatencyMetric
func (l *latencyMetricIngester) Ingest(samples model.Samples) error {
for _, sample := range samples {
// Example line:
// apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908
if sample.Metric[model.MetricNameLabel] != "apiserver_request_latencies_summary" {
continue
}
resource := string(sample.Metric["resource"])
verb := string(sample.Metric["verb"])
latency := sample.Value
quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64)
if err != nil {
return err
}
*l = append(*l, LatencyMetric{
verb,
resource,
quantile,
time.Duration(int64(latency)) * time.Microsecond,
})
}
return nil
}
// LatencyMetricByLatency implements sort.Interface for []LatencyMetric based on
// the latency field.
type LatencyMetricByLatency []LatencyMetric
func (a LatencyMetricByLatency) Len() int { return len(a) }
func (a LatencyMetricByLatency) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a LatencyMetricByLatency) Less(i, j int) bool { return a[i].Latency < a[j].Latency }
func ReadLatencyMetrics(c *client.Client) ([]LatencyMetric, error) {
body, err := getMetrics(c)
if err != nil {
return nil, err
}
var ingester latencyMetricIngester
err = extraction.Processor004.ProcessSingle(strings.NewReader(body), &ingester, &extraction.ProcessOptions{})
return ingester, err
}
// Prints summary metrics for request types with latency above threshold
// and returns number of such request types.
func HighLatencyRequests(c *client.Client, threshold time.Duration, ignoredResources sets.String) (int, error) {
ignoredVerbs := sets.NewString("WATCHLIST", "PROXY")
metrics, err := ReadLatencyMetrics(c)
if err != nil {
return 0, err
}
sort.Sort(sort.Reverse(LatencyMetricByLatency(metrics)))
var badMetrics []LatencyMetric
top := 5
for _, metric := range metrics {
if ignoredResources.Has(metric.Resource) || ignoredVerbs.Has(metric.Verb) {
continue
}
isBad := false
if metric.Latency > threshold &&
// We are only interested in 99%tile, but for logging purposes
// it's useful to have all the offending percentiles.
metric.Quantile <= 0.99 {
badMetrics = append(badMetrics, metric)
isBad = true
}
if top > 0 || isBad {
top--
prefix := ""
if isBad {
prefix = "WARNING "
}
Logf("%vTop latency metric: %+v", prefix, metric)
}
}
return len(badMetrics), nil
}
// Reset latency metrics in apiserver.
func resetMetrics(c *client.Client) error {
Logf("Resetting latency metrics in apiserver...")
body, err := c.Get().AbsPath("/resetMetrics").DoRaw()
if err != nil {
return err
}
if string(body) != "metrics reset\n" {
return fmt.Errorf("Unexpected response: %q", string(body))
}
return nil
}
// Retrieve metrics information
func getMetrics(c *client.Client) (string, error) {
body, err := c.Get().AbsPath("/metrics").DoRaw()
if err != nil {
return "", err
}
return string(body), nil
}
// Retrieve debug information
func getDebugInfo(c *client.Client) (map[string]string, error) {
data := make(map[string]string)
for _, key := range []string{"block", "goroutine", "heap", "threadcreate"} {
resp, err := http.Get(c.Get().AbsPath(fmt.Sprintf("debug/pprof/%s", key)).URL().String() + "?debug=2")
if err != nil {
Logf("Warning: Error trying to fetch %s debug data: %v", key, err)
continue
}
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
Logf("Warning: Error trying to read %s debug data: %v", key, err)
}
data[key] = string(body)
}
return data, nil
}
func writePerfData(c *client.Client, dirName string, postfix string) error {
fname := fmt.Sprintf("%s/metrics_%s.txt", dirName, postfix)
handler, err := os.Create(fname)
if err != nil {
return fmt.Errorf("Error creating file '%s': %v", fname, err)
}
metrics, err := getMetrics(c)
if err != nil {
return fmt.Errorf("Error retrieving metrics: %v", err)
}
_, err = handler.WriteString(metrics)
if err != nil {
return fmt.Errorf("Error writing metrics: %v", err)
}
err = handler.Close()
if err != nil {
return fmt.Errorf("Error closing '%s': %v", fname, err)
}
debug, err := getDebugInfo(c)
if err != nil {
return fmt.Errorf("Error retrieving debug information: %v", err)
}
for key, value := range debug {
fname := fmt.Sprintf("%s/%s_%s.txt", dirName, key, postfix)
handler, err = os.Create(fname)
if err != nil {
return fmt.Errorf("Error creating file '%s': %v", fname, err)
}
_, err = handler.WriteString(value)
if err != nil {
return fmt.Errorf("Error writing %s: %v", key, err)
}
err = handler.Close()
if err != nil {
return fmt.Errorf("Error closing '%s': %v", fname, err)
}
}
return nil
}
// parseKVLines parses output that looks like lines containing "<key>: <val>"
// and returns <val> if <key> is found. Otherwise, it returns the empty string.
func parseKVLines(output, key string) string {