diff --git a/cmd/kube-apiserver/app/testing/BUILD b/cmd/kube-apiserver/app/testing/BUILD index 758f8fcb0f8..c19fbf699de 100644 --- a/cmd/kube-apiserver/app/testing/BUILD +++ b/cmd/kube-apiserver/app/testing/BUILD @@ -17,6 +17,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index 9fa1c82516e..6d3404ed600 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -25,12 +25,13 @@ import ( "runtime" "time" - pflag "github.com/spf13/pflag" + "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/registry/generic/registry" + "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" @@ -45,6 +46,8 @@ type TearDownFunc func() type TestServerInstanceOptions struct { // DisableStorageCleanup Disable the automatic storage cleanup DisableStorageCleanup bool + // Injected health + InjectedHealthzChecker healthz.HealthzChecker } // TestServer return values supplied by kube-test-ApiServer @@ -144,6 +147,12 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo t.Logf("runtime-config=%v", completedOptions.APIEnablement.RuntimeConfig) t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort) server, err := app.CreateServerChain(completedOptions, stopCh) + + if instanceOptions.InjectedHealthzChecker != nil { + t.Logf("Adding health check with delay %v %v", s.GenericServerRunOptions.MaxStartupSequenceDuration, instanceOptions.InjectedHealthzChecker.Name()) + server.AddDelayedHealthzChecks(s.GenericServerRunOptions.MaxStartupSequenceDuration, instanceOptions.InjectedHealthzChecker) + } + if err != nil { return result, fmt.Errorf("failed to create server chain: %v", err) } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 53e7298e5c4..0b8795670f0 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -198,7 +198,7 @@ func ClusterRoles() []rbacv1.ClusterRole { ObjectMeta: metav1.ObjectMeta{Name: "system:discovery"}, Rules: []rbacv1.PolicyRule{ rbacv1helpers.NewRule("get").URLs( - "/healthz", "/version", "/version/", + "/readyz", "/healthz", "/version", "/version/", "/openapi", "/openapi/*", "/api", "/api/*", "/apis", "/apis/*", @@ -218,7 +218,7 @@ func ClusterRoles() []rbacv1.ClusterRole { ObjectMeta: metav1.ObjectMeta{Name: "system:public-info-viewer"}, Rules: []rbacv1.PolicyRule{ rbacv1helpers.NewRule("get").URLs( - "/healthz", "/version", "/version/", + "/readyz", "/healthz", "/version", "/version/", ).RuleOrDie(), }, }, diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml index bfadaea2079..eb134e34ce3 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml @@ -535,6 +535,7 @@ items: - /healthz - /openapi - /openapi/* + - /readyz - /version - /version/ verbs: @@ -1156,6 +1157,7 @@ items: rules: - nonResourceURLs: - /healthz + - /readyz - /version - /version/ verbs: diff --git a/staging/src/k8s.io/apiserver/pkg/server/BUILD b/staging/src/k8s.io/apiserver/pkg/server/BUILD index 80afc805f86..16e427ce3c0 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/BUILD @@ -12,6 +12,7 @@ go_test( "config_selfclient_test.go", "config_test.go", "genericapiserver_test.go", + "healthz_test.go", ], embed = [":go_default_library"], deps = [ @@ -19,6 +20,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/version:go_default_library", @@ -66,6 +68,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 03cf1776e80..2ded9d9849f 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -32,11 +32,11 @@ import ( jsonpatch "github.com/evanphx/json-patch" "github.com/go-openapi/spec" "github.com/pborman/uuid" - "k8s.io/klog" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" "k8s.io/apimachinery/pkg/version" @@ -65,6 +65,7 @@ import ( restclient "k8s.io/client-go/rest" certutil "k8s.io/client-go/util/cert" "k8s.io/component-base/logs" + "k8s.io/klog" openapicommon "k8s.io/kube-openapi/pkg/common" // install apis @@ -135,6 +136,8 @@ type Config struct { DiscoveryAddresses discovery.Addresses // The default set of healthz checks. There might be more added via AddHealthzChecks dynamically. HealthzChecks []healthz.HealthzChecker + // The default set of readyz-only checks. There might be more added via AddReadyzChecks dynamically. + ReadyzChecks []healthz.HealthzChecker // LegacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests // to InstallLegacyAPIGroup. New API servers don't generally have legacy groups at all. LegacyAPIGroupPrefixes sets.String @@ -156,6 +159,12 @@ type Config struct { // If specified, long running requests such as watch will be allocated a random timeout between this value, and // twice this value. Note that it is up to the request handlers to ignore or honor this timeout. In seconds. MinRequestTimeout int + + // This represents the maximum amount of time it should take for apiserver to complete its startup + // sequence and become healthy. From apiserver's start time to when this amount of time has + // elapsed, /healthz will assume that unfinished post-start hooks will complete successfully and + // therefore return true. + MaxStartupSequenceDuration time.Duration // The limit on the total size increase all "copy" operations in a json // patch may cause. // This affects all places that applies json patch in the binary. @@ -256,13 +265,15 @@ type AuthorizationInfo struct { // NewConfig returns a Config struct with the default values func NewConfig(codecs serializer.CodecFactory) *Config { + defaultHealthChecks := []healthz.HealthzChecker{healthz.PingHealthz, healthz.LogHealthz} return &Config{ Serializer: codecs, BuildHandlerChainFunc: DefaultBuildHandlerChain, HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup), LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix), DisabledPostStartHooks: sets.NewString(), - HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz, healthz.LogHealthz}, + HealthzChecks: append([]healthz.HealthzChecker{}, defaultHealthChecks...), + ReadyzChecks: append([]healthz.HealthzChecker{}, defaultHealthChecks...), EnableIndex: true, EnableDiscovery: true, EnableProfiling: true, @@ -271,6 +282,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config { MaxMutatingRequestsInFlight: 200, RequestTimeout: time.Duration(60) * time.Second, MinRequestTimeout: 1800, + MaxStartupSequenceDuration: time.Duration(0), // 10MB is the recommended maximum client request size in bytes // the etcd server should accept. See // https://github.com/etcd-io/etcd/blob/release-3.3/etcdserver/server.go#L90. @@ -479,7 +491,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second, ShutdownTimeout: c.RequestTimeout, - SecureServingInfo: c.SecureServing, ExternalAddress: c.ExternalAddress, @@ -493,12 +504,16 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G preShutdownHooks: map[string]preShutdownHookEntry{}, disabledPostStartHooks: c.DisabledPostStartHooks, - healthzChecks: c.HealthzChecks, + healthzChecks: c.HealthzChecks, + readyzChecks: c.ReadyzChecks, + readinessStopCh: make(chan struct{}), + maxStartupSequenceDuration: c.MaxStartupSequenceDuration, DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer), enableAPIResponseCompression: c.EnableAPIResponseCompression, maxRequestBodyBytes: c.MaxRequestBodyBytes, + healthzClock: clock.RealClock{}, } for { @@ -546,6 +561,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G } s.healthzChecks = append(s.healthzChecks, delegateCheck) + s.readyzChecks = append(s.readyzChecks, delegateCheck) } s.listedPathProvider = routes.ListedPathProviders{s.listedPathProvider, delegationTarget} diff --git a/staging/src/k8s.io/apiserver/pkg/server/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/config_test.go index 9a6fded8475..8cef53abf71 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config_test.go @@ -115,7 +115,15 @@ func TestNewWithDelegate(t *testing.T) { "/healthz/poststarthook/generic-apiserver-start-informers", "/healthz/poststarthook/wrapping-post-start-hook", "/healthz/wrapping-health", - "/metrics" + "/metrics", + "/readyz", + "/readyz/delegate-health", + "/readyz/log", + "/readyz/ping", + "/readyz/poststarthook/delegate-post-start-hook", + "/readyz/poststarthook/generic-apiserver-start-informers", + "/readyz/poststarthook/wrapping-post-start-hook", + "/readyz/shutdown" ] }`, t) checkPath(server.URL+"/healthz", http.StatusInternalServerError, `[+]ping ok diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 55532c3a92d..e0ef0f5fda4 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -26,13 +26,13 @@ import ( systemd "github.com/coreos/go-systemd/daemon" "github.com/go-openapi/spec" - "k8s.io/klog" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" "k8s.io/apiserver/pkg/admission" @@ -45,6 +45,7 @@ import ( "k8s.io/apiserver/pkg/server/routes" utilopenapi "k8s.io/apiserver/pkg/util/openapi" restclient "k8s.io/client-go/rest" + "k8s.io/klog" openapibuilder "k8s.io/kube-openapi/pkg/builder" openapicommon "k8s.io/kube-openapi/pkg/common" "k8s.io/kube-openapi/pkg/handler" @@ -145,9 +146,17 @@ type GenericAPIServer struct { preShutdownHooksCalled bool // healthz checks - healthzLock sync.Mutex - healthzChecks []healthz.HealthzChecker - healthzCreated bool + healthzLock sync.Mutex + healthzChecks []healthz.HealthzChecker + healthzChecksInstalled bool + readyzLock sync.Mutex + readyzChecks []healthz.HealthzChecker + readyzChecksInstalled bool + maxStartupSequenceDuration time.Duration + healthzClock clock.Clock + // the readiness stop channel is used to signal that the apiserver has initiated a shutdown sequence, this + // will cause readyz to return unhealthy. + readinessStopCh chan struct{} // auditing. The backend is started after the server starts listening. AuditBackend audit.Backend @@ -259,6 +268,7 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { } s.installHealthz() + s.installReadyz(s.readinessStopCh) // Register audit backend preShutdownHook. if s.AuditBackend != nil { @@ -327,6 +337,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { // ensure cleanup. go func() { <-stopCh + close(s.readinessStopCh) close(internalStopCh) if stoppedCh != nil { <-stoppedCh diff --git a/staging/src/k8s.io/apiserver/pkg/server/healthz.go b/staging/src/k8s.io/apiserver/pkg/server/healthz.go index 43e102b5cc7..83e4ab6705c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/healthz.go +++ b/staging/src/k8s.io/apiserver/pkg/server/healthz.go @@ -18,20 +18,30 @@ package server import ( "fmt" + "net/http" + "time" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apiserver/pkg/server/healthz" ) -// AddHealthzCheck allows you to add a HealthzCheck. +// AddHealthzCheck adds HealthzCheck(s) to both healthz and readyz. All healthz checks +// are automatically added to readyz, since we want to avoid the situation where the +// apiserver is ready but not live. func (s *GenericAPIServer) AddHealthzChecks(checks ...healthz.HealthzChecker) error { - s.healthzLock.Lock() - defer s.healthzLock.Unlock() + return s.AddDelayedHealthzChecks(0, checks...) +} - if s.healthzCreated { - return fmt.Errorf("unable to add because the healthz endpoint has already been created") +// AddReadyzChecks allows you to add a HealthzCheck to readyz. +func (s *GenericAPIServer) AddReadyzChecks(checks ...healthz.HealthzChecker) error { + s.readyzLock.Lock() + defer s.readyzLock.Unlock() + + if s.readyzChecksInstalled { + return fmt.Errorf("unable to add because the readyz endpoint has already been created") } - s.healthzChecks = append(s.healthzChecks, checks...) + s.readyzChecks = append(s.readyzChecks, checks...) return nil } @@ -39,7 +49,81 @@ func (s *GenericAPIServer) AddHealthzChecks(checks ...healthz.HealthzChecker) er func (s *GenericAPIServer) installHealthz() { s.healthzLock.Lock() defer s.healthzLock.Unlock() - s.healthzCreated = true + s.healthzChecksInstalled = true healthz.InstallHandler(s.Handler.NonGoRestfulMux, s.healthzChecks...) } + +// installReadyz creates the readyz endpoint for this server. +func (s *GenericAPIServer) installReadyz(stopCh <-chan struct{}) { + s.AddReadyzChecks(shutdownCheck{stopCh}) + s.readyzLock.Lock() + defer s.readyzLock.Unlock() + + s.readyzChecksInstalled = true + + healthz.InstallReadyzHandler(s.Handler.NonGoRestfulMux, s.readyzChecks...) +} + +// shutdownCheck fails if the embedded channel is closed. This is intended to allow for graceful shutdown sequences +// for the apiserver. +type shutdownCheck struct { + StopCh <-chan struct{} +} + +func (shutdownCheck) Name() string { + return "shutdown" +} + +func (c shutdownCheck) Check(req *http.Request) error { + select { + case <-c.StopCh: + return fmt.Errorf("process is shutting down") + default: + } + return nil +} + +// AddDelayedHealthzChecks adds a health check to both healthz and readyz. The delay parameter +// allows you to set the grace period for healthz checks, which will return healthy while +// grace period has not yet elapsed. One may want to set a grace period in order to prevent +// the kubelet from restarting the kube-apiserver due to long-ish boot sequences. Readyz health +// checks have no grace period, since we want readyz to fail while boot has not completed. +func (s *GenericAPIServer) AddDelayedHealthzChecks(delay time.Duration, checks ...healthz.HealthzChecker) error { + s.healthzLock.Lock() + defer s.healthzLock.Unlock() + if s.healthzChecksInstalled { + return fmt.Errorf("unable to add because the healthz endpoint has already been created") + } + for _, check := range checks { + s.healthzChecks = append(s.healthzChecks, delayedHealthCheck(check, s.healthzClock, s.maxStartupSequenceDuration)) + } + + return s.AddReadyzChecks(checks...) +} + +// delayedHealthCheck wraps a health check which will not fail until the explicitly defined delay has elapsed. +func delayedHealthCheck(check healthz.HealthzChecker, clock clock.Clock, delay time.Duration) healthz.HealthzChecker { + return delayedHealthzCheck{ + check, + clock.Now().Add(delay), + clock, + } +} + +type delayedHealthzCheck struct { + check healthz.HealthzChecker + startCheck time.Time + clock clock.Clock +} + +func (c delayedHealthzCheck) Name() string { + return c.check.Name() +} + +func (c delayedHealthzCheck) Check(req *http.Request) error { + if c.clock.Now().After(c.startCheck) { + return c.check.Check(req) + } + return nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/healthz/healthz.go b/staging/src/k8s.io/apiserver/pkg/server/healthz/healthz.go index 9cb565e2cc5..81d2bccb4ed 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/healthz/healthz.go +++ b/staging/src/k8s.io/apiserver/pkg/server/healthz/healthz.go @@ -93,6 +93,14 @@ func InstallHandler(mux mux, checks ...HealthzChecker) { InstallPathHandler(mux, "/healthz", checks...) } +// InstallReadyzHandler registers handlers for health checking on the path +// "/readyz" to mux. *All handlers* for mux must be specified in +// exactly one call to InstallHandler. Calling InstallHandler more +// than once for the same mux will result in a panic. +func InstallReadyzHandler(mux mux, checks ...HealthzChecker) { + InstallPathHandler(mux, "/readyz", checks...) +} + // InstallPathHandler registers handlers for health checking on // a specific path to mux. *All handlers* for the path must be // specified in exactly one call to InstallPathHandler. Calling diff --git a/staging/src/k8s.io/apiserver/pkg/server/healthz_test.go b/staging/src/k8s.io/apiserver/pkg/server/healthz_test.go new file mode 100644 index 00000000000..eb4ce2abd31 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/healthz_test.go @@ -0,0 +1,77 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/clock" +) + +func TestDelayedHealthCheck(t *testing.T) { + t.Run("test that liveness check returns true until the delay has elapsed", func(t *testing.T) { + t0 := time.Unix(0, 0) + c := clock.NewFakeClock(t0) + doneCh := make(chan struct{}) + + healthCheck := delayedHealthCheck(postStartHookHealthz{"test", doneCh}, c, time.Duration(10)*time.Second) + err := healthCheck.Check(nil) + if err != nil { + t.Errorf("Got %v, expected no error", err) + } + c.Step(10 * time.Second) + err = healthCheck.Check(nil) + if err != nil { + t.Errorf("Got %v, expected no error", err) + } + c.Step(1 * time.Millisecond) + err = healthCheck.Check(nil) + if err == nil || err.Error() != "not finished" { + t.Errorf("Got '%v', but expected error to be 'not finished'", err) + } + close(doneCh) + err = healthCheck.Check(nil) + if err != nil { + t.Errorf("Got %v, expected no error", err) + } + }) + t.Run("test that liveness check does not toggle false even if done channel is closed early", func(t *testing.T) { + t0 := time.Unix(0, 0) + c := clock.NewFakeClock(t0) + + doneCh := make(chan struct{}) + + healthCheck := delayedHealthCheck(postStartHookHealthz{"test", doneCh}, c, time.Duration(10)*time.Second) + err := healthCheck.Check(nil) + if err != nil { + t.Errorf("Got %v, expected no error", err) + } + close(doneCh) + c.Step(10 * time.Second) + err = healthCheck.Check(nil) + if err != nil { + t.Errorf("Got %v, expected no error", err) + } + c.Step(1 * time.Millisecond) + err = healthCheck.Check(nil) + if err != nil { + t.Errorf("Got %v, expected no error", err) + } + }) + +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/hooks.go b/staging/src/k8s.io/apiserver/pkg/server/hooks.go index d431eda869b..1fb0bc65e90 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/hooks.go +++ b/staging/src/k8s.io/apiserver/pkg/server/hooks.go @@ -97,7 +97,7 @@ func (s *GenericAPIServer) AddPostStartHook(name string, hook PostStartHookFunc) // done is closed when the poststarthook is finished. This is used by the health check to be able to indicate // that the poststarthook is finished done := make(chan struct{}) - if err := s.AddHealthzChecks(postStartHookHealthz{name: "poststarthook/" + name, done: done}); err != nil { + if err := s.AddDelayedHealthzChecks(s.maxStartupSequenceDuration, postStartHookHealthz{name: "poststarthook/" + name, done: done}); err != nil { return err } s.postStartHooks[name] = postStartHookEntry{hook: hook, originatingStack: string(debug.Stack()), done: done} diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go index 73cfff1267d..46191bedb60 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go @@ -41,6 +41,7 @@ type ServerRunOptions struct { MaxRequestsInFlight int MaxMutatingRequestsInFlight int RequestTimeout time.Duration + MaxStartupSequenceDuration time.Duration MinRequestTimeout int // We intentionally did not add a flag for this option. Users of the // apiserver library can wire it to a flag. @@ -60,6 +61,7 @@ func NewServerRunOptions() *ServerRunOptions { MaxRequestsInFlight: defaults.MaxRequestsInFlight, MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight, RequestTimeout: defaults.RequestTimeout, + MaxStartupSequenceDuration: defaults.MaxStartupSequenceDuration, MinRequestTimeout: defaults.MinRequestTimeout, JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes, MaxRequestBodyBytes: defaults.MaxRequestBodyBytes, @@ -72,6 +74,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error { c.ExternalAddress = s.ExternalHost c.MaxRequestsInFlight = s.MaxRequestsInFlight c.MaxMutatingRequestsInFlight = s.MaxMutatingRequestsInFlight + c.MaxStartupSequenceDuration = s.MaxStartupSequenceDuration c.RequestTimeout = s.RequestTimeout c.MinRequestTimeout = s.MinRequestTimeout c.JSONPatchMaxCopyBytes = s.JSONPatchMaxCopyBytes @@ -106,6 +109,10 @@ func (s *ServerRunOptions) Validate() []error { errors = append(errors, fmt.Errorf("--target-ram-mb can not be negative value")) } + if s.MaxStartupSequenceDuration < 0 { + errors = append(errors, fmt.Errorf("--maximum-startup-sequence-duration can not be a negative value")) + } + if s.EnableInfightQuotaHandler { if !utilfeature.DefaultFeatureGate.Enabled(features.RequestManagement) { errors = append(errors, fmt.Errorf("--enable-inflight-quota-handler can not be set if feature "+ @@ -185,6 +192,11 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) { "it out. This is the default request timeout for requests but may be overridden by flags such as "+ "--min-request-timeout for specific types of requests.") + fs.DurationVar(&s.MaxStartupSequenceDuration, "maximum-startup-sequence-duration", s.MaxStartupSequenceDuration, ""+ + "This option represents the maximum amount of time it should take for apiserver to complete its startup sequence "+ + "and become healthy. From apiserver's start time to when this amount of time has elapsed, /healthz will assume "+ + "that unfinished post-start hooks will complete successfully and therefore return true.") + fs.IntVar(&s.MinRequestTimeout, "min-request-timeout", s.MinRequestTimeout, ""+ "An optional field indicating the minimum number of seconds a handler must keep "+ "a request open before timing it out. Currently only honored by the watch request "+ diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options_test.go index bb7e618230e..6eb438aaae0 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options_test.go @@ -136,6 +136,22 @@ func TestServerRunOptionsValidate(t *testing.T) { }, expectErr: "--max-resource-write-bytes can not be negative value", }, + { + name: "Test when MaxStartupSequenceDuration is negative value", + testOptions: &ServerRunOptions{ + AdvertiseAddress: net.ParseIP("192.168.10.10"), + CorsAllowedOriginList: []string{"10.10.10.100", "10.10.10.200"}, + MaxRequestsInFlight: 400, + MaxMutatingRequestsInFlight: 200, + RequestTimeout: time.Duration(2) * time.Minute, + MinRequestTimeout: 1800, + JSONPatchMaxCopyBytes: 10 * 1024 * 1024, + MaxRequestBodyBytes: 10 * 1024 * 1024, + TargetRAMMB: 65536, + MaxStartupSequenceDuration: -time.Second, + }, + expectErr: "--maximum-startup-sequence-duration can not be a negative value", + }, { name: "Test when ServerRunOptions is valid", testOptions: &ServerRunOptions{ diff --git a/test/integration/master/kube_apiserver_test.go b/test/integration/master/kube_apiserver_test.go index 741b338575c..9682d9f8352 100644 --- a/test/integration/master/kube_apiserver_test.go +++ b/test/integration/master/kube_apiserver_test.go @@ -22,6 +22,7 @@ import ( "net/http" "reflect" "strings" + "sync" "testing" "time" @@ -85,6 +86,48 @@ func TestRun(t *testing.T) { } } +func endpointReturnsStatusOK(client *kubernetes.Clientset, path string) bool { + res := client.CoreV1().RESTClient().Get().AbsPath(path).Do() + var status int + res.StatusCode(&status) + return status == http.StatusOK +} + +func TestStartupSequenceHealthzAndReadyz(t *testing.T) { + hc := &delayedCheck{} + instanceOptions := &kubeapiservertesting.TestServerInstanceOptions{ + InjectedHealthzChecker: hc, + } + server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{"--maximum-startup-sequence-duration", "5s"}, framework.SharedEtcd()) + defer server.TearDownFn() + + client, err := kubernetes.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if endpointReturnsStatusOK(client, "/readyz") { + t.Fatalf("readyz should start unready") + } + // we need to wait longer than our grace period + err = wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + return !endpointReturnsStatusOK(client, "/healthz"), nil + }) + if err != nil { + t.Fatalf("healthz should have become unhealthy: %v", err) + } + hc.makeHealthy() + err = wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + return endpointReturnsStatusOK(client, "/healthz"), nil + }) + if err != nil { + t.Fatalf("healthz should have become healthy again: %v", err) + } + if !endpointReturnsStatusOK(client, "/readyz") { + t.Fatalf("readyz should be healthy") + } +} + // TestOpenAPIDelegationChainPlumbing is a smoke test that checks for // the existence of some representative paths from the // apiextensions-server and the kube-aggregator server, both part of @@ -253,3 +296,27 @@ func TestReconcilerMasterLeaseMultiMoreMasters(t *testing.T) { func TestReconcilerMasterLeaseMultiCombined(t *testing.T) { testReconcilersMasterLease(t, 3, 3) } + +type delayedCheck struct { + healthLock sync.Mutex + isHealthy bool +} + +func (h *delayedCheck) Name() string { + return "delayed-check" +} + +func (h *delayedCheck) Check(req *http.Request) error { + h.healthLock.Lock() + defer h.healthLock.Unlock() + if h.isHealthy { + return nil + } + return fmt.Errorf("isn't healthy") +} + +func (h *delayedCheck) makeHealthy() { + h.healthLock.Lock() + defer h.healthLock.Unlock() + h.isHealthy = true +}