Remove prometheus references in test/integration

This PR does minimal changes to interface to allow removing all
references to prometheus from `test` directory. In future I would expect
wrapping prometheus samples to provide better abstraction. Changes:

Move generic_metrics.go to testutil/metrics.go
Remove etcd.go as it was not called
Move prometheus label consts to testutil.
This commit is contained in:
Marek Siarkowicz 2019-10-24 12:10:12 +02:00
parent 7d13dfe3c3
commit 09329b5bbc
18 changed files with 231 additions and 435 deletions

View File

@ -2,13 +2,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["testutil.go"],
srcs = [
"metrics.go",
"testutil.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/component-base/metrics/testutil",
importpath = "k8s.io/component-base/metrics/testutil",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/component-base/metrics:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus/testutil:go_default_library",
"//vendor/github.com/prometheus/common/expfmt:go_default_library",
"//vendor/github.com/prometheus/common/model:go_default_library",
],
)

View File

@ -0,0 +1,149 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testutil
import (
"fmt"
"io"
"reflect"
"strings"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
)
var (
// MetricNameLabel is label under which model.Sample stores metric name
MetricNameLabel model.LabelName = model.MetricNameLabel
// QuantileLabel is label under which model.Sample stores latency quantile value
QuantileLabel model.LabelName = model.QuantileLabel
)
// Metrics is generic metrics for other specific metrics
type Metrics map[string]model.Samples
// Equal returns true if all metrics are the same as the arguments.
func (m *Metrics) Equal(o Metrics) bool {
leftKeySet := []string{}
rightKeySet := []string{}
for k := range *m {
leftKeySet = append(leftKeySet, k)
}
for k := range o {
rightKeySet = append(rightKeySet, k)
}
if !reflect.DeepEqual(leftKeySet, rightKeySet) {
return false
}
for _, k := range leftKeySet {
if !(*m)[k].Equal(o[k]) {
return false
}
}
return true
}
// NewMetrics returns new metrics which are initialized.
func NewMetrics() Metrics {
result := make(Metrics)
return result
}
// ParseMetrics parses Metrics from data returned from prometheus endpoint
func ParseMetrics(data string, output *Metrics) error {
dec := expfmt.NewDecoder(strings.NewReader(data), expfmt.FmtText)
decoder := expfmt.SampleDecoder{
Dec: dec,
Opts: &expfmt.DecodeOptions{},
}
for {
var v model.Vector
if err := decoder.Decode(&v); err != nil {
if err == io.EOF {
// Expected loop termination condition.
return nil
}
continue
}
for _, metric := range v {
name := string(metric.Metric[model.MetricNameLabel])
(*output)[name] = append((*output)[name], metric)
}
}
}
// ExtractMetricSamples parses the prometheus metric samples from the input string.
func ExtractMetricSamples(metricsBlob string) ([]*model.Sample, error) {
dec := expfmt.NewDecoder(strings.NewReader(metricsBlob), expfmt.FmtText)
decoder := expfmt.SampleDecoder{
Dec: dec,
Opts: &expfmt.DecodeOptions{},
}
var samples []*model.Sample
for {
var v model.Vector
if err := decoder.Decode(&v); err != nil {
if err == io.EOF {
// Expected loop termination condition.
return samples, nil
}
return nil, err
}
samples = append(samples, v...)
}
}
// PrintSample returns formated representation of metric Sample
func PrintSample(sample *model.Sample) string {
buf := make([]string, 0)
// Id is a VERY special label. For 'normal' container it's useless, but it's necessary
// for 'system' containers (e.g. /docker-daemon, /kubelet, etc.). We know if that's the
// case by checking if there's a label "kubernetes_container_name" present. It's hacky
// but it works...
_, normalContainer := sample.Metric["kubernetes_container_name"]
for k, v := range sample.Metric {
if strings.HasPrefix(string(k), "__") {
continue
}
if string(k) == "id" && normalContainer {
continue
}
buf = append(buf, fmt.Sprintf("%v=%v", string(k), v))
}
return fmt.Sprintf("[%v] = %v", strings.Join(buf, ","), sample.Value)
}
// ComputeHistogramDelta computes the change in histogram metric for a selected label.
// Results are stored in after samples
func ComputeHistogramDelta(before, after model.Samples, label model.LabelName) {
beforeSamplesMap := make(map[string]*model.Sample)
for _, bSample := range before {
beforeSamplesMap[makeKey(bSample.Metric[label], bSample.Metric["le"])] = bSample
}
for _, aSample := range after {
if bSample, found := beforeSamplesMap[makeKey(aSample.Metric[label], aSample.Metric["le"])]; found {
aSample.Value = aSample.Value - bSample.Value
}
}
}
func makeKey(a, b model.LabelValue) string {
return string(a) + "___" + string(b)
}

