Merge pull request #94866 from smarterclayton/scheduling_metrics

scheduler: Implement resource metrics at /metrics/resources
Kubernetes Prow Robot 2020-11-12 18:36:23 -08:00 committed by GitHub
commit cd21a1240a
9 changed files with 992 additions and 48 deletions
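In short: this change adds a /metrics/resources endpoint to kube-scheduler that reports pod resource requests and limits in Prometheus exposition format, served only while the instance holds the leader lease. A minimal sketch of scraping it, assuming the insecure metrics port is enabled at a hypothetical local address (hardened deployments scrape the secure port with authentication):

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Hypothetical insecure metrics address for a local kube-scheduler.
	resp, err := http.Get("http://127.0.0.1:10251/metrics/resources")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	// Non-leaders answer with an empty body; the leader emits
	// kube_pod_resource_request and kube_pod_resource_limit series.
	fmt.Print(string(body))
}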


@@ -16,6 +16,7 @@ go_library(
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/runtime:go_default_library",
"//pkg/scheduler/metrics/resources:go_default_library",
"//pkg/scheduler/profile:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
@@ -26,6 +27,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/routes:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
"//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library",


@@ -20,7 +20,6 @@ package app
import (
"context"
"fmt"
"io"
"net/http"
"os"
goruntime "runtime"
@@ -36,6 +35,7 @@ import (
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/apiserver/pkg/server/routes"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/leaderelection"
@@ -53,6 +53,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/metrics/resources"
"k8s.io/kubernetes/pkg/scheduler/profile"
)
@@ -152,22 +153,34 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
checks = append(checks, cc.LeaderElection.WatchDog)
}
waitingForLeader := make(chan struct{})
isLeader := func() bool {
select {
case _, ok := <-waitingForLeader:
// if channel is closed, we are leading
return !ok
default:
// channel is open, we are waiting for a leader
return false
}
}
// Start up the healthz server.
if cc.InsecureServing != nil {
separateMetrics := cc.InsecureMetricsServing != nil
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, separateMetrics, checks...), nil, nil)
if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
return fmt.Errorf("failed to start healthz server: %v", err)
}
}
if cc.InsecureMetricsServing != nil {
handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader), nil, nil)
if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
return fmt.Errorf("failed to start metrics server: %v", err)
}
}
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
@@ -184,7 +197,10 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: sched.Run,
OnStartedLeading: func(ctx context.Context) {
close(waitingForLeader)
sched.Run(ctx)
},
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
@@ -200,6 +216,7 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
}
// Leader election is disabled, so runCommand inline until done.
close(waitingForLeader)
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}
@@ -218,25 +235,23 @@ func buildHandlerChain(handler http.Handler, authn authenticator.Request, authz
return handler
}
func installMetricHandler(pathRecorderMux *mux.PathRecorderMux) {
func installMetricHandler(pathRecorderMux *mux.PathRecorderMux, informers informers.SharedInformerFactory, isLeader func() bool) {
configz.InstallHandler(pathRecorderMux)
//lint:ignore SA1019 See the Metrics Stability Migration KEP
defaultMetricsHandler := legacyregistry.Handler().ServeHTTP
pathRecorderMux.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
if req.Method == "DELETE" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
io.WriteString(w, "metrics reset\n")
pathRecorderMux.Handle("/metrics", legacyregistry.HandlerWithReset())
resourceMetricsHandler := resources.Handler(informers.Core().V1().Pods().Lister())
pathRecorderMux.HandleFunc("/metrics/resources", func(w http.ResponseWriter, req *http.Request) {
if !isLeader() {
return
}
defaultMetricsHandler(w, req)
resourceMetricsHandler.ServeHTTP(w, req)
})
}
// newMetricsHandler builds a metrics server from the config.
func newMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration) http.Handler {
func newMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool) http.Handler {
pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
installMetricHandler(pathRecorderMux)
installMetricHandler(pathRecorderMux, informers, isLeader)
if config.EnableProfiling {
routes.Profiling{}.Install(pathRecorderMux)
if config.EnableContentionProfiling {
@@ -250,11 +265,11 @@ func newMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration) h
// newHealthzHandler creates a healthz server from the config, and will also
// embed the metrics handler if the healthz and metrics address configurations
// are the same.
func newHealthzHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, separateMetrics bool, checks ...healthz.HealthChecker) http.Handler {
func newHealthzHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, separateMetrics bool, checks ...healthz.HealthChecker) http.Handler {
pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
healthz.InstallHandler(pathRecorderMux, checks...)
if !separateMetrics {
installMetricHandler(pathRecorderMux)
installMetricHandler(pathRecorderMux, informers, isLeader)
}
if config.EnableProfiling {
routes.Profiling{}.Install(pathRecorderMux)
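The waitingForLeader channel above doubles as a broadcast flag: a receive from a closed channel never blocks and reports ok == false, while a receive from an open channel with no sender falls through to the select's default case. A standalone sketch of the idiom:

package main

import "fmt"

func main() {
	waitingForLeader := make(chan struct{})
	isLeader := func() bool {
		select {
		case _, ok := <-waitingForLeader:
			// Closed channel: receives complete immediately with ok == false.
			return !ok
		default:
			// Channel still open: no leader elected yet.
			return false
		}
	}
	fmt.Println(isLeader()) // false
	close(waitingForLeader) // what OnStartedLeading does in the diff above
	fmt.Println(isLeader()) // true
}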


@@ -28,39 +28,23 @@ import (
"k8s.io/kubernetes/pkg/features"
)
// addResourceList adds the resources in newList to list
func addResourceList(list, newList v1.ResourceList) {
for name, quantity := range newList {
if value, ok := list[name]; !ok {
list[name] = quantity.DeepCopy()
} else {
value.Add(quantity)
list[name] = value
}
}
}
// maxResourceList sets list to the greater of list/newList for every resource
// either list
func maxResourceList(list, new v1.ResourceList) {
for name, quantity := range new {
if value, ok := list[name]; !ok {
list[name] = quantity.DeepCopy()
continue
} else {
if quantity.Cmp(value) > 0 {
list[name] = quantity.DeepCopy()
}
}
}
}
// PodRequestsAndLimits returns a dictionary of all defined resources summed up for all
// containers of the pod. If PodOverhead feature is enabled, pod overhead is added to the
// total container resource requests and to the total container limits which have a
// non-zero quantity.
func PodRequestsAndLimits(pod *v1.Pod) (reqs, limits v1.ResourceList) {
reqs, limits = v1.ResourceList{}, v1.ResourceList{}
return PodRequestsAndLimitsReuse(pod, nil, nil)
}
// PodRequestsAndLimitsReuse returns a dictionary of all defined resources summed up for all
// containers of the pod. If PodOverhead feature is enabled, pod overhead is added to the
// total container resource requests and to the total container limits which have a
// non-zero quantity. The caller may avoid allocations of resource lists by passing
// a requests and limits list to the function, which will be cleared before use.
func PodRequestsAndLimitsReuse(pod *v1.Pod, reuseReqs, reuseLimits v1.ResourceList) (reqs, limits v1.ResourceList) {
// attempt to reuse the maps if passed, or allocate otherwise
reqs, limits = reuseOrClearResourceList(reuseReqs), reuseOrClearResourceList(reuseLimits)
for _, container := range pod.Spec.Containers {
addResourceList(reqs, container.Resources.Requests)
addResourceList(limits, container.Resources.Limits)
@@ -72,7 +56,7 @@ func PodRequestsAndLimits(pod *v1.Pod) (reqs, limits v1.ResourceList) {
}
// if PodOverhead feature is supported, add overhead for running a pod
// to the sum of reqeuests and to non-zero limits:
// to the sum of requests and to non-zero limits:
if pod.Spec.Overhead != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodOverhead) {
addResourceList(reqs, pod.Spec.Overhead)
@@ -87,6 +71,39 @@ func PodRequestsAndLimits(pod *v1.Pod) (reqs, limits v1.ResourceList) {
return
}
// reuseOrClearResourceList is a helper for avoiding excessive allocations of
// resource lists within the inner loop of resource calculations.
func reuseOrClearResourceList(reuse v1.ResourceList) v1.ResourceList {
if reuse == nil {
return make(v1.ResourceList, 4)
}
for k := range reuse {
delete(reuse, k)
}
return reuse
}
// addResourceList adds the resources in newList to list.
func addResourceList(list, newList v1.ResourceList) {
for name, quantity := range newList {
if value, ok := list[name]; !ok {
list[name] = quantity.DeepCopy()
} else {
value.Add(quantity)
list[name] = value
}
}
}
// maxResourceList sets list to the greater of list/newList for every resource in newList
func maxResourceList(list, newList v1.ResourceList) {
for name, quantity := range newList {
if value, ok := list[name]; !ok || quantity.Cmp(value) > 0 {
list[name] = quantity.DeepCopy()
}
}
}
// GetResourceRequestQuantity finds and returns the request quantity for a specific resource.
func GetResourceRequestQuantity(pod *v1.Pod, resourceName v1.ResourceName) resource.Quantity {
requestQuantity := resource.Quantity{}
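PodRequestsAndLimitsReuse exists so a caller that iterates many pods per metrics scrape can sum requests and limits without allocating two fresh maps per pod. A sketch of the intended calling pattern, assuming the k8s.io/kubernetes module is importable (the pod literal and loop count are illustrative):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	v1resource "k8s.io/kubernetes/pkg/api/v1/resource"
)

func main() {
	pod := &v1.Pod{
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m")},
					Limits:   v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")},
				},
			}},
		},
	}
	// Allocate the maps once; the function clears them before each reuse,
	// so values do not accumulate across iterations.
	reqs, limits := make(v1.ResourceList, 4), make(v1.ResourceList, 4)
	for i := 0; i < 3; i++ { // stands in for iterating a large pod list
		reqs, limits = v1resource.PodRequestsAndLimitsReuse(pod, reqs, limits)
	}
	fmt.Println(reqs.Cpu().String(), limits.Cpu().String()) // 500m 1
}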


@@ -26,7 +26,10 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//pkg/scheduler/metrics/resources:all-srcs",
],
tags = ["automanaged"],
)


@@ -0,0 +1,46 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["resources.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/metrics/resources",
visibility = ["//visibility:public"],
deps = [
"//pkg/api/v1/resource:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/component-base/metrics:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["resources_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/component-base/metrics:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
],
)


@@ -0,0 +1,201 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package resources provides a metrics collector that reports the
// resource consumption (requests and limits) of the pods in the cluster
// as the scheduler and kubelet would interpret it.
package resources
import (
"net/http"
"strconv"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/component-base/metrics"
v1resource "k8s.io/kubernetes/pkg/api/v1/resource"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
)
type resourceLifecycleDescriptors struct {
total *metrics.Desc
}
func (d resourceLifecycleDescriptors) Describe(ch chan<- *metrics.Desc) {
ch <- d.total
}
type resourceMetricsDescriptors struct {
requests resourceLifecycleDescriptors
limits resourceLifecycleDescriptors
}
func (d resourceMetricsDescriptors) Describe(ch chan<- *metrics.Desc) {
d.requests.Describe(ch)
d.limits.Describe(ch)
}
var podResourceDesc = resourceMetricsDescriptors{
requests: resourceLifecycleDescriptors{
total: metrics.NewDesc("kube_pod_resource_request",
"Resources requested by workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.",
[]string{"namespace", "pod", "node", "scheduler", "priority", "resource", "unit"},
nil,
metrics.ALPHA,
""),
},
limits: resourceLifecycleDescriptors{
total: metrics.NewDesc("kube_pod_resource_limit",
"Resources limit for workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.",
[]string{"namespace", "pod", "node", "scheduler", "priority", "resource", "unit"},
nil,
metrics.ALPHA,
""),
},
}
// Handler creates a collector from the provided podLister and returns an http.Handler that
// will report the requested metrics in the prometheus format. It does not include any other
// metrics.
func Handler(podLister corelisters.PodLister) http.Handler {
collector := NewPodResourcesMetricsCollector(podLister)
registry := metrics.NewKubeRegistry()
registry.CustomMustRegister(collector)
return metrics.HandlerWithReset(registry, metrics.HandlerOpts{})
}
// Check that podResourceCollector implements the necessary interface
var _ metrics.StableCollector = &podResourceCollector{}
// NewPodResourcesMetricsCollector registers an O(pods) cardinality metric that
// reports the current resources requested by all pods on the cluster within
// the Kubernetes resource model. Metrics are broken down by pod, node, resource,
// and phase of lifecycle. Each pod returns two series per resource - one for
// their aggregate usage (required to schedule) and one for their phase specific
// usage. This allows admins to assess the cost per resource at different phases
// of startup and compare to actual resource usage.
func NewPodResourcesMetricsCollector(podLister corelisters.PodLister) metrics.StableCollector {
return &podResourceCollector{
lister: podLister,
}
}
type podResourceCollector struct {
metrics.BaseStableCollector
lister corelisters.PodLister
}
func (c *podResourceCollector) DescribeWithStability(ch chan<- *metrics.Desc) {
podResourceDesc.Describe(ch)
}
func (c *podResourceCollector) CollectWithStability(ch chan<- metrics.Metric) {
pods, err := c.lister.List(labels.Everything())
if err != nil {
return
}
reuseReqs, reuseLimits := make(v1.ResourceList, 4), make(v1.ResourceList, 4)
for _, p := range pods {
reqs, limits, terminal := podRequestsAndLimitsByLifecycle(p, reuseReqs, reuseLimits)
if terminal {
// terminal pods are excluded from resource usage calculations
continue
}
for _, t := range []struct {
desc resourceLifecycleDescriptors
total v1.ResourceList
}{
{
desc: podResourceDesc.requests,
total: reqs,
},
{
desc: podResourceDesc.limits,
total: limits,
},
} {
for resourceName, val := range t.total {
var unitName string
switch resourceName {
case v1.ResourceCPU:
unitName = "cores"
case v1.ResourceMemory:
unitName = "bytes"
case v1.ResourceStorage:
unitName = "bytes"
case v1.ResourceEphemeralStorage:
unitName = "bytes"
default:
switch {
case v1helper.IsHugePageResourceName(resourceName):
unitName = "bytes"
case v1helper.IsAttachableVolumeResourceName(resourceName):
unitName = "integer"
}
}
var priority string
if p.Spec.Priority != nil {
priority = strconv.FormatInt(int64(*p.Spec.Priority), 10)
}
recordMetricWithUnit(ch, t.desc.total, p.Namespace, p.Name, p.Spec.NodeName, p.Spec.SchedulerName, priority, resourceName, unitName, val)
}
}
}
}
func recordMetricWithUnit(
ch chan<- metrics.Metric,
desc *metrics.Desc,
namespace, name, nodeName, schedulerName, priority string,
resourceName v1.ResourceName,
unit string,
val resource.Quantity,
) {
if val.IsZero() {
return
}
ch <- metrics.NewLazyConstMetric(desc, metrics.GaugeValue,
val.AsApproximateFloat64(),
namespace, name, nodeName, schedulerName, priority, string(resourceName), unit,
)
}
// podRequestsAndLimitsByLifecycle returns a dictionary of all defined resources summed up for all
// containers of the pod. If PodOverhead feature is enabled, pod overhead is added to the
// total container resource requests and to the total container limits which have a
// non-zero quantity. The caller may avoid allocations of resource lists by passing
// a requests and limits list to the function, which will be cleared before use.
// This method is the same as v1resource.PodRequestsAndLimits but avoids allocating in several
// scenarios for efficiency.
func podRequestsAndLimitsByLifecycle(pod *v1.Pod, reuseReqs, reuseLimits v1.ResourceList) (reqs, limits v1.ResourceList, terminal bool) {
switch {
case len(pod.Spec.NodeName) == 0:
// unscheduled pods cannot be terminal
case pod.Status.Phase == v1.PodSucceeded, pod.Status.Phase == v1.PodFailed:
terminal = true
// TODO: resolve https://github.com/kubernetes/kubernetes/issues/96515 and add a condition here
// for checking that terminal state
}
if terminal {
return
}
reqs, limits = v1resource.PodRequestsAndLimitsReuse(pod, reuseReqs, reuseLimits)
return
}
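Because Handler only depends on a PodLister, the collector can in principle be hosted by any process with informer access, not just the scheduler. A minimal standalone sketch, assuming in-cluster credentials and a hypothetical listen address:

package main

import (
	"net/http"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/kubernetes/pkg/scheduler/metrics/resources"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	factory := informers.NewSharedInformerFactory(kubernetes.NewForConfigOrDie(cfg), 10*time.Minute)
	podLister := factory.Core().V1().Pods().Lister()

	stop := make(chan struct{})
	factory.Start(stop)
	factory.WaitForCacheSync(stop)

	mux := http.NewServeMux()
	// The handler reports only the pod resource series, nothing else.
	mux.Handle("/metrics/resources", resources.Handler(podLister))
	_ = http.ListenAndServe(":9090", mux) // hypothetical port
}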


@@ -0,0 +1,544 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package resources provides a metrics collector that reports the
// resource consumption (requests and limits) of the pods in the cluster
// as the scheduler and kubelet would interpret it.
package resources
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/testutil"
)
type fakePodLister struct {
pods []*v1.Pod
}
func (l *fakePodLister) List(selector labels.Selector) (ret []*v1.Pod, err error) {
return l.pods, nil
}
func (l *fakePodLister) Pods(namespace string) corelisters.PodNamespaceLister {
panic("not implemented")
}
func Test_podResourceCollector_Handler(t *testing.T) {
h := Handler(&fakePodLister{pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"},
Spec: v1.PodSpec{
NodeName: "node-one",
InitContainers: []v1.Container{
{Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("2"),
"custom": resource.MustParse("3"),
},
Limits: v1.ResourceList{
"memory": resource.MustParse("1G"),
"custom": resource.MustParse("5"),
},
}},
},
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("1"),
"custom": resource.MustParse("0"),
},
Limits: v1.ResourceList{
"memory": resource.MustParse("2G"),
"custom": resource.MustParse("6"),
},
}},
},
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{Type: v1.PodInitialized, Status: v1.ConditionTrue},
},
},
},
}})
r := httptest.NewRecorder()
req, err := http.NewRequest("GET", "/metrics/resources", nil)
if err != nil {
t.Fatal(err)
}
h.ServeHTTP(r, req)
expected := `# HELP kube_pod_resource_limit [ALPHA] Resources limit for workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_limit gauge
kube_pod_resource_limit{namespace="test",node="node-one",pod="foo",priority="",resource="custom",scheduler="",unit=""} 6
kube_pod_resource_limit{namespace="test",node="node-one",pod="foo",priority="",resource="memory",scheduler="",unit="bytes"} 2e+09
# HELP kube_pod_resource_request [ALPHA] Resources requested by workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_request gauge
kube_pod_resource_request{namespace="test",node="node-one",pod="foo",priority="",resource="cpu",scheduler="",unit="cores"} 2
kube_pod_resource_request{namespace="test",node="node-one",pod="foo",priority="",resource="custom",scheduler="",unit=""} 3
`
out := r.Body.String()
if expected != out {
t.Fatal(out)
}
}
func Test_podResourceCollector_CollectWithStability(t *testing.T) {
int32p := func(i int32) *int32 {
return &i
}
tests := []struct {
name string
pods []*v1.Pod
expected string
}{
{},
{
name: "no containers",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"},
},
},
},
{
name: "no resources",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"},
Spec: v1.PodSpec{
InitContainers: []v1.Container{},
Containers: []v1.Container{},
},
},
},
},
{
name: "request only",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"},
Spec: v1.PodSpec{
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{Requests: v1.ResourceList{"cpu": resource.MustParse("1")}}},
},
},
},
},
expected: `
# HELP kube_pod_resource_request [ALPHA] Resources requested by workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_request gauge
kube_pod_resource_request{namespace="test",node="",pod="foo",priority="",resource="cpu",scheduler="",unit="cores"} 1
`,
},
{
name: "limits only",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"},
Spec: v1.PodSpec{
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{Limits: v1.ResourceList{"cpu": resource.MustParse("1")}}},
},
},
},
},
expected: `
# HELP kube_pod_resource_limit [ALPHA] Resources limit for workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_limit gauge
kube_pod_resource_limit{namespace="test",node="",pod="foo",priority="",resource="cpu",scheduler="",unit="cores"} 1
`,
},
{
name: "terminal pods are excluded",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo-unscheduled-succeeded"},
Spec: v1.PodSpec{
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{Requests: v1.ResourceList{"cpu": resource.MustParse("1")}}},
},
},
// until node name is set, phase is ignored
Status: v1.PodStatus{Phase: v1.PodSucceeded},
},
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo-succeeded"},
Spec: v1.PodSpec{
NodeName: "node-one",
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{Requests: v1.ResourceList{"cpu": resource.MustParse("1")}}},
},
},
Status: v1.PodStatus{Phase: v1.PodSucceeded},
},
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo-failed"},
Spec: v1.PodSpec{
NodeName: "node-one",
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{Requests: v1.ResourceList{"cpu": resource.MustParse("1")}}},
},
},
Status: v1.PodStatus{Phase: v1.PodFailed},
},
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo-unknown"},
Spec: v1.PodSpec{
NodeName: "node-one",
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{Requests: v1.ResourceList{"cpu": resource.MustParse("1")}}},
},
},
Status: v1.PodStatus{Phase: v1.PodUnknown},
},
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo-pending"},
Spec: v1.PodSpec{
NodeName: "node-one",
InitContainers: []v1.Container{
{Resources: v1.ResourceRequirements{Requests: v1.ResourceList{"cpu": resource.MustParse("1")}}},
},
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{Requests: v1.ResourceList{"cpu": resource.MustParse("1")}}},
},
},
Status: v1.PodStatus{
Phase: v1.PodPending,
Conditions: []v1.PodCondition{
{Type: "ArbitraryCondition", Status: v1.ConditionTrue},
},
},
},
},
expected: `
# HELP kube_pod_resource_request [ALPHA] Resources requested by workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_request gauge
kube_pod_resource_request{namespace="test",node="",pod="foo-unscheduled-succeeded",priority="",resource="cpu",scheduler="",unit="cores"} 1
kube_pod_resource_request{namespace="test",node="node-one",pod="foo-pending",priority="",resource="cpu",scheduler="",unit="cores"} 1
kube_pod_resource_request{namespace="test",node="node-one",pod="foo-unknown",priority="",resource="cpu",scheduler="",unit="cores"} 1
`,
},
{
name: "zero resource should be excluded",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"},
Spec: v1.PodSpec{
InitContainers: []v1.Container{
{Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("0"),
"custom": resource.MustParse("0"),
"test.com/custom-metric": resource.MustParse("0"),
},
Limits: v1.ResourceList{
"cpu": resource.MustParse("0"),
"custom": resource.MustParse("0"),
"test.com/custom-metric": resource.MustParse("0"),
},
}},
},
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("0"),
"custom": resource.MustParse("0"),
"test.com/custom-metric": resource.MustParse("0"),
},
Limits: v1.ResourceList{
"cpu": resource.MustParse("0"),
"custom": resource.MustParse("0"),
"test.com/custom-metric": resource.MustParse("0"),
},
}},
},
},
},
},
expected: ``,
},
{
name: "optional field labels",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"},
Spec: v1.PodSpec{
SchedulerName: "default-scheduler",
Priority: int32p(0),
NodeName: "node-one",
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{Requests: v1.ResourceList{"cpu": resource.MustParse("1")}}},
},
},
},
},
expected: `
# HELP kube_pod_resource_request [ALPHA] Resources requested by workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_request gauge
kube_pod_resource_request{namespace="test",node="node-one",pod="foo",priority="0",resource="cpu",scheduler="default-scheduler",unit="cores"} 1
`,
},
{
name: "init containers and regular containers when initialized",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"},
Spec: v1.PodSpec{
NodeName: "node-one",
InitContainers: []v1.Container{
{Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("2"),
"custom": resource.MustParse("3"),
},
Limits: v1.ResourceList{
"memory": resource.MustParse("1G"),
"custom": resource.MustParse("5"),
},
}},
},
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("1"),
"custom": resource.MustParse("0"),
},
Limits: v1.ResourceList{
"memory": resource.MustParse("2G"),
"custom": resource.MustParse("6"),
},
}},
},
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{Type: v1.PodInitialized, Status: v1.ConditionTrue},
},
},
},
},
expected: `
# HELP kube_pod_resource_limit [ALPHA] Resources limit for workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_limit gauge
kube_pod_resource_limit{namespace="test",node="node-one",pod="foo",priority="",resource="custom",scheduler="",unit=""} 6
kube_pod_resource_limit{namespace="test",node="node-one",pod="foo",priority="",resource="memory",scheduler="",unit="bytes"} 2e+09
# HELP kube_pod_resource_request [ALPHA] Resources requested by workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_request gauge
kube_pod_resource_request{namespace="test",node="node-one",pod="foo",priority="",resource="cpu",scheduler="",unit="cores"} 2
kube_pod_resource_request{namespace="test",node="node-one",pod="foo",priority="",resource="custom",scheduler="",unit=""} 3
`,
},
{
name: "init containers and regular containers when initializing",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"},
Spec: v1.PodSpec{
NodeName: "node-one",
InitContainers: []v1.Container{
{Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("2"),
"custom": resource.MustParse("3"),
},
Limits: v1.ResourceList{
"memory": resource.MustParse("1G"),
"custom": resource.MustParse("5"),
},
}},
},
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("1"),
"custom": resource.MustParse("0"),
},
Limits: v1.ResourceList{
"memory": resource.MustParse("2G"),
"custom": resource.MustParse("6"),
},
}},
},
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{Type: "AnotherCondition", Status: v1.ConditionUnknown},
{Type: v1.PodInitialized, Status: v1.ConditionFalse},
},
},
},
},
expected: `
# HELP kube_pod_resource_limit [ALPHA] Resources limit for workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_limit gauge
kube_pod_resource_limit{namespace="test",node="node-one",pod="foo",priority="",resource="custom",scheduler="",unit=""} 6
kube_pod_resource_limit{namespace="test",node="node-one",pod="foo",priority="",resource="memory",scheduler="",unit="bytes"} 2e+09
# HELP kube_pod_resource_request [ALPHA] Resources requested by workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_request gauge
kube_pod_resource_request{namespace="test",node="node-one",pod="foo",priority="",resource="cpu",scheduler="",unit="cores"} 2
kube_pod_resource_request{namespace="test",node="node-one",pod="foo",priority="",resource="custom",scheduler="",unit=""} 3
`,
},
{
name: "aggregate container requests and limits",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"},
Spec: v1.PodSpec{
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{"cpu": resource.MustParse("1")},
Limits: v1.ResourceList{"cpu": resource.MustParse("2")},
}},
{Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{"memory": resource.MustParse("1G")},
Limits: v1.ResourceList{"memory": resource.MustParse("2G")},
}},
{Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{"cpu": resource.MustParse("0.5")},
Limits: v1.ResourceList{"cpu": resource.MustParse("1.25")},
}},
{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{"memory": resource.MustParse("2G")},
}},
},
},
},
},
expected: `
# HELP kube_pod_resource_limit [ALPHA] Resources limit for workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_limit gauge
kube_pod_resource_limit{namespace="test",node="",pod="foo",priority="",resource="cpu",scheduler="",unit="cores"} 3.25
kube_pod_resource_limit{namespace="test",node="",pod="foo",priority="",resource="memory",scheduler="",unit="bytes"} 4e+09
# HELP kube_pod_resource_request [ALPHA] Resources requested by workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_request gauge
kube_pod_resource_request{namespace="test",node="",pod="foo",priority="",resource="cpu",scheduler="",unit="cores"} 1.5
kube_pod_resource_request{namespace="test",node="",pod="foo",priority="",resource="memory",scheduler="",unit="bytes"} 1e+09
`,
},
{
name: "overhead added to requests and limits",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"},
Spec: v1.PodSpec{
Overhead: v1.ResourceList{
"cpu": resource.MustParse("0.25"),
"memory": resource.MustParse("0.75G"),
"custom": resource.MustParse("0.5"),
},
InitContainers: []v1.Container{
{Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("2"),
"custom": resource.MustParse("3"),
},
Limits: v1.ResourceList{
"memory": resource.MustParse("1G"),
"custom": resource.MustParse("5"),
},
}},
},
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("1"),
"custom": resource.MustParse("0"),
},
Limits: v1.ResourceList{
"memory": resource.MustParse("2G"),
"custom": resource.MustParse("6"),
},
}},
},
},
},
},
expected: `
# HELP kube_pod_resource_limit [ALPHA] Resources limit for workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_limit gauge
kube_pod_resource_limit{namespace="test",node="",pod="foo",priority="",resource="custom",scheduler="",unit=""} 6.5
kube_pod_resource_limit{namespace="test",node="",pod="foo",priority="",resource="memory",scheduler="",unit="bytes"} 2.75e+09
# HELP kube_pod_resource_request [ALPHA] Resources requested by workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_request gauge
kube_pod_resource_request{namespace="test",node="",pod="foo",priority="",resource="cpu",scheduler="",unit="cores"} 2.25
kube_pod_resource_request{namespace="test",node="",pod="foo",priority="",resource="custom",scheduler="",unit=""} 3.5
kube_pod_resource_request{namespace="test",node="",pod="foo",priority="",resource="memory",scheduler="",unit="bytes"} 7.5e+08
`,
},
{
name: "units for standard resources",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"},
Spec: v1.PodSpec{
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"storage": resource.MustParse("5"),
"ephemeral-storage": resource.MustParse("6"),
},
Limits: v1.ResourceList{
"hugepages-x": resource.MustParse("1"),
"hugepages-": resource.MustParse("2"),
"attachable-volumes-aws": resource.MustParse("3"),
"attachable-volumes-": resource.MustParse("4"),
},
}},
},
},
},
},
expected: `
# HELP kube_pod_resource_limit [ALPHA] Resources limit for workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_limit gauge
kube_pod_resource_limit{namespace="test",node="",pod="foo",priority="",resource="attachable-volumes-",scheduler="",unit="integer"} 4
kube_pod_resource_limit{namespace="test",node="",pod="foo",priority="",resource="attachable-volumes-aws",scheduler="",unit="integer"} 3
kube_pod_resource_limit{namespace="test",node="",pod="foo",priority="",resource="hugepages-",scheduler="",unit="bytes"} 2
kube_pod_resource_limit{namespace="test",node="",pod="foo",priority="",resource="hugepages-x",scheduler="",unit="bytes"} 1
# HELP kube_pod_resource_request [ALPHA] Resources requested by workloads on the cluster, broken down by pod. This shows the resource usage the scheduler and kubelet expect per pod for resources along with the unit for the resource if any.
# TYPE kube_pod_resource_request gauge
kube_pod_resource_request{namespace="test",node="",pod="foo",priority="",resource="ephemeral-storage",scheduler="",unit="bytes"} 6
kube_pod_resource_request{namespace="test",node="",pod="foo",priority="",resource="storage",scheduler="",unit="bytes"} 5
`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := NewPodResourcesMetricsCollector(&fakePodLister{pods: tt.pods})
registry := metrics.NewKubeRegistry()
registry.CustomMustRegister(c)
err := testutil.GatherAndCompare(registry, strings.NewReader(tt.expected))
if err != nil {
t.Fatal(err)
}
})
}
}


@@ -20,6 +20,7 @@ import (
"bytes"
"errors"
"fmt"
"math"
"math/big"
"strconv"
"strings"
@@ -442,6 +443,36 @@ func (q *Quantity) CanonicalizeBytes(out []byte) (result, suffix []byte) {
}
}
// AsApproximateFloat64 returns a float64 representation of the quantity which may
// lose precision. If the value of the quantity is outside the range of a float64,
// +Inf/-Inf will be returned.
func (q *Quantity) AsApproximateFloat64() float64 {
var base float64
var exponent int
if q.d.Dec != nil {
base, _ = big.NewFloat(0).SetInt(q.d.Dec.UnscaledBig()).Float64()
exponent = int(-q.d.Dec.Scale())
} else {
base = float64(q.i.value)
exponent = int(q.i.scale)
}
if exponent == 0 {
return base
}
// multiply by the appropriate exponential scale
switch q.Format {
case DecimalExponent, DecimalSI:
return base * math.Pow10(exponent)
default:
// fast path for exponents that can fit in 64 bits
if exponent > 0 && exponent < 7 {
return base * float64(int64(1)<<(exponent*10))
}
return base * math.Pow(2, float64(exponent*10))
}
}
// AsInt64 returns a representation of the current value as an int64 if a fast conversion
// is possible. If false is returned, callers must use the inf.Dec form of this quantity.
func (q *Quantity) AsInt64() (int64, bool) {
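AsApproximateFloat64 is what lets the collector publish quantities as Prometheus gauge values. A small sketch of its behavior on decimal and binary quantities:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	cpu := resource.MustParse("250m") // decimal SI: 250 * 10^-3
	mem := resource.MustParse("2Gi")  // binary SI: 2 * 2^30
	fmt.Println(cpu.AsApproximateFloat64()) // 0.25
	fmt.Println(mem.AsApproximateFloat64()) // 2.147483648e+09
	// Precision may be lost for values that exceed the float64 mantissa,
	// and values beyond the float64 range come back as +Inf or -Inf.
}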


@@ -18,6 +18,8 @@ package resource
import (
"encoding/json"
"fmt"
"math"
"math/rand"
"strings"
"testing"
@@ -1177,6 +1179,77 @@ func TestNegateRoundTrip(t *testing.T) {
}
}
}
func TestQuantityAsApproximateFloat64(t *testing.T) {
table := []struct {
in Quantity
out float64
}{
{decQuantity(0, 0, DecimalSI), 0.0},
{decQuantity(0, 0, DecimalExponent), 0.0},
{decQuantity(0, 0, BinarySI), 0.0},
{decQuantity(1, 0, DecimalSI), 1},
{decQuantity(1, 0, DecimalExponent), 1},
{decQuantity(1, 0, BinarySI), 1},
// Binary suffixes
{decQuantity(1024, 0, BinarySI), 1024},
{decQuantity(8*1024, 0, BinarySI), 8 * 1024},
{decQuantity(7*1024*1024, 0, BinarySI), 7 * 1024 * 1024},
{decQuantity(7*1024*1024, 1, BinarySI), (7 * 1024 * 1024) * 1024},
{decQuantity(7*1024*1024, 4, BinarySI), (7 * 1024 * 1024) * (1024 * 1024 * 1024 * 1024)},
{decQuantity(7*1024*1024, 8, BinarySI), (7 * 1024 * 1024) * (1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024)},
{decQuantity(7*1024*1024, -1, BinarySI), (7 * 1024 * 1024) / float64(1024)},
{decQuantity(7*1024*1024, -8, BinarySI), (7 * 1024 * 1024) / float64(1024*1024*1024*1024*1024*1024*1024*1024)},
{decQuantity(1024, 0, DecimalSI), 1024},
{decQuantity(8*1024, 0, DecimalSI), 8 * 1024},
{decQuantity(7*1024*1024, 0, DecimalSI), 7 * 1024 * 1024},
{decQuantity(7*1024*1024, 1, DecimalSI), (7 * 1024 * 1024) * 10},
{decQuantity(7*1024*1024, 4, DecimalSI), (7 * 1024 * 1024) * 10000},
{decQuantity(7*1024*1024, 8, DecimalSI), (7 * 1024 * 1024) * 100000000},
{decQuantity(7*1024*1024, -1, DecimalSI), (7 * 1024 * 1024) * math.Pow10(-1)}, // '* Pow10' and '/ float(10)' do not round the same way
{decQuantity(7*1024*1024, -8, DecimalSI), (7 * 1024 * 1024) / float64(100000000)},
{decQuantity(1024, 0, DecimalExponent), 1024},
{decQuantity(8*1024, 0, DecimalExponent), 8 * 1024},
{decQuantity(7*1024*1024, 0, DecimalExponent), 7 * 1024 * 1024},
{decQuantity(7*1024*1024, 1, DecimalExponent), (7 * 1024 * 1024) * 10},
{decQuantity(7*1024*1024, 4, DecimalExponent), (7 * 1024 * 1024) * 10000},
{decQuantity(7*1024*1024, 8, DecimalExponent), (7 * 1024 * 1024) * 100000000},
{decQuantity(7*1024*1024, -1, DecimalExponent), (7 * 1024 * 1024) * math.Pow10(-1)}, // '* Pow10' and '/ float(10)' do not round the same way
{decQuantity(7*1024*1024, -8, DecimalExponent), (7 * 1024 * 1024) / float64(100000000)},
// very large numbers
{Quantity{d: maxAllowed, Format: DecimalSI}, math.MaxInt64},
{Quantity{d: maxAllowed, Format: BinarySI}, math.MaxInt64},
{decQuantity(12, 18, DecimalSI), 1.2e19},
// infinities caused due to float64 overflow
{decQuantity(12, 500, DecimalSI), math.Inf(0)},
{decQuantity(-12, 500, DecimalSI), math.Inf(-1)},
}
for _, item := range table {
t.Run(fmt.Sprintf("%s %s", item.in.Format, item.in.String()), func(t *testing.T) {
out := item.in.AsApproximateFloat64()
if out != item.out {
t.Fatalf("expected %v, got %v", item.out, out)
}
if item.in.d.Dec != nil {
if i, ok := item.in.AsInt64(); ok {
q := intQuantity(i, 0, item.in.Format)
out := q.AsApproximateFloat64()
if out != item.out {
t.Fatalf("as int quantity: expected %v, got %v", item.out, out)
}
}
}
})
}
}
func benchmarkQuantities() []Quantity {
return []Quantity{
intQuantity(1024*1024*1024, 0, BinarySI),
@@ -1346,3 +1419,15 @@ func BenchmarkQuantityCmp(b *testing.B) {
}
b.StopTimer()
}
func BenchmarkQuantityAsApproximateFloat64(b *testing.B) {
values := benchmarkQuantities()
b.ResetTimer()
for i := 0; i < b.N; i++ {
q := values[i%len(values)]
if q.AsApproximateFloat64() == -1 {
b.Fatal(q)
}
}
b.StopTimer()
}