From 54dcf5c9c46fc4782d4861936309349b5a71a1ac Mon Sep 17 00:00:00 2001 From: Han Kang Date: Thu, 30 May 2019 11:19:49 -0700 Subject: [PATCH] add readyz endpoint for kube-apiserver readiness checks add startup sequence duration and readyz endpoint add rbac bootstrapping policy for readyz add integration test around grace period and readyz rename startup sequence duration flag copy health checks to fields rename health-check installed boolean, refactor clock injection logic cleanup clock injection code remove todo about poststarthook url registration from healthz --- cmd/kube-apiserver/app/testing/BUILD | 1 + cmd/kube-apiserver/app/testing/testserver.go | 11 ++- .../authorizer/rbac/bootstrappolicy/policy.go | 4 +- .../testdata/cluster-roles.yaml | 2 + staging/src/k8s.io/apiserver/pkg/server/BUILD | 3 + .../src/k8s.io/apiserver/pkg/server/config.go | 24 ++++- .../apiserver/pkg/server/config_test.go | 10 +- .../apiserver/pkg/server/genericapiserver.go | 19 +++- .../k8s.io/apiserver/pkg/server/healthz.go | 98 +++++++++++++++++-- .../apiserver/pkg/server/healthz/healthz.go | 8 ++ .../apiserver/pkg/server/healthz_test.go | 77 +++++++++++++++ .../src/k8s.io/apiserver/pkg/server/hooks.go | 2 +- .../pkg/server/options/server_run_options.go | 12 +++ .../server/options/server_run_options_test.go | 16 +++ .../integration/master/kube_apiserver_test.go | 67 +++++++++++++ 15 files changed, 334 insertions(+), 20 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/server/healthz_test.go diff --git a/cmd/kube-apiserver/app/testing/BUILD b/cmd/kube-apiserver/app/testing/BUILD index 758f8fcb0f8..c19fbf699de 100644 --- a/cmd/kube-apiserver/app/testing/BUILD +++ b/cmd/kube-apiserver/app/testing/BUILD @@ -17,6 +17,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library", + 
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index 9fa1c82516e..6d3404ed600 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -25,12 +25,13 @@ import ( "runtime" "time" - pflag "github.com/spf13/pflag" + "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/registry/generic/registry" + "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" @@ -45,6 +46,8 @@ type TearDownFunc func() type TestServerInstanceOptions struct { // DisableStorageCleanup Disable the automatic storage cleanup DisableStorageCleanup bool + // InjectedHealthzChecker, when non-nil, is registered on the test server as a delayed healthz check. 
+ InjectedHealthzChecker healthz.HealthzChecker } // TestServer return values supplied by kube-test-ApiServer @@ -144,6 +147,12 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo t.Logf("runtime-config=%v", completedOptions.APIEnablement.RuntimeConfig) t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort) server, err := app.CreateServerChain(completedOptions, stopCh) + + if err == nil && instanceOptions.InjectedHealthzChecker != nil { + t.Logf("Adding health check with delay %v %v", s.GenericServerRunOptions.MaxStartupSequenceDuration, instanceOptions.InjectedHealthzChecker.Name()) + server.AddDelayedHealthzChecks(s.GenericServerRunOptions.MaxStartupSequenceDuration, instanceOptions.InjectedHealthzChecker) + } + if err != nil { return result, fmt.Errorf("failed to create server chain: %v", err) } diff --git 
a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 53e7298e5c4..0b8795670f0 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -198,7 +198,7 @@ func ClusterRoles() []rbacv1.ClusterRole { ObjectMeta: metav1.ObjectMeta{Name: "system:discovery"}, Rules: []rbacv1.PolicyRule{ rbacv1helpers.NewRule("get").URLs( - "/healthz", "/version", "/version/", + "/readyz", "/healthz", "/version", "/version/", "/openapi", "/openapi/*", "/api", "/api/*", "/apis", "/apis/*", @@ -218,7 +218,7 @@ func ClusterRoles() []rbacv1.ClusterRole { ObjectMeta: metav1.ObjectMeta{Name: "system:public-info-viewer"}, Rules: []rbacv1.PolicyRule{ rbacv1helpers.NewRule("get").URLs( - "/healthz", "/version", "/version/", + "/readyz", "/healthz", "/version", "/version/", ).RuleOrDie(), }, }, diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml index bfadaea2079..eb134e34ce3 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml @@ -535,6 +535,7 @@ items: - /healthz - /openapi - /openapi/* + - /readyz - /version - /version/ verbs: @@ -1156,6 +1157,7 @@ items: rules: - nonResourceURLs: - /healthz + - /readyz - /version - /version/ verbs: diff --git a/staging/src/k8s.io/apiserver/pkg/server/BUILD b/staging/src/k8s.io/apiserver/pkg/server/BUILD index 80afc805f86..16e427ce3c0 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/BUILD @@ -12,6 +12,7 @@ go_test( "config_selfclient_test.go", "config_test.go", "genericapiserver_test.go", + "healthz_test.go", ], embed = [":go_default_library"], deps = [ @@ -19,6 +20,7 @@ go_test( 
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/version:go_default_library", @@ -66,6 +68,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 03cf1776e80..2ded9d9849f 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -32,11 +32,11 @@ import ( jsonpatch "github.com/evanphx/json-patch" "github.com/go-openapi/spec" "github.com/pborman/uuid" - "k8s.io/klog" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" "k8s.io/apimachinery/pkg/version" @@ -65,6 +65,7 @@ import ( restclient "k8s.io/client-go/rest" certutil "k8s.io/client-go/util/cert" "k8s.io/component-base/logs" + "k8s.io/klog" openapicommon "k8s.io/kube-openapi/pkg/common" // install apis @@ -135,6 +136,8 @@ type Config struct { 
DiscoveryAddresses discovery.Addresses // The default set of healthz checks. There might be more added via AddHealthzChecks dynamically. HealthzChecks []healthz.HealthzChecker + // The default set of readyz-only checks. There might be more added via AddReadyzChecks dynamically. + ReadyzChecks []healthz.HealthzChecker // LegacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests // to InstallLegacyAPIGroup. New API servers don't generally have legacy groups at all. LegacyAPIGroupPrefixes sets.String @@ -156,6 +159,12 @@ type Config struct { // If specified, long running requests such as watch will be allocated a random timeout between this value, and // twice this value. Note that it is up to the request handlers to ignore or honor this timeout. In seconds. MinRequestTimeout int + + // This represents the maximum amount of time it should take for apiserver to complete its startup + // sequence and become healthy. From apiserver's start time to when this amount of time has + // elapsed, /healthz will assume that unfinished post-start hooks will complete successfully and + // therefore return true. + MaxStartupSequenceDuration time.Duration // The limit on the total size increase all "copy" operations in a json // patch may cause. // This affects all places that applies json patch in the binary. 
@@ -256,13 +265,15 @@ type AuthorizationInfo struct { // NewConfig returns a Config struct with the default values func NewConfig(codecs serializer.CodecFactory) *Config { + defaultHealthChecks := []healthz.HealthzChecker{healthz.PingHealthz, healthz.LogHealthz} return &Config{ Serializer: codecs, BuildHandlerChainFunc: DefaultBuildHandlerChain, HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup), LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix), DisabledPostStartHooks: sets.NewString(), - HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz, healthz.LogHealthz}, + HealthzChecks: append([]healthz.HealthzChecker{}, defaultHealthChecks...), + ReadyzChecks: append([]healthz.HealthzChecker{}, defaultHealthChecks...), EnableIndex: true, EnableDiscovery: true, EnableProfiling: true, @@ -271,6 +282,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config { MaxMutatingRequestsInFlight: 200, RequestTimeout: time.Duration(60) * time.Second, MinRequestTimeout: 1800, + MaxStartupSequenceDuration: time.Duration(0), // 10MB is the recommended maximum client request size in bytes // the etcd server should accept. See // https://github.com/etcd-io/etcd/blob/release-3.3/etcdserver/server.go#L90. 
@@ -479,7 +491,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second, ShutdownTimeout: c.RequestTimeout, - SecureServingInfo: c.SecureServing, ExternalAddress: c.ExternalAddress, @@ -493,12 +504,16 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G preShutdownHooks: map[string]preShutdownHookEntry{}, disabledPostStartHooks: c.DisabledPostStartHooks, - healthzChecks: c.HealthzChecks, + healthzChecks: c.HealthzChecks, + readyzChecks: c.ReadyzChecks, + readinessStopCh: make(chan struct{}), + maxStartupSequenceDuration: c.MaxStartupSequenceDuration, DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer), enableAPIResponseCompression: c.EnableAPIResponseCompression, maxRequestBodyBytes: c.MaxRequestBodyBytes, + healthzClock: clock.RealClock{}, } for { @@ -546,6 +561,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G } s.healthzChecks = append(s.healthzChecks, delegateCheck) + s.readyzChecks = append(s.readyzChecks, delegateCheck) } s.listedPathProvider = routes.ListedPathProviders{s.listedPathProvider, delegationTarget} diff --git a/staging/src/k8s.io/apiserver/pkg/server/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/config_test.go index 9a6fded8475..8cef53abf71 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config_test.go @@ -115,7 +115,15 @@ func TestNewWithDelegate(t *testing.T) { "/healthz/poststarthook/generic-apiserver-start-informers", "/healthz/poststarthook/wrapping-post-start-hook", "/healthz/wrapping-health", - "/metrics" + "/metrics", + "/readyz", + "/readyz/delegate-health", + "/readyz/log", + "/readyz/ping", + "/readyz/poststarthook/delegate-post-start-hook", + "/readyz/poststarthook/generic-apiserver-start-informers", + "/readyz/poststarthook/wrapping-post-start-hook", + 
"/readyz/shutdown" ] }`, t) checkPath(server.URL+"/healthz", http.StatusInternalServerError, `[+]ping ok diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 55532c3a92d..e0ef0f5fda4 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -26,13 +26,13 @@ import ( systemd "github.com/coreos/go-systemd/daemon" "github.com/go-openapi/spec" - "k8s.io/klog" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" "k8s.io/apiserver/pkg/admission" @@ -45,6 +45,7 @@ import ( "k8s.io/apiserver/pkg/server/routes" utilopenapi "k8s.io/apiserver/pkg/util/openapi" restclient "k8s.io/client-go/rest" + "k8s.io/klog" openapibuilder "k8s.io/kube-openapi/pkg/builder" openapicommon "k8s.io/kube-openapi/pkg/common" "k8s.io/kube-openapi/pkg/handler" @@ -145,9 +146,17 @@ type GenericAPIServer struct { preShutdownHooksCalled bool // healthz checks - healthzLock sync.Mutex - healthzChecks []healthz.HealthzChecker - healthzCreated bool + healthzLock sync.Mutex + healthzChecks []healthz.HealthzChecker + healthzChecksInstalled bool + readyzLock sync.Mutex + readyzChecks []healthz.HealthzChecker + readyzChecksInstalled bool + maxStartupSequenceDuration time.Duration + healthzClock clock.Clock + // the readiness stop channel is used to signal that the apiserver has initiated a shutdown sequence, this + // will cause readyz to return unhealthy. + readinessStopCh chan struct{} // auditing. The backend is started after the server starts listening. 
AuditBackend audit.Backend @@ -259,6 +268,7 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { } s.installHealthz() + s.installReadyz(s.readinessStopCh) // Register audit backend preShutdownHook. if s.AuditBackend != nil { @@ -327,6 +337,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { // ensure cleanup. go func() { <-stopCh + close(s.readinessStopCh) close(internalStopCh) if stoppedCh != nil { <-stoppedCh diff --git a/staging/src/k8s.io/apiserver/pkg/server/healthz.go b/staging/src/k8s.io/apiserver/pkg/server/healthz.go index 43e102b5cc7..83e4ab6705c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/healthz.go +++ b/staging/src/k8s.io/apiserver/pkg/server/healthz.go @@ -18,20 +18,30 @@ package server import ( "fmt" + "net/http" + "time" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apiserver/pkg/server/healthz" ) -// AddHealthzCheck allows you to add a HealthzCheck. +// AddHealthzChecks adds HealthzCheck(s) to both healthz and readyz. All healthz checks +// are automatically added to readyz, since we want to avoid the situation where the +// apiserver is ready but not live. func (s *GenericAPIServer) AddHealthzChecks(checks ...healthz.HealthzChecker) error { - s.healthzLock.Lock() - defer s.healthzLock.Unlock() + return s.AddDelayedHealthzChecks(0, checks...) +} - if s.healthzCreated { - return fmt.Errorf("unable to add because the healthz endpoint has already been created") +// AddReadyzChecks allows you to add a HealthzCheck to readyz. +func (s *GenericAPIServer) AddReadyzChecks(checks ...healthz.HealthzChecker) error { + s.readyzLock.Lock() + defer s.readyzLock.Unlock() + + if s.readyzChecksInstalled { + return fmt.Errorf("unable to add because the readyz endpoint has already been created") } - s.healthzChecks = append(s.healthzChecks, checks...) + s.readyzChecks = append(s.readyzChecks, checks...) 
return nil } @@ -39,7 +49,81 @@ func (s *GenericAPIServer) AddHealthzChecks(checks ...healthz.HealthzChecker) er func (s *GenericAPIServer) installHealthz() { s.healthzLock.Lock() defer s.healthzLock.Unlock() - s.healthzCreated = true + s.healthzChecksInstalled = true healthz.InstallHandler(s.Handler.NonGoRestfulMux, s.healthzChecks...) } + +// installReadyz creates the readyz endpoint for this server. +func (s *GenericAPIServer) installReadyz(stopCh <-chan struct{}) { + s.AddReadyzChecks(shutdownCheck{stopCh}) + s.readyzLock.Lock() + defer s.readyzLock.Unlock() + + s.readyzChecksInstalled = true + + healthz.InstallReadyzHandler(s.Handler.NonGoRestfulMux, s.readyzChecks...) +} + +// shutdownCheck fails if the embedded channel is closed. This is intended to allow for graceful shutdown sequences +// for the apiserver. +type shutdownCheck struct { + StopCh <-chan struct{} +} + +func (shutdownCheck) Name() string { + return "shutdown" +} + +func (c shutdownCheck) Check(req *http.Request) error { + select { + case <-c.StopCh: + return fmt.Errorf("process is shutting down") + default: + } + return nil +} + +// AddDelayedHealthzChecks adds a health check to both healthz and readyz. The delay parameter +// allows you to set the grace period for healthz checks, which will return healthy while +// grace period has not yet elapsed. One may want to set a grace period in order to prevent +// the kubelet from restarting the kube-apiserver due to long-ish boot sequences. Readyz health +// checks have no grace period, since we want readyz to fail while boot has not completed. 
+func (s *GenericAPIServer) AddDelayedHealthzChecks(delay time.Duration, checks ...healthz.HealthzChecker) error { + s.healthzLock.Lock() + defer s.healthzLock.Unlock() + if s.healthzChecksInstalled { + return fmt.Errorf("unable to add because the healthz endpoint has already been created") + } + for _, check := range checks { + s.healthzChecks = append(s.healthzChecks, delayedHealthCheck(check, s.healthzClock, delay)) + } + + return s.AddReadyzChecks(checks...) +} + +// delayedHealthCheck wraps a health check which will not fail until the explicitly defined delay has elapsed. +func delayedHealthCheck(check healthz.HealthzChecker, clock clock.Clock, delay time.Duration) healthz.HealthzChecker { + return delayedHealthzCheck{ + check, + clock.Now().Add(delay), + clock, + } +} + +type delayedHealthzCheck struct { + check healthz.HealthzChecker + startCheck time.Time + clock clock.Clock +} + +func (c delayedHealthzCheck) Name() string { + return c.check.Name() +} + +func (c delayedHealthzCheck) Check(req *http.Request) error { + if c.clock.Now().After(c.startCheck) { + return c.check.Check(req) + } + return nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/healthz/healthz.go b/staging/src/k8s.io/apiserver/pkg/server/healthz/healthz.go index 9cb565e2cc5..81d2bccb4ed 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/healthz/healthz.go +++ b/staging/src/k8s.io/apiserver/pkg/server/healthz/healthz.go @@ -93,6 +93,14 @@ func InstallHandler(mux mux, checks ...HealthzChecker) { InstallPathHandler(mux, "/healthz", checks...) } +// InstallReadyzHandler registers handlers for health checking on the path +// "/readyz" to mux. *All handlers* for mux must be specified in +// exactly one call to InstallReadyzHandler. Calling InstallReadyzHandler more +// than once for the same mux will result in a panic. +func InstallReadyzHandler(mux mux, checks ...HealthzChecker) { + InstallPathHandler(mux, "/readyz", checks...) 
+} + // InstallPathHandler registers handlers for health checking on // a specific path to mux. *All handlers* for the path must be // specified in exactly one call to InstallPathHandler. Calling diff --git a/staging/src/k8s.io/apiserver/pkg/server/healthz_test.go b/staging/src/k8s.io/apiserver/pkg/server/healthz_test.go new file mode 100644 index 00000000000..eb4ce2abd31 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/healthz_test.go @@ -0,0 +1,77 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package server + +import ( + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/clock" +) + +func TestDelayedHealthCheck(t *testing.T) { + t.Run("test that liveness check returns true until the delay has elapsed", func(t *testing.T) { + t0 := time.Unix(0, 0) + c := clock.NewFakeClock(t0) + doneCh := make(chan struct{}) + + healthCheck := delayedHealthCheck(postStartHookHealthz{"test", doneCh}, c, time.Duration(10)*time.Second) + err := healthCheck.Check(nil) + if err != nil { + t.Errorf("Got %v, expected no error", err) + } + c.Step(10 * time.Second) + err = healthCheck.Check(nil) + if err != nil { + t.Errorf("Got %v, expected no error", err) + } + c.Step(1 * time.Millisecond) + err = healthCheck.Check(nil) + if err == nil || err.Error() != "not finished" { + t.Errorf("Got '%v', but expected error to be 'not finished'", err) + } + close(doneCh) + err = healthCheck.Check(nil) + if err != nil { + t.Errorf("Got %v, expected no error", err) + } + }) + t.Run("test that liveness check does not toggle false even if done channel is closed early", func(t *testing.T) { + t0 := time.Unix(0, 0) + c := clock.NewFakeClock(t0) + + doneCh := make(chan struct{}) + + healthCheck := delayedHealthCheck(postStartHookHealthz{"test", doneCh}, c, time.Duration(10)*time.Second) + err := healthCheck.Check(nil) + if err != nil { + t.Errorf("Got %v, expected no error", err) + } + close(doneCh) + c.Step(10 * time.Second) + err = healthCheck.Check(nil) + if err != nil { + t.Errorf("Got %v, expected no error", err) + } + c.Step(1 * time.Millisecond) + err = healthCheck.Check(nil) + if err != nil { + t.Errorf("Got %v, expected no error", err) + } + }) + +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/hooks.go b/staging/src/k8s.io/apiserver/pkg/server/hooks.go index d431eda869b..1fb0bc65e90 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/hooks.go +++ b/staging/src/k8s.io/apiserver/pkg/server/hooks.go @@ -97,7 +97,7 @@ func (s *GenericAPIServer) AddPostStartHook(name string, 
hook PostStartHookFunc) // done is closed when the poststarthook is finished. This is used by the health check to be able to indicate // that the poststarthook is finished done := make(chan struct{}) - if err := s.AddHealthzChecks(postStartHookHealthz{name: "poststarthook/" + name, done: done}); err != nil { + if err := s.AddDelayedHealthzChecks(s.maxStartupSequenceDuration, postStartHookHealthz{name: "poststarthook/" + name, done: done}); err != nil { return err } s.postStartHooks[name] = postStartHookEntry{hook: hook, originatingStack: string(debug.Stack()), done: done} diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go index 73cfff1267d..46191bedb60 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go @@ -41,6 +41,7 @@ type ServerRunOptions struct { MaxRequestsInFlight int MaxMutatingRequestsInFlight int RequestTimeout time.Duration + MaxStartupSequenceDuration time.Duration MinRequestTimeout int // We intentionally did not add a flag for this option. Users of the // apiserver library can wire it to a flag. 
@@ -60,6 +61,7 @@ func NewServerRunOptions() *ServerRunOptions { MaxRequestsInFlight: defaults.MaxRequestsInFlight, MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight, RequestTimeout: defaults.RequestTimeout, + MaxStartupSequenceDuration: defaults.MaxStartupSequenceDuration, MinRequestTimeout: defaults.MinRequestTimeout, JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes, MaxRequestBodyBytes: defaults.MaxRequestBodyBytes, @@ -72,6 +74,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error { c.ExternalAddress = s.ExternalHost c.MaxRequestsInFlight = s.MaxRequestsInFlight c.MaxMutatingRequestsInFlight = s.MaxMutatingRequestsInFlight + c.MaxStartupSequenceDuration = s.MaxStartupSequenceDuration c.RequestTimeout = s.RequestTimeout c.MinRequestTimeout = s.MinRequestTimeout c.JSONPatchMaxCopyBytes = s.JSONPatchMaxCopyBytes @@ -106,6 +109,10 @@ func (s *ServerRunOptions) Validate() []error { errors = append(errors, fmt.Errorf("--target-ram-mb can not be negative value")) } + if s.MaxStartupSequenceDuration < 0 { + errors = append(errors, fmt.Errorf("--maximum-startup-sequence-duration can not be a negative value")) + } + if s.EnableInfightQuotaHandler { if !utilfeature.DefaultFeatureGate.Enabled(features.RequestManagement) { errors = append(errors, fmt.Errorf("--enable-inflight-quota-handler can not be set if feature "+ @@ -185,6 +192,11 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) { "it out. This is the default request timeout for requests but may be overridden by flags such as "+ "--min-request-timeout for specific types of requests.") + fs.DurationVar(&s.MaxStartupSequenceDuration, "maximum-startup-sequence-duration", s.MaxStartupSequenceDuration, ""+ + "This option represents the maximum amount of time it should take for apiserver to complete its startup sequence "+ + "and become healthy. 
From apiserver's start time to when this amount of time has elapsed, /healthz will assume "+ + "that unfinished post-start hooks will complete successfully and therefore return true.") + fs.IntVar(&s.MinRequestTimeout, "min-request-timeout", s.MinRequestTimeout, ""+ "An optional field indicating the minimum number of seconds a handler must keep "+ "a request open before timing it out. Currently only honored by the watch request "+ diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options_test.go index bb7e618230e..6eb438aaae0 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options_test.go @@ -136,6 +136,22 @@ func TestServerRunOptionsValidate(t *testing.T) { }, expectErr: "--max-resource-write-bytes can not be negative value", }, + { + name: "Test when MaxStartupSequenceDuration is negative value", + testOptions: &ServerRunOptions{ + AdvertiseAddress: net.ParseIP("192.168.10.10"), + CorsAllowedOriginList: []string{"10.10.10.100", "10.10.10.200"}, + MaxRequestsInFlight: 400, + MaxMutatingRequestsInFlight: 200, + RequestTimeout: time.Duration(2) * time.Minute, + MinRequestTimeout: 1800, + JSONPatchMaxCopyBytes: 10 * 1024 * 1024, + MaxRequestBodyBytes: 10 * 1024 * 1024, + TargetRAMMB: 65536, + MaxStartupSequenceDuration: -time.Second, + }, + expectErr: "--maximum-startup-sequence-duration can not be a negative value", + }, { name: "Test when ServerRunOptions is valid", testOptions: &ServerRunOptions{ diff --git a/test/integration/master/kube_apiserver_test.go b/test/integration/master/kube_apiserver_test.go index 741b338575c..9682d9f8352 100644 --- a/test/integration/master/kube_apiserver_test.go +++ b/test/integration/master/kube_apiserver_test.go @@ -22,6 +22,7 @@ import ( "net/http" "reflect" "strings" + "sync" "testing" "time" @@ -85,6 +86,48 @@ func TestRun(t 
*testing.T) { } } +func endpointReturnsStatusOK(client *kubernetes.Clientset, path string) bool { + res := client.CoreV1().RESTClient().Get().AbsPath(path).Do() + var status int + res.StatusCode(&status) + return status == http.StatusOK +} + +func TestStartupSequenceHealthzAndReadyz(t *testing.T) { + hc := &delayedCheck{} + instanceOptions := &kubeapiservertesting.TestServerInstanceOptions{ + InjectedHealthzChecker: hc, + } + server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{"--maximum-startup-sequence-duration", "5s"}, framework.SharedEtcd()) + defer server.TearDownFn() + + client, err := kubernetes.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if endpointReturnsStatusOK(client, "/readyz") { + t.Fatalf("readyz should start unready") + } + // we need to wait longer than our grace period + err = wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + return !endpointReturnsStatusOK(client, "/healthz"), nil + }) + if err != nil { + t.Fatalf("healthz should have become unhealthy: %v", err) + } + hc.makeHealthy() + err = wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + return endpointReturnsStatusOK(client, "/healthz"), nil + }) + if err != nil { + t.Fatalf("healthz should have become healthy again: %v", err) + } + if !endpointReturnsStatusOK(client, "/readyz") { + t.Fatalf("readyz should be healthy") + } +} + // TestOpenAPIDelegationChainPlumbing is a smoke test that checks for // the existence of some representative paths from the // apiextensions-server and the kube-aggregator server, both part of @@ -253,3 +296,27 @@ func TestReconcilerMasterLeaseMultiMoreMasters(t *testing.T) { func TestReconcilerMasterLeaseMultiCombined(t *testing.T) { testReconcilersMasterLease(t, 3, 3) } + +type delayedCheck struct { + healthLock sync.Mutex + isHealthy bool +} + +func (h *delayedCheck) Name() string { + return "delayed-check" +} + +func 
(h *delayedCheck) Check(req *http.Request) error { + h.healthLock.Lock() + defer h.healthLock.Unlock() + if h.isHealthy { + return nil + } + return fmt.Errorf("isn't healthy") +} + +func (h *delayedCheck) makeHealthy() { + h.healthLock.Lock() + defer h.healthLock.Unlock() + h.isHealthy = true +}