diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index a290d90ff6a..e2731f45ddc 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -139,16 +139,17 @@ func TestAddFlags(t *testing.T) { expected := &ServerRunOptions{ Options: &controlplaneapiserver.Options{ GenericServerRunOptions: &apiserveroptions.ServerRunOptions{ - AdvertiseAddress: netutils.ParseIPSloppy("192.168.10.10"), - CorsAllowedOriginList: []string{"10.10.10.100", "10.10.10.200"}, - MaxRequestsInFlight: 400, - MaxMutatingRequestsInFlight: 200, - RequestTimeout: time.Duration(2) * time.Minute, - MinRequestTimeout: 1800, - JSONPatchMaxCopyBytes: int64(3 * 1024 * 1024), - MaxRequestBodyBytes: int64(3 * 1024 * 1024), - ComponentGlobalsRegistry: componentGlobalsRegistry, - ComponentName: utilversion.DefaultKubeComponent, + AdvertiseAddress: netutils.ParseIPSloppy("192.168.10.10"), + CorsAllowedOriginList: []string{"10.10.10.100", "10.10.10.200"}, + MaxRequestsInFlight: 400, + MaxMutatingRequestsInFlight: 200, + RequestTimeout: time.Duration(2) * time.Minute, + MinRequestTimeout: 1800, + StorageInitializationTimeout: time.Minute, + JSONPatchMaxCopyBytes: int64(3 * 1024 * 1024), + MaxRequestBodyBytes: int64(3 * 1024 * 1024), + ComponentGlobalsRegistry: componentGlobalsRegistry, + ComponentName: utilversion.DefaultKubeComponent, }, Admission: &kubeoptions.AdmissionOptions{ GenericAdmission: &apiserveroptions.AdmissionOptions{ diff --git a/pkg/controlplane/apiserver/options/options_test.go b/pkg/controlplane/apiserver/options/options_test.go index 3836e23c40e..54488d3b700 100644 --- a/pkg/controlplane/apiserver/options/options_test.go +++ b/pkg/controlplane/apiserver/options/options_test.go @@ -123,16 +123,17 @@ func TestAddFlags(t *testing.T) { // This is a snapshot of expected options parsed by args. expected := &Options{ GenericServerRunOptions: &apiserveroptions.ServerRunOptions{ - AdvertiseAddress: netutils.ParseIPSloppy("192.168.10.10"), - CorsAllowedOriginList: []string{"10.10.10.100", "10.10.10.200"}, - MaxRequestsInFlight: 400, - MaxMutatingRequestsInFlight: 200, - RequestTimeout: time.Duration(2) * time.Minute, - MinRequestTimeout: 1800, - JSONPatchMaxCopyBytes: int64(3 * 1024 * 1024), - MaxRequestBodyBytes: int64(3 * 1024 * 1024), - ComponentGlobalsRegistry: componentGlobalsRegistry, - ComponentName: utilversion.DefaultKubeComponent, + AdvertiseAddress: netutils.ParseIPSloppy("192.168.10.10"), + CorsAllowedOriginList: []string{"10.10.10.100", "10.10.10.200"}, + MaxRequestsInFlight: 400, + MaxMutatingRequestsInFlight: 200, + RequestTimeout: time.Duration(2) * time.Minute, + MinRequestTimeout: 1800, + StorageInitializationTimeout: time.Minute, + JSONPatchMaxCopyBytes: int64(3 * 1024 * 1024), + MaxRequestBodyBytes: int64(3 * 1024 * 1024), + ComponentGlobalsRegistry: componentGlobalsRegistry, + ComponentName: utilversion.DefaultKubeComponent, }, Admission: &kubeoptions.AdmissionOptions{ GenericAdmission: &apiserveroptions.AdmissionOptions{ diff --git a/pkg/controlplane/apiserver/server.go b/pkg/controlplane/apiserver/server.go index b22eb8ba562..c5dbe50002c 100644 --- a/pkg/controlplane/apiserver/server.go +++ b/pkg/controlplane/apiserver/server.go @@ -245,6 +245,10 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele }) } + if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.WatchCacheInitializationPostStartHook) { + s.GenericAPIServer.AddPostStartHookOrDie("storage-readiness", s.GenericAPIServer.StorageReadinessHook.Hook) + } + s.GenericAPIServer.AddPostStartHookOrDie("start-legacy-token-tracking-controller", func(hookContext genericapiserver.PostStartHookContext) error { go legacytokentracking.NewController(client).Run(hookContext.StopCh) return nil diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 16f92c2b17e..3a346ed5242 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -1275,6 +1275,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS genericfeatures.WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, + genericfeatures.WatchCacheInitializationPostStartHook: {Default: false, PreRelease: featuregate.Beta}, + genericfeatures.WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta}, genericfeatures.WatchList: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index de27f413a6f..037201c4658 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -290,6 +290,12 @@ const ( // Enables support for watch bookmark events. WatchBookmark featuregate.Feature = "WatchBookmark" + // owner: @wojtek-t + // beta: v1.31 + // + // Enables post-start-hook for storage readiness + WatchCacheInitializationPostStartHook featuregate.Feature = "WatchCacheInitializationPostStartHook" + // owner: @serathius // beta: 1.30 // Enables watches without resourceVersion to be served from storage. @@ -407,6 +413,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, + WatchCacheInitializationPostStartHook: {Default: false, PreRelease: featuregate.Beta}, + WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta}, InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 98efac93339..198c09f6b8e 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -226,6 +226,10 @@ type Store struct { // storageVersionHash as empty in the discovery document. StorageVersioner runtime.GroupVersioner + // ReadinessCheckFunc checks if the storage is ready for accepting requests. + // The field is optional, if set needs to be thread-safe. + ReadinessCheckFunc func() error + // DestroyFunc cleans up clients used by the underlying Storage; optional. // If set, DestroyFunc has to be implemented in thread-safe way and // be prepared for being called more than once. @@ -234,6 +238,7 @@ type Store struct { // Note: the rest.StandardStorage interface aggregates the common REST verbs var _ rest.StandardStorage = &Store{} +var _ rest.StorageWithReadiness = &Store{} var _ rest.TableConvertor = &Store{} var _ GenericStore = &Store{} @@ -292,6 +297,14 @@ func (e *Store) New() runtime.Object { return e.NewFunc() } +// ReadinessCheck checks if the storage is ready for accepting requests. +func (e *Store) ReadinessCheck() error { + if e.ReadinessCheckFunc != nil { + return e.ReadinessCheckFunc() + } + return nil +} + // Destroy cleans up its resources on shutdown. func (e *Store) Destroy() { if e.DestroyFunc != nil { @@ -1614,6 +1627,9 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error { } } } + if e.Storage.Storage != nil { + e.ReadinessCheckFunc = e.Storage.Storage.ReadinessCheck + } return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go index 78b6ea8b0ef..03cea7bb70b 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go @@ -52,6 +52,8 @@ import ( // Storage is a generic interface for RESTful storage services. // Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected // that objects may implement any of the below interfaces. +// +// Consider using StorageWithReadiness whenever possible. type Storage interface { // New returns an empty object that can be used with Create and Update after request data has been put into it. // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) @@ -63,6 +65,14 @@ type Storage interface { Destroy() } +// StorageWithReadiness extends Storage interface with the readiness check. +type StorageWithReadiness interface { + Storage + + // ReadinessCheck allows for checking storage readiness. + ReadinessCheck() error +} + // Scoper indicates what scope the resource is at. It must be specified. // It is usually provided automatically based on your strategy. type Scoper interface { diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 6f0ca1bcac4..febefa61afb 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -216,6 +216,10 @@ type Config struct { // twice this value. Note that it is up to the request handlers to ignore or honor this timeout. In seconds. MinRequestTimeout int + // StorageInitializationTimeout defines the maximum amount of time to wait for storage initialization + // before declaring apiserver ready. + StorageInitializationTimeout time.Duration + // This represents the maximum amount of time it should take for apiserver to complete its startup // sequence and become healthy. From apiserver's start time to when this amount of time has // elapsed, /livez will assume that unfinished post-start hooks will complete successfully and @@ -426,6 +430,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config { MaxMutatingRequestsInFlight: 200, RequestTimeout: time.Duration(60) * time.Second, MinRequestTimeout: 1800, + StorageInitializationTimeout: time.Minute, LivezGracePeriod: time.Duration(0), ShutdownDelayDuration: time.Duration(0), // 1.5MB is the default client request size in bytes @@ -824,6 +829,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G ShutdownSendRetryAfter: c.ShutdownSendRetryAfter, APIServerID: c.APIServerID, + StorageReadinessHook: NewStorageReadinessHook(c.StorageInitializationTimeout), StorageVersionManager: c.StorageVersionManager, EffectiveVersion: c.EffectiveVersion, diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index fed94bf4abd..52fb449ac6e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -233,6 +233,10 @@ type GenericAPIServer struct { // APIServerID is the ID of this API server APIServerID string + // StorageReadinessHook implements post-start-hook functionality for checking readiness + // of underlying storage for registered resources. + StorageReadinessHook *StorageReadinessHook + // StorageVersionManager holds the storage versions of the API resources installed by this server. StorageVersionManager storageversion.Manager @@ -844,6 +848,7 @@ func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo } else { s.Handler.GoRestfulContainer.Add(legacyRootAPIHandler.WebService()) } + s.registerStorageReadinessCheck("", apiGroupInfo) return nil } @@ -902,10 +907,28 @@ func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) erro s.DiscoveryGroupManager.AddGroup(apiGroup) s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService()) + s.registerStorageReadinessCheck(apiGroupInfo.PrioritizedVersions[0].Group, apiGroupInfo) } return nil } +// registerStorageReadinessCheck registers the readiness checks for all underlying storages +// for a given APIGroup. +func (s *GenericAPIServer) registerStorageReadinessCheck(groupName string, apiGroupInfo *APIGroupInfo) { + for version, storageMap := range apiGroupInfo.VersionedResourcesStorageMap { + for resource, storage := range storageMap { + if withReadiness, ok := storage.(rest.StorageWithReadiness); ok { + gvr := metav1.GroupVersionResource{ + Group: groupName, + Version: version, + Resource: resource, + } + s.StorageReadinessHook.RegisterStorage(gvr, withReadiness) + } + } + } +} + // InstallAPIGroup exposes the given api group in the API. // The passed into this function shouldn't be used elsewhere as the // underlying storage will be destroyed on this servers shutdown. diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go index 593d591638e..9246366dd63 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go @@ -47,16 +47,17 @@ const ( type ServerRunOptions struct { AdvertiseAddress net.IP - CorsAllowedOriginList []string - HSTSDirectives []string - ExternalHost string - MaxRequestsInFlight int - MaxMutatingRequestsInFlight int - RequestTimeout time.Duration - GoawayChance float64 - LivezGracePeriod time.Duration - MinRequestTimeout int - ShutdownDelayDuration time.Duration + CorsAllowedOriginList []string + HSTSDirectives []string + ExternalHost string + MaxRequestsInFlight int + MaxMutatingRequestsInFlight int + RequestTimeout time.Duration + GoawayChance float64 + LivezGracePeriod time.Duration + MinRequestTimeout int + StorageInitializationTimeout time.Duration + ShutdownDelayDuration time.Duration // We intentionally did not add a flag for this option. Users of the // apiserver library can wire it to a flag. JSONPatchMaxCopyBytes int64 @@ -116,6 +117,7 @@ func NewServerRunOptionsForComponent(componentName string, componentGlobalsRegis RequestTimeout: defaults.RequestTimeout, LivezGracePeriod: defaults.LivezGracePeriod, MinRequestTimeout: defaults.MinRequestTimeout, + StorageInitializationTimeout: defaults.StorageInitializationTimeout, ShutdownDelayDuration: defaults.ShutdownDelayDuration, ShutdownWatchTerminationGracePeriod: defaults.ShutdownWatchTerminationGracePeriod, JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes, @@ -140,6 +142,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error { c.RequestTimeout = s.RequestTimeout c.GoawayChance = s.GoawayChance c.MinRequestTimeout = s.MinRequestTimeout + c.StorageInitializationTimeout = s.StorageInitializationTimeout c.ShutdownDelayDuration = s.ShutdownDelayDuration c.JSONPatchMaxCopyBytes = s.JSONPatchMaxCopyBytes c.MaxRequestBodyBytes = s.MaxRequestBodyBytes @@ -197,6 +200,10 @@ func (s *ServerRunOptions) Validate() []error { errors = append(errors, fmt.Errorf("--min-request-timeout can not be negative value")) } + if s.StorageInitializationTimeout < 0 { + errors = append(errors, fmt.Errorf("--storage-initialization-timeout can not be negative value")) + } + if s.ShutdownDelayDuration < 0 { errors = append(errors, fmt.Errorf("--shutdown-delay-duration can not be negative value")) } @@ -350,6 +357,9 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) { "handler, which picks a randomized value above this number as the connection timeout, "+ "to spread out load.") + fs.DurationVar(&s.StorageInitializationTimeout, "storage-initialization-timeout", s.StorageInitializationTimeout, + "Maximum amount of time to wait for storage initialization before declaring apiserver ready. Defaults to 1m.") + fs.DurationVar(&s.ShutdownDelayDuration, "shutdown-delay-duration", s.ShutdownDelayDuration, ""+ "Time to delay the termination. During that time the server keeps serving requests normally. The endpoints /healthz and /livez "+ "will return success, but /readyz immediately returns failure. Graceful termination starts after this delay "+ diff --git a/staging/src/k8s.io/apiserver/pkg/server/storage_readiness_hook.go b/staging/src/k8s.io/apiserver/pkg/server/storage_readiness_hook.go new file mode 100644 index 00000000000..64fc241a6d3 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/storage_readiness_hook.go @@ -0,0 +1,91 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "context" + "errors" + "sync" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/klog/v2" +) + +// StorageReadinessHook implements PostStartHook functionality for checking readiness +// of underlying storage for registered resources. +type StorageReadinessHook struct { + timeout time.Duration + + lock sync.Mutex + checks map[string]func() error +} + +// NewStorageReadinessHook created new StorageReadinessHook. +func NewStorageReadinessHook(timeout time.Duration) *StorageReadinessHook { + return &StorageReadinessHook{ + checks: make(map[string]func() error), + timeout: timeout, + } +} + +func (h *StorageReadinessHook) RegisterStorage(gvr metav1.GroupVersionResource, storage rest.StorageWithReadiness) { + h.lock.Lock() + defer h.lock.Unlock() + + if _, ok := h.checks[gvr.String()]; !ok { + h.checks[gvr.String()] = storage.ReadinessCheck + } else { + klog.Errorf("Registering storage readiness hook for %s again: ", gvr.String()) + } +} + +func (h *StorageReadinessHook) check() bool { + h.lock.Lock() + defer h.lock.Unlock() + + failedChecks := []string{} + for gvr, check := range h.checks { + if err := check(); err != nil { + failedChecks = append(failedChecks, gvr) + } + } + if len(failedChecks) == 0 { + klog.Infof("Storage is ready for all registered resources") + return true + } + klog.V(4).Infof("Storage is not ready for: %v", failedChecks) + return false +} + +func (h *StorageReadinessHook) Hook(ctx PostStartHookContext) error { + deadlineCtx, cancel := context.WithTimeout(ctx, h.timeout) + defer cancel() + err := wait.PollUntilContextCancel(deadlineCtx, 100*time.Millisecond, true, + func(_ context.Context) (bool, error) { + if ok := h.check(); ok { + return true, nil + } + return false, nil + }) + if errors.Is(err, context.DeadlineExceeded) { + klog.Warningf("Deadline exceeded while waiting for storage readiness... ignoring") + } + return nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/storage_readiness_hook_test.go b/staging/src/k8s.io/apiserver/pkg/server/storage_readiness_hook_test.go new file mode 100644 index 00000000000..422c7a9fe84 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/storage_readiness_hook_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "context" + "fmt" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +type fakeReadinessStorage struct { + result error +} + +func (s *fakeReadinessStorage) New() runtime.Object { return nil } +func (s *fakeReadinessStorage) Destroy() {} +func (s *fakeReadinessStorage) ReadinessCheck() error { return s.result } + +func testGVR(index int) metav1.GroupVersionResource { + return metav1.GroupVersionResource{ + Group: "group", + Version: "version", + Resource: fmt.Sprintf("resource-%d", index), + } +} + +func TestStorageReadinessHook(t *testing.T) { + h := NewStorageReadinessHook(time.Second) + + numChecks := 5 + storages := make([]*fakeReadinessStorage, numChecks) + for i := 0; i < numChecks; i++ { + storages[i] = &fakeReadinessStorage{ + result: fmt.Errorf("failed"), + } + h.RegisterStorage(testGVR(i), storages[i]) + } + + for i := 0; i < numChecks; i++ { + if ok := h.check(); ok { + t.Errorf("%d: unexpected check pass", i) + } + storages[i].result = nil + } + if ok := h.check(); !ok { + t.Errorf("unexpected check failure") + } +} + +func TestStorageReadinessHookTimeout(t *testing.T) { + h := NewStorageReadinessHook(time.Second) + + storage := &fakeReadinessStorage{ + result: fmt.Errorf("failed"), + } + h.RegisterStorage(testGVR(0), storage) + + ctx := context.Background() + hookCtx := PostStartHookContext{ + LoopbackClientConfig: nil, + StopCh: ctx.Done(), + Context: ctx, + } + if err := h.Hook(hookCtx); err != nil { + t.Errorf("unexpected hook failure on timeout") + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index cdbf028c3b5..c553045814d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -962,6 +962,14 @@ func (c *Cacher) Count(pathPrefix string) (int64, error) { return c.storage.Count(pathPrefix) } +// ReadinessCheck implements storage.Interface. +func (c *Cacher) ReadinessCheck() error { + if !c.ready.check() { + return storage.ErrStorageNotReady + } + return nil +} + // baseObjectThreadUnsafe omits locking for cachingObject. func baseObjectThreadUnsafe(object runtime.Object) runtime.Object { if co, ok := object.(*cachingObject); ok { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index a5e62cbb689..d30501026a2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -181,6 +181,9 @@ func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.O func (d *dummyStorage) Count(_ string) (int64, error) { return 0, fmt.Errorf("unimplemented") } +func (d *dummyStorage) ReadinessCheck() error { + return nil +} func (d *dummyStorage) injectError(err error) { d.Lock() defer d.Unlock() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/errors.go b/staging/src/k8s.io/apiserver/pkg/storage/errors.go index e7e0957489d..cc2c1c974d4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/errors.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/errors.go @@ -25,7 +25,10 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" ) -var ErrResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created") +var ( + ErrResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created") + ErrStorageNotReady = errors.New("storage not ready") +) const ( ErrCodeKeyNotFound int = iota + 1 diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index efd91dc6995..3eb967188b6 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -591,6 +591,11 @@ func (s *store) Count(key string) (int64, error) { return getResp.Count, nil } +// ReadinessCheck implements storage.Interface. +func (s *store) ReadinessCheck() error { + return nil +} + // resolveGetListRev is used by GetList to resolve the rev to use in the client.KV.Get request. func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts storage.ListOptions) (int64, error) { var withRev int64 diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go index cc891c6b90f..cff804b285d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go @@ -243,6 +243,9 @@ type Interface interface { // Count returns number of different entries under the key (generally being path prefix). Count(key string) (int64, error) + // ReadinessCheck checks if the storage is ready for accepting requests. + ReadinessCheck() error + // RequestWatchProgress requests the a watch stream progress status be sent in the // watch response stream as soon as possible. // Used for monitor watch progress even if watching resources with no changes. diff --git a/test/e2e/apimachinery/health_handlers.go b/test/e2e/apimachinery/health_handlers.go index ca64adb337e..6693afd3e16 100644 --- a/test/e2e/apimachinery/health_handlers.go +++ b/test/e2e/apimachinery/health_handlers.go @@ -24,6 +24,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + apiserverfeatures "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/kubernetes/test/e2e/framework" @@ -126,6 +128,13 @@ var _ = SIGDescribe("health handlers", func() { f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged ginkgo.It("should contain necessary checks", func(ctx context.Context) { + if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.WatchCacheInitializationPostStartHook) { + storageReadinessCheck := "[+]poststarthook/storage-readiness ok" + requiredHealthzChecks.Insert(storageReadinessCheck) + requiredLivezChecks.Insert(storageReadinessCheck) + requiredReadyzChecks.Insert(storageReadinessCheck) + } + ginkgo.By("/health") err := testPath(ctx, f.ClientSet, "/healthz?verbose=1", requiredHealthzChecks) framework.ExpectNoError(err)