Merge pull request #86282 from RainbowMango/pr_refactor_resource_endpoint

Refactor kubelet resource metrics
This commit is contained in:
Kubernetes Prow Robot 2020-01-14 02:23:09 -08:00 committed by GitHub
commit be26fbc638
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 373 additions and 13 deletions

View File

@ -24,6 +24,11 @@ import (
"k8s.io/kubernetes/pkg/kubelet/server/stats"
)
// This file contains a series of deprecated metrics which we emit them by endpoint `/metrics/resource/v1alpha1`.
// These metrics have been adapted to new endpoint `/metrics/resource` as well as new `Desc`s.
// In general, we don't need to maintain these deprecated metrics any more.
// TODO(RainbowMango): Remove this file in release 1.20.0+.
// Version is the string representation of the version of this configuration
const Version = "v1alpha1"
@ -33,28 +38,28 @@ var (
nil,
nil,
metrics.ALPHA,
"")
"1.18.0")
nodeMemoryUsageDesc = metrics.NewDesc("node_memory_working_set_bytes",
"Current working set of the node in bytes",
nil,
nil,
metrics.ALPHA,
"")
"1.18.0")
containerCPUUsageDesc = metrics.NewDesc("container_cpu_usage_seconds_total",
"Cumulative cpu time consumed by the container in core-seconds",
[]string{"container", "pod", "namespace"},
nil,
metrics.ALPHA,
"")
"1.18.0")
containerMemoryUsageDesc = metrics.NewDesc("container_memory_working_set_bytes",
"Current working set of the container in bytes",
[]string{"container", "pod", "namespace"},
nil,
metrics.ALPHA,
"")
"1.18.0")
)
// getNodeCPUMetrics returns CPU utilization of a node.

View File

