Have the certificate manager decide if the server is healthy

Prevent a Kubelet from shutting down when the server isn't responding to
us and we therefore cannot get a new certificate. This allows a cluster to
coast if the master is unresponsive, or if a node is partitioned and its
client cert expires.

Kubernetes-commit: b3a11aa635022761637090f4fc8d5cb57f3f0010
Author: Clayton Coleman
Date: 2017-10-05 18:57:53 -04:00
Committed by: Kubernetes Publisher
Parent: 8c1471aeda
Commit: f7a735a8c2
3 changed files with 267 additions and 11 deletions


@@ -30,12 +30,18 @@ import (
"github.com/golang/glog"
certificates "k8s.io/api/certificates/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
"k8s.io/client-go/util/cert"
"k8s.io/kubernetes/pkg/kubelet/util/csr"
)
// certificateWaitBackoff controls the amount and timing of retries when the
// watch for certificate approval is interrupted.
var certificateWaitBackoff = wait.Backoff{Duration: 30 * time.Second, Steps: 4, Factor: 1.5, Jitter: 0.1}
// Manager maintains and updates the certificates in use by this certificate
// manager. In the background it communicates with the API server to get new
// certificates for certificates about to expire.
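To make those Backoff numbers concrete, here is a small standalone sketch (plain Go, no client-go dependency) that mirrors the arithmetic wait.ExponentialBackoff applies to certificateWaitBackoff; the exact jitter behavior belongs to the library, so treat this as an approximation:

```go
package main

import (
	"fmt"
	"time"
)

// Mirrors (approximately) how wait.ExponentialBackoff would space retries
// for certificateWaitBackoff: 4 attempts, sleeps starting at 30s and growing
// by 1.5x, each sleep stretched by up to 10% random jitter.
func main() {
	d := 30 * time.Second
	for retry := 2; retry <= 4; retry++ {
		fmt.Printf("sleep ~%v before attempt %d (up to ~%v with jitter)\n",
			d, retry, time.Duration(float64(d)*1.1))
		d = time.Duration(float64(d) * 1.5)
	}
	// Roughly 30s, 45s, and 1m7.5s of sleeping between the four attempts,
	// on top of whatever time each watch itself consumes.
}
```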
@@ -49,6 +55,12 @@ type Manager interface {
 	// certificate manager, as well as the associated certificate and key data
 	// in PEM format.
 	Current() *tls.Certificate
+	// ServerHealthy returns true if the manager is able to communicate with
+	// the server. This allows a caller to determine whether the cert manager
+	// thinks it can potentially talk to the API server. The cert manager may
+	// be very conservative and only return true if recent communication has
+	// occurred with the server.
+	ServerHealthy() bool
 }

 // Config is the set of configuration parameters available for a new Manager.
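This interface change is the heart of the commit: a consumer such as the kubelet's health check can now distinguish "my cert expired while the server was reachable" from "the server is gone". A hedged sketch of such a caller (the interface subset and the shouldExit name are illustrative, not part of this change):

```go
package sketch

import (
	"crypto/tls"
	"time"
)

// healthSource is the subset of Manager this sketch needs.
type healthSource interface {
	Current() *tls.Certificate
	ServerHealthy() bool
}

// shouldExit treats an expired client cert as fatal only when the manager
// has recent evidence that the server is reachable. If the master is down
// or the node is partitioned, the node keeps coasting on its old cert.
func shouldExit(m healthSource) bool {
	cert := m.Current()
	if cert == nil || cert.Leaf == nil {
		return false // nothing to judge expiry against; keep running
	}
	return time.Now().After(cert.Leaf.NotAfter) && m.ServerHealthy()
}
```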
@@ -132,6 +144,7 @@ type manager struct {
 	rotationDeadline      time.Time
 	forceRotation         bool
 	certificateExpiration Gauge
+	serverHealth          bool
 }

 // NewManager returns a new certificate manager. A certificate manager is
@@ -169,6 +182,14 @@ func (m *manager) Current() *tls.Certificate {
 	return m.cert
 }

+// ServerHealthy returns true if the cert manager believes the server
+// is currently alive.
+func (m *manager) ServerHealthy() bool {
+	m.certAccessLock.RLock()
+	defer m.certAccessLock.RUnlock()
+	return m.serverHealth
+}
+
 // SetCertificateSigningRequestClient sets the client interface that is used
 // for signing new certificates generated as part of rotation. It must be
 // called before Start() and can not be used to change the
@@ -203,9 +224,8 @@ func (m *manager) Start() {
 	// doesn't have a certificate at all yet.
 	if m.shouldRotate() {
 		glog.V(1).Infof("shouldRotate() is true, forcing immediate rotation")
-		_, err := m.rotateCerts()
-		if err != nil {
-			glog.Errorf("Could not rotate certificates: %v", err)
+		if _, err := m.rotateCerts(); err != nil {
+			utilruntime.HandleError(fmt.Errorf("Could not rotate certificates: %v", err))
 		}
 	}
 	backoff := wait.Backoff{
@@ -219,7 +239,7 @@ func (m *manager) Start() {
 		glog.V(2).Infof("Waiting %v for next certificate rotation", sleepInterval)
 		time.Sleep(sleepInterval)
 		if err := wait.ExponentialBackoff(backoff, m.rotateCerts); err != nil {
-			glog.Errorf("Reached backoff limit, still unable to rotate certs: %v", err)
+			utilruntime.HandleError(fmt.Errorf("Reached backoff limit, still unable to rotate certs: %v", err))
 			wait.PollInfinite(128*time.Second, m.rotateCerts)
 		}
 	}, 0)
@@ -273,26 +293,58 @@ func (m *manager) shouldRotate() bool {
 	return time.Now().After(m.rotationDeadline)
 }

 // rotateCerts attempts to request a client cert from the server, wait a reasonable
 // period of time for it to be signed, and then update the cert on disk. If it cannot
 // retrieve a cert, it will return false. It will only return error in exceptional cases.
+// This method also keeps track of "server health" by interpreting the responses it gets
+// from the server on the various calls it makes.
 func (m *manager) rotateCerts() (bool, error) {
 	glog.V(2).Infof("Rotating certificates")

 	csrPEM, keyPEM, privateKey, err := m.generateCSR()
 	if err != nil {
-		glog.Errorf("Unable to generate a certificate signing request: %v", err)
+		utilruntime.HandleError(fmt.Errorf("Unable to generate a certificate signing request: %v", err))
 		return false, nil
 	}
 	// Call the Certificate Signing Request API to get a certificate for the
 	// new private key.
-	crtPEM, err := csr.RequestCertificate(m.certSigningRequestClient, csrPEM, "", m.usages, privateKey)
+	req, err := csr.RequestCertificate(m.certSigningRequestClient, csrPEM, "", m.usages, privateKey)
 	if err != nil {
-		glog.Errorf("Failed while requesting a signed certificate from the master: %v", err)
+		utilruntime.HandleError(fmt.Errorf("Failed while requesting a signed certificate from the master: %v", err))
+		return false, m.updateServerError(err)
 	}
+	// Wait for the certificate to be signed. Instead of one long watch, we retry with slightly longer
+	// intervals each time in order to tolerate failures from the server AND to preserve the liveness
+	// of the cert manager loop. This creates slightly more traffic against the API server in return
+	// for bounding the amount of time we wait when a certificate expires.
+	var crtPEM []byte
+	watchDuration := time.Minute
+	if err := wait.ExponentialBackoff(certificateWaitBackoff, func() (bool, error) {
+		data, err := csr.WaitForCertificate(m.certSigningRequestClient, req, watchDuration)
+		switch {
+		case err == nil:
+			crtPEM = data
+			return true, nil
+		case err == wait.ErrWaitTimeout:
+			watchDuration += time.Minute
+			if watchDuration > 5*time.Minute {
+				watchDuration = 5 * time.Minute
+			}
+			return false, nil
+		default:
+			utilruntime.HandleError(fmt.Errorf("Unable to check certificate signing status: %v", err))
+			return false, m.updateServerError(err)
+		}
+	}); err != nil {
+		utilruntime.HandleError(fmt.Errorf("Certificate request was not signed: %v", err))
+		return false, nil
+	}
 	cert, err := m.certStore.Update(crtPEM, keyPEM)
 	if err != nil {
-		glog.Errorf("Unable to store the new cert/key pair: %v", err)
+		utilruntime.HandleError(fmt.Errorf("Unable to store the new cert/key pair: %v", err))
 		return false, nil
 	}
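Putting the signing wait above together with certificateWaitBackoff, the worst-case time rotateCerts spends waiting for a signature is bounded. A standalone trace of that schedule (jitter omitted; assumes the backoff sleeps fall between attempts):

```go
package main

import (
	"fmt"
	"time"
)

// Traces the worst case of the signing wait: four watch attempts whose
// watchDuration grows a minute per timeout (capped at 5m), separated by
// certificateWaitBackoff sleeps of 30s growing by 1.5x.
func main() {
	watch := time.Minute
	sleep := 30 * time.Second
	var total time.Duration
	for attempt := 1; attempt <= 4; attempt++ {
		if attempt > 1 {
			fmt.Printf("  back off %v\n", sleep)
			total += sleep
			sleep = time.Duration(float64(sleep) * 1.5)
		}
		fmt.Printf("attempt %d: watch up to %v\n", attempt, watch)
		total += watch
		if watch += time.Minute; watch > 5*time.Minute {
			watch = 5 * time.Minute
		}
	}
	// About 12m22s in total before rotateCerts gives up and returns false.
	fmt.Println("bounded worst case:", total)
}
```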
@@ -335,12 +387,38 @@ var jitteryDuration = func(totalDuration float64) time.Duration {
 	return wait.Jitter(time.Duration(totalDuration), 0.2) - time.Duration(totalDuration*0.3)
 }
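A side note on the jitter arithmetic: wait.Jitter(d, 0.2) returns a value in [d, 1.2d), so subtracting 0.3d lands in [0.7d, 0.9d), meaning rotation always fires somewhat before the nominal deadline. A dependency-free sketch of the same math:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Mirrors jitteryDuration without client-go: uniform in [d, 1.2d), shifted
// down by 0.3d, giving a result in [0.7d, 0.9d).
func jittery(totalDuration float64) time.Duration {
	jittered := time.Duration(totalDuration) + time.Duration(rand.Float64()*0.2*totalDuration)
	return jittered - time.Duration(totalDuration*0.3)
}

func main() {
	d := float64(24 * time.Hour)
	fmt.Println(jittery(d)) // somewhere in [16h48m, 21h36m)
}
```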
+// updateCached sets the most recently retrieved cert. It also marks the server
+// as assumed healthy.
 func (m *manager) updateCached(cert *tls.Certificate) {
 	m.certAccessLock.Lock()
 	defer m.certAccessLock.Unlock()
+	m.serverHealth = true
 	m.cert = cert
 }
+
+// updateServerError takes an error returned by the server and infers
+// the health of the server based on the error. It will return nil if
+// the error does not require immediate termination of any wait loops,
+// and otherwise it will return the error.
+func (m *manager) updateServerError(err error) error {
+	m.certAccessLock.Lock()
+	defer m.certAccessLock.Unlock()
+	switch {
+	case errors.IsUnauthorized(err):
+		// SSL terminating proxies may report this error instead of the master
+		m.serverHealth = true
+	case errors.IsUnexpectedServerError(err):
+		// generally indicates a proxy or other load balancer problem, rather than a problem coming
+		// from the master
+		m.serverHealth = false
+	default:
+		// Identify known errors that could be expected for a cert request and that
+		// indicate everything is working normally
+		m.serverHealth = errors.IsNotFound(err) || errors.IsForbidden(err)
+	}
+	return nil
+}
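To see how this classification behaves, here is a hedged, standalone sketch that feeds a few representative apimachinery errors through the same predicates in the same order as the switch above (the sample errors are illustrative; the manager itself never exposes this mapping directly):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	csrGR := schema.GroupResource{Group: "certificates.k8s.io", Resource: "certificatesigningrequests"}
	samples := map[string]error{
		"401 from an SSL-terminating proxy": errors.NewUnauthorized("token rejected"),
		"403 Forbidden (CSR denied)":        errors.NewForbidden(csrGR, "node-csr-x", fmt.Errorf("denied")),
		"404 Not Found (CSR gone)":          errors.NewNotFound(csrGR, "node-csr-x"),
		"502 with a garbage body":           errors.NewGenericServerResponse(502, "get", csrGR, "node-csr-x", "<html>bad gateway</html>", 0, true),
	}
	for name, err := range samples {
		// Same decision order as updateServerError's switch above.
		healthy := errors.IsUnauthorized(err) ||
			(!errors.IsUnexpectedServerError(err) &&
				(errors.IsNotFound(err) || errors.IsForbidden(err)))
		fmt.Printf("%-35s -> serverHealth=%v\n", name, healthy)
	}
}
```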
 func (m *manager) generateCSR() (csrPEM []byte, keyPEM []byte, key interface{}, err error) {
 	// Generate a new private key.
 	privateKey, err := ecdsa.GenerateKey(elliptic.P256(), cryptorand.Reader)