Refactored metrics-related functions from framework/metrics_util.go

This a refactoring of framework/metrics_utils.go into framework/metrics.

Signed-off-by: alejandrox1 <alarcj137@gmail.com>
This commit is contained in:
alejandrox1 2019-07-02 23:15:20 -04:00
parent e79dcc2174
commit 348fd0805e
29 changed files with 1318 additions and 988 deletions

View File

@ -558,6 +558,7 @@ staging/src/k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1
staging/src/k8s.io/sample-apiserver/pkg/registry/wardle/fischer
staging/src/k8s.io/sample-apiserver/pkg/registry/wardle/flunder
test/e2e/common
test/e2e/framework/metrics
test/e2e/lifecycle/bootstrap
test/e2e/storage/vsphere
test/e2e_kubeadm

View File

@ -39,7 +39,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
"k8s.io/kubernetes/test/e2e/framework/metrics"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
"github.com/onsi/ginkgo"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -242,7 +242,7 @@ func verifyRemainingObjects(f *framework.Framework, objects map[string]int) (boo
func gatherMetrics(f *framework.Framework) {
ginkgo.By("Gathering metrics")
var summary framework.TestDataSummary
grabber, err := metrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, false, false, true, false, false)
grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, false, false, true, false, false)
if err != nil {
e2elog.Logf("Failed to create MetricsGrabber. Skipping metrics gathering.")
} else {
@ -250,7 +250,7 @@ func gatherMetrics(f *framework.Framework) {
if err != nil {
e2elog.Logf("MetricsGrabber failed grab metrics. Skipping metrics gathering.")
} else {
summary = (*framework.MetricsForE2E)(&received)
summary = (*e2emetrics.MetricsForE2E)(&received)
e2elog.Logf(summary.PrintHumanReadable())
}
}

View File

@ -39,7 +39,7 @@ import (
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/framework/ginkgowrapper"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
"k8s.io/kubernetes/test/e2e/framework/metrics"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
"k8s.io/kubernetes/test/e2e/manifest"
testutils "k8s.io/kubernetes/test/utils"
@ -188,7 +188,7 @@ func gatherTestSuiteMetrics() error {
}
// Grab metrics for apiserver, scheduler, controller-manager, kubelet (for non-kubemark case) and cluster autoscaler (optionally).
grabber, err := metrics.NewMetricsGrabber(c, nil, !framework.ProviderIs("kubemark"), true, true, true, framework.TestContext.IncludeClusterAutoscalerMetrics)
grabber, err := e2emetrics.NewMetricsGrabber(c, nil, !framework.ProviderIs("kubemark"), true, true, true, framework.TestContext.IncludeClusterAutoscalerMetrics)
if err != nil {
return fmt.Errorf("failed to create MetricsGrabber: %v", err)
}
@ -198,7 +198,7 @@ func gatherTestSuiteMetrics() error {
return fmt.Errorf("failed to grab metrics: %v", err)
}
metricsForE2E := (*framework.MetricsForE2E)(&received)
metricsForE2E := (*e2emetrics.MetricsForE2E)(&received)
metricsJSON := metricsForE2E.PrintJSON()
if framework.TestContext.ReportDir != "" {
filePath := path.Join(framework.TestContext.ReportDir, "MetricsForE2ESuite_"+time.Now().Format(time.RFC3339)+".json")

View File

@ -14,7 +14,6 @@ go_library(
"google_compute.go",
"kubelet_stats.go",
"log_size_monitoring.go",
"metrics_util.go",
"networking_utils.go",
"nodes_util.go",
"perf_util.go",
@ -48,7 +47,6 @@ go_library(
"//pkg/master/ports:go_default_library",
"//pkg/registry/core/service/portallocator:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/security/podsecuritypolicy/seccomp:go_default_library",
"//pkg/util/system:go_default_library",
@ -113,7 +111,6 @@ go_library(
"//vendor/github.com/onsi/gomega:go_default_library",
"//vendor/github.com/onsi/gomega/types:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/github.com/prometheus/common/expfmt:go_default_library",
"//vendor/github.com/prometheus/common/model:go_default_library",
"//vendor/golang.org/x/net/websocket:go_default_library",
"//vendor/k8s.io/klog:go_default_library",

View File

@ -22,6 +22,7 @@ import (
"sync"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
)
// FlakeReport is a struct for managing the flake report.
@ -90,7 +91,7 @@ func (f *FlakeReport) PrintHumanReadable() string {
func (f *FlakeReport) PrintJSON() string {
f.lock.RLock()
defer f.lock.RUnlock()
return PrettyPrintJSON(f)
return e2emetrics.PrettyPrintJSON(f)
}
// SummaryKind returns the summary of flake report.

View File

@ -47,7 +47,7 @@ import (
"k8s.io/client-go/restmapper"
scaleclient "k8s.io/client-go/scale"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
"k8s.io/kubernetes/test/e2e/framework/metrics"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
testutils "k8s.io/kubernetes/test/utils"
@ -112,7 +112,7 @@ type Framework struct {
TestSummaries []TestDataSummary
// Place to keep ClusterAutoscaler metrics from before test in order to compute delta.
clusterAutoscalerMetricsBeforeTest metrics.Collection
clusterAutoscalerMetricsBeforeTest e2emetrics.Collection
}
// TestDataSummary is an interface for managing test data.
@ -271,7 +271,7 @@ func (f *Framework) BeforeEach() {
gatherMetricsAfterTest := TestContext.GatherMetricsAfterTest == "true" || TestContext.GatherMetricsAfterTest == "master"
if gatherMetricsAfterTest && TestContext.IncludeClusterAutoscalerMetrics {
grabber, err := metrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, !ProviderIs("kubemark"), false, false, false, TestContext.IncludeClusterAutoscalerMetrics)
grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, !ProviderIs("kubemark"), false, false, false, TestContext.IncludeClusterAutoscalerMetrics)
if err != nil {
e2elog.Logf("Failed to create MetricsGrabber (skipping ClusterAutoscaler metrics gathering before test): %v", err)
} else {
@ -363,7 +363,7 @@ func (f *Framework) AfterEach() {
ginkgo.By("Gathering metrics")
// Grab apiserver, scheduler, controller-manager metrics and (optionally) nodes' kubelet metrics.
grabMetricsFromKubelets := TestContext.GatherMetricsAfterTest != "master" && !ProviderIs("kubemark")
grabber, err := metrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, grabMetricsFromKubelets, true, true, true, TestContext.IncludeClusterAutoscalerMetrics)
grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, grabMetricsFromKubelets, true, true, true, TestContext.IncludeClusterAutoscalerMetrics)
if err != nil {
e2elog.Logf("Failed to create MetricsGrabber (skipping metrics gathering): %v", err)
} else {
@ -371,8 +371,8 @@ func (f *Framework) AfterEach() {
if err != nil {
e2elog.Logf("MetricsGrabber failed to grab some of the metrics: %v", err)
}
(*MetricsForE2E)(&received).computeClusterAutoscalerMetricsDelta(f.clusterAutoscalerMetricsBeforeTest)
f.TestSummaries = append(f.TestSummaries, (*MetricsForE2E)(&received))
(*e2emetrics.MetricsForE2E)(&received).ComputeClusterAutoscalerMetricsDelta(f.clusterAutoscalerMetricsBeforeTest)
f.TestSummaries = append(f.TestSummaries, (*e2emetrics.MetricsForE2E)(&received))
}
}

View File

@ -47,6 +47,7 @@ import (
// KubeletLatencyMetric stores metrics scraped from the kubelet server's /metric endpoint.
// TODO: Get some more structure around the metrics and this type
// TODO(alejandrox1): this is already present in test/e2e/framework/metrics.
type KubeletLatencyMetric struct {
// eg: list, info, create
Operation string
@ -59,6 +60,7 @@ type KubeletLatencyMetric struct {
// KubeletLatencyMetrics implements sort.Interface for []KubeletMetric based on
// the latency field.
// TODO(alejandrox1): this is already present in test/e2e/framework/metrics.
type KubeletLatencyMetrics []KubeletLatencyMetric
func (a KubeletLatencyMetrics) Len() int { return len(a) }
@ -67,6 +69,7 @@ func (a KubeletLatencyMetrics) Less(i, j int) bool { return a[i].Latency > a[j].
// If a apiserver client is passed in, the function will try to get kubelet metrics from metrics grabber;
// or else, the function will try to get kubelet metrics directly from the node.
// TODO(alejandrox1): this is already present in test/e2e/framework/metrics.
func getKubeletMetricsFromNode(c clientset.Interface, nodeName string) (metrics.KubeletMetrics, error) {
if c == nil {
return metrics.GrabKubeletMetricsWithoutProxy(nodeName, "/metrics")
@ -80,6 +83,7 @@ func getKubeletMetricsFromNode(c clientset.Interface, nodeName string) (metrics.
// getKubeletMetrics gets all metrics in kubelet subsystem from specified node and trims
// the subsystem prefix.
// TODO(alejandrox1): this is already present in test/e2e/framework/metrics.
func getKubeletMetrics(c clientset.Interface, nodeName string) (metrics.KubeletMetrics, error) {
ms, err := getKubeletMetricsFromNode(c, nodeName)
if err != nil {
@ -102,6 +106,7 @@ func getKubeletMetrics(c clientset.Interface, nodeName string) (metrics.KubeletM
// GetDefaultKubeletLatencyMetrics calls GetKubeletLatencyMetrics with a set of default metricNames
// identifying common latency metrics.
// Note that the KubeletMetrics passed in should not contain subsystem prefix.
// TODO(alejandrox1): this is already present in test/e2e/framework/metrics.
func GetDefaultKubeletLatencyMetrics(ms metrics.KubeletMetrics) KubeletLatencyMetrics {
latencyMetricNames := sets.NewString(
kubeletmetrics.PodWorkerDurationKey,
@ -117,6 +122,7 @@ func GetDefaultKubeletLatencyMetrics(ms metrics.KubeletMetrics) KubeletLatencyMe
// GetKubeletLatencyMetrics filters ms to include only those contained in the metricNames set,
// then constructs a KubeletLatencyMetrics list based on the samples associated with those metrics.
// TODO(alejandrox1): this is already present in test/e2e/framework/metrics.
func GetKubeletLatencyMetrics(ms metrics.KubeletMetrics, filterMetricNames sets.String) KubeletLatencyMetrics {
var latencyMetrics KubeletLatencyMetrics
for name, samples := range ms {
@ -266,6 +272,7 @@ func getNodeRuntimeOperationErrorRate(c clientset.Interface, node string) (NodeR
}
// HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.
// TODO(alejandrox1): this is already present in test/e2e/framework/metrics.
func HighLatencyKubeletOperations(c clientset.Interface, threshold time.Duration, nodeName string, logFunc func(fmt string, args ...interface{})) (KubeletLatencyMetrics, error) {
ms, err := getKubeletMetrics(c, nodeName)
if err != nil {

View File

@ -27,6 +27,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
)
@ -108,7 +109,7 @@ func (s *LogsSizeDataSummary) PrintHumanReadable() string {
// PrintJSON returns the summary of log size data with JSON format.
func (s *LogsSizeDataSummary) PrintJSON() string {
return PrettyPrintJSON(*s)
return e2emetrics.PrettyPrintJSON(*s)
}
// SummaryKind returns the summary of log size data summary.

View File

@ -8,22 +8,37 @@ load(
go_library(
name = "go_default_library",
srcs = [
"api.go",
"api_server_metrics.go",
"cluster_autoscaler_metrics.go",
"controller_manager_metrics.go",
"e2e_metrics.go",
"etcd.go",
"generic_metrics.go",
"interesting_metrics.go",
"kubelet_metrics.go",
"latencies.go",
"metrics_grabber.go",
"pod.go",
"scheduler_metrics.go",
"scheduling.go",
],
importpath = "k8s.io/kubernetes/test/e2e/framework/metrics",
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/kubelet/dockershim/metrics:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//pkg/master/ports:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/util/system:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//test/e2e/framework/log:go_default_library",
"//test/e2e/framework/ssh:go_default_library",
"//test/e2e/perftype:go_default_library",
"//vendor/github.com/onsi/gomega:go_default_library",
"//vendor/github.com/prometheus/common/expfmt:go_default_library",
"//vendor/github.com/prometheus/common/model:go_default_library",
"//vendor/k8s.io/klog:go_default_library",

View File

@ -0,0 +1,135 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"fmt"
"time"
e2eperftype "k8s.io/kubernetes/test/e2e/perftype"
)
// APICall is a struct for managing API call.
type APICall struct {
Resource string `json:"resource"`
Subresource string `json:"subresource"`
Verb string `json:"verb"`
Scope string `json:"scope"`
Latency LatencyMetric `json:"latency"`
Count int `json:"count"`
}
// APIResponsiveness is a struct for managing multiple API calls.
type APIResponsiveness struct {
APICalls []APICall `json:"apicalls"`
}
// SummaryKind returns the summary of API responsiveness.
func (a *APIResponsiveness) SummaryKind() string {
return "APIResponsiveness"
}
// PrintHumanReadable returns metrics with JSON format.
func (a *APIResponsiveness) PrintHumanReadable() string {
return PrettyPrintJSON(a)
}
// PrintJSON returns metrics of PerfData(50, 90 and 99th percentiles) with JSON format.
func (a *APIResponsiveness) PrintJSON() string {
return PrettyPrintJSON(APICallToPerfData(a))
}
func (a *APIResponsiveness) Len() int { return len(a.APICalls) }
func (a *APIResponsiveness) Swap(i, j int) {
a.APICalls[i], a.APICalls[j] = a.APICalls[j], a.APICalls[i]
}
func (a *APIResponsiveness) Less(i, j int) bool {
return a.APICalls[i].Latency.Perc99 < a.APICalls[j].Latency.Perc99
}
// Set request latency for a particular quantile in the APICall metric entry (creating one if necessary).
// 0 <= quantile <=1 (e.g. 0.95 is 95%tile, 0.5 is median)
// Only 0.5, 0.9 and 0.99 quantiles are supported.
func (a *APIResponsiveness) addMetricRequestLatency(resource, subresource, verb, scope string, quantile float64, latency time.Duration) {
for i, apicall := range a.APICalls {
if apicall.Resource == resource && apicall.Subresource == subresource && apicall.Verb == verb && apicall.Scope == scope {
a.APICalls[i] = setQuantileAPICall(apicall, quantile, latency)
return
}
}
apicall := setQuantileAPICall(APICall{Resource: resource, Subresource: subresource, Verb: verb, Scope: scope}, quantile, latency)
a.APICalls = append(a.APICalls, apicall)
}
// 0 <= quantile <=1 (e.g. 0.95 is 95%tile, 0.5 is median)
// Only 0.5, 0.9 and 0.99 quantiles are supported.
func setQuantileAPICall(apicall APICall, quantile float64, latency time.Duration) APICall {
setQuantile(&apicall.Latency, quantile, latency)
return apicall
}
// Only 0.5, 0.9 and 0.99 quantiles are supported.
func setQuantile(metric *LatencyMetric, quantile float64, latency time.Duration) {
switch quantile {
case 0.5:
metric.Perc50 = latency
case 0.9:
metric.Perc90 = latency
case 0.99:
metric.Perc99 = latency
}
}
// Add request count to the APICall metric entry (creating one if necessary).
func (a *APIResponsiveness) addMetricRequestCount(resource, subresource, verb, scope string, count int) {
for i, apicall := range a.APICalls {
if apicall.Resource == resource && apicall.Subresource == subresource && apicall.Verb == verb && apicall.Scope == scope {
a.APICalls[i].Count += count
return
}
}
apicall := APICall{Resource: resource, Subresource: subresource, Verb: verb, Count: count, Scope: scope}
a.APICalls = append(a.APICalls, apicall)
}
// currentAPICallMetricsVersion is the current apicall performance metrics version. We should
// bump up the version each time we make incompatible change to the metrics.
const currentAPICallMetricsVersion = "v1"
// APICallToPerfData transforms APIResponsiveness to PerfData.
func APICallToPerfData(apicalls *APIResponsiveness) *e2eperftype.PerfData {
perfData := &e2eperftype.PerfData{Version: currentAPICallMetricsVersion}
for _, apicall := range apicalls.APICalls {
item := e2eperftype.DataItem{
Data: map[string]float64{
"Perc50": float64(apicall.Latency.Perc50) / 1000000, // us -> ms
"Perc90": float64(apicall.Latency.Perc90) / 1000000,
"Perc99": float64(apicall.Latency.Perc99) / 1000000,
},
Unit: "ms",
Labels: map[string]string{
"Verb": apicall.Verb,
"Resource": apicall.Resource,
"Subresource": apicall.Subresource,
"Scope": apicall.Scope,
"Count": fmt.Sprintf("%v", apicall.Count),
},
}
perfData.DataItems = append(perfData.DataItems, item)
}
return perfData
}

View File

@ -0,0 +1,160 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"bytes"
"encoding/json"
"fmt"
"strings"
"github.com/prometheus/common/model"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
)
const (
// Cluster Autoscaler metrics names
caFunctionMetric = "cluster_autoscaler_function_duration_seconds_bucket"
caFunctionMetricLabel = "function"
)
// MetricsForE2E is metrics collection of components.
type MetricsForE2E Collection
func (m *MetricsForE2E) filterMetrics() {
apiServerMetrics := make(APIServerMetrics)
for _, metric := range interestingAPIServerMetrics {
apiServerMetrics[metric] = (*m).APIServerMetrics[metric]
}
controllerManagerMetrics := make(ControllerManagerMetrics)
for _, metric := range interestingControllerManagerMetrics {
controllerManagerMetrics[metric] = (*m).ControllerManagerMetrics[metric]
}
kubeletMetrics := make(map[string]KubeletMetrics)
for kubelet, grabbed := range (*m).KubeletMetrics {
kubeletMetrics[kubelet] = make(KubeletMetrics)
for _, metric := range interestingKubeletMetrics {
kubeletMetrics[kubelet][metric] = grabbed[metric]
}
}
(*m).APIServerMetrics = apiServerMetrics
(*m).ControllerManagerMetrics = controllerManagerMetrics
(*m).KubeletMetrics = kubeletMetrics
}
func printSample(sample *model.Sample) string {
buf := make([]string, 0)
// Id is a VERY special label. For 'normal' container it's useless, but it's necessary
// for 'system' containers (e.g. /docker-daemon, /kubelet, etc.). We know if that's the
// case by checking if there's a label "kubernetes_container_name" present. It's hacky
// but it works...
_, normalContainer := sample.Metric["kubernetes_container_name"]
for k, v := range sample.Metric {
if strings.HasPrefix(string(k), "__") {
continue
}
if string(k) == "id" && normalContainer {
continue
}
buf = append(buf, fmt.Sprintf("%v=%v", string(k), v))
}
return fmt.Sprintf("[%v] = %v", strings.Join(buf, ","), sample.Value)
}
// PrintHumanReadable returns e2e metrics with JSON format.
func (m *MetricsForE2E) PrintHumanReadable() string {
buf := bytes.Buffer{}
for _, interestingMetric := range interestingAPIServerMetrics {
buf.WriteString(fmt.Sprintf("For %v:\n", interestingMetric))
for _, sample := range (*m).APIServerMetrics[interestingMetric] {
buf.WriteString(fmt.Sprintf("\t%v\n", printSample(sample)))
}
}
for _, interestingMetric := range interestingControllerManagerMetrics {
buf.WriteString(fmt.Sprintf("For %v:\n", interestingMetric))
for _, sample := range (*m).ControllerManagerMetrics[interestingMetric] {
buf.WriteString(fmt.Sprintf("\t%v\n", printSample(sample)))
}
}
for _, interestingMetric := range interestingClusterAutoscalerMetrics {
buf.WriteString(fmt.Sprintf("For %v:\n", interestingMetric))
for _, sample := range (*m).ClusterAutoscalerMetrics[interestingMetric] {
buf.WriteString(fmt.Sprintf("\t%v\n", printSample(sample)))
}
}
for kubelet, grabbed := range (*m).KubeletMetrics {
buf.WriteString(fmt.Sprintf("For %v:\n", kubelet))
for _, interestingMetric := range interestingKubeletMetrics {
buf.WriteString(fmt.Sprintf("\tFor %v:\n", interestingMetric))
for _, sample := range grabbed[interestingMetric] {
buf.WriteString(fmt.Sprintf("\t\t%v\n", printSample(sample)))
}
}
}
return buf.String()
}
// PrettyPrintJSON converts metrics to JSON format.
func PrettyPrintJSON(metrics interface{}) string {
output := &bytes.Buffer{}
if err := json.NewEncoder(output).Encode(metrics); err != nil {
e2elog.Logf("Error building encoder: %v", err)
return ""
}
formatted := &bytes.Buffer{}
if err := json.Indent(formatted, output.Bytes(), "", " "); err != nil {
e2elog.Logf("Error indenting: %v", err)
return ""
}
return string(formatted.Bytes())
}
// PrintJSON returns e2e metrics with JSON format.
func (m *MetricsForE2E) PrintJSON() string {
m.filterMetrics()
return PrettyPrintJSON(m)
}
// SummaryKind returns the summary of e2e metrics.
func (m *MetricsForE2E) SummaryKind() string {
return "MetricsForE2E"
}
func makeKey(a, b model.LabelValue) string {
return string(a) + "___" + string(b)
}
// ComputeClusterAutoscalerMetricsDelta computes the change in cluster
// autoscaler metrics.
func (m *MetricsForE2E) ComputeClusterAutoscalerMetricsDelta(before Collection) {
if beforeSamples, found := before.ClusterAutoscalerMetrics[caFunctionMetric]; found {
if afterSamples, found := m.ClusterAutoscalerMetrics[caFunctionMetric]; found {
beforeSamplesMap := make(map[string]*model.Sample)
for _, bSample := range beforeSamples {
beforeSamplesMap[makeKey(bSample.Metric[caFunctionMetricLabel], bSample.Metric["le"])] = bSample
}
for _, aSample := range afterSamples {
if bSample, found := beforeSamplesMap[makeKey(aSample.Metric[caFunctionMetricLabel], aSample.Metric["le"])]; found {
aSample.Value = aSample.Value - bSample.Value
}
}
}
}
}

View File

@ -0,0 +1,223 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"fmt"
"io"
"math"
"reflect"
"strings"
"sync"
"time"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
)
// Histogram is a struct for managing histogram.
type Histogram struct {
Labels map[string]string `json:"labels"`
Buckets map[string]int `json:"buckets"`
}
// HistogramVec is an array of Histogram.
type HistogramVec []Histogram
func newHistogram(labels map[string]string) *Histogram {
return &Histogram{
Labels: labels,
Buckets: make(map[string]int),
}
}
// EtcdMetrics is a struct for managing etcd metrics.
type EtcdMetrics struct {
BackendCommitDuration HistogramVec `json:"backendCommitDuration"`
SnapshotSaveTotalDuration HistogramVec `json:"snapshotSaveTotalDuration"`
PeerRoundTripTime HistogramVec `json:"peerRoundTripTime"`
WalFsyncDuration HistogramVec `json:"walFsyncDuration"`
MaxDatabaseSize float64 `json:"maxDatabaseSize"`
}
func newEtcdMetrics() *EtcdMetrics {
return &EtcdMetrics{
BackendCommitDuration: make(HistogramVec, 0),
SnapshotSaveTotalDuration: make(HistogramVec, 0),
PeerRoundTripTime: make(HistogramVec, 0),
WalFsyncDuration: make(HistogramVec, 0),
}
}
// SummaryKind returns the summary of etcd metrics.
func (l *EtcdMetrics) SummaryKind() string {
return "EtcdMetrics"
}
// PrintHumanReadable returns etcd metrics with JSON format.
func (l *EtcdMetrics) PrintHumanReadable() string {
return PrettyPrintJSON(l)
}
// PrintJSON returns etcd metrics with JSON format.
func (l *EtcdMetrics) PrintJSON() string {
return PrettyPrintJSON(l)
}
// EtcdMetricsCollector is a struct for managing etcd metrics collector.
type EtcdMetricsCollector struct {
stopCh chan struct{}
wg *sync.WaitGroup
metrics *EtcdMetrics
}
// NewEtcdMetricsCollector creates a new etcd metrics collector.
func NewEtcdMetricsCollector() *EtcdMetricsCollector {
return &EtcdMetricsCollector{
stopCh: make(chan struct{}),
wg: &sync.WaitGroup{},
metrics: newEtcdMetrics(),
}
}
// extractMetricSamples parses the prometheus metric samples from the input string.
func extractMetricSamples(metricsBlob string) ([]*model.Sample, error) {
dec := expfmt.NewDecoder(strings.NewReader(metricsBlob), expfmt.FmtText)
decoder := expfmt.SampleDecoder{
Dec: dec,
Opts: &expfmt.DecodeOptions{},
}
var samples []*model.Sample
for {
var v model.Vector
if err := decoder.Decode(&v); err != nil {
if err == io.EOF {
// Expected loop termination condition.
return samples, nil
}
return nil, err
}
samples = append(samples, v...)
}
}
func getEtcdMetrics(provider string, masterHostname string) ([]*model.Sample, error) {
// Etcd is only exposed on localhost level. We are using ssh method
if provider == "gke" || provider == "eks" {
e2elog.Logf("Not grabbing etcd metrics through master SSH: unsupported for %s", provider)
return nil, nil
}
cmd := "curl http://localhost:2379/metrics"
sshResult, err := e2essh.SSH(cmd, masterHostname+":22", provider)
if err != nil || sshResult.Code != 0 {
return nil, fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
}
data := sshResult.Stdout
return extractMetricSamples(data)
}
func getEtcdDatabaseSize(provider string, masterHostname string) (float64, error) {
samples, err := getEtcdMetrics(provider, masterHostname)
if err != nil {
return 0, err
}
for _, sample := range samples {
if sample.Metric[model.MetricNameLabel] == "etcd_debugging_mvcc_db_total_size_in_bytes" {
return float64(sample.Value), nil
}
}
return 0, fmt.Errorf("Couldn't find etcd database size metric")
}
// StartCollecting starts to collect etcd db size metric periodically
// and updates MaxDatabaseSize accordingly.
func (mc *EtcdMetricsCollector) StartCollecting(interval time.Duration, provider string, masterHostname string) {
mc.wg.Add(1)
go func() {
defer mc.wg.Done()
for {
select {
case <-time.After(interval):
dbSize, err := getEtcdDatabaseSize(provider, masterHostname)
if err != nil {
e2elog.Logf("Failed to collect etcd database size")
continue
}
mc.metrics.MaxDatabaseSize = math.Max(mc.metrics.MaxDatabaseSize, dbSize)
case <-mc.stopCh:
return
}
}
}()
}
func convertSampleToBucket(sample *model.Sample, h *HistogramVec) {
labels := make(map[string]string)
for k, v := range sample.Metric {
if k != "le" {
labels[string(k)] = string(v)
}
}
var hist *Histogram
for i := range *h {
if reflect.DeepEqual(labels, (*h)[i].Labels) {
hist = &((*h)[i])
break
}
}
if hist == nil {
hist = newHistogram(labels)
*h = append(*h, *hist)
}
hist.Buckets[string(sample.Metric["le"])] = int(sample.Value)
}
// StopAndSummarize stops etcd metrics collector and summarizes the metrics.
func (mc *EtcdMetricsCollector) StopAndSummarize(provider string, masterHostname string) error {
close(mc.stopCh)
mc.wg.Wait()
// Do some one-off collection of metrics.
samples, err := getEtcdMetrics(provider, masterHostname)
if err != nil {
return err
}
for _, sample := range samples {
switch sample.Metric[model.MetricNameLabel] {
case "etcd_disk_backend_commit_duration_seconds_bucket":
convertSampleToBucket(sample, &mc.metrics.BackendCommitDuration)
case "etcd_debugging_snap_save_total_duration_seconds_bucket":
convertSampleToBucket(sample, &mc.metrics.SnapshotSaveTotalDuration)
case "etcd_disk_wal_fsync_duration_seconds_bucket":
convertSampleToBucket(sample, &mc.metrics.WalFsyncDuration)
case "etcd_network_peer_round_trip_time_seconds_bucket":
convertSampleToBucket(sample, &mc.metrics.PeerRoundTripTime)
}
}
return nil
}
// GetMetrics returns metrics of etcd metrics collector.
func (mc *EtcdMetricsCollector) GetMetrics() *EtcdMetrics {
return mc.metrics
}

View File

@ -0,0 +1,59 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
var interestingAPIServerMetrics = []string{
"apiserver_request_total",
// TODO(krzysied): apiserver_request_latencies_summary is a deprecated metric.
// It should be replaced with new metric.
"apiserver_request_latencies_summary",
"apiserver_init_events_total",
}
var interestingControllerManagerMetrics = []string{
"garbage_collector_attempt_to_delete_queue_latency",
"garbage_collector_attempt_to_delete_work_duration",
"garbage_collector_attempt_to_orphan_queue_latency",
"garbage_collector_attempt_to_orphan_work_duration",
"garbage_collector_dirty_processing_latency_microseconds",
"garbage_collector_event_processing_latency_microseconds",
"garbage_collector_graph_changes_queue_latency",
"garbage_collector_graph_changes_work_duration",
"garbage_collector_orphan_processing_latency_microseconds",
"namespace_queue_latency",
"namespace_queue_latency_sum",
"namespace_queue_latency_count",
"namespace_retries",
"namespace_work_duration",
"namespace_work_duration_sum",
"namespace_work_duration_count",
}
var interestingKubeletMetrics = []string{
"kubelet_docker_operations_errors_total",
"kubelet_docker_operations_duration_seconds",
"kubelet_pod_start_duration_seconds",
"kubelet_pod_worker_duration_seconds",
"kubelet_pod_worker_start_duration_seconds",
}
var interestingClusterAutoscalerMetrics = []string{
"function_duration_seconds",
"errors_total",
"evicted_pods_total",
}

View File

@ -20,7 +20,18 @@ import (
"fmt"
"io/ioutil"
"net/http"
"sort"
"strconv"
"strings"
"time"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
dockermetrics "k8s.io/kubernetes/pkg/kubelet/dockershim/metrics"
kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
"github.com/prometheus/common/model"
)
const (
@ -89,3 +100,122 @@ func (g *Grabber) getMetricsFromNode(nodeName string, kubeletPort int) (string,
return string(rawOutput), nil
}
}
// KubeletLatencyMetric stores metrics scraped from the kubelet server's /metric endpoint.
// TODO: Get some more structure around the metrics and this type
type KubeletLatencyMetric struct {
// eg: list, info, create
Operation string
// eg: sync_pods, pod_worker
Method string
// 0 <= quantile <=1, e.g. 0.95 is 95%tile, 0.5 is median.
Quantile float64
Latency time.Duration
}
// KubeletLatencyMetrics implements sort.Interface for []KubeletMetric based on
// the latency field.
type KubeletLatencyMetrics []KubeletLatencyMetric
func (a KubeletLatencyMetrics) Len() int { return len(a) }
func (a KubeletLatencyMetrics) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a KubeletLatencyMetrics) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
// If a apiserver client is passed in, the function will try to get kubelet metrics from metrics grabber;
// or else, the function will try to get kubelet metrics directly from the node.
func getKubeletMetricsFromNode(c clientset.Interface, nodeName string) (KubeletMetrics, error) {
if c == nil {
return GrabKubeletMetricsWithoutProxy(nodeName, "/metrics")
}
grabber, err := NewMetricsGrabber(c, nil, true, false, false, false, false)
if err != nil {
return KubeletMetrics{}, err
}
return grabber.GrabFromKubelet(nodeName)
}
// getKubeletMetrics gets all metrics in kubelet subsystem from specified node and trims
// the subsystem prefix.
func getKubeletMetrics(c clientset.Interface, nodeName string) (KubeletMetrics, error) {
ms, err := getKubeletMetricsFromNode(c, nodeName)
if err != nil {
return KubeletMetrics{}, err
}
kubeletMetrics := make(KubeletMetrics)
for name, samples := range ms {
const prefix = kubeletmetrics.KubeletSubsystem + "_"
if !strings.HasPrefix(name, prefix) {
// Not a kubelet metric.
continue
}
method := strings.TrimPrefix(name, prefix)
kubeletMetrics[method] = samples
}
return kubeletMetrics, nil
}
// GetDefaultKubeletLatencyMetrics calls GetKubeletLatencyMetrics with a set of default metricNames
// identifying common latency metrics.
// Note that the KubeletMetrics passed in should not contain subsystem prefix.
func GetDefaultKubeletLatencyMetrics(ms KubeletMetrics) KubeletLatencyMetrics {
latencyMetricNames := sets.NewString(
kubeletmetrics.PodWorkerDurationKey,
kubeletmetrics.PodWorkerStartDurationKey,
kubeletmetrics.PodStartDurationKey,
kubeletmetrics.CgroupManagerOperationsKey,
dockermetrics.DockerOperationsLatencyKey,
kubeletmetrics.PodWorkerStartDurationKey,
kubeletmetrics.PLEGRelistDurationKey,
)
return GetKubeletLatencyMetrics(ms, latencyMetricNames)
}
// GetKubeletLatencyMetrics filters ms to include only those contained in the metricNames set,
// then constructs a KubeletLatencyMetrics list based on the samples associated with those metrics.
func GetKubeletLatencyMetrics(ms KubeletMetrics, filterMetricNames sets.String) KubeletLatencyMetrics {
var latencyMetrics KubeletLatencyMetrics
for name, samples := range ms {
if !filterMetricNames.Has(name) {
continue
}
for _, sample := range samples {
latency := sample.Value
operation := string(sample.Metric["operation_type"])
var quantile float64
if val, ok := sample.Metric[model.QuantileLabel]; ok {
var err error
if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
continue
}
}
latencyMetrics = append(latencyMetrics, KubeletLatencyMetric{
Operation: operation,
Method: name,
Quantile: quantile,
Latency: time.Duration(int64(latency)) * time.Microsecond,
})
}
}
return latencyMetrics
}
// HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.
func HighLatencyKubeletOperations(c clientset.Interface, threshold time.Duration, nodeName string, logFunc func(fmt string, args ...interface{})) (KubeletLatencyMetrics, error) {
ms, err := getKubeletMetrics(c, nodeName)
if err != nil {
return KubeletLatencyMetrics{}, err
}
latencyMetrics := GetDefaultKubeletLatencyMetrics(ms)
sort.Sort(latencyMetrics)
var badMetrics KubeletLatencyMetrics
logFunc("\nLatency metrics for node %v", nodeName)
for _, m := range latencyMetrics {
if m.Latency > threshold {
badMetrics = append(badMetrics, m)
e2elog.Logf("%+v", m)
}
}
return badMetrics, nil
}

View File

@ -0,0 +1,363 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"context"
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/master/ports"
schedulermetric "k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/util/system"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
"github.com/onsi/gomega"
"github.com/prometheus/common/model"
)
const (
// SingleCallTimeout is how long to try single API calls (like 'get' or 'list'). Used to prevent
// transient failures from failing tests.
// TODO: client should not apply this timeout to Watch calls. Increased from 30s until that is fixed.
SingleCallTimeout = 5 * time.Minute
// nodeStartupThreshold is a rough estimate of the time allocated for a pod to start on a node.
nodeStartupThreshold = 4 * time.Second
// We are setting 1s threshold for apicalls even in small clusters to avoid flakes.
// The problem is that if long GC is happening in small clusters (where we have e.g.
// 1-core master machines) and tests are pretty short, it may consume significant
// portion of CPU and basically stop all the real work.
// Increasing threshold to 1s is within our SLO and should solve this problem.
apiCallLatencyThreshold time.Duration = 1 * time.Second
// We use a higher threshold for list apicalls if the cluster is big (i.e having > 500 nodes)
// as list response sizes are bigger in general for big clusters. We also use a higher threshold
// for list calls at cluster scope (this includes non-namespaced and all-namespaced calls).
apiListCallLatencyThreshold time.Duration = 5 * time.Second
apiClusterScopeListCallThreshold time.Duration = 10 * time.Second
bigClusterNodeCountThreshold = 500
)
var schedulingLatencyMetricName = model.LabelValue(schedulermetric.SchedulerSubsystem + "_" + schedulermetric.SchedulingLatencyName)
func readLatencyMetrics(c clientset.Interface) (*APIResponsiveness, error) {
var a APIResponsiveness
body, err := getMetrics(c)
if err != nil {
return nil, err
}
samples, err := extractMetricSamples(body)
if err != nil {
return nil, err
}
ignoredResources := sets.NewString("events")
// TODO: figure out why we're getting non-capitalized proxy and fix this.
ignoredVerbs := sets.NewString("WATCH", "WATCHLIST", "PROXY", "proxy", "CONNECT")
for _, sample := range samples {
// Example line:
// apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908
// apiserver_request_total{resource="pods",verb="LIST",client="kubectl",code="200",contentType="json"} 233
if sample.Metric[model.MetricNameLabel] != "apiserver_request_latencies_summary" &&
sample.Metric[model.MetricNameLabel] != "apiserver_request_total" {
continue
}
resource := string(sample.Metric["resource"])
subresource := string(sample.Metric["subresource"])
verb := string(sample.Metric["verb"])
scope := string(sample.Metric["scope"])
if ignoredResources.Has(resource) || ignoredVerbs.Has(verb) {
continue
}
switch sample.Metric[model.MetricNameLabel] {
case "apiserver_request_latencies_summary":
latency := sample.Value
quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64)
if err != nil {
return nil, err
}
a.addMetricRequestLatency(resource, subresource, verb, scope, quantile, time.Duration(int64(latency))*time.Microsecond)
case "apiserver_request_total":
count := sample.Value
a.addMetricRequestCount(resource, subresource, verb, scope, int(count))
}
}
return &a, err
}
// HighLatencyRequests prints top five summary metrics for request types with latency and returns
// number of such request types above threshold. We use a higher threshold for
// list calls if nodeCount is above a given threshold (i.e. cluster is big).
func HighLatencyRequests(c clientset.Interface, nodeCount int) (int, *APIResponsiveness, error) {
isBigCluster := (nodeCount > bigClusterNodeCountThreshold)
metrics, err := readLatencyMetrics(c)
if err != nil {
return 0, metrics, err
}
sort.Sort(sort.Reverse(metrics))
badMetrics := 0
top := 5
for i := range metrics.APICalls {
latency := metrics.APICalls[i].Latency.Perc99
isListCall := (metrics.APICalls[i].Verb == "LIST")
isClusterScopedCall := (metrics.APICalls[i].Scope == "cluster")
isBad := false
latencyThreshold := apiCallLatencyThreshold
if isListCall && isBigCluster {
latencyThreshold = apiListCallLatencyThreshold
if isClusterScopedCall {
latencyThreshold = apiClusterScopeListCallThreshold
}
}
if latency > latencyThreshold {
isBad = true
badMetrics++
}
if top > 0 || isBad {
top--
prefix := ""
if isBad {
prefix = "WARNING "
}
e2elog.Logf("%vTop latency metric: %+v", prefix, metrics.APICalls[i])
}
}
return badMetrics, metrics, nil
}
// VerifyLatencyWithinThreshold verifies whether 50, 90 and 99th percentiles of a latency metric are
// within the expected threshold.
func VerifyLatencyWithinThreshold(threshold, actual LatencyMetric, metricName string) error {
if actual.Perc50 > threshold.Perc50 {
return fmt.Errorf("too high %v latency 50th percentile: %v", metricName, actual.Perc50)
}
if actual.Perc90 > threshold.Perc90 {
return fmt.Errorf("too high %v latency 90th percentile: %v", metricName, actual.Perc90)
}
if actual.Perc99 > threshold.Perc99 {
return fmt.Errorf("too high %v latency 99th percentile: %v", metricName, actual.Perc99)
}
return nil
}
// ResetMetrics resets latency metrics in apiserver.
func ResetMetrics(c clientset.Interface) error {
e2elog.Logf("Resetting latency metrics in apiserver...")
body, err := c.CoreV1().RESTClient().Delete().AbsPath("/metrics").DoRaw()
if err != nil {
return err
}
if string(body) != "metrics reset\n" {
return fmt.Errorf("Unexpected response: %q", string(body))
}
return nil
}
// Retrieves metrics information.
func getMetrics(c clientset.Interface) (string, error) {
body, err := c.CoreV1().RESTClient().Get().AbsPath("/metrics").DoRaw()
if err != nil {
return "", err
}
return string(body), nil
}
// Sends REST request to kube scheduler metrics
func sendRestRequestToScheduler(c clientset.Interface, op, provider, cloudMasterName, masterHostname string) (string, error) {
opUpper := strings.ToUpper(op)
if opUpper != "GET" && opUpper != "DELETE" {
return "", fmt.Errorf("Unknown REST request")
}
nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
// The following 4 lines are intended to replace framework.ExpectNoError(err)
if err != nil {
e2elog.Logf("Unexpected error occurred: %v", err)
}
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
var masterRegistered = false
for _, node := range nodes.Items {
if system.IsMasterNode(node.Name) {
masterRegistered = true
}
}
var responseText string
if masterRegistered {
ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
defer cancel()
body, err := c.CoreV1().RESTClient().Verb(opUpper).
Context(ctx).
Namespace(metav1.NamespaceSystem).
Resource("pods").
Name(fmt.Sprintf("kube-scheduler-%v:%v", cloudMasterName, ports.InsecureSchedulerPort)).
SubResource("proxy").
Suffix("metrics").
Do().Raw()
// The following 4 lines are intended to replace
// framework.ExpectNoError(err).
if err != nil {
e2elog.Logf("Unexpected error occurred: %v", err)
}
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
responseText = string(body)
} else {
// If master is not registered fall back to old method of using SSH.
if provider == "gke" || provider == "eks" {
e2elog.Logf("Not grabbing scheduler metrics through master SSH: unsupported for %s", provider)
return "", nil
}
cmd := "curl -X " + opUpper + " http://localhost:10251/metrics"
sshResult, err := e2essh.SSH(cmd, masterHostname+":22", provider)
if err != nil || sshResult.Code != 0 {
return "", fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
}
responseText = sshResult.Stdout
}
return responseText, nil
}
// Retrieves scheduler latency metrics.
func getSchedulingLatency(c clientset.Interface, provider, cloudMasterName, masterHostname string) (*SchedulingMetrics, error) {
result := SchedulingMetrics{}
data, err := sendRestRequestToScheduler(c, "GET", provider, cloudMasterName, masterHostname)
if err != nil {
return nil, err
}
samples, err := extractMetricSamples(data)
if err != nil {
return nil, err
}
for _, sample := range samples {
if sample.Metric[model.MetricNameLabel] != schedulingLatencyMetricName {
continue
}
var metric *LatencyMetric
switch sample.Metric[schedulermetric.OperationLabel] {
case schedulermetric.PredicateEvaluation:
metric = &result.PredicateEvaluationLatency
case schedulermetric.PriorityEvaluation:
metric = &result.PriorityEvaluationLatency
case schedulermetric.PreemptionEvaluation:
metric = &result.PreemptionEvaluationLatency
case schedulermetric.Binding:
metric = &result.BindingLatency
}
if metric == nil {
continue
}
quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64)
if err != nil {
return nil, err
}
setQuantile(metric, quantile, time.Duration(int64(float64(sample.Value)*float64(time.Second))))
}
return &result, nil
}
// VerifySchedulerLatency verifies (currently just by logging them) the scheduling latencies.
func VerifySchedulerLatency(c clientset.Interface, provider, cloudMasterName, masterHostname string) (*SchedulingMetrics, error) {
latency, err := getSchedulingLatency(c, provider, cloudMasterName, masterHostname)
if err != nil {
return nil, err
}
return latency, nil
}
// ResetSchedulerMetrics sends a DELETE request to kube-scheduler for resetting metrics.
func ResetSchedulerMetrics(c clientset.Interface, provider, cloudMasterName, masterHostname string) error {
responseText, err := sendRestRequestToScheduler(c, "DELETE", provider, cloudMasterName, masterHostname)
if err != nil {
return fmt.Errorf("Unexpected response: %q", responseText)
}
return nil
}
// PodLatencyData encapsulates pod startup latency information.
type PodLatencyData struct {
// Name of the pod
Name string
// Node this pod was running on
Node string
// Latency information related to pod startuptime
Latency time.Duration
}
// LatencySlice is an array of PodLatencyData which encapsulates pod startup latency information.
type LatencySlice []PodLatencyData
func (a LatencySlice) Len() int { return len(a) }
func (a LatencySlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a LatencySlice) Less(i, j int) bool { return a[i].Latency < a[j].Latency }
// ExtractLatencyMetrics returns latency metrics for each percentile(50th, 90th and 99th).
func ExtractLatencyMetrics(latencies []PodLatencyData) LatencyMetric {
length := len(latencies)
perc50 := latencies[int(math.Ceil(float64(length*50)/100))-1].Latency
perc90 := latencies[int(math.Ceil(float64(length*90)/100))-1].Latency
perc99 := latencies[int(math.Ceil(float64(length*99)/100))-1].Latency
perc100 := latencies[length-1].Latency
return LatencyMetric{Perc50: perc50, Perc90: perc90, Perc99: perc99, Perc100: perc100}
}
// LogSuspiciousLatency logs metrics/docker errors from all nodes that had slow startup times
// If latencyDataLag is nil then it will be populated from latencyData
func LogSuspiciousLatency(latencyData []PodLatencyData, latencyDataLag []PodLatencyData, nodeCount int, c clientset.Interface) {
if latencyDataLag == nil {
latencyDataLag = latencyData
}
for _, l := range latencyData {
if l.Latency > nodeStartupThreshold {
HighLatencyKubeletOperations(c, 1*time.Second, l.Node, e2elog.Logf)
}
}
e2elog.Logf("Approx throughput: %v pods/min",
float64(nodeCount)/(latencyDataLag[len(latencyDataLag)-1].Latency.Minutes()))
}
// PrintLatencies outputs latencies to log with readable format.
func PrintLatencies(latencies []PodLatencyData, header string) {
metrics := ExtractLatencyMetrics(latencies)
e2elog.Logf("10%% %s: %v", header, latencies[(len(latencies)*9)/10:])
e2elog.Logf("perc50: %v, perc90: %v, perc99: %v", metrics.Perc50, metrics.Perc90, metrics.Perc99)
}

View File

@ -0,0 +1,81 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"time"
e2eperftype "k8s.io/kubernetes/test/e2e/perftype"
)
// LatencyMetric is a struct for dashboard metrics.
type LatencyMetric struct {
Perc50 time.Duration `json:"Perc50"`
Perc90 time.Duration `json:"Perc90"`
Perc99 time.Duration `json:"Perc99"`
Perc100 time.Duration `json:"Perc100"`
}
// PodStartupLatency is a struct for managing latency of pod startup.
type PodStartupLatency struct {
CreateToScheduleLatency LatencyMetric `json:"createToScheduleLatency"`
ScheduleToRunLatency LatencyMetric `json:"scheduleToRunLatency"`
RunToWatchLatency LatencyMetric `json:"runToWatchLatency"`
ScheduleToWatchLatency LatencyMetric `json:"scheduleToWatchLatency"`
E2ELatency LatencyMetric `json:"e2eLatency"`
}
// SummaryKind returns the summary of pod startup latency.
func (l *PodStartupLatency) SummaryKind() string {
return "PodStartupLatency"
}
// PrintHumanReadable returns pod startup letency with JSON format.
func (l *PodStartupLatency) PrintHumanReadable() string {
return PrettyPrintJSON(l)
}
// PrintJSON returns pod startup letency with JSON format.
func (l *PodStartupLatency) PrintJSON() string {
return PrettyPrintJSON(PodStartupLatencyToPerfData(l))
}
func latencyToPerfData(l LatencyMetric, name string) e2eperftype.DataItem {
return e2eperftype.DataItem{
Data: map[string]float64{
"Perc50": float64(l.Perc50) / 1000000, // us -> ms
"Perc90": float64(l.Perc90) / 1000000,
"Perc99": float64(l.Perc99) / 1000000,
"Perc100": float64(l.Perc100) / 1000000,
},
Unit: "ms",
Labels: map[string]string{
"Metric": name,
},
}
}
// PodStartupLatencyToPerfData transforms PodStartupLatency to PerfData.
func PodStartupLatencyToPerfData(latency *PodStartupLatency) *e2eperftype.PerfData {
perfData := &e2eperftype.PerfData{Version: currentAPICallMetricsVersion}
perfData.DataItems = append(perfData.DataItems, latencyToPerfData(latency.CreateToScheduleLatency, "create_to_schedule"))
perfData.DataItems = append(perfData.DataItems, latencyToPerfData(latency.ScheduleToRunLatency, "schedule_to_run"))
perfData.DataItems = append(perfData.DataItems, latencyToPerfData(latency.RunToWatchLatency, "run_to_watch"))
perfData.DataItems = append(perfData.DataItems, latencyToPerfData(latency.ScheduleToWatchLatency, "schedule_to_watch"))
perfData.DataItems = append(perfData.DataItems, latencyToPerfData(latency.E2ELatency, "pod_startup"))
return perfData
}

View File

@ -0,0 +1,44 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
// SchedulingMetrics is a struct for managing scheduling metrics.
type SchedulingMetrics struct {
PredicateEvaluationLatency LatencyMetric `json:"predicateEvaluationLatency"`
PriorityEvaluationLatency LatencyMetric `json:"priorityEvaluationLatency"`
PreemptionEvaluationLatency LatencyMetric `json:"preemptionEvaluationLatency"`
BindingLatency LatencyMetric `json:"bindingLatency"`
ThroughputAverage float64 `json:"throughputAverage"`
ThroughputPerc50 float64 `json:"throughputPerc50"`
ThroughputPerc90 float64 `json:"throughputPerc90"`
ThroughputPerc99 float64 `json:"throughputPerc99"`
}
// SummaryKind returns the summary of scheduling metrics.
func (l *SchedulingMetrics) SummaryKind() string {
return "SchedulingMetrics"
}
// PrintHumanReadable returns scheduling metrics with JSON format.
func (l *SchedulingMetrics) PrintHumanReadable() string {
return PrettyPrintJSON(l)
}
// PrintJSON returns scheduling metrics with JSON format.
func (l *SchedulingMetrics) PrintJSON() string {
return PrettyPrintJSON(l)
}

View File

@ -1,856 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"math"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/master/ports"
schedulermetric "k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/util/system"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
"k8s.io/kubernetes/test/e2e/framework/metrics"
e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
)
const (
// NodeStartupThreshold is a rough estimate of the time allocated for a pod to start on a node.
NodeStartupThreshold = 4 * time.Second
// We are setting 1s threshold for apicalls even in small clusters to avoid flakes.
// The problem is that if long GC is happening in small clusters (where we have e.g.
// 1-core master machines) and tests are pretty short, it may consume significant
// portion of CPU and basically stop all the real work.
// Increasing threshold to 1s is within our SLO and should solve this problem.
apiCallLatencyThreshold time.Duration = 1 * time.Second
// We use a higher threshold for list apicalls if the cluster is big (i.e having > 500 nodes)
// as list response sizes are bigger in general for big clusters. We also use a higher threshold
// for list calls at cluster scope (this includes non-namespaced and all-namespaced calls).
apiListCallLatencyThreshold time.Duration = 5 * time.Second
apiClusterScopeListCallThreshold time.Duration = 10 * time.Second
bigClusterNodeCountThreshold = 500
// Cluster Autoscaler metrics names
caFunctionMetric = "cluster_autoscaler_function_duration_seconds_bucket"
caFunctionMetricLabel = "function"
)
// MetricsForE2E is metrics collection of components.
type MetricsForE2E metrics.Collection
func (m *MetricsForE2E) filterMetrics() {
apiServerMetrics := make(metrics.APIServerMetrics)
for _, metric := range interestingAPIServerMetrics {
apiServerMetrics[metric] = (*m).APIServerMetrics[metric]
}
controllerManagerMetrics := make(metrics.ControllerManagerMetrics)
for _, metric := range interestingControllerManagerMetrics {
controllerManagerMetrics[metric] = (*m).ControllerManagerMetrics[metric]
}
kubeletMetrics := make(map[string]metrics.KubeletMetrics)
for kubelet, grabbed := range (*m).KubeletMetrics {
kubeletMetrics[kubelet] = make(metrics.KubeletMetrics)
for _, metric := range interestingKubeletMetrics {
kubeletMetrics[kubelet][metric] = grabbed[metric]
}
}
(*m).APIServerMetrics = apiServerMetrics
(*m).ControllerManagerMetrics = controllerManagerMetrics
(*m).KubeletMetrics = kubeletMetrics
}
func printSample(sample *model.Sample) string {
buf := make([]string, 0)
// Id is a VERY special label. For 'normal' container it's useless, but it's necessary
// for 'system' containers (e.g. /docker-daemon, /kubelet, etc.). We know if that's the
// case by checking if there's a label "kubernetes_container_name" present. It's hacky
// but it works...
_, normalContainer := sample.Metric["kubernetes_container_name"]
for k, v := range sample.Metric {
if strings.HasPrefix(string(k), "__") {
continue
}
if string(k) == "id" && normalContainer {
continue
}
buf = append(buf, fmt.Sprintf("%v=%v", string(k), v))
}
return fmt.Sprintf("[%v] = %v", strings.Join(buf, ","), sample.Value)
}
// PrintHumanReadable returns e2e metrics with JSON format.
func (m *MetricsForE2E) PrintHumanReadable() string {
buf := bytes.Buffer{}
for _, interestingMetric := range interestingAPIServerMetrics {
buf.WriteString(fmt.Sprintf("For %v:\n", interestingMetric))
for _, sample := range (*m).APIServerMetrics[interestingMetric] {
buf.WriteString(fmt.Sprintf("\t%v\n", printSample(sample)))
}
}
for _, interestingMetric := range interestingControllerManagerMetrics {
buf.WriteString(fmt.Sprintf("For %v:\n", interestingMetric))
for _, sample := range (*m).ControllerManagerMetrics[interestingMetric] {
buf.WriteString(fmt.Sprintf("\t%v\n", printSample(sample)))
}
}
for _, interestingMetric := range interestingClusterAutoscalerMetrics {
buf.WriteString(fmt.Sprintf("For %v:\n", interestingMetric))
for _, sample := range (*m).ClusterAutoscalerMetrics[interestingMetric] {
buf.WriteString(fmt.Sprintf("\t%v\n", printSample(sample)))
}
}
for kubelet, grabbed := range (*m).KubeletMetrics {
buf.WriteString(fmt.Sprintf("For %v:\n", kubelet))
for _, interestingMetric := range interestingKubeletMetrics {
buf.WriteString(fmt.Sprintf("\tFor %v:\n", interestingMetric))
for _, sample := range grabbed[interestingMetric] {
buf.WriteString(fmt.Sprintf("\t\t%v\n", printSample(sample)))
}
}
}
return buf.String()
}
// PrintJSON returns e2e metrics with JSON format.
func (m *MetricsForE2E) PrintJSON() string {
m.filterMetrics()
return PrettyPrintJSON(m)
}
// SummaryKind returns the summary of e2e metrics.
func (m *MetricsForE2E) SummaryKind() string {
return "MetricsForE2E"
}
var schedulingLatencyMetricName = model.LabelValue(schedulermetric.SchedulerSubsystem + "_" + schedulermetric.SchedulingLatencyName)
var interestingAPIServerMetrics = []string{
"apiserver_request_total",
// TODO(krzysied): apiserver_request_latencies_summary is a deprecated metric.
// It should be replaced with new metric.
"apiserver_request_latencies_summary",
"apiserver_init_events_total",
}
var interestingControllerManagerMetrics = []string{
"garbage_collector_attempt_to_delete_queue_latency",
"garbage_collector_attempt_to_delete_work_duration",
"garbage_collector_attempt_to_orphan_queue_latency",
"garbage_collector_attempt_to_orphan_work_duration",
"garbage_collector_dirty_processing_latency_microseconds",
"garbage_collector_event_processing_latency_microseconds",
"garbage_collector_graph_changes_queue_latency",
"garbage_collector_graph_changes_work_duration",
"garbage_collector_orphan_processing_latency_microseconds",
"namespace_queue_latency",
"namespace_queue_latency_sum",
"namespace_queue_latency_count",
"namespace_retries",
"namespace_work_duration",
"namespace_work_duration_sum",
"namespace_work_duration_count",
}
var interestingKubeletMetrics = []string{
"kubelet_docker_operations_errors_total",
"kubelet_docker_operations_duration_seconds",
"kubelet_pod_start_duration_seconds",
"kubelet_pod_worker_duration_seconds",
"kubelet_pod_worker_start_duration_seconds",
}
var interestingClusterAutoscalerMetrics = []string{
"function_duration_seconds",
"errors_total",
"evicted_pods_total",
}
// LatencyMetric is a struct for dashboard metrics.
type LatencyMetric struct {
Perc50 time.Duration `json:"Perc50"`
Perc90 time.Duration `json:"Perc90"`
Perc99 time.Duration `json:"Perc99"`
Perc100 time.Duration `json:"Perc100"`
}
// PodStartupLatency is a struct for managing latency of pod startup.
type PodStartupLatency struct {
CreateToScheduleLatency LatencyMetric `json:"createToScheduleLatency"`
ScheduleToRunLatency LatencyMetric `json:"scheduleToRunLatency"`
RunToWatchLatency LatencyMetric `json:"runToWatchLatency"`
ScheduleToWatchLatency LatencyMetric `json:"scheduleToWatchLatency"`
E2ELatency LatencyMetric `json:"e2eLatency"`
}
// SummaryKind returns the summary of pod startup latency.
func (l *PodStartupLatency) SummaryKind() string {
return "PodStartupLatency"
}
// PrintHumanReadable returns pod startup letency with JSON format.
func (l *PodStartupLatency) PrintHumanReadable() string {
return PrettyPrintJSON(l)
}
// PrintJSON returns pod startup letency with JSON format.
func (l *PodStartupLatency) PrintJSON() string {
return PrettyPrintJSON(PodStartupLatencyToPerfData(l))
}
// SchedulingMetrics is a struct for managing scheduling metrics.
type SchedulingMetrics struct {
PredicateEvaluationLatency LatencyMetric `json:"predicateEvaluationLatency"`
PriorityEvaluationLatency LatencyMetric `json:"priorityEvaluationLatency"`
PreemptionEvaluationLatency LatencyMetric `json:"preemptionEvaluationLatency"`
BindingLatency LatencyMetric `json:"bindingLatency"`
ThroughputAverage float64 `json:"throughputAverage"`
ThroughputPerc50 float64 `json:"throughputPerc50"`
ThroughputPerc90 float64 `json:"throughputPerc90"`
ThroughputPerc99 float64 `json:"throughputPerc99"`
}
// SummaryKind returns the summary of scheduling metrics.
func (l *SchedulingMetrics) SummaryKind() string {
return "SchedulingMetrics"
}
// PrintHumanReadable returns scheduling metrics with JSON format.
func (l *SchedulingMetrics) PrintHumanReadable() string {
return PrettyPrintJSON(l)
}
// PrintJSON returns scheduling metrics with JSON format.
func (l *SchedulingMetrics) PrintJSON() string {
return PrettyPrintJSON(l)
}
// Histogram is a struct for managing histogram.
type Histogram struct {
Labels map[string]string `json:"labels"`
Buckets map[string]int `json:"buckets"`
}
// HistogramVec is an array of Histogram.
type HistogramVec []Histogram
func newHistogram(labels map[string]string) *Histogram {
return &Histogram{
Labels: labels,
Buckets: make(map[string]int),
}
}
// EtcdMetrics is a struct for managing etcd metrics.
type EtcdMetrics struct {
BackendCommitDuration HistogramVec `json:"backendCommitDuration"`
SnapshotSaveTotalDuration HistogramVec `json:"snapshotSaveTotalDuration"`
PeerRoundTripTime HistogramVec `json:"peerRoundTripTime"`
WalFsyncDuration HistogramVec `json:"walFsyncDuration"`
MaxDatabaseSize float64 `json:"maxDatabaseSize"`
}
func newEtcdMetrics() *EtcdMetrics {
return &EtcdMetrics{
BackendCommitDuration: make(HistogramVec, 0),
SnapshotSaveTotalDuration: make(HistogramVec, 0),
PeerRoundTripTime: make(HistogramVec, 0),
WalFsyncDuration: make(HistogramVec, 0),
}
}
// SummaryKind returns the summary of etcd metrics.
func (l *EtcdMetrics) SummaryKind() string {
return "EtcdMetrics"
}
// PrintHumanReadable returns etcd metrics with JSON format.
func (l *EtcdMetrics) PrintHumanReadable() string {
return PrettyPrintJSON(l)
}
// PrintJSON returns etcd metrics with JSON format.
func (l *EtcdMetrics) PrintJSON() string {
return PrettyPrintJSON(l)
}
// EtcdMetricsCollector is a struct for managing etcd metrics collector.
type EtcdMetricsCollector struct {
stopCh chan struct{}
wg *sync.WaitGroup
metrics *EtcdMetrics
}
// NewEtcdMetricsCollector creates a new etcd metrics collector.
func NewEtcdMetricsCollector() *EtcdMetricsCollector {
return &EtcdMetricsCollector{
stopCh: make(chan struct{}),
wg: &sync.WaitGroup{},
metrics: newEtcdMetrics(),
}
}
func getEtcdMetrics() ([]*model.Sample, error) {
// Etcd is only exposed on localhost level. We are using ssh method
if TestContext.Provider == "gke" || TestContext.Provider == "eks" {
e2elog.Logf("Not grabbing etcd metrics through master SSH: unsupported for %s", TestContext.Provider)
return nil, nil
}
cmd := "curl http://localhost:2379/metrics"
sshResult, err := e2essh.SSH(cmd, GetMasterHost()+":22", TestContext.Provider)
if err != nil || sshResult.Code != 0 {
return nil, fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
}
data := sshResult.Stdout
return extractMetricSamples(data)
}
func getEtcdDatabaseSize() (float64, error) {
samples, err := getEtcdMetrics()
if err != nil {
return 0, err
}
for _, sample := range samples {
if sample.Metric[model.MetricNameLabel] == "etcd_debugging_mvcc_db_total_size_in_bytes" {
return float64(sample.Value), nil
}
}
return 0, fmt.Errorf("Couldn't find etcd database size metric")
}
// StartCollecting starts to collect etcd db size metric periodically
// and updates MaxDatabaseSize accordingly.
func (mc *EtcdMetricsCollector) StartCollecting(interval time.Duration) {
mc.wg.Add(1)
go func() {
defer mc.wg.Done()
for {
select {
case <-time.After(interval):
dbSize, err := getEtcdDatabaseSize()
if err != nil {
e2elog.Logf("Failed to collect etcd database size")
continue
}
mc.metrics.MaxDatabaseSize = math.Max(mc.metrics.MaxDatabaseSize, dbSize)
case <-mc.stopCh:
return
}
}
}()
}
// StopAndSummarize stops etcd metrics collector and summarizes the metrics.
func (mc *EtcdMetricsCollector) StopAndSummarize() error {
close(mc.stopCh)
mc.wg.Wait()
// Do some one-off collection of metrics.
samples, err := getEtcdMetrics()
if err != nil {
return err
}
for _, sample := range samples {
switch sample.Metric[model.MetricNameLabel] {
case "etcd_disk_backend_commit_duration_seconds_bucket":
convertSampleToBucket(sample, &mc.metrics.BackendCommitDuration)
case "etcd_debugging_snap_save_total_duration_seconds_bucket":
convertSampleToBucket(sample, &mc.metrics.SnapshotSaveTotalDuration)
case "etcd_disk_wal_fsync_duration_seconds_bucket":
convertSampleToBucket(sample, &mc.metrics.WalFsyncDuration)
case "etcd_network_peer_round_trip_time_seconds_bucket":
convertSampleToBucket(sample, &mc.metrics.PeerRoundTripTime)
}
}
return nil
}
// GetMetrics returns metrics of etcd metrics collector.
func (mc *EtcdMetricsCollector) GetMetrics() *EtcdMetrics {
return mc.metrics
}
// APICall is a struct for managing API call.
type APICall struct {
Resource string `json:"resource"`
Subresource string `json:"subresource"`
Verb string `json:"verb"`
Scope string `json:"scope"`
Latency LatencyMetric `json:"latency"`
Count int `json:"count"`
}
// APIResponsiveness is a struct for managing multiple API calls.
type APIResponsiveness struct {
APICalls []APICall `json:"apicalls"`
}
// SummaryKind returns the summary of API responsiveness.
func (a *APIResponsiveness) SummaryKind() string {
return "APIResponsiveness"
}
// PrintHumanReadable returns metrics with JSON format.
func (a *APIResponsiveness) PrintHumanReadable() string {
return PrettyPrintJSON(a)
}
// PrintJSON returns metrics of PerfData(50, 90 and 99th percentiles) with JSON format.
func (a *APIResponsiveness) PrintJSON() string {
return PrettyPrintJSON(APICallToPerfData(a))
}
func (a *APIResponsiveness) Len() int { return len(a.APICalls) }
func (a *APIResponsiveness) Swap(i, j int) {
a.APICalls[i], a.APICalls[j] = a.APICalls[j], a.APICalls[i]
}
func (a *APIResponsiveness) Less(i, j int) bool {
return a.APICalls[i].Latency.Perc99 < a.APICalls[j].Latency.Perc99
}
// Set request latency for a particular quantile in the APICall metric entry (creating one if necessary).
// 0 <= quantile <=1 (e.g. 0.95 is 95%tile, 0.5 is median)
// Only 0.5, 0.9 and 0.99 quantiles are supported.
func (a *APIResponsiveness) addMetricRequestLatency(resource, subresource, verb, scope string, quantile float64, latency time.Duration) {
for i, apicall := range a.APICalls {
if apicall.Resource == resource && apicall.Subresource == subresource && apicall.Verb == verb && apicall.Scope == scope {
a.APICalls[i] = setQuantileAPICall(apicall, quantile, latency)
return
}
}
apicall := setQuantileAPICall(APICall{Resource: resource, Subresource: subresource, Verb: verb, Scope: scope}, quantile, latency)
a.APICalls = append(a.APICalls, apicall)
}
// 0 <= quantile <=1 (e.g. 0.95 is 95%tile, 0.5 is median)
// Only 0.5, 0.9 and 0.99 quantiles are supported.
func setQuantileAPICall(apicall APICall, quantile float64, latency time.Duration) APICall {
setQuantile(&apicall.Latency, quantile, latency)
return apicall
}
// Only 0.5, 0.9 and 0.99 quantiles are supported.
func setQuantile(metric *LatencyMetric, quantile float64, latency time.Duration) {
switch quantile {
case 0.5:
metric.Perc50 = latency
case 0.9:
metric.Perc90 = latency
case 0.99:
metric.Perc99 = latency
}
}
// Add request count to the APICall metric entry (creating one if necessary).
func (a *APIResponsiveness) addMetricRequestCount(resource, subresource, verb, scope string, count int) {
for i, apicall := range a.APICalls {
if apicall.Resource == resource && apicall.Subresource == subresource && apicall.Verb == verb && apicall.Scope == scope {
a.APICalls[i].Count += count
return
}
}
apicall := APICall{Resource: resource, Subresource: subresource, Verb: verb, Count: count, Scope: scope}
a.APICalls = append(a.APICalls, apicall)
}
func readLatencyMetrics(c clientset.Interface) (*APIResponsiveness, error) {
var a APIResponsiveness
body, err := getMetrics(c)
if err != nil {
return nil, err
}
samples, err := extractMetricSamples(body)
if err != nil {
return nil, err
}
ignoredResources := sets.NewString("events")
// TODO: figure out why we're getting non-capitalized proxy and fix this.
ignoredVerbs := sets.NewString("WATCH", "WATCHLIST", "PROXY", "proxy", "CONNECT")
for _, sample := range samples {
// Example line:
// apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908
// apiserver_request_total{resource="pods",verb="LIST",client="kubectl",code="200",contentType="json"} 233
if sample.Metric[model.MetricNameLabel] != "apiserver_request_latencies_summary" &&
sample.Metric[model.MetricNameLabel] != "apiserver_request_total" {
continue
}
resource := string(sample.Metric["resource"])
subresource := string(sample.Metric["subresource"])
verb := string(sample.Metric["verb"])
scope := string(sample.Metric["scope"])
if ignoredResources.Has(resource) || ignoredVerbs.Has(verb) {
continue
}
switch sample.Metric[model.MetricNameLabel] {
case "apiserver_request_latencies_summary":
latency := sample.Value
quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64)
if err != nil {
return nil, err
}
a.addMetricRequestLatency(resource, subresource, verb, scope, quantile, time.Duration(int64(latency))*time.Microsecond)
case "apiserver_request_total":
count := sample.Value
a.addMetricRequestCount(resource, subresource, verb, scope, int(count))
}
}
return &a, err
}
// HighLatencyRequests prints top five summary metrics for request types with latency and returns
// number of such request types above threshold. We use a higher threshold for
// list calls if nodeCount is above a given threshold (i.e. cluster is big).
func HighLatencyRequests(c clientset.Interface, nodeCount int) (int, *APIResponsiveness, error) {
isBigCluster := (nodeCount > bigClusterNodeCountThreshold)
metrics, err := readLatencyMetrics(c)
if err != nil {
return 0, metrics, err
}
sort.Sort(sort.Reverse(metrics))
badMetrics := 0
top := 5
for i := range metrics.APICalls {
latency := metrics.APICalls[i].Latency.Perc99
isListCall := (metrics.APICalls[i].Verb == "LIST")
isClusterScopedCall := (metrics.APICalls[i].Scope == "cluster")
isBad := false
latencyThreshold := apiCallLatencyThreshold
if isListCall && isBigCluster {
latencyThreshold = apiListCallLatencyThreshold
if isClusterScopedCall {
latencyThreshold = apiClusterScopeListCallThreshold
}
}
if latency > latencyThreshold {
isBad = true
badMetrics++
}
if top > 0 || isBad {
top--
prefix := ""
if isBad {
prefix = "WARNING "
}
e2elog.Logf("%vTop latency metric: %+v", prefix, metrics.APICalls[i])
}
}
return badMetrics, metrics, nil
}
// VerifyLatencyWithinThreshold verifies whether 50, 90 and 99th percentiles of a latency metric are
// within the expected threshold.
func VerifyLatencyWithinThreshold(threshold, actual LatencyMetric, metricName string) error {
if actual.Perc50 > threshold.Perc50 {
return fmt.Errorf("too high %v latency 50th percentile: %v", metricName, actual.Perc50)
}
if actual.Perc90 > threshold.Perc90 {
return fmt.Errorf("too high %v latency 90th percentile: %v", metricName, actual.Perc90)
}
if actual.Perc99 > threshold.Perc99 {
return fmt.Errorf("too high %v latency 99th percentile: %v", metricName, actual.Perc99)
}
return nil
}
// ResetMetrics resets latency metrics in apiserver.
func ResetMetrics(c clientset.Interface) error {
e2elog.Logf("Resetting latency metrics in apiserver...")
body, err := c.CoreV1().RESTClient().Delete().AbsPath("/metrics").DoRaw()
if err != nil {
return err
}
if string(body) != "metrics reset\n" {
return fmt.Errorf("Unexpected response: %q", string(body))
}
return nil
}
// Retrieves metrics information.
func getMetrics(c clientset.Interface) (string, error) {
body, err := c.CoreV1().RESTClient().Get().AbsPath("/metrics").DoRaw()
if err != nil {
return "", err
}
return string(body), nil
}
// Sends REST request to kube scheduler metrics
func sendRestRequestToScheduler(c clientset.Interface, op string) (string, error) {
opUpper := strings.ToUpper(op)
if opUpper != "GET" && opUpper != "DELETE" {
return "", fmt.Errorf("Unknown REST request")
}
nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
ExpectNoError(err)
var masterRegistered = false
for _, node := range nodes.Items {
if system.IsMasterNode(node.Name) {
masterRegistered = true
}
}
var responseText string
if masterRegistered {
ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
defer cancel()
body, err := c.CoreV1().RESTClient().Verb(opUpper).
Context(ctx).
Namespace(metav1.NamespaceSystem).
Resource("pods").
Name(fmt.Sprintf("kube-scheduler-%v:%v", TestContext.CloudConfig.MasterName, ports.InsecureSchedulerPort)).
SubResource("proxy").
Suffix("metrics").
Do().Raw()
ExpectNoError(err)
responseText = string(body)
} else {
// If master is not registered fall back to old method of using SSH.
if TestContext.Provider == "gke" || TestContext.Provider == "eks" {
e2elog.Logf("Not grabbing scheduler metrics through master SSH: unsupported for %s", TestContext.Provider)
return "", nil
}
cmd := "curl -X " + opUpper + " http://localhost:10251/metrics"
sshResult, err := e2essh.SSH(cmd, GetMasterHost()+":22", TestContext.Provider)
if err != nil || sshResult.Code != 0 {
return "", fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
}
responseText = sshResult.Stdout
}
return responseText, nil
}
// Retrieves scheduler latency metrics.
func getSchedulingLatency(c clientset.Interface) (*SchedulingMetrics, error) {
result := SchedulingMetrics{}
data, err := sendRestRequestToScheduler(c, "GET")
if err != nil {
return nil, err
}
samples, err := extractMetricSamples(data)
if err != nil {
return nil, err
}
for _, sample := range samples {
if sample.Metric[model.MetricNameLabel] != schedulingLatencyMetricName {
continue
}
var metric *LatencyMetric
switch sample.Metric[schedulermetric.OperationLabel] {
case schedulermetric.PredicateEvaluation:
metric = &result.PredicateEvaluationLatency
case schedulermetric.PriorityEvaluation:
metric = &result.PriorityEvaluationLatency
case schedulermetric.PreemptionEvaluation:
metric = &result.PreemptionEvaluationLatency
case schedulermetric.Binding:
metric = &result.BindingLatency
}
if metric == nil {
continue
}
quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64)
if err != nil {
return nil, err
}
setQuantile(metric, quantile, time.Duration(int64(float64(sample.Value)*float64(time.Second))))
}
return &result, nil
}
// VerifySchedulerLatency verifies (currently just by logging them) the scheduling latencies.
func VerifySchedulerLatency(c clientset.Interface) (*SchedulingMetrics, error) {
latency, err := getSchedulingLatency(c)
if err != nil {
return nil, err
}
return latency, nil
}
// ResetSchedulerMetrics sends a DELETE request to kube-scheduler for resetting metrics.
func ResetSchedulerMetrics(c clientset.Interface) error {
responseText, err := sendRestRequestToScheduler(c, "DELETE")
if err != nil {
return fmt.Errorf("Unexpected response: %q", responseText)
}
return nil
}
func convertSampleToBucket(sample *model.Sample, h *HistogramVec) {
labels := make(map[string]string)
for k, v := range sample.Metric {
if k != "le" {
labels[string(k)] = string(v)
}
}
var hist *Histogram
for i := range *h {
if reflect.DeepEqual(labels, (*h)[i].Labels) {
hist = &((*h)[i])
break
}
}
if hist == nil {
hist = newHistogram(labels)
*h = append(*h, *hist)
}
hist.Buckets[string(sample.Metric["le"])] = int(sample.Value)
}
// PrettyPrintJSON converts metrics to JSON format.
func PrettyPrintJSON(metrics interface{}) string {
output := &bytes.Buffer{}
if err := json.NewEncoder(output).Encode(metrics); err != nil {
e2elog.Logf("Error building encoder: %v", err)
return ""
}
formatted := &bytes.Buffer{}
if err := json.Indent(formatted, output.Bytes(), "", " "); err != nil {
e2elog.Logf("Error indenting: %v", err)
return ""
}
return string(formatted.Bytes())
}
// extractMetricSamples parses the prometheus metric samples from the input string.
func extractMetricSamples(metricsBlob string) ([]*model.Sample, error) {
dec := expfmt.NewDecoder(strings.NewReader(metricsBlob), expfmt.FmtText)
decoder := expfmt.SampleDecoder{
Dec: dec,
Opts: &expfmt.DecodeOptions{},
}
var samples []*model.Sample
for {
var v model.Vector
if err := decoder.Decode(&v); err != nil {
if err == io.EOF {
// Expected loop termination condition.
return samples, nil
}
return nil, err
}
samples = append(samples, v...)
}
}
// PodLatencyData encapsulates pod startup latency information.
type PodLatencyData struct {
// Name of the pod
Name string
// Node this pod was running on
Node string
// Latency information related to pod startuptime
Latency time.Duration
}
// LatencySlice is an array of PodLatencyData which encapsulates pod startup latency information.
type LatencySlice []PodLatencyData
func (a LatencySlice) Len() int { return len(a) }
func (a LatencySlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a LatencySlice) Less(i, j int) bool { return a[i].Latency < a[j].Latency }
// ExtractLatencyMetrics returns latency metrics for each percentile(50th, 90th and 99th).
func ExtractLatencyMetrics(latencies []PodLatencyData) LatencyMetric {
length := len(latencies)
perc50 := latencies[int(math.Ceil(float64(length*50)/100))-1].Latency
perc90 := latencies[int(math.Ceil(float64(length*90)/100))-1].Latency
perc99 := latencies[int(math.Ceil(float64(length*99)/100))-1].Latency
perc100 := latencies[length-1].Latency
return LatencyMetric{Perc50: perc50, Perc90: perc90, Perc99: perc99, Perc100: perc100}
}
// LogSuspiciousLatency logs metrics/docker errors from all nodes that had slow startup times
// If latencyDataLag is nil then it will be populated from latencyData
func LogSuspiciousLatency(latencyData []PodLatencyData, latencyDataLag []PodLatencyData, nodeCount int, c clientset.Interface) {
if latencyDataLag == nil {
latencyDataLag = latencyData
}
for _, l := range latencyData {
if l.Latency > NodeStartupThreshold {
HighLatencyKubeletOperations(c, 1*time.Second, l.Node, e2elog.Logf)
}
}
e2elog.Logf("Approx throughput: %v pods/min",
float64(nodeCount)/(latencyDataLag[len(latencyDataLag)-1].Latency.Minutes()))
}
// PrintLatencies outputs latencies to log with readable format.
func PrintLatencies(latencies []PodLatencyData, header string) {
metrics := ExtractLatencyMetrics(latencies)
e2elog.Logf("10%% %s: %v", header, latencies[(len(latencies)*9)/10:])
e2elog.Logf("perc50: %v, perc90: %v, perc99: %v", metrics.Perc50, metrics.Perc90, metrics.Perc99)
}
func (m *MetricsForE2E) computeClusterAutoscalerMetricsDelta(before metrics.Collection) {
if beforeSamples, found := before.ClusterAutoscalerMetrics[caFunctionMetric]; found {
if afterSamples, found := m.ClusterAutoscalerMetrics[caFunctionMetric]; found {
beforeSamplesMap := make(map[string]*model.Sample)
for _, bSample := range beforeSamples {
beforeSamplesMap[makeKey(bSample.Metric[caFunctionMetricLabel], bSample.Metric["le"])] = bSample
}
for _, aSample := range afterSamples {
if bSample, found := beforeSamplesMap[makeKey(aSample.Metric[caFunctionMetricLabel], aSample.Metric["le"])]; found {
aSample.Value = aSample.Value - bSample.Value
}
}
}
}
}
func makeKey(a, b model.LabelValue) string {
return string(a) + "___" + string(b)
}

View File

@ -20,41 +20,14 @@ import (
"fmt"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
"k8s.io/kubernetes/test/e2e/perftype"
)
// TODO(random-liu): Change the tests to actually use PerfData from the beginning instead of
// translating one to the other here.
// currentAPICallMetricsVersion is the current apicall performance metrics version. We should
// bump up the version each time we make incompatible change to the metrics.
const currentAPICallMetricsVersion = "v1"
// APICallToPerfData transforms APIResponsiveness to PerfData.
func APICallToPerfData(apicalls *APIResponsiveness) *perftype.PerfData {
perfData := &perftype.PerfData{Version: currentAPICallMetricsVersion}
for _, apicall := range apicalls.APICalls {
item := perftype.DataItem{
Data: map[string]float64{
"Perc50": float64(apicall.Latency.Perc50) / 1000000, // us -> ms
"Perc90": float64(apicall.Latency.Perc90) / 1000000,
"Perc99": float64(apicall.Latency.Perc99) / 1000000,
},
Unit: "ms",
Labels: map[string]string{
"Verb": apicall.Verb,
"Resource": apicall.Resource,
"Subresource": apicall.Subresource,
"Scope": apicall.Scope,
"Count": fmt.Sprintf("%v", apicall.Count),
},
}
perfData.DataItems = append(perfData.DataItems, item)
}
return perfData
}
func latencyToPerfData(l LatencyMetric, name string) perftype.DataItem {
func latencyToPerfData(l e2emetrics.LatencyMetric, name string) perftype.DataItem {
return perftype.DataItem{
Data: map[string]float64{
"Perc50": float64(l.Perc50) / 1000000, // us -> ms
@ -69,17 +42,6 @@ func latencyToPerfData(l LatencyMetric, name string) perftype.DataItem {
}
}
// PodStartupLatencyToPerfData transforms PodStartupLatency to PerfData.
func PodStartupLatencyToPerfData(latency *PodStartupLatency) *perftype.PerfData {
perfData := &perftype.PerfData{Version: currentAPICallMetricsVersion}
perfData.DataItems = append(perfData.DataItems, latencyToPerfData(latency.CreateToScheduleLatency, "create_to_schedule"))
perfData.DataItems = append(perfData.DataItems, latencyToPerfData(latency.ScheduleToRunLatency, "schedule_to_run"))
perfData.DataItems = append(perfData.DataItems, latencyToPerfData(latency.RunToWatchLatency, "run_to_watch"))
perfData.DataItems = append(perfData.DataItems, latencyToPerfData(latency.ScheduleToWatchLatency, "schedule_to_watch"))
perfData.DataItems = append(perfData.DataItems, latencyToPerfData(latency.E2ELatency, "pod_startup"))
return perfData
}
// CurrentKubeletPerfMetricsVersion is the current kubelet performance metrics
// version. This is used by mutiple perf related data structures. We should
// bump up the version each time we make an incompatible change to the metrics.
@ -100,7 +62,7 @@ func CPUUsageToPerfData(usagePerNode NodesCPUSummary) *perftype.PerfData {
// If an error occurs, nothing will be printed.
func PrintPerfData(p *perftype.PerfData) {
// Notice that we must make sure the perftype.PerfResultEnd is in a new line.
if str := PrettyPrintJSON(p); str != "" {
if str := e2emetrics.PrettyPrintJSON(p); str != "" {
e2elog.Logf("%s %s\n%s", perftype.PerfResultTag, str, perftype.PerfResultEnd)
}
}

View File

@ -33,6 +33,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/util/system"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
)
// ResourceConstraint is a struct to hold constraints.
@ -72,7 +73,7 @@ func (s *ResourceUsageSummary) PrintHumanReadable() string {
// PrintJSON prints resource usage summary in JSON.
func (s *ResourceUsageSummary) PrintJSON() string {
return PrettyPrintJSON(*s)
return e2emetrics.PrettyPrintJSON(*s)
}
// SummaryKind returns string of ResourceUsageSummary

View File

@ -6,7 +6,7 @@ go_library(
importpath = "k8s.io/kubernetes/test/e2e/framework/timer",
visibility = ["//visibility:public"],
deps = [
"//test/e2e/framework:go_default_library",
"//test/e2e/framework/metrics:go_default_library",
"//test/e2e/perftype:go_default_library",
],
)

View File

@ -22,7 +22,7 @@ import (
"bytes"
"fmt"
"k8s.io/kubernetes/test/e2e/framework"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
"k8s.io/kubernetes/test/e2e/perftype"
"sync"
)
@ -124,5 +124,5 @@ func (timer *TestPhaseTimer) PrintJSON() string {
data.DataItems[0].Labels["ended"] = "false"
}
}
return framework.PrettyPrintJSON(data)
return e2emetrics.PrettyPrintJSON(data)
}

View File

@ -39,6 +39,7 @@ go_library(
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e/framework/log:go_default_library",
"//test/e2e/framework/metrics:go_default_library",
"//test/e2e/framework/pod:go_default_library",
"//test/e2e/framework/timer:go_default_library",
"//test/utils:go_default_library",

View File

@ -52,6 +52,7 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/test/e2e/framework"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
"k8s.io/kubernetes/test/e2e/framework/timer"
testutils "k8s.io/kubernetes/test/utils"
@ -425,7 +426,7 @@ var _ = SIGDescribe("Density", func() {
missingMeasurements := 0
var testPhaseDurations *timer.TestPhaseTimer
var profileGathererStopCh chan struct{}
var etcdMetricsCollector *framework.EtcdMetricsCollector
var etcdMetricsCollector *e2emetrics.EtcdMetricsCollector
// Gathers data prior to framework namespace teardown
ginkgo.AfterEach(func() {
@ -447,18 +448,18 @@ var _ = SIGDescribe("Density", func() {
NumberOfPods: totalPods,
Throughput: float32(totalPods) / float32(e2eStartupTime/time.Second),
}
e2elog.Logf("Cluster saturation time: %s", framework.PrettyPrintJSON(saturationData))
e2elog.Logf("Cluster saturation time: %s", e2emetrics.PrettyPrintJSON(saturationData))
summaries := make([]framework.TestDataSummary, 0, 2)
// Verify latency metrics.
highLatencyRequests, metrics, err := framework.HighLatencyRequests(c, nodeCount)
highLatencyRequests, metrics, err := e2emetrics.HighLatencyRequests(c, nodeCount)
framework.ExpectNoError(err)
if err == nil {
summaries = append(summaries, metrics)
}
// Summarize scheduler metrics.
latency, err := framework.VerifySchedulerLatency(c)
latency, err := e2emetrics.VerifySchedulerLatency(c, framework.TestContext.Provider, framework.TestContext.CloudConfig.MasterName, framework.GetMasterHost())
framework.ExpectNoError(err)
if err == nil {
// Compute avg and quantiles of throughput (excluding last element, that's usually an outlier).
@ -475,7 +476,7 @@ var _ = SIGDescribe("Density", func() {
}
// Summarize etcd metrics.
err = etcdMetricsCollector.StopAndSummarize()
err = etcdMetricsCollector.StopAndSummarize(framework.TestContext.Provider, framework.GetMasterHost())
framework.ExpectNoError(err)
if err == nil {
summaries = append(summaries, etcdMetricsCollector.GetMetrics())
@ -533,8 +534,8 @@ var _ = SIGDescribe("Density", func() {
uuid = string(utiluuid.NewUUID())
framework.ExpectNoError(framework.ResetSchedulerMetrics(c))
framework.ExpectNoError(framework.ResetMetrics(c))
framework.ExpectNoError(e2emetrics.ResetSchedulerMetrics(c, framework.TestContext.Provider, framework.TestContext.CloudConfig.MasterName, framework.GetMasterHost()))
framework.ExpectNoError(e2emetrics.ResetMetrics(c))
framework.ExpectNoError(os.Mkdir(fmt.Sprintf(framework.TestContext.OutputDir+"/%s", uuid), 0777))
e2elog.Logf("Listing nodes for easy debugging:\n")
@ -556,8 +557,8 @@ var _ = SIGDescribe("Density", func() {
profileGathererStopCh = framework.StartCPUProfileGatherer("kube-apiserver", "density", profileGatheringDelay)
// Start etcs metrics collection.
etcdMetricsCollector = framework.NewEtcdMetricsCollector()
etcdMetricsCollector.StartCollecting(time.Minute)
etcdMetricsCollector = e2emetrics.NewEtcdMetricsCollector()
etcdMetricsCollector.StartCollecting(time.Minute, framework.TestContext.Provider, framework.GetMasterHost())
})
type Density struct {
@ -941,11 +942,11 @@ var _ = SIGDescribe("Density", func() {
}
}
scheduleLag := make([]framework.PodLatencyData, 0)
startupLag := make([]framework.PodLatencyData, 0)
watchLag := make([]framework.PodLatencyData, 0)
schedToWatchLag := make([]framework.PodLatencyData, 0)
e2eLag := make([]framework.PodLatencyData, 0)
scheduleLag := make([]e2emetrics.PodLatencyData, 0)
startupLag := make([]e2emetrics.PodLatencyData, 0)
watchLag := make([]e2emetrics.PodLatencyData, 0)
schedToWatchLag := make([]e2emetrics.PodLatencyData, 0)
e2eLag := make([]e2emetrics.PodLatencyData, 0)
for name, create := range createTimes {
sched, ok := scheduleTimes[name]
@ -969,44 +970,44 @@ var _ = SIGDescribe("Density", func() {
missingMeasurements++
}
scheduleLag = append(scheduleLag, framework.PodLatencyData{Name: name, Node: node, Latency: sched.Time.Sub(create.Time)})
startupLag = append(startupLag, framework.PodLatencyData{Name: name, Node: node, Latency: run.Time.Sub(sched.Time)})
watchLag = append(watchLag, framework.PodLatencyData{Name: name, Node: node, Latency: watch.Time.Sub(run.Time)})
schedToWatchLag = append(schedToWatchLag, framework.PodLatencyData{Name: name, Node: node, Latency: watch.Time.Sub(sched.Time)})
e2eLag = append(e2eLag, framework.PodLatencyData{Name: name, Node: node, Latency: watch.Time.Sub(create.Time)})
scheduleLag = append(scheduleLag, e2emetrics.PodLatencyData{Name: name, Node: node, Latency: sched.Time.Sub(create.Time)})
startupLag = append(startupLag, e2emetrics.PodLatencyData{Name: name, Node: node, Latency: run.Time.Sub(sched.Time)})
watchLag = append(watchLag, e2emetrics.PodLatencyData{Name: name, Node: node, Latency: watch.Time.Sub(run.Time)})
schedToWatchLag = append(schedToWatchLag, e2emetrics.PodLatencyData{Name: name, Node: node, Latency: watch.Time.Sub(sched.Time)})
e2eLag = append(e2eLag, e2emetrics.PodLatencyData{Name: name, Node: node, Latency: watch.Time.Sub(create.Time)})
}
sort.Sort(framework.LatencySlice(scheduleLag))
sort.Sort(framework.LatencySlice(startupLag))
sort.Sort(framework.LatencySlice(watchLag))
sort.Sort(framework.LatencySlice(schedToWatchLag))
sort.Sort(framework.LatencySlice(e2eLag))
sort.Sort(e2emetrics.LatencySlice(scheduleLag))
sort.Sort(e2emetrics.LatencySlice(startupLag))
sort.Sort(e2emetrics.LatencySlice(watchLag))
sort.Sort(e2emetrics.LatencySlice(schedToWatchLag))
sort.Sort(e2emetrics.LatencySlice(e2eLag))
framework.PrintLatencies(scheduleLag, "worst create-to-schedule latencies")
framework.PrintLatencies(startupLag, "worst schedule-to-run latencies")
framework.PrintLatencies(watchLag, "worst run-to-watch latencies")
framework.PrintLatencies(schedToWatchLag, "worst schedule-to-watch latencies")
framework.PrintLatencies(e2eLag, "worst e2e latencies")
e2emetrics.PrintLatencies(scheduleLag, "worst create-to-schedule latencies")
e2emetrics.PrintLatencies(startupLag, "worst schedule-to-run latencies")
e2emetrics.PrintLatencies(watchLag, "worst run-to-watch latencies")
e2emetrics.PrintLatencies(schedToWatchLag, "worst schedule-to-watch latencies")
e2emetrics.PrintLatencies(e2eLag, "worst e2e latencies")
// Capture latency metrics related to pod-startup.
podStartupLatency := &framework.PodStartupLatency{
CreateToScheduleLatency: framework.ExtractLatencyMetrics(scheduleLag),
ScheduleToRunLatency: framework.ExtractLatencyMetrics(startupLag),
RunToWatchLatency: framework.ExtractLatencyMetrics(watchLag),
ScheduleToWatchLatency: framework.ExtractLatencyMetrics(schedToWatchLag),
E2ELatency: framework.ExtractLatencyMetrics(e2eLag),
podStartupLatency := &e2emetrics.PodStartupLatency{
CreateToScheduleLatency: e2emetrics.ExtractLatencyMetrics(scheduleLag),
ScheduleToRunLatency: e2emetrics.ExtractLatencyMetrics(startupLag),
RunToWatchLatency: e2emetrics.ExtractLatencyMetrics(watchLag),
ScheduleToWatchLatency: e2emetrics.ExtractLatencyMetrics(schedToWatchLag),
E2ELatency: e2emetrics.ExtractLatencyMetrics(e2eLag),
}
f.TestSummaries = append(f.TestSummaries, podStartupLatency)
// Test whether e2e pod startup time is acceptable.
podStartupLatencyThreshold := framework.LatencyMetric{
podStartupLatencyThreshold := e2emetrics.LatencyMetric{
Perc50: PodStartupLatencyThreshold,
Perc90: PodStartupLatencyThreshold,
Perc99: PodStartupLatencyThreshold,
}
framework.ExpectNoError(framework.VerifyLatencyWithinThreshold(podStartupLatencyThreshold, podStartupLatency.E2ELatency, "pod startup"))
framework.ExpectNoError(e2emetrics.VerifyLatencyWithinThreshold(podStartupLatencyThreshold, podStartupLatency.E2ELatency, "pod startup"))
framework.LogSuspiciousLatency(startupLag, e2eLag, nodeCount, c)
e2emetrics.LogSuspiciousLatency(startupLag, e2eLag, nodeCount, c)
}
})
}

View File

@ -59,6 +59,7 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/test/e2e/framework"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
"k8s.io/kubernetes/test/e2e/framework/timer"
testutils "k8s.io/kubernetes/test/utils"
@ -119,7 +120,7 @@ var _ = SIGDescribe("Load capacity", func() {
wg.Wait()
// Verify latency metrics
highLatencyRequests, metrics, err := framework.HighLatencyRequests(clientset, nodeCount)
highLatencyRequests, metrics, err := e2emetrics.HighLatencyRequests(clientset, nodeCount)
framework.ExpectNoError(err)
if err == nil {
summaries := make([]framework.TestDataSummary, 0, 2)
@ -164,7 +165,7 @@ var _ = SIGDescribe("Load capacity", func() {
err := framework.CheckTestingNSDeletedExcept(clientset, ns)
framework.ExpectNoError(err)
framework.ExpectNoError(framework.ResetMetrics(clientset))
framework.ExpectNoError(e2emetrics.ResetMetrics(clientset))
// Start apiserver CPU profile gatherer with frequency based on cluster size.
profileGatheringDelay := time.Duration(5+nodeCount/100) * time.Minute

View File

@ -32,6 +32,7 @@ go_library(
"//staging/src/k8s.io/kubelet/config/v1beta1:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e/framework/log:go_default_library",
"//test/e2e/framework/metrics:go_default_library",
"//test/e2e/framework/pod:go_default_library",
"//test/utils/image:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",

View File

@ -31,6 +31,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/test/e2e/framework"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -49,7 +50,7 @@ var _ = SIGDescribe("[Feature:Windows] Density [Serial] [Slow]", func() {
podsNr: 10,
interval: 0 * time.Millisecond,
// percentile limit of single pod startup latency
podStartupLimits: framework.LatencyMetric{
podStartupLimits: e2emetrics.LatencyMetric{
Perc50: 30 * time.Second,
Perc90: 54 * time.Second,
Perc99: 59 * time.Second,
@ -85,12 +86,12 @@ type densityTest struct {
// performance limits
cpuLimits framework.ContainersCPUSummary
memLimits framework.ResourceUsagePerContainer
podStartupLimits framework.LatencyMetric
podStartupLimits e2emetrics.LatencyMetric
podBatchStartupLimit time.Duration
}
// runDensityBatchTest runs the density batch pod creation test
func runDensityBatchTest(f *framework.Framework, testArg densityTest) (time.Duration, []framework.PodLatencyData) {
func runDensityBatchTest(f *framework.Framework, testArg densityTest) (time.Duration, []e2emetrics.PodLatencyData) {
const (
podType = "density_test_pod"
)
@ -127,7 +128,7 @@ func runDensityBatchTest(f *framework.Framework, testArg densityTest) (time.Dura
firstCreate metav1.Time
lastRunning metav1.Time
init = true
e2eLags = make([]framework.PodLatencyData, 0)
e2eLags = make([]e2emetrics.PodLatencyData, 0)
)
for name, create := range createTimes {
@ -135,7 +136,7 @@ func runDensityBatchTest(f *framework.Framework, testArg densityTest) (time.Dura
gomega.Expect(ok).To(gomega.Equal(true))
e2eLags = append(e2eLags,
framework.PodLatencyData{Name: name, Latency: watch.Time.Sub(create.Time)})
e2emetrics.PodLatencyData{Name: name, Latency: watch.Time.Sub(create.Time)})
if !init {
if firstCreate.Time.After(create.Time) {
@ -150,7 +151,7 @@ func runDensityBatchTest(f *framework.Framework, testArg densityTest) (time.Dura
}
}
sort.Sort(framework.LatencySlice(e2eLags))
sort.Sort(e2emetrics.LatencySlice(e2eLags))
batchLag := lastRunning.Time.Sub(firstCreate.Time)
deletePodsSync(f, pods)

View File

@ -30,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/test/e2e/framework"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
"k8s.io/kubernetes/test/e2e/perftype"
nodeperftype "k8s.io/kubernetes/test/e2e_node/perftype"
)
@ -46,7 +47,7 @@ func dumpDataToFile(data interface{}, labels map[string]string, prefix string) {
fileName := path.Join(framework.TestContext.ReportDir, fmt.Sprintf("%s-%s-%s.json", prefix, framework.TestContext.ReportPrefix, testName))
labels["timestamp"] = strconv.FormatInt(time.Now().UTC().Unix(), 10)
e2elog.Logf("Dumping perf data for test %q to %q.", testName, fileName)
if err := ioutil.WriteFile(fileName, []byte(framework.PrettyPrintJSON(data)), 0644); err != nil {
if err := ioutil.WriteFile(fileName, []byte(e2emetrics.PrettyPrintJSON(data)), 0644); err != nil {
e2elog.Logf("Failed to write perf data for test %q to %q: %v", testName, fileName, err)
}
}
@ -81,7 +82,7 @@ func logDensityTimeSeries(rc *ResourceCollector, create, watch map[string]metav1
timeSeries.ResourceData = rc.GetResourceTimeSeries()
if framework.TestContext.ReportDir == "" {
e2elog.Logf("%s %s\n%s", TimeSeriesTag, framework.PrettyPrintJSON(timeSeries), TimeSeriesEnd)
e2elog.Logf("%s %s\n%s", TimeSeriesTag, e2emetrics.PrettyPrintJSON(timeSeries), TimeSeriesEnd)
return
}
dumpDataToFile(timeSeries, timeSeries.Labels, "time_series")
@ -105,7 +106,7 @@ func getCumulatedPodTimeSeries(timePerPod map[string]metav1.Time) []int64 {
}
// getLatencyPerfData returns perf data of pod startup latency.
func getLatencyPerfData(latency framework.LatencyMetric, testInfo map[string]string) *perftype.PerfData {
func getLatencyPerfData(latency e2emetrics.LatencyMetric, testInfo map[string]string) *perftype.PerfData {
return &perftype.PerfData{
Version: framework.CurrentKubeletPerfMetricsVersion,
DataItems: []perftype.DataItem{
@ -128,7 +129,7 @@ func getLatencyPerfData(latency framework.LatencyMetric, testInfo map[string]str
}
// getThroughputPerfData returns perf data of pod creation startup throughput.
func getThroughputPerfData(batchLag time.Duration, e2eLags []framework.PodLatencyData, podsNr int, testInfo map[string]string) *perftype.PerfData {
func getThroughputPerfData(batchLag time.Duration, e2eLags []e2emetrics.PodLatencyData, podsNr int, testInfo map[string]string) *perftype.PerfData {
return &perftype.PerfData{
Version: framework.CurrentKubeletPerfMetricsVersion,
DataItems: []perftype.DataItem{

View File

@ -36,7 +36,7 @@ import (
kubemetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/test/e2e/framework"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
"k8s.io/kubernetes/test/e2e/framework/metrics"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
imageutils "k8s.io/kubernetes/test/utils/image"
. "github.com/onsi/ginkgo"
@ -85,7 +85,7 @@ var _ = framework.KubeDescribe("Density [Serial] [Slow]", func() {
kubeletstatsv1alpha1.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 500 * 1024 * 1024},
},
// percentile limit of single pod startup latency
podStartupLimits: framework.LatencyMetric{
podStartupLimits: e2emetrics.LatencyMetric{
Perc50: 16 * time.Second,
Perc90: 18 * time.Second,
Perc99: 20 * time.Second,
@ -231,7 +231,7 @@ var _ = framework.KubeDescribe("Density [Serial] [Slow]", func() {
kubeletstatsv1alpha1.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 100 * 1024 * 1024},
kubeletstatsv1alpha1.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 500 * 1024 * 1024},
},
podStartupLimits: framework.LatencyMetric{
podStartupLimits: e2emetrics.LatencyMetric{
Perc50: 5000 * time.Millisecond,
Perc90: 9000 * time.Millisecond,
Perc99: 10000 * time.Millisecond,
@ -304,7 +304,7 @@ type densityTest struct {
// performance limits
cpuLimits framework.ContainersCPUSummary
memLimits framework.ResourceUsagePerContainer
podStartupLimits framework.LatencyMetric
podStartupLimits e2emetrics.LatencyMetric
podBatchStartupLimit time.Duration
}
@ -321,7 +321,7 @@ func (dt *densityTest) getTestName() string {
// runDensityBatchTest runs the density batch pod creation test
func runDensityBatchTest(f *framework.Framework, rc *ResourceCollector, testArg densityTest, testInfo map[string]string,
isLogTimeSeries bool) (time.Duration, []framework.PodLatencyData) {
isLogTimeSeries bool) (time.Duration, []e2emetrics.PodLatencyData) {
const (
podType = "density_test_pod"
sleepBeforeCreatePods = 30 * time.Second
@ -367,7 +367,7 @@ func runDensityBatchTest(f *framework.Framework, rc *ResourceCollector, testArg
firstCreate metav1.Time
lastRunning metav1.Time
init = true
e2eLags = make([]framework.PodLatencyData, 0)
e2eLags = make([]e2emetrics.PodLatencyData, 0)
)
for name, create := range createTimes {
@ -375,7 +375,7 @@ func runDensityBatchTest(f *framework.Framework, rc *ResourceCollector, testArg
Expect(ok).To(Equal(true))
e2eLags = append(e2eLags,
framework.PodLatencyData{Name: name, Latency: watch.Time.Sub(create.Time)})
e2emetrics.PodLatencyData{Name: name, Latency: watch.Time.Sub(create.Time)})
if !init {
if firstCreate.Time.After(create.Time) {
@ -390,7 +390,7 @@ func runDensityBatchTest(f *framework.Framework, rc *ResourceCollector, testArg
}
}
sort.Sort(framework.LatencySlice(e2eLags))
sort.Sort(e2emetrics.LatencySlice(e2eLags))
batchLag := lastRunning.Time.Sub(firstCreate.Time)
rc.Stop()
@ -409,7 +409,7 @@ func runDensityBatchTest(f *framework.Framework, rc *ResourceCollector, testArg
}
// runDensitySeqTest runs the density sequential pod creation test
func runDensitySeqTest(f *framework.Framework, rc *ResourceCollector, testArg densityTest, testInfo map[string]string) (time.Duration, []framework.PodLatencyData) {
func runDensitySeqTest(f *framework.Framework, rc *ResourceCollector, testArg densityTest, testInfo map[string]string) (time.Duration, []e2emetrics.PodLatencyData) {
const (
podType = "density_test_pod"
sleepBeforeCreatePods = 30 * time.Second
@ -455,7 +455,7 @@ func createBatchPodWithRateControl(f *framework.Framework, pods []*v1.Pod, inter
// getPodStartLatency gets prometheus metric 'pod start latency' from kubelet
func getPodStartLatency(node string) (framework.KubeletLatencyMetrics, error) {
latencyMetrics := framework.KubeletLatencyMetrics{}
ms, err := metrics.GrabKubeletMetricsWithoutProxy(node, "/metrics")
ms, err := e2emetrics.GrabKubeletMetricsWithoutProxy(node, "/metrics")
framework.ExpectNoError(err, "Failed to get kubelet metrics without proxy in node %s", node)
for _, samples := range ms {
@ -519,37 +519,37 @@ func newInformerWatchPod(f *framework.Framework, mutex *sync.Mutex, watchTimes m
}
// createBatchPodSequential creates pods back-to-back in sequence.
func createBatchPodSequential(f *framework.Framework, pods []*v1.Pod) (time.Duration, []framework.PodLatencyData) {
func createBatchPodSequential(f *framework.Framework, pods []*v1.Pod) (time.Duration, []e2emetrics.PodLatencyData) {
batchStartTime := metav1.Now()
e2eLags := make([]framework.PodLatencyData, 0)
e2eLags := make([]e2emetrics.PodLatencyData, 0)
for _, pod := range pods {
create := metav1.Now()
f.PodClient().CreateSync(pod)
e2eLags = append(e2eLags,
framework.PodLatencyData{Name: pod.Name, Latency: metav1.Now().Time.Sub(create.Time)})
e2emetrics.PodLatencyData{Name: pod.Name, Latency: metav1.Now().Time.Sub(create.Time)})
}
batchLag := metav1.Now().Time.Sub(batchStartTime.Time)
sort.Sort(framework.LatencySlice(e2eLags))
sort.Sort(e2emetrics.LatencySlice(e2eLags))
return batchLag, e2eLags
}
// logAndVerifyLatency verifies that whether pod creation latency satisfies the limit.
func logAndVerifyLatency(batchLag time.Duration, e2eLags []framework.PodLatencyData, podStartupLimits framework.LatencyMetric,
func logAndVerifyLatency(batchLag time.Duration, e2eLags []e2emetrics.PodLatencyData, podStartupLimits e2emetrics.LatencyMetric,
podBatchStartupLimit time.Duration, testInfo map[string]string, isVerify bool) {
framework.PrintLatencies(e2eLags, "worst client e2e total latencies")
e2emetrics.PrintLatencies(e2eLags, "worst client e2e total latencies")
// TODO(coufon): do not trust 'kubelet' metrics since they are not reset!
latencyMetrics, _ := getPodStartLatency(kubeletAddr)
e2elog.Logf("Kubelet Prometheus metrics (not reset):\n%s", framework.PrettyPrintJSON(latencyMetrics))
e2elog.Logf("Kubelet Prometheus metrics (not reset):\n%s", e2emetrics.PrettyPrintJSON(latencyMetrics))
podStartupLatency := framework.ExtractLatencyMetrics(e2eLags)
podStartupLatency := e2emetrics.ExtractLatencyMetrics(e2eLags)
// log latency perf data
logPerfData(getLatencyPerfData(podStartupLatency, testInfo), "latency")
if isVerify {
// check whether e2e pod startup time is acceptable.
framework.ExpectNoError(framework.VerifyLatencyWithinThreshold(podStartupLimits, podStartupLatency, "pod startup"))
framework.ExpectNoError(e2emetrics.VerifyLatencyWithinThreshold(podStartupLimits, podStartupLatency, "pod startup"))
// check bactch pod creation latency
if podBatchStartupLimit > 0 {
@ -560,6 +560,6 @@ func logAndVerifyLatency(batchLag time.Duration, e2eLags []framework.PodLatencyD
}
// logThroughput calculates and logs pod creation throughput.
func logPodCreateThroughput(batchLag time.Duration, e2eLags []framework.PodLatencyData, podsNr int, testInfo map[string]string) {
func logPodCreateThroughput(batchLag time.Duration, e2eLags []e2emetrics.PodLatencyData, podsNr int, testInfo map[string]string) {
logPerfData(getThroughputPerfData(batchLag, e2eLags, podsNr, testInfo), "throughput")
}