@ -4,6 +4,7 @@ go_library(
name = "go_default_library",
srcs = [
"log_metrics.go",
"resource_metrics.go",
"volume_stats.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/metrics/collectors",
@ -22,6 +23,7 @@ go_test(
name = "go_default_test",
srcs = [
"log_metrics_test.go",
"resource_metrics_test.go",
"volume_stats_test.go",
],
embed = [":go_default_library"],
@ -30,6 +32,7 @@ go_test(
"//pkg/kubelet/server/stats/testing:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
"//vendor/github.com/stretchr/testify/mock:go_default_library",
],
)

View File

@ -0,0 +1,153 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collectors
import (
"time"
"k8s.io/component-base/metrics"
"k8s.io/klog"
summary "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
)
var (
nodeCPUUsageDesc = metrics.NewDesc("node_cpu_usage_seconds",
"Cumulative cpu time consumed by the node in core-seconds",
nil,
nil,
metrics.ALPHA,
"")
nodeMemoryUsageDesc = metrics.NewDesc("node_memory_working_set_bytes",
"Current working set of the node in bytes",
nil,
nil,
metrics.ALPHA,
"")
containerCPUUsageDesc = metrics.NewDesc("container_cpu_usage_seconds",
"Cumulative cpu time consumed by the container in core-seconds",
[]string{"container", "pod", "namespace"},
nil,
metrics.ALPHA,
"")
containerMemoryUsageDesc = metrics.NewDesc("container_memory_working_set_bytes",
"Current working set of the container in bytes",
[]string{"container", "pod", "namespace"},
nil,
metrics.ALPHA,
"")
resouceScrapeResultDesc = metrics.NewDesc("scrape_error",
"1 if there was an error while getting container metrics, 0 otherwise",
nil,
nil,
metrics.ALPHA,
"")
)
// NewResourceMetricsCollector returns a metrics.StableCollector which exports resource metrics
func NewResourceMetricsCollector(provider stats.SummaryProvider) metrics.StableCollector {
return &resourceMetricsCollector{
provider: provider,
}
}
type resourceMetricsCollector struct {
metrics.BaseStableCollector
provider stats.SummaryProvider
}
// Check if resourceMetricsCollector implements necessary interface
var _ metrics.StableCollector = &resourceMetricsCollector{}
// DescribeWithStability implements metrics.StableCollector
func (rc *resourceMetricsCollector) DescribeWithStability(ch chan<- *metrics.Desc) {
ch <- nodeCPUUsageDesc
ch <- nodeMemoryUsageDesc
ch <- containerCPUUsageDesc
ch <- containerMemoryUsageDesc
ch <- resouceScrapeResultDesc
}
// CollectWithStability implements metrics.StableCollector
// Since new containers are frequently created and removed, using the Gauge would
// leak metric collectors for containers or pods that no longer exist. Instead, implement
// custom collector in a way that only collects metrics for active containers.
func (rc *resourceMetricsCollector) CollectWithStability(ch chan<- metrics.Metric) {
var errorCount float64
defer func() {
ch <- metrics.NewLazyConstMetric(resouceScrapeResultDesc, metrics.GaugeValue, errorCount)
}()
statsSummary, err := rc.provider.GetCPUAndMemoryStats()
if err != nil {
errorCount = 1
klog.Warningf("Error getting summary for resourceMetric prometheus endpoint: %v", err)
return
}
rc.collectNodeCPUMetrics(ch, statsSummary.Node)
rc.collectNodeMemoryMetrics(ch, statsSummary.Node)
for _, pod := range statsSummary.Pods {
for _, container := range pod.Containers {
rc.collectContainerCPUMetrics(ch, pod, container)
rc.collectContainerMemoryMetrics(ch, pod, container)
}
}
}
func (rc *resourceMetricsCollector) collectNodeCPUMetrics(ch chan<- metrics.Metric, s summary.NodeStats) {
if s.CPU == nil {
return
}
ch <- metrics.NewLazyMetricWithTimestamp(s.CPU.Time.Time,
metrics.NewLazyConstMetric(nodeCPUUsageDesc, metrics.GaugeValue, float64(*s.CPU.UsageCoreNanoSeconds)/float64(time.Second)))
}
func (rc *resourceMetricsCollector) collectNodeMemoryMetrics(ch chan<- metrics.Metric, s summary.NodeStats) {
if s.Memory == nil {
return
}
ch <- metrics.NewLazyMetricWithTimestamp(s.Memory.Time.Time,
metrics.NewLazyConstMetric(nodeMemoryUsageDesc, metrics.GaugeValue, float64(*s.Memory.WorkingSetBytes)))
}
func (rc *resourceMetricsCollector) collectContainerCPUMetrics(ch chan<- metrics.Metric, pod summary.PodStats, s summary.ContainerStats) {
if s.CPU == nil {
return
}
ch <- metrics.NewLazyMetricWithTimestamp(s.CPU.Time.Time,
metrics.NewLazyConstMetric(containerCPUUsageDesc, metrics.GaugeValue,
float64(*s.CPU.UsageCoreNanoSeconds)/float64(time.Second), s.Name, pod.PodRef.Name, pod.PodRef.Namespace))
}
func (rc *resourceMetricsCollector) collectContainerMemoryMetrics(ch chan<- metrics.Metric, pod summary.PodStats, s summary.ContainerStats) {
if s.Memory == nil {
return
}
ch <- metrics.NewLazyMetricWithTimestamp(s.Memory.Time.Time,
metrics.NewLazyConstMetric(containerMemoryUsageDesc, metrics.GaugeValue,
float64(*s.Memory.WorkingSetBytes), s.Name, pod.PodRef.Name, pod.PodRef.Namespace))
}

View File

@ -0,0 +1,189 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collectors
import (
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/mock"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/component-base/metrics/testutil"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
type mockSummaryProvider struct {
mock.Mock
}
func (m *mockSummaryProvider) Get(updateStats bool) (*statsapi.Summary, error) {
args := m.Called(updateStats)
return args.Get(0).(*statsapi.Summary), args.Error(1)
}
func (m *mockSummaryProvider) GetCPUAndMemoryStats() (*statsapi.Summary, error) {
args := m.Called()
return args.Get(0).(*statsapi.Summary), args.Error(1)
}
func TestCollectResourceMetrics(t *testing.T) {
testTime := metav1.NewTime(time.Unix(2, 0)) // a static timestamp: 2000
interestedMetrics := []string{
"scrape_error",
"node_cpu_usage_seconds",
"node_memory_working_set_bytes",
"container_cpu_usage_seconds",
"container_memory_working_set_bytes",
}
tests := []struct {
name string
summary *statsapi.Summary
summaryErr error
expectedMetrics string
}{
{
name: "error getting summary",
summary: nil,
summaryErr: fmt.Errorf("failed to get summary"),
expectedMetrics: `
# HELP scrape_error [ALPHA] 1 if there was an error while getting container metrics, 0 otherwise
# TYPE scrape_error gauge
scrape_error 1
`,
},
{
name: "arbitrary node metrics",
summary: &statsapi.Summary{
Node: statsapi.NodeStats{
CPU: &statsapi.CPUStats{
Time: testTime,
UsageCoreNanoSeconds: uint64Ptr(10000000000),
},
Memory: &statsapi.MemoryStats{
Time: testTime,
WorkingSetBytes: uint64Ptr(1000),
},
},
},
summaryErr: nil,
expectedMetrics: `
# HELP node_cpu_usage_seconds [ALPHA] Cumulative cpu time consumed by the node in core-seconds
# TYPE node_cpu_usage_seconds gauge
node_cpu_usage_seconds 10 2000
# HELP node_memory_working_set_bytes [ALPHA] Current working set of the node in bytes
# TYPE node_memory_working_set_bytes gauge
node_memory_working_set_bytes 1000 2000
# HELP scrape_error [ALPHA] 1 if there was an error while getting container metrics, 0 otherwise
# TYPE scrape_error gauge
scrape_error 0
`,
},
{
name: "arbitrary container metrics for different container, pods and namespaces",
summary: &statsapi.Summary{
Pods: []statsapi.PodStats{
{
PodRef: statsapi.PodReference{
Name: "pod_a",
Namespace: "namespace_a",
},
Containers: []statsapi.ContainerStats{
{
Name: "container_a",
CPU: &statsapi.CPUStats{
Time: testTime,
UsageCoreNanoSeconds: uint64Ptr(10000000000),
},
Memory: &statsapi.MemoryStats{
Time: testTime,
WorkingSetBytes: uint64Ptr(1000),
},
},
{
Name: "container_b",
CPU: &statsapi.CPUStats{
Time: testTime,
UsageCoreNanoSeconds: uint64Ptr(10000000000),
},
Memory: &statsapi.MemoryStats{
Time: testTime,
WorkingSetBytes: uint64Ptr(1000),
},
},
},
},
{
PodRef: statsapi.PodReference{
Name: "pod_b",
Namespace: "namespace_b",
},
Containers: []statsapi.ContainerStats{
{
Name: "container_a",
CPU: &statsapi.CPUStats{
Time: testTime,
UsageCoreNanoSeconds: uint64Ptr(10000000000),
},
Memory: &statsapi.MemoryStats{
Time: testTime,
WorkingSetBytes: uint64Ptr(1000),
},
},
},
},
},
},
summaryErr: nil,
expectedMetrics: `
# HELP scrape_error [ALPHA] 1 if there was an error while getting container metrics, 0 otherwise
# TYPE scrape_error gauge
scrape_error 0
# HELP container_cpu_usage_seconds [ALPHA] Cumulative cpu time consumed by the container in core-seconds
# TYPE container_cpu_usage_seconds gauge
container_cpu_usage_seconds{container="container_a",namespace="namespace_a",pod="pod_a"} 10 2000
container_cpu_usage_seconds{container="container_a",namespace="namespace_b",pod="pod_b"} 10 2000
container_cpu_usage_seconds{container="container_b",namespace="namespace_a",pod="pod_a"} 10 2000
# HELP container_memory_working_set_bytes [ALPHA] Current working set of the container in bytes
# TYPE container_memory_working_set_bytes gauge
container_memory_working_set_bytes{container="container_a",namespace="namespace_a",pod="pod_a"} 1000 2000
container_memory_working_set_bytes{container="container_a",namespace="namespace_b",pod="pod_b"} 1000 2000
container_memory_working_set_bytes{container="container_b",namespace="namespace_a",pod="pod_a"} 1000 2000
`,
},
}
for _, test := range tests {
tc := test
t.Run(tc.name, func(t *testing.T) {
provider := &mockSummaryProvider{}
provider.On("GetCPUAndMemoryStats").Return(tc.summary, tc.summaryErr)
collector := NewResourceMetricsCollector(provider)
if err := testutil.CustomCollectAndCompare(collector, strings.NewReader(tc.expectedMetrics), interestedMetrics...); err != nil {
t.Fatal(err)
}
})
}
}
func uint64Ptr(u uint64) *uint64 {
return &u
}

View File

@ -22,6 +22,7 @@ go_library(
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
"//pkg/kubelet/apis/resourcemetrics/v1alpha1:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/metrics/collectors:go_default_library",
"//pkg/kubelet/prober:go_default_library",
"//pkg/kubelet/server/metrics:go_default_library",
"//pkg/kubelet/server/portforward:go_default_library",

View File

@ -128,6 +128,7 @@ func AuthzTestCases() []AuthzTestCase {
"/metrics/cadvisor": "metrics",
"/metrics/probes": "metrics",
"/metrics/resource/v1alpha1": "metrics",
"/metrics/resource": "metrics",
"/pods/": "proxy",
"/portForward/{podNamespace}/{podID}": "proxy",
"/portForward/{podNamespace}/{podID}/{uid}": "proxy",

View File

@ -38,6 +38,7 @@ import (
"github.com/google/cadvisor/metrics"
"google.golang.org/grpc"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -74,13 +75,13 @@ import (
)
const (
metricsPath = "/metrics"
cadvisorMetricsPath = "/metrics/cadvisor"
resourceMetricsPathPrefix = "/metrics/resource"
proberMetricsPath = "/metrics/probes"
specPath = "/spec/"
statsPath = "/stats/"
logsPath = "/logs/"
metricsPath = "/metrics"
cadvisorMetricsPath = "/metrics/cadvisor"
resourceMetricsPath = "/metrics/resource"
proberMetricsPath = "/metrics/probes"
specPath = "/spec/"
statsPath = "/stats/"
logsPath = "/logs/"
)
// Server is a http.Handler which exposes kubelet functionality over HTTP.
@ -319,12 +320,19 @@ func (s *Server) InstallDefaultHandlers(enableCAdvisorJSONEndpoints bool) {
compbasemetrics.HandlerFor(r, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
)
// deprecated endpoint which will be removed in release 1.20.0+.
v1alpha1ResourceRegistry := compbasemetrics.NewKubeRegistry()
v1alpha1ResourceRegistry.CustomMustRegister(stats.NewPrometheusResourceMetricCollector(s.resourceAnalyzer, v1alpha1.Config()))
s.restfulCont.Handle(path.Join(resourceMetricsPathPrefix, v1alpha1.Version),
s.restfulCont.Handle(path.Join(resourceMetricsPath, v1alpha1.Version),
compbasemetrics.HandlerFor(v1alpha1ResourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
)
resourceRegistry := compbasemetrics.NewKubeRegistry()
resourceRegistry.CustomMustRegister(collectors.NewResourceMetricsCollector(s.resourceAnalyzer))
s.restfulCont.Handle(resourceMetricsPath,
compbasemetrics.HandlerFor(resourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
)
// prober metrics are exposed under a different endpoint
p := compbasemetrics.NewKubeRegistry()

View File

@ -60,7 +60,7 @@ func NewPrometheusResourceMetricCollector(provider SummaryProvider, config Resou
nil,
nil,
metrics.ALPHA,
""),
"1.18.0"),
}
}