Add stopCh to apiserver & context to kublet commands

Remove SetupSignalContext call from the apiserver

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>
This commit is contained in:
Darren Shepherd 2019-08-26 15:55:35 -07:00 committed by Rafael Breno
parent 5c07b21505
commit 0b17f11227
7 changed files with 21 additions and 17 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/spf13/cobra/doc" "github.com/spf13/cobra/doc"
"github.com/spf13/pflag" "github.com/spf13/pflag"
"k8s.io/apiserver/pkg/server"
"k8s.io/kubernetes/cmd/genutils" "k8s.io/kubernetes/cmd/genutils"
apiservapp "k8s.io/kubernetes/cmd/kube-apiserver/app" apiservapp "k8s.io/kubernetes/cmd/kube-apiserver/app"
cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app" cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app"
@ -54,7 +55,7 @@ func main() {
switch module { switch module {
case "kube-apiserver": case "kube-apiserver":
// generate docs for kube-apiserver // generate docs for kube-apiserver
apiserver := apiservapp.NewAPIServerCommand() apiserver := apiservapp.NewAPIServerCommand(server.SetupSignalHandler())
doc.GenMarkdownTree(apiserver, outDir) doc.GenMarkdownTree(apiserver, outDir)
case "kube-controller-manager": case "kube-controller-manager":
// generate docs for kube-controller-manager // generate docs for kube-controller-manager
@ -70,7 +71,7 @@ func main() {
doc.GenMarkdownTree(scheduler, outDir) doc.GenMarkdownTree(scheduler, outDir)
case "kubelet": case "kubelet":
// generate docs for kubelet // generate docs for kubelet
kubelet := kubeletapp.NewKubeletCommand() kubelet := kubeletapp.NewKubeletCommand(server.SetupSignalContext())
doc.GenMarkdownTree(kubelet, outDir) doc.GenMarkdownTree(kubelet, outDir)
case "kubeadm": case "kubeadm":
// resets global flags created by kubelet or other commands e.g. // resets global flags created by kubelet or other commands e.g.

View File

@ -26,6 +26,7 @@ import (
mangen "github.com/cpuguy83/go-md2man/v2/md2man" mangen "github.com/cpuguy83/go-md2man/v2/md2man"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/pflag" "github.com/spf13/pflag"
"k8s.io/apiserver/pkg/server"
"k8s.io/cli-runtime/pkg/genericiooptions" "k8s.io/cli-runtime/pkg/genericiooptions"
kubectlcmd "k8s.io/kubectl/pkg/cmd" kubectlcmd "k8s.io/kubectl/pkg/cmd"
"k8s.io/kubernetes/cmd/genutils" "k8s.io/kubernetes/cmd/genutils"
@ -62,7 +63,7 @@ func main() {
switch module { switch module {
case "kube-apiserver": case "kube-apiserver":
// generate manpage for kube-apiserver // generate manpage for kube-apiserver
apiserver := apiservapp.NewAPIServerCommand() apiserver := apiservapp.NewAPIServerCommand(server.SetupSignalHandler())
genMarkdown(apiserver, "", outDir) genMarkdown(apiserver, "", outDir)
for _, c := range apiserver.Commands() { for _, c := range apiserver.Commands() {
genMarkdown(c, "kube-apiserver", outDir) genMarkdown(c, "kube-apiserver", outDir)
@ -90,7 +91,7 @@ func main() {
} }
case "kubelet": case "kubelet":
// generate manpage for kubelet // generate manpage for kubelet
kubelet := kubeletapp.NewKubeletCommand() kubelet := kubeletapp.NewKubeletCommand(server.SetupSignalContext())
genMarkdown(kubelet, "", outDir) genMarkdown(kubelet, "", outDir)
for _, c := range kubelet.Commands() { for _, c := range kubelet.Commands() {
genMarkdown(c, "kubelet", outDir) genMarkdown(c, "kubelet", outDir)

View File

@ -22,6 +22,7 @@ import (
"os" "os"
_ "time/tzdata" // for timeZone support in CronJob _ "time/tzdata" // for timeZone support in CronJob
"k8s.io/apiserver/pkg/server"
"k8s.io/component-base/cli" "k8s.io/component-base/cli"
_ "k8s.io/component-base/logs/json/register" // for JSON log format registration _ "k8s.io/component-base/logs/json/register" // for JSON log format registration
_ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins _ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins
@ -30,7 +31,7 @@ import (
) )
func main() { func main() {
command := app.NewAPIServerCommand() command := app.NewAPIServerCommand(server.SetupSignalHandler())
code := cli.Run(command) code := cli.Run(command)
os.Exit(code) os.Exit(code)
} }

View File

@ -63,7 +63,7 @@ func init() {
} }
// NewAPIServerCommand creates a *cobra.Command object with default parameters // NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand() *cobra.Command { func NewAPIServerCommand(stopCh <-chan struct{}) *cobra.Command {
_, featureGate := utilversion.DefaultComponentGlobalsRegistry.ComponentGlobalsOrRegister( _, featureGate := utilversion.DefaultComponentGlobalsRegistry.ComponentGlobalsOrRegister(
utilversion.DefaultKubeComponent, utilversion.DefaultBuildEffectiveVersion(), utilfeature.DefaultMutableFeatureGate) utilversion.DefaultKubeComponent, utilversion.DefaultBuildEffectiveVersion(), utilfeature.DefaultMutableFeatureGate)
s := options.NewServerRunOptions() s := options.NewServerRunOptions()
@ -108,7 +108,7 @@ cluster's shared state through which all other components interact.`,
} }
// add feature enablement metrics // add feature enablement metrics
featureGate.AddMetrics() featureGate.AddMetrics()
return Run(cmd.Context(), completedOptions) return Run(cmd.Context(), completedOptions, stopCh)
}, },
Args: func(cmd *cobra.Command, args []string) error { Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args { for _, arg := range args {
@ -119,7 +119,6 @@ cluster's shared state through which all other components interact.`,
return nil return nil
}, },
} }
cmd.SetContext(genericapiserver.SetupSignalContext())
fs := cmd.Flags() fs := cmd.Flags()
namedFlagSets := s.Flags() namedFlagSets := s.Flags()
@ -137,7 +136,7 @@ cluster's shared state through which all other components interact.`,
} }
// Run runs the specified APIServer. This should never exit. // Run runs the specified APIServer. This should never exit.
func Run(ctx context.Context, opts options.CompletedOptions) error { func Run(ctx context.Context, opts options.CompletedOptions, stopCh <-chan struct{}) error {
// To help debugging, immediately log version // To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get()) klog.Infof("Version: %+v", version.Get())

View File

@ -60,7 +60,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/healthz"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
@ -134,7 +133,7 @@ const (
) )
// NewKubeletCommand creates a *cobra.Command object with default parameters // NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command { func NewKubeletCommand(ctx context.Context) *cobra.Command {
cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError) cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc) cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
kubeletFlags := options.NewKubeletFlags() kubeletFlags := options.NewKubeletFlags()
@ -271,7 +270,6 @@ is checked every 20 seconds (also configurable with a flag).`,
if err := checkPermissions(); err != nil { if err := checkPermissions(); err != nil {
klog.ErrorS(err, "kubelet running with insufficient permissions") klog.ErrorS(err, "kubelet running with insufficient permissions")
} }
// make the kubelet's config safe for logging // make the kubelet's config safe for logging
config := kubeletServer.KubeletConfiguration.DeepCopy() config := kubeletServer.KubeletConfiguration.DeepCopy()
for k := range config.StaticPodURLHeader { for k := range config.StaticPodURLHeader {
@ -280,9 +278,6 @@ is checked every 20 seconds (also configurable with a flag).`,
// log the kubelet's config for inspection // log the kubelet's config for inspection
klog.V(5).InfoS("KubeletConfiguration", "configuration", klog.Format(config)) klog.V(5).InfoS("KubeletConfiguration", "configuration", klog.Format(config))
// set up signal context for kubelet shutdown
ctx := genericapiserver.SetupSignalContext()
utilfeature.DefaultMutableFeatureGate.AddMetrics() utilfeature.DefaultMutableFeatureGate.AddMetrics()
// run the kubelet // run the kubelet
return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate) return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)

