add kubelet prometheus resource metrics endpoint

David Ashpole 2019-03-07 15:39:37 -08:00
parent fec22bbb25
commit 6051664c0f
15 changed files with 743 additions and 14 deletions
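The commit wires a new endpoint into the kubelet at path.Join("/metrics/resource", "v1alpha1"), i.e. /metrics/resource/v1alpha1, serving a deliberately small set of node and container resource metrics in the Prometheus text exposition format. As a rough, illustrative sample only (the metric names, help strings, types, and label set come from the config and collector added below; the values, namespace, and timestamps are made up), a scrape of the endpoint might look like this:
# HELP node_cpu_usage_seconds_total Cumulative cpu time consumed by the node in core-seconds
# TYPE node_cpu_usage_seconds_total gauge
node_cpu_usage_seconds_total 133219.83 1551996654000
# HELP node_memory_working_set_bytes Current working set of the node in bytes
# TYPE node_memory_working_set_bytes gauge
node_memory_working_set_bytes 1.849884672e+09 1551996654000
# HELP container_cpu_usage_seconds_total Cumulative cpu time consumed by the container in core-seconds
# TYPE container_cpu_usage_seconds_total gauge
container_cpu_usage_seconds_total{container="busybox-container",namespace="default",pod="stats-busybox-0"} 12.31 1551996651000
# HELP container_memory_working_set_bytes Current working set of the container in bytes
# TYPE container_memory_working_set_bytes gauge
container_memory_working_set_bytes{container="busybox-container",namespace="default",pod="stats-busybox-0"} 1994752 1551996651000
# HELP scrape_error 1 if there was an error while getting container metrics, 0 otherwise
# TYPE scrape_error gauge
scrape_error 0
The trailing millisecond timestamps come from prometheus.NewMetricWithTimestamp and reflect when the kubelet sampled the stat, not when the scrape happened; scrape_error is an ordinary gauge with no timestamp.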

View File

@@ -44,6 +44,7 @@ filegroup(
"//pkg/kubelet/apis/pluginregistration/v1alpha1:all-srcs",
"//pkg/kubelet/apis/pluginregistration/v1beta1:all-srcs",
"//pkg/kubelet/apis/podresources:all-srcs",
"//pkg/kubelet/apis/resourcemetrics/v1alpha1:all-srcs",
"//pkg/kubelet/apis/stats/v1alpha1:all-srcs",
],
tags = ["automanaged"],

View File

@@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["config.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/apis/resourcemetrics/v1alpha1",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
"//pkg/kubelet/server/stats:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@@ -0,0 +1,81 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
"time"
summary "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
)
// Version is the string representation of the version of this configuration
const Version = "v1alpha1"
// Config is the v1alpha1 resource metrics definition
func Config() stats.ResourceMetricsConfig {
return stats.ResourceMetricsConfig{
NodeMetrics: []stats.NodeResourceMetric{
{
Name: "node_cpu_usage_seconds_total",
Description: "Cumulative cpu time consumed by the node in core-seconds",
ValueFn: func(s summary.NodeStats) (*float64, time.Time) {
if s.CPU == nil {
return nil, time.Time{}
}
v := float64(*s.CPU.UsageCoreNanoSeconds) / float64(time.Second)
return &v, s.CPU.Time.Time
},
},
{
Name: "node_memory_working_set_bytes",
Description: "Current working set of the node in bytes",
ValueFn: func(s summary.NodeStats) (*float64, time.Time) {
if s.Memory == nil {
return nil, time.Time{}
}
v := float64(*s.Memory.WorkingSetBytes)
return &v, s.Memory.Time.Time
},
},
},
ContainerMetrics: []stats.ContainerResourceMetric{
{
Name: "container_cpu_usage_seconds_total",
Description: "Cumulative cpu time consumed by the container in core-seconds",
ValueFn: func(s summary.ContainerStats) (*float64, time.Time) {
if s.CPU == nil {
return nil, time.Time{}
}
v := float64(*s.CPU.UsageCoreNanoSeconds) / float64(time.Second)
return &v, s.CPU.Time.Time
},
},
{
Name: "container_memory_working_set_bytes",
Description: "Current working set of the container in bytes",
ValueFn: func(s summary.ContainerStats) (*float64, time.Time) {
if s.Memory == nil {
return nil, time.Time{}
}
v := float64(*s.Memory.WorkingSetBytes)
return &v, s.Memory.Time.Time
},
},
},
}
}
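To make the unit conversion concrete, here is a minimal sketch (not part of this commit, and assuming it is built inside the kubernetes tree so the k8s.io/kubernetes imports resolve) that feeds a hand-built NodeStats through the first node metric of this config; 10,000,000,000 core-nanoseconds divided by time.Second comes out as 10 core-seconds, the same value the collector unit tests below expect.
package main
import (
	"fmt"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/kubelet/apis/resourcemetrics/v1alpha1"
	summary "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
func main() {
	// Hand-built node stats: 10e9 core-nanoseconds of cumulative CPU usage.
	usage := uint64(10000000000)
	node := summary.NodeStats{
		CPU: &summary.CPUStats{Time: metav1.Now(), UsageCoreNanoSeconds: &usage},
	}
	cfg := v1alpha1.Config()
	// NodeMetrics[0] is node_cpu_usage_seconds_total in the config above.
	value, ts := cfg.NodeMetrics[0].ValueFn(node)
	fmt.Printf("%s = %v core-seconds (sampled at %v)\n", cfg.NodeMetrics[0].Name, *value, ts)
}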

View File

@@ -20,6 +20,7 @@ go_library(
"//pkg/apis/core/v1/validation:go_default_library",
"//pkg/kubelet/apis/podresources:go_default_library",
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
"//pkg/kubelet/apis/resourcemetrics/v1alpha1:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/prober:go_default_library",
"//pkg/kubelet/server/portforward:go_default_library",

View File

@@ -25,6 +25,7 @@ import (
"net/http"
"net/http/pprof"
"net/url"
"path"
"reflect"
goruntime "runtime"
"strconv"
@@ -59,6 +60,7 @@ import (
"k8s.io/kubernetes/pkg/apis/core/v1/validation"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/apis/resourcemetrics/v1alpha1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
@@ -71,12 +73,13 @@ import (
)
const (
- metricsPath = "/metrics"
- cadvisorMetricsPath = "/metrics/cadvisor"
- proberMetricsPath = "/metrics/probes"
- specPath = "/spec/"
- statsPath = "/stats/"
- logsPath = "/logs/"
+ metricsPath = "/metrics"
+ cadvisorMetricsPath = "/metrics/cadvisor"
+ resourceMetricsPathPrefix = "/metrics/resource"
+ proberMetricsPath = "/metrics/probes"
+ specPath = "/spec/"
+ statsPath = "/stats/"
+ logsPath = "/logs/"
)
// Server is a http.Handler which exposes kubelet functionality over HTTP.
@@ -308,6 +311,12 @@ func (s *Server) InstallDefaultHandlers() {
promhttp.HandlerFor(r, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}),
)
v1alpha1ResourceRegistry := prometheus.NewRegistry()
v1alpha1ResourceRegistry.MustRegister(stats.NewPrometheusResourceMetricCollector(s.resourceAnalyzer, v1alpha1.Config()))
s.restfulCont.Handle(path.Join(resourceMetricsPathPrefix, v1alpha1.Version),
promhttp.HandlerFor(v1alpha1ResourceRegistry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}),
)
// prober metrics are exposed under a different endpoint
p := prometheus.NewRegistry()
p.MustRegister(prober.ProberResults)
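For completeness, a hedged client-side sketch (not part of the commit): scraping the handler registered above over the kubelet read-only port, the same way the node e2e test at the bottom of this diff does, and decoding the response with the Prometheus text parser from github.com/prometheus/common. The host and port are assumptions; only the path comes from the constants above.
package main
import (
	"fmt"
	"log"
	"net/http"
	"github.com/prometheus/common/expfmt"
)
func main() {
	// Assumes the kubelet read-only port (10255) is enabled on this node.
	resp, err := http.Get("http://127.0.0.1:10255/metrics/resource/v1alpha1")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	// Parse the text exposition into metric families keyed by metric name.
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	for name, mf := range families {
		fmt.Printf("%s: %d series\n", name, len(mf.Metric))
	}
}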

View File

@@ -6,6 +6,7 @@ go_library(
"doc.go",
"fs_resource_analyzer.go",
"handler.go",
"prometheus_resource_metrics.go",
"resource_analyzer.go",
"summary.go",
"summary_sys_containers.go",
@@ -27,6 +28,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/github.com/emicklei/go-restful:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
@@ -34,6 +36,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"prometheus_resource_metrics_test.go",
"summary_test.go",
"summary_windows_test.go",
"volume_stat_calculator_test.go",
@@ -46,7 +49,10 @@ go_test(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/github.com/prometheus/client_model/go:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/mock:go_default_library",
] + select({
"@io_bazel_rules_go//go/platform:android": [
"//pkg/kubelet/cm:go_default_library",

View File

@@ -0,0 +1,118 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"time"
"k8s.io/klog"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"github.com/prometheus/client_golang/prometheus"
)
// NodeResourceMetric describes a metric for the node
type NodeResourceMetric struct {
Name string
Description string
ValueFn func(stats.NodeStats) (*float64, time.Time)
}
func (n *NodeResourceMetric) desc() *prometheus.Desc {
return prometheus.NewDesc(n.Name, n.Description, []string{}, nil)
}
// ContainerResourceMetric describes a metric for containers
type ContainerResourceMetric struct {
Name string
Description string
ValueFn func(stats.ContainerStats) (*float64, time.Time)
}
func (n *ContainerResourceMetric) desc() *prometheus.Desc {
return prometheus.NewDesc(n.Name, n.Description, []string{"container", "pod", "namespace"}, nil)
}
// ResourceMetricsConfig specifies which metrics to collect and export
type ResourceMetricsConfig struct {
NodeMetrics []NodeResourceMetric
ContainerMetrics []ContainerResourceMetric
}
// NewPrometheusResourceMetricCollector returns a prometheus.Collector which exports resource metrics
func NewPrometheusResourceMetricCollector(provider SummaryProvider, config ResourceMetricsConfig) prometheus.Collector {
return &resourceMetricCollector{
provider: provider,
config: config,
errors: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "scrape_error",
Help: "1 if there was an error while getting container metrics, 0 otherwise",
}),
}
}
type resourceMetricCollector struct {
provider SummaryProvider
config ResourceMetricsConfig
errors prometheus.Gauge
}
var _ prometheus.Collector = &resourceMetricCollector{}
// Describe implements prometheus.Collector
func (rc *resourceMetricCollector) Describe(ch chan<- *prometheus.Desc) {
rc.errors.Describe(ch)
for _, metric := range rc.config.NodeMetrics {
ch <- metric.desc()
}
for _, metric := range rc.config.ContainerMetrics {
ch <- metric.desc()
}
}
// Collect implements prometheus.Collector
// Since new containers are frequently created and removed, using the prometheus.Gauge Collector would
// leak metric collectors for containers or pods that no longer exist. Instead, implement
// prometheus.Collector in a way that only collects metrics for active containers.
func (rc *resourceMetricCollector) Collect(ch chan<- prometheus.Metric) {
rc.errors.Set(0)
defer rc.errors.Collect(ch)
summary, err := rc.provider.GetCPUAndMemoryStats()
if err != nil {
rc.errors.Set(1)
klog.Warningf("Error getting summary for resourceMetric prometheus endpoint: %v", err)
return
}
for _, metric := range rc.config.NodeMetrics {
if value, timestamp := metric.ValueFn(summary.Node); value != nil {
ch <- prometheus.NewMetricWithTimestamp(timestamp,
prometheus.MustNewConstMetric(metric.desc(), prometheus.GaugeValue, *value))
}
}
for _, pod := range summary.Pods {
for _, container := range pod.Containers {
for _, metric := range rc.config.ContainerMetrics {
if value, timestamp := metric.ValueFn(container); value != nil {
ch <- prometheus.NewMetricWithTimestamp(timestamp,
prometheus.MustNewConstMetric(metric.desc(), prometheus.GaugeValue, *value, container.Name, pod.PodRef.Name, pod.PodRef.Namespace))
}
}
}
}
}
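Two details of Collect worth calling out: MustNewConstMetric builds a throwaway metric on every scrape, so when a container goes away its series simply stops being emitted on the next scrape and no per-container collector state is retained; and prometheus.NewMetricWithTimestamp stamps each sample with the time carried in the summary stats (s.CPU.Time / s.Memory.Time) rather than the scrape time.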

View File

@@ -0,0 +1,348 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"fmt"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
const (
errorName = "scrape_error"
errorHelp = "1 if there was an error while getting container metrics, 0 otherwise"
)
var (
noError = float64(0)
hasError = float64(1)
)
type mockSummaryProvider struct {
mock.Mock
}
func (m *mockSummaryProvider) Get(updateStats bool) (*statsapi.Summary, error) {
args := m.Called(updateStats)
return args.Get(0).(*statsapi.Summary), args.Error(1)
}
func (m *mockSummaryProvider) GetCPUAndMemoryStats() (*statsapi.Summary, error) {
args := m.Called()
return args.Get(0).(*statsapi.Summary), args.Error(1)
}
type collectResult struct {
desc *prometheus.Desc
metric *dto.Metric
}
func TestCollectResourceMetrics(t *testing.T) {
testTime := metav1.Now()
for _, tc := range []struct {
description string
config ResourceMetricsConfig
summary *statsapi.Summary
summaryErr error
expectedMetrics []collectResult
}{
{
description: "error getting summary",
config: ResourceMetricsConfig{},
summary: nil,
summaryErr: fmt.Errorf("failed to get summary"),
expectedMetrics: []collectResult{
{
desc: prometheus.NewDesc(errorName, errorHelp, []string{}, nil),
metric: &dto.Metric{Gauge: &dto.Gauge{Value: &hasError}},
},
},
},
{
description: "arbitrary node metrics",
config: ResourceMetricsConfig{
NodeMetrics: []NodeResourceMetric{
{
Name: "node_foo",
Description: "a metric from nodestats",
ValueFn: func(s statsapi.NodeStats) (*float64, time.Time) {
if s.CPU == nil {
return nil, time.Time{}
}
v := float64(*s.CPU.UsageCoreNanoSeconds) / float64(time.Second)
return &v, s.CPU.Time.Time
},
},
{
Name: "node_bar",
Description: "another metric from nodestats",
ValueFn: func(s statsapi.NodeStats) (*float64, time.Time) {
if s.Memory == nil {
return nil, time.Time{}
}
v := float64(*s.Memory.WorkingSetBytes)
return &v, s.Memory.Time.Time
},
},
},
},
summary: &statsapi.Summary{
Node: statsapi.NodeStats{
CPU: &statsapi.CPUStats{
Time: testTime,
UsageCoreNanoSeconds: uint64Ptr(10000000000),
},
Memory: &statsapi.MemoryStats{
Time: testTime,
WorkingSetBytes: uint64Ptr(1000),
},
},
},
summaryErr: nil,
expectedMetrics: []collectResult{
{
desc: prometheus.NewDesc("node_foo", "a metric from nodestats", []string{}, nil),
metric: &dto.Metric{Gauge: &dto.Gauge{Value: float64Ptr(10)}},
},
{
desc: prometheus.NewDesc("node_bar", "another metric from nodestats", []string{}, nil),
metric: &dto.Metric{Gauge: &dto.Gauge{Value: float64Ptr(1000)}},
},
{
desc: prometheus.NewDesc(errorName, errorHelp, []string{}, nil),
metric: &dto.Metric{Gauge: &dto.Gauge{Value: &noError}},
},
},
},
{
description: "arbitrary container metrics for different container, pods and namespaces",
config: ResourceMetricsConfig{
ContainerMetrics: []ContainerResourceMetric{
{
Name: "container_foo",
Description: "a metric from container stats",
ValueFn: func(s statsapi.ContainerStats) (*float64, time.Time) {
if s.CPU == nil {
return nil, time.Time{}
}
v := float64(*s.CPU.UsageCoreNanoSeconds) / float64(time.Second)
return &v, s.CPU.Time.Time
},
},
{
Name: "container_bar",
Description: "another metric from container stats",
ValueFn: func(s statsapi.ContainerStats) (*float64, time.Time) {
if s.Memory == nil {
return nil, time.Time{}
}
v := float64(*s.Memory.WorkingSetBytes)
return &v, s.Memory.Time.Time
},
},
},
},
summary: &statsapi.Summary{
Pods: []statsapi.PodStats{
{
PodRef: statsapi.PodReference{
Name: "pod_a",
Namespace: "namespace_a",
},
Containers: []statsapi.ContainerStats{
{
Name: "container_a",
CPU: &statsapi.CPUStats{
Time: testTime,
UsageCoreNanoSeconds: uint64Ptr(10000000000),
},
Memory: &statsapi.MemoryStats{
Time: testTime,
WorkingSetBytes: uint64Ptr(1000),
},
},
{
Name: "container_b",
CPU: &statsapi.CPUStats{
Time: testTime,
UsageCoreNanoSeconds: uint64Ptr(10000000000),
},
Memory: &statsapi.MemoryStats{
Time: testTime,
WorkingSetBytes: uint64Ptr(1000),
},
},
},
},
{
PodRef: statsapi.PodReference{
Name: "pod_b",
Namespace: "namespace_b",
},
Containers: []statsapi.ContainerStats{
{
Name: "container_a",
CPU: &statsapi.CPUStats{
Time: testTime,
UsageCoreNanoSeconds: uint64Ptr(10000000000),
},
Memory: &statsapi.MemoryStats{
Time: testTime,
WorkingSetBytes: uint64Ptr(1000),
},
},
},
},
},
},
summaryErr: nil,
expectedMetrics: []collectResult{
{
desc: prometheus.NewDesc("container_foo", "a metric from container stats", []string{"container", "pod", "namespace"}, nil),
metric: &dto.Metric{
Gauge: &dto.Gauge{Value: float64Ptr(10)},
Label: []*dto.LabelPair{
{Name: stringPtr("container"), Value: stringPtr("container_a")},
{Name: stringPtr("namespace"), Value: stringPtr("namespace_a")},
{Name: stringPtr("pod"), Value: stringPtr("pod_a")},
},
},
},
{
desc: prometheus.NewDesc("container_bar", "another metric from container stats", []string{"container", "pod", "namespace"}, nil),
metric: &dto.Metric{
Gauge: &dto.Gauge{Value: float64Ptr(1000)},
Label: []*dto.LabelPair{
{Name: stringPtr("container"), Value: stringPtr("container_a")},
{Name: stringPtr("namespace"), Value: stringPtr("namespace_a")},
{Name: stringPtr("pod"), Value: stringPtr("pod_a")},
},
},
},
{
desc: prometheus.NewDesc("container_foo", "a metric from container stats", []string{"container", "pod", "namespace"}, nil),
metric: &dto.Metric{
Gauge: &dto.Gauge{Value: float64Ptr(10)},
Label: []*dto.LabelPair{
{Name: stringPtr("container"), Value: stringPtr("container_b")},
{Name: stringPtr("namespace"), Value: stringPtr("namespace_a")},
{Name: stringPtr("pod"), Value: stringPtr("pod_a")},
},
},
},
{
desc: prometheus.NewDesc("container_bar", "another metric from container stats", []string{"container", "pod", "namespace"}, nil),
metric: &dto.Metric{
Gauge: &dto.Gauge{Value: float64Ptr(1000)},
Label: []*dto.LabelPair{
{Name: stringPtr("container"), Value: stringPtr("container_b")},
{Name: stringPtr("namespace"), Value: stringPtr("namespace_a")},
{Name: stringPtr("pod"), Value: stringPtr("pod_a")},
},
},
},
{
desc: prometheus.NewDesc("container_foo", "a metric from container stats", []string{"container", "pod", "namespace"}, nil),
metric: &dto.Metric{
Gauge: &dto.Gauge{Value: float64Ptr(10)},
Label: []*dto.LabelPair{
{Name: stringPtr("container"), Value: stringPtr("container_a")},
{Name: stringPtr("namespace"), Value: stringPtr("namespace_b")},
{Name: stringPtr("pod"), Value: stringPtr("pod_b")},
},
},
},
{
desc: prometheus.NewDesc("container_bar", "another metric from container stats", []string{"container", "pod", "namespace"}, nil),
metric: &dto.Metric{
Gauge: &dto.Gauge{Value: float64Ptr(1000)},
Label: []*dto.LabelPair{
{Name: stringPtr("container"), Value: stringPtr("container_a")},
{Name: stringPtr("namespace"), Value: stringPtr("namespace_b")},
{Name: stringPtr("pod"), Value: stringPtr("pod_b")},
},
},
},
{
desc: prometheus.NewDesc(errorName, errorHelp, []string{}, nil),
metric: &dto.Metric{Gauge: &dto.Gauge{Value: &noError}},
},
},
},
} {
t.Run(tc.description, func(t *testing.T) {
provider := &mockSummaryProvider{}
provider.On("GetCPUAndMemoryStats").Return(tc.summary, tc.summaryErr)
collector := NewPrometheusResourceMetricCollector(provider, tc.config)
metrics := collectMetrics(t, collector, len(tc.expectedMetrics))
for i := range metrics {
assertEqual(t, metrics[i], tc.expectedMetrics[i])
}
})
}
}
// collectMetrics is a wrapper around a prometheus.Collector which returns the metrics added to the metric channel as a slice.
// It will block indefinitely if the collector does not collect exactly numMetrics.
func collectMetrics(t *testing.T, collector prometheus.Collector, numMetrics int) (results []collectResult) {
metricsCh := make(chan prometheus.Metric)
done := make(chan struct{})
go func() {
collector.Collect(metricsCh)
done <- struct{}{}
}()
for i := 0; i < numMetrics; i++ {
metric := <-metricsCh
metricProto := &dto.Metric{}
assert.NoError(t, metric.Write(metricProto))
results = append(results, collectResult{desc: metric.Desc(), metric: metricProto})
}
<-done
return
}
// assertEqual asserts semantic equality for the fields we care about
func assertEqual(t *testing.T, expected, actual collectResult) {
assert.Equal(t, expected.desc.String(), actual.desc.String())
assert.Equal(t, *expected.metric.Gauge.Value, *actual.metric.Gauge.Value, "for desc: %v", expected.desc.String())
assert.Equal(t, len(expected.metric.Label), len(actual.metric.Label))
if len(expected.metric.Label) == len(actual.metric.Label) {
for i := range expected.metric.Label {
assert.Equal(t, *expected.metric.Label[i], *actual.metric.Label[i], "for desc: %v", expected.desc.String())
}
}
}
func stringPtr(s string) *string {
return &s
}
func uint64Ptr(u uint64) *uint64 {
return &u
}
func float64Ptr(f float64) *float64 {
return &f
}

View File

@@ -67,7 +67,7 @@ func (a KubeletLatencyMetrics) Less(i, j int) bool { return a[i].Latency > a[j].
// or else, the function will try to get kubelet metrics directly from the node.
func getKubeletMetricsFromNode(c clientset.Interface, nodeName string) (metrics.KubeletMetrics, error) {
if c == nil {
- return metrics.GrabKubeletMetricsWithoutProxy(nodeName)
+ return metrics.GrabKubeletMetricsWithoutProxy(nodeName, "/metrics")
}
grabber, err := metrics.NewMetricsGrabber(c, nil, true, false, false, false, false)
if err != nil {

View File

@@ -36,9 +36,8 @@ func NewKubeletMetrics() KubeletMetrics {
// GrabKubeletMetricsWithoutProxy retrieves metrics from the kubelet on the given node using a simple GET over http.
// Currently only used in integration tests.
- func GrabKubeletMetricsWithoutProxy(nodeName string) (KubeletMetrics, error) {
- metricsEndpoint := "http://%s/metrics"
- resp, err := http.Get(fmt.Sprintf(metricsEndpoint, nodeName))
+ func GrabKubeletMetricsWithoutProxy(nodeName, path string) (KubeletMetrics, error) {
+ resp, err := http.Get(fmt.Sprintf("http://%s%s", nodeName, path))
if err != nil {
return KubeletMetrics{}, err
}

View File

@@ -99,6 +99,7 @@ go_test(
"node_perf_test.go",
"pids_test.go",
"pods_container_manager_test.go",
"resource_metrics_test.go",
"resource_usage_test.go",
"restart_test.go",
"runtime_conformance_test.go",
@@ -116,6 +117,7 @@ go_test(
"//pkg/kubelet/apis/config:go_default_library",
"//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/apis/resourcemetrics/v1alpha1:go_default_library",
"//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/cm/cpumanager:go_default_library",

View File

@@ -454,7 +454,7 @@ func createBatchPodWithRateControl(f *framework.Framework, pods []*v1.Pod, inter
// getPodStartLatency gets prometheus metric 'pod start latency' from kubelet
func getPodStartLatency(node string) (framework.KubeletLatencyMetrics, error) {
latencyMetrics := framework.KubeletLatencyMetrics{}
- ms, err := metrics.GrabKubeletMetricsWithoutProxy(node)
+ ms, err := metrics.GrabKubeletMetricsWithoutProxy(node, "/metrics")
Expect(err).NotTo(HaveOccurred(), "Failed to get kubelet metrics without proxy in node %s", node)
for _, samples := range ms {

View File

@@ -152,7 +152,7 @@ func checkIfNvidiaGPUsExistOnNode() bool {
}
func logDevicePluginMetrics() {
ms, err := metrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName + ":10255")
ms, err := metrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName+":10255", "/metrics")
framework.ExpectNoError(err)
for msKey, samples := range ms {
switch msKey {

View File

@@ -0,0 +1,138 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e_node
import (
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/kubelet/apis/resourcemetrics/v1alpha1"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/framework/metrics"
"github.com/prometheus/common/model"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gstruct"
"github.com/onsi/gomega/types"
)
const (
pod0 = "stats-busybox-0"
pod1 = "stats-busybox-1"
maxStatsAge = time.Minute
)
var _ = framework.KubeDescribe("ResourceMetricsAPI", func() {
f := framework.NewDefaultFramework("resource-metrics")
Context("when querying /resource/metrics", func() {
BeforeEach(func() {
By("Creating test pods")
numRestarts := int32(1)
pods := getSummaryTestPods(f, numRestarts, pod0, pod1)
f.PodClient().CreateBatch(pods)
By("Waiting for test pods to restart the desired number of times")
Eventually(func() error {
for _, pod := range pods {
err := verifyPodRestartCount(f, pod.Name, len(pod.Spec.Containers), numRestarts)
if err != nil {
return err
}
}
return nil
}, time.Minute, 5*time.Second).Should(Succeed())
By("Waiting 15 seconds for cAdvisor to collect 2 stats points")
time.Sleep(15 * time.Second)
})
It("should report resource usage through the v1alpha1 resouce metrics api", func() {
By("Fetching node so we can know proper node memory bounds for unconstrained cgroups")
node := getLocalNode(f)
memoryCapacity := node.Status.Capacity["memory"]
memoryLimit := memoryCapacity.Value()
matchV1alpha1Expectations := gstruct.MatchAllKeys(gstruct.Keys{
"scrape_error": gstruct.Ignore(),
"node_cpu_usage_seconds_total": gstruct.MatchAllElements(nodeId, gstruct.Elements{
"": boundedSample(1, 1E6),
}),
"node_memory_working_set_bytes": gstruct.MatchAllElements(nodeId, gstruct.Elements{
"": boundedSample(10*framework.Mb, memoryLimit),
}),
"container_cpu_usage_seconds_total": gstruct.MatchElements(containerId, gstruct.IgnoreExtras, gstruct.Elements{
fmt.Sprintf("%s::%s::%s", f.Namespace.Name, pod0, "busybox-container"): boundedSample(0, 100),
fmt.Sprintf("%s::%s::%s", f.Namespace.Name, pod1, "busybox-container"): boundedSample(0, 100),
}),
"container_memory_working_set_bytes": gstruct.MatchAllElements(containerId, gstruct.Elements{
fmt.Sprintf("%s::%s::%s", f.Namespace.Name, pod0, "busybox-container"): boundedSample(10*framework.Kb, 80*framework.Mb),
fmt.Sprintf("%s::%s::%s", f.Namespace.Name, pod1, "busybox-container"): boundedSample(10*framework.Kb, 80*framework.Mb),
}),
})
By("Giving pods a minute to start up and produce metrics")
Eventually(getV1alpha1ResourceMetrics, 1*time.Minute, 15*time.Second).Should(matchV1alpha1Expectations)
By("Ensuring the metrics match the expectations a few more times")
Consistently(getV1alpha1ResourceMetrics, 1*time.Minute, 15*time.Second).Should(matchV1alpha1Expectations)
})
AfterEach(func() {
By("Deleting test pods")
f.PodClient().DeleteSync(pod0, &metav1.DeleteOptions{}, 10*time.Minute)
f.PodClient().DeleteSync(pod1, &metav1.DeleteOptions{}, 10*time.Minute)
if !CurrentGinkgoTestDescription().Failed {
return
}
if framework.TestContext.DumpLogsOnFailure {
framework.LogFailedContainers(f.ClientSet, f.Namespace.Name, framework.Logf)
}
By("Recording processes in system cgroups")
recordSystemCgroupProcesses()
})
})
})
func getV1alpha1ResourceMetrics() (metrics.KubeletMetrics, error) {
return metrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName+":10255", "/metrics/resource/"+v1alpha1.Version)
}
func nodeId(element interface{}) string {
return ""
}
func containerId(element interface{}) string {
el := element.(*model.Sample)
return fmt.Sprintf("%s::%s::%s", el.Metric["namespace"], el.Metric["pod"], el.Metric["container"])
}
func boundedSample(lower, upper interface{}) types.GomegaMatcher {
return gstruct.PointTo(gstruct.MatchAllFields(gstruct.Fields{
// We already check Metric when matching the Id
"Metric": gstruct.Ignore(),
"Value": And(BeNumerically(">=", lower), BeNumerically("<=", upper)),
"Timestamp": WithTransform(func(t model.Time) time.Time {
// model.Time is in Milliseconds since epoch
return time.Unix(0, int64(t)*int64(time.Millisecond))
},
And(
BeTemporally(">=", time.Now().Add(-maxStatsAge)),
// Now() is the test start time, not the match time, so permit a few extra minutes.
BeTemporally("<", time.Now().Add(2*time.Minute))),
)}))
}

View File

@@ -353,7 +353,7 @@ func logKubeletLatencyMetrics(metricNames ...string) {
for _, key := range metricNames {
metricSet.Insert(kubeletmetrics.KubeletSubsystem + "_" + key)
}
metric, err := metrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName + ":10255")
metric, err := metrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName+":10255", "/metrics")
if err != nil {
framework.Logf("Error getting kubelet metrics: %v", err)
} else {
@@ -364,7 +364,7 @@ func logKubeletLatencyMetrics(metricNames ...string) {
// returns config related metrics from the local kubelet, filtered to the filterMetricNames passed in
func getKubeletMetrics(filterMetricNames sets.String) (frameworkmetrics.KubeletMetrics, error) {
// grab Kubelet metrics
ms, err := metrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName + ":10255")
ms, err := metrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName+":10255", "/metrics")
if err != nil {
return nil, err
}