From 52876f77e9bb41040768ff80ec881749bfadc3f4 Mon Sep 17 00:00:00 2001
From: Jordan Liggitt
Date: Mon, 7 May 2018 12:16:05 -0400
Subject: [PATCH 1/2] Always track kubelet -> API connections

---
 cmd/kubelet/app/server.go                 |  14 +--
 pkg/kubelet/certificate/transport.go      | 121 ++++++++++++----------
 pkg/kubelet/certificate/transport_test.go |   2 +-
 3 files changed, 72 insertions(+), 65 deletions(-)

diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go
index 1f0029ee1ae..a0b4dcf112d 100644
--- a/cmd/kubelet/app/server.go
+++ b/cmd/kubelet/app/server.go
@@ -547,13 +547,13 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
 		if err != nil {
 			return err
 		}
-
-		// we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable
-		// to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper
-		// or the bootstrapping credentials to potentially lay down new initial config.
-		if err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute); err != nil {
-			return err
-		}
+	}
+	// we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable
+	// to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper
+	// or the bootstrapping credentials to potentially lay down new initial config.
+	_, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
+	if err != nil {
+		return err
 	}
 
 	kubeClient, err = clientset.NewForConfig(clientConfig)
diff --git a/pkg/kubelet/certificate/transport.go b/pkg/kubelet/certificate/transport.go
index b8d200efc29..f980cb6eb14 100644
--- a/pkg/kubelet/certificate/transport.go
+++ b/pkg/kubelet/certificate/transport.go
@@ -38,6 +38,8 @@ import (
 //
 // The config must not already provide an explicit transport.
 //
+// The returned function allows forcefully closing all active connections.
+//
 // The returned transport periodically checks the manager to determine if the
 // certificate has changed. If it has, the transport shuts down all existing client
 // connections, forcing the client to re-handshake with the server and use the
@@ -51,30 +53,15 @@ import (
 //
 // stopCh should be used to indicate when the transport is unused and doesn't need
 // to continue checking the manager.
-func UpdateTransport(stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) error {
+func UpdateTransport(stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) (func(), error) {
 	return updateTransport(stopCh, 10*time.Second, clientConfig, clientCertificateManager, exitAfter)
 }
 
 // updateTransport is an internal method that exposes how often this method checks that the
 // client cert has changed.
-func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) error {
-	if clientConfig.Transport != nil {
-		return fmt.Errorf("there is already a transport configured")
-	}
-	tlsConfig, err := restclient.TLSConfigFor(clientConfig)
-	if err != nil {
-		return fmt.Errorf("unable to configure TLS for the rest client: %v", err)
-	}
-	if tlsConfig == nil {
-		tlsConfig = &tls.Config{}
-	}
-	tlsConfig.Certificates = nil
-	tlsConfig.GetClientCertificate = func(requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) {
-		cert := clientCertificateManager.Current()
-		if cert == nil {
-			return &tls.Certificate{Certificate: nil}, nil
-		}
-		return cert, nil
+func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) (func(), error) {
+	if clientConfig.Transport != nil || clientConfig.Dial != nil {
+		return nil, fmt.Errorf("there is already a transport or dialer configured")
 	}
 
 	// Custom dialer that will track all connections it creates.
@@ -83,48 +70,67 @@ func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig
 		conns:  make(map[*closableConn]struct{}),
 	}
 
-	lastCertAvailable := time.Now()
-	lastCert := clientCertificateManager.Current()
-	go wait.Until(func() {
-		curr := clientCertificateManager.Current()
+	tlsConfig, err := restclient.TLSConfigFor(clientConfig)
+	if err != nil {
+		return nil, fmt.Errorf("unable to configure TLS for the rest client: %v", err)
+	}
+	if tlsConfig == nil {
+		tlsConfig = &tls.Config{}
+	}
 
-		if exitAfter > 0 {
-			now := time.Now()
-			if curr == nil {
-				// the certificate has been deleted from disk or is otherwise corrupt
-				if now.After(lastCertAvailable.Add(exitAfter)) {
-					if clientCertificateManager.ServerHealthy() {
-						glog.Fatalf("It has been %s since a valid client cert was found and the server is responsive, exiting.", exitAfter)
-					} else {
-						glog.Errorf("It has been %s since a valid client cert was found, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.", exitAfter)
-					}
-				}
-			} else {
-				// the certificate is expired
-				if now.After(curr.Leaf.NotAfter) {
-					if clientCertificateManager.ServerHealthy() {
-						glog.Fatalf("The currently active client certificate has expired and the server is responsive, exiting.")
-					} else {
-						glog.Errorf("The currently active client certificate has expired, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.")
-					}
-				}
-				lastCertAvailable = now
+	if clientCertificateManager != nil {
+		tlsConfig.Certificates = nil
+		tlsConfig.GetClientCertificate = func(requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) {
+			cert := clientCertificateManager.Current()
+			if cert == nil {
+				return &tls.Certificate{Certificate: nil}, nil
 			}
+			return cert, nil
 		}
 
-		if curr == nil || lastCert == curr {
-			// Cert hasn't been rotated.
-			return
-		}
-		lastCert = curr
+		lastCertAvailable := time.Now()
+		lastCert := clientCertificateManager.Current()
+		go wait.Until(func() {
+			curr := clientCertificateManager.Current()
 
-		glog.Infof("certificate rotation detected, shutting down client connections to start using new credentials")
-		// The cert has been rotated. Close all existing connections to force the client
-		// to reperform its TLS handshake with new cert.
-		//
-		// See: https://github.com/kubernetes-incubator/bootkube/pull/663#issuecomment-318506493
-		t.closeAllConns()
-	}, period, stopCh)
+			if exitAfter > 0 {
+				now := time.Now()
+				if curr == nil {
+					// the certificate has been deleted from disk or is otherwise corrupt
+					if now.After(lastCertAvailable.Add(exitAfter)) {
+						if clientCertificateManager.ServerHealthy() {
+							glog.Fatalf("It has been %s since a valid client cert was found and the server is responsive, exiting.", exitAfter)
+						} else {
+							glog.Errorf("It has been %s since a valid client cert was found, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.", exitAfter)
+						}
+					}
+				} else {
+					// the certificate is expired
+					if now.After(curr.Leaf.NotAfter) {
+						if clientCertificateManager.ServerHealthy() {
+							glog.Fatalf("The currently active client certificate has expired and the server is responsive, exiting.")
+						} else {
+							glog.Errorf("The currently active client certificate has expired, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.")
+						}
+					}
+					lastCertAvailable = now
+				}
+			}
+
+			if curr == nil || lastCert == curr {
+				// Cert hasn't been rotated.
+				return
+			}
+			lastCert = curr
+
+			glog.Infof("certificate rotation detected, shutting down client connections to start using new credentials")
+			// The cert has been rotated. Close all existing connections to force the client
+			// to reperform its TLS handshake with new cert.
+			//
+			// See: https://github.com/kubernetes-incubator/bootkube/pull/663#issuecomment-318506493
+			t.closeAllConns()
+		}, period, stopCh)
+	}
 
 	clientConfig.Transport = utilnet.SetTransportDefaults(&http.Transport{
 		Proxy: http.ProxyFromEnvironment,
@@ -142,7 +148,8 @@ func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig
 	clientConfig.CAData = nil
 	clientConfig.CAFile = ""
 	clientConfig.Insecure = false
-	return nil
+
+	return t.closeAllConns, nil
 }
 
 // connTracker is a dialer that tracks all open connections it creates.
diff --git a/pkg/kubelet/certificate/transport_test.go b/pkg/kubelet/certificate/transport_test.go
index d05b28aa618..ef8ea8c7291 100644
--- a/pkg/kubelet/certificate/transport_test.go
+++ b/pkg/kubelet/certificate/transport_test.go
@@ -187,7 +187,7 @@ func TestRotateShutsDownConnections(t *testing.T) {
 	}
 
 	// Check for a new cert every 10 milliseconds
-	if err := updateTransport(stop, 10*time.Millisecond, c, m, 0); err != nil {
+	if _, err := updateTransport(stop, 10*time.Millisecond, c, m, 0); err != nil {
 		t.Fatal(err)
 	}
 

From 814b065928f86bb27f5e8ad973e4c0cfe8343d4a Mon Sep 17 00:00:00 2001
From: Jordan Liggitt
Date: Mon, 7 May 2018 12:16:46 -0400
Subject: [PATCH 2/2] Close all kubelet->API connections on heartbeat failure

---
 cmd/kubelet/app/server.go               | 3 ++-
 pkg/kubelet/kubelet.go                  | 5 +++++
 pkg/kubelet/kubelet_node_status.go      | 3 +++
 pkg/kubelet/kubelet_node_status_test.go | 8 ++++++++
 4 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go
index a0b4dcf112d..9e76391246d 100644
--- a/cmd/kubelet/app/server.go
+++ b/cmd/kubelet/app/server.go
@@ -551,7 +551,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
 	// we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable
 	// to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper
 	// or the bootstrapping credentials to potentially lay down new initial config.
-	_, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
+	closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
 	if err != nil {
 		return err
 	}
@@ -591,6 +591,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
 		kubeDeps.ExternalKubeClient = externalKubeClient
 		if heartbeatClient != nil {
 			kubeDeps.HeartbeatClient = heartbeatClient
+			kubeDeps.OnHeartbeatFailure = closeAllConns
 		}
 		if eventClient != nil {
 			kubeDeps.EventClient = eventClient
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 05a5064c603..5e94bf539a2 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -233,6 +233,7 @@ type Dependencies struct {
 	DockerClientConfig *dockershim.ClientConfig
 	EventClient v1core.EventsGetter
 	HeartbeatClient v1core.CoreV1Interface
+	OnHeartbeatFailure func()
 	KubeClient clientset.Interface
 	ExternalKubeClient clientset.Interface
 	Mounter mount.Interface
@@ -488,6 +489,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 		nodeName: nodeName,
 		kubeClient: kubeDeps.KubeClient,
 		heartbeatClient: kubeDeps.HeartbeatClient,
+		onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure,
 		rootDirectory: rootDirectory,
 		resyncInterval: kubeCfg.SyncFrequency.Duration,
 		sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
@@ -873,6 +875,9 @@ type Kubelet struct {
 	iptClient utilipt.Interface
 	rootDirectory string
 
+	// onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional.
+	onRepeatedHeartbeatFailure func()
+
 	// podWorkers handle syncing Pods in response to events.
 	podWorkers PodWorkers
 
diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go
index 3d842b2f72a..0e06424c4d7 100644
--- a/pkg/kubelet/kubelet_node_status.go
+++ b/pkg/kubelet/kubelet_node_status.go
@@ -350,6 +350,9 @@ func (kl *Kubelet) updateNodeStatus() error {
 	glog.V(5).Infof("Updating node status")
 	for i := 0; i < nodeStatusUpdateRetry; i++ {
 		if err := kl.tryUpdateNodeStatus(i); err != nil {
+			if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
+				kl.onRepeatedHeartbeatFailure()
+			}
 			glog.Errorf("Error updating node status, will retry: %v", err)
 		} else {
 			return nil
diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go
index 10c3f7f8565..126bb47a7c3 100644
--- a/pkg/kubelet/kubelet_node_status_test.go
+++ b/pkg/kubelet/kubelet_node_status_test.go
@@ -596,6 +596,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
 
 func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
 	attempts := int64(0)
+	failureCallbacks := int64(0)
 
 	// set up a listener that hangs connections
 	ln, err := net.Listen("tcp", "127.0.0.1:0")
@@ -626,6 +627,9 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
 	kubelet := testKubelet.kubelet
 	kubelet.kubeClient = nil // ensure only the heartbeat client is used
 	kubelet.heartbeatClient, err = v1core.NewForConfig(config)
+	kubelet.onRepeatedHeartbeatFailure = func() {
+		atomic.AddInt64(&failureCallbacks, 1)
+	}
 	kubelet.containerManager = &localCM{
 		ContainerManager: cm.NewStubContainerManager(),
 		allocatableReservation: v1.ResourceList{
@@ -645,6 +649,10 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
 	if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts < nodeStatusUpdateRetry {
 		t.Errorf("Expected at least %d attempts, got %d", nodeStatusUpdateRetry, actualAttempts)
 	}
+	// should have gotten multiple failure callbacks
+	if actualFailureCallbacks := atomic.LoadInt64(&failureCallbacks); actualFailureCallbacks < (nodeStatusUpdateRetry - 1) {
+		t.Errorf("Expected %d failure callbacks, got %d", (nodeStatusUpdateRetry - 1), actualFailureCallbacks)
+	}
 }
 
 func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
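The excerpt above ends before the body of connTracker in pkg/kubelet/certificate/transport.go. As a reading aid, the following is a minimal, illustrative Go sketch (not the actual Kubernetes implementation) of a connection-tracking dialer of the shape these patches rely on: the connTracker, closableConn, and closeAllConns identifiers mirror the patch, while everything else is simplified. Patch 2 then hands the returned closeAllConns function to kubeDeps.OnHeartbeatFailure, so updateNodeStatus can force every kubelet->API connection to be re-dialed after repeated heartbeat failures.

// Illustrative sketch only: a dialer that records every connection it opens so
// that all of them can be force-closed later (for example when the client
// certificate rotates, or when node-status heartbeats fail repeatedly). The
// connTracker, closableConn, and closeAllConns names mirror the patch above;
// the rest is simplified and is not the actual pkg/kubelet/certificate code.
package main

import (
	"context"
	"net"
	"sync"
	"time"
)

// closableConn removes itself from the tracker once it is closed normally.
type closableConn struct {
	net.Conn
	onClose func()
}

func (c *closableConn) Close() error {
	c.onClose()
	return c.Conn.Close()
}

// connTracker dials TCP connections and remembers every live one.
type connTracker struct {
	dialer *net.Dialer

	mu    sync.Mutex
	conns map[*closableConn]struct{}
}

// DialContext opens a connection and registers it with the tracker.
func (t *connTracker) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
	conn, err := t.dialer.DialContext(ctx, network, address)
	if err != nil {
		return nil, err
	}
	closable := &closableConn{Conn: conn}
	closable.onClose = func() {
		t.mu.Lock()
		delete(t.conns, closable)
		t.mu.Unlock()
	}
	t.mu.Lock()
	t.conns[closable] = struct{}{}
	t.mu.Unlock()
	return closable, nil
}

// closeAllConns force-closes every tracked connection; the HTTP client will
// dial (and re-handshake) fresh connections on its next request.
func (t *connTracker) closeAllConns() {
	t.mu.Lock()
	conns := t.conns
	t.conns = make(map[*closableConn]struct{})
	t.mu.Unlock()
	for conn := range conns {
		conn.Conn.Close()
	}
}

func main() {
	t := &connTracker{
		dialer: &net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second},
		conns:  make(map[*closableConn]struct{}),
	}
	// In the spirit of patch 2: hand the force-close function to whatever runs
	// the heartbeat loop, so that a repeated failure drops all connections and
	// the next attempt dials fresh.
	onRepeatedHeartbeatFailure := t.closeAllConns
	onRepeatedHeartbeatFailure()
}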