Add /readyz for kube-scheduler

/readyz includes the `sched-handler-sync` check, the `leaderElection` check (only when leader election is enabled), and the `shutdown` check.
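
For reviewers who want to poke at the new endpoint by hand, a minimal probe might look like the sketch below. It is not part of this change and assumes the scheduler's default secure port 10259, a self-signed serving certificate, and that the path is exempt from authorization (for example via --authorization-always-allow-paths=/readyz, the same flag the integration test uses).

```go
// Hypothetical readiness probe against a locally running kube-scheduler.
package main

import (
	"crypto/tls"
	"fmt"
	"io"
	"net/http"
)

func main() {
	client := &http.Client{
		// The scheduler serves health endpoints over HTTPS with a self-signed
		// certificate, so certificate verification is skipped for this probe.
		Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
	}

	// "?verbose" asks the healthz machinery to list each registered check and its result.
	resp, err := client.Get("https://127.0.0.1:10259/readyz?verbose")
	if err != nil {
		fmt.Println("scheduler not reachable:", err)
		return
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	// 200 means every registered readyz check passed; any failing check
	// turns the aggregate endpoint into a 500.
	fmt.Println(resp.Status)
	fmt.Println(string(body))
}
```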
Eric Lin 2023-05-20 16:38:21 +00:00
parent 708180be69
commit 44c08fdbd5
4 changed files with 88 additions and 28 deletions


@@ -167,10 +167,12 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
 	defer cc.EventBroadcaster.Shutdown()

 	// Setup healthz checks.
-	var checks []healthz.HealthChecker
+	var checks, readyzChecks []healthz.HealthChecker
 	if cc.ComponentConfig.LeaderElection.LeaderElect {
 		checks = append(checks, cc.LeaderElection.WatchDog)
+		readyzChecks = append(readyzChecks, cc.LeaderElection.WatchDog)
 	}
+	readyzChecks = append(readyzChecks, healthz.NewShutdownHealthz(ctx.Done()))

 	waitingForLeader := make(chan struct{})
 	isLeader := func() bool {
@@ -184,9 +186,20 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
 		}
 	}

+	handlerSyncReadyCh := make(chan struct{})
+	handlerSyncCheck := healthz.NamedCheck("sched-handler-sync", func(_ *http.Request) error {
+		select {
+		case <-handlerSyncReadyCh:
+			return nil
+		default:
+		}
+		return fmt.Errorf("waiting for handlers to sync")
+	})
+	readyzChecks = append(readyzChecks, handlerSyncCheck)
+
 	// Start up the healthz server.
 	if cc.SecureServing != nil {
-		handler := buildHandlerChain(newHealthEndpointsAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
+		handler := buildHandlerChain(newHealthEndpointsAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks, readyzChecks), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
 		// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
 		if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
 			// fail early for secure handlers, removing the old error loop from above
@@ -214,6 +227,7 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
 			logger.Error(err, "waiting for handlers to sync")
 		}
+		close(handlerSyncReadyCh)
 		logger.V(3).Info("Handlers synced")
 	}

 	if !cc.ComponentConfig.DelayCacheUntilActive || cc.LeaderElection == nil {
@@ -290,10 +304,12 @@ func installMetricHandler(pathRecorderMux *mux.PathRecorderMux, informers inform
 // newHealthEndpointsAndMetricsHandler creates an API health server from the config, and will also
 // embed the metrics handler.
-func newHealthEndpointsAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, checks ...healthz.HealthChecker) http.Handler {
+// TODO: healthz check is deprecated, please use livez and readyz instead. Will be removed in the future.
+func newHealthEndpointsAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, healthzChecks, readyzChecks []healthz.HealthChecker) http.Handler {
 	pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
-	healthz.InstallHandler(pathRecorderMux, checks...)
+	healthz.InstallHandler(pathRecorderMux, healthzChecks...)
+	healthz.InstallLivezHandler(pathRecorderMux)
+	healthz.InstallReadyzHandler(pathRecorderMux, readyzChecks...)

 	installMetricHandler(pathRecorderMux, informers, isLeader)
 	slis.SLIMetricsWithReset{}.Install(pathRecorderMux)
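
The three Install calls above give the scheduler the same health-endpoint layout the generic apiserver exposes: one aggregate handler per endpoint plus a sub-path per named check. The sketch below is not part of the commit; it reuses the k8s.io/apiserver healthz and mux packages with a hypothetical always-passing check named "example" to show how those routes can be exercised in isolation.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"

	"k8s.io/apiserver/pkg/server/healthz"
	"k8s.io/apiserver/pkg/server/mux"
)

func main() {
	m := mux.NewPathRecorderMux("kube-scheduler")

	// An always-passing stand-in for a real readiness gate such as sched-handler-sync.
	exampleCheck := healthz.NamedCheck("example", func(_ *http.Request) error { return nil })

	// Same layout the handler above produces: /healthz (deprecated), /livez and
	// /readyz, each also exposing one sub-path per registered check.
	healthz.InstallHandler(m, healthz.PingHealthz)
	healthz.InstallLivezHandler(m, healthz.PingHealthz)
	healthz.InstallReadyzHandler(m, healthz.PingHealthz, exampleCheck)

	srv := httptest.NewServer(m)
	defer srv.Close()

	for _, path := range []string{"/healthz", "/livez", "/readyz", "/readyz/example", "/readyz?verbose"} {
		resp, err := http.Get(srv.URL + path)
		if err != nil {
			panic(err)
		}
		resp.Body.Close()
		fmt.Println(path, resp.StatusCode) // 200 while every registered check passes
	}
}
```

The per-check sub-paths are what the integration test below relies on for /readyz/sched-handler-sync and /readyz/shutdown.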


@@ -118,7 +118,7 @@ func (s *GenericAPIServer) AddLivezChecks(delay time.Duration, checks ...healthz
 // that we can register that the api-server is no longer ready while we attempt to gracefully
 // shutdown.
 func (s *GenericAPIServer) addReadyzShutdownCheck(stopCh <-chan struct{}) error {
-	return s.AddReadyzChecks(shutdownCheck{stopCh})
+	return s.AddReadyzChecks(healthz.NewShutdownHealthz(stopCh))
 }

 // installHealthz creates the healthz endpoint for this server
@@ -139,25 +139,6 @@ func (s *GenericAPIServer) installLivez() {
 	s.livezRegistry.installHandler(s.Handler.NonGoRestfulMux)
 }

-// shutdownCheck fails if the embedded channel is closed. This is intended to allow for graceful shutdown sequences
-// for the apiserver.
-type shutdownCheck struct {
-	StopCh <-chan struct{}
-}
-
-func (shutdownCheck) Name() string {
-	return "shutdown"
-}
-
-func (c shutdownCheck) Check(req *http.Request) error {
-	select {
-	case <-c.StopCh:
-		return fmt.Errorf("process is shutting down")
-	default:
-	}
-	return nil
-}
-
 // delayedHealthCheck wraps a health check which will not fail until the explicitly defined delay has elapsed. This
 // is intended for use primarily for livez health checks.
 func delayedHealthCheck(check healthz.HealthChecker, clock clock.Clock, delay time.Duration) healthz.HealthChecker {


@@ -105,6 +105,29 @@ func (i *informerSync) Name() string {
 	return "informer-sync"
 }

+type shutdown struct {
+	stopCh <-chan struct{}
+}
+
+// NewShutdownHealthz returns a new HealthChecker that will fail if the embedded channel is closed.
+// This is intended to allow for graceful shutdown sequences.
+func NewShutdownHealthz(stopCh <-chan struct{}) HealthChecker {
+	return &shutdown{stopCh}
+}
+
+func (s *shutdown) Name() string {
+	return "shutdown"
+}
+
+func (s *shutdown) Check(req *http.Request) error {
+	select {
+	case <-s.stopCh:
+		return fmt.Errorf("process is shutting down")
+	default:
+	}
+	return nil
+}
+
 func (i *informerSync) Check(_ *http.Request) error {
 	stopCh := make(chan struct{})
 	// Close stopCh to force checking if informers are synced now.
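
NewShutdownHealthz only watches a channel, so each caller decides what "shutting down" means: the generic apiserver passes its stop channel, and the scheduler passes ctx.Done() as shown above. The following is a rough sketch of that pattern in a standalone binary, assuming the root context is canceled by SIGTERM through signal.NotifyContext; it is illustrative only, not code from this change.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"os/signal"
	"syscall"

	"k8s.io/apiserver/pkg/server/healthz"
	"k8s.io/apiserver/pkg/server/mux"
)

func main() {
	// ctx is canceled on SIGTERM/SIGINT; its Done() channel plays the role of
	// the stop channel the scheduler passes in (ctx.Done() in Run above).
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer stop()

	m := mux.NewPathRecorderMux("example")
	healthz.InstallLivezHandler(m)
	healthz.InstallReadyzHandler(m, healthz.NewShutdownHealthz(ctx.Done()))

	// While the process runs, /readyz returns 200; once the signal arrives the
	// shutdown check fails and /readyz returns 500, so load balancers can drain
	// traffic while /livez stays healthy.
	srv := &http.Server{Addr: "127.0.0.1:8080", Handler: m}
	go func() {
		<-ctx.Done()
		fmt.Println("termination signal received, /readyz now failing")
		srv.Shutdown(context.Background())
	}()
	_ = srv.ListenAndServe()
}
```

Kubelet-style probes would then point a liveness probe at /livez and a readiness probe at /readyz, so a terminating process is taken out of rotation without being restarted.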


@@ -39,6 +39,7 @@ func TestHealthEndpoints(t *testing.T) {
 		t.Fatalf("Failed to start kube-apiserver server: %v", err)
 	}
 	defer server.TearDownFn()
+
 	apiserverConfig, err := os.CreateTemp("", "kubeconfig")
 	if err != nil {
 		t.Fatalf("Failed to create config file: %v", err)
@@ -50,30 +51,65 @@ func TestHealthEndpoints(t *testing.T) {
 		t.Fatalf("Failed to write config file: %v", err)
 	}

+	brokenConfigStr := strings.ReplaceAll(configStr, "127.0.0.1", "127.0.0.2")
+	brokenConfig, err := os.CreateTemp("", "kubeconfig")
+	if err != nil {
+		t.Fatalf("Failed to create config file: %v", err)
+	}
+	if _, err := brokenConfig.WriteString(brokenConfigStr); err != nil {
+		t.Fatalf("Failed to write config file: %v", err)
+	}
+	defer func() {
+		_ = os.Remove(brokenConfig.Name())
+	}()
+
 	tests := []struct {
 		name             string
 		path             string
+		useBrokenConfig  bool
 		wantResponseCode int
 	}{
 		{
 			"/healthz",
 			"/healthz",
+			false,
 			http.StatusOK,
 		},
+		{
+			"/livez",
+			"/livez",
+			false,
+			http.StatusOK,
+		},
+		{
+			"/livez with ping check",
+			"/livez/ping",
+			false,
+			http.StatusOK,
+		},
 		{
 			"/readyz",
 			"/readyz",
-			http.StatusNotFound,
+			false,
+			http.StatusOK,
 		},
+		{
+			"/readyz with sched-handler-sync",
+			"/readyz/sched-handler-sync",
+			false,
+			http.StatusOK,
+		},
+		{
+			"/readyz with shutdown",
+			"/readyz/shutdown",
+			false,
+			http.StatusOK,
+		},
+		{
+			"/readyz with broken apiserver",
+			"/readyz",
+			true,
+			http.StatusInternalServerError,
+		},
 	}
@@ -82,9 +118,13 @@ func TestHealthEndpoints(t *testing.T) {
 		tt := tt
 			_, ctx := ktesting.NewTestContext(t)

+			configFile := apiserverConfig.Name()
+			if tt.useBrokenConfig {
+				configFile = brokenConfig.Name()
+			}
 			result, err := kubeschedulertesting.StartTestServer(
 				ctx,
-				[]string{"--kubeconfig", apiserverConfig.Name(), "--leader-elect=false", "--authorization-always-allow-paths", tt.path})
+				[]string{"--kubeconfig", configFile, "--leader-elect=false", "--authorization-always-allow-paths", tt.path})

 			if err != nil {
 				t.Fatalf("Failed to start kube-scheduler server: %v", err)
@@ -99,7 +139,7 @@ func TestHealthEndpoints(t *testing.T) {
 			}
 			req, err := http.NewRequest("GET", base+tt.path, nil)
 			if err != nil {
-				t.Fatal(err)
+				t.Fatalf("failed to request: %v", err)
 			}
 			r, err := client.Do(req)
 			if err != nil {
@@ -151,7 +191,7 @@ func startTestAPIServer(t *testing.T) (server *kubeapiservertesting.TestServer,
 	// start apiserver
 	server = kubeapiservertesting.StartTestServerOrDie(t, nil, []string{
 		"--token-auth-file", tokenFile.Name(),
-		"--authorization-mode", "RBAC",
+		"--authorization-mode", "AlwaysAllow",
 	}, framework.SharedEtcd())

 	apiserverConfig = fmt.Sprintf(`