From 0527a0dd453c4b76259389ec8e8e6888c5e2a5ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Tue, 5 Apr 2022 11:00:06 +0200 Subject: [PATCH 1/5] Cleanup rest storage resources on shutdown --- cmd/kube-apiserver/app/server.go | 4 +- cmd/kube-apiserver/app/testing/testserver.go | 2 +- .../generic/registry/storage_factory.go | 7 ++- .../pkg/registry/generic/registry/store.go | 23 +++++++-- .../apiserver/pkg/server/genericapiserver.go | 48 ++++++++++++++++++- test/integration/etcd/server.go | 2 +- 6 files changed, 73 insertions(+), 13 deletions(-) diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 0887d9124ba..0815511707d 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -159,7 +159,7 @@ func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) erro klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK")) - server, err := CreateServerChain(completeOptions, stopCh) + server, err := CreateServerChain(completeOptions) if err != nil { return err } @@ -173,7 +173,7 @@ func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) erro } // CreateServerChain creates the apiservers connected via delegation. -func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) { +func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) { kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions) if err != nil { return nil, err diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index 1d32d5696c2..156d2bdcb8b 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -213,7 +213,7 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo t.Logf("runtime-config=%v", completedOptions.APIEnablement.RuntimeConfig) t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort) - server, err := app.CreateServerChain(completedOptions, stopCh) + server, err := app.CreateServerChain(completedOptions) if err != nil { return result, fmt.Errorf("failed to create server chain: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go index 60ad5a9b6a9..075170f14e4 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go @@ -70,9 +70,12 @@ func StorageWithCacher() generic.StorageDecorator { if err != nil { return nil, func() {}, err } + var once sync.Once destroyFunc := func() { - cacher.Stop() - d() + once.Do(func() { + cacher.Stop() + d() + }) } // TODO : Remove RegisterStorageCleanup below when PR diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 2d9a7599e88..cf3863408ad 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -217,7 +217,10 @@ type Store struct { // If the StorageVersioner is nil, apiserver will leave the // storageVersionHash as empty in the discovery document. StorageVersioner runtime.GroupVersioner - // Called to cleanup clients used by the underlying Storage; optional. + + // DestroyFunc cleans up clients used by the underlying Storage; optional. + // If set, DestroyFunc has to be implemented in thread-safe way and + // be prepared for being called more than once. DestroyFunc func() } @@ -279,6 +282,13 @@ func (e *Store) New() runtime.Object { return e.NewFunc() } +// Destroy cleans up its resources on shutdown. +func (e *Store) Destroy() { + if e.DestroyFunc != nil { + e.DestroyFunc() + } +} + // NewList implements rest.Lister. func (e *Store) NewList() runtime.Object { return e.NewListFunc() @@ -1433,11 +1443,14 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error { if opts.CountMetricPollPeriod > 0 { stopFunc := e.startObservingCount(opts.CountMetricPollPeriod, opts.StorageObjectCountTracker) previousDestroy := e.DestroyFunc + var once sync.Once e.DestroyFunc = func() { - stopFunc() - if previousDestroy != nil { - previousDestroy() - } + once.Do(func() { + stopFunc() + if previousDestroy != nil { + previousDestroy() + } + }) } } } diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index b8435792856..016278c02cf 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -89,6 +89,16 @@ type APIGroupInfo struct { StaticOpenAPISpec *spec.Swagger } +func (a *APIGroupInfo) destroyStorage() { + for _, stores := range a.VersionedResourcesStorageMap { + for _, store := range stores { + // TODO(wojtek-t): Uncomment once all storage support it. + klog.Errorf("Destroying storage: %v", store) + // store.Destroy() + } + } +} + // GenericAPIServer contains state for a Kubernetes cluster api server. type GenericAPIServer struct { // discoveryAddresses is used to build cluster IPs for discovery. @@ -222,6 +232,9 @@ type GenericAPIServer struct { // lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver. lifecycleSignals lifecycleSignals + // destroyFns contains a list of functions that should be called on shutdown to clean up resources. + destroyFns []func() + // muxAndDiscoveryCompleteSignals holds signals that indicate all known HTTP paths have been registered. // it exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler. // it is exposed for easier composition of the individual servers. @@ -264,6 +277,11 @@ type DelegationTarget interface { // MuxAndDiscoveryCompleteSignals exposes registered signals that indicate if all known HTTP paths have been installed. MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} + + // Destroy cleans up its resources on shutdown. + // Destroy has to be implemented in thread-safe way and be prepared + // for being called more than once. + Destroy() } func (s *GenericAPIServer) UnprotectedHandler() http.Handler { @@ -301,6 +319,18 @@ func (s *GenericAPIServer) MuxAndDiscoveryCompleteSignals() map[string]<-chan st return s.muxAndDiscoveryCompleteSignals } +// Destroy cleans up all its and its delegation target resources on shutdown. +// It starts with destroying its own resources and later proceeds with +// its delegation target. +func (s *GenericAPIServer) Destroy() { + for _, destroyFn := range s.destroyFns { + destroyFn() + } + if s.delegationTarget != nil { + s.delegationTarget.Destroy() + } +} + type emptyDelegate struct { // handler is called at the end of the delegation chain // when a request has been made against an unregistered HTTP path the individual servers will simply pass it through until it reaches the handler. @@ -340,6 +370,8 @@ func (s emptyDelegate) PrepareRun() preparedGenericAPIServer { func (s emptyDelegate) MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} { return map[string]<-chan struct{}{} } +func (s emptyDelegate) Destroy() { +} // preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked. type preparedGenericAPIServer struct { @@ -395,6 +427,9 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated + // Clean up resources on shutdown. + defer s.Destroy() + // spawn a new goroutine for closing the MuxAndDiscoveryComplete signal // registration happens during construction of the generic api server // the last server in the chain aggregates signals from the previous instances @@ -584,6 +619,8 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A resourceInfos = append(resourceInfos, r...) } + s.destroyFns = append(s.destroyFns, apiGroupInfo.destroyStorage) + if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) && utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) { // API installation happens before we start listening on the handlers, @@ -595,6 +632,9 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A return nil } +// InstallLegacyAPIGroup exposes the given legacy api group in the API. +// The passed into this function shouldn't be used elsewhere as the +// underlying storage will be destroyed on this servers shutdown. func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error { if !s.legacyAPIGroupPrefixes.Has(apiPrefix) { return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List()) @@ -616,7 +656,9 @@ func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo return nil } -// Exposes given api groups in the API. +// InstallAPIGroups exposes given api groups in the API. +// The passed into this function shouldn't be used elsewhere as the +// underlying storage will be destroyed on this servers shutdown. func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error { for _, apiGroupInfo := range apiGroupInfos { // Do not register empty group or empty version. Doing so claims /apis/ for the wrong entity to be returned. @@ -669,7 +711,9 @@ func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) erro return nil } -// Exposes the given api group in the API. +// InstallAPIGroup exposes the given api group in the API. +// The passed into this function shouldn't be used elsewhere as the +// underlying storage will be destroyed on this servers shutdown. func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error { return s.InstallAPIGroups(apiGroupInfo) } diff --git a/test/integration/etcd/server.go b/test/integration/etcd/server.go index b7887539423..9d7514cd464 100644 --- a/test/integration/etcd/server.go +++ b/test/integration/etcd/server.go @@ -136,7 +136,7 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu stopCh := make(chan struct{}) - kubeAPIServer, err := app.CreateServerChain(completedOptions, stopCh) + kubeAPIServer, err := app.CreateServerChain(completedOptions) if err != nil { t.Fatal(err) } From 80060a502c3f86f00800fbeba7684a85f1ce5e17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Tue, 5 Apr 2022 12:26:22 +0200 Subject: [PATCH 2/5] Implement Destroy() method for all registries --- .../storageversion/storage/storage.go | 6 +++++ .../apps/daemonset/storage/storage.go | 6 +++++ .../apps/deployment/storage/storage.go | 18 ++++++++++++++ .../apps/replicaset/storage/storage.go | 12 ++++++++++ .../apps/statefulset/storage/storage.go | 12 ++++++++++ .../authentication/tokenreview/storage.go | 6 +++++ .../localsubjectaccessreview/rest.go | 6 +++++ .../selfsubjectaccessreview/rest.go | 6 +++++ .../selfsubjectrulesreview/rest.go | 6 +++++ .../authorization/subjectaccessreview/rest.go | 6 +++++ .../storage/storage.go | 6 +++++ pkg/registry/batch/cronjob/storage/storage.go | 6 +++++ pkg/registry/batch/job/storage/storage.go | 6 +++++ .../certificates/storage/storage.go | 12 ++++++++++ pkg/registry/core/componentstatus/rest.go | 6 +++++ .../core/namespace/storage/storage.go | 17 +++++++++++++ pkg/registry/core/node/rest/proxy.go | 6 +++++ pkg/registry/core/node/storage/storage.go | 6 +++++ .../core/persistentvolume/storage/storage.go | 6 +++++ .../persistentvolumeclaim/storage/storage.go | 6 +++++ pkg/registry/core/pod/rest/log.go | 6 +++++ pkg/registry/core/pod/rest/subresources.go | 24 +++++++++++++++++++ pkg/registry/core/pod/storage/eviction.go | 6 +++++ pkg/registry/core/pod/storage/storage.go | 24 +++++++++++++++++++ .../replicationcontroller/storage/storage.go | 12 ++++++++++ .../core/resourcequota/storage/storage.go | 6 +++++ pkg/registry/core/service/proxy.go | 6 +++++ pkg/registry/core/service/storage/storage.go | 6 +++++ .../core/serviceaccount/storage/token.go | 6 +++++ .../flowcontrol/flowschema/storage/storage.go | 6 +++++ .../storage/storage.go | 6 +++++ .../networking/ingress/storage/storage.go | 6 +++++ .../networkpolicy/storage/storage.go | 6 +++++ .../poddisruptionbudget/storage/storage.go | 6 +++++ .../rbac/clusterrole/policybased/storage.go | 5 ++++ .../clusterrolebinding/policybased/storage.go | 5 ++++ pkg/registry/rbac/role/policybased/storage.go | 5 ++++ .../rbac/rolebinding/policybased/storage.go | 5 ++++ .../volumeattachment/storage/storage.go | 6 +++++ .../registry/customresourcedefinition/etcd.go | 6 +++++ .../apiserver/pkg/endpoints/apiserver_test.go | 15 ++++++++++++ .../apiserver/pkg/registry/rest/rest.go | 5 ++++ .../pkg/server/deleted_kinds_test.go | 3 +++ .../apiserver/pkg/server/genericapiserver.go | 4 +--- .../pkg/server/genericapiserver_test.go | 6 +++++ .../pkg/registry/apiservice/etcd/etcd.go | 6 +++++ .../sample-apiserver/pkg/registry/registry.go | 3 +-- 47 files changed, 356 insertions(+), 5 deletions(-) diff --git a/pkg/registry/apiserverinternal/storageversion/storage/storage.go b/pkg/registry/apiserverinternal/storageversion/storage/storage.go index af97b70af18..7bb55c31ed8 100644 --- a/pkg/registry/apiserverinternal/storageversion/storage/storage.go +++ b/pkg/registry/apiserverinternal/storageversion/storage/storage.go @@ -73,6 +73,12 @@ func (r *StatusREST) New() runtime.Object { return &apiserverinternal.StorageVersion{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/apps/daemonset/storage/storage.go b/pkg/registry/apps/daemonset/storage/storage.go index 666b917f034..84df2450283 100644 --- a/pkg/registry/apps/daemonset/storage/storage.go +++ b/pkg/registry/apps/daemonset/storage/storage.go @@ -88,6 +88,12 @@ func (r *StatusREST) New() runtime.Object { return &apps.DaemonSet{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/apps/deployment/storage/storage.go b/pkg/registry/apps/deployment/storage/storage.go index a3155623946..8b05dd0b663 100644 --- a/pkg/registry/apps/deployment/storage/storage.go +++ b/pkg/registry/apps/deployment/storage/storage.go @@ -140,6 +140,12 @@ func (r *StatusREST) New() runtime.Object { return &apps.Deployment{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) @@ -185,6 +191,12 @@ func (r *RollbackREST) New() runtime.Object { return &apps.DeploymentRollback{} } +// Destroy cleans up resources on shutdown. +func (r *RollbackREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + var _ = rest.NamedCreater(&RollbackREST{}) // Create runs rollback for deployment @@ -283,6 +295,12 @@ func (r *ScaleREST) New() runtime.Object { return &autoscaling.Scale{} } +// Destroy cleans up resources on shutdown. +func (r *ScaleREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves object from Scale storage. func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { obj, err := r.store.Get(ctx, name, options) diff --git a/pkg/registry/apps/replicaset/storage/storage.go b/pkg/registry/apps/replicaset/storage/storage.go index ed0e37026e3..0c467d37327 100644 --- a/pkg/registry/apps/replicaset/storage/storage.go +++ b/pkg/registry/apps/replicaset/storage/storage.go @@ -136,6 +136,12 @@ func (r *StatusREST) New() runtime.Object { return &apps.ReplicaSet{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) @@ -185,6 +191,12 @@ func (r *ScaleREST) New() runtime.Object { return &autoscaling.Scale{} } +// Destroy cleans up resources on shutdown. +func (r *ScaleREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves object from Scale storage. func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { obj, err := r.store.Get(ctx, name, options) diff --git a/pkg/registry/apps/statefulset/storage/storage.go b/pkg/registry/apps/statefulset/storage/storage.go index ac399ad61cf..47558534cdd 100644 --- a/pkg/registry/apps/statefulset/storage/storage.go +++ b/pkg/registry/apps/statefulset/storage/storage.go @@ -124,6 +124,12 @@ func (r *StatusREST) New() runtime.Object { return &apps.StatefulSet{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) @@ -179,6 +185,12 @@ func (r *ScaleREST) New() runtime.Object { return &autoscaling.Scale{} } +// Destroy cleans up resources on shutdown. +func (r *ScaleREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves object from Scale storage. func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { obj, err := r.store.Get(ctx, name, options) diff --git a/pkg/registry/authentication/tokenreview/storage.go b/pkg/registry/authentication/tokenreview/storage.go index c9d24b40b53..46f3e00f1ac 100644 --- a/pkg/registry/authentication/tokenreview/storage.go +++ b/pkg/registry/authentication/tokenreview/storage.go @@ -54,6 +54,12 @@ func (r *REST) New() runtime.Object { return &authentication.TokenReview{} } +// Destroy cleans up resources on shutdown. +func (r *REST) Destroy() { + // Given no underlying store, we don't destroy anything + // here explicitly. +} + func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { tokenReview, ok := obj.(*authentication.TokenReview) if !ok { diff --git a/pkg/registry/authorization/localsubjectaccessreview/rest.go b/pkg/registry/authorization/localsubjectaccessreview/rest.go index 8db3c1836f2..e28b03dc368 100644 --- a/pkg/registry/authorization/localsubjectaccessreview/rest.go +++ b/pkg/registry/authorization/localsubjectaccessreview/rest.go @@ -47,6 +47,12 @@ func (r *REST) New() runtime.Object { return &authorizationapi.LocalSubjectAccessReview{} } +// Destroy cleans up resources on shutdown. +func (r *REST) Destroy() { + // Given no underlying store, we don't destroy anything + // here explicitly. +} + func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { localSubjectAccessReview, ok := obj.(*authorizationapi.LocalSubjectAccessReview) if !ok { diff --git a/pkg/registry/authorization/selfsubjectaccessreview/rest.go b/pkg/registry/authorization/selfsubjectaccessreview/rest.go index 5fc40fde360..f2c3a9a0b3c 100644 --- a/pkg/registry/authorization/selfsubjectaccessreview/rest.go +++ b/pkg/registry/authorization/selfsubjectaccessreview/rest.go @@ -47,6 +47,12 @@ func (r *REST) New() runtime.Object { return &authorizationapi.SelfSubjectAccessReview{} } +// Destroy cleans up resources on shutdown. +func (r *REST) Destroy() { + // Given no underlying store, we don't destroy anything + // here explicitly. +} + func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { selfSAR, ok := obj.(*authorizationapi.SelfSubjectAccessReview) if !ok { diff --git a/pkg/registry/authorization/selfsubjectrulesreview/rest.go b/pkg/registry/authorization/selfsubjectrulesreview/rest.go index 82dc7a521b1..f1c3a9f86c7 100644 --- a/pkg/registry/authorization/selfsubjectrulesreview/rest.go +++ b/pkg/registry/authorization/selfsubjectrulesreview/rest.go @@ -49,6 +49,12 @@ func (r *REST) New() runtime.Object { return &authorizationapi.SelfSubjectRulesReview{} } +// Destroy cleans up resources on shutdown. +func (r *REST) Destroy() { + // Given no underlying store, we don't destroy anything + // here explicitly. +} + // Create attempts to get self subject rules in specific namespace. func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { selfSRR, ok := obj.(*authorizationapi.SelfSubjectRulesReview) diff --git a/pkg/registry/authorization/subjectaccessreview/rest.go b/pkg/registry/authorization/subjectaccessreview/rest.go index ae06324e8e6..4bb2072a5f7 100644 --- a/pkg/registry/authorization/subjectaccessreview/rest.go +++ b/pkg/registry/authorization/subjectaccessreview/rest.go @@ -46,6 +46,12 @@ func (r *REST) New() runtime.Object { return &authorizationapi.SubjectAccessReview{} } +// Destroy cleans up resources on shutdown. +func (r *REST) Destroy() { + // Given no underlying store, we don't destroy anything + // here explicitly. +} + func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { subjectAccessReview, ok := obj.(*authorizationapi.SubjectAccessReview) if !ok { diff --git a/pkg/registry/autoscaling/horizontalpodautoscaler/storage/storage.go b/pkg/registry/autoscaling/horizontalpodautoscaler/storage/storage.go index 319d2e0ec6c..03171ed1ff0 100644 --- a/pkg/registry/autoscaling/horizontalpodautoscaler/storage/storage.go +++ b/pkg/registry/autoscaling/horizontalpodautoscaler/storage/storage.go @@ -88,6 +88,12 @@ func (r *StatusREST) New() runtime.Object { return &autoscaling.HorizontalPodAutoscaler{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/batch/cronjob/storage/storage.go b/pkg/registry/batch/cronjob/storage/storage.go index 5e36416332a..637dc8f3754 100644 --- a/pkg/registry/batch/cronjob/storage/storage.go +++ b/pkg/registry/batch/cronjob/storage/storage.go @@ -86,6 +86,12 @@ func (r *StatusREST) New() runtime.Object { return &batch.CronJob{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/batch/job/storage/storage.go b/pkg/registry/batch/job/storage/storage.go index 4b9ca99ea50..f6cfa679536 100644 --- a/pkg/registry/batch/job/storage/storage.go +++ b/pkg/registry/batch/job/storage/storage.go @@ -129,6 +129,12 @@ func (r *StatusREST) New() runtime.Object { return &batch.Job{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/certificates/certificates/storage/storage.go b/pkg/registry/certificates/certificates/storage/storage.go index 8e2da90b16d..8de7828efde 100644 --- a/pkg/registry/certificates/certificates/storage/storage.go +++ b/pkg/registry/certificates/certificates/storage/storage.go @@ -89,6 +89,12 @@ func (r *StatusREST) New() runtime.Object { return &certificates.CertificateSigningRequest{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) @@ -122,6 +128,12 @@ func (r *ApprovalREST) New() runtime.Object { return &certificates.CertificateSigningRequest{} } +// Destroy cleans up resources on shutdown. +func (r *ApprovalREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *ApprovalREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/core/componentstatus/rest.go b/pkg/registry/core/componentstatus/rest.go index 0cb8cad23a2..22fe41a0c84 100644 --- a/pkg/registry/core/componentstatus/rest.go +++ b/pkg/registry/core/componentstatus/rest.go @@ -58,6 +58,12 @@ func (rs *REST) New() runtime.Object { return &api.ComponentStatus{} } +// Destroy cleans up resources on shutdown. +func (r *REST) Destroy() { + // Given no underlying store, we don't destroy anything + // here explicitly. +} + func (rs *REST) NewList() runtime.Object { return &api.ComponentStatusList{} } diff --git a/pkg/registry/core/namespace/storage/storage.go b/pkg/registry/core/namespace/storage/storage.go index b910e30a20c..37b130092ca 100644 --- a/pkg/registry/core/namespace/storage/storage.go +++ b/pkg/registry/core/namespace/storage/storage.go @@ -98,6 +98,11 @@ func (r *REST) New() runtime.Object { return r.store.New() } +// Destroy cleans up resources on shutdown. +func (r *REST) Destroy() { + r.store.Destroy() +} + func (r *REST) NewList() runtime.Object { return r.store.NewList() } @@ -300,6 +305,12 @@ func (r *StatusREST) New() runtime.Object { return r.store.New() } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) @@ -325,6 +336,12 @@ func (r *FinalizeREST) New() runtime.Object { return r.store.New() } +// Destroy cleans up resources on shutdown. +func (r *FinalizeREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Update alters the status finalizers subset of an object. func (r *FinalizeREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { // We are explicitly setting forceAllowCreate to false in the call to the underlying storage because diff --git a/pkg/registry/core/node/rest/proxy.go b/pkg/registry/core/node/rest/proxy.go index 40b069978d1..40e6431b40b 100644 --- a/pkg/registry/core/node/rest/proxy.go +++ b/pkg/registry/core/node/rest/proxy.go @@ -50,6 +50,12 @@ func (r *ProxyREST) New() runtime.Object { return &api.NodeProxyOptions{} } +// Destroy cleans up resources on shutdown. +func (r *ProxyREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // ConnectMethods returns the list of HTTP methods that can be proxied func (r *ProxyREST) ConnectMethods() []string { return proxyMethods diff --git a/pkg/registry/core/node/storage/storage.go b/pkg/registry/core/node/storage/storage.go index 54057bfb886..56e806770f4 100644 --- a/pkg/registry/core/node/storage/storage.go +++ b/pkg/registry/core/node/storage/storage.go @@ -66,6 +66,12 @@ func (r *StatusREST) New() runtime.Object { return &api.Node{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/core/persistentvolume/storage/storage.go b/pkg/registry/core/persistentvolume/storage/storage.go index 03eaf682fe6..df65eabc879 100644 --- a/pkg/registry/core/persistentvolume/storage/storage.go +++ b/pkg/registry/core/persistentvolume/storage/storage.go @@ -83,6 +83,12 @@ func (r *StatusREST) New() runtime.Object { return &api.PersistentVolume{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/core/persistentvolumeclaim/storage/storage.go b/pkg/registry/core/persistentvolumeclaim/storage/storage.go index 6650606ac57..d472bbd8ef3 100644 --- a/pkg/registry/core/persistentvolumeclaim/storage/storage.go +++ b/pkg/registry/core/persistentvolumeclaim/storage/storage.go @@ -127,6 +127,12 @@ func (r *StatusREST) New() runtime.Object { return &api.PersistentVolumeClaim{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/core/pod/rest/log.go b/pkg/registry/core/pod/rest/log.go index 9d5f320978e..e6b02069b28 100644 --- a/pkg/registry/core/pod/rest/log.go +++ b/pkg/registry/core/pod/rest/log.go @@ -51,6 +51,12 @@ func (r *LogREST) New() runtime.Object { return &api.Pod{} } +// Destroy cleans up resources on shutdown. +func (r *LogREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // ProducesMIMETypes returns a list of the MIME types the specified HTTP verb (GET, POST, DELETE, // PATCH) can respond with. func (r *LogREST) ProducesMIMETypes(verb string) []string { diff --git a/pkg/registry/core/pod/rest/subresources.go b/pkg/registry/core/pod/rest/subresources.go index cb79deca431..76e0cdd4ffb 100644 --- a/pkg/registry/core/pod/rest/subresources.go +++ b/pkg/registry/core/pod/rest/subresources.go @@ -49,6 +49,12 @@ func (r *ProxyREST) New() runtime.Object { return &api.PodProxyOptions{} } +// Destroy cleans up resources on shutdown. +func (r *ProxyREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // ConnectMethods returns the list of HTTP methods that can be proxied func (r *ProxyREST) ConnectMethods() []string { return proxyMethods @@ -91,6 +97,12 @@ func (r *AttachREST) New() runtime.Object { return &api.PodAttachOptions{} } +// Destroy cleans up resources on shutdown. +func (r *AttachREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Connect returns a handler for the pod exec proxy func (r *AttachREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) { attachOpts, ok := opts.(*api.PodAttachOptions) @@ -128,6 +140,12 @@ func (r *ExecREST) New() runtime.Object { return &api.PodExecOptions{} } +// Destroy cleans up resources on shutdown. +func (r *ExecREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Connect returns a handler for the pod exec proxy func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) { execOpts, ok := opts.(*api.PodExecOptions) @@ -165,6 +183,12 @@ func (r *PortForwardREST) New() runtime.Object { return &api.PodPortForwardOptions{} } +// Destroy cleans up resources on shutdown. +func (r *PortForwardREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // NewConnectOptions returns the versioned object that represents the // portforward parameters func (r *PortForwardREST) NewConnectOptions() (runtime.Object, bool, string) { diff --git a/pkg/registry/core/pod/storage/eviction.go b/pkg/registry/core/pod/storage/eviction.go index 39f2e041673..0fb7ac03eeb 100644 --- a/pkg/registry/core/pod/storage/eviction.go +++ b/pkg/registry/core/pod/storage/eviction.go @@ -95,6 +95,12 @@ func (r *EvictionREST) New() runtime.Object { return &policy.Eviction{} } +// Destroy cleans up resources on shutdown. +func (r *EvictionREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Propagate dry-run takes the dry-run option from the request and pushes it into the eviction object. // It returns an error if they have non-matching dry-run options. func propagateDryRun(eviction *policy.Eviction, options *metav1.CreateOptions) (*metav1.DeleteOptions, error) { diff --git a/pkg/registry/core/pod/storage/storage.go b/pkg/registry/core/pod/storage/storage.go index 7787dbdc0e4..c426b8727ed 100644 --- a/pkg/registry/core/pod/storage/storage.go +++ b/pkg/registry/core/pod/storage/storage.go @@ -157,6 +157,12 @@ func (r *BindingREST) New() runtime.Object { return &api.Binding{} } +// Destroy cleans up resources on shutdown. +func (r *BindingREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + var _ = rest.NamedCreater(&BindingREST{}) // Create ensures a pod is bound to a specific host. @@ -263,6 +269,12 @@ func (r *LegacyBindingREST) New() runtime.Object { return r.bindingRest.New() } +// Destroy cleans up resources on shutdown. +func (r *LegacyBindingREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Create ensures a pod is bound to a specific host. func (r *LegacyBindingREST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (out runtime.Object, err error) { metadata, err := meta.Accessor(obj) @@ -282,6 +294,12 @@ func (r *StatusREST) New() runtime.Object { return &api.Pod{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) @@ -324,6 +342,12 @@ func (r *EphemeralContainersREST) New() runtime.Object { return &api.Pod{} } +// Destroy cleans up resources on shutdown. +func (r *EphemeralContainersREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Update alters the EphemeralContainers field in PodSpec func (r *EphemeralContainersREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { if !utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) { diff --git a/pkg/registry/core/replicationcontroller/storage/storage.go b/pkg/registry/core/replicationcontroller/storage/storage.go index 1325e719950..39a59f4a73e 100644 --- a/pkg/registry/core/replicationcontroller/storage/storage.go +++ b/pkg/registry/core/replicationcontroller/storage/storage.go @@ -131,6 +131,12 @@ func (r *StatusREST) New() runtime.Object { return &api.ReplicationController{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) @@ -174,6 +180,12 @@ func (r *ScaleREST) New() runtime.Object { return &autoscaling.Scale{} } +// Destroy cleans up resources on shutdown. +func (r *ScaleREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { obj, err := r.store.Get(ctx, name, options) if err != nil { diff --git a/pkg/registry/core/resourcequota/storage/storage.go b/pkg/registry/core/resourcequota/storage/storage.go index d302169e90e..d438ec792e2 100644 --- a/pkg/registry/core/resourcequota/storage/storage.go +++ b/pkg/registry/core/resourcequota/storage/storage.go @@ -82,6 +82,12 @@ func (r *StatusREST) New() runtime.Object { return &api.ResourceQuota{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/core/service/proxy.go b/pkg/registry/core/service/proxy.go index ced40db39dd..f0eb9d0788a 100644 --- a/pkg/registry/core/service/proxy.go +++ b/pkg/registry/core/service/proxy.go @@ -46,6 +46,12 @@ func (r *ProxyREST) New() runtime.Object { return &api.ServiceProxyOptions{} } +// Destroy cleans up resources on shutdown. +func (r *ProxyREST) Destroy() { + // Given no underlying store, we don't destroy anything + // here explicitly. +} + // ConnectMethods returns the list of HTTP methods that can be proxied func (r *ProxyREST) ConnectMethods() []string { return proxyMethods diff --git a/pkg/registry/core/service/storage/storage.go b/pkg/registry/core/service/storage/storage.go index eff83977fef..7565101c4cb 100644 --- a/pkg/registry/core/service/storage/storage.go +++ b/pkg/registry/core/service/storage/storage.go @@ -166,6 +166,12 @@ func (r *StatusREST) New() runtime.Object { return &api.Service{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/core/serviceaccount/storage/token.go b/pkg/registry/core/serviceaccount/storage/token.go index 8ba30ed8446..6f840e53154 100644 --- a/pkg/registry/core/serviceaccount/storage/token.go +++ b/pkg/registry/core/serviceaccount/storage/token.go @@ -43,6 +43,12 @@ func (r *TokenREST) New() runtime.Object { return &authenticationapi.TokenRequest{} } +// Destroy cleans up resources on shutdown. +func (r *TokenREST) Destroy() { + // Given no underlying store, we don't destroy anything + // here explicitly. +} + type TokenREST struct { svcaccts getter pods getter diff --git a/pkg/registry/flowcontrol/flowschema/storage/storage.go b/pkg/registry/flowcontrol/flowschema/storage/storage.go index 089b37fc46c..d1d287e735f 100644 --- a/pkg/registry/flowcontrol/flowschema/storage/storage.go +++ b/pkg/registry/flowcontrol/flowschema/storage/storage.go @@ -81,6 +81,12 @@ func (r *StatusREST) New() runtime.Object { return &flowcontrol.FlowSchema{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/flowcontrol/prioritylevelconfiguration/storage/storage.go b/pkg/registry/flowcontrol/prioritylevelconfiguration/storage/storage.go index 28d1f148963..defbd608b66 100644 --- a/pkg/registry/flowcontrol/prioritylevelconfiguration/storage/storage.go +++ b/pkg/registry/flowcontrol/prioritylevelconfiguration/storage/storage.go @@ -81,6 +81,12 @@ func (r *StatusREST) New() runtime.Object { return &flowcontrol.PriorityLevelConfiguration{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/networking/ingress/storage/storage.go b/pkg/registry/networking/ingress/storage/storage.go index 0f084232f15..71ab37c455e 100644 --- a/pkg/registry/networking/ingress/storage/storage.go +++ b/pkg/registry/networking/ingress/storage/storage.go @@ -80,6 +80,12 @@ func (r *StatusREST) New() runtime.Object { return &networking.Ingress{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/networking/networkpolicy/storage/storage.go b/pkg/registry/networking/networkpolicy/storage/storage.go index 574c0815116..0101077b662 100644 --- a/pkg/registry/networking/networkpolicy/storage/storage.go +++ b/pkg/registry/networking/networkpolicy/storage/storage.go @@ -82,6 +82,12 @@ func (r *StatusREST) New() runtime.Object { return &networkingapi.NetworkPolicy{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/policy/poddisruptionbudget/storage/storage.go b/pkg/registry/policy/poddisruptionbudget/storage/storage.go index 987c200d465..4af2ea2429f 100644 --- a/pkg/registry/policy/poddisruptionbudget/storage/storage.go +++ b/pkg/registry/policy/poddisruptionbudget/storage/storage.go @@ -77,6 +77,12 @@ func (r *StatusREST) New() runtime.Object { return &policyapi.PodDisruptionBudget{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/pkg/registry/rbac/clusterrole/policybased/storage.go b/pkg/registry/rbac/clusterrole/policybased/storage.go index e3a9dd6ff2b..13e40881f4f 100644 --- a/pkg/registry/rbac/clusterrole/policybased/storage.go +++ b/pkg/registry/rbac/clusterrole/policybased/storage.go @@ -46,6 +46,11 @@ func NewStorage(s rest.StandardStorage, authorizer authorizer.Authorizer, ruleRe return &Storage{s, authorizer, ruleResolver} } +// Destroy cleans up resources on shutdown. +func (r *Storage) Destroy() { + // FIXME: Do we have to anything to destroy here? +} + func (r *Storage) NamespaceScoped() bool { return false } diff --git a/pkg/registry/rbac/clusterrolebinding/policybased/storage.go b/pkg/registry/rbac/clusterrolebinding/policybased/storage.go index 6a4b2c8f2e1..8a48fe5a558 100644 --- a/pkg/registry/rbac/clusterrolebinding/policybased/storage.go +++ b/pkg/registry/rbac/clusterrolebinding/policybased/storage.go @@ -47,6 +47,11 @@ func NewStorage(s rest.StandardStorage, authorizer authorizer.Authorizer, ruleRe return &Storage{s, authorizer, ruleResolver} } +// Destroy cleans up resources on shutdown. +func (r *Storage) Destroy() { + // FIXME: Do we have to anything to destroy here? +} + func (r *Storage) NamespaceScoped() bool { return false } diff --git a/pkg/registry/rbac/role/policybased/storage.go b/pkg/registry/rbac/role/policybased/storage.go index 34281be9754..878627f3a28 100644 --- a/pkg/registry/rbac/role/policybased/storage.go +++ b/pkg/registry/rbac/role/policybased/storage.go @@ -45,6 +45,11 @@ func NewStorage(s rest.StandardStorage, authorizer authorizer.Authorizer, ruleRe return &Storage{s, authorizer, ruleResolver} } +// Destroy cleans up resources on shutdown. +func (r *Storage) Destroy() { + // FIXME: Do we have to anything to destroy here? +} + func (r *Storage) NamespaceScoped() bool { return true } diff --git a/pkg/registry/rbac/rolebinding/policybased/storage.go b/pkg/registry/rbac/rolebinding/policybased/storage.go index d73a1e1f93a..ba13bc490fb 100644 --- a/pkg/registry/rbac/rolebinding/policybased/storage.go +++ b/pkg/registry/rbac/rolebinding/policybased/storage.go @@ -48,6 +48,11 @@ func NewStorage(s rest.StandardStorage, authorizer authorizer.Authorizer, ruleRe return &Storage{s, authorizer, ruleResolver} } +// Destroy cleans up resources on shutdown. +func (r *Storage) Destroy() { + // FIXME: Do we have to anything to destroy here? +} + func (r *Storage) NamespaceScoped() bool { return true } diff --git a/pkg/registry/storage/volumeattachment/storage/storage.go b/pkg/registry/storage/volumeattachment/storage/storage.go index cb1d0b25b53..91c1c2a1142 100644 --- a/pkg/registry/storage/volumeattachment/storage/storage.go +++ b/pkg/registry/storage/volumeattachment/storage/storage.go @@ -85,6 +85,12 @@ func (r *StatusREST) New() runtime.Object { return &storageapi.VolumeAttachment{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go index 2eb7a246399..2523076edf9 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go @@ -195,6 +195,12 @@ func (r *StatusREST) New() runtime.Object { return &apiextensions.CustomResourceDefinition{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go index 6cb17dfdb03..e9659bb4783 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go @@ -462,6 +462,9 @@ func (storage *SimpleRESTStorage) NewList() runtime.Object { return &genericapitesting.SimpleList{} } +func (storage *SimpleRESTStorage) Destroy() { +} + func (storage *SimpleRESTStorage) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { storage.checkContext(ctx) storage.created = obj.(*genericapitesting.Simple) @@ -547,6 +550,9 @@ func (s *ConnecterRESTStorage) New() runtime.Object { return &genericapitesting.Simple{} } +func (s *ConnecterRESTStorage) Destroy() { +} + func (s *ConnecterRESTStorage) Connect(ctx context.Context, id string, options runtime.Object, responder rest.Responder) (http.Handler, error) { s.receivedConnectOptions = options s.receivedID = id @@ -668,6 +674,9 @@ func (storage *SimpleTypedStorage) New() runtime.Object { return storage.baseType } +func (storage *SimpleTypedStorage) Destroy() { +} + func (storage *SimpleTypedStorage) Get(ctx context.Context, id string, options *metav1.GetOptions) (runtime.Object, error) { storage.checkContext(ctx) return storage.item.DeepCopyObject(), storage.errors["get"] @@ -810,6 +819,9 @@ func (UnimplementedRESTStorage) New() runtime.Object { return &genericapitesting.Simple{} } +func (UnimplementedRESTStorage) Destroy() { +} + // TestUnimplementedRESTStorage ensures that if a rest.Storage does not implement a given // method, that it is literally not registered with the server. In the past, // we registered everything, and returned method not supported if it didn't support @@ -4322,6 +4334,9 @@ func (storage *SimpleXGSubresourceRESTStorage) New() runtime.Object { return &genericapitesting.SimpleXGSubresource{} } +func (storage *SimpleXGSubresourceRESTStorage) Destroy() { +} + func (storage *SimpleXGSubresourceRESTStorage) Get(ctx context.Context, id string, options *metav1.GetOptions) (runtime.Object, error) { return storage.item.DeepCopyObject(), nil } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go index a489095d6e0..0e9c5bf0352 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go @@ -56,6 +56,11 @@ type Storage interface { // New returns an empty object that can be used with Create and Update after request data has been put into it. // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) New() runtime.Object + + // Destroy cleans up its resources on shutdown. + // Destroy has to be implemented in thread-safe way and be prepared + // for being called more than once. + Destroy() } // Scoper indicates what scope the resource is at. It must be specified. diff --git a/staging/src/k8s.io/apiserver/pkg/server/deleted_kinds_test.go b/staging/src/k8s.io/apiserver/pkg/server/deleted_kinds_test.go index 9d2f2d5f44a..12af7fadc3c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/deleted_kinds_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/deleted_kinds_test.go @@ -110,6 +110,9 @@ func (r removedInStorage) New() runtime.Object { return removedInObj{major: r.major, minor: r.minor} } +func (r removedInStorage) Destroy() { +} + type neverRemovedObj struct { } diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 016278c02cf..1deb78252d8 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -92,9 +92,7 @@ type APIGroupInfo struct { func (a *APIGroupInfo) destroyStorage() { for _, stores := range a.VersionedResourcesStorageMap { for _, store := range stores { - // TODO(wojtek-t): Uncomment once all storage support it. - klog.Errorf("Destroying storage: %v", store) - // store.Destroy() + store.Destroy() } } } diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go index af452347c83..c6b07fc3fff 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go @@ -544,6 +544,9 @@ func (p *testGetterStorage) New() runtime.Object { } } +func (p *testGetterStorage) Destroy() { +} + func (p *testGetterStorage) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return nil, nil } @@ -565,6 +568,9 @@ func (p *testNoVerbsStorage) New() runtime.Object { } } +func (p *testNoVerbsStorage) Destroy() { +} + func fakeVersion() version.Info { return version.Info{ Major: "42", diff --git a/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/etcd/etcd.go b/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/etcd/etcd.go index 662f1f058f3..d20573ed368 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/etcd/etcd.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/etcd/etcd.go @@ -147,6 +147,12 @@ func (r *StatusREST) New() runtime.Object { return &apiregistration.APIService{} } +// Destroy cleans up resources on shutdown. +func (r *StatusREST) Destroy() { + // Given that underlying store is shared with REST, + // we don't destroy it here explicitly. +} + // Get retrieves the object from the storage. It is required to support Patch. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { return r.store.Get(ctx, name, options) diff --git a/staging/src/k8s.io/sample-apiserver/pkg/registry/registry.go b/staging/src/k8s.io/sample-apiserver/pkg/registry/registry.go index 32da2eda903..f5107aeed29 100644 --- a/staging/src/k8s.io/sample-apiserver/pkg/registry/registry.go +++ b/staging/src/k8s.io/sample-apiserver/pkg/registry/registry.go @@ -20,7 +20,6 @@ import ( "fmt" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" - "k8s.io/apiserver/pkg/registry/rest" ) // REST implements a RESTStorage for API services against etcd @@ -31,7 +30,7 @@ type REST struct { // RESTInPeace is just a simple function that panics on error. // Otherwise returns the given storage object. It is meant to be // a wrapper for wardle registries. -func RESTInPeace(storage rest.StandardStorage, err error) rest.StandardStorage { +func RESTInPeace(storage *REST, err error) *REST { if err != nil { err = fmt.Errorf("unable to create REST storage for a resource due to %v, will die", err) panic(err) From f62c14a9cc24fa7fc5711b0aa74c7534c8d6793b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Tue, 19 Apr 2022 11:18:39 +0200 Subject: [PATCH 3/5] Extend StandardStorage with Destroy to implement rbac storage destroy --- pkg/registry/core/pod/storage/eviction_test.go | 3 +++ pkg/registry/rbac/clusterrole/policybased/storage.go | 2 +- pkg/registry/rbac/clusterrolebinding/policybased/storage.go | 2 +- pkg/registry/rbac/role/policybased/storage.go | 2 +- pkg/registry/rbac/rolebinding/policybased/storage.go | 2 +- staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go | 5 +++++ 6 files changed, 12 insertions(+), 4 deletions(-) diff --git a/pkg/registry/core/pod/storage/eviction_test.go b/pkg/registry/core/pod/storage/eviction_test.go index 426029cd4d7..92cc0a3c082 100644 --- a/pkg/registry/core/pod/storage/eviction_test.go +++ b/pkg/registry/core/pod/storage/eviction_test.go @@ -689,3 +689,6 @@ func (ms *mockStore) NewList() runtime.Object { func (ms *mockStore) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) { return nil, nil } + +func (ms *mockStore) Destroy() { +} diff --git a/pkg/registry/rbac/clusterrole/policybased/storage.go b/pkg/registry/rbac/clusterrole/policybased/storage.go index 13e40881f4f..047630732d9 100644 --- a/pkg/registry/rbac/clusterrole/policybased/storage.go +++ b/pkg/registry/rbac/clusterrole/policybased/storage.go @@ -48,7 +48,7 @@ func NewStorage(s rest.StandardStorage, authorizer authorizer.Authorizer, ruleRe // Destroy cleans up resources on shutdown. func (r *Storage) Destroy() { - // FIXME: Do we have to anything to destroy here? + r.StandardStorage.Destroy() } func (r *Storage) NamespaceScoped() bool { diff --git a/pkg/registry/rbac/clusterrolebinding/policybased/storage.go b/pkg/registry/rbac/clusterrolebinding/policybased/storage.go index 8a48fe5a558..ad2de28e008 100644 --- a/pkg/registry/rbac/clusterrolebinding/policybased/storage.go +++ b/pkg/registry/rbac/clusterrolebinding/policybased/storage.go @@ -49,7 +49,7 @@ func NewStorage(s rest.StandardStorage, authorizer authorizer.Authorizer, ruleRe // Destroy cleans up resources on shutdown. func (r *Storage) Destroy() { - // FIXME: Do we have to anything to destroy here? + r.StandardStorage.Destroy() } func (r *Storage) NamespaceScoped() bool { diff --git a/pkg/registry/rbac/role/policybased/storage.go b/pkg/registry/rbac/role/policybased/storage.go index 878627f3a28..c113e1d0f82 100644 --- a/pkg/registry/rbac/role/policybased/storage.go +++ b/pkg/registry/rbac/role/policybased/storage.go @@ -47,7 +47,7 @@ func NewStorage(s rest.StandardStorage, authorizer authorizer.Authorizer, ruleRe // Destroy cleans up resources on shutdown. func (r *Storage) Destroy() { - // FIXME: Do we have to anything to destroy here? + r.StandardStorage.Destroy() } func (r *Storage) NamespaceScoped() bool { diff --git a/pkg/registry/rbac/rolebinding/policybased/storage.go b/pkg/registry/rbac/rolebinding/policybased/storage.go index ba13bc490fb..64883635516 100644 --- a/pkg/registry/rbac/rolebinding/policybased/storage.go +++ b/pkg/registry/rbac/rolebinding/policybased/storage.go @@ -50,7 +50,7 @@ func NewStorage(s rest.StandardStorage, authorizer authorizer.Authorizer, ruleRe // Destroy cleans up resources on shutdown. func (r *Storage) Destroy() { - // FIXME: Do we have to anything to destroy here? + r.StandardStorage.Destroy() } func (r *Storage) NamespaceScoped() bool { diff --git a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go index 0e9c5bf0352..6330ea8f531 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go @@ -283,6 +283,11 @@ type StandardStorage interface { GracefulDeleter CollectionDeleter Watcher + + // Destroy cleans up its resources on shutdown. + // Destroy has to be implemented in thread-safe way and be prepared + // for being called more than once. + Destroy() } // Redirector know how to return a remote resource's location. From e95f8f2e42abacc286c660e22e56b51f7e243eb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Tue, 5 Apr 2022 13:05:35 +0200 Subject: [PATCH 4/5] Clean apiserver shutdown in integration tests --- test/integration/framework/controlplane_utils.go | 1 + 1 file changed, 1 insertion(+) diff --git a/test/integration/framework/controlplane_utils.go b/test/integration/framework/controlplane_utils.go index fc02449a0fa..71527da407b 100644 --- a/test/integration/framework/controlplane_utils.go +++ b/test/integration/framework/controlplane_utils.go @@ -186,6 +186,7 @@ func startAPIServerOrDie(controlPlaneConfig *controlplane.Config, incomingServer m.GenericAPIServer.RunPreShutdownHooks() } close(stopCh) + m.GenericAPIServer.Destroy() s.Close() } From 73da6d15f94a1004d6be9296b60d43f24c713ce7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Tue, 5 Apr 2022 20:40:38 +0200 Subject: [PATCH 5/5] Fix TestPriorityLevelIsolation concurrency issue --- .../apiserver/flowcontrol/concurrency_test.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/test/integration/apiserver/flowcontrol/concurrency_test.go b/test/integration/apiserver/flowcontrol/concurrency_test.go index 7c5a11be9d4..d5f314af22b 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_test.go @@ -22,6 +22,7 @@ import ( "io" "net/http/httptest" "strings" + "sync" "testing" "time" @@ -104,22 +105,28 @@ func TestPriorityLevelIsolation(t *testing.T) { } stopCh := make(chan struct{}) - defer close(stopCh) + wg := sync.WaitGroup{} + defer func() { + close(stopCh) + wg.Wait() + }() // "elephant" + wg.Add(concurrencyShares + queueLength) streamRequests(concurrencyShares+queueLength, func() { _, err := noxu1Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) if err != nil { t.Error(err) } - }, stopCh) + }, &wg, stopCh) // "mouse" + wg.Add(3) streamRequests(3, func() { _, err := noxu2Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) if err != nil { t.Error(err) } - }, stopCh) + }, &wg, stopCh) time.Sleep(time.Second * 10) // running in background for a while @@ -312,9 +319,10 @@ func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, usern }) } -func streamRequests(parallel int, request func(), stopCh <-chan struct{}) { +func streamRequests(parallel int, request func(), wg *sync.WaitGroup, stopCh <-chan struct{}) { for i := 0; i < parallel; i++ { go func() { + defer wg.Done() for { select { case <-stopCh: