diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 0c2dbc11074..2152c82694c 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -256,12 +256,12 @@ HTTP server: The kubelet can also listen for HTTP and respond to a simple API // add the kubelet config controller to kubeletDeps kubeletDeps.KubeletConfigController = kubeletConfigController - // set up stopCh here in order to be reused by kubelet and docker shim - stopCh := genericapiserver.SetupSignalHandler() + // set up signal context here in order to be reused by kubelet and docker shim + ctx := genericapiserver.SetupSignalContext() // run the kubelet klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration) - if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil { + if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil { klog.Fatal(err) } }, @@ -403,7 +403,7 @@ func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.Fea // The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer. // Otherwise, the caller is assumed to have set up the Dependencies object and a default one will // not be generated. -func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error { +func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error { logOption := logs.NewOptions() logOption.LogFormat = s.Logging.Format logOption.Apply() @@ -412,7 +412,7 @@ func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f if err := initForOS(s.KubeletFlags.WindowsService); err != nil { return fmt.Errorf("failed OS init: %v", err) } - if err := run(s, kubeDeps, featureGate, stopCh); err != nil { + if err := run(ctx, s, kubeDeps, featureGate); err != nil { return fmt.Errorf("failed to run Kubelet: %v", err) } return nil @@ -469,7 +469,7 @@ func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) } } -func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) { +func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) { // Set global feature gates based on the value on the initial KubeletServer err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates) if err != nil { @@ -552,7 +552,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f klog.Warningf("standalone mode, no API client") case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil: - clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName) + clientConfig, closeAllConns, err := buildKubeletClientConfig(ctx, s, nodeName) if err != nil { return err } @@ -597,7 +597,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f return err } kubeDeps.Auth = auth - runAuthenticatorCAReload(stopCh) + runAuthenticatorCAReload(ctx.Done()) } var cgroupRoots []string @@ -799,7 +799,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f select { case <-done: break - case <-stopCh: + case <-ctx.Done(): break } @@ -808,7 +808,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f // buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether // bootstrapping is enabled or client certificate rotation is enabled. -func buildKubeletClientConfig(s *options.KubeletServer, nodeName types.NodeName) (*restclient.Config, func(), error) { +func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, nodeName types.NodeName) (*restclient.Config, func(), error) { if s.RotateCertificates { // Rules for client rotation and the handling of kube config files: // @@ -878,7 +878,7 @@ func buildKubeletClientConfig(s *options.KubeletServer, nodeName types.NodeName) } if len(s.BootstrapKubeconfig) > 0 { - if err := bootstrap.LoadClientCert(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil { + if err := bootstrap.LoadClientCert(ctx, s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil { return nil, nil, err } } diff --git a/pkg/kubelet/certificate/bootstrap/bootstrap.go b/pkg/kubelet/certificate/bootstrap/bootstrap.go index 02bf6056355..f6b129dc869 100644 --- a/pkg/kubelet/certificate/bootstrap/bootstrap.go +++ b/pkg/kubelet/certificate/bootstrap/bootstrap.go @@ -105,7 +105,7 @@ func LoadClientConfig(kubeconfigPath, bootstrapPath, certDir string) (certConfig // The kubeconfig at bootstrapPath is used to request a client certificate from the API server. // On success, a kubeconfig file referencing the generated key and obtained certificate is written to kubeconfigPath. // The certificate and key file are stored in certDir. -func LoadClientCert(kubeconfigPath, bootstrapPath, certDir string, nodeName types.NodeName) error { +func LoadClientCert(ctx context.Context, kubeconfigPath, bootstrapPath, certDir string, nodeName types.NodeName) error { // Short-circuit if the kubeconfig file exists and is valid. ok, err := isClientConfigStillValid(kubeconfigPath) if err != nil { @@ -156,11 +156,11 @@ func LoadClientCert(kubeconfigPath, bootstrapPath, certDir string, nodeName type } } - if err := waitForServer(*bootstrapClientConfig, 1*time.Minute); err != nil { + if err := waitForServer(ctx, *bootstrapClientConfig, 1*time.Minute); err != nil { klog.Warningf("Error waiting for apiserver to come up: %v", err) } - certData, err := requestNodeCertificate(bootstrapClient, keyData, nodeName) + certData, err := requestNodeCertificate(ctx, bootstrapClient, keyData, nodeName) if err != nil { return err } @@ -278,7 +278,7 @@ func verifyKeyData(data []byte) bool { return err == nil } -func waitForServer(cfg restclient.Config, deadline time.Duration) error { +func waitForServer(ctx context.Context, cfg restclient.Config, deadline time.Duration) error { cfg.NegotiatedSerializer = scheme.Codecs.WithoutConversion() cfg.Timeout = 1 * time.Second cli, err := restclient.UnversionedRESTClientFor(&cfg) @@ -286,12 +286,12 @@ func waitForServer(cfg restclient.Config, deadline time.Duration) error { return fmt.Errorf("couldn't create client: %v", err) } - ctx, cancel := context.WithTimeout(context.TODO(), deadline) + ctx, cancel := context.WithTimeout(ctx, deadline) defer cancel() var connected bool wait.JitterUntil(func() { - if _, err := cli.Get().AbsPath("/healthz").Do(context.TODO()).Raw(); err != nil { + if _, err := cli.Get().AbsPath("/healthz").Do(ctx).Raw(); err != nil { klog.Infof("Failed to connect to apiserver: %v", err) return } @@ -312,7 +312,7 @@ func waitForServer(cfg restclient.Config, deadline time.Duration) error { // certificate (pem-encoded). If there is any errors, or the watch timeouts, it // will return an error. This is intended for use on nodes (kubelet and // kubeadm). -func requestNodeCertificate(client clientset.Interface, privateKeyData []byte, nodeName types.NodeName) (certData []byte, err error) { +func requestNodeCertificate(ctx context.Context, client clientset.Interface, privateKeyData []byte, nodeName types.NodeName) (certData []byte, err error) { subject := &pkix.Name{ Organization: []string{"system:nodes"}, CommonName: "system:node:" + string(nodeName), @@ -349,7 +349,7 @@ func requestNodeCertificate(client clientset.Interface, privateKeyData []byte, n return nil, err } - ctx, cancel := context.WithTimeout(context.Background(), 3600*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3600*time.Second) defer cancel() klog.V(2).Infof("Waiting for client certificate to be issued") diff --git a/pkg/kubelet/certificate/bootstrap/bootstrap_test.go b/pkg/kubelet/certificate/bootstrap/bootstrap_test.go index aaac51fbd4c..afd2cf7851f 100644 --- a/pkg/kubelet/certificate/bootstrap/bootstrap_test.go +++ b/pkg/kubelet/certificate/bootstrap/bootstrap_test.go @@ -17,6 +17,7 @@ limitations under the License. package bootstrap import ( + "context" "fmt" "io/ioutil" "os" @@ -95,7 +96,7 @@ users: } func TestRequestNodeCertificateNoKeyData(t *testing.T) { - certData, err := requestNodeCertificate(newClientset(fakeClient{}), []byte{}, "fake-node-name") + certData, err := requestNodeCertificate(context.TODO(), newClientset(fakeClient{}), []byte{}, "fake-node-name") if err == nil { t.Errorf("Got no error, wanted error an error because there was an empty private key passed in.") } @@ -113,7 +114,7 @@ func TestRequestNodeCertificateErrorCreatingCSR(t *testing.T) { t.Fatalf("Unable to generate a new private key: %v", err) } - certData, err := requestNodeCertificate(client, privateKeyData, "fake-node-name") + certData, err := requestNodeCertificate(context.TODO(), client, privateKeyData, "fake-node-name") if err == nil { t.Errorf("Got no error, wanted error an error because client.Create failed.") } @@ -128,7 +129,7 @@ func TestRequestNodeCertificate(t *testing.T) { t.Fatalf("Unable to generate a new private key: %v", err) } - certData, err := requestNodeCertificate(newClientset(fakeClient{}), privateKeyData, "fake-node-name") + certData, err := requestNodeCertificate(context.TODO(), newClientset(fakeClient{}), privateKeyData, "fake-node-name") if err != nil { t.Errorf("Got %v, wanted no error.", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/signal.go b/staging/src/k8s.io/apiserver/pkg/server/signal.go index 0ea19d6606d..e5334ae4c15 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/signal.go +++ b/staging/src/k8s.io/apiserver/pkg/server/signal.go @@ -17,6 +17,7 @@ limitations under the License. package server import ( + "context" "os" "os/signal" ) @@ -27,21 +28,30 @@ var shutdownHandler chan os.Signal // SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned // which is closed on one of these signals. If a second signal is caught, the program // is terminated with exit code 1. +// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can +// be called once. func SetupSignalHandler() <-chan struct{} { + return SetupSignalContext().Done() +} + +// SetupSignalContext is same as SetupSignalHandler, but a context.Context is returned. +// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can +// be called once. +func SetupSignalContext() context.Context { close(onlyOneSignalHandler) // panics when called twice shutdownHandler = make(chan os.Signal, 2) - stop := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) signal.Notify(shutdownHandler, shutdownSignals...) go func() { <-shutdownHandler - close(stop) + cancel() <-shutdownHandler os.Exit(1) // second signal. Exit directly. }() - return stop + return ctx } // RequestShutdown emulates a received event that is considered as shutdown signal (SIGTERM/SIGINT)