View File

@ -24,6 +24,7 @@ package main
import ( import (
"os" "os"
"k8s.io/apiserver/pkg/server"
"k8s.io/component-base/cli" "k8s.io/component-base/cli"
_ "k8s.io/component-base/logs/json/register" // for JSON log format registration _ "k8s.io/component-base/logs/json/register" // for JSON log format registration
_ "k8s.io/component-base/metrics/prometheus/clientgo" // for client metric registration _ "k8s.io/component-base/metrics/prometheus/clientgo" // for client metric registration
@ -32,7 +33,7 @@ import (
) )
func main() { func main() {
command := app.NewKubeletCommand() command := app.NewKubeletCommand(server.SetupSignalContext())
code := cli.Run(command) code := cli.Run(command)
os.Exit(code) os.Exit(code)
} }

View File

@ -47,6 +47,7 @@ AwEHoUQDQgAEH6cuzP8XuD5wal6wf9M6xDljTOPLX2i8uIp/C/ASqiIGUeeKQtX0
// APIServer is a server which manages apiserver. // APIServer is a server which manages apiserver.
type APIServer struct { type APIServer struct {
storageConfig storagebackend.Config storageConfig storagebackend.Config
stopCh chan struct{}
cancel func(error) cancel func(error)
} }
@ -54,6 +55,7 @@ type APIServer struct {
func NewAPIServer(storageConfig storagebackend.Config) *APIServer { func NewAPIServer(storageConfig storagebackend.Config) *APIServer {
return &APIServer{ return &APIServer{
storageConfig: storageConfig, storageConfig: storageConfig,
stopCh: make(chan struct{}),
} }
} }
@ -112,7 +114,7 @@ func (a *APIServer) Start(ctx context.Context) error {
return return
} }
err = apiserver.Run(ctx, completedOptions) err = apiserver.Run(ctx, completedOptions, a.stopCh)
if err != nil { if err != nil {
errCh <- fmt.Errorf("run apiserver error: %w", err) errCh <- fmt.Errorf("run apiserver error: %w", err)
return return
@ -132,6 +134,10 @@ func (a *APIServer) Stop() error {
if a.cancel != nil { if a.cancel != nil {
a.cancel(errors.New("stopping API server")) a.cancel(errors.New("stopping API server"))
} }
if a.stopCh != nil {
close(a.stopCh)
a.stopCh = nil
}
return nil return nil
} }