From b3a11aa635022761637090f4fc8d5cb57f3f0010 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 5 Oct 2017 18:57:53 -0400 Subject: [PATCH] Have the certificate manager decide if the server is healthy Prevent a Kubelet from shutting down when the server isn't responding to us but we cannot get a new certificate. This allows a cluster to coast if the master is unresponsive or a node is partitioned and their client cert expires. --- pkg/kubelet/certificate/transport.go | 6 +- pkg/kubelet/certificate/transport_test.go | 5 +- pkg/kubelet/util/csr/BUILD | 1 + pkg/kubelet/util/csr/csr.go | 35 +++- .../k8s.io/client-go/util/certificate/BUILD | 5 + .../util/certificate/certificate_manager.go | 94 ++++++++- .../certificate/certificate_manager_test.go | 179 +++++++++++++++++- 7 files changed, 304 insertions(+), 21 deletions(-) diff --git a/pkg/kubelet/certificate/transport.go b/pkg/kubelet/certificate/transport.go index d206f23cdeb..1683ad09735 100644 --- a/pkg/kubelet/certificate/transport.go +++ b/pkg/kubelet/certificate/transport.go @@ -81,7 +81,11 @@ func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig go wait.Until(func() { curr := clientCertificateManager.Current() if exitIfExpired && curr != nil && time.Now().After(curr.Leaf.NotAfter) { - glog.Fatalf("The currently active client certificate has expired, exiting.") + if clientCertificateManager.ServerHealthy() { + glog.Fatalf("The currently active client certificate has expired and the server is responsive, exiting.") + } else { + glog.Errorf("The currently active client certificate has expired, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.") + } } if curr == nil || lastCert == curr { // Cert hasn't been rotated. diff --git a/pkg/kubelet/certificate/transport_test.go b/pkg/kubelet/certificate/transport_test.go index 5dce3eafb4c..6e476c3b803 100644 --- a/pkg/kubelet/certificate/transport_test.go +++ b/pkg/kubelet/certificate/transport_test.go @@ -114,13 +114,16 @@ func newCertificateData(certificatePEM string, keyPEM string) *certificateData { } type fakeManager struct { - cert atomic.Value // Always a *tls.Certificate + cert atomic.Value // Always a *tls.Certificate + healthy bool } func (f *fakeManager) SetCertificateSigningRequestClient(certificatesclient.CertificateSigningRequestInterface) error { return nil } +func (f *fakeManager) ServerHealthy() bool { return f.healthy } + func (f *fakeManager) Start() {} func (f *fakeManager) Current() *tls.Certificate { diff --git a/pkg/kubelet/util/csr/BUILD b/pkg/kubelet/util/csr/BUILD index 3414983f2cf..78c435528b5 100644 --- a/pkg/kubelet/util/csr/BUILD +++ b/pkg/kubelet/util/csr/BUILD @@ -19,6 +19,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", diff --git a/pkg/kubelet/util/csr/csr.go b/pkg/kubelet/util/csr/csr.go index 9bbb6a7bc90..53b67f8650e 100644 --- a/pkg/kubelet/util/csr/csr.go +++ b/pkg/kubelet/util/csr/csr.go @@ -69,7 +69,11 @@ func RequestNodeCertificate(client certificatesclient.CertificateSigningRequestI certificates.UsageClientAuth, } name := digestedName(privateKeyData, subject, usages) - return RequestCertificate(client, csrData, name, usages, privateKey) + req, err := RequestCertificate(client, csrData, name, usages, privateKey) + if err != nil { + return nil, err + } + return WaitForCertificate(client, req, 3600*time.Second) } // RequestCertificate will either use an existing (if this process has run @@ -78,7 +82,7 @@ func RequestNodeCertificate(client certificatesclient.CertificateSigningRequestI // status, once approved by API server, it will return the API server's issued // certificate (pem-encoded). If there is any errors, or the watch timeouts, it // will return an error. -func RequestCertificate(client certificatesclient.CertificateSigningRequestInterface, csrData []byte, name string, usages []certificates.KeyUsage, privateKey interface{}) (certData []byte, err error) { +func RequestCertificate(client certificatesclient.CertificateSigningRequestInterface, csrData []byte, name string, usages []certificates.KeyUsage, privateKey interface{}) (req *certificates.CertificateSigningRequest, err error) { csr := &certificates.CertificateSigningRequest{ // Username, UID, Groups will be injected by API server. TypeMeta: metav1.TypeMeta{Kind: "CertificateSigningRequest"}, @@ -94,27 +98,31 @@ func RequestCertificate(client certificatesclient.CertificateSigningRequestInter csr.GenerateName = "csr-" } - req, err := client.Create(csr) + req, err = client.Create(csr) switch { case err == nil: case errors.IsAlreadyExists(err) && len(name) > 0: glog.Infof("csr for this node already exists, reusing") req, err = client.Get(name, metav1.GetOptions{}) if err != nil { - return nil, fmt.Errorf("cannot retrieve certificate signing request: %v", err) + return nil, formatError("cannot retrieve certificate signing request: %v", err) } if err := ensureCompatible(req, csr, privateKey); err != nil { return nil, fmt.Errorf("retrieved csr is not compatible: %v", err) } glog.Infof("csr for this node is still valid") default: - return nil, fmt.Errorf("cannot create certificate signing request: %v", err) + return nil, formatError("cannot create certificate signing request: %v", err) } + return req, nil +} +// WaitForCertificate waits for a certificate to be issued until timeout, or returns an error. +func WaitForCertificate(client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest, timeout time.Duration) (certData []byte, err error) { fieldSelector := fields.OneTermEqualSelector("metadata.name", req.Name).String() event, err := cache.ListWatchUntil( - 3600*time.Second, + timeout, &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { options.FieldSelector = fieldSelector @@ -129,7 +137,7 @@ func RequestCertificate(client certificatesclient.CertificateSigningRequestInter switch event.Type { case watch.Modified, watch.Added: case watch.Deleted: - return false, fmt.Errorf("csr %q was deleted", csr.Name) + return false, fmt.Errorf("csr %q was deleted", req.Name) default: return false, nil } @@ -152,7 +160,7 @@ func RequestCertificate(client certificatesclient.CertificateSigningRequestInter return nil, wait.ErrWaitTimeout } if err != nil { - return nil, fmt.Errorf("cannot watch on the certificate signing request: %v", err) + return nil, formatError("cannot watch on the certificate signing request: %v", err) } return event.Object.(*certificates.CertificateSigningRequest).Status.Certificate, nil @@ -225,3 +233,14 @@ func ensureCompatible(new, orig *certificates.CertificateSigningRequest, private } return nil } + +// formatError preserves the type of an API message but alters the message. Expects +// a single argument format string, and returns the wrapped error. +func formatError(format string, err error) error { + if s, ok := err.(errors.APIStatus); ok { + se := &errors.StatusError{ErrStatus: s.Status()} + se.ErrStatus.Message = fmt.Sprintf(format, se.ErrStatus.Message) + return se + } + return fmt.Errorf(format, err) +} diff --git a/staging/src/k8s.io/client-go/util/certificate/BUILD b/staging/src/k8s.io/client-go/util/certificate/BUILD index c0e32aaaf46..6903afd3bb2 100644 --- a/staging/src/k8s.io/client-go/util/certificate/BUILD +++ b/staging/src/k8s.io/client-go/util/certificate/BUILD @@ -19,7 +19,10 @@ go_test( tags = ["automanaged"], deps = [ "//vendor/k8s.io/api/certificates/v1beta1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library", "//vendor/k8s.io/client-go/util/cert:go_default_library", @@ -38,6 +41,8 @@ go_library( "//pkg/kubelet/util/csr:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/certificates/v1beta1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library", "//vendor/k8s.io/client-go/util/cert:go_default_library", diff --git a/staging/src/k8s.io/client-go/util/certificate/certificate_manager.go b/staging/src/k8s.io/client-go/util/certificate/certificate_manager.go index 7eb4aaa74a0..22b14f363d3 100644 --- a/staging/src/k8s.io/client-go/util/certificate/certificate_manager.go +++ b/staging/src/k8s.io/client-go/util/certificate/certificate_manager.go @@ -30,12 +30,18 @@ import ( "github.com/golang/glog" certificates "k8s.io/api/certificates/v1beta1" + "k8s.io/apimachinery/pkg/api/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1" "k8s.io/client-go/util/cert" "k8s.io/kubernetes/pkg/kubelet/util/csr" ) +// certificateWaitBackoff controls the amount and timing of retries when the +// watch for certificate approval is interrupted. +var certificateWaitBackoff = wait.Backoff{Duration: 30 * time.Second, Steps: 4, Factor: 1.5, Jitter: 0.1} + // Manager maintains and updates the certificates in use by this certificate // manager. In the background it communicates with the API server to get new // certificates for certificates about to expire. @@ -49,6 +55,12 @@ type Manager interface { // certificate manager, as well as the associated certificate and key data // in PEM format. Current() *tls.Certificate + // ServerHealthy returns true if the manager is able to communicate with + // the server. This allows a caller to determine whether the cert manager + // thinks it can potentially talk to the API server. The cert manager may + // be very conservative and only return true if recent communication has + // occurred with the server. + ServerHealthy() bool } // Config is the set of configuration parameters available for a new Manager. @@ -132,6 +144,7 @@ type manager struct { rotationDeadline time.Time forceRotation bool certificateExpiration Gauge + serverHealth bool } // NewManager returns a new certificate manager. A certificate manager is @@ -169,6 +182,14 @@ func (m *manager) Current() *tls.Certificate { return m.cert } +// ServerHealthy returns true if the cert manager believes the server +// is currently alive. +func (m *manager) ServerHealthy() bool { + m.certAccessLock.RLock() + defer m.certAccessLock.RUnlock() + return m.serverHealth +} + // SetCertificateSigningRequestClient sets the client interface that is used // for signing new certificates generated as part of rotation. It must be // called before Start() and can not be used to change the @@ -203,9 +224,8 @@ func (m *manager) Start() { // doesn't have a certificate at all yet. if m.shouldRotate() { glog.V(1).Infof("shouldRotate() is true, forcing immediate rotation") - _, err := m.rotateCerts() - if err != nil { - glog.Errorf("Could not rotate certificates: %v", err) + if _, err := m.rotateCerts(); err != nil { + utilruntime.HandleError(fmt.Errorf("Could not rotate certificates: %v", err)) } } backoff := wait.Backoff{ @@ -219,7 +239,7 @@ func (m *manager) Start() { glog.V(2).Infof("Waiting %v for next certificate rotation", sleepInterval) time.Sleep(sleepInterval) if err := wait.ExponentialBackoff(backoff, m.rotateCerts); err != nil { - glog.Errorf("Reached backoff limit, still unable to rotate certs: %v", err) + utilruntime.HandleError(fmt.Errorf("Reached backoff limit, still unable to rotate certs: %v", err)) wait.PollInfinite(128*time.Second, m.rotateCerts) } }, 0) @@ -273,26 +293,58 @@ func (m *manager) shouldRotate() bool { return time.Now().After(m.rotationDeadline) } +// rotateCerts attempts to request a client cert from the server, wait a reasonable +// period of time for it to be signed, and then update the cert on disk. If it cannot +// retrieve a cert, it will return false. It will only return error in exceptional cases. +// This method also keeps track of "server health" by interpreting the responses it gets +// from the server on the various calls it makes. func (m *manager) rotateCerts() (bool, error) { glog.V(2).Infof("Rotating certificates") csrPEM, keyPEM, privateKey, err := m.generateCSR() if err != nil { - glog.Errorf("Unable to generate a certificate signing request: %v", err) + utilruntime.HandleError(fmt.Errorf("Unable to generate a certificate signing request: %v", err)) return false, nil } // Call the Certificate Signing Request API to get a certificate for the // new private key. - crtPEM, err := csr.RequestCertificate(m.certSigningRequestClient, csrPEM, "", m.usages, privateKey) + req, err := csr.RequestCertificate(m.certSigningRequestClient, csrPEM, "", m.usages, privateKey) if err != nil { - glog.Errorf("Failed while requesting a signed certificate from the master: %v", err) + utilruntime.HandleError(fmt.Errorf("Failed while requesting a signed certificate from the master: %v", err)) + return false, m.updateServerError(err) + } + + // Wait for the certificate to be signed. Instead of one long watch, we retry with slighly longer + // intervals each time in order to tolerate failures from the server AND to preserve the liveliness + // of the cert manager loop. This creates slightly more traffic against the API server in return + // for bounding the amount of time we wait when a certificate expires. + var crtPEM []byte + watchDuration := time.Minute + if err := wait.ExponentialBackoff(certificateWaitBackoff, func() (bool, error) { + data, err := csr.WaitForCertificate(m.certSigningRequestClient, req, watchDuration) + switch { + case err == nil: + crtPEM = data + return true, nil + case err == wait.ErrWaitTimeout: + watchDuration += time.Minute + if watchDuration > 5*time.Minute { + watchDuration = 5 * time.Minute + } + return false, nil + default: + utilruntime.HandleError(fmt.Errorf("Unable to check certificate signing status: %v", err)) + return false, m.updateServerError(err) + } + }); err != nil { + utilruntime.HandleError(fmt.Errorf("Certificate request was not signed: %v", err)) return false, nil } cert, err := m.certStore.Update(crtPEM, keyPEM) if err != nil { - glog.Errorf("Unable to store the new cert/key pair: %v", err) + utilruntime.HandleError(fmt.Errorf("Unable to store the new cert/key pair: %v", err)) return false, nil } @@ -335,12 +387,38 @@ var jitteryDuration = func(totalDuration float64) time.Duration { return wait.Jitter(time.Duration(totalDuration), 0.2) - time.Duration(totalDuration*0.3) } +// updateCached sets the most recent retrieved cert. It also sets the server +// as assumed healthy. func (m *manager) updateCached(cert *tls.Certificate) { m.certAccessLock.Lock() defer m.certAccessLock.Unlock() + m.serverHealth = true m.cert = cert } +// updateServerError takes an error returned by the server and infers +// the health of the server based on the error. It will return nil if +// the error does not require immediate termination of any wait loops, +// and otherwise it will return the error. +func (m *manager) updateServerError(err error) error { + m.certAccessLock.Lock() + defer m.certAccessLock.Unlock() + switch { + case errors.IsUnauthorized(err): + // SSL terminating proxies may report this error instead of the master + m.serverHealth = true + case errors.IsUnexpectedServerError(err): + // generally indicates a proxy or other load balancer problem, rather than a problem coming + // from the master + m.serverHealth = false + default: + // Identify known errors that could be expected for a cert request that + // indicate everything is working normally + m.serverHealth = errors.IsNotFound(err) || errors.IsForbidden(err) + } + return nil +} + func (m *manager) generateCSR() (csrPEM []byte, keyPEM []byte, key interface{}, err error) { // Generate a new private key. privateKey, err := ecdsa.GenerateKey(elliptic.P256(), cryptorand.Reader) diff --git a/staging/src/k8s.io/client-go/util/certificate/certificate_manager_test.go b/staging/src/k8s.io/client-go/util/certificate/certificate_manager_test.go index 3df131e7d2b..ab19e37b12d 100644 --- a/staging/src/k8s.io/client-go/util/certificate/certificate_manager_test.go +++ b/staging/src/k8s.io/client-go/util/certificate/certificate_manager_test.go @@ -27,7 +27,10 @@ import ( "time" certificates "k8s.io/api/certificates/v1beta1" + "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" watch "k8s.io/apimachinery/pkg/watch" certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1" ) @@ -288,6 +291,7 @@ func TestRotateCertWaitingForResultError(t *testing.T) { }, } + certificateWaitBackoff = wait.Backoff{Steps: 1} if success, err := m.rotateCerts(); success { t.Errorf("Got success from 'rotateCerts', wanted failure.") } else if err != nil { @@ -612,11 +616,170 @@ func TestInitializeOtherRESTClients(t *testing.T) { } else { m.setRotationDeadline() if m.shouldRotate() { - if success, err := certificateManager.(*manager).rotateCerts(); !success { - t.Errorf("Got failure from 'rotateCerts', expected success") - } else if err != nil { + success, err := certificateManager.(*manager).rotateCerts() + if err != nil { t.Errorf("Got error %v, expected none.", err) + return } + if !success { + t.Errorf("Unexpected response 'rotateCerts': %t", success) + return + } + } + } + + certificate = certificateManager.Current() + if !certificatesEqual(certificate, tc.expectedCertAfterStart.certificate) { + t.Errorf("Got %v, wanted %v", certificateString(certificate), certificateString(tc.expectedCertAfterStart.certificate)) + } + }) + } +} + +func TestServerHealth(t *testing.T) { + type certs struct { + storeCert *certificateData + bootstrapCert *certificateData + apiCert *certificateData + expectedCertBeforeStart *certificateData + expectedCertAfterStart *certificateData + } + + updatedCerts := certs{ + storeCert: storeCertData, + bootstrapCert: bootstrapCertData, + apiCert: apiServerCertData, + expectedCertBeforeStart: storeCertData, + expectedCertAfterStart: apiServerCertData, + } + + currentCerts := certs{ + storeCert: storeCertData, + bootstrapCert: bootstrapCertData, + apiCert: apiServerCertData, + expectedCertBeforeStart: storeCertData, + expectedCertAfterStart: storeCertData, + } + + testCases := []struct { + description string + certs + + failureType fakeClientFailureType + clientErr error + + expectRotateFail bool + expectHealthy bool + }{ + { + description: "Current certificate, bootstrap certificate", + certs: updatedCerts, + expectHealthy: true, + }, + { + description: "Generic error on create", + certs: currentCerts, + + failureType: createError, + expectRotateFail: true, + }, + { + description: "Unauthorized error on create", + certs: currentCerts, + + failureType: createError, + clientErr: errors.NewUnauthorized("unauthorized"), + expectRotateFail: true, + expectHealthy: true, + }, + { + description: "Generic unauthorized error on create", + certs: currentCerts, + + failureType: createError, + clientErr: errors.NewGenericServerResponse(401, "POST", schema.GroupResource{}, "", "", 0, true), + expectRotateFail: true, + expectHealthy: true, + }, + { + description: "Generic not found error on create", + certs: currentCerts, + + failureType: createError, + clientErr: errors.NewGenericServerResponse(404, "POST", schema.GroupResource{}, "", "", 0, true), + expectRotateFail: true, + expectHealthy: false, + }, + { + description: "Not found error on create", + certs: currentCerts, + + failureType: createError, + clientErr: errors.NewGenericServerResponse(404, "POST", schema.GroupResource{}, "", "", 0, false), + expectRotateFail: true, + expectHealthy: true, + }, + { + description: "Conflict error on watch", + certs: currentCerts, + + failureType: watchError, + clientErr: errors.NewGenericServerResponse(409, "POST", schema.GroupResource{}, "", "", 0, false), + expectRotateFail: true, + expectHealthy: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + certificateStore := &fakeStore{ + cert: tc.storeCert.certificate, + } + + certificateManager, err := NewManager(&Config{ + Template: &x509.CertificateRequest{ + Subject: pkix.Name{ + Organization: []string{"system:nodes"}, + CommonName: "system:node:fake-node-name", + }, + }, + Usages: []certificates.KeyUsage{ + certificates.UsageDigitalSignature, + certificates.UsageKeyEncipherment, + certificates.UsageClientAuth, + }, + CertificateStore: certificateStore, + BootstrapCertificatePEM: tc.bootstrapCert.certificatePEM, + BootstrapKeyPEM: tc.bootstrapCert.keyPEM, + CertificateSigningRequestClient: &fakeClient{ + certificatePEM: tc.apiCert.certificatePEM, + failureType: tc.failureType, + err: tc.clientErr, + }, + }) + if err != nil { + t.Errorf("Got %v, wanted no error.", err) + } + + certificate := certificateManager.Current() + if !certificatesEqual(certificate, tc.expectedCertBeforeStart.certificate) { + t.Errorf("Got %v, wanted %v", certificateString(certificate), certificateString(tc.expectedCertBeforeStart.certificate)) + } + + if _, ok := certificateManager.(*manager); !ok { + t.Errorf("Expected a '*manager' from 'NewManager'") + } else { + success, err := certificateManager.(*manager).rotateCerts() + if err != nil { + t.Errorf("Got error %v, expected none.", err) + return + } + if !success != tc.expectRotateFail { + t.Errorf("Unexpected response 'rotateCerts': %t", success) + return + } + if actual := certificateManager.(*manager).ServerHealthy(); actual != tc.expectHealthy { + t.Errorf("Unexpected manager server health: %t", actual) } } @@ -641,10 +804,14 @@ type fakeClient struct { certificatesclient.CertificateSigningRequestInterface failureType fakeClientFailureType certificatePEM []byte + err error } func (c fakeClient) List(opts v1.ListOptions) (*certificates.CertificateSigningRequestList, error) { if c.failureType == watchError { + if c.err != nil { + return nil, c.err + } return nil, fmt.Errorf("Watch error") } csrReply := certificates.CertificateSigningRequestList{ @@ -657,6 +824,9 @@ func (c fakeClient) List(opts v1.ListOptions) (*certificates.CertificateSigningR func (c fakeClient) Create(*certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error) { if c.failureType == createError { + if c.err != nil { + return nil, c.err + } return nil, fmt.Errorf("Create error") } csrReply := certificates.CertificateSigningRequest{} @@ -666,6 +836,9 @@ func (c fakeClient) Create(*certificates.CertificateSigningRequest) (*certificat func (c fakeClient) Watch(opts v1.ListOptions) (watch.Interface, error) { if c.failureType == watchError { + if c.err != nil { + return nil, c.err + } return nil, fmt.Errorf("Watch error") } return &fakeWatch{