View File

@ -13,8 +13,6 @@ go_library(
"cluster_autoscaler_metrics.go",
"controller_manager_metrics.go",
"e2e_metrics.go",
"etcd.go",
"generic_metrics.go",
"interesting_metrics.go",
"kubelet_metrics.go",
"latencies.go",
@ -34,13 +32,12 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
"//test/e2e/framework/log:go_default_library",
"//test/e2e/framework/ssh:go_default_library",
"//test/e2e/perftype:go_default_library",
"//test/e2e/system:go_default_library",
"//vendor/github.com/onsi/gomega:go_default_library",
"//vendor/github.com/prometheus/common/expfmt:go_default_library",
"//vendor/github.com/prometheus/common/model:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -16,22 +16,24 @@ limitations under the License.
package metrics
import "k8s.io/component-base/metrics/testutil"
// APIServerMetrics is metrics for API server
type APIServerMetrics Metrics
type APIServerMetrics testutil.Metrics
// Equal returns true if all metrics are the same as the arguments.
func (m *APIServerMetrics) Equal(o APIServerMetrics) bool {
return (*Metrics)(m).Equal(Metrics(o))
return (*testutil.Metrics)(m).Equal(testutil.Metrics(o))
}
func newAPIServerMetrics() APIServerMetrics {
result := NewMetrics()
result := testutil.NewMetrics()
return APIServerMetrics(result)
}
func parseAPIServerMetrics(data string) (APIServerMetrics, error) {
result := newAPIServerMetrics()
if err := parseMetrics(data, (*Metrics)(&result)); err != nil {
if err := testutil.ParseMetrics(data, (*testutil.Metrics)(&result)); err != nil {
return APIServerMetrics{}, err
}
return result, nil

View File

@ -16,22 +16,24 @@ limitations under the License.
package metrics
import "k8s.io/component-base/metrics/testutil"
// ClusterAutoscalerMetrics is metrics for cluster autoscaler
type ClusterAutoscalerMetrics Metrics
type ClusterAutoscalerMetrics testutil.Metrics
// Equal returns true if all metrics are the same as the arguments.
func (m *ClusterAutoscalerMetrics) Equal(o ClusterAutoscalerMetrics) bool {
return (*Metrics)(m).Equal(Metrics(o))
return (*testutil.Metrics)(m).Equal(testutil.Metrics(o))
}
func newClusterAutoscalerMetrics() ClusterAutoscalerMetrics {
result := NewMetrics()
result := testutil.NewMetrics()
return ClusterAutoscalerMetrics(result)
}
func parseClusterAutoscalerMetrics(data string) (ClusterAutoscalerMetrics, error) {
result := newClusterAutoscalerMetrics()
if err := parseMetrics(data, (*Metrics)(&result)); err != nil {
if err := testutil.ParseMetrics(data, (*testutil.Metrics)(&result)); err != nil {
return ClusterAutoscalerMetrics{}, err
}
return result, nil

View File

@ -16,22 +16,24 @@ limitations under the License.
package metrics
import "k8s.io/component-base/metrics/testutil"
// ControllerManagerMetrics is metrics for controller manager
type ControllerManagerMetrics Metrics
type ControllerManagerMetrics testutil.Metrics
// Equal returns true if all metrics are the same as the arguments.
func (m *ControllerManagerMetrics) Equal(o ControllerManagerMetrics) bool {
return (*Metrics)(m).Equal(Metrics(o))
return (*testutil.Metrics)(m).Equal(testutil.Metrics(o))
}
func newControllerManagerMetrics() ControllerManagerMetrics {
result := NewMetrics()
result := testutil.NewMetrics()
return ControllerManagerMetrics(result)
}
func parseControllerManagerMetrics(data string) (ControllerManagerMetrics, error) {
result := newControllerManagerMetrics()
if err := parseMetrics(data, (*Metrics)(&result)); err != nil {
if err := testutil.ParseMetrics(data, (*testutil.Metrics)(&result)); err != nil {
return ControllerManagerMetrics{}, err
}
return result, nil

View File

@ -20,10 +20,8 @@ import (
"bytes"
"encoding/json"
"fmt"
"strings"
"github.com/prometheus/common/model"
"k8s.io/component-base/metrics/testutil"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
)
@ -57,45 +55,25 @@ func (m *ComponentCollection) filterMetrics() {
(*m).KubeletMetrics = kubeletMetrics
}
func printSample(sample *model.Sample) string {
buf := make([]string, 0)
// Id is a VERY special label. For 'normal' container it's useless, but it's necessary
// for 'system' containers (e.g. /docker-daemon, /kubelet, etc.). We know if that's the
// case by checking if there's a label "kubernetes_container_name" present. It's hacky
// but it works...
_, normalContainer := sample.Metric["kubernetes_container_name"]
for k, v := range sample.Metric {
if strings.HasPrefix(string(k), "__") {
continue
}
if string(k) == "id" && normalContainer {
continue
}
buf = append(buf, fmt.Sprintf("%v=%v", string(k), v))
}
return fmt.Sprintf("[%v] = %v", strings.Join(buf, ","), sample.Value)
}
// PrintHumanReadable returns e2e metrics with JSON format.
func (m *ComponentCollection) PrintHumanReadable() string {
buf := bytes.Buffer{}
for _, interestingMetric := range interestingAPIServerMetrics {
buf.WriteString(fmt.Sprintf("For %v:\n", interestingMetric))
for _, sample := range (*m).APIServerMetrics[interestingMetric] {
buf.WriteString(fmt.Sprintf("\t%v\n", printSample(sample)))
buf.WriteString(fmt.Sprintf("\t%v\n", testutil.PrintSample(sample)))
}
}
for _, interestingMetric := range interestingControllerManagerMetrics {
buf.WriteString(fmt.Sprintf("For %v:\n", interestingMetric))
for _, sample := range (*m).ControllerManagerMetrics[interestingMetric] {
buf.WriteString(fmt.Sprintf("\t%v\n", printSample(sample)))
buf.WriteString(fmt.Sprintf("\t%v\n", testutil.PrintSample(sample)))
}
}
for _, interestingMetric := range interestingClusterAutoscalerMetrics {
buf.WriteString(fmt.Sprintf("For %v:\n", interestingMetric))
for _, sample := range (*m).ClusterAutoscalerMetrics[interestingMetric] {
buf.WriteString(fmt.Sprintf("\t%v\n", printSample(sample)))
buf.WriteString(fmt.Sprintf("\t%v\n", testutil.PrintSample(sample)))
}
}
for kubelet, grabbed := range (*m).KubeletMetrics {
@ -103,7 +81,7 @@ func (m *ComponentCollection) PrintHumanReadable() string {
for _, interestingMetric := range interestingKubeletMetrics {
buf.WriteString(fmt.Sprintf("\tFor %v:\n", interestingMetric))
for _, sample := range grabbed[interestingMetric] {
buf.WriteString(fmt.Sprintf("\t\t%v\n", printSample(sample)))
buf.WriteString(fmt.Sprintf("\t\t%v\n", testutil.PrintSample(sample)))
}
}
}
@ -138,25 +116,12 @@ func (m *ComponentCollection) SummaryKind() string {
return "ComponentCollection"
}
func makeKey(a, b model.LabelValue) string {
return string(a) + "___" + string(b)
}
// ComputeClusterAutoscalerMetricsDelta computes the change in cluster
// autoscaler metrics.
func (m *ComponentCollection) ComputeClusterAutoscalerMetricsDelta(before Collection) {
if beforeSamples, found := before.ClusterAutoscalerMetrics[caFunctionMetric]; found {
if afterSamples, found := m.ClusterAutoscalerMetrics[caFunctionMetric]; found {
beforeSamplesMap := make(map[string]*model.Sample)
for _, bSample := range beforeSamples {
beforeSamplesMap[makeKey(bSample.Metric[caFunctionMetricLabel], bSample.Metric["le"])] = bSample
}
for _, aSample := range afterSamples {
if bSample, found := beforeSamplesMap[makeKey(aSample.Metric[caFunctionMetricLabel], aSample.Metric["le"])]; found {
aSample.Value = aSample.Value - bSample.Value
}
}
testutil.ComputeHistogramDelta(beforeSamples, afterSamples, caFunctionMetricLabel)
}
}
}

View File

@ -1,223 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"fmt"
"io"
"math"
"reflect"
"strings"
"sync"
"time"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
)
// Histogram is a struct for managing histogram.
type Histogram struct {
Labels map[string]string `json:"labels"`
Buckets map[string]int `json:"buckets"`
}
// HistogramVec is an array of Histogram.
type HistogramVec []Histogram
func newHistogram(labels map[string]string) *Histogram {
return &Histogram{
Labels: labels,
Buckets: make(map[string]int),
}
}
// EtcdMetrics is a struct for managing etcd metrics.
type EtcdMetrics struct {
BackendCommitDuration HistogramVec `json:"backendCommitDuration"`
SnapshotSaveTotalDuration HistogramVec `json:"snapshotSaveTotalDuration"`
PeerRoundTripTime HistogramVec `json:"peerRoundTripTime"`
WalFsyncDuration HistogramVec `json:"walFsyncDuration"`
MaxDatabaseSize float64 `json:"maxDatabaseSize"`
}
func newEtcdMetrics() *EtcdMetrics {
return &EtcdMetrics{
BackendCommitDuration: make(HistogramVec, 0),
SnapshotSaveTotalDuration: make(HistogramVec, 0),
PeerRoundTripTime: make(HistogramVec, 0),
WalFsyncDuration: make(HistogramVec, 0),
}
}
// SummaryKind returns the summary of etcd metrics.
func (l *EtcdMetrics) SummaryKind() string {
return "EtcdMetrics"
}
// PrintHumanReadable returns etcd metrics with JSON format.
func (l *EtcdMetrics) PrintHumanReadable() string {
return PrettyPrintJSON(l)
}
// PrintJSON returns etcd metrics with JSON format.
func (l *EtcdMetrics) PrintJSON() string {
return PrettyPrintJSON(l)
}
// EtcdMetricsCollector is a struct for managing etcd metrics collector.
type EtcdMetricsCollector struct {
stopCh chan struct{}
wg *sync.WaitGroup
metrics *EtcdMetrics
}
// NewEtcdMetricsCollector creates a new etcd metrics collector.
func NewEtcdMetricsCollector() *EtcdMetricsCollector {
return &EtcdMetricsCollector{
stopCh: make(chan struct{}),
wg: &sync.WaitGroup{},
metrics: newEtcdMetrics(),
}
}
// extractMetricSamples parses the prometheus metric samples from the input string.
func extractMetricSamples(metricsBlob string) ([]*model.Sample, error) {
dec := expfmt.NewDecoder(strings.NewReader(metricsBlob), expfmt.FmtText)
decoder := expfmt.SampleDecoder{
Dec: dec,
Opts: &expfmt.DecodeOptions{},
}
var samples []*model.Sample
for {
var v model.Vector
if err := decoder.Decode(&v); err != nil {
if err == io.EOF {
// Expected loop termination condition.
return samples, nil
}
return nil, err
}
samples = append(samples, v...)
}
}
func getEtcdMetrics(provider string, masterHostname string) ([]*model.Sample, error) {
// Etcd is only exposed on localhost level. We are using ssh method
if provider == "gke" || provider == "eks" {
e2elog.Logf("Not grabbing etcd metrics through master SSH: unsupported for %s", provider)
return nil, nil
}
cmd := "curl http://localhost:2379/metrics"
sshResult, err := e2essh.SSH(cmd, masterHostname+":22", provider)
if err != nil || sshResult.Code != 0 {
return nil, fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
}
data := sshResult.Stdout
return extractMetricSamples(data)
}
func getEtcdDatabaseSize(provider string, masterHostname string) (float64, error) {
samples, err := getEtcdMetrics(provider, masterHostname)
if err != nil {
return 0, err
}
for _, sample := range samples {
if sample.Metric[model.MetricNameLabel] == "etcd_debugging_mvcc_db_total_size_in_bytes" {
return float64(sample.Value), nil
}
}
return 0, fmt.Errorf("Couldn't find etcd database size metric")
}
// StartCollecting starts to collect etcd db size metric periodically
// and updates MaxDatabaseSize accordingly.
func (mc *EtcdMetricsCollector) StartCollecting(interval time.Duration, provider string, masterHostname string) {
mc.wg.Add(1)
go func() {
defer mc.wg.Done()
for {
select {
case <-time.After(interval):
dbSize, err := getEtcdDatabaseSize(provider, masterHostname)
if err != nil {
e2elog.Logf("Failed to collect etcd database size")
continue
}
mc.metrics.MaxDatabaseSize = math.Max(mc.metrics.MaxDatabaseSize, dbSize)
case <-mc.stopCh:
return
}
}
}()
}
func convertSampleToBucket(sample *model.Sample, h *HistogramVec) {
labels := make(map[string]string)
for k, v := range sample.Metric {
if k != "le" {
labels[string(k)] = string(v)
}
}
var hist *Histogram
for i := range *h {
if reflect.DeepEqual(labels, (*h)[i].Labels) {
hist = &((*h)[i])
break
}
}
if hist == nil {
hist = newHistogram(labels)
*h = append(*h, *hist)
}
hist.Buckets[string(sample.Metric["le"])] = int(sample.Value)
}
// StopAndSummarize stops etcd metrics collector and summarizes the metrics.
func (mc *EtcdMetricsCollector) StopAndSummarize(provider string, masterHostname string) error {
close(mc.stopCh)
mc.wg.Wait()
// Do some one-off collection of metrics.
samples, err := getEtcdMetrics(provider, masterHostname)
if err != nil {
return err
}
for _, sample := range samples {
switch sample.Metric[model.MetricNameLabel] {
case "etcd_disk_backend_commit_duration_seconds_bucket":
convertSampleToBucket(sample, &mc.metrics.BackendCommitDuration)
case "etcd_debugging_snap_save_total_duration_seconds_bucket":
convertSampleToBucket(sample, &mc.metrics.SnapshotSaveTotalDuration)
case "etcd_disk_wal_fsync_duration_seconds_bucket":
convertSampleToBucket(sample, &mc.metrics.WalFsyncDuration)
case "etcd_network_peer_round_trip_time_seconds_bucket":
convertSampleToBucket(sample, &mc.metrics.PeerRoundTripTime)
}
}
return nil
}
// GetMetrics returns metrics of etcd metrics collector.
func (mc *EtcdMetricsCollector) GetMetrics() *EtcdMetrics {
return mc.metrics
}

View File

@ -1,81 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"io"
"reflect"
"strings"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"k8s.io/klog"
)
// Metrics is generic metrics for other specific metrics
type Metrics map[string]model.Samples
// Equal returns true if all metrics are the same as the arguments.
func (m *Metrics) Equal(o Metrics) bool {
leftKeySet := []string{}
rightKeySet := []string{}
for k := range *m {
leftKeySet = append(leftKeySet, k)
}
for k := range o {
rightKeySet = append(rightKeySet, k)
}
if !reflect.DeepEqual(leftKeySet, rightKeySet) {
return false
}
for _, k := range leftKeySet {
if !(*m)[k].Equal(o[k]) {
return false
}
}
return true
}
// NewMetrics returns new metrics which are initialized.
func NewMetrics() Metrics {
result := make(Metrics)
return result
}
func parseMetrics(data string, output *Metrics) error {
dec := expfmt.NewDecoder(strings.NewReader(data), expfmt.FmtText)
decoder := expfmt.SampleDecoder{
Dec: dec,
Opts: &expfmt.DecodeOptions{},
}
for {
var v model.Vector
if err := decoder.Decode(&v); err != nil {
if err == io.EOF {
// Expected loop termination condition.
return nil
}
klog.Warningf("Invalid Decode. Skipping.")
continue
}
for _, metric := range v {
name := string(metric.Metric[model.MetricNameLabel])
(*output)[name] = append((*output)[name], metric)
}
}
}

View File

@ -27,11 +27,10 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/component-base/metrics/testutil"
dockermetrics "k8s.io/kubernetes/pkg/kubelet/dockershim/metrics"
kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
"github.com/prometheus/common/model"
)
const (
@ -39,16 +38,16 @@ const (
)
// KubeletMetrics is metrics for kubelet
type KubeletMetrics Metrics
type KubeletMetrics testutil.Metrics
// Equal returns true if all metrics are the same as the arguments.
func (m *KubeletMetrics) Equal(o KubeletMetrics) bool {
return (*Metrics)(m).Equal(Metrics(o))
return (*testutil.Metrics)(m).Equal(testutil.Metrics(o))
}
// NewKubeletMetrics returns new metrics which are initialized.
func NewKubeletMetrics() KubeletMetrics {
result := NewMetrics()
result := testutil.NewMetrics()
return KubeletMetrics(result)
}
@ -69,7 +68,7 @@ func GrabKubeletMetricsWithoutProxy(nodeName, path string) (KubeletMetrics, erro
func parseKubeletMetrics(data string) (KubeletMetrics, error) {
result := NewKubeletMetrics()
if err := parseMetrics(data, (*Metrics)(&result)); err != nil {
if err := testutil.ParseMetrics(data, (*testutil.Metrics)(&result)); err != nil {
return KubeletMetrics{}, err
}
return result, nil
@ -183,7 +182,7 @@ func GetKubeletLatencyMetrics(ms KubeletMetrics, filterMetricNames sets.String)
latency := sample.Value
operation := string(sample.Metric["operation_type"])
var quantile float64
if val, ok := sample.Metric[model.QuantileLabel]; ok {
if val, ok := sample.Metric[testutil.QuantileLabel]; ok {
var err error
if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
continue

View File

@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/master/ports"
schedulermetric "k8s.io/kubernetes/pkg/scheduler/metrics"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
@ -35,8 +36,6 @@ import (
"k8s.io/kubernetes/test/e2e/system"
"github.com/onsi/gomega"
"github.com/prometheus/common/model"
)
const (
@ -63,7 +62,7 @@ const (
bigClusterNodeCountThreshold = 500
)
var schedulingLatencyMetricName = model.LabelValue(schedulermetric.SchedulerSubsystem + "_" + schedulermetric.SchedulingLatencyName)
var schedulingLatencyMetricName = schedulermetric.SchedulerSubsystem + "_" + schedulermetric.SchedulingLatencyName
func readLatencyMetrics(c clientset.Interface) (*APIResponsiveness, error) {
var a APIResponsiveness
@ -73,7 +72,7 @@ func readLatencyMetrics(c clientset.Interface) (*APIResponsiveness, error) {
return nil, err
}
samples, err := extractMetricSamples(body)
samples, err := testutil.ExtractMetricSamples(body)
if err != nil {
return nil, err
}
@ -86,8 +85,8 @@ func readLatencyMetrics(c clientset.Interface) (*APIResponsiveness, error) {
// Example line:
// apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908
// apiserver_request_total{resource="pods",verb="LIST",client="kubectl",code="200",contentType="json"} 233
if sample.Metric[model.MetricNameLabel] != "apiserver_request_latencies_summary" &&
sample.Metric[model.MetricNameLabel] != "apiserver_request_total" {
if sample.Metric[testutil.MetricNameLabel] != "apiserver_request_latencies_summary" &&
sample.Metric[testutil.MetricNameLabel] != "apiserver_request_total" {
continue
}
@ -99,10 +98,10 @@ func readLatencyMetrics(c clientset.Interface) (*APIResponsiveness, error) {
continue
}
switch sample.Metric[model.MetricNameLabel] {
switch sample.Metric[testutil.MetricNameLabel] {
case "apiserver_request_latencies_summary":
latency := sample.Value
quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64)
quantile, err := strconv.ParseFloat(string(sample.Metric[testutil.QuantileLabel]), 64)
if err != nil {
return nil, err
}
@ -261,13 +260,13 @@ func getSchedulingLatency(c clientset.Interface, provider, cloudMasterName, mast
return nil, err
}
samples, err := extractMetricSamples(data)
samples, err := testutil.ExtractMetricSamples(data)
if err != nil {
return nil, err
}
for _, sample := range samples {
if sample.Metric[model.MetricNameLabel] != schedulingLatencyMetricName {
if string(sample.Metric[testutil.MetricNameLabel]) != schedulingLatencyMetricName {
continue
}
@ -286,7 +285,7 @@ func getSchedulingLatency(c clientset.Interface, provider, cloudMasterName, mast
continue
}
quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64)
quantile, err := strconv.ParseFloat(string(sample.Metric[testutil.QuantileLabel]), 64)
if err != nil {
return nil, err
}

View File

@ -16,22 +16,24 @@ limitations under the License.
package metrics
import "k8s.io/component-base/metrics/testutil"
// SchedulerMetrics is metrics for scheduler
type SchedulerMetrics Metrics
type SchedulerMetrics testutil.Metrics
// Equal returns true if all metrics are the same as the arguments.
func (m *SchedulerMetrics) Equal(o SchedulerMetrics) bool {
return (*Metrics)(m).Equal(Metrics(o))
return (*testutil.Metrics)(m).Equal(testutil.Metrics(o))
}
func newSchedulerMetrics() SchedulerMetrics {
result := NewMetrics()
result := testutil.NewMetrics()
return SchedulerMetrics(result)
}
func parseSchedulerMetrics(data string) (SchedulerMetrics, error) {
result := newSchedulerMetrics()
if err := parseMetrics(data, (*Metrics)(&result)); err != nil {
if err := testutil.ParseMetrics(data, (*testutil.Metrics)(&result)); err != nil {
return SchedulerMetrics{}, err
}
return result, nil

View File

@ -62,6 +62,7 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e/framework/auth:go_default_library",
"//test/e2e/framework/deployment:go_default_library",

View File

@ -42,6 +42,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
"//test/e2e/framework:go_default_library",

View File

@ -36,6 +36,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/component-base/metrics/testutil"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/framework/metrics"
@ -533,7 +534,7 @@ func StartPodLogs(f *framework.Framework) func() {
return cancel
}
func getVolumeOpsFromMetricsForPlugin(ms metrics.Metrics, pluginName string) opCounts {
func getVolumeOpsFromMetricsForPlugin(ms testutil.Metrics, pluginName string) opCounts {
totOps := opCounts{}
for method, samples := range ms {
@ -577,7 +578,7 @@ func getVolumeOpCounts(c clientset.Interface, pluginName string) opCounts {
controllerMetrics, err := metricsGrabber.GrabFromControllerManager()
framework.ExpectNoError(err, "Error getting c-m metrics : %v", err)
totOps := getVolumeOpsFromMetricsForPlugin(metrics.Metrics(controllerMetrics), pluginName)
totOps := getVolumeOpsFromMetricsForPlugin(testutil.Metrics(controllerMetrics), pluginName)
framework.Logf("Node name not specified for getVolumeOpCounts, falling back to listing nodes from API Server")
nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
@ -589,7 +590,7 @@ func getVolumeOpCounts(c clientset.Interface, pluginName string) opCounts {
for _, node := range nodes.Items {
nodeMetrics, err := metricsGrabber.GrabFromKubelet(node.GetName())
framework.ExpectNoError(err, "Error getting Kubelet %v metrics: %v", node.GetName(), err)
totOps = addOpCounts(totOps, getVolumeOpsFromMetricsForPlugin(metrics.Metrics(nodeMetrics), pluginName))
totOps = addOpCounts(totOps, getVolumeOpsFromMetricsForPlugin(testutil.Metrics(nodeMetrics), pluginName))
}
} else {
framework.Logf("Skipping operation metrics gathering from nodes in getVolumeOpCounts, greater than %v nodes", nodeLimit)

View File

@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/component-base/metrics/testutil"
kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/framework/metrics"
@ -274,7 +275,7 @@ var _ = utils.SIGDescribe("[Serial] Volume metrics", func() {
metricKey := "volume_operation_total_seconds_count"
dimensions := []string{"operation_name", "plugin_name"}
valid := hasValidMetrics(metrics.Metrics(controllerMetrics), metricKey, dimensions...)
valid := hasValidMetrics(testutil.Metrics(controllerMetrics), metricKey, dimensions...)
gomega.Expect(valid).To(gomega.BeTrue(), "Invalid metric in P/V Controller metrics: %q", metricKey)
framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
@ -304,7 +305,7 @@ var _ = utils.SIGDescribe("[Serial] Volume metrics", func() {
// Metrics should have dimensions plugin_name and state available
totalVolumesKey := "volume_manager_total_volumes"
dimensions := []string{"state", "plugin_name"}
valid := hasValidMetrics(metrics.Metrics(kubeMetrics), totalVolumesKey, dimensions...)
valid := hasValidMetrics(testutil.Metrics(kubeMetrics), totalVolumesKey, dimensions...)
gomega.Expect(valid).To(gomega.BeTrue(), "Invalid metric in Volume Manager metrics: %q", totalVolumesKey)
framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
@ -353,8 +354,8 @@ var _ = utils.SIGDescribe("[Serial] Volume metrics", func() {
// Total number of volumes in both ActualStateofWorld and DesiredStateOfWorld
// states should be higher or equal than it used to be
oldStates := getStatesMetrics(totalVolumesKey, metrics.Metrics(controllerMetrics))
updatedStates := getStatesMetrics(totalVolumesKey, metrics.Metrics(updatedControllerMetrics))
oldStates := getStatesMetrics(totalVolumesKey, testutil.Metrics(controllerMetrics))
updatedStates := getStatesMetrics(totalVolumesKey, testutil.Metrics(updatedControllerMetrics))
for _, stateName := range states {
if _, ok := oldStates[stateName]; !ok {
continue
@ -716,7 +717,7 @@ func calculateRelativeValues(originValues, updatedValues map[string]int64) map[s
return relativeValues
}
func hasValidMetrics(metrics metrics.Metrics, metricKey string, dimensions ...string) bool {
func hasValidMetrics(metrics testutil.Metrics, metricKey string, dimensions ...string) bool {
var errCount int
framework.Logf("Looking for sample in metric %q", metricKey)
samples, ok := metrics[metricKey]
@ -736,7 +737,7 @@ func hasValidMetrics(metrics metrics.Metrics, metricKey string, dimensions ...st
return errCount == 0
}
func getStatesMetrics(metricKey string, givenMetrics metrics.Metrics) map[string]map[string]int64 {
func getStatesMetrics(metricKey string, givenMetrics testutil.Metrics) map[string]map[string]int64 {
states := make(map[string]map[string]int64)
for _, sample := range givenMetrics[metricKey] {
framework.Logf("Found sample %q", sample.String())
@ -759,10 +760,10 @@ func waitForADControllerStatesMetrics(metricsGrabber *metrics.Grabber, metricNam
framework.Skipf("Could not get controller-manager metrics - skipping")
return false, err
}
if !hasValidMetrics(metrics.Metrics(updatedMetrics), metricName, dimensions...) {
if !hasValidMetrics(testutil.Metrics(updatedMetrics), metricName, dimensions...) {
return false, fmt.Errorf("could not get valid metrics for %q", metricName)
}
states := getStatesMetrics(metricName, metrics.Metrics(updatedMetrics))
states := getStatesMetrics(metricName, testutil.Metrics(updatedMetrics))
for _, name := range stateNames {
if _, ok := states[name]; !ok {
return false, fmt.Errorf("could not get state %q from A/D Controller metrics", name)

View File

@ -39,9 +39,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor/github.com/golang/protobuf/proto:go_default_library",
"//vendor/github.com/prometheus/client_model/go:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -17,8 +17,8 @@ limitations under the License.
package metrics
import (
"bufio"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"runtime"
@ -28,26 +28,15 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/test/integration/framework"
"github.com/golang/protobuf/proto"
prometheuspb "github.com/prometheus/client_model/go"
"k8s.io/klog"
)
const scrapeRequestHeader = "application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=compact-text"
func scrapeMetrics(s *httptest.Server) ([]*prometheuspb.MetricFamily, error) {
func scrapeMetrics(s *httptest.Server) (testutil.Metrics, error) {
req, err := http.NewRequest("GET", s.URL+"/metrics", nil)
if err != nil {
return nil, fmt.Errorf("Unable to create http request: %v", err)
}
// Ask the prometheus exporter for its text protocol buffer format, since it's
// much easier to parse than its plain-text format. Don't use the serialized
// proto representation since it uses a non-standard varint delimiter between
// metric families.
req.Header.Add("Accept", scrapeRequestHeader)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
@ -57,31 +46,18 @@ func scrapeMetrics(s *httptest.Server) ([]*prometheuspb.MetricFamily, error) {
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Non-200 response trying to scrape metrics from master: %v", resp)
}
// Each line in the response body should contain all the data for a single metric.
var metrics []*prometheuspb.MetricFamily
scanner := bufio.NewScanner(resp.Body)
// Increase buffer size, since default one is too small for reading
// the /metrics contents.
scanner.Buffer(make([]byte, 10), 131072)
for scanner.Scan() {
var metric prometheuspb.MetricFamily
if err := proto.UnmarshalText(scanner.Text(), &metric); err != nil {
return nil, fmt.Errorf("Failed to unmarshal line of metrics response: %v", err)
}
klog.V(4).Infof("Got metric %q", metric.GetName())
metrics = append(metrics, &metric)
metrics := testutil.NewMetrics()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("Unable to read response: %v", resp)
}
return metrics, nil
err = testutil.ParseMetrics(string(data), &metrics)
return metrics, err
}
func checkForExpectedMetrics(t *testing.T, metrics []*prometheuspb.MetricFamily, expectedMetrics []string) {
foundMetrics := make(map[string]bool)
for _, metric := range metrics {
foundMetrics[metric.GetName()] = true
}
func checkForExpectedMetrics(t *testing.T, metrics testutil.Metrics, expectedMetrics []string) {
for _, expected := range expectedMetrics {
if _, found := foundMetrics[expected]; !found {
if _, found := metrics[expected]; !found {
t.Errorf("Master metrics did not include expected metric %q", expected)
}
}
@ -124,7 +100,7 @@ func TestApiserverMetrics(t *testing.T) {
}
checkForExpectedMetrics(t, metrics, []string{
"apiserver_request_total",
"apiserver_request_duration_seconds",
"etcd_request_duration_seconds",
"apiserver_request_duration_seconds_sum",
"etcd_request_duration_seconds_sum",
})
}