From 7b3a3466a884623e9ce5ff1fbdcb66323b2d518c Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Mon, 26 Aug 2019 15:55:35 -0700 Subject: [PATCH] Add stopCh to apiserver & context to kublet commands Remove SetupSignalContext call from the apiserver Signed-off-by: galal-hussein --- cmd/genkubedocs/gen_kube_docs.go | 9 ++++----- cmd/genman/gen_kube_man.go | 9 ++++----- cmd/kube-apiserver/apiserver.go | 3 ++- cmd/kube-apiserver/app/server.go | 7 +++---- cmd/kube-scheduler/app/server.go | 12 +++++++----- cmd/kube-scheduler/scheduler.go | 3 ++- cmd/kubelet/app/server.go | 12 ++++-------- cmd/kubelet/kubelet.go | 4 ++-- test/e2e_node/services/apiserver.go | 8 +++++++- 9 files changed, 35 insertions(+), 32 deletions(-) diff --git a/cmd/genkubedocs/gen_kube_docs.go b/cmd/genkubedocs/gen_kube_docs.go index af674f3bf7d..195b08820f7 100644 --- a/cmd/genkubedocs/gen_kube_docs.go +++ b/cmd/genkubedocs/gen_kube_docs.go @@ -18,13 +18,13 @@ package main import ( "bytes" - "context" "fmt" "io" "os" "github.com/spf13/cobra/doc" "github.com/spf13/pflag" + "k8s.io/apiserver/pkg/server" "k8s.io/kubernetes/cmd/genutils" apiservapp "k8s.io/kubernetes/cmd/kube-apiserver/app" cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app" @@ -46,7 +46,6 @@ func main() { os.Exit(1) } - ctx := context.Background() outDir, err := genutils.OutDir(path) if err != nil { fmt.Fprintf(os.Stderr, "failed to get output directory: %v\n", err) @@ -56,7 +55,7 @@ func main() { switch module { case "kube-apiserver": // generate docs for kube-apiserver - apiserver := apiservapp.NewAPIServerCommand() + apiserver := apiservapp.NewAPIServerCommand(server.SetupSignalHandler()) doc.GenMarkdownTree(apiserver, outDir) case "kube-controller-manager": // generate docs for kube-controller-manager @@ -68,11 +67,11 @@ func main() { doc.GenMarkdownTree(proxy, outDir) case "kube-scheduler": // generate docs for kube-scheduler - scheduler := schapp.NewSchedulerCommand() + scheduler := schapp.NewSchedulerCommand(server.SetupSignalHandler()) doc.GenMarkdownTree(scheduler, outDir) case "kubelet": // generate docs for kubelet - kubelet := kubeletapp.NewKubeletCommand(ctx) + kubelet := kubeletapp.NewKubeletCommand(server.SetupSignalContext()) doc.GenMarkdownTree(kubelet, outDir) case "kubeadm": // resets global flags created by kubelet or other commands e.g. diff --git a/cmd/genman/gen_kube_man.go b/cmd/genman/gen_kube_man.go index 47f0fc1d0a0..532588ce2c3 100644 --- a/cmd/genman/gen_kube_man.go +++ b/cmd/genman/gen_kube_man.go @@ -18,7 +18,6 @@ package main import ( "bytes" - "context" "fmt" "io" "os" @@ -27,6 +26,7 @@ import ( mangen "github.com/cpuguy83/go-md2man/v2/md2man" "github.com/spf13/cobra" "github.com/spf13/pflag" + "k8s.io/apiserver/pkg/server" "k8s.io/cli-runtime/pkg/genericiooptions" kubectlcmd "k8s.io/kubectl/pkg/cmd" "k8s.io/kubernetes/cmd/genutils" @@ -59,12 +59,11 @@ func main() { // Set environment variables used by command so the output is consistent, // regardless of where we run. os.Setenv("HOME", "/home/username") - ctx := context.Background() switch module { case "kube-apiserver": // generate manpage for kube-apiserver - apiserver := apiservapp.NewAPIServerCommand() + apiserver := apiservapp.NewAPIServerCommand(server.SetupSignalHandler()) genMarkdown(apiserver, "", outDir) for _, c := range apiserver.Commands() { genMarkdown(c, "kube-apiserver", outDir) @@ -85,14 +84,14 @@ func main() { } case "kube-scheduler": // generate manpage for kube-scheduler - scheduler := schapp.NewSchedulerCommand() + scheduler := schapp.NewSchedulerCommand(server.SetupSignalHandler()) genMarkdown(scheduler, "", outDir) for _, c := range scheduler.Commands() { genMarkdown(c, "kube-scheduler", outDir) } case "kubelet": // generate manpage for kubelet - kubelet := kubeletapp.NewKubeletCommand(ctx) + kubelet := kubeletapp.NewKubeletCommand(server.SetupSignalContext()) genMarkdown(kubelet, "", outDir) for _, c := range kubelet.Commands() { genMarkdown(c, "kubelet", outDir) diff --git a/cmd/kube-apiserver/apiserver.go b/cmd/kube-apiserver/apiserver.go index 1bf05bc5684..1dace05ff91 100644 --- a/cmd/kube-apiserver/apiserver.go +++ b/cmd/kube-apiserver/apiserver.go @@ -22,6 +22,7 @@ import ( "os" _ "time/tzdata" // for timeZone support in CronJob + "k8s.io/apiserver/pkg/server" "k8s.io/component-base/cli" _ "k8s.io/component-base/logs/json/register" // for JSON log format registration _ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins @@ -30,7 +31,7 @@ import ( ) func main() { - command := app.NewAPIServerCommand() + command := app.NewAPIServerCommand(server.SetupSignalHandler()) code := cli.Run(command) os.Exit(code) } diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 71ebb317461..6620dc21a67 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -67,7 +67,7 @@ func init() { } // NewAPIServerCommand creates a *cobra.Command object with default parameters -func NewAPIServerCommand() *cobra.Command { +func NewAPIServerCommand(stopCh <-chan struct{}) *cobra.Command { s := options.NewServerRunOptions() ctx := genericapiserver.SetupSignalContext() featureGate := s.GenericServerRunOptions.ComponentGlobalsRegistry.FeatureGateFor(basecompatibility.DefaultKubeComponent) @@ -114,7 +114,7 @@ cluster's shared state through which all other components interact.`, featureGate.(featuregate.MutableFeatureGate).AddMetrics() // add component version metrics s.GenericServerRunOptions.ComponentGlobalsRegistry.AddMetrics() - return Run(ctx, completedOptions) + return Run(cmd.Context(), completedOptions, stopCh) }, Args: func(cmd *cobra.Command, args []string) error { for _, arg := range args { @@ -125,7 +125,6 @@ cluster's shared state through which all other components interact.`, return nil }, } - cmd.SetContext(ctx) fs := cmd.Flags() namedFlagSets := s.Flags() @@ -145,7 +144,7 @@ cluster's shared state through which all other components interact.`, } // Run runs the specified APIServer. This should never exit. -func Run(ctx context.Context, opts options.CompletedOptions) error { +func Run(ctx context.Context, opts options.CompletedOptions, stopCh <-chan struct{}) error { // To help debugging, immediately log version klog.Infof("Version: %+v", utilversion.Get()) diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 803e0c37861..adf53dca7b9 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -35,7 +35,6 @@ import ( "k8s.io/apiserver/pkg/authorization/authorizer" genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" apirequest "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/apiserver/pkg/server" genericfilters "k8s.io/apiserver/pkg/server/filters" "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/mux" @@ -87,7 +86,11 @@ func init() { type Option func(runtime.Registry) error // NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions -func NewSchedulerCommand(registryOptions ...Option) *cobra.Command { +func NewSchedulerCommand(stopCh <-chan struct{}, registryOptions ...Option) *cobra.Command { + // explicitly register (if not already registered) the kube effective version and feature gate in DefaultComponentGlobalsRegistry, + // which will be used in NewOptions. + _, _ = s.De.DefaultComponentGlobalsRegistry.ComponentGlobalsOrRegister( + featuregate.DefaultKubeComponent, utilversion.DefaultBuildEffectiveVersion(), utilfeature.DefaultMutableFeatureGate) opts := options.NewOptions() cmd := &cobra.Command{ @@ -105,7 +108,7 @@ for more information about scheduling and the kube-scheduler component.`, return opts.ComponentGlobalsRegistry.Set() }, RunE: func(cmd *cobra.Command, args []string) error { - return runCommand(cmd, opts, registryOptions...) + return runCommand(cmd, opts, stopCh, registryOptions...) }, Args: func(cmd *cobra.Command, args []string) error { for _, arg := range args { @@ -136,7 +139,7 @@ for more information about scheduling and the kube-scheduler component.`, } // runCommand runs the scheduler. -func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error { +func runCommand(cmd *cobra.Command, opts *options.Options, stopCh <-chan struct{}, registryOptions ...Option) error { verflag.PrintAndExitIfRequested() fg := opts.ComponentGlobalsRegistry.FeatureGateFor(basecompatibility.DefaultKubeComponent) // Activate logging as soon as possible, after that @@ -150,7 +153,6 @@ func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Op ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { - stopCh := server.SetupSignalHandler() <-stopCh cancel() }() diff --git a/cmd/kube-scheduler/scheduler.go b/cmd/kube-scheduler/scheduler.go index 71739808dd2..86900f4e8ea 100644 --- a/cmd/kube-scheduler/scheduler.go +++ b/cmd/kube-scheduler/scheduler.go @@ -19,6 +19,7 @@ package main import ( "os" + "k8s.io/apiserver/pkg/server" "k8s.io/component-base/cli" _ "k8s.io/component-base/logs/json/register" // for JSON log format registration _ "k8s.io/component-base/metrics/prometheus/clientgo" @@ -27,7 +28,7 @@ import ( ) func main() { - command := app.NewSchedulerCommand() + command := app.NewSchedulerCommand(server.SetupSignalHandler()) code := cli.Run(command) os.Exit(code) } diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index f069f6b08b9..72ef6a6026a 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -63,7 +63,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/wait" - genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/healthz" utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" @@ -282,7 +281,6 @@ is checked every 20 seconds (also configurable with a flag).`, if err := checkPermissions(ctx); err != nil { logger.Error(err, "Kubelet running with insufficient permissions") } - // make the kubelet's config safe for logging config := kubeletServer.KubeletConfiguration.DeepCopy() for k := range config.StaticPodURLHeader { @@ -291,9 +289,6 @@ is checked every 20 seconds (also configurable with a flag).`, // log the kubelet's config for inspection logger.V(5).Info("KubeletConfiguration", "configuration", klog.Format(config)) - // set up signal context for kubelet shutdown - ctx := genericapiserver.SetupSignalContext() - utilfeature.DefaultMutableFeatureGate.AddMetrics() // run the kubelet return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate) @@ -516,7 +511,8 @@ func UnsecuredDependencies(ctx context.Context, s *options.KubeletServer, featur OSInterface: kubecontainer.RealOS{}, VolumePlugins: plugins, DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner), - TLSOptions: tlsOptions}, nil + TLSOptions: tlsOptions, + }, nil } // Run runs the specified KubeletServer with the given Dependencies. This should never exit. @@ -883,7 +879,6 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend kubeDeps.Recorder, kubeDeps.KubeClient, ) - if err != nil { return err } @@ -1284,7 +1279,8 @@ func createAndInitKubelet( kubeDeps *kubelet.Dependencies, hostname string, nodeName types.NodeName, - nodeIPs []net.IP) (k kubelet.Bootstrap, err error) { + nodeIPs []net.IP, +) (k kubelet.Bootstrap, err error) { // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop // up into "per source" synchronizations diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index 114cbd4aabc..598d9f09d63 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -22,9 +22,9 @@ limitations under the License. package main import ( - "context" "os" + "k8s.io/apiserver/pkg/server" "k8s.io/component-base/cli" _ "k8s.io/component-base/logs/json/register" // for JSON log format registration _ "k8s.io/component-base/metrics/prometheus/clientgo" // for client metric registration @@ -33,7 +33,7 @@ import ( ) func main() { - command := app.NewKubeletCommand(context.Background()) + command := app.NewKubeletCommand(server.SetupSignalContext()) code := cli.Run(command) os.Exit(code) } diff --git a/test/e2e_node/services/apiserver.go b/test/e2e_node/services/apiserver.go index d4fe88f782d..69849692d59 100644 --- a/test/e2e_node/services/apiserver.go +++ b/test/e2e_node/services/apiserver.go @@ -47,6 +47,7 @@ AwEHoUQDQgAEH6cuzP8XuD5wal6wf9M6xDljTOPLX2i8uIp/C/ASqiIGUeeKQtX0 // APIServer is a server which manages apiserver. type APIServer struct { storageConfig storagebackend.Config + stopCh chan struct{} cancel func(error) } @@ -54,6 +55,7 @@ type APIServer struct { func NewAPIServer(storageConfig storagebackend.Config) *APIServer { return &APIServer{ storageConfig: storageConfig, + stopCh: make(chan struct{}), } } @@ -112,7 +114,7 @@ func (a *APIServer) Start(ctx context.Context) error { return } - err = apiserver.Run(ctx, completedOptions) + err = apiserver.Run(ctx, completedOptions, a.stopCh) if err != nil { errCh <- fmt.Errorf("run apiserver error: %w", err) return @@ -132,6 +134,10 @@ func (a *APIServer) Stop() error { if a.cancel != nil { a.cancel(errors.New("stopping API server")) } + if a.stopCh != nil { + close(a.stopCh) + a.stopCh = nil + } return nil }