use controller healthz

in KCM and CCM.
This commit is contained in:
Jiahui Feng 2021-08-31 14:01:06 -07:00 committed by Indeed
parent 15e0336de2
commit f6028618e2
3 changed files with 39 additions and 11 deletions

View File

@ -61,6 +61,7 @@ import (
genericcontrollermanager "k8s.io/controller-manager/app" genericcontrollermanager "k8s.io/controller-manager/app"
"k8s.io/controller-manager/controller" "k8s.io/controller-manager/controller"
"k8s.io/controller-manager/pkg/clientbuilder" "k8s.io/controller-manager/pkg/clientbuilder"
controllerhealthz "k8s.io/controller-manager/pkg/healthz"
"k8s.io/controller-manager/pkg/informerfactory" "k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/controller-manager/pkg/leadermigration" "k8s.io/controller-manager/pkg/leadermigration"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -199,12 +200,13 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Second * 20) electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)
checks = append(checks, electionChecker) checks = append(checks, electionChecker)
} }
healthzHandler := controllerhealthz.NewMutableHealthzHandler(checks...)
// Start the controller manager HTTP server // Start the controller manager HTTP server
// unsecuredMux is the handler for these controller *after* authn/authz filters have been applied // unsecuredMux is the handler for these controller *after* authn/authz filters have been applied
var unsecuredMux *mux.PathRecorderMux var unsecuredMux *mux.PathRecorderMux
if c.SecureServing != nil { if c.SecureServing != nil {
unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...) unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, healthzHandler)
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
// TODO: handle stoppedCh returned by c.SecureServing.Serve // TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
@ -223,7 +225,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
klog.Fatalf("error building controller context: %v", err) klog.Fatalf("error building controller context: %v", err)
} }
controllerInitializers := initializersFunc(controllerContext.LoopMode) controllerInitializers := initializersFunc(controllerContext.LoopMode)
if err := StartControllers(controllerContext, startSATokenController, controllerInitializers, unsecuredMux); err != nil { if err := StartControllers(controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil {
klog.Fatalf("error starting controllers: %v", err) klog.Fatalf("error starting controllers: %v", err)
} }
@ -541,7 +543,8 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
} }
// StartControllers starts a set of controllers with a specified ControllerContext // StartControllers starts a set of controllers with a specified ControllerContext
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error { func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc,
unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest // Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
// If this fails, just return here and fail since other controllers won't be able to get credentials. // If this fails, just return here and fail since other controllers won't be able to get credentials.
if startSATokenController != nil { if startSATokenController != nil {
@ -556,6 +559,8 @@ func StartControllers(ctx ControllerContext, startSATokenController InitFunc, co
ctx.Cloud.Initialize(ctx.ClientBuilder, ctx.Stop) ctx.Cloud.Initialize(ctx.ClientBuilder, ctx.Stop)
} }
var controllerChecks []healthz.HealthChecker
for controllerName, initFn := range controllers { for controllerName, initFn := range controllers {
if !ctx.IsControllerEnabled(controllerName) { if !ctx.IsControllerEnabled(controllerName) {
klog.Warningf("%q is disabled", controllerName) klog.Warningf("%q is disabled", controllerName)
@ -574,6 +579,7 @@ func StartControllers(ctx ControllerContext, startSATokenController InitFunc, co
klog.Warningf("Skipping %q", controllerName) klog.Warningf("Skipping %q", controllerName)
continue continue
} }
check := controllerhealthz.NamedPingChecker(controllerName)
if ctrl != nil { if ctrl != nil {
// check if the controller supports and requests a debugHandler // check if the controller supports and requests a debugHandler
// and it needs the unsecuredMux to mount the handler onto. // and it needs the unsecuredMux to mount the handler onto.
@ -584,10 +590,19 @@ func StartControllers(ctx ControllerContext, startSATokenController InitFunc, co
unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler)) unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
} }
} }
if healthCheckable, ok := ctrl.(controller.HealthCheckable); ok {
if realCheck := healthCheckable.HealthChecker(); realCheck != nil {
check = controllerhealthz.NamedHealthChecker(controllerName, realCheck)
} }
}
}
controllerChecks = append(controllerChecks, check)
klog.Infof("Started %q", controllerName) klog.Infof("Started %q", controllerName)
} }
healthzHandler.AddHealthChecker(controllerChecks...)
return nil return nil
} }

View File

