From 6051664c0fb0eb78a84cea8bf5a1e1e4584de50e Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 7 Mar 2019 15:39:37 -0800 Subject: [PATCH] add kubelet prometheus resource metrics endpoint --- pkg/kubelet/apis/BUILD | 1 + .../apis/resourcemetrics/v1alpha1/BUILD | 26 ++ .../apis/resourcemetrics/v1alpha1/config.go | 81 ++++ pkg/kubelet/server/BUILD | 1 + pkg/kubelet/server/server.go | 21 +- pkg/kubelet/server/stats/BUILD | 6 + .../stats/prometheus_resource_metrics.go | 118 ++++++ .../stats/prometheus_resource_metrics_test.go | 348 ++++++++++++++++++ test/e2e/framework/kubelet_stats.go | 2 +- test/e2e/framework/metrics/kubelet_metrics.go | 5 +- test/e2e_node/BUILD | 2 + test/e2e_node/density_test.go | 2 +- test/e2e_node/gpu_device_plugin.go | 2 +- test/e2e_node/resource_metrics_test.go | 138 +++++++ test/e2e_node/util.go | 4 +- 15 files changed, 743 insertions(+), 14 deletions(-) create mode 100644 pkg/kubelet/apis/resourcemetrics/v1alpha1/BUILD create mode 100644 pkg/kubelet/apis/resourcemetrics/v1alpha1/config.go create mode 100644 pkg/kubelet/server/stats/prometheus_resource_metrics.go create mode 100644 pkg/kubelet/server/stats/prometheus_resource_metrics_test.go create mode 100644 test/e2e_node/resource_metrics_test.go diff --git a/pkg/kubelet/apis/BUILD b/pkg/kubelet/apis/BUILD index e1bfa430ca5..526638c1025 100644 --- a/pkg/kubelet/apis/BUILD +++ b/pkg/kubelet/apis/BUILD @@ -44,6 +44,7 @@ filegroup( "//pkg/kubelet/apis/pluginregistration/v1alpha1:all-srcs", "//pkg/kubelet/apis/pluginregistration/v1beta1:all-srcs", "//pkg/kubelet/apis/podresources:all-srcs", + "//pkg/kubelet/apis/resourcemetrics/v1alpha1:all-srcs", "//pkg/kubelet/apis/stats/v1alpha1:all-srcs", ], tags = ["automanaged"], diff --git a/pkg/kubelet/apis/resourcemetrics/v1alpha1/BUILD b/pkg/kubelet/apis/resourcemetrics/v1alpha1/BUILD new file mode 100644 index 00000000000..3addd5c1877 --- /dev/null +++ b/pkg/kubelet/apis/resourcemetrics/v1alpha1/BUILD @@ -0,0 +1,26 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["config.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/apis/resourcemetrics/v1alpha1", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kubelet/apis/stats/v1alpha1:go_default_library", + "//pkg/kubelet/server/stats:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubelet/apis/resourcemetrics/v1alpha1/config.go b/pkg/kubelet/apis/resourcemetrics/v1alpha1/config.go new file mode 100644 index 00000000000..b0697dec261 --- /dev/null +++ b/pkg/kubelet/apis/resourcemetrics/v1alpha1/config.go @@ -0,0 +1,81 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package v1alpha1 + +import ( + "time" + + summary "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" + "k8s.io/kubernetes/pkg/kubelet/server/stats" +) + +// Version is the string representation of the version of this configuration +const Version = "v1alpha1" + +// Config is the v1alpha1 resource metrics definition +func Config() stats.ResourceMetricsConfig { + return stats.ResourceMetricsConfig{ + NodeMetrics: []stats.NodeResourceMetric{ + { + Name: "node_cpu_usage_seconds_total", + Description: "Cumulative cpu time consumed by the node in core-seconds", + ValueFn: func(s summary.NodeStats) (*float64, time.Time) { + if s.CPU == nil { + return nil, time.Time{} + } + v := float64(*s.CPU.UsageCoreNanoSeconds) / float64(time.Second) + return &v, s.CPU.Time.Time + }, + }, + { + Name: "node_memory_working_set_bytes", + Description: "Current working set of the node in bytes", + ValueFn: func(s summary.NodeStats) (*float64, time.Time) { + if s.Memory == nil { + return nil, time.Time{} + } + v := float64(*s.Memory.WorkingSetBytes) + return &v, s.Memory.Time.Time + }, + }, + }, + ContainerMetrics: []stats.ContainerResourceMetric{ + { + Name: "container_cpu_usage_seconds_total", + Description: "Cumulative cpu time consumed by the container in core-seconds", + ValueFn: func(s summary.ContainerStats) (*float64, time.Time) { + if s.CPU == nil { + return nil, time.Time{} + } + v := float64(*s.CPU.UsageCoreNanoSeconds) / float64(time.Second) + return &v, s.CPU.Time.Time + }, + }, + { + Name: "container_memory_working_set_bytes", + Description: "Current working set of the container in bytes", + ValueFn: func(s summary.ContainerStats) (*float64, time.Time) { + if s.Memory == nil { + return nil, time.Time{} + } + v := float64(*s.Memory.WorkingSetBytes) + return &v, s.Memory.Time.Time + }, + }, + }, + } +} diff --git a/pkg/kubelet/server/BUILD b/pkg/kubelet/server/BUILD index 55353c425aa..81e921cae8d 100644 --- a/pkg/kubelet/server/BUILD +++ b/pkg/kubelet/server/BUILD @@ -20,6 +20,7 @@ go_library( "//pkg/apis/core/v1/validation:go_default_library", "//pkg/kubelet/apis/podresources:go_default_library", "//pkg/kubelet/apis/podresources/v1alpha1:go_default_library", + "//pkg/kubelet/apis/resourcemetrics/v1alpha1:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/prober:go_default_library", "//pkg/kubelet/server/portforward:go_default_library", diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index 50256f6e80c..c018dc81c5e 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -25,6 +25,7 @@ import ( "net/http" "net/http/pprof" "net/url" + "path" "reflect" goruntime "runtime" "strconv" @@ -59,6 +60,7 @@ import ( "k8s.io/kubernetes/pkg/apis/core/v1/validation" "k8s.io/kubernetes/pkg/kubelet/apis/podresources" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" + "k8s.io/kubernetes/pkg/kubelet/apis/resourcemetrics/v1alpha1" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/prober" "k8s.io/kubernetes/pkg/kubelet/server/portforward" @@ -71,12 +73,13 @@ import ( ) const ( - metricsPath = "/metrics" - cadvisorMetricsPath = "/metrics/cadvisor" - proberMetricsPath = "/metrics/probes" - specPath = "/spec/" - statsPath = "/stats/" - logsPath = "/logs/" + metricsPath = "/metrics" + cadvisorMetricsPath = "/metrics/cadvisor" + resourceMetricsPathPrefix = "/metrics/resource" + proberMetricsPath = "/metrics/probes" + specPath = "/spec/" + statsPath = "/stats/" + logsPath = "/logs/" 
) // Server is a http.Handler which exposes kubelet functionality over HTTP. @@ -308,6 +311,12 @@ func (s *Server) InstallDefaultHandlers() { promhttp.HandlerFor(r, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}), ) + v1alpha1ResourceRegistry := prometheus.NewRegistry() + v1alpha1ResourceRegistry.MustRegister(stats.NewPrometheusResourceMetricCollector(s.resourceAnalyzer, v1alpha1.Config())) + s.restfulCont.Handle(path.Join(resourceMetricsPathPrefix, v1alpha1.Version), + promhttp.HandlerFor(v1alpha1ResourceRegistry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}), + ) + // prober metrics are exposed under a different endpoint p := prometheus.NewRegistry() p.MustRegister(prober.ProberResults) diff --git a/pkg/kubelet/server/stats/BUILD b/pkg/kubelet/server/stats/BUILD index dde9b3cbc7d..954fec082d1 100644 --- a/pkg/kubelet/server/stats/BUILD +++ b/pkg/kubelet/server/stats/BUILD @@ -6,6 +6,7 @@ go_library( "doc.go", "fs_resource_analyzer.go", "handler.go", + "prometheus_resource_metrics.go", "resource_analyzer.go", "summary.go", "summary_sys_containers.go", @@ -27,6 +28,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/github.com/emicklei/go-restful:go_default_library", "//vendor/github.com/google/cadvisor/info/v1:go_default_library", + "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) @@ -34,6 +36,7 @@ go_library( go_test( name = "go_default_test", srcs = [ + "prometheus_resource_metrics_test.go", "summary_test.go", "summary_windows_test.go", "volume_stat_calculator_test.go", @@ -46,7 +49,10 @@ go_test( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", + "//vendor/github.com/prometheus/client_model/go:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/github.com/stretchr/testify/mock:go_default_library", ] + select({ "@io_bazel_rules_go//go/platform:android": [ "//pkg/kubelet/cm:go_default_library", diff --git a/pkg/kubelet/server/stats/prometheus_resource_metrics.go b/pkg/kubelet/server/stats/prometheus_resource_metrics.go new file mode 100644 index 00000000000..050bc20a73c --- /dev/null +++ b/pkg/kubelet/server/stats/prometheus_resource_metrics.go @@ -0,0 +1,118 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package stats + +import ( + "time" + + "k8s.io/klog" + stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" + + "github.com/prometheus/client_golang/prometheus" +) + +// NodeResourceMetric describes a metric for the node +type NodeResourceMetric struct { + Name string + Description string + ValueFn func(stats.NodeStats) (*float64, time.Time) +} + +func (n *NodeResourceMetric) desc() *prometheus.Desc { + return prometheus.NewDesc(n.Name, n.Description, []string{}, nil) +} + +// ContainerResourceMetric describes a metric for containers +type ContainerResourceMetric struct { + Name string + Description string + ValueFn func(stats.ContainerStats) (*float64, time.Time) +} + +func (n *ContainerResourceMetric) desc() *prometheus.Desc { + return prometheus.NewDesc(n.Name, n.Description, []string{"container", "pod", "namespace"}, nil) +} + +// ResourceMetricsConfig specifies which metrics to collect and export +type ResourceMetricsConfig struct { + NodeMetrics []NodeResourceMetric + ContainerMetrics []ContainerResourceMetric +} + +// NewPrometheusResourceMetricCollector returns a prometheus.Collector which exports resource metrics +func NewPrometheusResourceMetricCollector(provider SummaryProvider, config ResourceMetricsConfig) prometheus.Collector { + return &resourceMetricCollector{ + provider: provider, + config: config, + errors: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "scrape_error", + Help: "1 if there was an error while getting container metrics, 0 otherwise", + }), + } +} + +type resourceMetricCollector struct { + provider SummaryProvider + config ResourceMetricsConfig + errors prometheus.Gauge +} + +var _ prometheus.Collector = &resourceMetricCollector{} + +// Describe implements prometheus.Collector +func (rc *resourceMetricCollector) Describe(ch chan<- *prometheus.Desc) { + rc.errors.Describe(ch) + for _, metric := range rc.config.NodeMetrics { + ch <- metric.desc() + } + for _, metric := range rc.config.ContainerMetrics { + ch <- metric.desc() + } +} + +// Collect implements prometheus.Collector +// Since new containers are frequently created and removed, using the prometheus.Gauge Collector would +// leak metric collectors for containers or pods that no longer exist. Instead, implement +// prometheus.Collector in a way that only collects metrics for active containers. 
+func (rc *resourceMetricCollector) Collect(ch chan<- prometheus.Metric) { + rc.errors.Set(0) + defer rc.errors.Collect(ch) + summary, err := rc.provider.GetCPUAndMemoryStats() + if err != nil { + rc.errors.Set(1) + klog.Warningf("Error getting summary for resourceMetric prometheus endpoint: %v", err) + return + } + + for _, metric := range rc.config.NodeMetrics { + if value, timestamp := metric.ValueFn(summary.Node); value != nil { + ch <- prometheus.NewMetricWithTimestamp(timestamp, + prometheus.MustNewConstMetric(metric.desc(), prometheus.GaugeValue, *value)) + } + } + + for _, pod := range summary.Pods { + for _, container := range pod.Containers { + for _, metric := range rc.config.ContainerMetrics { + if value, timestamp := metric.ValueFn(container); value != nil { + ch <- prometheus.NewMetricWithTimestamp(timestamp, + prometheus.MustNewConstMetric(metric.desc(), prometheus.GaugeValue, *value, container.Name, pod.PodRef.Name, pod.PodRef.Namespace)) + } + } + } + } +} diff --git a/pkg/kubelet/server/stats/prometheus_resource_metrics_test.go b/pkg/kubelet/server/stats/prometheus_resource_metrics_test.go new file mode 100644 index 00000000000..462b141c340 --- /dev/null +++ b/pkg/kubelet/server/stats/prometheus_resource_metrics_test.go @@ -0,0 +1,348 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package stats + +import ( + "fmt" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" +) + +const ( + errorName = "scrape_error" + errorHelp = "1 if there was an error while getting container metrics, 0 otherwise" +) + +var ( + noError = float64(0) + hasError = float64(1) +) + +type mockSummaryProvider struct { + mock.Mock +} + +func (m *mockSummaryProvider) Get(updateStats bool) (*statsapi.Summary, error) { + args := m.Called(updateStats) + return args.Get(0).(*statsapi.Summary), args.Error(1) +} + +func (m *mockSummaryProvider) GetCPUAndMemoryStats() (*statsapi.Summary, error) { + args := m.Called() + return args.Get(0).(*statsapi.Summary), args.Error(1) +} + +type collectResult struct { + desc *prometheus.Desc + metric *dto.Metric +} + +func TestCollectResourceMetrics(t *testing.T) { + testTime := metav1.Now() + for _, tc := range []struct { + description string + config ResourceMetricsConfig + summary *statsapi.Summary + summaryErr error + expectedMetrics []collectResult + }{ + { + description: "error getting summary", + config: ResourceMetricsConfig{}, + summary: nil, + summaryErr: fmt.Errorf("failed to get summary"), + expectedMetrics: []collectResult{ + { + desc: prometheus.NewDesc(errorName, errorHelp, []string{}, nil), + metric: &dto.Metric{Gauge: &dto.Gauge{Value: &hasError}}, + }, + }, + }, + { + description: "arbitrary node metrics", + config: ResourceMetricsConfig{ + NodeMetrics: []NodeResourceMetric{ + { + Name: "node_foo", + Description: "a metric from nodestats", + ValueFn: func(s statsapi.NodeStats) (*float64, time.Time) { + if s.CPU == nil { + return nil, time.Time{} + } + v := float64(*s.CPU.UsageCoreNanoSeconds) / float64(time.Second) + return &v, s.CPU.Time.Time + }, + }, + { + Name: "node_bar", + Description: "another metric from nodestats", + ValueFn: func(s statsapi.NodeStats) (*float64, time.Time) { + if s.Memory == nil { + return nil, time.Time{} + } + v := float64(*s.Memory.WorkingSetBytes) + return &v, s.Memory.Time.Time + }, + }, + }, + }, + summary: &statsapi.Summary{ + Node: statsapi.NodeStats{ + CPU: &statsapi.CPUStats{ + Time: testTime, + UsageCoreNanoSeconds: uint64Ptr(10000000000), + }, + Memory: &statsapi.MemoryStats{ + Time: testTime, + WorkingSetBytes: uint64Ptr(1000), + }, + }, + }, + summaryErr: nil, + expectedMetrics: []collectResult{ + { + desc: prometheus.NewDesc("node_foo", "a metric from nodestats", []string{}, nil), + metric: &dto.Metric{Gauge: &dto.Gauge{Value: float64Ptr(10)}}, + }, + { + desc: prometheus.NewDesc("node_bar", "another metric from nodestats", []string{}, nil), + metric: &dto.Metric{Gauge: &dto.Gauge{Value: float64Ptr(1000)}}, + }, + { + desc: prometheus.NewDesc(errorName, errorHelp, []string{}, nil), + metric: &dto.Metric{Gauge: &dto.Gauge{Value: &noError}}, + }, + }, + }, + { + description: "arbitrary container metrics for different container, pods and namespaces", + config: ResourceMetricsConfig{ + ContainerMetrics: []ContainerResourceMetric{ + { + Name: "container_foo", + Description: "a metric from container stats", + ValueFn: func(s statsapi.ContainerStats) (*float64, time.Time) { + if s.CPU == nil { + return nil, time.Time{} + } + v := float64(*s.CPU.UsageCoreNanoSeconds) / float64(time.Second) + return &v, s.CPU.Time.Time + }, + }, + { + Name: "container_bar", + 
Description: "another metric from container stats", + ValueFn: func(s statsapi.ContainerStats) (*float64, time.Time) { + if s.Memory == nil { + return nil, time.Time{} + } + v := float64(*s.Memory.WorkingSetBytes) + return &v, s.Memory.Time.Time + }, + }, + }, + }, + summary: &statsapi.Summary{ + Pods: []statsapi.PodStats{ + { + PodRef: statsapi.PodReference{ + Name: "pod_a", + Namespace: "namespace_a", + }, + Containers: []statsapi.ContainerStats{ + { + Name: "container_a", + CPU: &statsapi.CPUStats{ + Time: testTime, + UsageCoreNanoSeconds: uint64Ptr(10000000000), + }, + Memory: &statsapi.MemoryStats{ + Time: testTime, + WorkingSetBytes: uint64Ptr(1000), + }, + }, + { + Name: "container_b", + CPU: &statsapi.CPUStats{ + Time: testTime, + UsageCoreNanoSeconds: uint64Ptr(10000000000), + }, + Memory: &statsapi.MemoryStats{ + Time: testTime, + WorkingSetBytes: uint64Ptr(1000), + }, + }, + }, + }, + { + PodRef: statsapi.PodReference{ + Name: "pod_b", + Namespace: "namespace_b", + }, + Containers: []statsapi.ContainerStats{ + { + Name: "container_a", + CPU: &statsapi.CPUStats{ + Time: testTime, + UsageCoreNanoSeconds: uint64Ptr(10000000000), + }, + Memory: &statsapi.MemoryStats{ + Time: testTime, + WorkingSetBytes: uint64Ptr(1000), + }, + }, + }, + }, + }, + }, + summaryErr: nil, + expectedMetrics: []collectResult{ + { + desc: prometheus.NewDesc("container_foo", "a metric from container stats", []string{"container", "pod", "namespace"}, nil), + metric: &dto.Metric{ + Gauge: &dto.Gauge{Value: float64Ptr(10)}, + Label: []*dto.LabelPair{ + {Name: stringPtr("container"), Value: stringPtr("container_a")}, + {Name: stringPtr("namespace"), Value: stringPtr("namespace_a")}, + {Name: stringPtr("pod"), Value: stringPtr("pod_a")}, + }, + }, + }, + { + desc: prometheus.NewDesc("container_bar", "another metric from container stats", []string{"container", "pod", "namespace"}, nil), + metric: &dto.Metric{ + Gauge: &dto.Gauge{Value: float64Ptr(1000)}, + Label: []*dto.LabelPair{ + {Name: stringPtr("container"), Value: stringPtr("container_a")}, + {Name: stringPtr("namespace"), Value: stringPtr("namespace_a")}, + {Name: stringPtr("pod"), Value: stringPtr("pod_a")}, + }, + }, + }, + { + desc: prometheus.NewDesc("container_foo", "a metric from container stats", []string{"container", "pod", "namespace"}, nil), + metric: &dto.Metric{ + Gauge: &dto.Gauge{Value: float64Ptr(10)}, + Label: []*dto.LabelPair{ + {Name: stringPtr("container"), Value: stringPtr("container_b")}, + {Name: stringPtr("namespace"), Value: stringPtr("namespace_a")}, + {Name: stringPtr("pod"), Value: stringPtr("pod_a")}, + }, + }, + }, + { + desc: prometheus.NewDesc("container_bar", "another metric from container stats", []string{"container", "pod", "namespace"}, nil), + metric: &dto.Metric{ + Gauge: &dto.Gauge{Value: float64Ptr(1000)}, + Label: []*dto.LabelPair{ + {Name: stringPtr("container"), Value: stringPtr("container_b")}, + {Name: stringPtr("namespace"), Value: stringPtr("namespace_a")}, + {Name: stringPtr("pod"), Value: stringPtr("pod_a")}, + }, + }, + }, + { + desc: prometheus.NewDesc("container_foo", "a metric from container stats", []string{"container", "pod", "namespace"}, nil), + metric: &dto.Metric{ + Gauge: &dto.Gauge{Value: float64Ptr(10)}, + Label: []*dto.LabelPair{ + {Name: stringPtr("container"), Value: stringPtr("container_a")}, + {Name: stringPtr("namespace"), Value: stringPtr("namespace_b")}, + {Name: stringPtr("pod"), Value: stringPtr("pod_b")}, + }, + }, + }, + { + desc: prometheus.NewDesc("container_bar", "another metric 
from container stats", []string{"container", "pod", "namespace"}, nil), + metric: &dto.Metric{ + Gauge: &dto.Gauge{Value: float64Ptr(1000)}, + Label: []*dto.LabelPair{ + {Name: stringPtr("container"), Value: stringPtr("container_a")}, + {Name: stringPtr("namespace"), Value: stringPtr("namespace_b")}, + {Name: stringPtr("pod"), Value: stringPtr("pod_b")}, + }, + }, + }, + { + desc: prometheus.NewDesc(errorName, errorHelp, []string{}, nil), + metric: &dto.Metric{Gauge: &dto.Gauge{Value: &noError}}, + }, + }, + }, + } { + t.Run(tc.description, func(t *testing.T) { + provider := &mockSummaryProvider{} + provider.On("GetCPUAndMemoryStats").Return(tc.summary, tc.summaryErr) + collector := NewPrometheusResourceMetricCollector(provider, tc.config) + metrics := collectMetrics(t, collector, len(tc.expectedMetrics)) + for i := range metrics { + assertEqual(t, metrics[i], tc.expectedMetrics[i]) + } + }) + } +} + +// collectMetrics is a wrapper around a prometheus.Collector which returns the metrics added to the metric channel as a slice.metric +// It will block indefinitely if the collector does not collect exactly numMetrics. +func collectMetrics(t *testing.T, collector prometheus.Collector, numMetrics int) (results []collectResult) { + metricsCh := make(chan prometheus.Metric) + done := make(chan struct{}) + go func() { + collector.Collect(metricsCh) + done <- struct{}{} + }() + for i := 0; i < numMetrics; i++ { + metric := <-metricsCh + metricProto := &dto.Metric{} + assert.NoError(t, metric.Write(metricProto)) + results = append(results, collectResult{desc: metric.Desc(), metric: metricProto}) + } + <-done + return +} + +// assertEqual asserts for semanitic equality for fields we care about +func assertEqual(t *testing.T, expected, actual collectResult) { + assert.Equal(t, expected.desc.String(), actual.desc.String()) + assert.Equal(t, *expected.metric.Gauge.Value, *actual.metric.Gauge.Value, "for desc: %v", expected.desc.String()) + assert.Equal(t, len(expected.metric.Label), len(actual.metric.Label)) + if len(expected.metric.Label) == len(actual.metric.Label) { + for i := range expected.metric.Label { + assert.Equal(t, *expected.metric.Label[i], *actual.metric.Label[i], "for desc: %v", expected.desc.String()) + } + } +} + +func stringPtr(s string) *string { + return &s +} + +func uint64Ptr(u uint64) *uint64 { + return &u +} + +func float64Ptr(f float64) *float64 { + return &f +} diff --git a/test/e2e/framework/kubelet_stats.go b/test/e2e/framework/kubelet_stats.go index 9b8f4c9cb72..4341ab1ec3d 100644 --- a/test/e2e/framework/kubelet_stats.go +++ b/test/e2e/framework/kubelet_stats.go @@ -67,7 +67,7 @@ func (a KubeletLatencyMetrics) Less(i, j int) bool { return a[i].Latency > a[j]. // or else, the function will try to get kubelet metrics directly from the node. 
func getKubeletMetricsFromNode(c clientset.Interface, nodeName string) (metrics.KubeletMetrics, error) { if c == nil { - return metrics.GrabKubeletMetricsWithoutProxy(nodeName) + return metrics.GrabKubeletMetricsWithoutProxy(nodeName, "/metrics") } grabber, err := metrics.NewMetricsGrabber(c, nil, true, false, false, false, false) if err != nil { diff --git a/test/e2e/framework/metrics/kubelet_metrics.go b/test/e2e/framework/metrics/kubelet_metrics.go index cf2d666e8b0..9b381b8fc93 100644 --- a/test/e2e/framework/metrics/kubelet_metrics.go +++ b/test/e2e/framework/metrics/kubelet_metrics.go @@ -36,9 +36,8 @@ func NewKubeletMetrics() KubeletMetrics { // GrabKubeletMetricsWithoutProxy retrieve metrics from the kubelet on the given node using a simple GET over http. // Currently only used in integration tests. -func GrabKubeletMetricsWithoutProxy(nodeName string) (KubeletMetrics, error) { - metricsEndpoint := "http://%s/metrics" - resp, err := http.Get(fmt.Sprintf(metricsEndpoint, nodeName)) +func GrabKubeletMetricsWithoutProxy(nodeName, path string) (KubeletMetrics, error) { + resp, err := http.Get(fmt.Sprintf("http://%s%s", nodeName, path)) if err != nil { return KubeletMetrics{}, err } diff --git a/test/e2e_node/BUILD b/test/e2e_node/BUILD index e643dc2953a..4dcf27a9279 100644 --- a/test/e2e_node/BUILD +++ b/test/e2e_node/BUILD @@ -99,6 +99,7 @@ go_test( "node_perf_test.go", "pids_test.go", "pods_container_manager_test.go", + "resource_metrics_test.go", "resource_usage_test.go", "restart_test.go", "runtime_conformance_test.go", @@ -116,6 +117,7 @@ go_test( "//pkg/kubelet/apis/config:go_default_library", "//pkg/kubelet/apis/cri:go_default_library", "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", + "//pkg/kubelet/apis/resourcemetrics/v1alpha1:go_default_library", "//pkg/kubelet/apis/stats/v1alpha1:go_default_library", "//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/cm/cpumanager:go_default_library", diff --git a/test/e2e_node/density_test.go b/test/e2e_node/density_test.go index 3a80860a03a..8e71f02888a 100644 --- a/test/e2e_node/density_test.go +++ b/test/e2e_node/density_test.go @@ -454,7 +454,7 @@ func createBatchPodWithRateControl(f *framework.Framework, pods []*v1.Pod, inter // getPodStartLatency gets prometheus metric 'pod start latency' from kubelet func getPodStartLatency(node string) (framework.KubeletLatencyMetrics, error) { latencyMetrics := framework.KubeletLatencyMetrics{} - ms, err := metrics.GrabKubeletMetricsWithoutProxy(node) + ms, err := metrics.GrabKubeletMetricsWithoutProxy(node, "/metrics") Expect(err).NotTo(HaveOccurred(), "Failed to get kubelet metrics without proxy in node %s", node) for _, samples := range ms { diff --git a/test/e2e_node/gpu_device_plugin.go b/test/e2e_node/gpu_device_plugin.go index 17a91f054e8..beff923f0ef 100644 --- a/test/e2e_node/gpu_device_plugin.go +++ b/test/e2e_node/gpu_device_plugin.go @@ -152,7 +152,7 @@ func checkIfNvidiaGPUsExistOnNode() bool { } func logDevicePluginMetrics() { - ms, err := metrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName + ":10255") + ms, err := metrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName+":10255", "/metrics") framework.ExpectNoError(err) for msKey, samples := range ms { switch msKey { diff --git a/test/e2e_node/resource_metrics_test.go b/test/e2e_node/resource_metrics_test.go new file mode 100644 index 00000000000..4dcc4faa9cf --- /dev/null +++ b/test/e2e_node/resource_metrics_test.go @@ -0,0 +1,138 @@ +/* +Copyright 2019 The Kubernetes Authors. 
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e_node
+
+import (
+	"fmt"
+	"time"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/kubernetes/pkg/kubelet/apis/resourcemetrics/v1alpha1"
+	"k8s.io/kubernetes/test/e2e/framework"
+	"k8s.io/kubernetes/test/e2e/framework/metrics"
+
+	"github.com/prometheus/common/model"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"github.com/onsi/gomega/gstruct"
+	"github.com/onsi/gomega/types"
+)
+
+const (
+	pod0        = "stats-busybox-0"
+	pod1        = "stats-busybox-1"
+	maxStatsAge = time.Minute
+)
+
+var _ = framework.KubeDescribe("ResourceMetricsAPI", func() {
+	f := framework.NewDefaultFramework("resource-metrics")
+	Context("when querying /metrics/resource", func() {
+		BeforeEach(func() {
+			By("Creating test pods")
+			numRestarts := int32(1)
+			pods := getSummaryTestPods(f, numRestarts, pod0, pod1)
+			f.PodClient().CreateBatch(pods)
+
+			By("Waiting for test pods to restart the desired number of times")
+			Eventually(func() error {
+				for _, pod := range pods {
+					err := verifyPodRestartCount(f, pod.Name, len(pod.Spec.Containers), numRestarts)
+					if err != nil {
+						return err
+					}
+				}
+				return nil
+			}, time.Minute, 5*time.Second).Should(Succeed())
+
+			By("Waiting 15 seconds for cAdvisor to collect 2 stats points")
+			time.Sleep(15 * time.Second)
+		})
+		It("should report resource usage through the v1alpha1 resource metrics api", func() {
+			By("Fetching node so we can know proper node memory bounds for unconstrained cgroups")
+			node := getLocalNode(f)
+			memoryCapacity := node.Status.Capacity["memory"]
+			memoryLimit := memoryCapacity.Value()
+
+			matchV1alpha1Expectations := gstruct.MatchAllKeys(gstruct.Keys{
+				"scrape_error": gstruct.Ignore(),
+				"node_cpu_usage_seconds_total": gstruct.MatchAllElements(nodeId, gstruct.Elements{
+					"": boundedSample(1, 1E6),
+				}),
+				"node_memory_working_set_bytes": gstruct.MatchAllElements(nodeId, gstruct.Elements{
+					"": boundedSample(10*framework.Mb, memoryLimit),
+				}),
+
+				"container_cpu_usage_seconds_total": gstruct.MatchElements(containerId, gstruct.IgnoreExtras, gstruct.Elements{
+					fmt.Sprintf("%s::%s::%s", f.Namespace.Name, pod0, "busybox-container"): boundedSample(0, 100),
+					fmt.Sprintf("%s::%s::%s", f.Namespace.Name, pod1, "busybox-container"): boundedSample(0, 100),
+				}),
+
+				"container_memory_working_set_bytes": gstruct.MatchAllElements(containerId, gstruct.Elements{
+					fmt.Sprintf("%s::%s::%s", f.Namespace.Name, pod0, "busybox-container"): boundedSample(10*framework.Kb, 80*framework.Mb),
+					fmt.Sprintf("%s::%s::%s", f.Namespace.Name, pod1, "busybox-container"): boundedSample(10*framework.Kb, 80*framework.Mb),
+				}),
+			})
+			By("Giving pods a minute to start up and produce metrics")
+			Eventually(getV1alpha1ResourceMetrics, 1*time.Minute, 15*time.Second).Should(matchV1alpha1Expectations)
+			By("Ensuring the metrics match the expectations a few more times")
+			Consistently(getV1alpha1ResourceMetrics, 1*time.Minute, 15*time.Second).Should(matchV1alpha1Expectations)
+		})
+		AfterEach(func() {
+			By("Deleting test pods")
+			f.PodClient().DeleteSync(pod0, &metav1.DeleteOptions{}, 10*time.Minute)
+			f.PodClient().DeleteSync(pod1, &metav1.DeleteOptions{}, 10*time.Minute)
+			if !CurrentGinkgoTestDescription().Failed {
+				return
+			}
+			if framework.TestContext.DumpLogsOnFailure {
+				framework.LogFailedContainers(f.ClientSet, f.Namespace.Name, framework.Logf)
+			}
+			By("Recording processes in system cgroups")
+			recordSystemCgroupProcesses()
+		})
+	})
+})
+
+func getV1alpha1ResourceMetrics() (metrics.KubeletMetrics, error) {
+	return metrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName+":10255", "/metrics/resource/"+v1alpha1.Version)
+}
+
+func nodeId(element interface{}) string {
+	return ""
+}
+
+func containerId(element interface{}) string {
+	el := element.(*model.Sample)
+	return fmt.Sprintf("%s::%s::%s", el.Metric["namespace"], el.Metric["pod"], el.Metric["container"])
+}
+
+func boundedSample(lower, upper interface{}) types.GomegaMatcher {
+	return gstruct.PointTo(gstruct.MatchAllFields(gstruct.Fields{
+		// We already check Metric when matching the Id
+		"Metric": gstruct.Ignore(),
+		"Value":  And(BeNumerically(">=", lower), BeNumerically("<=", upper)),
+		"Timestamp": WithTransform(func(t model.Time) time.Time {
+			// model.Time is in Milliseconds since epoch
+			return time.Unix(0, int64(t)*int64(time.Millisecond))
+		},
+			And(
+				BeTemporally(">=", time.Now().Add(-maxStatsAge)),
+				// Now() is the test start time, not the match time, so permit a few extra minutes.
+				BeTemporally("<", time.Now().Add(2*time.Minute))),
+		)}))
+}
diff --git a/test/e2e_node/util.go b/test/e2e_node/util.go
index fd6353e90b2..8981bb7f71d 100644
--- a/test/e2e_node/util.go
+++ b/test/e2e_node/util.go
@@ -353,7 +353,7 @@ func logKubeletLatencyMetrics(metricNames ...string) {
 	for _, key := range metricNames {
 		metricSet.Insert(kubeletmetrics.KubeletSubsystem + "_" + key)
 	}
-	metric, err := metrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName + ":10255")
+	metric, err := metrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName+":10255", "/metrics")
 	if err != nil {
 		framework.Logf("Error getting kubelet metrics: %v", err)
 	} else {
@@ -364,7 +364,7 @@ func logKubeletLatencyMetrics(metricNames ...string) {
 // returns config related metrics from the local kubelet, filtered to the filterMetricNames passed in
 func getKubeletMetrics(filterMetricNames sets.String) (frameworkmetrics.KubeletMetrics, error) {
 	// grab Kubelet metrics
-	ms, err := metrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName + ":10255")
+	ms, err := metrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName+":10255", "/metrics")
 	if err != nil {
 		return nil, err
 	}
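
For illustration only, outside the diff above: the new endpoint serves standard Prometheus text-format metrics, so it can be scraped with a plain HTTP GET. A minimal sketch follows, assuming the kubelet read-only port 10255 (the port the e2e tests above use) is reachable on localhost; the path joins resourceMetricsPathPrefix ("/metrics/resource") with v1alpha1.Version ("v1alpha1") as registered in InstallDefaultHandlers.

// scrape_resource_metrics.go: illustrative sketch, not part of this patch.
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

func main() {
	// Assumes the kubelet read-only port (10255) is enabled and reachable on localhost.
	resp, err := http.Get("http://localhost:10255/metrics/resource/v1alpha1")
	if err != nil {
		log.Fatalf("failed to scrape kubelet resource metrics: %v", err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatalf("failed to read response body: %v", err)
	}
	// The body is Prometheus text exposition format containing the metrics defined in
	// pkg/kubelet/apis/resourcemetrics/v1alpha1/config.go, e.g. node_cpu_usage_seconds_total
	// and container_memory_working_set_bytes, plus the scrape_error gauge.
	fmt.Println(string(body))
}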