Cleanup rest storage resources on shutdown

This commit is contained in:
Wojciech Tyczyński 2022-04-05 11:00:06 +02:00
parent 65178fec72
commit 0527a0dd45
6 changed files with 73 additions and 13 deletions

View File

@ -159,7 +159,7 @@ func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) erro
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
server, err := CreateServerChain(completeOptions, stopCh)
server, err := CreateServerChain(completeOptions)
if err != nil {
return err
}
@ -173,7 +173,7 @@ func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) erro
}
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
if err != nil {
return nil, err

View File

@ -213,7 +213,7 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
t.Logf("runtime-config=%v", completedOptions.APIEnablement.RuntimeConfig)
t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort)
server, err := app.CreateServerChain(completedOptions, stopCh)
server, err := app.CreateServerChain(completedOptions)
if err != nil {
return result, fmt.Errorf("failed to create server chain: %v", err)
}

View File

@ -70,9 +70,12 @@ func StorageWithCacher() generic.StorageDecorator {
if err != nil {
return nil, func() {}, err
}
var once sync.Once
destroyFunc := func() {
cacher.Stop()
d()
once.Do(func() {
cacher.Stop()
d()
})
}
// TODO : Remove RegisterStorageCleanup below when PR

View File

@ -217,7 +217,10 @@ type Store struct {
// If the StorageVersioner is nil, apiserver will leave the
// storageVersionHash as empty in the discovery document.
StorageVersioner runtime.GroupVersioner
// Called to cleanup clients used by the underlying Storage; optional.
// DestroyFunc cleans up clients used by the underlying Storage; optional.
// If set, DestroyFunc has to be implemented in thread-safe way and
// be prepared for being called more than once.
DestroyFunc func()
}
@ -279,6 +282,13 @@ func (e *Store) New() runtime.Object {
return e.NewFunc()
}
// Destroy cleans up its resources on shutdown.
func (e *Store) Destroy() {
if e.DestroyFunc != nil {
e.DestroyFunc()
}
}
// NewList implements rest.Lister.
func (e *Store) NewList() runtime.Object {
return e.NewListFunc()
@ -1433,11 +1443,14 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
if opts.CountMetricPollPeriod > 0 {
stopFunc := e.startObservingCount(opts.CountMetricPollPeriod, opts.StorageObjectCountTracker)
previousDestroy := e.DestroyFunc
var once sync.Once
e.DestroyFunc = func() {
stopFunc()
if previousDestroy != nil {
previousDestroy()
}
once.Do(func() {
stopFunc()
if previousDestroy != nil {
previousDestroy()
}
})
}
}
}

View File

@ -89,6 +89,16 @@ type APIGroupInfo struct {
StaticOpenAPISpec *spec.Swagger
}
func (a *APIGroupInfo) destroyStorage() {
for _, stores := range a.VersionedResourcesStorageMap {
for _, store := range stores {
// TODO(wojtek-t): Uncomment once all storage support it.
klog.Errorf("Destroying storage: %v", store)
// store.Destroy()
}
}
}
// GenericAPIServer contains state for a Kubernetes cluster api server.
type GenericAPIServer struct {
// discoveryAddresses is used to build cluster IPs for discovery.
@ -222,6 +232,9 @@ type GenericAPIServer struct {
// lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver.
lifecycleSignals lifecycleSignals
// destroyFns contains a list of functions that should be called on shutdown to clean up resources.
destroyFns []func()
// muxAndDiscoveryCompleteSignals holds signals that indicate all known HTTP paths have been registered.
// it exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler.
// it is exposed for easier composition of the individual servers.
@ -264,6 +277,11 @@ type DelegationTarget interface {
// MuxAndDiscoveryCompleteSignals exposes registered signals that indicate if all known HTTP paths have been installed.
MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{}
// Destroy cleans up its resources on shutdown.
// Destroy has to be implemented in thread-safe way and be prepared
// for being called more than once.
Destroy()
}
func (s *GenericAPIServer) UnprotectedHandler() http.Handler {
@ -301,6 +319,18 @@ func (s *GenericAPIServer) MuxAndDiscoveryCompleteSignals() map[string]<-chan st
return s.muxAndDiscoveryCompleteSignals
}
// Destroy cleans up all its and its delegation target resources on shutdown.
// It starts with destroying its own resources and later proceeds with
// its delegation target.
func (s *GenericAPIServer) Destroy() {
for _, destroyFn := range s.destroyFns {
destroyFn()
}
if s.delegationTarget != nil {
s.delegationTarget.Destroy()
}
}
type emptyDelegate struct {
// handler is called at the end of the delegation chain
// when a request has been made against an unregistered HTTP path the individual servers will simply pass it through until it reaches the handler.
@ -340,6 +370,8 @@ func (s emptyDelegate) PrepareRun() preparedGenericAPIServer {
func (s emptyDelegate) MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} {
return map[string]<-chan struct{}{}
}
func (s emptyDelegate) Destroy() {
}
// preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked.
type preparedGenericAPIServer struct {
@ -395,6 +427,9 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
// Clean up resources on shutdown.
defer s.Destroy()
// spawn a new goroutine for closing the MuxAndDiscoveryComplete signal
// registration happens during construction of the generic api server
// the last server in the chain aggregates signals from the previous instances
@ -584,6 +619,8 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A
resourceInfos = append(resourceInfos, r...)
}
s.destroyFns = append(s.destroyFns, apiGroupInfo.destroyStorage)
if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) &&
utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
// API installation happens before we start listening on the handlers,
@ -595,6 +632,9 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A
return nil
}
// InstallLegacyAPIGroup exposes the given legacy api group in the API.
// The <apiGroupInfo> passed into this function shouldn't be used elsewhere as the
// underlying storage will be destroyed on this servers shutdown.
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
if !s.legacyAPIGroupPrefixes.Has(apiPrefix) {
return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List())
@ -616,7 +656,9 @@ func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo
return nil
}
// Exposes given api groups in the API.
// InstallAPIGroups exposes given api groups in the API.
// The <apiGroupInfos> passed into this function shouldn't be used elsewhere as the
// underlying storage will be destroyed on this servers shutdown.
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
for _, apiGroupInfo := range apiGroupInfos {
// Do not register empty group or empty version. Doing so claims /apis/ for the wrong entity to be returned.
@ -669,7 +711,9 @@ func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) erro
return nil
}
// Exposes the given api group in the API.
// InstallAPIGroup exposes the given api group in the API.
// The <apiGroupInfo> passed into this function shouldn't be used elsewhere as the
// underlying storage will be destroyed on this servers shutdown.
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
return s.InstallAPIGroups(apiGroupInfo)
}

View File

@ -136,7 +136,7 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu
stopCh := make(chan struct{})
kubeAPIServer, err := app.CreateServerChain(completedOptions, stopCh)
kubeAPIServer, err := app.CreateServerChain(completedOptions)
if err != nil {
t.Fatal(err)
}