Cleanup etcd healthcheck on shutdown

This commit is contained in:
Wojciech Tyczyński 2022-05-10 11:12:08 +02:00
parent b059f99951
commit cb80082f66
5 changed files with 61 additions and 18 deletions

View File

@@ -528,6 +528,11 @@ func completeOpenAPI(config *openapicommon.Config, version *version.Info) {
 	}
 }
 
+// StopNotify returns a lifecycle signal of genericapiserver shutting down.
+func (c *Config) StopNotify() <-chan struct{} {
+	return c.lifecycleSignals.ShutdownInitiated.Signaled()
+}
+
 // Complete fills in any fields not set that are required to have valid data and can be derived
 // from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
 func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig {

View File

@@ -226,7 +226,7 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac
 }
 
 func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
-	healthCheck, err := storagefactory.CreateHealthCheck(s.StorageConfig)
+	healthCheck, err := storagefactory.CreateHealthCheck(s.StorageConfig, c.StopNotify())
 	if err != nil {
 		return err
 	}

View File

@@ -21,7 +21,9 @@ import (
 	"testing"
 	"time"
 
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/runtime/serializer"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apiserver/pkg/server"
 	"k8s.io/apiserver/pkg/storage/storagebackend"
@@ -214,9 +216,12 @@ func TestKMSHealthzEndpoint(t *testing.T) {
 		},
 	}
 
+	scheme := runtime.NewScheme()
+	codecs := serializer.NewCodecFactory(scheme)
+
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			serverConfig := &server.Config{}
+			serverConfig := server.NewConfig(codecs)
 			etcdOptions := &EtcdOptions{
 				EncryptionProviderConfigFilepath: tc.encryptionConfigPath,
 			}

View File

@@ -24,7 +24,6 @@ import (
 	"path"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
@@ -35,6 +34,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	utilnet "k8s.io/apimachinery/pkg/util/net"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	genericfeatures "k8s.io/apiserver/pkg/features"
 	"k8s.io/apiserver/pkg/server/egressselector"
@@ -72,31 +72,64 @@ func init() {
 	dbMetricsMonitors = make(map[string]struct{})
 }
 
-func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) {
+func newETCD3HealthCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() error, error) {
 	// constructing the etcd v3 client blocks and times out if etcd is not available.
 	// retry in a loop in the background until we successfully create the client, storing the client or error encountered
-	clientValue := &atomic.Value{}
-
-	clientErrMsg := &atomic.Value{}
-	clientErrMsg.Store("etcd client connection not yet established")
+	lock := sync.Mutex{}
+	var client *clientv3.Client
+	clientErr := fmt.Errorf("etcd client connection not yet established")
 
 	go wait.PollUntil(time.Second, func() (bool, error) {
-		client, err := newETCD3Client(c.Transport)
+		newClient, err := newETCD3Client(c.Transport)
+
+		lock.Lock()
+		defer lock.Unlock()
+
+		// Ensure that server is already not shutting down.
+		select {
+		case <-stopCh:
+			if err == nil {
+				newClient.Close()
+			}
+			return true, nil
+		default:
+		}
+
 		if err != nil {
-			clientErrMsg.Store(err.Error())
+			clientErr = err
 			return false, nil
 		}
-		clientValue.Store(client)
-		clientErrMsg.Store("")
+		client = newClient
+		clientErr = nil
 		return true, nil
-	}, wait.NeverStop)
+	}, stopCh)
+
+	// Close the client on shutdown.
+	go func() {
+		defer utilruntime.HandleCrash()
+		<-stopCh
+
+		lock.Lock()
+		defer lock.Unlock()
+		if client != nil {
+			client.Close()
+			clientErr = fmt.Errorf("server is shutting down")
+		}
+	}()
 
 	return func() error {
-		if errMsg := clientErrMsg.Load().(string); len(errMsg) > 0 {
-			return fmt.Errorf(errMsg)
+		// Given that client is closed on shutdown we hold the lock for
+		// the entire period of healthcheck call to ensure that client will
+		// not be closed during healthcheck.
+		// Given that healthchecks has a 2s timeout, worst case of blocking
+		// shutdown for additional 2s seems acceptable.
+		lock.Lock()
+		defer lock.Unlock()
+
+		if clientErr != nil {
+			return clientErr
 		}
-		client := clientValue.Load().(*clientv3.Client)
+
 		healthcheckTimeout := storagebackend.DefaultHealthcheckTimeout
 		if c.HealthcheckTimeout != time.Duration(0) {
 			healthcheckTimeout = c.HealthcheckTimeout

View File

@@ -40,12 +40,12 @@ func Create(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (
 	}
 }
 
 // CreateHealthCheck creates a healthcheck function based on given config.
-func CreateHealthCheck(c storagebackend.Config) (func() error, error) {
+func CreateHealthCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() error, error) {
 	switch c.Type {
 	case storagebackend.StorageTypeETCD2:
 		return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
 	case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
-		return newETCD3HealthCheck(c)
+		return newETCD3HealthCheck(c, stopCh)
 	default:
 		return nil, fmt.Errorf("unknown storage type: %s", c.Type)
 	}