diff --git a/test/e2e/density.go b/test/e2e/density.go
index 524a51ba2fe..22b97de8296 100644
--- a/test/e2e/density.go
+++ b/test/e2e/density.go
@@ -34,7 +34,6 @@ import (
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
-	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/watch"

 	. "github.com/onsi/ginkgo"
@@ -63,12 +62,17 @@ func (a latencySlice) Len() int { return len(a) }
 func (a latencySlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
 func (a latencySlice) Less(i, j int) bool { return a[i].Latency < a[j].Latency }

-func printLatencies(latencies []podLatencyData, header string) {
+func extractLatencyMetrics(latencies []podLatencyData) LatencyMetric {
 	perc50 := latencies[len(latencies)/2].Latency
 	perc90 := latencies[(len(latencies)*9)/10].Latency
 	perc99 := latencies[(len(latencies)*99)/100].Latency
+	return LatencyMetric{Perc50: perc50, Perc90: perc90, Perc99: perc99}
+}
+
+func printLatencies(latencies []podLatencyData, header string) {
+	metrics := extractLatencyMetrics(latencies)
 	Logf("10%% %s: %v", header, latencies[(len(latencies)*9)/10:len(latencies)])
-	Logf("perc50: %v, perc90: %v, perc99: %v", perc50, perc90, perc99)
+	Logf("perc50: %v, perc90: %v, perc99: %v", metrics.Perc50, metrics.Perc90, metrics.Perc99)
 }

 // This test suite can take a long time to run, so by default it is added to
@@ -139,7 +143,7 @@ var _ = Describe("Density", func() {
 		expectNoError(writePerfData(c, fmt.Sprintf(testContext.OutputDir+"/%s", uuid), "after"))

 		// Verify latency metrics
-		highLatencyRequests, err := HighLatencyRequests(c, 3*time.Second, sets.NewString("events"))
+		highLatencyRequests, err := HighLatencyRequests(c, 3*time.Second)
 		expectNoError(err)
 		Expect(highLatencyRequests).NotTo(BeNumerically(">", 0), "There should be no high-latency requests")

@@ -379,14 +383,10 @@ var _ = Describe("Density", func() {
 			printLatencies(e2eLag, "worst e2e total latencies")

 			// Test whether e2e pod startup time is acceptable.
+			podStartupLatency := PodStartupLatency{Latency: extractLatencyMetrics(e2eLag)}
 			// TODO: Switch it to 5 seconds once we are sure our tests are passing.
 			podStartupThreshold := 8 * time.Second
-			e2ePodStartupTime50perc := e2eLag[len(e2eLag)/2].Latency
-			e2ePodStartupTime90perc := e2eLag[len(e2eLag)*9/10].Latency
-			e2ePodStartupTime99perc := e2eLag[len(e2eLag)*99/100].Latency
-			Expect(e2ePodStartupTime50perc < podStartupThreshold).To(Equal(true), "Too high pod startup time 50th percentile")
-			Expect(e2ePodStartupTime90perc < podStartupThreshold).To(Equal(true), "Too high pod startup time 90th percentile")
-			Expect(e2ePodStartupTime99perc < podStartupThreshold).To(Equal(true), "Too high pod startup time 99th percentile")
+			expectNoError(VerifyPodStartupLatency(podStartupLatency, podStartupThreshold))

 			// Log suspicious latency metrics/docker errors from all nodes that had slow startup times
 			for _, l := range startupLag {
diff --git a/test/e2e/load.go b/test/e2e/load.go
index 162a316a311..9ba45921027 100644
--- a/test/e2e/load.go
+++ b/test/e2e/load.go
@@ -26,7 +26,6 @@ import (
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/labels"
-	"k8s.io/kubernetes/pkg/util/sets"

 	. "github.com/onsi/ginkgo"
"github.com/onsi/gomega" @@ -78,7 +77,7 @@ var _ = Describe("Load capacity", func() { deleteAllRC(configs) // Verify latency metrics - highLatencyRequests, err := HighLatencyRequests(c, 3*time.Second, sets.NewString("events")) + highLatencyRequests, err := HighLatencyRequests(c, 3*time.Second) expectNoError(err, "Too many instances metrics above the threshold") Expect(highLatencyRequests).NotTo(BeNumerically(">", 0)) diff --git a/test/e2e/metrics_util.go b/test/e2e/metrics_util.go new file mode 100644 index 00000000000..7620365d5ea --- /dev/null +++ b/test/e2e/metrics_util.go @@ -0,0 +1,277 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os" + "sort" + "strconv" + "strings" + "time" + + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/util/sets" + + "github.com/prometheus/client_golang/extraction" + "github.com/prometheus/client_golang/model" +) + +// Dashboard metrics +type LatencyMetric struct { + Perc50 time.Duration `json:"Perc50"` + Perc90 time.Duration `json:"Perc90"` + Perc99 time.Duration `json:"Perc99"` +} + +type PodStartupLatency struct { + Latency LatencyMetric `json:"latency"` +} + +type APICall struct { + Resource string `json:"resource"` + Verb string `json:"verb"` + Latency LatencyMetric `json:"latency"` +} + +type APIResponsiveness struct { + APICalls []APICall `json:"apicalls"` +} + +func (a APIResponsiveness) Len() int { return len(a.APICalls) } +func (a APIResponsiveness) Swap(i, j int) { a.APICalls[i], a.APICalls[j] = a.APICalls[j], a.APICalls[i] } +func (a APIResponsiveness) Less(i, j int) bool { + return a.APICalls[i].Latency.Perc99 < a.APICalls[j].Latency.Perc99 +} + +// Ingest method implements extraction.Ingester (necessary for Prometheus library +// to parse the metrics). +func (a *APIResponsiveness) Ingest(samples model.Samples) error { + ignoredResources := sets.NewString("events") + ignoredVerbs := sets.NewString("WATCHLIST", "PROXY") + + for _, sample := range samples { + // Example line: + // apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908 + if sample.Metric[model.MetricNameLabel] != "apiserver_request_latencies_summary" { + continue + } + + resource := string(sample.Metric["resource"]) + verb := string(sample.Metric["verb"]) + if ignoredResources.Has(resource) || ignoredVerbs.Has(verb) { + continue + } + latency := sample.Value + quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64) + if err != nil { + return err + } + a.addMetric(resource, verb, quantile, time.Duration(int64(latency))*time.Microsecond) + } + return nil +} + +// 0 <= quantile <=1 (e.g. 0.95 is 95%tile, 0.5 is median) +// Only 0.5, 0.9 and 0.99 quantiles are supported. 
+func (a *APIResponsiveness) addMetric(resource, verb string, quantile float64, latency time.Duration) {
+	for i, apicall := range a.APICalls {
+		if apicall.Resource == resource && apicall.Verb == verb {
+			a.APICalls[i] = setQuantile(apicall, quantile, latency)
+			return
+		}
+	}
+	apicall := setQuantile(APICall{Resource: resource, Verb: verb}, quantile, latency)
+	a.APICalls = append(a.APICalls, apicall)
+}
+
+// 0 <= quantile <= 1 (e.g. 0.95 is 95th percentile, 0.5 is median)
+// Only 0.5, 0.9 and 0.99 quantiles are supported.
+func setQuantile(apicall APICall, quantile float64, latency time.Duration) APICall {
+	switch quantile {
+	case 0.5:
+		apicall.Latency.Perc50 = latency
+	case 0.9:
+		apicall.Latency.Perc90 = latency
+	case 0.99:
+		apicall.Latency.Perc99 = latency
+	}
+	return apicall
+}
+
+func readLatencyMetrics(c *client.Client) (APIResponsiveness, error) {
+	body, err := getMetrics(c)
+	if err != nil {
+		return APIResponsiveness{}, err
+	}
+	var ingester APIResponsiveness
+	err = extraction.Processor004.ProcessSingle(strings.NewReader(body), &ingester, &extraction.ProcessOptions{})
+	return ingester, err
+}
+
+// Prints summary metrics for request types with latency above threshold
+// and returns number of such request types.
+func HighLatencyRequests(c *client.Client, threshold time.Duration) (int, error) {
+	metrics, err := readLatencyMetrics(c)
+	if err != nil {
+		return 0, err
+	}
+	sort.Sort(sort.Reverse(metrics))
+	badMetrics := 0
+	top := 5
+	for _, metric := range metrics.APICalls {
+		isBad := false
+		if metric.Latency.Perc99 > threshold {
+			badMetrics++
+			isBad = true
+		}
+		if top > 0 || isBad {
+			top--
+			prefix := ""
+			if isBad {
+				prefix = "WARNING "
+			}
+			Logf("%vTop latency metric: %+v", prefix, metric)
+		}
+	}
+
+	Logf("API calls latencies: %s", prettyPrintJSON(metrics))
+
+	return badMetrics, nil
+}
+
+// Verifies whether 50th, 90th and 99th percentiles of PodStartupLatency are
+// smaller than the given threshold (returns an error in the opposite case).
+func VerifyPodStartupLatency(latency PodStartupLatency, podStartupThreshold time.Duration) error {
+	Logf("Pod startup latency: %s", prettyPrintJSON(latency))
+
+	if latency.Latency.Perc50 > podStartupThreshold {
+		return fmt.Errorf("too high pod startup latency 50th percentile: %v", latency.Latency.Perc50)
+	}
+	if latency.Latency.Perc90 > podStartupThreshold {
+		return fmt.Errorf("too high pod startup latency 90th percentile: %v", latency.Latency.Perc90)
+	}
+	if latency.Latency.Perc99 > podStartupThreshold {
+		return fmt.Errorf("too high pod startup latency 99th percentile: %v", latency.Latency.Perc99)
+	}
+	return nil
+}
+
+// Resets latency metrics in apiserver.
+func resetMetrics(c *client.Client) error {
+	Logf("Resetting latency metrics in apiserver...")
+	body, err := c.Get().AbsPath("/resetMetrics").DoRaw()
+	if err != nil {
+		return err
+	}
+	if string(body) != "metrics reset\n" {
+		return fmt.Errorf("Unexpected response: %q", string(body))
+	}
+	return nil
+}
+
+// Retrieves metrics information.
+func getMetrics(c *client.Client) (string, error) {
+	body, err := c.Get().AbsPath("/metrics").DoRaw()
+	if err != nil {
+		return "", err
+	}
+	return string(body), nil
+}
+
+func prettyPrintJSON(metrics interface{}) string {
+	output := &bytes.Buffer{}
+	if err := json.NewEncoder(output).Encode(metrics); err != nil {
+		return ""
+	}
+	formatted := &bytes.Buffer{}
+	if err := json.Indent(formatted, output.Bytes(), "", "  "); err != nil {
+		return ""
+	}
+	return string(formatted.Bytes())
+}
+
+// Retrieves debug information.
+func getDebugInfo(c *client.Client) (map[string]string, error) {
+	data := make(map[string]string)
+	for _, key := range []string{"block", "goroutine", "heap", "threadcreate"} {
+		resp, err := http.Get(c.Get().AbsPath(fmt.Sprintf("debug/pprof/%s", key)).URL().String() + "?debug=2")
+		if err != nil {
+			Logf("Warning: Error trying to fetch %s debug data: %v", key, err)
+			continue
+		}
+		body, err := ioutil.ReadAll(resp.Body)
+		resp.Body.Close()
+		if err != nil {
+			Logf("Warning: Error trying to read %s debug data: %v", key, err)
+		}
+		data[key] = string(body)
+	}
+	return data, nil
+}
+
+func writePerfData(c *client.Client, dirName string, postfix string) error {
+	fname := fmt.Sprintf("%s/metrics_%s.txt", dirName, postfix)
+
+	handler, err := os.Create(fname)
+	if err != nil {
+		return fmt.Errorf("Error creating file '%s': %v", fname, err)
+	}
+
+	metrics, err := getMetrics(c)
+	if err != nil {
+		return fmt.Errorf("Error retrieving metrics: %v", err)
+	}
+
+	_, err = handler.WriteString(metrics)
+	if err != nil {
+		return fmt.Errorf("Error writing metrics: %v", err)
+	}
+
+	err = handler.Close()
+	if err != nil {
+		return fmt.Errorf("Error closing '%s': %v", fname, err)
+	}
+
+	debug, err := getDebugInfo(c)
+	if err != nil {
+		return fmt.Errorf("Error retrieving debug information: %v", err)
+	}
+
+	for key, value := range debug {
+		fname := fmt.Sprintf("%s/%s_%s.txt", dirName, key, postfix)
+		handler, err = os.Create(fname)
+		if err != nil {
+			return fmt.Errorf("Error creating file '%s': %v", fname, err)
+		}
+		_, err = handler.WriteString(value)
+		if err != nil {
+			return fmt.Errorf("Error writing %s: %v", key, err)
+		}
+
+		err = handler.Close()
+		if err != nil {
+			return fmt.Errorf("Error closing '%s': %v", fname, err)
+		}
+	}
+	return nil
+}
diff --git a/test/e2e/util.go b/test/e2e/util.go
index 82e196b9769..92f89b4ef74 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -20,14 +20,11 @@ import (
 	"bytes"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"math"
 	"math/rand"
-	"net/http"
 	"os"
 	"os/exec"
 	"path/filepath"
-	"sort"
 	"strconv"
 	"strings"
 	"time"
@@ -51,8 +48,6 @@ import (
 	"k8s.io/kubernetes/pkg/watch"

 	"github.com/davecgh/go-spew/spew"
-	"github.com/prometheus/client_golang/extraction"
-	"github.com/prometheus/client_golang/model"
 	"golang.org/x/crypto/ssh"

 	. "github.com/onsi/ginkgo"
@@ -1902,187 +1897,6 @@ func filterNodes(nodeList *api.NodeList, fn func(node api.Node) bool) {
 	nodeList.Items = l
 }

-// LatencyMetrics stores data about request latency at a given quantile
-// broken down by verb (e.g. GET, PUT, LIST) and resource (e.g. pods, services).
-type LatencyMetric struct {
-	Verb     string
-	Resource string
-	// 0 <= quantile <=1, e.g. 0.95 is 95%tile, 0.5 is median.
-	Quantile float64
-	Latency  time.Duration
-}
-
-// latencyMetricIngestor implements extraction.Ingester
-type latencyMetricIngester []LatencyMetric
-
-func (l *latencyMetricIngester) Ingest(samples model.Samples) error {
-	for _, sample := range samples {
-		// Example line:
-		// apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908
-		if sample.Metric[model.MetricNameLabel] != "apiserver_request_latencies_summary" {
-			continue
-		}
-
-		resource := string(sample.Metric["resource"])
-		verb := string(sample.Metric["verb"])
-		latency := sample.Value
-		quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64)
-		if err != nil {
-			return err
-		}
-		*l = append(*l, LatencyMetric{
-			verb,
-			resource,
-			quantile,
-			time.Duration(int64(latency)) * time.Microsecond,
-		})
-	}
-	return nil
-}
-
-// LatencyMetricByLatency implements sort.Interface for []LatencyMetric based on
-// the latency field.
-type LatencyMetricByLatency []LatencyMetric
-
-func (a LatencyMetricByLatency) Len() int           { return len(a) }
-func (a LatencyMetricByLatency) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
-func (a LatencyMetricByLatency) Less(i, j int) bool { return a[i].Latency < a[j].Latency }
-
-func ReadLatencyMetrics(c *client.Client) ([]LatencyMetric, error) {
-	body, err := getMetrics(c)
-	if err != nil {
-		return nil, err
-	}
-	var ingester latencyMetricIngester
-	err = extraction.Processor004.ProcessSingle(strings.NewReader(body), &ingester, &extraction.ProcessOptions{})
-	return ingester, err
-}
-
-// Prints summary metrics for request types with latency above threshold
-// and returns number of such request types.
-func HighLatencyRequests(c *client.Client, threshold time.Duration, ignoredResources sets.String) (int, error) {
-	ignoredVerbs := sets.NewString("WATCHLIST", "PROXY")
-
-	metrics, err := ReadLatencyMetrics(c)
-	if err != nil {
-		return 0, err
-	}
-	sort.Sort(sort.Reverse(LatencyMetricByLatency(metrics)))
-	var badMetrics []LatencyMetric
-	top := 5
-	for _, metric := range metrics {
-		if ignoredResources.Has(metric.Resource) || ignoredVerbs.Has(metric.Verb) {
-			continue
-		}
-		isBad := false
-		if metric.Latency > threshold &&
-			// We are only interested in 99%tile, but for logging purposes
-			// it's useful to have all the offending percentiles.
-			metric.Quantile <= 0.99 {
-			badMetrics = append(badMetrics, metric)
-			isBad = true
-		}
-		if top > 0 || isBad {
-			top--
-			prefix := ""
-			if isBad {
-				prefix = "WARNING "
-			}
-			Logf("%vTop latency metric: %+v", prefix, metric)
-		}
-	}
-
-	return len(badMetrics), nil
-}
-
-// Reset latency metrics in apiserver.
-func resetMetrics(c *client.Client) error {
-	Logf("Resetting latency metrics in apiserver...")
-	body, err := c.Get().AbsPath("/resetMetrics").DoRaw()
-	if err != nil {
-		return err
-	}
-	if string(body) != "metrics reset\n" {
-		return fmt.Errorf("Unexpected response: %q", string(body))
-	}
-	return nil
-}
-
-// Retrieve metrics information
-func getMetrics(c *client.Client) (string, error) {
-	body, err := c.Get().AbsPath("/metrics").DoRaw()
-	if err != nil {
-		return "", err
-	}
-	return string(body), nil
-}
-
-// Retrieve debug information
-func getDebugInfo(c *client.Client) (map[string]string, error) {
-	data := make(map[string]string)
-	for _, key := range []string{"block", "goroutine", "heap", "threadcreate"} {
-		resp, err := http.Get(c.Get().AbsPath(fmt.Sprintf("debug/pprof/%s", key)).URL().String() + "?debug=2")
-		if err != nil {
-			Logf("Warning: Error trying to fetch %s debug data: %v", key, err)
-			continue
-		}
-		body, err := ioutil.ReadAll(resp.Body)
-		resp.Body.Close()
-		if err != nil {
-			Logf("Warning: Error trying to read %s debug data: %v", key, err)
-		}
-		data[key] = string(body)
-	}
-	return data, nil
-}
-
-func writePerfData(c *client.Client, dirName string, postfix string) error {
-	fname := fmt.Sprintf("%s/metrics_%s.txt", dirName, postfix)
-
-	handler, err := os.Create(fname)
-	if err != nil {
-		return fmt.Errorf("Error creating file '%s': %v", fname, err)
-	}
-
-	metrics, err := getMetrics(c)
-	if err != nil {
-		return fmt.Errorf("Error retrieving metrics: %v", err)
-	}
-
-	_, err = handler.WriteString(metrics)
-	if err != nil {
-		return fmt.Errorf("Error writing metrics: %v", err)
-	}
-
-	err = handler.Close()
-	if err != nil {
-		return fmt.Errorf("Error closing '%s': %v", fname, err)
-	}
-
-	debug, err := getDebugInfo(c)
-	if err != nil {
-		return fmt.Errorf("Error retrieving debug information: %v", err)
-	}
-
-	for key, value := range debug {
-		fname := fmt.Sprintf("%s/%s_%s.txt", dirName, key, postfix)
-		handler, err = os.Create(fname)
-		if err != nil {
-			return fmt.Errorf("Error creating file '%s': %v", fname, err)
-		}
-		_, err = handler.WriteString(value)
-		if err != nil {
-			return fmt.Errorf("Error writing %s: %v", key, err)
-		}
-
-		err = handler.Close()
-		if err != nil {
-			return fmt.Errorf("Error closing '%s': %v", fname, err)
-		}
-	}
-	return nil
-}
-
 // parseKVLines parses output that looks like lines containing "<key>: <val>"
 // and returns <val> if <key> is found. Otherwise, it returns the empty string.
 func parseKVLines(output, key string) string {
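
Reviewer note (not part of the patch): the behavioral core of this change is that pod startup latency is now reduced to three percentiles via extractLatencyMetrics and gated by VerifyPodStartupLatency, instead of three inline Expect calls. The standalone Go sketch below illustrates that flow in isolation under stated assumptions: it re-declares a minimal LatencyMetric and uses hypothetical helper names (verifyPodStartupLatency, the sample durations); it is an approximation of the PR's logic, not the e2e package itself, and the percentile indexing matches the simple index math used in density.go on a sorted slice.

package main

import (
	"fmt"
	"sort"
	"time"
)

// LatencyMetric mirrors the struct introduced in metrics_util.go (assumption: names only).
type LatencyMetric struct {
	Perc50, Perc90, Perc99 time.Duration
}

// extractLatencyMetrics sorts the samples and picks the 50th/90th/99th
// percentile entries using the same index arithmetic as the PR.
func extractLatencyMetrics(latencies []time.Duration) LatencyMetric {
	sort.Slice(latencies, func(i, j int) bool { return latencies[i] < latencies[j] })
	return LatencyMetric{
		Perc50: latencies[len(latencies)/2],
		Perc90: latencies[(len(latencies)*9)/10],
		Perc99: latencies[(len(latencies)*99)/100],
	}
}

// verifyPodStartupLatency (hypothetical name) returns an error if any tracked
// percentile exceeds the threshold, analogous to VerifyPodStartupLatency.
func verifyPodStartupLatency(m LatencyMetric, threshold time.Duration) error {
	checks := []struct {
		name string
		val  time.Duration
	}{{"50th", m.Perc50}, {"90th", m.Perc90}, {"99th", m.Perc99}}
	for _, p := range checks {
		if p.val > threshold {
			return fmt.Errorf("too high pod startup latency %s percentile: %v", p.name, p.val)
		}
	}
	return nil
}

func main() {
	// Hypothetical sample of e2e pod startup latencies.
	samples := []time.Duration{2 * time.Second, 3 * time.Second, 4 * time.Second, 9 * time.Second}
	metrics := extractLatencyMetrics(samples)
	fmt.Printf("perc50: %v, perc90: %v, perc99: %v\n", metrics.Perc50, metrics.Perc90, metrics.Perc99)
	// With the PR's current 8s threshold, the 90th/99th percentile sample above fails the check.
	if err := verifyPodStartupLatency(metrics, 8*time.Second); err != nil {
		fmt.Println("FAIL:", err)
	}
}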