From 708180be6961a29f02d1b738a37695eea2b9b24d Mon Sep 17 00:00:00 2001 From: Eric Lin Date: Sat, 20 May 2023 16:04:53 +0000 Subject: [PATCH 1/2] Add /livez to kube-scheduler Health endpoint `/livez` only contains ping check. --- cmd/kube-scheduler/app/server.go | 7 +- .../scheduler/serving/healthcheck_test.go | 200 ++++++++++++++++++ .../scheduler/serving/main_test.go | 27 +++ 3 files changed, 231 insertions(+), 3 deletions(-) create mode 100644 test/integration/scheduler/serving/healthcheck_test.go create mode 100644 test/integration/scheduler/serving/main_test.go diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 4a7e171ae30..9d75834fc4e 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -186,7 +186,7 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched * // Start up the healthz server. if cc.SecureServing != nil { - handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer) + handler := buildHandlerChain(newHealthEndpointsAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer) // TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil { // fail early for secure handlers, removing the old error loop from above @@ -288,11 +288,12 @@ func installMetricHandler(pathRecorderMux *mux.PathRecorderMux, informers inform }) } -// newHealthzAndMetricsHandler creates a healthz server from the config, and will also +// newHealthEndpointsAndMetricsHandler creates an API health server from the config, and will also // embed the metrics handler. 
-func newHealthzAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, checks ...healthz.HealthChecker) http.Handler { +func newHealthEndpointsAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, checks ...healthz.HealthChecker) http.Handler { pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler") healthz.InstallHandler(pathRecorderMux, checks...) + healthz.InstallLivezHandler(pathRecorderMux) installMetricHandler(pathRecorderMux, informers, isLeader) slis.SLIMetricsWithReset{}.Install(pathRecorderMux) diff --git a/test/integration/scheduler/serving/healthcheck_test.go b/test/integration/scheduler/serving/healthcheck_test.go new file mode 100644 index 00000000000..47104970d64 --- /dev/null +++ b/test/integration/scheduler/serving/healthcheck_test.go @@ -0,0 +1,200 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package serving + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "io" + "net/http" + "os" + "path" + "strings" + "testing" + + "k8s.io/klog/v2/ktesting" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + kubeschedulertesting "k8s.io/kubernetes/cmd/kube-scheduler/app/testing" + "k8s.io/kubernetes/test/integration/framework" +) + +func TestHealthEndpoints(t *testing.T) { + server, configStr, _, err := startTestAPIServer(t) + if err != nil { + t.Fatalf("Failed to start kube-apiserver server: %v", err) + } + defer server.TearDownFn() + apiserverConfig, err := os.CreateTemp("", "kubeconfig") + if err != nil { + t.Fatalf("Failed to create config file: %v", err) + } + defer func() { + _ = os.Remove(apiserverConfig.Name()) + }() + if _, err = apiserverConfig.WriteString(configStr); err != nil { + t.Fatalf("Failed to write config file: %v", err) + } + + tests := []struct { + name string + path string + wantResponseCode int + }{ + { + "/healthz", + "/healthz", + http.StatusOK, + }, + { + "/livez", + "/livez", + http.StatusOK, + }, + { + "/livez with ping check", + "/livez/ping", + http.StatusOK, + }, + { + "/readyz", + "/readyz", + http.StatusNotFound, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt := tt + _, ctx := ktesting.NewTestContext(t) + + result, err := kubeschedulertesting.StartTestServer( + ctx, + []string{"--kubeconfig", apiserverConfig.Name(), "--leader-elect=false", "--authorization-always-allow-paths", tt.path}) + + if err != nil { + t.Fatalf("Failed to start kube-scheduler server: %v", err) + } + if result.TearDownFn != nil { + defer result.TearDownFn() + } + + client, base, err := clientAndURLFromTestServer(result) + if err != nil { + t.Fatalf("Failed to get client from test server: %v", err) + } + req, err := http.NewRequest("GET", base+tt.path, nil) + if err != nil { + t.Fatal(err) + } + r, err := client.Do(req) + if err != nil { + t.Fatalf("failed to GET %s from component: %v", 
tt.path, err) + } + + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("failed to read response body: %v", err) + } + if err = r.Body.Close(); err != nil { + t.Fatalf("failed to close response body: %v", err) + } + if got, expected := r.StatusCode, tt.wantResponseCode; got != expected { + t.Fatalf("expected http %d at %s of component, got: %d %q", expected, tt.path, got, string(body)) + } + }) + } +} + +// TODO: Make this a util function once there is a unified way to start a testing apiserver so that we can reuse it. +func startTestAPIServer(t *testing.T) (server *kubeapiservertesting.TestServer, apiserverConfig, token string, err error) { + // Insulate this test from picking up in-cluster config when run inside a pod + // We can't assume we have permissions to write to /var/run/secrets/... from a unit test to mock in-cluster config for testing + originalHost := os.Getenv("KUBERNETES_SERVICE_HOST") + if len(originalHost) > 0 { + if err = os.Setenv("KUBERNETES_SERVICE_HOST", ""); err != nil { + return + } + defer func() { + err = os.Setenv("KUBERNETES_SERVICE_HOST", originalHost) + }() + } + + // authenticate to apiserver via bearer token + token = "flwqkenfjasasdfmwerasd" // Fake token for testing. 
+ var tokenFile *os.File + tokenFile, err = os.CreateTemp("", "kubeconfig") + if err != nil { + return + } + if _, err = tokenFile.WriteString(fmt.Sprintf(`%s,system:kube-scheduler,system:kube-scheduler,""`, token)); err != nil { + return + } + if err = tokenFile.Close(); err != nil { + return + } + + // start apiserver + server = kubeapiservertesting.StartTestServerOrDie(t, nil, []string{ + "--token-auth-file", tokenFile.Name(), + "--authorization-mode", "RBAC", + }, framework.SharedEtcd()) + + apiserverConfig = fmt.Sprintf(` +apiVersion: v1 +kind: Config +clusters: +- cluster: + server: %s + certificate-authority: %s + name: integration +contexts: +- context: + cluster: integration + user: kube-scheduler + name: default-context +current-context: default-context +users: +- name: kube-scheduler + user: + token: %s +`, server.ClientConfig.Host, server.ServerOpts.SecureServing.ServerCert.CertKey.CertFile, token) + return server, apiserverConfig, token, nil +} + +func clientAndURLFromTestServer(s kubeschedulertesting.TestServer) (*http.Client, string, error) { + secureInfo := s.Config.SecureServing + secureOptions := s.Options.SecureServing + url := fmt.Sprintf("https://%s", secureInfo.Listener.Addr().String()) + url = strings.ReplaceAll(url, "[::]", "127.0.0.1") // switch to IPv4 because the self-signed cert does not support [::] + + // read self-signed server cert from disk + pool := x509.NewCertPool() + serverCertPath := path.Join(secureOptions.ServerCert.CertDirectory, secureOptions.ServerCert.PairName+".crt") + serverCert, err := os.ReadFile(serverCertPath) + if err != nil { + return nil, "", fmt.Errorf("Failed to read component server cert %q: %w", serverCertPath, err) + } + pool.AppendCertsFromPEM(serverCert) + tr := &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: pool, + }, + } + client := &http.Client{Transport: tr} + return client, url, nil +} diff --git a/test/integration/scheduler/serving/main_test.go
b/test/integration/scheduler/serving/main_test.go new file mode 100644 index 00000000000..1694670f32c --- /dev/null +++ b/test/integration/scheduler/serving/main_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package serving + +import ( + "testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +} From 44c08fdbd592b7c167ad0c3b0b4a3b52b99c116f Mon Sep 17 00:00:00 2001 From: Eric Lin Date: Sat, 20 May 2023 16:38:21 +0000 Subject: [PATCH 2/2] Add /readyz for kube-scheduler /readyz contains `sched-handler-sync`, `leaderElection` (when election is enabled) and `shutdown` checks --- cmd/kube-scheduler/app/server.go | 24 ++++++++-- .../k8s.io/apiserver/pkg/server/healthz.go | 21 +------- .../apiserver/pkg/server/healthz/healthz.go | 23 +++++++++ .../scheduler/serving/healthcheck_test.go | 48 +++++++++++++++++-- 4 files changed, 88 insertions(+), 28 deletions(-) diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 9d75834fc4e..dc9f8638f0f 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -167,10 +167,12 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched * defer cc.EventBroadcaster.Shutdown() // Setup healthz checks. 
- var checks []healthz.HealthChecker + var checks, readyzChecks []healthz.HealthChecker if cc.ComponentConfig.LeaderElection.LeaderElect { checks = append(checks, cc.LeaderElection.WatchDog) + readyzChecks = append(readyzChecks, cc.LeaderElection.WatchDog) } + readyzChecks = append(readyzChecks, healthz.NewShutdownHealthz(ctx.Done())) waitingForLeader := make(chan struct{}) isLeader := func() bool { @@ -184,9 +186,20 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched * } } + handlerSyncReadyCh := make(chan struct{}) + handlerSyncCheck := healthz.NamedCheck("sched-handler-sync", func(_ *http.Request) error { + select { + case <-handlerSyncReadyCh: + return nil + default: + } + return fmt.Errorf("waiting for handlers to sync") + }) + readyzChecks = append(readyzChecks, handlerSyncCheck) + // Start up the healthz server. if cc.SecureServing != nil { - handler := buildHandlerChain(newHealthEndpointsAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer) + handler := buildHandlerChain(newHealthEndpointsAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks, readyzChecks), cc.Authentication.Authenticator, cc.Authorization.Authorizer) // TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil { // fail early for secure handlers, removing the old error loop from above @@ -214,6 +227,7 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched * logger.Error(err, "waiting for handlers to sync") } + close(handlerSyncReadyCh) logger.V(3).Info("Handlers synced") } if !cc.ComponentConfig.DelayCacheUntilActive || cc.LeaderElection == nil { @@ -290,10 +304,12 @@ func installMetricHandler(pathRecorderMux *mux.PathRecorderMux, informers inform // newHealthEndpointsAndMetricsHandler creates an API health server from the config, 
and will also // embed the metrics handler. -func newHealthEndpointsAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, checks ...healthz.HealthChecker) http.Handler { +// TODO: healthz check is deprecated, please use livez and readyz instead. Will be removed in the future. +func newHealthEndpointsAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, healthzChecks, readyzChecks []healthz.HealthChecker) http.Handler { pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler") - healthz.InstallHandler(pathRecorderMux, checks...) + healthz.InstallHandler(pathRecorderMux, healthzChecks...) healthz.InstallLivezHandler(pathRecorderMux) + healthz.InstallReadyzHandler(pathRecorderMux, readyzChecks...) installMetricHandler(pathRecorderMux, informers, isLeader) slis.SLIMetricsWithReset{}.Install(pathRecorderMux) diff --git a/staging/src/k8s.io/apiserver/pkg/server/healthz.go b/staging/src/k8s.io/apiserver/pkg/server/healthz.go index f4b564c48f6..96da308e8c5 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/healthz.go +++ b/staging/src/k8s.io/apiserver/pkg/server/healthz.go @@ -118,7 +118,7 @@ func (s *GenericAPIServer) AddLivezChecks(delay time.Duration, checks ...healthz // that we can register that the api-server is no longer ready while we attempt to gracefully // shutdown. func (s *GenericAPIServer) addReadyzShutdownCheck(stopCh <-chan struct{}) error { - return s.AddReadyzChecks(shutdownCheck{stopCh}) + return s.AddReadyzChecks(healthz.NewShutdownHealthz(stopCh)) } // installHealthz creates the healthz endpoint for this server @@ -139,25 +139,6 @@ func (s *GenericAPIServer) installLivez() { s.livezRegistry.installHandler(s.Handler.NonGoRestfulMux) } -// shutdownCheck fails if the embedded channel is closed. This is intended to allow for graceful shutdown sequences -// for the apiserver. 
-type shutdownCheck struct { - StopCh <-chan struct{} -} - -func (shutdownCheck) Name() string { - return "shutdown" -} - -func (c shutdownCheck) Check(req *http.Request) error { - select { - case <-c.StopCh: - return fmt.Errorf("process is shutting down") - default: - } - return nil -} - // delayedHealthCheck wraps a health check which will not fail until the explicitly defined delay has elapsed. This // is intended for use primarily for livez health checks. func delayedHealthCheck(check healthz.HealthChecker, clock clock.Clock, delay time.Duration) healthz.HealthChecker { diff --git a/staging/src/k8s.io/apiserver/pkg/server/healthz/healthz.go b/staging/src/k8s.io/apiserver/pkg/server/healthz/healthz.go index 4613fd6d17e..76f5745b336 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/healthz/healthz.go +++ b/staging/src/k8s.io/apiserver/pkg/server/healthz/healthz.go @@ -105,6 +105,29 @@ func (i *informerSync) Name() string { return "informer-sync" } +type shutdown struct { + stopCh <-chan struct{} +} + +// NewShutdownHealthz returns a new HealthChecker that will fail if the embedded channel is closed. +// This is intended to allow for graceful shutdown sequences. +func NewShutdownHealthz(stopCh <-chan struct{}) HealthChecker { + return &shutdown{stopCh} +} + +func (s *shutdown) Name() string { + return "shutdown" +} + +func (s *shutdown) Check(req *http.Request) error { + select { + case <-s.stopCh: + return fmt.Errorf("process is shutting down") + default: + } + return nil +} + func (i *informerSync) Check(_ *http.Request) error { stopCh := make(chan struct{}) // Close stopCh to force checking if informers are synced now. 
diff --git a/test/integration/scheduler/serving/healthcheck_test.go b/test/integration/scheduler/serving/healthcheck_test.go index 47104970d64..c094fcd9ace 100644 --- a/test/integration/scheduler/serving/healthcheck_test.go +++ b/test/integration/scheduler/serving/healthcheck_test.go @@ -39,6 +39,7 @@ func TestHealthEndpoints(t *testing.T) { t.Fatalf("Failed to start kube-apiserver server: %v", err) } defer server.TearDownFn() + apiserverConfig, err := os.CreateTemp("", "kubeconfig") if err != nil { t.Fatalf("Failed to create config file: %v", err) @@ -50,30 +51,65 @@ func TestHealthEndpoints(t *testing.T) { t.Fatalf("Failed to write config file: %v", err) } + brokenConfigStr := strings.ReplaceAll(configStr, "127.0.0.1", "127.0.0.2") + brokenConfig, err := os.CreateTemp("", "kubeconfig") + if err != nil { + t.Fatalf("Failed to create config file: %v", err) + } + if _, err := brokenConfig.WriteString(brokenConfigStr); err != nil { + t.Fatalf("Failed to write config file: %v", err) + } + defer func() { + _ = os.Remove(brokenConfig.Name()) + }() + tests := []struct { name string path string + useBrokenConfig bool wantResponseCode int }{ { "/healthz", "/healthz", + false, http.StatusOK, }, { "/livez", "/livez", + false, http.StatusOK, }, { "/livez with ping check", "/livez/ping", + false, http.StatusOK, }, { "/readyz", "/readyz", - http.StatusNotFound, + false, + http.StatusOK, + }, + { + "/readyz with sched-handler-sync", + "/readyz/sched-handler-sync", + false, + http.StatusOK, + }, + { + "/readyz with shutdown", + "/readyz/shutdown", + false, + http.StatusOK, + }, + { + "/readyz with broken apiserver", + "/readyz", + true, + http.StatusInternalServerError, }, } @@ -82,9 +118,13 @@ func TestHealthEndpoints(t *testing.T) { tt := tt _, ctx := ktesting.NewTestContext(t) + configFile := apiserverConfig.Name() + if tt.useBrokenConfig { + configFile = brokenConfig.Name() + } result, err := kubeschedulertesting.StartTestServer( ctx, - []string{"--kubeconfig", 
apiserverConfig.Name(), "--leader-elect=false", "--authorization-always-allow-paths", tt.path}) + []string{"--kubeconfig", configFile, "--leader-elect=false", "--authorization-always-allow-paths", tt.path}) if err != nil { t.Fatalf("Failed to start kube-scheduler server: %v", err) @@ -99,7 +139,7 @@ func TestHealthEndpoints(t *testing.T) { } req, err := http.NewRequest("GET", base+tt.path, nil) if err != nil { - t.Fatal(err) + t.Fatalf("failed to request: %v", err) } r, err := client.Do(req) if err != nil { @@ -151,7 +191,7 @@ func startTestAPIServer(t *testing.T) (server *kubeapiservertesting.TestServer, // start apiserver server = kubeapiservertesting.StartTestServerOrDie(t, nil, []string{ "--token-auth-file", tokenFile.Name(), - "--authorization-mode", "RBAC", + "--authorization-mode", "AlwaysAllow", }, framework.SharedEtcd()) apiserverConfig = fmt.Sprintf(`