diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 1f0029ee1ae..a0b4dcf112d 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -547,13 +547,13 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) { if err != nil { return err } - - // we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable - // to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper - // or the bootstrapping credentials to potentially lay down new initial config. - if err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute); err != nil { - return err - } + } + // we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable + // to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper + // or the bootstrapping credentials to potentially lay down new initial config. + _, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute) + if err != nil { + return err } kubeClient, err = clientset.NewForConfig(clientConfig) diff --git a/pkg/kubelet/certificate/transport.go b/pkg/kubelet/certificate/transport.go index b8d200efc29..f980cb6eb14 100644 --- a/pkg/kubelet/certificate/transport.go +++ b/pkg/kubelet/certificate/transport.go @@ -38,6 +38,8 @@ import ( // // The config must not already provide an explicit transport. // +// The returned function allows forcefully closing all active connections. +// // The returned transport periodically checks the manager to determine if the // certificate has changed. If it has, the transport shuts down all existing client // connections, forcing the client to re-handshake with the server and use the @@ -51,30 +53,15 @@ import ( // // stopCh should be used to indicate when the transport is unused and doesn't need // to continue checking the manager. -func UpdateTransport(stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) error { +func UpdateTransport(stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) (func(), error) { return updateTransport(stopCh, 10*time.Second, clientConfig, clientCertificateManager, exitAfter) } // updateTransport is an internal method that exposes how often this method checks that the // client cert has changed. -func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) error { - if clientConfig.Transport != nil { - return fmt.Errorf("there is already a transport configured") - } - tlsConfig, err := restclient.TLSConfigFor(clientConfig) - if err != nil { - return fmt.Errorf("unable to configure TLS for the rest client: %v", err) - } - if tlsConfig == nil { - tlsConfig = &tls.Config{} - } - tlsConfig.Certificates = nil - tlsConfig.GetClientCertificate = func(requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) { - cert := clientCertificateManager.Current() - if cert == nil { - return &tls.Certificate{Certificate: nil}, nil - } - return cert, nil +func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) (func(), error) { + if clientConfig.Transport != nil || clientConfig.Dial != nil { + return nil, fmt.Errorf("there is already a transport or dialer configured") } // Custom dialer that will track all connections it creates. @@ -83,48 +70,67 @@ func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig conns: make(map[*closableConn]struct{}), } - lastCertAvailable := time.Now() - lastCert := clientCertificateManager.Current() - go wait.Until(func() { - curr := clientCertificateManager.Current() + tlsConfig, err := restclient.TLSConfigFor(clientConfig) + if err != nil { + return nil, fmt.Errorf("unable to configure TLS for the rest client: %v", err) + } + if tlsConfig == nil { + tlsConfig = &tls.Config{} + } - if exitAfter > 0 { - now := time.Now() - if curr == nil { - // the certificate has been deleted from disk or is otherwise corrupt - if now.After(lastCertAvailable.Add(exitAfter)) { - if clientCertificateManager.ServerHealthy() { - glog.Fatalf("It has been %s since a valid client cert was found and the server is responsive, exiting.", exitAfter) - } else { - glog.Errorf("It has been %s since a valid client cert was found, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.", exitAfter) - } - } - } else { - // the certificate is expired - if now.After(curr.Leaf.NotAfter) { - if clientCertificateManager.ServerHealthy() { - glog.Fatalf("The currently active client certificate has expired and the server is responsive, exiting.") - } else { - glog.Errorf("The currently active client certificate has expired, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.") - } - } - lastCertAvailable = now + if clientCertificateManager != nil { + tlsConfig.Certificates = nil + tlsConfig.GetClientCertificate = func(requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) { + cert := clientCertificateManager.Current() + if cert == nil { + return &tls.Certificate{Certificate: nil}, nil } + return cert, nil } - if curr == nil || lastCert == curr { - // Cert hasn't been rotated. - return - } - lastCert = curr + lastCertAvailable := time.Now() + lastCert := clientCertificateManager.Current() + go wait.Until(func() { + curr := clientCertificateManager.Current() - glog.Infof("certificate rotation detected, shutting down client connections to start using new credentials") - // The cert has been rotated. Close all existing connections to force the client - // to reperform its TLS handshake with new cert. - // - // See: https://github.com/kubernetes-incubator/bootkube/pull/663#issuecomment-318506493 - t.closeAllConns() - }, period, stopCh) + if exitAfter > 0 { + now := time.Now() + if curr == nil { + // the certificate has been deleted from disk or is otherwise corrupt + if now.After(lastCertAvailable.Add(exitAfter)) { + if clientCertificateManager.ServerHealthy() { + glog.Fatalf("It has been %s since a valid client cert was found and the server is responsive, exiting.", exitAfter) + } else { + glog.Errorf("It has been %s since a valid client cert was found, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.", exitAfter) + } + } + } else { + // the certificate is expired + if now.After(curr.Leaf.NotAfter) { + if clientCertificateManager.ServerHealthy() { + glog.Fatalf("The currently active client certificate has expired and the server is responsive, exiting.") + } else { + glog.Errorf("The currently active client certificate has expired, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.") + } + } + lastCertAvailable = now + } + } + + if curr == nil || lastCert == curr { + // Cert hasn't been rotated. + return + } + lastCert = curr + + glog.Infof("certificate rotation detected, shutting down client connections to start using new credentials") + // The cert has been rotated. Close all existing connections to force the client + // to reperform its TLS handshake with new cert. + // + // See: https://github.com/kubernetes-incubator/bootkube/pull/663#issuecomment-318506493 + t.closeAllConns() + }, period, stopCh) + } clientConfig.Transport = utilnet.SetTransportDefaults(&http.Transport{ Proxy: http.ProxyFromEnvironment, @@ -142,7 +148,8 @@ func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig clientConfig.CAData = nil clientConfig.CAFile = "" clientConfig.Insecure = false - return nil + + return t.closeAllConns, nil } // connTracker is a dialer that tracks all open connections it creates. diff --git a/pkg/kubelet/certificate/transport_test.go b/pkg/kubelet/certificate/transport_test.go index d05b28aa618..ef8ea8c7291 100644 --- a/pkg/kubelet/certificate/transport_test.go +++ b/pkg/kubelet/certificate/transport_test.go @@ -187,7 +187,7 @@ func TestRotateShutsDownConnections(t *testing.T) { } // Check for a new cert every 10 milliseconds - if err := updateTransport(stop, 10*time.Millisecond, c, m, 0); err != nil { + if _, err := updateTransport(stop, 10*time.Millisecond, c, m, 0); err != nil { t.Fatal(err) }