From cb80082f666e0e5fe220df32e31a8face18e9393 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Tue, 10 May 2022 11:12:08 +0200 Subject: [PATCH] Cleanup etcd healthcheck on shutdown --- .../src/k8s.io/apiserver/pkg/server/config.go | 5 ++ .../apiserver/pkg/server/options/etcd.go | 2 +- .../apiserver/pkg/server/options/etcd_test.go | 7 ++- .../storage/storagebackend/factory/etcd3.go | 61 ++++++++++++++----- .../storage/storagebackend/factory/factory.go | 4 +- 5 files changed, 61 insertions(+), 18 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index f2ea85b82f9..580f5eeff73 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -528,6 +528,11 @@ func completeOpenAPI(config *openapicommon.Config, version *version.Info) { } } +// StopNotify returns a lifecycle signal of genericapiserver shutting down. +func (c *Config) StopNotify() <-chan struct{} { + return c.lifecycleSignals.ShutdownInitiated.Signaled() +} + // Complete fills in any fields not set that are required to have valid data and can be derived // from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver. 
func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig { diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index 625ce28be2e..951a699aa3f 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -226,7 +226,7 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac } func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error { - healthCheck, err := storagefactory.CreateHealthCheck(s.StorageConfig) + healthCheck, err := storagefactory.CreateHealthCheck(s.StorageConfig, c.StopNotify()) if err != nil { return err } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go index 423059d4af6..bd454800aa3 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go @@ -21,7 +21,9 @@ import ( "testing" "time" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/storage/storagebackend" @@ -214,9 +216,12 @@ func TestKMSHealthzEndpoint(t *testing.T) { }, } + scheme := runtime.NewScheme() + codecs := serializer.NewCodecFactory(scheme) + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - serverConfig := &server.Config{} + serverConfig := server.NewConfig(codecs) etcdOptions := &EtcdOptions{ EncryptionProviderConfigFilepath: tc.encryptionConfigPath, } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index f54c9421f20..eb95c0facee 100644 --- 
a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -24,7 +24,6 @@ import ( "path" "strings" "sync" - "sync/atomic" "time" grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus" @@ -35,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilnet "k8s.io/apimachinery/pkg/util/net" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" genericfeatures "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/server/egressselector" @@ -72,31 +72,64 @@ func init() { dbMetricsMonitors = make(map[string]struct{}) } -func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) { +func newETCD3HealthCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() error, error) { // constructing the etcd v3 client blocks and times out if etcd is not available. // retry in a loop in the background until we successfully create the client, storing the client or error encountered - clientValue := &atomic.Value{} - - clientErrMsg := &atomic.Value{} - clientErrMsg.Store("etcd client connection not yet established") + lock := sync.Mutex{} + var client *clientv3.Client + clientErr := fmt.Errorf("etcd client connection not yet established") go wait.PollUntil(time.Second, func() (bool, error) { - client, err := newETCD3Client(c.Transport) + newClient, err := newETCD3Client(c.Transport) + + lock.Lock() + defer lock.Unlock() + + // Ensure that the server is not already shutting down. + select { + case <-stopCh: + if err == nil { + newClient.Close() + } + return true, nil + default: + } + if err != nil { - clientErrMsg.Store(err.Error()) + clientErr = err return false, nil } - clientValue.Store(client) - clientErrMsg.Store("") + client = newClient + clientErr = nil return true, nil - }, wait.NeverStop) + }, stopCh) + + // Close the client on shutdown. 
+ go func() { + defer utilruntime.HandleCrash() + <-stopCh + + lock.Lock() + defer lock.Unlock() + if client != nil { + client.Close() + clientErr = fmt.Errorf("server is shutting down") + } + }() return func() error { - if errMsg := clientErrMsg.Load().(string); len(errMsg) > 0 { - return fmt.Errorf(errMsg) + // Given that client is closed on shutdown we hold the lock for + // the entire period of healthcheck call to ensure that client will + // not be closed during healthcheck. + // Given that healthchecks have a 2s timeout, the worst case of blocking + // shutdown for an additional 2s seems acceptable. + lock.Lock() + defer lock.Unlock() + if clientErr != nil { + return clientErr } - client := clientValue.Load().(*clientv3.Client) + healthcheckTimeout := storagebackend.DefaultHealthcheckTimeout if c.HealthcheckTimeout != time.Duration(0) { healthcheckTimeout = c.HealthcheckTimeout diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go index 68c45a18f01..d1d3492168f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go @@ -40,12 +40,12 @@ func Create(c storagebackend.ConfigForResource, newFunc func() runtime.Object) ( } // CreateHealthCheck creates a healthcheck function based on given config. -func CreateHealthCheck(c storagebackend.Config) (func() error, error) { +func CreateHealthCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() error, error) { switch c.Type { case storagebackend.StorageTypeETCD2: return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type) case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: - return newETCD3HealthCheck(c) + return newETCD3HealthCheck(c, stopCh) default: return nil, fmt.Errorf("unknown storage type: %s", c.Type) }