diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 099b5f50da3..66badd8f2d6 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -342,7 +342,9 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) { if err != nil { return err } - if err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager); err != nil { + // we set exitIfExpired to true because we use this client configuration to request new certs - if we are unable + // to request new certs, we will be unable to continue normal operation + if err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, true); err != nil { return err } } diff --git a/pkg/kubelet/certificate/bootstrap/BUILD b/pkg/kubelet/certificate/bootstrap/BUILD index e4aeb0e97fa..05d24e0c0f4 100644 --- a/pkg/kubelet/certificate/bootstrap/BUILD +++ b/pkg/kubelet/certificate/bootstrap/BUILD @@ -22,14 +22,16 @@ go_library( srcs = ["bootstrap.go"], importpath = "k8s.io/kubernetes/pkg/kubelet/certificate/bootstrap", deps = [ - "//pkg/kubelet/util/csr:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd/api:go_default_library", + "//vendor/k8s.io/client-go/transport:go_default_library", "//vendor/k8s.io/client-go/util/cert:go_default_library", + "//vendor/k8s.io/client-go/util/certificate/csr:go_default_library", ], ) diff --git a/pkg/kubelet/certificate/bootstrap/bootstrap.go b/pkg/kubelet/certificate/bootstrap/bootstrap.go index 2952ba0bd62..d123c0beee5 100644 --- a/pkg/kubelet/certificate/bootstrap/bootstrap.go 
+++ b/pkg/kubelet/certificate/bootstrap/bootstrap.go @@ -20,16 +20,19 @@ import ( "fmt" "os" "path/filepath" + "time" "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" certificates "k8s.io/client-go/kubernetes/typed/certificates/v1beta1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/client-go/transport" certutil "k8s.io/client-go/util/cert" - "k8s.io/kubernetes/pkg/kubelet/util/csr" + "k8s.io/client-go/util/certificate/csr" ) const ( @@ -42,17 +45,15 @@ const ( // On success, a kubeconfig file referencing the generated key and obtained certificate is written to kubeconfigPath. // The certificate and key file are stored in certDir. func LoadClientCert(kubeconfigPath string, bootstrapPath string, certDir string, nodeName types.NodeName) error { - // Short-circuit if the kubeconfig file already exists. - // TODO: inspect the kubeconfig, ensure a rest client can be built from it, verify client cert expiration, etc. - _, err := os.Stat(kubeconfigPath) - if err == nil { - glog.V(2).Infof("Kubeconfig %s exists, skipping bootstrap", kubeconfigPath) - return nil - } - if !os.IsNotExist(err) { - glog.Errorf("Error reading kubeconfig %s, skipping bootstrap: %v", kubeconfigPath, err) + // Short-circuit if the kubeconfig file exists and is valid. + ok, err := verifyBootstrapClientConfig(kubeconfigPath) + if err != nil { return err } + if ok { + glog.V(2).Infof("Kubeconfig %s exists and is valid, skipping bootstrap", kubeconfigPath) + return nil + } glog.V(2).Info("Using bootstrap kubeconfig to generate TLS client cert, key and kubeconfig file") @@ -72,6 +73,17 @@ func LoadClientCert(kubeconfigPath string, bootstrapPath string, certDir string, if err != nil { return fmt.Errorf("unable to build bootstrap key path: %v", err) } + // If we are unable to generate a CSR, we remove our key file and start fresh. 
+ // This method is used before enabling client rotation and so we must ensure we + // can make forward progress if we crash and exit when a CSR exists but the cert + // it is signed for has expired. + defer func() { + if !success { + if err := os.Remove(keyPath); err != nil && !os.IsNotExist(err) { + glog.Warningf("Cannot clean up the key file %q: %v", keyPath, err) + } + } + }() keyData, _, err := certutil.LoadOrGenerateKeyFile(keyPath) if err != nil { return err @@ -82,6 +94,13 @@ func LoadClientCert(kubeconfigPath string, bootstrapPath string, certDir string, if err != nil { return fmt.Errorf("unable to build bootstrap client cert path: %v", err) } + defer func() { + if !success { + if err := os.Remove(certPath); err != nil && !os.IsNotExist(err) { + glog.Warningf("Cannot clean up the cert file %q: %v", certPath, err) + } + } + }() certData, err := csr.RequestNodeCertificate(bootstrapClient.CertificateSigningRequests(), keyData, nodeName) if err != nil { return err @@ -89,13 +108,6 @@ func LoadClientCert(kubeconfigPath string, bootstrapPath string, certDir string, if err := certutil.WriteCert(certPath, certData); err != nil { return err } - defer func() { - if !success { - if err := os.Remove(certPath); err != nil { - glog.Warningf("Cannot clean up the cert file %q: %v", certPath, err) - } - } - }() // Get the CA data from the bootstrap client config. caFile, caData := bootstrapClientConfig.CAFile, []byte{} @@ -150,3 +162,48 @@ func loadRESTClientConfig(kubeconfig string) (*restclient.Config, error) { loader, ).ClientConfig() } + +// verifyBootstrapClientConfig checks the provided kubeconfig to see if it has a valid +// client certificate. It returns true if the kubeconfig is valid, or an error if bootstrapping +// should stop immediately. 
+func verifyBootstrapClientConfig(kubeconfigPath string) (bool, error) { + _, err := os.Stat(kubeconfigPath) + if os.IsNotExist(err) { + return false, nil + } + if err != nil { + return false, fmt.Errorf("error reading existing bootstrap kubeconfig %s: %v", kubeconfigPath, err) + } + bootstrapClientConfig, err := loadRESTClientConfig(kubeconfigPath) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Unable to read existing bootstrap client config: %v", err)) + return false, nil + } + transportConfig, err := bootstrapClientConfig.TransportConfig() + if err != nil { + utilruntime.HandleError(fmt.Errorf("Unable to load transport configuration from existing bootstrap client config: %v", err)) + return false, nil + } + // has side effect of populating transport config data fields + if _, err := transport.TLSConfigFor(transportConfig); err != nil { + utilruntime.HandleError(fmt.Errorf("Unable to load TLS configuration from existing bootstrap client config: %v", err)) + return false, nil + } + certs, err := certutil.ParseCertsPEM(transportConfig.TLS.CertData) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Unable to load TLS certificates from existing bootstrap client config: %v", err)) + return false, nil + } + if len(certs) == 0 { + utilruntime.HandleError(fmt.Errorf("Unable to read TLS certificates from existing bootstrap client config: %v", err)) + return false, nil + } + now := time.Now() + for _, cert := range certs { + if now.After(cert.NotAfter) { + utilruntime.HandleError(fmt.Errorf("Part of the existing bootstrap client certificate is expired: %s", cert.NotAfter)) + return false, nil + } + } + return true, nil +} diff --git a/pkg/kubelet/certificate/transport.go b/pkg/kubelet/certificate/transport.go index 49c089b44a9..1683ad09735 100644 --- a/pkg/kubelet/certificate/transport.go +++ b/pkg/kubelet/certificate/transport.go @@ -45,13 +45,13 @@ import ( // // stopCh should be used to indicate when the transport is unused and doesn't need // to continue 
checking the manager. -func UpdateTransport(stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager) error { - return updateTransport(stopCh, 10*time.Second, clientConfig, clientCertificateManager) +func UpdateTransport(stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitIfExpired bool) error { + return updateTransport(stopCh, 10*time.Second, clientConfig, clientCertificateManager, exitIfExpired) } // updateTransport is an internal method that exposes how often this method checks that the // client cert has changed. Intended for testing. -func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager) error { +func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitIfExpired bool) error { if clientConfig.Transport != nil { return fmt.Errorf("there is already a transport configured") } @@ -80,6 +80,13 @@ func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig lastCert := clientCertificateManager.Current() go wait.Until(func() { curr := clientCertificateManager.Current() + if exitIfExpired && curr != nil && time.Now().After(curr.Leaf.NotAfter) { + if clientCertificateManager.ServerHealthy() { + glog.Fatalf("The currently active client certificate has expired and the server is responsive, exiting.") + } else { + glog.Errorf("The currently active client certificate has expired, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.") + } + } if curr == nil || lastCert == curr { // Cert hasn't been rotated. 
return diff --git a/pkg/kubelet/certificate/transport_test.go b/pkg/kubelet/certificate/transport_test.go index 306c3c18a03..6e476c3b803 100644 --- a/pkg/kubelet/certificate/transport_test.go +++ b/pkg/kubelet/certificate/transport_test.go @@ -114,13 +114,16 @@ func newCertificateData(certificatePEM string, keyPEM string) *certificateData { } type fakeManager struct { - cert atomic.Value // Always a *tls.Certificate + cert atomic.Value // Always a *tls.Certificate + healthy bool } func (f *fakeManager) SetCertificateSigningRequestClient(certificatesclient.CertificateSigningRequestInterface) error { return nil } +func (f *fakeManager) ServerHealthy() bool { return f.healthy } + func (f *fakeManager) Start() {} func (f *fakeManager) Current() *tls.Certificate { @@ -184,7 +187,7 @@ func TestRotateShutsDownConnections(t *testing.T) { } // Check for a new cert every 10 milliseconds - if err := updateTransport(stop, 10*time.Millisecond, c, m); err != nil { + if err := updateTransport(stop, 10*time.Millisecond, c, m, false); err != nil { t.Fatal(err) } diff --git a/pkg/kubelet/util/BUILD b/pkg/kubelet/util/BUILD index a51fc6daea0..fdb78bddfba 100644 --- a/pkg/kubelet/util/BUILD +++ b/pkg/kubelet/util/BUILD @@ -53,7 +53,6 @@ filegroup( srcs = [ ":package-srcs", "//pkg/kubelet/util/cache:all-srcs", - "//pkg/kubelet/util/csr:all-srcs", "//pkg/kubelet/util/format:all-srcs", "//pkg/kubelet/util/ioutils:all-srcs", "//pkg/kubelet/util/queue:all-srcs", diff --git a/staging/src/k8s.io/client-go/tools/cache/listwatch.go b/staging/src/k8s.io/client-go/tools/cache/listwatch.go index 454d50aadc6..db2329c55a2 100644 --- a/staging/src/k8s.io/client-go/tools/cache/listwatch.go +++ b/staging/src/k8s.io/client-go/tools/cache/listwatch.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" restclient "k8s.io/client-go/rest" 
"k8s.io/client-go/tools/pager" @@ -103,6 +104,8 @@ func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) return lw.WatchFunc(options) } +// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout +// if timeout is exceeded without all conditions returning true, or an error if an error occurs. // TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until. func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) { if len(conditions) == 0 { @@ -166,5 +169,10 @@ func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch return nil, err } - return watch.Until(timeout, watchInterface, remainingConditions...) + evt, err := watch.Until(timeout, watchInterface, remainingConditions...) + if err == watch.ErrWatchClosed { + // present a consistent error interface to callers + err = wait.ErrWaitTimeout + } + return evt, err } diff --git a/staging/src/k8s.io/client-go/util/certificate/BUILD b/staging/src/k8s.io/client-go/util/certificate/BUILD index 87f88964cd1..f10a2d9e21e 100644 --- a/staging/src/k8s.io/client-go/util/certificate/BUILD +++ b/staging/src/k8s.io/client-go/util/certificate/BUILD @@ -19,7 +19,10 @@ go_test( tags = ["automanaged"], deps = [ "//vendor/k8s.io/api/certificates/v1beta1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library", "//vendor/k8s.io/client-go/util/cert:go_default_library", @@ -37,12 +40,12 @@ go_library( deps = [ 
"//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/certificates/v1beta1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library", "//vendor/k8s.io/client-go/util/cert:go_default_library", + "//vendor/k8s.io/client-go/util/certificate/csr:go_default_library", ], ) @@ -55,7 +58,10 @@ filegroup( filegroup( name = "all-srcs", - srcs = [":package-srcs"], + srcs = [ + ":package-srcs", + "//staging/src/k8s.io/client-go/util/certificate/csr:all-srcs", + ], tags = ["automanaged"], visibility = ["//visibility:public"], ) diff --git a/staging/src/k8s.io/client-go/util/certificate/certificate_manager.go b/staging/src/k8s.io/client-go/util/certificate/certificate_manager.go index 08363491b81..e27966f5e1b 100644 --- a/staging/src/k8s.io/client-go/util/certificate/certificate_manager.go +++ b/staging/src/k8s.io/client-go/util/certificate/certificate_manager.go @@ -30,14 +30,18 @@ import ( "github.com/golang/glog" certificates "k8s.io/api/certificates/v1beta1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/api/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1" "k8s.io/client-go/util/cert" + "k8s.io/client-go/util/certificate/csr" ) +// certificateWaitBackoff controls the amount and timing of retries when the +// watch for certificate approval is interrupted. 
+var certificateWaitBackoff = wait.Backoff{Duration: 30 * time.Second, Steps: 4, Factor: 1.5, Jitter: 0.1} + // Manager maintains and updates the certificates in use by this certificate // manager. In the background it communicates with the API server to get new // certificates for certificates about to expire. @@ -51,6 +55,12 @@ type Manager interface { // certificate manager, as well as the associated certificate and key data // in PEM format. Current() *tls.Certificate + // ServerHealthy returns true if the manager is able to communicate with + // the server. This allows a caller to determine whether the cert manager + // thinks it can potentially talk to the API server. The cert manager may + // be very conservative and only return true if recent communication has + // occurred with the server. + ServerHealthy() bool } // Config is the set of configuration parameters available for a new Manager. @@ -134,6 +144,7 @@ type manager struct { rotationDeadline time.Time forceRotation bool certificateExpiration Gauge + serverHealth bool } // NewManager returns a new certificate manager. A certificate manager is @@ -171,6 +182,14 @@ func (m *manager) Current() *tls.Certificate { return m.cert } +// ServerHealthy returns true if the cert manager believes the server +// is currently alive. +func (m *manager) ServerHealthy() bool { + m.certAccessLock.RLock() + defer m.certAccessLock.RUnlock() + return m.serverHealth +} + // SetCertificateSigningRequestClient sets the client interface that is used // for signing new certificates generated as part of rotation. It must be // called before Start() and can not be used to change the @@ -205,9 +224,8 @@ func (m *manager) Start() { // doesn't have a certificate at all yet. 
if m.shouldRotate() { glog.V(1).Infof("shouldRotate() is true, forcing immediate rotation") - _, err := m.rotateCerts() - if err != nil { - glog.Errorf("Could not rotate certificates: %v", err) + if _, err := m.rotateCerts(); err != nil { + utilruntime.HandleError(fmt.Errorf("Could not rotate certificates: %v", err)) } } backoff := wait.Backoff{ @@ -221,7 +239,7 @@ func (m *manager) Start() { glog.V(2).Infof("Waiting %v for next certificate rotation", sleepInterval) time.Sleep(sleepInterval) if err := wait.ExponentialBackoff(backoff, m.rotateCerts); err != nil { - glog.Errorf("Reached backoff limit, still unable to rotate certs: %v", err) + utilruntime.HandleError(fmt.Errorf("Reached backoff limit, still unable to rotate certs: %v", err)) wait.PollInfinite(128*time.Second, m.rotateCerts) } }, 0) @@ -275,26 +293,58 @@ func (m *manager) shouldRotate() bool { return time.Now().After(m.rotationDeadline) } +// rotateCerts attempts to request a client cert from the server, wait a reasonable +// period of time for it to be signed, and then update the cert on disk. If it cannot +// retrieve a cert, it will return false. It will only return error in exceptional cases. +// This method also keeps track of "server health" by interpreting the responses it gets +// from the server on the various calls it makes. func (m *manager) rotateCerts() (bool, error) { glog.V(2).Infof("Rotating certificates") - csrPEM, keyPEM, err := m.generateCSR() + csrPEM, keyPEM, privateKey, err := m.generateCSR() if err != nil { - glog.Errorf("Unable to generate a certificate signing request: %v", err) + utilruntime.HandleError(fmt.Errorf("Unable to generate a certificate signing request: %v", err)) return false, nil } // Call the Certificate Signing Request API to get a certificate for the // new private key. 
- crtPEM, err := requestCertificate(m.certSigningRequestClient, csrPEM, m.usages) + req, err := csr.RequestCertificate(m.certSigningRequestClient, csrPEM, "", m.usages, privateKey) if err != nil { - glog.Errorf("Failed while requesting a signed certificate from the master: %v", err) + utilruntime.HandleError(fmt.Errorf("Failed while requesting a signed certificate from the master: %v", err)) + return false, m.updateServerError(err) + } + + // Wait for the certificate to be signed. Instead of one long watch, we retry with slightly longer + // intervals each time in order to tolerate failures from the server AND to preserve the liveliness + // of the cert manager loop. This creates slightly more traffic against the API server in return + // for bounding the amount of time we wait when a certificate expires. + var crtPEM []byte + watchDuration := time.Minute + if err := wait.ExponentialBackoff(certificateWaitBackoff, func() (bool, error) { + data, err := csr.WaitForCertificate(m.certSigningRequestClient, req, watchDuration) + switch { + case err == nil: + crtPEM = data + return true, nil + case err == wait.ErrWaitTimeout: + watchDuration += time.Minute + if watchDuration > 5*time.Minute { + watchDuration = 5 * time.Minute + } + return false, nil + default: + utilruntime.HandleError(fmt.Errorf("Unable to check certificate signing status: %v", err)) + return false, m.updateServerError(err) + } + }); err != nil { + utilruntime.HandleError(fmt.Errorf("Certificate request was not signed: %v", err)) return false, nil } cert, err := m.certStore.Update(crtPEM, keyPEM) if err != nil { - glog.Errorf("Unable to store the new cert/key pair: %v", err) + utilruntime.HandleError(fmt.Errorf("Unable to store the new cert/key pair: %v", err)) return false, nil } @@ -337,91 +387,54 @@ var jitteryDuration = func(totalDuration float64) time.Duration { return wait.Jitter(time.Duration(totalDuration), 0.2) - time.Duration(totalDuration*0.3) } +// updateCached sets the most recent retrieved
cert. It also sets the server +// as assumed healthy. func (m *manager) updateCached(cert *tls.Certificate) { m.certAccessLock.Lock() defer m.certAccessLock.Unlock() + m.serverHealth = true m.cert = cert } -func (m *manager) generateCSR() (csrPEM []byte, keyPEM []byte, err error) { +// updateServerError takes an error returned by the server and infers +// the health of the server based on the error. It will return nil if +// the error does not require immediate termination of any wait loops, +// and otherwise it will return the error. +func (m *manager) updateServerError(err error) error { + m.certAccessLock.Lock() + defer m.certAccessLock.Unlock() + switch { + case errors.IsUnauthorized(err): + // SSL terminating proxies may report this error instead of the master + m.serverHealth = true + case errors.IsUnexpectedServerError(err): + // generally indicates a proxy or other load balancer problem, rather than a problem coming + // from the master + m.serverHealth = false + default: + // Identify known errors that could be expected for a cert request that + // indicate everything is working normally + m.serverHealth = errors.IsNotFound(err) || errors.IsForbidden(err) + } + return nil +} + +func (m *manager) generateCSR() (csrPEM []byte, keyPEM []byte, key interface{}, err error) { // Generate a new private key. 
privateKey, err := ecdsa.GenerateKey(elliptic.P256(), cryptorand.Reader) if err != nil { - return nil, nil, fmt.Errorf("unable to generate a new private key: %v", err) + return nil, nil, nil, fmt.Errorf("unable to generate a new private key: %v", err) } der, err := x509.MarshalECPrivateKey(privateKey) if err != nil { - return nil, nil, fmt.Errorf("unable to marshal the new key to DER: %v", err) + return nil, nil, nil, fmt.Errorf("unable to marshal the new key to DER: %v", err) } keyPEM = pem.EncodeToMemory(&pem.Block{Type: cert.ECPrivateKeyBlockType, Bytes: der}) csrPEM, err = cert.MakeCSRFromTemplate(privateKey, m.template) if err != nil { - return nil, nil, fmt.Errorf("unable to create a csr from the private key: %v", err) + return nil, nil, nil, fmt.Errorf("unable to create a csr from the private key: %v", err) } - return csrPEM, keyPEM, nil -} - -// requestCertificate will create a certificate signing request using the PEM -// encoded CSR and send it to API server, then it will watch the object's -// status, once approved by API server, it will return the API server's issued -// certificate (pem-encoded). If there is any errors, or the watch timeouts, it -// will return an error. -// -// NOTE This is a copy of a function with the same name in -// k8s.io/kubernetes/pkg/kubelet/util/csr/csr.go, changing only the package that -// CertificateSigningRequestInterface and KeyUsage are imported from. -func requestCertificate(client certificatesclient.CertificateSigningRequestInterface, csrData []byte, usages []certificates.KeyUsage) (certData []byte, err error) { - glog.Infof("Requesting new certificate.") - req, err := client.Create(&certificates.CertificateSigningRequest{ - // Username, UID, Groups will be injected by API server. 
- TypeMeta: metav1.TypeMeta{Kind: "CertificateSigningRequest"}, - ObjectMeta: metav1.ObjectMeta{GenerateName: "csr-"}, - - Spec: certificates.CertificateSigningRequestSpec{ - Request: csrData, - Usages: usages, - }, - }) - if err != nil { - return nil, fmt.Errorf("cannot create certificate signing request: %v", err) - } - - // Make a default timeout = 3600s. - var defaultTimeoutSeconds int64 = 3600 - certWatch, err := client.Watch(metav1.ListOptions{ - Watch: true, - TimeoutSeconds: &defaultTimeoutSeconds, - FieldSelector: fields.OneTermEqualSelector("metadata.name", req.Name).String(), - }) - if err != nil { - return nil, fmt.Errorf("cannot watch on the certificate signing request: %v", err) - } - defer certWatch.Stop() - ch := certWatch.ResultChan() - - for { - event, ok := <-ch - if !ok { - break - } - - if event.Type == watch.Modified || event.Type == watch.Added { - if event.Object.(*certificates.CertificateSigningRequest).UID != req.UID { - continue - } - status := event.Object.(*certificates.CertificateSigningRequest).Status - for _, c := range status.Conditions { - if c.Type == certificates.CertificateDenied { - return nil, fmt.Errorf("certificate signing request is not approved, reason: %v, message: %v", c.Reason, c.Message) - } - if c.Type == certificates.CertificateApproved && status.Certificate != nil { - return status.Certificate, nil - } - } - } - } - - return nil, fmt.Errorf("watch channel closed") + return csrPEM, keyPEM, privateKey, nil } diff --git a/staging/src/k8s.io/client-go/util/certificate/certificate_manager_test.go b/staging/src/k8s.io/client-go/util/certificate/certificate_manager_test.go index 6a77f99bf52..ab19e37b12d 100644 --- a/staging/src/k8s.io/client-go/util/certificate/certificate_manager_test.go +++ b/staging/src/k8s.io/client-go/util/certificate/certificate_manager_test.go @@ -27,7 +27,10 @@ import ( "time" certificates "k8s.io/api/certificates/v1beta1" + "k8s.io/apimachinery/pkg/api/errors" v1 
"k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" watch "k8s.io/apimachinery/pkg/watch" certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1" ) @@ -288,6 +291,7 @@ func TestRotateCertWaitingForResultError(t *testing.T) { }, } + certificateWaitBackoff = wait.Backoff{Steps: 1} if success, err := m.rotateCerts(); success { t.Errorf("Got success from 'rotateCerts', wanted failure.") } else if err != nil { @@ -612,11 +616,170 @@ func TestInitializeOtherRESTClients(t *testing.T) { } else { m.setRotationDeadline() if m.shouldRotate() { - if success, err := certificateManager.(*manager).rotateCerts(); !success { - t.Errorf("Got failure from 'rotateCerts', expected success") - } else if err != nil { + success, err := certificateManager.(*manager).rotateCerts() + if err != nil { t.Errorf("Got error %v, expected none.", err) + return } + if !success { + t.Errorf("Unexpected response 'rotateCerts': %t", success) + return + } + } + } + + certificate = certificateManager.Current() + if !certificatesEqual(certificate, tc.expectedCertAfterStart.certificate) { + t.Errorf("Got %v, wanted %v", certificateString(certificate), certificateString(tc.expectedCertAfterStart.certificate)) + } + }) + } +} + +func TestServerHealth(t *testing.T) { + type certs struct { + storeCert *certificateData + bootstrapCert *certificateData + apiCert *certificateData + expectedCertBeforeStart *certificateData + expectedCertAfterStart *certificateData + } + + updatedCerts := certs{ + storeCert: storeCertData, + bootstrapCert: bootstrapCertData, + apiCert: apiServerCertData, + expectedCertBeforeStart: storeCertData, + expectedCertAfterStart: apiServerCertData, + } + + currentCerts := certs{ + storeCert: storeCertData, + bootstrapCert: bootstrapCertData, + apiCert: apiServerCertData, + expectedCertBeforeStart: storeCertData, + expectedCertAfterStart: storeCertData, + } + + testCases := []struct { + description string 
+ certs + + failureType fakeClientFailureType + clientErr error + + expectRotateFail bool + expectHealthy bool + }{ + { + description: "Current certificate, bootstrap certificate", + certs: updatedCerts, + expectHealthy: true, + }, + { + description: "Generic error on create", + certs: currentCerts, + + failureType: createError, + expectRotateFail: true, + }, + { + description: "Unauthorized error on create", + certs: currentCerts, + + failureType: createError, + clientErr: errors.NewUnauthorized("unauthorized"), + expectRotateFail: true, + expectHealthy: true, + }, + { + description: "Generic unauthorized error on create", + certs: currentCerts, + + failureType: createError, + clientErr: errors.NewGenericServerResponse(401, "POST", schema.GroupResource{}, "", "", 0, true), + expectRotateFail: true, + expectHealthy: true, + }, + { + description: "Generic not found error on create", + certs: currentCerts, + + failureType: createError, + clientErr: errors.NewGenericServerResponse(404, "POST", schema.GroupResource{}, "", "", 0, true), + expectRotateFail: true, + expectHealthy: false, + }, + { + description: "Not found error on create", + certs: currentCerts, + + failureType: createError, + clientErr: errors.NewGenericServerResponse(404, "POST", schema.GroupResource{}, "", "", 0, false), + expectRotateFail: true, + expectHealthy: true, + }, + { + description: "Conflict error on watch", + certs: currentCerts, + + failureType: watchError, + clientErr: errors.NewGenericServerResponse(409, "POST", schema.GroupResource{}, "", "", 0, false), + expectRotateFail: true, + expectHealthy: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + certificateStore := &fakeStore{ + cert: tc.storeCert.certificate, + } + + certificateManager, err := NewManager(&Config{ + Template: &x509.CertificateRequest{ + Subject: pkix.Name{ + Organization: []string{"system:nodes"}, + CommonName: "system:node:fake-node-name", + }, + }, + Usages: 
[]certificates.KeyUsage{ + certificates.UsageDigitalSignature, + certificates.UsageKeyEncipherment, + certificates.UsageClientAuth, + }, + CertificateStore: certificateStore, + BootstrapCertificatePEM: tc.bootstrapCert.certificatePEM, + BootstrapKeyPEM: tc.bootstrapCert.keyPEM, + CertificateSigningRequestClient: &fakeClient{ + certificatePEM: tc.apiCert.certificatePEM, + failureType: tc.failureType, + err: tc.clientErr, + }, + }) + if err != nil { + t.Errorf("Got %v, wanted no error.", err) + } + + certificate := certificateManager.Current() + if !certificatesEqual(certificate, tc.expectedCertBeforeStart.certificate) { + t.Errorf("Got %v, wanted %v", certificateString(certificate), certificateString(tc.expectedCertBeforeStart.certificate)) + } + + if _, ok := certificateManager.(*manager); !ok { + t.Errorf("Expected a '*manager' from 'NewManager'") + } else { + success, err := certificateManager.(*manager).rotateCerts() + if err != nil { + t.Errorf("Got error %v, expected none.", err) + return + } + if !success != tc.expectRotateFail { + t.Errorf("Unexpected response 'rotateCerts': %t", success) + return + } + if actual := certificateManager.(*manager).ServerHealthy(); actual != tc.expectHealthy { + t.Errorf("Unexpected manager server health: %t", actual) } } @@ -641,10 +804,29 @@ type fakeClient struct { certificatesclient.CertificateSigningRequestInterface failureType fakeClientFailureType certificatePEM []byte + err error +} + +func (c fakeClient) List(opts v1.ListOptions) (*certificates.CertificateSigningRequestList, error) { + if c.failureType == watchError { + if c.err != nil { + return nil, c.err + } + return nil, fmt.Errorf("Watch error") + } + csrReply := certificates.CertificateSigningRequestList{ + Items: []certificates.CertificateSigningRequest{ + {ObjectMeta: v1.ObjectMeta{UID: "fake-uid"}}, + }, + } + return &csrReply, nil } func (c fakeClient) Create(*certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error) { if 
c.failureType == createError { + if c.err != nil { + return nil, c.err + } return nil, fmt.Errorf("Create error") } csrReply := certificates.CertificateSigningRequest{} @@ -654,6 +836,9 @@ func (c fakeClient) Create(*certificates.CertificateSigningRequest) (*certificat func (c fakeClient) Watch(opts v1.ListOptions) (watch.Interface, error) { if c.failureType == watchError { + if c.err != nil { + return nil, c.err + } return nil, fmt.Errorf("Watch error") } return &fakeWatch{ diff --git a/pkg/kubelet/util/csr/BUILD b/staging/src/k8s.io/client-go/util/certificate/csr/BUILD similarity index 90% rename from pkg/kubelet/util/csr/BUILD rename to staging/src/k8s.io/client-go/util/certificate/csr/BUILD index 3414983f2cf..c6def5bbf0c 100644 --- a/pkg/kubelet/util/csr/BUILD +++ b/staging/src/k8s.io/client-go/util/certificate/csr/BUILD @@ -9,9 +9,8 @@ load( go_library( name = "go_default_library", srcs = ["csr.go"], - importpath = "k8s.io/kubernetes/pkg/kubelet/util/csr", + importpath = "k8s.io/client-go/util/certificate/csr", deps = [ - "//pkg/apis/certificates/v1beta1:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/certificates/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", @@ -19,6 +18,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", @@ -42,7 +42,7 @@ filegroup( go_test( name = "go_default_test", srcs = ["csr_test.go"], - importpath = "k8s.io/kubernetes/pkg/kubelet/util/csr", + importpath = "k8s.io/client-go/util/certificate/csr", library = 
":go_default_library", deps = [ "//vendor/k8s.io/api/certificates/v1beta1:go_default_library", diff --git a/pkg/kubelet/util/csr/csr.go b/staging/src/k8s.io/client-go/util/certificate/csr/csr.go similarity index 71% rename from pkg/kubelet/util/csr/csr.go rename to staging/src/k8s.io/client-go/util/certificate/csr/csr.go index 316de1abe5f..22112a5b5b6 100644 --- a/pkg/kubelet/util/csr/csr.go +++ b/staging/src/k8s.io/client-go/util/certificate/csr/csr.go @@ -19,25 +19,26 @@ package csr import ( "crypto" "crypto/sha512" + "crypto/x509" "crypto/x509/pkix" "encoding/base64" + "encoding/pem" "fmt" + "github.com/golang/glog" "reflect" "time" - "github.com/golang/glog" - certificates "k8s.io/api/certificates/v1beta1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1" "k8s.io/client-go/tools/cache" certutil "k8s.io/client-go/util/cert" - certhelper "k8s.io/kubernetes/pkg/apis/certificates/v1beta1" ) // RequestNodeCertificate will create a certificate signing request for a node @@ -68,16 +69,20 @@ func RequestNodeCertificate(client certificatesclient.CertificateSigningRequestI certificates.UsageClientAuth, } name := digestedName(privateKeyData, subject, usages) - return requestCertificate(client, csrData, name, usages, privateKey) + req, err := RequestCertificate(client, csrData, name, usages, privateKey) + if err != nil { + return nil, err + } + return WaitForCertificate(client, req, 3600*time.Second) } -// requestCertificate will either use an existing (if this process has run +// RequestCertificate will either use an existing (if this process has run // before but not to completion) or create a certificate signing request using the // PEM encoded CSR and send it to API server, then it will watch the 
object's // status, once approved by API server, it will return the API server's issued // certificate (pem-encoded). If there is any errors, or the watch timeouts, it // will return an error. -func requestCertificate(client certificatesclient.CertificateSigningRequestInterface, csrData []byte, name string, usages []certificates.KeyUsage, privateKey interface{}) (certData []byte, err error) { +func RequestCertificate(client certificatesclient.CertificateSigningRequestInterface, csrData []byte, name string, usages []certificates.KeyUsage, privateKey interface{}) (req *certificates.CertificateSigningRequest, err error) { csr := &certificates.CertificateSigningRequest{ // Username, UID, Groups will be injected by API server. TypeMeta: metav1.TypeMeta{Kind: "CertificateSigningRequest"}, @@ -89,28 +94,35 @@ func requestCertificate(client certificatesclient.CertificateSigningRequestInter Usages: usages, }, } + if len(csr.Name) == 0 { + csr.GenerateName = "csr-" + } - req, err := client.Create(csr) + req, err = client.Create(csr) switch { case err == nil: - case errors.IsAlreadyExists(err): + case errors.IsAlreadyExists(err) && len(name) > 0: glog.Infof("csr for this node already exists, reusing") req, err = client.Get(name, metav1.GetOptions{}) if err != nil { - return nil, fmt.Errorf("cannot retrieve certificate signing request: %v", err) + return nil, formatError("cannot retrieve certificate signing request: %v", err) } if err := ensureCompatible(req, csr, privateKey); err != nil { return nil, fmt.Errorf("retrieved csr is not compatible: %v", err) } glog.Infof("csr for this node is still valid") default: - return nil, fmt.Errorf("cannot create certificate signing request: %v", err) + return nil, formatError("cannot create certificate signing request: %v", err) } + return req, nil +} +// WaitForCertificate waits for a certificate to be issued until timeout, or returns an error. 
+func WaitForCertificate(client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest, timeout time.Duration) (certData []byte, err error) { fieldSelector := fields.OneTermEqualSelector("metadata.name", req.Name).String() event, err := cache.ListWatchUntil( - 3600*time.Second, + timeout, &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { options.FieldSelector = fieldSelector @@ -124,6 +136,8 @@ func requestCertificate(client certificatesclient.CertificateSigningRequestInter func(event watch.Event) (bool, error) { switch event.Type { case watch.Modified, watch.Added: + case watch.Deleted: + return false, fmt.Errorf("csr %q was deleted", req.Name) default: return false, nil } @@ -142,12 +156,14 @@ func requestCertificate(client certificatesclient.CertificateSigningRequestInter return false, nil }, ) + if err == wait.ErrWaitTimeout { + return nil, wait.ErrWaitTimeout + } if err != nil { - return nil, fmt.Errorf("cannot watch on the certificate signing request: %v", err) + return nil, formatError("cannot watch on the certificate signing request: %v", err) } return event.Object.(*certificates.CertificateSigningRequest).Status.Certificate, nil - } // This digest should include all the relevant pieces of the CSR we care about. 
@@ -184,11 +200,11 @@ func digestedName(privateKeyData []byte, subject *pkix.Name, usages []certificat // ensureCompatible ensures that a CSR object is compatible with an original CSR func ensureCompatible(new, orig *certificates.CertificateSigningRequest, privateKey interface{}) error { - newCsr, err := certhelper.ParseCSR(new) + newCsr, err := ParseCSR(new) if err != nil { return fmt.Errorf("unable to parse new csr: %v", err) } - origCsr, err := certhelper.ParseCSR(orig) + origCsr, err := ParseCSR(orig) if err != nil { return fmt.Errorf("unable to parse original csr: %v", err) } @@ -203,5 +219,43 @@ func ensureCompatible(new, orig *certificates.CertificateSigningRequest, private if err := newCsr.CheckSignature(); err != nil { return fmt.Errorf("error validating signature new CSR against old key: %v", err) } + if len(new.Status.Certificate) > 0 { + certs, err := certutil.ParseCertsPEM(new.Status.Certificate) + if err != nil { + return fmt.Errorf("error parsing signed certificate for CSR: %v", err) + } + now := time.Now() + for _, cert := range certs { + if now.After(cert.NotAfter) { + return fmt.Errorf("one of the certificates for the CSR has expired: %s", cert.NotAfter) + } + } + } return nil } + +// formatError preserves the type of an API message but alters the message. Expects +// a single argument format string, and returns the wrapped error. +func formatError(format string, err error) error { + if s, ok := err.(errors.APIStatus); ok { + se := &errors.StatusError{ErrStatus: s.Status()} + se.ErrStatus.Message = fmt.Sprintf(format, se.ErrStatus.Message) + return se + } + return fmt.Errorf(format, err) +} + +// ParseCSR extracts the CSR from the API object and decodes it. 
+func ParseCSR(obj *certificates.CertificateSigningRequest) (*x509.CertificateRequest, error) { + // extract PEM from request object + pemBytes := obj.Spec.Request + block, _ := pem.Decode(pemBytes) + if block == nil || block.Type != "CERTIFICATE REQUEST" { + return nil, fmt.Errorf("PEM block type must be CERTIFICATE REQUEST") + } + csr, err := x509.ParseCertificateRequest(block.Bytes) + if err != nil { + return nil, err + } + return csr, nil +} diff --git a/pkg/kubelet/util/csr/csr_test.go b/staging/src/k8s.io/client-go/util/certificate/csr/csr_test.go similarity index 100% rename from pkg/kubelet/util/csr/csr_test.go rename to staging/src/k8s.io/client-go/util/certificate/csr/csr_test.go