@ -51,6 +51,7 @@ import (
genericcontrollermanager "k8s.io/controller-manager/app" genericcontrollermanager "k8s.io/controller-manager/app"
"k8s.io/controller-manager/controller" "k8s.io/controller-manager/controller"
"k8s.io/controller-manager/pkg/clientbuilder" "k8s.io/controller-manager/pkg/clientbuilder"
controllerhealthz "k8s.io/controller-manager/pkg/healthz"
"k8s.io/controller-manager/pkg/informerfactory" "k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/controller-manager/pkg/leadermigration" "k8s.io/controller-manager/pkg/leadermigration"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -156,9 +157,10 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
checks = append(checks, electionChecker) checks = append(checks, electionChecker)
} }
healthzHandler := controllerhealthz.NewMutableHealthzHandler(checks...)
// Start the controller manager HTTP server // Start the controller manager HTTP server
if c.SecureServing != nil { if c.SecureServing != nil {
unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...) unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, healthzHandler)
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
// TODO: handle stoppedCh returned by c.SecureServing.Serve // TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
@ -166,7 +168,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
} }
} }
if c.InsecureServing != nil { if c.InsecureServing != nil {
unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...) unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, healthzHandler)
insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}} insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}}
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn)
if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil { if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
@ -182,7 +184,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
if err != nil { if err != nil {
klog.Fatalf("error building controller context: %v", err) klog.Fatalf("error building controller context: %v", err)
} }
if err := startControllers(cloud, controllerContext, c, ctx.Done(), controllerInitializers); err != nil { if err := startControllers(cloud, controllerContext, c, ctx.Done(), controllerInitializers, healthzHandler); err != nil {
klog.Fatalf("error running controllers: %v", err) klog.Fatalf("error running controllers: %v", err)
} }
} }
@ -259,13 +261,14 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
} }
// startControllers starts the cloud specific controller loops. // startControllers starts the cloud specific controller loops.
func startControllers(cloud cloudprovider.Interface, ctx genericcontrollermanager.ControllerContext, c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}, controllers map[string]InitFunc) error { func startControllers(cloud cloudprovider.Interface, ctx genericcontrollermanager.ControllerContext, c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}, controllers map[string]InitFunc, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
// Initialize the cloud provider with a reference to the clientBuilder // Initialize the cloud provider with a reference to the clientBuilder
cloud.Initialize(c.ClientBuilder, stopCh) cloud.Initialize(c.ClientBuilder, stopCh)
// Set the informer on the user cloud object // Set the informer on the user cloud object
if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok { if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok {
informerUserCloud.SetInformers(c.SharedInformers) informerUserCloud.SetInformers(c.SharedInformers)
} }
var controllerChecks []healthz.HealthChecker
for controllerName, initFn := range controllers { for controllerName, initFn := range controllers {
if !genericcontrollermanager.IsControllerEnabled(controllerName, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers) { if !genericcontrollermanager.IsControllerEnabled(controllerName, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers) {
klog.Warningf("%q is disabled", controllerName) klog.Warningf("%q is disabled", controllerName)
@ -273,7 +276,7 @@ func startControllers(cloud cloudprovider.Interface, ctx genericcontrollermanage
} }
klog.V(1).Infof("Starting %q", controllerName) klog.V(1).Infof("Starting %q", controllerName)
_, started, err := initFn(ctx) ctrl, started, err := initFn(ctx)
if err != nil { if err != nil {
klog.Errorf("Error starting %q", controllerName) klog.Errorf("Error starting %q", controllerName)
return err return err
@ -282,11 +285,22 @@ func startControllers(cloud cloudprovider.Interface, ctx genericcontrollermanage
klog.Warningf("Skipping %q", controllerName) klog.Warningf("Skipping %q", controllerName)
continue continue
} }
check := controllerhealthz.NamedPingChecker(controllerName)
if ctrl != nil {
if healthCheckable, ok := ctrl.(controller.HealthCheckable); ok {
if realCheck := healthCheckable.HealthChecker(); realCheck != nil {
check = controllerhealthz.NamedHealthChecker(controllerName, realCheck)
}
}
}
controllerChecks = append(controllerChecks, check)
klog.Infof("Started %q", controllerName) klog.Infof("Started %q", controllerName)
time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
} }
healthzHandler.AddHealthChecker(controllerChecks...)
// If apiserver is not running we should wait for some time and fail only then. This is particularly // If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time. // important when we start apiserver and controller manager at the same time.
if err := genericcontrollermanager.WaitForAPIServer(c.VersionedClient, 10*time.Second); err != nil { if err := genericcontrollermanager.WaitForAPIServer(c.VersionedClient, 10*time.Second); err != nil {

View File

@ -24,7 +24,6 @@ import (
apirequest "k8s.io/apiserver/pkg/endpoints/request" apirequest "k8s.io/apiserver/pkg/endpoints/request"
apiserver "k8s.io/apiserver/pkg/server" apiserver "k8s.io/apiserver/pkg/server"
genericfilters "k8s.io/apiserver/pkg/server/filters" genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/mux" "k8s.io/apiserver/pkg/server/mux"
"k8s.io/apiserver/pkg/server/routes" "k8s.io/apiserver/pkg/server/routes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
@ -56,9 +55,9 @@ func BuildHandlerChain(apiHandler http.Handler, authorizationInfo *apiserver.Aut
} }
// NewBaseHandler takes in CompletedConfig and returns a handler. // NewBaseHandler takes in CompletedConfig and returns a handler.
func NewBaseHandler(c *componentbaseconfig.DebuggingConfiguration, checks ...healthz.HealthChecker) *mux.PathRecorderMux { func NewBaseHandler(c *componentbaseconfig.DebuggingConfiguration, healthzHandler http.Handler) *mux.PathRecorderMux {
mux := mux.NewPathRecorderMux("controller-manager") mux := mux.NewPathRecorderMux("controller-manager")
healthz.InstallHandler(mux, checks...) mux.Handle("/healthz", healthzHandler)
if c.EnableProfiling { if c.EnableProfiling {
routes.Profiling{}.Install(mux) routes.Profiling{}.Install(mux)
if c.EnableContentionProfiling { if c.EnableContentionProfiling {