Merge pull request #92786 from answer1991/feature/enhance-bootstrap-certificate

make Kubelet bootstrap certificate signal aware
This commit is contained in:
Kubernetes Prow Robot 2020-07-06 09:52:52 -07:00 committed by GitHub
commit a26e5881d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 36 additions and 25 deletions

View File

@ -256,12 +256,12 @@ HTTP server: The kubelet can also listen for HTTP and respond to a simple API
// add the kubelet config controller to kubeletDeps // add the kubelet config controller to kubeletDeps
kubeletDeps.KubeletConfigController = kubeletConfigController kubeletDeps.KubeletConfigController = kubeletConfigController
// set up stopCh here in order to be reused by kubelet and docker shim // set up signal context here in order to be reused by kubelet and docker shim
stopCh := genericapiserver.SetupSignalHandler() ctx := genericapiserver.SetupSignalContext()
// run the kubelet // run the kubelet
klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration) klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil { if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
klog.Fatal(err) klog.Fatal(err)
} }
}, },
@ -403,7 +403,7 @@ func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.Fea
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer. // The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will // Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated. // not be generated.
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error { func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
logOption := logs.NewOptions() logOption := logs.NewOptions()
logOption.LogFormat = s.Logging.Format logOption.LogFormat = s.Logging.Format
logOption.Apply() logOption.Apply()
@ -412,7 +412,7 @@ func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f
if err := initForOS(s.KubeletFlags.WindowsService); err != nil { if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
return fmt.Errorf("failed OS init: %v", err) return fmt.Errorf("failed OS init: %v", err)
} }
if err := run(s, kubeDeps, featureGate, stopCh); err != nil { if err := run(ctx, s, kubeDeps, featureGate); err != nil {
return fmt.Errorf("failed to run Kubelet: %v", err) return fmt.Errorf("failed to run Kubelet: %v", err)
} }
return nil return nil
@ -469,7 +469,7 @@ func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName)
} }
} }
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) { func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
// Set global feature gates based on the value on the initial KubeletServer // Set global feature gates based on the value on the initial KubeletServer
err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates) err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
if err != nil { if err != nil {
@ -552,7 +552,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f
klog.Warningf("standalone mode, no API client") klog.Warningf("standalone mode, no API client")
case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil: case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName) clientConfig, closeAllConns, err := buildKubeletClientConfig(ctx, s, nodeName)
if err != nil { if err != nil {
return err return err
} }
@ -597,7 +597,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f
return err return err
} }
kubeDeps.Auth = auth kubeDeps.Auth = auth
runAuthenticatorCAReload(stopCh) runAuthenticatorCAReload(ctx.Done())
} }
var cgroupRoots []string var cgroupRoots []string
@ -799,7 +799,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f
select { select {
case <-done: case <-done:
break break
case <-stopCh: case <-ctx.Done():
break break
} }
@ -808,7 +808,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f
// buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether // buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether
// bootstrapping is enabled or client certificate rotation is enabled. // bootstrapping is enabled or client certificate rotation is enabled.
func buildKubeletClientConfig(s *options.KubeletServer, nodeName types.NodeName) (*restclient.Config, func(), error) { func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, nodeName types.NodeName) (*restclient.Config, func(), error) {
if s.RotateCertificates { if s.RotateCertificates {
// Rules for client rotation and the handling of kube config files: // Rules for client rotation and the handling of kube config files:
// //
@ -878,7 +878,7 @@ func buildKubeletClientConfig(s *options.KubeletServer, nodeName types.NodeName)
} }
if len(s.BootstrapKubeconfig) > 0 { if len(s.BootstrapKubeconfig) > 0 {
if err := bootstrap.LoadClientCert(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil { if err := bootstrap.LoadClientCert(ctx, s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
return nil, nil, err return nil, nil, err
} }
} }

View File

@ -105,7 +105,7 @@ func LoadClientConfig(kubeconfigPath, bootstrapPath, certDir string) (certConfig
// The kubeconfig at bootstrapPath is used to request a client certificate from the API server. // The kubeconfig at bootstrapPath is used to request a client certificate from the API server.
// On success, a kubeconfig file referencing the generated key and obtained certificate is written to kubeconfigPath. // On success, a kubeconfig file referencing the generated key and obtained certificate is written to kubeconfigPath.
// The certificate and key file are stored in certDir. // The certificate and key file are stored in certDir.
func LoadClientCert(kubeconfigPath, bootstrapPath, certDir string, nodeName types.NodeName) error { func LoadClientCert(ctx context.Context, kubeconfigPath, bootstrapPath, certDir string, nodeName types.NodeName) error {
// Short-circuit if the kubeconfig file exists and is valid. // Short-circuit if the kubeconfig file exists and is valid.
ok, err := isClientConfigStillValid(kubeconfigPath) ok, err := isClientConfigStillValid(kubeconfigPath)
if err != nil { if err != nil {
@ -156,11 +156,11 @@ func LoadClientCert(kubeconfigPath, bootstrapPath, certDir string, nodeName type
} }
} }
if err := waitForServer(*bootstrapClientConfig, 1*time.Minute); err != nil { if err := waitForServer(ctx, *bootstrapClientConfig, 1*time.Minute); err != nil {
klog.Warningf("Error waiting for apiserver to come up: %v", err) klog.Warningf("Error waiting for apiserver to come up: %v", err)
} }
certData, err := requestNodeCertificate(bootstrapClient, keyData, nodeName) certData, err := requestNodeCertificate(ctx, bootstrapClient, keyData, nodeName)
if err != nil { if err != nil {
return err return err
} }
@ -278,7 +278,7 @@ func verifyKeyData(data []byte) bool {
return err == nil return err == nil
} }
func waitForServer(cfg restclient.Config, deadline time.Duration) error { func waitForServer(ctx context.Context, cfg restclient.Config, deadline time.Duration) error {
cfg.NegotiatedSerializer = scheme.Codecs.WithoutConversion() cfg.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
cfg.Timeout = 1 * time.Second cfg.Timeout = 1 * time.Second
cli, err := restclient.UnversionedRESTClientFor(&cfg) cli, err := restclient.UnversionedRESTClientFor(&cfg)
@ -286,12 +286,12 @@ func waitForServer(cfg restclient.Config, deadline time.Duration) error {
return fmt.Errorf("couldn't create client: %v", err) return fmt.Errorf("couldn't create client: %v", err)
} }
ctx, cancel := context.WithTimeout(context.TODO(), deadline) ctx, cancel := context.WithTimeout(ctx, deadline)
defer cancel() defer cancel()
var connected bool var connected bool
wait.JitterUntil(func() { wait.JitterUntil(func() {
if _, err := cli.Get().AbsPath("/healthz").Do(context.TODO()).Raw(); err != nil { if _, err := cli.Get().AbsPath("/healthz").Do(ctx).Raw(); err != nil {
klog.Infof("Failed to connect to apiserver: %v", err) klog.Infof("Failed to connect to apiserver: %v", err)
return return
} }
@ -312,7 +312,7 @@ func waitForServer(cfg restclient.Config, deadline time.Duration) error {
// certificate (pem-encoded). If there is any errors, or the watch timeouts, it // certificate (pem-encoded). If there is any errors, or the watch timeouts, it
// will return an error. This is intended for use on nodes (kubelet and // will return an error. This is intended for use on nodes (kubelet and
// kubeadm). // kubeadm).
func requestNodeCertificate(client clientset.Interface, privateKeyData []byte, nodeName types.NodeName) (certData []byte, err error) { func requestNodeCertificate(ctx context.Context, client clientset.Interface, privateKeyData []byte, nodeName types.NodeName) (certData []byte, err error) {
subject := &pkix.Name{ subject := &pkix.Name{
Organization: []string{"system:nodes"}, Organization: []string{"system:nodes"},
CommonName: "system:node:" + string(nodeName), CommonName: "system:node:" + string(nodeName),
@ -349,7 +349,7 @@ func requestNodeCertificate(client clientset.Interface, privateKeyData []byte, n
return nil, err return nil, err
} }
ctx, cancel := context.WithTimeout(context.Background(), 3600*time.Second) ctx, cancel := context.WithTimeout(ctx, 3600*time.Second)
defer cancel() defer cancel()
klog.V(2).Infof("Waiting for client certificate to be issued") klog.V(2).Infof("Waiting for client certificate to be issued")

View File

@ -17,6 +17,7 @@ limitations under the License.
package bootstrap package bootstrap
import ( import (
"context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
@ -95,7 +96,7 @@ users:
} }
func TestRequestNodeCertificateNoKeyData(t *testing.T) { func TestRequestNodeCertificateNoKeyData(t *testing.T) {
certData, err := requestNodeCertificate(newClientset(fakeClient{}), []byte{}, "fake-node-name") certData, err := requestNodeCertificate(context.TODO(), newClientset(fakeClient{}), []byte{}, "fake-node-name")
if err == nil { if err == nil {
t.Errorf("Got no error, wanted error an error because there was an empty private key passed in.") t.Errorf("Got no error, wanted error an error because there was an empty private key passed in.")
} }
@ -113,7 +114,7 @@ func TestRequestNodeCertificateErrorCreatingCSR(t *testing.T) {
t.Fatalf("Unable to generate a new private key: %v", err) t.Fatalf("Unable to generate a new private key: %v", err)
} }
certData, err := requestNodeCertificate(client, privateKeyData, "fake-node-name") certData, err := requestNodeCertificate(context.TODO(), client, privateKeyData, "fake-node-name")
if err == nil { if err == nil {
t.Errorf("Got no error, wanted error an error because client.Create failed.") t.Errorf("Got no error, wanted error an error because client.Create failed.")
} }
@ -128,7 +129,7 @@ func TestRequestNodeCertificate(t *testing.T) {
t.Fatalf("Unable to generate a new private key: %v", err) t.Fatalf("Unable to generate a new private key: %v", err)
} }
certData, err := requestNodeCertificate(newClientset(fakeClient{}), privateKeyData, "fake-node-name") certData, err := requestNodeCertificate(context.TODO(), newClientset(fakeClient{}), privateKeyData, "fake-node-name")
if err != nil { if err != nil {
t.Errorf("Got %v, wanted no error.", err) t.Errorf("Got %v, wanted no error.", err)
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package server package server
import ( import (
"context"
"os" "os"
"os/signal" "os/signal"
) )
@ -27,21 +28,30 @@ var shutdownHandler chan os.Signal
// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned // SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program // which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1. // is terminated with exit code 1.
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
// be called once.
func SetupSignalHandler() <-chan struct{} { func SetupSignalHandler() <-chan struct{} {
return SetupSignalContext().Done()
}
// SetupSignalContext is same as SetupSignalHandler, but a context.Context is returned.
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
// be called once.
func SetupSignalContext() context.Context {
close(onlyOneSignalHandler) // panics when called twice close(onlyOneSignalHandler) // panics when called twice
shutdownHandler = make(chan os.Signal, 2) shutdownHandler = make(chan os.Signal, 2)
stop := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background())
signal.Notify(shutdownHandler, shutdownSignals...) signal.Notify(shutdownHandler, shutdownSignals...)
go func() { go func() {
<-shutdownHandler <-shutdownHandler
close(stop) cancel()
<-shutdownHandler <-shutdownHandler
os.Exit(1) // second signal. Exit directly. os.Exit(1) // second signal. Exit directly.
}() }()
return stop return ctx
} }
// RequestShutdown emulates a received event that is considered as shutdown signal (SIGTERM/SIGINT) // RequestShutdown emulates a received event that is considered as shutdown signal (SIGTERM/SIGINT)