scheduler: Implement resource metrics at /metrics/resources

Implement pod resource metrics as described in KEP 1916. The new
`/metrics/resources` endpoint is exposed on the active scheduler
and reports kube_pod_resources* metrics that present the effective
requests and limits for all resources on the pods as calculated by
the scheduler and kubelet. This allows administrators using the
system to quickly perform resource consumption, reservation, and
pending utilization calculations when those metrics are read.
Because metrics calculation is on-demand, there is no additional
resource consumption incurred by the scheduler unless the endpoint
is scraped.
This commit is contained in:
Clayton Coleman
2020-09-17 16:08:04 -04:00
parent c06c1121d1
commit 264496cc41
7 changed files with 876 additions and 48 deletions

View File

@@ -16,6 +16,7 @@ go_library(
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/runtime:go_default_library",
"//pkg/scheduler/metrics/resources:go_default_library",
"//pkg/scheduler/profile:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
@@ -26,6 +27,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/routes:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
"//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library",

View File

@@ -20,7 +20,6 @@ package app
import (
"context"
"fmt"
"io"
"net/http"
"os"
goruntime "runtime"
@@ -36,6 +35,7 @@ import (
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/apiserver/pkg/server/routes"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/leaderelection"
@@ -53,6 +53,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/metrics/resources"
"k8s.io/kubernetes/pkg/scheduler/profile"
)
@@ -160,22 +161,34 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
checks = append(checks, cc.LeaderElection.WatchDog)
}
waitingForLeader := make(chan struct{})
isLeader := func() bool {
select {
case _, ok := <-waitingForLeader:
// if channel is closed, we are leading
return !ok
default:
// channel is open, we are waiting for a leader
return false
}
}
// Start up the healthz server.
if cc.InsecureServing != nil {
separateMetrics := cc.InsecureMetricsServing != nil
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, separateMetrics, checks...), nil, nil)
if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
return fmt.Errorf("failed to start healthz server: %v", err)
}
}
if cc.InsecureMetricsServing != nil {
handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader), nil, nil)
if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
return fmt.Errorf("failed to start metrics server: %v", err)
}
}
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
@@ -192,7 +205,10 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: sched.Run,
OnStartedLeading: func(ctx context.Context) {
close(waitingForLeader)
sched.Run(ctx)
},
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
@@ -208,6 +224,7 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
}
// Leader election is disabled, so runCommand inline until done.
close(waitingForLeader)
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}
@@ -226,25 +243,23 @@ func buildHandlerChain(handler http.Handler, authn authenticator.Request, authz
return handler
}
func installMetricHandler(pathRecorderMux *mux.PathRecorderMux) {
func installMetricHandler(pathRecorderMux *mux.PathRecorderMux, informers informers.SharedInformerFactory, isLeader func() bool) {
configz.InstallHandler(pathRecorderMux)
//lint:ignore SA1019 See the Metrics Stability Migration KEP
defaultMetricsHandler := legacyregistry.Handler().ServeHTTP
pathRecorderMux.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
if req.Method == "DELETE" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
io.WriteString(w, "metrics reset\n")
pathRecorderMux.Handle("/metrics", legacyregistry.HandlerWithReset())
resourceMetricsHandler := resources.Handler(informers.Core().V1().Pods().Lister())
pathRecorderMux.HandleFunc("/metrics/resources", func(w http.ResponseWriter, req *http.Request) {
if !isLeader() {
return
}
defaultMetricsHandler(w, req)
resourceMetricsHandler.ServeHTTP(w, req)
})
}
// newMetricsHandler builds a metrics server from the config.
func newMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration) http.Handler {
func newMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool) http.Handler {
pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
installMetricHandler(pathRecorderMux)
installMetricHandler(pathRecorderMux, informers, isLeader)
if config.EnableProfiling {
routes.Profiling{}.Install(pathRecorderMux)
if config.EnableContentionProfiling {
@@ -258,11 +273,11 @@ func newMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration) h
// newHealthzHandler creates a healthz server from the config, and will also
// embed the metrics handler if the healthz and metrics address configurations
// are the same.
func newHealthzHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, separateMetrics bool, checks ...healthz.HealthChecker) http.Handler {
func newHealthzHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, separateMetrics bool, checks ...healthz.HealthChecker) http.Handler {
pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
healthz.InstallHandler(pathRecorderMux, checks...)
if !separateMetrics {
installMetricHandler(pathRecorderMux)
installMetricHandler(pathRecorderMux, informers, isLeader)
}
if config.EnableProfiling {
routes.Profiling{}.Install(pathRecorderMux)