From f7a735a8c2ce07e1a47d8581096412bcf4de32d1 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 5 Oct 2017 18:57:53 -0400 Subject: [PATCH] Have the certificate manager decide if the server is healthy Prevent a Kubelet from shutting down when the server isn't responding to us but we cannot get a new certificate. This allows a cluster to coast if the master is unresponsive or a node is partitioned and their client cert expires. Kubernetes-commit: b3a11aa635022761637090f4fc8d5cb57f3f0010 --- util/certificate/BUILD | 5 + util/certificate/certificate_manager.go | 94 +++++++++- util/certificate/certificate_manager_test.go | 179 ++++++++++++++++++- 3 files changed, 267 insertions(+), 11 deletions(-) diff --git a/util/certificate/BUILD b/util/certificate/BUILD index c0e32aaa..6903afd3 100644 --- a/util/certificate/BUILD +++ b/util/certificate/BUILD @@ -19,7 +19,10 @@ go_test( tags = ["automanaged"], deps = [ "//vendor/k8s.io/api/certificates/v1beta1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library", "//vendor/k8s.io/client-go/util/cert:go_default_library", @@ -38,6 +41,8 @@ go_library( "//pkg/kubelet/util/csr:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/certificates/v1beta1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library", "//vendor/k8s.io/client-go/util/cert:go_default_library", diff --git a/util/certificate/certificate_manager.go b/util/certificate/certificate_manager.go index 7eb4aaa7..22b14f36 100644 --- a/util/certificate/certificate_manager.go +++ b/util/certificate/certificate_manager.go @@ -30,12 +30,18 @@ import ( "github.com/golang/glog" certificates "k8s.io/api/certificates/v1beta1" + "k8s.io/apimachinery/pkg/api/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1" "k8s.io/client-go/util/cert" "k8s.io/kubernetes/pkg/kubelet/util/csr" ) +// certificateWaitBackoff controls the amount and timing of retries when the +// watch for certificate approval is interrupted. +var certificateWaitBackoff = wait.Backoff{Duration: 30 * time.Second, Steps: 4, Factor: 1.5, Jitter: 0.1} + // Manager maintains and updates the certificates in use by this certificate // manager. In the background it communicates with the API server to get new // certificates for certificates about to expire. @@ -49,6 +55,12 @@ type Manager interface { // certificate manager, as well as the associated certificate and key data // in PEM format. Current() *tls.Certificate + // ServerHealthy returns true if the manager is able to communicate with + // the server. This allows a caller to determine whether the cert manager + // thinks it can potentially talk to the API server. The cert manager may + // be very conservative and only return true if recent communication has + // occurred with the server. + ServerHealthy() bool } // Config is the set of configuration parameters available for a new Manager. @@ -132,6 +144,7 @@ type manager struct { rotationDeadline time.Time forceRotation bool certificateExpiration Gauge + serverHealth bool } // NewManager returns a new certificate manager. A certificate manager is @@ -169,6 +182,14 @@ func (m *manager) Current() *tls.Certificate { return m.cert } +// ServerHealthy returns true if the cert manager believes the server +// is currently alive. +func (m *manager) ServerHealthy() bool { + m.certAccessLock.RLock() + defer m.certAccessLock.RUnlock() + return m.serverHealth +} + // SetCertificateSigningRequestClient sets the client interface that is used // for signing new certificates generated as part of rotation. It must be // called before Start() and can not be used to change the @@ -203,9 +224,8 @@ func (m *manager) Start() { // doesn't have a certificate at all yet. if m.shouldRotate() { glog.V(1).Infof("shouldRotate() is true, forcing immediate rotation") - _, err := m.rotateCerts() - if err != nil { - glog.Errorf("Could not rotate certificates: %v", err) + if _, err := m.rotateCerts(); err != nil { + utilruntime.HandleError(fmt.Errorf("Could not rotate certificates: %v", err)) } } backoff := wait.Backoff{ @@ -219,7 +239,7 @@ func (m *manager) Start() { glog.V(2).Infof("Waiting %v for next certificate rotation", sleepInterval) time.Sleep(sleepInterval) if err := wait.ExponentialBackoff(backoff, m.rotateCerts); err != nil { - glog.Errorf("Reached backoff limit, still unable to rotate certs: %v", err) + utilruntime.HandleError(fmt.Errorf("Reached backoff limit, still unable to rotate certs: %v", err)) wait.PollInfinite(128*time.Second, m.rotateCerts) } }, 0) @@ -273,26 +293,58 @@ func (m *manager) shouldRotate() bool { return time.Now().After(m.rotationDeadline) } +// rotateCerts attempts to request a client cert from the server, wait a reasonable +// period of time for it to be signed, and then update the cert on disk. If it cannot +// retrieve a cert, it will return false. It will only return error in exceptional cases. +// This method also keeps track of "server health" by interpreting the responses it gets +// from the server on the various calls it makes. func (m *manager) rotateCerts() (bool, error) { glog.V(2).Infof("Rotating certificates") csrPEM, keyPEM, privateKey, err := m.generateCSR() if err != nil { - glog.Errorf("Unable to generate a certificate signing request: %v", err) + utilruntime.HandleError(fmt.Errorf("Unable to generate a certificate signing request: %v", err)) return false, nil } // Call the Certificate Signing Request API to get a certificate for the // new private key. - crtPEM, err := csr.RequestCertificate(m.certSigningRequestClient, csrPEM, "", m.usages, privateKey) + req, err := csr.RequestCertificate(m.certSigningRequestClient, csrPEM, "", m.usages, privateKey) if err != nil { - glog.Errorf("Failed while requesting a signed certificate from the master: %v", err) + utilruntime.HandleError(fmt.Errorf("Failed while requesting a signed certificate from the master: %v", err)) + return false, m.updateServerError(err) + } + + // Wait for the certificate to be signed. Instead of one long watch, we retry with slighly longer + // intervals each time in order to tolerate failures from the server AND to preserve the liveliness + // of the cert manager loop. This creates slightly more traffic against the API server in return + // for bounding the amount of time we wait when a certificate expires. + var crtPEM []byte + watchDuration := time.Minute + if err := wait.ExponentialBackoff(certificateWaitBackoff, func() (bool, error) { + data, err := csr.WaitForCertificate(m.certSigningRequestClient, req, watchDuration) + switch { + case err == nil: + crtPEM = data + return true, nil + case err == wait.ErrWaitTimeout: + watchDuration += time.Minute + if watchDuration > 5*time.Minute { + watchDuration = 5 * time.Minute + } + return false, nil + default: + utilruntime.HandleError(fmt.Errorf("Unable to check certificate signing status: %v", err)) + return false, m.updateServerError(err) + } + }); err != nil { + utilruntime.HandleError(fmt.Errorf("Certificate request was not signed: %v", err)) return false, nil } cert, err := m.certStore.Update(crtPEM, keyPEM) if err != nil { - glog.Errorf("Unable to store the new cert/key pair: %v", err) + utilruntime.HandleError(fmt.Errorf("Unable to store the new cert/key pair: %v", err)) return false, nil } @@ -335,12 +387,38 @@ var jitteryDuration = func(totalDuration float64) time.Duration { return wait.Jitter(time.Duration(totalDuration), 0.2) - time.Duration(totalDuration*0.3) } +// updateCached sets the most recent retrieved cert. It also sets the server +// as assumed healthy. func (m *manager) updateCached(cert *tls.Certificate) { m.certAccessLock.Lock() defer m.certAccessLock.Unlock() + m.serverHealth = true m.cert = cert } +// updateServerError takes an error returned by the server and infers +// the health of the server based on the error. It will return nil if +// the error does not require immediate termination of any wait loops, +// and otherwise it will return the error. +func (m *manager) updateServerError(err error) error { + m.certAccessLock.Lock() + defer m.certAccessLock.Unlock() + switch { + case errors.IsUnauthorized(err): + // SSL terminating proxies may report this error instead of the master + m.serverHealth = true + case errors.IsUnexpectedServerError(err): + // generally indicates a proxy or other load balancer problem, rather than a problem coming + // from the master + m.serverHealth = false + default: + // Identify known errors that could be expected for a cert request that + // indicate everything is working normally + m.serverHealth = errors.IsNotFound(err) || errors.IsForbidden(err) + } + return nil +} + func (m *manager) generateCSR() (csrPEM []byte, keyPEM []byte, key interface{}, err error) { // Generate a new private key. privateKey, err := ecdsa.GenerateKey(elliptic.P256(), cryptorand.Reader) diff --git a/util/certificate/certificate_manager_test.go b/util/certificate/certificate_manager_test.go index 3df131e7..ab19e37b 100644 --- a/util/certificate/certificate_manager_test.go +++ b/util/certificate/certificate_manager_test.go @@ -27,7 +27,10 @@ import ( "time" certificates "k8s.io/api/certificates/v1beta1" + "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" watch "k8s.io/apimachinery/pkg/watch" certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1" ) @@ -288,6 +291,7 @@ func TestRotateCertWaitingForResultError(t *testing.T) { }, } + certificateWaitBackoff = wait.Backoff{Steps: 1} if success, err := m.rotateCerts(); success { t.Errorf("Got success from 'rotateCerts', wanted failure.") } else if err != nil { @@ -612,11 +616,170 @@ func TestInitializeOtherRESTClients(t *testing.T) { } else { m.setRotationDeadline() if m.shouldRotate() { - if success, err := certificateManager.(*manager).rotateCerts(); !success { - t.Errorf("Got failure from 'rotateCerts', expected success") - } else if err != nil { + success, err := certificateManager.(*manager).rotateCerts() + if err != nil { t.Errorf("Got error %v, expected none.", err) + return } + if !success { + t.Errorf("Unexpected response 'rotateCerts': %t", success) + return + } + } + } + + certificate = certificateManager.Current() + if !certificatesEqual(certificate, tc.expectedCertAfterStart.certificate) { + t.Errorf("Got %v, wanted %v", certificateString(certificate), certificateString(tc.expectedCertAfterStart.certificate)) + } + }) + } +} + +func TestServerHealth(t *testing.T) { + type certs struct { + storeCert *certificateData + bootstrapCert *certificateData + apiCert *certificateData + expectedCertBeforeStart *certificateData + expectedCertAfterStart *certificateData + } + + updatedCerts := certs{ + storeCert: storeCertData, + bootstrapCert: bootstrapCertData, + apiCert: apiServerCertData, + expectedCertBeforeStart: storeCertData, + expectedCertAfterStart: apiServerCertData, + } + + currentCerts := certs{ + storeCert: storeCertData, + bootstrapCert: bootstrapCertData, + apiCert: apiServerCertData, + expectedCertBeforeStart: storeCertData, + expectedCertAfterStart: storeCertData, + } + + testCases := []struct { + description string + certs + + failureType fakeClientFailureType + clientErr error + + expectRotateFail bool + expectHealthy bool + }{ + { + description: "Current certificate, bootstrap certificate", + certs: updatedCerts, + expectHealthy: true, + }, + { + description: "Generic error on create", + certs: currentCerts, + + failureType: createError, + expectRotateFail: true, + }, + { + description: "Unauthorized error on create", + certs: currentCerts, + + failureType: createError, + clientErr: errors.NewUnauthorized("unauthorized"), + expectRotateFail: true, + expectHealthy: true, + }, + { + description: "Generic unauthorized error on create", + certs: currentCerts, + + failureType: createError, + clientErr: errors.NewGenericServerResponse(401, "POST", schema.GroupResource{}, "", "", 0, true), + expectRotateFail: true, + expectHealthy: true, + }, + { + description: "Generic not found error on create", + certs: currentCerts, + + failureType: createError, + clientErr: errors.NewGenericServerResponse(404, "POST", schema.GroupResource{}, "", "", 0, true), + expectRotateFail: true, + expectHealthy: false, + }, + { + description: "Not found error on create", + certs: currentCerts, + + failureType: createError, + clientErr: errors.NewGenericServerResponse(404, "POST", schema.GroupResource{}, "", "", 0, false), + expectRotateFail: true, + expectHealthy: true, + }, + { + description: "Conflict error on watch", + certs: currentCerts, + + failureType: watchError, + clientErr: errors.NewGenericServerResponse(409, "POST", schema.GroupResource{}, "", "", 0, false), + expectRotateFail: true, + expectHealthy: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + certificateStore := &fakeStore{ + cert: tc.storeCert.certificate, + } + + certificateManager, err := NewManager(&Config{ + Template: &x509.CertificateRequest{ + Subject: pkix.Name{ + Organization: []string{"system:nodes"}, + CommonName: "system:node:fake-node-name", + }, + }, + Usages: []certificates.KeyUsage{ + certificates.UsageDigitalSignature, + certificates.UsageKeyEncipherment, + certificates.UsageClientAuth, + }, + CertificateStore: certificateStore, + BootstrapCertificatePEM: tc.bootstrapCert.certificatePEM, + BootstrapKeyPEM: tc.bootstrapCert.keyPEM, + CertificateSigningRequestClient: &fakeClient{ + certificatePEM: tc.apiCert.certificatePEM, + failureType: tc.failureType, + err: tc.clientErr, + }, + }) + if err != nil { + t.Errorf("Got %v, wanted no error.", err) + } + + certificate := certificateManager.Current() + if !certificatesEqual(certificate, tc.expectedCertBeforeStart.certificate) { + t.Errorf("Got %v, wanted %v", certificateString(certificate), certificateString(tc.expectedCertBeforeStart.certificate)) + } + + if _, ok := certificateManager.(*manager); !ok { + t.Errorf("Expected a '*manager' from 'NewManager'") + } else { + success, err := certificateManager.(*manager).rotateCerts() + if err != nil { + t.Errorf("Got error %v, expected none.", err) + return + } + if !success != tc.expectRotateFail { + t.Errorf("Unexpected response 'rotateCerts': %t", success) + return + } + if actual := certificateManager.(*manager).ServerHealthy(); actual != tc.expectHealthy { + t.Errorf("Unexpected manager server health: %t", actual) } } @@ -641,10 +804,14 @@ type fakeClient struct { certificatesclient.CertificateSigningRequestInterface failureType fakeClientFailureType certificatePEM []byte + err error } func (c fakeClient) List(opts v1.ListOptions) (*certificates.CertificateSigningRequestList, error) { if c.failureType == watchError { + if c.err != nil { + return nil, c.err + } return nil, fmt.Errorf("Watch error") } csrReply := certificates.CertificateSigningRequestList{ @@ -657,6 +824,9 @@ func (c fakeClient) List(opts v1.ListOptions) (*certificates.CertificateSigningR func (c fakeClient) Create(*certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error) { if c.failureType == createError { + if c.err != nil { + return nil, c.err + } return nil, fmt.Errorf("Create error") } csrReply := certificates.CertificateSigningRequest{} @@ -666,6 +836,9 @@ func (c fakeClient) Create(*certificates.CertificateSigningRequest) (*certificat func (c fakeClient) Watch(opts v1.ListOptions) (watch.Interface, error) { if c.failureType == watchError { + if c.err != nil { + return nil, c.err + } return nil, fmt.Errorf("Watch error") } return &fakeWatch{